From bdbd6a8a40c351d6a58c2c41ba5404901691815b Mon Sep 17 00:00:00 2001 From: Nils Pukropp Date: Thu, 2 Apr 2026 18:11:21 +0200 Subject: [PATCH] added tokio shared states instead of monolithic state --- Cargo.toml | 2 +- src/daemon.rs | 55 ++++++--- src/modules/audio.rs | 90 +++++++-------- src/modules/bt/buds.rs | 35 +++--- src/modules/bt/maestro.rs | 18 +-- src/modules/bt/mod.rs | 235 +++++++++++++++++++------------------- src/modules/btrfs.rs | 8 +- src/modules/cpu.rs | 18 ++- src/modules/disk.rs | 27 +++-- src/modules/game.rs | 17 +-- src/modules/gpu.rs | 20 ++-- src/modules/hardware.rs | 77 ++++++++----- src/modules/memory.rs | 23 ++-- src/modules/mod.rs | 4 +- src/modules/network.rs | 97 ++++++++++------ src/modules/power.rs | 4 +- src/modules/sys.rs | 16 +-- src/state.rs | 85 +++++++++++--- 18 files changed, 479 insertions(+), 352 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c78d65f..3ec1ff9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ tracing = "0.1.44" tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } maestro = { git = "https://github.com/qzed/pbpctrl", package = "maestro" } bluer = { version = "0.17", features = ["bluetoothd", "rfcomm", "id"] } -tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal", "process"] } tokio-util = { version = "0.7", features = ["codec"] } futures = "0.3" libpulse-binding = "2.30" diff --git a/src/daemon.rs b/src/daemon.rs index 1e3327e..fdac9ec 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -6,14 +6,15 @@ use crate::modules::audio::AudioDaemon; use crate::modules::bt::BtDaemon; use crate::modules::hardware::HardwareDaemon; use crate::modules::network::NetworkDaemon; -use crate::state::{AppState, SharedState}; +use crate::state::AppReceivers; use anyhow::Result; +use std::collections::HashMap; use std::fs; use std::path::PathBuf; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixListener; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, watch}; use tokio::time::{Duration, Instant, sleep}; use tracing::{debug, error, info, warn}; @@ -36,7 +37,28 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { fs::remove_file(&sock_path)?; } - let state: SharedState = Arc::new(RwLock::new(AppState::default())); + let (net_tx, net_rx) = watch::channel(Default::default()); + let (cpu_tx, cpu_rx) = watch::channel(Default::default()); + let (mem_tx, mem_rx) = watch::channel(Default::default()); + let (sys_tx, sys_rx) = watch::channel(Default::default()); + let (gpu_tx, gpu_rx) = watch::channel(Default::default()); + let (disks_tx, disks_rx) = watch::channel(Default::default()); + let (bt_tx, bt_rx) = watch::channel(Default::default()); + let (audio_tx, audio_rx) = watch::channel(Default::default()); + let health = Arc::new(RwLock::new(HashMap::new())); + + let receivers = AppReceivers { + network: net_rx, + cpu: cpu_rx, + memory: mem_rx, + sys: sys_rx, + gpu: gpu_rx, + disks: disks_rx, + bluetooth: bt_rx, + audio: audio_rx, + health, + }; + let listener = UnixListener::bind(&sock_path)?; let _guard = SocketGuard { path: sock_path.clone(), @@ -55,54 +77,51 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { let config = Arc::new(RwLock::new(crate::config::load_config(config_path))); // 1. Network Task - let poll_state = Arc::clone(&state); tokio::spawn(async move { info!("Starting Network polling task"); let mut daemon = NetworkDaemon::new(); loop { - daemon.poll(Arc::clone(&poll_state)).await; + daemon.poll(&net_tx).await; sleep(Duration::from_secs(1)).await; } }); // 2. Fast Hardware Task (CPU, Mem, Load) - let poll_state = Arc::clone(&state); tokio::spawn(async move { info!("Starting Fast Hardware polling task"); let mut daemon = HardwareDaemon::new(); loop { - daemon.poll_fast(Arc::clone(&poll_state)).await; + daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await; sleep(Duration::from_secs(1)).await; } }); // 3. Slow Hardware Task (GPU, Disks) - let poll_state = Arc::clone(&state); tokio::spawn(async move { info!("Starting Slow Hardware polling task"); let mut daemon = HardwareDaemon::new(); loop { - daemon.poll_slow(Arc::clone(&poll_state)).await; + daemon.poll_slow(&gpu_tx, &disks_tx).await; sleep(Duration::from_secs(1)).await; } }); // 4. Bluetooth Task - let poll_state = Arc::clone(&state); let poll_config = Arc::clone(&config); + let poll_receivers = receivers.clone(); tokio::spawn(async move { info!("Starting Bluetooth polling task"); let mut daemon = BtDaemon::new(); loop { let config = poll_config.read().await; - daemon.poll(Arc::clone(&poll_state), &config).await; + daemon.poll(&bt_tx, &poll_receivers, &config).await; sleep(Duration::from_secs(1)).await; } }); // 5. Audio Thread (Event driven - pulse usually needs its own thread) let audio_daemon = AudioDaemon::new(); - audio_daemon.start(Arc::clone(&state)); + audio_daemon.start(&audio_tx); info!("Fluxo daemon successfully bound to socket: {}", sock_path); @@ -114,7 +133,7 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { res = listener.accept() => { match res { Ok((mut stream, _)) => { - let state_clone = Arc::clone(&state); + let state_clone = receivers.clone(); let config_clone = Arc::clone(&config); let cp_clone = config_path_clone.clone(); tokio::spawn(async move { @@ -174,13 +193,13 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { async fn handle_request( module_name: &str, args: &[&str], - state: &SharedState, + state: &AppReceivers, config_lock: &Arc>, ) -> String { // 1. Check Circuit Breaker status let is_in_backoff = { - let lock = state.read().await; - if let Some(health) = lock.health.get(module_name) { + let lock = state.health.read().await; + if let Some(health) = lock.get(module_name) { if let Some(until) = health.backoff_until { Instant::now() < until } else { @@ -265,8 +284,8 @@ async fn handle_request( // 2. Update Health based on result { - let mut lock = state.write().await; - let health = lock.health.entry(module_name.to_string()).or_default(); + let mut lock = state.health.write().await; + let health = lock.entry(module_name.to_string()).or_default(); match &result { Ok(_) => { health.consecutive_failures = 0; diff --git a/src/modules/audio.rs b/src/modules/audio.rs index 85fe256..3194e46 100644 --- a/src/modules/audio.rs +++ b/src/modules/audio.rs @@ -2,24 +2,16 @@ use crate::config::Config; use crate::error::{FluxoError, Result}; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::{AppReceivers, AudioState}; use crate::utils::{TokenValue, format_template}; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::subscribe::{Facility, InterestMaskSet}; use libpulse_binding::context::{Context, FlagSet as ContextFlag}; use libpulse_binding::mainloop::threaded::Mainloop as ThreadedMainloop; use libpulse_binding::volume::Volume; -use std::process::Command; -use std::sync::{Arc, LazyLock}; +use tokio::sync::watch; use tracing::error; -static RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("Failed to create Audio tokio runtime") -}); - pub struct AudioDaemon; impl AudioDaemon { @@ -27,8 +19,8 @@ impl AudioDaemon { Self } - pub fn start(&self, state: SharedState) { - let state_arc = Arc::clone(&state); + pub fn start(&self, state_tx: &watch::Sender) { + let state_tx = state_tx.clone(); std::thread::spawn(move || { let mut mainloop = @@ -64,7 +56,7 @@ impl AudioDaemon { } // Initial fetch - let _ = fetch_audio_data_sync(&mut context, &state_arc); + let _ = fetch_audio_data_sync(&mut context, &state_tx); // Subscribe to events let interest = @@ -89,7 +81,7 @@ impl AudioDaemon { let _ = rx.recv_timeout(Duration::from_secs(2)); { mainloop.lock(); - let _ = fetch_audio_data_sync(&mut context, &state_arc); + let _ = fetch_audio_data_sync(&mut context, &state_tx); mainloop.unlock(); } } @@ -99,68 +91,72 @@ impl AudioDaemon { use std::time::Duration; -fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<()> { - let state_inner = Arc::clone(state); - +fn fetch_audio_data_sync( + context: &mut Context, + state_tx: &watch::Sender, +) -> Result<()> { // We fetch all sinks and sources, and also server info to know defaults. // The order doesn't strictly matter as long as we update correctly. - let st_server = Arc::clone(&state_inner); + let tx_server = state_tx.clone(); context.introspect().get_server_info(move |info| { - let mut lock = RUNTIME.block_on(st_server.write()); - lock.audio.sink.name = info + let mut current = tx_server.borrow().clone(); + current.sink.name = info .default_sink_name .as_ref() .map(|s| s.to_string()) .unwrap_or_default(); - lock.audio.source.name = info + current.source.name = info .default_source_name .as_ref() .map(|s| s.to_string()) .unwrap_or_default(); + let _ = tx_server.send(current); }); - let st_sink = Arc::clone(&state_inner); + let tx_sink = state_tx.clone(); context.introspect().get_sink_info_list(move |res| { if let ListResult::Item(item) = res { - let mut lock = RUNTIME.block_on(st_sink.write()); + let mut current = tx_sink.borrow().clone(); // If this matches our default sink name, or if we don't have details for any yet let is_default = item .name .as_ref() - .map(|s| s.as_ref() == lock.audio.sink.name) + .map(|s| s.as_ref() == current.sink.name) .unwrap_or(false); if is_default { - lock.audio.sink.description = item + current.sink.description = item .description .as_ref() .map(|s| s.to_string()) .unwrap_or_default(); - lock.audio.sink.volume = + current.sink.volume = ((item.volume.avg().0 as f64 / Volume::NORMAL.0 as f64) * 100.0).round() as u8; - lock.audio.sink.muted = item.mute; + current.sink.muted = item.mute; + let _ = tx_sink.send(current); } } }); - let st_source = Arc::clone(&state_inner); + let tx_source = state_tx.clone(); context.introspect().get_source_info_list(move |res| { if let ListResult::Item(item) = res { - let mut lock = RUNTIME.block_on(st_source.write()); + let mut current = tx_source.borrow().clone(); let is_default = item .name .as_ref() - .map(|s| s.as_ref() == lock.audio.source.name) + .map(|s| s.as_ref() == current.source.name) .unwrap_or(false); if is_default { - lock.audio.source.description = item + current.source.description = item .description .as_ref() .map(|s| s.to_string()) .unwrap_or_default(); - lock.audio.source.volume = + current.source.volume = ((item.volume.avg().0 as f64 / Volume::NORMAL.0 as f64) * 100.0).round() as u8; - lock.audio.source.muted = item.mute; + current.source.muted = item.mute; + let _ = tx_source.send(current); } } }); @@ -174,7 +170,7 @@ impl WaybarModule for AudioModule { async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, args: &[&str], ) -> Result { let target_type = args.first().unwrap_or(&"sink"); @@ -182,7 +178,7 @@ impl WaybarModule for AudioModule { match *action { "cycle" => { - self.cycle_device(target_type)?; + self.cycle_device(target_type).await?; Ok(WaybarOutput::default()) } "show" => self.get_status(config, state, target_type).await, @@ -198,13 +194,10 @@ impl AudioModule { async fn get_status( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, target_type: &str, ) -> Result { - let audio_state = { - let lock = state.read().await; - lock.audio.clone() - }; + let audio_state = state.audio.borrow().clone(); let (name, description, volume, muted) = if target_type == "sink" { ( @@ -287,7 +280,7 @@ impl AudioModule { }) } - fn cycle_device(&self, target_type: &str) -> Result<()> { + async fn cycle_device(&self, target_type: &str) -> Result<()> { // Keep using pactl for cycling for now as it's a rare action // but we could also implement it natively later. let set_cmd = if target_type == "sink" { @@ -305,9 +298,10 @@ impl AudioModule { } else { "sources" }; - let output = Command::new("pactl") + let output = tokio::process::Command::new("pactl") .args(["list", "short", list_cmd]) - .output()?; + .output() + .await?; let stdout = String::from_utf8_lossy(&output.stdout); let devices: Vec = stdout @@ -331,7 +325,10 @@ impl AudioModule { return Ok(()); } - let info_output = Command::new("pactl").args(["info"]).output()?; + let info_output = tokio::process::Command::new("pactl") + .args(["info"]) + .output() + .await?; let info_stdout = String::from_utf8_lossy(&info_output.stdout); let search_key = if target_type == "sink" { "Default Sink:" @@ -349,7 +346,10 @@ impl AudioModule { let next_index = (current_index + 1) % devices.len(); let next_dev = &devices[next_index]; - Command::new("pactl").args([set_cmd, next_dev]).status()?; + tokio::process::Command::new("pactl") + .args([set_cmd, next_dev]) + .status() + .await?; Ok(()) } } diff --git a/src/modules/bt/buds.rs b/src/modules/bt/buds.rs index bd17c23..403b06a 100644 --- a/src/modules/bt/buds.rs +++ b/src/modules/bt/buds.rs @@ -1,10 +1,9 @@ use crate::config::Config; use crate::error::{FluxoError, Result as FluxoResult}; -use crate::modules::bt::maestro::{BudsCommand, get_maestro}; -use crate::state::SharedState; +use crate::modules::bt::maestro::BudsCommand; +use crate::state::AppReceivers; use crate::utils::TokenValue; use futures::future::BoxFuture; -use std::sync::Arc; pub trait BtPlugin: Send + Sync { fn name(&self) -> &str; @@ -12,21 +11,21 @@ pub trait BtPlugin: Send + Sync { fn get_data( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, mac: &str, ) -> BoxFuture<'static, FluxoResult>>; fn get_modes( &self, mac: &str, - state: &SharedState, + state: &AppReceivers, ) -> BoxFuture<'static, FluxoResult>>; fn set_mode( &self, mode: &str, mac: &str, - state: &SharedState, + state: &AppReceivers, ) -> BoxFuture<'static, FluxoResult<()>>; - fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>>; + fn cycle_mode(&self, mac: &str, state: &AppReceivers) -> BoxFuture<'static, FluxoResult<()>>; } pub struct PixelBudsPlugin; @@ -43,13 +42,13 @@ impl BtPlugin for PixelBudsPlugin { fn get_data( &self, _config: &Config, - state: &SharedState, + state: &AppReceivers, mac: &str, ) -> BoxFuture<'static, FluxoResult>> { let mac = mac.to_string(); - let state = Arc::clone(state); + let state = state.clone(); Box::pin(async move { - let maestro = get_maestro(&state); + let maestro = crate::modules::bt::maestro::get_maestro(&state); maestro.ensure_task(&mac); let status = maestro.get_status(&mac); @@ -91,7 +90,7 @@ impl BtPlugin for PixelBudsPlugin { fn get_modes( &self, _mac: &str, - _state: &SharedState, + _state: &AppReceivers, ) -> BoxFuture<'static, FluxoResult>> { Box::pin(async move { Ok(vec![ @@ -106,13 +105,13 @@ impl BtPlugin for PixelBudsPlugin { &self, mode: &str, mac: &str, - state: &SharedState, + state: &AppReceivers, ) -> BoxFuture<'static, FluxoResult<()>> { let mode = mode.to_string(); let mac = mac.to_string(); - let state = Arc::clone(state); + let state = state.clone(); Box::pin(async move { - get_maestro(&state) + crate::modules::bt::maestro::get_maestro(&state) .send_command(&mac, BudsCommand::SetAnc(mode)) .map_err(|e: anyhow::Error| FluxoError::Module { module: "bt.buds", @@ -121,17 +120,17 @@ impl BtPlugin for PixelBudsPlugin { }) } - fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>> { + fn cycle_mode(&self, mac: &str, state: &AppReceivers) -> BoxFuture<'static, FluxoResult<()>> { let mac = mac.to_string(); - let state = Arc::clone(state); + let state = state.clone(); Box::pin(async move { - let status = get_maestro(&state).get_status(&mac); + let status = crate::modules::bt::maestro::get_maestro(&state).get_status(&mac); let next_mode = match status.anc_state.as_str() { "active" => "aware", "aware" => "off", _ => "active", }; - get_maestro(&state) + crate::modules::bt::maestro::get_maestro(&state) .send_command(&mac, BudsCommand::SetAnc(next_mode.to_string())) .map_err(|e: anyhow::Error| FluxoError::Module { module: "bt.buds", diff --git a/src/modules/bt/maestro.rs b/src/modules/bt/maestro.rs index 8b9bcf8..ca33ef0 100644 --- a/src/modules/bt/maestro.rs +++ b/src/modules/bt/maestro.rs @@ -1,4 +1,4 @@ -use crate::state::SharedState; +use crate::state::AppReceivers; use anyhow::{Context, Result}; use futures::StreamExt; use std::collections::HashMap; @@ -39,11 +39,11 @@ pub struct MaestroManager { } impl MaestroManager { - pub fn new(state: SharedState) -> Self { + pub fn new(state: AppReceivers) -> Self { let (tx, mut rx) = mpsc::unbounded_channel::(); let statuses = Arc::new(Mutex::new(HashMap::new())); let statuses_clone = Arc::clone(&statuses); - let state_clone = Arc::clone(&state); + let state_clone = state.clone(); // Start dedicated BT management thread std::thread::spawn(move || { @@ -68,7 +68,7 @@ impl MaestroManager { let mac_clone = mac.clone(); let st_clone = Arc::clone(&statuses_clone); - let state_inner = Arc::clone(&state_clone); + let state_inner = state_clone.clone(); tokio::task::spawn_local(async move { if let Err(e) = buds_task(&mac_clone, st_clone, buds_rx, state_inner).await { @@ -122,7 +122,7 @@ async fn buds_task( mac: &str, statuses: Arc>>, mut rx: mpsc::Receiver, - state: SharedState, + state: AppReceivers, ) -> Result<()> { info!("Starting native Maestro connection task for {}", mac); @@ -215,8 +215,8 @@ async fn buds_task( // Update health { - let mut lock = state.write().await; - let health = lock.health.entry("bt.buds".to_string()).or_default(); + let mut lock = state.health.write().await; + let health = lock.entry("bt.buds".to_string()).or_default(); health.consecutive_failures = 0; health.backoff_until = None; } @@ -357,6 +357,6 @@ pub fn anc_state_to_string(state: &settings::AncState) -> String { static MAESTRO: OnceLock = OnceLock::new(); -pub fn get_maestro(state: &SharedState) -> &MaestroManager { - MAESTRO.get_or_init(|| MaestroManager::new(Arc::clone(state))) +pub fn get_maestro(state: &AppReceivers) -> &MaestroManager { + MAESTRO.get_or_init(|| MaestroManager::new(state.clone())) } diff --git a/src/modules/bt/mod.rs b/src/modules/bt/mod.rs index ecd033c..55f93eb 100644 --- a/src/modules/bt/mod.rs +++ b/src/modules/bt/mod.rs @@ -5,11 +5,11 @@ use crate::config::Config; use crate::error::Result as FluxoResult; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::{BtState, SharedState}; +use crate::state::{AppReceivers, BtState}; use crate::utils::{TokenValue, format_template}; use anyhow::Result; -use std::process::Command; -use std::sync::{Arc, LazyLock}; +use std::sync::LazyLock; +use tokio::sync::watch; use tracing::{error, warn}; use self::buds::{BtPlugin, PixelBudsPlugin}; @@ -23,13 +23,23 @@ impl BtDaemon { Self { session: None } } - pub async fn poll(&mut self, state: SharedState, config: &Config) { - if let Err(e) = self.poll_async(state, config).await { + pub async fn poll( + &mut self, + tx: &watch::Sender, + state: &AppReceivers, + config: &Config, + ) { + if let Err(e) = self.poll_async(tx, state, config).await { error!("BT daemon error: {}", e); } } - async fn poll_async(&mut self, state: SharedState, config: &Config) -> Result<()> { + async fn poll_async( + &mut self, + tx: &watch::Sender, + state: &AppReceivers, + config: &Config, + ) -> Result<()> { if self.session.is_none() { self.session = Some(bluer::Session::new().await?); } @@ -60,7 +70,7 @@ impl BtDaemon { for p in PLUGINS.iter() { if p.can_handle(&bt_state.device_alias, &bt_state.device_address) { - match p.get_data(config, &state, &bt_state.device_address).await { + match p.get_data(config, state, &bt_state.device_address).await { Ok(data) => { bt_state.plugin_data = data .into_iter() @@ -90,8 +100,7 @@ impl BtDaemon { } } - let mut lock = state.write().await; - lock.bluetooth = bt_state; + let _ = tx.send(bt_state); Ok(()) } @@ -103,134 +112,128 @@ static PLUGINS: LazyLock>> = pub struct BtModule; impl WaybarModule for BtModule { - fn run( + async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, args: &[&str], - ) -> impl std::future::Future> + Send { + ) -> FluxoResult { let action = args.first().cloned().unwrap_or("show").to_string(); let args = args.iter().map(|s| s.to_string()).collect::>(); - let state = Arc::clone(state); - let config = config.clone(); - async move { - let bt_state = { - let lock = state.read().await; - lock.bluetooth.clone() - }; + let bt_state = state.bluetooth.borrow().clone(); - match action.as_str() { - "disconnect" if bt_state.connected => { - let _ = Command::new("bluetoothctl") - .args(["disconnect", &bt_state.device_address]) - .output(); - return Ok(WaybarOutput::default()); + match action.as_str() { + "disconnect" if bt_state.connected => { + let _ = tokio::process::Command::new("bluetoothctl") + .args(["disconnect", &bt_state.device_address]) + .output() + .await; + return Ok(WaybarOutput::default()); + } + "cycle_mode" if bt_state.connected => { + let plugin = PLUGINS + .iter() + .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); + if let Some(p) = plugin { + p.cycle_mode(&bt_state.device_address, state).await?; } - "cycle_mode" if bt_state.connected => { + return Ok(WaybarOutput::default()); + } + "get_modes" if bt_state.connected => { + let plugin = PLUGINS + .iter() + .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); + let modes = if let Some(p) = plugin { + p.get_modes(&bt_state.device_address, state).await? + } else { + vec![] + }; + return Ok(WaybarOutput { + text: modes.join("\n"), + ..Default::default() + }); + } + "set_mode" if bt_state.connected => { + if let Some(mode) = args.get(1) { let plugin = PLUGINS .iter() .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); if let Some(p) = plugin { - p.cycle_mode(&bt_state.device_address, &state).await?; + p.set_mode(mode, &bt_state.device_address, state).await?; } - return Ok(WaybarOutput::default()); } - "get_modes" if bt_state.connected => { - let plugin = PLUGINS - .iter() - .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); - let modes = if let Some(p) = plugin { - p.get_modes(&bt_state.device_address, &state).await? - } else { - vec![] - }; - return Ok(WaybarOutput { - text: modes.join("\n"), - ..Default::default() - }); - } - "set_mode" if bt_state.connected => { - if let Some(mode) = args.get(1) { - let plugin = PLUGINS.iter().find(|p| { - p.can_handle(&bt_state.device_alias, &bt_state.device_address) - }); - if let Some(p) = plugin { - p.set_mode(mode, &bt_state.device_address, &state).await?; - } - } - return Ok(WaybarOutput::default()); - } - "show" => {} - _ => {} + return Ok(WaybarOutput::default()); } + "show" => {} + _ => {} + } - if !bt_state.adapter_powered { - return Ok(WaybarOutput { - text: config.bt.format_disabled.clone(), - tooltip: Some("Bluetooth Disabled".to_string()), - class: Some("disabled".to_string()), - percentage: None, - }); - } + if !bt_state.adapter_powered { + return Ok(WaybarOutput { + text: config.bt.format_disabled.clone(), + tooltip: Some("Bluetooth Disabled".to_string()), + class: Some("disabled".to_string()), + percentage: None, + }); + } - if bt_state.connected { - let mut tokens: Vec<(String, TokenValue)> = vec![ - ( - "alias".to_string(), - TokenValue::String(bt_state.device_alias.clone()), - ), - ( - "mac".to_string(), - TokenValue::String(bt_state.device_address.clone()), - ), - ]; + if bt_state.connected { + let mut tokens: Vec<(String, TokenValue)> = vec![ + ( + "alias".to_string(), + TokenValue::String(bt_state.device_alias.clone()), + ), + ( + "mac".to_string(), + TokenValue::String(bt_state.device_address.clone()), + ), + ]; - let mut class = vec!["connected".to_string()]; - let mut has_plugin = false; + let mut class = vec!["connected".to_string()]; + let mut has_plugin = false; - for (k, v) in &bt_state.plugin_data { - if k == "plugin_class" { - class.push(v.clone()); - has_plugin = true; - } else if k == "plugin_error" { - class.push("plugin-error".to_string()); - } else { - tokens.push((k.clone(), TokenValue::String(v.clone()))); - } - } - - let format = if has_plugin { - &config.bt.format_plugin + for (k, v) in &bt_state.plugin_data { + if k == "plugin_class" { + class.push(v.clone()); + has_plugin = true; + } else if k == "plugin_error" { + class.push("plugin-error".to_string()); } else { - &config.bt.format_connected - }; - - let text = format_template(format, &tokens); - let tooltip = format!( - "{} | MAC: {}\nBattery: {}", - bt_state.device_alias, - bt_state.device_address, - bt_state - .battery_percentage - .map(|b| format!("{}%", b)) - .unwrap_or_else(|| "N/A".to_string()) - ); - - Ok(WaybarOutput { - text, - tooltip: Some(tooltip), - class: Some(class.join(" ")), - percentage: bt_state.battery_percentage, - }) - } else { - Ok(WaybarOutput { - text: config.bt.format_disconnected.clone(), - tooltip: Some("Bluetooth On (Disconnected)".to_string()), - class: Some("disconnected".to_string()), - percentage: None, - }) + tokens.push((k.clone(), TokenValue::String(v.clone()))); + } } + + let format = if has_plugin { + &config.bt.format_plugin + } else { + &config.bt.format_connected + }; + + let text = format_template(format, &tokens); + let tooltip = format!( + "{} | MAC: {}\nBattery: {}", + bt_state.device_alias, + bt_state.device_address, + bt_state + .battery_percentage + .map(|b| format!("{}%", b)) + .unwrap_or_else(|| "N/A".to_string()) + ); + + Ok(WaybarOutput { + text, + tooltip: Some(tooltip), + class: Some(class.join(" ")), + percentage: bt_state.battery_percentage, + }) + } else { + Ok(WaybarOutput { + text: config.bt.format_disconnected.clone(), + tooltip: Some("Bluetooth On (Disconnected)".to_string()), + class: Some("disconnected".to_string()), + percentage: None, + }) } } } diff --git a/src/modules/btrfs.rs b/src/modules/btrfs.rs index 65340e7..096926c 100644 --- a/src/modules/btrfs.rs +++ b/src/modules/btrfs.rs @@ -2,7 +2,7 @@ use crate::config::Config; use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; use crate::utils::{TokenValue, format_template}; pub struct BtrfsModule; @@ -11,12 +11,12 @@ impl WaybarModule for BtrfsModule { async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, _args: &[&str], ) -> Result { let disks = { - let s = state.read().await; - s.disks.clone() + let s = state.disks.borrow(); + s.clone() }; let mut total_used: f64 = 0.0; diff --git a/src/modules/cpu.rs b/src/modules/cpu.rs index df0e3cb..619a6c1 100644 --- a/src/modules/cpu.rs +++ b/src/modules/cpu.rs @@ -2,7 +2,7 @@ use crate::config::Config; use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; use crate::utils::{TokenValue, format_template}; pub struct CpuModule; @@ -11,16 +11,12 @@ impl WaybarModule for CpuModule { async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, _args: &[&str], ) -> Result { let (usage, temp, model) = { - let state_lock = state.read().await; - ( - state_lock.cpu.usage, - state_lock.cpu.temp, - state_lock.cpu.model.clone(), - ) + let s = state.cpu.borrow(); + (s.usage, s.temp, s.model.clone()) }; let text = format_template( @@ -64,7 +60,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).await.unwrap(); + let output = CpuModule.run(&config, &state.receivers, &[]).await.unwrap(); assert!(output.text.contains("25.0")); assert!(output.text.contains("45.0")); assert_eq!(output.class.as_deref(), Some("normal")); @@ -83,7 +79,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).await.unwrap(); + let output = CpuModule.run(&config, &state.receivers, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("high")); } @@ -98,7 +94,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).await.unwrap(); + let output = CpuModule.run(&config, &state.receivers, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("max")); } } diff --git a/src/modules/disk.rs b/src/modules/disk.rs index aee30ff..26fc079 100644 --- a/src/modules/disk.rs +++ b/src/modules/disk.rs @@ -2,7 +2,7 @@ use crate::config::Config; use crate::error::{FluxoError, Result}; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; use crate::utils::{TokenValue, format_template}; pub struct DiskModule; @@ -11,15 +11,12 @@ impl WaybarModule for DiskModule { async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, args: &[&str], ) -> Result { let mountpoint = args.first().unwrap_or(&"/"); - let disks = { - let s = state.read().await; - s.disks.clone() - }; + let disks = state.disks.borrow().clone(); for disk in &disks { if disk.mount_point == *mountpoint { @@ -76,9 +73,9 @@ impl WaybarModule for DiskModule { #[cfg(test)] mod tests { use super::*; - use crate::state::{AppState, DiskInfo, mock_state}; + use crate::state::{AppState, DiskInfo, MockState, mock_state}; - fn state_with_disk(mount: &str, total: u64, available: u64) -> crate::state::SharedState { + fn state_with_disk(mount: &str, total: u64, available: u64) -> MockState { mock_state(AppState { disks: vec![DiskInfo { mount_point: mount.to_string(), @@ -95,7 +92,10 @@ mod tests { let gb = 1024 * 1024 * 1024; let state = state_with_disk("/", 100 * gb, 60 * gb); let config = Config::default(); - let output = DiskModule.run(&config, &state, &["/"]).await.unwrap(); + let output = DiskModule + .run(&config, &state.receivers, &["/"]) + .await + .unwrap(); assert_eq!(output.class.as_deref(), Some("normal")); assert_eq!(output.percentage, Some(40)); // 40% used } @@ -105,7 +105,10 @@ mod tests { let gb = 1024 * 1024 * 1024; let state = state_with_disk("/", 100 * gb, 15 * gb); let config = Config::default(); - let output = DiskModule.run(&config, &state, &["/"]).await.unwrap(); + let output = DiskModule + .run(&config, &state.receivers, &["/"]) + .await + .unwrap(); assert_eq!(output.class.as_deref(), Some("high")); // 85% used } @@ -113,7 +116,9 @@ mod tests { async fn test_disk_not_found() { let state = mock_state(AppState::default()); let config = Config::default(); - let result = DiskModule.run(&config, &state, &["/nonexistent"]).await; + let result = DiskModule + .run(&config, &state.receivers, &["/nonexistent"]) + .await; assert!(result.is_err()); } } diff --git a/src/modules/game.rs b/src/modules/game.rs index 1ff59f7..e128e13 100644 --- a/src/modules/game.rs +++ b/src/modules/game.rs @@ -2,11 +2,11 @@ use crate::config::Config; use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; use anyhow::anyhow; use std::env; -use std::io::{Read, Write}; -use std::os::unix::net::UnixStream; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::UnixStream; pub struct GameModule; @@ -14,10 +14,11 @@ impl WaybarModule for GameModule { async fn run( &self, config: &Config, - _state: &SharedState, + _state: &AppReceivers, _args: &[&str], ) -> Result { let is_gamemode = hyprland_ipc("j/getoption animations:enabled") + .await .map(|stdout| stdout.contains("\"int\": 0")) .unwrap_or(false); @@ -39,16 +40,16 @@ impl WaybarModule for GameModule { } } -fn hyprland_ipc(cmd: &str) -> Result { +async fn hyprland_ipc(cmd: &str) -> Result { let signature = env::var("HYPRLAND_INSTANCE_SIGNATURE") .map_err(|_| anyhow!("HYPRLAND_INSTANCE_SIGNATURE not set"))?; let path = format!("/tmp/hypr/{}/.socket.sock", signature); - let mut stream = UnixStream::connect(path)?; - stream.write_all(cmd.as_bytes())?; + let mut stream = UnixStream::connect(path).await?; + stream.write_all(cmd.as_bytes()).await?; let mut response = String::new(); - stream.read_to_string(&mut response)?; + stream.read_to_string(&mut response).await?; Ok(response) } diff --git a/src/modules/gpu.rs b/src/modules/gpu.rs index b5e3e47..db5023c 100644 --- a/src/modules/gpu.rs +++ b/src/modules/gpu.rs @@ -2,7 +2,7 @@ use crate::config::Config; use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; use crate::utils::{TokenValue, format_template}; pub struct GpuModule; @@ -11,19 +11,19 @@ impl WaybarModule for GpuModule { async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, _args: &[&str], ) -> Result { let (active, vendor, usage, vram_used, vram_total, temp, model) = { - let state_lock = state.read().await; + let s = state.gpu.borrow(); ( - state_lock.gpu.active, - state_lock.gpu.vendor.clone(), - state_lock.gpu.usage, - state_lock.gpu.vram_used, - state_lock.gpu.vram_total, - state_lock.gpu.temp, - state_lock.gpu.model.clone(), + s.active, + s.vendor.clone(), + s.usage, + s.vram_used, + s.vram_total, + s.temp, + s.model.clone(), ) }; diff --git a/src/modules/hardware.rs b/src/modules/hardware.rs index c3bcda4..ffa5d8b 100644 --- a/src/modules/hardware.rs +++ b/src/modules/hardware.rs @@ -1,5 +1,6 @@ -use crate::state::{DiskInfo, SharedState}; +use crate::state::{CpuState, DiskInfo, GpuState, MemoryState, SysState}; use sysinfo::{Components, Disks, System}; +use tokio::sync::watch; pub struct HardwareDaemon { sys: System, @@ -24,7 +25,12 @@ impl HardwareDaemon { } } - pub async fn poll_fast(&mut self, state: SharedState) { + pub async fn poll_fast( + &mut self, + cpu_tx: &watch::Sender, + mem_tx: &watch::Sender, + sys_tx: &watch::Sender, + ) { self.sys.refresh_cpu_usage(); self.sys.refresh_memory(); self.components.refresh(true); @@ -70,28 +76,37 @@ impl HardwareDaemon { } } - let mut state_lock = state.write().await; - state_lock.cpu.usage = cpu_usage as f64; - state_lock.cpu.temp = cpu_temp; - state_lock.cpu.model = cpu_model; + let mut cpu = cpu_tx.borrow().clone(); + cpu.usage = cpu_usage as f64; + cpu.temp = cpu_temp; + cpu.model = cpu_model; + let _ = cpu_tx.send(cpu); - state_lock.memory.total_gb = total_mem; - state_lock.memory.used_gb = used_mem; + let mut mem = mem_tx.borrow().clone(); + mem.total_gb = total_mem; + mem.used_gb = used_mem; + let _ = mem_tx.send(mem); - state_lock.sys.load_1 = load_avg.one; - state_lock.sys.load_5 = load_avg.five; - state_lock.sys.load_15 = load_avg.fifteen; - state_lock.sys.uptime = uptime; - state_lock.sys.process_count = process_count; + let mut sys = sys_tx.borrow().clone(); + sys.load_1 = load_avg.one; + sys.load_5 = load_avg.five; + sys.load_15 = load_avg.fifteen; + sys.uptime = uptime; + sys.process_count = process_count; + let _ = sys_tx.send(sys); } - pub async fn poll_slow(&mut self, state: SharedState) { + pub async fn poll_slow( + &mut self, + gpu_tx: &watch::Sender, + disks_tx: &watch::Sender>, + ) { // 1. Gather GPU data outside of lock let mut gpu_state = crate::state::GpuState::default(); self.gpu_poll_counter = (self.gpu_poll_counter + 1) % 5; let should_poll_gpu = self.gpu_poll_counter == 0; if should_poll_gpu { - self.poll_gpu(&mut gpu_state); + self.poll_gpu(&mut gpu_state).await; } // 2. Gather Disk data outside of lock @@ -99,39 +114,43 @@ impl HardwareDaemon { self.disk_poll_counter = (self.disk_poll_counter + 1) % 10; if self.disk_poll_counter == 0 { disks_data = Some( - Disks::new_with_refreshed_list() - .iter() - .map(|d| DiskInfo { - mount_point: d.mount_point().to_string_lossy().into_owned(), - filesystem: d.file_system().to_string_lossy().to_lowercase(), - total_bytes: d.total_space(), - available_bytes: d.available_space(), - }) - .collect::>(), + tokio::task::spawn_blocking(|| { + Disks::new_with_refreshed_list() + .iter() + .map(|d| DiskInfo { + mount_point: d.mount_point().to_string_lossy().into_owned(), + filesystem: d.file_system().to_string_lossy().to_lowercase(), + total_bytes: d.total_space(), + available_bytes: d.available_space(), + }) + .collect::>() + }) + .await + .unwrap_or_default(), ); } // 3. Apply to state - let mut state_lock = state.write().await; if should_poll_gpu { - state_lock.gpu = gpu_state; + let _ = gpu_tx.send(gpu_state); } if let Some(d) = disks_data { - state_lock.disks = d; + let _ = disks_tx.send(d); } } - fn poll_gpu(&mut self, gpu: &mut crate::state::GpuState) { + async fn poll_gpu(&mut self, gpu: &mut crate::state::GpuState) { gpu.active = false; if (self.gpu_vendor.as_deref() == Some("NVIDIA") || self.gpu_vendor.is_none()) - && let Ok(output) = std::process::Command::new("nvidia-smi") + && let Ok(output) = tokio::process::Command::new("nvidia-smi") .args([ "--query-gpu=utilization.gpu,memory.used,memory.total,temperature.gpu,name", "--format=csv,noheader,nounits", ]) .output() + .await && output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); diff --git a/src/modules/memory.rs b/src/modules/memory.rs index f2a9dfe..83ed9eb 100644 --- a/src/modules/memory.rs +++ b/src/modules/memory.rs @@ -2,7 +2,7 @@ use crate::config::Config; use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; use crate::utils::{TokenValue, format_template}; pub struct MemoryModule; @@ -11,12 +11,12 @@ impl WaybarModule for MemoryModule { async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, _args: &[&str], ) -> Result { let (used_gb, total_gb) = { - let state_lock = state.read().await; - (state_lock.memory.used_gb, state_lock.memory.total_gb) + let s = state.memory.borrow(); + (s.used_gb, s.total_gb) }; let ratio = if total_gb > 0.0 { @@ -65,7 +65,10 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); + let output = MemoryModule + .run(&config, &state.receivers, &[]) + .await + .unwrap(); assert!(output.text.contains("8.00")); assert!(output.text.contains("32.00")); assert_eq!(output.class.as_deref(), Some("normal")); @@ -82,7 +85,10 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); + let output = MemoryModule + .run(&config, &state.receivers, &[]) + .await + .unwrap(); assert_eq!(output.class.as_deref(), Some("high")); // 81% } @@ -96,7 +102,10 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); + let output = MemoryModule + .run(&config, &state.receivers, &[]) + .await + .unwrap(); assert_eq!(output.class.as_deref(), Some("normal")); assert_eq!(output.percentage, Some(0)); } diff --git a/src/modules/mod.rs b/src/modules/mod.rs index 56ab6ca..d308e7f 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -14,13 +14,13 @@ pub mod sys; use crate::config::Config; use crate::error::Result as FluxoResult; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; pub trait WaybarModule { fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, args: &[&str], ) -> impl std::future::Future> + Send; } diff --git a/src/modules/network.rs b/src/modules/network.rs index 77b7ae4..2cc443f 100644 --- a/src/modules/network.rs +++ b/src/modules/network.rs @@ -2,11 +2,12 @@ use crate::config::Config; use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::{AppReceivers, NetworkState}; use crate::utils::{TokenValue, format_template}; use nix::ifaddrs::getifaddrs; use std::fs; use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::watch; pub struct NetworkModule; @@ -29,15 +30,23 @@ impl NetworkDaemon { } } - pub async fn poll(&mut self, state: SharedState) { - // Re-detect interface on every poll to catch VPNs or route changes immediately - if let Ok(iface) = get_primary_interface() - && !iface.is_empty() - { - // If the interface changed, or we don't have an IP yet, update the IP + pub async fn poll(&mut self, state_tx: &watch::Sender) { + let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| { + let iface = get_primary_interface().unwrap_or_default(); + if iface.is_empty() { + return (String::new(), None, None); + } + let ip = get_ip_address(&iface); + let bytes = get_bytes(&iface).ok(); + (iface, ip, bytes) + }) + .await + .unwrap_or((String::new(), None, None)); + + if !iface.is_empty() { if self.cached_interface.as_ref() != Some(&iface) || self.cached_ip.is_none() { - self.cached_ip = get_ip_address(&iface); - self.cached_interface = Some(iface); + self.cached_ip = ip_opt; + self.cached_interface = Some(iface.clone()); } } else { self.cached_interface = None; @@ -50,7 +59,7 @@ impl NetworkDaemon { .unwrap_or_default() .as_secs(); - if let Ok((rx_bytes_now, tx_bytes_now)) = get_bytes(interface) { + if let Some((rx_bytes_now, tx_bytes_now)) = bytes_opt { if self.last_time > 0 && time_now > self.last_time { let time_diff = (time_now - self.last_time) as f64; let rx_mbps = (rx_bytes_now.saturating_sub(self.last_rx_bytes)) as f64 @@ -62,16 +71,18 @@ impl NetworkDaemon { / 1024.0 / 1024.0; - let mut state_lock = state.write().await; - state_lock.network.rx_mbps = rx_mbps; - state_lock.network.tx_mbps = tx_mbps; - state_lock.network.interface = interface.clone(); - state_lock.network.ip = self.cached_ip.clone().unwrap_or_default(); + let mut network = state_tx.borrow().clone(); + network.rx_mbps = rx_mbps; + network.tx_mbps = tx_mbps; + network.interface = interface.clone(); + network.ip = self.cached_ip.clone().unwrap_or_default(); + let _ = state_tx.send(network); } else { // First poll: no speed data yet, but update interface/ip - let mut state_lock = state.write().await; - state_lock.network.interface = interface.clone(); - state_lock.network.ip = self.cached_ip.clone().unwrap_or_default(); + let mut network = state_tx.borrow().clone(); + network.interface = interface.clone(); + network.ip = self.cached_ip.clone().unwrap_or_default(); + let _ = state_tx.send(network); } self.last_time = time_now; @@ -83,9 +94,10 @@ impl NetworkDaemon { } } else { // No interface detected - let mut state_lock = state.write().await; - state_lock.network.interface.clear(); - state_lock.network.ip.clear(); + let mut network = state_tx.borrow().clone(); + network.interface.clear(); + network.ip.clear(); + let _ = state_tx.send(network); } } } @@ -94,17 +106,12 @@ impl WaybarModule for NetworkModule { async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, _args: &[&str], ) -> Result { let (interface, ip, rx_mbps, tx_mbps) = { - let s = state.read().await; - ( - s.network.interface.clone(), - s.network.ip.clone(), - s.network.rx_mbps, - s.network.tx_mbps, - ) + let s = state.network.borrow(); + (s.interface.clone(), s.ip.clone(), s.rx_mbps, s.tx_mbps) }; if interface.is_empty() { @@ -148,24 +155,26 @@ impl WaybarModule for NetworkModule { } fn get_primary_interface() -> Result { - let content = fs::read_to_string("/proc/net/route")?; + let content = std::fs::read_to_string("/proc/net/route")?; let mut defaults = Vec::new(); for line in content.lines().skip(1) { let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 7 { + if parts.len() >= 8 { let iface = parts[0]; let dest = parts[1]; let metric = parts[6].parse::().unwrap_or(0); + let mask = u32::from_str_radix(parts[7], 16).unwrap_or(0); if dest == "00000000" { - defaults.push((metric, iface.to_string())); + defaults.push((mask, metric, iface.to_string())); } } } - defaults.sort_by_key(|k| k.0); - if let Some((_, dev)) = defaults.first() { + // Sort by mask descending (longest prefix match first), then by metric ascending + defaults.sort_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1))); + if let Some((_, _, dev)) = defaults.first() { Ok(dev.clone()) } else { Ok(String::new()) @@ -212,7 +221,10 @@ mod tests { async fn test_network_no_connection() { let state = mock_state(AppState::default()); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); + let output = NetworkModule + .run(&config, &state.receivers, &[]) + .await + .unwrap(); assert_eq!(output.text, "No connection"); } @@ -228,7 +240,10 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); + let output = NetworkModule + .run(&config, &state.receivers, &[]) + .await + .unwrap(); assert!(output.text.contains("eth0")); assert!(output.text.contains("192.168.1.100")); assert!(output.text.contains("1.50")); @@ -247,7 +262,10 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); + let output = NetworkModule + .run(&config, &state.receivers, &[]) + .await + .unwrap(); assert!(output.text.starts_with(" ")); } @@ -263,7 +281,10 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); + let output = NetworkModule + .run(&config, &state.receivers, &[]) + .await + .unwrap(); assert!(output.text.contains("No IP")); } } diff --git a/src/modules/power.rs b/src/modules/power.rs index dd0171d..af2524a 100644 --- a/src/modules/power.rs +++ b/src/modules/power.rs @@ -2,7 +2,7 @@ use crate::config::Config; use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; use crate::utils::{TokenValue, format_template}; use std::fs; @@ -12,7 +12,7 @@ impl WaybarModule for PowerModule { async fn run( &self, config: &Config, - _state: &SharedState, + _state: &AppReceivers, _args: &[&str], ) -> Result { let critical_threshold = 15; diff --git a/src/modules/sys.rs b/src/modules/sys.rs index dee9870..ad0866b 100644 --- a/src/modules/sys.rs +++ b/src/modules/sys.rs @@ -2,7 +2,7 @@ use crate::config::Config; use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; -use crate::state::SharedState; +use crate::state::AppReceivers; use crate::utils::{TokenValue, format_template}; pub struct SysModule; @@ -11,17 +11,17 @@ impl WaybarModule for SysModule { async fn run( &self, config: &Config, - state: &SharedState, + state: &AppReceivers, _args: &[&str], ) -> Result { let (load1, load5, load15, uptime_secs, process_count) = { - let state_lock = state.read().await; + let state_lock = state.sys.borrow(); ( - state_lock.sys.load_1, - state_lock.sys.load_5, - state_lock.sys.load_15, - state_lock.sys.uptime, - state_lock.sys.process_count, + state_lock.load_1, + state_lock.load_5, + state_lock.load_15, + state_lock.uptime, + state_lock.process_count, ) }; diff --git a/src/state.rs b/src/state.rs index d3e2fad..d65824a 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,19 +1,19 @@ use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, watch}; use tokio::time::Instant; -#[derive(Default, Clone)] -pub struct AppState { - pub network: NetworkState, - pub cpu: CpuState, - pub memory: MemoryState, - pub sys: SysState, - pub gpu: GpuState, - pub disks: Vec, - pub bluetooth: BtState, - pub audio: AudioState, - pub health: HashMap, +#[derive(Clone)] +pub struct AppReceivers { + pub network: watch::Receiver, + pub cpu: watch::Receiver, + pub memory: watch::Receiver, + pub sys: watch::Receiver, + pub gpu: watch::Receiver, + pub disks: watch::Receiver>, + pub bluetooth: watch::Receiver, + pub audio: watch::Receiver, + pub health: Arc>>, } #[derive(Clone, Default)] @@ -128,9 +128,64 @@ impl Default for GpuState { } } -pub type SharedState = Arc>; +#[cfg(test)] +pub struct MockState { + pub receivers: AppReceivers, + // Keep senders alive so receivers don't return Closed errors + _net_tx: watch::Sender, + _cpu_tx: watch::Sender, + _mem_tx: watch::Sender, + _sys_tx: watch::Sender, + _gpu_tx: watch::Sender, + _disks_tx: watch::Sender>, + _bt_tx: watch::Sender, + _audio_tx: watch::Sender, +} #[cfg(test)] -pub fn mock_state(state: AppState) -> SharedState { - Arc::new(RwLock::new(state)) +#[derive(Default, Clone)] +pub struct AppState { + pub network: NetworkState, + pub cpu: CpuState, + pub memory: MemoryState, + pub sys: SysState, + pub gpu: GpuState, + pub disks: Vec, + pub bluetooth: BtState, + pub audio: AudioState, + pub health: HashMap, +} + +#[cfg(test)] +pub fn mock_state(state: AppState) -> MockState { + let (net_tx, net_rx) = watch::channel(state.network); + let (cpu_tx, cpu_rx) = watch::channel(state.cpu); + let (mem_tx, mem_rx) = watch::channel(state.memory); + let (sys_tx, sys_rx) = watch::channel(state.sys); + let (gpu_tx, gpu_rx) = watch::channel(state.gpu); + let (disks_tx, disks_rx) = watch::channel(state.disks); + let (bt_tx, bt_rx) = watch::channel(state.bluetooth); + let (audio_tx, audio_rx) = watch::channel(state.audio); + + MockState { + receivers: AppReceivers { + network: net_rx, + cpu: cpu_rx, + memory: mem_rx, + sys: sys_rx, + gpu: gpu_rx, + disks: disks_rx, + bluetooth: bt_rx, + audio: audio_rx, + health: Arc::new(RwLock::new(state.health)), + }, + _net_tx: net_tx, + _cpu_tx: cpu_tx, + _mem_tx: mem_tx, + _sys_tx: sys_tx, + _gpu_tx: gpu_tx, + _disks_tx: disks_tx, + _bt_tx: bt_tx, + _audio_tx: audio_tx, + } }