poll abstraction

This commit is contained in:
2026-04-04 18:37:27 +02:00
parent 57906de920
commit f1e601f9ed
2 changed files with 135 additions and 123 deletions
+87 -62
View File
@@ -31,6 +31,64 @@ use tokio::time::{Duration, sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info}; use tracing::{debug, error, info};
/// Spawn a health-tracked polling loop. `$poll_expr` is awaited each cycle.
macro_rules! spawn_poll_loop {
($name:expr, $interval:expr, $health:expr, $token:expr, $poll_expr:expr) => {
tokio::spawn(async move {
info!(concat!("Starting ", $name, " polling task"));
loop {
if !crate::health::is_poll_in_backoff($name, &$health).await {
let res: crate::error::Result<()> = $poll_expr.await;
crate::health::handle_poll_result($name, res, &$health).await;
}
tokio::select! {
_ = $token.cancelled() => break,
_ = sleep($interval) => {}
}
}
info!(concat!($name, " task shut down."));
})
};
}
/// Spawn a health-tracked polling loop with an extra trigger channel for early wake-up.
macro_rules! spawn_poll_loop_triggered {
($name:expr, $interval:expr, $health:expr, $token:expr, $trigger:expr, $poll_expr:expr) => {
tokio::spawn(async move {
info!(concat!("Starting ", $name, " polling task"));
loop {
if !crate::health::is_poll_in_backoff($name, &$health).await {
let res: crate::error::Result<()> = $poll_expr.await;
crate::health::handle_poll_result($name, res, &$health).await;
}
tokio::select! {
_ = $token.cancelled() => break,
_ = $trigger.recv() => {},
_ = sleep($interval) => {}
}
}
info!(concat!($name, " task shut down."));
})
};
}
/// Spawn a simple polling loop without health tracking.
macro_rules! spawn_poll_loop_simple {
($name:expr, $interval:expr, $token:expr, $poll_expr:expr) => {
tokio::spawn(async move {
info!(concat!("Starting ", $name, " polling task"));
loop {
$poll_expr.await;
tokio::select! {
_ = $token.cancelled() => break,
_ = sleep($interval) => {}
}
}
info!(concat!($name, " task shut down."));
})
};
}
struct SocketGuard { struct SocketGuard {
path: String, path: String,
} }
@@ -196,23 +254,16 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
// 1. Network Task // 1. Network Task
#[cfg(feature = "mod-network")] #[cfg(feature = "mod-network")]
if config.read().await.network.enabled { if config.read().await.network.enabled {
let mut daemon = NetworkDaemon::new();
let token = cancel_token.clone(); let token = cancel_token.clone();
let net_health = Arc::clone(&health); let h = Arc::clone(&health);
tokio::spawn(async move { spawn_poll_loop!(
info!("Starting Network polling task"); "net",
let mut daemon = NetworkDaemon::new(); Duration::from_secs(1),
loop { h,
if !crate::health::is_poll_in_backoff("net", &net_health).await { token,
let res = daemon.poll(&net_tx).await; daemon.poll(&net_tx)
crate::health::handle_poll_result("net", res, &net_health).await; );
}
tokio::select! {
_ = token.cancelled() => break,
_ = sleep(Duration::from_secs(1)) => {}
}
}
info!("Network task shut down.");
});
} }
// 2. Fast Hardware Task (CPU, Mem, Load) // 2. Fast Hardware Task (CPU, Mem, Load)
@@ -222,22 +273,14 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
let fast_enabled = cfg.cpu.enabled || cfg.memory.enabled || cfg.sys.enabled; let fast_enabled = cfg.cpu.enabled || cfg.memory.enabled || cfg.sys.enabled;
drop(cfg); drop(cfg);
if fast_enabled { if fast_enabled {
let mut daemon = HardwareDaemon::new();
let token = cancel_token.clone(); let token = cancel_token.clone();
let hw_health = Arc::clone(&health); spawn_poll_loop_simple!(
tokio::spawn(async move { "fast_hw",
info!("Starting Fast Hardware polling task"); Duration::from_secs(1),
let mut daemon = HardwareDaemon::new(); token,
loop { daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx)
if !crate::health::is_poll_in_backoff("cpu", &hw_health).await { );
daemon.poll_fast(&cpu_tx, &mem_tx, &sys_tx).await;
}
tokio::select! {
_ = token.cancelled() => break,
_ = sleep(Duration::from_secs(1)) => {}
}
}
info!("Fast Hardware task shut down.");
});
} }
} }
@@ -248,47 +291,29 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
let slow_enabled = cfg.gpu.enabled || cfg.disk.enabled; let slow_enabled = cfg.gpu.enabled || cfg.disk.enabled;
drop(cfg); drop(cfg);
if slow_enabled { if slow_enabled {
let mut daemon = HardwareDaemon::new();
let token = cancel_token.clone(); let token = cancel_token.clone();
let slow_health = Arc::clone(&health); spawn_poll_loop_simple!(
tokio::spawn(async move { "slow_hw",
info!("Starting Slow Hardware polling task"); Duration::from_secs(5),
let mut daemon = HardwareDaemon::new(); token,
loop { daemon.poll_slow(&gpu_tx, &disks_tx)
if !crate::health::is_poll_in_backoff("gpu", &slow_health).await { );
daemon.poll_slow(&gpu_tx, &disks_tx).await;
}
tokio::select! {
_ = token.cancelled() => break,
_ = sleep(Duration::from_secs(5)) => {}
}
}
info!("Slow Hardware task shut down.");
});
} }
} }
// 4. Bluetooth Task // 4. Bluetooth Task
#[cfg(feature = "mod-bt")] #[cfg(feature = "mod-bt")]
if config.read().await.bt.enabled { if config.read().await.bt.enabled {
let mut daemon = BtDaemon::new();
let token = cancel_token.clone(); let token = cancel_token.clone();
let bt_health = Arc::clone(&health); let h = Arc::clone(&health);
let poll_config = Arc::clone(&config); let poll_config = Arc::clone(&config);
let poll_receivers = receivers.clone(); let poll_receivers = receivers.clone();
tokio::spawn(async move { spawn_poll_loop_triggered!("bt", Duration::from_secs(2), h, token, bt_force_rx, async {
info!("Starting Bluetooth polling task"); let config = poll_config.read().await;
let mut daemon = BtDaemon::new(); daemon.poll(&bt_tx, &poll_receivers, &config).await;
loop { Ok(())
if !crate::health::is_poll_in_backoff("bt", &bt_health).await {
let config = poll_config.read().await;
daemon.poll(&bt_tx, &poll_receivers, &config).await;
}
tokio::select! {
_ = token.cancelled() => break,
_ = bt_force_rx.recv() => {},
_ = sleep(Duration::from_secs(2)) => {}
}
}
info!("Bluetooth task shut down.");
}); });
} }
+48 -61
View File
@@ -7,10 +7,42 @@ use futures::StreamExt;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::time::Duration; use tokio::time::Duration;
use tracing::{debug, error, info}; use tracing::{debug, error, info};
use zbus::{Connection, fdo::PropertiesProxy, names::InterfaceName, proxy}; use zbus::proxy;
use zbus::zvariant::OwnedValue;
use zbus::{Connection, fdo::PropertiesProxy};
pub struct DndModule; pub struct DndModule;
/// Read dunst's `paused` property via raw D-Bus call.
async fn dunst_get_paused(connection: &Connection) -> anyhow::Result<bool> {
let reply = connection
.call_method(
Some("org.freedesktop.Notifications"),
"/org/freedesktop/Notifications",
Some("org.freedesktop.DBus.Properties"),
"Get",
&("org.dunstproject.cmd0", "paused"),
)
.await?;
let value: OwnedValue = reply.body().deserialize()?;
Ok(bool::try_from(&*value)?)
}
/// Set dunst's `paused` property via raw D-Bus call.
async fn dunst_set_paused(connection: &Connection, paused: bool) -> anyhow::Result<()> {
let value = zbus::zvariant::Value::from(paused);
connection
.call_method(
Some("org.freedesktop.Notifications"),
"/org/freedesktop/Notifications",
Some("org.freedesktop.DBus.Properties"),
"Set",
&("org.dunstproject.cmd0", "paused", value),
)
.await?;
Ok(())
}
impl WaybarModule for DndModule { impl WaybarModule for DndModule {
async fn run( async fn run(
&self, &self,
@@ -29,7 +61,7 @@ impl WaybarModule for DndModule {
message: format!("DBus connection failed: {}", e), message: format!("DBus connection failed: {}", e),
})?; })?;
// Try toggling SwayNC // Try SwayNC
if let Ok(proxy) = SwayncControlProxy::new(&connection).await if let Ok(proxy) = SwayncControlProxy::new(&connection).await
&& let Ok(is_dnd) = proxy.dnd().await && let Ok(is_dnd) = proxy.dnd().await
{ {
@@ -37,11 +69,9 @@ impl WaybarModule for DndModule {
return Ok(WaybarOutput::default()); return Ok(WaybarOutput::default());
} }
// Try toggling Dunst // Try Dunst via raw D-Bus
if let Ok(proxy) = DunstControlProxy::new(&connection).await if let Ok(is_paused) = dunst_get_paused(&connection).await {
&& let Ok(is_paused) = proxy.paused().await let _ = dunst_set_paused(&connection, !is_paused).await;
{
let _ = proxy.set_paused(!is_paused).await;
return Ok(WaybarOutput::default()); return Ok(WaybarOutput::default());
} }
@@ -85,18 +115,6 @@ trait SwayncControl {
fn set_dnd(&self, value: bool) -> zbus::Result<()>; fn set_dnd(&self, value: bool) -> zbus::Result<()>;
} }
#[proxy(
interface = "org.dunstproject.cmd0",
default_service = "org.freedesktop.Notifications",
default_path = "/org/freedesktop/Notifications"
)]
trait DunstControl {
#[zbus(property)]
fn paused(&self) -> zbus::Result<bool>;
#[zbus(property)]
fn set_paused(&self, value: bool) -> zbus::Result<()>;
}
impl DndDaemon { impl DndDaemon {
pub fn new() -> Self { pub fn new() -> Self {
Self Self
@@ -146,53 +164,19 @@ impl DndDaemon {
return Err(anyhow::anyhow!("SwayNC DND stream ended")); return Err(anyhow::anyhow!("SwayNC DND stream ended"));
} }
// Try Dunst (polling via raw D-Bus Properties.Get for maximum compatibility) // Try Dunst via raw D-Bus calls (bypasses zbus proxy issues)
let props_proxy = PropertiesProxy::builder(&connection) match dunst_get_paused(&connection).await {
.destination("org.freedesktop.Notifications")? Ok(is_paused) => {
.path("/org/freedesktop/Notifications")?
.build()
.await;
match &props_proxy {
Ok(_) => debug!("Created Properties proxy for Dunst"),
Err(e) => debug!("Failed to create Properties proxy for Dunst: {}", e),
}
if let Ok(props) = props_proxy {
// Read paused property via raw Properties.Get
let initial = props
.get(
InterfaceName::from_static_str_unchecked("org.dunstproject.cmd0"),
"paused",
)
.await;
match &initial {
Ok(_) => debug!("Successfully read Dunst paused property"),
Err(e) => debug!("Failed to read Dunst paused property: {}", e),
}
if let Ok(value) = initial
&& let Ok(is_paused) = bool::try_from(&*value)
{
info!("Found Dunst, using polling-based DND monitoring"); info!("Found Dunst, using polling-based DND monitoring");
let _ = tx.send(DndState { is_dnd: is_paused }); let _ = tx.send(DndState { is_dnd: is_paused });
loop { loop {
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
match props match dunst_get_paused(&connection).await {
.get( Ok(is_paused) => {
InterfaceName::from_static_str_unchecked("org.dunstproject.cmd0"), let current = tx.borrow().is_dnd;
"paused", if current != is_paused {
) let _ = tx.send(DndState { is_dnd: is_paused });
.await
{
Ok(value) => {
if let Ok(is_paused) = bool::try_from(&*value) {
let current = tx.borrow().is_dnd;
if current != is_paused {
let _ = tx.send(DndState { is_dnd: is_paused });
}
} }
} }
Err(e) => { Err(e) => {
@@ -204,6 +188,9 @@ impl DndDaemon {
return Err(anyhow::anyhow!("Dunst connection lost")); return Err(anyhow::anyhow!("Dunst connection lost"));
} }
Err(e) => {
info!("Dunst not available: {}", e);
}
} }
Err(anyhow::anyhow!( Err(anyhow::anyhow!(