diff --git a/Cargo.lock b/Cargo.lock index 616a2e2..0f65a02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "cipher" version = "0.4.4" @@ -155,6 +161,16 @@ dependencies = [ "inout", ] +[[package]] +name = "command-group" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a68fa787550392a9d58f44c21a3022cfb3ea3e2458b7f85d3b399d0ceeccf409" +dependencies = [ + "nix 0.27.1", + "winapi", +] + [[package]] name = "convert_case" version = "0.7.1" @@ -596,6 +612,29 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "object" version = "0.36.7" @@ -776,7 +815,10 @@ dependencies = [ name = "roc_command" version = "0.0.1" dependencies = [ + "command-group", "lazy_static", + "libc", + "nix 0.29.0", "roc_io_error", "roc_std", ] diff --git a/crates/roc_command/Cargo.toml b/crates/roc_command/Cargo.toml index 86b0144..acb6576 100644 --- a/crates/roc_command/Cargo.toml +++ b/crates/roc_command/Cargo.toml @@ -11,3 +11,10 @@ version.workspace = true roc_std.workspace = true roc_io_error.workspace = true lazy_static = "1.4" + +# For 
spawn_grouped! functionality +command-group = "5" + +[target.'cfg(unix)'.dependencies] +libc = "0.2" +nix = { version = "0.29", features = ["process", "signal"] } diff --git a/crates/roc_command/src/lib.rs b/crates/roc_command/src/lib.rs index 13f3814..b7ecab4 100644 --- a/crates/roc_command/src/lib.rs +++ b/crates/roc_command/src/lib.rs @@ -1,24 +1,223 @@ //! This crate provides common functionality for Roc to interface with `std::process::Command` -use roc_std::{roc_refcounted_noop_impl, RocList, RocRefcounted, RocResult, RocStr}; +use command_group::{CommandGroup, GroupChild}; +use roc_std::{RocList, RocRefcounted, RocResult, RocStr}; use std::collections::HashMap; use std::io::{Read, Write}; use std::process::{Child, ChildStdin, ChildStdout, Stdio}; -use std::sync::Mutex; +use std::sync::{Mutex, MutexGuard}; +use std::thread; -// Global storage for spawned processes -lazy_static::lazy_static! { - static ref PROCESSES: Mutex> = Mutex::new(HashMap::new()); - static ref NEXT_PROCESS_ID: Mutex = Mutex::new(1); +/// Lock a mutex, recovering if it was poisoned by a panic in another thread. +fn lock_or_recover(mutex: &Mutex) -> MutexGuard<'_, T> { + mutex.lock().unwrap_or_else(|poisoned| poisoned.into_inner()) +} + +/// Read stdout and stderr concurrently to avoid deadlock when both pipes have large data. +/// +/// Without concurrent reading, if both pipes fill their buffers (~64KB each), the child +/// blocks writing and the parent blocks reading, causing deadlock. +/// +/// Returns individual results for each pipe so callers can handle partial success. 
+fn read_pipes_concurrently( + stdout: Option, + stderr: Option, +) -> (std::io::Result>, std::io::Result>) { + let stdout_handle = stdout.map(|mut pipe| { + thread::spawn(move || { + let mut bytes = Vec::new(); + pipe.read_to_end(&mut bytes).map(|_| bytes) + }) + }); + + let stderr_handle = stderr.map(|mut pipe| { + thread::spawn(move || { + let mut bytes = Vec::new(); + pipe.read_to_end(&mut bytes).map(|_| bytes) + }) + }); + + let stdout_result = stdout_handle + .map(|h| h.join().unwrap_or_else(|_| Ok(Vec::new()))) + .unwrap_or_else(|| Ok(Vec::new())); + let stderr_result = stderr_handle + .map(|h| h.join().unwrap_or_else(|_| Ok(Vec::new()))) + .unwrap_or_else(|| Ok(Vec::new())); + + (stdout_result, stderr_result) +} + +// ============================================================================= +// Trait and helpers to reduce duplication between spawn! and spawn_grouped! +// ============================================================================= + +/// Trait abstracting over Child and GroupChild for common operations. +trait ChildLike { + fn kill(&mut self) -> std::io::Result<()>; + fn wait(&mut self) -> std::io::Result; + fn try_wait(&mut self) -> std::io::Result>; } -struct SpawnedProcess { - child: Child, +impl ChildLike for Child { + fn kill(&mut self) -> std::io::Result<()> { Child::kill(self) } + fn wait(&mut self) -> std::io::Result { Child::wait(self) } + fn try_wait(&mut self) -> std::io::Result> { Child::try_wait(self) } +} + +impl ChildLike for GroupChild { + fn kill(&mut self) -> std::io::Result<()> { GroupChild::kill(self) } + fn wait(&mut self) -> std::io::Result { GroupChild::wait(self) } + fn try_wait(&mut self) -> std::io::Result> { GroupChild::try_wait(self) } +} + +/// Generic process struct used by both spawn! and spawn_grouped! 
+struct Process { + child: C, stdin: Option, stdout: Option, stderr: Option, } +/// Write bytes to a process's stdin +fn write_to_stdin(stdin: &mut Option, bytes: &[u8]) -> RocResult<(), roc_io_error::IOErr> { + match stdin { + Some(ref mut handle) => { + match handle.write_all(bytes) { + Ok(()) => match handle.flush() { + Ok(()) => RocResult::ok(()), + Err(err) => RocResult::err(err.into()), + }, + Err(err) => RocResult::err(err.into()), + } + } + None => RocResult::err(roc_io_error::IOErr { + tag: roc_io_error::IOErrTag::Other, + msg: "Process stdin not available".into(), + }), + } +} + +/// Read exactly n bytes from stdout +fn read_from_stdout(stdout: &mut Option, num_bytes: u64) -> RocResult, roc_io_error::IOErr> { + match stdout { + Some(ref mut handle) => { + let mut buffer = vec![0u8; num_bytes as usize]; + match handle.read_exact(&mut buffer) { + Ok(()) => RocResult::ok(RocList::from(&buffer[..])), + Err(err) => RocResult::err(err.into()), + } + } + None => RocResult::err(roc_io_error::IOErr { + tag: roc_io_error::IOErrTag::Other, + msg: "Process stdout not available".into(), + }), + } +} + +/// Read exactly n bytes from stderr +fn read_from_stderr(stderr: &mut Option, num_bytes: u64) -> RocResult, roc_io_error::IOErr> { + match stderr { + Some(ref mut handle) => { + let mut buffer = vec![0u8; num_bytes as usize]; + match handle.read_exact(&mut buffer) { + Ok(()) => RocResult::ok(RocList::from(&buffer[..])), + Err(err) => RocResult::err(err.into()), + } + } + None => RocResult::err(roc_io_error::IOErr { + tag: roc_io_error::IOErrTag::Other, + msg: "Process stderr not available".into(), + }), + } +} + +/// Kill a process and reap it +fn kill_process(process: &mut Process) -> RocResult<(), roc_io_error::IOErr> { + match process.child.kill() { + Ok(()) => { + let _ = process.child.wait(); + RocResult::ok(()) + } + Err(err) => RocResult::err(err.into()), + } +} + +/// Wait for a process to exit and return its output +fn wait_for_process(process: &mut Process) 
-> RocResult { + let (stdout_result, stderr_result) = read_pipes_concurrently( + process.stdout.take(), + process.stderr.take(), + ); + + // Propagate pipe read errors + let stdout_bytes = match stdout_result { + Ok(bytes) => bytes, + Err(err) => return RocResult::err(err.into()), + }; + let stderr_bytes = match stderr_result { + Ok(bytes) => bytes, + Err(err) => return RocResult::err(err.into()), + }; + + match process.child.wait() { + Ok(status) => { + let exit_code = status.code().unwrap_or(-1); + RocResult::ok(ProcessOutput { + stdout_bytes: RocList::from(&stdout_bytes[..]), + stderr_bytes: RocList::from(&stderr_bytes[..]), + exit_code, + }) + } + Err(err) => RocResult::err(err.into()), + } +} + +/// Poll a process for exit status +fn poll_process(process: &mut Process) -> Result, std::io::Error> { + match process.child.try_wait()? { + Some(status) => { + let (stdout_result, stderr_result) = read_pipes_concurrently( + process.stdout.take(), + process.stderr.take(), + ); + let stdout_bytes = stdout_result?; + let stderr_bytes = stderr_result?; + let exit_code = status.code().unwrap_or(-1); + Ok(Some(ProcessOutput { + stdout_bytes: RocList::from(&stdout_bytes[..]), + stderr_bytes: RocList::from(&stderr_bytes[..]), + exit_code, + })) + } + None => Ok(None), + } +} + +fn not_found_error(msg: &str) -> roc_io_error::IOErr { + roc_io_error::IOErr { + tag: roc_io_error::IOErrTag::NotFound, + msg: msg.into(), + } +} + +// Type aliases for clarity +type SpawnedProcess = Process; +type GroupedProcess = Process; + +// Global storage for spawned processes (regular spawn!) +lazy_static::lazy_static! { + static ref PROCESSES: Mutex> = Mutex::new(HashMap::new()); + static ref NEXT_PROCESS_ID: Mutex = Mutex::new(1); +} + +// Global storage for grouped processes (spawn_grouped! - cleaned up on exit) +// Uses process groups on Unix and Job Objects on Windows to ensure the +// entire process tree is killed when the parent dies. +lazy_static::lazy_static! 
{ + static ref GROUPED_PROCESSES: Mutex> = Mutex::new(HashMap::new()); + static ref NEXT_GROUPED_ID: Mutex = Mutex::new(1); +} + #[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)] #[repr(C)] pub struct Command { @@ -108,12 +307,10 @@ impl roc_std::RocRefcounted for OutputFromHostSuccess { impl roc_std::RocRefcounted for OutputFromHostFailure { fn inc(&mut self) { - self.exit_code.inc(); self.stdout_bytes.inc(); self.stderr_bytes.inc(); } fn dec(&mut self) { - self.exit_code.dec(); self.stdout_bytes.dec(); self.stderr_bytes.dec(); } @@ -184,23 +381,21 @@ pub fn command_spawn_with_pipes(roc_cmd: &Command) -> RocResult { - let stdin = child.stdin.take(); - let stdout = child.stdout.take(); - let stderr = child.stderr.take(); - let process_id = { - let mut next_id = NEXT_PROCESS_ID.lock().unwrap(); + let mut next_id = lock_or_recover(&NEXT_PROCESS_ID); let id = *next_id; *next_id += 1; id }; + let process = Process { + stdin: child.stdin.take(), + stdout: child.stdout.take(), + stderr: child.stderr.take(), + child, + }; - { - let mut processes = PROCESSES.lock().unwrap(); - processes.insert(process_id, SpawnedProcess { child, stdin, stdout, stderr }); - } - + lock_or_recover(&PROCESSES).insert(process_id, process); RocResult::ok(process_id) } Err(err) => RocResult::err(err.into()), @@ -209,124 +404,49 @@ pub fn command_spawn_with_pipes(roc_cmd: &Command) -> RocResult) -> RocResult<(), roc_io_error::IOErr> { - let mut processes = PROCESSES.lock().unwrap(); - + let mut processes = lock_or_recover(&PROCESSES); match processes.get_mut(&process_id) { - Some(process) => { - match &mut process.stdin { - Some(stdin) => { - match stdin.write_all(bytes.as_slice()) { - Ok(()) => { - match stdin.flush() { - Ok(()) => RocResult::ok(()), - Err(err) => RocResult::err(err.into()), - } - } - Err(err) => RocResult::err(err.into()), - } - } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::Other, - msg: "Process stdin not available".into(), - }), - 
} - } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::NotFound, - msg: "Process not found".into(), - }), + Some(process) => write_to_stdin(&mut process.stdin, bytes.as_slice()), + None => RocResult::err(not_found_error("Process not found")), } } /// Read exactly n bytes from a spawned process's stdout pub fn process_read_bytes(process_id: u64, num_bytes: u64) -> RocResult, roc_io_error::IOErr> { - let mut processes = PROCESSES.lock().unwrap(); - + let mut processes = lock_or_recover(&PROCESSES); match processes.get_mut(&process_id) { - Some(process) => { - match &mut process.stdout { - Some(stdout) => { - let mut buffer = vec![0u8; num_bytes as usize]; - match stdout.read_exact(&mut buffer) { - Ok(()) => RocResult::ok(RocList::from(&buffer[..])), - Err(err) => RocResult::err(err.into()), - } - } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::Other, - msg: "Process stdout not available".into(), - }), - } - } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::NotFound, - msg: "Process not found".into(), - }), + Some(process) => read_from_stdout(&mut process.stdout, num_bytes), + None => RocResult::err(not_found_error("Process not found")), } } /// Read exactly n bytes from a spawned process's stderr pub fn process_read_stderr_bytes(process_id: u64, num_bytes: u64) -> RocResult, roc_io_error::IOErr> { - let mut processes = PROCESSES.lock().unwrap(); - + let mut processes = lock_or_recover(&PROCESSES); match processes.get_mut(&process_id) { - Some(process) => { - match &mut process.stderr { - Some(stderr) => { - let mut buffer = vec![0u8; num_bytes as usize]; - match stderr.read_exact(&mut buffer) { - Ok(()) => RocResult::ok(RocList::from(&buffer[..])), - Err(err) => RocResult::err(err.into()), - } - } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::Other, - msg: "Process stderr not available".into(), - }), - } - } - None => 
RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::NotFound, - msg: "Process not found".into(), - }), + Some(process) => read_from_stderr(&mut process.stderr, num_bytes), + None => RocResult::err(not_found_error("Process not found")), } } /// Close a spawned process's stdin (sends EOF to child) pub fn process_close_stdin(process_id: u64) -> RocResult<(), roc_io_error::IOErr> { - let mut processes = PROCESSES.lock().unwrap(); - + let mut processes = lock_or_recover(&PROCESSES); match processes.get_mut(&process_id) { Some(process) => { - process.stdin = None; // Drop the handle, sends EOF to child + process.stdin = None; RocResult::ok(()) } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::NotFound, - msg: "Process not found".into(), - }), + None => RocResult::err(not_found_error("Process not found")), } } /// Kill a spawned process pub fn process_kill(process_id: u64) -> RocResult<(), roc_io_error::IOErr> { - let mut processes = PROCESSES.lock().unwrap(); - + let mut processes = lock_or_recover(&PROCESSES); match processes.remove(&process_id) { - Some(mut process) => { - match process.child.kill() { - Ok(()) => { - // Wait to reap the process - let _ = process.child.wait(); - RocResult::ok(()) - } - Err(err) => RocResult::err(err.into()), - } - } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::NotFound, - msg: "Process not found".into(), - }), + Some(mut process) => kill_process(&mut process), + None => RocResult::err(not_found_error("Process not found")), } } @@ -354,37 +474,10 @@ impl roc_std::RocRefcounted for ProcessOutput { /// Wait for a spawned process to exit and return its output pub fn process_wait(process_id: u64) -> RocResult { - let mut processes = PROCESSES.lock().unwrap(); - + let mut processes = lock_or_recover(&PROCESSES); match processes.remove(&process_id) { - Some(mut process) => { - // Read all stdout and stderr before waiting - let mut stdout_bytes = Vec::new(); - let 
mut stderr_bytes = Vec::new(); - - if let Some(mut stdout) = process.stdout.take() { - let _ = stdout.read_to_end(&mut stdout_bytes); - } - if let Some(mut stderr) = process.stderr.take() { - let _ = stderr.read_to_end(&mut stderr_bytes); - } - - match process.child.wait() { - Ok(status) => { - let exit_code = status.code().unwrap_or(-1); - RocResult::ok(ProcessOutput { - stdout_bytes: RocList::from(&stdout_bytes[..]), - stderr_bytes: RocList::from(&stderr_bytes[..]), - exit_code, - }) - } - Err(err) => RocResult::err(err.into()), - } - } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::NotFound, - msg: "Process not found".into(), - }), + Some(mut process) => wait_for_process(&mut process), + None => RocResult::err(not_found_error("Process not found")), } } @@ -515,42 +608,157 @@ const _: () = { /// This matches `process_wait` behavior and is safe because read errors on pipes /// from exited processes are extremely rare (data is kernel-buffered). pub fn process_poll(process_id: u64) -> RocResult { - let mut processes = PROCESSES.lock().unwrap(); + let mut processes = lock_or_recover(&PROCESSES); match processes.get_mut(&process_id) { - Some(process) => { - match process.child.try_wait() { - Ok(Some(status)) => { - // Process has exited - remove it from the map and collect output - let mut process = processes.remove(&process_id).unwrap(); + Some(process) => match poll_process(process) { + Ok(Some(output)) => { + processes.remove(&process_id); + RocResult::ok(PollResult::Exited(output.exit_code, output.stdout_bytes, output.stderr_bytes)) + } + Ok(None) => RocResult::ok(PollResult::Running()), + Err(err) => RocResult::err(err.into()), + }, + None => RocResult::err(not_found_error("Process not found")), + } +} - let mut stdout_bytes = Vec::new(); - let mut stderr_bytes = Vec::new(); +// ============================================================================= +// spawn_grouped! 
- Processes that are automatically cleaned up when parent exits +// ============================================================================= - if let Some(mut stdout) = process.stdout.take() { - let _ = stdout.read_to_end(&mut stdout_bytes); - } - if let Some(mut stderr) = process.stderr.take() { - let _ = stderr.read_to_end(&mut stderr_bytes); - } +/// Spawn a process in a managed group that dies with the parent. +/// +/// Platform behavior: +/// - **Linux**: Uses PR_SET_PDEATHSIG so child receives SIGKILL when parent dies. +/// 100% reliable even for SIGKILL of parent. +/// - **macOS**: Uses process groups. Handles normal exit, SIGINT, SIGTERM, crashes. +/// Children may survive if parent is killed with SIGKILL (macOS kernel limitation). +/// - **Windows**: Uses Job Objects (via command-group) to automatically kill +/// all children when the parent exits. 100% reliable. +pub fn command_spawn_grouped(roc_cmd: &Command) -> RocResult { + let mut cmd = std::process::Command::from(roc_cmd); + cmd.stdin(Stdio::piped()); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); - let exit_code = status.code().unwrap_or(-1); - RocResult::ok(PollResult::Exited( - exit_code, - RocList::from(&stdout_bytes[..]), - RocList::from(&stderr_bytes[..]), - )) - } - Ok(None) => { - // Process is still running - RocResult::ok(PollResult::Running()) + // On Linux, set PR_SET_PDEATHSIG so child dies when parent dies (even SIGKILL) + #[cfg(target_os = "linux")] + { + use std::os::unix::process::CommandExt; + unsafe { + cmd.pre_exec(|| { + let ret = libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL); + if ret == -1 { + return Err(std::io::Error::last_os_error()); } - Err(err) => RocResult::err(err.into()), + Ok(()) + }); + } + } + + match cmd.group_spawn() { + Ok(mut child) => { + let process_id = { + let mut next_id = lock_or_recover(&NEXT_GROUPED_ID); + let id = *next_id; + *next_id += 1; + id + }; + + let process = Process { + stdin: child.inner().stdin.take(), + stdout: 
child.inner().stdout.take(), + stderr: child.inner().stderr.take(), + child, + }; + + lock_or_recover(&GROUPED_PROCESSES).insert(process_id, process); + RocResult::ok(process_id) + } + Err(err) => RocResult::err(err.into()), + } +} + +/// Kill all grouped processes. Called automatically on exit. +pub fn process_kill_all_grouped() -> RocResult<(), roc_io_error::IOErr> { + let mut processes = lock_or_recover(&GROUPED_PROCESSES); + for (_, mut process) in processes.drain() { + let _ = kill_process(&mut process); + } + RocResult::ok(()) +} + +/// Kill a specific grouped process by ID +pub fn grouped_process_kill(process_id: u64) -> RocResult<(), roc_io_error::IOErr> { + let mut processes = lock_or_recover(&GROUPED_PROCESSES); + match processes.remove(&process_id) { + Some(mut process) => kill_process(&mut process), + None => RocResult::err(not_found_error("Grouped process not found")), + } +} + +/// Poll a grouped process for exit status +pub fn grouped_process_poll(process_id: u64) -> RocResult { + let mut processes = lock_or_recover(&GROUPED_PROCESSES); + + match processes.get_mut(&process_id) { + Some(process) => match poll_process(process) { + Ok(Some(output)) => { + processes.remove(&process_id); + RocResult::ok(PollResult::Exited(output.exit_code, output.stdout_bytes, output.stderr_bytes)) } + Ok(None) => RocResult::ok(PollResult::Running()), + Err(err) => RocResult::err(err.into()), + }, + None => RocResult::err(not_found_error("Grouped process not found")), + } +} + +/// Wait for a grouped process to exit +pub fn grouped_process_wait(process_id: u64) -> RocResult { + let mut processes = lock_or_recover(&GROUPED_PROCESSES); + match processes.remove(&process_id) { + Some(mut process) => wait_for_process(&mut process), + None => RocResult::err(not_found_error("Grouped process not found")), + } +} + +/// Write bytes to a grouped process's stdin +pub fn grouped_process_write_bytes(process_id: u64, bytes: &RocList) -> RocResult<(), roc_io_error::IOErr> { + let mut 
processes = lock_or_recover(&GROUPED_PROCESSES); + match processes.get_mut(&process_id) { + Some(process) => write_to_stdin(&mut process.stdin, bytes.as_slice()), + None => RocResult::err(not_found_error("Grouped process not found")), + } +} + +/// Read exactly n bytes from a grouped process's stdout +pub fn grouped_process_read_bytes(process_id: u64, num_bytes: u64) -> RocResult, roc_io_error::IOErr> { + let mut processes = lock_or_recover(&GROUPED_PROCESSES); + match processes.get_mut(&process_id) { + Some(process) => read_from_stdout(&mut process.stdout, num_bytes), + None => RocResult::err(not_found_error("Grouped process not found")), + } +} + +/// Read exactly n bytes from a grouped process's stderr +pub fn grouped_process_read_stderr_bytes(process_id: u64, num_bytes: u64) -> RocResult, roc_io_error::IOErr> { + let mut processes = lock_or_recover(&GROUPED_PROCESSES); + match processes.get_mut(&process_id) { + Some(process) => read_from_stderr(&mut process.stderr, num_bytes), + None => RocResult::err(not_found_error("Grouped process not found")), + } +} + +/// Close a grouped process's stdin (sends EOF to child) +pub fn grouped_process_close_stdin(process_id: u64) -> RocResult<(), roc_io_error::IOErr> { + let mut processes = lock_or_recover(&GROUPED_PROCESSES); + match processes.get_mut(&process_id) { + Some(process) => { + process.stdin = None; + RocResult::ok(()) } - None => RocResult::err(roc_io_error::IOErr { - tag: roc_io_error::IOErrTag::NotFound, - msg: "Process not found".into(), - }), + None => RocResult::err(not_found_error("Grouped process not found")), } } diff --git a/crates/roc_host/src/lib.rs b/crates/roc_host/src/lib.rs index 5b10881..9af9bf4 100644 --- a/crates/roc_host/src/lib.rs +++ b/crates/roc_host/src/lib.rs @@ -349,6 +349,15 @@ pub fn init() { roc_fx_process_kill as _, roc_fx_process_wait as _, roc_fx_process_poll as _, + roc_fx_command_spawn_grouped as _, + roc_fx_grouped_process_write_bytes as _, + roc_fx_grouped_process_read_bytes as 
_, + roc_fx_grouped_process_read_stderr_bytes as _, + roc_fx_grouped_process_close_stdin as _, + roc_fx_grouped_process_kill as _, + roc_fx_grouped_process_wait as _, + roc_fx_grouped_process_poll as _, + roc_fx_process_kill_all_grouped as _, roc_fx_dir_create as _, roc_fx_dir_create_all as _, roc_fx_dir_delete_empty as _, @@ -830,6 +839,72 @@ pub extern "C" fn roc_fx_process_poll( roc_command::process_poll(process_id) } +// === Grouped process functions (spawn_grouped!) === + +#[no_mangle] +pub extern "C" fn roc_fx_command_spawn_grouped( + roc_cmd: &roc_command::Command, +) -> RocResult { + roc_command::command_spawn_grouped(roc_cmd) +} + +#[no_mangle] +pub extern "C" fn roc_fx_grouped_process_write_bytes( + process_id: u64, + bytes: &RocList, +) -> RocResult<(), roc_io_error::IOErr> { + roc_command::grouped_process_write_bytes(process_id, bytes) +} + +#[no_mangle] +pub extern "C" fn roc_fx_grouped_process_read_bytes( + process_id: u64, + num_bytes: u64, +) -> RocResult, roc_io_error::IOErr> { + roc_command::grouped_process_read_bytes(process_id, num_bytes) +} + +#[no_mangle] +pub extern "C" fn roc_fx_grouped_process_read_stderr_bytes( + process_id: u64, + num_bytes: u64, +) -> RocResult, roc_io_error::IOErr> { + roc_command::grouped_process_read_stderr_bytes(process_id, num_bytes) +} + +#[no_mangle] +pub extern "C" fn roc_fx_grouped_process_close_stdin( + process_id: u64, +) -> RocResult<(), roc_io_error::IOErr> { + roc_command::grouped_process_close_stdin(process_id) +} + +#[no_mangle] +pub extern "C" fn roc_fx_grouped_process_kill( + process_id: u64, +) -> RocResult<(), roc_io_error::IOErr> { + roc_command::grouped_process_kill(process_id) +} + +#[no_mangle] +pub extern "C" fn roc_fx_grouped_process_wait( + process_id: u64, +) -> RocResult { + roc_command::grouped_process_wait(process_id) +} + +#[no_mangle] +pub extern "C" fn roc_fx_grouped_process_poll( + process_id: u64, +) -> RocResult { + roc_command::grouped_process_poll(process_id) +} + +#[no_mangle] +pub 
extern "C" fn roc_fx_process_kill_all_grouped() -> RocResult<(), roc_io_error::IOErr> { + roc_command::process_kill_all_grouped() +} + #[no_mangle] pub extern "C" fn roc_fx_dir_create(roc_path: &RocList) -> RocResult<(), roc_io_error::IOErr> { roc_file::dir_create(roc_path) diff --git a/platform/Cmd.roc b/platform/Cmd.roc index 8d4996d..d31d19c 100644 --- a/platform/Cmd.roc +++ b/platform/Cmd.roc @@ -13,6 +13,8 @@ module [ exec_cmd!, exec_exit_code!, spawn!, + spawn_grouped!, + kill_grouped!, ] import InternalCmd exposing [to_str] @@ -282,7 +284,7 @@ ChildProcess : { ## kill!({})? ## ``` ## -## > Remember to call `kill!` or `wait!` when done to clean up resources. +## Remember to call `kill!` or `wait!` when done to clean up resources. spawn! : Cmd => Result ChildProcess [SpawnFailed IOErr] spawn! = |@Cmd(cmd)| id = Host.command_spawn_with_pipes!(cmd) @@ -324,3 +326,87 @@ spawn! = |@Cmd(cmd)| ) |> Result.map_err(|err| PollFailed(InternalIOErr.handle_err(err))), }) + +## Spawn a child process that gets cleaned up when the parent exits. +## +## Use this for test servers, subprocesses, or anything that shouldn't +## outlive your program. +## +## ## Platform behavior +## +## **Linux and Windows**: Children are guaranteed to die when the parent exits, +## even if the parent is killed with SIGKILL or crashes. Linux uses +## `PR_SET_PDEATHSIG`, Windows uses Job Objects. +## +## **macOS**: Children die on normal exit, Ctrl+C, crashes, and most signals. +## However, if the parent is killed with `kill -9` (SIGKILL), children may +## survive as orphans. This is a macOS kernel limitation - there's no +## equivalent to Linux's `PR_SET_PDEATHSIG`. +## +## ## Example +## +## ``` +## { kill!, wait!, poll!, ... } = +## Cmd.new("./my-test-server") +## |> Cmd.spawn_grouped!()? +## +## kill!({})? # Kills the process tree +## ``` +spawn_grouped! : Cmd => Result ChildProcess [SpawnFailed IOErr] +spawn_grouped! 
= |@Cmd(cmd)| + id = Host.command_spawn_grouped!(cmd) + |> Result.map_err(|err| SpawnFailed(InternalIOErr.handle_err(err)))? + + Ok({ + write_stdin!: |bytes| + Host.grouped_process_write_bytes!(id, bytes) + |> Result.map_err(|err| WriteFailed(InternalIOErr.handle_err(err))), + + read_stdout!: |num_bytes| + Host.grouped_process_read_bytes!(id, num_bytes) + |> Result.map_err(|err| ReadFailed(InternalIOErr.handle_err(err))), + + read_stderr!: |num_bytes| + Host.grouped_process_read_stderr_bytes!(id, num_bytes) + |> Result.map_err(|err| ReadFailed(InternalIOErr.handle_err(err))), + + close_stdin!: |{}| + Host.grouped_process_close_stdin!(id) + |> Result.map_err(|err| CloseFailed(InternalIOErr.handle_err(err))), + + kill!: |{}| + Host.grouped_process_kill!(id) + |> Result.map_err(|err| KillFailed(InternalIOErr.handle_err(err))), + + wait!: |{}| + Host.grouped_process_wait!(id) + |> Result.map_ok(|{ stdout_bytes, stderr_bytes, exit_code }| { exit_code, stdout: stdout_bytes, stderr: stderr_bytes }) + |> Result.map_err(|err| WaitFailed(InternalIOErr.handle_err(err))), + + poll!: |{}| + Host.grouped_process_poll!(id) + |> Result.map_ok( + |result| + when result is + Running -> Running + Exited({ stderr_bytes, stdout_bytes, exit_code }) -> Exited({ exit_code, stdout: stdout_bytes, stderr: stderr_bytes }) + ) + |> Result.map_err(|err| PollFailed(InternalIOErr.handle_err(err))), + }) + +## Kill all processes spawned via `spawn_grouped!` and their children. +## +## This is called automatically on normal program exit, but you can call it +## explicitly for immediate cleanup. +## +## ``` +## server1 = Cmd.new("./server1") |> Cmd.spawn_grouped!()? +## server2 = Cmd.new("./server2") |> Cmd.spawn_grouped!()? +## +## # Kill all at once +## Cmd.kill_grouped!({})? +## ``` +kill_grouped! : {} => Result {} [KillFailed IOErr] +kill_grouped! 
= |{}| + Host.process_kill_all_grouped!({}) + |> Result.map_err(|err| KillFailed(InternalIOErr.handle_err(err))) diff --git a/platform/Host.roc b/platform/Host.roc index a939972..7ebe619 100644 --- a/platform/Host.roc +++ b/platform/Host.roc @@ -11,6 +11,15 @@ hosted [ process_kill!, process_wait!, process_poll!, + command_spawn_grouped!, + grouped_process_write_bytes!, + grouped_process_read_bytes!, + grouped_process_read_stderr_bytes!, + grouped_process_close_stdin!, + grouped_process_kill!, + grouped_process_wait!, + grouped_process_poll!, + process_kill_all_grouped!, current_arch_os!, cwd!, dir_create!, @@ -96,6 +105,17 @@ process_kill! : U64 => Result {} InternalIOErr.IOErrFromHost process_wait! : U64 => Result { stdout_bytes : List U8, stderr_bytes : List U8, exit_code : I32 } InternalIOErr.IOErrFromHost process_poll! : U64 => Result InternalCmd.PollResult InternalIOErr.IOErrFromHost +# GROUPED PROCESS (subprocess with automatic cleanup when parent exits) +command_spawn_grouped! : InternalCmd.Command => Result U64 InternalIOErr.IOErrFromHost +grouped_process_write_bytes! : U64, List U8 => Result {} InternalIOErr.IOErrFromHost +grouped_process_read_bytes! : U64, U64 => Result (List U8) InternalIOErr.IOErrFromHost +grouped_process_read_stderr_bytes! : U64, U64 => Result (List U8) InternalIOErr.IOErrFromHost +grouped_process_close_stdin! : U64 => Result {} InternalIOErr.IOErrFromHost +grouped_process_kill! : U64 => Result {} InternalIOErr.IOErrFromHost +grouped_process_wait! : U64 => Result { stdout_bytes : List U8, stderr_bytes : List U8, exit_code : I32 } InternalIOErr.IOErrFromHost +grouped_process_poll! : U64 => Result InternalCmd.PollResult InternalIOErr.IOErrFromHost +process_kill_all_grouped! : {} => Result {} InternalIOErr.IOErrFromHost + # FILE file_write_bytes! : List U8, List U8 => Result {} InternalIOErr.IOErrFromHost file_write_utf8! 
: List U8, Str => Result {} InternalIOErr.IOErrFromHost diff --git a/tests.sh b/tests.sh new file mode 100755 index 0000000..4cf9912 --- /dev/null +++ b/tests.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +set -eo pipefail + +cd "$(dirname "$0")" + +indent() { + sed 's/^/ /' +} + +for test_file in tests/*.roc; do + # Skip helper files (not actual tests) + if [[ "$test_file" == *-helper.roc ]]; then + continue + fi + + echo "Running $test_file..." + + DB_PATH=./tests/test.db roc dev --linker legacy "$test_file" 2>&1 | indent + + if [ "${PIPESTATUS[0]}" -ne 0 ]; then + echo "FAILED: $test_file" + exit 1 + fi +done + +echo "All tests passed." diff --git a/tests/pdeathsig-helper.roc b/tests/pdeathsig-helper.roc new file mode 100644 index 0000000..6cddf27 --- /dev/null +++ b/tests/pdeathsig-helper.roc @@ -0,0 +1,18 @@ +app [main!] { pf: platform "../platform/main.roc" } + +import pf.Cmd +import pf.Arg exposing [Arg] +import pf.Sleep + +# Helper for PR_SET_PDEATHSIG test. +# Spawns "sleep 9999" via spawn_grouped! then sleeps forever. +# When this process is killed, "sleep 9999" should also die. + +main! : List Arg => Result {} _ +main! = |_args| + # Spawn child via spawn_grouped! (sets PR_SET_PDEATHSIG) + _ = Cmd.new("sleep") |> Cmd.args(["9999"]) |> Cmd.spawn_grouped!()? + + # Sleep forever - test will kill us + Sleep.millis!(999999999) + Ok({}) diff --git a/tests/spawn-grouped-pdeathsig-test.roc b/tests/spawn-grouped-pdeathsig-test.roc new file mode 100644 index 0000000..b7edf18 --- /dev/null +++ b/tests/spawn-grouped-pdeathsig-test.roc @@ -0,0 +1,177 @@ +app [main!] { pf: platform "../platform/main.roc" } + +import pf.Stdout +import pf.Cmd +import pf.Arg exposing [Arg] +import pf.Sleep + +# Integration test for PR_SET_PDEATHSIG behavior. +# +# PR_SET_PDEATHSIG ensures that when a parent process dies, children +# spawned via spawn_grouped! automatically receive SIGKILL. +# +# NOTE: This test only works on Linux. PR_SET_PDEATHSIG is Linux-specific. 
+# On other platforms, this test will skip gracefully.
+#
+# Test strategy:
+# 1. Build a helper that spawns "sleep 9999" via spawn_grouped!
+# 2. Run helper, wait for child to appear
+# 3. Kill helper with SIGKILL
+# 4. Verify "sleep 9999" also died
+
+main! : List Arg => Result {} _
+main! = |_args|
+    # Platform check: print a SKIP line and succeed on non-Linux, otherwise run the real test
+    when check_platform!({}) is
+        Err(NotLinux(platform)) ->
+            Stdout.line!("SKIP: PR_SET_PDEATHSIG test only runs on Linux (detected: ${platform})")
+        Ok({}) ->
+            run_pdeathsig_test!({})
+
+run_pdeathsig_test! : {} => Result {} _
+run_pdeathsig_test! = |{}|
+    # Clean up any leftover processes from previous runs (pkill results are deliberately ignored)
+    _ = Cmd.new("pkill") |> Cmd.args(["-9", "-f", "sleep 9999"]) |> Cmd.exec_output!()
+    _ = Cmd.new("pkill") |> Cmd.args(["-9", "-f", "pdeathsig-helper"]) |> Cmd.exec_output!()
+    Sleep.millis!(100)
+
+    # Build the helper into tests/pdeathsig-helper; a build error aborts the test with BuildFailed
+    Stdout.line!("Building helper...")?
+    build_result = Cmd.new("roc")
+        |> Cmd.args(["build", "--linker", "legacy", "tests/pdeathsig-helper.roc", "--output", "tests/pdeathsig-helper"])
+        |> Cmd.exec_output!()
+
+    when build_result is
+        Ok(_) -> {}
+        Err(e) ->
+            Stdout.line!("Build failed: ${Inspect.to_str(e)}")?
+            Err(BuildFailed)?
+
+    # Start the helper in background using spawn! (not spawn_grouped!)
+    # We use spawn! here because we want to test that the HELPER's use of
+    # spawn_grouped! causes its child to die when we kill the helper.
+    # Using spawn_grouped! here would confuse what we're testing.
+    Stdout.line!("Starting helper...")?
+    _ = Cmd.new("tests/pdeathsig-helper") |> Cmd.spawn!()?
+
+    # Poll for child (sleep 9999) to appear (avoid race condition with fixed sleep)
+    child_pid = poll_for_sleep_pid!(50)? # 50 attempts * 100ms = 5s timeout
+
+    Stdout.line!("Found child PID: ${Num.to_str(child_pid)}")?
+
+    # Find and kill the helper with SIGKILL (the spawn! handle was discarded, so the PID comes from pgrep)
+    helper_pid = find_helper_pid!({})?
+    Stdout.line!("Killing helper (PID ${Num.to_str(helper_pid)}) with SIGKILL...")?
+
+    # Kill helper - ignore result (might already be dead)
+    _ = Cmd.new("kill")
+        |> Cmd.args(["-9", Num.to_str(helper_pid)])
+        |> Cmd.exec_output!()
+
+    # Poll for child death (avoid fixed sleep race condition)
+    child_died = poll_for_child_death!(child_pid, 50)? # 50 attempts * 100ms = 5s timeout
+
+    if child_died then
+        Stdout.line!("PASS: Child died when parent was killed (PR_SET_PDEATHSIG works)")
+    else
+        # Clean up the surviving child ourselves before reporting the failure
+        _ = Cmd.new("kill") |> Cmd.args(["-9", Num.to_str(child_pid)]) |> Cmd.exec_output!()
+        Stdout.line!("FAIL: Child survived! PR_SET_PDEATHSIG not working.")?
+        Err(PdeathsigFailed)
+
+## Poll for "sleep 9999" to appear: up to max_attempts lookups, sleeping 100 ms after each miss
+poll_for_sleep_pid! : U64 => Result I64 _
+poll_for_sleep_pid! = |max_attempts|
+    poll_for_sleep_loop!(max_attempts, 0)
+
+poll_for_sleep_loop! : U64, U64 => Result I64 _
+poll_for_sleep_loop! = |max_attempts, attempt|
+    if attempt >= max_attempts then
+        Err(FailedExpectation("sleep 9999 process did not appear after ${Num.to_str(max_attempts)} attempts"))
+    else
+        when try_find_sleep_pid!({}) is
+            Ok(pid) -> Ok(pid)
+            Err(_) ->
+                Sleep.millis!(100)
+                poll_for_sleep_loop!(max_attempts, attempt + 1)
+
+## Try to find the "sleep 9999" PID via pgrep -f; parses the first output line, NotFound if pgrep errors or no number is printed
+try_find_sleep_pid! : {} => Result I64 [NotFound]
+try_find_sleep_pid! = |{}|
+    result = Cmd.new("pgrep")
+        |> Cmd.args(["-f", "sleep 9999"])
+        |> Cmd.exec_output!()
+
+    when result is
+        Ok({ stdout_utf8, stderr_utf8_lossy: _ }) ->
+            pid_str = stdout_utf8 |> Str.trim |> Str.split_on("\n") |> List.first |> Result.with_default("")
+            when Str.to_i64(pid_str) is
+                Ok(pid) -> Ok(pid)
+                Err(_) -> Err(NotFound)
+        Err(_) ->
+            Err(NotFound)
+
+## Find helper PID via pgrep -f "pdeathsig-helper" (first output line); FailedExpectation if pgrep errors or the line is not a number
+find_helper_pid! : {} => Result I64 _
+find_helper_pid! = |{}|
+    result = Cmd.new("pgrep")
+        |> Cmd.args(["-f", "pdeathsig-helper"])
+        |> Cmd.exec_output!()
+
+    when result is
+        Ok({ stdout_utf8, stderr_utf8_lossy: _ }) ->
+            pid_str = stdout_utf8 |> Str.trim |> Str.split_on("\n") |> List.first |> Result.with_default("")
+            when Str.to_i64(pid_str) is
+                Ok(pid) -> Ok(pid)
+                Err(_) -> Err(FailedExpectation("Could not parse helper PID"))
+        Err(_) ->
+            Err(FailedExpectation("Could not find helper process"))
+
+## Check if PID is alive by running kill -0; a failed command maps to Ok(Bool.false), so this never returns Err
+is_pid_alive! : I64 => Result Bool _
+is_pid_alive! = |pid|
+    result = Cmd.new("kill")
+        |> Cmd.args(["-0", Num.to_str(pid)])
+        |> Cmd.exec_output!()
+
+    when result is
+        Ok(_) -> Ok(Bool.true)
+        Err(_) -> Ok(Bool.false)
+
+## Poll for child to die: Ok(Bool.true) once is_pid_alive! reports false, Ok(Bool.false) if still alive after max_attempts checks
+poll_for_child_death! : I64, U64 => Result Bool _
+poll_for_child_death! = |pid, max_attempts|
+    poll_death_loop!(pid, max_attempts, 0)
+
+poll_death_loop! : I64, U64, U64 => Result Bool _
+poll_death_loop! = |pid, max_attempts, attempt|
+    if attempt >= max_attempts then
+        # Timeout - child is still alive
+        Ok(Bool.false)
+    else
+        alive = is_pid_alive!(pid)?
+        if alive then
+            Sleep.millis!(100)
+            poll_death_loop!(pid, max_attempts, attempt + 1)
+        else
+            # Child died
+            Ok(Bool.true)
+
+## Check if running on Linux: compares trimmed `uname -s` output to "Linux"; any other value or a uname error yields NotLinux
+check_platform! : {} => Result {} [NotLinux Str]
+check_platform! = |{}|
+    result = Cmd.new("uname")
+        |> Cmd.args(["-s"])
+        |> Cmd.exec_output!()
+
+    when result is
+        Ok({ stdout_utf8, stderr_utf8_lossy: _ }) ->
+            platform = Str.trim(stdout_utf8)
+            if platform == "Linux" then
+                Ok({})
+            else
+                Err(NotLinux(platform))
+
+        Err(_) ->
+            # Can't determine platform, assume not Linux
+            Err(NotLinux("unknown"))
diff --git a/tests/spawn-grouped-test.roc b/tests/spawn-grouped-test.roc
new file mode 100644
index 0000000..d5e7638
--- /dev/null
+++ b/tests/spawn-grouped-test.roc
@@ -0,0 +1,493 @@
+app [main!] 
{ pf: platform "../platform/main.roc" }
+
+import pf.Stdout
+import pf.Cmd
+import pf.Arg exposing [Arg]
+import pf.Sleep
+import pf.File
+
+# Tests for spawn_grouped! and kill_grouped!
+#
+# These tests focus on behavior UNIQUE to spawn_grouped! that differs
+# from spawn! (which is tested in cmd-test.roc).
+#
+# We verify behavior using the API itself (process handles), not external
+# tools like ps/grep which are brittle and platform-specific.
+
+main! : List Arg => Result {} _
+main! = |_args|
+
+    # === spawn_grouped! basic: spawn, kill, verify dead ===
+    proc1 = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+
+    # Verify it's running
+    when proc1.poll!({})? is
+        Running -> {}
+        Exited(_) -> Err(FailedExpectation("proc1 should be Running"))?
+
+    # Kill it
+    proc1.kill!({})?
+
+    # Verify it's dead (poll returns error after kill)
+    when proc1.poll!({}) is
+        Err(PollFailed(_)) -> {}
+        other -> Err(FailedExpectation("proc1 poll after kill - expected PollFailed, got ${Inspect.to_str(other)}"))?
+
+    # === kill_grouped! kills all tracked processes ===
+    # Spawn 3 processes and KEEP the handles
+    proc_a = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+    proc_b = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+    proc_c = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+
+    # Verify all running
+    when proc_a.poll!({})? is
+        Running -> {}
+        Exited(_) -> Err(FailedExpectation("proc_a should be Running"))?
+    when proc_b.poll!({})? is
+        Running -> {}
+        Exited(_) -> Err(FailedExpectation("proc_b should be Running"))?
+    when proc_c.poll!({})? is
+        Running -> {}
+        Exited(_) -> Err(FailedExpectation("proc_c should be Running"))?
+
+    # Kill ALL grouped processes at once
+    Cmd.kill_grouped!({})?
+
+    # Verify ALL are dead via their handles
+    when proc_a.poll!({}) is
+        Err(PollFailed(_)) -> {}
+        other -> Err(FailedExpectation("proc_a after kill_grouped - expected PollFailed, got ${Inspect.to_str(other)}"))?
+    when proc_b.poll!({}) is
+        Err(PollFailed(_)) -> {}
+        other -> Err(FailedExpectation("proc_b after kill_grouped - expected PollFailed, got ${Inspect.to_str(other)}"))?
+    when proc_c.poll!({}) is
+        Err(PollFailed(_)) -> {}
+        other -> Err(FailedExpectation("proc_c after kill_grouped - expected PollFailed, got ${Inspect.to_str(other)}"))?
+
+    # === kill_grouped! twice is safe ===
+    Cmd.kill_grouped!({})? # Should not error even with nothing to kill
+
+    # === kill_grouped! with no processes is safe ===
+    # (We just killed everything above, so this tests empty state)
+    Cmd.kill_grouped!({})?
+
+    # === Spawn after kill_grouped! works ===
+    proc_after = Cmd.new("sh") |> Cmd.args(["-c", "echo ok"]) |> Cmd.spawn_grouped!()?
+    result = proc_after.wait!({})?
+    if result.stdout != Str.to_utf8("ok\n") then
+        Err(FailedExpectation("spawn after kill_grouped failed"))?
+    else
+        {}
+
+    # === wait! captures stdout, stderr, and exit_code correctly ===
+    # Test stdout (already tested above, but let's be explicit)
+    proc_stdout = Cmd.new("sh") |> Cmd.args(["-c", "echo hello"]) |> Cmd.spawn_grouped!()?
+    stdout_result = proc_stdout.wait!({})?
+    if stdout_result.stdout != Str.to_utf8("hello\n") then
+        Err(FailedExpectation("wait! stdout mismatch: expected 'hello\\n', got ${Inspect.to_str(stdout_result.stdout)}"))?
+    else
+        {}
+
+    # Test stderr
+    proc_stderr = Cmd.new("sh") |> Cmd.args(["-c", "echo error >&2"]) |> Cmd.spawn_grouped!()?
+    stderr_result = proc_stderr.wait!({})?
+    if stderr_result.stderr != Str.to_utf8("error\n") then
+        Err(FailedExpectation("wait! stderr mismatch: expected 'error\\n', got ${Inspect.to_str(stderr_result.stderr)}"))?
+    else
+        {}
+
+    # Test exit code
+    proc_exit = Cmd.new("sh") |> Cmd.args(["-c", "exit 42"]) |> Cmd.spawn_grouped!()?
+    exit_result = proc_exit.wait!({})?
+    if exit_result.exit_code != 42 then
+        Err(FailedExpectation("wait! exit_code mismatch: expected 42, got ${Num.to_str(exit_result.exit_code)}"))?
+    else
+        {}
+
+    # === poll! returns Exited with correct data (stdout, stderr, exit_code) ===
+    proc_poll = Cmd.new("sh") |> Cmd.args(["-c", "echo polled; echo poll_err >&2; exit 7"]) |> Cmd.spawn_grouped!()?
+    # Poll until exited
+    poll_exited = poll_until_exited!(proc_poll, 50)?
+    when poll_exited is
+        { exit_code, stdout, stderr } ->
+            if exit_code != 7 then
+                Err(FailedExpectation("poll! exit_code mismatch: expected 7, got ${Num.to_str(exit_code)}"))?
+            else if stdout != Str.to_utf8("polled\n") then
+                Err(FailedExpectation("poll! stdout mismatch: expected 'polled\\n', got ${Inspect.to_str(stdout)}"))?
+            else if stderr != Str.to_utf8("poll_err\n") then
+                Err(FailedExpectation("poll! stderr mismatch: expected 'poll_err\\n', got ${Inspect.to_str(stderr)}"))?
+            else
+                {}
+
+    # === wait! with combined stdout AND stderr ===
+    proc_both = Cmd.new("sh") |> Cmd.args(["-c", "echo out1; echo err1 >&2; echo out2; echo err2 >&2"]) |> Cmd.spawn_grouped!()?
+    both_result = proc_both.wait!({})?
+    if both_result.stdout != Str.to_utf8("out1\nout2\n") then
+        Err(FailedExpectation("combined stdout mismatch: got ${Inspect.to_str(both_result.stdout)}"))?
+    else if both_result.stderr != Str.to_utf8("err1\nerr2\n") then
+        Err(FailedExpectation("combined stderr mismatch: got ${Inspect.to_str(both_result.stderr)}"))?
+    else
+        {}
+
+    # === wait! twice returns error (process already consumed) ===
+    proc_wait_twice = Cmd.new("sh") |> Cmd.args(["-c", "echo once"]) |> Cmd.spawn_grouped!()?
+    when proc_wait_twice.wait!({}) is
+        Ok(_) -> {} # First call succeeds
+        Err(e) -> Err(FailedExpectation("first wait! should succeed, got ${Inspect.to_str(e)}"))?
+    when proc_wait_twice.wait!({}) is
+        Err(WaitFailed(_)) -> {}
+        Ok(_) -> Err(FailedExpectation("wait! twice should fail, but second call succeeded"))?
+
+    # === poll! after wait! returns error ===
+    proc_poll_after_wait = Cmd.new("sh") |> Cmd.args(["-c", "echo done"]) |> Cmd.spawn_grouped!()?
+    when proc_poll_after_wait.wait!({}) is
+        Ok(_) -> {}
+        Err(e) -> Err(FailedExpectation("wait! should succeed, got ${Inspect.to_str(e)}"))?
+    when proc_poll_after_wait.poll!({}) is
+        Err(PollFailed(_)) -> {}
+        Ok(_) -> Err(FailedExpectation("poll! after wait! should fail, but succeeded"))?
+
+    # === wait! after kill! returns error ===
+    proc_wait_after_kill = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+    proc_wait_after_kill.kill!({})?
+    when proc_wait_after_kill.wait!({}) is
+        Err(WaitFailed(_)) -> {}
+        Ok(_) -> Err(FailedExpectation("wait! after kill! should fail, but succeeded"))?
+
+    # === kill! twice on same process returns error ===
+    proc_kill_twice = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+    when proc_kill_twice.kill!({}) is
+        Ok({}) -> {} # First kill succeeds
+        Err(e) -> Err(FailedExpectation("first kill! should succeed, got ${Inspect.to_str(e)}"))?
+    when proc_kill_twice.kill!({}) is
+        Err(KillFailed(_)) -> {}
+        Ok(_) -> Err(FailedExpectation("kill! twice should fail, but second call succeeded"))?
+
+    # === poll! twice after Exited returns error (process consumed) ===
+    proc_poll_twice = Cmd.new("sh") |> Cmd.args(["-c", "exit 0"]) |> Cmd.spawn_grouped!()?
+    first_poll = poll_until_exited!(proc_poll_twice, 50)?
+    when first_poll is
+        { exit_code: 0, stdout: _, stderr: _ } -> {}
+        other -> Err(FailedExpectation("first poll! should return Exited with exit_code 0, got ${Inspect.to_str(other)}"))?
+    # Second poll should fail (process already consumed)
+    when proc_poll_twice.poll!({}) is
+        Err(PollFailed(_)) -> {}
+        Ok(_) -> Err(FailedExpectation("poll! twice should fail, but second call succeeded"))?
+
+    # === CRITICAL: kill_grouped! only kills grouped processes, not spawn! processes ===
+    proc_normal = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn!()?
+    proc_grouped = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+
+    # Verify both running
+    when proc_normal.poll!({})? is
+        Running -> {}
+        Exited(_) -> Err(FailedExpectation("proc_normal should be Running"))?
+    when proc_grouped.poll!({})? is
+        Running -> {}
+        Exited(_) -> Err(FailedExpectation("proc_grouped should be Running"))?
+
+    # Kill all GROUPED processes
+    Cmd.kill_grouped!({})?
+
+    # proc_grouped should be dead
+    when proc_grouped.poll!({}) is
+        Err(PollFailed(_)) -> {}
+        other -> Err(FailedExpectation("proc_grouped after kill_grouped - expected PollFailed, got ${Inspect.to_str(other)}"))?
+
+    # proc_normal should STILL BE RUNNING
+    when proc_normal.poll!({})? is
+        Running -> {}
+        Exited(_) -> Err(FailedExpectation("proc_normal should still be Running after kill_grouped!"))?
+
+    # Clean up the normal process and verify it died
+    proc_normal.kill!({})?
+    when proc_normal.poll!({}) is
+        Err(PollFailed(_)) -> {}
+        other -> Err(FailedExpectation("proc_normal after cleanup kill - expected PollFailed, got ${Inspect.to_str(other)}"))?
+
+    # === Individual kill! after kill_grouped! returns error ===
+    # (Process was already killed by kill_grouped!)
+    proc_d = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+    Cmd.kill_grouped!({})?
+    when proc_d.kill!({}) is
+        Err(KillFailed(_)) -> {}
+        other -> Err(FailedExpectation("kill after kill_grouped - expected KillFailed, got ${Inspect.to_str(other)}"))?
+
+    # === stdin/stdout I/O round-trip test ===
+    # Verifies that write_stdin!, close_stdin!, and stdout capture work for grouped processes
+    proc_cat = Cmd.new("cat") |> Cmd.spawn_grouped!()?
+    proc_cat.write_stdin!(Str.to_utf8("hello from stdin"))?
+    proc_cat.close_stdin!({})?
+    cat_result = proc_cat.wait!({})?
+    if cat_result.stdout != Str.to_utf8("hello from stdin") then
+        Err(FailedExpectation("stdin/stdout round-trip failed: expected 'hello from stdin', got ${Inspect.to_str(cat_result.stdout)}"))?
+    else
+        {}
+
+    # === stderr I/O test ===
+    # Verifies stderr capture works for grouped processes
+    proc_stderr_io = Cmd.new("sh") |> Cmd.args(["-c", "cat >&2"]) |> Cmd.spawn_grouped!()?
+    proc_stderr_io.write_stdin!(Str.to_utf8("error output"))?
+    proc_stderr_io.close_stdin!({})?
+    stderr_io_result = proc_stderr_io.wait!({})?
+    if stderr_io_result.stderr != Str.to_utf8("error output") then
+        Err(FailedExpectation("stderr I/O test failed: expected 'error output', got ${Inspect.to_str(stderr_io_result.stderr)}"))?
+    else
+        {}
+
+    # === environment variables work with spawn_grouped! ===
+    proc_env = Cmd.new("sh")
+        |> Cmd.args(["-c", "echo $MY_TEST_VAR"])
+        |> Cmd.env("MY_TEST_VAR", "it_works")
+        |> Cmd.spawn_grouped!()?
+    env_result = proc_env.wait!({})?
+    if env_result.stdout != Str.to_utf8("it_works\n") then
+        Err(FailedExpectation("env var test failed: expected 'it_works\\n', got ${Inspect.to_str(env_result.stdout)}"))?
+    else
+        {}
+
+    # === spawn failure returns error ===
+    when Cmd.new("/nonexistent/command/that/does/not/exist") |> Cmd.spawn_grouped!() is
+        Err(SpawnFailed(_)) -> {}
+        Ok(_) -> Err(FailedExpectation("spawning nonexistent command should fail"))?
+
+    # === read_stdout! basic test ===
+    cat_read = Cmd.new("cat") |> Cmd.spawn_grouped!()?
+    cat_read.write_stdin!([72, 101, 108, 108, 111])?
+    read_out = cat_read.read_stdout!(5)?
+    if read_out != [72, 101, 108, 108, 111] then
+        Err(FailedExpectation("read_stdout! failed: expected [72, 101, 108, 108, 111], got ${Inspect.to_str(read_out)}"))?
+    else
+        {}
+    cat_read.kill!({})?
+
+    # === close_stdin! + read_stdout! EOF test ===
+    eof_proc = Cmd.new("sh") |> Cmd.args(["-c", "cat; echo done"]) |> Cmd.spawn_grouped!()?
+    eof_proc.write_stdin!(Str.to_utf8("hi"))?
+    eof_proc.close_stdin!({})?
+    eof_out = eof_proc.read_stdout!(7)? # "hi" + "done\n" = 7 bytes
+    if eof_out != Str.to_utf8("hidone\n") then
+        Err(FailedExpectation("close_stdin! EOF test failed: expected 'hidone\\n', got ${Inspect.to_str(eof_out)}"))?
+    else
+        {}
+    eof_proc.kill!({})?
+
+    # === read_stderr! test ===
+    stderr_read = Cmd.new("sh") |> Cmd.args(["-c", "echo err >&2"]) |> Cmd.spawn_grouped!()?
+    stderr_read.close_stdin!({})?
+    stderr_out = stderr_read.read_stderr!(4)?
+    if stderr_out != Str.to_utf8("err\n") then
+        Err(FailedExpectation("read_stderr! failed: expected 'err\\n', got ${Inspect.to_str(stderr_out)}"))?
+    else
+        {}
+    stderr_read.kill!({})?
+
+    # === write after close_stdin! returns error ===
+    closed = Cmd.new("cat") |> Cmd.spawn_grouped!()?
+    closed.close_stdin!({})?
+    when closed.write_stdin!([65]) is
+        Err(WriteFailed(_)) -> {}
+        other -> Err(FailedExpectation("write after close_stdin! should fail, got ${Inspect.to_str(other)}"))?
+    closed.kill!({})?
+
+    # === operations after kill! return error ===
+    killed = Cmd.new("cat") |> Cmd.spawn_grouped!()?
+    killed.kill!({})?
+    when killed.write_stdin!([65]) is
+        Err(WriteFailed(_)) -> {}
+        other -> Err(FailedExpectation("write after kill! should fail, got ${Inspect.to_str(other)}"))?
+    when killed.read_stdout!(1) is
+        Err(ReadFailed(_)) -> {}
+        other -> Err(FailedExpectation("read after kill! should fail, got ${Inspect.to_str(other)}"))?
+
+    # === CRITICAL: Verify process tree killing (grandchildren die) ===
+    # This is THE core feature of spawn_grouped! - killing the entire process tree.
+    # We spawn a grandchild, write its PID to a temp file, then kill the parent.
+    # The grandchild should die because it's in the same process group.
+
+    # Clean up temp file from any previous failed runs (ignore error if doesn't exist)
+    _ = File.delete!("/tmp/roc_test_grandchild_pid")
+
+    tree = Cmd.new("sh")
+        |> Cmd.args(["-c", "sleep 300 & echo $! > /tmp/roc_test_grandchild_pid; wait"])
+        |> Cmd.spawn_grouped!()?
+    Sleep.millis!(200) # Let grandchild spawn and PID be written
+
+    # Read the grandchild PID from the temp file
+    read_pid = Cmd.new("cat") |> Cmd.args(["/tmp/roc_test_grandchild_pid"]) |> Cmd.spawn!()?
+    read_pid_result = read_pid.wait!({})?
+    pid_str = Str.from_utf8(read_pid_result.stdout) |> Result.with_default("") |> Str.trim
+
+    # Verify grandchild is alive before we kill
+    alive_check = Cmd.new("sh")
+        |> Cmd.args(["-c", "kill -0 ${pid_str} 2>/dev/null"])
+        |> Cmd.spawn!()?
+    alive_result = alive_check.wait!({})?
+    if alive_result.exit_code != 0 then
+        Err(FailedExpectation("grandchild PID ${pid_str} should be alive before kill!"))?
+    else
+        {}
+
+    # Kill the parent (and its process group, including grandchild)
+    tree.kill!({})?
+    Sleep.millis!(100) # Let cleanup happen
+
+    # Verify grandchild is dead using kill -0 (returns 0 if process exists, non-zero if dead)
+    dead_check = Cmd.new("sh")
+        |> Cmd.args(["-c", "kill -0 ${pid_str} 2>/dev/null"])
+        |> Cmd.spawn!()?
+    dead_result = dead_check.wait!({})?
+
+    # Cleanup temp file
+    _ = File.delete!("/tmp/roc_test_grandchild_pid")
+
+    if dead_result.exit_code == 0 then
+        Err(FailedExpectation("grandchild PID ${pid_str} still alive after parent killed!"))?
+    else
+        {} # exit_code != 0 means process doesn't exist (good!)
+
+    # === Large I/O test ===
+    # Verify wait! handles large stdout without deadlock
+    # Feed 100 KiB of zero bytes (100 blocks of 1024) through base64 to produce the output
+    large_proc = Cmd.new("sh") |> Cmd.args(["-c", "dd if=/dev/zero bs=1024 count=100 2>/dev/null | base64"]) |> Cmd.spawn_grouped!()?
+    large_result = large_proc.wait!({})?
+    large_size = List.len(large_result.stdout)
+    # base64 of 100KB is ~137KB (4/3 ratio + newlines)
+    if large_size < 100000 then
+        Err(FailedExpectation("large I/O test: expected >100KB stdout, got ${Num.to_str(large_size)} bytes"))?
+    else
+        {}
+
+    # === Simultaneous stdout AND stderr (verify both captured) ===
+    # Using 100KB each to verify concurrent reading works (would deadlock with sequential reads)
+    both_large = Cmd.new("sh")
+        |> Cmd.args(["-c", "dd if=/dev/zero bs=1024 count=100 2>/dev/null | base64; dd if=/dev/zero bs=1024 count=100 2>/dev/null | base64 >&2"])
+        |> Cmd.spawn_grouped!()?
+    both_large_result = both_large.wait!({})?
+    if List.len(both_large_result.stdout) < 100000 then
+        Err(FailedExpectation("simultaneous I/O: stdout too small, got ${Num.to_str(List.len(both_large_result.stdout))} bytes"))?
+    else if List.len(both_large_result.stderr) < 100000 then
+        Err(FailedExpectation("simultaneous I/O: stderr too small, got ${Num.to_str(List.len(both_large_result.stderr))} bytes"))?
+    else
+        {}
+
+    # NOTE: Cmd.cwd is not available in this version of basic-cli
+    # Working directory test skipped
+
+    # === read_stdout! requires exact byte count (fails if not enough data) ===
+    # Note: read_stdout! blocks until requested bytes available, fails if stream closes early
+    exact_proc = Cmd.new("sh") |> Cmd.args(["-c", "printf abc"]) |> Cmd.spawn_grouped!()? # printf, not `echo -n`: POSIX leaves -n implementation-defined
+    exact_proc.close_stdin!({})?
+    Sleep.millis!(50) # Let output arrive
+    # Request exactly 3 bytes (what's available)
+    exact_out = exact_proc.read_stdout!(3)?
+    if exact_out != Str.to_utf8("abc") then
+        Err(FailedExpectation("exact read test: expected 'abc', got ${Inspect.to_str(exact_out)}"))?
+    else
+        {}
+    exact_proc.kill!({})?
+
+    # Verify that requesting more bytes than available fails
+    short_proc = Cmd.new("sh") |> Cmd.args(["-c", "printf hi"]) |> Cmd.spawn_grouped!()?
+    short_proc.close_stdin!({})?
+    Sleep.millis!(50)
+    when short_proc.read_stdout!(100) is
+        Err(ReadFailed(_)) -> {} # Expected: not enough data
+        Ok(_) -> Err(FailedExpectation("read_stdout! should fail when requesting more bytes than available"))?
+    short_proc.kill!({})?
+
+    # === Binary data with null bytes (no corruption) ===
+    binary_proc = Cmd.new("sh") |> Cmd.args(["-c", "printf '\\000\\001\\002\\377'"]) |> Cmd.spawn_grouped!()? # octal escapes: POSIX printf does not define \x
+    binary_result = binary_proc.wait!({})?
+    if binary_result.stdout != [0, 1, 2, 255] then
+        Err(FailedExpectation("binary data test: expected [0, 1, 2, 255], got ${Inspect.to_str(binary_result.stdout)}"))?
+    else
+        {}
+
+    # === Empty output test ===
+    empty_proc = Cmd.new("true") |> Cmd.spawn_grouped!()?
+    empty_result = empty_proc.wait!({})?
+    if List.len(empty_result.stdout) != 0 then
+        Err(FailedExpectation("empty output: expected empty stdout, got ${Num.to_str(List.len(empty_result.stdout))} bytes"))?
+    else if List.len(empty_result.stderr) != 0 then
+        Err(FailedExpectation("empty output: expected empty stderr, got ${Num.to_str(List.len(empty_result.stderr))} bytes"))?
+    else if empty_result.exit_code != 0 then
+        Err(FailedExpectation("empty output: expected exit_code 0, got ${Num.to_str(empty_result.exit_code)}"))?
+    else
+        {}
+
+    # === Rapid spawn/kill (race condition test) ===
+    rapid_spawn_kill!(50)?
+
+    # === Stress test: spawn many processes simultaneously ===
+    # Spawn 20 processes, verify all can be killed with kill_grouped!
+    stress_procs = spawn_many!(20, [])?
+    # Verify all running
+    verify_all_running!(stress_procs)?
+    # Kill all at once
+    Cmd.kill_grouped!({})?
+    # Verify all dead
+    verify_all_dead!(stress_procs)?
+
+    Stdout.line!("All spawn_grouped! tests passed.")
+
+## Poll a process until it exits (10 ms between polls), failing after max_retries polls that still report Running
+poll_until_exited! : Cmd.ChildProcess, U64 => Result { exit_code : I32, stdout : List U8, stderr : List U8 } _
+poll_until_exited! = |proc, max_retries|
+    poll_loop!(proc, max_retries, 0)
+
+poll_loop! : Cmd.ChildProcess, U64, U64 => Result { exit_code : I32, stdout : List U8, stderr : List U8 } _
+poll_loop! = |proc, max_retries, attempt|
+    if attempt >= max_retries then
+        Err(FailedExpectation("Process did not exit after ${Num.to_str(max_retries)} poll attempts"))
+    else
+        when proc.poll!({}) is
+            Ok(Running) ->
+                Sleep.millis!(10)
+                poll_loop!(proc, max_retries, attempt + 1)
+
+            Ok(Exited(data)) ->
+                Ok(data)
+
+            Err(e) ->
+                Err(FailedExpectation("poll! failed: ${Inspect.to_str(e)}"))
+
+## Spawn N "sleep 60" processes via spawn_grouped!, appending each handle to acc (stress-test helper)
+spawn_many! : U64, List Cmd.ChildProcess => Result (List Cmd.ChildProcess) _
+spawn_many! = |remaining, acc|
+    if remaining == 0 then
+        Ok(acc)
+    else
+        proc = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+        spawn_many!(remaining - 1, List.append(acc, proc))
+
+## Verify all processes in list are running; stops at the first handle whose poll! is not Ok(Running)
+verify_all_running! : List Cmd.ChildProcess => Result {} _
+verify_all_running! = |procs|
+    when procs is
+        [] -> Ok({})
+        [proc, .. as rest] ->
+            when proc.poll!({}) is
+                Ok(Running) -> verify_all_running!(rest)
+                Ok(Exited(_)) -> Err(FailedExpectation("stress test: process should be Running"))
+                Err(e) -> Err(FailedExpectation("stress test: poll failed ${Inspect.to_str(e)}"))
+
+## Verify all processes in list are dead (poll returns error)
+verify_all_dead! : List Cmd.ChildProcess => Result {} _
+verify_all_dead! = |procs|
+    when procs is
+        [] -> Ok({})
+        [proc, .. as rest] ->
+            when proc.poll!({}) is
+                Err(PollFailed(_)) -> verify_all_dead!(rest)
+                other -> Err(FailedExpectation("stress test: expected PollFailed, got ${Inspect.to_str(other)}"))
+
+## Rapid spawn/kill to test for race conditions: spawns "sleep 60" and kills it immediately, `remaining` times
+rapid_spawn_kill! : U64 => Result {} _
+rapid_spawn_kill! = |remaining|
+    if remaining == 0 then
+        Ok({})
+    else
+        proc = Cmd.new("sleep") |> Cmd.args(["60"]) |> Cmd.spawn_grouped!()?
+        proc.kill!({})?
+        rapid_spawn_kill!(remaining - 1)