Port from once-cell to async-lock (#26)

This commit is contained in:
John Nunley 2022-11-26 19:43:59 -08:00 committed by GitHub
parent 115741b59a
commit a44d0b418a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 20 deletions

View File

@ -15,10 +15,10 @@ categories = ["asynchronous", "os"]
exclude = ["/.*"] exclude = ["/.*"]
[dependencies] [dependencies]
async-lock = "2.6.0"
cfg-if = "1.0" cfg-if = "1.0"
event-listener = "2.4.0" event-listener = "2.4.0"
futures-lite = "1.11.0" futures-lite = "1.11.0"
once_cell = "1.4.1"
[build-dependencies] [build-dependencies]
autocfg = "1" autocfg = "1"

View File

@ -70,9 +70,9 @@ use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(windows)] #[cfg(windows)]
use blocking::Unblock; use blocking::Unblock;
use async_lock::OnceCell;
use event_listener::Event; use event_listener::Event;
use futures_lite::{future, io, prelude::*}; use futures_lite::{future, io, prelude::*};
use once_cell::sync::Lazy;
#[doc(no_inline)] #[doc(no_inline)]
pub use std::process::{ExitStatus, Output, Stdio}; pub use std::process::{ExitStatus, Output, Stdio};
@ -154,15 +154,22 @@ impl Child {
}; };
// This channel is used to simulate SIGCHLD on Windows. // This channel is used to simulate SIGCHLD on Windows.
static CALLBACK: Lazy<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> = fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex<mpsc::Receiver<()>>) {
Lazy::new(|| { static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
OnceCell::new();
let (s, r) = CALLBACK.get_or_init_blocking(|| {
let (s, r) = mpsc::sync_channel(1); let (s, r) = mpsc::sync_channel(1);
(s, Mutex::new(r)) (s, Mutex::new(r))
}); });
(s, r)
}
// Called when a child exits. // Called when a child exits.
unsafe extern "system" fn callback(_: PVOID, _: BOOLEAN) { unsafe extern "system" fn callback(_: PVOID, _: BOOLEAN) {
CALLBACK.0.try_send(()).ok(); callback_channel().0.try_send(()).ok();
} }
// Register this child process to invoke `callback` on exit. // Register this child process to invoke `callback` on exit.
@ -183,7 +190,7 @@ impl Child {
// Waits for the next SIGCHLD signal. // Waits for the next SIGCHLD signal.
fn wait_sigchld() { fn wait_sigchld() {
CALLBACK.1.lock().unwrap().recv().ok(); callback_channel().1.lock().unwrap().recv().ok();
} }
// Wraps a sync I/O type into an async I/O type. // Wraps a sync I/O type into an async I/O type.
@ -192,19 +199,17 @@ impl Child {
} }
} else if #[cfg(unix)] { } else if #[cfg(unix)] {
static SIGNALS: Lazy<Mutex<signal_hook::iterator::Signals>> = Lazy::new(|| { static SIGNALS: OnceCell<Mutex<signal_hook::iterator::Signals>> = OnceCell::new();
Mutex::new(
signal_hook::iterator::Signals::new(&[signal_hook::consts::SIGCHLD])
.expect("cannot set signal handler for SIGCHLD"),
)
});
// Make sure the signal handler is registered before interacting with the process. // Make sure the signal handler is registered before interacting with the process.
Lazy::force(&SIGNALS); SIGNALS.get_or_init_blocking(|| Mutex::new(
signal_hook::iterator::Signals::new(&[signal_hook::consts::SIGCHLD])
.expect("cannot set signal handler for SIGCHLD"),
));
// Waits for the next SIGCHLD signal. // Waits for the next SIGCHLD signal.
fn wait_sigchld() { fn wait_sigchld() {
SIGNALS.lock().unwrap().forever().next(); SIGNALS.get().expect("Signals not registered").lock().unwrap().forever().next();
} }
// Wraps a sync I/O type into an async I/O type. // Wraps a sync I/O type into an async I/O type.
@ -214,7 +219,10 @@ impl Child {
} }
} }
static ZOMBIES: Lazy<Mutex<Vec<std::process::Child>>> = Lazy::new(|| { static ZOMBIES: OnceCell<Mutex<Vec<std::process::Child>>> = OnceCell::new();
// Make sure the thread is started.
ZOMBIES.get_or_init_blocking(|| {
// Start a thread that handles SIGCHLD and notifies tasks when child processes exit. // Start a thread that handles SIGCHLD and notifies tasks when child processes exit.
thread::Builder::new() thread::Builder::new()
.name("async-process".to_string()) .name("async-process".to_string())
@ -227,7 +235,7 @@ impl Child {
SIGCHLD.notify(std::usize::MAX); SIGCHLD.notify(std::usize::MAX);
// Reap zombie processes. // Reap zombie processes.
let mut zombies = ZOMBIES.lock().unwrap(); let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
let mut i = 0; let mut i = 0;
while i < zombies.len() { while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() { if let Ok(None) = zombies[i].try_wait() {
@ -243,9 +251,6 @@ impl Child {
Mutex::new(Vec::new()) Mutex::new(Vec::new())
}); });
// Make sure the thread is started.
Lazy::force(&ZOMBIES);
// When the last reference to the child process is dropped, push it into the zombie list. // When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard { impl Drop for ChildGuard {
fn drop(&mut self) { fn drop(&mut self) {
@ -253,7 +258,7 @@ impl Child {
self.get_mut().kill().ok(); self.get_mut().kill().ok();
} }
if self.reap_on_drop { if self.reap_on_drop {
let mut zombies = ZOMBIES.lock().unwrap(); let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() { if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap()); zombies.push(self.inner.take().unwrap());
} }