//! The central state machine responsible for coordinating the thermal benchmark. //! //! It manages hardware interactions through the [PlatformSal], generates stress //! using a [Workload], and feeds telemetry to the frontend via MPSC channels. use anyhow::{Result, Context}; use std::sync::mpsc; use std::time::{Duration, Instant}; use std::thread; use std::collections::VecDeque; use sysinfo::System; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::path::PathBuf; use crate::sal::traits::{PlatformSal, AuditStep, SafetyStatus}; use crate::sal::heuristic::discovery::SystemFactSheet; use crate::sal::safety::{HardwareStateGuard, TdpLimitMicroWatts}; use crate::load::{Workload, IntensityProfile}; use crate::mediator::{TelemetryState, UiCommand, BenchmarkPhase}; use crate::engine::{OptimizerEngine, ThermalProfile, ThermalPoint, OptimizationResult}; /// The central state machine responsible for coordinating the thermal benchmark. /// /// It manages hardware interactions through the [PlatformSal], generates stress /// using a [Workload], and feeds telemetry to the frontend via MPSC channels. pub struct BenchmarkOrchestrator { /// Injected hardware abstraction layer. sal: Arc, /// Discovered system facts and paths. facts: SystemFactSheet, /// Heat generation workload. workload: Box, /// Channel for sending telemetry updates to the UI. telemetry_tx: mpsc::Sender, /// Channel for receiving commands from the UI. command_rx: mpsc::Receiver, /// Current phase of the benchmark. phase: BenchmarkPhase, /// Accumulated thermal data points. profile: ThermalProfile, /// Mathematics engine for data smoothing and optimization. engine: OptimizerEngine, /// CLI override for the configuration output path. optional_config_out: Option, /// The safety membrane protecting the system. safeguard: Option, /// Sliding window of power readings (Watts). history_watts: VecDeque, /// Sliding window of temperature readings (Celsius). 
history_temp: VecDeque, /// Sliding window of CPU frequency (MHz). history_mhz: VecDeque, /// Detected CPU model string. cpu_model: String, /// Total system RAM in Gigabytes. total_ram_gb: u64, /// Atomic flag indicating a safety-triggered abort. emergency_abort: Arc, /// Human-readable reason for the emergency abort. emergency_reason: Arc>>, } impl BenchmarkOrchestrator { /// Creates a new orchestrator instance with injected dependencies. pub fn new( sal: Arc, facts: SystemFactSheet, workload: Box, telemetry_tx: mpsc::Sender, command_rx: mpsc::Receiver, optional_config_out: Option, ) -> Self { let mut sys = System::new_all(); sys.refresh_all(); let cpu_model = sys.cpus().first() .map(|c| c.brand().to_string()) .unwrap_or_else(|| "Unknown CPU".to_string()); let total_ram_gb = sys.total_memory() / 1024 / 1024 / 1024; Self { sal, facts, workload, telemetry_tx, command_rx, phase: BenchmarkPhase::Auditing, profile: ThermalProfile::default(), engine: OptimizerEngine::new(5), history_watts: VecDeque::with_capacity(120), history_temp: VecDeque::with_capacity(120), history_mhz: VecDeque::with_capacity(120), cpu_model, total_ram_gb, emergency_abort: Arc::new(AtomicBool::new(false)), emergency_reason: Arc::new(Mutex::new(None)), optional_config_out, safeguard: None, } } /// Executes the full benchmark sequence. /// /// This method guarantees that [crate::sal::traits::EnvironmentGuard::restore] and [Workload::stop_workload] /// are called regardless of whether the benchmark succeeds or fails. pub fn run(&mut self) -> Result { self.log("Starting ember-tune Benchmark Sequence.")?; let _watchdog_handle = self.spawn_watchdog_monitor(); let result = self.execute_benchmark(); // --- MANDATORY CLEANUP --- self.log("Benchmark sequence finished. 
Restoring hardware defaults...")?; let _ = self.workload.stop_workload(); if let Some(mut sg) = self.safeguard.take() { if let Err(e) = sg.release() { anyhow::bail!("CRITICAL: USA Restoration Failure: {}", e); } } if let Err(e) = self.sal.restore() { anyhow::bail!("CRITICAL: Failed to restore hardware state: {}", e); } self.log("✓ Hardware state restored.")?; result } /// Internal execution logic for the benchmark phases. fn execute_benchmark(&mut self) -> Result { let bench_cfg = self.facts.bench_config.clone().context("Benchmarking config missing in facts")?; // 1. Snapshot & Arm Safeguard let mut target_files = self.facts.rapl_paths.iter() .map(|p| p.join("constraint_0_power_limit_uw")) .collect::>(); target_files.extend(self.facts.rapl_paths.iter().map(|p| p.join("constraint_1_power_limit_uw"))); if let Some(tp) = self.facts.paths.configs.get("throttled") { target_files.push(tp.clone()); } let target_services = vec!["tlp.service".to_string(), "thermald.service".to_string(), "throttled.service".to_string()]; self.safeguard = Some(HardwareStateGuard::acquire(&target_files, &target_services)?); // Phase 1: Audit & Baseline self.phase = BenchmarkPhase::Auditing; for step in self.sal.audit() { if let Err(e) = step.outcome { return Err(anyhow::anyhow!("Audit failed ({}): {:?}", step.description, e)); } } self.workload.initialize().context("Failed to initialize workload")?; self.log("Suppressing background services (tlp, thermald)...")?; self.sal.suppress().context("Failed to suppress background services")?; // Baseline (Idle Calibration) self.phase = BenchmarkPhase::IdleCalibration; self.log(&format!("Phase 1: Recording Idle Baseline ({}s)...", bench_cfg.idle_duration_s))?; self.sal.set_fan_mode("auto")?; let mut idle_temps = Vec::new(); let start = Instant::now(); let mut tick = 0; while start.elapsed() < Duration::from_secs(bench_cfg.idle_duration_s) { self.check_abort()?; self.send_telemetry(tick)?; idle_temps.push(self.sal.get_temp().unwrap_or(0.0)); tick += 1; 
thread::sleep(Duration::from_millis(500)); } self.profile.ambient_temp = self.engine.smooth(&idle_temps).last().cloned().unwrap_or(0.0); self.log(&format!("✓ Idle Baseline: {:.1}°C", self.profile.ambient_temp))?; // Phase 2: Stress Stepping self.phase = BenchmarkPhase::StressTesting; self.log("Phase 2: Starting Synthetic Stress Matrix.")?; self.sal.set_fan_mode("max")?; let steps = bench_cfg.power_steps_watts.clone(); for &pl in &steps { self.log(&format!("Testing PL1 = {:.0}W...", pl))?; let pl1_uw = crate::sal::safety::TdpLimitMicroWatts::new((pl * 1_000_000.0) as u64)?; let pl2_uw = crate::sal::safety::TdpLimitMicroWatts::new(((pl + 5.0) * 1_000_000.0) as u64)?; self.sal.set_sustained_power_limit(pl1_uw)?; self.sal.set_burst_power_limit(pl2_uw)?; self.workload.run_workload( Duration::from_secs(bench_cfg.stress_duration_max_s), IntensityProfile { threads: num_cpus::get(), load_percentage: 100 } )?; let step_start = Instant::now(); let mut step_temps = VecDeque::with_capacity(30); while step_start.elapsed() < Duration::from_secs(bench_cfg.stress_duration_max_s) { self.check_abort()?; let t = self.sal.get_temp().unwrap_or(0.0); step_temps.push_back(t); if step_temps.len() > 10 { step_temps.pop_front(); } self.send_telemetry(tick)?; tick += 1; if step_start.elapsed() > Duration::from_secs(bench_cfg.stress_duration_min_s) && step_temps.len() == 10 { let min = step_temps.iter().fold(f32::MAX, |a, &b| a.min(b)); let max = step_temps.iter().fold(f32::MIN, |a, &b| a.max(b)); if (max - min) < 0.5 { self.log(&format!(" Equilibrium reached at {:.1}°C", t))?; break; } } thread::sleep(Duration::from_millis(500)); } // Record data point let avg_p = self.sal.get_power_w().unwrap_or(0.0); let avg_t = self.sal.get_temp().unwrap_or(0.0); let avg_f = self.sal.get_freq_mhz().unwrap_or(0.0); let fans = self.sal.get_fan_rpms().unwrap_or_default(); let primary_fan = fans.first().cloned().unwrap_or(0); let metrics = self.workload.get_current_metrics().unwrap_or_default(); 
self.profile.points.push(ThermalPoint { power_w: avg_p, temp_c: avg_t, freq_mhz: avg_f, fan_rpm: primary_fan, throughput: metrics.primary_ops_per_sec, }); self.workload.stop_workload()?; self.log(&format!(" Step complete. Cooling down for {}s...", bench_cfg.cool_down_s))?; thread::sleep(Duration::from_secs(bench_cfg.cool_down_s)); } // Phase 4: Physical Modeling self.phase = BenchmarkPhase::PhysicalModeling; self.log("Phase 3: Calculating Silicon Physical Sweet Spot...")?; let mut res = self.generate_result(false); self.log(&format!("✓ Thermal Resistance (Rθ): {:.3} K/W", res.thermal_resistance_kw))?; self.log(&format!("✓ Silicon Knee Found: {:.1} W", res.silicon_knee_watts))?; thread::sleep(Duration::from_secs(3)); // Phase 5: Finalizing self.phase = BenchmarkPhase::Finalizing; self.log("Benchmark sequence complete. Generating configurations...")?; let config = crate::engine::formatters::throttled::ThrottledConfig { pl1_limit: res.silicon_knee_watts, pl2_limit: res.recommended_pl2, trip_temp: res.max_temp_c.max(95.0), }; let throttled_path = self.optional_config_out.clone() .or_else(|| self.facts.paths.configs.get("throttled").cloned()); if let Some(path) = throttled_path { crate::engine::formatters::throttled::ThrottledTranslator::save(&path, &config)?; self.log(&format!("✓ Saved '{}'.", path.display()))?; res.config_paths.insert("throttled".to_string(), path.clone()); } if let Some(i8k_path) = self.facts.paths.configs.get("i8kmon") { let i8k_config = crate::engine::formatters::i8kmon::I8kmonConfig { t_ambient: self.profile.ambient_temp, t_max_fan: res.max_temp_c - 5.0, thermal_resistance_kw: res.thermal_resistance_kw, }; crate::engine::formatters::i8kmon::I8kmonTranslator::save(i8k_path, &i8k_config)?; self.log(&format!("✓ Saved '{}'.", i8k_path.display()))?; res.config_paths.insert("i8kmon".to_string(), i8k_path.clone()); } Ok(res) } /// Spawns a concurrent monitor that polls safety sensors every 100ms. 
///
/// The thread owns clones of the SAL handle, the telemetry sender, the abort
/// flag and the abort-reason slot. It exits once the abort flag is observed set,
/// which includes the cases where it sets the flag itself: an
/// `EmergencyAbort` status or a failed safety read.
fn spawn_watchdog_monitor(&self) -> thread::JoinHandle<()> {
    let abort_flag = Arc::clone(&self.emergency_abort);
    let reason_slot = Arc::clone(&self.emergency_reason);
    let platform = Arc::clone(&self.sal);
    let telemetry = self.telemetry_tx.clone();

    thread::spawn(move || loop {
        if abort_flag.load(Ordering::SeqCst) {
            break;
        }

        // Reduce the status to "is there a fatal reason?"; warnings are
        // forwarded to the UI right here and are not fatal.
        let fatal_reason = match platform.get_safety_status() {
            Ok(SafetyStatus::Nominal) => None,
            Ok(SafetyStatus::Warning(msg)) | Ok(SafetyStatus::Critical(msg)) => {
                // Log-only frame: every sensor field is left zeroed/empty.
                let frame = TelemetryState {
                    cpu_model: String::new(),
                    total_ram_gb: 0,
                    tick: 0,
                    cpu_temp: 0.0,
                    power_w: 0.0,
                    current_freq: 0.0,
                    fans: Vec::new(),
                    governor: String::new(),
                    pl1_limit: 0.0,
                    pl2_limit: 0.0,
                    fan_tier: String::new(),
                    phase: BenchmarkPhase::StressTesting,
                    history_watts: Vec::new(),
                    history_temp: Vec::new(),
                    history_mhz: Vec::new(),
                    log_event: Some(format!("WATCHDOG: {}", msg)),
                    metadata: std::collections::HashMap::new(),
                    is_emergency: false,
                    emergency_reason: None,
                };
                // A closed channel is ignored; monitoring continues regardless.
                let _ = telemetry.send(frame);
                None
            }
            Ok(SafetyStatus::EmergencyAbort(reason)) => Some(reason),
            Err(e) => Some(format!("Watchdog Sensor Failure: {}", e)),
        };

        if let Some(reason) = fatal_reason {
            // Publish the reason before raising the flag so a reader that sees
            // the flag also finds the reason.
            *reason_slot.lock().unwrap() = Some(reason);
            abort_flag.store(true, Ordering::SeqCst);
            break;
        }

        thread::sleep(Duration::from_millis(100));
    })
}

/// Generates the final [OptimizationResult] based on current measurements.
///
/// `is_partial` is copied into the result unchanged. The recommended PL1 equals
/// the detected silicon knee and the recommended PL2 is 1.25× that value;
/// `config_paths` starts empty.
pub fn generate_result(&self, is_partial: bool) -> OptimizationResult {
    let thermal_resistance_kw = self.engine.calculate_thermal_resistance(&self.profile);
    let silicon_knee_watts = self.engine.find_silicon_knee(&self.profile);
    let max_temp_c = self.engine.get_max_temp(&self.profile);

    OptimizationResult {
        profile: self.profile.clone(),
        silicon_knee_watts,
        thermal_resistance_kw,
        recommended_pl1: silicon_knee_watts,
        recommended_pl2: silicon_knee_watts * 1.25,
        max_temp_c,
        is_partial,
        config_paths: std::collections::HashMap::new(),
    }
}

/// Checks if the benchmark has been aborted by the user or the watchdog.
///
/// The watchdog flag is consulted first; only when it is clear is the UI
/// command channel polled (non-blocking, at most one command per call).
///
/// # Errors
///
/// Returns `EMERGENCY_ABORT: <reason>` when the emergency flag is set, or
/// `ABORTED` when the UI sent [UiCommand::Abort].
fn check_abort(&self) -> Result<()> {
    let watchdog_tripped = self.emergency_abort.load(Ordering::SeqCst);
    if watchdog_tripped {
        let stored = self.emergency_reason.lock().unwrap().clone();
        let reason = stored.unwrap_or_else(|| "Unknown safety trigger".to_string());
        anyhow::bail!("EMERGENCY_ABORT: {}", reason);
    }

    // An empty or disconnected command channel simply means "no abort".
    match self.command_rx.try_recv() {
        Ok(UiCommand::Abort) => Err(anyhow::anyhow!("ABORTED")),
        Err(_) => Ok(()),
    }
}

/// Helper to send log messages to the frontend.
///
/// The message travels in `log_event` of a [TelemetryState] frame that also
/// carries a fresh sensor snapshot (failed reads fall back to zero/empty) but
/// no history windows; `tick` is always 0.
///
/// # Errors
///
/// Returns `Telemetry channel closed` when the receiving side is gone.
fn log(&self, msg: &str) -> Result<()> {
    let cpu_temp = self.sal.get_temp().unwrap_or(0.0);
    let power_w = self.sal.get_power_w().unwrap_or(0.0);
    let current_freq = self.sal.get_freq_mhz().unwrap_or(0.0);
    let fans = self.sal.get_fan_rpms().unwrap_or_default();
    let is_emergency = self.emergency_abort.load(Ordering::SeqCst);
    let emergency_reason = self.emergency_reason.lock().unwrap().clone();

    let frame = TelemetryState {
        cpu_model: self.cpu_model.clone(),
        total_ram_gb: self.total_ram_gb,
        tick: 0,
        cpu_temp,
        power_w,
        current_freq,
        fans,
        governor: "unknown".to_string(),
        pl1_limit: 0.0,
        pl2_limit: 0.0,
        fan_tier: "auto".to_string(),
        phase: self.phase,
        history_watts: Vec::new(),
        history_temp: Vec::new(),
        history_mhz: Vec::new(),
        log_event: Some(msg.to_string()),
        metadata: std::collections::HashMap::new(),
        is_emergency,
        emergency_reason,
    };

    match self.telemetry_tx.send(frame) {
        Ok(()) => Ok(()),
        Err(_) => Err(anyhow::anyhow!("Telemetry channel closed")),
    }
}

/// Collects current sensors and sends a complete [TelemetryState] to the frontend.
///
/// Each call appends one temperature / power / frequency sample to the sliding
/// history windows (trimmed to the most recent 120 samples) and ships a copy
/// of the windows with the frame. Failed sensor reads fall back to zero/empty.
///
/// # Errors
///
/// Returns `Telemetry channel closed` when the receiving side is gone.
fn send_telemetry(&mut self, tick: u64) -> Result<()> {
    let temp = self.sal.get_temp().unwrap_or(0.0);
    let pwr = self.sal.get_power_w().unwrap_or(0.0);
    let freq = self.sal.get_freq_mhz().unwrap_or(0.0);

    self.history_temp.push_back(temp);
    self.history_watts.push_back(pwr);
    self.history_mhz.push_back(freq);

    // The three windows always grow together, so one length check covers all.
    if self.history_temp.len() > 120 {
        self.history_temp.pop_front();
        self.history_watts.pop_front();
        self.history_mhz.pop_front();
    }

    // Idle calibration runs with the fan mode set to "auto"; the stress phase
    // switches it to "max". Report the tier that matches the current phase
    // instead of always claiming "max".
    let fan_tier = if matches!(self.phase, BenchmarkPhase::IdleCalibration) {
        "auto"
    } else {
        "max"
    };

    let state = TelemetryState {
        cpu_model: self.cpu_model.clone(),
        total_ram_gb: self.total_ram_gb,
        tick,
        cpu_temp: temp,
        power_w: pwr,
        current_freq: freq,
        fans: self.sal.get_fan_rpms().unwrap_or_default(),
        // NOTE(review): governor and PL1/PL2 are fixed placeholders, not values
        // read back from the platform or the limits currently under test.
        governor: "performance".to_string(),
        pl1_limit: 15.0,
        pl2_limit: 25.0,
        fan_tier: fan_tier.to_string(),
        phase: self.phase,
        history_watts: self.history_watts.iter().cloned().collect(),
        history_temp: self.history_temp.iter().cloned().collect(),
        history_mhz: self.history_mhz.iter().cloned().collect(),
        log_event: None,
        metadata: std::collections::HashMap::new(),
        is_emergency: self.emergency_abort.load(Ordering::SeqCst),
        emergency_reason: self.emergency_reason.lock().unwrap().clone(),
    };
    self.telemetry_tx.send(state).map_err(|_| anyhow::anyhow!("Telemetry channel closed"))
}
}