From 251ab32f59ccb96c6a53728367ba7c39c5bd2923 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Tue, 18 Aug 2020 20:19:25 +0000 Subject: [PATCH] Initial commit --- .gitignore | 2 + Cargo.toml | 33 +++++ examples/foo.rs | 29 ++++ rustfmt.toml | 1 + src/lib.rs | 349 +++++++++++++++++++++++++++++++++++++++++++++ tests/process.rs | 363 +++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 777 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 examples/foo.rs create mode 100644 rustfmt.toml create mode 100644 src/lib.rs create mode 100644 tests/process.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..34711bc --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "async-process" +version = "0.1.0" +authors = ["Stjepan Glavina "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-channel = "1.4.0" +cfg-if = "0.1.10" +futures-lite = "0.1.11" +once_cell = "1.4.1" + +[target.'cfg(unix)'.dependencies] +async-io = "0.1.11" +signal-hook = { version = "0.1.16", default-features = false } + +[target.'cfg(windows)'.dependencies] +blocking = "0.5.1" + +[target.'cfg(windows)'.dependencies.winapi] +version = "0.3.9" +features = [ + "handleapi", + "winerror", + "minwindef", + "processthreadsapi", + "synchapi", + "threadpoollegacyapiset", + "winbase", + "winnt", +] diff --git a/examples/foo.rs b/examples/foo.rs new file mode 100644 index 0000000..94d74ad --- /dev/null +++ b/examples/foo.rs @@ -0,0 +1,29 @@ +use std::io; +use std::os::unix::process::ExitStatusExt; + +use async_process::{Command, ExitStatus, Stdio}; +use futures_lite::*; + +fn main() -> io::Result<()> { + future::block_on(async { + // dbg!(std::process::Command::new("ls").arg(".").spawn()?.wait_with_output())?; + + dbg!( + Command::new("ls") + .arg(".") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .status() + .await + )?; + + // let mut child = Command::new("/bin/sh") + // .arg("-c") + // .arg("kill -9 $$") + // .spawn()?; + // let status = child.status().await?; + // dbg!(status); + + Ok(()) + }) +} diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..1082fd8 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +version = "Two" diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..8dfc42e --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,349 @@ +//! Async execution and interaction with processes. + +#![cfg_attr(unix, forbid(unsafe_code))] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +use std::ffi::OsStr; +use std::path::Path; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::thread; + +use async_channel::{Receiver, Sender}; +#[cfg(unix)] +use async_io::Async; +#[cfg(windows)] +use blocking::Unblock; +use futures_lite::*; +use once_cell::sync::Lazy; + +#[doc(no_inline)] +pub use std::process::{ExitStatus, Output, Stdio}; + +pub struct Child { + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, + + child: Arc>, + exited: Receiver<()>, +} + +impl Child { + fn new(mut child: std::process::Child) -> io::Result { + cfg_if::cfg_if! { + if #[cfg(windows)] { + use std::os::windows::io::AsRawHandle; + use std::sync::mpsc; + + use winapi::um::{ + winbase::{RegisterWaitForSingleObject, INFINITE}, + winnt::{BOOLEAN, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE}, + }; + + // This channel is used to simulate SIGCHLD on Windows. + static SIGCHLD: Lazy<(mpsc::SyncSender<()>, Mutex>)> = + Lazy::new(|| { + let (s, r) = mpsc::sync_channel(1); + (s, Mutex::new(r)) + }); + + // Called when a child exits. + unsafe extern "system" fn callback(_: PVOID, _: BOOLEAN) { + let _ = SIGCHLD.0.try_send(()); + } + + // Register this child process to invoke `callback` on exit. + let mut wait_object = std::ptr::null_mut(); + let ret = unsafe { + RegisterWaitForSingleObject( + &mut wait_object, + child.as_raw_handle() as HANDLE, + Some(callback), + std::ptr::null_mut(), + INFINITE, + WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, + ) + }; + if ret == 0 { + return Err(io::Error::last_os_error()); + } + + // Waits for the next SIGCHLD signal. + fn wait_sigchld() { + let _ = SIGCHLD.1.lock().unwrap().recv(); + } + + // Wraps a sync I/O type into an async I/O type. + fn wrap(io: T) -> io::Result> { + Ok(Unblock::new(io)) + } + + } else if #[cfg(unix)] { + // Waits for the next SIGCHLD signal. + fn wait_sigchld() { + static SIGNALS: Lazy = Lazy::new(|| { + signal_hook::iterator::Signals::new(&[signal_hook::SIGCHLD]) + .expect("cannot set signal handler for SIGCHLD") + }); + SIGNALS.forever().next(); + } + + // Wraps a sync I/O type into an async I/O type. + fn wrap(io: T) -> io::Result> { + Async::new(io) + } + } + } + + // An entry in the list of running child processes. + struct Entry { + child: Arc>, + _exited: Sender<()>, + } + + // The global list of running child processes. + static CHILDREN: Lazy>> = Lazy::new(|| { + // Start a thread that handles SIGCHLD and notifies tasks when child processes exit. + thread::Builder::new() + .name("async-process".to_string()) + .spawn(move || { + loop { + // Wait for the next SIGCHLD signal. + wait_sigchld(); + + // Remove processes that have exited. When an entry is removed from this + // `Vec`, its associated `Sender` is dropped, thus disconnecting the + // channel and waking up the task waiting on the `Receiver`. + CHILDREN.lock().unwrap().retain(|entry| { + let mut child = entry.child.lock().unwrap(); + child.try_wait().expect("error waiting a child").is_none() + }); + } + }) + .expect("cannot spawn async-process thread"); + + Mutex::new(Vec::new()) + }); + + // Convert sync I/O types into async I/O types. + let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin); + let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout); + let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr); + + // Register the child process in the global list. + let child = Arc::new(Mutex::new(child)); + let (sender, exited) = async_channel::bounded(1); + CHILDREN.lock().unwrap().push(Entry { + child: child.clone(), + _exited: sender, + }); + + Ok(Child { + stdin, + stdout, + stderr, + child, + exited, + }) + } + + pub fn id(&self) -> u32 { + self.child.lock().unwrap().id() + } + + pub fn kill(&mut self) -> io::Result<()> { + self.child.lock().unwrap().kill() + } + + // NOTE: unlike status(), does not drop stdin + pub fn try_status(&mut self) -> io::Result> { + self.child.lock().unwrap().try_wait() + } + + // NOTE: drops stdin + pub fn status(&mut self) -> impl Future> { + self.stdin.take(); + let child = self.child.clone(); + let exited = self.exited.clone(); + + async move { + let _ = exited.recv().await; + child.lock().unwrap().wait() + } + } + + // NOTE: this closes stdin and drains stdout+stderr + pub fn output(mut self) -> impl Future> { + let status = self.status(); + + let stdout = self.stdout.take(); + let stdout = async move { + let mut v = Vec::new(); + if let Some(mut s) = stdout { + s.read_to_end(&mut v).await?; + } + Ok(v) + }; + + let stderr = self.stderr.take(); + let stderr = async move { + let mut v = Vec::new(); + if let Some(mut s) = stderr { + s.read_to_end(&mut v).await?; + } + Ok(v) + }; + + async move { + let (status, (stdout, stderr)) = + future::try_join(status, future::try_join(stdout, stderr)).await?; + Ok(Output { + status, + stdout, + stderr, + }) + } + } +} + +pub struct ChildStdin( + #[cfg(windows)] Unblock, + #[cfg(unix)] Async, +); + +impl AsyncWrite for ChildStdin { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_close(cx) + } +} + +pub struct ChildStdout( + #[cfg(windows)] Unblock, + #[cfg(unix)] Async, +); + +impl AsyncRead for ChildStdout { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +pub struct ChildStderr( + #[cfg(windows)] Unblock, + #[cfg(unix)] Async, +); + +impl AsyncRead for ChildStderr { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +pub struct Command(std::process::Command); + +impl Command { + pub fn new>(program: S) -> Command { + Command(std::process::Command::new(program)) + } + + pub fn arg>(&mut self, arg: S) -> &mut Command { + self.0.arg(arg); + self + } + + pub fn args(&mut self, args: I) -> &mut Command + where + I: IntoIterator, + S: AsRef, + { + self.0.args(args); + self + } + + pub fn env(&mut self, key: K, val: V) -> &mut Command + where + K: AsRef, + V: AsRef, + { + self.0.env(key, val); + self + } + + pub fn envs(&mut self, vars: I) -> &mut Command + where + I: IntoIterator, + K: AsRef, + V: AsRef, + { + self.0.envs(vars); + self + } + + pub fn env_remove>(&mut self, key: K) -> &mut Command { + self.0.env_remove(key); + self + } + + pub fn env_clear(&mut self) -> &mut Command { + self.0.env_clear(); + self + } + + pub fn current_dir>(&mut self, dir: P) -> &mut Command { + self.0.current_dir(dir); + self + } + + pub fn stdin>(&mut self, cfg: T) -> &mut Command { + self.0.stdin(cfg); + self + } + + pub fn stdout>(&mut self, cfg: T) -> &mut Command { + self.0.stdout(cfg); + self + } + + pub fn stderr>(&mut self, cfg: T) -> &mut Command { + self.0.stderr(cfg); + self + } + + pub fn spawn(&mut self) -> io::Result { + Child::new(self.0.spawn()?) + } + + pub fn status(&mut self) -> impl Future> { + let child = self.spawn(); + async { child?.status().await } + } + + pub fn output(&mut self) -> impl Future> { + self.0.stdout(Stdio::piped()); + self.0.stderr(Stdio::piped()); + let child = self.spawn(); + async { child?.output().await } + } +} diff --git a/tests/process.rs b/tests/process.rs new file mode 100644 index 0000000..0d457ec --- /dev/null +++ b/tests/process.rs @@ -0,0 +1,363 @@ +use std::env; +use std::str; + +use async_process::{Command, Output, Stdio}; +use futures_lite::*; + +#[test] +fn smoke() { + future::block_on(async { + let p = if cfg!(target_os = "windows") { + Command::new("cmd").args(&["/C", "exit 0"]).spawn() + } else { + Command::new("true").spawn() + }; + assert!(p.is_ok()); + let mut p = p.unwrap(); + assert!(p.status().await.unwrap().success()); + }) +} + +#[test] +fn smoke_failure() { + match Command::new("if-this-is-a-binary-then-the-world-has-ended").spawn() { + Ok(..) => panic!(), + Err(..) => {} + } +} + +#[test] +fn exit_reported_right() { + future::block_on(async { + let p = if cfg!(target_os = "windows") { + Command::new("cmd").args(&["/C", "exit 1"]).spawn() + } else { + Command::new("false").spawn() + }; + assert!(p.is_ok()); + let mut p = p.unwrap(); + assert!(p.status().await.unwrap().code() == Some(1)); + drop(p.status().await); + }) +} + +#[test] +#[cfg(unix)] +fn signal_reported_right() { + use std::os::unix::process::ExitStatusExt; + + future::block_on(async { + let mut p = Command::new("/bin/sh") + .arg("-c") + .arg("read a") + .stdin(Stdio::piped()) + .spawn() + .unwrap(); + p.kill().unwrap(); + match p.status().await.unwrap().signal() { + Some(9) => {} + result => panic!("not terminated by signal 9 (instead, {:?})", result), + } + }) +} + +pub async fn run_output(mut cmd: Command) -> String { + let p = cmd.spawn(); + assert!(p.is_ok()); + let mut p = p.unwrap(); + assert!(p.stdout.is_some()); + let mut ret = String::new(); + p.stdout + .as_mut() + .unwrap() + .read_to_string(&mut ret) + .await + .unwrap(); + assert!(p.status().await.unwrap().success()); + return ret; +} + +#[test] +fn stdout_works() { + future::block_on(async { + if cfg!(target_os = "windows") { + let mut cmd = Command::new("cmd"); + cmd.args(&["/C", "echo foobar"]).stdout(Stdio::piped()); + assert_eq!(run_output(cmd).await, "foobar\r\n"); + } else { + let mut cmd = Command::new("echo"); + cmd.arg("foobar").stdout(Stdio::piped()); + assert_eq!(run_output(cmd).await, "foobar\n"); + } + }) +} + +#[test] +fn set_current_dir_works() { + future::block_on(async { + let mut cmd = Command::new("/bin/sh"); + cmd.arg("-c") + .arg("pwd") + .current_dir("/") + .stdout(Stdio::piped()); + assert_eq!(run_output(cmd).await, "/\n"); + }) +} + +#[test] +fn stdin_works() { + future::block_on(async { + let mut p = Command::new("/bin/sh") + .arg("-c") + .arg("read line; echo $line") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + p.stdin + .as_mut() + .unwrap() + .write("foobar".as_bytes()) + .await + .unwrap(); + drop(p.stdin.take()); + let mut out = String::new(); + p.stdout + .as_mut() + .unwrap() + .read_to_string(&mut out) + .await + .unwrap(); + assert!(p.status().await.unwrap().success()); + assert_eq!(out, "foobar\n"); + }) +} + +#[test] +fn test_process_status() { + future::block_on(async { + let mut status = if cfg!(target_os = "windows") { + Command::new("cmd") + .args(&["/C", "exit 1"]) + .status() + .await + .unwrap() + } else { + Command::new("false").status().await.unwrap() + }; + assert!(status.code() == Some(1)); + + status = if cfg!(target_os = "windows") { + Command::new("cmd") + .args(&["/C", "exit 0"]) + .status() + .await + .unwrap() + } else { + Command::new("true").status().await.unwrap() + }; + assert!(status.success()); + }) +} + +#[test] +fn test_process_output_fail_to_start() { + future::block_on(async { + match Command::new("/no-binary-by-this-name-should-exist") + .output() + .await + { + Err(e) => assert_eq!(e.kind(), io::ErrorKind::NotFound), + Ok(..) => panic!(), + } + }) +} + +#[test] +fn test_process_output_output() { + future::block_on(async { + let Output { + status, + stdout, + stderr, + } = if cfg!(target_os = "windows") { + Command::new("cmd") + .args(&["/C", "echo hello"]) + .output() + .await + .unwrap() + } else { + Command::new("echo").arg("hello").output().await.unwrap() + }; + let output_str = str::from_utf8(&stdout).unwrap(); + + assert!(status.success()); + assert_eq!(output_str.trim().to_string(), "hello"); + assert_eq!(stderr, Vec::new()); + }) +} + +#[test] +fn test_process_output_error() { + future::block_on(async { + let Output { + status, + stdout, + stderr, + } = if cfg!(target_os = "windows") { + Command::new("cmd") + .args(&["/C", "mkdir ."]) + .output() + .await + .unwrap() + } else { + Command::new("mkdir").arg("./").output().await.unwrap() + }; + + assert!(status.code() == Some(1)); + assert_eq!(stdout, Vec::new()); + assert!(!stderr.is_empty()); + }) +} + +#[test] +fn test_finish_once() { + future::block_on(async { + let mut prog = if cfg!(target_os = "windows") { + Command::new("cmd").args(&["/C", "exit 1"]).spawn().unwrap() + } else { + Command::new("false").spawn().unwrap() + }; + assert!(prog.status().await.unwrap().code() == Some(1)); + }) +} + +#[test] +fn test_finish_twice() { + future::block_on(async { + let mut prog = if cfg!(target_os = "windows") { + Command::new("cmd").args(&["/C", "exit 1"]).spawn().unwrap() + } else { + Command::new("false").spawn().unwrap() + }; + assert!(prog.status().await.unwrap().code() == Some(1)); + assert!(prog.status().await.unwrap().code() == Some(1)); + }) +} + +#[test] +fn test_wait_with_output_once() { + future::block_on(async { + let prog = if cfg!(target_os = "windows") { + Command::new("cmd") + .args(&["/C", "echo hello"]) + .stdout(Stdio::piped()) + .spawn() + .unwrap() + } else { + Command::new("echo") + .arg("hello") + .stdout(Stdio::piped()) + .spawn() + .unwrap() + }; + + let Output { + status, + stdout, + stderr, + } = prog.output().await.unwrap(); + let output_str = str::from_utf8(&stdout).unwrap(); + + assert!(status.success()); + assert_eq!(output_str.trim().to_string(), "hello"); + assert_eq!(stderr, Vec::new()); + }) +} + +#[cfg(all(unix, not(target_os = "android")))] +pub fn env_cmd() -> Command { + Command::new("env") +} + +#[cfg(target_os = "android")] +pub fn env_cmd() -> Command { + let mut cmd = Command::new("/system/bin/sh"); + cmd.arg("-c").arg("set"); + cmd +} + +#[cfg(windows)] +pub fn env_cmd() -> Command { + let mut cmd = Command::new("cmd"); + cmd.arg("/c").arg("set"); + cmd +} + +#[test] +fn test_override_env() { + future::block_on(async { + // In some build environments (such as chrooted Nix builds), `env` can + // only be found in the explicitly-provided PATH env variable, not in + // default places such as /bin or /usr/bin. So we need to pass through + // PATH to our sub-process. + let mut cmd = env_cmd(); + cmd.env_clear().env("RUN_TEST_NEW_ENV", "123"); + if let Some(p) = env::var_os("PATH") { + cmd.env("PATH", &p); + } + let result = cmd.output().await.unwrap(); + let output = String::from_utf8_lossy(&result.stdout).to_string(); + + assert!( + output.contains("RUN_TEST_NEW_ENV=123"), + "didn't find RUN_TEST_NEW_ENV inside of:\n\n{}", + output + ); + }) +} + +#[test] +fn test_add_to_env() { + future::block_on(async { + let result = env_cmd() + .env("RUN_TEST_NEW_ENV", "123") + .output() + .await + .unwrap(); + let output = String::from_utf8_lossy(&result.stdout).to_string(); + + assert!( + output.contains("RUN_TEST_NEW_ENV=123"), + "didn't find RUN_TEST_NEW_ENV inside of:\n\n{}", + output + ); + }) +} + +#[test] +fn test_capture_env_at_spawn() { + future::block_on(async { + let mut cmd = env_cmd(); + cmd.env("RUN_TEST_NEW_ENV1", "123"); + + // This variable will not be present if the environment has already + // been captured above. + env::set_var("RUN_TEST_NEW_ENV2", "456"); + let result = cmd.output().await.unwrap(); + env::remove_var("RUN_TEST_NEW_ENV2"); + + let output = String::from_utf8_lossy(&result.stdout).to_string(); + + assert!( + output.contains("RUN_TEST_NEW_ENV1=123"), + "didn't find RUN_TEST_NEW_ENV1 inside of:\n\n{}", + output + ); + assert!( + output.contains("RUN_TEST_NEW_ENV2=456"), + "didn't find RUN_TEST_NEW_ENV2 inside of:\n\n{}", + output + ); + }) +}