diff --git a/Cargo.toml b/Cargo.toml index 8934640..65845f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ async-signal = "0.2.3" rustix = { version = "0.38", default-features = false, features = ["std", "fs"] } [target.'cfg(windows)'.dependencies] +async-channel = "1.9.0" blocking = "1.0.0" [target.'cfg(windows)'.dependencies.windows-sys] @@ -37,4 +38,5 @@ features = [ ] [dev-dependencies] +async-executor = "1.5.1" async-io = "1.8" diff --git a/src/lib.rs b/src/lib.rs index 66973f0..19b094a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,10 +56,13 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +use std::convert::Infallible; use std::ffi::OsStr; use std::fmt; +use std::mem; use std::path::Path; use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::thread; @@ -72,7 +75,7 @@ use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; #[cfg(windows)] use blocking::Unblock; -use async_lock::OnceCell; +use async_lock::{Mutex as AsyncMutex, OnceCell}; use event_listener::{Event, EventListener}; use futures_lite::{future, io, prelude::*}; @@ -88,6 +91,10 @@ mod sealed { pub trait Sealed {} } +#[cfg(test)] +static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(false); + /// The zombie process reaper. /// /// This structure reaps zombie processes and emits the `SIGCHLD` signal. @@ -100,6 +107,20 @@ struct Reaper { /// The pipe that delivers signal notifications. pipe: Pipe, + + /// Locking this mutex indicates that we are polling the SIGCHLD event. + driver_guard: AsyncMutex<()>, + + /// The number of tasks polling the SIGCHLD event. + /// + /// If this is zero, the `async-process` thread must be spawned. + drivers: AtomicUsize, + + /// Number of live `Child` instances currently running. 
+ /// + /// This is used to prevent the reaper thread from being spawned right as the program closes, + /// when the reaper thread isn't needed. This represents the number of active processes. + child_count: AtomicUsize, } impl Reaper { @@ -107,53 +128,105 @@ impl Reaper { fn get() -> &'static Self { static REAPER: OnceCell = OnceCell::new(); - REAPER.get_or_init_blocking(|| { - thread::Builder::new() - .name("async-process".to_string()) - .spawn(|| REAPER.wait_blocking().reap()) - .expect("cannot spawn async-process thread"); - - Reaper { - sigchld: Event::new(), - zombies: Mutex::new(Vec::new()), - pipe: Pipe::new().expect("cannot create SIGCHLD pipe"), - } + REAPER.get_or_init_blocking(|| Reaper { + sigchld: Event::new(), + zombies: Mutex::new(Vec::new()), + pipe: Pipe::new().expect("cannot create SIGCHLD pipe"), + driver_guard: AsyncMutex::new(()), + drivers: AtomicUsize::new(0), + child_count: AtomicUsize::new(0), }) } + /// Ensure that the reaper is driven. + /// + /// If there are no active `driver()` callers, this will spawn the `async-process` thread. + #[inline] + fn ensure_driven(&'static self) { + if self + .drivers + .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire) + .is_ok() + { + self.start_driver_thread(); + } + } + + /// Start the `async-process` thread. + #[cold] + fn start_driver_thread(&'static self) { + #[cfg(test)] + DRIVER_THREAD_SPAWNED + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .unwrap_or_else(|_| unreachable!("Driver thread already spawned")); + + thread::Builder::new() + .name("async-process".to_string()) + .spawn(move || { + let driver = async move { + // No need to bump self.drivers, it was already bumped in ensure_driven. + let guard = self.driver_guard.lock().await; + self.reap(guard).await + }; + + #[cfg(unix)] + async_io::block_on(driver); + + #[cfg(not(unix))] + future::block_on(driver); + }) + .expect("cannot spawn async-process thread"); + } + /// Reap zombie processes forever. 
- fn reap(&'static self) -> ! { + async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! { loop { // Wait for the next SIGCHLD signal. - self.pipe.wait(); + self.pipe.wait().await; // Notify all listeners waiting on the SIGCHLD event. self.sigchld.notify(std::usize::MAX); - // Reap zombie processes. - let mut zombies = self.zombies.lock().unwrap(); + // Reap zombie processes, but make sure we don't hold onto the lock for too long! + let mut zombies = mem::take(&mut *self.zombies.lock().unwrap()); let mut i = 0; - while i < zombies.len() { - if let Ok(None) = zombies[i].try_wait() { - i += 1; - } else { - zombies.swap_remove(i); + 'reap_zombies: loop { + for _ in 0..50 { + if i >= zombies.len() { + break 'reap_zombies; + } + + if let Ok(None) = zombies[i].try_wait() { + i += 1; + } else { + zombies.swap_remove(i); + } } + + // Be a good citizen; yield if there are a lot of processes. + // + // After we yield, check if there are more zombie processes. + future::yield_now().await; + zombies.append(&mut self.zombies.lock().unwrap()); } + + // Put zombie processes back. + self.zombies.lock().unwrap().append(&mut zombies); } } /// Register a process with this reaper. fn register(&'static self, child: &std::process::Child) -> io::Result<()> { + self.ensure_driven(); self.pipe.register(child) } } cfg_if::cfg_if! { if #[cfg(windows)] { + use async_channel::{Sender, Receiver, bounded}; use std::ffi::c_void; use std::os::windows::io::AsRawHandle; - use std::sync::mpsc; use windows_sys::Win32::{ Foundation::{BOOLEAN, HANDLE}, @@ -165,25 +238,25 @@ cfg_if::cfg_if! { /// Waits for the next SIGCHLD signal. struct Pipe { /// The sender channel for the SIGCHLD signal. - sender: mpsc::SyncSender<()>, + sender: Sender<()>, /// The receiver channel for the SIGCHLD signal. - receiver: Mutex>, + receiver: Receiver<()>, } impl Pipe { /// Creates a new pipe. 
fn new() -> io::Result { - let (sender, receiver) = mpsc::sync_channel(1); + let (sender, receiver) = bounded(1); Ok(Pipe { sender, - receiver: Mutex::new(receiver), + receiver }) } /// Waits for the next SIGCHLD signal. - fn wait(&self) { - self.receiver.lock().unwrap().recv().ok(); + async fn wait(&self) { + self.receiver.recv().await.ok(); } /// Register a process object into this pipe. @@ -236,8 +309,8 @@ cfg_if::cfg_if! { } /// Waits for the next SIGCHLD signal. - fn wait(&self) { - async_io::block_on((&self.signals).next()); + async fn wait(&self) { + (&self.signals).next().await; } /// Register a process object into this pipe. @@ -258,6 +331,7 @@ struct ChildGuard { inner: Option, reap_on_drop: bool, kill_on_drop: bool, + reaper: &'static Reaper, } impl ChildGuard { @@ -273,11 +347,14 @@ impl Drop for ChildGuard { self.get_mut().kill().ok(); } if self.reap_on_drop { - let mut zombies = Reaper::get().zombies.lock().unwrap(); + let mut zombies = self.reaper.zombies.lock().unwrap(); if let Ok(None) = self.get_mut().try_wait() { zombies.push(self.inner.take().unwrap()); } } + + // Decrement number of children. + self.reaper.child_count.fetch_sub(1, Ordering::Acquire); } } @@ -328,6 +405,9 @@ impl Child { let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout); let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr); + // Bump the child count. + reaper.child_count.fetch_add(1, Ordering::Relaxed); + // Register the child process in the global list. reaper.register(&child)?; @@ -339,6 +419,7 @@ impl Child { inner: Some(child), reap_on_drop: cmd.reap_on_drop, kill_on_drop: cmd.kill_on_drop, + reaper, })), }) } @@ -758,6 +839,80 @@ impl TryFrom for OwnedFd { } } +/// Runs the driver for the asynchronous processes. +/// +/// This future takes control of global structures related to driving [`Child`]ren and reaping +/// zombie processes. 
These responsibilities include listening for the `SIGCHLD` signal and +/// making sure zombie processes are successfully waited on. +/// +/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other +/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping +/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread +/// will be spawned. The "async-process" thread just blocks on this future and will automatically +/// be spawned if no tasks are driving the reaper once a [`Child`] is created. +/// +/// This future will never complete. It is intended to be run on a background task in your +/// executor of choice. +/// +/// # Examples +/// +/// ```no_run +/// use async_executor::Executor; +/// use async_process::{driver, Command}; +/// +/// # futures_lite::future::block_on(async { +/// // Create an executor and run on it. +/// let ex = Executor::new(); +/// ex.run(async { +/// // Run the driver future in the background. +/// ex.spawn(driver()).detach(); +/// +/// // Run a command. +/// Command::new("ls").output().await.ok(); +/// }).await; +/// # }); +/// ``` +#[inline] +pub fn driver() -> impl Future<Output = Infallible> + Send + 'static { + struct CallOnDrop<F: FnMut()>(F); + + impl<F: FnMut()> Drop for CallOnDrop<F> { + fn drop(&mut self) { + (self.0)(); + } + } + + async { + // Get the reaper. + let reaper = Reaper::get(); + + // Make sure the reaper knows we're driving it. + reaper.drivers.fetch_add(1, Ordering::SeqCst); + + // Decrement the driver count when this future is dropped. + let _guard = CallOnDrop(|| { + let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst); + + // If this was the last driver, and there are still resources using the reaper (live + // children or pending zombies), make sure that there is a thread driving the reaper.
+ if prev_count == 1 + && (reaper.child_count.load(Ordering::SeqCst) > 0 + || !reaper + .zombies + .lock() + .unwrap_or_else(|x| x.into_inner()) + .is_empty()) + { + reaper.ensure_driven(); + } + }); + + // Acquire the reaper lock and start polling the SIGCHLD event. + let guard = reaper.driver_guard.lock().await; + reaper.reap(guard).await + } +} + /// A builder for spawning processes. /// /// # Examples @@ -1152,41 +1307,100 @@ fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> { Ok(()) } -#[cfg(unix)] +#[cfg(test)] mod test { - #[test] - fn test_into_inner() { - futures_lite::future::block_on(async { - use crate::Command; + fn polled_driver() { + use super::{driver, Command}; + use futures_lite::future; + use futures_lite::prelude::*; - use std::io::Result; - use std::process::Stdio; - use std::str::from_utf8; + let is_thread_spawned = + || super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst); - use futures_lite::AsyncReadExt; + #[cfg(unix)] + fn command() -> Command { + let mut cmd = Command::new("sh"); + cmd.arg("-c").arg("echo hello"); + cmd + } - let mut ls_child = Command::new("cat") - .arg("Cargo.toml") - .stdout(Stdio::piped()) - .spawn()?; + #[cfg(windows)] + fn command() -> Command { + let mut cmd = Command::new("cmd"); + cmd.arg("/C").arg("echo hello"); + cmd + } - let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?; + #[cfg(unix)] + const OUTPUT: &[u8] = b"hello\n"; + #[cfg(windows)] + const OUTPUT: &[u8] = b"hello\r\n"; - let mut echo_child = Command::new("grep") - .arg("async") - .stdin(stdio) - .stdout(Stdio::piped()) - .spawn()?; + future::block_on(async { + // Thread should not be spawned off the bat. + assert!(!is_thread_spawned()); - let mut buf = vec![]; - let mut stdout = echo_child.stdout.take().unwrap(); + // Spawn a driver.
+ let mut driver1 = Box::pin(driver()); + future::poll_once(&mut driver1).await; + assert!(!is_thread_spawned()); - stdout.read_to_end(&mut buf).await?; - dbg!(from_utf8(&buf).unwrap_or("")); + // We should be able to run the driver in parallel with a process future. + async { + (&mut driver1).await; + } + .or(async { + let output = command().output().await.unwrap(); + assert_eq!(output.stdout, OUTPUT); + }) + .await; + assert!(!is_thread_spawned()); - Result::Ok(()) - }) - .unwrap(); + // Spawn a second driver. + let mut driver2 = Box::pin(driver()); + future::poll_once(&mut driver2).await; + assert!(!is_thread_spawned()); + + // Poll both drivers in parallel. + async { + (&mut driver1).await; + } + .or(async { + (&mut driver2).await; + }) + .or(async { + let output = command().output().await.unwrap(); + assert_eq!(output.stdout, OUTPUT); + }) + .await; + assert!(!is_thread_spawned()); + + // Once one is dropped, the other should take over. + drop(driver1); + assert!(!is_thread_spawned()); + + // Poll driver2 in parallel with a process future. + async { + (&mut driver2).await; + } + .or(async { + let output = command().output().await.unwrap(); + assert_eq!(output.stdout, OUTPUT); + }) + .await; + assert!(!is_thread_spawned()); + + // Once driver2 is dropped, the thread should not be spawned, as there are no active + // child processes. + drop(driver2); + assert!(!is_thread_spawned()); + + // We should now be able to poll the process future independently, it will spawn the + // thread.
+ let output = command().output().await.unwrap(); + assert_eq!(output.stdout, OUTPUT); + assert!(is_thread_spawned()); + }); } } diff --git a/tests/std.rs b/tests/std.rs index 91c794e..0665fe0 100644 --- a/tests/std.rs +++ b/tests/std.rs @@ -21,6 +21,25 @@ fn smoke() { }) } +#[test] +fn smoke_driven() { + future::block_on( + async { + async_process::driver().await; + } + .or(async { + let p = if cfg!(target_os = "windows") { + Command::new("cmd").args(["/C", "exit 0"]).spawn() + } else { + Command::new("true").spawn() + }; + assert!(p.is_ok()); + let mut p = p.unwrap(); + assert!(p.status().await.unwrap().success()); + }), + ) +} + #[test] fn smoke_failure() { assert!(Command::new("if-this-is-a-binary-then-the-world-has-ended") @@ -428,3 +447,39 @@ fn test_spawn_multiple_with_stdio() { assert_eq!(out2.stderr, b"bar\n"); }); } + +#[cfg(unix)] +#[test] +fn test_into_inner() { + futures_lite::future::block_on(async { + use crate::Command; + + use std::io::Result; + use std::process::Stdio; + use std::str::from_utf8; + + use futures_lite::AsyncReadExt; + + let mut ls_child = Command::new("cat") + .arg("Cargo.toml") + .stdout(Stdio::piped()) + .spawn()?; + + let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?; + + let mut echo_child = Command::new("grep") + .arg("async") + .stdin(stdio) + .stdout(Stdio::piped()) + .spawn()?; + + let mut buf = vec![]; + let mut stdout = echo_child.stdout.take().unwrap(); + + stdout.read_to_end(&mut buf).await?; + dbg!(from_utf8(&buf).unwrap_or("")); + + Result::Ok(()) + }) + .unwrap(); +}