added tokio shared states instead of monolithic state
Release / Build and Release (push) Has been cancelled

This commit is contained in:
2026-04-02 18:11:21 +02:00
parent bb3f6e565d
commit bdbd6a8a40
18 changed files with 479 additions and 352 deletions
+45 -45
View File
@@ -2,24 +2,16 @@ use crate::config::Config;
use crate::error::{FluxoError, Result};
use crate::modules::WaybarModule;
use crate::output::WaybarOutput;
use crate::state::SharedState;
use crate::state::{AppReceivers, AudioState};
use crate::utils::{TokenValue, format_template};
use libpulse_binding::callbacks::ListResult;
use libpulse_binding::context::subscribe::{Facility, InterestMaskSet};
use libpulse_binding::context::{Context, FlagSet as ContextFlag};
use libpulse_binding::mainloop::threaded::Mainloop as ThreadedMainloop;
use libpulse_binding::volume::Volume;
use std::process::Command;
use std::sync::{Arc, LazyLock};
use tokio::sync::watch;
use tracing::error;
static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Audio tokio runtime")
});
pub struct AudioDaemon;
impl AudioDaemon {
@@ -27,8 +19,8 @@ impl AudioDaemon {
Self
}
pub fn start(&self, state: SharedState) {
let state_arc = Arc::clone(&state);
pub fn start(&self, state_tx: &watch::Sender<AudioState>) {
let state_tx = state_tx.clone();
std::thread::spawn(move || {
let mut mainloop =
@@ -64,7 +56,7 @@ impl AudioDaemon {
}
// Initial fetch
let _ = fetch_audio_data_sync(&mut context, &state_arc);
let _ = fetch_audio_data_sync(&mut context, &state_tx);
// Subscribe to events
let interest =
@@ -89,7 +81,7 @@ impl AudioDaemon {
let _ = rx.recv_timeout(Duration::from_secs(2));
{
mainloop.lock();
let _ = fetch_audio_data_sync(&mut context, &state_arc);
let _ = fetch_audio_data_sync(&mut context, &state_tx);
mainloop.unlock();
}
}
@@ -99,68 +91,72 @@ impl AudioDaemon {
use std::time::Duration;
fn fetch_audio_data_sync(context: &mut Context, state: &SharedState) -> Result<()> {
let state_inner = Arc::clone(state);
fn fetch_audio_data_sync(
context: &mut Context,
state_tx: &watch::Sender<AudioState>,
) -> Result<()> {
// We fetch all sinks and sources, and also server info to know defaults.
// The order doesn't strictly matter as long as we update correctly.
let st_server = Arc::clone(&state_inner);
let tx_server = state_tx.clone();
context.introspect().get_server_info(move |info| {
let mut lock = RUNTIME.block_on(st_server.write());
lock.audio.sink.name = info
let mut current = tx_server.borrow().clone();
current.sink.name = info
.default_sink_name
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default();
lock.audio.source.name = info
current.source.name = info
.default_source_name
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default();
let _ = tx_server.send(current);
});
let st_sink = Arc::clone(&state_inner);
let tx_sink = state_tx.clone();
context.introspect().get_sink_info_list(move |res| {
if let ListResult::Item(item) = res {
let mut lock = RUNTIME.block_on(st_sink.write());
let mut current = tx_sink.borrow().clone();
// If this matches our default sink name, or if we don't have details for any yet
let is_default = item
.name
.as_ref()
.map(|s| s.as_ref() == lock.audio.sink.name)
.map(|s| s.as_ref() == current.sink.name)
.unwrap_or(false);
if is_default {
lock.audio.sink.description = item
current.sink.description = item
.description
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default();
lock.audio.sink.volume =
current.sink.volume =
((item.volume.avg().0 as f64 / Volume::NORMAL.0 as f64) * 100.0).round() as u8;
lock.audio.sink.muted = item.mute;
current.sink.muted = item.mute;
let _ = tx_sink.send(current);
}
}
});
let st_source = Arc::clone(&state_inner);
let tx_source = state_tx.clone();
context.introspect().get_source_info_list(move |res| {
if let ListResult::Item(item) = res {
let mut lock = RUNTIME.block_on(st_source.write());
let mut current = tx_source.borrow().clone();
let is_default = item
.name
.as_ref()
.map(|s| s.as_ref() == lock.audio.source.name)
.map(|s| s.as_ref() == current.source.name)
.unwrap_or(false);
if is_default {
lock.audio.source.description = item
current.source.description = item
.description
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default();
lock.audio.source.volume =
current.source.volume =
((item.volume.avg().0 as f64 / Volume::NORMAL.0 as f64) * 100.0).round() as u8;
lock.audio.source.muted = item.mute;
current.source.muted = item.mute;
let _ = tx_source.send(current);
}
}
});
@@ -174,7 +170,7 @@ impl WaybarModule for AudioModule {
async fn run(
&self,
config: &Config,
state: &SharedState,
state: &AppReceivers,
args: &[&str],
) -> Result<WaybarOutput> {
let target_type = args.first().unwrap_or(&"sink");
@@ -182,7 +178,7 @@ impl WaybarModule for AudioModule {
match *action {
"cycle" => {
self.cycle_device(target_type)?;
self.cycle_device(target_type).await?;
Ok(WaybarOutput::default())
}
"show" => self.get_status(config, state, target_type).await,
@@ -198,13 +194,10 @@ impl AudioModule {
async fn get_status(
&self,
config: &Config,
state: &SharedState,
state: &AppReceivers,
target_type: &str,
) -> Result<WaybarOutput> {
let audio_state = {
let lock = state.read().await;
lock.audio.clone()
};
let audio_state = state.audio.borrow().clone();
let (name, description, volume, muted) = if target_type == "sink" {
(
@@ -287,7 +280,7 @@ impl AudioModule {
})
}
fn cycle_device(&self, target_type: &str) -> Result<()> {
async fn cycle_device(&self, target_type: &str) -> Result<()> {
// Keep using pactl for cycling for now as it's a rare action
// but we could also implement it natively later.
let set_cmd = if target_type == "sink" {
@@ -305,9 +298,10 @@ impl AudioModule {
} else {
"sources"
};
let output = Command::new("pactl")
let output = tokio::process::Command::new("pactl")
.args(["list", "short", list_cmd])
.output()?;
.output()
.await?;
let stdout = String::from_utf8_lossy(&output.stdout);
let devices: Vec<String> = stdout
@@ -331,7 +325,10 @@ impl AudioModule {
return Ok(());
}
let info_output = Command::new("pactl").args(["info"]).output()?;
let info_output = tokio::process::Command::new("pactl")
.args(["info"])
.output()
.await?;
let info_stdout = String::from_utf8_lossy(&info_output.stdout);
let search_key = if target_type == "sink" {
"Default Sink:"
@@ -349,7 +346,10 @@ impl AudioModule {
let next_index = (current_index + 1) % devices.len();
let next_dev = &devices[next_index];
Command::new("pactl").args([set_cmd, next_dev]).status()?;
tokio::process::Command::new("pactl")
.args([set_cmd, next_dev])
.status()
.await?;
Ok(())
}
}