Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d28e51de6d | |||
| b18af49ac8 |
Generated
+2
-1
@@ -353,7 +353,7 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fluxo-rs"
|
name = "fluxo-rs"
|
||||||
version = "0.3.1"
|
version = "0.3.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"bluer",
|
"bluer",
|
||||||
@@ -1270,6 +1270,7 @@ dependencies = [
|
|||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-sink",
|
"futures-sink",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
|
"slab",
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
+2
-2
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "fluxo-rs"
|
name = "fluxo-rs"
|
||||||
version = "0.3.2"
|
version = "0.3.3"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
@@ -18,7 +18,7 @@ tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }
|
|||||||
maestro = { git = "https://github.com/qzed/pbpctrl", package = "maestro" }
|
maestro = { git = "https://github.com/qzed/pbpctrl", package = "maestro" }
|
||||||
bluer = { version = "0.17", features = ["bluetoothd", "rfcomm", "id"] }
|
bluer = { version = "0.17", features = ["bluetoothd", "rfcomm", "id"] }
|
||||||
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal", "process"] }
|
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal", "process"] }
|
||||||
tokio-util = { version = "0.7", features = ["codec"] }
|
tokio-util = { version = "0.7", features = ["codec", "time"] }
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
libpulse-binding = "2.30"
|
libpulse-binding = "2.30"
|
||||||
nix = { version = "0.31", features = ["net"] }
|
nix = { version = "0.31", features = ["net"] }
|
||||||
|
|||||||
+107
-16
@@ -16,6 +16,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
|||||||
use tokio::net::UnixListener;
|
use tokio::net::UnixListener;
|
||||||
use tokio::sync::{RwLock, watch};
|
use tokio::sync::{RwLock, watch};
|
||||||
use tokio::time::{Duration, Instant, sleep};
|
use tokio::time::{Duration, Instant, sleep};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
struct SocketGuard {
|
struct SocketGuard {
|
||||||
@@ -56,7 +57,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
|||||||
disks: disks_rx,
|
disks: disks_rx,
|
||||||
bluetooth: bt_rx,
|
bluetooth: bt_rx,
|
||||||
audio: audio_rx,
|
audio: audio_rx,
|
||||||
health,
|
health: Arc::clone(&health),
|
||||||
};
|
};
|
||||||
|
|
||||||
let listener = UnixListener::bind(&sock_path)?;
|
let listener = UnixListener::bind(&sock_path)?;
|
||||||
@@ -64,62 +65,99 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
|||||||
path: sock_path.clone(),
|
path: sock_path.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Signal handling for graceful shutdown in async context
|
// Signal handling for graceful shutdown
|
||||||
let running = Arc::new(tokio::sync::Notify::new());
|
let cancel_token = CancellationToken::new();
|
||||||
let r_clone = Arc::clone(&running);
|
let token_clone = cancel_token.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tokio::signal::ctrl_c().await.unwrap();
|
tokio::signal::ctrl_c().await.unwrap();
|
||||||
info!("Received shutdown signal, exiting...");
|
info!("Received shutdown signal, exiting...");
|
||||||
r_clone.notify_waiters();
|
token_clone.cancel();
|
||||||
});
|
});
|
||||||
|
|
||||||
let config_path_clone = config_path.clone();
|
let config_path_clone = config_path.clone();
|
||||||
let config = Arc::new(RwLock::new(crate::config::load_config(config_path)));
|
let config = Arc::new(RwLock::new(crate::config::load_config(config_path)));
|
||||||
|
|
||||||
// 1. Network Task
|
// 1. Network Task
|
||||||
|
let token = cancel_token.clone();
|
||||||
|
let net_health = Arc::clone(&health);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
info!("Starting Network polling task");
|
info!("Starting Network polling task");
|
||||||
let mut daemon = NetworkDaemon::new();
|
let mut daemon = NetworkDaemon::new();
|
||||||
loop {
|
loop {
|
||||||
daemon.poll(&net_tx).await;
|
tokio::select! {
|
||||||
sleep(Duration::from_secs(1)).await;
|
_ = token.cancelled() => break,
|
||||||
|
_ = sleep(Duration::from_secs(1)) => {
|
||||||
|
if !is_in_backoff("net", &net_health).await {
|
||||||
|
let res = daemon.poll(&net_tx).await;
|
||||||
|
handle_poll_result("net", res, &net_health).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
info!("Network task shut down.");
|
||||||
});
|
});
|
||||||
|
|
||||||
// 2. Fast Hardware Task (CPU, Mem, Load)
|
// 2. Fast Hardware Task (CPU, Mem, Load)
|
||||||
|
let token = cancel_token.clone();
|
||||||
|
let hw_health = Arc::clone(&health);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
info!("Starting Fast Hardware polling task");
|
info!("Starting Fast Hardware polling task");
|
||||||
let mut daemon = HardwareDaemon::new();
|
let mut daemon = HardwareDaemon::new();
|
||||||
loop {
|
loop {
|
||||||
daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await;
|
tokio::select! {
|
||||||
sleep(Duration::from_secs(1)).await;
|
_ = token.cancelled() => break,
|
||||||
|
_ = sleep(Duration::from_secs(1)) => {
|
||||||
|
if !is_in_backoff("cpu", &hw_health).await {
|
||||||
|
daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
info!("Fast Hardware task shut down.");
|
||||||
});
|
});
|
||||||
|
|
||||||
// 3. Slow Hardware Task (GPU, Disks)
|
// 3. Slow Hardware Task (GPU, Disks)
|
||||||
|
let token = cancel_token.clone();
|
||||||
|
let slow_health = Arc::clone(&health);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
info!("Starting Slow Hardware polling task");
|
info!("Starting Slow Hardware polling task");
|
||||||
let mut daemon = HardwareDaemon::new();
|
let mut daemon = HardwareDaemon::new();
|
||||||
loop {
|
loop {
|
||||||
daemon.poll_slow(&gpu_tx, &disks_tx).await;
|
tokio::select! {
|
||||||
sleep(Duration::from_secs(1)).await;
|
_ = token.cancelled() => break,
|
||||||
|
_ = sleep(Duration::from_secs(5)) => {
|
||||||
|
if !is_in_backoff("gpu", &slow_health).await {
|
||||||
|
daemon.poll_slow(&gpu_tx, &disks_tx).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
info!("Slow Hardware task shut down.");
|
||||||
});
|
});
|
||||||
|
|
||||||
// 4. Bluetooth Task
|
// 4. Bluetooth Task
|
||||||
|
let token = cancel_token.clone();
|
||||||
|
let bt_health = Arc::clone(&health);
|
||||||
let poll_config = Arc::clone(&config);
|
let poll_config = Arc::clone(&config);
|
||||||
let poll_receivers = receivers.clone();
|
let poll_receivers = receivers.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
info!("Starting Bluetooth polling task");
|
info!("Starting Bluetooth polling task");
|
||||||
let mut daemon = BtDaemon::new();
|
let mut daemon = BtDaemon::new();
|
||||||
loop {
|
loop {
|
||||||
let config = poll_config.read().await;
|
tokio::select! {
|
||||||
daemon.poll(&bt_tx, &poll_receivers, &config).await;
|
_ = token.cancelled() => break,
|
||||||
sleep(Duration::from_secs(1)).await;
|
_ = sleep(Duration::from_secs(2)) => {
|
||||||
|
if !is_in_backoff("bt", &bt_health).await {
|
||||||
|
let config = poll_config.read().await;
|
||||||
|
daemon.poll(&bt_tx, &poll_receivers, &config).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
info!("Bluetooth task shut down.");
|
||||||
});
|
});
|
||||||
|
|
||||||
// 5. Audio Thread (Event driven - pulse usually needs its own thread)
|
// 5. Audio Thread (Event driven)
|
||||||
let audio_daemon = AudioDaemon::new();
|
let audio_daemon = AudioDaemon::new();
|
||||||
audio_daemon.start(&audio_tx);
|
audio_daemon.start(&audio_tx);
|
||||||
|
|
||||||
@@ -127,7 +165,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = running.notified() => {
|
_ = cancel_token.cancelled() => {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
res = listener.accept() => {
|
res = listener.accept() => {
|
||||||
@@ -190,6 +228,59 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_poll_result(
|
||||||
|
module_name: &str,
|
||||||
|
result: crate::error::Result<()>,
|
||||||
|
health_lock: &Arc<RwLock<HashMap<String, crate::state::ModuleHealth>>>,
|
||||||
|
) {
|
||||||
|
let mut lock = health_lock.write().await;
|
||||||
|
let health = lock.entry(module_name.to_string()).or_default();
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(_) => {
|
||||||
|
if health.consecutive_failures > 0 {
|
||||||
|
info!(
|
||||||
|
module = module_name,
|
||||||
|
"Module recovered after {} failures", health.consecutive_failures
|
||||||
|
);
|
||||||
|
}
|
||||||
|
health.consecutive_failures = 0;
|
||||||
|
health.backoff_until = None;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
health.consecutive_failures += 1;
|
||||||
|
health.last_failure = Some(Instant::now());
|
||||||
|
|
||||||
|
if !e.is_transient() {
|
||||||
|
// Fatal errors trigger immediate long backoff
|
||||||
|
health.backoff_until = Some(Instant::now() + Duration::from_secs(60));
|
||||||
|
error!(module = module_name, error = %e, "Fatal module error, entering long cooldown");
|
||||||
|
} else if health.consecutive_failures >= 3 {
|
||||||
|
// Exponential backoff for transient errors: 30s, 60s, 120s...
|
||||||
|
let backoff_secs = 30 * (2u64.pow(health.consecutive_failures.saturating_sub(3)));
|
||||||
|
let backoff_secs = backoff_secs.min(3600); // Cap at 1 hour
|
||||||
|
health.backoff_until = Some(Instant::now() + Duration::from_secs(backoff_secs));
|
||||||
|
warn!(module = module_name, error = %e, backoff = backoff_secs, "Repeated transient failures, entering backoff");
|
||||||
|
} else {
|
||||||
|
debug!(module = module_name, error = %e, "Transient module failure (attempt {})", health.consecutive_failures);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn is_in_backoff(
|
||||||
|
module_name: &str,
|
||||||
|
health_lock: &Arc<RwLock<HashMap<String, crate::state::ModuleHealth>>>,
|
||||||
|
) -> bool {
|
||||||
|
let lock = health_lock.read().await;
|
||||||
|
if let Some(health) = lock.get(module_name)
|
||||||
|
&& let Some(until) = health.backoff_until
|
||||||
|
{
|
||||||
|
return Instant::now() < until;
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_request(
|
async fn handle_request(
|
||||||
module_name: &str,
|
module_name: &str,
|
||||||
args: &[&str],
|
args: &[&str],
|
||||||
|
|||||||
@@ -18,6 +18,15 @@ pub enum FluxoError {
|
|||||||
#[error("External system error: {0}")]
|
#[error("External system error: {0}")]
|
||||||
System(String),
|
System(String),
|
||||||
|
|
||||||
|
#[error("Bluetooth error: {0}")]
|
||||||
|
Bluetooth(String),
|
||||||
|
|
||||||
|
#[error("Network error: {0}")]
|
||||||
|
Network(String),
|
||||||
|
|
||||||
|
#[error("Hardware error: {0}")]
|
||||||
|
Hardware(String),
|
||||||
|
|
||||||
#[error("IO error: {0}")]
|
#[error("IO error: {0}")]
|
||||||
Io(#[from] std::io::Error),
|
Io(#[from] std::io::Error),
|
||||||
|
|
||||||
@@ -28,4 +37,18 @@ pub enum FluxoError {
|
|||||||
Other(#[from] anyhow::Error),
|
Other(#[from] anyhow::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl FluxoError {
|
||||||
|
pub fn is_transient(&self) -> bool {
|
||||||
|
matches!(
|
||||||
|
self,
|
||||||
|
Self::Io(_)
|
||||||
|
| Self::System(_)
|
||||||
|
| Self::Bluetooth(_)
|
||||||
|
| Self::Network(_)
|
||||||
|
| Self::Hardware(_)
|
||||||
|
| Self::Module { .. }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, FluxoError>;
|
pub type Result<T> = std::result::Result<T, FluxoError>;
|
||||||
|
|||||||
+73
-45
@@ -19,6 +19,8 @@ pub struct NetworkDaemon {
|
|||||||
cached_ip: Option<String>,
|
cached_ip: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PollResult = crate::error::Result<(String, Option<String>, Option<(u64, u64)>)>;
|
||||||
|
|
||||||
impl NetworkDaemon {
|
impl NetworkDaemon {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -30,18 +32,21 @@ impl NetworkDaemon {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn poll(&mut self, state_tx: &watch::Sender<NetworkState>) {
|
pub async fn poll(
|
||||||
let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| {
|
&mut self,
|
||||||
let iface = get_primary_interface().unwrap_or_default();
|
state_tx: &watch::Sender<NetworkState>,
|
||||||
|
) -> crate::error::Result<()> {
|
||||||
|
let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| -> PollResult {
|
||||||
|
let iface = get_primary_interface()?;
|
||||||
if iface.is_empty() {
|
if iface.is_empty() {
|
||||||
return (String::new(), None, None);
|
return Ok((String::new(), None, None));
|
||||||
}
|
}
|
||||||
let ip = get_ip_address(&iface);
|
let ip = get_ip_address(&iface);
|
||||||
let bytes = get_bytes(&iface).ok();
|
let bytes = get_bytes(&iface).ok();
|
||||||
(iface, ip, bytes)
|
Ok((iface, ip, bytes))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap_or((String::new(), None, None));
|
.map_err(|e| crate::error::FluxoError::System(e.to_string()))??;
|
||||||
|
|
||||||
if !iface.is_empty() {
|
if !iface.is_empty() {
|
||||||
if self.cached_interface.as_ref() != Some(&iface) || self.cached_ip.is_none() {
|
if self.cached_interface.as_ref() != Some(&iface) || self.cached_ip.is_none() {
|
||||||
@@ -51,54 +56,77 @@ impl NetworkDaemon {
|
|||||||
} else {
|
} else {
|
||||||
self.cached_interface = None;
|
self.cached_interface = None;
|
||||||
self.cached_ip = None;
|
self.cached_ip = None;
|
||||||
|
// Provide a default state for "No connection"
|
||||||
|
let mut network = state_tx.borrow().clone();
|
||||||
|
network.interface.clear();
|
||||||
|
network.ip.clear();
|
||||||
|
network.rx_mbps = 0.0;
|
||||||
|
network.tx_mbps = 0.0;
|
||||||
|
let _ = state_tx.send(network);
|
||||||
|
return Err(crate::error::FluxoError::Network(
|
||||||
|
"No primary interface found".into(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ref interface) = self.cached_interface {
|
let interface = if let Some(ref interface) = self.cached_interface {
|
||||||
let time_now = SystemTime::now()
|
interface.clone()
|
||||||
.duration_since(UNIX_EPOCH)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.as_secs();
|
|
||||||
|
|
||||||
if let Some((rx_bytes_now, tx_bytes_now)) = bytes_opt {
|
|
||||||
if self.last_time > 0 && time_now > self.last_time {
|
|
||||||
let time_diff = (time_now - self.last_time) as f64;
|
|
||||||
let rx_mbps = (rx_bytes_now.saturating_sub(self.last_rx_bytes)) as f64
|
|
||||||
/ time_diff
|
|
||||||
/ 1024.0
|
|
||||||
/ 1024.0;
|
|
||||||
let tx_mbps = (tx_bytes_now.saturating_sub(self.last_tx_bytes)) as f64
|
|
||||||
/ time_diff
|
|
||||||
/ 1024.0
|
|
||||||
/ 1024.0;
|
|
||||||
|
|
||||||
let mut network = state_tx.borrow().clone();
|
|
||||||
network.rx_mbps = rx_mbps;
|
|
||||||
network.tx_mbps = tx_mbps;
|
|
||||||
network.interface = interface.clone();
|
|
||||||
network.ip = self.cached_ip.clone().unwrap_or_default();
|
|
||||||
let _ = state_tx.send(network);
|
|
||||||
} else {
|
|
||||||
// First poll: no speed data yet, but update interface/ip
|
|
||||||
let mut network = state_tx.borrow().clone();
|
|
||||||
network.interface = interface.clone();
|
|
||||||
network.ip = self.cached_ip.clone().unwrap_or_default();
|
|
||||||
let _ = state_tx.send(network);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.last_time = time_now;
|
|
||||||
self.last_rx_bytes = rx_bytes_now;
|
|
||||||
self.last_tx_bytes = tx_bytes_now;
|
|
||||||
} else {
|
|
||||||
// Read failed, might be down
|
|
||||||
self.cached_interface = None;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// No interface detected
|
// No interface detected
|
||||||
let mut network = state_tx.borrow().clone();
|
let mut network = state_tx.borrow().clone();
|
||||||
network.interface.clear();
|
network.interface.clear();
|
||||||
network.ip.clear();
|
network.ip.clear();
|
||||||
|
network.rx_mbps = 0.0;
|
||||||
|
network.tx_mbps = 0.0;
|
||||||
let _ = state_tx.send(network);
|
let _ = state_tx.send(network);
|
||||||
|
return Err(crate::error::FluxoError::Network(
|
||||||
|
"Interface disappeared during poll".into(),
|
||||||
|
));
|
||||||
|
};
|
||||||
|
|
||||||
|
let time_now = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_secs();
|
||||||
|
|
||||||
|
if let Some((rx_bytes_now, tx_bytes_now)) = bytes_opt {
|
||||||
|
if self.last_time > 0 && time_now > self.last_time {
|
||||||
|
let time_diff = (time_now - self.last_time) as f64;
|
||||||
|
let rx_mbps = (rx_bytes_now.saturating_sub(self.last_rx_bytes)) as f64
|
||||||
|
/ time_diff
|
||||||
|
/ 1024.0
|
||||||
|
/ 1024.0;
|
||||||
|
let tx_mbps = (tx_bytes_now.saturating_sub(self.last_tx_bytes)) as f64
|
||||||
|
/ time_diff
|
||||||
|
/ 1024.0
|
||||||
|
/ 1024.0;
|
||||||
|
|
||||||
|
let mut network = state_tx.borrow().clone();
|
||||||
|
network.rx_mbps = rx_mbps;
|
||||||
|
network.tx_mbps = tx_mbps;
|
||||||
|
network.interface = interface.clone();
|
||||||
|
network.ip = self.cached_ip.clone().unwrap_or_default();
|
||||||
|
let _ = state_tx.send(network);
|
||||||
|
} else {
|
||||||
|
// First poll: no speed data yet, but update interface/ip
|
||||||
|
let mut network = state_tx.borrow().clone();
|
||||||
|
network.interface = interface.clone();
|
||||||
|
network.ip = self.cached_ip.clone().unwrap_or_default();
|
||||||
|
let _ = state_tx.send(network);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.last_time = time_now;
|
||||||
|
self.last_rx_bytes = rx_bytes_now;
|
||||||
|
self.last_tx_bytes = tx_bytes_now;
|
||||||
|
} else {
|
||||||
|
// Read failed, might be down
|
||||||
|
self.cached_interface = None;
|
||||||
|
return Err(crate::error::FluxoError::Network(format!(
|
||||||
|
"Failed to read bytes for {}",
|
||||||
|
interface
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user