This commit is contained in:
+107
-16
@@ -16,6 +16,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::UnixListener;
|
||||
use tokio::sync::{RwLock, watch};
|
||||
use tokio::time::{Duration, Instant, sleep};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
struct SocketGuard {
|
||||
@@ -56,7 +57,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
||||
disks: disks_rx,
|
||||
bluetooth: bt_rx,
|
||||
audio: audio_rx,
|
||||
health,
|
||||
health: Arc::clone(&health),
|
||||
};
|
||||
|
||||
let listener = UnixListener::bind(&sock_path)?;
|
||||
@@ -64,62 +65,99 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
||||
path: sock_path.clone(),
|
||||
};
|
||||
|
||||
// Signal handling for graceful shutdown in async context
|
||||
let running = Arc::new(tokio::sync::Notify::new());
|
||||
let r_clone = Arc::clone(&running);
|
||||
// Signal handling for graceful shutdown
|
||||
let cancel_token = CancellationToken::new();
|
||||
let token_clone = cancel_token.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::signal::ctrl_c().await.unwrap();
|
||||
info!("Received shutdown signal, exiting...");
|
||||
r_clone.notify_waiters();
|
||||
token_clone.cancel();
|
||||
});
|
||||
|
||||
let config_path_clone = config_path.clone();
|
||||
let config = Arc::new(RwLock::new(crate::config::load_config(config_path)));
|
||||
|
||||
// 1. Network Task
|
||||
let token = cancel_token.clone();
|
||||
let net_health = Arc::clone(&health);
|
||||
tokio::spawn(async move {
|
||||
info!("Starting Network polling task");
|
||||
let mut daemon = NetworkDaemon::new();
|
||||
loop {
|
||||
daemon.poll(&net_tx).await;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
tokio::select! {
|
||||
_ = token.cancelled() => break,
|
||||
_ = sleep(Duration::from_secs(1)) => {
|
||||
if !is_in_backoff("net", &net_health).await {
|
||||
let res = daemon.poll(&net_tx).await;
|
||||
handle_poll_result("net", res, &net_health).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Network task shut down.");
|
||||
});
|
||||
|
||||
// 2. Fast Hardware Task (CPU, Mem, Load)
|
||||
let token = cancel_token.clone();
|
||||
let hw_health = Arc::clone(&health);
|
||||
tokio::spawn(async move {
|
||||
info!("Starting Fast Hardware polling task");
|
||||
let mut daemon = HardwareDaemon::new();
|
||||
loop {
|
||||
daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
tokio::select! {
|
||||
_ = token.cancelled() => break,
|
||||
_ = sleep(Duration::from_secs(1)) => {
|
||||
if !is_in_backoff("cpu", &hw_health).await {
|
||||
daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Fast Hardware task shut down.");
|
||||
});
|
||||
|
||||
// 3. Slow Hardware Task (GPU, Disks)
|
||||
let token = cancel_token.clone();
|
||||
let slow_health = Arc::clone(&health);
|
||||
tokio::spawn(async move {
|
||||
info!("Starting Slow Hardware polling task");
|
||||
let mut daemon = HardwareDaemon::new();
|
||||
loop {
|
||||
daemon.poll_slow(&gpu_tx, &disks_tx).await;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
tokio::select! {
|
||||
_ = token.cancelled() => break,
|
||||
_ = sleep(Duration::from_secs(5)) => {
|
||||
if !is_in_backoff("gpu", &slow_health).await {
|
||||
daemon.poll_slow(&gpu_tx, &disks_tx).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Slow Hardware task shut down.");
|
||||
});
|
||||
|
||||
// 4. Bluetooth Task
|
||||
let token = cancel_token.clone();
|
||||
let bt_health = Arc::clone(&health);
|
||||
let poll_config = Arc::clone(&config);
|
||||
let poll_receivers = receivers.clone();
|
||||
tokio::spawn(async move {
|
||||
info!("Starting Bluetooth polling task");
|
||||
let mut daemon = BtDaemon::new();
|
||||
loop {
|
||||
let config = poll_config.read().await;
|
||||
daemon.poll(&bt_tx, &poll_receivers, &config).await;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
tokio::select! {
|
||||
_ = token.cancelled() => break,
|
||||
_ = sleep(Duration::from_secs(2)) => {
|
||||
if !is_in_backoff("bt", &bt_health).await {
|
||||
let config = poll_config.read().await;
|
||||
daemon.poll(&bt_tx, &poll_receivers, &config).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Bluetooth task shut down.");
|
||||
});
|
||||
|
||||
// 5. Audio Thread (Event driven - pulse usually needs its own thread)
|
||||
// 5. Audio Thread (Event driven)
|
||||
let audio_daemon = AudioDaemon::new();
|
||||
audio_daemon.start(&audio_tx);
|
||||
|
||||
@@ -127,7 +165,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = running.notified() => {
|
||||
_ = cancel_token.cancelled() => {
|
||||
break;
|
||||
}
|
||||
res = listener.accept() => {
|
||||
@@ -190,6 +228,59 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_poll_result(
|
||||
module_name: &str,
|
||||
result: crate::error::Result<()>,
|
||||
health_lock: &Arc<RwLock<HashMap<String, crate::state::ModuleHealth>>>,
|
||||
) {
|
||||
let mut lock = health_lock.write().await;
|
||||
let health = lock.entry(module_name.to_string()).or_default();
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
if health.consecutive_failures > 0 {
|
||||
info!(
|
||||
module = module_name,
|
||||
"Module recovered after {} failures", health.consecutive_failures
|
||||
);
|
||||
}
|
||||
health.consecutive_failures = 0;
|
||||
health.backoff_until = None;
|
||||
}
|
||||
Err(e) => {
|
||||
health.consecutive_failures += 1;
|
||||
health.last_failure = Some(Instant::now());
|
||||
|
||||
if !e.is_transient() {
|
||||
// Fatal errors trigger immediate long backoff
|
||||
health.backoff_until = Some(Instant::now() + Duration::from_secs(60));
|
||||
error!(module = module_name, error = %e, "Fatal module error, entering long cooldown");
|
||||
} else if health.consecutive_failures >= 3 {
|
||||
// Exponential backoff for transient errors: 30s, 60s, 120s...
|
||||
let backoff_secs = 30 * (2u64.pow(health.consecutive_failures.saturating_sub(3)));
|
||||
let backoff_secs = backoff_secs.min(3600); // Cap at 1 hour
|
||||
health.backoff_until = Some(Instant::now() + Duration::from_secs(backoff_secs));
|
||||
warn!(module = module_name, error = %e, backoff = backoff_secs, "Repeated transient failures, entering backoff");
|
||||
} else {
|
||||
debug!(module = module_name, error = %e, "Transient module failure (attempt {})", health.consecutive_failures);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn is_in_backoff(
|
||||
module_name: &str,
|
||||
health_lock: &Arc<RwLock<HashMap<String, crate::state::ModuleHealth>>>,
|
||||
) -> bool {
|
||||
let lock = health_lock.read().await;
|
||||
if let Some(health) = lock.get(module_name)
|
||||
&& let Some(until) = health.backoff_until
|
||||
{
|
||||
return Instant::now() < until;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
async fn handle_request(
|
||||
module_name: &str,
|
||||
args: &[&str],
|
||||
|
||||
Reference in New Issue
Block a user