diff --git a/src/lib.rs b/src/lib.rs index 5fd618f..ecaf8e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,6 @@ use std::convert::Infallible; use std::ffi::OsStr; use std::fmt; -use std::mem; use std::path::Path; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -75,13 +74,15 @@ use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; #[cfg(windows)] use blocking::Unblock; -use async_lock::{Mutex as AsyncMutex, OnceCell}; -use event_listener::Event; +use async_lock::OnceCell; use futures_lite::{future, io, prelude::*}; #[doc(no_inline)] pub use std::process::{ExitStatus, Output, Stdio}; +#[path = "reaper/signal.rs"] +mod reaper; + #[cfg(unix)] pub mod unix; #[cfg(windows)] @@ -99,17 +100,8 @@ static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool = /// /// This structure reaps zombie processes and emits the `SIGCHLD` signal. struct Reaper { - /// An event delivered every time the SIGCHLD signal occurs. - sigchld: Event, - - /// The list of zombie processes. - zombies: Mutex>, - - /// The pipe that delivers signal notifications. - pipe: Pipe, - - /// Locking this mutex indicates that we are polling the SIGCHLD event. - driver_guard: AsyncMutex<()>, + /// Underlying system reaper. + sys: reaper::Reaper, /// The number of tasks polling the SIGCHLD event. /// @@ -129,10 +121,7 @@ impl Reaper { static REAPER: OnceCell = OnceCell::new(); REAPER.get_or_init_blocking(|| Reaper { - sigchld: Event::new(), - zombies: Mutex::new(Vec::new()), - pipe: Pipe::new().expect("cannot create SIGCHLD pipe"), - driver_guard: AsyncMutex::new(()), + sys: reaper::Reaper::new(), drivers: AtomicUsize::new(0), child_count: AtomicUsize::new(0), }) @@ -165,8 +154,8 @@ impl Reaper { .spawn(move || { let driver = async move { // No need to bump self.drivers, it was already bumped in ensure_driven. - let guard = self.driver_guard.lock().await; - self.reap(guard).await + let guard = self.sys.lock().await; + self.sys.reap(guard).await }; #[cfg(unix)] @@ -179,146 +168,24 @@ impl Reaper { } /// Reap zombie processes forever. - async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! { - loop { - // Wait for the next SIGCHLD signal. - self.pipe.wait().await; - - // Notify all listeners waiting on the SIGCHLD event. - self.sigchld.notify(std::usize::MAX); - - // Reap zombie processes, but make sure we don't hold onto the lock for too long! - let mut zombies = mem::take(&mut *self.zombies.lock().unwrap()); - let mut i = 0; - 'reap_zombies: loop { - for _ in 0..50 { - if i >= zombies.len() { - break 'reap_zombies; - } - - if let Ok(None) = zombies[i].try_wait() { - i += 1; - } else { - zombies.swap_remove(i); - } - } - - // Be a good citizen; yield if there are a lot of processes. - // - // After we yield, check if there are more zombie processes. - future::yield_now().await; - zombies.append(&mut self.zombies.lock().unwrap()); - } - - // Put zombie processes back. - self.zombies.lock().unwrap().append(&mut zombies); - } + async fn reap(&'static self, driver_guard: reaper::Lock) -> ! { + self.sys.reap(driver_guard).await } /// Register a process with this reaper. - fn register(&'static self, child: &std::process::Child) -> io::Result<()> { + fn register(&'static self, child: std::process::Child) -> io::Result { self.ensure_driven(); - self.pipe.register(child) + self.sys.register(child) } } cfg_if::cfg_if! { if #[cfg(windows)] { - use async_channel::{Sender, Receiver, bounded}; - use std::ffi::c_void; - use std::os::windows::io::AsRawHandle; - - use windows_sys::Win32::{ - Foundation::{BOOLEAN, HANDLE}, - System::Threading::{ - RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, - }, - }; - - /// Waits for the next SIGCHLD signal. - struct Pipe { - /// The sender channel for the SIGCHLD signal. - sender: Sender<()>, - - /// The receiver channel for the SIGCHLD signal. - receiver: Receiver<()>, - } - - impl Pipe { - /// Creates a new pipe. - fn new() -> io::Result { - let (sender, receiver) = bounded(1); - Ok(Pipe { - sender, - receiver - }) - } - - /// Waits for the next SIGCHLD signal. - async fn wait(&self) { - self.receiver.recv().await.ok(); - } - - /// Register a process object into this pipe. - fn register(&self, child: &std::process::Child) -> io::Result<()> { - // Called when a child exits. - unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { - Reaper::get().pipe.sender.try_send(()).ok(); - } - - // Register this child process to invoke `callback` on exit. - let mut wait_object = 0; - let ret = unsafe { - RegisterWaitForSingleObject( - &mut wait_object, - child.as_raw_handle() as HANDLE, - Some(callback), - std::ptr::null_mut(), - INFINITE, - WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, - ) - }; - - if ret == 0 { - Err(io::Error::last_os_error()) - } else { - Ok(()) - } - } - } - // Wraps a sync I/O type into an async I/O type. fn wrap(io: T) -> io::Result> { Ok(Unblock::new(io)) } } else if #[cfg(unix)] { - use async_signal::{Signal, Signals}; - - /// Waits for the next SIGCHLD signal. - struct Pipe { - /// The iterator over SIGCHLD signals. - signals: Signals, - } - - impl Pipe { - /// Creates a new pipe. - fn new() -> io::Result { - Ok(Pipe { - signals: Signals::new(Some(Signal::Child))?, - }) - } - - /// Waits for the next SIGCHLD signal. - async fn wait(&self) { - (&self.signals).next().await; - } - - /// Register a process object into this pipe. - fn register(&self, _child: &std::process::Child) -> io::Result<()> { - Ok(()) - } - } - /// Wrap a file descriptor into a non-blocking I/O type. fn wrap(io: T) -> io::Result> { Async::new(io) @@ -328,7 +195,7 @@ cfg_if::cfg_if! { /// A guard that can kill child processes, or push them into the zombie list. struct ChildGuard { - inner: Option, + inner: reaper::ChildGuard, reap_on_drop: bool, kill_on_drop: bool, reaper: &'static Reaper, @@ -336,7 +203,7 @@ struct ChildGuard { impl ChildGuard { fn get_mut(&mut self) -> &mut std::process::Child { - self.inner.as_mut().unwrap() + self.inner.get_mut() } } @@ -347,10 +214,7 @@ impl Drop for ChildGuard { self.get_mut().kill().ok(); } if self.reap_on_drop { - let mut zombies = self.reaper.zombies.lock().unwrap(); - if let Ok(None) = self.get_mut().try_wait() { - zombies.push(self.inner.take().unwrap()); - } + self.inner.reap(&self.reaper.sys); } // Decrement number of children. @@ -409,14 +273,14 @@ impl Child { reaper.child_count.fetch_add(1, Ordering::Relaxed); // Register the child process in the global list. - reaper.register(&child)?; + let inner = reaper.register(child)?; Ok(Child { stdin, stdout, stderr, child: Arc::new(Mutex::new(ChildGuard { - inner: Some(child), + inner, reap_on_drop: cmd.reap_on_drop, kill_on_drop: cmd.kill_on_drop, reaper, @@ -509,25 +373,7 @@ impl Child { self.stdin.take(); let child = self.child.clone(); - async move { - loop { - // Wait on the child process. - if let Some(status) = child.lock().unwrap().get_mut().try_wait()? { - return Ok(status); - } - - // Start listening. - event_listener::listener!(Reaper::get().sigchld => listener); - - // Try again. - if let Some(status) = child.lock().unwrap().get_mut().try_wait()? { - return Ok(status); - } - - // Wait on the listener. - listener.await; - } - } + async move { Reaper::get().sys.status(&child).await } } /// Drops the stdin handle and collects the output of the process. @@ -896,19 +742,14 @@ pub fn driver() -> impl Future + Send + 'static { // If this was the last driver, and there are still resources actively using the // reaper, make sure that there is a thread driving the reaper. if prev_count == 1 - && reaper.child_count.load(Ordering::SeqCst) > 0 - && !reaper - .zombies - .lock() - .unwrap_or_else(|x| x.into_inner()) - .is_empty() + && (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies()) { reaper.ensure_driven(); } }); // Acquire the reaper lock and start polling the SIGCHLD event. - let guard = reaper.driver_guard.lock().await; + let guard = reaper.sys.lock().await; reaper.reap(guard).await } } diff --git a/src/reaper/signal.rs b/src/reaper/signal.rs new file mode 100644 index 0000000..557a0ab --- /dev/null +++ b/src/reaper/signal.rs @@ -0,0 +1,234 @@ +//! A version of the reaper that waits for a signal to check for process progress. + +use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; +use event_listener::Event; +use futures_lite::future; + +use std::io; +use std::mem; +use std::sync::Mutex; + +pub(crate) type Lock = AsyncMutexGuard<'static, ()>; + +/// The zombie process reaper. +pub(crate) struct Reaper { + /// An event delivered every time the SIGCHLD signal occurs. + sigchld: Event, + + /// The list of zombie processes. + zombies: Mutex>, + + /// The pipe that delivers signal notifications. + pipe: Pipe, + + /// Locking this mutex indicates that we are polling the SIGCHLD event. + driver_guard: AsyncMutex<()>, +} + +impl Reaper { + /// Create a new reaper. + pub(crate) fn new() -> Self { + Reaper { + sigchld: Event::new(), + zombies: Mutex::new(Vec::new()), + pipe: Pipe::new().expect("cannot create SIGCHLD pipe"), + driver_guard: AsyncMutex::new(()), + } + } + + /// Lock the driver thread. + pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> { + self.driver_guard.lock().await + } + + /// Reap zombie processes forever. + pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! { + loop { + // Wait for the next SIGCHLD signal. + self.pipe.wait().await; + + // Notify all listeners waiting on the SIGCHLD event. + self.sigchld.notify(std::usize::MAX); + + // Reap zombie processes, but make sure we don't hold onto the lock for too long! + let mut zombies = mem::take(&mut *self.zombies.lock().unwrap()); + let mut i = 0; + 'reap_zombies: loop { + for _ in 0..50 { + if i >= zombies.len() { + break 'reap_zombies; + } + + if let Ok(None) = zombies[i].try_wait() { + i += 1; + } else { + zombies.swap_remove(i); + } + } + + // Be a good citizen; yield if there are a lot of processes. + // + // After we yield, check if there are more zombie processes. + future::yield_now().await; + zombies.append(&mut self.zombies.lock().unwrap()); + } + + // Put zombie processes back. + self.zombies.lock().unwrap().append(&mut zombies); + } + } + + /// Register a process with this reaper. + pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result { + self.pipe.register(&child)?; + Ok(ChildGuard { inner: Some(child) }) + } + + /// Wait for an event to occur for a child process. + pub(crate) async fn status( + &'static self, + child: &Mutex, + ) -> io::Result { + loop { + // Wait on the child process. + if let Some(status) = child.lock().unwrap().get_mut().try_wait()? { + return Ok(status); + } + + // Start listening. + event_listener::listener!(self.sigchld => listener); + + // Try again. + if let Some(status) = child.lock().unwrap().get_mut().try_wait()? { + return Ok(status); + } + + // Wait on the listener. + listener.await; + } + } + + /// Do we have any registered zombie processes? + pub(crate) fn has_zombies(&'static self) -> bool { + !self + .zombies + .lock() + .unwrap_or_else(|x| x.into_inner()) + .is_empty() + } +} + +/// The wrapper around the child. +pub(crate) struct ChildGuard { + inner: Option, +} + +impl ChildGuard { + /// Get a mutable reference to the inner child. + pub(crate) fn get_mut(&mut self) -> &mut std::process::Child { + self.inner.as_mut().unwrap() + } + + /// Begin the reaping process for this child. + pub(crate) fn reap(&mut self, reaper: &'static Reaper) { + let mut zombies = reaper.zombies.lock().unwrap(); + if let Ok(None) = self.get_mut().try_wait() { + zombies.push(self.inner.take().unwrap()); + } + } +} + +cfg_if::cfg_if! { + if #[cfg(windows)] { + use async_channel::{Sender, Receiver, bounded}; + use std::ffi::c_void; + use std::os::windows::io::AsRawHandle; + + use windows_sys::Win32::{ + Foundation::{BOOLEAN, HANDLE}, + System::Threading::{ + RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, + }, + }; + + /// Waits for the next SIGCHLD signal. + struct Pipe { + /// The sender channel for the SIGCHLD signal. + sender: Sender<()>, + + /// The receiver channel for the SIGCHLD signal. + receiver: Receiver<()>, + } + + impl Pipe { + /// Creates a new pipe. + fn new() -> io::Result { + let (sender, receiver) = bounded(1); + Ok(Pipe { + sender, + receiver + }) + } + + /// Waits for the next SIGCHLD signal. + async fn wait(&self) { + self.receiver.recv().await.ok(); + } + + /// Register a process object into this pipe. + fn register(&self, child: &std::process::Child) -> io::Result<()> { + // Called when a child exits. + unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { + crate::Reaper::get().sys.pipe.sender.try_send(()).ok(); + } + + // Register this child process to invoke `callback` on exit. + let mut wait_object = 0; + let ret = unsafe { + RegisterWaitForSingleObject( + &mut wait_object, + child.as_raw_handle() as HANDLE, + Some(callback), + std::ptr::null_mut(), + INFINITE, + WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, + ) + }; + + if ret == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } + } + } else if #[cfg(unix)] { + use async_signal::{Signal, Signals}; + use futures_lite::prelude::*; + + /// Waits for the next SIGCHLD signal. + struct Pipe { + /// The iterator over SIGCHLD signals. + signals: Signals, + } + + impl Pipe { + /// Creates a new pipe. + fn new() -> io::Result { + Ok(Pipe { + signals: Signals::new(Some(Signal::Child))?, + }) + } + + /// Waits for the next SIGCHLD signal. + async fn wait(&self) { + (&self.signals).next().await; + } + + /// Register a process object into this pipe. + fn register(&self, _child: &std::process::Child) -> io::Result<()> { + Ok(()) + } + } + } +}