Compare commits
49 Commits
Author | SHA1 | Date |
---|---|---|
John Nunley | a9e4b09a6e | |
cowlicks | 76badb6964 | |
John Nunley | 7323b449d7 | |
kennytm | 06fb10ac23 | |
John Nunley | 581c0a02c0 | |
John Nunley | 420c303921 | |
John Nunley | bbd42b56f8 | |
John Nunley | 1e0751fe65 | |
John Nunley | 02699a4f04 | |
John Nunley | ed17f53035 | |
John Nunley | b6c1ddea79 | |
John Nunley | 35a77ff266 | |
John Nunley | d23ab0b6a3 | |
dependabot[bot] | 0357928248 | |
John Nunley | d0993f8d0b | |
John Nunley | 90f343f0ad | |
dependabot[bot] | 54647bef65 | |
dependabot[bot] | 297ac50224 | |
John Nunley | 65cde366d4 | |
John Nunley | f733a83c22 | |
Taiki Endo | 9f9351bc52 | |
John Nunley | b29af2b72c | |
John Nunley | 513b9262d7 | |
John Nunley | ce7ded77e8 | |
John Nunley | 78342ab1db | |
John Nunley | 52a693e4dd | |
John Nunley | 5e8e0b7c7b | |
Taiki Endo | ac1c639e3e | |
John Nunley | 07165c72f5 | |
dependabot[bot] | 1715616859 | |
John Nunley | d45d6f1094 | |
Taiki Endo | d9d97d0299 | |
Taiki Endo | 7eb60b1025 | |
John Nunley | 5e48a40d6c | |
John Nunley | 1a14d501cb | |
Taiki Endo | 01e36f4abe | |
Taiki Endo | 99b9abc536 | |
Taiki Endo | 811dee59ac | |
John Nunley | e41847a378 | |
dependabot[bot] | 9f57bbfeb7 | |
dependabot[bot] | e84c3fd53c | |
Taiki Endo | e086897e53 | |
Taiki Endo | f76d325959 | |
Taiki Endo | 7980b4696a | |
Taiki Endo | 73f3f8f308 | |
Taiki Endo | 93be5c2506 | |
Taiki Endo | 2a2c1ee34a | |
Taiki Endo | 8226196372 | |
Taiki Endo | 8559816dc6 |
|
@ -0,0 +1,9 @@
|
|||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: cargo
|
||||
directory: /
|
||||
schedule:
|
||||
interval: weekly
|
||||
commit-message:
|
||||
prefix: ''
|
||||
labels: []
|
|
@ -1,16 +1,29 @@
|
|||
name: CI
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
schedule:
|
||||
- cron: '0 2 * * *'
|
||||
- cron: '0 2 * * 0'
|
||||
|
||||
env:
|
||||
RUSTFLAGS: -D warnings
|
||||
CARGO_INCREMENTAL: 0
|
||||
CARGO_NET_GIT_FETCH_WITH_CLI: true
|
||||
CARGO_NET_RETRY: 10
|
||||
CARGO_TERM_COLOR: always
|
||||
RUST_BACKTRACE: 1
|
||||
RUSTFLAGS: -D warnings
|
||||
RUSTDOCFLAGS: -D warnings
|
||||
RUSTUP_MAX_RETRIES: 10
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
|
||||
jobs:
|
||||
test:
|
||||
|
@ -21,7 +34,7 @@ jobs:
|
|||
os: [ubuntu-latest, windows-latest, macos-latest]
|
||||
rust: [nightly, beta, stable]
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
|
||||
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
|
||||
|
@ -30,16 +43,21 @@ jobs:
|
|||
if: startsWith(matrix.rust, 'nightly')
|
||||
run: cargo check -Z features=dev_dep
|
||||
- run: cargo test
|
||||
- run: cargo test
|
||||
env:
|
||||
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg async_process_force_signal_backend
|
||||
|
||||
msrv:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ubuntu-latest, windows-latest]
|
||||
# When updating this, the reminder to update the minimum supported
|
||||
# Rust version in Cargo.toml.
|
||||
rust: ['1.46']
|
||||
rust: ['1.63']
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
|
||||
- run: cargo build
|
||||
|
@ -47,7 +65,7 @@ jobs:
|
|||
clippy:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update stable
|
||||
- run: cargo clippy --all-features --all-targets
|
||||
|
@ -55,15 +73,20 @@ jobs:
|
|||
fmt:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update stable
|
||||
- run: cargo fmt --all --check
|
||||
|
||||
security_audit:
|
||||
permissions:
|
||||
checks: write
|
||||
contents: read
|
||||
issues: write
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions-rs/audit-check@v1
|
||||
- uses: actions/checkout@v4
|
||||
# https://github.com/rustsec/audit-check/issues/2
|
||||
- uses: rustsec/audit-check@master
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
name: Release
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
|
@ -10,7 +13,7 @@ jobs:
|
|||
if: github.repository_owner == 'smol-rs'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- uses: taiki-e/create-gh-release-action@v1
|
||||
with:
|
||||
changelog: CHANGELOG.md
|
||||
|
|
44
CHANGELOG.md
44
CHANGELOG.md
|
@ -1,3 +1,47 @@
|
|||
# Version 2.2.2
|
||||
|
||||
- Fix a typo in the docs for `ChildStdin`. (#76)
|
||||
|
||||
# Version 2.2.1
|
||||
|
||||
- Fix a compilation error for 32-bit operating systems by using a 32-bit zombie counter. (#75)
|
||||
|
||||
# Version 2.2.0
|
||||
|
||||
- Port Linux to a new backend that tries to use `pidfd` if it is available. (#68)
|
||||
|
||||
# Version 2.1.0
|
||||
|
||||
- Update `event-listener` to v5.1.0. (#67)
|
||||
|
||||
# Version 2.0.1
|
||||
|
||||
- Update `event-listener` to v4.0.0. (#64)
|
||||
- Update `windows-sys` to v0.52.0. (#65)
|
||||
|
||||
# Version 2.0.0
|
||||
|
||||
- **Breaking:** Remove the `pre_exec` extension function on Unix. It is still available through the `From<std::process::Command>` implementation on `Command`. (#54)
|
||||
- Add the `driver()` function, which allows the processes to be driven without a separate thread. (#52)
|
||||
- Bump `async-io` to v2.0.0 and `async-channel` to v2.0.0. (#60)
|
||||
|
||||
# Version 1.8.1
|
||||
|
||||
- Bump `async-signal` to v0.2.3. (#56)
|
||||
|
||||
# Version 1.8.0
|
||||
|
||||
- Move from `signal-hook` to the `async-signal` crate. (#42)
|
||||
- Reorganize the internals of this crate to be more coherent. (#46)
|
||||
- Bump to `event-listener` v3.0.0. (#43)
|
||||
|
||||
# Version 1.7.0
|
||||
|
||||
- Replace direct dependency on libc with rustix. (#31)
|
||||
- Reduce the number of syscalls used in the `into_stdio` method. (#31)
|
||||
- Add windows::CommandExt::raw_arg on Rust 1.62+. (#32)
|
||||
- Update windows-sys to 0.48. (#39)
|
||||
|
||||
# Version 1.6.0
|
||||
|
||||
- Switch from `winapi` to `windows-sys` (#27)
|
||||
|
|
41
Cargo.toml
41
Cargo.toml
|
@ -2,11 +2,11 @@
|
|||
name = "async-process"
|
||||
# When publishing a new version:
|
||||
# - Update CHANGELOG.md
|
||||
# - Create "v1.x.y" git tag
|
||||
version = "1.6.0"
|
||||
# - Create "v2.x.y" git tag
|
||||
version = "2.2.2"
|
||||
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
|
||||
edition = "2018"
|
||||
rust-version = "1.46"
|
||||
edition = "2021"
|
||||
rust-version = "1.63"
|
||||
description = "Async interface for working with processes"
|
||||
license = "Apache-2.0 OR MIT"
|
||||
repository = "https://github.com/smol-rs/async-process"
|
||||
|
@ -15,31 +15,36 @@ categories = ["asynchronous", "os"]
|
|||
exclude = ["/.*"]
|
||||
|
||||
[dependencies]
|
||||
async-lock = "2.6.0"
|
||||
async-lock = "3.0.0"
|
||||
cfg-if = "1.0"
|
||||
event-listener = "2.4.0"
|
||||
futures-lite = "1.11.0"
|
||||
|
||||
[build-dependencies]
|
||||
autocfg = "1"
|
||||
event-listener = "5.1.0"
|
||||
futures-lite = "2.0.0"
|
||||
tracing = { version = "0.1.40", default-features = false }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
async-io = "1.8"
|
||||
libc = "0.2.88"
|
||||
async-io = "2.1.0"
|
||||
async-signal = "0.2.3"
|
||||
rustix = { version = "0.38", default-features = false, features = ["std", "fs"] }
|
||||
|
||||
[target.'cfg(unix)'.dependencies.signal-hook]
|
||||
version = "0.3.0"
|
||||
features = ["iterator"]
|
||||
default-features = false
|
||||
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
|
||||
async-channel = "2.0.0"
|
||||
async-task = "4.7.0"
|
||||
|
||||
[target.'cfg(all(unix, not(any(target_os = "linux", target_os = "android"))))'.dependencies]
|
||||
rustix = { version = "0.38", default-features = false, features = ["std", "fs", "process"] }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
async-channel = "2.0.0"
|
||||
blocking = "1.0.0"
|
||||
|
||||
[target.'cfg(windows)'.dependencies.windows-sys]
|
||||
version = "0.42"
|
||||
version = "0.52"
|
||||
default-features = false
|
||||
features = [
|
||||
"Win32_Foundation",
|
||||
"Win32_System_Threading",
|
||||
"Win32_System_WindowsProgramming"
|
||||
]
|
||||
|
||||
[dev-dependencies]
|
||||
async-executor = "1.5.1"
|
||||
async-io = "2.1.0"
|
||||
|
|
16
build.rs
16
build.rs
|
@ -1,16 +0,0 @@
|
|||
fn main() {
|
||||
let cfg = match autocfg::AutoCfg::new() {
|
||||
Ok(cfg) => cfg,
|
||||
Err(e) => {
|
||||
println!(
|
||||
"cargo:warning=async-process: failed to detect compiler features: {}",
|
||||
e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if !cfg.probe_rustc_version(1, 63) {
|
||||
autocfg::emit("async_process_no_io_safety");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
//! An example of running a `Command` with a timeout.
|
||||
|
||||
use async_io::Timer;
|
||||
use async_process::{Command, Stdio};
|
||||
use futures_lite::{future, prelude::*};
|
||||
use std::io;
|
||||
|
||||
fn main() -> io::Result<()> {
|
||||
async_io::block_on(async {
|
||||
// Spawn a a command of your choice.
|
||||
let mut child = Command::new("sleep")
|
||||
.arg("3")
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
// Run a future to drain the stdout of the child.
|
||||
// We can't use output() here because it would be cancelled along with the child when the timeout
|
||||
// expires.
|
||||
let mut stdout = String::new();
|
||||
let drain_stdout = {
|
||||
let buffer = &mut stdout;
|
||||
let mut stdout = child.stdout.take().unwrap();
|
||||
|
||||
async move {
|
||||
stdout.read_to_string(buffer).await?;
|
||||
|
||||
// Wait for the child to exit or the timeout.
|
||||
future::pending().await
|
||||
}
|
||||
};
|
||||
|
||||
// Run a future to drain the stderr of the child.
|
||||
let mut stderr = String::new();
|
||||
let drain_stderr = {
|
||||
let buffer = &mut stderr;
|
||||
let mut stderr = child.stderr.take().unwrap();
|
||||
|
||||
async move {
|
||||
stderr.read_to_string(buffer).await?;
|
||||
|
||||
// Wait for the child to exit or the timeout.
|
||||
future::pending().await
|
||||
}
|
||||
};
|
||||
|
||||
// Run a future that waits for the child to exit.
|
||||
let wait = async move {
|
||||
child.status().await?;
|
||||
|
||||
// Child exited.
|
||||
io::Result::Ok(false)
|
||||
};
|
||||
|
||||
// Run a future that times out after 1 second.
|
||||
let timeout_s = 1;
|
||||
let timeout = async move {
|
||||
Timer::after(std::time::Duration::from_secs(timeout_s)).await;
|
||||
|
||||
// Timed out.
|
||||
Ok(true)
|
||||
};
|
||||
|
||||
// Run the futures concurrently.
|
||||
// Note: For larger scale programs than this you should probably spawn each individual future on
|
||||
// a separate task in an executor.
|
||||
let timed_out = drain_stdout.or(drain_stderr).or(wait).or(timeout).await?;
|
||||
|
||||
if timed_out {
|
||||
println!("The child timed out.");
|
||||
} else {
|
||||
println!("The child exited.");
|
||||
}
|
||||
|
||||
println!("Stdout:\n{}", stdout);
|
||||
println!("Stderr:\n{}", stderr);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
534
src/lib.rs
534
src/lib.rs
|
@ -49,29 +49,32 @@
|
|||
//! ```
|
||||
|
||||
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
|
||||
#![doc(
|
||||
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
|
||||
)]
|
||||
#![doc(
|
||||
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
|
||||
)]
|
||||
|
||||
use std::convert::Infallible;
|
||||
use std::ffi::OsStr;
|
||||
use std::fmt;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::thread;
|
||||
|
||||
#[cfg(unix)]
|
||||
use async_io::Async;
|
||||
#[cfg(all(not(async_process_no_io_safety), unix))]
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
#[cfg(all(not(async_process_no_io_safety), unix))]
|
||||
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
|
||||
|
||||
#[cfg(windows)]
|
||||
use blocking::Unblock;
|
||||
|
||||
use async_lock::OnceCell;
|
||||
use event_listener::Event;
|
||||
use futures_lite::{future, io, prelude::*};
|
||||
|
||||
#[doc(no_inline)]
|
||||
|
@ -82,19 +85,134 @@ pub mod unix;
|
|||
#[cfg(windows)]
|
||||
pub mod windows;
|
||||
|
||||
/// An event delivered every time the SIGCHLD signal occurs.
|
||||
static SIGCHLD: Event = Event::new();
|
||||
mod reaper;
|
||||
|
||||
mod sealed {
|
||||
pub trait Sealed {}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(false);
|
||||
|
||||
/// The zombie process reaper.
|
||||
///
|
||||
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
|
||||
struct Reaper {
|
||||
/// Underlying system reaper.
|
||||
sys: reaper::Reaper,
|
||||
|
||||
/// The number of tasks polling the SIGCHLD event.
|
||||
///
|
||||
/// If this is zero, the `async-process` thread must be spawned.
|
||||
drivers: AtomicUsize,
|
||||
|
||||
/// Number of live `Child` instances currently running.
|
||||
///
|
||||
/// This is used to prevent the reaper thread from being spawned right as the program closes,
|
||||
/// when the reaper thread isn't needed. This represents the number of active processes.
|
||||
child_count: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Get the singleton instance of the reaper.
|
||||
fn get() -> &'static Self {
|
||||
static REAPER: OnceCell<Reaper> = OnceCell::new();
|
||||
|
||||
REAPER.get_or_init_blocking(|| Reaper {
|
||||
sys: reaper::Reaper::new(),
|
||||
drivers: AtomicUsize::new(0),
|
||||
child_count: AtomicUsize::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
/// Ensure that the reaper is driven.
|
||||
///
|
||||
/// If there are no active `driver()` callers, this will spawn the `async-process` thread.
|
||||
#[inline]
|
||||
fn ensure_driven(&'static self) {
|
||||
if self
|
||||
.drivers
|
||||
.compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
|
||||
.is_ok()
|
||||
{
|
||||
self.start_driver_thread();
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the `async-process` thread.
|
||||
#[cold]
|
||||
fn start_driver_thread(&'static self) {
|
||||
#[cfg(test)]
|
||||
DRIVER_THREAD_SPAWNED
|
||||
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
|
||||
.unwrap_or_else(|_| unreachable!("Driver thread already spawned"));
|
||||
|
||||
thread::Builder::new()
|
||||
.name("async-process".to_string())
|
||||
.spawn(move || {
|
||||
let driver = async move {
|
||||
// No need to bump self.drivers, it was already bumped in ensure_driven.
|
||||
let guard = self.sys.lock().await;
|
||||
self.sys.reap(guard).await
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
async_io::block_on(driver);
|
||||
|
||||
#[cfg(not(unix))]
|
||||
future::block_on(driver);
|
||||
})
|
||||
.expect("cannot spawn async-process thread");
|
||||
}
|
||||
|
||||
/// Register a process with this reaper.
|
||||
fn register(&'static self, child: std::process::Child) -> io::Result<reaper::ChildGuard> {
|
||||
self.ensure_driven();
|
||||
self.sys.register(child)
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(windows)] {
|
||||
// Wraps a sync I/O type into an async I/O type.
|
||||
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
|
||||
Ok(Unblock::new(io))
|
||||
}
|
||||
} else if #[cfg(unix)] {
|
||||
/// Wrap a file descriptor into a non-blocking I/O type.
|
||||
fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
|
||||
Async::new(io)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard that can kill child processes, or push them into the zombie list.
|
||||
struct ChildGuard {
|
||||
inner: Option<std::process::Child>,
|
||||
inner: reaper::ChildGuard,
|
||||
reap_on_drop: bool,
|
||||
kill_on_drop: bool,
|
||||
reaper: &'static Reaper,
|
||||
}
|
||||
|
||||
impl ChildGuard {
|
||||
fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
self.inner.as_mut().unwrap()
|
||||
self.inner.get_mut()
|
||||
}
|
||||
}
|
||||
|
||||
// When the last reference to the child process is dropped, push it into the zombie list.
|
||||
impl Drop for ChildGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.kill_on_drop {
|
||||
self.get_mut().kill().ok();
|
||||
}
|
||||
if self.reap_on_drop {
|
||||
self.inner.reap(&self.reaper.sys);
|
||||
}
|
||||
|
||||
// Decrement number of children.
|
||||
self.reaper.child_count.fetch_sub(1, Ordering::Acquire);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -136,6 +254,8 @@ impl Child {
|
|||
/// The "async-process" thread waits for processes in the global list and cleans up the
|
||||
/// resources when they exit.
|
||||
fn new(cmd: &mut Command) -> io::Result<Child> {
|
||||
// Make sure the reaper exists before we spawn the child process.
|
||||
let reaper = Reaper::get();
|
||||
let mut child = cmd.inner.spawn()?;
|
||||
|
||||
// Convert sync I/O types into async I/O types.
|
||||
|
@ -143,141 +263,21 @@ impl Child {
|
|||
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
|
||||
let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(windows)] {
|
||||
use std::ffi::c_void;
|
||||
use std::os::windows::io::AsRawHandle;
|
||||
use std::sync::mpsc;
|
||||
// Bump the child count.
|
||||
reaper.child_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
use windows_sys::Win32::{
|
||||
System::{
|
||||
Threading::{RegisterWaitForSingleObject, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE},
|
||||
WindowsProgramming::INFINITE,
|
||||
},
|
||||
Foundation::{BOOLEAN, HANDLE},
|
||||
};
|
||||
|
||||
// This channel is used to simulate SIGCHLD on Windows.
|
||||
fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex<mpsc::Receiver<()>>) {
|
||||
static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
|
||||
OnceCell::new();
|
||||
|
||||
let (s, r) = CALLBACK.get_or_init_blocking(|| {
|
||||
let (s, r) = mpsc::sync_channel(1);
|
||||
(s, Mutex::new(r))
|
||||
});
|
||||
|
||||
(s, r)
|
||||
}
|
||||
|
||||
|
||||
// Called when a child exits.
|
||||
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
|
||||
callback_channel().0.try_send(()).ok();
|
||||
}
|
||||
|
||||
// Register this child process to invoke `callback` on exit.
|
||||
let mut wait_object = 0;
|
||||
let ret = unsafe {
|
||||
RegisterWaitForSingleObject(
|
||||
&mut wait_object,
|
||||
child.as_raw_handle() as HANDLE,
|
||||
Some(callback),
|
||||
std::ptr::null_mut(),
|
||||
INFINITE,
|
||||
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
|
||||
)
|
||||
};
|
||||
if ret == 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
|
||||
// Waits for the next SIGCHLD signal.
|
||||
fn wait_sigchld() {
|
||||
callback_channel().1.lock().unwrap().recv().ok();
|
||||
}
|
||||
|
||||
// Wraps a sync I/O type into an async I/O type.
|
||||
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
|
||||
Ok(Unblock::new(io))
|
||||
}
|
||||
|
||||
} else if #[cfg(unix)] {
|
||||
static SIGNALS: OnceCell<Mutex<signal_hook::iterator::Signals>> = OnceCell::new();
|
||||
|
||||
// Make sure the signal handler is registered before interacting with the process.
|
||||
SIGNALS.get_or_init_blocking(|| Mutex::new(
|
||||
signal_hook::iterator::Signals::new(&[signal_hook::consts::SIGCHLD])
|
||||
.expect("cannot set signal handler for SIGCHLD"),
|
||||
));
|
||||
|
||||
// Waits for the next SIGCHLD signal.
|
||||
fn wait_sigchld() {
|
||||
SIGNALS.get().expect("Signals not registered").lock().unwrap().forever().next();
|
||||
}
|
||||
|
||||
// Wraps a sync I/O type into an async I/O type.
|
||||
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
|
||||
Async::new(io)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static ZOMBIES: OnceCell<Mutex<Vec<std::process::Child>>> = OnceCell::new();
|
||||
|
||||
// Make sure the thread is started.
|
||||
ZOMBIES.get_or_init_blocking(|| {
|
||||
// Start a thread that handles SIGCHLD and notifies tasks when child processes exit.
|
||||
thread::Builder::new()
|
||||
.name("async-process".to_string())
|
||||
.spawn(move || {
|
||||
loop {
|
||||
// Wait for the next SIGCHLD signal.
|
||||
wait_sigchld();
|
||||
|
||||
// Notify all listeners waiting on the SIGCHLD event.
|
||||
SIGCHLD.notify(std::usize::MAX);
|
||||
|
||||
// Reap zombie processes.
|
||||
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
|
||||
let mut i = 0;
|
||||
while i < zombies.len() {
|
||||
if let Ok(None) = zombies[i].try_wait() {
|
||||
i += 1;
|
||||
} else {
|
||||
zombies.swap_remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("cannot spawn async-process thread");
|
||||
|
||||
Mutex::new(Vec::new())
|
||||
});
|
||||
|
||||
// When the last reference to the child process is dropped, push it into the zombie list.
|
||||
impl Drop for ChildGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.kill_on_drop {
|
||||
self.get_mut().kill().ok();
|
||||
}
|
||||
if self.reap_on_drop {
|
||||
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
|
||||
if let Ok(None) = self.get_mut().try_wait() {
|
||||
zombies.push(self.inner.take().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Register the child process in the global list.
|
||||
let inner = reaper.register(child)?;
|
||||
|
||||
Ok(Child {
|
||||
stdin,
|
||||
stdout,
|
||||
stderr,
|
||||
child: Arc::new(Mutex::new(ChildGuard {
|
||||
inner: Some(child),
|
||||
inner,
|
||||
reap_on_drop: cmd.reap_on_drop,
|
||||
kill_on_drop: cmd.kill_on_drop,
|
||||
reaper,
|
||||
})),
|
||||
})
|
||||
}
|
||||
|
@ -367,18 +367,7 @@ impl Child {
|
|||
self.stdin.take();
|
||||
let child = self.child.clone();
|
||||
|
||||
async move {
|
||||
let mut listener = None;
|
||||
loop {
|
||||
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
|
||||
return Ok(status);
|
||||
}
|
||||
match listener.take() {
|
||||
None => listener = Some(SIGCHLD.listen()),
|
||||
Some(listener) => listener.await,
|
||||
}
|
||||
}
|
||||
}
|
||||
async move { Reaper::get().sys.status(&child).await }
|
||||
}
|
||||
|
||||
/// Drops the stdin handle and collects the output of the process.
|
||||
|
@ -452,7 +441,7 @@ impl fmt::Debug for Child {
|
|||
|
||||
/// A handle to a child process's standard input (stdin).
|
||||
///
|
||||
/// When a [`ChildStdin`] is dropped, the underlying handle gets clossed. If the child process was
|
||||
/// When a [`ChildStdin`] is dropped, the underlying handle gets closed. If the child process was
|
||||
/// previously blocked on input, it becomes unblocked after dropping.
|
||||
#[derive(Debug)]
|
||||
pub struct ChildStdin(
|
||||
|
@ -485,7 +474,7 @@ impl ChildStdin {
|
|||
Ok(self.0.into_inner().await.into())
|
||||
} else if #[cfg(unix)] {
|
||||
let child_stdin = self.0.into_inner()?;
|
||||
blocking_fd(child_stdin.as_raw_fd())?;
|
||||
blocking_fd(rustix::fd::AsFd::as_fd(&child_stdin))?;
|
||||
Ok(child_stdin.into())
|
||||
}
|
||||
}
|
||||
|
@ -517,14 +506,14 @@ impl AsRawFd for ChildStdin {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(async_process_no_io_safety), unix))]
|
||||
#[cfg(unix)]
|
||||
impl AsFd for ChildStdin {
|
||||
fn as_fd(&self) -> BorrowedFd<'_> {
|
||||
self.0.as_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(async_process_no_io_safety), unix))]
|
||||
#[cfg(unix)]
|
||||
impl TryFrom<ChildStdin> for OwnedFd {
|
||||
type Error = io::Error;
|
||||
|
||||
|
@ -577,7 +566,7 @@ impl ChildStdout {
|
|||
Ok(self.0.into_inner().await.into())
|
||||
} else if #[cfg(unix)] {
|
||||
let child_stdout = self.0.into_inner()?;
|
||||
blocking_fd(child_stdout.as_raw_fd())?;
|
||||
blocking_fd(rustix::fd::AsFd::as_fd(&child_stdout))?;
|
||||
Ok(child_stdout.into())
|
||||
}
|
||||
}
|
||||
|
@ -601,14 +590,14 @@ impl AsRawFd for ChildStdout {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(async_process_no_io_safety), unix))]
|
||||
#[cfg(unix)]
|
||||
impl AsFd for ChildStdout {
|
||||
fn as_fd(&self) -> BorrowedFd<'_> {
|
||||
self.0.as_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(async_process_no_io_safety), unix))]
|
||||
#[cfg(unix)]
|
||||
impl TryFrom<ChildStdout> for OwnedFd {
|
||||
type Error = io::Error;
|
||||
|
||||
|
@ -650,7 +639,7 @@ impl ChildStderr {
|
|||
Ok(self.0.into_inner().await.into())
|
||||
} else if #[cfg(unix)] {
|
||||
let child_stderr = self.0.into_inner()?;
|
||||
blocking_fd(child_stderr.as_raw_fd())?;
|
||||
blocking_fd(rustix::fd::AsFd::as_fd(&child_stderr))?;
|
||||
Ok(child_stderr.into())
|
||||
}
|
||||
}
|
||||
|
@ -674,14 +663,14 @@ impl AsRawFd for ChildStderr {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(async_process_no_io_safety), unix))]
|
||||
#[cfg(unix)]
|
||||
impl AsFd for ChildStderr {
|
||||
fn as_fd(&self) -> BorrowedFd<'_> {
|
||||
self.0.as_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(async_process_no_io_safety), unix))]
|
||||
#[cfg(unix)]
|
||||
impl TryFrom<ChildStderr> for OwnedFd {
|
||||
type Error = io::Error;
|
||||
|
||||
|
@ -690,6 +679,68 @@ impl TryFrom<ChildStderr> for OwnedFd {
|
|||
}
|
||||
}
|
||||
|
||||
/// Runs the driver for the asynchronous processes.
|
||||
///
|
||||
/// This future takes control of global structures related to driving [`Child`]ren and reaping
|
||||
/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
|
||||
/// making sure zombie processes are successfully waited on.
|
||||
///
|
||||
/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
|
||||
/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
|
||||
/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
|
||||
/// will be spawned. The "async-process" thread just blocks on this future and will automatically
|
||||
/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
|
||||
///
|
||||
/// This future will never complete. It is intended to be ran on a background task in your
|
||||
/// executor of choice.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use async_executor::Executor;
|
||||
/// use async_process::{driver, Command};
|
||||
///
|
||||
/// # futures_lite::future::block_on(async {
|
||||
/// // Create an executor and run on it.
|
||||
/// let ex = Executor::new();
|
||||
/// ex.run(async {
|
||||
/// // Run the driver future in the background.
|
||||
/// ex.spawn(driver()).detach();
|
||||
///
|
||||
/// // Run a command.
|
||||
/// Command::new("ls").output().await.ok();
|
||||
/// }).await;
|
||||
/// # });
|
||||
/// ```
|
||||
#[allow(clippy::manual_async_fn)]
|
||||
#[inline]
|
||||
pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
|
||||
async {
|
||||
// Get the reaper.
|
||||
let reaper = Reaper::get();
|
||||
|
||||
// Make sure the reaper knows we're driving it.
|
||||
reaper.drivers.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
// Decrement the driver count when this future is dropped.
|
||||
let _guard = CallOnDrop(|| {
|
||||
let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);
|
||||
|
||||
// If this was the last driver, and there are still resources actively using the
|
||||
// reaper, make sure that there is a thread driving the reaper.
|
||||
if prev_count == 1
|
||||
&& (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies())
|
||||
{
|
||||
reaper.ensure_driven();
|
||||
}
|
||||
});
|
||||
|
||||
// Acquire the reaper lock and start polling the SIGCHLD event.
|
||||
let guard = reaper.sys.lock().await;
|
||||
reaper.sys.reap(guard).await
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder for spawning processes.
|
||||
///
|
||||
/// # Examples
|
||||
|
@ -1063,60 +1114,129 @@ impl fmt::Debug for Command {
|
|||
|
||||
/// Moves `Fd` out of non-blocking mode.
|
||||
#[cfg(unix)]
|
||||
fn blocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
|
||||
// Helper macro to execute a system call that returns an `io::Result`.
|
||||
macro_rules! syscall {
|
||||
($fn:ident ( $($arg:expr),* $(,)? ) ) => {{
|
||||
let res = unsafe { libc::$fn($($arg, )*) };
|
||||
if res == -1 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
} else {
|
||||
res
|
||||
fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
|
||||
cfg_if::cfg_if! {
|
||||
// ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
|
||||
// for now, as with the standard library, because it seems to behave
|
||||
// differently depending on the platform.
|
||||
// https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
|
||||
// https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
|
||||
// https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
|
||||
if #[cfg(target_os = "linux")] {
|
||||
rustix::io::ioctl_fionbio(fd, false)?;
|
||||
} else {
|
||||
let previous = rustix::fs::fcntl_getfl(fd)?;
|
||||
let new = previous & !rustix::fs::OFlags::NONBLOCK;
|
||||
if new != previous {
|
||||
rustix::fs::fcntl_setfl(fd, new)?;
|
||||
}
|
||||
}};
|
||||
}
|
||||
}
|
||||
|
||||
let res = syscall!(fcntl(fd, libc::F_GETFL));
|
||||
syscall!(fcntl(fd, libc::F_SETFL, res & !libc::O_NONBLOCK));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
mod test {
|
||||
struct CallOnDrop<F: FnMut()>(F);
|
||||
|
||||
#[test]
|
||||
fn test_into_inner() {
|
||||
futures_lite::future::block_on(async {
|
||||
use crate::Command;
|
||||
|
||||
use std::io::Result;
|
||||
use std::process::Stdio;
|
||||
use std::str::from_utf8;
|
||||
|
||||
use futures_lite::AsyncReadExt;
|
||||
|
||||
let mut ls_child = Command::new("cat")
|
||||
.arg("Cargo.toml")
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
|
||||
|
||||
let mut echo_child = Command::new("grep")
|
||||
.arg("async")
|
||||
.stdin(stdio)
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
let mut buf = vec![];
|
||||
let mut stdout = echo_child.stdout.take().unwrap();
|
||||
|
||||
stdout.read_to_end(&mut buf).await?;
|
||||
dbg!(from_utf8(&buf).unwrap_or(""));
|
||||
|
||||
Result::Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
||||
fn drop(&mut self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
#[test]
|
||||
fn polled_driver() {
|
||||
use super::{driver, Command};
|
||||
use futures_lite::future;
|
||||
use futures_lite::prelude::*;
|
||||
|
||||
let is_thread_spawned =
|
||||
|| super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst);
|
||||
|
||||
#[cfg(unix)]
|
||||
fn command() -> Command {
|
||||
let mut cmd = Command::new("sh");
|
||||
cmd.arg("-c").arg("echo hello");
|
||||
cmd
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn command() -> Command {
|
||||
let mut cmd = Command::new("cmd");
|
||||
cmd.arg("/C").arg("echo hello");
|
||||
cmd
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
const OUTPUT: &[u8] = b"hello\n";
|
||||
#[cfg(windows)]
|
||||
const OUTPUT: &[u8] = b"hello\r\n";
|
||||
|
||||
future::block_on(async {
|
||||
// Thread should not be spawned off the bat.
|
||||
assert!(!is_thread_spawned());
|
||||
|
||||
// Spawn a driver.
|
||||
let mut driver1 = Box::pin(driver());
|
||||
future::poll_once(&mut driver1).await;
|
||||
assert!(!is_thread_spawned());
|
||||
|
||||
// We should be able to run the driver in parallel with a process future.
|
||||
async {
|
||||
(&mut driver1).await;
|
||||
}
|
||||
.or(async {
|
||||
let output = command().output().await.unwrap();
|
||||
assert_eq!(output.stdout, OUTPUT);
|
||||
})
|
||||
.await;
|
||||
assert!(!is_thread_spawned());
|
||||
|
||||
// Spawn a second driver.
|
||||
let mut driver2 = Box::pin(driver());
|
||||
future::poll_once(&mut driver2).await;
|
||||
assert!(!is_thread_spawned());
|
||||
|
||||
// Poll both drivers in parallel.
|
||||
async {
|
||||
(&mut driver1).await;
|
||||
}
|
||||
.or(async {
|
||||
(&mut driver2).await;
|
||||
})
|
||||
.or(async {
|
||||
let output = command().output().await.unwrap();
|
||||
assert_eq!(output.stdout, OUTPUT);
|
||||
})
|
||||
.await;
|
||||
assert!(!is_thread_spawned());
|
||||
|
||||
// Once one is dropped, the other should take over.
|
||||
drop(driver1);
|
||||
assert!(!is_thread_spawned());
|
||||
|
||||
// Poll driver2 in parallel with a process future.
|
||||
async {
|
||||
(&mut driver2).await;
|
||||
}
|
||||
.or(async {
|
||||
let output = command().output().await.unwrap();
|
||||
assert_eq!(output.stdout, OUTPUT);
|
||||
})
|
||||
.await;
|
||||
assert!(!is_thread_spawned());
|
||||
|
||||
// Once driver2 is dropped, the thread should not be spawned, as there are no active
|
||||
// child processes..
|
||||
drop(driver2);
|
||||
assert!(!is_thread_spawned());
|
||||
|
||||
// We should now be able to poll the process future independently, it will spawn the
|
||||
// thread.
|
||||
let output = command().output().await.unwrap();
|
||||
assert_eq!(output.stdout, OUTPUT);
|
||||
assert!(is_thread_spawned());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,221 @@
|
|||
//! The underlying system reaper.
|
||||
//!
|
||||
//! There are two backends:
|
||||
//!
|
||||
//! - signal, which waits for SIGCHLD.
|
||||
//! - wait, which waits directly on a process handle.
|
||||
//!
|
||||
//! "wait" is preferred, but is not available on all supported Linuxes. So we
|
||||
//! test to see if pidfd is supported first. If it is, we use wait. If not, we use
|
||||
//! signal.
|
||||
|
||||
#![allow(irrefutable_let_patterns)]
|
||||
|
||||
/// Enable the waiting reaper.
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
macro_rules! cfg_wait {
|
||||
($($tt:tt)*) => {$($tt)*};
|
||||
}
|
||||
|
||||
/// Enable the waiting reaper.
|
||||
#[cfg(not(any(target_os = "linux", target_os = "android")))]
|
||||
macro_rules! cfg_wait {
|
||||
($($tt:tt)*) => {};
|
||||
}
|
||||
|
||||
/// Enable signals.
|
||||
macro_rules! cfg_signal {
|
||||
($($tt:tt)*) => {$($tt)*};
|
||||
}
|
||||
|
||||
cfg_wait! {
|
||||
mod wait;
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
mod signal;
|
||||
}
|
||||
|
||||
use std::io;
|
||||
use std::sync::Mutex;
|
||||
|
||||
/// The underlying system reaper.
|
||||
pub(crate) enum Reaper {
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
/// The reaper based on the wait backend.
|
||||
Wait(wait::Reaper),
|
||||
|
||||
/// The reaper based on the signal backend.
|
||||
Signal(signal::Reaper),
|
||||
}
|
||||
|
||||
/// The wrapper around a child.
|
||||
pub(crate) enum ChildGuard {
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
/// The child guard based on the wait backend.
|
||||
Wait(wait::ChildGuard),
|
||||
|
||||
/// The child guard based on the signal backend.
|
||||
Signal(signal::ChildGuard),
|
||||
}
|
||||
|
||||
/// A lock on the reaper.
|
||||
pub(crate) enum Lock {
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
/// The wait-based reaper needs no lock.
|
||||
Wait,
|
||||
|
||||
/// The lock for the signal-based reaper.
|
||||
Signal(signal::Lock),
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Create a new reaper.
|
||||
pub(crate) fn new() -> Self {
|
||||
cfg_wait! {
|
||||
if wait::available() && !cfg!(async_process_force_signal_backend) {
|
||||
return Self::Wait(wait::Reaper::new());
|
||||
}
|
||||
}
|
||||
|
||||
// Return the signal-based reaper.
|
||||
cfg_signal! {
|
||||
return Self::Signal(signal::Reaper::new());
|
||||
}
|
||||
|
||||
#[allow(unreachable_code)]
|
||||
{
|
||||
panic!("neither the signal backend nor the waiter backend is available")
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock the driver thread.
|
||||
///
|
||||
/// This makes it so only one thread can reap at once.
|
||||
pub(crate) async fn lock(&'static self) -> Lock {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(_this) = self {
|
||||
// No locking needed.
|
||||
return Lock::Wait;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
// We need to lock.
|
||||
return Lock::Signal(this.lock().await);
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Reap zombie processes forever.
|
||||
pub(crate) async fn reap(&'static self, lock: Lock) -> ! {
|
||||
cfg_wait! {
|
||||
if let (Self::Wait(this), Lock::Wait) = (self, &lock) {
|
||||
return this.reap().await;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let (Self::Signal(this), Lock::Signal(lock)) = (self, lock) {
|
||||
return this.reap(lock).await;
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Register a child into this reaper.
|
||||
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(this) = self {
|
||||
return this.register(child).map(ChildGuard::Wait);
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
return this.register(child).map(ChildGuard::Signal);
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Wait for the inner child to complete.
|
||||
pub(crate) async fn status(
|
||||
&'static self,
|
||||
child: &Mutex<crate::ChildGuard>,
|
||||
) -> io::Result<std::process::ExitStatus> {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(this) = self {
|
||||
return this.status(child).await;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
return this.status(child).await;
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Do we have any registered zombie processes?
|
||||
pub(crate) fn has_zombies(&'static self) -> bool {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(this) = self {
|
||||
return this.has_zombies();
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
return this.has_zombies();
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
impl ChildGuard {
|
||||
/// Get a reference to the inner process.
|
||||
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
cfg_wait! {
|
||||
if let Self::Wait(this) = self {
|
||||
return this.get_mut();
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let Self::Signal(this) = self {
|
||||
return this.get_mut();
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Start reaping this child process.
|
||||
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
|
||||
cfg_wait! {
|
||||
if let (Self::Wait(this), Reaper::Wait(reaper)) = (&mut *self, reaper) {
|
||||
this.reap(reaper);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_signal! {
|
||||
if let (Self::Signal(this), Reaper::Signal(reaper)) = (self, reaper) {
|
||||
this.reap(reaper);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,238 @@
|
|||
//! A version of the reaper that waits for a signal to check for process progress.
|
||||
|
||||
use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
|
||||
use event_listener::Event;
|
||||
use futures_lite::future;
|
||||
|
||||
use std::io;
|
||||
use std::mem;
|
||||
use std::sync::Mutex;
|
||||
|
||||
pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
|
||||
|
||||
/// The zombie process reaper.
|
||||
pub(crate) struct Reaper {
|
||||
/// An event delivered every time the SIGCHLD signal occurs.
|
||||
sigchld: Event,
|
||||
|
||||
/// The list of zombie processes.
|
||||
zombies: Mutex<Vec<std::process::Child>>,
|
||||
|
||||
/// The pipe that delivers signal notifications.
|
||||
pipe: Pipe,
|
||||
|
||||
/// Locking this mutex indicates that we are polling the SIGCHLD event.
|
||||
driver_guard: AsyncMutex<()>,
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Create a new reaper.
|
||||
pub(crate) fn new() -> Self {
|
||||
Reaper {
|
||||
sigchld: Event::new(),
|
||||
zombies: Mutex::new(Vec::new()),
|
||||
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
|
||||
driver_guard: AsyncMutex::new(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock the driver thread.
|
||||
pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
|
||||
self.driver_guard.lock().await
|
||||
}
|
||||
|
||||
/// Reap zombie processes forever.
|
||||
pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
|
||||
loop {
|
||||
// Wait for the next SIGCHLD signal.
|
||||
self.pipe.wait().await;
|
||||
|
||||
// Notify all listeners waiting on the SIGCHLD event.
|
||||
self.sigchld.notify(std::usize::MAX);
|
||||
|
||||
// Reap zombie processes, but make sure we don't hold onto the lock for too long!
|
||||
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
|
||||
let mut i = 0;
|
||||
'reap_zombies: loop {
|
||||
for _ in 0..50 {
|
||||
if i >= zombies.len() {
|
||||
break 'reap_zombies;
|
||||
}
|
||||
|
||||
if let Ok(None) = zombies[i].try_wait() {
|
||||
i += 1;
|
||||
} else {
|
||||
zombies.swap_remove(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Be a good citizen; yield if there are a lot of processes.
|
||||
//
|
||||
// After we yield, check if there are more zombie processes.
|
||||
future::yield_now().await;
|
||||
zombies.append(&mut self.zombies.lock().unwrap());
|
||||
}
|
||||
|
||||
// Put zombie processes back.
|
||||
self.zombies.lock().unwrap().append(&mut zombies);
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a process with this reaper.
|
||||
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
|
||||
self.pipe.register(&child)?;
|
||||
Ok(ChildGuard { inner: Some(child) })
|
||||
}
|
||||
|
||||
/// Wait for an event to occur for a child process.
|
||||
pub(crate) async fn status(
|
||||
&'static self,
|
||||
child: &Mutex<crate::ChildGuard>,
|
||||
) -> io::Result<std::process::ExitStatus> {
|
||||
loop {
|
||||
// Wait on the child process.
|
||||
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
|
||||
return Ok(status);
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
event_listener::listener!(self.sigchld => listener);
|
||||
|
||||
// Try again.
|
||||
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
|
||||
return Ok(status);
|
||||
}
|
||||
|
||||
// Wait on the listener.
|
||||
listener.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Do we have any registered zombie processes?
|
||||
pub(crate) fn has_zombies(&'static self) -> bool {
|
||||
!self
|
||||
.zombies
|
||||
.lock()
|
||||
.unwrap_or_else(|x| x.into_inner())
|
||||
.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// The wrapper around the child.
|
||||
pub(crate) struct ChildGuard {
|
||||
inner: Option<std::process::Child>,
|
||||
}
|
||||
|
||||
impl ChildGuard {
|
||||
/// Get a mutable reference to the inner child.
|
||||
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
self.inner.as_mut().unwrap()
|
||||
}
|
||||
|
||||
/// Begin the reaping process for this child.
|
||||
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
|
||||
if let Ok(None) = self.get_mut().try_wait() {
|
||||
reaper.zombies.lock().unwrap().push(self.inner.take().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(windows)] {
|
||||
use async_channel::{Sender, Receiver, bounded};
|
||||
use std::ffi::c_void;
|
||||
use std::os::windows::io::AsRawHandle;
|
||||
|
||||
use windows_sys::Win32::{
|
||||
Foundation::{BOOLEAN, HANDLE},
|
||||
System::Threading::{
|
||||
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
|
||||
},
|
||||
};
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
struct Pipe {
|
||||
/// The sender channel for the SIGCHLD signal.
|
||||
sender: Sender<()>,
|
||||
|
||||
/// The receiver channel for the SIGCHLD signal.
|
||||
receiver: Receiver<()>,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
fn new() -> io::Result<Pipe> {
|
||||
let (sender, receiver) = bounded(1);
|
||||
Ok(Pipe {
|
||||
sender,
|
||||
receiver
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
async fn wait(&self) {
|
||||
self.receiver.recv().await.ok();
|
||||
}
|
||||
|
||||
/// Register a process object into this pipe.
|
||||
fn register(&self, child: &std::process::Child) -> io::Result<()> {
|
||||
// Called when a child exits.
|
||||
#[allow(clippy::infallible_destructuring_match)]
|
||||
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
|
||||
let reaper = match &crate::Reaper::get().sys {
|
||||
super::Reaper::Signal(reaper) => reaper,
|
||||
};
|
||||
|
||||
reaper.pipe.sender.try_send(()).ok();
|
||||
}
|
||||
|
||||
// Register this child process to invoke `callback` on exit.
|
||||
let mut wait_object = 0;
|
||||
let ret = unsafe {
|
||||
RegisterWaitForSingleObject(
|
||||
&mut wait_object,
|
||||
child.as_raw_handle() as HANDLE,
|
||||
Some(callback),
|
||||
std::ptr::null_mut(),
|
||||
INFINITE,
|
||||
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
|
||||
)
|
||||
};
|
||||
|
||||
if ret == 0 {
|
||||
Err(io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if #[cfg(unix)] {
|
||||
use async_signal::{Signal, Signals};
|
||||
use futures_lite::prelude::*;
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
struct Pipe {
|
||||
/// The iterator over SIGCHLD signals.
|
||||
signals: Signals,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
fn new() -> io::Result<Pipe> {
|
||||
Ok(Pipe {
|
||||
signals: Signals::new(Some(Signal::Child))?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits for the next SIGCHLD signal.
|
||||
async fn wait(&self) {
|
||||
(&self.signals).next().await;
|
||||
}
|
||||
|
||||
/// Register a process object into this pipe.
|
||||
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
//! A version of the reaper that waits on some polling primitive.
|
||||
//!
|
||||
//! This uses:
|
||||
//!
|
||||
//! - pidfd on Linux/Android
|
||||
|
||||
use async_channel::{Receiver, Sender};
|
||||
use async_task::Runnable;
|
||||
use futures_lite::future;
|
||||
|
||||
use std::io;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Mutex;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// The zombie process reaper.
|
||||
pub(crate) struct Reaper {
|
||||
/// The channel for sending new runnables.
|
||||
sender: Sender<Runnable>,
|
||||
|
||||
/// The channel for receiving new runnables.
|
||||
recv: Receiver<Runnable>,
|
||||
|
||||
/// Number of zombie processes.
|
||||
zombies: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Create a new reaper.
|
||||
pub(crate) fn new() -> Self {
|
||||
let (sender, recv) = async_channel::unbounded();
|
||||
Self {
|
||||
sender,
|
||||
recv,
|
||||
zombies: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Reap zombie processes forever.
|
||||
pub(crate) async fn reap(&'static self) -> ! {
|
||||
loop {
|
||||
// Fetch the next task.
|
||||
let task = match self.recv.recv().await {
|
||||
Ok(task) => task,
|
||||
Err(_) => panic!("sender should never be closed"),
|
||||
};
|
||||
|
||||
// Poll the task.
|
||||
task.run();
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a child into this reaper.
|
||||
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
|
||||
Ok(ChildGuard {
|
||||
inner: Some(WaitableChild::new(child)?),
|
||||
})
|
||||
}
|
||||
|
||||
/// Wait for a child to complete.
|
||||
pub(crate) async fn status(
|
||||
&'static self,
|
||||
child: &Mutex<crate::ChildGuard>,
|
||||
) -> io::Result<std::process::ExitStatus> {
|
||||
future::poll_fn(|cx| {
|
||||
// Lock the child.
|
||||
let mut child = child.lock().unwrap();
|
||||
|
||||
// Get the inner child value.
|
||||
let inner = match &mut child.inner {
|
||||
super::ChildGuard::Wait(inner) => inner,
|
||||
_ => unreachable!()
|
||||
};
|
||||
|
||||
// Poll for the next value.
|
||||
inner.inner.as_mut().unwrap().poll_wait(cx)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Do we have any registered zombie processes?
|
||||
pub(crate) fn has_zombies(&'static self) -> bool {
|
||||
self.zombies.load(Ordering::SeqCst) > 0
|
||||
}
|
||||
}
|
||||
|
||||
/// The wrapper around the child.
|
||||
pub(crate) struct ChildGuard {
|
||||
inner: Option<WaitableChild>,
|
||||
}
|
||||
|
||||
impl ChildGuard {
|
||||
/// Get a mutable reference to the inner child.
|
||||
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
self.inner.as_mut().unwrap().get_mut()
|
||||
}
|
||||
|
||||
/// Begin the reaping process for this child.
|
||||
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
|
||||
// Create a future for polling this child.
|
||||
let future = {
|
||||
let mut inner = self.inner.take().unwrap();
|
||||
async move {
|
||||
// Increment the zombie count.
|
||||
reaper.zombies.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Decrement the zombie count once we are done.
|
||||
let _guard = crate::CallOnDrop(|| {
|
||||
reaper.zombies.fetch_sub(1, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
// Wait on this child forever.
|
||||
let result = future::poll_fn(|cx| inner.poll_wait(cx)).await;
|
||||
if let Err(e) = result {
|
||||
tracing::error!("error while polling zombie process: {}", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Create a function for scheduling this future.
|
||||
let schedule = move |runnable| {
|
||||
reaper.sender.try_send(runnable).ok();
|
||||
};
|
||||
|
||||
// Spawn the task and run it forever.
|
||||
let (runnable, task) = async_task::spawn(future, schedule);
|
||||
task.detach();
|
||||
runnable.schedule();
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(any(
|
||||
target_os = "linux",
|
||||
target_os = "android"
|
||||
))] {
|
||||
use async_io::Async;
|
||||
use rustix::process;
|
||||
use std::os::unix::io::OwnedFd;
|
||||
|
||||
/// Waitable version of `std::process::Child`
|
||||
struct WaitableChild {
|
||||
child: std::process::Child,
|
||||
handle: Async<OwnedFd>,
|
||||
}
|
||||
|
||||
impl WaitableChild {
|
||||
fn new(child: std::process::Child) -> io::Result<Self> {
|
||||
let pidfd = process::pidfd_open(
|
||||
process::Pid::from_child(&child),
|
||||
process::PidfdFlags::empty()
|
||||
)?;
|
||||
|
||||
Ok(Self {
|
||||
child,
|
||||
handle: Async::new(pidfd)?
|
||||
})
|
||||
}
|
||||
|
||||
fn get_mut(&mut self) -> &mut std::process::Child {
|
||||
&mut self.child
|
||||
}
|
||||
|
||||
fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
|
||||
loop {
|
||||
if let Some(status) = self.child.try_wait()? {
|
||||
return Poll::Ready(Ok(status));
|
||||
}
|
||||
|
||||
// Wait for us to become readable.
|
||||
futures_lite::ready!(self.handle.poll_readable(cx))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tell if we are able to use this backend.
|
||||
pub(crate) fn available() -> bool {
|
||||
// Create a Pidfd for the current process and see if it works.
|
||||
let result = process::pidfd_open(
|
||||
process::getpid(),
|
||||
process::PidfdFlags::empty()
|
||||
);
|
||||
|
||||
// Tell if it was okay or not.
|
||||
result.is_ok()
|
||||
}
|
||||
}
|
||||
}
|
47
src/unix.rs
47
src/unix.rs
|
@ -7,7 +7,10 @@ use std::os::unix::process::CommandExt as _;
|
|||
use crate::Command;
|
||||
|
||||
/// Unix-specific extensions to the [`Command`] builder.
|
||||
pub trait CommandExt {
|
||||
///
|
||||
/// This trait is sealed: it cannot be implemented outside `async-process`.
|
||||
/// This is so that future additional methods are not breaking changes.
|
||||
pub trait CommandExt: crate::sealed::Sealed {
|
||||
/// Sets the child process's user ID. This translates to a
|
||||
/// `setuid` call in the child process. Failure in the `setuid`
|
||||
/// call will cause the spawn to fail.
|
||||
|
@ -17,39 +20,6 @@ pub trait CommandExt {
|
|||
/// the same semantics as the `uid` field.
|
||||
fn gid(&mut self, id: u32) -> &mut Command;
|
||||
|
||||
/// Schedules a closure to be run just before the `exec` function is
|
||||
/// invoked.
|
||||
///
|
||||
/// The closure is allowed to return an I/O error whose OS error code will
|
||||
/// be communicated back to the parent and returned as an error from when
|
||||
/// the spawn was requested.
|
||||
///
|
||||
/// Multiple closures can be registered and they will be called in order of
|
||||
/// their registration. If a closure returns `Err` then no further closures
|
||||
/// will be called and the spawn operation will immediately return with a
|
||||
/// failure.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This closure will be run in the context of the child process after a
|
||||
/// `fork`. This primarily means that any modifications made to memory on
|
||||
/// behalf of this closure will **not** be visible to the parent process.
|
||||
/// This is often a very constrained environment where normal operations
|
||||
/// like `malloc` or acquiring a mutex are not guaranteed to work (due to
|
||||
/// other threads perhaps still running when the `fork` was run).
|
||||
///
|
||||
/// This also means that all resources such as file descriptors and
|
||||
/// memory-mapped regions got duplicated. It is your responsibility to make
|
||||
/// sure that the closure does not violate library invariants by making
|
||||
/// invalid use of these duplicates.
|
||||
///
|
||||
/// When this closure is run, aspects such as the stdio file descriptors and
|
||||
/// working directory have successfully been changed, so output to these
|
||||
/// locations may not appear where intended.
|
||||
unsafe fn pre_exec<F>(&mut self, f: F) -> &mut Command
|
||||
where
|
||||
F: FnMut() -> io::Result<()> + Send + Sync + 'static;
|
||||
|
||||
/// Performs all the required setup by this `Command`, followed by calling
|
||||
/// the `execvp` syscall.
|
||||
///
|
||||
|
@ -88,6 +58,7 @@ pub trait CommandExt {
|
|||
S: AsRef<OsStr>;
|
||||
}
|
||||
|
||||
impl crate::sealed::Sealed for Command {}
|
||||
impl CommandExt for Command {
|
||||
fn uid(&mut self, id: u32) -> &mut Command {
|
||||
self.inner.uid(id);
|
||||
|
@ -99,14 +70,6 @@ impl CommandExt for Command {
|
|||
self
|
||||
}
|
||||
|
||||
unsafe fn pre_exec<F>(&mut self, f: F) -> &mut Command
|
||||
where
|
||||
F: FnMut() -> io::Result<()> + Send + Sync + 'static,
|
||||
{
|
||||
self.inner.pre_exec(f);
|
||||
self
|
||||
}
|
||||
|
||||
fn exec(&mut self) -> io::Error {
|
||||
self.inner.exec()
|
||||
}
|
||||
|
|
|
@ -1,25 +1,41 @@
|
|||
//! Windows-specific extensions.
|
||||
|
||||
use std::ffi::OsStr;
|
||||
use std::os::windows::io::{AsRawHandle, RawHandle};
|
||||
use std::os::windows::process::CommandExt as _;
|
||||
|
||||
use crate::{Child, Command};
|
||||
|
||||
/// Windows-specific extensions to the [`Command`] builder.
|
||||
pub trait CommandExt {
|
||||
///
|
||||
/// This trait is sealed: it cannot be implemented outside `async-process`.
|
||||
/// This is so that future additional methods are not breaking changes.
|
||||
pub trait CommandExt: crate::sealed::Sealed {
|
||||
/// Sets the [process creation flags][1] to be passed to `CreateProcess`.
|
||||
///
|
||||
/// These will always be ORed with `CREATE_UNICODE_ENVIRONMENT`.
|
||||
///
|
||||
/// [1]: https://docs.microsoft.com/en-us/windows/win32/procthread/process-creation-flags
|
||||
fn creation_flags(&mut self, flags: u32) -> &mut Command;
|
||||
|
||||
/// Append literal text to the command line without any quoting or escaping.
|
||||
///
|
||||
/// This is useful for passing arguments to `cmd.exe /c`, which doesn't follow
|
||||
/// `CommandLineToArgvW` escaping rules.
|
||||
fn raw_arg<S: AsRef<OsStr>>(&mut self, text_to_append_as_is: S) -> &mut Command;
|
||||
}
|
||||
|
||||
impl crate::sealed::Sealed for Command {}
|
||||
impl CommandExt for Command {
|
||||
fn creation_flags(&mut self, flags: u32) -> &mut Command {
|
||||
self.inner.creation_flags(flags);
|
||||
self
|
||||
}
|
||||
|
||||
fn raw_arg<S: AsRef<OsStr>>(&mut self, text_to_append_as_is: S) -> &mut Command {
|
||||
self.inner.raw_arg(text_to_append_as_is);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRawHandle for Child {
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
//! Sleep test.
|
||||
|
||||
use async_process::Command;
|
||||
use futures_lite::future::block_on;
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn unix_sleep() {
|
||||
block_on(async {
|
||||
let status = Command::new("sleep").arg("1").status().await.unwrap();
|
||||
assert!(status.success());
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
#[test]
|
||||
fn windows_sleep() {
|
||||
block_on(async {
|
||||
let status = Command::new("ping")
|
||||
.args(["-n", "5", "127.0.0.1"])
|
||||
.status()
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(status.success());
|
||||
});
|
||||
}
|
77
tests/std.rs
77
tests/std.rs
|
@ -11,7 +11,7 @@ use futures_lite::{future, prelude::*};
|
|||
fn smoke() {
|
||||
future::block_on(async {
|
||||
let p = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd").args(&["/C", "exit 0"]).spawn()
|
||||
Command::new("cmd").args(["/C", "exit 0"]).spawn()
|
||||
} else {
|
||||
Command::new("true").spawn()
|
||||
};
|
||||
|
@ -21,6 +21,25 @@ fn smoke() {
|
|||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_driven() {
|
||||
future::block_on(
|
||||
async {
|
||||
async_process::driver().await;
|
||||
}
|
||||
.or(async {
|
||||
let p = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd").args(["/C", "exit 0"]).spawn()
|
||||
} else {
|
||||
Command::new("true").spawn()
|
||||
};
|
||||
assert!(p.is_ok());
|
||||
let mut p = p.unwrap();
|
||||
assert!(p.status().await.unwrap().success());
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_failure() {
|
||||
assert!(Command::new("if-this-is-a-binary-then-the-world-has-ended")
|
||||
|
@ -32,7 +51,7 @@ fn smoke_failure() {
|
|||
fn exit_reported_right() {
|
||||
future::block_on(async {
|
||||
let p = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd").args(&["/C", "exit 1"]).spawn()
|
||||
Command::new("cmd").args(["/C", "exit 1"]).spawn()
|
||||
} else {
|
||||
Command::new("false").spawn()
|
||||
};
|
||||
|
@ -84,7 +103,7 @@ fn stdout_works() {
|
|||
future::block_on(async {
|
||||
if cfg!(target_os = "windows") {
|
||||
let mut cmd = Command::new("cmd");
|
||||
cmd.args(&["/C", "echo foobar"]).stdout(Stdio::piped());
|
||||
cmd.args(["/C", "echo foobar"]).stdout(Stdio::piped());
|
||||
assert_eq!(run_output(cmd).await, "foobar\r\n");
|
||||
} else {
|
||||
let mut cmd = Command::new("echo");
|
||||
|
@ -142,7 +161,7 @@ fn test_process_status() {
|
|||
future::block_on(async {
|
||||
let mut status = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd")
|
||||
.args(&["/C", "exit 1"])
|
||||
.args(["/C", "exit 1"])
|
||||
.status()
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -153,7 +172,7 @@ fn test_process_status() {
|
|||
|
||||
status = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd")
|
||||
.args(&["/C", "exit 0"])
|
||||
.args(["/C", "exit 0"])
|
||||
.status()
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -186,7 +205,7 @@ fn test_process_output_output() {
|
|||
stderr,
|
||||
} = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd")
|
||||
.args(&["/C", "echo hello"])
|
||||
.args(["/C", "echo hello"])
|
||||
.output()
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -210,7 +229,7 @@ fn test_process_output_error() {
|
|||
stderr,
|
||||
} = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd")
|
||||
.args(&["/C", "mkdir ."])
|
||||
.args(["/C", "mkdir ."])
|
||||
.output()
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -228,7 +247,7 @@ fn test_process_output_error() {
|
|||
fn test_finish_once() {
|
||||
future::block_on(async {
|
||||
let mut prog = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd").args(&["/C", "exit 1"]).spawn().unwrap()
|
||||
Command::new("cmd").args(["/C", "exit 1"]).spawn().unwrap()
|
||||
} else {
|
||||
Command::new("false").spawn().unwrap()
|
||||
};
|
||||
|
@ -240,7 +259,7 @@ fn test_finish_once() {
|
|||
fn test_finish_twice() {
|
||||
future::block_on(async {
|
||||
let mut prog = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd").args(&["/C", "exit 1"]).spawn().unwrap()
|
||||
Command::new("cmd").args(["/C", "exit 1"]).spawn().unwrap()
|
||||
} else {
|
||||
Command::new("false").spawn().unwrap()
|
||||
};
|
||||
|
@ -254,7 +273,7 @@ fn test_wait_with_output_once() {
|
|||
future::block_on(async {
|
||||
let prog = if cfg!(target_os = "windows") {
|
||||
Command::new("cmd")
|
||||
.args(&["/C", "echo hello"])
|
||||
.args(["/C", "echo hello"])
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()
|
||||
.unwrap()
|
||||
|
@ -308,7 +327,7 @@ fn test_override_env() {
|
|||
let mut cmd = env_cmd();
|
||||
cmd.env_clear().env("RUN_TEST_NEW_ENV", "123");
|
||||
if let Some(p) = env::var_os("PATH") {
|
||||
cmd.env("PATH", &p);
|
||||
cmd.env("PATH", p);
|
||||
}
|
||||
let result = cmd.output().await.unwrap();
|
||||
let output = String::from_utf8_lossy(&result.stdout).to_string();
|
||||
|
@ -428,3 +447,39 @@ fn test_spawn_multiple_with_stdio() {
|
|||
assert_eq!(out2.stderr, b"bar\n");
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn test_into_inner() {
|
||||
futures_lite::future::block_on(async {
|
||||
use crate::Command;
|
||||
|
||||
use std::io::Result;
|
||||
use std::process::Stdio;
|
||||
use std::str::from_utf8;
|
||||
|
||||
use futures_lite::AsyncReadExt;
|
||||
|
||||
let mut ls_child = Command::new("cat")
|
||||
.arg("Cargo.toml")
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
|
||||
|
||||
let mut echo_child = Command::new("grep")
|
||||
.arg("async")
|
||||
.stdin(stdio)
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
let mut buf = vec![];
|
||||
let mut stdout = echo_child.stdout.take().unwrap();
|
||||
|
||||
stdout.read_to_end(&mut buf).await?;
|
||||
dbg!(from_utf8(&buf).unwrap_or(""));
|
||||
|
||||
Result::Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue