Centralize all global state
This commit is contained in:
parent
5e8e0b7c7b
commit
ccea232615
319
src/lib.rs
319
src/lib.rs
|
@ -90,8 +90,173 @@ mod sealed {
|
|||
pub trait Sealed {}
|
||||
}
|
||||
|
||||
/// An event delivered every time the SIGCHLD signal occurs.
|
||||
static SIGCHLD: Event = Event::new();
|
||||
/// The zombie process reaper.
|
||||
///
|
||||
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
|
||||
struct Reaper {
|
||||
/// An event delivered every time the SIGCHLD signal occurs.
|
||||
sigchld: Event,
|
||||
|
||||
/// The list of zombie processes.
|
||||
zombies: Mutex<Vec<std::process::Child>>,
|
||||
|
||||
/// The pipe that delivers signal notifications.
|
||||
pipe: OnceCell<Pipe>,
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Get the singleton instance of the reaper.
|
||||
fn get() -> &'static Self {
|
||||
static REAPER: Reaper = Reaper {
|
||||
sigchld: Event::new(),
|
||||
zombies: Mutex::new(Vec::new()),
|
||||
pipe: OnceCell::new(),
|
||||
};
|
||||
|
||||
// Start up the reaper thread if we haven't already.
|
||||
REAPER.pipe.get_or_init_blocking(|| {
|
||||
thread::Builder::new()
|
||||
.name("async-process".to_string())
|
||||
.spawn(|| REAPER.reap())
|
||||
.expect("cannot spawn async-process thread");
|
||||
|
||||
Pipe::new().expect("cannot create SIGCHLD pipe")
|
||||
});
|
||||
|
||||
&REAPER
|
||||
}
|
||||
|
||||
/// Reap zombie processes forever.
|
||||
fn reap(&'static self) -> ! {
|
||||
loop {
|
||||
// Wait for the next SIGCHLD signal.
|
||||
self.pipe.get().unwrap().wait();
|
||||
|
||||
// Notify all listeners waiting on the SIGCHLD event.
|
||||
self.sigchld.notify(std::usize::MAX);
|
||||
|
||||
// Reap zombie processes.
|
||||
let mut zombies = self.zombies.lock().unwrap();
|
||||
let mut i = 0;
|
||||
while i < zombies.len() {
|
||||
if let Ok(None) = zombies[i].try_wait() {
|
||||
i += 1;
|
||||
} else {
|
||||
zombies.swap_remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a process with this reaper.
|
||||
fn register(&'static self, child: &std::process::Child) -> io::Result<()> {
|
||||
self.pipe.get().unwrap().register(child)
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(windows)] {
|
||||
use std::ffi::c_void;
|
||||
use std::os::windows::io::AsRawHandle;
|
||||
use std::sync::mpsc;
|
||||
|
||||
use windows_sys::Win32::{
|
||||
Foundation::{BOOLEAN, HANDLE},
|
||||
System::Threading::{
|
||||
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
|
||||
},
|
||||
};
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
struct Pipe {
|
||||
/// The sender channel for the SIGCHLD signal.
|
||||
sender: mpsc::SyncSender<()>,
|
||||
|
||||
/// The receiver channel for the SIGCHLD signal.
|
||||
receiver: Mutex<mpsc::Receiver<()>>,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
fn new() -> io::Result<Pipe> {
|
||||
let (sender, receiver) = mpsc::sync_channel(1);
|
||||
Ok(Pipe {
|
||||
sender,
|
||||
receiver: Mutex::new(receiver),
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
fn wait(&self) {
|
||||
self.receiver.lock().unwrap().recv().ok();
|
||||
}
|
||||
|
||||
/// Register a process object into this pipe.
|
||||
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
|
||||
// Called when a child exits.
|
||||
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
|
||||
callback_channel().0.try_send(()).ok();
|
||||
}
|
||||
|
||||
// Register this child process to invoke `callback` on exit.
|
||||
let mut wait_object = 0;
|
||||
let ret = unsafe {
|
||||
RegisterWaitForSingleObject(
|
||||
&mut wait_object,
|
||||
child.as_raw_handle() as HANDLE,
|
||||
Some(callback),
|
||||
std::ptr::null_mut(),
|
||||
INFINITE,
|
||||
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
|
||||
)
|
||||
};
|
||||
|
||||
if ret == 0 {
|
||||
Err(io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wraps a sync I/O type into an async I/O type.
|
||||
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
|
||||
Ok(Unblock::new(io))
|
||||
}
|
||||
} else if #[cfg(unix)] {
|
||||
use async_signal::{Signal, Signals};
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
struct Pipe {
|
||||
/// The iterator over SIGCHLD signals.
|
||||
signals: Signals,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
fn new() -> io::Result<Pipe> {
|
||||
Ok(Pipe {
|
||||
signals: Signals::new(Some(Signal::Child))?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
fn wait(&self) {
|
||||
async_io::block_on((&self.signals).next());
|
||||
}
|
||||
|
||||
/// Register a process object into this pipe.
|
||||
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap a file descriptor into a non-blocking I/O type.
|
||||
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
|
||||
Async::new(io)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard that can kill child processes, or push them into the zombie list.
|
||||
struct ChildGuard {
|
||||
|
@ -106,6 +271,21 @@ impl ChildGuard {
|
|||
}
|
||||
}
|
||||
|
||||
// When the last reference to the child process is dropped, push it into the zombie list.
|
||||
impl Drop for ChildGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.kill_on_drop {
|
||||
self.get_mut().kill().ok();
|
||||
}
|
||||
if self.reap_on_drop {
|
||||
let mut zombies = Reaper::get().zombies.lock().unwrap();
|
||||
if let Ok(None) = self.get_mut().try_wait() {
|
||||
zombies.push(self.inner.take().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A spawned child process.
|
||||
///
|
||||
/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
|
||||
|
@ -151,137 +331,8 @@ impl Child {
|
|||
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
|
||||
let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(windows)] {
|
||||
use std::ffi::c_void;
|
||||
use std::os::windows::io::AsRawHandle;
|
||||
use std::sync::mpsc;
|
||||
|
||||
use windows_sys::Win32::{
|
||||
Foundation::{BOOLEAN, HANDLE},
|
||||
System::Threading::{
|
||||
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
|
||||
},
|
||||
};
|
||||
|
||||
// This channel is used to simulate SIGCHLD on Windows.
|
||||
fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex<mpsc::Receiver<()>>) {
|
||||
static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
|
||||
OnceCell::new();
|
||||
|
||||
let (s, r) = CALLBACK.get_or_init_blocking(|| {
|
||||
let (s, r) = mpsc::sync_channel(1);
|
||||
(s, Mutex::new(r))
|
||||
});
|
||||
|
||||
(s, r)
|
||||
}
|
||||
|
||||
// Called when a child exits.
|
||||
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
|
||||
callback_channel().0.try_send(()).ok();
|
||||
}
|
||||
|
||||
// Register this child process to invoke `callback` on exit.
|
||||
let mut wait_object = 0;
|
||||
let ret = unsafe {
|
||||
RegisterWaitForSingleObject(
|
||||
&mut wait_object,
|
||||
child.as_raw_handle() as HANDLE,
|
||||
Some(callback),
|
||||
std::ptr::null_mut(),
|
||||
INFINITE,
|
||||
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
|
||||
)
|
||||
};
|
||||
if ret == 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
|
||||
// Waits for the next SIGCHLD signal.
|
||||
fn wait_sigchld() {
|
||||
callback_channel().1.lock().unwrap().recv().ok();
|
||||
}
|
||||
|
||||
// Wraps a sync I/O type into an async I/O type.
|
||||
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
|
||||
Ok(Unblock::new(io))
|
||||
}
|
||||
|
||||
} else if #[cfg(unix)] {
|
||||
use async_signal::{Signal, Signals};
|
||||
|
||||
static SIGNALS: OnceCell<Signals> = OnceCell::new();
|
||||
|
||||
// Make sure the signal handler is registered before interacting with the process.
|
||||
SIGNALS.get_or_init_blocking(|| {
|
||||
Signals::new(Some(Signal::Child))
|
||||
.expect("Failed to register SIGCHLD handler")
|
||||
});
|
||||
|
||||
// Waits for the next SIGCHLD signal.
|
||||
fn wait_sigchld() {
|
||||
async_io::block_on(
|
||||
SIGNALS
|
||||
.get()
|
||||
.expect("Signals not registered")
|
||||
.next()
|
||||
);
|
||||
}
|
||||
|
||||
// Wraps a sync I/O type into an async I/O type.
|
||||
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
|
||||
Async::new(io)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static ZOMBIES: OnceCell<Mutex<Vec<std::process::Child>>> = OnceCell::new();
|
||||
|
||||
// Make sure the thread is started.
|
||||
ZOMBIES.get_or_init_blocking(|| {
|
||||
// Start a thread that handles SIGCHLD and notifies tasks when child processes exit.
|
||||
thread::Builder::new()
|
||||
.name("async-process".to_string())
|
||||
.spawn(move || {
|
||||
loop {
|
||||
// Wait for the next SIGCHLD signal.
|
||||
wait_sigchld();
|
||||
|
||||
// Notify all listeners waiting on the SIGCHLD event.
|
||||
SIGCHLD.notify(std::usize::MAX);
|
||||
|
||||
// Reap zombie processes.
|
||||
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
|
||||
let mut i = 0;
|
||||
while i < zombies.len() {
|
||||
if let Ok(None) = zombies[i].try_wait() {
|
||||
i += 1;
|
||||
} else {
|
||||
zombies.swap_remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("cannot spawn async-process thread");
|
||||
|
||||
Mutex::new(Vec::new())
|
||||
});
|
||||
|
||||
// When the last reference to the child process is dropped, push it into the zombie list.
|
||||
impl Drop for ChildGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.kill_on_drop {
|
||||
self.get_mut().kill().ok();
|
||||
}
|
||||
if self.reap_on_drop {
|
||||
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
|
||||
if let Ok(None) = self.get_mut().try_wait() {
|
||||
zombies.push(self.inner.take().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Register the child process in the global list.
|
||||
Reaper::get().register(&child)?;
|
||||
|
||||
Ok(Child {
|
||||
stdin,
|
||||
|
@ -381,7 +432,7 @@ impl Child {
|
|||
let child = self.child.clone();
|
||||
|
||||
async move {
|
||||
let listener = EventListener::new(&SIGCHLD);
|
||||
let listener = EventListener::new(&Reaper::get().sigchld);
|
||||
let mut listening = false;
|
||||
futures_lite::pin!(listener);
|
||||
|
||||
|
|
Loading…
Reference in New Issue