Ensure that the reaper thread exists in all case

This commit is contained in:
jtnunley 2022-12-31 13:47:21 -08:00
parent 7242f1b5df
commit 2250643838
1 changed files with 38 additions and 38 deletions

View File

@ -90,8 +90,8 @@ mod sealed {
pub trait Sealed {}
}
/// The reactor that is used to poll child processes.
struct Reactor {
/// The reaper that cleans up "zombie" processes.
struct Reaper {
/// The event that is signalled every time the SIGCHLD signal occurs.
sigchld: Event,
@ -107,16 +107,16 @@ struct Reactor {
polling_guard: AsyncMutex<()>,
}
impl Reactor {
impl Reaper {
/// Get a reference to the global reactor.
fn get() -> &'static Reactor {
static REACTOR: OnceCell<Reactor> = OnceCell::new();
fn get() -> &'static Reaper {
static REACTOR: OnceCell<Reaper> = OnceCell::new();
REACTOR.get_or_init_blocking(|| {
let sigchld = Event::new();
let pipe = Pipe::new().expect("cannot set signal handler for SIGCHLD");
Reactor {
Reaper {
sigchld,
zombies: Mutex::new(Vec::new()),
pipe,
@ -131,10 +131,7 @@ impl Reactor {
}
/// Push a new "zombie" process into the reactor.
fn push_zombie(&'static self, mut child: std::process::Child) {
// Spawn the "async-process" thread if it hasn't been spawned yet.
self.spawn_thread();
fn push_zombie(&self, mut child: std::process::Child) {
let mut zombies = self.zombies.lock().unwrap();
// If the child process has already exited, then we don't need to push it into the list of zombies.
@ -144,7 +141,7 @@ impl Reactor {
}
/// Poll the reactor for "zombie" processes.
async fn poll(&self, _guard: MutexGuard<'_, ()>) -> ! {
async fn reap(&self, _guard: MutexGuard<'_, ()>) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait().await;
@ -182,22 +179,22 @@ impl Reactor {
}
// Yield to avoid starving other tasks.
Some(i + 1)
Some(i)
}
/// Spawn a backup thread to poll the reactor.
fn spawn_thread(&'static self) {
/// Spawn a backup thread to poll the reactor if it isn't being driven already.
fn ensure_driven(&'static self) {
// Check to see if no one else is polling the reactor.
if let Some(guard) = self.polling_guard.try_lock() {
// If no one else is polling the reactor, then spawn a backup thread to poll it.
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
#[cfg(not(windows))]
async_io::block_on(self.poll(guard));
#[cfg(unix)]
async_io::block_on(self.reap(guard));
#[cfg(windows)]
future::block_on(self.poll(guard));
future::block_on(self.reap(guard));
})
.expect("cannot spawn async-process thread");
}
@ -240,7 +237,7 @@ cfg_if::cfg_if! {
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
Reactor::get().pipe.event.notify_additional(1);
Reaper::get().pipe.event.notify_additional(1);
}
// Register this child process to invoke `callback` on exit.
@ -284,7 +281,7 @@ cfg_if::cfg_if! {
writer
)?;
// Register the reader end of the pipe to be signalled on SIGCHLD.
// Register the reader end of the pipe into the `async-io` reactor.
Async::new(reader).map(|reader| Pipe { reader })
}
@ -354,15 +351,15 @@ cfg_if::cfg_if! {
pub async fn cleanup_zombies<R>(f: impl Future<Output = R>) -> R {
// A future that cleans up zombie processes.
let cleanup = async {
// Acquire a lock on the reactor.
let guard = Reactor::get().polling_guard.lock().await;
// Acquire a lock on the reaper.
let guard = Reaper::get().polling_guard.lock().await;
// Poll the reactor.
Reactor::get().poll(guard).await
// Poll the reaper.
Reaper::get().reap(guard).await
};
// Run these futures in parallel.
futures_lite::future::or(f, cleanup).await
future::or(f, cleanup).await
}
/// A guard that can kill child processes, or push them into the zombie list.
@ -378,6 +375,18 @@ impl ChildGuard {
}
}
// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
Reaper::get().push_zombie(self.inner.take().unwrap());
}
}
}
/// A spawned child process.
///
/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
@ -410,18 +419,6 @@ pub struct Child {
child: Arc<Mutex<ChildGuard>>,
}
// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
Reactor::get().push_zombie(self.inner.take().unwrap());
}
}
}
impl Child {
/// Wraps the inner child process handle and registers it in the global process list.
///
@ -430,6 +427,9 @@ impl Child {
fn new(cmd: &mut Command) -> io::Result<Child> {
let mut child = cmd.inner.spawn()?;
// Make sure that some form of `SIGCHLD` processing is running.
Reaper::get().ensure_driven();
// Convert sync I/O types into async I/O types.
let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
@ -451,7 +451,7 @@ impl Child {
}
// Register the child process in the global process list.
Reactor::get().register(&child)?;
Reaper::get().register(&child)?;
Ok(Child {
stdin,
@ -557,7 +557,7 @@ impl Child {
return Ok(status);
}
match listener.take() {
None => listener = Some(Reactor::get().sigchld.listen()),
None => listener = Some(Reaper::get().sigchld.listen()),
Some(listener) => listener.await,
}
}