mirror of
https://github.com/arabianq/pipewire-soundpad.git
synced 2026-06-19 04:03:33 +00:00
perf: Optimize PipeWire communication with single background thread actor
Replaces the previous approach of spawning new PipeWire main loops and awaiting a 100ms timeout for every `get_all_devices` call. Instead, this introduces a `PipeWireManager` which maintains a single, persistent background thread with a PipeWire main loop and registry listener. The actor caches `AudioDevice`s and `Port`s continuously in a `HashMap`, providing instantaneous responses to device queries via a channel. This architectural change replaces `Sender<Terminate>` with a simple `PwTerminator` drop-guard to manage the lifetimes of PipeWire objects (virtual mic and links) via `PwCommand::DestroyObject` without creating additional OS threads or PipeWire contexts. Co-authored-by: arabianq <55220741+arabianq@users.noreply.github.com>
This commit is contained in:
@@ -28,7 +28,10 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
|
||||
get_daemon_config(); // Initialize daemon config
|
||||
create_virtual_mic()?;
|
||||
|
||||
// Virtual mic object must be kept alive by some variable until daemon exits
|
||||
let _virtual_mic = create_virtual_mic().await?;
|
||||
|
||||
if let Err(err) = get_audio_player().await {
|
||||
eprintln!("Failed to initialize audio player: {}", err);
|
||||
} // Initialize audio player
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::{
|
||||
types::pipewire::{DeviceType, Terminate},
|
||||
types::pipewire::DeviceType,
|
||||
utils::{
|
||||
daemon::get_daemon_config,
|
||||
pipewire::{create_link, get_device, link_player_to_virtual_mic},
|
||||
pipewire::{PwTerminator, create_link, get_device, link_player_to_virtual_mic},
|
||||
},
|
||||
};
|
||||
use anyhow::{Result, anyhow};
|
||||
@@ -58,8 +58,8 @@ pub struct AudioPlayer {
|
||||
pub tracks: HashMap<u32, PlayingSound>,
|
||||
pub next_id: u32,
|
||||
|
||||
input_link_sender: Option<pipewire::channel::Sender<Terminate>>,
|
||||
player_link_sender: Option<pipewire::channel::Sender<Terminate>>,
|
||||
input_link_sender: Option<PwTerminator>,
|
||||
player_link_sender: Option<PwTerminator>,
|
||||
pub input_device_name: Option<String>,
|
||||
|
||||
pub volume: f32, // Master volume
|
||||
@@ -108,24 +108,16 @@ impl AudioPlayer {
|
||||
}
|
||||
|
||||
fn abort_link_thread(&mut self) {
|
||||
if let Some(sender) = &self.input_link_sender {
|
||||
if sender.send(Terminate {}).is_ok() {
|
||||
if self.input_link_sender.is_some() {
|
||||
println!("Sent terminate signal to input link thread");
|
||||
self.input_link_sender = None;
|
||||
} else {
|
||||
eprintln!("Failed to send terminate signal to input link thread");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn abort_player_link_thread(&mut self) {
|
||||
if let Some(sender) = &self.player_link_sender {
|
||||
if sender.send(Terminate {}).is_ok() {
|
||||
if self.player_link_sender.is_some() {
|
||||
println!("Sent terminate signal to player link thread");
|
||||
self.player_link_sender = None;
|
||||
} else {
|
||||
eprintln!("Failed to send terminate signal to player link thread");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,7 +179,7 @@ impl AudioPlayer {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.input_link_sender = Some(create_link(output_fl, output_fr, input_fl, input_fr)?);
|
||||
self.input_link_sender = Some(create_link(output_fl, output_fr, input_fl, input_fr).await?);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+273
-249
@@ -1,14 +1,224 @@
|
||||
use crate::types::pipewire::{AudioDevice, DeviceType, Port, Terminate};
|
||||
use crate::types::pipewire::{AudioDevice, DeviceType, Port};
|
||||
use anyhow::{Result, anyhow};
|
||||
use pipewire::{
|
||||
context::ContextRc, link::Link, main_loop::MainLoopRc, properties::properties,
|
||||
registry::GlobalObject, spa::utils::dict::DictRef,
|
||||
};
|
||||
use std::{collections::HashMap, thread};
|
||||
use tokio::{
|
||||
sync::mpsc,
|
||||
time::{Duration, timeout},
|
||||
use std::{cell::RefCell, collections::HashMap, rc::Rc, sync::OnceLock, thread};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
pub enum PwCommand {
|
||||
GetDevices {
|
||||
resp: oneshot::Sender<(Vec<AudioDevice>, Vec<AudioDevice>)>,
|
||||
},
|
||||
CreateVirtualMic {
|
||||
resp: oneshot::Sender<Result<u32, String>>,
|
||||
},
|
||||
CreateLink {
|
||||
output_fl: Port,
|
||||
output_fr: Port,
|
||||
input_fl: Port,
|
||||
input_fr: Port,
|
||||
resp: oneshot::Sender<Result<(u32, u32), String>>,
|
||||
},
|
||||
DestroyObject {
|
||||
id: u32,
|
||||
},
|
||||
}
|
||||
|
||||
struct AppState {
|
||||
input_devices: HashMap<u32, AudioDevice>,
|
||||
output_devices: HashMap<u32, AudioDevice>,
|
||||
ports: HashMap<u32, Port>,
|
||||
proxies: HashMap<u32, Box<dyn std::any::Any>>,
|
||||
proxy_id_counter: u32,
|
||||
ready_tx: Option<std::sync::mpsc::Sender<()>>,
|
||||
}
|
||||
|
||||
pub struct PipewireManager {
|
||||
pub sender: pipewire::channel::Sender<PwCommand>,
|
||||
}
|
||||
|
||||
static MANAGER: OnceLock<PipewireManager> = OnceLock::new();
|
||||
|
||||
pub fn get_manager() -> &'static PipewireManager {
|
||||
MANAGER.get_or_init(|| {
|
||||
let (pw_sender, pw_receiver) = pipewire::channel::channel::<PwCommand>();
|
||||
let (ready_tx, ready_rx) = std::sync::mpsc::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let (main_loop, context) = setup_pipewire_context().expect("Failed to setup pipewire");
|
||||
|
||||
// Leak main_loop and context so their borrows can be 'static
|
||||
let main_loop = Box::leak(Box::new(main_loop));
|
||||
let context = Box::leak(Box::new(context));
|
||||
|
||||
// Leak to fix lifetime issues since this thread lives forever
|
||||
let core = Box::leak(Box::new(
|
||||
context
|
||||
.connect(None)
|
||||
.expect("Failed to connect to pipewire"),
|
||||
));
|
||||
let registry = Box::leak(Box::new(
|
||||
core.get_registry().expect("Failed to get registry"),
|
||||
));
|
||||
|
||||
let state = Rc::new(RefCell::new(AppState {
|
||||
input_devices: HashMap::new(),
|
||||
output_devices: HashMap::new(),
|
||||
ports: HashMap::new(),
|
||||
proxies: HashMap::new(),
|
||||
proxy_id_counter: 10000,
|
||||
ready_tx: Some(ready_tx),
|
||||
}));
|
||||
|
||||
let state_for_registry_add = state.clone();
|
||||
let state_for_registry_remove = state.clone();
|
||||
|
||||
let _listener = registry
|
||||
.add_listener_local()
|
||||
.global(move |global| {
|
||||
let (device, port) = parse_global_object(global);
|
||||
let mut s = state_for_registry_add.borrow_mut();
|
||||
if let Some(device) = device {
|
||||
match device.device_type {
|
||||
DeviceType::Input => {
|
||||
s.input_devices.insert(device.id, device);
|
||||
}
|
||||
DeviceType::Output => {
|
||||
s.output_devices.insert(device.id, device);
|
||||
}
|
||||
}
|
||||
} else if let Some(port) = port {
|
||||
let node_id = port.node_id;
|
||||
s.ports.insert(port.port_id, port.clone());
|
||||
if let Some(d) = s.input_devices.get_mut(&node_id) {
|
||||
d.add_port(port.clone());
|
||||
} else if let Some(d) = s.output_devices.get_mut(&node_id) {
|
||||
d.add_port(port);
|
||||
}
|
||||
}
|
||||
})
|
||||
.global_remove(move |id| {
|
||||
let mut s = state_for_registry_remove.borrow_mut();
|
||||
s.input_devices.remove(&id);
|
||||
s.output_devices.remove(&id);
|
||||
s.ports.retain(|_, port| port.node_id != id);
|
||||
s.ports.remove(&id);
|
||||
})
|
||||
.register();
|
||||
|
||||
// sync to signal ready
|
||||
let state_for_sync = state.clone();
|
||||
let _core_listener = core
|
||||
.add_listener_local()
|
||||
.done(move |id, _seq| {
|
||||
if id == 0 {
|
||||
let mut s = state_for_sync.borrow_mut();
|
||||
if let Some(tx) = s.ready_tx.take() {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
})
|
||||
.register();
|
||||
|
||||
let _pending = core.sync(0).expect("sync failed");
|
||||
|
||||
let state_for_cmd = state.clone();
|
||||
let _receiver = pw_receiver.attach(main_loop.loop_(), move |cmd| {
|
||||
let mut s = state_for_cmd.borrow_mut();
|
||||
match cmd {
|
||||
PwCommand::GetDevices { resp } => {
|
||||
let mut inputs: Vec<AudioDevice> =
|
||||
s.input_devices.values().cloned().collect();
|
||||
let mut outputs: Vec<AudioDevice> =
|
||||
s.output_devices.values().cloned().collect();
|
||||
inputs.sort_by_key(|a| a.id);
|
||||
outputs.sort_by_key(|a| a.id);
|
||||
let _ = resp.send((inputs, outputs));
|
||||
}
|
||||
PwCommand::CreateVirtualMic { resp } => {
|
||||
let props = properties!(
|
||||
"factory.name" => "support.null-audio-sink",
|
||||
"node.name" => "pwsp-virtual-mic",
|
||||
"node.description" => "PWSP Virtual Mic",
|
||||
"media.class" => "Audio/Source/Virtual",
|
||||
"audio.position" => "[ FL FR ]",
|
||||
"audio.channels" => "2",
|
||||
"object.linger" => "false",
|
||||
);
|
||||
match core.create_object::<pipewire::node::Node>("adapter", &props) {
|
||||
Ok(node) => {
|
||||
s.proxy_id_counter += 1;
|
||||
let id = s.proxy_id_counter;
|
||||
s.proxies.insert(id, Box::new(node));
|
||||
let _ = resp.send(Ok(id));
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = resp.send(Err(e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
PwCommand::CreateLink {
|
||||
output_fl,
|
||||
output_fr,
|
||||
input_fl,
|
||||
input_fr,
|
||||
resp,
|
||||
} => {
|
||||
let props_fl = properties! {
|
||||
"link.output.node" => format!("{}", output_fl.node_id).as_str(),
|
||||
"link.output.port" => format!("{}", output_fl.port_id).as_str(),
|
||||
"link.input.node" => format!("{}", input_fl.node_id).as_str(),
|
||||
"link.input.port" => format!("{}", input_fl.port_id).as_str(),
|
||||
};
|
||||
let props_fr = properties! {
|
||||
"link.output.node" => format!("{}", output_fr.node_id).as_str(),
|
||||
"link.output.port" => format!("{}", output_fr.port_id).as_str(),
|
||||
"link.input.node" => format!("{}", input_fr.node_id).as_str(),
|
||||
"link.input.port" => format!("{}", input_fr.port_id).as_str(),
|
||||
};
|
||||
|
||||
let link_fl = match core.create_object::<Link>("link-factory", &props_fl) {
|
||||
Ok(link) => link,
|
||||
Err(e) => {
|
||||
let _ = resp.send(Err(e.to_string()));
|
||||
return;
|
||||
}
|
||||
};
|
||||
let link_fr = match core.create_object::<Link>("link-factory", &props_fr) {
|
||||
Ok(link) => link,
|
||||
Err(e) => {
|
||||
let _ = resp.send(Err(e.to_string()));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
s.proxy_id_counter += 1;
|
||||
let id_fl = s.proxy_id_counter;
|
||||
s.proxies.insert(id_fl, Box::new(link_fl));
|
||||
|
||||
s.proxy_id_counter += 1;
|
||||
let id_fr = s.proxy_id_counter;
|
||||
s.proxies.insert(id_fr, Box::new(link_fr));
|
||||
|
||||
let _ = resp.send(Ok((id_fl, id_fr)));
|
||||
}
|
||||
PwCommand::DestroyObject { id } => {
|
||||
s.proxies.remove(&id);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
main_loop.run();
|
||||
});
|
||||
|
||||
// Wait for the pipewire thread to be fully up and processed initial events
|
||||
let _ = ready_rx.recv();
|
||||
|
||||
PipewireManager { sender: pw_sender }
|
||||
})
|
||||
}
|
||||
|
||||
pub fn setup_pipewire_context() -> Result<(MainLoopRc, ContextRc), String> {
|
||||
pipewire::init();
|
||||
@@ -71,127 +281,17 @@ fn parse_global_object(
|
||||
(None, None)
|
||||
}
|
||||
|
||||
async fn pw_get_global_objects_thread(
|
||||
main_sender: mpsc::Sender<(Option<AudioDevice>, Option<Port>)>,
|
||||
pw_receiver: pipewire::channel::Receiver<Terminate>,
|
||||
init_sender: tokio::sync::oneshot::Sender<Result<(), String>>,
|
||||
) {
|
||||
let (main_loop, context) = match setup_pipewire_context() {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
let _ = init_sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Stop main loop on Terminate message
|
||||
let _receiver = pw_receiver.attach(main_loop.loop_(), {
|
||||
let _main_loop = main_loop.clone();
|
||||
move |_| _main_loop.quit()
|
||||
});
|
||||
|
||||
let core = match context.connect(None) {
|
||||
Ok(core) => core,
|
||||
Err(e) => {
|
||||
let _ = init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let registry = match core.get_registry() {
|
||||
Ok(registry) => registry,
|
||||
Err(e) => {
|
||||
let _ = init_sender.send(Err(format!(
|
||||
"Failed to get registry from pipewire context: {}",
|
||||
e
|
||||
)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let _listener = registry
|
||||
.add_listener_local()
|
||||
.global(move |global| {
|
||||
// Try to parse every global object pipewire finds
|
||||
let (device, port) = parse_global_object(global);
|
||||
|
||||
// Send message to the main thread
|
||||
let sender_clone = main_sender.clone();
|
||||
tokio::task::spawn(async move {
|
||||
sender_clone.send((device, port)).await.ok();
|
||||
});
|
||||
})
|
||||
.register();
|
||||
|
||||
// Signal successful initialization
|
||||
if init_sender.send(Ok(())).is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
main_loop.run();
|
||||
}
|
||||
|
||||
pub async fn get_all_devices() -> Result<(Vec<AudioDevice>, Vec<AudioDevice>)> {
|
||||
// Channels to communicate with pipewire thread
|
||||
let (main_sender, mut main_receiver) = mpsc::channel(10);
|
||||
let (pw_sender, pw_receiver) = pipewire::channel::channel();
|
||||
let (init_sender, init_receiver) = tokio::sync::oneshot::channel();
|
||||
|
||||
// Spawn pipewire thread in background
|
||||
let _pw_thread = tokio::spawn(async move {
|
||||
pw_get_global_objects_thread(main_sender, pw_receiver, init_sender).await
|
||||
});
|
||||
|
||||
// Wait for initialization to complete
|
||||
if let Err(e) = init_receiver.await {
|
||||
return Err(anyhow!(e));
|
||||
}
|
||||
|
||||
let mut input_devices: HashMap<u32, AudioDevice> = HashMap::new();
|
||||
let mut output_devices: HashMap<u32, AudioDevice> = HashMap::new();
|
||||
let mut ports: Vec<Port> = vec![];
|
||||
|
||||
loop {
|
||||
// If we don't receive a message in 100ms, we can assume that pipewire thread is finished
|
||||
match timeout(Duration::from_millis(100), main_receiver.recv()).await {
|
||||
Ok(Some((device, port))) => {
|
||||
if let Some(device) = device {
|
||||
match device.device_type {
|
||||
DeviceType::Input => {
|
||||
input_devices.insert(device.id, device);
|
||||
}
|
||||
DeviceType::Output => {
|
||||
output_devices.insert(device.id, device);
|
||||
}
|
||||
}
|
||||
} else if let Some(port) = port {
|
||||
ports.push(port);
|
||||
}
|
||||
}
|
||||
Ok(None) | Err(_) => {
|
||||
// Pipewire thread is finished and we can collect our devices
|
||||
let _ = pw_sender.send(Terminate {});
|
||||
|
||||
for port in ports {
|
||||
let node_id = port.node_id;
|
||||
|
||||
if let Some(input_device) = input_devices.get_mut(&node_id) {
|
||||
input_device.add_port(port);
|
||||
} else if let Some(output_device) = output_devices.get_mut(&node_id) {
|
||||
output_device.add_port(port);
|
||||
}
|
||||
}
|
||||
|
||||
let mut input_devices: Vec<AudioDevice> = input_devices.into_values().collect();
|
||||
let mut output_devices: Vec<AudioDevice> = output_devices.into_values().collect();
|
||||
|
||||
input_devices.sort_by_key(|a| a.id);
|
||||
output_devices.sort_by_key(|a| a.id);
|
||||
|
||||
return Ok((input_devices, output_devices));
|
||||
}
|
||||
}
|
||||
}
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let manager = get_manager();
|
||||
manager
|
||||
.sender
|
||||
.send(PwCommand::GetDevices { resp: tx })
|
||||
.map_err(|_| anyhow!("Failed to send GetDevices to manager"))?;
|
||||
let res = rx
|
||||
.await
|
||||
.map_err(|e| anyhow!("Failed to receive response: {}", e))?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn get_device(device_name: &str) -> Result<AudioDevice> {
|
||||
@@ -209,65 +309,36 @@ pub async fn get_device(device_name: &str) -> Result<AudioDevice> {
|
||||
.ok_or_else(|| anyhow!("Device not found"))
|
||||
}
|
||||
|
||||
pub fn create_virtual_mic() -> Result<pipewire::channel::Sender<Terminate>> {
|
||||
let (pw_sender, pw_receiver) = pipewire::channel::channel::<Terminate>();
|
||||
let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0);
|
||||
|
||||
let _pw_thread = thread::spawn(move || {
|
||||
let (main_loop, context) = match setup_pipewire_context() {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
let _ = init_sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
};
|
||||
let core = match context.connect(None) {
|
||||
Ok(core) => core,
|
||||
Err(e) => {
|
||||
let _ =
|
||||
init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let props = properties!(
|
||||
"factory.name" => "support.null-audio-sink",
|
||||
"node.name" => "pwsp-virtual-mic",
|
||||
"node.description" => "PWSP Virtual Mic",
|
||||
"media.class" => "Audio/Source/Virtual",
|
||||
"audio.position" => "[ FL FR ]",
|
||||
"audio.channels" => "2",
|
||||
"object.linger" => "false", // Destroy the node on app exit
|
||||
);
|
||||
|
||||
let _node = match core.create_object::<pipewire::node::Node>("adapter", &props) {
|
||||
Ok(node) => node,
|
||||
Err(e) => {
|
||||
let _ = init_sender.send(Err(format!("Failed to create virtual mic: {}", e)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let _receiver = pw_receiver.attach(main_loop.loop_(), {
|
||||
let _main_loop = main_loop.clone();
|
||||
move |_| _main_loop.quit()
|
||||
});
|
||||
|
||||
println!("Virtual mic created");
|
||||
if init_sender.send(Ok(())).is_err() {
|
||||
return;
|
||||
}
|
||||
main_loop.run();
|
||||
});
|
||||
|
||||
if let Err(e) = init_receiver.recv()? {
|
||||
return Err(anyhow!(e));
|
||||
pub struct PwTerminator {
|
||||
ids: Vec<u32>,
|
||||
}
|
||||
|
||||
Ok(pw_sender)
|
||||
impl Drop for PwTerminator {
|
||||
fn drop(&mut self) {
|
||||
let manager = get_manager();
|
||||
for id in &self.ids {
|
||||
let _ = manager.sender.send(PwCommand::DestroyObject { id: *id });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn link_player_to_virtual_mic() -> Result<pipewire::channel::Sender<Terminate>> {
|
||||
pub async fn create_virtual_mic() -> Result<PwTerminator> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let manager = get_manager();
|
||||
manager
|
||||
.sender
|
||||
.send(PwCommand::CreateVirtualMic { resp: tx })
|
||||
.map_err(|_| anyhow!("Failed to send CreateVirtualMic to manager"))?;
|
||||
|
||||
let res = rx
|
||||
.await
|
||||
.map_err(|e| anyhow!("Failed to receive response: {}", e))?;
|
||||
|
||||
let id = res.map_err(|e| anyhow!(e))?;
|
||||
Ok(PwTerminator { ids: vec![id] })
|
||||
}
|
||||
|
||||
pub async fn link_player_to_virtual_mic() -> Result<PwTerminator> {
|
||||
let pwsp_daemon_output = match get_device("pwsp-daemon").await {
|
||||
Ok(device) => device,
|
||||
Err(_) => {
|
||||
@@ -303,81 +374,34 @@ pub async fn link_player_to_virtual_mic() -> Result<pipewire::channel::Sender<Te
|
||||
None => return Err(anyhow!("Failed to get pwsp-virtual-mic input_fr")),
|
||||
};
|
||||
|
||||
create_link(output_fl, output_fr, input_fl, input_fr)
|
||||
create_link(output_fl, output_fr, input_fl, input_fr).await
|
||||
}
|
||||
|
||||
pub fn create_link(
|
||||
pub async fn create_link(
|
||||
output_fl: Port,
|
||||
output_fr: Port,
|
||||
input_fl: Port,
|
||||
input_fr: Port,
|
||||
) -> Result<pipewire::channel::Sender<Terminate>> {
|
||||
let (pw_sender, pw_receiver) = pipewire::channel::channel::<Terminate>();
|
||||
let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0);
|
||||
) -> Result<PwTerminator> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let manager = get_manager();
|
||||
manager
|
||||
.sender
|
||||
.send(PwCommand::CreateLink {
|
||||
output_fl,
|
||||
output_fr,
|
||||
input_fl,
|
||||
input_fr,
|
||||
resp: tx,
|
||||
})
|
||||
.map_err(|_| anyhow!("Failed to send CreateLink to manager"))?;
|
||||
|
||||
let _pw_thread = thread::spawn(move || {
|
||||
let (main_loop, context) = match setup_pipewire_context() {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
let _ = init_sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
};
|
||||
let core = match context.connect(None) {
|
||||
Ok(core) => core,
|
||||
Err(e) => {
|
||||
let _ =
|
||||
init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
let res = rx
|
||||
.await
|
||||
.map_err(|e| anyhow!("Failed to receive response: {}", e))?;
|
||||
|
||||
let props_fl = properties! {
|
||||
"link.output.node" => format!("{}", output_fl.node_id).as_str(),
|
||||
"link.output.port" => format!("{}", output_fl.port_id).as_str(),
|
||||
"link.input.node" => format!("{}", input_fl.node_id).as_str(),
|
||||
"link.input.port" => format!("{}", input_fl.port_id).as_str(),
|
||||
};
|
||||
let props_fr = properties! {
|
||||
"link.output.node" => format!("{}", output_fr.node_id).as_str(),
|
||||
"link.output.port" => format!("{}", output_fr.port_id).as_str(),
|
||||
"link.input.node" => format!("{}", input_fr.node_id).as_str(),
|
||||
"link.input.port" => format!("{}", input_fr.port_id).as_str(),
|
||||
};
|
||||
|
||||
let _link_fl = match core.create_object::<Link>("link-factory", &props_fl) {
|
||||
Ok(link) => link,
|
||||
Err(e) => {
|
||||
let _ = init_sender.send(Err(format!("Failed to create link FL: {}", e)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
let _link_fr = match core.create_object::<Link>("link-factory", &props_fr) {
|
||||
Ok(link) => link,
|
||||
Err(e) => {
|
||||
let _ = init_sender.send(Err(format!("Failed to create link FR: {}", e)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let _receiver = pw_receiver.attach(main_loop.loop_(), {
|
||||
let _main_loop = main_loop.clone();
|
||||
move |_| _main_loop.quit()
|
||||
});
|
||||
|
||||
println!(
|
||||
"Link created: FL: {}-{} FR: {}-{}",
|
||||
output_fl.node_id, input_fl.node_id, output_fr.node_id, input_fr.node_id
|
||||
);
|
||||
if init_sender.send(Ok(())).is_err() {
|
||||
return;
|
||||
}
|
||||
main_loop.run();
|
||||
});
|
||||
|
||||
if let Err(e) = init_receiver.recv()? {
|
||||
return Err(anyhow!(e));
|
||||
}
|
||||
|
||||
Ok(pw_sender)
|
||||
let (id_fl, id_fr) = res.map_err(|e| anyhow!(e))?;
|
||||
Ok(PwTerminator {
|
||||
ids: vec![id_fl, id_fr],
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user