migrated daemon to tokio with seperate hardware threads + thiserror

This commit is contained in:
2026-04-01 16:49:03 +02:00
parent 6fa14c0b84
commit ed0051f2c9
19 changed files with 517 additions and 333 deletions
+203 -138
View File
@@ -1,4 +1,5 @@
use crate::config::Config;
use crate::error::FluxoError;
use crate::ipc::socket_path;
use crate::modules::WaybarModule;
use crate::modules::audio::AudioDaemon;
@@ -8,14 +9,12 @@ use crate::modules::network::NetworkDaemon;
use crate::state::{AppState, SharedState};
use anyhow::Result;
use std::fs;
use std::io::{BufRead, BufReader, Write};
use std::net::Shutdown;
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use tokio::sync::RwLock;
use tokio::time::{Duration, Instant, sleep};
use tracing::{debug, error, info, warn};
struct SocketGuard {
@@ -29,7 +28,7 @@ impl Drop for SocketGuard {
}
}
pub fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
let sock_path = socket_path();
if fs::metadata(&sock_path).is_ok() {
@@ -43,134 +42,126 @@ pub fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
path: sock_path.clone(),
};
// Signal handling: set flag so main loop exits cleanly
let running = Arc::new(AtomicBool::new(true));
let running_clone = Arc::clone(&running);
ctrlc::set_handler(move || {
// Signal handling for graceful shutdown in async context
let running = Arc::new(tokio::sync::Notify::new());
let r_clone = Arc::clone(&running);
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
info!("Received shutdown signal, exiting...");
running_clone.store(false, Ordering::SeqCst);
})?;
r_clone.notify_waiters();
});
// We store the original config_path to allow proper reloading later
let config_path_clone = config_path.clone();
let config = Arc::new(RwLock::new(crate::config::load_config(config_path)));
// 1. Network Thread
// 1. Network Task
let poll_state = Arc::clone(&state);
let poll_running = Arc::clone(&running);
thread::spawn(move || {
info!("Starting Network polling thread");
tokio::spawn(async move {
info!("Starting Network polling task");
let mut daemon = NetworkDaemon::new();
while poll_running.load(Ordering::SeqCst) {
daemon.poll(Arc::clone(&poll_state));
thread::sleep(Duration::from_secs(1));
loop {
daemon.poll(Arc::clone(&poll_state)).await;
sleep(Duration::from_secs(1)).await;
}
});
// 2. Fast Hardware Thread (CPU, Mem, Load)
// 2. Fast Hardware Task (CPU, Mem, Load)
let poll_state = Arc::clone(&state);
let poll_running = Arc::clone(&running);
thread::spawn(move || {
info!("Starting Fast Hardware polling thread");
tokio::spawn(async move {
info!("Starting Fast Hardware polling task");
let mut daemon = HardwareDaemon::new();
while poll_running.load(Ordering::SeqCst) {
daemon.poll_fast(Arc::clone(&poll_state));
thread::sleep(Duration::from_secs(1));
loop {
daemon.poll_fast(Arc::clone(&poll_state)).await;
sleep(Duration::from_secs(1)).await;
}
});
// 3. Slow Hardware Thread (GPU, Disks)
// 3. Slow Hardware Task (GPU, Disks)
let poll_state = Arc::clone(&state);
let poll_running = Arc::clone(&running);
thread::spawn(move || {
info!("Starting Slow Hardware polling thread");
tokio::spawn(async move {
info!("Starting Slow Hardware polling task");
let mut daemon = HardwareDaemon::new();
while poll_running.load(Ordering::SeqCst) {
daemon.poll_slow(Arc::clone(&poll_state));
thread::sleep(Duration::from_secs(1));
loop {
daemon.poll_slow(Arc::clone(&poll_state)).await;
sleep(Duration::from_secs(1)).await;
}
});
// 4. Bluetooth Thread
// 4. Bluetooth Task
let poll_state = Arc::clone(&state);
let poll_running = Arc::clone(&running);
thread::spawn(move || {
info!("Starting Bluetooth polling thread");
tokio::spawn(async move {
info!("Starting Bluetooth polling task");
let mut daemon = BtDaemon::new();
while poll_running.load(Ordering::SeqCst) {
daemon.poll(Arc::clone(&poll_state));
thread::sleep(Duration::from_secs(1));
loop {
daemon.poll(Arc::clone(&poll_state)).await;
sleep(Duration::from_secs(1)).await;
}
});
// 5. Audio Thread (Event driven)
// 5. Audio Thread (Event driven - pulse usually needs its own thread)
let audio_daemon = AudioDaemon::new();
audio_daemon.start(Arc::clone(&state));
info!("Fluxo daemon successfully bound to socket: {}", sock_path);
// Use non-blocking accept so we can check the running flag
listener.set_nonblocking(true)?;
while running.load(Ordering::SeqCst) {
match listener.accept() {
Ok((mut stream, _)) => {
let state_clone = Arc::clone(&state);
let config_clone = Arc::clone(&config);
let cp_clone = config_path_clone.clone();
thread::spawn(move || {
let mut reader = BufReader::new(&stream);
let mut request = String::new();
if let Err(e) = reader.read_line(&mut request) {
error!("Failed to read from IPC stream: {}", e);
return;
}
drop(reader);
let request = request.trim();
if request.is_empty() {
return;
}
let parts: Vec<&str> = request.split_whitespace().collect();
if let Some(module_name) = parts.first() {
if *module_name == "reload" {
info!("Reloading configuration...");
let new_config = crate::config::load_config(cp_clone);
if let Ok(mut config_lock) = config_clone.write() {
*config_lock = new_config;
let _ = stream.write_all(b"{\"text\":\"ok\"}");
info!("Configuration reloaded successfully.");
} else {
error!("Failed to acquire write lock for configuration reload.");
}
let _ = stream.shutdown(Shutdown::Write);
return;
}
debug!(module = module_name, args = ?&parts[1..], "Handling IPC request");
let response =
handle_request(module_name, &parts[1..], &state_clone, &config_clone);
if let Err(e) = stream.write_all(response.as_bytes()) {
if e.kind() == std::io::ErrorKind::BrokenPipe
|| e.kind() == std::io::ErrorKind::ConnectionReset
{
debug!(
"IPC client disconnected before response could be sent: {}",
e
);
} else {
error!("Failed to write IPC response: {}", e);
}
}
let _ = stream.shutdown(Shutdown::Write);
}
});
loop {
tokio::select! {
_ = running.notified() => {
break;
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(50));
res = listener.accept() => {
match res {
Ok((mut stream, _)) => {
let state_clone = Arc::clone(&state);
let config_clone = Arc::clone(&config);
let cp_clone = config_path_clone.clone();
tokio::spawn(async move {
let (reader, mut writer) = stream.split();
let mut reader = BufReader::new(reader);
let mut request = String::new();
if let Err(e) = reader.read_line(&mut request).await {
error!("Failed to read from IPC stream: {}", e);
return;
}
let request = request.trim();
if request.is_empty() {
return;
}
let parts: Vec<&str> = request.split_whitespace().collect();
if let Some(module_name) = parts.first() {
if *module_name == "reload" {
info!("Reloading configuration...");
let new_config = crate::config::load_config(cp_clone);
let mut config_lock = config_clone.write().await;
*config_lock = new_config;
let _ = writer.write_all(b"{\"text\":\"ok\"}").await;
info!("Configuration reloaded successfully.");
return;
}
debug!(module = module_name, args = ?&parts[1..], "Handling IPC request");
let response =
handle_request(module_name, &parts[1..], &state_clone, &config_clone).await;
if let Err(e) = writer.write_all(response.as_bytes()).await {
if e.kind() == std::io::ErrorKind::BrokenPipe
|| e.kind() == std::io::ErrorKind::ConnectionReset
{
debug!(
"IPC client disconnected before response could be sent: {}",
e
);
} else {
error!("Failed to write IPC response: {}", e);
}
}
}
});
}
Err(e) => error!("Failed to accept incoming connection: {}", e),
}
}
Err(e) => error!("Failed to accept incoming connection: {}", e),
}
}
@@ -178,53 +169,127 @@ pub fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
Ok(())
}
fn handle_request(
async fn handle_request(
module_name: &str,
args: &[&str],
state: &SharedState,
config_lock: &Arc<RwLock<Config>>,
) -> String {
let config = if let Ok(c) = config_lock.read() {
c
} else {
error!("Failed to acquire read lock for configuration.");
return "{\"text\":\"error: config lock failed\"}".to_string();
};
let result = match module_name {
"net" | "network" => crate::modules::network::NetworkModule.run(&config, state, args),
"cpu" => crate::modules::cpu::CpuModule.run(&config, state, args),
"mem" | "memory" => crate::modules::memory::MemoryModule.run(&config, state, args),
"disk" => crate::modules::disk::DiskModule.run(&config, state, args),
"pool" | "btrfs" => crate::modules::btrfs::BtrfsModule.run(&config, state, args),
"vol" => crate::modules::audio::AudioModule.run(
&config,
state,
&["sink", args.first().unwrap_or(&"show")],
),
"mic" => crate::modules::audio::AudioModule.run(
&config,
state,
&["source", args.first().unwrap_or(&"show")],
),
"gpu" => crate::modules::gpu::GpuModule.run(&config, state, args),
"sys" => crate::modules::sys::SysModule.run(&config, state, args),
"bt" | "bluetooth" => crate::modules::bt::BtModule.run(&config, state, args),
"power" => crate::modules::power::PowerModule.run(&config, state, args),
"game" => crate::modules::game::GameModule.run(&config, state, args),
_ => {
warn!("Received request for unknown module: '{}'", module_name);
Err(anyhow::anyhow!("Unknown module: {}", module_name))
// 1. Check Circuit Breaker status
let is_in_backoff = {
let lock = state.read().await;
if let Some(health) = lock.health.get(module_name) {
if let Some(until) = health.backoff_until {
Instant::now() < until
} else {
false
}
} else {
false
}
};
if is_in_backoff {
return format!(
"{{\"text\":\"\u{200B}Cooling down ({})\u{200B}\",\"class\":\"error\"}}",
module_name
);
}
let config = config_lock.read().await;
let result = match module_name {
"net" | "network" => {
crate::modules::network::NetworkModule
.run(&config, state, args)
.await
}
"cpu" => {
crate::modules::cpu::CpuModule
.run(&config, state, args)
.await
}
"mem" | "memory" => {
crate::modules::memory::MemoryModule
.run(&config, state, args)
.await
}
"disk" => {
crate::modules::disk::DiskModule
.run(&config, state, args)
.await
}
"pool" | "btrfs" => {
crate::modules::btrfs::BtrfsModule
.run(&config, state, args)
.await
}
"vol" => {
crate::modules::audio::AudioModule
.run(&config, state, &["sink", args.first().unwrap_or(&"show")])
.await
}
"mic" => {
crate::modules::audio::AudioModule
.run(&config, state, &["source", args.first().unwrap_or(&"show")])
.await
}
"gpu" => {
crate::modules::gpu::GpuModule
.run(&config, state, args)
.await
}
"sys" => {
crate::modules::sys::SysModule
.run(&config, state, args)
.await
}
"bt" | "bluetooth" => crate::modules::bt::BtModule.run(&config, state, args).await,
"power" => {
crate::modules::power::PowerModule
.run(&config, state, args)
.await
}
"game" => {
crate::modules::game::GameModule
.run(&config, state, args)
.await
}
_ => {
warn!("Received request for unknown module: '{}'", module_name);
Err(FluxoError::Ipc(format!("Unknown module: {}", module_name)))
}
};
// 2. Update Health based on result
{
let mut lock = state.write().await;
let health = lock.health.entry(module_name.to_string()).or_default();
match &result {
Ok(_) => {
health.consecutive_failures = 0;
health.backoff_until = None;
}
Err(e) => {
health.consecutive_failures += 1;
health.last_failure = Some(Instant::now());
if health.consecutive_failures >= 3 {
// Backoff for 30 seconds after 3 failures
health.backoff_until = Some(Instant::now() + Duration::from_secs(30));
warn!(module = module_name, error = %e, "Module entered backoff state due to repeated failures");
}
}
}
}
match result {
Ok(output) => serde_json::to_string(&output).unwrap_or_else(|_| "{}".to_string()),
Err(e) => {
error!(module = module_name, error = %e, "Module execution failed");
let error_msg = e.to_string();
error!(module = module_name, error = %error_msg, "Module execution failed");
let err_out = crate::output::WaybarOutput {
text: "\u{200B}Error\u{200B}".to_string(),
tooltip: Some(e.to_string()),
tooltip: Some(error_msg),
class: Some("error".to_string()),
percentage: None,
};