From b18af49ac8e13284b6d059c5ea7267e330f20906 Mon Sep 17 00:00:00 2001 From: Nils Pukropp Date: Thu, 2 Apr 2026 18:46:17 +0200 Subject: [PATCH] graceful shutdown --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/daemon.rs | 123 +++++++++++++++++++++++++++++++++++------ src/error.rs | 23 ++++++++ src/modules/network.rs | 118 ++++++++++++++++++++++++--------------- 5 files changed, 206 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 20e1618..8c287d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -353,7 +353,7 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "fluxo-rs" -version = "0.3.1" +version = "0.3.2" dependencies = [ "anyhow", "bluer", @@ -1270,6 +1270,7 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", + "slab", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 7579887..64129f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } maestro = { git = "https://github.com/qzed/pbpctrl", package = "maestro" } bluer = { version = "0.17", features = ["bluetoothd", "rfcomm", "id"] } tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal", "process"] } -tokio-util = { version = "0.7", features = ["codec"] } +tokio-util = { version = "0.7", features = ["codec", "time"] } futures = "0.3" libpulse-binding = "2.30" nix = { version = "0.31", features = ["net"] } diff --git a/src/daemon.rs b/src/daemon.rs index fdac9ec..dfde0e9 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -16,6 +16,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixListener; use tokio::sync::{RwLock, watch}; use tokio::time::{Duration, Instant, sleep}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; struct SocketGuard { @@ -56,7 +57,7 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { disks: disks_rx, bluetooth: bt_rx, audio: audio_rx, - health, + health: Arc::clone(&health), }; let listener = UnixListener::bind(&sock_path)?; @@ -64,62 +65,99 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { path: sock_path.clone(), }; - // Signal handling for graceful shutdown in async context - let running = Arc::new(tokio::sync::Notify::new()); - let r_clone = Arc::clone(&running); + // Signal handling for graceful shutdown + let cancel_token = CancellationToken::new(); + let token_clone = cancel_token.clone(); tokio::spawn(async move { tokio::signal::ctrl_c().await.unwrap(); info!("Received shutdown signal, exiting..."); - r_clone.notify_waiters(); + token_clone.cancel(); }); let config_path_clone = config_path.clone(); let config = Arc::new(RwLock::new(crate::config::load_config(config_path))); // 1. Network Task + let token = cancel_token.clone(); + let net_health = Arc::clone(&health); tokio::spawn(async move { info!("Starting Network polling task"); let mut daemon = NetworkDaemon::new(); loop { - daemon.poll(&net_tx).await; - sleep(Duration::from_secs(1)).await; + tokio::select! { + _ = token.cancelled() => break, + _ = sleep(Duration::from_secs(1)) => { + if !is_in_backoff("net", &net_health).await { + let res = daemon.poll(&net_tx).await; + handle_poll_result("net", res, &net_health).await; + } + } + } } + info!("Network task shut down."); }); // 2. Fast Hardware Task (CPU, Mem, Load) + let token = cancel_token.clone(); + let hw_health = Arc::clone(&health); tokio::spawn(async move { info!("Starting Fast Hardware polling task"); let mut daemon = HardwareDaemon::new(); loop { - daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await; - sleep(Duration::from_secs(1)).await; + tokio::select! { + _ = token.cancelled() => break, + _ = sleep(Duration::from_secs(1)) => { + if !is_in_backoff("cpu", &hw_health).await { + daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await; + } + } + } } + info!("Fast Hardware task shut down."); }); // 3. Slow Hardware Task (GPU, Disks) + let token = cancel_token.clone(); + let slow_health = Arc::clone(&health); tokio::spawn(async move { info!("Starting Slow Hardware polling task"); let mut daemon = HardwareDaemon::new(); loop { - daemon.poll_slow(&gpu_tx, &disks_tx).await; - sleep(Duration::from_secs(1)).await; + tokio::select! { + _ = token.cancelled() => break, + _ = sleep(Duration::from_secs(5)) => { + if !is_in_backoff("gpu", &slow_health).await { + daemon.poll_slow(&gpu_tx, &disks_tx).await; + } + } + } } + info!("Slow Hardware task shut down."); }); // 4. Bluetooth Task + let token = cancel_token.clone(); + let bt_health = Arc::clone(&health); let poll_config = Arc::clone(&config); let poll_receivers = receivers.clone(); tokio::spawn(async move { info!("Starting Bluetooth polling task"); let mut daemon = BtDaemon::new(); loop { - let config = poll_config.read().await; - daemon.poll(&bt_tx, &poll_receivers, &config).await; - sleep(Duration::from_secs(1)).await; + tokio::select! { + _ = token.cancelled() => break, + _ = sleep(Duration::from_secs(2)) => { + if !is_in_backoff("bt", &bt_health).await { + let config = poll_config.read().await; + daemon.poll(&bt_tx, &poll_receivers, &config).await; + } + } + } } + info!("Bluetooth task shut down."); }); - // 5. Audio Thread (Event driven - pulse usually needs its own thread) + // 5. Audio Thread (Event driven) let audio_daemon = AudioDaemon::new(); audio_daemon.start(&audio_tx); @@ -127,7 +165,7 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { loop { tokio::select! { - _ = running.notified() => { + _ = cancel_token.cancelled() => { break; } res = listener.accept() => { @@ -190,6 +228,59 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { Ok(()) } +async fn handle_poll_result( + module_name: &str, + result: crate::error::Result<()>, + health_lock: &Arc>>, +) { + let mut lock = health_lock.write().await; + let health = lock.entry(module_name.to_string()).or_default(); + + match result { + Ok(_) => { + if health.consecutive_failures > 0 { + info!( + module = module_name, + "Module recovered after {} failures", health.consecutive_failures + ); + } + health.consecutive_failures = 0; + health.backoff_until = None; + } + Err(e) => { + health.consecutive_failures += 1; + health.last_failure = Some(Instant::now()); + + if !e.is_transient() { + // Fatal errors trigger immediate long backoff + health.backoff_until = Some(Instant::now() + Duration::from_secs(60)); + error!(module = module_name, error = %e, "Fatal module error, entering long cooldown"); + } else if health.consecutive_failures >= 3 { + // Exponential backoff for transient errors: 30s, 60s, 120s... + let backoff_secs = 30 * (2u64.pow(health.consecutive_failures.saturating_sub(3))); + let backoff_secs = backoff_secs.min(3600); // Cap at 1 hour + health.backoff_until = Some(Instant::now() + Duration::from_secs(backoff_secs)); + warn!(module = module_name, error = %e, backoff = backoff_secs, "Repeated transient failures, entering backoff"); + } else { + debug!(module = module_name, error = %e, "Transient module failure (attempt {})", health.consecutive_failures); + } + } + } +} + +async fn is_in_backoff( + module_name: &str, + health_lock: &Arc>>, +) -> bool { + let lock = health_lock.read().await; + if let Some(health) = lock.get(module_name) + && let Some(until) = health.backoff_until + { + return Instant::now() < until; + } + false +} + async fn handle_request( module_name: &str, args: &[&str], diff --git a/src/error.rs b/src/error.rs index 1496ac4..a8c669c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,6 +18,15 @@ pub enum FluxoError { #[error("External system error: {0}")] System(String), + #[error("Bluetooth error: {0}")] + Bluetooth(String), + + #[error("Network error: {0}")] + Network(String), + + #[error("Hardware error: {0}")] + Hardware(String), + #[error("IO error: {0}")] Io(#[from] std::io::Error), @@ -28,4 +37,18 @@ pub enum FluxoError { Other(#[from] anyhow::Error), } +impl FluxoError { + pub fn is_transient(&self) -> bool { + matches!( + self, + Self::Io(_) + | Self::System(_) + | Self::Bluetooth(_) + | Self::Network(_) + | Self::Hardware(_) + | Self::Module { .. } + ) + } +} + pub type Result = std::result::Result; diff --git a/src/modules/network.rs b/src/modules/network.rs index 2cc443f..10e0809 100644 --- a/src/modules/network.rs +++ b/src/modules/network.rs @@ -19,6 +19,8 @@ pub struct NetworkDaemon { cached_ip: Option, } +type PollResult = crate::error::Result<(String, Option, Option<(u64, u64)>)>; + impl NetworkDaemon { pub fn new() -> Self { Self { @@ -30,18 +32,21 @@ impl NetworkDaemon { } } - pub async fn poll(&mut self, state_tx: &watch::Sender) { - let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| { - let iface = get_primary_interface().unwrap_or_default(); + pub async fn poll( + &mut self, + state_tx: &watch::Sender, + ) -> crate::error::Result<()> { + let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| -> PollResult { + let iface = get_primary_interface()?; if iface.is_empty() { - return (String::new(), None, None); + return Ok((String::new(), None, None)); } let ip = get_ip_address(&iface); let bytes = get_bytes(&iface).ok(); - (iface, ip, bytes) + Ok((iface, ip, bytes)) }) .await - .unwrap_or((String::new(), None, None)); + .map_err(|e| crate::error::FluxoError::System(e.to_string()))??; if !iface.is_empty() { if self.cached_interface.as_ref() != Some(&iface) || self.cached_ip.is_none() { @@ -51,54 +56,77 @@ impl NetworkDaemon { } else { self.cached_interface = None; self.cached_ip = None; + // Provide a default state for "No connection" + let mut network = state_tx.borrow().clone(); + network.interface.clear(); + network.ip.clear(); + network.rx_mbps = 0.0; + network.tx_mbps = 0.0; + let _ = state_tx.send(network); + return Err(crate::error::FluxoError::Network( + "No primary interface found".into(), + )); } - if let Some(ref interface) = self.cached_interface { - let time_now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_secs(); - - if let Some((rx_bytes_now, tx_bytes_now)) = bytes_opt { - if self.last_time > 0 && time_now > self.last_time { - let time_diff = (time_now - self.last_time) as f64; - let rx_mbps = (rx_bytes_now.saturating_sub(self.last_rx_bytes)) as f64 - / time_diff - / 1024.0 - / 1024.0; - let tx_mbps = (tx_bytes_now.saturating_sub(self.last_tx_bytes)) as f64 - / time_diff - / 1024.0 - / 1024.0; - - let mut network = state_tx.borrow().clone(); - network.rx_mbps = rx_mbps; - network.tx_mbps = tx_mbps; - network.interface = interface.clone(); - network.ip = self.cached_ip.clone().unwrap_or_default(); - let _ = state_tx.send(network); - } else { - // First poll: no speed data yet, but update interface/ip - let mut network = state_tx.borrow().clone(); - network.interface = interface.clone(); - network.ip = self.cached_ip.clone().unwrap_or_default(); - let _ = state_tx.send(network); - } - - self.last_time = time_now; - self.last_rx_bytes = rx_bytes_now; - self.last_tx_bytes = tx_bytes_now; - } else { - // Read failed, might be down - self.cached_interface = None; - } + let interface = if let Some(ref interface) = self.cached_interface { + interface.clone() } else { // No interface detected let mut network = state_tx.borrow().clone(); network.interface.clear(); network.ip.clear(); + network.rx_mbps = 0.0; + network.tx_mbps = 0.0; let _ = state_tx.send(network); + return Err(crate::error::FluxoError::Network( + "Interface disappeared during poll".into(), + )); + }; + + let time_now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + if let Some((rx_bytes_now, tx_bytes_now)) = bytes_opt { + if self.last_time > 0 && time_now > self.last_time { + let time_diff = (time_now - self.last_time) as f64; + let rx_mbps = (rx_bytes_now.saturating_sub(self.last_rx_bytes)) as f64 + / time_diff + / 1024.0 + / 1024.0; + let tx_mbps = (tx_bytes_now.saturating_sub(self.last_tx_bytes)) as f64 + / time_diff + / 1024.0 + / 1024.0; + + let mut network = state_tx.borrow().clone(); + network.rx_mbps = rx_mbps; + network.tx_mbps = tx_mbps; + network.interface = interface.clone(); + network.ip = self.cached_ip.clone().unwrap_or_default(); + let _ = state_tx.send(network); + } else { + // First poll: no speed data yet, but update interface/ip + let mut network = state_tx.borrow().clone(); + network.interface = interface.clone(); + network.ip = self.cached_ip.clone().unwrap_or_default(); + let _ = state_tx.send(network); + } + + self.last_time = time_now; + self.last_rx_bytes = rx_bytes_now; + self.last_tx_bytes = tx_bytes_now; + } else { + // Read failed, might be down + self.cached_interface = None; + return Err(crate::error::FluxoError::Network(format!( + "Failed to read bytes for {}", + interface + ))); } + + Ok(()) } }