Compare commits

...

3 Commits

Author SHA1 Message Date
google-labs-jules[bot] 2a56e1db6b perf: Optimize PipeWire communication with single background thread actor
Replaces the previous approach of spawning new PipeWire main loops and
awaiting a 100ms timeout for every `get_all_devices` call. Instead,
this introduces a `PipeWireManager` which maintains a single, persistent
background thread with a PipeWire main loop and registry listener. The
actor caches `AudioDevice`s and `Port`s continuously in a `HashMap`,
providing instantaneous responses to device queries via a channel.

This architectural change replaces `Sender<Terminate>` with a simple
`PwTerminator` drop-guard to manage the lifetimes of PipeWire objects
(virtual mic and links) via `PwCommand::DestroyObject` without creating
additional OS threads or PipeWire contexts.

Co-authored-by: arabianq <55220741+arabianq@users.noreply.github.com>
2026-06-13 14:34:54 +00:00
arabianq 838fc1ce29 fix: increment pkgrel to 2 for pwsp-bin aur package 2026-06-05 23:52:31 +03:00
arabianq 622cf39fa2 fix: correct asset paths in pwsp-bin aur package 2026-06-05 23:52:06 +03:00
5 changed files with 296 additions and 277 deletions
+1 -1
View File
@@ -1,7 +1,7 @@
pkgbase = pwsp-bin
pkgdesc = Lets you play audio files through your microphone (Pre-built binaries)
pkgver = 1.12.0
pkgrel = 1
pkgrel = 2
url = https://github.com/arabianq/pipewire-soundpad
arch = x86_64
arch = aarch64
+4 -4
View File
@@ -2,7 +2,7 @@
pkgname=pwsp-bin
_pkgname=pipewire-soundpad
pkgver=1.12.0
pkgrel=1
pkgrel=2
pkgdesc="Lets you play audio files through your microphone (Pre-built binaries)"
arch=('x86_64' 'aarch64')
url="https://github.com/arabianq/pipewire-soundpad"
@@ -26,9 +26,9 @@ package() {
install -Dm755 "${srcdir}/pwsp-daemon" "${pkgdir}/usr/bin/pwsp-daemon"
install -Dm755 "${srcdir}/pwsp-gui" "${pkgdir}/usr/bin/pwsp-gui"
install -Dm644 "$_srcsrc/assets/pwsp-gui.desktop" "${pkgdir}/usr/share/applications/pwsp-gui.desktop"
install -Dm644 "$_srcsrc/assets/icon.png" "${pkgdir}/usr/share/icons/hicolor/256x256/apps/pwsp.png"
install -Dm644 "$_srcsrc/assets/pwsp-daemon.service" "${pkgdir}/usr/lib/systemd/user/pwsp-daemon.service"
install -Dm644 "$_srcsrc/pwsp-gui/assets/pwsp-gui.desktop" "${pkgdir}/usr/share/applications/pwsp-gui.desktop"
install -Dm644 "$_srcsrc/pwsp-gui/assets/icon.png" "${pkgdir}/usr/share/icons/hicolor/256x256/apps/pwsp.png"
install -Dm644 "$_srcsrc/pwsp-gui/assets/pwsp-daemon.service" "${pkgdir}/usr/lib/systemd/user/pwsp-daemon.service"
install -Dm644 "$_srcsrc/LICENSE" "${pkgdir}/usr/share/licenses/${pkgname}/LICENSE"
}
+4 -1
View File
@@ -28,7 +28,10 @@ async fn main() -> Result<()> {
}
get_daemon_config(); // Initialize daemon config
create_virtual_mic()?;
// Virtual mic object must be kept alive by some variable until daemon exits
let _virtual_mic = create_virtual_mic().await?;
if let Err(err) = get_audio_player().await {
eprintln!("Failed to initialize audio player: {}", err);
} // Initialize audio player
+11 -19
View File
@@ -1,8 +1,8 @@
use crate::{
types::pipewire::{DeviceType, Terminate},
types::pipewire::DeviceType,
utils::{
daemon::get_daemon_config,
pipewire::{create_link, get_device, link_player_to_virtual_mic},
pipewire::{PwTerminator, create_link, get_device, link_player_to_virtual_mic},
},
};
use anyhow::{Result, anyhow};
@@ -58,8 +58,8 @@ pub struct AudioPlayer {
pub tracks: HashMap<u32, PlayingSound>,
pub next_id: u32,
input_link_sender: Option<pipewire::channel::Sender<Terminate>>,
player_link_sender: Option<pipewire::channel::Sender<Terminate>>,
input_link_sender: Option<PwTerminator>,
player_link_sender: Option<PwTerminator>,
pub input_device_name: Option<String>,
pub volume: f32, // Master volume
@@ -108,24 +108,16 @@ impl AudioPlayer {
}
fn abort_link_thread(&mut self) {
if let Some(sender) = &self.input_link_sender {
if sender.send(Terminate {}).is_ok() {
println!("Sent terminate signal to input link thread");
self.input_link_sender = None;
} else {
eprintln!("Failed to send terminate signal to input link thread");
}
if self.input_link_sender.is_some() {
println!("Sent terminate signal to input link thread");
self.input_link_sender = None;
}
}
fn abort_player_link_thread(&mut self) {
if let Some(sender) = &self.player_link_sender {
if sender.send(Terminate {}).is_ok() {
println!("Sent terminate signal to player link thread");
self.player_link_sender = None;
} else {
eprintln!("Failed to send terminate signal to player link thread");
}
if self.player_link_sender.is_some() {
println!("Sent terminate signal to player link thread");
self.player_link_sender = None;
}
}
@@ -187,7 +179,7 @@ impl AudioPlayer {
return Ok(());
};
self.input_link_sender = Some(create_link(output_fl, output_fr, input_fl, input_fr)?);
self.input_link_sender = Some(create_link(output_fl, output_fr, input_fl, input_fr).await?);
Ok(())
}
+276 -252
View File
@@ -1,14 +1,224 @@
use crate::types::pipewire::{AudioDevice, DeviceType, Port, Terminate};
use crate::types::pipewire::{AudioDevice, DeviceType, Port};
use anyhow::{Result, anyhow};
use pipewire::{
context::ContextRc, link::Link, main_loop::MainLoopRc, properties::properties,
registry::GlobalObject, spa::utils::dict::DictRef,
};
use std::{collections::HashMap, thread};
use tokio::{
sync::mpsc,
time::{Duration, timeout},
};
use std::{cell::RefCell, collections::HashMap, rc::Rc, sync::OnceLock, thread};
use tokio::sync::oneshot;
pub enum PwCommand {
GetDevices {
resp: oneshot::Sender<(Vec<AudioDevice>, Vec<AudioDevice>)>,
},
CreateVirtualMic {
resp: oneshot::Sender<Result<u32, String>>,
},
CreateLink {
output_fl: Port,
output_fr: Port,
input_fl: Port,
input_fr: Port,
resp: oneshot::Sender<Result<(u32, u32), String>>,
},
DestroyObject {
id: u32,
},
}
struct AppState {
input_devices: HashMap<u32, AudioDevice>,
output_devices: HashMap<u32, AudioDevice>,
ports: HashMap<u32, Port>,
proxies: HashMap<u32, Box<dyn std::any::Any>>,
proxy_id_counter: u32,
ready_tx: Option<std::sync::mpsc::Sender<()>>,
}
pub struct PipewireManager {
pub sender: pipewire::channel::Sender<PwCommand>,
}
static MANAGER: OnceLock<PipewireManager> = OnceLock::new();
pub fn get_manager() -> &'static PipewireManager {
MANAGER.get_or_init(|| {
let (pw_sender, pw_receiver) = pipewire::channel::channel::<PwCommand>();
let (ready_tx, ready_rx) = std::sync::mpsc::channel();
thread::spawn(move || {
let (main_loop, context) = setup_pipewire_context().expect("Failed to setup pipewire");
// Leak main_loop and context so their borrows can be 'static
let main_loop = Box::leak(Box::new(main_loop));
let context = Box::leak(Box::new(context));
// Leak to fix lifetime issues since this thread lives forever
let core = Box::leak(Box::new(
context
.connect(None)
.expect("Failed to connect to pipewire"),
));
let registry = Box::leak(Box::new(
core.get_registry().expect("Failed to get registry"),
));
let state = Rc::new(RefCell::new(AppState {
input_devices: HashMap::new(),
output_devices: HashMap::new(),
ports: HashMap::new(),
proxies: HashMap::new(),
proxy_id_counter: 10000,
ready_tx: Some(ready_tx),
}));
let state_for_registry_add = state.clone();
let state_for_registry_remove = state.clone();
let _listener = registry
.add_listener_local()
.global(move |global| {
let (device, port) = parse_global_object(global);
let mut s = state_for_registry_add.borrow_mut();
if let Some(device) = device {
match device.device_type {
DeviceType::Input => {
s.input_devices.insert(device.id, device);
}
DeviceType::Output => {
s.output_devices.insert(device.id, device);
}
}
} else if let Some(port) = port {
let node_id = port.node_id;
s.ports.insert(port.port_id, port.clone());
if let Some(d) = s.input_devices.get_mut(&node_id) {
d.add_port(port.clone());
} else if let Some(d) = s.output_devices.get_mut(&node_id) {
d.add_port(port);
}
}
})
.global_remove(move |id| {
let mut s = state_for_registry_remove.borrow_mut();
s.input_devices.remove(&id);
s.output_devices.remove(&id);
s.ports.retain(|_, port| port.node_id != id);
s.ports.remove(&id);
})
.register();
// sync to signal ready
let state_for_sync = state.clone();
let _core_listener = core
.add_listener_local()
.done(move |id, _seq| {
if id == 0 {
let mut s = state_for_sync.borrow_mut();
if let Some(tx) = s.ready_tx.take() {
let _ = tx.send(());
}
}
})
.register();
let _pending = core.sync(0).expect("sync failed");
let state_for_cmd = state.clone();
let _receiver = pw_receiver.attach(main_loop.loop_(), move |cmd| {
let mut s = state_for_cmd.borrow_mut();
match cmd {
PwCommand::GetDevices { resp } => {
let mut inputs: Vec<AudioDevice> =
s.input_devices.values().cloned().collect();
let mut outputs: Vec<AudioDevice> =
s.output_devices.values().cloned().collect();
inputs.sort_by_key(|a| a.id);
outputs.sort_by_key(|a| a.id);
let _ = resp.send((inputs, outputs));
}
PwCommand::CreateVirtualMic { resp } => {
let props = properties!(
"factory.name" => "support.null-audio-sink",
"node.name" => "pwsp-virtual-mic",
"node.description" => "PWSP Virtual Mic",
"media.class" => "Audio/Source/Virtual",
"audio.position" => "[ FL FR ]",
"audio.channels" => "2",
"object.linger" => "false",
);
match core.create_object::<pipewire::node::Node>("adapter", &props) {
Ok(node) => {
s.proxy_id_counter += 1;
let id = s.proxy_id_counter;
s.proxies.insert(id, Box::new(node));
let _ = resp.send(Ok(id));
}
Err(e) => {
let _ = resp.send(Err(e.to_string()));
}
}
}
PwCommand::CreateLink {
output_fl,
output_fr,
input_fl,
input_fr,
resp,
} => {
let props_fl = properties! {
"link.output.node" => format!("{}", output_fl.node_id).as_str(),
"link.output.port" => format!("{}", output_fl.port_id).as_str(),
"link.input.node" => format!("{}", input_fl.node_id).as_str(),
"link.input.port" => format!("{}", input_fl.port_id).as_str(),
};
let props_fr = properties! {
"link.output.node" => format!("{}", output_fr.node_id).as_str(),
"link.output.port" => format!("{}", output_fr.port_id).as_str(),
"link.input.node" => format!("{}", input_fr.node_id).as_str(),
"link.input.port" => format!("{}", input_fr.port_id).as_str(),
};
let link_fl = match core.create_object::<Link>("link-factory", &props_fl) {
Ok(link) => link,
Err(e) => {
let _ = resp.send(Err(e.to_string()));
return;
}
};
let link_fr = match core.create_object::<Link>("link-factory", &props_fr) {
Ok(link) => link,
Err(e) => {
let _ = resp.send(Err(e.to_string()));
return;
}
};
s.proxy_id_counter += 1;
let id_fl = s.proxy_id_counter;
s.proxies.insert(id_fl, Box::new(link_fl));
s.proxy_id_counter += 1;
let id_fr = s.proxy_id_counter;
s.proxies.insert(id_fr, Box::new(link_fr));
let _ = resp.send(Ok((id_fl, id_fr)));
}
PwCommand::DestroyObject { id } => {
s.proxies.remove(&id);
}
}
});
main_loop.run();
});
// Wait for the pipewire thread to be fully up and processed initial events
let _ = ready_rx.recv();
PipewireManager { sender: pw_sender }
})
}
pub fn setup_pipewire_context() -> Result<(MainLoopRc, ContextRc), String> {
pipewire::init();
@@ -71,127 +281,17 @@ fn parse_global_object(
(None, None)
}
async fn pw_get_global_objects_thread(
main_sender: mpsc::Sender<(Option<AudioDevice>, Option<Port>)>,
pw_receiver: pipewire::channel::Receiver<Terminate>,
init_sender: tokio::sync::oneshot::Sender<Result<(), String>>,
) {
let (main_loop, context) = match setup_pipewire_context() {
Ok(res) => res,
Err(e) => {
let _ = init_sender.send(Err(e));
return;
}
};
// Stop main loop on Terminate message
let _receiver = pw_receiver.attach(main_loop.loop_(), {
let _main_loop = main_loop.clone();
move |_| _main_loop.quit()
});
let core = match context.connect(None) {
Ok(core) => core,
Err(e) => {
let _ = init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e)));
return;
}
};
let registry = match core.get_registry() {
Ok(registry) => registry,
Err(e) => {
let _ = init_sender.send(Err(format!(
"Failed to get registry from pipewire context: {}",
e
)));
return;
}
};
let _listener = registry
.add_listener_local()
.global(move |global| {
// Try to parse every global object pipewire finds
let (device, port) = parse_global_object(global);
// Send message to the main thread
let sender_clone = main_sender.clone();
tokio::task::spawn(async move {
sender_clone.send((device, port)).await.ok();
});
})
.register();
// Signal successful initialization
if init_sender.send(Ok(())).is_err() {
return;
}
main_loop.run();
}
pub async fn get_all_devices() -> Result<(Vec<AudioDevice>, Vec<AudioDevice>)> {
// Channels to communicate with pipewire thread
let (main_sender, mut main_receiver) = mpsc::channel(10);
let (pw_sender, pw_receiver) = pipewire::channel::channel();
let (init_sender, init_receiver) = tokio::sync::oneshot::channel();
// Spawn pipewire thread in background
let _pw_thread = tokio::spawn(async move {
pw_get_global_objects_thread(main_sender, pw_receiver, init_sender).await
});
// Wait for initialization to complete
if let Err(e) = init_receiver.await {
return Err(anyhow!(e));
}
let mut input_devices: HashMap<u32, AudioDevice> = HashMap::new();
let mut output_devices: HashMap<u32, AudioDevice> = HashMap::new();
let mut ports: Vec<Port> = vec![];
loop {
// If we don't receive a message in 100ms, we can assume that pipewire thread is finished
match timeout(Duration::from_millis(100), main_receiver.recv()).await {
Ok(Some((device, port))) => {
if let Some(device) = device {
match device.device_type {
DeviceType::Input => {
input_devices.insert(device.id, device);
}
DeviceType::Output => {
output_devices.insert(device.id, device);
}
}
} else if let Some(port) = port {
ports.push(port);
}
}
Ok(None) | Err(_) => {
// Pipewire thread is finished and we can collect our devices
let _ = pw_sender.send(Terminate {});
for port in ports {
let node_id = port.node_id;
if let Some(input_device) = input_devices.get_mut(&node_id) {
input_device.add_port(port);
} else if let Some(output_device) = output_devices.get_mut(&node_id) {
output_device.add_port(port);
}
}
let mut input_devices: Vec<AudioDevice> = input_devices.into_values().collect();
let mut output_devices: Vec<AudioDevice> = output_devices.into_values().collect();
input_devices.sort_by_key(|a| a.id);
output_devices.sort_by_key(|a| a.id);
return Ok((input_devices, output_devices));
}
}
}
let (tx, rx) = oneshot::channel();
let manager = get_manager();
manager
.sender
.send(PwCommand::GetDevices { resp: tx })
.map_err(|_| anyhow!("Failed to send GetDevices to manager"))?;
let res = rx
.await
.map_err(|e| anyhow!("Failed to receive response: {}", e))?;
Ok(res)
}
pub async fn get_device(device_name: &str) -> Result<AudioDevice> {
@@ -209,65 +309,36 @@ pub async fn get_device(device_name: &str) -> Result<AudioDevice> {
.ok_or_else(|| anyhow!("Device not found"))
}
pub fn create_virtual_mic() -> Result<pipewire::channel::Sender<Terminate>> {
let (pw_sender, pw_receiver) = pipewire::channel::channel::<Terminate>();
let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0);
let _pw_thread = thread::spawn(move || {
let (main_loop, context) = match setup_pipewire_context() {
Ok(res) => res,
Err(e) => {
let _ = init_sender.send(Err(e));
return;
}
};
let core = match context.connect(None) {
Ok(core) => core,
Err(e) => {
let _ =
init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e)));
return;
}
};
let props = properties!(
"factory.name" => "support.null-audio-sink",
"node.name" => "pwsp-virtual-mic",
"node.description" => "PWSP Virtual Mic",
"media.class" => "Audio/Source/Virtual",
"audio.position" => "[ FL FR ]",
"audio.channels" => "2",
"object.linger" => "false", // Destroy the node on app exit
);
let _node = match core.create_object::<pipewire::node::Node>("adapter", &props) {
Ok(node) => node,
Err(e) => {
let _ = init_sender.send(Err(format!("Failed to create virtual mic: {}", e)));
return;
}
};
let _receiver = pw_receiver.attach(main_loop.loop_(), {
let _main_loop = main_loop.clone();
move |_| _main_loop.quit()
});
println!("Virtual mic created");
if init_sender.send(Ok(())).is_err() {
return;
}
main_loop.run();
});
if let Err(e) = init_receiver.recv()? {
return Err(anyhow!(e));
}
Ok(pw_sender)
pub struct PwTerminator {
ids: Vec<u32>,
}
pub async fn link_player_to_virtual_mic() -> Result<pipewire::channel::Sender<Terminate>> {
impl Drop for PwTerminator {
fn drop(&mut self) {
let manager = get_manager();
for id in &self.ids {
let _ = manager.sender.send(PwCommand::DestroyObject { id: *id });
}
}
}
pub async fn create_virtual_mic() -> Result<PwTerminator> {
let (tx, rx) = oneshot::channel();
let manager = get_manager();
manager
.sender
.send(PwCommand::CreateVirtualMic { resp: tx })
.map_err(|_| anyhow!("Failed to send CreateVirtualMic to manager"))?;
let res = rx
.await
.map_err(|e| anyhow!("Failed to receive response: {}", e))?;
let id = res.map_err(|e| anyhow!(e))?;
Ok(PwTerminator { ids: vec![id] })
}
pub async fn link_player_to_virtual_mic() -> Result<PwTerminator> {
let pwsp_daemon_output = match get_device("pwsp-daemon").await {
Ok(device) => device,
Err(_) => {
@@ -303,81 +374,34 @@ pub async fn link_player_to_virtual_mic() -> Result<pipewire::channel::Sender<Te
None => return Err(anyhow!("Failed to get pwsp-virtual-mic input_fr")),
};
create_link(output_fl, output_fr, input_fl, input_fr)
create_link(output_fl, output_fr, input_fl, input_fr).await
}
pub fn create_link(
pub async fn create_link(
output_fl: Port,
output_fr: Port,
input_fl: Port,
input_fr: Port,
) -> Result<pipewire::channel::Sender<Terminate>> {
let (pw_sender, pw_receiver) = pipewire::channel::channel::<Terminate>();
let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0);
) -> Result<PwTerminator> {
let (tx, rx) = oneshot::channel();
let manager = get_manager();
manager
.sender
.send(PwCommand::CreateLink {
output_fl,
output_fr,
input_fl,
input_fr,
resp: tx,
})
.map_err(|_| anyhow!("Failed to send CreateLink to manager"))?;
let _pw_thread = thread::spawn(move || {
let (main_loop, context) = match setup_pipewire_context() {
Ok(res) => res,
Err(e) => {
let _ = init_sender.send(Err(e));
return;
}
};
let core = match context.connect(None) {
Ok(core) => core,
Err(e) => {
let _ =
init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e)));
return;
}
};
let res = rx
.await
.map_err(|e| anyhow!("Failed to receive response: {}", e))?;
let props_fl = properties! {
"link.output.node" => format!("{}", output_fl.node_id).as_str(),
"link.output.port" => format!("{}", output_fl.port_id).as_str(),
"link.input.node" => format!("{}", input_fl.node_id).as_str(),
"link.input.port" => format!("{}", input_fl.port_id).as_str(),
};
let props_fr = properties! {
"link.output.node" => format!("{}", output_fr.node_id).as_str(),
"link.output.port" => format!("{}", output_fr.port_id).as_str(),
"link.input.node" => format!("{}", input_fr.node_id).as_str(),
"link.input.port" => format!("{}", input_fr.port_id).as_str(),
};
let _link_fl = match core.create_object::<Link>("link-factory", &props_fl) {
Ok(link) => link,
Err(e) => {
let _ = init_sender.send(Err(format!("Failed to create link FL: {}", e)));
return;
}
};
let _link_fr = match core.create_object::<Link>("link-factory", &props_fr) {
Ok(link) => link,
Err(e) => {
let _ = init_sender.send(Err(format!("Failed to create link FR: {}", e)));
return;
}
};
let _receiver = pw_receiver.attach(main_loop.loop_(), {
let _main_loop = main_loop.clone();
move |_| _main_loop.quit()
});
println!(
"Link created: FL: {}-{} FR: {}-{}",
output_fl.node_id, input_fl.node_id, output_fr.node_id, input_fr.node_id
);
if init_sender.send(Ok(())).is_err() {
return;
}
main_loop.run();
});
if let Err(e) = init_receiver.recv()? {
return Err(anyhow!(e));
}
Ok(pw_sender)
let (id_fl, id_fr) = res.map_err(|e| anyhow!(e))?;
Ok(PwTerminator {
ids: vec![id_fl, id_fr],
})
}