Compare commits

...

49 Commits

Author SHA1 Message Date
John Nunley a9e4b09a6e
v2.2.2
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-20 12:19:56 -07:00
cowlicks 76badb6964
Fix typo in ChildStdin docs (#77) 2024-04-19 10:34:15 +09:00
John Nunley 7323b449d7
v2.2.1
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-13 22:18:58 -07:00
kennytm 06fb10ac23
bugfix: Use AtomicUsize instead of U64 to count zombies for 32-bit compatibility
Fix #74.
2024-04-11 17:03:29 -07:00
John Nunley 581c0a02c0 v2.2.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-30 09:12:08 -07:00
John Nunley 420c303921
ci: Add a test that sleeps
I think this catches errors in notification for the Linux backend.

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-30 08:31:01 -07:00
John Nunley bbd42b56f8 feat: Allow fallback to signal backend
As pidfd isn't available in older versions of Linux that Rust still
supports, this is necessary for running on older Linux. In addition,
signals tests are still kept in CI.

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-29 20:37:31 -07:00
John Nunley 1e0751fe65 feat: Add a waitable process backend for Linux
This commit adds a new backend for the process reaper. Rather than
waiting on a signal, it instead registers the process's pidfd into
async-io and waits on that instead.

I've coded this backend to also allow for other systems to be registered
here as well.

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-29 20:37:31 -07:00
John Nunley 02699a4f04 m: Move reaper-related code to another module
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-29 20:37:31 -07:00
John Nunley ed17f53035 v2.1.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-02-18 08:47:04 -08:00
John Nunley b6c1ddea79 chore: Bump event-listener to v5.1.0
https://github.com/smol-rs/async-process/pull/70#issuecomment-1950955967

Signed-off-by: John Nunley <dev@notgull.net>
2024-02-17 22:03:44 -08:00
John Nunley 35a77ff266
m: Port to event-listener v5.0.0
cc smol-rs/event-listener#104

Signed-off-by: John Nunley <dev@notgull.net>
2024-02-12 06:26:10 -08:00
John Nunley d23ab0b6a3
v2.0.1
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-24 07:48:05 -08:00
dependabot[bot] 0357928248 Update windows-sys requirement from 0.48 to 0.52
Updates the requirements on [windows-sys](https://github.com/microsoft/windows-rs) to permit the latest version.
- [Release notes](https://github.com/microsoft/windows-rs/releases)
- [Commits](https://github.com/microsoft/windows-rs/compare/0.48.0...0.52.0)

---
updated-dependencies:
- dependency-name: windows-sys
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-20 22:24:19 +09:00
John Nunley d0993f8d0b
m: Bump to event-listener v4.0.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-19 12:51:15 -08:00
John Nunley 90f343f0ad
v2.0.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-31 18:26:06 -07:00
dependabot[bot] 54647bef65 Update async-lock requirement from 2.6.0 to 3.0.0
Updates the requirements on [async-lock](https://github.com/smol-rs/async-lock) to permit the latest version.
- [Release notes](https://github.com/smol-rs/async-lock/releases)
- [Changelog](https://github.com/smol-rs/async-lock/blob/master/CHANGELOG.md)
- [Commits](https://github.com/smol-rs/async-lock/compare/v2.6.0...v3.0.0)

---
updated-dependencies:
- dependency-name: async-lock
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-30 10:38:54 +09:00
dependabot[bot] 297ac50224 Update futures-lite requirement from 1.11.0 to 2.0.0
Updates the requirements on [futures-lite](https://github.com/smol-rs/futures-lite) to permit the latest version.
- [Release notes](https://github.com/smol-rs/futures-lite/releases)
- [Changelog](https://github.com/smol-rs/futures-lite/blob/master/CHANGELOG.md)
- [Commits](https://github.com/smol-rs/futures-lite/compare/v1.11.0...v2.0.0)

---
updated-dependencies:
- dependency-name: futures-lite
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-30 10:34:17 +09:00
John Nunley 65cde366d4
Bump async-io to v2.0.0 and async-channel to v2.0.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-28 19:04:54 -07:00
John Nunley f733a83c22
feat: Add a way to run without the async-process thread
I know I said that I wouldn't add any more features, but I
think this is important enough.

Right now, a thread called "async-process" is responsible for listening
for SIGCHLD and reaping zombie processes. This listens for the SIGCHLD
signal in Unix and uses a channel connected to the waitable handle on
Windows. While this works, we can do better. Through async-signal, the
signal was already asynchronous on Unix; we were already just using
async_io::block_on to wait on the signal. After swapping out the channel
used on Windows with async-channel, the process reaping function "reap"
can be reimplemented as a fully asynchronous future.

From here we must make sure this future is being polled at all times. To
facilitate this, a function named "driver()" is added to the public API.
This future acquires a lock on the reaper structure and calls the
"reap()" future indefinitely. Multiple drivers can be created at once;
they will just wait forever on this lock. This future is intended to be
spawned onto an executor and left to run forever, making sure all child
processes are signalled whenever necessary. If no tasks are running the
driver future, the "async-process" thread is spawned and runs the
"reap()" future itself.

I've added the following controls to make sure that this system is
robust:

- If a "driver" task is dropped, another "driver" task will acquire the
  lock and keep the reaper active.
- Before being dropped, the task checks to see if it is the last driver.
  If it is, it will spawn the "async-process" thread to be the driver.
- When a Child is being created, it checks if there are any active
  drivers. If there are none, it spawns the "async-process" thread
  itself.
- One concern is that the driver future wil try to spawn the
  "async-process" thread as the application exits and the task is being
  dropped, which will be unnecessary and lead to slower shutdowns. To
  prevent this, the future checks to see if there are any extant `Child`
  instances (a new refcount is added to Reaper to facilitate this). If
  there are none, and if there are no zombie processes, it does not
  spawn the additional thread.
- Someone can still `mem::forget()` the driver thread. This does not
  lead to undefined behavior and just leads to processes being left
  dangling. At this point they're asking for wacky behavior.

This strategy might also be viable for `async-io`, if we want to try to
avoid needing to spawn the additional thread there as well.

Closes #7
cc smol-rs/async-io#40

Signed-off-by: John Nunley <dev@notgull.net>
2023-10-10 17:47:46 -07:00
Taiki Endo 9f9351bc52
Migrate to Rust 2021 (#58) 2023-10-08 14:56:48 +09:00
John Nunley b29af2b72c
breaking: Remove the pre-exec extension function
The purpose for removing this function is twofold:

- It is the only unsafe code in this crate that can't be realistically
  replaced with safe code.
- It is a footgun anyways, and can be done anyways with Into::into() if
   users really want it.

This is a breaking change.

Signed-off-by: John Nunley <dev@notgull.net>
2023-10-07 21:06:27 -07:00
John Nunley 513b9262d7
v1.8.1
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-07 18:48:22 -07:00
John Nunley ce7ded77e8
bugfix: Bump async-signal to v0.2.3
Extracted from #54, should fix #55

Signed-off-by: John Nunley <dev@notgull.net>
2023-10-05 19:02:10 -07:00
John Nunley 78342ab1db
v1.8.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-25 09:58:59 -07:00
John Nunley 52a693e4dd
m: Centralize all global state into a single structure
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-10 16:00:56 -07:00
John Nunley 5e8e0b7c7b
Upgrade to event-listener v3.0.0 (#43)
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-10 12:57:24 -07:00
Taiki Endo ac1c639e3e Update actions/checkout action to v4 2023-09-10 18:19:38 +09:00
John Nunley 07165c72f5
Add smol-rs logo (#48) 2023-07-17 14:41:47 +09:00
dependabot[bot] 1715616859
m: Update rustix requirement from 0.37 to 0.38
Updates the requirements on [rustix](https://github.com/bytecodealliance/rustix) to permit the latest version.
- [Release notes](https://github.com/bytecodealliance/rustix/releases)
- [Commits](https://github.com/bytecodealliance/rustix/compare/v0.37.0...v0.38.2)

---
updated-dependencies:
- dependency-name: rustix
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-02 19:05:21 -07:00
John Nunley d45d6f1094
Use async-signal instead of signal-hook (#42) 2023-06-11 11:21:40 -07:00
Taiki Endo d9d97d0299 Fix clippy::needless_borrow warning
```
error: the borrowed expression implements the required traits
   --> src/lib.rs:212:57
    |
212 |                     signal_hook::iterator::Signals::new(&[signal_hook::consts::SIGCHLD])
    |                                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: change this to: `[signal_hook::consts::SIGCHLD]`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
    = note: `-D clippy::needless-borrow` implied by `-D warnings`

error: the borrowed expression implements the required traits
  --> tests/std.rs:14:38
   |
14 |             Command::new("cmd").args(&["/C", "exit 0"]).spawn()
   |                                      ^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "exit 0"]`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
   = note: `-D clippy::needless-borrow` implied by `-D warnings`

error: the borrowed expression implements the required traits
  --> tests/std.rs:35:38
   |
35 |             Command::new("cmd").args(&["/C", "exit 1"]).spawn()
   |                                      ^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "exit 1"]`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

error: the borrowed expression implements the required traits
  --> tests/std.rs:87:22
   |
87 |             cmd.args(&["/C", "echo foobar"]).stdout(Stdio::piped());
   |                      ^^^^^^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "echo foobar"]`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

error: the borrowed expression implements the required traits
   --> tests/std.rs:145:23
    |
145 |                 .args(&["/C", "exit 1"])
    |                       ^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "exit 1"]`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

error: the borrowed expression implements the required traits
   --> tests/std.rs:156:23
    |
156 |                 .args(&["/C", "exit 0"])
    |                       ^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "exit 0"]`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

error: the borrowed expression implements the required traits
   --> tests/std.rs:189:23
    |
189 |                 .args(&["/C", "echo hello"])
    |                       ^^^^^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "echo hello"]`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

error: the borrowed expression implements the required traits
   --> tests/std.rs:213:23
    |
213 |                 .args(&["/C", "mkdir ."])
    |                       ^^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "mkdir ."]`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

error: the borrowed expression implements the required traits
   --> tests/std.rs:231:38
    |
231 |             Command::new("cmd").args(&["/C", "exit 1"]).spawn().unwrap()
    |                                      ^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "exit 1"]`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

error: the borrowed expression implements the required traits
   --> tests/std.rs:243:38
    |
243 |             Command::new("cmd").args(&["/C", "exit 1"]).spawn().unwrap()
    |                                      ^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "exit 1"]`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow

error: the borrowed expression implements the required traits
   --> tests/std.rs:257:23
    |
257 |                 .args(&["/C", "echo hello"])
    |                       ^^^^^^^^^^^^^^^^^^^^^ help: change this to: `["/C", "echo hello"]`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
```
2023-06-11 22:26:44 +09:00
Taiki Endo 7eb60b1025 Bump MSRV to 1.63 2023-06-11 22:26:44 +09:00
John Nunley 5e48a40d6c
v1.7.0 (#41) 2023-04-07 10:47:16 -07:00
John Nunley 1a14d501cb
Fix minor typo in example.rs (#40) 2023-04-07 10:14:06 -07:00
Taiki Endo 01e36f4abe Update windows-sys to 0.48 2023-04-04 03:46:36 +09:00
Taiki Endo 99b9abc536 Minimize GITHUB_TOKEN permissions
Refs: https://github.blog/changelog/2021-04-20-github-actions-control-permissions-for-github_token
2023-03-26 16:35:52 +09:00
Taiki Endo 811dee59ac Set CARGO_NET_GIT_FETCH_WITH_CLI=true in CI 2023-03-26 16:35:38 +09:00
John Nunley e41847a378
ex: Add example for timed out process (#38) 2023-03-19 06:44:10 -07:00
dependabot[bot] 9f57bbfeb7 Update rustix requirement from 0.36 to 0.37
Updates the requirements on [rustix](https://github.com/bytecodealliance/rustix) to permit the latest version.
- [Release notes](https://github.com/bytecodealliance/rustix/releases)
- [Commits](https://github.com/bytecodealliance/rustix/compare/v0.36.0...v0.37.1)

---
updated-dependencies:
- dependency-name: rustix
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-03-06 12:14:46 +09:00
dependabot[bot] e84c3fd53c Update windows-sys requirement from 0.42 to 0.45
Updates the requirements on [windows-sys](https://github.com/microsoft/windows-rs) to permit the latest version.
- [Release notes](https://github.com/microsoft/windows-rs/releases)
- [Commits](https://github.com/microsoft/windows-rs/compare/0.42.0...0.45.0)

---
updated-dependencies:
- dependency-name: windows-sys
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-01-23 11:32:45 +09:00
Taiki Endo e086897e53 Add windows::CommandExt::raw_arg on Rust 1.62+ 2022-12-30 14:31:53 +09:00
Taiki Endo f76d325959 Seal CommandExt trait
This is technically a breaking change, but we follow the standard
library's decision that sealing CommandExt is fine.
bfd1ccfb27
2022-12-30 14:31:53 +09:00
Taiki Endo 7980b4696a Reduce syscalls in blocking_fd 2022-12-30 13:43:50 +09:00
Taiki Endo 73f3f8f308 Fix clippy::needless_borrow warning
```
warning: the borrowed expression implements the required traits
   --> tests/std.rs:311:29
    |
311 |             cmd.env("PATH", &p);
    |                             ^^ help: change this to: `p`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
    = note: `#[warn(clippy::needless_borrow)]` on by default
```
2022-12-30 13:43:50 +09:00
Taiki Endo 93be5c2506 Replace direct dependency on libc with rustix 2022-12-30 13:43:50 +09:00
Taiki Endo 2a2c1ee34a Bump MSRV to 1.47
On Windows, we depend on blocking and async-task that require Rust 1.47.
2022-12-28 12:44:10 +09:00
Taiki Endo 8226196372 Enable dependabot update for Rust 2022-12-28 12:44:10 +09:00
Taiki Endo 8559816dc6 Clean up CI config 2022-12-28 12:44:10 +09:00
15 changed files with 1281 additions and 306 deletions

9
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,9 @@
version: 2
updates:
- package-ecosystem: cargo
directory: /
schedule:
interval: weekly
commit-message:
prefix: ''
labels: []

View File

@ -1,16 +1,29 @@
name: CI
permissions:
contents: read
on:
pull_request:
push:
branches:
- master
schedule:
- cron: '0 2 * * *'
- cron: '0 2 * * 0'
env:
RUSTFLAGS: -D warnings
CARGO_INCREMENTAL: 0
CARGO_NET_GIT_FETCH_WITH_CLI: true
CARGO_NET_RETRY: 10
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1
RUSTFLAGS: -D warnings
RUSTDOCFLAGS: -D warnings
RUSTUP_MAX_RETRIES: 10
defaults:
run:
shell: bash
jobs:
test:
@ -21,7 +34,7 @@ jobs:
os: [ubuntu-latest, windows-latest, macos-latest]
rust: [nightly, beta, stable]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
@ -30,16 +43,21 @@ jobs:
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- run: cargo test
- run: cargo test
env:
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg async_process_force_signal_backend
msrv:
runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.46']
rust: ['1.63']
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo build
@ -47,7 +65,7 @@ jobs:
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo clippy --all-features --all-targets
@ -55,15 +73,20 @@ jobs:
fmt:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo fmt --all --check
security_audit:
permissions:
checks: write
contents: read
issues: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/audit-check@v1
- uses: actions/checkout@v4
# https://github.com/rustsec/audit-check/issues/2
- uses: rustsec/audit-check@master
with:
token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,5 +1,8 @@
name: Release
permissions:
contents: write
on:
push:
tags:
@ -10,7 +13,7 @@ jobs:
if: github.repository_owner == 'smol-rs'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: taiki-e/create-gh-release-action@v1
with:
changelog: CHANGELOG.md

View File

@ -1,3 +1,47 @@
# Version 2.2.2
- Fix a typo in the docs for `ChildStdin`. (#76)
# Version 2.2.1
- Fix a compilation error for 32-bit operating systems by using a 32-bit zombie counter. (#75)
# Version 2.2.0
- Port Linux to a new backend that tries to use `pidfd` if it is available. (#68)
# Version 2.1.0
- Update `event-listener` to v5.1.0. (#67)
# Version 2.0.1
- Update `event-listener` to v4.0.0. (#64)
- Update `windows-sys` to v0.52.0. (#65)
# Version 2.0.0
- **Breaking:** Remove the `pre_exec` extension function on Unix. It is still available through the `From<std::process::Command>` implementation on `Command`. (#54)
- Add the `driver()` function, which allows the processes to be driven without a separate thread. (#52)
- Bump `async-io` to v2.0.0 and `async-channel` to v2.0.0. (#60)
# Version 1.8.1
- Bump `async-signal` to v0.2.3. (#56)
# Version 1.8.0
- Move from `signal-hook` to the `async-signal` crate. (#42)
- Reorganize the internals of this crate to be more coherent. (#46)
- Bump to `event-listener` v3.0.0. (#43)
# Version 1.7.0
- Replace direct dependency on libc with rustix. (#31)
- Reduce the number of syscalls used in the `into_stdio` method. (#31)
- Add windows::CommandExt::raw_arg on Rust 1.62+. (#32)
- Update windows-sys to 0.48. (#39)
# Version 1.6.0
- Switch from `winapi` to `windows-sys` (#27)

View File

@ -2,11 +2,11 @@
name = "async-process"
# When publishing a new version:
# - Update CHANGELOG.md
# - Create "v1.x.y" git tag
version = "1.6.0"
# - Create "v2.x.y" git tag
version = "2.2.2"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
rust-version = "1.46"
edition = "2021"
rust-version = "1.63"
description = "Async interface for working with processes"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/async-process"
@ -15,31 +15,36 @@ categories = ["asynchronous", "os"]
exclude = ["/.*"]
[dependencies]
async-lock = "2.6.0"
async-lock = "3.0.0"
cfg-if = "1.0"
event-listener = "2.4.0"
futures-lite = "1.11.0"
[build-dependencies]
autocfg = "1"
event-listener = "5.1.0"
futures-lite = "2.0.0"
tracing = { version = "0.1.40", default-features = false }
[target.'cfg(unix)'.dependencies]
async-io = "1.8"
libc = "0.2.88"
async-io = "2.1.0"
async-signal = "0.2.3"
rustix = { version = "0.38", default-features = false, features = ["std", "fs"] }
[target.'cfg(unix)'.dependencies.signal-hook]
version = "0.3.0"
features = ["iterator"]
default-features = false
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
async-channel = "2.0.0"
async-task = "4.7.0"
[target.'cfg(all(unix, not(any(target_os = "linux", target_os = "android"))))'.dependencies]
rustix = { version = "0.38", default-features = false, features = ["std", "fs", "process"] }
[target.'cfg(windows)'.dependencies]
async-channel = "2.0.0"
blocking = "1.0.0"
[target.'cfg(windows)'.dependencies.windows-sys]
version = "0.42"
version = "0.52"
default-features = false
features = [
"Win32_Foundation",
"Win32_System_Threading",
"Win32_System_WindowsProgramming"
]
[dev-dependencies]
async-executor = "1.5.1"
async-io = "2.1.0"

View File

@ -1,16 +0,0 @@
fn main() {
let cfg = match autocfg::AutoCfg::new() {
Ok(cfg) => cfg,
Err(e) => {
println!(
"cargo:warning=async-process: failed to detect compiler features: {}",
e
);
return;
}
};
if !cfg.probe_rustc_version(1, 63) {
autocfg::emit("async_process_no_io_safety");
}
}

80
examples/timeout.rs Normal file
View File

@ -0,0 +1,80 @@
//! An example of running a `Command` with a timeout.
use async_io::Timer;
use async_process::{Command, Stdio};
use futures_lite::{future, prelude::*};
use std::io;
fn main() -> io::Result<()> {
async_io::block_on(async {
// Spawn a a command of your choice.
let mut child = Command::new("sleep")
.arg("3")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
// Run a future to drain the stdout of the child.
// We can't use output() here because it would be cancelled along with the child when the timeout
// expires.
let mut stdout = String::new();
let drain_stdout = {
let buffer = &mut stdout;
let mut stdout = child.stdout.take().unwrap();
async move {
stdout.read_to_string(buffer).await?;
// Wait for the child to exit or the timeout.
future::pending().await
}
};
// Run a future to drain the stderr of the child.
let mut stderr = String::new();
let drain_stderr = {
let buffer = &mut stderr;
let mut stderr = child.stderr.take().unwrap();
async move {
stderr.read_to_string(buffer).await?;
// Wait for the child to exit or the timeout.
future::pending().await
}
};
// Run a future that waits for the child to exit.
let wait = async move {
child.status().await?;
// Child exited.
io::Result::Ok(false)
};
// Run a future that times out after 1 second.
let timeout_s = 1;
let timeout = async move {
Timer::after(std::time::Duration::from_secs(timeout_s)).await;
// Timed out.
Ok(true)
};
// Run the futures concurrently.
// Note: For larger scale programs than this you should probably spawn each individual future on
// a separate task in an executor.
let timed_out = drain_stdout.or(drain_stderr).or(wait).or(timeout).await?;
if timed_out {
println!("The child timed out.");
} else {
println!("The child exited.");
}
println!("Stdout:\n{}", stdout);
println!("Stderr:\n{}", stderr);
Ok(())
})
}

View File

@ -49,29 +49,32 @@
//! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
use std::convert::Infallible;
use std::ffi::OsStr;
use std::fmt;
use std::path::Path;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::thread;
#[cfg(unix)]
use async_io::Async;
#[cfg(all(not(async_process_no_io_safety), unix))]
use std::convert::{TryFrom, TryInto};
#[cfg(all(not(async_process_no_io_safety), unix))]
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(windows)]
use blocking::Unblock;
use async_lock::OnceCell;
use event_listener::Event;
use futures_lite::{future, io, prelude::*};
#[doc(no_inline)]
@ -82,19 +85,134 @@ pub mod unix;
#[cfg(windows)]
pub mod windows;
/// An event delivered every time the SIGCHLD signal occurs.
static SIGCHLD: Event = Event::new();
mod reaper;
mod sealed {
pub trait Sealed {}
}
#[cfg(test)]
static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);
/// The zombie process reaper.
///
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
struct Reaper {
/// Underlying system reaper.
sys: reaper::Reaper,
/// The number of tasks polling the SIGCHLD event.
///
/// If this is zero, the `async-process` thread must be spawned.
drivers: AtomicUsize,
/// Number of live `Child` instances currently running.
///
/// This is used to prevent the reaper thread from being spawned right as the program closes,
/// when the reaper thread isn't needed. This represents the number of active processes.
child_count: AtomicUsize,
}
impl Reaper {
/// Get the singleton instance of the reaper.
fn get() -> &'static Self {
static REAPER: OnceCell<Reaper> = OnceCell::new();
REAPER.get_or_init_blocking(|| Reaper {
sys: reaper::Reaper::new(),
drivers: AtomicUsize::new(0),
child_count: AtomicUsize::new(0),
})
}
/// Ensure that the reaper is driven.
///
/// If there are no active `driver()` callers, this will spawn the `async-process` thread.
#[inline]
fn ensure_driven(&'static self) {
if self
.drivers
.compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
.is_ok()
{
self.start_driver_thread();
}
}
/// Start the `async-process` thread.
#[cold]
fn start_driver_thread(&'static self) {
#[cfg(test)]
DRIVER_THREAD_SPAWNED
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.unwrap_or_else(|_| unreachable!("Driver thread already spawned"));
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
let driver = async move {
// No need to bump self.drivers, it was already bumped in ensure_driven.
let guard = self.sys.lock().await;
self.sys.reap(guard).await
};
#[cfg(unix)]
async_io::block_on(driver);
#[cfg(not(unix))]
future::block_on(driver);
})
.expect("cannot spawn async-process thread");
}
/// Register a process with this reaper.
fn register(&'static self, child: std::process::Child) -> io::Result<reaper::ChildGuard> {
self.ensure_driven();
self.sys.register(child)
}
}
cfg_if::cfg_if! {
if #[cfg(windows)] {
// Wraps a sync I/O type into an async I/O type.
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
Ok(Unblock::new(io))
}
} else if #[cfg(unix)] {
/// Wrap a file descriptor into a non-blocking I/O type.
fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
Async::new(io)
}
}
}
/// A guard that can kill child processes, or push them into the zombie list.
struct ChildGuard {
inner: Option<std::process::Child>,
inner: reaper::ChildGuard,
reap_on_drop: bool,
kill_on_drop: bool,
reaper: &'static Reaper,
}
impl ChildGuard {
fn get_mut(&mut self) -> &mut std::process::Child {
self.inner.as_mut().unwrap()
self.inner.get_mut()
}
}
// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
self.inner.reap(&self.reaper.sys);
}
// Decrement number of children.
self.reaper.child_count.fetch_sub(1, Ordering::Acquire);
}
}
@ -136,6 +254,8 @@ impl Child {
/// The "async-process" thread waits for processes in the global list and cleans up the
/// resources when they exit.
fn new(cmd: &mut Command) -> io::Result<Child> {
// Make sure the reaper exists before we spawn the child process.
let reaper = Reaper::get();
let mut child = cmd.inner.spawn()?;
// Convert sync I/O types into async I/O types.
@ -143,141 +263,21 @@ impl Child {
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
cfg_if::cfg_if! {
if #[cfg(windows)] {
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc;
// Bump the child count.
reaper.child_count.fetch_add(1, Ordering::Relaxed);
use windows_sys::Win32::{
System::{
Threading::{RegisterWaitForSingleObject, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE},
WindowsProgramming::INFINITE,
},
Foundation::{BOOLEAN, HANDLE},
};
// This channel is used to simulate SIGCHLD on Windows.
fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex<mpsc::Receiver<()>>) {
static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
OnceCell::new();
let (s, r) = CALLBACK.get_or_init_blocking(|| {
let (s, r) = mpsc::sync_channel(1);
(s, Mutex::new(r))
});
(s, r)
}
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
callback_channel().0.try_send(()).ok();
}
// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
return Err(io::Error::last_os_error());
}
// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
callback_channel().1.lock().unwrap().recv().ok();
}
// Wraps a sync I/O type into an async I/O type.
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
Ok(Unblock::new(io))
}
} else if #[cfg(unix)] {
static SIGNALS: OnceCell<Mutex<signal_hook::iterator::Signals>> = OnceCell::new();
// Make sure the signal handler is registered before interacting with the process.
SIGNALS.get_or_init_blocking(|| Mutex::new(
signal_hook::iterator::Signals::new(&[signal_hook::consts::SIGCHLD])
.expect("cannot set signal handler for SIGCHLD"),
));
// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
SIGNALS.get().expect("Signals not registered").lock().unwrap().forever().next();
}
// Wraps a sync I/O type into an async I/O type.
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
Async::new(io)
}
}
}
static ZOMBIES: OnceCell<Mutex<Vec<std::process::Child>>> = OnceCell::new();
// Make sure the thread is started.
ZOMBIES.get_or_init_blocking(|| {
// Start a thread that handles SIGCHLD and notifies tasks when child processes exit.
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
loop {
// Wait for the next SIGCHLD signal.
wait_sigchld();
// Notify all listeners waiting on the SIGCHLD event.
SIGCHLD.notify(std::usize::MAX);
// Reap zombie processes.
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
}
})
.expect("cannot spawn async-process thread");
Mutex::new(Vec::new())
});
// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}
}
}
// Register the child process in the global list.
let inner = reaper.register(child)?;
Ok(Child {
stdin,
stdout,
stderr,
child: Arc::new(Mutex::new(ChildGuard {
inner: Some(child),
inner,
reap_on_drop: cmd.reap_on_drop,
kill_on_drop: cmd.kill_on_drop,
reaper,
})),
})
}
@ -367,18 +367,7 @@ impl Child {
self.stdin.take();
let child = self.child.clone();
async move {
let mut listener = None;
loop {
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
match listener.take() {
None => listener = Some(SIGCHLD.listen()),
Some(listener) => listener.await,
}
}
}
async move { Reaper::get().sys.status(&child).await }
}
/// Drops the stdin handle and collects the output of the process.
@ -452,7 +441,7 @@ impl fmt::Debug for Child {
/// A handle to a child process's standard input (stdin).
///
/// When a [`ChildStdin`] is dropped, the underlying handle gets clossed. If the child process was
/// When a [`ChildStdin`] is dropped, the underlying handle gets closed. If the child process was
/// previously blocked on input, it becomes unblocked after dropping.
#[derive(Debug)]
pub struct ChildStdin(
@ -485,7 +474,7 @@ impl ChildStdin {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stdin = self.0.into_inner()?;
blocking_fd(child_stdin.as_raw_fd())?;
blocking_fd(rustix::fd::AsFd::as_fd(&child_stdin))?;
Ok(child_stdin.into())
}
}
@ -517,14 +506,14 @@ impl AsRawFd for ChildStdin {
}
}
#[cfg(all(not(async_process_no_io_safety), unix))]
#[cfg(unix)]
impl AsFd for ChildStdin {
fn as_fd(&self) -> BorrowedFd<'_> {
self.0.as_fd()
}
}
#[cfg(all(not(async_process_no_io_safety), unix))]
#[cfg(unix)]
impl TryFrom<ChildStdin> for OwnedFd {
type Error = io::Error;
@ -577,7 +566,7 @@ impl ChildStdout {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stdout = self.0.into_inner()?;
blocking_fd(child_stdout.as_raw_fd())?;
blocking_fd(rustix::fd::AsFd::as_fd(&child_stdout))?;
Ok(child_stdout.into())
}
}
@ -601,14 +590,14 @@ impl AsRawFd for ChildStdout {
}
}
#[cfg(all(not(async_process_no_io_safety), unix))]
#[cfg(unix)]
impl AsFd for ChildStdout {
fn as_fd(&self) -> BorrowedFd<'_> {
self.0.as_fd()
}
}
#[cfg(all(not(async_process_no_io_safety), unix))]
#[cfg(unix)]
impl TryFrom<ChildStdout> for OwnedFd {
type Error = io::Error;
@ -650,7 +639,7 @@ impl ChildStderr {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stderr = self.0.into_inner()?;
blocking_fd(child_stderr.as_raw_fd())?;
blocking_fd(rustix::fd::AsFd::as_fd(&child_stderr))?;
Ok(child_stderr.into())
}
}
@ -674,14 +663,14 @@ impl AsRawFd for ChildStderr {
}
}
#[cfg(all(not(async_process_no_io_safety), unix))]
#[cfg(unix)]
impl AsFd for ChildStderr {
fn as_fd(&self) -> BorrowedFd<'_> {
self.0.as_fd()
}
}
#[cfg(all(not(async_process_no_io_safety), unix))]
#[cfg(unix)]
impl TryFrom<ChildStderr> for OwnedFd {
type Error = io::Error;
@ -690,6 +679,68 @@ impl TryFrom<ChildStderr> for OwnedFd {
}
}
/// Runs the driver for the asynchronous processes.
///
/// This future takes control of global structures related to driving [`Child`]ren and reaping
/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
/// making sure zombie processes are successfully waited on.
///
/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
/// will be spawned. The "async-process" thread just blocks on this future and will automatically
/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
///
/// This future will never complete. It is intended to be ran on a background task in your
/// executor of choice.
///
/// # Examples
///
/// ```no_run
/// use async_executor::Executor;
/// use async_process::{driver, Command};
///
/// # futures_lite::future::block_on(async {
/// // Create an executor and run on it.
/// let ex = Executor::new();
/// ex.run(async {
/// // Run the driver future in the background.
/// ex.spawn(driver()).detach();
///
/// // Run a command.
/// Command::new("ls").output().await.ok();
/// }).await;
/// # });
/// ```
#[allow(clippy::manual_async_fn)]
#[inline]
pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
async {
// Get the reaper.
let reaper = Reaper::get();
// Make sure the reaper knows we're driving it.
reaper.drivers.fetch_add(1, Ordering::SeqCst);
// Decrement the driver count when this future is dropped.
let _guard = CallOnDrop(|| {
let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);
// If this was the last driver, and there are still resources actively using the
// reaper, make sure that there is a thread driving the reaper.
if prev_count == 1
&& (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies())
{
reaper.ensure_driven();
}
});
// Acquire the reaper lock and start polling the SIGCHLD event.
let guard = reaper.sys.lock().await;
reaper.sys.reap(guard).await
}
}
/// A builder for spawning processes.
///
/// # Examples
@ -1063,60 +1114,129 @@ impl fmt::Debug for Command {
/// Moves `Fd` out of non-blocking mode.
#[cfg(unix)]
fn blocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
// Helper macro to execute a system call that returns an `io::Result`.
macro_rules! syscall {
($fn:ident ( $($arg:expr),* $(,)? ) ) => {{
let res = unsafe { libc::$fn($($arg, )*) };
if res == -1 {
return Err(std::io::Error::last_os_error());
} else {
res
fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
cfg_if::cfg_if! {
// ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
// for now, as with the standard library, because it seems to behave
// differently depending on the platform.
// https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
// https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
// https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
if #[cfg(target_os = "linux")] {
rustix::io::ioctl_fionbio(fd, false)?;
} else {
let previous = rustix::fs::fcntl_getfl(fd)?;
let new = previous & !rustix::fs::OFlags::NONBLOCK;
if new != previous {
rustix::fs::fcntl_setfl(fd, new)?;
}
}};
}
}
let res = syscall!(fcntl(fd, libc::F_GETFL));
syscall!(fcntl(fd, libc::F_SETFL, res & !libc::O_NONBLOCK));
Ok(())
}
#[cfg(unix)]
mod test {
struct CallOnDrop<F: FnMut()>(F);
#[test]
fn test_into_inner() {
futures_lite::future::block_on(async {
use crate::Command;
use std::io::Result;
use std::process::Stdio;
use std::str::from_utf8;
use futures_lite::AsyncReadExt;
let mut ls_child = Command::new("cat")
.arg("Cargo.toml")
.stdout(Stdio::piped())
.spawn()?;
let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
let mut echo_child = Command::new("grep")
.arg("async")
.stdin(stdio)
.stdout(Stdio::piped())
.spawn()?;
let mut buf = vec![];
let mut stdout = echo_child.stdout.take().unwrap();
stdout.read_to_end(&mut buf).await?;
dbg!(from_utf8(&buf).unwrap_or(""));
Result::Ok(())
})
.unwrap();
impl<F: FnMut()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}
#[cfg(test)]
mod test {
#[test]
fn polled_driver() {
use super::{driver, Command};
use futures_lite::future;
use futures_lite::prelude::*;
let is_thread_spawned =
|| super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst);
#[cfg(unix)]
fn command() -> Command {
let mut cmd = Command::new("sh");
cmd.arg("-c").arg("echo hello");
cmd
}
#[cfg(windows)]
fn command() -> Command {
let mut cmd = Command::new("cmd");
cmd.arg("/C").arg("echo hello");
cmd
}
#[cfg(unix)]
const OUTPUT: &[u8] = b"hello\n";
#[cfg(windows)]
const OUTPUT: &[u8] = b"hello\r\n";
future::block_on(async {
// Thread should not be spawned off the bat.
assert!(!is_thread_spawned());
// Spawn a driver.
let mut driver1 = Box::pin(driver());
future::poll_once(&mut driver1).await;
assert!(!is_thread_spawned());
// We should be able to run the driver in parallel with a process future.
async {
(&mut driver1).await;
}
.or(async {
let output = command().output().await.unwrap();
assert_eq!(output.stdout, OUTPUT);
})
.await;
assert!(!is_thread_spawned());
// Spawn a second driver.
let mut driver2 = Box::pin(driver());
future::poll_once(&mut driver2).await;
assert!(!is_thread_spawned());
// Poll both drivers in parallel.
async {
(&mut driver1).await;
}
.or(async {
(&mut driver2).await;
})
.or(async {
let output = command().output().await.unwrap();
assert_eq!(output.stdout, OUTPUT);
})
.await;
assert!(!is_thread_spawned());
// Once one is dropped, the other should take over.
drop(driver1);
assert!(!is_thread_spawned());
// Poll driver2 in parallel with a process future.
async {
(&mut driver2).await;
}
.or(async {
let output = command().output().await.unwrap();
assert_eq!(output.stdout, OUTPUT);
})
.await;
assert!(!is_thread_spawned());
// Once driver2 is dropped, the thread should not be spawned, as there are no active
// child processes..
drop(driver2);
assert!(!is_thread_spawned());
// We should now be able to poll the process future independently, it will spawn the
// thread.
let output = command().output().await.unwrap();
assert_eq!(output.stdout, OUTPUT);
assert!(is_thread_spawned());
});
}
}

221
src/reaper/mod.rs Normal file
View File

@ -0,0 +1,221 @@
//! The underlying system reaper.
//!
//! There are two backends:
//!
//! - signal, which waits for SIGCHLD.
//! - wait, which waits directly on a process handle.
//!
//! "wait" is preferred, but is not available on all supported Linuxes. So we
//! test to see if pidfd is supported first. If it is, we use wait. If not, we use
//! signal.
#![allow(irrefutable_let_patterns)]
/// Enable the waiting reaper.
#[cfg(any(target_os = "linux", target_os = "android"))]
macro_rules! cfg_wait {
($($tt:tt)*) => {$($tt)*};
}
/// Enable the waiting reaper.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
macro_rules! cfg_wait {
($($tt:tt)*) => {};
}
/// Enable signals.
macro_rules! cfg_signal {
($($tt:tt)*) => {$($tt)*};
}
cfg_wait! {
mod wait;
}
cfg_signal! {
mod signal;
}
use std::io;
use std::sync::Mutex;
/// The underlying system reaper.
pub(crate) enum Reaper {
#[cfg(any(target_os = "linux", target_os = "android"))]
/// The reaper based on the wait backend.
Wait(wait::Reaper),
/// The reaper based on the signal backend.
Signal(signal::Reaper),
}
/// The wrapper around a child.
pub(crate) enum ChildGuard {
#[cfg(any(target_os = "linux", target_os = "android"))]
/// The child guard based on the wait backend.
Wait(wait::ChildGuard),
/// The child guard based on the signal backend.
Signal(signal::ChildGuard),
}
/// A lock on the reaper.
pub(crate) enum Lock {
#[cfg(any(target_os = "linux", target_os = "android"))]
/// The wait-based reaper needs no lock.
Wait,
/// The lock for the signal-based reaper.
Signal(signal::Lock),
}
impl Reaper {
/// Create a new reaper.
pub(crate) fn new() -> Self {
cfg_wait! {
if wait::available() && !cfg!(async_process_force_signal_backend) {
return Self::Wait(wait::Reaper::new());
}
}
// Return the signal-based reaper.
cfg_signal! {
return Self::Signal(signal::Reaper::new());
}
#[allow(unreachable_code)]
{
panic!("neither the signal backend nor the waiter backend is available")
}
}
/// Lock the driver thread.
///
/// This makes it so only one thread can reap at once.
pub(crate) async fn lock(&'static self) -> Lock {
cfg_wait! {
if let Self::Wait(_this) = self {
// No locking needed.
return Lock::Wait;
}
}
cfg_signal! {
if let Self::Signal(this) = self {
// We need to lock.
return Lock::Signal(this.lock().await);
}
}
unreachable!()
}
/// Reap zombie processes forever.
pub(crate) async fn reap(&'static self, lock: Lock) -> ! {
cfg_wait! {
if let (Self::Wait(this), Lock::Wait) = (self, &lock) {
return this.reap().await;
}
}
cfg_signal! {
if let (Self::Signal(this), Lock::Signal(lock)) = (self, lock) {
return this.reap(lock).await;
}
}
unreachable!()
}
/// Register a child into this reaper.
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
cfg_wait! {
if let Self::Wait(this) = self {
return this.register(child).map(ChildGuard::Wait);
}
}
cfg_signal! {
if let Self::Signal(this) = self {
return this.register(child).map(ChildGuard::Signal);
}
}
unreachable!()
}
/// Wait for the inner child to complete.
pub(crate) async fn status(
&'static self,
child: &Mutex<crate::ChildGuard>,
) -> io::Result<std::process::ExitStatus> {
cfg_wait! {
if let Self::Wait(this) = self {
return this.status(child).await;
}
}
cfg_signal! {
if let Self::Signal(this) = self {
return this.status(child).await;
}
}
unreachable!()
}
/// Do we have any registered zombie processes?
pub(crate) fn has_zombies(&'static self) -> bool {
cfg_wait! {
if let Self::Wait(this) = self {
return this.has_zombies();
}
}
cfg_signal! {
if let Self::Signal(this) = self {
return this.has_zombies();
}
}
unreachable!()
}
}
impl ChildGuard {
/// Get a reference to the inner process.
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
cfg_wait! {
if let Self::Wait(this) = self {
return this.get_mut();
}
}
cfg_signal! {
if let Self::Signal(this) = self {
return this.get_mut();
}
}
unreachable!()
}
/// Start reaping this child process.
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
cfg_wait! {
if let (Self::Wait(this), Reaper::Wait(reaper)) = (&mut *self, reaper) {
this.reap(reaper);
return;
}
}
cfg_signal! {
if let (Self::Signal(this), Reaper::Signal(reaper)) = (self, reaper) {
this.reap(reaper);
return;
}
}
unreachable!()
}
}

238
src/reaper/signal.rs Normal file
View File

@ -0,0 +1,238 @@
//! A version of the reaper that waits for a signal to check for process progress.
use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
use event_listener::Event;
use futures_lite::future;
use std::io;
use std::mem;
use std::sync::Mutex;
pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
/// The zombie process reaper.
pub(crate) struct Reaper {
/// An event delivered every time the SIGCHLD signal occurs.
sigchld: Event,
/// The list of zombie processes.
zombies: Mutex<Vec<std::process::Child>>,
/// The pipe that delivers signal notifications.
pipe: Pipe,
/// Locking this mutex indicates that we are polling the SIGCHLD event.
driver_guard: AsyncMutex<()>,
}
impl Reaper {
/// Create a new reaper.
pub(crate) fn new() -> Self {
Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
driver_guard: AsyncMutex::new(()),
}
}
/// Lock the driver thread.
pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
self.driver_guard.lock().await
}
/// Reap zombie processes forever.
pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait().await;
// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);
// Reap zombie processes, but make sure we don't hold onto the lock for too long!
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
let mut i = 0;
'reap_zombies: loop {
for _ in 0..50 {
if i >= zombies.len() {
break 'reap_zombies;
}
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
// Be a good citizen; yield if there are a lot of processes.
//
// After we yield, check if there are more zombie processes.
future::yield_now().await;
zombies.append(&mut self.zombies.lock().unwrap());
}
// Put zombie processes back.
self.zombies.lock().unwrap().append(&mut zombies);
}
}
/// Register a process with this reaper.
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
self.pipe.register(&child)?;
Ok(ChildGuard { inner: Some(child) })
}
/// Wait for an event to occur for a child process.
pub(crate) async fn status(
&'static self,
child: &Mutex<crate::ChildGuard>,
) -> io::Result<std::process::ExitStatus> {
loop {
// Wait on the child process.
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
// Start listening.
event_listener::listener!(self.sigchld => listener);
// Try again.
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
// Wait on the listener.
listener.await;
}
}
/// Do we have any registered zombie processes?
pub(crate) fn has_zombies(&'static self) -> bool {
!self
.zombies
.lock()
.unwrap_or_else(|x| x.into_inner())
.is_empty()
}
}
/// The wrapper around the child.
pub(crate) struct ChildGuard {
inner: Option<std::process::Child>,
}
impl ChildGuard {
/// Get a mutable reference to the inner child.
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
self.inner.as_mut().unwrap()
}
/// Begin the reaping process for this child.
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
if let Ok(None) = self.get_mut().try_wait() {
reaper.zombies.lock().unwrap().push(self.inner.take().unwrap());
}
}
}
cfg_if::cfg_if! {
if #[cfg(windows)] {
use async_channel::{Sender, Receiver, bounded};
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
},
};
/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The sender channel for the SIGCHLD signal.
sender: Sender<()>,
/// The receiver channel for the SIGCHLD signal.
receiver: Receiver<()>,
}
impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
let (sender, receiver) = bounded(1);
Ok(Pipe {
sender,
receiver
})
}
/// Waits for the next SIGCHLD signal.
async fn wait(&self) {
self.receiver.recv().await.ok();
}
/// Register a process object into this pipe.
fn register(&self, child: &std::process::Child) -> io::Result<()> {
// Called when a child exits.
#[allow(clippy::infallible_destructuring_match)]
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
let reaper = match &crate::Reaper::get().sys {
super::Reaper::Signal(reaper) => reaper,
};
reaper.pipe.sender.try_send(()).ok();
}
// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
}
} else if #[cfg(unix)] {
use async_signal::{Signal, Signals};
use futures_lite::prelude::*;
/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The iterator over SIGCHLD signals.
signals: Signals,
}
impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
Ok(Pipe {
signals: Signals::new(Some(Signal::Child))?,
})
}
/// Waits for the next SIGCHLD signal.
async fn wait(&self) {
(&self.signals).next().await;
}
/// Register a process object into this pipe.
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
Ok(())
}
}
}
}

188
src/reaper/wait.rs Normal file
View File

@ -0,0 +1,188 @@
//! A version of the reaper that waits on some polling primitive.
//!
//! This uses:
//!
//! - pidfd on Linux/Android
use async_channel::{Receiver, Sender};
use async_task::Runnable;
use futures_lite::future;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::{Context, Poll};
/// The zombie process reaper.
pub(crate) struct Reaper {
/// The channel for sending new runnables.
sender: Sender<Runnable>,
/// The channel for receiving new runnables.
recv: Receiver<Runnable>,
/// Number of zombie processes.
zombies: AtomicUsize,
}
impl Reaper {
/// Create a new reaper.
pub(crate) fn new() -> Self {
let (sender, recv) = async_channel::unbounded();
Self {
sender,
recv,
zombies: AtomicUsize::new(0),
}
}
/// Reap zombie processes forever.
pub(crate) async fn reap(&'static self) -> ! {
loop {
// Fetch the next task.
let task = match self.recv.recv().await {
Ok(task) => task,
Err(_) => panic!("sender should never be closed"),
};
// Poll the task.
task.run();
}
}
/// Register a child into this reaper.
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
Ok(ChildGuard {
inner: Some(WaitableChild::new(child)?),
})
}
/// Wait for a child to complete.
pub(crate) async fn status(
&'static self,
child: &Mutex<crate::ChildGuard>,
) -> io::Result<std::process::ExitStatus> {
future::poll_fn(|cx| {
// Lock the child.
let mut child = child.lock().unwrap();
// Get the inner child value.
let inner = match &mut child.inner {
super::ChildGuard::Wait(inner) => inner,
_ => unreachable!()
};
// Poll for the next value.
inner.inner.as_mut().unwrap().poll_wait(cx)
})
.await
}
/// Do we have any registered zombie processes?
pub(crate) fn has_zombies(&'static self) -> bool {
self.zombies.load(Ordering::SeqCst) > 0
}
}
/// The wrapper around the child.
pub(crate) struct ChildGuard {
inner: Option<WaitableChild>,
}
impl ChildGuard {
/// Get a mutable reference to the inner child.
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
self.inner.as_mut().unwrap().get_mut()
}
/// Begin the reaping process for this child.
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
// Create a future for polling this child.
let future = {
let mut inner = self.inner.take().unwrap();
async move {
// Increment the zombie count.
reaper.zombies.fetch_add(1, Ordering::Relaxed);
// Decrement the zombie count once we are done.
let _guard = crate::CallOnDrop(|| {
reaper.zombies.fetch_sub(1, Ordering::SeqCst);
});
// Wait on this child forever.
let result = future::poll_fn(|cx| inner.poll_wait(cx)).await;
if let Err(e) = result {
tracing::error!("error while polling zombie process: {}", e);
}
}
};
// Create a function for scheduling this future.
let schedule = move |runnable| {
reaper.sender.try_send(runnable).ok();
};
// Spawn the task and run it forever.
let (runnable, task) = async_task::spawn(future, schedule);
task.detach();
runnable.schedule();
}
}
cfg_if::cfg_if! {
if #[cfg(any(
target_os = "linux",
target_os = "android"
))] {
use async_io::Async;
use rustix::process;
use std::os::unix::io::OwnedFd;
/// Waitable version of `std::process::Child`
struct WaitableChild {
child: std::process::Child,
handle: Async<OwnedFd>,
}
impl WaitableChild {
fn new(child: std::process::Child) -> io::Result<Self> {
let pidfd = process::pidfd_open(
process::Pid::from_child(&child),
process::PidfdFlags::empty()
)?;
Ok(Self {
child,
handle: Async::new(pidfd)?
})
}
fn get_mut(&mut self) -> &mut std::process::Child {
&mut self.child
}
fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
loop {
if let Some(status) = self.child.try_wait()? {
return Poll::Ready(Ok(status));
}
// Wait for us to become readable.
futures_lite::ready!(self.handle.poll_readable(cx))?;
}
}
}
/// Tell if we are able to use this backend.
pub(crate) fn available() -> bool {
// Create a Pidfd for the current process and see if it works.
let result = process::pidfd_open(
process::getpid(),
process::PidfdFlags::empty()
);
// Tell if it was okay or not.
result.is_ok()
}
}
}

View File

@ -7,7 +7,10 @@ use std::os::unix::process::CommandExt as _;
use crate::Command;
/// Unix-specific extensions to the [`Command`] builder.
pub trait CommandExt {
///
/// This trait is sealed: it cannot be implemented outside `async-process`.
/// This is so that future additional methods are not breaking changes.
pub trait CommandExt: crate::sealed::Sealed {
/// Sets the child process's user ID. This translates to a
/// `setuid` call in the child process. Failure in the `setuid`
/// call will cause the spawn to fail.
@ -17,39 +20,6 @@ pub trait CommandExt {
/// the same semantics as the `uid` field.
fn gid(&mut self, id: u32) -> &mut Command;
/// Schedules a closure to be run just before the `exec` function is
/// invoked.
///
/// The closure is allowed to return an I/O error whose OS error code will
/// be communicated back to the parent and returned as an error from when
/// the spawn was requested.
///
/// Multiple closures can be registered and they will be called in order of
/// their registration. If a closure returns `Err` then no further closures
/// will be called and the spawn operation will immediately return with a
/// failure.
///
/// # Safety
///
/// This closure will be run in the context of the child process after a
/// `fork`. This primarily means that any modifications made to memory on
/// behalf of this closure will **not** be visible to the parent process.
/// This is often a very constrained environment where normal operations
/// like `malloc` or acquiring a mutex are not guaranteed to work (due to
/// other threads perhaps still running when the `fork` was run).
///
/// This also means that all resources such as file descriptors and
/// memory-mapped regions got duplicated. It is your responsibility to make
/// sure that the closure does not violate library invariants by making
/// invalid use of these duplicates.
///
/// When this closure is run, aspects such as the stdio file descriptors and
/// working directory have successfully been changed, so output to these
/// locations may not appear where intended.
unsafe fn pre_exec<F>(&mut self, f: F) -> &mut Command
where
F: FnMut() -> io::Result<()> + Send + Sync + 'static;
/// Performs all the required setup by this `Command`, followed by calling
/// the `execvp` syscall.
///
@ -88,6 +58,7 @@ pub trait CommandExt {
S: AsRef<OsStr>;
}
impl crate::sealed::Sealed for Command {}
impl CommandExt for Command {
fn uid(&mut self, id: u32) -> &mut Command {
self.inner.uid(id);
@ -99,14 +70,6 @@ impl CommandExt for Command {
self
}
unsafe fn pre_exec<F>(&mut self, f: F) -> &mut Command
where
F: FnMut() -> io::Result<()> + Send + Sync + 'static,
{
self.inner.pre_exec(f);
self
}
fn exec(&mut self) -> io::Error {
self.inner.exec()
}

View File

@ -1,25 +1,41 @@
//! Windows-specific extensions.
use std::ffi::OsStr;
use std::os::windows::io::{AsRawHandle, RawHandle};
use std::os::windows::process::CommandExt as _;
use crate::{Child, Command};
/// Windows-specific extensions to the [`Command`] builder.
pub trait CommandExt {
///
/// This trait is sealed: it cannot be implemented outside `async-process`.
/// This is so that future additional methods are not breaking changes.
pub trait CommandExt: crate::sealed::Sealed {
/// Sets the [process creation flags][1] to be passed to `CreateProcess`.
///
/// These will always be ORed with `CREATE_UNICODE_ENVIRONMENT`.
///
/// [1]: https://docs.microsoft.com/en-us/windows/win32/procthread/process-creation-flags
fn creation_flags(&mut self, flags: u32) -> &mut Command;
/// Append literal text to the command line without any quoting or escaping.
///
/// This is useful for passing arguments to `cmd.exe /c`, which doesn't follow
/// `CommandLineToArgvW` escaping rules.
fn raw_arg<S: AsRef<OsStr>>(&mut self, text_to_append_as_is: S) -> &mut Command;
}
impl crate::sealed::Sealed for Command {}
impl CommandExt for Command {
fn creation_flags(&mut self, flags: u32) -> &mut Command {
self.inner.creation_flags(flags);
self
}
fn raw_arg<S: AsRef<OsStr>>(&mut self, text_to_append_as_is: S) -> &mut Command {
self.inner.raw_arg(text_to_append_as_is);
self
}
}
impl AsRawHandle for Child {

26
tests/sleep.rs Normal file
View File

@ -0,0 +1,26 @@
//! Sleep test.
use async_process::Command;
use futures_lite::future::block_on;
#[cfg(unix)]
#[test]
fn unix_sleep() {
block_on(async {
let status = Command::new("sleep").arg("1").status().await.unwrap();
assert!(status.success());
});
}
#[cfg(windows)]
#[test]
fn windows_sleep() {
block_on(async {
let status = Command::new("ping")
.args(["-n", "5", "127.0.0.1"])
.status()
.await
.unwrap();
assert!(status.success());
});
}

View File

@ -11,7 +11,7 @@ use futures_lite::{future, prelude::*};
fn smoke() {
future::block_on(async {
let p = if cfg!(target_os = "windows") {
Command::new("cmd").args(&["/C", "exit 0"]).spawn()
Command::new("cmd").args(["/C", "exit 0"]).spawn()
} else {
Command::new("true").spawn()
};
@ -21,6 +21,25 @@ fn smoke() {
})
}
#[test]
fn smoke_driven() {
future::block_on(
async {
async_process::driver().await;
}
.or(async {
let p = if cfg!(target_os = "windows") {
Command::new("cmd").args(["/C", "exit 0"]).spawn()
} else {
Command::new("true").spawn()
};
assert!(p.is_ok());
let mut p = p.unwrap();
assert!(p.status().await.unwrap().success());
}),
)
}
#[test]
fn smoke_failure() {
assert!(Command::new("if-this-is-a-binary-then-the-world-has-ended")
@ -32,7 +51,7 @@ fn smoke_failure() {
fn exit_reported_right() {
future::block_on(async {
let p = if cfg!(target_os = "windows") {
Command::new("cmd").args(&["/C", "exit 1"]).spawn()
Command::new("cmd").args(["/C", "exit 1"]).spawn()
} else {
Command::new("false").spawn()
};
@ -84,7 +103,7 @@ fn stdout_works() {
future::block_on(async {
if cfg!(target_os = "windows") {
let mut cmd = Command::new("cmd");
cmd.args(&["/C", "echo foobar"]).stdout(Stdio::piped());
cmd.args(["/C", "echo foobar"]).stdout(Stdio::piped());
assert_eq!(run_output(cmd).await, "foobar\r\n");
} else {
let mut cmd = Command::new("echo");
@ -142,7 +161,7 @@ fn test_process_status() {
future::block_on(async {
let mut status = if cfg!(target_os = "windows") {
Command::new("cmd")
.args(&["/C", "exit 1"])
.args(["/C", "exit 1"])
.status()
.await
.unwrap()
@ -153,7 +172,7 @@ fn test_process_status() {
status = if cfg!(target_os = "windows") {
Command::new("cmd")
.args(&["/C", "exit 0"])
.args(["/C", "exit 0"])
.status()
.await
.unwrap()
@ -186,7 +205,7 @@ fn test_process_output_output() {
stderr,
} = if cfg!(target_os = "windows") {
Command::new("cmd")
.args(&["/C", "echo hello"])
.args(["/C", "echo hello"])
.output()
.await
.unwrap()
@ -210,7 +229,7 @@ fn test_process_output_error() {
stderr,
} = if cfg!(target_os = "windows") {
Command::new("cmd")
.args(&["/C", "mkdir ."])
.args(["/C", "mkdir ."])
.output()
.await
.unwrap()
@ -228,7 +247,7 @@ fn test_process_output_error() {
fn test_finish_once() {
future::block_on(async {
let mut prog = if cfg!(target_os = "windows") {
Command::new("cmd").args(&["/C", "exit 1"]).spawn().unwrap()
Command::new("cmd").args(["/C", "exit 1"]).spawn().unwrap()
} else {
Command::new("false").spawn().unwrap()
};
@ -240,7 +259,7 @@ fn test_finish_once() {
fn test_finish_twice() {
future::block_on(async {
let mut prog = if cfg!(target_os = "windows") {
Command::new("cmd").args(&["/C", "exit 1"]).spawn().unwrap()
Command::new("cmd").args(["/C", "exit 1"]).spawn().unwrap()
} else {
Command::new("false").spawn().unwrap()
};
@ -254,7 +273,7 @@ fn test_wait_with_output_once() {
future::block_on(async {
let prog = if cfg!(target_os = "windows") {
Command::new("cmd")
.args(&["/C", "echo hello"])
.args(["/C", "echo hello"])
.stdout(Stdio::piped())
.spawn()
.unwrap()
@ -308,7 +327,7 @@ fn test_override_env() {
let mut cmd = env_cmd();
cmd.env_clear().env("RUN_TEST_NEW_ENV", "123");
if let Some(p) = env::var_os("PATH") {
cmd.env("PATH", &p);
cmd.env("PATH", p);
}
let result = cmd.output().await.unwrap();
let output = String::from_utf8_lossy(&result.stdout).to_string();
@ -428,3 +447,39 @@ fn test_spawn_multiple_with_stdio() {
assert_eq!(out2.stderr, b"bar\n");
});
}
#[cfg(unix)]
#[test]
fn test_into_inner() {
futures_lite::future::block_on(async {
use crate::Command;
use std::io::Result;
use std::process::Stdio;
use std::str::from_utf8;
use futures_lite::AsyncReadExt;
let mut ls_child = Command::new("cat")
.arg("Cargo.toml")
.stdout(Stdio::piped())
.spawn()?;
let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
let mut echo_child = Command::new("grep")
.arg("async")
.stdin(stdio)
.stdout(Stdio::piped())
.spawn()?;
let mut buf = vec![];
let mut stdout = echo_child.stdout.take().unwrap();
stdout.read_to_end(&mut buf).await?;
dbg!(from_utf8(&buf).unwrap_or(""));
Result::Ok(())
})
.unwrap();
}