From 708317a10bbd088568ab1301e97154b57227015e Mon Sep 17 00:00:00 2001 From: Nils Pukropp Date: Sat, 4 Apr 2026 18:39:15 +0200 Subject: [PATCH] daemon decomposition --- src/daemon.rs | 111 +++++++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 51 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index ef2a218..e231665 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -202,54 +202,7 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { let resolved_config_path = get_config_path(config_path.clone()); let config = Arc::new(RwLock::new(crate::config::load_config(config_path.clone()))); - - // 0. Config Watcher (Hot Reload) - let watcher_config = Arc::clone(&config); - let watcher_path = resolved_config_path.clone(); - tokio::spawn(async move { - let (ev_tx, mut ev_rx) = mpsc::channel(1); - let mut watcher = RecommendedWatcher::new( - move |res: notify::Result| { - if let Ok(event) = res - && (event.kind.is_modify() || event.kind.is_create()) - { - let _ = ev_tx.blocking_send(()); - } - }, - NotifyConfig::default(), - ) - .unwrap(); - - if let Some(parent) = watcher_path.parent() { - let _ = watcher.watch(parent, RecursiveMode::NonRecursive); - } - - info!("Config watcher started on {:?}", watcher_path); - - loop { - tokio::select! { - _ = ev_rx.recv() => { - // Debounce reloads - sleep(Duration::from_millis(100)).await; - while ev_rx.try_recv().is_ok() {} - reload_config(&watcher_config, Some(watcher_path.clone())).await; - } - } - } - }); - - // 0.1 SIGHUP Handler - let hup_config = Arc::clone(&config); - let hup_path = resolved_config_path.clone(); - tokio::spawn(async move { - use tokio::signal::unix::{SignalKind, signal}; - let mut stream = signal(SignalKind::hangup()).unwrap(); - loop { - stream.recv().await; - info!("Received SIGHUP, reloading config..."); - reload_config(&hup_config, Some(hup_path.clone())).await; - } - }); + spawn_config_watchers(&config, &resolved_config_path); // 1. Network Task #[cfg(feature = "mod-network")] @@ -376,12 +329,68 @@ pub async fn run_daemon(config_path: Option) -> Result<()> { }); info!("Fluxo daemon successfully bound to socket: {}", sock_path); + run_ipc_loop(listener, receivers, config, config_path, cancel_token).await +} +fn spawn_config_watchers(config: &Arc>, resolved_path: &std::path::Path) { + // File watcher (hot reload on modify/create) + let watcher_config = Arc::clone(config); + let watcher_path = resolved_path.to_path_buf(); + tokio::spawn(async move { + let (ev_tx, mut ev_rx) = mpsc::channel(1); + let mut watcher = RecommendedWatcher::new( + move |res: notify::Result| { + if let Ok(event) = res + && (event.kind.is_modify() || event.kind.is_create()) + { + let _ = ev_tx.blocking_send(()); + } + }, + NotifyConfig::default(), + ) + .unwrap(); + + if let Some(parent) = watcher_path.parent() { + let _ = watcher.watch(parent, RecursiveMode::NonRecursive); + } + + info!("Config watcher started on {:?}", watcher_path); + + loop { + tokio::select! { + _ = ev_rx.recv() => { + sleep(Duration::from_millis(100)).await; + while ev_rx.try_recv().is_ok() {} + reload_config(&watcher_config, Some(watcher_path.clone())).await; + } + } + } + }); + + // SIGHUP handler + let hup_config = Arc::clone(config); + let hup_path = resolved_path.to_path_buf(); + tokio::spawn(async move { + use tokio::signal::unix::{SignalKind, signal}; + let mut stream = signal(SignalKind::hangup()).unwrap(); + loop { + stream.recv().await; + info!("Received SIGHUP, reloading config..."); + reload_config(&hup_config, Some(hup_path.clone())).await; + } + }); +} + +async fn run_ipc_loop( + listener: UnixListener, + receivers: AppReceivers, + config: Arc>, + config_path: Option, + cancel_token: CancellationToken, +) -> Result<()> { loop { tokio::select! { - _ = cancel_token.cancelled() => { - break; - } + _ = cancel_token.cancelled() => break, res = listener.accept() => { match res { Ok((mut stream, _)) => {