Improve pipe on windows + overall cleanup

This commit is contained in:
jtnunley 2022-12-31 14:27:31 -08:00 committed by notgull
parent 2250643838
commit 4463e507d8
2 changed files with 122 additions and 30 deletions

View File

@ -56,6 +56,7 @@
use std::ffi::OsStr;
use std::fmt;
use std::mem;
use std::path::Path;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
@ -149,37 +150,49 @@ impl Reaper {
// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);
// Reap zombie processes.
let mut start = 0;
while let Some(new_start) = self.process_zombies(start) {
start = new_start;
// Take out the list of zombie processes.
let mut zombies = {
let mut zombies_lock = self.zombies.lock().unwrap();
if zombies_lock.is_empty() {
continue;
}
mem::take(&mut *zombies_lock)
};
// Poll the zombie processes to see if they are finished.
let mut i = 0;
'poll_zombies: loop {
// Only poll a set number of zombies at a time to avoid starvation.
for _ in 0..100 {
// If we've reached the list's end, break out of the loop.
if i == zombies.len() {
break 'poll_zombies;
}
// Get the zombie process.
let zombie = &mut zombies[i];
// Try to wait on it. If it's done, remove it from the list.
if let Ok(None) = zombie.try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
// Yield to avoid starving other tasks.
future::yield_now().await;
}
}
}
/// Wait on zombie processes.
///
/// Returns `Some(index to start at)` if we handled too many in one go.
fn process_zombies(&self, start: usize) -> Option<usize> {
let mut i = start;
let mut zombies = self.zombies.lock().unwrap();
// Put the list of zombie processes back.
let mut zombies_lock = self.zombies.lock().unwrap();
let mut new_zombies = mem::replace(&mut *zombies_lock, zombies);
// Only process a limited number of zombies at a time.
for _ in 0..100 {
// Get the zombie process.
let zombie = zombies.get_mut(i)?;
// Try to wait on it. If it's done, remove it from the list.
if let Ok(None) = zombie.try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
// If any new zombies have been added, append them to the list.
if !new_zombies.is_empty() {
zombies_lock.append(&mut new_zombies);
}
}
// Yield to avoid starving other tasks.
Some(i)
}
/// Spawn a backup thread to poll the reactor if it isn't being driven already.
@ -203,23 +216,60 @@ impl Reaper {
cfg_if::cfg_if! {
if #[cfg(windows)] {
use std::sync::atomic::{AtomicUsize, Ordering};
/// A callback that simulates a SIGCHLD signal on Windows.
struct Pipe {
/// The event that is signalled to simulate a SIGCHLD signal.
event: Event,
/// The event to signal when a child process completes.
signal: Event,
/// The number of child processes we've seen complete but haven't acted on yet.
complete: AtomicUsize,
}
impl Pipe {
/// Create a new pipe.
fn new() -> io::Result<Self> {
Ok(Pipe {
event: Event::new(),
signal: Event::new(),
complete: AtomicUsize::new(0),
})
}
/// Waits for the SIGCHLD signal.
async fn wait(&self) {
self.event.listen().await;
let mut completed = self.complete.load(Ordering::Acquire);
loop {
// If there's already a completed process, decrement and return.
if completed > 0 {
if let Err(actual) =
self.complete.compare_exchange(completed, completed - 1, Ordering::SeqCst, Ordering::SeqCst) {
completed = actual;
continue;
}
// Finish.
return;
} else {
// Register a listener for the next completed process.
let listener = self.signal.listen();
// See if there's a completed process now.
completed = self.complete.load(Ordering::Acquire);
if completed > 0 {
// There is, so we can drop the listener.
drop(listener);
continue;
}
// Wait for the next completed process.
listener.await;
// Loop around and try again.
completed = self.complete.load(Ordering::Acquire);
}
}
}
/// Registers a child process in this pipe.
@ -237,7 +287,17 @@ cfg_if::cfg_if! {
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
Reaper::get().pipe.event.notify_additional(1);
// A panic here would unwind into Win32, causing undefined behavior.
abort_on_panic(|| {
// Get the global pipe.
let pipe = &Reaper::get().pipe;
// Increment the number of completed processes.
pipe.complete.fetch_add(1, Ordering::SeqCst);
// Signal the SIGCHLD event.
pipe.signal.notify_additional(std::usize::MAX);
});
}
// Register this child process to invoke `callback` on exit.
@ -260,6 +320,22 @@ cfg_if::cfg_if! {
}
}
}
/// Abort if the given closure panics.
fn abort_on_panic<R>(f: impl FnOnce() -> R) -> R {
struct Bomb;
impl Drop for Bomb {
fn drop(&mut self) {
std::process::abort();
}
}
let bomb = Bomb;
let r = f();
mem::forget(bomb);
r
}
} else if #[cfg(unix)] {
use std::os::unix::net::UnixStream;

View File

@ -44,6 +44,22 @@ fn smoke_failure() {
.is_err());
}
#[test]
fn wait() {
future::block_on({
async_process::cleanup_zombies(async {
let p = if cfg!(target_os = "windows") {
Command::new("timeout").args(&["/t", "3"]).spawn()
} else {
Command::new("sleep").arg("3").spawn()
};
assert!(p.is_ok());
let mut p = p.unwrap();
assert!(p.status().await.unwrap().success());
})
});
}
#[test]
fn exit_reported_right() {
future::block_on(async {