daemon decomposition
This commit is contained in:
+59
-50
@@ -202,54 +202,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
|||||||
|
|
||||||
let resolved_config_path = get_config_path(config_path.clone());
|
let resolved_config_path = get_config_path(config_path.clone());
|
||||||
let config = Arc::new(RwLock::new(crate::config::load_config(config_path.clone())));
|
let config = Arc::new(RwLock::new(crate::config::load_config(config_path.clone())));
|
||||||
|
spawn_config_watchers(&config, &resolved_config_path);
|
||||||
// 0. Config Watcher (Hot Reload)
|
|
||||||
let watcher_config = Arc::clone(&config);
|
|
||||||
let watcher_path = resolved_config_path.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let (ev_tx, mut ev_rx) = mpsc::channel(1);
|
|
||||||
let mut watcher = RecommendedWatcher::new(
|
|
||||||
move |res: notify::Result<Event>| {
|
|
||||||
if let Ok(event) = res
|
|
||||||
&& (event.kind.is_modify() || event.kind.is_create())
|
|
||||||
{
|
|
||||||
let _ = ev_tx.blocking_send(());
|
|
||||||
}
|
|
||||||
},
|
|
||||||
NotifyConfig::default(),
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
if let Some(parent) = watcher_path.parent() {
|
|
||||||
let _ = watcher.watch(parent, RecursiveMode::NonRecursive);
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Config watcher started on {:?}", watcher_path);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
_ = ev_rx.recv() => {
|
|
||||||
// Debounce reloads
|
|
||||||
sleep(Duration::from_millis(100)).await;
|
|
||||||
while ev_rx.try_recv().is_ok() {}
|
|
||||||
reload_config(&watcher_config, Some(watcher_path.clone())).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// 0.1 SIGHUP Handler
|
|
||||||
let hup_config = Arc::clone(&config);
|
|
||||||
let hup_path = resolved_config_path.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
use tokio::signal::unix::{SignalKind, signal};
|
|
||||||
let mut stream = signal(SignalKind::hangup()).unwrap();
|
|
||||||
loop {
|
|
||||||
stream.recv().await;
|
|
||||||
info!("Received SIGHUP, reloading config...");
|
|
||||||
reload_config(&hup_config, Some(hup_path.clone())).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// 1. Network Task
|
// 1. Network Task
|
||||||
#[cfg(feature = "mod-network")]
|
#[cfg(feature = "mod-network")]
|
||||||
@@ -376,12 +329,68 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
info!("Fluxo daemon successfully bound to socket: {}", sock_path);
|
info!("Fluxo daemon successfully bound to socket: {}", sock_path);
|
||||||
|
run_ipc_loop(listener, receivers, config, config_path, cancel_token).await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn spawn_config_watchers(config: &Arc<RwLock<Config>>, resolved_path: &std::path::Path) {
|
||||||
|
// File watcher (hot reload on modify/create)
|
||||||
|
let watcher_config = Arc::clone(config);
|
||||||
|
let watcher_path = resolved_path.to_path_buf();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (ev_tx, mut ev_rx) = mpsc::channel(1);
|
||||||
|
let mut watcher = RecommendedWatcher::new(
|
||||||
|
move |res: notify::Result<Event>| {
|
||||||
|
if let Ok(event) = res
|
||||||
|
&& (event.kind.is_modify() || event.kind.is_create())
|
||||||
|
{
|
||||||
|
let _ = ev_tx.blocking_send(());
|
||||||
|
}
|
||||||
|
},
|
||||||
|
NotifyConfig::default(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
if let Some(parent) = watcher_path.parent() {
|
||||||
|
let _ = watcher.watch(parent, RecursiveMode::NonRecursive);
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Config watcher started on {:?}", watcher_path);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancel_token.cancelled() => {
|
_ = ev_rx.recv() => {
|
||||||
break;
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
while ev_rx.try_recv().is_ok() {}
|
||||||
|
reload_config(&watcher_config, Some(watcher_path.clone())).await;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// SIGHUP handler
|
||||||
|
let hup_config = Arc::clone(config);
|
||||||
|
let hup_path = resolved_path.to_path_buf();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
use tokio::signal::unix::{SignalKind, signal};
|
||||||
|
let mut stream = signal(SignalKind::hangup()).unwrap();
|
||||||
|
loop {
|
||||||
|
stream.recv().await;
|
||||||
|
info!("Received SIGHUP, reloading config...");
|
||||||
|
reload_config(&hup_config, Some(hup_path.clone())).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_ipc_loop(
|
||||||
|
listener: UnixListener,
|
||||||
|
receivers: AppReceivers,
|
||||||
|
config: Arc<RwLock<Config>>,
|
||||||
|
config_path: Option<PathBuf>,
|
||||||
|
cancel_token: CancellationToken,
|
||||||
|
) -> Result<()> {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancel_token.cancelled() => break,
|
||||||
res = listener.accept() => {
|
res = listener.accept() => {
|
||||||
match res {
|
match res {
|
||||||
Ok((mut stream, _)) => {
|
Ok((mut stream, _)) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user