From 2250643838ccaf20e2022d987118f04dd2e32aa5 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sat, 31 Dec 2022 13:47:21 -0800 Subject: [PATCH] Ensure that the reaper thread exists in all case --- src/lib.rs | 76 +++++++++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 847d974..b59116b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,8 +90,8 @@ mod sealed { pub trait Sealed {} } -/// The reactor that is used to poll child processes. -struct Reactor { +/// The reaper that cleans up "zombie" processes. +struct Reaper { /// The event that is signalled every time the SIGCHLD signal occurs. sigchld: Event, @@ -107,16 +107,16 @@ struct Reactor { polling_guard: AsyncMutex<()>, } -impl Reactor { +impl Reaper { /// Get a reference to the global reactor. - fn get() -> &'static Reactor { - static REACTOR: OnceCell = OnceCell::new(); + fn get() -> &'static Reaper { + static REACTOR: OnceCell = OnceCell::new(); REACTOR.get_or_init_blocking(|| { let sigchld = Event::new(); let pipe = Pipe::new().expect("cannot set signal handler for SIGCHLD"); - Reactor { + Reaper { sigchld, zombies: Mutex::new(Vec::new()), pipe, @@ -131,10 +131,7 @@ impl Reactor { } /// Push a new "zombie" process into the reactor. - fn push_zombie(&'static self, mut child: std::process::Child) { - // Spawn the "async-process" thread if it hasn't been spawned yet. - self.spawn_thread(); - + fn push_zombie(&self, mut child: std::process::Child) { let mut zombies = self.zombies.lock().unwrap(); // If the child process has already exited, then we don't need to push it into the list of zombies. @@ -144,7 +141,7 @@ impl Reactor { } /// Poll the reactor for "zombie" processes. - async fn poll(&self, _guard: MutexGuard<'_, ()>) -> ! { + async fn reap(&self, _guard: MutexGuard<'_, ()>) -> ! { loop { // Wait for the next SIGCHLD signal. self.pipe.wait().await; @@ -182,22 +179,22 @@ impl Reactor { } // Yield to avoid starving other tasks. - Some(i + 1) + Some(i) } - /// Spawn a backup thread to poll the reactor. - fn spawn_thread(&'static self) { + /// Spawn a backup thread to poll the reactor if it isn't being driven already. + fn ensure_driven(&'static self) { // Check to see if no one else is polling the reactor. if let Some(guard) = self.polling_guard.try_lock() { // If no one else is polling the reactor, then spawn a backup thread to poll it. thread::Builder::new() .name("async-process".to_string()) .spawn(move || { - #[cfg(not(windows))] - async_io::block_on(self.poll(guard)); + #[cfg(unix)] + async_io::block_on(self.reap(guard)); #[cfg(windows)] - future::block_on(self.poll(guard)); + future::block_on(self.reap(guard)); }) .expect("cannot spawn async-process thread"); } @@ -240,7 +237,7 @@ cfg_if::cfg_if! { // Called when a child exits. unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { - Reactor::get().pipe.event.notify_additional(1); + Reaper::get().pipe.event.notify_additional(1); } // Register this child process to invoke `callback` on exit. @@ -284,7 +281,7 @@ cfg_if::cfg_if! { writer )?; - // Register the reader end of the pipe to be signalled on SIGCHLD. + // Register the reader end of the pipe into the `async-io` reactor. Async::new(reader).map(|reader| Pipe { reader }) } @@ -354,15 +351,15 @@ cfg_if::cfg_if! { pub async fn cleanup_zombies(f: impl Future) -> R { // A future that cleans up zombie processes. let cleanup = async { - // Acquire a lock on the reactor. - let guard = Reactor::get().polling_guard.lock().await; + // Acquire a lock on the reaper. + let guard = Reaper::get().polling_guard.lock().await; - // Poll the reactor. - Reactor::get().poll(guard).await + // Poll the reaper. + Reaper::get().reap(guard).await }; // Run these futures in parallel. - futures_lite::future::or(f, cleanup).await + future::or(f, cleanup).await } /// A guard that can kill child processes, or push them into the zombie list. @@ -378,6 +375,18 @@ impl ChildGuard { } } +// When the last reference to the child process is dropped, push it into the zombie list. +impl Drop for ChildGuard { + fn drop(&mut self) { + if self.kill_on_drop { + self.get_mut().kill().ok(); + } + if self.reap_on_drop { + Reaper::get().push_zombie(self.inner.take().unwrap()); + } + } +} + /// A spawned child process. /// /// The process can be in running or exited state. Use [`status()`][`Child::status()`] or @@ -410,18 +419,6 @@ pub struct Child { child: Arc>, } -// When the last reference to the child process is dropped, push it into the zombie list. -impl Drop for ChildGuard { - fn drop(&mut self) { - if self.kill_on_drop { - self.get_mut().kill().ok(); - } - if self.reap_on_drop { - Reactor::get().push_zombie(self.inner.take().unwrap()); - } - } -} - impl Child { /// Wraps the inner child process handle and registers it in the global process list. /// @@ -430,6 +427,9 @@ impl Child { fn new(cmd: &mut Command) -> io::Result { let mut child = cmd.inner.spawn()?; + // Make sure that some form of `SIGCHLD` processing is running. + Reaper::get().ensure_driven(); + // Convert sync I/O types into async I/O types. let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin); let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout); @@ -451,7 +451,7 @@ impl Child { } // Register the child process in the global process list. - Reactor::get().register(&child)?; + Reaper::get().register(&child)?; Ok(Child { stdin, @@ -557,7 +557,7 @@ impl Child { return Ok(status); } match listener.take() { - None => listener = Some(Reactor::get().sigchld.listen()), + None => listener = Some(Reaper::get().sigchld.listen()), Some(listener) => listener.await, } }