Adding the `into_stdio` make async-process more flexible. (#13)

This commit is contained in:
BinCheng 2021-04-24 16:39:11 +08:00 committed by GitHub
parent 716cc8aa01
commit 7d9dc0b914
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 165 additions and 2 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "async-process"
version = "1.0.2"
version = "1.1.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "Async interface for working with processes"
@ -19,6 +19,7 @@ once_cell = "1.4.1"
[target.'cfg(unix)'.dependencies]
async-io = "1.0.0"
libc = "0.2.88"
[target.'cfg(unix)'.dependencies.signal-hook]
version = "0.3.0"
@ -33,5 +34,5 @@ version = "0.3.9"
default-features = false
features = [
"winbase",
"winnt",
"winnt"
]

View File

@ -60,8 +60,12 @@ use std::thread;
#[cfg(unix)]
use async_io::Async;
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(windows)]
use blocking::Unblock;
use event_listener::Event;
use futures_lite::{future, io, prelude::*};
use once_cell::sync::Lazy;
@ -443,6 +447,38 @@ pub struct ChildStdin(
#[cfg(unix)] Async<std::process::ChildStdin>,
);
impl ChildStdin {
/// Convert async_process::ChildStdin into std::process::Stdio.
///
/// You can use it to associate to the next process.
///
/// # Examples
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
/// use std::process::Stdio;
///
/// let mut ls_child = Command::new("ls").stdin(Stdio::piped()).spawn()?;
/// let stdio:Stdio = ls_child.stdin.take().unwrap().into_stdio().await?;
///
/// let mut echo_child = Command::new("echo").arg("./").stdout(stdio).spawn()?;
///
/// # std::io::Result::Ok(()) });
/// ```
pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
cfg_if::cfg_if! {
if #[cfg(windows)] {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stdin = self.0.into_inner()?;
blocking_fd(child_stdin.as_raw_fd())?;
Ok(child_stdin.into())
}
}
}
}
impl io::AsyncWrite for ChildStdin {
fn poll_write(
mut self: Pin<&mut Self>,
@ -470,6 +506,41 @@ pub struct ChildStdout(
#[cfg(unix)] Async<std::process::ChildStdout>,
);
impl ChildStdout {
/// Convert async_process::ChildStdout into std::process::Stdio.
///
/// You can use it to associate to the next process.
///
/// # Examples
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
/// use std::process::Stdio;
/// use std::io::Read;
/// use futures_lite::AsyncReadExt;
///
/// let mut ls_child = Command::new("ls").stdout(Stdio::piped()).spawn()?;
/// let stdio:Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
///
/// let mut echo_child = Command::new("echo").stdin(stdio).stdout(Stdio::piped()).spawn()?;
/// let mut buf = vec![];
/// echo_child.stdout.take().unwrap().read(&mut buf).await;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
cfg_if::cfg_if! {
if #[cfg(windows)] {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stdout = self.0.into_inner()?;
blocking_fd(child_stdout.as_raw_fd())?;
Ok(child_stdout.into())
}
}
}
}
impl io::AsyncRead for ChildStdout {
fn poll_read(
mut self: Pin<&mut Self>,
@ -489,6 +560,37 @@ pub struct ChildStderr(
#[cfg(unix)] Async<std::process::ChildStderr>,
);
impl ChildStderr {
/// Convert async_process::ChildStderr into std::process::Stdio.
///
/// You can use it to associate to the next process.
///
/// # Examples
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
/// use std::process::Stdio;
///
/// let mut ls_child = Command::new("ls").arg("x").stderr(Stdio::piped()).spawn()?;
/// let stdio:Stdio = ls_child.stderr.take().unwrap().into_stdio().await?;
///
/// let mut echo_child = Command::new("echo").stdin(stdio).spawn()?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
cfg_if::cfg_if! {
if #[cfg(windows)] {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stderr = self.0.into_inner()?;
blocking_fd(child_stderr.as_raw_fd())?;
Ok(child_stderr.into())
}
}
}
}
impl io::AsyncRead for ChildStderr {
fn poll_read(
mut self: Pin<&mut Self>,
@ -837,3 +939,63 @@ impl Command {
async { child?.output().await }
}
}
#[cfg(unix)]
/// Moves `Fd` out of nonblocking mode.
fn blocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
// Helper macro to execute a system call that returns an `io::Result`.
macro_rules! syscall {
($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
let res = unsafe { libc::$fn($($arg, )*) };
if res == -1 {
return Err(std::io::Error::last_os_error());
} else {
res
}
}};
}
let res = syscall!(fcntl(fd, libc::F_GETFL));
syscall!(fcntl(fd, libc::F_SETFL, res & !libc::O_NONBLOCK));
Ok(())
}
#[cfg(unix)]
mod test {
#[test]
fn test_into_inner() {
futures_lite::future::block_on(async {
use crate::Command;
use std::io::Result;
use std::process::Stdio;
use std::str::from_utf8;
use futures_lite::AsyncReadExt;
let mut ls_child = Command::new("cat")
.arg("Cargo.toml")
.stdout(Stdio::piped())
.spawn()?;
let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
let mut echo_child = Command::new("grep")
.arg("async")
.stdin(stdio)
.stdout(Stdio::piped())
.spawn()?;
let mut buf = vec![];
let mut stdout = echo_child.stdout.take().unwrap();
stdout.read_to_end(&mut buf).await?;
dbg!(from_utf8(&buf).unwrap_or(""));
Result::Ok(())
})
.unwrap();
}
}