use crate::types::pipewire::{AudioDevice, DeviceType, Port, Terminate}; use pipewire::{ context::ContextRc, link::Link, main_loop::MainLoopRc, properties::properties, registry::GlobalObject, spa::utils::dict::DictRef, }; use std::{collections::HashMap, error::Error, thread}; use tokio::{ sync::mpsc, time::{Duration, timeout}, }; pub fn setup_pipewire_context() -> Result<(MainLoopRc, ContextRc), String> { pipewire::init(); let main_loop = MainLoopRc::new(None).map_err(|e| e.to_string())?; let context = ContextRc::new(&main_loop, None).map_err(|e| e.to_string())?; Ok((main_loop, context)) } fn parse_global_object( global_object: &GlobalObject<&DictRef>, ) -> (Option, Option) { // Only objects with props can be devices/ports if let Some(props) = global_object.props { // Only objects with media.class can be devices if let Some(media_class) = props.get("media.class") { let node_id = global_object.id; let node_nick = props.get("node.nick"); let node_name = props.get("node.name"); let node_description = props.get("node.description"); // Check if the device is an input or output return if media_class.starts_with("Audio/Source") { let input_device = AudioDevice { id: node_id, nick: node_nick .unwrap_or(node_description.unwrap_or(node_name.unwrap_or_default())) .to_string(), name: node_name.unwrap_or_default().to_string(), device_type: DeviceType::Input, input_fl: None, input_fr: None, output_fl: None, output_fr: None, }; (Some(input_device), None) } else if media_class.starts_with("Stream/Output/Audio") { let output_device = AudioDevice { id: node_id, nick: node_nick .unwrap_or(node_description.unwrap_or(node_name.unwrap_or_default())) .to_string(), name: node_name.unwrap_or_default().to_string(), device_type: DeviceType::Output, input_fl: None, input_fr: None, output_fl: None, output_fr: None, }; (Some(output_device), None) } else { (None, None) }; // Check if the object is a port } else if props.get("port.direction").is_some() && let (Some(node_id), Some(port_id), Some(port_name)) = ( props.get("node.id").and_then(|id| id.parse::().ok()), props.get("port.id").and_then(|id| id.parse::().ok()), props.get("port.name"), ) { let port = Port { node_id, port_id, name: port_name.to_string(), }; return (None, Some(port)); } } (None, None) } async fn pw_get_global_objects_thread( main_sender: mpsc::Sender<(Option, Option)>, pw_receiver: pipewire::channel::Receiver, init_sender: std::sync::mpsc::SyncSender>, ) { let (main_loop, context) = match setup_pipewire_context() { Ok(res) => res, Err(e) => { let _ = init_sender.send(Err(e)); return; } }; // Stop main loop on Terminate message let _receiver = pw_receiver.attach(main_loop.loop_(), { let _main_loop = main_loop.clone(); move |_| _main_loop.quit() }); let core = match context.connect(None) { Ok(core) => core, Err(e) => { let _ = init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e))); return; } }; let registry = match core.get_registry() { Ok(registry) => registry, Err(e) => { let _ = init_sender.send(Err(format!( "Failed to get registry from pipewire context: {}", e ))); return; } }; let _listener = registry .add_listener_local() .global(move |global| { // Try to parse every global object pipewire finds let (device, port) = parse_global_object(global); // Send message to the main thread let sender_clone = main_sender.clone(); tokio::task::spawn(async move { sender_clone.send((device, port)).await.ok(); }); }) .register(); // Signal successful initialization if init_sender.send(Ok(())).is_err() { return; } main_loop.run(); } pub async fn get_all_devices() -> Result<(Vec, Vec), Box> { // Channels to communicate with pipewire thread let (main_sender, mut main_receiver) = mpsc::channel(10); let (pw_sender, pw_receiver) = pipewire::channel::channel(); let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0); // Spawn pipewire thread in background let _pw_thread = tokio::spawn(async move { pw_get_global_objects_thread(main_sender, pw_receiver, init_sender).await }); // Wait for initialization to complete if let Err(e) = init_receiver.recv()? { return Err(e.into()); } let mut input_devices: HashMap = HashMap::new(); let mut output_devices: HashMap = HashMap::new(); let mut ports: Vec = vec![]; loop { // If we don't receive a message in 100ms, we can assume that pipewire thread is finished match timeout(Duration::from_millis(100), main_receiver.recv()).await { Ok(Some((device, port))) => { if let Some(device) = device { match device.device_type { DeviceType::Input => { input_devices.insert(device.id, device); } DeviceType::Output => { output_devices.insert(device.id, device); } } } else if let Some(port) = port { ports.push(port); } } Ok(None) | Err(_) => { // Pipewire thread is finished and we can collect our devices let _ = pw_sender.send(Terminate {}); for port in ports { let node_id = port.node_id; if let Some(input_device) = input_devices.get_mut(&node_id) { match port.name.as_str() { "input_FL" => input_device.input_fl = Some(port), "input_FR" => input_device.input_fr = Some(port), "output_FL" => input_device.output_fl = Some(port), "output_FR" => input_device.output_fr = Some(port), "capture_FL" => input_device.output_fl = Some(port), "capture_FR" => input_device.output_fr = Some(port), "input_MONO" => { input_device.input_fl = Some(port.clone()); input_device.input_fr = Some(port) } "capture_MONO" => { input_device.output_fl = Some(port.clone()); input_device.output_fr = Some(port); } _ => {} } } else if let Some(output_device) = output_devices.get_mut(&node_id) { match port.name.as_str() { "input_FL" => output_device.input_fl = Some(port), "input_FR" => output_device.input_fr = Some(port), "output_FL" => output_device.output_fl = Some(port), "output_FR" => output_device.output_fr = Some(port), "capture_FL" => output_device.output_fl = Some(port), "capture_FR" => output_device.output_fr = Some(port), "output_MONO" => { output_device.output_fl = Some(port.clone()); output_device.output_fr = Some(port) } "capture_MONO" => { output_device.output_fl = Some(port.clone()); output_device.output_fr = Some(port) } _ => {} } } } let mut input_devices: Vec = input_devices.values().cloned().collect(); let mut output_devices: Vec = output_devices.values().cloned().collect(); input_devices.sort_by(|a, b| a.id.cmp(&b.id)); output_devices.sort_by(|a, b| a.id.cmp(&b.id)); return Ok((input_devices, output_devices)); } } } } pub async fn get_device(device_name: &str) -> Result> { let (input_devices, output_devices) = get_all_devices().await?; input_devices .into_iter() .chain(output_devices) .find(|device| { device.name == device_name || device.nick == device_name || device.name.contains(device_name) || device.nick.contains(device_name) }) .ok_or_else(|| "Device not found".into()) } pub fn create_virtual_mic() -> Result, Box> { let (pw_sender, pw_receiver) = pipewire::channel::channel::(); let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0); let _pw_thread = thread::spawn(move || { let (main_loop, context) = match setup_pipewire_context() { Ok(res) => res, Err(e) => { let _ = init_sender.send(Err(e)); return; } }; let core = match context.connect(None) { Ok(core) => core, Err(e) => { let _ = init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e))); return; } }; let props = properties!( "factory.name" => "support.null-audio-sink", "node.name" => "pwsp-virtual-mic", "node.description" => "PWSP Virtual Mic", "media.class" => "Audio/Source/Virtual", "audio.position" => "[ FL FR ]", "audio.channels" => "2", "object.linger" => "false", // Destroy the node on app exit ); let _node = match core.create_object::("adapter", &props) { Ok(node) => node, Err(e) => { let _ = init_sender.send(Err(format!("Failed to create virtual mic: {}", e))); return; } }; let _receiver = pw_receiver.attach(main_loop.loop_(), { let _main_loop = main_loop.clone(); move |_| _main_loop.quit() }); println!("Virtual mic created"); if init_sender.send(Ok(())).is_err() { return; } main_loop.run(); }); if let Err(e) = init_receiver.recv()? { return Err(e.into()); } Ok(pw_sender) } pub async fn link_player_to_virtual_mic() -> Result, Box> { let pwsp_daemon_output = match get_device("pwsp-daemon").await { Ok(device) => device, Err(_) => { return Err( "Could not find alsa_playback.pwsp-daemon device, skipping device linking".into(), ); } }; let pwsp_daemon_input = match get_device("pwsp-virtual-mic").await { Ok(device) => device, Err(_) => { return Err("Could not find pwsp-virtual-mic device, skipping device linking".into()); } }; let output_fl = match pwsp_daemon_output.output_fl { Some(port) => port, None => return Err("Failed to get pwsp-daemon output_fl".into()), }; let output_fr = match pwsp_daemon_output.output_fr { Some(port) => port, None => return Err("Failed to get pwsp-daemon output_fr".into()), }; let input_fl = match pwsp_daemon_input.input_fl { Some(port) => port, None => return Err("Failed to get pwsp-virtual-mic input_fl".into()), }; let input_fr = match pwsp_daemon_input.input_fr { Some(port) => port, None => return Err("Failed to get pwsp-virtual-mic input_fr".into()), }; create_link(output_fl, output_fr, input_fl, input_fr) } pub fn create_link( output_fl: Port, output_fr: Port, input_fl: Port, input_fr: Port, ) -> Result, Box> { let (pw_sender, pw_receiver) = pipewire::channel::channel::(); let (init_sender, init_receiver) = std::sync::mpsc::sync_channel(0); let _pw_thread = thread::spawn(move || { let (main_loop, context) = match setup_pipewire_context() { Ok(res) => res, Err(e) => { let _ = init_sender.send(Err(e)); return; } }; let core = match context.connect(None) { Ok(core) => core, Err(e) => { let _ = init_sender.send(Err(format!("Failed to connect to pipewire context: {}", e))); return; } }; let props_fl = properties! { "link.output.node" => format!("{}", output_fl.node_id).as_str(), "link.output.port" => format!("{}", output_fl.port_id).as_str(), "link.input.node" => format!("{}", input_fl.node_id).as_str(), "link.input.port" => format!("{}", input_fl.port_id).as_str(), }; let props_fr = properties! { "link.output.node" => format!("{}", output_fr.node_id).as_str(), "link.output.port" => format!("{}", output_fr.port_id).as_str(), "link.input.node" => format!("{}", input_fr.node_id).as_str(), "link.input.port" => format!("{}", input_fr.port_id).as_str(), }; let _link_fl = match core.create_object::("link-factory", &props_fl) { Ok(link) => link, Err(e) => { let _ = init_sender.send(Err(format!("Failed to create link FL: {}", e))); return; } }; let _link_fr = match core.create_object::("link-factory", &props_fr) { Ok(link) => link, Err(e) => { let _ = init_sender.send(Err(format!("Failed to create link FR: {}", e))); return; } }; let _receiver = pw_receiver.attach(main_loop.loop_(), { let _main_loop = main_loop.clone(); move |_| _main_loop.quit() }); println!( "Link created: FL: {}-{} FR: {}-{}", output_fl.node_id, input_fl.node_id, output_fr.node_id, input_fr.node_id ); if init_sender.send(Ok(())).is_err() { return; } main_loop.run(); }); if let Err(e) = init_receiver.recv()? { return Err(e.into()); } Ok(pw_sender) }