migrated daemon to tokio with seperate hardware threads + thiserror #1

Merged
nvrl merged 5 commits from release/0.3.1 into main 2026-04-01 18:16:24 +02:00
4 changed files with 334 additions and 206 deletions
Showing only changes of commit 81c9b78cb3 - Show all commits
Generated
+1 -1
View File
@@ -353,7 +353,7 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "fluxo-rs"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"anyhow",
"bluer",
+13 -13
View File
@@ -5,7 +5,7 @@ use std::path::PathBuf;
use std::sync::LazyLock;
use tracing::{debug, info, warn};
#[derive(Deserialize, Default)]
#[derive(Deserialize, Default, Clone)]
pub struct Config {
#[serde(default)]
pub general: GeneralConfig,
@@ -33,7 +33,7 @@ pub struct Config {
pub game: GameConfig,
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct GeneralConfig {
pub menu_command: String,
}
@@ -46,7 +46,7 @@ impl Default for GeneralConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct NetworkConfig {
pub format: String,
}
@@ -59,7 +59,7 @@ impl Default for NetworkConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct CpuConfig {
pub format: String,
}
@@ -72,7 +72,7 @@ impl Default for CpuConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct MemoryConfig {
pub format: String,
}
@@ -85,7 +85,7 @@ impl Default for MemoryConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct GpuConfig {
pub format_amd: String,
pub format_intel: String,
@@ -104,7 +104,7 @@ impl Default for GpuConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct SysConfig {
pub format: String,
}
@@ -117,7 +117,7 @@ impl Default for SysConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct DiskConfig {
pub format: String,
}
@@ -130,7 +130,7 @@ impl Default for DiskConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct PoolConfig {
pub format: String,
}
@@ -143,7 +143,7 @@ impl Default for PoolConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct PowerConfig {
pub format: String,
}
@@ -156,7 +156,7 @@ impl Default for PowerConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct AudioConfig {
pub format_sink_unmuted: String,
pub format_sink_muted: String,
@@ -175,7 +175,7 @@ impl Default for AudioConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct BtConfig {
pub format_connected: String,
pub format_plugin: String,
@@ -194,7 +194,7 @@ impl Default for BtConfig {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct GameConfig {
pub format_active: String,
pub format_inactive: String,
+3 -1
View File
@@ -89,11 +89,13 @@ pub async fn run_daemon(config_path: Option<PathBuf>) -> Result<()> {
// 4. Bluetooth Task
let poll_state = Arc::clone(&state);
let poll_config = Arc::clone(&config);
tokio::spawn(async move {
info!("Starting Bluetooth polling task");
let mut daemon = BtDaemon::new();
loop {
daemon.poll(Arc::clone(&poll_state)).await;
let config = poll_config.read().await;
daemon.poll(Arc::clone(&poll_state), &config).await;
sleep(Duration::from_secs(1)).await;
}
});
+181 -55
View File
@@ -1,14 +1,15 @@
use crate::config::Config;
use crate::error::Result as FluxoResult;
use crate::error::{FluxoError, Result as FluxoResult};
use crate::modules::WaybarModule;
use crate::output::WaybarOutput;
use crate::state::{BtState, SharedState};
use crate::utils::{TokenValue, format_template};
use anyhow::{Context, Result};
use futures::StreamExt;
use futures::future::BoxFuture;
use std::collections::HashMap;
use std::process::Command;
use std::sync::{Arc, LazyLock, Mutex};
use std::sync::{Arc, LazyLock, Mutex, OnceLock};
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info, warn};
@@ -49,10 +50,11 @@ struct MaestroManager {
}
impl MaestroManager {
fn new() -> Self {
fn new(state: SharedState) -> Self {
let (tx, mut rx) = mpsc::unbounded_channel::<ManagerCommand>();
let statuses = Arc::new(Mutex::new(HashMap::new()));
let statuses_clone = Arc::clone(&statuses);
let state_clone = Arc::clone(&state);
// Start dedicated BT management thread
std::thread::spawn(move || {
@@ -77,9 +79,10 @@ impl MaestroManager {
let mac_clone = mac.clone();
let st_clone = Arc::clone(&statuses_clone);
let state_inner = Arc::clone(&state_clone);
tokio::task::spawn_local(async move {
if let Err(e) = buds_task(&mac_clone, st_clone, buds_rx).await {
if let Err(e) = buds_task(&mac_clone, st_clone, buds_rx, state_inner).await {
error!("Buds task for {} failed: {}", mac_clone, e);
}
});
@@ -130,11 +133,18 @@ async fn buds_task(
mac: &str,
statuses: Arc<Mutex<HashMap<String, BudsStatus>>>,
mut rx: mpsc::Receiver<BudsCommand>,
state: SharedState,
) -> Result<()> {
info!("Starting native Maestro connection task for {}", mac);
loop {
let addr: bluer::Address = mac.parse().context("Failed to parse MAC address")?;
let addr: bluer::Address = match mac.parse() {
Ok(a) => a,
Err(e) => {
error!("Failed to parse MAC address {}: {}", mac, e);
return Err(e.into());
}
};
let session = bluer::Session::new()
.await
.context("Failed to create bluer session")?;
@@ -152,7 +162,6 @@ async fn buds_task(
}
// Connect to Maestro RFCOMM service
// We try channel 1 then 2, which covers most Pixel Buds variants.
let mut stream = None;
for channel in [1, 2] {
let socket = match bluer::rfcomm::Socket::new() {
@@ -198,7 +207,7 @@ async fn buds_task(
let mut client = Client::new(stream);
let handle = client.handle();
// Resolve Maestro channel (typically 1 or 2)
// Resolve Maestro channel
let channel = match maestro::protocol::utils::resolve_channel(&mut client).await {
Ok(c) => c,
Err(e) => {
@@ -207,7 +216,6 @@ async fn buds_task(
}
};
// Run client in background to handle RPC packets
tokio::spawn(async move {
if let Err(e) = client.run().await {
error!("Maestro client loop error: {}", e);
@@ -216,19 +224,28 @@ async fn buds_task(
let mut service = MaestroService::new(handle, channel);
// Update health
{
let mut lock = state.write().await;
let health = lock.health.entry("bt.buds".to_string()).or_default();
health.consecutive_failures = 0;
health.backoff_until = None;
}
// Query initial ANC state
if let Ok(val) = service
.read_setting_var(settings::SettingId::CurrentAncrState)
.await
&& let SettingValue::CurrentAncrState(anc_state) = val
{
let mut status = MAESTRO.get_status(mac);
let mut lock = statuses.lock().unwrap();
let status = lock.entry(mac.to_string()).or_default();
status.anc_state = anc_state_to_string(&anc_state);
statuses.lock().unwrap().insert(mac.to_string(), status);
status.last_update = Some(Instant::now());
}
// Subscribe to real-time status updates (battery, ANC, wear)
let mut call = match service.subscribe_to_runtime_info() {
// Subscribe to real-time status updates (battery, wear)
let mut runtime_info_call = match service.subscribe_to_runtime_info() {
Ok(c) => c,
Err(e) => {
error!("Failed to subscribe to runtime info for {}: {}", mac, e);
@@ -236,9 +253,20 @@ async fn buds_task(
}
};
let mut runtime_info = call.stream();
let mut runtime_info = runtime_info_call.stream();
debug!("Subscribed to runtime info for {}", mac);
// Subscribe to settings changes (to catch physical toggles on the buds)
let mut settings_changes_call = match service.subscribe_to_settings_changes() {
Ok(s) => s,
Err(e) => {
error!("Failed to subscribe to settings changes for {}: {}", mac, e);
continue;
}
};
let mut settings_changes = settings_changes_call.stream();
debug!("Subscribed to status and settings for {}", mac);
loop {
tokio::select! {
@@ -248,6 +276,14 @@ async fn buds_task(
debug!("Setting ANC mode to {} for {}", mode, mac);
let state = mode_to_anc_state(&mode);
let val = SettingValue::CurrentAncrState(state);
{
let mut lock = statuses.lock().unwrap();
let status = lock.entry(mac.to_string()).or_default();
status.anc_state = mode.clone();
status.last_update = Some(Instant::now());
}
if let Err(e) = service.write_setting(val).await {
error!("Failed to write ANC setting for {}: {}", mac, e);
}
@@ -255,8 +291,11 @@ async fn buds_task(
None => return Ok(()),
}
}
Some(Ok(info)) = runtime_info.next() => {
let mut status = MAESTRO.get_status(mac);
Some(res) = runtime_info.next() => {
match res {
Ok(info) => {
let mut lock = statuses.lock().unwrap();
let status = lock.entry(mac.to_string()).or_default();
status.last_update = Some(Instant::now());
if let Some(bat) = info.battery_info {
@@ -264,11 +303,39 @@ async fn buds_task(
status.right_battery = bat.right.map(|b| b.level as u8);
status.case_battery = bat.case.map(|b| b.level as u8);
}
}
Err(e) => {
warn!("Runtime info stream error for {}: {}", mac, e);
break;
}
}
}
Some(res) = settings_changes.next() => {
if let Ok(change) = res {
use maestro::protocol::types::settings_rsp::ValueOneof as RspOneof;
use maestro::protocol::types::setting_value::ValueOneof as ValOneof;
statuses.lock().unwrap().insert(mac.to_string(), status);
if let Some(RspOneof::Value(setting_val)) = change.value_oneof {
if let Some(ValOneof::CurrentAncrState(anc_state_raw)) = setting_val.value_oneof {
let mut lock = statuses.lock().unwrap();
let status = lock.entry(mac.to_string()).or_default();
let anc_state = match anc_state_raw {
1 => settings::AncState::Off,
2 => settings::AncState::Active,
3 => settings::AncState::Aware,
4 => settings::AncState::Adaptive,
_ => settings::AncState::Unknown(anc_state_raw),
};
status.anc_state = anc_state_to_string(&anc_state);
status.last_update = Some(Instant::now());
debug!(mode = %status.anc_state, "Caught physical ANC toggle");
}
}
}
}
_ = tokio::time::sleep(Duration::from_secs(30)) => {
// Check if still connected to BT
if !device.is_connected().await.unwrap_or(false) {
break;
}
@@ -302,7 +369,11 @@ fn anc_state_to_string(state: &settings::AncState) -> String {
}
}
static MAESTRO: LazyLock<MaestroManager> = LazyLock::new(MaestroManager::new);
static MAESTRO: OnceLock<MaestroManager> = OnceLock::new();
fn get_maestro(state: &SharedState) -> &MaestroManager {
MAESTRO.get_or_init(|| MaestroManager::new(Arc::clone(state)))
}
pub struct BtDaemon {
session: Option<bluer::Session>,
@@ -313,13 +384,13 @@ impl BtDaemon {
Self { session: None }
}
pub async fn poll(&mut self, state: SharedState) {
if let Err(e) = self.poll_async(state).await {
pub async fn poll(&mut self, state: SharedState, config: &Config) {
if let Err(e) = self.poll_async(state, config).await {
error!("BT daemon error: {}", e);
}
}
async fn poll_async(&mut self, state: SharedState) -> Result<()> {
async fn poll_async(&mut self, state: SharedState, config: &Config) -> Result<()> {
if self.session.is_none() {
self.session = Some(bluer::Session::new().await?);
}
@@ -338,7 +409,6 @@ impl BtDaemon {
let device = adapter.device(addr)?;
if device.is_connected().await.unwrap_or(false) {
let uuids = device.uuids().await?.unwrap_or_default();
// Audio sink UUID (0x110b)
let audio_sink_uuid =
bluer::Uuid::from_u128(0x0000110b_0000_1000_8000_00805f9b34fb);
if uuids.contains(&audio_sink_uuid) {
@@ -349,10 +419,9 @@ impl BtDaemon {
bt_state.battery_percentage =
device.battery_percentage().await.unwrap_or(None);
// Plugin detection
for p in PLUGINS.iter() {
if p.can_handle(&bt_state.device_alias, &bt_state.device_address) {
match p.get_data(&bt_state.device_address) {
match p.get_data(config, &state, &bt_state.device_address).await {
Ok(data) => {
bt_state.plugin_data = data
.into_iter()
@@ -392,16 +461,24 @@ impl BtDaemon {
pub trait BtPlugin: Send + Sync {
fn name(&self) -> &str;
fn can_handle(&self, alias: &str, mac: &str) -> bool;
fn get_data(&self, mac: &str) -> Result<Vec<(String, TokenValue)>>;
fn get_modes(&self, _mac: &str) -> Result<Vec<String>> {
Ok(vec![])
}
fn set_mode(&self, _mode: &str, _mac: &str) -> Result<()> {
Ok(())
}
fn cycle_mode(&self, _mac: &str) -> Result<()> {
Ok(())
}
fn get_data(
&self,
config: &Config,
state: &SharedState,
mac: &str,
) -> BoxFuture<'static, FluxoResult<Vec<(String, TokenValue)>>>;
fn get_modes(
&self,
mac: &str,
state: &SharedState,
) -> BoxFuture<'static, FluxoResult<Vec<String>>>;
fn set_mode(
&self,
mode: &str,
mac: &str,
state: &SharedState,
) -> BoxFuture<'static, FluxoResult<()>>;
fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>>;
}
pub struct PixelBudsPlugin;
@@ -415,12 +492,24 @@ impl BtPlugin for PixelBudsPlugin {
alias.contains("Pixel Buds Pro")
}
fn get_data(&self, mac: &str) -> Result<Vec<(String, TokenValue)>> {
MAESTRO.ensure_task(mac);
let status = MAESTRO.get_status(mac);
fn get_data(
&self,
_config: &Config,
state: &SharedState,
mac: &str,
) -> BoxFuture<'static, FluxoResult<Vec<(String, TokenValue)>>> {
let mac = mac.to_string();
let state = Arc::clone(state);
Box::pin(async move {
let maestro = get_maestro(&state);
maestro.ensure_task(&mac);
let status = maestro.get_status(&mac);
if let Some(err) = status.error {
return Err(anyhow::anyhow!(err));
return Err(FluxoError::Module {
module: "bt.buds",
message: err,
});
}
let left_display = status
@@ -448,28 +537,59 @@ impl BtPlugin for PixelBudsPlugin {
TokenValue::String(class.to_string()),
),
])
})
}
fn get_modes(&self, _mac: &str) -> Result<Vec<String>> {
fn get_modes(
&self,
_mac: &str,
_state: &SharedState,
) -> BoxFuture<'static, FluxoResult<Vec<String>>> {
Box::pin(async move {
Ok(vec![
"active".to_string(),
"aware".to_string(),
"off".to_string(),
])
})
}
fn set_mode(&self, mode: &str, mac: &str) -> Result<()> {
MAESTRO.send_command(mac, BudsCommand::SetAnc(mode.to_string()))
fn set_mode(
&self,
mode: &str,
mac: &str,
state: &SharedState,
) -> BoxFuture<'static, FluxoResult<()>> {
let mode = mode.to_string();
let mac = mac.to_string();
let state = Arc::clone(state);
Box::pin(async move {
get_maestro(&state)
.send_command(&mac, BudsCommand::SetAnc(mode))
.map_err(|e| FluxoError::Module {
module: "bt.buds",
message: e.to_string(),
})
})
}
fn cycle_mode(&self, mac: &str) -> Result<()> {
let status = MAESTRO.get_status(mac);
fn cycle_mode(&self, mac: &str, state: &SharedState) -> BoxFuture<'static, FluxoResult<()>> {
let mac = mac.to_string();
let state = Arc::clone(state);
Box::pin(async move {
let status = get_maestro(&state).get_status(&mac);
let next_mode = match status.anc_state.as_str() {
"active" => "aware",
"aware" => "off",
_ => "active",
};
self.set_mode(next_mode, mac)
get_maestro(&state)
.send_command(&mac, BudsCommand::SetAnc(next_mode.to_string()))
.map_err(|e| FluxoError::Module {
module: "bt.buds",
message: e.to_string(),
})
})
}
}
@@ -479,19 +599,24 @@ static PLUGINS: LazyLock<Vec<Box<dyn BtPlugin>>> =
pub struct BtModule;
impl WaybarModule for BtModule {
async fn run(
fn run(
&self,
config: &Config,
state: &SharedState,
args: &[&str],
) -> FluxoResult<WaybarOutput> {
let action = args.first().unwrap_or(&"show");
) -> impl std::future::Future<Output = FluxoResult<WaybarOutput>> + Send {
let action = args.first().cloned().unwrap_or("show").to_string();
let args = args.iter().map(|s| s.to_string()).collect::<Vec<_>>();
let state = Arc::clone(state);
let config = config.clone();
async move {
let bt_state = {
let lock = state.read().await;
lock.bluetooth.clone()
};
match *action {
match action.as_str() {
"disconnect" if bt_state.connected => {
let _ = Command::new("bluetoothctl")
.args(["disconnect", &bt_state.device_address])
@@ -503,7 +628,7 @@ impl WaybarModule for BtModule {
.iter()
.find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address));
if let Some(p) = plugin {
p.cycle_mode(&bt_state.device_address)?;
p.cycle_mode(&bt_state.device_address, &state).await?;
}
return Ok(WaybarOutput::default());
}
@@ -512,7 +637,7 @@ impl WaybarModule for BtModule {
.iter()
.find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address));
let modes = if let Some(p) = plugin {
p.get_modes(&bt_state.device_address)?
p.get_modes(&bt_state.device_address, &state).await?
} else {
vec![]
};
@@ -523,11 +648,11 @@ impl WaybarModule for BtModule {
}
"set_mode" if bt_state.connected => {
if let Some(mode) = args.get(1) {
let plugin = PLUGINS
.iter()
.find(|p| p.can_handle(&bt_state.device_alias, &bt_state.device_address));
let plugin = PLUGINS.iter().find(|p| {
p.can_handle(&bt_state.device_alias, &bt_state.device_address)
});
if let Some(p) = plugin {
p.set_mode(mode, &bt_state.device_address)?;
p.set_mode(mode, &bt_state.device_address, &state).await?;
}
}
return Ok(WaybarOutput::default());
@@ -603,4 +728,5 @@ impl WaybarModule for BtModule {
})
}
}
}
}