diff --git a/Cargo.lock b/Cargo.lock index 7bc0971..f8ead7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -362,12 +362,13 @@ dependencies = [ "futures", "libpulse-binding", "maestro", - "nix 0.29.0", + "nix 0.31.2", "regex", "serde", "serde_json", "sysinfo", "tempfile", + "thiserror", "tokio", "tokio-util", "toml", @@ -584,9 +585,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.183" +version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" [[package]] name = "libdbus-sys" @@ -710,7 +711,6 @@ dependencies = [ "cfg-if", "cfg_aliases", "libc", - "memoffset", ] [[package]] @@ -723,6 +723,7 @@ dependencies = [ "cfg-if", "cfg_aliases", "libc", + "memoffset", ] [[package]] @@ -1067,9 +1068,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "876ac351060d4f882bb1032b6369eb0aef79ad9df1ea8bc404874d8cc3d0cd98" +checksum = "6662b5879511e06e8999a8a235d848113e942c9124f211511b16466ee2995f26" dependencies = [ "serde_core", ] @@ -1083,6 +1084,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "slab" version = "0.4.12" @@ -1182,6 +1193,26 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "thiserror" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -1201,6 +1232,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", @@ -1243,9 +1275,9 @@ dependencies = [ [[package]] name = "toml" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8195ca05e4eb728f4ba94f3e3291661320af739c4e43779cbdfae82ab239fcc" +checksum = "994b95d9e7bae62b34bab0e2a4510b801fa466066a6a8b2b57361fa1eba068ee" dependencies = [ "indexmap", "serde_core", @@ -1258,9 +1290,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97251a7c317e03ad83774a8752a7e81fb6067740609f75ea2b585b569a59198f" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" dependencies = [ "serde_core", ] @@ -1279,18 +1311,18 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2334f11ee363607eb04df9b8fc8a13ca1715a72ba8662a26ac285c98aabb4011" +checksum = "39ca317ebc49f06bd748bfba29533eac9485569dc9bf80b849024b025e814fb9" dependencies = [ "winnow", ] [[package]] name = "toml_writer" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d282ade6016312faf3e41e57ebbba0c073e4056dab1232ab1cb624199648f8ed" +checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db" [[package]] name = "tracing" diff --git a/Cargo.toml b/Cargo.toml index 0649e06..c063d4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,16 +11,17 @@ regex = "1.12" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" sysinfo = "0.38.4" -toml = "1.0.6" +thiserror = "2.0" +toml = "1.1.1" tracing = "0.1.44" tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } maestro = { git = "https://github.com/qzed/pbpctrl", package = "maestro" } bluer = { version = "0.17", features = ["bluetoothd", "rfcomm", "id"] } -tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal"] } tokio-util = { version = "0.7", features = ["codec"] } futures = "0.3" -libpulse-binding = "2.26" -nix = { version = "0.29", features = ["net"] } +libpulse-binding = "2.30" +nix = { version = "0.31", features = ["net"] } [dev-dependencies] diff --git a/src/daemon.rs b/src/daemon.rs index 64957e0..3a4195f 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,4 +1,5 @@ use crate::config::Config; +use crate::error::FluxoError; use crate::ipc::socket_path; use crate::modules::WaybarModule; use crate::modules::audio::AudioDaemon; @@ -8,14 +9,12 @@ use crate::modules::network::NetworkDaemon; use crate::state::{AppState, SharedState}; use anyhow::Result; use std::fs; -use std::io::{BufRead, BufReader, Write}; -use std::net::Shutdown; -use std::os::unix::net::UnixListener; use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::Duration; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixListener; +use tokio::sync::RwLock; +use tokio::time::{Duration, Instant, sleep}; use tracing::{debug, error, info, warn}; struct SocketGuard { @@ -29,7 +28,7 @@ impl Drop for SocketGuard { } } -pub fn run_daemon(config_path: Option) -> Result<()> { +pub async fn run_daemon(config_path: Option) -> Result<()> { let sock_path = socket_path(); if fs::metadata(&sock_path).is_ok() { @@ -43,134 +42,126 @@ pub fn run_daemon(config_path: Option) -> Result<()> { path: sock_path.clone(), }; - // Signal handling: set flag so main loop exits cleanly - let running = Arc::new(AtomicBool::new(true)); - let running_clone = Arc::clone(&running); - ctrlc::set_handler(move || { + // Signal handling for graceful shutdown in async context + let running = Arc::new(tokio::sync::Notify::new()); + let r_clone = Arc::clone(&running); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); info!("Received shutdown signal, exiting..."); - running_clone.store(false, Ordering::SeqCst); - })?; + r_clone.notify_waiters(); + }); - // We store the original config_path to allow proper reloading later let config_path_clone = config_path.clone(); let config = Arc::new(RwLock::new(crate::config::load_config(config_path))); - // 1. Network Thread + // 1. Network Task let poll_state = Arc::clone(&state); - let poll_running = Arc::clone(&running); - thread::spawn(move || { - info!("Starting Network polling thread"); + tokio::spawn(async move { + info!("Starting Network polling task"); let mut daemon = NetworkDaemon::new(); - while poll_running.load(Ordering::SeqCst) { - daemon.poll(Arc::clone(&poll_state)); - thread::sleep(Duration::from_secs(1)); + loop { + daemon.poll(Arc::clone(&poll_state)).await; + sleep(Duration::from_secs(1)).await; } }); - // 2. Fast Hardware Thread (CPU, Mem, Load) + // 2. Fast Hardware Task (CPU, Mem, Load) let poll_state = Arc::clone(&state); - let poll_running = Arc::clone(&running); - thread::spawn(move || { - info!("Starting Fast Hardware polling thread"); + tokio::spawn(async move { + info!("Starting Fast Hardware polling task"); let mut daemon = HardwareDaemon::new(); - while poll_running.load(Ordering::SeqCst) { - daemon.poll_fast(Arc::clone(&poll_state)); - thread::sleep(Duration::from_secs(1)); + loop { + daemon.poll_fast(Arc::clone(&poll_state)).await; + sleep(Duration::from_secs(1)).await; } }); - // 3. Slow Hardware Thread (GPU, Disks) + // 3. Slow Hardware Task (GPU, Disks) let poll_state = Arc::clone(&state); - let poll_running = Arc::clone(&running); - thread::spawn(move || { - info!("Starting Slow Hardware polling thread"); + tokio::spawn(async move { + info!("Starting Slow Hardware polling task"); let mut daemon = HardwareDaemon::new(); - while poll_running.load(Ordering::SeqCst) { - daemon.poll_slow(Arc::clone(&poll_state)); - thread::sleep(Duration::from_secs(1)); + loop { + daemon.poll_slow(Arc::clone(&poll_state)).await; + sleep(Duration::from_secs(1)).await; } }); - // 4. Bluetooth Thread + // 4. Bluetooth Task let poll_state = Arc::clone(&state); - let poll_running = Arc::clone(&running); - thread::spawn(move || { - info!("Starting Bluetooth polling thread"); + tokio::spawn(async move { + info!("Starting Bluetooth polling task"); let mut daemon = BtDaemon::new(); - while poll_running.load(Ordering::SeqCst) { - daemon.poll(Arc::clone(&poll_state)); - thread::sleep(Duration::from_secs(1)); + loop { + daemon.poll(Arc::clone(&poll_state)).await; + sleep(Duration::from_secs(1)).await; } }); - // 5. Audio Thread (Event driven) + // 5. Audio Thread (Event driven - pulse usually needs its own thread) let audio_daemon = AudioDaemon::new(); audio_daemon.start(Arc::clone(&state)); info!("Fluxo daemon successfully bound to socket: {}", sock_path); - // Use non-blocking accept so we can check the running flag - listener.set_nonblocking(true)?; - - while running.load(Ordering::SeqCst) { - match listener.accept() { - Ok((mut stream, _)) => { - let state_clone = Arc::clone(&state); - let config_clone = Arc::clone(&config); - let cp_clone = config_path_clone.clone(); - thread::spawn(move || { - let mut reader = BufReader::new(&stream); - let mut request = String::new(); - if let Err(e) = reader.read_line(&mut request) { - error!("Failed to read from IPC stream: {}", e); - return; - } - drop(reader); - - let request = request.trim(); - if request.is_empty() { - return; - } - - let parts: Vec<&str> = request.split_whitespace().collect(); - if let Some(module_name) = parts.first() { - if *module_name == "reload" { - info!("Reloading configuration..."); - let new_config = crate::config::load_config(cp_clone); - if let Ok(mut config_lock) = config_clone.write() { - *config_lock = new_config; - let _ = stream.write_all(b"{\"text\":\"ok\"}"); - info!("Configuration reloaded successfully."); - } else { - error!("Failed to acquire write lock for configuration reload."); - } - let _ = stream.shutdown(Shutdown::Write); - return; - } - - debug!(module = module_name, args = ?&parts[1..], "Handling IPC request"); - let response = - handle_request(module_name, &parts[1..], &state_clone, &config_clone); - if let Err(e) = stream.write_all(response.as_bytes()) { - if e.kind() == std::io::ErrorKind::BrokenPipe - || e.kind() == std::io::ErrorKind::ConnectionReset - { - debug!( - "IPC client disconnected before response could be sent: {}", - e - ); - } else { - error!("Failed to write IPC response: {}", e); - } - } - let _ = stream.shutdown(Shutdown::Write); - } - }); + loop { + tokio::select! { + _ = running.notified() => { + break; } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - thread::sleep(Duration::from_millis(50)); + res = listener.accept() => { + match res { + Ok((mut stream, _)) => { + let state_clone = Arc::clone(&state); + let config_clone = Arc::clone(&config); + let cp_clone = config_path_clone.clone(); + tokio::spawn(async move { + let (reader, mut writer) = stream.split(); + let mut reader = BufReader::new(reader); + let mut request = String::new(); + if let Err(e) = reader.read_line(&mut request).await { + error!("Failed to read from IPC stream: {}", e); + return; + } + + let request = request.trim(); + if request.is_empty() { + return; + } + + let parts: Vec<&str> = request.split_whitespace().collect(); + if let Some(module_name) = parts.first() { + if *module_name == "reload" { + info!("Reloading configuration..."); + let new_config = crate::config::load_config(cp_clone); + let mut config_lock = config_clone.write().await; + *config_lock = new_config; + let _ = writer.write_all(b"{\"text\":\"ok\"}").await; + info!("Configuration reloaded successfully."); + return; + } + + debug!(module = module_name, args = ?&parts[1..], "Handling IPC request"); + let response = + handle_request(module_name, &parts[1..], &state_clone, &config_clone).await; + if let Err(e) = writer.write_all(response.as_bytes()).await { + if e.kind() == std::io::ErrorKind::BrokenPipe + || e.kind() == std::io::ErrorKind::ConnectionReset + { + debug!( + "IPC client disconnected before response could be sent: {}", + e + ); + } else { + error!("Failed to write IPC response: {}", e); + } + } + } + }); + } + Err(e) => error!("Failed to accept incoming connection: {}", e), + } } - Err(e) => error!("Failed to accept incoming connection: {}", e), } } @@ -178,53 +169,127 @@ pub fn run_daemon(config_path: Option) -> Result<()> { Ok(()) } -fn handle_request( +async fn handle_request( module_name: &str, args: &[&str], state: &SharedState, config_lock: &Arc>, ) -> String { - let config = if let Ok(c) = config_lock.read() { - c - } else { - error!("Failed to acquire read lock for configuration."); - return "{\"text\":\"error: config lock failed\"}".to_string(); - }; - - let result = match module_name { - "net" | "network" => crate::modules::network::NetworkModule.run(&config, state, args), - "cpu" => crate::modules::cpu::CpuModule.run(&config, state, args), - "mem" | "memory" => crate::modules::memory::MemoryModule.run(&config, state, args), - "disk" => crate::modules::disk::DiskModule.run(&config, state, args), - "pool" | "btrfs" => crate::modules::btrfs::BtrfsModule.run(&config, state, args), - "vol" => crate::modules::audio::AudioModule.run( - &config, - state, - &["sink", args.first().unwrap_or(&"show")], - ), - "mic" => crate::modules::audio::AudioModule.run( - &config, - state, - &["source", args.first().unwrap_or(&"show")], - ), - "gpu" => crate::modules::gpu::GpuModule.run(&config, state, args), - "sys" => crate::modules::sys::SysModule.run(&config, state, args), - "bt" | "bluetooth" => crate::modules::bt::BtModule.run(&config, state, args), - "power" => crate::modules::power::PowerModule.run(&config, state, args), - "game" => crate::modules::game::GameModule.run(&config, state, args), - _ => { - warn!("Received request for unknown module: '{}'", module_name); - Err(anyhow::anyhow!("Unknown module: {}", module_name)) + // 1. Check Circuit Breaker status + let is_in_backoff = { + let lock = state.read().await; + if let Some(health) = lock.health.get(module_name) { + if let Some(until) = health.backoff_until { + Instant::now() < until + } else { + false + } + } else { + false } }; + if is_in_backoff { + return format!( + "{{\"text\":\"\u{200B}Cooling down ({})\u{200B}\",\"class\":\"error\"}}", + module_name + ); + } + + let config = config_lock.read().await; + + let result = match module_name { + "net" | "network" => { + crate::modules::network::NetworkModule + .run(&config, state, args) + .await + } + "cpu" => { + crate::modules::cpu::CpuModule + .run(&config, state, args) + .await + } + "mem" | "memory" => { + crate::modules::memory::MemoryModule + .run(&config, state, args) + .await + } + "disk" => { + crate::modules::disk::DiskModule + .run(&config, state, args) + .await + } + "pool" | "btrfs" => { + crate::modules::btrfs::BtrfsModule + .run(&config, state, args) + .await + } + "vol" => { + crate::modules::audio::AudioModule + .run(&config, state, &["sink", args.first().unwrap_or(&"show")]) + .await + } + "mic" => { + crate::modules::audio::AudioModule + .run(&config, state, &["source", args.first().unwrap_or(&"show")]) + .await + } + "gpu" => { + crate::modules::gpu::GpuModule + .run(&config, state, args) + .await + } + "sys" => { + crate::modules::sys::SysModule + .run(&config, state, args) + .await + } + "bt" | "bluetooth" => crate::modules::bt::BtModule.run(&config, state, args).await, + "power" => { + crate::modules::power::PowerModule + .run(&config, state, args) + .await + } + "game" => { + crate::modules::game::GameModule + .run(&config, state, args) + .await + } + _ => { + warn!("Received request for unknown module: '{}'", module_name); + Err(FluxoError::Ipc(format!("Unknown module: {}", module_name))) + } + }; + + // 2. Update Health based on result + { + let mut lock = state.write().await; + let health = lock.health.entry(module_name.to_string()).or_default(); + match &result { + Ok(_) => { + health.consecutive_failures = 0; + health.backoff_until = None; + } + Err(e) => { + health.consecutive_failures += 1; + health.last_failure = Some(Instant::now()); + if health.consecutive_failures >= 3 { + // Backoff for 30 seconds after 3 failures + health.backoff_until = Some(Instant::now() + Duration::from_secs(30)); + warn!(module = module_name, error = %e, "Module entered backoff state due to repeated failures"); + } + } + } + } + match result { Ok(output) => serde_json::to_string(&output).unwrap_or_else(|_| "{}".to_string()), Err(e) => { - error!(module = module_name, error = %e, "Module execution failed"); + let error_msg = e.to_string(); + error!(module = module_name, error = %error_msg, "Module execution failed"); let err_out = crate::output::WaybarOutput { text: "\u{200B}Error\u{200B}".to_string(), - tooltip: Some(e.to_string()), + tooltip: Some(error_msg), class: Some("error".to_string()), percentage: None, }; diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..1496ac4 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,31 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +#[allow(dead_code)] +pub enum FluxoError { + #[error("Configuration error: {0}")] + Config(String), + + #[error("Module error ({module}): {message}")] + Module { + module: &'static str, + message: String, + }, + + #[error("Daemon IPC error: {0}")] + Ipc(String), + + #[error("External system error: {0}")] + System(String), + + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::Error), + + #[error("Other error: {0}")] + Other(#[from] anyhow::Error), +} + +pub type Result = std::result::Result; diff --git a/src/main.rs b/src/main.rs index 24bd5d4..c1f5dd2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod config; mod daemon; +mod error; mod ipc; mod modules; mod output; @@ -88,7 +89,12 @@ fn main() { match &cli.command { Commands::Daemon { config } => { info!("Starting Fluxo daemon..."); - if let Err(e) = daemon::run_daemon(config.clone()) { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + if let Err(e) = rt.block_on(daemon::run_daemon(config.clone())) { error!("Daemon failed: {}", e); process::exit(1); } diff --git a/src/modules/audio.rs b/src/modules/audio.rs index 795525d..85fe256 100644 --- a/src/modules/audio.rs +++ b/src/modules/audio.rs @@ -1,18 +1,25 @@ use crate::config::Config; +use crate::error::{FluxoError, Result}; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::{Result, anyhow}; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::subscribe::{Facility, InterestMaskSet}; use libpulse_binding::context::{Context, FlagSet as ContextFlag}; use libpulse_binding::mainloop::threaded::Mainloop as ThreadedMainloop; use libpulse_binding::volume::Volume; use std::process::Command; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use tracing::error; +static RUNTIME: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Audio tokio runtime") +}); + pub struct AudioDaemon; impl AudioDaemon { @@ -100,7 +107,7 @@ fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<( let st_server = Arc::clone(&state_inner); context.introspect().get_server_info(move |info| { - let mut lock = st_server.write().unwrap(); + let mut lock = RUNTIME.block_on(st_server.write()); lock.audio.sink.name = info .default_sink_name .as_ref() @@ -116,7 +123,7 @@ fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<( let st_sink = Arc::clone(&state_inner); context.introspect().get_sink_info_list(move |res| { if let ListResult::Item(item) = res { - let mut lock = st_sink.write().unwrap(); + let mut lock = RUNTIME.block_on(st_sink.write()); // If this matches our default sink name, or if we don't have details for any yet let is_default = item .name @@ -139,7 +146,7 @@ fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<( let st_source = Arc::clone(&state_inner); context.introspect().get_source_info_list(move |res| { if let ListResult::Item(item) = res { - let mut lock = st_source.write().unwrap(); + let mut lock = RUNTIME.block_on(st_source.write()); let is_default = item .name .as_ref() @@ -164,7 +171,12 @@ fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<( pub struct AudioModule; impl WaybarModule for AudioModule { - fn run(&self, config: &Config, state: &SharedState, args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> Result { let target_type = args.first().unwrap_or(&"sink"); let action = args.get(1).unwrap_or(&"show"); @@ -173,21 +185,24 @@ impl WaybarModule for AudioModule { self.cycle_device(target_type)?; Ok(WaybarOutput::default()) } - "show" => self.get_status(config, state, target_type), - other => Err(anyhow!("Unknown audio action: '{}'", other)), + "show" => self.get_status(config, state, target_type).await, + other => Err(FluxoError::Module { + module: "audio", + message: format!("Unknown audio action: '{}'", other), + }), } } } impl AudioModule { - fn get_status( + async fn get_status( &self, config: &Config, state: &SharedState, target_type: &str, ) -> Result { let audio_state = { - let lock = state.read().unwrap(); + let lock = state.read().await; lock.audio.clone() }; diff --git a/src/modules/bt.rs b/src/modules/bt.rs index 63f9a40..c74213d 100644 --- a/src/modules/bt.rs +++ b/src/modules/bt.rs @@ -1,4 +1,5 @@ use crate::config::Config; +use crate::error::Result as FluxoResult; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::{BtState, SharedState}; @@ -8,8 +9,8 @@ use futures::StreamExt; use std::collections::HashMap; use std::process::Command; use std::sync::{Arc, LazyLock, Mutex}; -use std::time::{Duration, Instant}; use tokio::sync::mpsc; +use tokio::time::{Duration, Instant}; use tracing::{debug, error, info, warn}; // Maestro imports @@ -22,13 +23,6 @@ use maestro::service::MaestroService; #[allow(unused_imports)] use maestro::service::settings::{self, Setting, SettingValue}; -static RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("Failed to create BT tokio runtime") -}); - #[derive(Clone, Default)] struct BudsStatus { left_battery: Option, @@ -319,8 +313,8 @@ impl BtDaemon { Self { session: None } } - pub fn poll(&mut self, state: SharedState) { - if let Err(e) = RUNTIME.block_on(self.poll_async(state)) { + pub async fn poll(&mut self, state: SharedState) { + if let Err(e) = self.poll_async(state).await { error!("BT daemon error: {}", e); } } @@ -388,9 +382,8 @@ impl BtDaemon { } } - if let Ok(mut lock) = state.write() { - lock.bluetooth = bt_state; - } + let mut lock = state.write().await; + lock.bluetooth = bt_state; Ok(()) } @@ -486,10 +479,15 @@ static PLUGINS: LazyLock>> = pub struct BtModule; impl WaybarModule for BtModule { - fn run(&self, config: &Config, state: &SharedState, args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> FluxoResult { let action = args.first().unwrap_or(&"show"); let bt_state = { - let lock = state.read().unwrap(); + let lock = state.read().await; lock.bluetooth.clone() }; diff --git a/src/modules/btrfs.rs b/src/modules/btrfs.rs index 801b710..65340e7 100644 --- a/src/modules/btrfs.rs +++ b/src/modules/btrfs.rs @@ -1,18 +1,22 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct BtrfsModule; impl WaybarModule for BtrfsModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { - let disks = if let Ok(s) = state.read() { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { + let disks = { + let s = state.read().await; s.disks.clone() - } else { - return Err(anyhow::anyhow!("Failed to read state")); }; let mut total_used: f64 = 0.0; diff --git a/src/modules/cpu.rs b/src/modules/cpu.rs index f266d67..df0e3cb 100644 --- a/src/modules/cpu.rs +++ b/src/modules/cpu.rs @@ -1,24 +1,26 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct CpuModule; impl WaybarModule for CpuModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { let (usage, temp, model) = { - if let Ok(state_lock) = state.read() { - ( - state_lock.cpu.usage, - state_lock.cpu.temp, - state_lock.cpu.model.clone(), - ) - } else { - (0.0, 0.0, String::from("Unknown")) - } + let state_lock = state.read().await; + ( + state_lock.cpu.usage, + state_lock.cpu.temp, + state_lock.cpu.model.clone(), + ) }; let text = format_template( @@ -51,8 +53,8 @@ mod tests { use super::*; use crate::state::{AppState, CpuState, mock_state}; - #[test] - fn test_cpu_normal() { + #[tokio::test] + async fn test_cpu_normal() { let state = mock_state(AppState { cpu: CpuState { usage: 25.0, @@ -62,7 +64,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).unwrap(); + let output = CpuModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.contains("25.0")); assert!(output.text.contains("45.0")); assert_eq!(output.class.as_deref(), Some("normal")); @@ -70,8 +72,8 @@ mod tests { assert_eq!(output.tooltip.as_deref(), Some("Test CPU")); } - #[test] - fn test_cpu_high() { + #[tokio::test] + async fn test_cpu_high() { let state = mock_state(AppState { cpu: CpuState { usage: 80.0, @@ -81,12 +83,12 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).unwrap(); + let output = CpuModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("high")); } - #[test] - fn test_cpu_max() { + #[tokio::test] + async fn test_cpu_max() { let state = mock_state(AppState { cpu: CpuState { usage: 99.0, @@ -96,7 +98,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).unwrap(); + let output = CpuModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("max")); } } diff --git a/src/modules/disk.rs b/src/modules/disk.rs index 79543c7..aee30ff 100644 --- a/src/modules/disk.rs +++ b/src/modules/disk.rs @@ -1,20 +1,24 @@ use crate::config::Config; +use crate::error::{FluxoError, Result}; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct DiskModule; impl WaybarModule for DiskModule { - fn run(&self, config: &Config, state: &SharedState, args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> Result { let mountpoint = args.first().unwrap_or(&"/"); - let disks = if let Ok(s) = state.read() { + let disks = { + let s = state.read().await; s.disks.clone() - } else { - return Err(anyhow::anyhow!("Failed to read state")); }; for disk in &disks { @@ -62,7 +66,10 @@ impl WaybarModule for DiskModule { } } - Err(anyhow::anyhow!("Mountpoint {} not found", mountpoint)) + Err(FluxoError::Module { + module: "disk", + message: format!("Mountpoint {} not found", mountpoint), + }) } } @@ -83,30 +90,30 @@ mod tests { }) } - #[test] - fn test_disk_found() { + #[tokio::test] + async fn test_disk_found() { let gb = 1024 * 1024 * 1024; let state = state_with_disk("/", 100 * gb, 60 * gb); let config = Config::default(); - let output = DiskModule.run(&config, &state, &["/"]).unwrap(); + let output = DiskModule.run(&config, &state, &["/"]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("normal")); assert_eq!(output.percentage, Some(40)); // 40% used } - #[test] - fn test_disk_high() { + #[tokio::test] + async fn test_disk_high() { let gb = 1024 * 1024 * 1024; let state = state_with_disk("/", 100 * gb, 15 * gb); let config = Config::default(); - let output = DiskModule.run(&config, &state, &["/"]).unwrap(); + let output = DiskModule.run(&config, &state, &["/"]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("high")); // 85% used } - #[test] - fn test_disk_not_found() { + #[tokio::test] + async fn test_disk_not_found() { let state = mock_state(AppState::default()); let config = Config::default(); - let result = DiskModule.run(&config, &state, &["/nonexistent"]); + let result = DiskModule.run(&config, &state, &["/nonexistent"]).await; assert!(result.is_err()); } } diff --git a/src/modules/game.rs b/src/modules/game.rs index 6e126cf..1ff59f7 100644 --- a/src/modules/game.rs +++ b/src/modules/game.rs @@ -1,8 +1,9 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; -use anyhow::{Result, anyhow}; +use anyhow::anyhow; use std::env; use std::io::{Read, Write}; use std::os::unix::net::UnixStream; @@ -10,7 +11,12 @@ use std::os::unix::net::UnixStream; pub struct GameModule; impl WaybarModule for GameModule { - fn run(&self, config: &Config, _state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + _state: &SharedState, + _args: &[&str], + ) -> Result { let is_gamemode = hyprland_ipc("j/getoption animations:enabled") .map(|stdout| stdout.contains("\"int\": 0")) .unwrap_or(false); diff --git a/src/modules/gpu.rs b/src/modules/gpu.rs index d66709e..b5e3e47 100644 --- a/src/modules/gpu.rs +++ b/src/modules/gpu.rs @@ -1,36 +1,30 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct GpuModule; impl WaybarModule for GpuModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { let (active, vendor, usage, vram_used, vram_total, temp, model) = { - if let Ok(state_lock) = state.read() { - ( - state_lock.gpu.active, - state_lock.gpu.vendor.clone(), - state_lock.gpu.usage, - state_lock.gpu.vram_used, - state_lock.gpu.vram_total, - state_lock.gpu.temp, - state_lock.gpu.model.clone(), - ) - } else { - ( - false, - String::from("Unknown"), - 0.0, - 0.0, - 0.0, - 0.0, - String::from("Unknown"), - ) - } + let state_lock = state.read().await; + ( + state_lock.gpu.active, + state_lock.gpu.vendor.clone(), + state_lock.gpu.usage, + state_lock.gpu.vram_used, + state_lock.gpu.vram_total, + state_lock.gpu.temp, + state_lock.gpu.model.clone(), + ) }; if !active { diff --git a/src/modules/hardware.rs b/src/modules/hardware.rs index ff1137c..c3bcda4 100644 --- a/src/modules/hardware.rs +++ b/src/modules/hardware.rs @@ -24,7 +24,7 @@ impl HardwareDaemon { } } - pub fn poll_fast(&mut self, state: SharedState) { + pub async fn poll_fast(&mut self, state: SharedState) { self.sys.refresh_cpu_usage(); self.sys.refresh_memory(); self.components.refresh(true); @@ -70,23 +70,22 @@ impl HardwareDaemon { } } - if let Ok(mut state_lock) = state.write() { - state_lock.cpu.usage = cpu_usage as f64; - state_lock.cpu.temp = cpu_temp; - state_lock.cpu.model = cpu_model; + let mut state_lock = state.write().await; + state_lock.cpu.usage = cpu_usage as f64; + state_lock.cpu.temp = cpu_temp; + state_lock.cpu.model = cpu_model; - state_lock.memory.total_gb = total_mem; - state_lock.memory.used_gb = used_mem; + state_lock.memory.total_gb = total_mem; + state_lock.memory.used_gb = used_mem; - state_lock.sys.load_1 = load_avg.one; - state_lock.sys.load_5 = load_avg.five; - state_lock.sys.load_15 = load_avg.fifteen; - state_lock.sys.uptime = uptime; - state_lock.sys.process_count = process_count; - } + state_lock.sys.load_1 = load_avg.one; + state_lock.sys.load_5 = load_avg.five; + state_lock.sys.load_15 = load_avg.fifteen; + state_lock.sys.uptime = uptime; + state_lock.sys.process_count = process_count; } - pub fn poll_slow(&mut self, state: SharedState) { + pub async fn poll_slow(&mut self, state: SharedState) { // 1. Gather GPU data outside of lock let mut gpu_state = crate::state::GpuState::default(); self.gpu_poll_counter = (self.gpu_poll_counter + 1) % 5; @@ -113,14 +112,13 @@ impl HardwareDaemon { } // 3. Apply to state - if let Ok(mut state_lock) = state.write() { - if should_poll_gpu { - state_lock.gpu = gpu_state; - } + let mut state_lock = state.write().await; + if should_poll_gpu { + state_lock.gpu = gpu_state; + } - if let Some(d) = disks_data { - state_lock.disks = d; - } + if let Some(d) = disks_data { + state_lock.disks = d; } } diff --git a/src/modules/memory.rs b/src/modules/memory.rs index 946d708..f2a9dfe 100644 --- a/src/modules/memory.rs +++ b/src/modules/memory.rs @@ -1,20 +1,22 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct MemoryModule; impl WaybarModule for MemoryModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { let (used_gb, total_gb) = { - if let Ok(state_lock) = state.read() { - (state_lock.memory.used_gb, state_lock.memory.total_gb) - } else { - (0.0, 0.0) - } + let state_lock = state.read().await; + (state_lock.memory.used_gb, state_lock.memory.total_gb) }; let ratio = if total_gb > 0.0 { @@ -53,8 +55,8 @@ mod tests { use super::*; use crate::state::{AppState, MemoryState, mock_state}; - #[test] - fn test_memory_normal() { + #[tokio::test] + async fn test_memory_normal() { let state = mock_state(AppState { memory: MemoryState { used_gb: 8.0, @@ -63,15 +65,15 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).unwrap(); + let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.contains("8.00")); assert!(output.text.contains("32.00")); assert_eq!(output.class.as_deref(), Some("normal")); assert_eq!(output.percentage, Some(25)); // 8/32 = 25% } - #[test] - fn test_memory_high() { + #[tokio::test] + async fn test_memory_high() { let state = mock_state(AppState { memory: MemoryState { used_gb: 26.0, @@ -80,12 +82,12 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).unwrap(); + let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("high")); // 81% } - #[test] - fn test_memory_zero_total() { + #[tokio::test] + async fn test_memory_zero_total() { let state = mock_state(AppState { memory: MemoryState { used_gb: 0.0, @@ -94,7 +96,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).unwrap(); + let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("normal")); assert_eq!(output.percentage, Some(0)); } diff --git a/src/modules/mod.rs b/src/modules/mod.rs index ec5a593..56ab6ca 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -12,10 +12,15 @@ pub mod power; pub mod sys; use crate::config::Config; +use crate::error::Result as FluxoResult; use crate::output::WaybarOutput; use crate::state::SharedState; -use anyhow::Result; pub trait WaybarModule { - fn run(&self, config: &Config, state: &SharedState, args: &[&str]) -> Result; + fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> impl std::future::Future> + Send; } diff --git a/src/modules/network.rs b/src/modules/network.rs index 876e628..77b7ae4 100644 --- a/src/modules/network.rs +++ b/src/modules/network.rs @@ -1,9 +1,9 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; use nix::ifaddrs::getifaddrs; use std::fs; use std::time::{SystemTime, UNIX_EPOCH}; @@ -29,7 +29,7 @@ impl NetworkDaemon { } } - pub fn poll(&mut self, state: SharedState) { + pub async fn poll(&mut self, state: SharedState) { // Re-detect interface on every poll to catch VPNs or route changes immediately if let Ok(iface) = get_primary_interface() && !iface.is_empty() @@ -62,14 +62,14 @@ impl NetworkDaemon { / 1024.0 / 1024.0; - if let Ok(mut state_lock) = state.write() { - state_lock.network.rx_mbps = rx_mbps; - state_lock.network.tx_mbps = tx_mbps; - state_lock.network.interface = interface.clone(); - state_lock.network.ip = self.cached_ip.clone().unwrap_or_default(); - } - } else if let Ok(mut state_lock) = state.write() { + let mut state_lock = state.write().await; + state_lock.network.rx_mbps = rx_mbps; + state_lock.network.tx_mbps = tx_mbps; + state_lock.network.interface = interface.clone(); + state_lock.network.ip = self.cached_ip.clone().unwrap_or_default(); + } else { // First poll: no speed data yet, but update interface/ip + let mut state_lock = state.write().await; state_lock.network.interface = interface.clone(); state_lock.network.ip = self.cached_ip.clone().unwrap_or_default(); } @@ -81,8 +81,9 @@ impl NetworkDaemon { // Read failed, might be down self.cached_interface = None; } - } else if let Ok(mut state_lock) = state.write() { + } else { // No interface detected + let mut state_lock = state.write().await; state_lock.network.interface.clear(); state_lock.network.ip.clear(); } @@ -90,21 +91,20 @@ impl NetworkDaemon { } impl WaybarModule for NetworkModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { - let (interface, ip, rx_mbps, tx_mbps) = if let Ok(s) = state.read() { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { + let (interface, ip, rx_mbps, tx_mbps) = { + let s = state.read().await; ( s.network.interface.clone(), s.network.ip.clone(), s.network.rx_mbps, s.network.tx_mbps, ) - } else { - return Ok(WaybarOutput { - text: "No connection".to_string(), - tooltip: None, - class: None, - percentage: None, - }); }; if interface.is_empty() { @@ -208,16 +208,16 @@ mod tests { use super::*; use crate::state::{AppState, NetworkState, mock_state}; - #[test] - fn test_network_no_connection() { + #[tokio::test] + async fn test_network_no_connection() { let state = mock_state(AppState::default()); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).unwrap(); + let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.text, "No connection"); } - #[test] - fn test_network_connected() { + #[tokio::test] + async fn test_network_connected() { let state = mock_state(AppState { network: NetworkState { rx_mbps: 1.5, @@ -228,15 +228,15 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).unwrap(); + let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.contains("eth0")); assert!(output.text.contains("192.168.1.100")); assert!(output.text.contains("1.50")); assert_eq!(output.class.as_deref(), Some("eth0")); } - #[test] - fn test_network_vpn_prefix() { + #[tokio::test] + async fn test_network_vpn_prefix() { let state = mock_state(AppState { network: NetworkState { rx_mbps: 0.0, @@ -247,12 +247,12 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).unwrap(); + let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.starts_with(" ")); } - #[test] - fn test_network_no_ip() { + #[tokio::test] + async fn test_network_no_ip() { let state = mock_state(AppState { network: NetworkState { rx_mbps: 0.0, @@ -263,7 +263,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).unwrap(); + let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.contains("No IP")); } } diff --git a/src/modules/power.rs b/src/modules/power.rs index b537dea..dd0171d 100644 --- a/src/modules/power.rs +++ b/src/modules/power.rs @@ -1,15 +1,20 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; use std::fs; pub struct PowerModule; impl WaybarModule for PowerModule { - fn run(&self, config: &Config, _state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + _state: &SharedState, + _args: &[&str], + ) -> Result { let critical_threshold = 15; let warning_threshold = 50; diff --git a/src/modules/sys.rs b/src/modules/sys.rs index fceb486..dee9870 100644 --- a/src/modules/sys.rs +++ b/src/modules/sys.rs @@ -1,26 +1,28 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct SysModule; impl WaybarModule for SysModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { let (load1, load5, load15, uptime_secs, process_count) = { - if let Ok(state_lock) = state.read() { - ( - state_lock.sys.load_1, - state_lock.sys.load_5, - state_lock.sys.load_15, - state_lock.sys.uptime, - state_lock.sys.process_count, - ) - } else { - (0.0, 0.0, 0.0, 0, 0) - } + let state_lock = state.read().await; + ( + state_lock.sys.load_1, + state_lock.sys.load_5, + state_lock.sys.load_15, + state_lock.sys.uptime, + state_lock.sys.process_count, + ) }; let hours = uptime_secs / 3600; diff --git a/src/state.rs b/src/state.rs index f634c19..d3e2fad 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,7 @@ -use std::sync::{Arc, RwLock}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio::time::Instant; #[derive(Default, Clone)] pub struct AppState { @@ -10,6 +13,14 @@ pub struct AppState { pub disks: Vec, pub bluetooth: BtState, pub audio: AudioState, + pub health: HashMap, +} + +#[derive(Clone, Default)] +pub struct ModuleHealth { + pub consecutive_failures: u32, + pub last_failure: Option, + pub backoff_until: Option, } #[derive(Default, Clone)]