diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 345cbd1..6ea6b04 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,6 +43,9 @@ jobs: if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - run: cargo test + - run: cargo test + env: + RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg async_process_force_signal_backend msrv: runs-on: ${{ matrix.os }} diff --git a/Cargo.toml b/Cargo.toml index 15e8efd..fa85114 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ tracing = { version = "0.1.40", default-features = false } [target.'cfg(unix)'.dependencies] async-io = "2.1.0" +async-signal = "0.2.3" rustix = { version = "0.38", default-features = false, features = ["std", "fs"] } [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] @@ -30,7 +31,6 @@ async-channel = "2.0.0" async-task = "4.7.0" [target.'cfg(all(unix, not(any(target_os = "linux", target_os = "android"))))'.dependencies] -async-signal = "0.2.3" rustix = { version = "0.38", default-features = false, features = ["std", "fs", "process"] } [target.'cfg(windows)'.dependencies] diff --git a/src/lib.rs b/src/lib.rs index 86ada29..d9eb9dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,24 +80,13 @@ use futures_lite::{future, io, prelude::*}; #[doc(no_inline)] pub use std::process::{ExitStatus, Output, Stdio}; -cfg_if::cfg_if! { - if #[cfg(any( - target_os = "linux", - target_os = "android" - ))] { - #[path = "reaper/wait.rs"] - mod reaper; - } else { - #[path = "reaper/signal.rs"] - mod reaper; - } -} - #[cfg(unix)] pub mod unix; #[cfg(windows)] pub mod windows; +mod reaper; + mod sealed { pub trait Sealed {} } @@ -177,11 +166,6 @@ impl Reaper { .expect("cannot spawn async-process thread"); } - /// Reap zombie processes forever. - async fn reap(&'static self, driver_guard: reaper::Lock) -> ! { - self.sys.reap(driver_guard).await - } - /// Register a process with this reaper. fn register(&'static self, child: std::process::Child) -> io::Result { self.ensure_driven(); @@ -728,16 +712,9 @@ impl TryFrom for OwnedFd { /// }).await; /// # }); /// ``` +#[allow(clippy::manual_async_fn)] #[inline] pub fn driver() -> impl Future + Send + 'static { - struct CallOnDrop(F); - - impl Drop for CallOnDrop { - fn drop(&mut self) { - (self.0)(); - } - } - async { // Get the reaper. let reaper = Reaper::get(); @@ -760,7 +737,7 @@ pub fn driver() -> impl Future + Send + 'static { // Acquire the reaper lock and start polling the SIGCHLD event. let guard = reaper.sys.lock().await; - reaper.reap(guard).await + reaper.sys.reap(guard).await } } @@ -1158,6 +1135,14 @@ fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> { Ok(()) } +struct CallOnDrop(F); + +impl Drop for CallOnDrop { + fn drop(&mut self) { + (self.0)(); + } +} + #[cfg(test)] mod test { #[test] diff --git a/src/reaper/mod.rs b/src/reaper/mod.rs new file mode 100644 index 0000000..4255e0c --- /dev/null +++ b/src/reaper/mod.rs @@ -0,0 +1,221 @@ +//! The underlying system reaper. +//! +//! There are two backends: +//! +//! - signal, which waits for SIGCHLD. +//! - wait, which waits directly on a process handle. +//! +//! "wait" is preferred, but is not available on all supported Linuxes. So we +//! test to see if pidfd is supported first. If it is, we use wait. If not, we use +//! signal. + +#![allow(irrefutable_let_patterns)] + +/// Enable the waiting reaper. +#[cfg(any(target_os = "linux", target_os = "android"))] +macro_rules! cfg_wait { + ($($tt:tt)*) => {$($tt)*}; +} + +/// Enable the waiting reaper. +#[cfg(not(any(target_os = "linux", target_os = "android")))] +macro_rules! cfg_wait { + ($($tt:tt)*) => {}; +} + +/// Enable signals. +macro_rules! cfg_signal { + ($($tt:tt)*) => {$($tt)*}; +} + +cfg_wait! { + mod wait; +} + +cfg_signal! { + mod signal; +} + +use std::io; +use std::sync::Mutex; + +/// The underlying system reaper. +pub(crate) enum Reaper { + #[cfg(any(target_os = "linux", target_os = "android"))] + /// The reaper based on the wait backend. + Wait(wait::Reaper), + + /// The reaper based on the signal backend. + Signal(signal::Reaper), +} + +/// The wrapper around a child. +pub(crate) enum ChildGuard { + #[cfg(any(target_os = "linux", target_os = "android"))] + /// The child guard based on the wait backend. + Wait(wait::ChildGuard), + + /// The child guard based on the signal backend. + Signal(signal::ChildGuard), +} + +/// A lock on the reaper. +pub(crate) enum Lock { + #[cfg(any(target_os = "linux", target_os = "android"))] + /// The wait-based reaper needs no lock. + Wait, + + /// The lock for the signal-based reaper. + Signal(signal::Lock), +} + +impl Reaper { + /// Create a new reaper. + pub(crate) fn new() -> Self { + cfg_wait! { + if wait::available() && !cfg!(async_process_force_signal_backend) { + return Self::Wait(wait::Reaper::new()); + } + } + + // Return the signal-based reaper. + cfg_signal! { + return Self::Signal(signal::Reaper::new()); + } + + #[allow(unreachable_code)] + { + panic!("neither the signal backend nor the waiter backend is available") + } + } + + /// Lock the driver thread. + /// + /// This makes it so only one thread can reap at once. + pub(crate) async fn lock(&'static self) -> Lock { + cfg_wait! { + if let Self::Wait(_this) = self { + // No locking needed. + return Lock::Wait; + } + } + + cfg_signal! { + if let Self::Signal(this) = self { + // We need to lock. + return Lock::Signal(this.lock().await); + } + } + + unreachable!() + } + + /// Reap zombie processes forever. + pub(crate) async fn reap(&'static self, lock: Lock) -> ! { + cfg_wait! { + if let (Self::Wait(this), Lock::Wait) = (self, &lock) { + return this.reap().await; + } + } + + cfg_signal! { + if let (Self::Signal(this), Lock::Signal(lock)) = (self, lock) { + return this.reap(lock).await; + } + } + + unreachable!() + } + + /// Register a child into this reaper. + pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result { + cfg_wait! { + if let Self::Wait(this) = self { + return this.register(child).map(ChildGuard::Wait); + } + } + + cfg_signal! { + if let Self::Signal(this) = self { + return this.register(child).map(ChildGuard::Signal); + } + } + + unreachable!() + } + + /// Wait for the inner child to complete. + pub(crate) async fn status( + &'static self, + child: &Mutex, + ) -> io::Result { + cfg_wait! { + if let Self::Wait(this) = self { + return this.status(child).await; + } + } + + cfg_signal! { + if let Self::Signal(this) = self { + return this.status(child).await; + } + } + + unreachable!() + } + + /// Do we have any registered zombie processes? + pub(crate) fn has_zombies(&'static self) -> bool { + cfg_wait! { + if let Self::Wait(this) = self { + return this.has_zombies(); + } + } + + cfg_signal! { + if let Self::Signal(this) = self { + return this.has_zombies(); + } + } + + unreachable!() + } +} + +impl ChildGuard { + /// Get a reference to the inner process. + pub(crate) fn get_mut(&mut self) -> &mut std::process::Child { + cfg_wait! { + if let Self::Wait(this) = self { + return this.get_mut(); + } + } + + cfg_signal! { + if let Self::Signal(this) = self { + return this.get_mut(); + } + } + + unreachable!() + } + + /// Start reaping this child process. + pub(crate) fn reap(&mut self, reaper: &'static Reaper) { + cfg_wait! { + if let (Self::Wait(this), Reaper::Wait(reaper)) = (&mut *self, reaper) { + this.reap(reaper); + return; + } + } + + cfg_signal! { + if let (Self::Signal(this), Reaper::Signal(reaper)) = (self, reaper) { + this.reap(reaper); + return; + } + } + + unreachable!() + } +} diff --git a/src/reaper/signal.rs b/src/reaper/signal.rs index 557a0ab..5532c00 100644 --- a/src/reaper/signal.rs +++ b/src/reaper/signal.rs @@ -131,9 +131,8 @@ impl ChildGuard { /// Begin the reaping process for this child. pub(crate) fn reap(&mut self, reaper: &'static Reaper) { - let mut zombies = reaper.zombies.lock().unwrap(); if let Ok(None) = self.get_mut().try_wait() { - zombies.push(self.inner.take().unwrap()); + reaper.zombies.lock().unwrap().push(self.inner.take().unwrap()); } } } @@ -178,8 +177,13 @@ cfg_if::cfg_if! { /// Register a process object into this pipe. fn register(&self, child: &std::process::Child) -> io::Result<()> { // Called when a child exits. + #[allow(clippy::infallible_destructuring_match)] unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { - crate::Reaper::get().sys.pipe.sender.try_send(()).ok(); + let reaper = match &crate::Reaper::get().sys { + super::Reaper::Signal(reaper) => reaper, + }; + + reaper.pipe.sender.try_send(()).ok(); } // Register this child process to invoke `callback` on exit. diff --git a/src/reaper/wait.rs b/src/reaper/wait.rs index 0a064c1..00f6b36 100644 --- a/src/reaper/wait.rs +++ b/src/reaper/wait.rs @@ -13,8 +13,6 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Mutex; use std::task::{Context, Poll}; -pub(crate) type Lock = (); - /// The zombie process reaper. pub(crate) struct Reaper { /// The channel for sending new runnables. @@ -38,14 +36,8 @@ impl Reaper { } } - /// "Lock" the driver thread. - /// - /// Since multiple threads can drive the reactor at once, there is no need to - /// actually lock anything. So this function only exists for symmetry. - pub(crate) async fn lock(&self) {} - /// Reap zombie processes forever. - pub(crate) async fn reap(&'static self, _: ()) -> ! { + pub(crate) async fn reap(&'static self) -> ! { loop { // Fetch the next task. let task = match self.recv.recv().await { @@ -71,15 +63,17 @@ impl Reaper { child: &Mutex, ) -> io::Result { future::poll_fn(|cx| { - // Lock the child and poll it once. - child - .lock() - .unwrap() - .inner - .inner - .as_mut() - .unwrap() - .poll_wait(cx) + // Lock the child. + let mut child = child.lock().unwrap(); + + // Get the inner child value. + let inner = match &mut child.inner { + super::ChildGuard::Wait(inner) => inner, + _ => unreachable!() + }; + + // Poll for the next value. + inner.inner.as_mut().unwrap().poll_wait(cx) }) .await } @@ -103,14 +97,6 @@ impl ChildGuard { /// Begin the reaping process for this child. pub(crate) fn reap(&mut self, reaper: &'static Reaper) { - struct CallOnDrop(F); - - impl Drop for CallOnDrop { - fn drop(&mut self) { - (self.0)(); - } - } - // Create a future for polling this child. let future = { let mut inner = self.inner.take().unwrap(); @@ -119,19 +105,19 @@ impl ChildGuard { reaper.zombies.fetch_add(1, Ordering::Relaxed); // Decrement the zombie count once we are done. - let _guard = CallOnDrop(|| { + let _guard = crate::CallOnDrop(|| { reaper.zombies.fetch_sub(1, Ordering::SeqCst); }); // Wait on this child forever. let result = future::poll_fn(|cx| inner.poll_wait(cx)).await; if let Err(e) = result { - tracing::error!("error while polling zombie process: {}", e); + tracing::error!("error while polling zombie process: {}", e); } } }; - // Create a future for scheduling this future. + // Create a function for scheduling this future. let schedule = move |runnable| { reaper.sender.try_send(runnable).ok(); }; @@ -186,5 +172,17 @@ cfg_if::cfg_if! { } } } + + /// Tell if we are able to use this backend. + pub(crate) fn available() -> bool { + // Create a Pidfd for the current process and see if it works. + let result = process::pidfd_open( + process::getpid(), + process::PidfdFlags::empty() + ); + + // Tell if it was okay or not. + result.is_ok() + } } }