Compare commits
8 Commits
326e5fa811
...
263f21d042
Author | SHA1 | Date |
---|---|---|
Jesse Luehrs | 263f21d042 | |
John Nunley | 581c0a02c0 | |
John Nunley | 420c303921 | |
John Nunley | bbd42b56f8 | |
John Nunley | 1e0751fe65 | |
John Nunley | 02699a4f04 | |
Jesse Luehrs | 4438e61dac | |
Jesse Luehrs | 61e10f81be |
|
@ -43,6 +43,9 @@ jobs:
|
|||
if: startsWith(matrix.rust, 'nightly')
|
||||
run: cargo check -Z features=dev_dep
|
||||
- run: cargo test
|
||||
- run: cargo test
|
||||
env:
|
||||
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg async_process_force_signal_backend
|
||||
|
||||
msrv:
|
||||
runs-on: ${{ matrix.os }}
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
# Version 2.2.0
|
||||
|
||||
- Port Linux to a new backend that tries to use `pidfd` if it is available. (#68)
|
||||
|
||||
# Version 2.1.0
|
||||
|
||||
- Update `event-listener` to v5.1.0. (#67)
|
||||
|
|
10
Cargo.toml
10
Cargo.toml
|
@ -3,7 +3,7 @@ name = "async-process"
|
|||
# When publishing a new version:
|
||||
# - Update CHANGELOG.md
|
||||
# - Create "v2.x.y" git tag
|
||||
version = "2.1.0"
|
||||
version = "2.2.0"
|
||||
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
|
||||
edition = "2021"
|
||||
rust-version = "1.63"
|
||||
|
@ -19,12 +19,20 @@ async-lock = "3.0.0"
|
|||
cfg-if = "1.0"
|
||||
event-listener = "5.1.0"
|
||||
futures-lite = "2.0.0"
|
||||
tracing = { version = "0.1.40", default-features = false }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
async-io = "2.1.0"
|
||||
async-signal = "0.2.3"
|
||||
rustix = { version = "0.38", default-features = false, features = ["std", "fs"] }
|
||||
|
||||
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
|
||||
async-channel = "2.0.0"
|
||||
async-task = "4.7.0"
|
||||
|
||||
[target.'cfg(all(unix, not(any(target_os = "linux", target_os = "android"))))'.dependencies]
|
||||
rustix = { version = "0.38", default-features = false, features = ["std", "fs", "process"] }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
async-channel = "2.0.0"
|
||||
blocking = "1.0.0"
|
||||
|
|
246
src/lib.rs
246
src/lib.rs
|
@ -59,7 +59,6 @@
|
|||
use std::convert::Infallible;
|
||||
use std::ffi::OsStr;
|
||||
use std::fmt;
|
||||
use std::mem;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
@ -75,8 +74,7 @@ use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
|
|||
#[cfg(windows)]
|
||||
use blocking::Unblock;
|
||||
|
||||
use async_lock::{Mutex as AsyncMutex, OnceCell};
|
||||
use event_listener::Event;
|
||||
use async_lock::OnceCell;
|
||||
use futures_lite::{future, io, prelude::*};
|
||||
|
||||
#[doc(no_inline)]
|
||||
|
@ -87,6 +85,8 @@ pub mod unix;
|
|||
#[cfg(windows)]
|
||||
pub mod windows;
|
||||
|
||||
mod reaper;
|
||||
|
||||
mod sealed {
|
||||
pub trait Sealed {}
|
||||
}
|
||||
|
@ -99,17 +99,8 @@ static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
|
|||
///
|
||||
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
|
||||
struct Reaper {
|
||||
/// An event delivered every time the SIGCHLD signal occurs.
|
||||
sigchld: Event,
|
||||
|
||||
/// The list of zombie processes.
|
||||
zombies: Mutex<Vec<std::process::Child>>,
|
||||
|
||||
/// The pipe that delivers signal notifications.
|
||||
pipe: Pipe,
|
||||
|
||||
/// Locking this mutex indicates that we are polling the SIGCHLD event.
|
||||
driver_guard: AsyncMutex<()>,
|
||||
/// Underlying system reaper.
|
||||
sys: reaper::Reaper,
|
||||
|
||||
/// The number of tasks polling the SIGCHLD event.
|
||||
///
|
||||
|
@ -129,10 +120,7 @@ impl Reaper {
|
|||
static REAPER: OnceCell<Reaper> = OnceCell::new();
|
||||
|
||||
REAPER.get_or_init_blocking(|| Reaper {
|
||||
sigchld: Event::new(),
|
||||
zombies: Mutex::new(Vec::new()),
|
||||
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
|
||||
driver_guard: AsyncMutex::new(()),
|
||||
sys: reaper::Reaper::new(),
|
||||
drivers: AtomicUsize::new(0),
|
||||
child_count: AtomicUsize::new(0),
|
||||
})
|
||||
|
@ -165,8 +153,8 @@ impl Reaper {
|
|||
.spawn(move || {
|
||||
let driver = async move {
|
||||
// No need to bump self.drivers, it was already bumped in ensure_driven.
|
||||
let guard = self.driver_guard.lock().await;
|
||||
self.reap(guard).await
|
||||
let guard = self.sys.lock().await;
|
||||
self.sys.reap(guard).await
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
|
@ -178,147 +166,20 @@ impl Reaper {
|
|||
.expect("cannot spawn async-process thread");
|
||||
}
|
||||
|
||||
/// Reap zombie processes forever.
|
||||
async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
|
||||
loop {
|
||||
// Wait for the next SIGCHLD signal.
|
||||
self.pipe.wait().await;
|
||||
|
||||
// Notify all listeners waiting on the SIGCHLD event.
|
||||
self.sigchld.notify(std::usize::MAX);
|
||||
|
||||
// Reap zombie processes, but make sure we don't hold onto the lock for too long!
|
||||
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
|
||||
let mut i = 0;
|
||||
'reap_zombies: loop {
|
||||
for _ in 0..50 {
|
||||
if i >= zombies.len() {
|
||||
break 'reap_zombies;
|
||||
}
|
||||
|
||||
if let Ok(None) = zombies[i].try_wait() {
|
||||
i += 1;
|
||||
} else {
|
||||
zombies.swap_remove(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Be a good citizen; yield if there are a lot of processes.
|
||||
//
|
||||
// After we yield, check if there are more zombie processes.
|
||||
future::yield_now().await;
|
||||
zombies.append(&mut self.zombies.lock().unwrap());
|
||||
}
|
||||
|
||||
// Put zombie processes back.
|
||||
self.zombies.lock().unwrap().append(&mut zombies);
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a process with this reaper.
|
||||
fn register(&'static self, child: &std::process::Child) -> io::Result<()> {
|
||||
fn register(&'static self, child: std::process::Child) -> io::Result<reaper::ChildGuard> {
|
||||
self.ensure_driven();
|
||||
self.pipe.register(child)
|
||||
self.sys.register(child)
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(windows)] {
|
||||
use async_channel::{Sender, Receiver, bounded};
|
||||
use std::ffi::c_void;
|
||||
use std::os::windows::io::AsRawHandle;
|
||||
|
||||
use windows_sys::Win32::{
|
||||
Foundation::{BOOLEAN, HANDLE},
|
||||
System::Threading::{
|
||||
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
|
||||
},
|
||||
};
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
struct Pipe {
|
||||
/// The sender channel for the SIGCHLD signal.
|
||||
sender: Sender<()>,
|
||||
|
||||
/// The receiver channel for the SIGCHLD signal.
|
||||
receiver: Receiver<()>,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
fn new() -> io::Result<Pipe> {
|
||||
let (sender, receiver) = bounded(1);
|
||||
Ok(Pipe {
|
||||
sender,
|
||||
receiver
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
async fn wait(&self) {
|
||||
self.receiver.recv().await.ok();
|
||||
}
|
||||
|
||||
/// Register a process object into this pipe.
|
||||
fn register(&self, child: &std::process::Child) -> io::Result<()> {
|
||||
// Called when a child exits.
|
||||
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
|
||||
Reaper::get().pipe.sender.try_send(()).ok();
|
||||
}
|
||||
|
||||
// Register this child process to invoke `callback` on exit.
|
||||
let mut wait_object = 0;
|
||||
let ret = unsafe {
|
||||
RegisterWaitForSingleObject(
|
||||
&mut wait_object,
|
||||
child.as_raw_handle() as HANDLE,
|
||||
Some(callback),
|
||||
std::ptr::null_mut(),
|
||||
INFINITE,
|
||||
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
|
||||
)
|
||||
};
|
||||
|
||||
if ret == 0 {
|
||||
Err(io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wraps a sync I/O type into an async I/O type.
|
||||
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
|
||||
Ok(Unblock::new(io))
|
||||
}
|
||||
} else if #[cfg(unix)] {
|
||||
use async_signal::{Signal, Signals};
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
struct Pipe {
|
||||
/// The iterator over SIGCHLD signals.
|
||||
signals: Signals,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
fn new() -> io::Result<Pipe> {
|
||||
Ok(Pipe {
|
||||
signals: Signals::new(Some(Signal::Child))?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
async fn wait(&self) {
|
||||
(&self.signals).next().await;
|
||||
}
|
||||
|
||||
/// Register a process object into this pipe.
|
||||
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap a file descriptor into a non-blocking I/O type.
|
||||
fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
|
||||
Async::new(io)
|
||||
|
@ -328,7 +189,7 @@ cfg_if::cfg_if! {
|
|||
|
||||
/// A guard that can kill child processes, or push them into the zombie list.
|
||||
struct ChildGuard {
|
||||
inner: Option<std::process::Child>,
|
||||
inner: reaper::ChildGuard,
|
||||
reap_on_drop: bool,
|
||||
kill_on_drop: bool,
|
||||
reaper: &'static Reaper,
|
||||
|
@ -336,7 +197,7 @@ struct ChildGuard {
|
|||
|
||||
impl ChildGuard {
|
||||
fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
self.inner.as_mut().unwrap()
|
||||
self.inner.get_mut()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -347,10 +208,7 @@ impl Drop for ChildGuard {
|
|||
self.get_mut().kill().ok();
|
||||
}
|
||||
if self.reap_on_drop {
|
||||
let mut zombies = self.reaper.zombies.lock().unwrap();
|
||||
if let Ok(None) = self.get_mut().try_wait() {
|
||||
zombies.push(self.inner.take().unwrap());
|
||||
}
|
||||
self.inner.reap(&self.reaper.sys);
|
||||
}
|
||||
|
||||
// Decrement number of children.
|
||||
|
@ -409,14 +267,14 @@ impl Child {
|
|||
reaper.child_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Register the child process in the global list.
|
||||
reaper.register(&child)?;
|
||||
let inner = reaper.register(child)?;
|
||||
|
||||
Ok(Child {
|
||||
stdin,
|
||||
stdout,
|
||||
stderr,
|
||||
child: Arc::new(Mutex::new(ChildGuard {
|
||||
inner: Some(child),
|
||||
inner,
|
||||
reap_on_drop: cmd.reap_on_drop,
|
||||
kill_on_drop: cmd.kill_on_drop,
|
||||
reaper,
|
||||
|
@ -481,7 +339,7 @@ impl Child {
|
|||
/// }
|
||||
/// # std::io::Result::Ok(()) });
|
||||
/// ```
|
||||
pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> {
|
||||
pub fn try_status(&self) -> io::Result<Option<ExitStatus>> {
|
||||
self.child.lock().unwrap().get_mut().try_wait()
|
||||
}
|
||||
|
||||
|
@ -507,27 +365,33 @@ impl Child {
|
|||
/// ```
|
||||
pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
|
||||
self.stdin.take();
|
||||
self.status_no_drop()
|
||||
}
|
||||
|
||||
/// Waits for the process to exit.
|
||||
///
|
||||
/// Unlike `status`, does not drop the stdin handle. You are responsible
|
||||
/// for avoiding deadlocks caused by the child blocking on stdin while the
|
||||
/// parent blocks on waiting for the process to exit.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # futures_lite::future::block_on(async {
|
||||
/// use async_process::{Command, Stdio};
|
||||
///
|
||||
/// let child = Command::new("cp")
|
||||
/// .arg("a.txt")
|
||||
/// .arg("b.txt")
|
||||
/// .spawn()?;
|
||||
///
|
||||
/// println!("exit status: {}", child.status_no_drop().await?);
|
||||
/// # std::io::Result::Ok(()) });
|
||||
/// ```
|
||||
pub fn status_no_drop(&self) -> impl Future<Output = io::Result<ExitStatus>> {
|
||||
let child = self.child.clone();
|
||||
|
||||
async move {
|
||||
loop {
|
||||
// Wait on the child process.
|
||||
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
|
||||
return Ok(status);
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
event_listener::listener!(Reaper::get().sigchld => listener);
|
||||
|
||||
// Try again.
|
||||
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
|
||||
return Ok(status);
|
||||
}
|
||||
|
||||
// Wait on the listener.
|
||||
listener.await;
|
||||
}
|
||||
}
|
||||
async move { Reaper::get().sys.status(&child).await }
|
||||
}
|
||||
|
||||
/// Drops the stdin handle and collects the output of the process.
|
||||
|
@ -872,16 +736,9 @@ impl TryFrom<ChildStderr> for OwnedFd {
|
|||
/// }).await;
|
||||
/// # });
|
||||
/// ```
|
||||
#[allow(clippy::manual_async_fn)]
|
||||
#[inline]
|
||||
pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
|
||||
struct CallOnDrop<F: FnMut()>(F);
|
||||
|
||||
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
||||
fn drop(&mut self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
||||
|
||||
async {
|
||||
// Get the reaper.
|
||||
let reaper = Reaper::get();
|
||||
|
@ -896,20 +753,15 @@ pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
|
|||
// If this was the last driver, and there are still resources actively using the
|
||||
// reaper, make sure that there is a thread driving the reaper.
|
||||
if prev_count == 1
|
||||
&& reaper.child_count.load(Ordering::SeqCst) > 0
|
||||
&& !reaper
|
||||
.zombies
|
||||
.lock()
|
||||
.unwrap_or_else(|x| x.into_inner())
|
||||
.is_empty()
|
||||
&& (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies())
|
||||
{
|
||||
reaper.ensure_driven();
|
||||
}
|
||||
});
|
||||
|
||||
// Acquire the reaper lock and start polling the SIGCHLD event.
|
||||
let guard = reaper.driver_guard.lock().await;
|
||||
reaper.reap(guard).await
|
||||
let guard = reaper.sys.lock().await;
|
||||
reaper.sys.reap(guard).await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1307,6 +1159,14 @@ fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
struct CallOnDrop<F: FnMut()>(F);
|
||||
|
||||
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
||||
fn drop(&mut self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
#[test]
|
||||
|
|
|
@ -0,0 +1,221 @@
|
|||
//! The underlying system reaper.
|
||||
//!
|
||||
//! There are two backends:
|
||||
//!
|
||||
//! - signal, which waits for SIGCHLD.
|
||||
//! - wait, which waits directly on a process handle.
|
||||
//!
|
||||
//! "wait" is preferred, but is not available on all supported Linuxes. So we
|
||||
//! test to see if pidfd is supported first. If it is, we use wait. If not, we use
|
||||
//! signal.
|
||||
|
||||
#![allow(irrefutable_let_patterns)]
|
||||
|
||||
/// Enable the waiting reaper.
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
macro_rules! cfg_wait {
|
||||
($($tt:tt)*) => {$($tt)*};
|
||||
}
|
||||
|
||||
/// Enable the waiting reaper.
|
||||
#[cfg(not(any(target_os = "linux", target_os = "android")))]
|
||||
macro_rules! cfg_wait {
|
||||
($($tt:tt)*) => {};
|
||||
}
|
||||
|
||||
/// Enable signals.
|
||||
macro_rules! cfg_signal {
|
||||
($($tt:tt)*) => {$($tt)*};
|
||||
}
|
||||
|
||||
cfg_wait! {
|
||||
mod wait;
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
mod signal;
|
||||
}
|
||||
|
||||
use std::io;
|
||||
use std::sync::Mutex;
|
||||
|
||||
/// The underlying system reaper.
|
||||
pub(crate) enum Reaper {
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
/// The reaper based on the wait backend.
|
||||
Wait(wait::Reaper),
|
||||
|
||||
/// The reaper based on the signal backend.
|
||||
Signal(signal::Reaper),
|
||||
}
|
||||
|
||||
/// The wrapper around a child.
|
||||
pub(crate) enum ChildGuard {
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
/// The child guard based on the wait backend.
|
||||
Wait(wait::ChildGuard),
|
||||
|
||||
/// The child guard based on the signal backend.
|
||||
Signal(signal::ChildGuard),
|
||||
}
|
||||
|
||||
/// A lock on the reaper.
|
||||
pub(crate) enum Lock {
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
/// The wait-based reaper needs no lock.
|
||||
Wait,
|
||||
|
||||
/// The lock for the signal-based reaper.
|
||||
Signal(signal::Lock),
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Create a new reaper.
|
||||
pub(crate) fn new() -> Self {
|
||||
cfg_wait! {
|
||||
if wait::available() && !cfg!(async_process_force_signal_backend) {
|
||||
return Self::Wait(wait::Reaper::new());
|
||||
}
|
||||
}
|
||||
|
||||
// Return the signal-based reaper.
|
||||
cfg_signal! {
|
||||
return Self::Signal(signal::Reaper::new());
|
||||
}
|
||||
|
||||
#[allow(unreachable_code)]
|
||||
{
|
||||
panic!("neither the signal backend nor the waiter backend is available")
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock the driver thread.
|
||||
///
|
||||
/// This makes it so only one thread can reap at once.
|
||||
pub(crate) async fn lock(&'static self) -> Lock {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(_this) = self {
|
||||
// No locking needed.
|
||||
return Lock::Wait;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
// We need to lock.
|
||||
return Lock::Signal(this.lock().await);
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Reap zombie processes forever.
|
||||
pub(crate) async fn reap(&'static self, lock: Lock) -> ! {
|
||||
cfg_wait! {
|
||||
if let (Self::Wait(this), Lock::Wait) = (self, &lock) {
|
||||
return this.reap().await;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let (Self::Signal(this), Lock::Signal(lock)) = (self, lock) {
|
||||
return this.reap(lock).await;
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Register a child into this reaper.
|
||||
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(this) = self {
|
||||
return this.register(child).map(ChildGuard::Wait);
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
return this.register(child).map(ChildGuard::Signal);
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Wait for the inner child to complete.
|
||||
pub(crate) async fn status(
|
||||
&'static self,
|
||||
child: &Mutex<crate::ChildGuard>,
|
||||
) -> io::Result<std::process::ExitStatus> {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(this) = self {
|
||||
return this.status(child).await;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
return this.status(child).await;
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Do we have any registered zombie processes?
|
||||
pub(crate) fn has_zombies(&'static self) -> bool {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(this) = self {
|
||||
return this.has_zombies();
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
return this.has_zombies();
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
impl ChildGuard {
|
||||
/// Get a reference to the inner process.
|
||||
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(this) = self {
|
||||
return this.get_mut();
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
return this.get_mut();
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Start reaping this child process.
|
||||
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
|
||||
cfg_wait! {
|
||||
if let (Self::Wait(this), Reaper::Wait(reaper)) = (&mut *self, reaper) {
|
||||
this.reap(reaper);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let (Self::Signal(this), Reaper::Signal(reaper)) = (self, reaper) {
|
||||
this.reap(reaper);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,238 @@
|
|||
//! A version of the reaper that waits for a signal to check for process progress.
|
||||
|
||||
use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
|
||||
use event_listener::Event;
|
||||
use futures_lite::future;
|
||||
|
||||
use std::io;
|
||||
use std::mem;
|
||||
use std::sync::Mutex;
|
||||
|
||||
pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
|
||||
|
||||
/// The zombie process reaper.
|
||||
pub(crate) struct Reaper {
|
||||
/// An event delivered every time the SIGCHLD signal occurs.
|
||||
sigchld: Event,
|
||||
|
||||
/// The list of zombie processes.
|
||||
zombies: Mutex<Vec<std::process::Child>>,
|
||||
|
||||
/// The pipe that delivers signal notifications.
|
||||
pipe: Pipe,
|
||||
|
||||
/// Locking this mutex indicates that we are polling the SIGCHLD event.
|
||||
driver_guard: AsyncMutex<()>,
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Create a new reaper.
|
||||
pub(crate) fn new() -> Self {
|
||||
Reaper {
|
||||
sigchld: Event::new(),
|
||||
zombies: Mutex::new(Vec::new()),
|
||||
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
|
||||
driver_guard: AsyncMutex::new(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock the driver thread.
|
||||
pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
|
||||
self.driver_guard.lock().await
|
||||
}
|
||||
|
||||
/// Reap zombie processes forever.
|
||||
pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
|
||||
loop {
|
||||
// Wait for the next SIGCHLD signal.
|
||||
self.pipe.wait().await;
|
||||
|
||||
// Notify all listeners waiting on the SIGCHLD event.
|
||||
self.sigchld.notify(std::usize::MAX);
|
||||
|
||||
// Reap zombie processes, but make sure we don't hold onto the lock for too long!
|
||||
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
|
||||
let mut i = 0;
|
||||
'reap_zombies: loop {
|
||||
for _ in 0..50 {
|
||||
if i >= zombies.len() {
|
||||
break 'reap_zombies;
|
||||
}
|
||||
|
||||
if let Ok(None) = zombies[i].try_wait() {
|
||||
i += 1;
|
||||
} else {
|
||||
zombies.swap_remove(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Be a good citizen; yield if there are a lot of processes.
|
||||
//
|
||||
// After we yield, check if there are more zombie processes.
|
||||
future::yield_now().await;
|
||||
zombies.append(&mut self.zombies.lock().unwrap());
|
||||
}
|
||||
|
||||
// Put zombie processes back.
|
||||
self.zombies.lock().unwrap().append(&mut zombies);
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a process with this reaper.
|
||||
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
|
||||
self.pipe.register(&child)?;
|
||||
Ok(ChildGuard { inner: Some(child) })
|
||||
}
|
||||
|
||||
/// Wait for an event to occur for a child process.
|
||||
pub(crate) async fn status(
|
||||
&'static self,
|
||||
child: &Mutex<crate::ChildGuard>,
|
||||
) -> io::Result<std::process::ExitStatus> {
|
||||
loop {
|
||||
// Wait on the child process.
|
||||
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
|
||||
return Ok(status);
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
event_listener::listener!(self.sigchld => listener);
|
||||
|
||||
// Try again.
|
||||
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
|
||||
return Ok(status);
|
||||
}
|
||||
|
||||
// Wait on the listener.
|
||||
listener.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Do we have any registered zombie processes?
|
||||
pub(crate) fn has_zombies(&'static self) -> bool {
|
||||
!self
|
||||
.zombies
|
||||
.lock()
|
||||
.unwrap_or_else(|x| x.into_inner())
|
||||
.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// The wrapper around the child.
|
||||
pub(crate) struct ChildGuard {
|
||||
inner: Option<std::process::Child>,
|
||||
}
|
||||
|
||||
impl ChildGuard {
|
||||
/// Get a mutable reference to the inner child.
|
||||
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
self.inner.as_mut().unwrap()
|
||||
}
|
||||
|
||||
/// Begin the reaping process for this child.
|
||||
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
|
||||
if let Ok(None) = self.get_mut().try_wait() {
|
||||
reaper.zombies.lock().unwrap().push(self.inner.take().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(windows)] {
|
||||
use async_channel::{Sender, Receiver, bounded};
|
||||
use std::ffi::c_void;
|
||||
use std::os::windows::io::AsRawHandle;
|
||||
|
||||
use windows_sys::Win32::{
|
||||
Foundation::{BOOLEAN, HANDLE},
|
||||
System::Threading::{
|
||||
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
|
||||
},
|
||||
};
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
struct Pipe {
|
||||
/// The sender channel for the SIGCHLD signal.
|
||||
sender: Sender<()>,
|
||||
|
||||
/// The receiver channel for the SIGCHLD signal.
|
||||
receiver: Receiver<()>,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
fn new() -> io::Result<Pipe> {
|
||||
let (sender, receiver) = bounded(1);
|
||||
Ok(Pipe {
|
||||
sender,
|
||||
receiver
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
async fn wait(&self) {
|
||||
self.receiver.recv().await.ok();
|
||||
}
|
||||
|
||||
/// Register a process object into this pipe.
|
||||
fn register(&self, child: &std::process::Child) -> io::Result<()> {
|
||||
// Called when a child exits.
|
||||
#[allow(clippy::infallible_destructuring_match)]
|
||||
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
|
||||
let reaper = match &crate::Reaper::get().sys {
|
||||
super::Reaper::Signal(reaper) => reaper,
|
||||
};
|
||||
|
||||
reaper.pipe.sender.try_send(()).ok();
|
||||
}
|
||||
|
||||
// Register this child process to invoke `callback` on exit.
|
||||
let mut wait_object = 0;
|
||||
let ret = unsafe {
|
||||
RegisterWaitForSingleObject(
|
||||
&mut wait_object,
|
||||
child.as_raw_handle() as HANDLE,
|
||||
Some(callback),
|
||||
std::ptr::null_mut(),
|
||||
INFINITE,
|
||||
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
|
||||
)
|
||||
};
|
||||
|
||||
if ret == 0 {
|
||||
Err(io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if #[cfg(unix)] {
|
||||
use async_signal::{Signal, Signals};
|
||||
use futures_lite::prelude::*;
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
struct Pipe {
|
||||
/// The iterator over SIGCHLD signals.
|
||||
signals: Signals,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
fn new() -> io::Result<Pipe> {
|
||||
Ok(Pipe {
|
||||
signals: Signals::new(Some(Signal::Child))?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
async fn wait(&self) {
|
||||
(&self.signals).next().await;
|
||||
}
|
||||
|
||||
/// Register a process object into this pipe.
|
||||
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
//! A version of the reaper that waits on some polling primitive.
|
||||
//!
|
||||
//! This uses:
|
||||
//!
|
||||
//! - pidfd on Linux/Android
|
||||
|
||||
use async_channel::{Receiver, Sender};
|
||||
use async_task::Runnable;
|
||||
use futures_lite::future;
|
||||
|
||||
use std::io;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Mutex;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// The zombie process reaper.
|
||||
pub(crate) struct Reaper {
|
||||
/// The channel for sending new runnables.
|
||||
sender: Sender<Runnable>,
|
||||
|
||||
/// The channel for receiving new runnables.
|
||||
recv: Receiver<Runnable>,
|
||||
|
||||
/// Number of zombie processes.
|
||||
zombies: AtomicU64,
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Create a new reaper.
|
||||
pub(crate) fn new() -> Self {
|
||||
let (sender, recv) = async_channel::unbounded();
|
||||
Self {
|
||||
sender,
|
||||
recv,
|
||||
zombies: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Reap zombie processes forever.
|
||||
pub(crate) async fn reap(&'static self) -> ! {
|
||||
loop {
|
||||
// Fetch the next task.
|
||||
let task = match self.recv.recv().await {
|
||||
Ok(task) => task,
|
||||
Err(_) => panic!("sender should never be closed"),
|
||||
};
|
||||
|
||||
// Poll the task.
|
||||
task.run();
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a child into this reaper.
|
||||
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
|
||||
Ok(ChildGuard {
|
||||
inner: Some(WaitableChild::new(child)?),
|
||||
})
|
||||
}
|
||||
|
||||
/// Wait for a child to complete.
|
||||
pub(crate) async fn status(
|
||||
&'static self,
|
||||
child: &Mutex<crate::ChildGuard>,
|
||||
) -> io::Result<std::process::ExitStatus> {
|
||||
future::poll_fn(|cx| {
|
||||
// Lock the child.
|
||||
let mut child = child.lock().unwrap();
|
||||
|
||||
// Get the inner child value.
|
||||
let inner = match &mut child.inner {
|
||||
super::ChildGuard::Wait(inner) => inner,
|
||||
_ => unreachable!()
|
||||
};
|
||||
|
||||
// Poll for the next value.
|
||||
inner.inner.as_mut().unwrap().poll_wait(cx)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Do we have any registered zombie processes?
|
||||
pub(crate) fn has_zombies(&'static self) -> bool {
|
||||
self.zombies.load(Ordering::SeqCst) > 0
|
||||
}
|
||||
}
|
||||
|
||||
/// The wrapper around the child.
|
||||
pub(crate) struct ChildGuard {
|
||||
inner: Option<WaitableChild>,
|
||||
}
|
||||
|
||||
impl ChildGuard {
|
||||
/// Get a mutable reference to the inner child.
|
||||
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
self.inner.as_mut().unwrap().get_mut()
|
||||
}
|
||||
|
||||
/// Begin the reaping process for this child.
|
||||
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
|
||||
// Create a future for polling this child.
|
||||
let future = {
|
||||
let mut inner = self.inner.take().unwrap();
|
||||
async move {
|
||||
// Increment the zombie count.
|
||||
reaper.zombies.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Decrement the zombie count once we are done.
|
||||
let _guard = crate::CallOnDrop(|| {
|
||||
reaper.zombies.fetch_sub(1, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
// Wait on this child forever.
|
||||
let result = future::poll_fn(|cx| inner.poll_wait(cx)).await;
|
||||
if let Err(e) = result {
|
||||
tracing::error!("error while polling zombie process: {}", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Create a function for scheduling this future.
|
||||
let schedule = move |runnable| {
|
||||
reaper.sender.try_send(runnable).ok();
|
||||
};
|
||||
|
||||
// Spawn the task and run it forever.
|
||||
let (runnable, task) = async_task::spawn(future, schedule);
|
||||
task.detach();
|
||||
runnable.schedule();
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(any(
|
||||
target_os = "linux",
|
||||
target_os = "android"
|
||||
))] {
|
||||
use async_io::Async;
|
||||
use rustix::process;
|
||||
use std::os::unix::io::OwnedFd;
|
||||
|
||||
/// Waitable version of `std::process::Child`
|
||||
struct WaitableChild {
|
||||
child: std::process::Child,
|
||||
handle: Async<OwnedFd>,
|
||||
}
|
||||
|
||||
impl WaitableChild {
|
||||
fn new(child: std::process::Child) -> io::Result<Self> {
|
||||
let pidfd = process::pidfd_open(
|
||||
process::Pid::from_child(&child),
|
||||
process::PidfdFlags::empty()
|
||||
)?;
|
||||
|
||||
Ok(Self {
|
||||
child,
|
||||
handle: Async::new(pidfd)?
|
||||
})
|
||||
}
|
||||
|
||||
fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
&mut self.child
|
||||
}
|
||||
|
||||
fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
|
||||
loop {
|
||||
if let Some(status) = self.child.try_wait()? {
|
||||
return Poll::Ready(Ok(status));
|
||||
}
|
||||
|
||||
// Wait for us to become readable.
|
||||
futures_lite::ready!(self.handle.poll_readable(cx))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tell if we are able to use this backend.
|
||||
pub(crate) fn available() -> bool {
|
||||
// Create a Pidfd for the current process and see if it works.
|
||||
let result = process::pidfd_open(
|
||||
process::getpid(),
|
||||
process::PidfdFlags::empty()
|
||||
);
|
||||
|
||||
// Tell if it was okay or not.
|
||||
result.is_ok()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
//! Sleep test.
|
||||
|
||||
use async_process::Command;
|
||||
use futures_lite::future::block_on;
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn unix_sleep() {
|
||||
block_on(async {
|
||||
let status = Command::new("sleep").arg("1").status().await.unwrap();
|
||||
assert!(status.success());
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
#[test]
|
||||
fn windows_sleep() {
|
||||
block_on(async {
|
||||
let status = Command::new("ping")
|
||||
.args(["-n", "5", "127.0.0.1"])
|
||||
.status()
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(status.success());
|
||||
});
|
||||
}
|
Loading…
Reference in New Issue