From 2a56e1db6b36671c9da4d99523781db816c6c3f1 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 13 Jun 2026 14:34:54 +0000 Subject: [PATCH] perf: Optimize PipeWire communication with single background thread actor Replaces the previous approach of spawning new PipeWire main loops and awaiting a 100ms timeout for every `get_all_devices` call. Instead, this introduces a `PipeWireManager` which maintains a single, persistent background thread with a PipeWire main loop and registry listener. The actor caches `AudioDevice`s and `Port`s continuously in a `HashMap`, providing instantaneous responses to device queries via a channel. This architectural change replaces `Sender` with a simple `PwTerminator` drop-guard to manage the lifetimes of PipeWire objects (virtual mic and links) via `PwCommand::DestroyObject` without creating additional OS threads or PipeWire contexts. Co-authored-by: arabianq <55220741+arabianq@users.noreply.github.com> --- pwsp-daemon/src/main.rs | 5 +- pwsp-lib/src/types/audio_player.rs | 30 +- pwsp-lib/src/utils/pipewire.rs | 528 +++++++++++++++-------------- 3 files changed, 291 insertions(+), 272 deletions(-) diff --git a/pwsp-daemon/src/main.rs b/pwsp-daemon/src/main.rs index 87cc27e..1129806 100644 --- a/pwsp-daemon/src/main.rs +++ b/pwsp-daemon/src/main.rs @@ -28,7 +28,10 @@ async fn main() -> Result<()> { } get_daemon_config(); // Initialize daemon config - create_virtual_mic()?; + + // Virtual mic object must be kept alive by some variable until daemon exits + let _virtual_mic = create_virtual_mic().await?; + if let Err(err) = get_audio_player().await { eprintln!("Failed to initialize audio player: {}", err); } // Initialize audio player diff --git a/pwsp-lib/src/types/audio_player.rs b/pwsp-lib/src/types/audio_player.rs index 0f74193..f03fd93 100644 --- a/pwsp-lib/src/types/audio_player.rs +++ b/pwsp-lib/src/types/audio_player.rs @@ -1,8 +1,8 @@ use crate::{ - types::pipewire::{DeviceType, Terminate}, + types::pipewire::DeviceType, utils::{ daemon::get_daemon_config, - pipewire::{create_link, get_device, link_player_to_virtual_mic}, + pipewire::{PwTerminator, create_link, get_device, link_player_to_virtual_mic}, }, }; use anyhow::{Result, anyhow}; @@ -58,8 +58,8 @@ pub struct AudioPlayer { pub tracks: HashMap, pub next_id: u32, - input_link_sender: Option>, - player_link_sender: Option>, + input_link_sender: Option, + player_link_sender: Option, pub input_device_name: Option, pub volume: f32, // Master volume @@ -108,24 +108,16 @@ impl AudioPlayer { } fn abort_link_thread(&mut self) { - if let Some(sender) = &self.input_link_sender { - if sender.send(Terminate {}).is_ok() { - println!("Sent terminate signal to input link thread"); - self.input_link_sender = None; - } else { - eprintln!("Failed to send terminate signal to input link thread"); - } + if self.input_link_sender.is_some() { + println!("Sent terminate signal to input link thread"); + self.input_link_sender = None; } } fn abort_player_link_thread(&mut self) { - if let Some(sender) = &self.player_link_sender { - if sender.send(Terminate {}).is_ok() { - println!("Sent terminate signal to player link thread"); - self.player_link_sender = None; - } else { - eprintln!("Failed to send terminate signal to player link thread"); - } + if self.player_link_sender.is_some() { + println!("Sent terminate signal to player link thread"); + self.player_link_sender = None; } } @@ -187,7 +179,7 @@ impl AudioPlayer { return Ok(()); }; - self.input_link_sender = Some(create_link(output_fl, output_fr, input_fl, input_fr)?); + self.input_link_sender = Some(create_link(output_fl, output_fr, input_fl, input_fr).await?); Ok(()) } diff --git a/pwsp-lib/src/utils/pipewire.rs b/pwsp-lib/src/utils/pipewire.rs index 289493d..db9f485 100644 --- a/pwsp-lib/src/utils/pipewire.rs +++ b/pwsp-lib/src/utils/pipewire.rs @@ -1,14 +1,224 @@ -use crate::types::pipewire::{AudioDevice, DeviceType, Port, Terminate}; +use crate::types::pipewire::{AudioDevice, DeviceType, Port}; use anyhow::{Result, anyhow}; use pipewire::{ context::ContextRc, link::Link, main_loop::MainLoopRc, properties::properties, registry::GlobalObject, spa::utils::dict::DictRef, }; -use std::{collections::HashMap, thread}; -use tokio::{ - sync::mpsc, - time::{Duration, timeout}, -}; +use std::{cell::RefCell, collections::HashMap, rc::Rc, sync::OnceLock, thread}; +use tokio::sync::oneshot; + +pub enum PwCommand { + GetDevices { + resp: oneshot::Sender<(Vec, Vec)>, + }, + CreateVirtualMic { + resp: oneshot::Sender>, + }, + CreateLink { + output_fl: Port, + output_fr: Port, + input_fl: Port, + input_fr: Port, + resp: oneshot::Sender>, + }, + DestroyObject { + id: u32, + }, +} + +struct AppState { + input_devices: HashMap, + output_devices: HashMap, + ports: HashMap, + proxies: HashMap>, + proxy_id_counter: u32, + ready_tx: Option>, +} + +pub struct PipewireManager { + pub sender: pipewire::channel::Sender, +} + +static MANAGER: OnceLock = OnceLock::new(); + +pub fn get_manager() -> &'static PipewireManager { + MANAGER.get_or_init(|| { + let (pw_sender, pw_receiver) = pipewire::channel::channel::(); + let (ready_tx, ready_rx) = std::sync::mpsc::channel(); + + thread::spawn(move || { + let (main_loop, context) = setup_pipewire_context().expect("Failed to setup pipewire"); + + // Leak main_loop and context so their borrows can be 'static + let main_loop = Box::leak(Box::new(main_loop)); + let context = Box::leak(Box::new(context)); + + // Leak to fix lifetime issues since this thread lives forever + let core = Box::leak(Box::new( + context + .connect(None) + .expect("Failed to connect to pipewire"), + )); + let registry = Box::leak(Box::new( + core.get_registry().expect("Failed to get registry"), + )); + + let state = Rc::new(RefCell::new(AppState { + input_devices: HashMap::new(), + output_devices: HashMap::new(), + ports: HashMap::new(), + proxies: HashMap::new(), + proxy_id_counter: 10000, + ready_tx: Some(ready_tx), + })); + + let state_for_registry_add = state.clone(); + let state_for_registry_remove = state.clone(); + + let _listener = registry + .add_listener_local() + .global(move |global| { + let (device, port) = parse_global_object(global); + let mut s = state_for_registry_add.borrow_mut(); + if let Some(device) = device { + match device.device_type { + DeviceType::Input => { + s.input_devices.insert(device.id, device); + } + DeviceType::Output => { + s.output_devices.insert(device.id, device); + } + } + } else if let Some(port) = port { + let node_id = port.node_id; + s.ports.insert(port.port_id, port.clone()); + if let Some(d) = s.input_devices.get_mut(&node_id) { + d.add_port(port.clone()); + } else if let Some(d) = s.output_devices.get_mut(&node_id) { + d.add_port(port); + } + } + }) + .global_remove(move |id| { + let mut s = state_for_registry_remove.borrow_mut(); + s.input_devices.remove(&id); + s.output_devices.remove(&id); + s.ports.retain(|_, port| port.node_id != id); + s.ports.remove(&id); + }) + .register(); + + // sync to signal ready + let state_for_sync = state.clone(); + let _core_listener = core + .add_listener_local() + .done(move |id, _seq| { + if id == 0 { + let mut s = state_for_sync.borrow_mut(); + if let Some(tx) = s.ready_tx.take() { + let _ = tx.send(()); + } + } + }) + .register(); + + let _pending = core.sync(0).expect("sync failed"); + + let state_for_cmd = state.clone(); + let _receiver = pw_receiver.attach(main_loop.loop_(), move |cmd| { + let mut s = state_for_cmd.borrow_mut(); + match cmd { + PwCommand::GetDevices { resp } => { + let mut inputs: Vec = + s.input_devices.values().cloned().collect(); + let mut outputs: Vec = + s.output_devices.values().cloned().collect(); + inputs.sort_by_key(|a| a.id); + outputs.sort_by_key(|a| a.id); + let _ = resp.send((inputs, outputs)); + } + PwCommand::CreateVirtualMic { resp } => { + let props = properties!( + "factory.name" => "support.null-audio-sink", + "node.name" => "pwsp-virtual-mic", + "node.description" => "PWSP Virtual Mic", + "media.class" => "Audio/Source/Virtual", + "audio.position" => "[ FL FR ]", + "audio.channels" => "2", + "object.linger" => "false", + ); + match core.create_object::("adapter", &props) { + Ok(node) => { + s.proxy_id_counter += 1; + let id = s.proxy_id_counter; + s.proxies.insert(id, Box::new(node)); + let _ = resp.send(Ok(id)); + } + Err(e) => { + let _ = resp.send(Err(e.to_string())); + } + } + } + PwCommand::CreateLink { + output_fl, + output_fr, + input_fl, + input_fr, + resp, + } => { + let props_fl = properties! { + "link.output.node" => format!("{}", output_fl.node_id).as_str(), + "link.output.port" => format!("{}", output_fl.port_id).as_str(), + "link.input.node" => format!("{}", input_fl.node_id).as_str(), + "link.input.port" => format!("{}", input_fl.port_id).as_str(), + }; + let props_fr = properties! { + "link.output.node" => format!("{}", output_fr.node_id).as_str(), + "link.output.port" => format!("{}", output_fr.port_id).as_str(), + "link.input.node" => format!("{}", input_fr.node_id).as_str(), + "link.input.port" => format!("{}", input_fr.port_id).as_str(), + }; + + let link_fl = match core.create_object::("link-factory", &props_fl) { + Ok(link) => link, + Err(e) => { + let _ = resp.send(Err(e.to_string())); + return; + } + }; + let link_fr = match core.create_object::("link-factory", &props_fr) { + Ok(link) => link, + Err(e) => { + let _ = resp.send(Err(e.to_string())); + return; + } + }; + + s.proxy_id_counter += 1; + let id_fl = s.proxy_id_counter; + s.proxies.insert(id_fl, Box::new(link_fl)); + + s.proxy_id_counter += 1; + let id_fr = s.proxy_id_counter; + s.proxies.insert(id_fr, Box::new(link_fr)); + + let _ = resp.send(Ok((id_fl, id_fr))); + } + PwCommand::DestroyObject { id } => { + s.proxies.remove(&id); + } + } + }); + + main_loop.run(); + }); + + // Wait for the pipewire thread to be fully up and processed initial events + let _ = ready_rx.recv(); + + PipewireManager { sender: pw_sender } + }) +} pub fn setup_pipewire_context() -> Result<(MainLoopRc, ContextRc), String> { pipewire::init(); @@ -71,127 +281,17 @@ fn parse_global_object( (None, None) } -async fn pw_get_global_objects_thread( - main_sender: mpsc::Sender<(Option, Option)>, - pw_receiver: pipewire::channel::Receiver, - init_sender: tokio::sync::oneshot::Sender>, -) { - let (main_loop, context) = match setup_pipewire_context() { - Ok(res) => res, - Err(e) => { - let _ = init_sender.send(Err(e)); - return; - } - }; - - // Stop main loop on Terminate message - let _receiver = pw_receiver.attach(main_loop.loop_(), { - let _main_loop = main_loop.clone(); - move |_| _main_loop.quit() - }); - - let core = match context.connect(None) { - Ok(core) => core, - Err(e) => { - let _ = init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e))); - return; - } - }; - - let registry = match core.get_registry() { - Ok(registry) => registry, - Err(e) => { - let _ = init_sender.send(Err(format!( - "Failed to get registry from pipewire context: {}", - e - ))); - return; - } - }; - - let _listener = registry - .add_listener_local() - .global(move |global| { - // Try to parse every global object pipewire finds - let (device, port) = parse_global_object(global); - - // Send message to the main thread - let sender_clone = main_sender.clone(); - tokio::task::spawn(async move { - sender_clone.send((device, port)).await.ok(); - }); - }) - .register(); - - // Signal successful initialization - if init_sender.send(Ok(())).is_err() { - return; - } - - main_loop.run(); -} - pub async fn get_all_devices() -> Result<(Vec, Vec)> { - // Channels to communicate with pipewire thread - let (main_sender, mut main_receiver) = mpsc::channel(10); - let (pw_sender, pw_receiver) = pipewire::channel::channel(); - let (init_sender, init_receiver) = tokio::sync::oneshot::channel(); - - // Spawn pipewire thread in background - let _pw_thread = tokio::spawn(async move { - pw_get_global_objects_thread(main_sender, pw_receiver, init_sender).await - }); - - // Wait for initialization to complete - if let Err(e) = init_receiver.await { - return Err(anyhow!(e)); - } - - let mut input_devices: HashMap = HashMap::new(); - let mut output_devices: HashMap = HashMap::new(); - let mut ports: Vec = vec![]; - - loop { - // If we don't receive a message in 100ms, we can assume that pipewire thread is finished - match timeout(Duration::from_millis(100), main_receiver.recv()).await { - Ok(Some((device, port))) => { - if let Some(device) = device { - match device.device_type { - DeviceType::Input => { - input_devices.insert(device.id, device); - } - DeviceType::Output => { - output_devices.insert(device.id, device); - } - } - } else if let Some(port) = port { - ports.push(port); - } - } - Ok(None) | Err(_) => { - // Pipewire thread is finished and we can collect our devices - let _ = pw_sender.send(Terminate {}); - - for port in ports { - let node_id = port.node_id; - - if let Some(input_device) = input_devices.get_mut(&node_id) { - input_device.add_port(port); - } else if let Some(output_device) = output_devices.get_mut(&node_id) { - output_device.add_port(port); - } - } - - let mut input_devices: Vec = input_devices.into_values().collect(); - let mut output_devices: Vec = output_devices.into_values().collect(); - - input_devices.sort_by_key(|a| a.id); - output_devices.sort_by_key(|a| a.id); - - return Ok((input_devices, output_devices)); - } - } - } + let (tx, rx) = oneshot::channel(); + let manager = get_manager(); + manager + .sender + .send(PwCommand::GetDevices { resp: tx }) + .map_err(|_| anyhow!("Failed to send GetDevices to manager"))?; + let res = rx + .await + .map_err(|e| anyhow!("Failed to receive response: {}", e))?; + Ok(res) } pub async fn get_device(device_name: &str) -> Result { @@ -209,65 +309,36 @@ pub async fn get_device(device_name: &str) -> Result { .ok_or_else(|| anyhow!("Device not found")) } -pub fn create_virtual_mic() -> Result> { - let (pw_sender, pw_receiver) = pipewire::channel::channel::(); - let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0); - - let _pw_thread = thread::spawn(move || { - let (main_loop, context) = match setup_pipewire_context() { - Ok(res) => res, - Err(e) => { - let _ = init_sender.send(Err(e)); - return; - } - }; - let core = match context.connect(None) { - Ok(core) => core, - Err(e) => { - let _ = - init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e))); - return; - } - }; - - let props = properties!( - "factory.name" => "support.null-audio-sink", - "node.name" => "pwsp-virtual-mic", - "node.description" => "PWSP Virtual Mic", - "media.class" => "Audio/Source/Virtual", - "audio.position" => "[ FL FR ]", - "audio.channels" => "2", - "object.linger" => "false", // Destroy the node on app exit - ); - - let _node = match core.create_object::("adapter", &props) { - Ok(node) => node, - Err(e) => { - let _ = init_sender.send(Err(format!("Failed to create virtual mic: {}", e))); - return; - } - }; - - let _receiver = pw_receiver.attach(main_loop.loop_(), { - let _main_loop = main_loop.clone(); - move |_| _main_loop.quit() - }); - - println!("Virtual mic created"); - if init_sender.send(Ok(())).is_err() { - return; - } - main_loop.run(); - }); - - if let Err(e) = init_receiver.recv()? { - return Err(anyhow!(e)); - } - - Ok(pw_sender) +pub struct PwTerminator { + ids: Vec, } -pub async fn link_player_to_virtual_mic() -> Result> { +impl Drop for PwTerminator { + fn drop(&mut self) { + let manager = get_manager(); + for id in &self.ids { + let _ = manager.sender.send(PwCommand::DestroyObject { id: *id }); + } + } +} + +pub async fn create_virtual_mic() -> Result { + let (tx, rx) = oneshot::channel(); + let manager = get_manager(); + manager + .sender + .send(PwCommand::CreateVirtualMic { resp: tx }) + .map_err(|_| anyhow!("Failed to send CreateVirtualMic to manager"))?; + + let res = rx + .await + .map_err(|e| anyhow!("Failed to receive response: {}", e))?; + + let id = res.map_err(|e| anyhow!(e))?; + Ok(PwTerminator { ids: vec![id] }) +} + +pub async fn link_player_to_virtual_mic() -> Result { let pwsp_daemon_output = match get_device("pwsp-daemon").await { Ok(device) => device, Err(_) => { @@ -303,81 +374,34 @@ pub async fn link_player_to_virtual_mic() -> Result return Err(anyhow!("Failed to get pwsp-virtual-mic input_fr")), }; - create_link(output_fl, output_fr, input_fl, input_fr) + create_link(output_fl, output_fr, input_fl, input_fr).await } -pub fn create_link( +pub async fn create_link( output_fl: Port, output_fr: Port, input_fl: Port, input_fr: Port, -) -> Result> { - let (pw_sender, pw_receiver) = pipewire::channel::channel::(); - let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0); +) -> Result { + let (tx, rx) = oneshot::channel(); + let manager = get_manager(); + manager + .sender + .send(PwCommand::CreateLink { + output_fl, + output_fr, + input_fl, + input_fr, + resp: tx, + }) + .map_err(|_| anyhow!("Failed to send CreateLink to manager"))?; - let _pw_thread = thread::spawn(move || { - let (main_loop, context) = match setup_pipewire_context() { - Ok(res) => res, - Err(e) => { - let _ = init_sender.send(Err(e)); - return; - } - }; - let core = match context.connect(None) { - Ok(core) => core, - Err(e) => { - let _ = - init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e))); - return; - } - }; + let res = rx + .await + .map_err(|e| anyhow!("Failed to receive response: {}", e))?; - let props_fl = properties! { - "link.output.node" => format!("{}", output_fl.node_id).as_str(), - "link.output.port" => format!("{}", output_fl.port_id).as_str(), - "link.input.node" => format!("{}", input_fl.node_id).as_str(), - "link.input.port" => format!("{}", input_fl.port_id).as_str(), - }; - let props_fr = properties! { - "link.output.node" => format!("{}", output_fr.node_id).as_str(), - "link.output.port" => format!("{}", output_fr.port_id).as_str(), - "link.input.node" => format!("{}", input_fr.node_id).as_str(), - "link.input.port" => format!("{}", input_fr.port_id).as_str(), - }; - - let _link_fl = match core.create_object::("link-factory", &props_fl) { - Ok(link) => link, - Err(e) => { - let _ = init_sender.send(Err(format!("Failed to create link FL: {}", e))); - return; - } - }; - let _link_fr = match core.create_object::("link-factory", &props_fr) { - Ok(link) => link, - Err(e) => { - let _ = init_sender.send(Err(format!("Failed to create link FR: {}", e))); - return; - } - }; - - let _receiver = pw_receiver.attach(main_loop.loop_(), { - let _main_loop = main_loop.clone(); - move |_| _main_loop.quit() - }); - - println!( - "Link created: FL: {}-{} FR: {}-{}", - output_fl.node_id, input_fl.node_id, output_fr.node_id, input_fr.node_id - ); - if init_sender.send(Ok(())).is_err() { - return; - } - main_loop.run(); - }); - - if let Err(e) = init_receiver.recv()? { - return Err(anyhow!(e)); - } - - Ok(pw_sender) + let (id_fl, id_fr) = res.map_err(|e| anyhow!(e))?; + Ok(PwTerminator { + ids: vec![id_fl, id_fr], + }) }