refactor(daemon): Refactor commands_loop to use handle_connection (#126)

- Extracted the main token processing loop body in `commands_loop` into `handle_connection` to resolve deep nesting and improve code readability.
- Improved request reading logic by using `(&mut stream).take(request_len as u64).read_to_end(&mut buffer)` to strictly bound allocation to `request_len` and prevent initialization overhead.
- Passed `cargo fmt` and `cargo clippy`.

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
This commit is contained in:
Tarasov Aleksandr
2026-06-01 22:54:27 +03:00
committed by GitHub
parent 0f8abbc443
commit 105be87222
+74 -65
View File
@@ -15,7 +15,7 @@ use std::os::unix::fs::PermissionsExt;
use std::{fs, time::Duration}; use std::{fs, time::Duration};
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
net::UnixListener, net::{UnixListener, UnixStream},
time::sleep, time::sleep,
}; };
@@ -83,83 +83,92 @@ async fn main() -> Result<()> {
async fn commands_loop(listener: UnixListener) -> Result<()> { async fn commands_loop(listener: UnixListener) -> Result<()> {
loop { loop {
let (mut stream, _addr) = listener.accept().await?; let (stream, _addr) = listener.accept().await?;
tokio::spawn(async move { tokio::spawn(async move {
// ---------- Read request (start) ---------- handle_connection(stream).await;
let mut len_bytes = [0u8; 4]; });
if stream.read_exact(&mut len_bytes).await.is_err() { }
eprintln!("Failed to read message length from client!"); }
return;
}
let request_len = u32::from_le_bytes(len_bytes) as usize; async fn handle_connection(mut stream: UnixStream) {
// ---------- Read request (start) ----------
let mut len_bytes = [0u8; 4];
if stream.read_exact(&mut len_bytes).await.is_err() {
eprintln!("Failed to read message length from client!");
return;
}
if request_len > MAX_MESSAGE_SIZE { let request_len = u32::from_le_bytes(len_bytes) as usize;
eprintln!(
"Failed to read message from client: request too large ({} bytes)!",
request_len
);
return;
}
let mut buffer = vec![0u8; request_len]; if request_len > MAX_MESSAGE_SIZE {
if stream.read_exact(&mut buffer).await.is_err() { eprintln!(
eprintln!("Failed to read message from client!"); "Failed to read message from client: request too large ({} bytes)!",
return; request_len
} );
return;
}
let request: Request = match serde_json::from_slice(&buffer) { let mut buffer = Vec::new();
Ok(req) => req, if (&mut stream)
Err(err) => { .take(request_len as u64)
let response = .read_to_end(&mut buffer)
Response::new(false, format!("Failed to parse request: {}", err)); .await
let response_data = match serde_json::to_vec(&response) { .is_err()
Ok(data) => data, || buffer.len() != request_len
Err(_) => return, // Should not happen with this simple Response {
}; eprintln!("Failed to read message from client!");
let response_len = response_data.len() as u32; return;
let _ = stream.write_all(&response_len.to_le_bytes()).await; }
let _ = stream.write_all(&response_data).await;
return;
}
};
// ---------- Read request (end) ----------
// ---------- Generate response (start) ---------- let request: Request = match serde_json::from_slice(&buffer) {
let command = parse_command(&request); Ok(req) => req,
let response: Response; Err(err) => {
if let Some(command) = command { let response = Response::new(false, format!("Failed to parse request: {}", err));
response = command.execute().await;
} else {
response = Response::new(false, "Unknown command");
}
// ---------- Generate response (end) ----------
// ---------- Send response (start) ----------
let response_data = match serde_json::to_vec(&response) { let response_data = match serde_json::to_vec(&response) {
Ok(data) => data, Ok(data) => data,
Err(err) => { Err(_) => return, // Should not happen with this simple Response
eprintln!("Failed to serialize response: {}", err);
return;
}
}; };
let response_len = response_data.len() as u32; let response_len = response_data.len() as u32;
let _ = stream.write_all(&response_len.to_le_bytes()).await;
let _ = stream.write_all(&response_data).await;
return;
}
};
// ---------- Read request (end) ----------
if stream.write_all(&response_len.to_le_bytes()).await.is_err() { // ---------- Generate response (start) ----------
eprintln!("Failed to write response length to client!"); let command = parse_command(&request);
return; let response: Response;
} if let Some(command) = command {
if stream.write_all(&response_data).await.is_err() { response = command.execute().await;
eprintln!("Failed to write response to client!"); } else {
return; response = Response::new(false, "Unknown command");
} }
// ---------- Send response (end) ---------- // ---------- Generate response (end) ----------
if response.status && response.message.eq("killed") { // ---------- Send response (start) ----------
std::process::exit(0); let response_data = match serde_json::to_vec(&response) {
} Ok(data) => data,
}); Err(err) => {
eprintln!("Failed to serialize response: {}", err);
return;
}
};
let response_len = response_data.len() as u32;
if stream.write_all(&response_len.to_le_bytes()).await.is_err() {
eprintln!("Failed to write response length to client!");
return;
}
if stream.write_all(&response_data).await.is_err() {
eprintln!("Failed to write response to client!");
return;
}
// ---------- Send response (end) ----------
if response.status && response.message.eq("killed") {
std::process::exit(0);
} }
} }