Compare commits
2 Commits
v0.3.2
..
d28e51de6d
| Author | SHA1 | Date | |
|---|---|---|---|
| d28e51de6d | |||
| b18af49ac8 |
Generated
+2
-1
@@ -353,7 +353,7 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
|
||||
|
||||
[[package]]
|
||||
name = "fluxo-rs"
|
||||
version = "0.3.1"
|
||||
version = "0.3.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bluer",
|
||||
@@ -1270,6 +1270,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"pin-project-lite",
|
||||
"slab",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
|
||||
+2
-2
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "fluxo-rs"
|
||||
version = "0.3.2"
|
||||
version = "0.3.3"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
@@ -18,7 +18,7 @@ tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }
|
||||
maestro = { git = "https://github.com/qzed/pbpctrl", package = "maestro" }
|
||||
bluer = { version = "0.17", features = ["bluetoothd", "rfcomm", "id"] }
|
||||
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "time", "macros", "signal", "process"] }
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "time"] }
|
||||
futures = "0.3"
|
||||
libpulse-binding = "2.30"
|
||||
nix = { version = "0.31", features = ["net"] }
|
||||
|
||||
+103
-12
@@ -16,6 +16,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::UnixListener;
|
||||
use tokio::sync::{RwLock, watch};
|
||||
use tokio::time::{Duration, Instant, sleep};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
struct SocketGuard {
|
||||
@@ -56,7 +57,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
||||
disks: disks_rx,
|
||||
bluetooth: bt_rx,
|
||||
audio: audio_rx,
|
||||
health,
|
||||
health: Arc::clone(&health),
|
||||
};
|
||||
|
||||
let listener = UnixListener::bind(&sock_path)?;
|
||||
@@ -64,62 +65,99 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
||||
path: sock_path.clone(),
|
||||
};
|
||||
|
||||
// Signal handling for graceful shutdown in async context
|
||||
let running = Arc::new(tokio::sync::Notify::new());
|
||||
let r_clone = Arc::clone(&running);
|
||||
// Signal handling for graceful shutdown
|
||||
let cancel_token = CancellationToken::new();
|
||||
let token_clone = cancel_token.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::signal::ctrl_c().await.unwrap();
|
||||
info!("Received shutdown signal, exiting...");
|
||||
r_clone.notify_waiters();
|
||||
token_clone.cancel();
|
||||
});
|
||||
|
||||
let config_path_clone = config_path.clone();
|
||||
let config = Arc::new(RwLock::new(crate::config::load_config(config_path)));
|
||||
|
||||
// 1. Network Task
|
||||
let token = cancel_token.clone();
|
||||
let net_health = Arc::clone(&health);
|
||||
tokio::spawn(async move {
|
||||
info!("Starting Network polling task");
|
||||
let mut daemon = NetworkDaemon::new();
|
||||
loop {
|
||||
daemon.poll(&net_tx).await;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
tokio::select! {
|
||||
_ = token.cancelled() => break,
|
||||
_ = sleep(Duration::from_secs(1)) => {
|
||||
if !is_in_backoff("net", &net_health).await {
|
||||
let res = daemon.poll(&net_tx).await;
|
||||
handle_poll_result("net", res, &net_health).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Network task shut down.");
|
||||
});
|
||||
|
||||
// 2. Fast Hardware Task (CPU, Mem, Load)
|
||||
let token = cancel_token.clone();
|
||||
let hw_health = Arc::clone(&health);
|
||||
tokio::spawn(async move {
|
||||
info!("Starting Fast Hardware polling task");
|
||||
let mut daemon = HardwareDaemon::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = token.cancelled() => break,
|
||||
_ = sleep(Duration::from_secs(1)) => {
|
||||
if !is_in_backoff("cpu", &hw_health).await {
|
||||
daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Fast Hardware task shut down.");
|
||||
});
|
||||
|
||||
// 3. Slow Hardware Task (GPU, Disks)
|
||||
let token = cancel_token.clone();
|
||||
let slow_health = Arc::clone(&health);
|
||||
tokio::spawn(async move {
|
||||
info!("Starting Slow Hardware polling task");
|
||||
let mut daemon = HardwareDaemon::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = token.cancelled() => break,
|
||||
_ = sleep(Duration::from_secs(5)) => {
|
||||
if !is_in_backoff("gpu", &slow_health).await {
|
||||
daemon.poll_slow(&gpu_tx, &disks_tx).await;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Slow Hardware task shut down.");
|
||||
});
|
||||
|
||||
// 4. Bluetooth Task
|
||||
let token = cancel_token.clone();
|
||||
let bt_health = Arc::clone(&health);
|
||||
let poll_config = Arc::clone(&config);
|
||||
let poll_receivers = receivers.clone();
|
||||
tokio::spawn(async move {
|
||||
info!("Starting Bluetooth polling task");
|
||||
let mut daemon = BtDaemon::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = token.cancelled() => break,
|
||||
_ = sleep(Duration::from_secs(2)) => {
|
||||
if !is_in_backoff("bt", &bt_health).await {
|
||||
let config = poll_config.read().await;
|
||||
daemon.poll(&bt_tx, &poll_receivers, &config).await;
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Bluetooth task shut down.");
|
||||
});
|
||||
|
||||
// 5. Audio Thread (Event driven - pulse usually needs its own thread)
|
||||
// 5. Audio Thread (Event driven)
|
||||
let audio_daemon = AudioDaemon::new();
|
||||
audio_daemon.start(&audio_tx);
|
||||
|
||||
@@ -127,7 +165,7 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = running.notified() => {
|
||||
_ = cancel_token.cancelled() => {
|
||||
break;
|
||||
}
|
||||
res = listener.accept() => {
|
||||
@@ -190,6 +228,59 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_poll_result(
|
||||
module_name: &str,
|
||||
result: crate::error::Result<()>,
|
||||
health_lock: &Arc<RwLock<HashMap<String, crate::state::ModuleHealth>>>,
|
||||
) {
|
||||
let mut lock = health_lock.write().await;
|
||||
let health = lock.entry(module_name.to_string()).or_default();
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
if health.consecutive_failures > 0 {
|
||||
info!(
|
||||
module = module_name,
|
||||
"Module recovered after {} failures", health.consecutive_failures
|
||||
);
|
||||
}
|
||||
health.consecutive_failures = 0;
|
||||
health.backoff_until = None;
|
||||
}
|
||||
Err(e) => {
|
||||
health.consecutive_failures += 1;
|
||||
health.last_failure = Some(Instant::now());
|
||||
|
||||
if !e.is_transient() {
|
||||
// Fatal errors trigger immediate long backoff
|
||||
health.backoff_until = Some(Instant::now() + Duration::from_secs(60));
|
||||
error!(module = module_name, error = %e, "Fatal module error, entering long cooldown");
|
||||
} else if health.consecutive_failures >= 3 {
|
||||
// Exponential backoff for transient errors: 30s, 60s, 120s...
|
||||
let backoff_secs = 30 * (2u64.pow(health.consecutive_failures.saturating_sub(3)));
|
||||
let backoff_secs = backoff_secs.min(3600); // Cap at 1 hour
|
||||
health.backoff_until = Some(Instant::now() + Duration::from_secs(backoff_secs));
|
||||
warn!(module = module_name, error = %e, backoff = backoff_secs, "Repeated transient failures, entering backoff");
|
||||
} else {
|
||||
debug!(module = module_name, error = %e, "Transient module failure (attempt {})", health.consecutive_failures);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn is_in_backoff(
|
||||
module_name: &str,
|
||||
health_lock: &Arc<RwLock<HashMap<String, crate::state::ModuleHealth>>>,
|
||||
) -> bool {
|
||||
let lock = health_lock.read().await;
|
||||
if let Some(health) = lock.get(module_name)
|
||||
&& let Some(until) = health.backoff_until
|
||||
{
|
||||
return Instant::now() < until;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
async fn handle_request(
|
||||
module_name: &str,
|
||||
args: &[&str],
|
||||
|
||||
@@ -18,6 +18,15 @@ pub enum FluxoError {
|
||||
#[error("External system error: {0}")]
|
||||
System(String),
|
||||
|
||||
#[error("Bluetooth error: {0}")]
|
||||
Bluetooth(String),
|
||||
|
||||
#[error("Network error: {0}")]
|
||||
Network(String),
|
||||
|
||||
#[error("Hardware error: {0}")]
|
||||
Hardware(String),
|
||||
|
||||
#[error("IO error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
|
||||
@@ -28,4 +37,18 @@ pub enum FluxoError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl FluxoError {
|
||||
pub fn is_transient(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::Io(_)
|
||||
| Self::System(_)
|
||||
| Self::Bluetooth(_)
|
||||
| Self::Network(_)
|
||||
| Self::Hardware(_)
|
||||
| Self::Module { .. }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, FluxoError>;
|
||||
|
||||
+42
-14
@@ -19,6 +19,8 @@ pub struct NetworkDaemon {
|
||||
cached_ip: Option<String>,
|
||||
}
|
||||
|
||||
type PollResult = crate::error::Result<(String, Option<String>, Option<(u64, u64)>)>;
|
||||
|
||||
impl NetworkDaemon {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
@@ -30,18 +32,21 @@ impl NetworkDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn poll(&mut self, state_tx: &watch::Sender<NetworkState>) {
|
||||
let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| {
|
||||
let iface = get_primary_interface().unwrap_or_default();
|
||||
pub async fn poll(
|
||||
&mut self,
|
||||
state_tx: &watch::Sender<NetworkState>,
|
||||
) -> crate::error::Result<()> {
|
||||
let (iface, ip_opt, bytes_opt) = tokio::task::spawn_blocking(|| -> PollResult {
|
||||
let iface = get_primary_interface()?;
|
||||
if iface.is_empty() {
|
||||
return (String::new(), None, None);
|
||||
return Ok((String::new(), None, None));
|
||||
}
|
||||
let ip = get_ip_address(&iface);
|
||||
let bytes = get_bytes(&iface).ok();
|
||||
(iface, ip, bytes)
|
||||
Ok((iface, ip, bytes))
|
||||
})
|
||||
.await
|
||||
.unwrap_or((String::new(), None, None));
|
||||
.map_err(|e| crate::error::FluxoError::System(e.to_string()))??;
|
||||
|
||||
if !iface.is_empty() {
|
||||
if self.cached_interface.as_ref() != Some(&iface) || self.cached_ip.is_none() {
|
||||
@@ -51,9 +56,33 @@ impl NetworkDaemon {
|
||||
} else {
|
||||
self.cached_interface = None;
|
||||
self.cached_ip = None;
|
||||
// Provide a default state for "No connection"
|
||||
let mut network = state_tx.borrow().clone();
|
||||
network.interface.clear();
|
||||
network.ip.clear();
|
||||
network.rx_mbps = 0.0;
|
||||
network.tx_mbps = 0.0;
|
||||
let _ = state_tx.send(network);
|
||||
return Err(crate::error::FluxoError::Network(
|
||||
"No primary interface found".into(),
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(ref interface) = self.cached_interface {
|
||||
let interface = if let Some(ref interface) = self.cached_interface {
|
||||
interface.clone()
|
||||
} else {
|
||||
// No interface detected
|
||||
let mut network = state_tx.borrow().clone();
|
||||
network.interface.clear();
|
||||
network.ip.clear();
|
||||
network.rx_mbps = 0.0;
|
||||
network.tx_mbps = 0.0;
|
||||
let _ = state_tx.send(network);
|
||||
return Err(crate::error::FluxoError::Network(
|
||||
"Interface disappeared during poll".into(),
|
||||
));
|
||||
};
|
||||
|
||||
let time_now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
@@ -91,14 +120,13 @@ impl NetworkDaemon {
|
||||
} else {
|
||||
// Read failed, might be down
|
||||
self.cached_interface = None;
|
||||
return Err(crate::error::FluxoError::Network(format!(
|
||||
"Failed to read bytes for {}",
|
||||
interface
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
// No interface detected
|
||||
let mut network = state_tx.borrow().clone();
|
||||
network.interface.clear();
|
||||
network.ip.clear();
|
||||
let _ = state_tx.send(network);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user