2 Commits

Author SHA1 Message Date
nvrl d28e51de6d version bupm
Release / Build and Release (push) Successful in 2m10s
2026-04-02 18:46:33 +02:00
nvrl b18af49ac8 graceful shutdown
Release / Build and Release (push) Has been cancelled
2026-04-02 18:46:17 +02:00
5 changed files with 207 additions and 64 deletions
Generated
+2 -1
View File
@@ -353,7 +353,7 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "fluxo-rs"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"anyhow",
"bluer",
@@ -1270,6 +1270,7 @@ dependencies = [
"futures-core",
"futures-sink",
"pin-project-lite",
"slab",
"tokio",
]
+2 -2
View File
@@ -1,6 +1,6 @@
[package]
name = "fluxo-rs"
version = "0.3.2"
version = "0.3.3"
edition = "2024"
[dependencies]
@@ -18,7 +18,7 @@ tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }
maestro = { git = "https://github.com/qzed/pbpctrl", package = "maestro" }
bluer = { version = "0.17", features = ["bluetoothd", "rfcomm", "id"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal", "process"] }
tokio-util = { version = "0.7", features = ["codec"] }
tokio-util = { version = "0.7", features = ["codec", "time"] }
futures = "0.3"
libpulse-binding = "2.30"
nix = { version = "0.31", features = ["net"] }
+107 -16
View File
@@ -16,6 +16,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use tokio::sync::{RwLock, watch};
use tokio::time::{Duration, Instant, sleep};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
struct SocketGuard {
@@ -56,7 +57,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
disks: disks_rx,
bluetooth: bt_rx,
audio: audio_rx,
health,
health: Arc::clone(&health),
};
let listener = UnixListener::bind(&sock_path)?;
@@ -64,62 +65,99 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
path: sock_path.clone(),
};
// Signal handling for graceful shutdown in async context
let running = Arc::new(tokio::sync::Notify::new());
let r_clone = Arc::clone(&running);
// Signal handling for graceful shutdown
let cancel_token = CancellationToken::new();
let token_clone = cancel_token.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
info!("Received shutdown signal, exiting...");
r_clone.notify_waiters();
token_clone.cancel();
});
let config_path_clone = config_path.clone();
let config = Arc::new(RwLock::new(crate::config::load_config(config_path)));
// 1. Network Task
let token = cancel_token.clone();
let net_health = Arc::clone(&health);
tokio::spawn(async move {
info!("Starting Network polling task");
let mut daemon = NetworkDaemon::new();
loop {
daemon.poll(&net_tx).await;
sleep(Duration::from_secs(1)).await;
tokio::select! {
_ = token.cancelled() => break,
_ = sleep(Duration::from_secs(1)) => {
if !is_in_backoff("net", &net_health).await {
let res = daemon.poll(&net_tx).await;
handle_poll_result("net", res, &net_health).await;
}
}
}
}
info!("Network task shut down.");
});
// 2. Fast Hardware Task (CPU, Mem, Load)
let token = cancel_token.clone();
let hw_health = Arc::clone(&health);
tokio::spawn(async move {
info!("Starting Fast Hardware polling task");
let mut daemon = HardwareDaemon::new();
loop {
daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await;
sleep(Duration::from_secs(1)).await;
tokio::select! {
_ = token.cancelled() => break,
_ = sleep(Duration::from_secs(1)) => {
if !is_in_backoff("cpu", &hw_health).await {
daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await;
}
}
}
}
info!("Fast Hardware task shut down.");
});
// 3. Slow Hardware Task (GPU, Disks)
let token = cancel_token.clone();
let slow_health = Arc::clone(&health);
tokio::spawn(async move {
info!("Starting Slow Hardware polling task");
let mut daemon = HardwareDaemon::new();
loop {
daemon.poll_slow(&gpu_tx, &disks_tx).await;
sleep(Duration::from_secs(1)).await;
tokio::select! {
_ = token.cancelled() => break,
_ = sleep(Duration::from_secs(5)) => {
if !is_in_backoff("gpu", &slow_health).await {
daemon.poll_slow(&gpu_tx, &disks_tx).await;
}
}
}
}
info!("Slow Hardware task shut down.");
});
// 4. Bluetooth Task
let token = cancel_token.clone();
let bt_health = Arc::clone(&health);
let poll_config = Arc::clone(&config);
let poll_receivers = receivers.clone();
tokio::spawn(async move {
info!("Starting Bluetooth polling task");
let mut daemon = BtDaemon::new();
loop {
let config = poll_config.read().await;
daemon.poll(&bt_tx, &poll_receivers, &config).await;
sleep(Duration::from_secs(1)).await;
tokio::select! {
_ = token.cancelled() => break,
_ = sleep(Duration::from_secs(2)) => {
if !is_in_backoff("bt", &bt_health).await {
let config = poll_config.read().await;
daemon.poll(&bt_tx, &poll_receivers, &config).await;
}
}
}
}
info!("Bluetooth task shut down.");
});
// 5. Audio Thread (Event driven - pulse usually needs its own thread)
// 5. Audio Thread (Event driven)
let audio_daemon = AudioDaemon::new();
audio_daemon.start(&audio_tx);
@@ -127,7 +165,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
loop {
tokio::select! {
_ = running.notified() => {
_ = cancel_token.cancelled() => {
break;
}
res = listener.accept() => {
@@ -190,6 +228,59 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
Ok(())
}
async fn handle_poll_result(
module_name: &str,
result: crate::error::Result<()>,
health_lock: &Arc<RwLock<HashMap<String, crate::state::ModuleHealth>>>,
) {
let mut lock = health_lock.write().await;
let health = lock.entry(module_name.to_string()).or_default();
match result {
Ok(_) => {
if health.consecutive_failures > 0 {
info!(
module = module_name,
"Module recovered after {} failures", health.consecutive_failures
);
}
health.consecutive_failures = 0;
health.backoff_until = None;
}
Err(e) => {
health.consecutive_failures += 1;
health.last_failure = Some(Instant::now());
if !e.is_transient() {
// Fatal errors trigger immediate long backoff
health.backoff_until = Some(Instant::now() + Duration::from_secs(60));
error!(module = module_name, error = %e, "Fatal module error, entering long cooldown");
} else if health.consecutive_failures >= 3 {
// Exponential backoff for transient errors: 30s, 60s, 120s...
let backoff_secs = 30 * (2u64.pow(health.consecutive_failures.saturating_sub(3)));
let backoff_secs = backoff_secs.min(3600); // Cap at 1 hour
health.backoff_until = Some(Instant::now() + Duration::from_secs(backoff_secs));
warn!(module = module_name, error = %e, backoff = backoff_secs, "Repeated transient failures, entering backoff");
} else {
debug!(module = module_name, error = %e, "Transient module failure (attempt {})", health.consecutive_failures);
}
}
}
}
async fn is_in_backoff(
module_name: &str,
health_lock: &Arc<RwLock<HashMap<String, crate::state::ModuleHealth>>>,
) -> bool {
let lock = health_lock.read().await;
if let Some(health) = lock.get(module_name)
&& let Some(until) = health.backoff_until
{
return Instant::now() < until;
}
false
}
async fn handle_request(
module_name: &str,
args: &[&str],
+23
View File
@@ -18,6 +18,15 @@ pub enum FluxoError {
#[error("External system error: {0}")]
System(String),
#[error("Bluetooth error: {0}")]
Bluetooth(String),
#[error("Network error: {0}")]
Network(String),
#[error("Hardware error: {0}")]
Hardware(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
@@ -28,4 +37,18 @@ pub enum FluxoError {
Other(#[from] anyhow::Error),
}
impl FluxoError {
pub fn is_transient(&self) -> bool {
matches!(
self,
Self::Io(_)
| Self::System(_)
| Self::Bluetooth(_)
| Self::Network(_)
| Self::Hardware(_)
| Self::Module { .. }
)
}
}
pub type Result<T> = std::result::Result<T, FluxoError>;
+73 -45
View File
@@ -19,6 +19,8 @@ pub struct NetworkDaemon {
cached_ip: Option<String>,
}
type PollResult = crate::error::Result<(String, Option<String>, Option<(u64, u64)>)>;
impl NetworkDaemon {
pub fn new() -> Self {
Self {
@@ -30,18 +32,21 @@ impl NetworkDaemon {
}
}
pub async fn poll(&mut self, state_tx: &watch::Sender<NetworkState>) {
let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| {
let iface = get_primary_interface().unwrap_or_default();
pub async fn poll(
&mut self,
state_tx: &watch::Sender<NetworkState>,
) -> crate::error::Result<()> {
let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| -> PollResult {
let iface = get_primary_interface()?;
if iface.is_empty() {
return (String::new(), None, None);
return Ok((String::new(), None, None));
}
let ip = get_ip_address(&iface);
let bytes = get_bytes(&iface).ok();
(iface, ip, bytes)
Ok((iface, ip, bytes))
})
.await
.unwrap_or((String::new(), None, None));
.map_err(|e| crate::error::FluxoError::System(e.to_string()))??;
if !iface.is_empty() {
if self.cached_interface.as_ref() != Some(&iface) || self.cached_ip.is_none() {
@@ -51,54 +56,77 @@ impl NetworkDaemon {
} else {
self.cached_interface = None;
self.cached_ip = None;
// Provide a default state for "No connection"
let mut network = state_tx.borrow().clone();
network.interface.clear();
network.ip.clear();
network.rx_mbps = 0.0;
network.tx_mbps = 0.0;
let _ = state_tx.send(network);
return Err(crate::error::FluxoError::Network(
"No primary interface found".into(),
));
}
if let Some(ref interface) = self.cached_interface {
let time_now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
if let Some((rx_bytes_now, tx_bytes_now)) = bytes_opt {
if self.last_time > 0 && time_now > self.last_time {
let time_diff = (time_now - self.last_time) as f64;
let rx_mbps = (rx_bytes_now.saturating_sub(self.last_rx_bytes)) as f64
/ time_diff
/ 1024.0
/ 1024.0;
let tx_mbps = (tx_bytes_now.saturating_sub(self.last_tx_bytes)) as f64
/ time_diff
/ 1024.0
/ 1024.0;
let mut network = state_tx.borrow().clone();
network.rx_mbps = rx_mbps;
network.tx_mbps = tx_mbps;
network.interface = interface.clone();
network.ip = self.cached_ip.clone().unwrap_or_default();
let _ = state_tx.send(network);
} else {
// First poll: no speed data yet, but update interface/ip
let mut network = state_tx.borrow().clone();
network.interface = interface.clone();
network.ip = self.cached_ip.clone().unwrap_or_default();
let _ = state_tx.send(network);
}
self.last_time = time_now;
self.last_rx_bytes = rx_bytes_now;
self.last_tx_bytes = tx_bytes_now;
} else {
// Read failed, might be down
self.cached_interface = None;
}
let interface = if let Some(ref interface) = self.cached_interface {
interface.clone()
} else {
// No interface detected
let mut network = state_tx.borrow().clone();
network.interface.clear();
network.ip.clear();
network.rx_mbps = 0.0;
network.tx_mbps = 0.0;
let _ = state_tx.send(network);
return Err(crate::error::FluxoError::Network(
"Interface disappeared during poll".into(),
));
};
let time_now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
if let Some((rx_bytes_now, tx_bytes_now)) = bytes_opt {
if self.last_time > 0 && time_now > self.last_time {
let time_diff = (time_now - self.last_time) as f64;
let rx_mbps = (rx_bytes_now.saturating_sub(self.last_rx_bytes)) as f64
/ time_diff
/ 1024.0
/ 1024.0;
let tx_mbps = (tx_bytes_now.saturating_sub(self.last_tx_bytes)) as f64
/ time_diff
/ 1024.0
/ 1024.0;
let mut network = state_tx.borrow().clone();
network.rx_mbps = rx_mbps;
network.tx_mbps = tx_mbps;
network.interface = interface.clone();
network.ip = self.cached_ip.clone().unwrap_or_default();
let _ = state_tx.send(network);
} else {
// First poll: no speed data yet, but update interface/ip
let mut network = state_tx.borrow().clone();
network.interface = interface.clone();
network.ip = self.cached_ip.clone().unwrap_or_default();
let _ = state_tx.send(network);
}
self.last_time = time_now;
self.last_rx_bytes = rx_bytes_now;
self.last_tx_bytes = tx_bytes_now;
} else {
// Read failed, might be down
self.cached_interface = None;
return Err(crate::error::FluxoError::Network(format!(
"Failed to read bytes for {}",
interface
)));
}
Ok(())
}
}