From ed0051f2c904bc2944f676268d44831a91439bfa Mon Sep 17 00:00:00 2001 From: Nils Pukropp Date: Wed, 1 Apr 2026 16:49:03 +0200 Subject: [PATCH 1/5] migrated daemon to tokio with seperate hardware threads + thiserror --- Cargo.lock | 60 +++++-- Cargo.toml | 9 +- src/daemon.rs | 341 ++++++++++++++++++++++++---------------- src/error.rs | 31 ++++ src/main.rs | 8 +- src/modules/audio.rs | 35 +++-- src/modules/bt.rs | 28 ++-- src/modules/btrfs.rs | 14 +- src/modules/cpu.rs | 42 ++--- src/modules/disk.rs | 37 +++-- src/modules/game.rs | 10 +- src/modules/gpu.rs | 40 ++--- src/modules/hardware.rs | 40 +++-- src/modules/memory.rs | 34 ++-- src/modules/mod.rs | 9 +- src/modules/network.rs | 62 ++++---- src/modules/power.rs | 9 +- src/modules/sys.rs | 28 ++-- src/state.rs | 13 +- 19 files changed, 517 insertions(+), 333 deletions(-) create mode 100644 src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 7bc0971..f8ead7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -362,12 +362,13 @@ dependencies = [ "futures", "libpulse-binding", "maestro", - "nix 0.29.0", + "nix 0.31.2", "regex", "serde", "serde_json", "sysinfo", "tempfile", + "thiserror", "tokio", "tokio-util", "toml", @@ -584,9 +585,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.183" +version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" [[package]] name = "libdbus-sys" @@ -710,7 +711,6 @@ dependencies = [ "cfg-if", "cfg_aliases", "libc", - "memoffset", ] [[package]] @@ -723,6 +723,7 @@ dependencies = [ "cfg-if", "cfg_aliases", "libc", + "memoffset", ] [[package]] @@ -1067,9 +1068,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "876ac351060d4f882bb1032b6369eb0aef79ad9df1ea8bc404874d8cc3d0cd98" +checksum = "6662b5879511e06e8999a8a235d848113e942c9124f211511b16466ee2995f26" dependencies = [ "serde_core", ] @@ -1083,6 +1084,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "slab" version = "0.4.12" @@ -1182,6 +1193,26 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "thiserror" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -1201,6 +1232,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", @@ -1243,9 +1275,9 @@ dependencies = [ [[package]] name = "toml" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8195ca05e4eb728f4ba94f3e3291661320af739c4e43779cbdfae82ab239fcc" +checksum = "994b95d9e7bae62b34bab0e2a4510b801fa466066a6a8b2b57361fa1eba068ee" dependencies = [ "indexmap", "serde_core", @@ -1258,9 +1290,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97251a7c317e03ad83774a8752a7e81fb6067740609f75ea2b585b569a59198f" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" dependencies = [ "serde_core", ] @@ -1279,18 +1311,18 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2334f11ee363607eb04df9b8fc8a13ca1715a72ba8662a26ac285c98aabb4011" +checksum = "39ca317ebc49f06bd748bfba29533eac9485569dc9bf80b849024b025e814fb9" dependencies = [ "winnow", ] [[package]] name = "toml_writer" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d282ade6016312faf3e41e57ebbba0c073e4056dab1232ab1cb624199648f8ed" +checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db" [[package]] name = "tracing" diff --git a/Cargo.toml b/Cargo.toml index 0649e06..c063d4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,16 +11,17 @@ regex = "1.12" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" sysinfo = "0.38.4" -toml = "1.0.6" +thiserror = "2.0" +toml = "1.1.1" tracing = "0.1.44" tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } maestro = { git = "https://github.com/qzed/pbpctrl", package = "maestro" } bluer = { version = "0.17", features = ["bluetoothd", "rfcomm", "id"] } -tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal"] } tokio-util = { version = "0.7", features = ["codec"] } futures = "0.3" -libpulse-binding = "2.26" -nix = { version = "0.29", features = ["net"] } +libpulse-binding = "2.30" +nix = { version = "0.31", features = ["net"] } [dev-dependencies] diff --git a/src/daemon.rs b/src/daemon.rs index 64957e0..3a4195f 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,4 +1,5 @@ use crate::config::Config; +use crate::error::FluxoError; use crate::ipc::socket_path; use crate::modules::WaybarModule; use crate::modules::audio::AudioDaemon; @@ -8,14 +9,12 @@ use crate::modules::network::NetworkDaemon; use crate::state::{AppState, SharedState}; use anyhow::Result; use std::fs; -use std::io::{BufRead, BufReader, Write}; -use std::net::Shutdown; -use std::os::unix::net::UnixListener; use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::Duration; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixListener; +use tokio::sync::RwLock; +use tokio::time::{Duration, Instant, sleep}; use tracing::{debug, error, info, warn}; struct SocketGuard { @@ -29,7 +28,7 @@ impl Drop for SocketGuard { } } -pub fn run_daemon(config_path: Option) -> Result<()> { +pub async fn run_daemon(config_path: Option) -> Result<()> { let sock_path = socket_path(); if fs::metadata(&sock_path).is_ok() { @@ -43,134 +42,126 @@ pub fn run_daemon(config_path: Option) -> Result<()> { path: sock_path.clone(), }; - // Signal handling: set flag so main loop exits cleanly - let running = Arc::new(AtomicBool::new(true)); - let running_clone = Arc::clone(&running); - ctrlc::set_handler(move || { + // Signal handling for graceful shutdown in async context + let running = Arc::new(tokio::sync::Notify::new()); + let r_clone = Arc::clone(&running); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); info!("Received shutdown signal, exiting..."); - running_clone.store(false, Ordering::SeqCst); - })?; + r_clone.notify_waiters(); + }); - // We store the original config_path to allow proper reloading later let config_path_clone = config_path.clone(); let config = Arc::new(RwLock::new(crate::config::load_config(config_path))); - // 1. Network Thread + // 1. Network Task let poll_state = Arc::clone(&state); - let poll_running = Arc::clone(&running); - thread::spawn(move || { - info!("Starting Network polling thread"); + tokio::spawn(async move { + info!("Starting Network polling task"); let mut daemon = NetworkDaemon::new(); - while poll_running.load(Ordering::SeqCst) { - daemon.poll(Arc::clone(&poll_state)); - thread::sleep(Duration::from_secs(1)); + loop { + daemon.poll(Arc::clone(&poll_state)).await; + sleep(Duration::from_secs(1)).await; } }); - // 2. Fast Hardware Thread (CPU, Mem, Load) + // 2. Fast Hardware Task (CPU, Mem, Load) let poll_state = Arc::clone(&state); - let poll_running = Arc::clone(&running); - thread::spawn(move || { - info!("Starting Fast Hardware polling thread"); + tokio::spawn(async move { + info!("Starting Fast Hardware polling task"); let mut daemon = HardwareDaemon::new(); - while poll_running.load(Ordering::SeqCst) { - daemon.poll_fast(Arc::clone(&poll_state)); - thread::sleep(Duration::from_secs(1)); + loop { + daemon.poll_fast(Arc::clone(&poll_state)).await; + sleep(Duration::from_secs(1)).await; } }); - // 3. Slow Hardware Thread (GPU, Disks) + // 3. Slow Hardware Task (GPU, Disks) let poll_state = Arc::clone(&state); - let poll_running = Arc::clone(&running); - thread::spawn(move || { - info!("Starting Slow Hardware polling thread"); + tokio::spawn(async move { + info!("Starting Slow Hardware polling task"); let mut daemon = HardwareDaemon::new(); - while poll_running.load(Ordering::SeqCst) { - daemon.poll_slow(Arc::clone(&poll_state)); - thread::sleep(Duration::from_secs(1)); + loop { + daemon.poll_slow(Arc::clone(&poll_state)).await; + sleep(Duration::from_secs(1)).await; } }); - // 4. Bluetooth Thread + // 4. Bluetooth Task let poll_state = Arc::clone(&state); - let poll_running = Arc::clone(&running); - thread::spawn(move || { - info!("Starting Bluetooth polling thread"); + tokio::spawn(async move { + info!("Starting Bluetooth polling task"); let mut daemon = BtDaemon::new(); - while poll_running.load(Ordering::SeqCst) { - daemon.poll(Arc::clone(&poll_state)); - thread::sleep(Duration::from_secs(1)); + loop { + daemon.poll(Arc::clone(&poll_state)).await; + sleep(Duration::from_secs(1)).await; } }); - // 5. Audio Thread (Event driven) + // 5. Audio Thread (Event driven - pulse usually needs its own thread) let audio_daemon = AudioDaemon::new(); audio_daemon.start(Arc::clone(&state)); info!("Fluxo daemon successfully bound to socket: {}", sock_path); - // Use non-blocking accept so we can check the running flag - listener.set_nonblocking(true)?; - - while running.load(Ordering::SeqCst) { - match listener.accept() { - Ok((mut stream, _)) => { - let state_clone = Arc::clone(&state); - let config_clone = Arc::clone(&config); - let cp_clone = config_path_clone.clone(); - thread::spawn(move || { - let mut reader = BufReader::new(&stream); - let mut request = String::new(); - if let Err(e) = reader.read_line(&mut request) { - error!("Failed to read from IPC stream: {}", e); - return; - } - drop(reader); - - let request = request.trim(); - if request.is_empty() { - return; - } - - let parts: Vec<&str> = request.split_whitespace().collect(); - if let Some(module_name) = parts.first() { - if *module_name == "reload" { - info!("Reloading configuration..."); - let new_config = crate::config::load_config(cp_clone); - if let Ok(mut config_lock) = config_clone.write() { - *config_lock = new_config; - let _ = stream.write_all(b"{\"text\":\"ok\"}"); - info!("Configuration reloaded successfully."); - } else { - error!("Failed to acquire write lock for configuration reload."); - } - let _ = stream.shutdown(Shutdown::Write); - return; - } - - debug!(module = module_name, args = ?&parts[1..], "Handling IPC request"); - let response = - handle_request(module_name, &parts[1..], &state_clone, &config_clone); - if let Err(e) = stream.write_all(response.as_bytes()) { - if e.kind() == std::io::ErrorKind::BrokenPipe - || e.kind() == std::io::ErrorKind::ConnectionReset - { - debug!( - "IPC client disconnected before response could be sent: {}", - e - ); - } else { - error!("Failed to write IPC response: {}", e); - } - } - let _ = stream.shutdown(Shutdown::Write); - } - }); + loop { + tokio::select! { + _ = running.notified() => { + break; } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - thread::sleep(Duration::from_millis(50)); + res = listener.accept() => { + match res { + Ok((mut stream, _)) => { + let state_clone = Arc::clone(&state); + let config_clone = Arc::clone(&config); + let cp_clone = config_path_clone.clone(); + tokio::spawn(async move { + let (reader, mut writer) = stream.split(); + let mut reader = BufReader::new(reader); + let mut request = String::new(); + if let Err(e) = reader.read_line(&mut request).await { + error!("Failed to read from IPC stream: {}", e); + return; + } + + let request = request.trim(); + if request.is_empty() { + return; + } + + let parts: Vec<&str> = request.split_whitespace().collect(); + if let Some(module_name) = parts.first() { + if *module_name == "reload" { + info!("Reloading configuration..."); + let new_config = crate::config::load_config(cp_clone); + let mut config_lock = config_clone.write().await; + *config_lock = new_config; + let _ = writer.write_all(b"{\"text\":\"ok\"}").await; + info!("Configuration reloaded successfully."); + return; + } + + debug!(module = module_name, args = ?&parts[1..], "Handling IPC request"); + let response = + handle_request(module_name, &parts[1..], &state_clone, &config_clone).await; + if let Err(e) = writer.write_all(response.as_bytes()).await { + if e.kind() == std::io::ErrorKind::BrokenPipe + || e.kind() == std::io::ErrorKind::ConnectionReset + { + debug!( + "IPC client disconnected before response could be sent: {}", + e + ); + } else { + error!("Failed to write IPC response: {}", e); + } + } + } + }); + } + Err(e) => error!("Failed to accept incoming connection: {}", e), + } } - Err(e) => error!("Failed to accept incoming connection: {}", e), } } @@ -178,53 +169,127 @@ pub fn run_daemon(config_path: Option) -> Result<()> { Ok(()) } -fn handle_request( +async fn handle_request( module_name: &str, args: &[&str], state: &SharedState, config_lock: &Arc>, ) -> String { - let config = if let Ok(c) = config_lock.read() { - c - } else { - error!("Failed to acquire read lock for configuration."); - return "{\"text\":\"error: config lock failed\"}".to_string(); - }; - - let result = match module_name { - "net" | "network" => crate::modules::network::NetworkModule.run(&config, state, args), - "cpu" => crate::modules::cpu::CpuModule.run(&config, state, args), - "mem" | "memory" => crate::modules::memory::MemoryModule.run(&config, state, args), - "disk" => crate::modules::disk::DiskModule.run(&config, state, args), - "pool" | "btrfs" => crate::modules::btrfs::BtrfsModule.run(&config, state, args), - "vol" => crate::modules::audio::AudioModule.run( - &config, - state, - &["sink", args.first().unwrap_or(&"show")], - ), - "mic" => crate::modules::audio::AudioModule.run( - &config, - state, - &["source", args.first().unwrap_or(&"show")], - ), - "gpu" => crate::modules::gpu::GpuModule.run(&config, state, args), - "sys" => crate::modules::sys::SysModule.run(&config, state, args), - "bt" | "bluetooth" => crate::modules::bt::BtModule.run(&config, state, args), - "power" => crate::modules::power::PowerModule.run(&config, state, args), - "game" => crate::modules::game::GameModule.run(&config, state, args), - _ => { - warn!("Received request for unknown module: '{}'", module_name); - Err(anyhow::anyhow!("Unknown module: {}", module_name)) + // 1. Check Circuit Breaker status + let is_in_backoff = { + let lock = state.read().await; + if let Some(health) = lock.health.get(module_name) { + if let Some(until) = health.backoff_until { + Instant::now() < until + } else { + false + } + } else { + false } }; + if is_in_backoff { + return format!( + "{{\"text\":\"\u{200B}Cooling down ({})\u{200B}\",\"class\":\"error\"}}", + module_name + ); + } + + let config = config_lock.read().await; + + let result = match module_name { + "net" | "network" => { + crate::modules::network::NetworkModule + .run(&config, state, args) + .await + } + "cpu" => { + crate::modules::cpu::CpuModule + .run(&config, state, args) + .await + } + "mem" | "memory" => { + crate::modules::memory::MemoryModule + .run(&config, state, args) + .await + } + "disk" => { + crate::modules::disk::DiskModule + .run(&config, state, args) + .await + } + "pool" | "btrfs" => { + crate::modules::btrfs::BtrfsModule + .run(&config, state, args) + .await + } + "vol" => { + crate::modules::audio::AudioModule + .run(&config, state, &["sink", args.first().unwrap_or(&"show")]) + .await + } + "mic" => { + crate::modules::audio::AudioModule + .run(&config, state, &["source", args.first().unwrap_or(&"show")]) + .await + } + "gpu" => { + crate::modules::gpu::GpuModule + .run(&config, state, args) + .await + } + "sys" => { + crate::modules::sys::SysModule + .run(&config, state, args) + .await + } + "bt" | "bluetooth" => crate::modules::bt::BtModule.run(&config, state, args).await, + "power" => { + crate::modules::power::PowerModule + .run(&config, state, args) + .await + } + "game" => { + crate::modules::game::GameModule + .run(&config, state, args) + .await + } + _ => { + warn!("Received request for unknown module: '{}'", module_name); + Err(FluxoError::Ipc(format!("Unknown module: {}", module_name))) + } + }; + + // 2. Update Health based on result + { + let mut lock = state.write().await; + let health = lock.health.entry(module_name.to_string()).or_default(); + match &result { + Ok(_) => { + health.consecutive_failures = 0; + health.backoff_until = None; + } + Err(e) => { + health.consecutive_failures += 1; + health.last_failure = Some(Instant::now()); + if health.consecutive_failures >= 3 { + // Backoff for 30 seconds after 3 failures + health.backoff_until = Some(Instant::now() + Duration::from_secs(30)); + warn!(module = module_name, error = %e, "Module entered backoff state due to repeated failures"); + } + } + } + } + match result { Ok(output) => serde_json::to_string(&output).unwrap_or_else(|_| "{}".to_string()), Err(e) => { - error!(module = module_name, error = %e, "Module execution failed"); + let error_msg = e.to_string(); + error!(module = module_name, error = %error_msg, "Module execution failed"); let err_out = crate::output::WaybarOutput { text: "\u{200B}Error\u{200B}".to_string(), - tooltip: Some(e.to_string()), + tooltip: Some(error_msg), class: Some("error".to_string()), percentage: None, }; diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..1496ac4 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,31 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +#[allow(dead_code)] +pub enum FluxoError { + #[error("Configuration error: {0}")] + Config(String), + + #[error("Module error ({module}): {message}")] + Module { + module: &'static str, + message: String, + }, + + #[error("Daemon IPC error: {0}")] + Ipc(String), + + #[error("External system error: {0}")] + System(String), + + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::Error), + + #[error("Other error: {0}")] + Other(#[from] anyhow::Error), +} + +pub type Result = std::result::Result; diff --git a/src/main.rs b/src/main.rs index 24bd5d4..c1f5dd2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod config; mod daemon; +mod error; mod ipc; mod modules; mod output; @@ -88,7 +89,12 @@ fn main() { match &cli.command { Commands::Daemon { config } => { info!("Starting Fluxo daemon..."); - if let Err(e) = daemon::run_daemon(config.clone()) { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + if let Err(e) = rt.block_on(daemon::run_daemon(config.clone())) { error!("Daemon failed: {}", e); process::exit(1); } diff --git a/src/modules/audio.rs b/src/modules/audio.rs index 795525d..85fe256 100644 --- a/src/modules/audio.rs +++ b/src/modules/audio.rs @@ -1,18 +1,25 @@ use crate::config::Config; +use crate::error::{FluxoError, Result}; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::{Result, anyhow}; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::subscribe::{Facility, InterestMaskSet}; use libpulse_binding::context::{Context, FlagSet as ContextFlag}; use libpulse_binding::mainloop::threaded::Mainloop as ThreadedMainloop; use libpulse_binding::volume::Volume; use std::process::Command; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use tracing::error; +static RUNTIME: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Audio tokio runtime") +}); + pub struct AudioDaemon; impl AudioDaemon { @@ -100,7 +107,7 @@ fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<( let st_server = Arc::clone(&state_inner); context.introspect().get_server_info(move |info| { - let mut lock = st_server.write().unwrap(); + let mut lock = RUNTIME.block_on(st_server.write()); lock.audio.sink.name = info .default_sink_name .as_ref() @@ -116,7 +123,7 @@ fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<( let st_sink = Arc::clone(&state_inner); context.introspect().get_sink_info_list(move |res| { if let ListResult::Item(item) = res { - let mut lock = st_sink.write().unwrap(); + let mut lock = RUNTIME.block_on(st_sink.write()); // If this matches our default sink name, or if we don't have details for any yet let is_default = item .name @@ -139,7 +146,7 @@ fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<( let st_source = Arc::clone(&state_inner); context.introspect().get_source_info_list(move |res| { if let ListResult::Item(item) = res { - let mut lock = st_source.write().unwrap(); + let mut lock = RUNTIME.block_on(st_source.write()); let is_default = item .name .as_ref() @@ -164,7 +171,12 @@ fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<( pub struct AudioModule; impl WaybarModule for AudioModule { - fn run(&self, config: &Config, state: &SharedState, args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> Result { let target_type = args.first().unwrap_or(&"sink"); let action = args.get(1).unwrap_or(&"show"); @@ -173,21 +185,24 @@ impl WaybarModule for AudioModule { self.cycle_device(target_type)?; Ok(WaybarOutput::default()) } - "show" => self.get_status(config, state, target_type), - other => Err(anyhow!("Unknown audio action: '{}'", other)), + "show" => self.get_status(config, state, target_type).await, + other => Err(FluxoError::Module { + module: "audio", + message: format!("Unknown audio action: '{}'", other), + }), } } } impl AudioModule { - fn get_status( + async fn get_status( &self, config: &Config, state: &SharedState, target_type: &str, ) -> Result { let audio_state = { - let lock = state.read().unwrap(); + let lock = state.read().await; lock.audio.clone() }; diff --git a/src/modules/bt.rs b/src/modules/bt.rs index 63f9a40..c74213d 100644 --- a/src/modules/bt.rs +++ b/src/modules/bt.rs @@ -1,4 +1,5 @@ use crate::config::Config; +use crate::error::Result as FluxoResult; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::{BtState, SharedState}; @@ -8,8 +9,8 @@ use futures::StreamExt; use std::collections::HashMap; use std::process::Command; use std::sync::{Arc, LazyLock, Mutex}; -use std::time::{Duration, Instant}; use tokio::sync::mpsc; +use tokio::time::{Duration, Instant}; use tracing::{debug, error, info, warn}; // Maestro imports @@ -22,13 +23,6 @@ use maestro::service::MaestroService; #[allow(unused_imports)] use maestro::service::settings::{self, Setting, SettingValue}; -static RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("Failed to create BT tokio runtime") -}); - #[derive(Clone, Default)] struct BudsStatus { left_battery: Option, @@ -319,8 +313,8 @@ impl BtDaemon { Self { session: None } } - pub fn poll(&mut self, state: SharedState) { - if let Err(e) = RUNTIME.block_on(self.poll_async(state)) { + pub async fn poll(&mut self, state: SharedState) { + if let Err(e) = self.poll_async(state).await { error!("BT daemon error: {}", e); } } @@ -388,9 +382,8 @@ impl BtDaemon { } } - if let Ok(mut lock) = state.write() { - lock.bluetooth = bt_state; - } + let mut lock = state.write().await; + lock.bluetooth = bt_state; Ok(()) } @@ -486,10 +479,15 @@ static PLUGINS: LazyLock>> = pub struct BtModule; impl WaybarModule for BtModule { - fn run(&self, config: &Config, state: &SharedState, args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> FluxoResult { let action = args.first().unwrap_or(&"show"); let bt_state = { - let lock = state.read().unwrap(); + let lock = state.read().await; lock.bluetooth.clone() }; diff --git a/src/modules/btrfs.rs b/src/modules/btrfs.rs index 801b710..65340e7 100644 --- a/src/modules/btrfs.rs +++ b/src/modules/btrfs.rs @@ -1,18 +1,22 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct BtrfsModule; impl WaybarModule for BtrfsModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { - let disks = if let Ok(s) = state.read() { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { + let disks = { + let s = state.read().await; s.disks.clone() - } else { - return Err(anyhow::anyhow!("Failed to read state")); }; let mut total_used: f64 = 0.0; diff --git a/src/modules/cpu.rs b/src/modules/cpu.rs index f266d67..df0e3cb 100644 --- a/src/modules/cpu.rs +++ b/src/modules/cpu.rs @@ -1,24 +1,26 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct CpuModule; impl WaybarModule for CpuModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { let (usage, temp, model) = { - if let Ok(state_lock) = state.read() { - ( - state_lock.cpu.usage, - state_lock.cpu.temp, - state_lock.cpu.model.clone(), - ) - } else { - (0.0, 0.0, String::from("Unknown")) - } + let state_lock = state.read().await; + ( + state_lock.cpu.usage, + state_lock.cpu.temp, + state_lock.cpu.model.clone(), + ) }; let text = format_template( @@ -51,8 +53,8 @@ mod tests { use super::*; use crate::state::{AppState, CpuState, mock_state}; - #[test] - fn test_cpu_normal() { + #[tokio::test] + async fn test_cpu_normal() { let state = mock_state(AppState { cpu: CpuState { usage: 25.0, @@ -62,7 +64,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).unwrap(); + let output = CpuModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.contains("25.0")); assert!(output.text.contains("45.0")); assert_eq!(output.class.as_deref(), Some("normal")); @@ -70,8 +72,8 @@ mod tests { assert_eq!(output.tooltip.as_deref(), Some("Test CPU")); } - #[test] - fn test_cpu_high() { + #[tokio::test] + async fn test_cpu_high() { let state = mock_state(AppState { cpu: CpuState { usage: 80.0, @@ -81,12 +83,12 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).unwrap(); + let output = CpuModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("high")); } - #[test] - fn test_cpu_max() { + #[tokio::test] + async fn test_cpu_max() { let state = mock_state(AppState { cpu: CpuState { usage: 99.0, @@ -96,7 +98,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = CpuModule.run(&config, &state, &[]).unwrap(); + let output = CpuModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("max")); } } diff --git a/src/modules/disk.rs b/src/modules/disk.rs index 79543c7..aee30ff 100644 --- a/src/modules/disk.rs +++ b/src/modules/disk.rs @@ -1,20 +1,24 @@ use crate::config::Config; +use crate::error::{FluxoError, Result}; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct DiskModule; impl WaybarModule for DiskModule { - fn run(&self, config: &Config, state: &SharedState, args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> Result { let mountpoint = args.first().unwrap_or(&"/"); - let disks = if let Ok(s) = state.read() { + let disks = { + let s = state.read().await; s.disks.clone() - } else { - return Err(anyhow::anyhow!("Failed to read state")); }; for disk in &disks { @@ -62,7 +66,10 @@ impl WaybarModule for DiskModule { } } - Err(anyhow::anyhow!("Mountpoint {} not found", mountpoint)) + Err(FluxoError::Module { + module: "disk", + message: format!("Mountpoint {} not found", mountpoint), + }) } } @@ -83,30 +90,30 @@ mod tests { }) } - #[test] - fn test_disk_found() { + #[tokio::test] + async fn test_disk_found() { let gb = 1024 * 1024 * 1024; let state = state_with_disk("/", 100 * gb, 60 * gb); let config = Config::default(); - let output = DiskModule.run(&config, &state, &["/"]).unwrap(); + let output = DiskModule.run(&config, &state, &["/"]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("normal")); assert_eq!(output.percentage, Some(40)); // 40% used } - #[test] - fn test_disk_high() { + #[tokio::test] + async fn test_disk_high() { let gb = 1024 * 1024 * 1024; let state = state_with_disk("/", 100 * gb, 15 * gb); let config = Config::default(); - let output = DiskModule.run(&config, &state, &["/"]).unwrap(); + let output = DiskModule.run(&config, &state, &["/"]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("high")); // 85% used } - #[test] - fn test_disk_not_found() { + #[tokio::test] + async fn test_disk_not_found() { let state = mock_state(AppState::default()); let config = Config::default(); - let result = DiskModule.run(&config, &state, &["/nonexistent"]); + let result = DiskModule.run(&config, &state, &["/nonexistent"]).await; assert!(result.is_err()); } } diff --git a/src/modules/game.rs b/src/modules/game.rs index 6e126cf..1ff59f7 100644 --- a/src/modules/game.rs +++ b/src/modules/game.rs @@ -1,8 +1,9 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; -use anyhow::{Result, anyhow}; +use anyhow::anyhow; use std::env; use std::io::{Read, Write}; use std::os::unix::net::UnixStream; @@ -10,7 +11,12 @@ use std::os::unix::net::UnixStream; pub struct GameModule; impl WaybarModule for GameModule { - fn run(&self, config: &Config, _state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + _state: &SharedState, + _args: &[&str], + ) -> Result { let is_gamemode = hyprland_ipc("j/getoption animations:enabled") .map(|stdout| stdout.contains("\"int\": 0")) .unwrap_or(false); diff --git a/src/modules/gpu.rs b/src/modules/gpu.rs index d66709e..b5e3e47 100644 --- a/src/modules/gpu.rs +++ b/src/modules/gpu.rs @@ -1,36 +1,30 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct GpuModule; impl WaybarModule for GpuModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { let (active, vendor, usage, vram_used, vram_total, temp, model) = { - if let Ok(state_lock) = state.read() { - ( - state_lock.gpu.active, - state_lock.gpu.vendor.clone(), - state_lock.gpu.usage, - state_lock.gpu.vram_used, - state_lock.gpu.vram_total, - state_lock.gpu.temp, - state_lock.gpu.model.clone(), - ) - } else { - ( - false, - String::from("Unknown"), - 0.0, - 0.0, - 0.0, - 0.0, - String::from("Unknown"), - ) - } + let state_lock = state.read().await; + ( + state_lock.gpu.active, + state_lock.gpu.vendor.clone(), + state_lock.gpu.usage, + state_lock.gpu.vram_used, + state_lock.gpu.vram_total, + state_lock.gpu.temp, + state_lock.gpu.model.clone(), + ) }; if !active { diff --git a/src/modules/hardware.rs b/src/modules/hardware.rs index ff1137c..c3bcda4 100644 --- a/src/modules/hardware.rs +++ b/src/modules/hardware.rs @@ -24,7 +24,7 @@ impl HardwareDaemon { } } - pub fn poll_fast(&mut self, state: SharedState) { + pub async fn poll_fast(&mut self, state: SharedState) { self.sys.refresh_cpu_usage(); self.sys.refresh_memory(); self.components.refresh(true); @@ -70,23 +70,22 @@ impl HardwareDaemon { } } - if let Ok(mut state_lock) = state.write() { - state_lock.cpu.usage = cpu_usage as f64; - state_lock.cpu.temp = cpu_temp; - state_lock.cpu.model = cpu_model; + let mut state_lock = state.write().await; + state_lock.cpu.usage = cpu_usage as f64; + state_lock.cpu.temp = cpu_temp; + state_lock.cpu.model = cpu_model; - state_lock.memory.total_gb = total_mem; - state_lock.memory.used_gb = used_mem; + state_lock.memory.total_gb = total_mem; + state_lock.memory.used_gb = used_mem; - state_lock.sys.load_1 = load_avg.one; - state_lock.sys.load_5 = load_avg.five; - state_lock.sys.load_15 = load_avg.fifteen; - state_lock.sys.uptime = uptime; - state_lock.sys.process_count = process_count; - } + state_lock.sys.load_1 = load_avg.one; + state_lock.sys.load_5 = load_avg.five; + state_lock.sys.load_15 = load_avg.fifteen; + state_lock.sys.uptime = uptime; + state_lock.sys.process_count = process_count; } - pub fn poll_slow(&mut self, state: SharedState) { + pub async fn poll_slow(&mut self, state: SharedState) { // 1. Gather GPU data outside of lock let mut gpu_state = crate::state::GpuState::default(); self.gpu_poll_counter = (self.gpu_poll_counter + 1) % 5; @@ -113,14 +112,13 @@ impl HardwareDaemon { } // 3. Apply to state - if let Ok(mut state_lock) = state.write() { - if should_poll_gpu { - state_lock.gpu = gpu_state; - } + let mut state_lock = state.write().await; + if should_poll_gpu { + state_lock.gpu = gpu_state; + } - if let Some(d) = disks_data { - state_lock.disks = d; - } + if let Some(d) = disks_data { + state_lock.disks = d; } } diff --git a/src/modules/memory.rs b/src/modules/memory.rs index 946d708..f2a9dfe 100644 --- a/src/modules/memory.rs +++ b/src/modules/memory.rs @@ -1,20 +1,22 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct MemoryModule; impl WaybarModule for MemoryModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { let (used_gb, total_gb) = { - if let Ok(state_lock) = state.read() { - (state_lock.memory.used_gb, state_lock.memory.total_gb) - } else { - (0.0, 0.0) - } + let state_lock = state.read().await; + (state_lock.memory.used_gb, state_lock.memory.total_gb) }; let ratio = if total_gb > 0.0 { @@ -53,8 +55,8 @@ mod tests { use super::*; use crate::state::{AppState, MemoryState, mock_state}; - #[test] - fn test_memory_normal() { + #[tokio::test] + async fn test_memory_normal() { let state = mock_state(AppState { memory: MemoryState { used_gb: 8.0, @@ -63,15 +65,15 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).unwrap(); + let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.contains("8.00")); assert!(output.text.contains("32.00")); assert_eq!(output.class.as_deref(), Some("normal")); assert_eq!(output.percentage, Some(25)); // 8/32 = 25% } - #[test] - fn test_memory_high() { + #[tokio::test] + async fn test_memory_high() { let state = mock_state(AppState { memory: MemoryState { used_gb: 26.0, @@ -80,12 +82,12 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).unwrap(); + let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("high")); // 81% } - #[test] - fn test_memory_zero_total() { + #[tokio::test] + async fn test_memory_zero_total() { let state = mock_state(AppState { memory: MemoryState { used_gb: 0.0, @@ -94,7 +96,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = MemoryModule.run(&config, &state, &[]).unwrap(); + let output = MemoryModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.class.as_deref(), Some("normal")); assert_eq!(output.percentage, Some(0)); } diff --git a/src/modules/mod.rs b/src/modules/mod.rs index ec5a593..56ab6ca 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -12,10 +12,15 @@ pub mod power; pub mod sys; use crate::config::Config; +use crate::error::Result as FluxoResult; use crate::output::WaybarOutput; use crate::state::SharedState; -use anyhow::Result; pub trait WaybarModule { - fn run(&self, config: &Config, state: &SharedState, args: &[&str]) -> Result; + fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> impl std::future::Future> + Send; } diff --git a/src/modules/network.rs b/src/modules/network.rs index 876e628..77b7ae4 100644 --- a/src/modules/network.rs +++ b/src/modules/network.rs @@ -1,9 +1,9 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; use nix::ifaddrs::getifaddrs; use std::fs; use std::time::{SystemTime, UNIX_EPOCH}; @@ -29,7 +29,7 @@ impl NetworkDaemon { } } - pub fn poll(&mut self, state: SharedState) { + pub async fn poll(&mut self, state: SharedState) { // Re-detect interface on every poll to catch VPNs or route changes immediately if let Ok(iface) = get_primary_interface() && !iface.is_empty() @@ -62,14 +62,14 @@ impl NetworkDaemon { / 1024.0 / 1024.0; - if let Ok(mut state_lock) = state.write() { - state_lock.network.rx_mbps = rx_mbps; - state_lock.network.tx_mbps = tx_mbps; - state_lock.network.interface = interface.clone(); - state_lock.network.ip = self.cached_ip.clone().unwrap_or_default(); - } - } else if let Ok(mut state_lock) = state.write() { + let mut state_lock = state.write().await; + state_lock.network.rx_mbps = rx_mbps; + state_lock.network.tx_mbps = tx_mbps; + state_lock.network.interface = interface.clone(); + state_lock.network.ip = self.cached_ip.clone().unwrap_or_default(); + } else { // First poll: no speed data yet, but update interface/ip + let mut state_lock = state.write().await; state_lock.network.interface = interface.clone(); state_lock.network.ip = self.cached_ip.clone().unwrap_or_default(); } @@ -81,8 +81,9 @@ impl NetworkDaemon { // Read failed, might be down self.cached_interface = None; } - } else if let Ok(mut state_lock) = state.write() { + } else { // No interface detected + let mut state_lock = state.write().await; state_lock.network.interface.clear(); state_lock.network.ip.clear(); } @@ -90,21 +91,20 @@ impl NetworkDaemon { } impl WaybarModule for NetworkModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { - let (interface, ip, rx_mbps, tx_mbps) = if let Ok(s) = state.read() { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { + let (interface, ip, rx_mbps, tx_mbps) = { + let s = state.read().await; ( s.network.interface.clone(), s.network.ip.clone(), s.network.rx_mbps, s.network.tx_mbps, ) - } else { - return Ok(WaybarOutput { - text: "No connection".to_string(), - tooltip: None, - class: None, - percentage: None, - }); }; if interface.is_empty() { @@ -208,16 +208,16 @@ mod tests { use super::*; use crate::state::{AppState, NetworkState, mock_state}; - #[test] - fn test_network_no_connection() { + #[tokio::test] + async fn test_network_no_connection() { let state = mock_state(AppState::default()); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).unwrap(); + let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); assert_eq!(output.text, "No connection"); } - #[test] - fn test_network_connected() { + #[tokio::test] + async fn test_network_connected() { let state = mock_state(AppState { network: NetworkState { rx_mbps: 1.5, @@ -228,15 +228,15 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).unwrap(); + let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.contains("eth0")); assert!(output.text.contains("192.168.1.100")); assert!(output.text.contains("1.50")); assert_eq!(output.class.as_deref(), Some("eth0")); } - #[test] - fn test_network_vpn_prefix() { + #[tokio::test] + async fn test_network_vpn_prefix() { let state = mock_state(AppState { network: NetworkState { rx_mbps: 0.0, @@ -247,12 +247,12 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).unwrap(); + let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.starts_with(" ")); } - #[test] - fn test_network_no_ip() { + #[tokio::test] + async fn test_network_no_ip() { let state = mock_state(AppState { network: NetworkState { rx_mbps: 0.0, @@ -263,7 +263,7 @@ mod tests { ..Default::default() }); let config = Config::default(); - let output = NetworkModule.run(&config, &state, &[]).unwrap(); + let output = NetworkModule.run(&config, &state, &[]).await.unwrap(); assert!(output.text.contains("No IP")); } } diff --git a/src/modules/power.rs b/src/modules/power.rs index b537dea..dd0171d 100644 --- a/src/modules/power.rs +++ b/src/modules/power.rs @@ -1,15 +1,20 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; use std::fs; pub struct PowerModule; impl WaybarModule for PowerModule { - fn run(&self, config: &Config, _state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + _state: &SharedState, + _args: &[&str], + ) -> Result { let critical_threshold = 15; let warning_threshold = 50; diff --git a/src/modules/sys.rs b/src/modules/sys.rs index fceb486..dee9870 100644 --- a/src/modules/sys.rs +++ b/src/modules/sys.rs @@ -1,26 +1,28 @@ use crate::config::Config; +use crate::error::Result; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::SharedState; use crate::utils::{TokenValue, format_template}; -use anyhow::Result; pub struct SysModule; impl WaybarModule for SysModule { - fn run(&self, config: &Config, state: &SharedState, _args: &[&str]) -> Result { + async fn run( + &self, + config: &Config, + state: &SharedState, + _args: &[&str], + ) -> Result { let (load1, load5, load15, uptime_secs, process_count) = { - if let Ok(state_lock) = state.read() { - ( - state_lock.sys.load_1, - state_lock.sys.load_5, - state_lock.sys.load_15, - state_lock.sys.uptime, - state_lock.sys.process_count, - ) - } else { - (0.0, 0.0, 0.0, 0, 0) - } + let state_lock = state.read().await; + ( + state_lock.sys.load_1, + state_lock.sys.load_5, + state_lock.sys.load_15, + state_lock.sys.uptime, + state_lock.sys.process_count, + ) }; let hours = uptime_secs / 3600; diff --git a/src/state.rs b/src/state.rs index f634c19..d3e2fad 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,7 @@ -use std::sync::{Arc, RwLock}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio::time::Instant; #[derive(Default, Clone)] pub struct AppState { @@ -10,6 +13,14 @@ pub struct AppState { pub disks: Vec, pub bluetooth: BtState, pub audio: AudioState, + pub health: HashMap, +} + +#[derive(Clone, Default)] +pub struct ModuleHealth { + pub consecutive_failures: u32, + pub last_failure: Option, + pub backoff_until: Option, } #[derive(Default, Clone)] From e57b69a735992b6307ded20ec31c162a33f0fbed Mon Sep 17 00:00:00 2001 From: Nils Pukropp Date: Wed, 1 Apr 2026 16:49:33 +0200 Subject: [PATCH 2/5] version bump --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c063d4f..c78d65f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluxo-rs" -version = "0.3.0" +version = "0.3.1" edition = "2024" [dependencies] From c8a50d7b678f151f9f11ce2b9728dcba19957e05 Mon Sep 17 00:00:00 2001 From: Nils Pukropp Date: Wed, 1 Apr 2026 16:51:44 +0200 Subject: [PATCH 3/5] fixed ci with release fixes --- .gitea/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index d41166f..8de75a6 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -19,6 +19,8 @@ jobs: toolchain: stable components: rustfmt, clippy cache: false + - name: Install packaging tools + run: apt-get update && apt-get install -y dpkg-dev pkg-config libdbus-1-dev protobuf-compiler libpulse-dev - name: Check formatting run: cargo fmt --check From 81c9b78cb393ef68deb398d68fc116aff549dc84 Mon Sep 17 00:00:00 2001 From: Nils Pukropp Date: Wed, 1 Apr 2026 17:54:16 +0200 Subject: [PATCH 4/5] improved robustness of pixel buds plugin --- Cargo.lock | 2 +- src/config.rs | 26 +-- src/daemon.rs | 4 +- src/modules/bt.rs | 508 +++++++++++++++++++++++++++++----------------- 4 files changed, 334 insertions(+), 206 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f8ead7d..20e1618 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -353,7 +353,7 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "fluxo-rs" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "bluer", diff --git a/src/config.rs b/src/config.rs index ca82b67..bff02f1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use std::sync::LazyLock; use tracing::{debug, info, warn}; -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Clone)] pub struct Config { #[serde(default)] pub general: GeneralConfig, @@ -33,7 +33,7 @@ pub struct Config { pub game: GameConfig, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct GeneralConfig { pub menu_command: String, } @@ -46,7 +46,7 @@ impl Default for GeneralConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct NetworkConfig { pub format: String, } @@ -59,7 +59,7 @@ impl Default for NetworkConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct CpuConfig { pub format: String, } @@ -72,7 +72,7 @@ impl Default for CpuConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct MemoryConfig { pub format: String, } @@ -85,7 +85,7 @@ impl Default for MemoryConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct GpuConfig { pub format_amd: String, pub format_intel: String, @@ -104,7 +104,7 @@ impl Default for GpuConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct SysConfig { pub format: String, } @@ -117,7 +117,7 @@ impl Default for SysConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct DiskConfig { pub format: String, } @@ -130,7 +130,7 @@ impl Default for DiskConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct PoolConfig { pub format: String, } @@ -143,7 +143,7 @@ impl Default for PoolConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct PowerConfig { pub format: String, } @@ -156,7 +156,7 @@ impl Default for PowerConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct AudioConfig { pub format_sink_unmuted: String, pub format_sink_muted: String, @@ -175,7 +175,7 @@ impl Default for AudioConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct BtConfig { pub format_connected: String, pub format_plugin: String, @@ -194,7 +194,7 @@ impl Default for BtConfig { } } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct GameConfig { pub format_active: String, pub format_inactive: String, diff --git a/src/daemon.rs b/src/daemon.rs index 3a4195f..1e3327e 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -89,11 +89,13 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { // 4. Bluetooth Task let poll_state = Arc::clone(&state); + let poll_config = Arc::clone(&config); tokio::spawn(async move { info!("Starting Bluetooth polling task"); let mut daemon = BtDaemon::new(); loop { - daemon.poll(Arc::clone(&poll_state)).await; + let config = poll_config.read().await; + daemon.poll(Arc::clone(&poll_state), &config).await; sleep(Duration::from_secs(1)).await; } }); diff --git a/src/modules/bt.rs b/src/modules/bt.rs index c74213d..0c1f214 100644 --- a/src/modules/bt.rs +++ b/src/modules/bt.rs @@ -1,14 +1,15 @@ use crate::config::Config; -use crate::error::Result as FluxoResult; +use crate::error::{FluxoError, Result as FluxoResult}; use crate::modules::WaybarModule; use crate::output::WaybarOutput; use crate::state::{BtState, SharedState}; use crate::utils::{TokenValue, format_template}; use anyhow::{Context, Result}; use futures::StreamExt; +use futures::future::BoxFuture; use std::collections::HashMap; use std::process::Command; -use std::sync::{Arc, LazyLock, Mutex}; +use std::sync::{Arc, LazyLock, Mutex, OnceLock}; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; use tracing::{debug, error, info, warn}; @@ -49,10 +50,11 @@ struct MaestroManager { } impl MaestroManager { - fn new() -> Self { + fn new(state: SharedState) -> Self { let (tx, mut rx) = mpsc::unbounded_channel::(); let statuses = Arc::new(Mutex::new(HashMap::new())); let statuses_clone = Arc::clone(&statuses); + let state_clone = Arc::clone(&state); // Start dedicated BT management thread std::thread::spawn(move || { @@ -77,9 +79,10 @@ impl MaestroManager { let mac_clone = mac.clone(); let st_clone = Arc::clone(&statuses_clone); + let state_inner = Arc::clone(&state_clone); tokio::task::spawn_local(async move { - if let Err(e) = buds_task(&mac_clone, st_clone, buds_rx).await { + if let Err(e) = buds_task(&mac_clone, st_clone, buds_rx, state_inner).await { error!("Buds task for {} failed: {}", mac_clone, e); } }); @@ -130,11 +133,18 @@ async fn buds_task( mac: &str, statuses: Arc>>, mut rx: mpsc::Receiver, + state: SharedState, ) -> Result<()> { info!("Starting native Maestro connection task for {}", mac); loop { - let addr: bluer::Address = mac.parse().context("Failed to parse MAC address")?; + let addr: bluer::Address = match mac.parse() { + Ok(a) => a, + Err(e) => { + error!("Failed to parse MAC address {}: {}", mac, e); + return Err(e.into()); + } + }; let session = bluer::Session::new() .await .context("Failed to create bluer session")?; @@ -152,7 +162,6 @@ async fn buds_task( } // Connect to Maestro RFCOMM service - // We try channel 1 then 2, which covers most Pixel Buds variants. let mut stream = None; for channel in [1, 2] { let socket = match bluer::rfcomm::Socket::new() { @@ -198,7 +207,7 @@ async fn buds_task( let mut client = Client::new(stream); let handle = client.handle(); - // Resolve Maestro channel (typically 1 or 2) + // Resolve Maestro channel let channel = match maestro::protocol::utils::resolve_channel(&mut client).await { Ok(c) => c, Err(e) => { @@ -207,7 +216,6 @@ async fn buds_task( } }; - // Run client in background to handle RPC packets tokio::spawn(async move { if let Err(e) = client.run().await { error!("Maestro client loop error: {}", e); @@ -216,19 +224,28 @@ async fn buds_task( let mut service = MaestroService::new(handle, channel); + // Update health + { + let mut lock = state.write().await; + let health = lock.health.entry("bt.buds".to_string()).or_default(); + health.consecutive_failures = 0; + health.backoff_until = None; + } + // Query initial ANC state if let Ok(val) = service .read_setting_var(settings::SettingId::CurrentAncrState) .await && let SettingValue::CurrentAncrState(anc_state) = val { - let mut status = MAESTRO.get_status(mac); + let mut lock = statuses.lock().unwrap(); + let status = lock.entry(mac.to_string()).or_default(); status.anc_state = anc_state_to_string(&anc_state); - statuses.lock().unwrap().insert(mac.to_string(), status); + status.last_update = Some(Instant::now()); } - // Subscribe to real-time status updates (battery, ANC, wear) - let mut call = match service.subscribe_to_runtime_info() { + // Subscribe to real-time status updates (battery, wear) + let mut runtime_info_call = match service.subscribe_to_runtime_info() { Ok(c) => c, Err(e) => { error!("Failed to subscribe to runtime info for {}: {}", mac, e); @@ -236,9 +253,20 @@ async fn buds_task( } }; - let mut runtime_info = call.stream(); + let mut runtime_info = runtime_info_call.stream(); - debug!("Subscribed to runtime info for {}", mac); + // Subscribe to settings changes (to catch physical toggles on the buds) + let mut settings_changes_call = match service.subscribe_to_settings_changes() { + Ok(s) => s, + Err(e) => { + error!("Failed to subscribe to settings changes for {}: {}", mac, e); + continue; + } + }; + + let mut settings_changes = settings_changes_call.stream(); + + debug!("Subscribed to status and settings for {}", mac); loop { tokio::select! { @@ -248,6 +276,14 @@ async fn buds_task( debug!("Setting ANC mode to {} for {}", mode, mac); let state = mode_to_anc_state(&mode); let val = SettingValue::CurrentAncrState(state); + + { + let mut lock = statuses.lock().unwrap(); + let status = lock.entry(mac.to_string()).or_default(); + status.anc_state = mode.clone(); + status.last_update = Some(Instant::now()); + } + if let Err(e) = service.write_setting(val).await { error!("Failed to write ANC setting for {}: {}", mac, e); } @@ -255,20 +291,51 @@ async fn buds_task( None => return Ok(()), } } - Some(Ok(info)) = runtime_info.next() => { - let mut status = MAESTRO.get_status(mac); - status.last_update = Some(Instant::now()); + Some(res) = runtime_info.next() => { + match res { + Ok(info) => { + let mut lock = statuses.lock().unwrap(); + let status = lock.entry(mac.to_string()).or_default(); + status.last_update = Some(Instant::now()); - if let Some(bat) = info.battery_info { - status.left_battery = bat.left.map(|b| b.level as u8); - status.right_battery = bat.right.map(|b| b.level as u8); - status.case_battery = bat.case.map(|b| b.level as u8); + if let Some(bat) = info.battery_info { + status.left_battery = bat.left.map(|b| b.level as u8); + status.right_battery = bat.right.map(|b| b.level as u8); + status.case_battery = bat.case.map(|b| b.level as u8); + } + } + Err(e) => { + warn!("Runtime info stream error for {}: {}", mac, e); + break; + } } + } + Some(res) = settings_changes.next() => { + if let Ok(change) = res { + use maestro::protocol::types::settings_rsp::ValueOneof as RspOneof; + use maestro::protocol::types::setting_value::ValueOneof as ValOneof; - statuses.lock().unwrap().insert(mac.to_string(), status); + if let Some(RspOneof::Value(setting_val)) = change.value_oneof { + if let Some(ValOneof::CurrentAncrState(anc_state_raw)) = setting_val.value_oneof { + let mut lock = statuses.lock().unwrap(); + let status = lock.entry(mac.to_string()).or_default(); + + let anc_state = match anc_state_raw { + 1 => settings::AncState::Off, + 2 => settings::AncState::Active, + 3 => settings::AncState::Aware, + 4 => settings::AncState::Adaptive, + _ => settings::AncState::Unknown(anc_state_raw), + }; + + status.anc_state = anc_state_to_string(&anc_state); + status.last_update = Some(Instant::now()); + debug!(mode = %status.anc_state, "Caught physical ANC toggle"); + } + } + } } _ = tokio::time::sleep(Duration::from_secs(30)) => { - // Check if still connected to BT if !device.is_connected().await.unwrap_or(false) { break; } @@ -302,7 +369,11 @@ fn anc_state_to_string(state: &settings::AncState) -> String { } } -static MAESTRO: LazyLock = LazyLock::new(MaestroManager::new); +static MAESTRO: OnceLock = OnceLock::new(); + +fn get_maestro(state: &SharedState) -> &MaestroManager { + MAESTRO.get_or_init(|| MaestroManager::new(Arc::clone(state))) +} pub struct BtDaemon { session: Option, @@ -313,13 +384,13 @@ impl BtDaemon { Self { session: None } } - pub async fn poll(&mut self, state: SharedState) { - if let Err(e) = self.poll_async(state).await { + pub async fn poll(&mut self, state: SharedState, config: &Config) { + if let Err(e) = self.poll_async(state, config).await { error!("BT daemon error: {}", e); } } - async fn poll_async(&mut self, state: SharedState) -> Result<()> { + async fn poll_async(&mut self, state: SharedState, config: &Config) -> Result<()> { if self.session.is_none() { self.session = Some(bluer::Session::new().await?); } @@ -338,7 +409,6 @@ impl BtDaemon { let device = adapter.device(addr)?; if device.is_connected().await.unwrap_or(false) { let uuids = device.uuids().await?.unwrap_or_default(); - // Audio sink UUID (0x110b) let audio_sink_uuid = bluer::Uuid::from_u128(0x0000110b_0000_1000_8000_00805f9b34fb); if uuids.contains(&audio_sink_uuid) { @@ -349,10 +419,9 @@ impl BtDaemon { bt_state.battery_percentage = device.battery_percentage().await.unwrap_or(None); - // Plugin detection for p in PLUGINS.iter() { if p.can_handle(&bt_state.device_alias, &bt_state.device_address) { - match p.get_data(&bt_state.device_address) { + match p.get_data(config, &state, &bt_state.device_address).await { Ok(data) => { bt_state.plugin_data = data .into_iter() @@ -392,16 +461,24 @@ impl BtDaemon { pub trait BtPlugin: Send + Sync { fn name(&self) -> &str; fn can_handle(&self, alias: &str, mac: &str) -> bool; - fn get_data(&self, mac: &str) -> Result>; - fn get_modes(&self, _mac: &str) -> Result> { - Ok(vec![]) - } - fn set_mode(&self, _mode: &str, _mac: &str) -> Result<()> { - Ok(()) - } - fn cycle_mode(&self, _mac: &str) -> Result<()> { - Ok(()) - } + fn get_data( + &self, + config: &Config, + state: &SharedState, + mac: &str, + ) -> BoxFuture<'static, FluxoResult>>; + fn get_modes( + &self, + mac: &str, + state: &SharedState, + ) -> BoxFuture<'static, FluxoResult>>; + fn set_mode( + &self, + mode: &str, + mac: &str, + state: &SharedState, + ) -> BoxFuture<'static, FluxoResult<()>>; + fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>>; } pub struct PixelBudsPlugin; @@ -415,61 +492,104 @@ impl BtPlugin for PixelBudsPlugin { alias.contains("Pixel Buds Pro") } - fn get_data(&self, mac: &str) -> Result> { - MAESTRO.ensure_task(mac); - let status = MAESTRO.get_status(mac); + fn get_data( + &self, + _config: &Config, + state: &SharedState, + mac: &str, + ) -> BoxFuture<'static, FluxoResult>> { + let mac = mac.to_string(); + let state = Arc::clone(state); + Box::pin(async move { + let maestro = get_maestro(&state); + maestro.ensure_task(&mac); + let status = maestro.get_status(&mac); - if let Some(err) = status.error { - return Err(anyhow::anyhow!(err)); - } + if let Some(err) = status.error { + return Err(FluxoError::Module { + module: "bt.buds", + message: err, + }); + } - let left_display = status - .left_battery - .map(|b| format!("{}%", b)) - .unwrap_or_else(|| "---".to_string()); - let right_display = status - .right_battery - .map(|b| format!("{}%", b)) - .unwrap_or_else(|| "---".to_string()); + let left_display = status + .left_battery + .map(|b| format!("{}%", b)) + .unwrap_or_else(|| "---".to_string()); + let right_display = status + .right_battery + .map(|b| format!("{}%", b)) + .unwrap_or_else(|| "---".to_string()); - let (anc_icon, class) = match status.anc_state.as_str() { - "active" => ("ANC", "anc-active"), - "aware" => ("Aware", "anc-aware"), - "off" => ("Off", "anc-off"), - _ => ("?", "anc-unknown"), - }; + let (anc_icon, class) = match status.anc_state.as_str() { + "active" => ("ANC", "anc-active"), + "aware" => ("Aware", "anc-aware"), + "off" => ("Off", "anc-off"), + _ => ("?", "anc-unknown"), + }; - Ok(vec![ - ("left".to_string(), TokenValue::String(left_display)), - ("right".to_string(), TokenValue::String(right_display)), - ("anc".to_string(), TokenValue::String(anc_icon.to_string())), - ( - "plugin_class".to_string(), - TokenValue::String(class.to_string()), - ), - ]) + Ok(vec![ + ("left".to_string(), TokenValue::String(left_display)), + ("right".to_string(), TokenValue::String(right_display)), + ("anc".to_string(), TokenValue::String(anc_icon.to_string())), + ( + "plugin_class".to_string(), + TokenValue::String(class.to_string()), + ), + ]) + }) } - fn get_modes(&self, _mac: &str) -> Result> { - Ok(vec![ - "active".to_string(), - "aware".to_string(), - "off".to_string(), - ]) + fn get_modes( + &self, + _mac: &str, + _state: &SharedState, + ) -> BoxFuture<'static, FluxoResult>> { + Box::pin(async move { + Ok(vec![ + "active".to_string(), + "aware".to_string(), + "off".to_string(), + ]) + }) } - fn set_mode(&self, mode: &str, mac: &str) -> Result<()> { - MAESTRO.send_command(mac, BudsCommand::SetAnc(mode.to_string())) + fn set_mode( + &self, + mode: &str, + mac: &str, + state: &SharedState, + ) -> BoxFuture<'static, FluxoResult<()>> { + let mode = mode.to_string(); + let mac = mac.to_string(); + let state = Arc::clone(state); + Box::pin(async move { + get_maestro(&state) + .send_command(&mac, BudsCommand::SetAnc(mode)) + .map_err(|e| FluxoError::Module { + module: "bt.buds", + message: e.to_string(), + }) + }) } - fn cycle_mode(&self, mac: &str) -> Result<()> { - let status = MAESTRO.get_status(mac); - let next_mode = match status.anc_state.as_str() { - "active" => "aware", - "aware" => "off", - _ => "active", - }; - self.set_mode(next_mode, mac) + fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>> { + let mac = mac.to_string(); + let state = Arc::clone(state); + Box::pin(async move { + let status = get_maestro(&state).get_status(&mac); + let next_mode = match status.anc_state.as_str() { + "active" => "aware", + "aware" => "off", + _ => "active", + }; + get_maestro(&state) + .send_command(&mac, BudsCommand::SetAnc(next_mode.to_string())) + .map_err(|e| FluxoError::Module { + module: "bt.buds", + message: e.to_string(), + }) + }) } } @@ -479,128 +599,134 @@ static PLUGINS: LazyLock>> = pub struct BtModule; impl WaybarModule for BtModule { - async fn run( + fn run( &self, config: &Config, state: &SharedState, args: &[&str], - ) -> FluxoResult { - let action = args.first().unwrap_or(&"show"); - let bt_state = { - let lock = state.read().await; - lock.bluetooth.clone() - }; + ) -> impl std::future::Future> + Send { + let action = args.first().cloned().unwrap_or("show").to_string(); + let args = args.iter().map(|s| s.to_string()).collect::>(); + let state = Arc::clone(state); + let config = config.clone(); - match *action { - "disconnect" if bt_state.connected => { - let _ = Command::new("bluetoothctl") - .args(["disconnect", &bt_state.device_address]) - .output(); - return Ok(WaybarOutput::default()); - } - "cycle_mode" if bt_state.connected => { - let plugin = PLUGINS - .iter() - .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); - if let Some(p) = plugin { - p.cycle_mode(&bt_state.device_address)?; + async move { + let bt_state = { + let lock = state.read().await; + lock.bluetooth.clone() + }; + + match action.as_str() { + "disconnect" if bt_state.connected => { + let _ = Command::new("bluetoothctl") + .args(["disconnect", &bt_state.device_address]) + .output(); + return Ok(WaybarOutput::default()); } - return Ok(WaybarOutput::default()); - } - "get_modes" if bt_state.connected => { - let plugin = PLUGINS - .iter() - .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); - let modes = if let Some(p) = plugin { - p.get_modes(&bt_state.device_address)? - } else { - vec![] - }; - return Ok(WaybarOutput { - text: modes.join("\n"), - ..Default::default() - }); - } - "set_mode" if bt_state.connected => { - if let Some(mode) = args.get(1) { + "cycle_mode" if bt_state.connected => { let plugin = PLUGINS .iter() .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); if let Some(p) = plugin { - p.set_mode(mode, &bt_state.device_address)?; + p.cycle_mode(&bt_state.device_address, &state).await?; + } + return Ok(WaybarOutput::default()); + } + "get_modes" if bt_state.connected => { + let plugin = PLUGINS + .iter() + .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); + let modes = if let Some(p) = plugin { + p.get_modes(&bt_state.device_address, &state).await? + } else { + vec![] + }; + return Ok(WaybarOutput { + text: modes.join("\n"), + ..Default::default() + }); + } + "set_mode" if bt_state.connected => { + if let Some(mode) = args.get(1) { + let plugin = PLUGINS.iter().find(|p| { + p.can_handle(&bt_state.device_alias, &bt_state.device_address) + }); + if let Some(p) = plugin { + p.set_mode(mode, &bt_state.device_address, &state).await?; + } + } + return Ok(WaybarOutput::default()); + } + "show" => {} + _ => {} + } + + if !bt_state.adapter_powered { + return Ok(WaybarOutput { + text: config.bt.format_disabled.clone(), + tooltip: Some("Bluetooth Disabled".to_string()), + class: Some("disabled".to_string()), + percentage: None, + }); + } + + if bt_state.connected { + let mut tokens: Vec<(String, TokenValue)> = vec![ + ( + "alias".to_string(), + TokenValue::String(bt_state.device_alias.clone()), + ), + ( + "mac".to_string(), + TokenValue::String(bt_state.device_address.clone()), + ), + ]; + + let mut class = vec!["connected".to_string()]; + let mut has_plugin = false; + + for (k, v) in &bt_state.plugin_data { + if k == "plugin_class" { + class.push(v.clone()); + has_plugin = true; + } else if k == "plugin_error" { + class.push("plugin-error".to_string()); + } else { + tokens.push((k.clone(), TokenValue::String(v.clone()))); } } - return Ok(WaybarOutput::default()); - } - "show" => {} - _ => {} - } - if !bt_state.adapter_powered { - return Ok(WaybarOutput { - text: config.bt.format_disabled.clone(), - tooltip: Some("Bluetooth Disabled".to_string()), - class: Some("disabled".to_string()), - percentage: None, - }); - } - - if bt_state.connected { - let mut tokens: Vec<(String, TokenValue)> = vec![ - ( - "alias".to_string(), - TokenValue::String(bt_state.device_alias.clone()), - ), - ( - "mac".to_string(), - TokenValue::String(bt_state.device_address.clone()), - ), - ]; - - let mut class = vec!["connected".to_string()]; - let mut has_plugin = false; - - for (k, v) in &bt_state.plugin_data { - if k == "plugin_class" { - class.push(v.clone()); - has_plugin = true; - } else if k == "plugin_error" { - class.push("plugin-error".to_string()); + let format = if has_plugin { + &config.bt.format_plugin } else { - tokens.push((k.clone(), TokenValue::String(v.clone()))); - } - } + &config.bt.format_connected + }; - let format = if has_plugin { - &config.bt.format_plugin + let text = format_template(format, &tokens); + let tooltip = format!( + "{} | MAC: {}\nBattery: {}", + bt_state.device_alias, + bt_state.device_address, + bt_state + .battery_percentage + .map(|b| format!("{}%", b)) + .unwrap_or_else(|| "N/A".to_string()) + ); + + Ok(WaybarOutput { + text, + tooltip: Some(tooltip), + class: Some(class.join(" ")), + percentage: bt_state.battery_percentage, + }) } else { - &config.bt.format_connected - }; - - let text = format_template(format, &tokens); - let tooltip = format!( - "{} | MAC: {}\nBattery: {}", - bt_state.device_alias, - bt_state.device_address, - bt_state - .battery_percentage - .map(|b| format!("{}%", b)) - .unwrap_or_else(|| "N/A".to_string()) - ); - - Ok(WaybarOutput { - text, - tooltip: Some(tooltip), - class: Some(class.join(" ")), - percentage: bt_state.battery_percentage, - }) - } else { - Ok(WaybarOutput { - text: config.bt.format_disconnected.clone(), - tooltip: Some("Bluetooth On (Disconnected)".to_string()), - class: Some("disconnected".to_string()), - percentage: None, - }) + Ok(WaybarOutput { + text: config.bt.format_disconnected.clone(), + tooltip: Some("Bluetooth On (Disconnected)".to_string()), + class: Some("disconnected".to_string()), + percentage: None, + }) + } } } } From a932e6b4227d62e388428dda2bf39713d89b2db5 Mon Sep 17 00:00:00 2001 From: Nils Pukropp Date: Wed, 1 Apr 2026 18:16:22 +0200 Subject: [PATCH 5/5] updated documentation + readme and seperated bt modules --- README.md | 115 +++--- src/modules/bt.rs | 732 -------------------------------------- src/modules/bt/buds.rs | 142 ++++++++ src/modules/bt/maestro.rs | 362 +++++++++++++++++++ src/modules/bt/mod.rs | 236 ++++++++++++ 5 files changed, 783 insertions(+), 804 deletions(-) delete mode 100644 src/modules/bt.rs create mode 100644 src/modules/bt/buds.rs create mode 100644 src/modules/bt/maestro.rs create mode 100644 src/modules/bt/mod.rs diff --git a/README.md b/README.md index 7b225e9..e30defc 100644 --- a/README.md +++ b/README.md @@ -1,95 +1,66 @@ # fluxo-rs -fluxo-rs is a high-performance system metrics daemon and client designed specifically for waybar. It replaces standard shell scripts with a compiled rust binary that collects data via a background polling loop and serves it over a unix domain socket (`/tmp/fluxo.sock`). +fluxo-rs is a high-performance system metrics daemon and client designed specifically for Waybar. It replaces standard shell scripts with a compiled Rust binary that collects data via a background polling loop and serves it over a Unix socket. -## Description +## Key Features -The project follows a client-server architecture: -- **Daemon**: Handles heavy lifting (polling cpu, memory, network, gpu) and stores state in memory. -- **Client**: A thin cli wrapper that connects to the daemon's socket to retrieve formatted json for waybar. - -This approach eliminates process spawning overhead and temporary file locking, resulting in near-zero cpu usage for custom modules. - -## Features - -- **Ultra-lightweight**: Background polling is highly optimized (e.g., O(1) process counting). -- **Jitter-free**: Uses zero-width sentinels and figure spaces to prevent waybar from trimming padding. -- **Configurable**: Fully customizable output formats via toml config. -- **Interactive Menus**: Integrated support for selecting items (like Bluetooth devices) via external menus (e.g., Rofi, Wofi, Fuzzel). -- **Live Reload**: Configuration can be reloaded without restarting the daemon. -- **Multi-vendor GPU**: Native support for intel (igpu), amd, and nvidia. +- **Asynchronous Architecture**: Built on **Tokio**, the daemon handles concurrent IPC requests and background tasks with zero latency and minimal CPU overhead. +- **Native Library Integrations**: + - **Audio**: Direct `libpulse` integration for event-driven, instant volume and device updates. + - **Bluetooth**: Native `bluer` integration for robust device monitoring. + - **Pixel Buds Pro**: Custom native RPC implementation for real-time battery and ANC control. + - **Network**: Native `nix` and `/proc` inspection for high-speed interface monitoring. + - **Hyprland**: Direct IPC Unix socket communication for gamemode and animation status. +- **Circuit Breaker (Failsafe)**: Automatically detects failing modules and enters a "Cool down" state to prevent resource waste and log spam. +- **Multi-threaded Polling**: Decoupled subsystem threads ensure that a hang in one system (e.g., a slow GPU probe) never freezes your entire bar. ## Modules | Command | Description | Tokens | | :--- | :--- | :--- | -| `net` | Network speed (rx/tx) | `{interface}`, `{ip}`, `{rx}`, `{tx}` | -| `cpu` | CPU usage and temp | `{usage}`, `{temp}` | +| `cpu` | CPU usage and temperature | `{usage}`, `{temp}`, `{model}` | | `mem` | Memory usage | `{used}`, `{total}` | -| `gpu` | GPU utilization | `{usage}`, `{vram_used}`, `{vram_total}`, `{temp}` | -| `sys` | System load and uptime | `{uptime}`, `{load1}`, `{load5}`, `{load15}` | -| `disk` | Disk usage (default: /) | `{mount}`, `{used}`, `{total}` | -| `pool` | Aggregate storage (btrfs) | `{used}`, `{total}` | -| `vol` | Audio output volume | `{name}`, `{volume}`, `{icon}` | -| `mic` | Audio input volume | `{name}`, `{volume}`, `{icon}` | +| `net` | Network status & speeds | `{interface}`, `{ip}`, `{rx}`, `{tx}` | +| `sys` | System load and uptime | `{uptime}`, `{load1}`, `{load5}`, `{load15}`, `{procs}` | +| `disk` | Disk usage | `{mount}`, `{used}`, `{total}` | +| `pool` | Btrfs aggregate storage | `{used}`, `{total}` | +| `vol` | Audio output (sink) | `{name}`, `{volume}`, `{icon}` | +| `mic` | Audio input (source) | `{name}`, `{volume}`, `{icon}` | | `bt` | Bluetooth status & plugins | `{alias}`, `{mac}`, `{left}`, `{right}`, `{anc}` | | `power` | Battery and AC status | `{percentage}`, `{icon}` | -| `game` | Hyprland gamemode status | active/inactive icon strings | +| `game` | Hyprland status | active/inactive icons | ## Setup -1. Build the project: - ```bash - cd fluxo-rs - cargo build --release - ``` +1. **Build**: `cargo build --release` +2. **Configure**: Create `~/.config/fluxo/config.toml` (see `example.config.toml`). +3. **Daemon**: Start `fluxo-rs daemon`. It's recommended to run this as a systemd user service. -2. Start the daemon: - ```bash - # Starts the daemon using the default config path (~/.config/fluxo/config.toml) - ./target/release/fluxo-rs daemon & - - # Or provide a custom path - ./target/release/fluxo-rs daemon --config /path/to/your/config.toml & - ``` +## Waybar Configuration -3. Configuration: - Create `~/.config/fluxo/config.toml` (see `example.config.toml` for all default options). +To achieve zero-latency updates, use **Waybar Signals**: -4. Waybar configuration (`config.jsonc`): - ```json - "custom/cpu": { - "exec": "~/path/to/fluxo-rs cpu", - "return-type": "json" - } - ``` - -## Development - -### Architecture -- `src/main.rs`: Entry point, CLI parsing, interactive GUI spawns (menus), and client-side formatting logic. -- `src/daemon.rs`: UDS listener, configuration management, and polling orchestration. -- `src/ipc.rs`: Unix domain socket communication logic. -- `src/utils.rs`: Generic GUI utilities (like the menu spawner). -- `src/modules/`: Individual metric implementations. -- `src/state.rs`: Shared thread-safe data structures. - -### Adding a Module -1. Add the required config block to `src/config.rs`. -2. Add the required state fields to `src/state.rs`. -3. Implement the `WaybarModule` trait in a new file in `src/modules/`. -4. Add polling logic to `src/modules/hardware.rs` or `src/daemon.rs`. -5. Register the new subcommand in `src/main.rs` and the router in `src/daemon.rs`. - -### Configuration Reload -The daemon can reload its configuration live: -```bash -fluxo-rs reload +```jsonc +"custom/audio": { + "exec": "fluxo vol", + "return-type": "json", + "interval": 5, + "signal": 8, + "on-click": "fluxo audio cycle sink && pkill -RTMIN+8 waybar" +}, +"custom/bluetooth": { + "exec": "fluxo bt", + "return-type": "json", + "interval": 5, + "signal": 9, + "on-click": "fluxo bt menu && pkill -RTMIN+9 waybar", + "on-click-right": "fluxo bt cycle_mode && pkill -RTMIN+9 waybar" +} ``` -### Logging & Debugging -`fluxo-rs` uses the `tracing` ecosystem. If a module isn't behaving properly or your configuration isn't applying, start the daemon with debug logging enabled in the foreground: +## Debugging + +Start the daemon with `RUST_LOG=debug` to see detailed logs of library interactions and circuit breaker status: ```bash RUST_LOG=debug fluxo-rs daemon ``` -This will print verbose information regarding config parsing errors, subprocess failures, and IPC requests. diff --git a/src/modules/bt.rs b/src/modules/bt.rs deleted file mode 100644 index 0c1f214..0000000 --- a/src/modules/bt.rs +++ /dev/null @@ -1,732 +0,0 @@ -use crate::config::Config; -use crate::error::{FluxoError, Result as FluxoResult}; -use crate::modules::WaybarModule; -use crate::output::WaybarOutput; -use crate::state::{BtState, SharedState}; -use crate::utils::{TokenValue, format_template}; -use anyhow::{Context, Result}; -use futures::StreamExt; -use futures::future::BoxFuture; -use std::collections::HashMap; -use std::process::Command; -use std::sync::{Arc, LazyLock, Mutex, OnceLock}; -use tokio::sync::mpsc; -use tokio::time::{Duration, Instant}; -use tracing::{debug, error, info, warn}; - -// Maestro imports -#[allow(unused_imports)] -use maestro::protocol::codec::Codec; -#[allow(unused_imports)] -use maestro::pwrpc::client::{Client, ClientHandle}; -#[allow(unused_imports)] -use maestro::service::MaestroService; -#[allow(unused_imports)] -use maestro::service::settings::{self, Setting, SettingValue}; - -#[derive(Clone, Default)] -struct BudsStatus { - left_battery: Option, - right_battery: Option, - case_battery: Option, - anc_state: String, - #[allow(dead_code)] - last_update: Option, - error: Option, -} - -enum BudsCommand { - SetAnc(String), -} - -enum ManagerCommand { - EnsureTask(String), - SendCommand(String, BudsCommand), -} - -struct MaestroManager { - statuses: Arc>>, - management_tx: mpsc::UnboundedSender, -} - -impl MaestroManager { - fn new(state: SharedState) -> Self { - let (tx, mut rx) = mpsc::unbounded_channel::(); - let statuses = Arc::new(Mutex::new(HashMap::new())); - let statuses_clone = Arc::clone(&statuses); - let state_clone = Arc::clone(&state); - - // Start dedicated BT management thread - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let local = tokio::task::LocalSet::new(); - - local.block_on(&rt, async move { - let mut command_txs: HashMap> = HashMap::new(); - - loop { - tokio::select! { - Some(cmd) = rx.recv() => { - match cmd { - ManagerCommand::EnsureTask(mac) => { - if !command_txs.contains_key(&mac) { - let (tx, buds_rx) = mpsc::channel::(10); - command_txs.insert(mac.clone(), tx); - - let mac_clone = mac.clone(); - let st_clone = Arc::clone(&statuses_clone); - let state_inner = Arc::clone(&state_clone); - - tokio::task::spawn_local(async move { - if let Err(e) = buds_task(&mac_clone, st_clone, buds_rx, state_inner).await { - error!("Buds task for {} failed: {}", mac_clone, e); - } - }); - } - } - ManagerCommand::SendCommand(mac, buds_cmd) => { - if let Some(tx) = command_txs.get(&mac) { - let _ = tx.try_send(buds_cmd); - } - } - } - } - _ = tokio::time::sleep(Duration::from_millis(100)) => { - // Cleanup dropped tasks if needed - } - } - } - }); - }); - - Self { - statuses, - management_tx: tx, - } - } - - fn get_status(&self, mac: &str) -> BudsStatus { - let statuses = self.statuses.lock().unwrap(); - statuses.get(mac).cloned().unwrap_or_default() - } - - fn ensure_task(&self, mac: &str) { - let _ = self - .management_tx - .send(ManagerCommand::EnsureTask(mac.to_string())); - } - - fn send_command(&self, mac: &str, cmd: BudsCommand) -> Result<()> { - self.ensure_task(mac); - let _ = self - .management_tx - .send(ManagerCommand::SendCommand(mac.to_string(), cmd)); - Ok(()) - } -} - -async fn buds_task( - mac: &str, - statuses: Arc>>, - mut rx: mpsc::Receiver, - state: SharedState, -) -> Result<()> { - info!("Starting native Maestro connection task for {}", mac); - - loop { - let addr: bluer::Address = match mac.parse() { - Ok(a) => a, - Err(e) => { - error!("Failed to parse MAC address {}: {}", mac, e); - return Err(e.into()); - } - }; - let session = bluer::Session::new() - .await - .context("Failed to create bluer session")?; - let adapter = session - .default_adapter() - .await - .context("Failed to get default adapter")?; - let device = adapter - .device(addr) - .context("Failed to get device handle")?; - - if !device.is_connected().await.unwrap_or(false) { - debug!("Device {} not connected to BT, stopping maestro task", mac); - break; - } - - // Connect to Maestro RFCOMM service - let mut stream = None; - for channel in [1, 2] { - let socket = match bluer::rfcomm::Socket::new() { - Ok(s) => s, - Err(e) => { - error!("Failed to create RFCOMM socket: {}", e); - return Err(e.into()); - } - }; - let target = bluer::rfcomm::SocketAddr::new(addr, channel); - debug!( - "Trying to connect RFCOMM to {} on channel {}...", - mac, channel - ); - match socket.connect(target).await { - Ok(s) => { - stream = Some(s); - break; - } - Err(e) => { - debug!("Failed to connect to channel {}: {}", channel, e); - } - } - } - - let stream = match stream { - Some(s) => s, - None => { - warn!( - "Failed to connect RFCOMM to {} on any common channel. Retrying in 15s...", - mac - ); - tokio::time::sleep(Duration::from_secs(15)).await; - continue; - } - }; - - info!("Connected Maestro RFCOMM to {} on channel", mac); - - // Initialize Maestro communication stack - let codec = Codec::new(); - let stream = codec.wrap(stream); - let mut client = Client::new(stream); - let handle = client.handle(); - - // Resolve Maestro channel - let channel = match maestro::protocol::utils::resolve_channel(&mut client).await { - Ok(c) => c, - Err(e) => { - error!("Failed to resolve Maestro channel for {}: {}", mac, e); - continue; - } - }; - - tokio::spawn(async move { - if let Err(e) = client.run().await { - error!("Maestro client loop error: {}", e); - } - }); - - let mut service = MaestroService::new(handle, channel); - - // Update health - { - let mut lock = state.write().await; - let health = lock.health.entry("bt.buds".to_string()).or_default(); - health.consecutive_failures = 0; - health.backoff_until = None; - } - - // Query initial ANC state - if let Ok(val) = service - .read_setting_var(settings::SettingId::CurrentAncrState) - .await - && let SettingValue::CurrentAncrState(anc_state) = val - { - let mut lock = statuses.lock().unwrap(); - let status = lock.entry(mac.to_string()).or_default(); - status.anc_state = anc_state_to_string(&anc_state); - status.last_update = Some(Instant::now()); - } - - // Subscribe to real-time status updates (battery, wear) - let mut runtime_info_call = match service.subscribe_to_runtime_info() { - Ok(c) => c, - Err(e) => { - error!("Failed to subscribe to runtime info for {}: {}", mac, e); - continue; - } - }; - - let mut runtime_info = runtime_info_call.stream(); - - // Subscribe to settings changes (to catch physical toggles on the buds) - let mut settings_changes_call = match service.subscribe_to_settings_changes() { - Ok(s) => s, - Err(e) => { - error!("Failed to subscribe to settings changes for {}: {}", mac, e); - continue; - } - }; - - let mut settings_changes = settings_changes_call.stream(); - - debug!("Subscribed to status and settings for {}", mac); - - loop { - tokio::select! { - cmd = rx.recv() => { - match cmd { - Some(BudsCommand::SetAnc(mode)) => { - debug!("Setting ANC mode to {} for {}", mode, mac); - let state = mode_to_anc_state(&mode); - let val = SettingValue::CurrentAncrState(state); - - { - let mut lock = statuses.lock().unwrap(); - let status = lock.entry(mac.to_string()).or_default(); - status.anc_state = mode.clone(); - status.last_update = Some(Instant::now()); - } - - if let Err(e) = service.write_setting(val).await { - error!("Failed to write ANC setting for {}: {}", mac, e); - } - } - None => return Ok(()), - } - } - Some(res) = runtime_info.next() => { - match res { - Ok(info) => { - let mut lock = statuses.lock().unwrap(); - let status = lock.entry(mac.to_string()).or_default(); - status.last_update = Some(Instant::now()); - - if let Some(bat) = info.battery_info { - status.left_battery = bat.left.map(|b| b.level as u8); - status.right_battery = bat.right.map(|b| b.level as u8); - status.case_battery = bat.case.map(|b| b.level as u8); - } - } - Err(e) => { - warn!("Runtime info stream error for {}: {}", mac, e); - break; - } - } - } - Some(res) = settings_changes.next() => { - if let Ok(change) = res { - use maestro::protocol::types::settings_rsp::ValueOneof as RspOneof; - use maestro::protocol::types::setting_value::ValueOneof as ValOneof; - - if let Some(RspOneof::Value(setting_val)) = change.value_oneof { - if let Some(ValOneof::CurrentAncrState(anc_state_raw)) = setting_val.value_oneof { - let mut lock = statuses.lock().unwrap(); - let status = lock.entry(mac.to_string()).or_default(); - - let anc_state = match anc_state_raw { - 1 => settings::AncState::Off, - 2 => settings::AncState::Active, - 3 => settings::AncState::Aware, - 4 => settings::AncState::Adaptive, - _ => settings::AncState::Unknown(anc_state_raw), - }; - - status.anc_state = anc_state_to_string(&anc_state); - status.last_update = Some(Instant::now()); - debug!(mode = %status.anc_state, "Caught physical ANC toggle"); - } - } - } - } - _ = tokio::time::sleep(Duration::from_secs(30)) => { - if !device.is_connected().await.unwrap_or(false) { - break; - } - } - } - } - - if !device.is_connected().await.unwrap_or(false) { - break; - } - } - - Ok(()) -} - -fn mode_to_anc_state(mode: &str) -> settings::AncState { - match mode { - "active" => settings::AncState::Active, - "aware" => settings::AncState::Aware, - "off" => settings::AncState::Off, - _ => settings::AncState::Off, - } -} - -fn anc_state_to_string(state: &settings::AncState) -> String { - match state { - settings::AncState::Active => "active".to_string(), - settings::AncState::Aware => "aware".to_string(), - settings::AncState::Off => "off".to_string(), - _ => "unknown".to_string(), - } -} - -static MAESTRO: OnceLock = OnceLock::new(); - -fn get_maestro(state: &SharedState) -> &MaestroManager { - MAESTRO.get_or_init(|| MaestroManager::new(Arc::clone(state))) -} - -pub struct BtDaemon { - session: Option, -} - -impl BtDaemon { - pub fn new() -> Self { - Self { session: None } - } - - pub async fn poll(&mut self, state: SharedState, config: &Config) { - if let Err(e) = self.poll_async(state, config).await { - error!("BT daemon error: {}", e); - } - } - - async fn poll_async(&mut self, state: SharedState, config: &Config) -> Result<()> { - if self.session.is_none() { - self.session = Some(bluer::Session::new().await?); - } - let session = self.session.as_ref().unwrap(); - let adapter = session.default_adapter().await?; - let adapter_powered = adapter.is_powered().await.unwrap_or(false); - - let mut bt_state = BtState { - adapter_powered, - ..Default::default() - }; - - if adapter_powered { - let devices = adapter.device_addresses().await?; - for addr in devices { - let device = adapter.device(addr)?; - if device.is_connected().await.unwrap_or(false) { - let uuids = device.uuids().await?.unwrap_or_default(); - let audio_sink_uuid = - bluer::Uuid::from_u128(0x0000110b_0000_1000_8000_00805f9b34fb); - if uuids.contains(&audio_sink_uuid) { - bt_state.connected = true; - bt_state.device_address = addr.to_string(); - bt_state.device_alias = - device.alias().await.unwrap_or_else(|_| addr.to_string()); - bt_state.battery_percentage = - device.battery_percentage().await.unwrap_or(None); - - for p in PLUGINS.iter() { - if p.can_handle(&bt_state.device_alias, &bt_state.device_address) { - match p.get_data(config, &state, &bt_state.device_address).await { - Ok(data) => { - bt_state.plugin_data = data - .into_iter() - .map(|(k, v)| { - let val_str = match v { - TokenValue::String(s) => s, - TokenValue::Int(i) => i.to_string(), - TokenValue::Float(f) => format!("{:.1}", f), - }; - (k, val_str) - }) - .collect(); - } - Err(e) => { - warn!("Plugin {} failed for {}: {}", p.name(), addr, e); - bt_state - .plugin_data - .push(("plugin_error".to_string(), e.to_string())); - } - } - break; - } - } - break; - } - } - } - } - - let mut lock = state.write().await; - lock.bluetooth = bt_state; - - Ok(()) - } -} - -pub trait BtPlugin: Send + Sync { - fn name(&self) -> &str; - fn can_handle(&self, alias: &str, mac: &str) -> bool; - fn get_data( - &self, - config: &Config, - state: &SharedState, - mac: &str, - ) -> BoxFuture<'static, FluxoResult>>; - fn get_modes( - &self, - mac: &str, - state: &SharedState, - ) -> BoxFuture<'static, FluxoResult>>; - fn set_mode( - &self, - mode: &str, - mac: &str, - state: &SharedState, - ) -> BoxFuture<'static, FluxoResult<()>>; - fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>>; -} - -pub struct PixelBudsPlugin; - -impl BtPlugin for PixelBudsPlugin { - fn name(&self) -> &str { - "Pixel Buds Pro" - } - - fn can_handle(&self, alias: &str, _mac: &str) -> bool { - alias.contains("Pixel Buds Pro") - } - - fn get_data( - &self, - _config: &Config, - state: &SharedState, - mac: &str, - ) -> BoxFuture<'static, FluxoResult>> { - let mac = mac.to_string(); - let state = Arc::clone(state); - Box::pin(async move { - let maestro = get_maestro(&state); - maestro.ensure_task(&mac); - let status = maestro.get_status(&mac); - - if let Some(err) = status.error { - return Err(FluxoError::Module { - module: "bt.buds", - message: err, - }); - } - - let left_display = status - .left_battery - .map(|b| format!("{}%", b)) - .unwrap_or_else(|| "---".to_string()); - let right_display = status - .right_battery - .map(|b| format!("{}%", b)) - .unwrap_or_else(|| "---".to_string()); - - let (anc_icon, class) = match status.anc_state.as_str() { - "active" => ("ANC", "anc-active"), - "aware" => ("Aware", "anc-aware"), - "off" => ("Off", "anc-off"), - _ => ("?", "anc-unknown"), - }; - - Ok(vec![ - ("left".to_string(), TokenValue::String(left_display)), - ("right".to_string(), TokenValue::String(right_display)), - ("anc".to_string(), TokenValue::String(anc_icon.to_string())), - ( - "plugin_class".to_string(), - TokenValue::String(class.to_string()), - ), - ]) - }) - } - - fn get_modes( - &self, - _mac: &str, - _state: &SharedState, - ) -> BoxFuture<'static, FluxoResult>> { - Box::pin(async move { - Ok(vec![ - "active".to_string(), - "aware".to_string(), - "off".to_string(), - ]) - }) - } - - fn set_mode( - &self, - mode: &str, - mac: &str, - state: &SharedState, - ) -> BoxFuture<'static, FluxoResult<()>> { - let mode = mode.to_string(); - let mac = mac.to_string(); - let state = Arc::clone(state); - Box::pin(async move { - get_maestro(&state) - .send_command(&mac, BudsCommand::SetAnc(mode)) - .map_err(|e| FluxoError::Module { - module: "bt.buds", - message: e.to_string(), - }) - }) - } - - fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>> { - let mac = mac.to_string(); - let state = Arc::clone(state); - Box::pin(async move { - let status = get_maestro(&state).get_status(&mac); - let next_mode = match status.anc_state.as_str() { - "active" => "aware", - "aware" => "off", - _ => "active", - }; - get_maestro(&state) - .send_command(&mac, BudsCommand::SetAnc(next_mode.to_string())) - .map_err(|e| FluxoError::Module { - module: "bt.buds", - message: e.to_string(), - }) - }) - } -} - -static PLUGINS: LazyLock>> = - LazyLock::new(|| vec![Box::new(PixelBudsPlugin)]); - -pub struct BtModule; - -impl WaybarModule for BtModule { - fn run( - &self, - config: &Config, - state: &SharedState, - args: &[&str], - ) -> impl std::future::Future> + Send { - let action = args.first().cloned().unwrap_or("show").to_string(); - let args = args.iter().map(|s| s.to_string()).collect::>(); - let state = Arc::clone(state); - let config = config.clone(); - - async move { - let bt_state = { - let lock = state.read().await; - lock.bluetooth.clone() - }; - - match action.as_str() { - "disconnect" if bt_state.connected => { - let _ = Command::new("bluetoothctl") - .args(["disconnect", &bt_state.device_address]) - .output(); - return Ok(WaybarOutput::default()); - } - "cycle_mode" if bt_state.connected => { - let plugin = PLUGINS - .iter() - .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); - if let Some(p) = plugin { - p.cycle_mode(&bt_state.device_address, &state).await?; - } - return Ok(WaybarOutput::default()); - } - "get_modes" if bt_state.connected => { - let plugin = PLUGINS - .iter() - .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); - let modes = if let Some(p) = plugin { - p.get_modes(&bt_state.device_address, &state).await? - } else { - vec![] - }; - return Ok(WaybarOutput { - text: modes.join("\n"), - ..Default::default() - }); - } - "set_mode" if bt_state.connected => { - if let Some(mode) = args.get(1) { - let plugin = PLUGINS.iter().find(|p| { - p.can_handle(&bt_state.device_alias, &bt_state.device_address) - }); - if let Some(p) = plugin { - p.set_mode(mode, &bt_state.device_address, &state).await?; - } - } - return Ok(WaybarOutput::default()); - } - "show" => {} - _ => {} - } - - if !bt_state.adapter_powered { - return Ok(WaybarOutput { - text: config.bt.format_disabled.clone(), - tooltip: Some("Bluetooth Disabled".to_string()), - class: Some("disabled".to_string()), - percentage: None, - }); - } - - if bt_state.connected { - let mut tokens: Vec<(String, TokenValue)> = vec![ - ( - "alias".to_string(), - TokenValue::String(bt_state.device_alias.clone()), - ), - ( - "mac".to_string(), - TokenValue::String(bt_state.device_address.clone()), - ), - ]; - - let mut class = vec!["connected".to_string()]; - let mut has_plugin = false; - - for (k, v) in &bt_state.plugin_data { - if k == "plugin_class" { - class.push(v.clone()); - has_plugin = true; - } else if k == "plugin_error" { - class.push("plugin-error".to_string()); - } else { - tokens.push((k.clone(), TokenValue::String(v.clone()))); - } - } - - let format = if has_plugin { - &config.bt.format_plugin - } else { - &config.bt.format_connected - }; - - let text = format_template(format, &tokens); - let tooltip = format!( - "{} | MAC: {}\nBattery: {}", - bt_state.device_alias, - bt_state.device_address, - bt_state - .battery_percentage - .map(|b| format!("{}%", b)) - .unwrap_or_else(|| "N/A".to_string()) - ); - - Ok(WaybarOutput { - text, - tooltip: Some(tooltip), - class: Some(class.join(" ")), - percentage: bt_state.battery_percentage, - }) - } else { - Ok(WaybarOutput { - text: config.bt.format_disconnected.clone(), - tooltip: Some("Bluetooth On (Disconnected)".to_string()), - class: Some("disconnected".to_string()), - percentage: None, - }) - } - } - } -} diff --git a/src/modules/bt/buds.rs b/src/modules/bt/buds.rs new file mode 100644 index 0000000..bd17c23 --- /dev/null +++ b/src/modules/bt/buds.rs @@ -0,0 +1,142 @@ +use crate::config::Config; +use crate::error::{FluxoError, Result as FluxoResult}; +use crate::modules::bt::maestro::{BudsCommand, get_maestro}; +use crate::state::SharedState; +use crate::utils::TokenValue; +use futures::future::BoxFuture; +use std::sync::Arc; + +pub trait BtPlugin: Send + Sync { + fn name(&self) -> &str; + fn can_handle(&self, alias: &str, mac: &str) -> bool; + fn get_data( + &self, + config: &Config, + state: &SharedState, + mac: &str, + ) -> BoxFuture<'static, FluxoResult>>; + fn get_modes( + &self, + mac: &str, + state: &SharedState, + ) -> BoxFuture<'static, FluxoResult>>; + fn set_mode( + &self, + mode: &str, + mac: &str, + state: &SharedState, + ) -> BoxFuture<'static, FluxoResult<()>>; + fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>>; +} + +pub struct PixelBudsPlugin; + +impl BtPlugin for PixelBudsPlugin { + fn name(&self) -> &str { + "Pixel Buds Pro" + } + + fn can_handle(&self, alias: &str, _mac: &str) -> bool { + alias.contains("Pixel Buds Pro") + } + + fn get_data( + &self, + _config: &Config, + state: &SharedState, + mac: &str, + ) -> BoxFuture<'static, FluxoResult>> { + let mac = mac.to_string(); + let state = Arc::clone(state); + Box::pin(async move { + let maestro = get_maestro(&state); + maestro.ensure_task(&mac); + let status = maestro.get_status(&mac); + + if let Some(err) = status.error { + return Err(FluxoError::Module { + module: "bt.buds", + message: err, + }); + } + + let left_display = status + .left_battery + .map(|b| format!("{}%", b)) + .unwrap_or_else(|| "---".to_string()); + let right_display = status + .right_battery + .map(|b| format!("{}%", b)) + .unwrap_or_else(|| "---".to_string()); + + let (anc_icon, class) = match status.anc_state.as_str() { + "active" => ("ANC", "anc-active"), + "aware" => ("Aware", "anc-aware"), + "off" => ("Off", "anc-off"), + _ => ("?", "anc-unknown"), + }; + + Ok(vec![ + ("left".to_string(), TokenValue::String(left_display)), + ("right".to_string(), TokenValue::String(right_display)), + ("anc".to_string(), TokenValue::String(anc_icon.to_string())), + ( + "plugin_class".to_string(), + TokenValue::String(class.to_string()), + ), + ]) + }) + } + + fn get_modes( + &self, + _mac: &str, + _state: &SharedState, + ) -> BoxFuture<'static, FluxoResult>> { + Box::pin(async move { + Ok(vec![ + "active".to_string(), + "aware".to_string(), + "off".to_string(), + ]) + }) + } + + fn set_mode( + &self, + mode: &str, + mac: &str, + state: &SharedState, + ) -> BoxFuture<'static, FluxoResult<()>> { + let mode = mode.to_string(); + let mac = mac.to_string(); + let state = Arc::clone(state); + Box::pin(async move { + get_maestro(&state) + .send_command(&mac, BudsCommand::SetAnc(mode)) + .map_err(|e: anyhow::Error| FluxoError::Module { + module: "bt.buds", + message: e.to_string(), + }) + }) + } + + fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>> { + let mac = mac.to_string(); + let state = Arc::clone(state); + Box::pin(async move { + let status = get_maestro(&state).get_status(&mac); + let next_mode = match status.anc_state.as_str() { + "active" => "aware", + "aware" => "off", + _ => "active", + }; + get_maestro(&state) + .send_command(&mac, BudsCommand::SetAnc(next_mode.to_string())) + .map_err(|e: anyhow::Error| FluxoError::Module { + module: "bt.buds", + message: e.to_string(), + }) + }) + } +} diff --git a/src/modules/bt/maestro.rs b/src/modules/bt/maestro.rs new file mode 100644 index 0000000..8b9bcf8 --- /dev/null +++ b/src/modules/bt/maestro.rs @@ -0,0 +1,362 @@ +use crate::state::SharedState; +use anyhow::{Context, Result}; +use futures::StreamExt; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, OnceLock}; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; + +// Maestro imports +use maestro::protocol::codec::Codec; +use maestro::pwrpc::client::Client; +use maestro::service::MaestroService; +use maestro::service::settings::{self, SettingValue}; + +#[derive(Clone, Default)] +pub struct BudsStatus { + pub left_battery: Option, + pub right_battery: Option, + pub case_battery: Option, + pub anc_state: String, + #[allow(dead_code)] + pub last_update: Option, + pub error: Option, +} + +pub enum BudsCommand { + SetAnc(String), +} + +pub enum ManagerCommand { + EnsureTask(String), + SendCommand(String, BudsCommand), +} + +pub struct MaestroManager { + statuses: Arc>>, + management_tx: mpsc::UnboundedSender, +} + +impl MaestroManager { + pub fn new(state: SharedState) -> Self { + let (tx, mut rx) = mpsc::unbounded_channel::(); + let statuses = Arc::new(Mutex::new(HashMap::new())); + let statuses_clone = Arc::clone(&statuses); + let state_clone = Arc::clone(&state); + + // Start dedicated BT management thread + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let local = tokio::task::LocalSet::new(); + + local.block_on(&rt, async move { + let mut command_txs: HashMap> = HashMap::new(); + + loop { + tokio::select! { + Some(cmd) = rx.recv() => { + match cmd { + ManagerCommand::EnsureTask(mac) => { + if !command_txs.contains_key(&mac) { + let (tx, buds_rx) = mpsc::channel::(10); + command_txs.insert(mac.clone(), tx); + + let mac_clone = mac.clone(); + let st_clone = Arc::clone(&statuses_clone); + let state_inner = Arc::clone(&state_clone); + + tokio::task::spawn_local(async move { + if let Err(e) = buds_task(&mac_clone, st_clone, buds_rx, state_inner).await { + error!("Buds task for {} failed: {}", mac_clone, e); + } + }); + } + } + ManagerCommand::SendCommand(mac, buds_cmd) => { + if let Some(tx) = command_txs.get(&mac) { + let _ = tx.try_send(buds_cmd); + } + } + } + } + _ = tokio::time::sleep(Duration::from_millis(100)) => { + // Cleanup dropped tasks if needed + } + } + } + }); + }); + + Self { + statuses, + management_tx: tx, + } + } + + pub fn get_status(&self, mac: &str) -> BudsStatus { + let statuses = self.statuses.lock().unwrap(); + statuses.get(mac).cloned().unwrap_or_default() + } + + pub fn ensure_task(&self, mac: &str) { + let _ = self + .management_tx + .send(ManagerCommand::EnsureTask(mac.to_string())); + } + + pub fn send_command(&self, mac: &str, cmd: BudsCommand) -> Result<()> { + self.ensure_task(mac); + let _ = self + .management_tx + .send(ManagerCommand::SendCommand(mac.to_string(), cmd)); + Ok(()) + } +} + +async fn buds_task( + mac: &str, + statuses: Arc>>, + mut rx: mpsc::Receiver, + state: SharedState, +) -> Result<()> { + info!("Starting native Maestro connection task for {}", mac); + + loop { + let addr: bluer::Address = match mac.parse() { + Ok(a) => a, + Err(e) => { + error!("Failed to parse MAC address {}: {}", mac, e); + return Err(e.into()); + } + }; + let session = bluer::Session::new() + .await + .context("Failed to create bluer session")?; + let adapter = session + .default_adapter() + .await + .context("Failed to get default adapter")?; + let device = adapter + .device(addr) + .context("Failed to get device handle")?; + + if !device.is_connected().await.unwrap_or(false) { + debug!("Device {} not connected to BT, stopping maestro task", mac); + break; + } + + // Connect to Maestro RFCOMM service + let mut stream = None; + for channel in [1, 2] { + let socket = match bluer::rfcomm::Socket::new() { + Ok(s) => s, + Err(e) => { + error!("Failed to create RFCOMM socket: {}", e); + return Err(e.into()); + } + }; + let target = bluer::rfcomm::SocketAddr::new(addr, channel); + debug!( + "Trying to connect RFCOMM to {} on channel {}...", + mac, channel + ); + match socket.connect(target).await { + Ok(s) => { + stream = Some(s); + break; + } + Err(e) => { + debug!("Failed to connect to channel {}: {}", channel, e); + } + } + } + + let stream = match stream { + Some(s) => s, + None => { + warn!( + "Failed to connect RFCOMM to {} on any common channel. Retrying in 15s...", + mac + ); + tokio::time::sleep(Duration::from_secs(15)).await; + continue; + } + }; + + info!("Connected Maestro RFCOMM to {} on channel", mac); + + // Initialize Maestro communication stack + let codec = Codec::new(); + let stream = codec.wrap(stream); + let mut client = Client::new(stream); + let handle = client.handle(); + + // Resolve Maestro channel + let channel = match maestro::protocol::utils::resolve_channel(&mut client).await { + Ok(c) => c, + Err(e) => { + error!("Failed to resolve Maestro channel for {}: {}", mac, e); + continue; + } + }; + + tokio::spawn(async move { + if let Err(e) = client.run().await { + error!("Maestro client loop error: {}", e); + } + }); + + let mut service = MaestroService::new(handle, channel); + + // Update health + { + let mut lock = state.write().await; + let health = lock.health.entry("bt.buds".to_string()).or_default(); + health.consecutive_failures = 0; + health.backoff_until = None; + } + + // Query initial ANC state + if let Ok(val) = service + .read_setting_var(settings::SettingId::CurrentAncrState) + .await + && let SettingValue::CurrentAncrState(anc_state) = val + { + let mut lock = statuses.lock().unwrap(); + let status = lock.entry(mac.to_string()).or_default(); + status.anc_state = anc_state_to_string(&anc_state); + status.last_update = Some(Instant::now()); + } + + let mut runtime_info_call = match service.subscribe_to_runtime_info() { + Ok(c) => c, + Err(e) => { + error!("Failed to subscribe to runtime info for {}: {}", mac, e); + continue; + } + }; + + let mut runtime_info = runtime_info_call.stream(); + + let mut settings_changes_call = match service.subscribe_to_settings_changes() { + Ok(s) => s, + Err(e) => { + error!("Failed to subscribe to settings changes for {}: {}", mac, e); + continue; + } + }; + + let mut settings_changes = settings_changes_call.stream(); + + debug!("Subscribed to status and settings for {}", mac); + + loop { + tokio::select! { + cmd = rx.recv() => { + match cmd { + Some(BudsCommand::SetAnc(mode)) => { + debug!("Setting ANC mode to {} for {}", mode, mac); + let state = mode_to_anc_state(&mode); + let val = SettingValue::CurrentAncrState(state); + + { + let mut lock = statuses.lock().unwrap(); + let status = lock.entry(mac.to_string()).or_default(); + status.anc_state = mode.clone(); + status.last_update = Some(Instant::now()); + } + + if let Err(e) = service.write_setting(val).await { + error!("Failed to write ANC setting for {}: {}", mac, e); + } + } + None => return Ok(()), + } + } + Some(res) = runtime_info.next() => { + match res { + Ok(info) => { + let mut lock = statuses.lock().unwrap(); + let status = lock.entry(mac.to_string()).or_default(); + status.last_update = Some(Instant::now()); + + if let Some(bat) = info.battery_info { + status.left_battery = bat.left.map(|b| b.level as u8); + status.right_battery = bat.right.map(|b| b.level as u8); + status.case_battery = bat.case.map(|b| b.level as u8); + } + } + Err(e) => { + warn!("Runtime info stream error for {}: {}", mac, e); + break; + } + } + } + Some(res) = settings_changes.next() => { + if let Ok(change) = res { + use maestro::protocol::types::settings_rsp::ValueOneof as RspOneof; + use maestro::protocol::types::setting_value::ValueOneof as ValOneof; + + if let Some(RspOneof::Value(setting_val)) = change.value_oneof + && let Some(ValOneof::CurrentAncrState(anc_state_raw)) = setting_val.value_oneof { + let mut lock = statuses.lock().unwrap(); + let status = lock.entry(mac.to_string()).or_default(); + + let anc_state = match anc_state_raw { + 1 => settings::AncState::Off, + 2 => settings::AncState::Active, + 3 => settings::AncState::Aware, + 4 => settings::AncState::Adaptive, + _ => settings::AncState::Unknown(anc_state_raw), + }; + + status.anc_state = anc_state_to_string(&anc_state); + status.last_update = Some(Instant::now()); + debug!(mode = %status.anc_state, "Caught physical ANC toggle"); + } + } + } + _ = tokio::time::sleep(Duration::from_secs(30)) => { + if !device.is_connected().await.unwrap_or(false) { + break; + } + } + } + } + + if !device.is_connected().await.unwrap_or(false) { + break; + } + } + + Ok(()) +} + +fn mode_to_anc_state(mode: &str) -> settings::AncState { + match mode { + "active" => settings::AncState::Active, + "aware" => settings::AncState::Aware, + "off" => settings::AncState::Off, + _ => settings::AncState::Off, + } +} + +pub fn anc_state_to_string(state: &settings::AncState) -> String { + match state { + settings::AncState::Active => "active".to_string(), + settings::AncState::Aware => "aware".to_string(), + settings::AncState::Off => "off".to_string(), + _ => "unknown".to_string(), + } +} + +static MAESTRO: OnceLock = OnceLock::new(); + +pub fn get_maestro(state: &SharedState) -> &MaestroManager { + MAESTRO.get_or_init(|| MaestroManager::new(Arc::clone(state))) +} diff --git a/src/modules/bt/mod.rs b/src/modules/bt/mod.rs new file mode 100644 index 0000000..ecd033c --- /dev/null +++ b/src/modules/bt/mod.rs @@ -0,0 +1,236 @@ +pub mod buds; +pub mod maestro; + +use crate::config::Config; +use crate::error::Result as FluxoResult; +use crate::modules::WaybarModule; +use crate::output::WaybarOutput; +use crate::state::{BtState, SharedState}; +use crate::utils::{TokenValue, format_template}; +use anyhow::Result; +use std::process::Command; +use std::sync::{Arc, LazyLock}; +use tracing::{error, warn}; + +use self::buds::{BtPlugin, PixelBudsPlugin}; + +pub struct BtDaemon { + session: Option, +} + +impl BtDaemon { + pub fn new() -> Self { + Self { session: None } + } + + pub async fn poll(&mut self, state: SharedState, config: &Config) { + if let Err(e) = self.poll_async(state, config).await { + error!("BT daemon error: {}", e); + } + } + + async fn poll_async(&mut self, state: SharedState, config: &Config) -> Result<()> { + if self.session.is_none() { + self.session = Some(bluer::Session::new().await?); + } + let session = self.session.as_ref().unwrap(); + let adapter = session.default_adapter().await?; + let adapter_powered = adapter.is_powered().await.unwrap_or(false); + + let mut bt_state = BtState { + adapter_powered, + ..Default::default() + }; + + if adapter_powered { + let devices = adapter.device_addresses().await?; + for addr in devices { + let device = adapter.device(addr)?; + if device.is_connected().await.unwrap_or(false) { + let uuids = device.uuids().await?.unwrap_or_default(); + let audio_sink_uuid = + bluer::Uuid::from_u128(0x0000110b_0000_1000_8000_00805f9b34fb); + if uuids.contains(&audio_sink_uuid) { + bt_state.connected = true; + bt_state.device_address = addr.to_string(); + bt_state.device_alias = + device.alias().await.unwrap_or_else(|_| addr.to_string()); + bt_state.battery_percentage = + device.battery_percentage().await.unwrap_or(None); + + for p in PLUGINS.iter() { + if p.can_handle(&bt_state.device_alias, &bt_state.device_address) { + match p.get_data(config, &state, &bt_state.device_address).await { + Ok(data) => { + bt_state.plugin_data = data + .into_iter() + .map(|(k, v)| { + let val_str = match v { + TokenValue::String(s) => s, + TokenValue::Int(i) => i.to_string(), + TokenValue::Float(f) => format!("{:.1}", f), + }; + (k, val_str) + }) + .collect(); + } + Err(e) => { + warn!("Plugin {} failed for {}: {}", p.name(), addr, e); + bt_state + .plugin_data + .push(("plugin_error".to_string(), e.to_string())); + } + } + break; + } + } + break; + } + } + } + } + + let mut lock = state.write().await; + lock.bluetooth = bt_state; + + Ok(()) + } +} + +static PLUGINS: LazyLock>> = + LazyLock::new(|| vec![Box::new(PixelBudsPlugin)]); + +pub struct BtModule; + +impl WaybarModule for BtModule { + fn run( + &self, + config: &Config, + state: &SharedState, + args: &[&str], + ) -> impl std::future::Future> + Send { + let action = args.first().cloned().unwrap_or("show").to_string(); + let args = args.iter().map(|s| s.to_string()).collect::>(); + let state = Arc::clone(state); + let config = config.clone(); + + async move { + let bt_state = { + let lock = state.read().await; + lock.bluetooth.clone() + }; + + match action.as_str() { + "disconnect" if bt_state.connected => { + let _ = Command::new("bluetoothctl") + .args(["disconnect", &bt_state.device_address]) + .output(); + return Ok(WaybarOutput::default()); + } + "cycle_mode" if bt_state.connected => { + let plugin = PLUGINS + .iter() + .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); + if let Some(p) = plugin { + p.cycle_mode(&bt_state.device_address, &state).await?; + } + return Ok(WaybarOutput::default()); + } + "get_modes" if bt_state.connected => { + let plugin = PLUGINS + .iter() + .find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address)); + let modes = if let Some(p) = plugin { + p.get_modes(&bt_state.device_address, &state).await? + } else { + vec![] + }; + return Ok(WaybarOutput { + text: modes.join("\n"), + ..Default::default() + }); + } + "set_mode" if bt_state.connected => { + if let Some(mode) = args.get(1) { + let plugin = PLUGINS.iter().find(|p| { + p.can_handle(&bt_state.device_alias, &bt_state.device_address) + }); + if let Some(p) = plugin { + p.set_mode(mode, &bt_state.device_address, &state).await?; + } + } + return Ok(WaybarOutput::default()); + } + "show" => {} + _ => {} + } + + if !bt_state.adapter_powered { + return Ok(WaybarOutput { + text: config.bt.format_disabled.clone(), + tooltip: Some("Bluetooth Disabled".to_string()), + class: Some("disabled".to_string()), + percentage: None, + }); + } + + if bt_state.connected { + let mut tokens: Vec<(String, TokenValue)> = vec![ + ( + "alias".to_string(), + TokenValue::String(bt_state.device_alias.clone()), + ), + ( + "mac".to_string(), + TokenValue::String(bt_state.device_address.clone()), + ), + ]; + + let mut class = vec!["connected".to_string()]; + let mut has_plugin = false; + + for (k, v) in &bt_state.plugin_data { + if k == "plugin_class" { + class.push(v.clone()); + has_plugin = true; + } else if k == "plugin_error" { + class.push("plugin-error".to_string()); + } else { + tokens.push((k.clone(), TokenValue::String(v.clone()))); + } + } + + let format = if has_plugin { + &config.bt.format_plugin + } else { + &config.bt.format_connected + }; + + let text = format_template(format, &tokens); + let tooltip = format!( + "{} | MAC: {}\nBattery: {}", + bt_state.device_alias, + bt_state.device_address, + bt_state + .battery_percentage + .map(|b| format!("{}%", b)) + .unwrap_or_else(|| "N/A".to_string()) + ); + + Ok(WaybarOutput { + text, + tooltip: Some(tooltip), + class: Some(class.join(" ")), + percentage: bt_state.battery_percentage, + }) + } else { + Ok(WaybarOutput { + text: config.bt.format_disconnected.clone(), + tooltip: Some("Bluetooth On (Disconnected)".to_string()), + class: Some("disconnected".to_string()), + percentage: None, + }) + } + } + } +}