From 52a693e4dde42b9b8567e24c5276770b871ec203 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 10 Sep 2023 16:00:56 -0700 Subject: [PATCH] m: Centralize all global state into a single structure Signed-off-by: John Nunley --- src/lib.rs | 318 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 184 insertions(+), 134 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5c87762..6cacc7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,8 +90,170 @@ mod sealed { pub trait Sealed {} } -/// An event delivered every time the SIGCHLD signal occurs. -static SIGCHLD: Event = Event::new(); +/// The zombie process reaper. +/// +/// This structure reaps zombie processes and emits the `SIGCHLD` signal. +struct Reaper { + /// An event delivered every time the SIGCHLD signal occurs. + sigchld: Event, + + /// The list of zombie processes. + zombies: Mutex>, + + /// The pipe that delivers signal notifications. + pipe: Pipe, +} + +impl Reaper { + /// Get the singleton instance of the reaper. + fn get() -> &'static Self { + static REAPER: OnceCell = OnceCell::new(); + + REAPER.get_or_init_blocking(|| { + thread::Builder::new() + .name("async-process".to_string()) + .spawn(|| REAPER.wait_blocking().reap()) + .expect("cannot spawn async-process thread"); + + Reaper { + sigchld: Event::new(), + zombies: Mutex::new(Vec::new()), + pipe: Pipe::new().expect("cannot create SIGCHLD pipe"), + } + }) + } + + /// Reap zombie processes forever. + fn reap(&'static self) -> ! { + loop { + // Wait for the next SIGCHLD signal. + self.pipe.wait(); + + // Notify all listeners waiting on the SIGCHLD event. + self.sigchld.notify(std::usize::MAX); + + // Reap zombie processes. + let mut zombies = self.zombies.lock().unwrap(); + let mut i = 0; + while i < zombies.len() { + if let Ok(None) = zombies[i].try_wait() { + i += 1; + } else { + zombies.swap_remove(i); + } + } + } + } + + /// Register a process with this reaper. + fn register(&'static self, child: &std::process::Child) -> io::Result<()> { + self.pipe.register(child) + } +} + +cfg_if::cfg_if! { + if #[cfg(windows)] { + use std::ffi::c_void; + use std::os::windows::io::AsRawHandle; + use std::sync::mpsc; + + use windows_sys::Win32::{ + Foundation::{BOOLEAN, HANDLE}, + System::Threading::{ + RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, + }, + }; + + /// Waits for the next SIGCHLD signal. + struct Pipe { + /// The sender channel for the SIGCHLD signal. + sender: mpsc::SyncSender<()>, + + /// The receiver channel for the SIGCHLD signal. + receiver: Mutex>, + } + + impl Pipe { + /// Creates a new pipe. + fn new() -> io::Result { + let (sender, receiver) = mpsc::sync_channel(1); + Ok(Pipe { + sender, + receiver: Mutex::new(receiver), + }) + } + + /// Waits for the next SIGCHLD signal. + fn wait(&self) { + self.receiver.lock().unwrap().recv().ok(); + } + + /// Register a process object into this pipe. + fn register(&self, child: &std::process::Child) -> io::Result<()> { + // Called when a child exits. + unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { + Reaper::get().pipe.sender.try_send(()).ok(); + } + + // Register this child process to invoke `callback` on exit. + let mut wait_object = 0; + let ret = unsafe { + RegisterWaitForSingleObject( + &mut wait_object, + child.as_raw_handle() as HANDLE, + Some(callback), + std::ptr::null_mut(), + INFINITE, + WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, + ) + }; + + if ret == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } + } + + // Wraps a sync I/O type into an async I/O type. + fn wrap(io: T) -> io::Result> { + Ok(Unblock::new(io)) + } + } else if #[cfg(unix)] { + use async_signal::{Signal, Signals}; + + /// Waits for the next SIGCHLD signal. + struct Pipe { + /// The iterator over SIGCHLD signals. + signals: Signals, + } + + impl Pipe { + /// Creates a new pipe. + fn new() -> io::Result { + Ok(Pipe { + signals: Signals::new(Some(Signal::Child))?, + }) + } + + /// Waits for the next SIGCHLD signal. + fn wait(&self) { + async_io::block_on((&self.signals).next()); + } + + /// Register a process object into this pipe. + fn register(&self, _child: &std::process::Child) -> io::Result<()> { + Ok(()) + } + } + + /// Wrap a file descriptor into a non-blocking I/O type. + fn wrap(io: T) -> io::Result> { + Async::new(io) + } + } +} /// A guard that can kill child processes, or push them into the zombie list. struct ChildGuard { @@ -106,6 +268,21 @@ impl ChildGuard { } } +// When the last reference to the child process is dropped, push it into the zombie list. +impl Drop for ChildGuard { + fn drop(&mut self) { + if self.kill_on_drop { + self.get_mut().kill().ok(); + } + if self.reap_on_drop { + let mut zombies = Reaper::get().zombies.lock().unwrap(); + if let Ok(None) = self.get_mut().try_wait() { + zombies.push(self.inner.take().unwrap()); + } + } + } +} + /// A spawned child process. /// /// The process can be in running or exited state. Use [`status()`][`Child::status()`] or @@ -144,6 +321,8 @@ impl Child { /// The "async-process" thread waits for processes in the global list and cleans up the /// resources when they exit. fn new(cmd: &mut Command) -> io::Result { + // Make sure the reaper exists before we spawn the child process. + let reaper = Reaper::get(); let mut child = cmd.inner.spawn()?; // Convert sync I/O types into async I/O types. @@ -151,137 +330,8 @@ impl Child { let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout); let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr); - cfg_if::cfg_if! { - if #[cfg(windows)] { - use std::ffi::c_void; - use std::os::windows::io::AsRawHandle; - use std::sync::mpsc; - - use windows_sys::Win32::{ - Foundation::{BOOLEAN, HANDLE}, - System::Threading::{ - RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, - }, - }; - - // This channel is used to simulate SIGCHLD on Windows. - fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex>) { - static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex>)> = - OnceCell::new(); - - let (s, r) = CALLBACK.get_or_init_blocking(|| { - let (s, r) = mpsc::sync_channel(1); - (s, Mutex::new(r)) - }); - - (s, r) - } - - // Called when a child exits. - unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { - callback_channel().0.try_send(()).ok(); - } - - // Register this child process to invoke `callback` on exit. - let mut wait_object = 0; - let ret = unsafe { - RegisterWaitForSingleObject( - &mut wait_object, - child.as_raw_handle() as HANDLE, - Some(callback), - std::ptr::null_mut(), - INFINITE, - WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, - ) - }; - if ret == 0 { - return Err(io::Error::last_os_error()); - } - - // Waits for the next SIGCHLD signal. - fn wait_sigchld() { - callback_channel().1.lock().unwrap().recv().ok(); - } - - // Wraps a sync I/O type into an async I/O type. - fn wrap(io: T) -> io::Result> { - Ok(Unblock::new(io)) - } - - } else if #[cfg(unix)] { - use async_signal::{Signal, Signals}; - - static SIGNALS: OnceCell = OnceCell::new(); - - // Make sure the signal handler is registered before interacting with the process. - SIGNALS.get_or_init_blocking(|| { - Signals::new(Some(Signal::Child)) - .expect("Failed to register SIGCHLD handler") - }); - - // Waits for the next SIGCHLD signal. - fn wait_sigchld() { - async_io::block_on( - SIGNALS - .get() - .expect("Signals not registered") - .next() - ); - } - - // Wraps a sync I/O type into an async I/O type. - fn wrap(io: T) -> io::Result> { - Async::new(io) - } - } - } - - static ZOMBIES: OnceCell>> = OnceCell::new(); - - // Make sure the thread is started. - ZOMBIES.get_or_init_blocking(|| { - // Start a thread that handles SIGCHLD and notifies tasks when child processes exit. - thread::Builder::new() - .name("async-process".to_string()) - .spawn(move || { - loop { - // Wait for the next SIGCHLD signal. - wait_sigchld(); - - // Notify all listeners waiting on the SIGCHLD event. - SIGCHLD.notify(std::usize::MAX); - - // Reap zombie processes. - let mut zombies = ZOMBIES.get().unwrap().lock().unwrap(); - let mut i = 0; - while i < zombies.len() { - if let Ok(None) = zombies[i].try_wait() { - i += 1; - } else { - zombies.swap_remove(i); - } - } - } - }) - .expect("cannot spawn async-process thread"); - - Mutex::new(Vec::new()) - }); - - // When the last reference to the child process is dropped, push it into the zombie list. - impl Drop for ChildGuard { - fn drop(&mut self) { - if self.kill_on_drop { - self.get_mut().kill().ok(); - } - if self.reap_on_drop { - let mut zombies = ZOMBIES.get().unwrap().lock().unwrap(); - if let Ok(None) = self.get_mut().try_wait() { - zombies.push(self.inner.take().unwrap()); - } - } - } - } + // Register the child process in the global list. + reaper.register(&child)?; Ok(Child { stdin, @@ -381,7 +431,7 @@ impl Child { let child = self.child.clone(); async move { - let listener = EventListener::new(&SIGCHLD); + let listener = EventListener::new(&Reaper::get().sigchld); let mut listening = false; futures_lite::pin!(listener);