Review comments

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2023-10-08 08:26:26 -07:00
parent 7756cb4552
commit f6dac9af91
No known key found for this signature in database
GPG Key ID: 397D2B00FEA368AA
1 changed files with 32 additions and 23 deletions

View File

@ -63,7 +63,7 @@ use std::mem;
use std::path::Path;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Once};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::thread;
@ -93,6 +93,10 @@ mod sealed {
pub trait Sealed {}
}
#[cfg(test)]
static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);
/// The zombie process reaper.
///
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
@ -114,9 +118,6 @@ struct Reaper {
/// If this is zero, the `async-process` thread must be spawned.
drivers: AtomicUsize,
/// Ensures the `async-process` thread is only ever spawned once.
async_process_thread: Once,
/// Number of live `Child` instances currently running.
///
/// This is used to prevent the reaper thread from being spawned right as the program closes,
@ -135,7 +136,6 @@ impl Reaper {
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
driver_guard: AsyncMutex::new(()),
drivers: AtomicUsize::new(0),
async_process_thread: Once::new(),
child_count: AtomicUsize::new(0),
})
}
@ -145,7 +145,11 @@ impl Reaper {
/// If there are no active `driver()` callers, this will spawn the `async-process` thread.
#[inline]
fn ensure_driven(&'static self) {
if self.drivers.load(Ordering::Acquire) == 0 {
if self
.drivers
.compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
.is_ok()
{
self.start_driver_thread();
}
}
@ -153,24 +157,25 @@ impl Reaper {
/// Start the `async-process` thread.
#[cold]
fn start_driver_thread(&'static self) {
self.async_process_thread.call_once(move || {
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
let driver = async move {
self.drivers.fetch_add(1, Ordering::SeqCst);
let guard = self.driver_guard.lock().await;
self.reap(guard).await
};
#[cfg(test)]
DRIVER_THREAD_SPAWNED.store(true, Ordering::SeqCst);
#[cfg(unix)]
async_io::block_on(driver);
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
let driver = async move {
// No need to bump self.drivers, it was already bumped in ensure_driven.
let guard = self.driver_guard.lock().await;
self.reap(guard).await
};
#[cfg(not(unix))]
future::block_on(driver);
})
.expect("cannot spawn async-process thread");
});
#[cfg(unix)]
async_io::block_on(driver);
#[cfg(not(unix))]
future::block_on(driver);
})
.expect("cannot spawn async-process thread");
}
/// Reap zombie processes forever.
@ -199,7 +204,10 @@ impl Reaper {
}
// Be a good citizen; yield if there are a lot of processes.
//
// After we yield, check if there are more zombie processes.
future::yield_now().await;
zombies.append(&mut self.zombies.lock().unwrap());
}
// Put zombie processes back.
@ -1307,7 +1315,8 @@ mod test {
use futures_lite::future;
use futures_lite::prelude::*;
let is_thread_spawned = || crate::Reaper::get().async_process_thread.is_completed();
let is_thread_spawned =
|| super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst);
#[cfg(unix)]
fn command() -> Command {