feat: Add a way to run without the async-process thread

I know I said that I wouldn't add any more features, but I
think this is important enough.

Right now, a thread called "async-process" is responsible for listening
for SIGCHLD and reaping zombie processes. This listens for the SIGCHLD
signal in Unix and uses a channel connected to the waitable handle on
Windows. While this works, we can do better. Through async-signal, the
signal was already asynchronous on Unix; we were already just using
async_io::block_on to wait on the signal. After swapping out the channel
used on Windows with async-channel, the process reaping function "reap"
can be reimplemented as a fully asynchronous future.

From here we must make sure this future is being polled at all times. To
facilitate this, a function named "driver()" is added to the public API.
This future acquires a lock on the reaper structure and calls the
"reap()" future indefinitely. Multiple drivers can be created at once;
they will just wait forever on this lock. This future is intended to be
spawned onto an executor and left to run forever, making sure all child
processes are signalled whenever necessary. If no tasks are running the
driver future, the "async-process" thread is spawned and runs the
"reap()" future itself.

I've added the following controls to make sure that this system is
robust:

- If a "driver" task is dropped, another "driver" task will acquire the
  lock and keep the reaper active.
- Before being dropped, the task checks to see if it is the last driver.
  If it is, it will spawn the "async-process" thread to be the driver.
- When a Child is being created, it checks if there are any active
  drivers. If there are none, it spawns the "async-process" thread
  itself.
- One concern is that the driver future wil try to spawn the
  "async-process" thread as the application exits and the task is being
  dropped, which will be unnecessary and lead to slower shutdowns. To
  prevent this, the future checks to see if there are any extant `Child`
  instances (a new refcount is added to Reaper to facilitate this). If
  there are none, and if there are no zombie processes, it does not
  spawn the additional thread.
- Someone can still `mem::forget()` the driver thread. This does not
  lead to undefined behavior and just leads to processes being left
  dangling. At this point they're asking for wacky behavior.

This strategy might also be viable for `async-io`, if we want to try to
avoid needing to spawn the additional thread there as well.

Closes #7
cc smol-rs/async-io#40

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2023-10-10 17:47:46 -07:00 committed by GitHub
parent 9f9351bc52
commit f733a83c22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 328 additions and 57 deletions

View File

@ -26,6 +26,7 @@ async-signal = "0.2.3"
rustix = { version = "0.38", default-features = false, features = ["std", "fs"] }
[target.'cfg(windows)'.dependencies]
async-channel = "1.9.0"
blocking = "1.0.0"
[target.'cfg(windows)'.dependencies.windows-sys]
@ -37,4 +38,5 @@ features = [
]
[dev-dependencies]
async-executor = "1.5.1"
async-io = "1.8"

View File

@ -56,10 +56,13 @@
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
use std::convert::Infallible;
use std::ffi::OsStr;
use std::fmt;
use std::mem;
use std::path::Path;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::thread;
@ -72,7 +75,7 @@ use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(windows)]
use blocking::Unblock;
use async_lock::OnceCell;
use async_lock::{Mutex as AsyncMutex, OnceCell};
use event_listener::{Event, EventListener};
use futures_lite::{future, io, prelude::*};
@ -88,6 +91,10 @@ mod sealed {
pub trait Sealed {}
}
#[cfg(test)]
static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);
/// The zombie process reaper.
///
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
@ -100,6 +107,20 @@ struct Reaper {
/// The pipe that delivers signal notifications.
pipe: Pipe,
/// Locking this mutex indicates that we are polling the SIGCHLD event.
driver_guard: AsyncMutex<()>,
/// The number of tasks polling the SIGCHLD event.
///
/// If this is zero, the `async-process` thread must be spawned.
drivers: AtomicUsize,
/// Number of live `Child` instances currently running.
///
/// This is used to prevent the reaper thread from being spawned right as the program closes,
/// when the reaper thread isn't needed. This represents the number of active processes.
child_count: AtomicUsize,
}
impl Reaper {
@ -107,53 +128,105 @@ impl Reaper {
fn get() -> &'static Self {
static REAPER: OnceCell<Reaper> = OnceCell::new();
REAPER.get_or_init_blocking(|| {
thread::Builder::new()
.name("async-process".to_string())
.spawn(|| REAPER.wait_blocking().reap())
.expect("cannot spawn async-process thread");
Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
}
REAPER.get_or_init_blocking(|| Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
driver_guard: AsyncMutex::new(()),
drivers: AtomicUsize::new(0),
child_count: AtomicUsize::new(0),
})
}
/// Ensure that the reaper is driven.
///
/// If there are no active `driver()` callers, this will spawn the `async-process` thread.
#[inline]
fn ensure_driven(&'static self) {
if self
.drivers
.compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
.is_ok()
{
self.start_driver_thread();
}
}
/// Start the `async-process` thread.
#[cold]
fn start_driver_thread(&'static self) {
#[cfg(test)]
DRIVER_THREAD_SPAWNED
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.unwrap_or_else(|_| unreachable!("Driver thread already spawned"));
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
let driver = async move {
// No need to bump self.drivers, it was already bumped in ensure_driven.
let guard = self.driver_guard.lock().await;
self.reap(guard).await
};
#[cfg(unix)]
async_io::block_on(driver);
#[cfg(not(unix))]
future::block_on(driver);
})
.expect("cannot spawn async-process thread");
}
/// Reap zombie processes forever.
fn reap(&'static self) -> ! {
async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait();
self.pipe.wait().await;
// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);
// Reap zombie processes.
let mut zombies = self.zombies.lock().unwrap();
// Reap zombie processes, but make sure we don't hold onto the lock for too long!
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
'reap_zombies: loop {
for _ in 0..50 {
if i >= zombies.len() {
break 'reap_zombies;
}
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
// Be a good citizen; yield if there are a lot of processes.
//
// After we yield, check if there are more zombie processes.
future::yield_now().await;
zombies.append(&mut self.zombies.lock().unwrap());
}
// Put zombie processes back.
self.zombies.lock().unwrap().append(&mut zombies);
}
}
/// Register a process with this reaper.
fn register(&'static self, child: &std::process::Child) -> io::Result<()> {
self.ensure_driven();
self.pipe.register(child)
}
}
cfg_if::cfg_if! {
if #[cfg(windows)] {
use async_channel::{Sender, Receiver, bounded};
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc;
use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
@ -165,25 +238,25 @@ cfg_if::cfg_if! {
/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The sender channel for the SIGCHLD signal.
sender: mpsc::SyncSender<()>,
sender: Sender<()>,
/// The receiver channel for the SIGCHLD signal.
receiver: Mutex<mpsc::Receiver<()>>,
receiver: Receiver<()>,
}
impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
let (sender, receiver) = mpsc::sync_channel(1);
let (sender, receiver) = bounded(1);
Ok(Pipe {
sender,
receiver: Mutex::new(receiver),
receiver
})
}
/// Waits for the next SIGCHLD signal.
fn wait(&self) {
self.receiver.lock().unwrap().recv().ok();
async fn wait(&self) {
self.receiver.recv().await.ok();
}
/// Register a process object into this pipe.
@ -236,8 +309,8 @@ cfg_if::cfg_if! {
}
/// Waits for the next SIGCHLD signal.
fn wait(&self) {
async_io::block_on((&self.signals).next());
async fn wait(&self) {
(&self.signals).next().await;
}
/// Register a process object into this pipe.
@ -258,6 +331,7 @@ struct ChildGuard {
inner: Option<std::process::Child>,
reap_on_drop: bool,
kill_on_drop: bool,
reaper: &'static Reaper,
}
impl ChildGuard {
@ -273,11 +347,14 @@ impl Drop for ChildGuard {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = Reaper::get().zombies.lock().unwrap();
let mut zombies = self.reaper.zombies.lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}
// Decrement number of children.
self.reaper.child_count.fetch_sub(1, Ordering::Acquire);
}
}
@ -328,6 +405,9 @@ impl Child {
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
// Bump the child count.
reaper.child_count.fetch_add(1, Ordering::Relaxed);
// Register the child process in the global list.
reaper.register(&child)?;
@ -339,6 +419,7 @@ impl Child {
inner: Some(child),
reap_on_drop: cmd.reap_on_drop,
kill_on_drop: cmd.kill_on_drop,
reaper,
})),
})
}
@ -758,6 +839,80 @@ impl TryFrom<ChildStderr> for OwnedFd {
}
}
/// Runs the driver for the asynchronous processes.
///
/// This future takes control of global structures related to driving [`Child`]ren and reaping
/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
/// making sure zombie processes are successfully waited on.
///
/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
/// will be spawned. The "async-process" thread just blocks on this future and will automatically
/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
///
/// This future will never complete. It is intended to be ran on a background task in your
/// executor of choice.
///
/// # Examples
///
/// ```no_run
/// use async_executor::Executor;
/// use async_process::{driver, Command};
///
/// # futures_lite::future::block_on(async {
/// // Create an executor and run on it.
/// let ex = Executor::new();
/// ex.run(async {
/// // Run the driver future in the background.
/// ex.spawn(driver()).detach();
///
/// // Run a command.
/// Command::new("ls").output().await.ok();
/// }).await;
/// # });
/// ```
#[inline]
pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
struct CallOnDrop<F: FnMut()>(F);
impl<F: FnMut()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}
async {
// Get the reaper.
let reaper = Reaper::get();
// Make sure the reaper knows we're driving it.
reaper.drivers.fetch_add(1, Ordering::SeqCst);
// Decrement the driver count when this future is dropped.
let _guard = CallOnDrop(|| {
let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);
// If this was the last driver, and there are still resources actively using the
// reaper, make sure that there is a thread driving the reaper.
if prev_count == 1
&& reaper.child_count.load(Ordering::SeqCst) > 0
&& !reaper
.zombies
.lock()
.unwrap_or_else(|x| x.into_inner())
.is_empty()
{
reaper.ensure_driven();
}
});
// Acquire the reaper lock and start polling the SIGCHLD event.
let guard = reaper.driver_guard.lock().await;
reaper.reap(guard).await
}
}
/// A builder for spawning processes.
///
/// # Examples
@ -1152,41 +1307,100 @@ fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
Ok(())
}
#[cfg(unix)]
#[cfg(test)]
mod test {
#[test]
fn test_into_inner() {
futures_lite::future::block_on(async {
use crate::Command;
fn polled_driver() {
use super::{driver, Command};
use futures_lite::future;
use futures_lite::prelude::*;
use std::io::Result;
use std::process::Stdio;
use std::str::from_utf8;
let is_thread_spawned =
|| super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst);
use futures_lite::AsyncReadExt;
#[cfg(unix)]
fn command() -> Command {
let mut cmd = Command::new("sh");
cmd.arg("-c").arg("echo hello");
cmd
}
let mut ls_child = Command::new("cat")
.arg("Cargo.toml")
.stdout(Stdio::piped())
.spawn()?;
#[cfg(windows)]
fn command() -> Command {
let mut cmd = Command::new("cmd");
cmd.arg("/C").arg("echo hello");
cmd
}
let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
#[cfg(unix)]
const OUTPUT: &[u8] = b"hello\n";
#[cfg(windows)]
const OUTPUT: &[u8] = b"hello\r\n";
let mut echo_child = Command::new("grep")
.arg("async")
.stdin(stdio)
.stdout(Stdio::piped())
.spawn()?;
future::block_on(async {
// Thread should not be spawned off the bat.
assert!(!is_thread_spawned());
let mut buf = vec![];
let mut stdout = echo_child.stdout.take().unwrap();
// Spawn a driver.
let mut driver1 = Box::pin(driver());
future::poll_once(&mut driver1).await;
assert!(!is_thread_spawned());
stdout.read_to_end(&mut buf).await?;
dbg!(from_utf8(&buf).unwrap_or(""));
// We should be able to run the driver in parallel with a process future.
async {
(&mut driver1).await;
}
.or(async {
let output = command().output().await.unwrap();
assert_eq!(output.stdout, OUTPUT);
})
.await;
assert!(!is_thread_spawned());
Result::Ok(())
})
.unwrap();
// Spawn a second driver.
let mut driver2 = Box::pin(driver());
future::poll_once(&mut driver2).await;
assert!(!is_thread_spawned());
// Poll both drivers in parallel.
async {
(&mut driver1).await;
}
.or(async {
(&mut driver2).await;
})
.or(async {
let output = command().output().await.unwrap();
assert_eq!(output.stdout, OUTPUT);
})
.await;
assert!(!is_thread_spawned());
// Once one is dropped, the other should take over.
drop(driver1);
assert!(!is_thread_spawned());
// Poll driver2 in parallel with a process future.
async {
(&mut driver2).await;
}
.or(async {
let output = command().output().await.unwrap();
assert_eq!(output.stdout, OUTPUT);
})
.await;
assert!(!is_thread_spawned());
// Once driver2 is dropped, the thread should not be spawned, as there are no active
// child processes..
drop(driver2);
assert!(!is_thread_spawned());
// We should now be able to poll the process future independently, it will spawn the
// thread.
let output = command().output().await.unwrap();
assert_eq!(output.stdout, OUTPUT);
assert!(is_thread_spawned());
});
}
}

View File

@ -21,6 +21,25 @@ fn smoke() {
})
}
#[test]
fn smoke_driven() {
future::block_on(
async {
async_process::driver().await;
}
.or(async {
let p = if cfg!(target_os = "windows") {
Command::new("cmd").args(["/C", "exit 0"]).spawn()
} else {
Command::new("true").spawn()
};
assert!(p.is_ok());
let mut p = p.unwrap();
assert!(p.status().await.unwrap().success());
}),
)
}
#[test]
fn smoke_failure() {
assert!(Command::new("if-this-is-a-binary-then-the-world-has-ended")
@ -428,3 +447,39 @@ fn test_spawn_multiple_with_stdio() {
assert_eq!(out2.stderr, b"bar\n");
});
}
#[cfg(unix)]
#[test]
fn test_into_inner() {
futures_lite::future::block_on(async {
use crate::Command;
use std::io::Result;
use std::process::Stdio;
use std::str::from_utf8;
use futures_lite::AsyncReadExt;
let mut ls_child = Command::new("cat")
.arg("Cargo.toml")
.stdout(Stdio::piped())
.spawn()?;
let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
let mut echo_child = Command::new("grep")
.arg("async")
.stdin(stdio)
.stdout(Stdio::piped())
.spawn()?;
let mut buf = vec![];
let mut stdout = echo_child.stdout.take().unwrap();
stdout.read_to_end(&mut buf).await?;
dbg!(from_utf8(&buf).unwrap_or(""));
Result::Ok(())
})
.unwrap();
}