m: Move reaper-related code to another module

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2024-01-15 11:25:08 -08:00
parent ed17f53035
commit 341eb0d182
No known key found for this signature in database
GPG Key ID: 2FE69973CFD64832
2 changed files with 255 additions and 180 deletions

View File

@ -59,7 +59,6 @@
use std::convert::Infallible;
use std::ffi::OsStr;
use std::fmt;
use std::mem;
use std::path::Path;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
@ -75,13 +74,15 @@ use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(windows)]
use blocking::Unblock;
use async_lock::{Mutex as AsyncMutex, OnceCell};
use event_listener::Event;
use async_lock::OnceCell;
use futures_lite::{future, io, prelude::*};
#[doc(no_inline)]
pub use std::process::{ExitStatus, Output, Stdio};
#[path = "reaper/signal.rs"]
mod reaper;
#[cfg(unix)]
pub mod unix;
#[cfg(windows)]
@ -99,17 +100,8 @@ static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
///
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
struct Reaper {
/// An event delivered every time the SIGCHLD signal occurs.
sigchld: Event,
/// The list of zombie processes.
zombies: Mutex<Vec<std::process::Child>>,
/// The pipe that delivers signal notifications.
pipe: Pipe,
/// Locking this mutex indicates that we are polling the SIGCHLD event.
driver_guard: AsyncMutex<()>,
/// Underlying system reaper.
sys: reaper::Reaper,
/// The number of tasks polling the SIGCHLD event.
///
@ -129,10 +121,7 @@ impl Reaper {
static REAPER: OnceCell<Reaper> = OnceCell::new();
REAPER.get_or_init_blocking(|| Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
driver_guard: AsyncMutex::new(()),
sys: reaper::Reaper::new(),
drivers: AtomicUsize::new(0),
child_count: AtomicUsize::new(0),
})
@ -165,8 +154,8 @@ impl Reaper {
.spawn(move || {
let driver = async move {
// No need to bump self.drivers, it was already bumped in ensure_driven.
let guard = self.driver_guard.lock().await;
self.reap(guard).await
let guard = self.sys.lock().await;
self.sys.reap(guard).await
};
#[cfg(unix)]
@ -179,146 +168,24 @@ impl Reaper {
}
/// Reap zombie processes forever.
async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait().await;
// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);
// Reap zombie processes, but make sure we don't hold onto the lock for too long!
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
let mut i = 0;
'reap_zombies: loop {
for _ in 0..50 {
if i >= zombies.len() {
break 'reap_zombies;
}
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
// Be a good citizen; yield if there are a lot of processes.
//
// After we yield, check if there are more zombie processes.
future::yield_now().await;
zombies.append(&mut self.zombies.lock().unwrap());
}
// Put zombie processes back.
self.zombies.lock().unwrap().append(&mut zombies);
}
async fn reap(&'static self, driver_guard: reaper::Lock) -> ! {
self.sys.reap(driver_guard).await
}
/// Register a process with this reaper.
fn register(&'static self, child: &std::process::Child) -> io::Result<()> {
fn register(&'static self, child: std::process::Child) -> io::Result<reaper::ChildGuard> {
self.ensure_driven();
self.pipe.register(child)
self.sys.register(child)
}
}
cfg_if::cfg_if! {
if #[cfg(windows)] {
use async_channel::{Sender, Receiver, bounded};
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
},
};
/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The sender channel for the SIGCHLD signal.
sender: Sender<()>,
/// The receiver channel for the SIGCHLD signal.
receiver: Receiver<()>,
}
impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
let (sender, receiver) = bounded(1);
Ok(Pipe {
sender,
receiver
})
}
/// Waits for the next SIGCHLD signal.
async fn wait(&self) {
self.receiver.recv().await.ok();
}
/// Register a process object into this pipe.
fn register(&self, child: &std::process::Child) -> io::Result<()> {
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
Reaper::get().pipe.sender.try_send(()).ok();
}
// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
}
// Wraps a sync I/O type into an async I/O type.
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
Ok(Unblock::new(io))
}
} else if #[cfg(unix)] {
use async_signal::{Signal, Signals};
/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The iterator over SIGCHLD signals.
signals: Signals,
}
impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
Ok(Pipe {
signals: Signals::new(Some(Signal::Child))?,
})
}
/// Waits for the next SIGCHLD signal.
async fn wait(&self) {
(&self.signals).next().await;
}
/// Register a process object into this pipe.
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
Ok(())
}
}
/// Wrap a file descriptor into a non-blocking I/O type.
fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
Async::new(io)
@ -328,7 +195,7 @@ cfg_if::cfg_if! {
/// A guard that can kill child processes, or push them into the zombie list.
struct ChildGuard {
inner: Option<std::process::Child>,
inner: reaper::ChildGuard,
reap_on_drop: bool,
kill_on_drop: bool,
reaper: &'static Reaper,
@ -336,7 +203,7 @@ struct ChildGuard {
impl ChildGuard {
fn get_mut(&mut self) -> &mut std::process::Child {
self.inner.as_mut().unwrap()
self.inner.get_mut()
}
}
@ -347,10 +214,7 @@ impl Drop for ChildGuard {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = self.reaper.zombies.lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
self.inner.reap(&self.reaper.sys);
}
// Decrement number of children.
@ -409,14 +273,14 @@ impl Child {
reaper.child_count.fetch_add(1, Ordering::Relaxed);
// Register the child process in the global list.
reaper.register(&child)?;
let inner = reaper.register(child)?;
Ok(Child {
stdin,
stdout,
stderr,
child: Arc::new(Mutex::new(ChildGuard {
inner: Some(child),
inner,
reap_on_drop: cmd.reap_on_drop,
kill_on_drop: cmd.kill_on_drop,
reaper,
@ -509,25 +373,7 @@ impl Child {
self.stdin.take();
let child = self.child.clone();
async move {
loop {
// Wait on the child process.
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
// Start listening.
event_listener::listener!(Reaper::get().sigchld => listener);
// Try again.
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
// Wait on the listener.
listener.await;
}
}
async move { Reaper::get().sys.status(&child).await }
}
/// Drops the stdin handle and collects the output of the process.
@ -896,19 +742,14 @@ pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
// If this was the last driver, and there are still resources actively using the
// reaper, make sure that there is a thread driving the reaper.
if prev_count == 1
&& reaper.child_count.load(Ordering::SeqCst) > 0
&& !reaper
.zombies
.lock()
.unwrap_or_else(|x| x.into_inner())
.is_empty()
&& (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies())
{
reaper.ensure_driven();
}
});
// Acquire the reaper lock and start polling the SIGCHLD event.
let guard = reaper.driver_guard.lock().await;
let guard = reaper.sys.lock().await;
reaper.reap(guard).await
}
}

234
src/reaper/signal.rs Normal file
View File

@ -0,0 +1,234 @@
//! A version of the reaper that waits for a signal to check for process progress.
use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
use event_listener::Event;
use futures_lite::future;
use std::io;
use std::mem;
use std::sync::Mutex;
pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
/// The zombie process reaper.
pub(crate) struct Reaper {
/// An event delivered every time the SIGCHLD signal occurs.
sigchld: Event,
/// The list of zombie processes.
zombies: Mutex<Vec<std::process::Child>>,
/// The pipe that delivers signal notifications.
pipe: Pipe,
/// Locking this mutex indicates that we are polling the SIGCHLD event.
driver_guard: AsyncMutex<()>,
}
impl Reaper {
/// Create a new reaper.
pub(crate) fn new() -> Self {
Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
driver_guard: AsyncMutex::new(()),
}
}
/// Lock the driver thread.
pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
self.driver_guard.lock().await
}
/// Reap zombie processes forever.
pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait().await;
// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);
// Reap zombie processes, but make sure we don't hold onto the lock for too long!
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
let mut i = 0;
'reap_zombies: loop {
for _ in 0..50 {
if i >= zombies.len() {
break 'reap_zombies;
}
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
// Be a good citizen; yield if there are a lot of processes.
//
// After we yield, check if there are more zombie processes.
future::yield_now().await;
zombies.append(&mut self.zombies.lock().unwrap());
}
// Put zombie processes back.
self.zombies.lock().unwrap().append(&mut zombies);
}
}
/// Register a process with this reaper.
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
self.pipe.register(&child)?;
Ok(ChildGuard { inner: Some(child) })
}
/// Wait for an event to occur for a child process.
pub(crate) async fn status(
&'static self,
child: &Mutex<crate::ChildGuard>,
) -> io::Result<std::process::ExitStatus> {
loop {
// Wait on the child process.
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
// Start listening.
event_listener::listener!(self.sigchld => listener);
// Try again.
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
// Wait on the listener.
listener.await;
}
}
/// Do we have any registered zombie processes?
pub(crate) fn has_zombies(&'static self) -> bool {
!self
.zombies
.lock()
.unwrap_or_else(|x| x.into_inner())
.is_empty()
}
}
/// The wrapper around the child.
pub(crate) struct ChildGuard {
inner: Option<std::process::Child>,
}
impl ChildGuard {
/// Get a mutable reference to the inner child.
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
self.inner.as_mut().unwrap()
}
/// Begin the reaping process for this child.
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
let mut zombies = reaper.zombies.lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}
}
cfg_if::cfg_if! {
if #[cfg(windows)] {
use async_channel::{Sender, Receiver, bounded};
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
},
};
/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The sender channel for the SIGCHLD signal.
sender: Sender<()>,
/// The receiver channel for the SIGCHLD signal.
receiver: Receiver<()>,
}
impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
let (sender, receiver) = bounded(1);
Ok(Pipe {
sender,
receiver
})
}
/// Waits for the next SIGCHLD signal.
async fn wait(&self) {
self.receiver.recv().await.ok();
}
/// Register a process object into this pipe.
fn register(&self, child: &std::process::Child) -> io::Result<()> {
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
crate::Reaper::get().sys.pipe.sender.try_send(()).ok();
}
// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
}
} else if #[cfg(unix)] {
use async_signal::{Signal, Signals};
use futures_lite::prelude::*;
/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The iterator over SIGCHLD signals.
signals: Signals,
}
impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
Ok(Pipe {
signals: Signals::new(Some(Signal::Child))?,
})
}
/// Waits for the next SIGCHLD signal.
async fn wait(&self) {
(&self.signals).next().await;
}
/// Register a process object into this pipe.
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
Ok(())
}
}
}
}