feat: Add a cleanup_zombies future

This commit is contained in:
jtnunley 2022-12-30 18:43:14 -08:00 committed by notgull
parent e086897e53
commit ae9fb34132
4 changed files with 317 additions and 115 deletions

View File

@ -29,7 +29,6 @@ rustix = { version = "0.36", default-features = false, features = ["std", "fs"]
[target.'cfg(unix)'.dependencies.signal-hook]
version = "0.3.0"
features = ["iterator"]
default-features = false
[target.'cfg(windows)'.dependencies]
@ -43,3 +42,6 @@ features = [
"Win32_System_Threading",
"Win32_System_WindowsProgramming"
]
[dev-dependencies]
async-executor = "1.5.0"

View File

@ -20,6 +20,10 @@ spawned child processes to exit and then calls the `wait()` syscall to clean up
processes. This is unlike the `process` API in the standard library, where dropping a running
`Child` leaks its resources.
However, note that you can reap zombie processes without spawning the "async-process" thread
by calling the `cleanup_zombies` method. The "async-process" method is therefore only spawned
if no other thread calls `cleanup_zombies`.
This crate uses [`async-io`] for async I/O on Unix-like systems and [`blocking`] for async I/O
on Windows.

View File

@ -9,6 +9,10 @@
//! processes. This is unlike the `process` API in the standard library, where dropping a running
//! `Child` leaks its resources.
//!
//! However, note that you can reap zombie processes without spawning the "async-process" thread
//! by calling the [`cleanup_zombies`] method. The "async-process" method is therefore only spawned
//! if no other thread calls [`cleanup_zombies`].
//!
//! This crate uses [`async-io`] for async I/O on Unix-like systems and [`blocking`] for async I/O
//! on Windows.
//!
@ -70,7 +74,7 @@ use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(windows)]
use blocking::Unblock;
use async_lock::OnceCell;
use async_lock::{Mutex as AsyncMutex, MutexGuard, OnceCell};
use event_listener::Event;
use futures_lite::{future, io, prelude::*};
@ -86,8 +90,280 @@ mod sealed {
pub trait Sealed {}
}
/// An event delivered every time the SIGCHLD signal occurs.
static SIGCHLD: Event = Event::new();
/// The reactor that is used to poll child processes.
struct Reactor {
/// The event that is signalled every time the SIGCHLD signal occurs.
sigchld: Event,
/// The list of "zombie" processes that have exited but have not been waited for.
zombies: Mutex<Vec<std::process::Child>>,
/// A pipe that signals the "async-process" thread when a new process is spawned.
pipe: Pipe,
/// The guard used to poll this reactor.
///
/// This is used to ensure that the reactor is only polled by a single thread at a time.
polling_guard: AsyncMutex<()>,
}
impl Reactor {
/// Get a reference to the global reactor.
fn get() -> &'static Reactor {
static REACTOR: OnceCell<Reactor> = OnceCell::new();
REACTOR.get_or_init_blocking(|| {
let sigchld = Event::new();
let pipe = Pipe::new().expect("cannot set signal handler for SIGCHLD");
Reactor {
sigchld,
zombies: Mutex::new(Vec::new()),
pipe,
polling_guard: AsyncMutex::new(()),
}
})
}
/// Register a child process in the reactor.
fn register(&self, child: &std::process::Child) -> io::Result<()> {
self.pipe.register(child)
}
/// Push a new "zombie" process into the reactor.
fn push_zombie(&'static self, mut child: std::process::Child) {
// Spawn the "async-process" thread if it hasn't been spawned yet.
self.spawn_thread();
let mut zombies = self.zombies.lock().unwrap();
// If the child process has already exited, then we don't need to push it into the list of zombies.
if let Ok(None) = child.try_wait() {
zombies.push(child);
}
}
/// Poll the reactor for "zombie" processes.
async fn poll(&self, _guard: MutexGuard<'_, ()>) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait().await;
// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);
// Reap zombie processes.
let mut start = 0;
while let Some(new_start) = self.process_zombies(start) {
start = new_start;
future::yield_now().await;
}
}
}
/// Wait on zombie processes.
///
/// Returns `Some(index to start at)` if we handled too many in one go.
fn process_zombies(&self, start: usize) -> Option<usize> {
let mut i = start;
let mut zombies = self.zombies.lock().unwrap();
// Only process a limited number of zombies at a time.
for _ in 0..100 {
// Get the zombie process.
let zombie = zombies.get_mut(i)?;
// Try to wait on it. If it's done, remove it from the list.
if let Ok(None) = zombie.try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
// Yield to avoid starving other tasks.
Some(i + 1)
}
/// Spawn a backup thread to poll the reactor.
fn spawn_thread(&'static self) {
// Check to see if no one else is polling the reactor.
if let Some(guard) = self.polling_guard.try_lock() {
// If no one else is polling the reactor, then spawn a backup thread to poll it.
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
#[cfg(not(windows))]
async_io::block_on(self.poll(guard));
#[cfg(windows)]
future::block_on(self.poll(guard));
})
.expect("cannot spawn async-process thread");
}
}
}
cfg_if::cfg_if! {
if #[cfg(windows)] {
/// A callback that simulates a SIGCHLD signal on Windows.
struct Pipe {
/// The event that is signalled to simulate a SIGCHLD signal.
event: Event,
}
impl Pipe {
/// Create a new pipe.
fn new() -> io::Result<Self> {
Ok(Pipe {
event: Event::new(),
})
}
/// Waits for the SIGCHLD signal.
async fn wait(&self) {
self.event.listen().await;
}
/// Registers a child process in this pipe.
fn register(&self, child: &std::process::Child) -> io::Result<()> {
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
Reactor::get().pipe.event.notify_additional(1);
}
use windows_sys::Win32::{
System::{
Threading::{RegisterWaitForSingleObject, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE},
WindowsProgramming::INFINITE,
},
Foundation::{BOOLEAN, HANDLE},
};
// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
}
} else if #[cfg(unix)] {
use std::os::unix::net::UnixStream;
/// A pipe that waits on the `SIGCHLD` signal.
struct Pipe {
/// The read end of the pipe.
reader: Async<UnixStream>,
}
impl Pipe {
/// Create a new pipe.
fn new() -> io::Result<Self> {
// Create a pipe.
let (reader, writer) = UnixStream::pair()?;
// Register the writer end of the pipe to be signalled on SIGCHLD.
signal_hook::low_level::pipe::register(
signal_hook::consts::SIGCHLD,
writer
)?;
// Register the reader end of the pipe to be signalled on SIGCHLD.
Async::new(reader).map(|reader| Pipe { reader })
}
/// Wait for the next `SIGCHLD` signal.
async fn wait(&self) {
// Wait for anything to be written to the pipe.
let mut buf = [0; 1];
(&self.reader).read_exact(&mut buf).await.ok();
}
/// Registers a child process in this pipe.
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
// Do nothing.
Ok(())
}
}
}
}
/// Run the zombie process cleanup routine along with the provided future.
///
/// This avoids the need to spawn the "async-process" thread in order to reap zombie processes.
/// This waits for the `SIGCHLD` signal, and then reaps zombie processes. Only one thread can
/// wait on the `SIGCHLD` signal at a time, so if another thread is already waiting on the
/// signal, then it will just run the future without reaping zombie processes.
///
/// # Example
///
/// ```no_run
/// use async_process::{Command, cleanup_zombies};
///
/// # futures_lite::future::block_on(async {
/// // This will not spawn the "async-process" thread.
/// cleanup_zombies(async {
/// let out = Command::new("echo").arg("hello").arg("world").output().await?;
/// assert_eq!(out.stdout, b"hello world\n");
/// std::io::Result::Ok(())
/// }).await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// If you are using the [`async-executor`] crate, then you may want to spawn `cleanup_zombies`
/// as its own detached task.
///
/// [`async-executor`]: https://docs.rs/async-executor
///
/// ```no_run
/// use async_executor::Executor;
/// use async_process::{Command, cleanup_zombies};
/// use std::future::pending;
///
/// # futures_lite::future::block_on(async {
/// let executor = Executor::new();
///
/// // Spawn `cleanup_zombies` as a task.
/// let task = executor.spawn(cleanup_zombies(pending::<()>()));
/// task.detach();
///
/// // Run the executor with your own future.
/// executor.run(async {
/// let out = Command::new("echo").arg("hello").arg("world").output().await?;
/// assert_eq!(out.stdout, b"hello world\n");
/// std::io::Result::Ok(())
/// }).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn cleanup_zombies<R>(f: impl Future<Output = R>) -> R {
// A future that cleans up zombie processes.
let cleanup = async {
// Acquire a lock on the reactor.
let guard = Reactor::get().polling_guard.lock().await;
// Poll the reactor.
Reactor::get().poll(guard).await
};
// Run these futures in parallel.
futures_lite::future::or(f, cleanup).await
}
/// A guard that can kill child processes, or push them into the zombie list.
struct ChildGuard {
@ -134,6 +410,18 @@ pub struct Child {
child: Arc<Mutex<ChildGuard>>,
}
// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
Reactor::get().push_zombie(self.inner.take().unwrap());
}
}
}
impl Child {
/// Wraps the inner child process handle and registers it in the global process list.
///
@ -149,76 +437,12 @@ impl Child {
cfg_if::cfg_if! {
if #[cfg(windows)] {
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc;
use windows_sys::Win32::{
System::{
Threading::{RegisterWaitForSingleObject, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE},
WindowsProgramming::INFINITE,
},
Foundation::{BOOLEAN, HANDLE},
};
// This channel is used to simulate SIGCHLD on Windows.
fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex<mpsc::Receiver<()>>) {
static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
OnceCell::new();
let (s, r) = CALLBACK.get_or_init_blocking(|| {
let (s, r) = mpsc::sync_channel(1);
(s, Mutex::new(r))
});
(s, r)
}
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
callback_channel().0.try_send(()).ok();
}
// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
return Err(io::Error::last_os_error());
}
// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
callback_channel().1.lock().unwrap().recv().ok();
}
// Wraps a sync I/O type into an async I/O type.
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
Ok(Unblock::new(io))
}
} else if #[cfg(unix)] {
static SIGNALS: OnceCell<Mutex<signal_hook::iterator::Signals>> = OnceCell::new();
// Make sure the signal handler is registered before interacting with the process.
SIGNALS.get_or_init_blocking(|| Mutex::new(
signal_hook::iterator::Signals::new(&[signal_hook::consts::SIGCHLD])
.expect("cannot set signal handler for SIGCHLD"),
));
// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
SIGNALS.get().expect("Signals not registered").lock().unwrap().forever().next();
}
// Wraps a sync I/O type into an async I/O type.
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
Async::new(io)
@ -226,52 +450,8 @@ impl Child {
}
}
static ZOMBIES: OnceCell<Mutex<Vec<std::process::Child>>> = OnceCell::new();
// Make sure the thread is started.
ZOMBIES.get_or_init_blocking(|| {
// Start a thread that handles SIGCHLD and notifies tasks when child processes exit.
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
loop {
// Wait for the next SIGCHLD signal.
wait_sigchld();
// Notify all listeners waiting on the SIGCHLD event.
SIGCHLD.notify(std::usize::MAX);
// Reap zombie processes.
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
}
})
.expect("cannot spawn async-process thread");
Mutex::new(Vec::new())
});
// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}
}
}
// Register the child process in the global process list.
Reactor::get().register(&child)?;
Ok(Child {
stdin,
@ -377,7 +557,7 @@ impl Child {
return Ok(status);
}
match listener.take() {
None => listener = Some(SIGCHLD.listen()),
None => listener = Some(Reactor::get().sigchld.listen()),
Some(listener) => listener.await,
}
}

View File

@ -21,6 +21,22 @@ fn smoke() {
})
}
#[test]
fn smoke_with_driver() {
future::block_on({
async_process::cleanup_zombies(async {
let p = if cfg!(target_os = "windows") {
Command::new("cmd").args(&["/C", "exit 0"]).spawn()
} else {
Command::new("true").spawn()
};
assert!(p.is_ok());
let mut p = p.unwrap();
assert!(p.status().await.unwrap().success());
})
})
}
#[test]
fn smoke_failure() {
assert!(Command::new("if-this-is-a-binary-then-the-world-has-ended")