Don't reap the child while there's a Child instance

This commit is contained in:
Stjepan Glavina 2020-08-20 19:47:23 +00:00
parent 79706e0bc0
commit 377e658288
2 changed files with 65 additions and 46 deletions

View File

@ -13,10 +13,10 @@ categories = ["asynchronous", "os"]
readme = "README.md"
[dependencies]
async-channel = "1.4.0"
cfg-if = "0.1.10"
once_cell = "1.4.1"
event-listener = "2.4.0"
futures-lite = "0.1.11"
once_cell = "1.4.1"
[target.'cfg(unix)'.dependencies]
async-io = "0.1.11"

View File

@ -60,17 +60,29 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::thread;
use async_channel::{Receiver, Sender};
#[cfg(unix)]
use async_io::Async;
#[cfg(windows)]
use blocking::Unblock;
use event_listener::Event;
use futures_lite::{future, io, AsyncReadExt};
use once_cell::sync::Lazy;
#[doc(no_inline)]
pub use std::process::{ExitStatus, Output, Stdio};
/// An event delivered every time the SIGCHLD signal occurs.
static SIGCHLD: Event = Event::new();
/// A guard that pushes abandoned child processes into the zombie list.
struct ChildGuard(Option<std::process::Child>);
impl ChildGuard {
fn get_mut(&mut self) -> &mut std::process::Child {
self.0.as_mut().unwrap()
}
}
/// A spawned child process.
///
/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
@ -99,11 +111,8 @@ pub struct Child {
/// The handle for reading from the child's standard error (stderr), if it has been captured.
pub stderr: Option<ChildStderr>,
/// The inner handle to the child process.
child: Arc<Mutex<std::process::Child>>,
/// A channel that gets closed when the process exits.
exited: Receiver<()>,
/// The inner child process handle.
child: Arc<Mutex<ChildGuard>>,
}
impl Child {
@ -123,7 +132,7 @@ impl Child {
};
// This channel is used to simulate SIGCHLD on Windows.
static SIGCHLD: Lazy<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
static CALLBACK: Lazy<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
Lazy::new(|| {
let (s, r) = mpsc::sync_channel(1);
(s, Mutex::new(r))
@ -131,7 +140,7 @@ impl Child {
// Called when a child exits.
unsafe extern "system" fn callback(_: PVOID, _: BOOLEAN) {
let _ = SIGCHLD.0.try_send(());
let _ = CALLBACK.0.try_send(());
}
// Register this child process to invoke `callback` on exit.
@ -152,7 +161,7 @@ impl Child {
// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
let _ = SIGCHLD.1.lock().unwrap().recv();
let _ = CALLBACK.1.lock().unwrap().recv();
}
// Wraps a sync I/O type into an async I/O type.
@ -181,14 +190,7 @@ impl Child {
}
}
// An entry in the list of running child processes.
struct Entry {
child: Arc<Mutex<std::process::Child>>,
_exited: Sender<()>,
}
// The global list of running child processes.
static CHILDREN: Lazy<Mutex<Vec<Entry>>> = Lazy::new(|| {
static ZOMBIES: Lazy<Mutex<Vec<std::process::Child>>> = Lazy::new(|| {
// Start a thread that handles SIGCHLD and notifies tasks when child processes exit.
thread::Builder::new()
.name("async-process".to_string())
@ -197,13 +199,19 @@ impl Child {
// Wait for the next SIGCHLD signal.
wait_sigchld();
// Remove processes that have exited. When an entry is removed from this
// `Vec`, its associated `Sender` is dropped, thus disconnecting the
// channel and waking up the task waiting on the `Receiver`.
CHILDREN.lock().unwrap().retain(|entry| {
let mut child = entry.child.lock().unwrap();
child.try_wait().expect("error waiting a child").is_none()
});
// Notify all listeners waiting on the SIGCHLD event.
SIGCHLD.notify(std::usize::MAX);
// Reap zombie processes.
let mut zombies = ZOMBIES.lock().unwrap();
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
}
})
.expect("cannot spawn async-process thread");
@ -211,25 +219,29 @@ impl Child {
Mutex::new(Vec::new())
});
// Make sure the thread is started.
Lazy::force(&ZOMBIES);
// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
let mut zombies = ZOMBIES.lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.0.take().unwrap());
}
}
}
// Convert sync I/O types into async I/O types.
let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
// Register the child process in the global list.
let child = Arc::new(Mutex::new(child));
let (sender, exited) = async_channel::bounded(1);
CHILDREN.lock().unwrap().push(Entry {
child: child.clone(),
_exited: sender,
});
Ok(Child {
stdin,
stdout,
stderr,
child,
exited,
child: Arc::new(Mutex::new(ChildGuard(Some(child)))),
})
}
@ -246,7 +258,7 @@ impl Child {
/// # std::io::Result::Ok(()) });
/// ```
pub fn id(&self) -> u32 {
self.child.lock().unwrap().id()
self.child.lock().unwrap().get_mut().id()
}
/// Forces the child process to exit.
@ -269,7 +281,7 @@ impl Child {
/// # std::io::Result::Ok(()) });
/// ```
pub fn kill(&mut self) -> io::Result<()> {
self.child.lock().unwrap().kill()
self.child.lock().unwrap().get_mut().kill()
}
/// Returns the exit status if the process has exited.
@ -291,7 +303,7 @@ impl Child {
/// # std::io::Result::Ok(()) });
/// ```
pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> {
self.child.lock().unwrap().try_wait()
self.child.lock().unwrap().get_mut().try_wait()
}
/// Drops the stdin handle and waits for the process to exit.
@ -317,11 +329,18 @@ impl Child {
pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
self.stdin.take();
let child = self.child.clone();
let exited = self.exited.clone();
async move {
let _ = exited.recv().await;
child.lock().unwrap().wait()
let mut listener = None;
loop {
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
match listener.take() {
None => listener = Some(SIGCHLD.listen()),
Some(listener) => listener.await,
}
}
}
}
@ -359,7 +378,7 @@ impl Child {
if let Some(mut s) = stdout {
s.read_to_end(&mut v).await?;
}
Ok(v)
io::Result::Ok(v)
};
// A future that collects stderr.
@ -369,12 +388,12 @@ impl Child {
if let Some(mut s) = stderr {
s.read_to_end(&mut v).await?;
}
Ok(v)
io::Result::Ok(v)
};
async move {
let (status, (stdout, stderr)) =
future::try_join(status, future::try_join(stdout, stderr)).await?;
let (stdout, stderr) = future::try_join(stdout, stderr).await?;
let status = status.await?;
Ok(Output {
status,
stdout,