More examples

This commit is contained in:
Stjepan Glavina 2020-04-06 22:19:54 +02:00
parent 26690c97fa
commit f9dc5c92fd
11 changed files with 179 additions and 184 deletions

View File

@ -27,6 +27,7 @@ async-h1 = "1.0.1"
async-native-tls = "0.3.3"
async-tungstenite = { version = "0.4.2", features = ["async-native-tls"] }
base64 = "0.12.0"
ctrlc = "3.1.4"
http = "0.2.1"
http-types = "1.1.0"
hyper = { version = "0.13.4", default-features = false, features = ["stream"] }

View File

@ -1,14 +1,12 @@
use std::fs;
use std::net::TcpListener;
use std::path::Path;
use std::thread;
use anyhow::Result;
use async_native_tls::TlsAcceptor;
use async_native_tls::{Identity, TlsAcceptor};
use futures::prelude::*;
use http_types::{Request, Response, StatusCode};
use piper::{Lock, Shared};
use smol::{blocking, Async, Task};
use smol::{Async, Task};
/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
@ -47,6 +45,9 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
}
fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("../identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create a thread pool.
let num_threads = num_cpus::get_physical().max(1);
for _ in 0..num_threads {
@ -54,14 +55,9 @@ fn main() -> Result<()> {
}
smol::block_on(async {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("identity.pfx");
let identity = blocking!(fs::read(path))?;
let tls = TlsAcceptor::new(&identity[..], "password").await?;
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
future::try_join(http, https).await?;
Ok(())
})
}

View File

@ -1,23 +1,13 @@
use once_cell::sync::Lazy;
use piper::Receiver;
use signal_hook::iterator::Signals;
use smol::Task;
static CTRL_C: Lazy<Receiver<()>> = Lazy::new(|| {
let (s, r) = piper::chan(100);
Task::blocking(async move {
for _ in Signals::new(&[signal_hook::SIGINT]).unwrap().forever() {
s.send(()).await;
}
})
.detach();
r
});
use futures::prelude::*;
fn main() {
let (s, ctrl_c) = piper::chan(100);
let handle = move || drop(s.send(()).now_or_never());
ctrlc::set_handler(handle).unwrap();
smol::run(async {
println!("Waiting for Ctrl-C");
CTRL_C.recv().await;
ctrl_c.recv().await;
println!("Done!");
})
}

View File

@ -1,17 +1,15 @@
use std::fs;
use std::io;
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use anyhow::{Error, Result};
use async_native_tls::{TlsAcceptor, TlsStream};
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use futures::prelude::*;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use smol::{blocking, Async, Task};
use smol::{Async, Task};
async fn serve(req: Request<Body>, host: String) -> Result<Response<Body>> {
println!("Serving {}{}", host, req.uri());
@ -37,20 +35,18 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
}
fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("../identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create a thread pool.
for _ in 0..num_cpus::get_physical().max(1) {
thread::spawn(|| smol::run(future::pending::<()>()));
}
smol::block_on(async {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("identity.pfx");
let identity = blocking!(fs::read(path))?;
let tls = TlsAcceptor::new(&identity[..], "password").await?;
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
future::try_join(http, https).await?;
Ok(())
})
}

View File

@ -8,14 +8,12 @@
// 1. minica --domains localhost -ip-addresses 127.0.0.1 -ca-cert certificate.pem
// 2. openssl pkcs12 -export -out identity.pfx -inkey localhost/key.pem -in localhost/cert.pem
use std::fs;
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use anyhow::Result;
use async_native_tls::TlsAcceptor;
use async_native_tls::{Identity, TlsAcceptor};
use futures::prelude::*;
use smol::{blocking, Async, Task};
use smol::{Async, Task};
const RESPONSE: &[u8] = br#"
HTTP/1.1 200 OK
@ -54,15 +52,13 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
}
fn main() -> Result<()> {
smol::run(async {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("identity.pfx");
let identity = blocking!(fs::read(path))?;
let tls = TlsAcceptor::new(&identity[..], "password").await?;
let identity = Identity::from_pkcs12(include_bytes!("../identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
smol::run(async {
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
future::try_join(http, https).await?;
Ok(())
})
}

View File

@ -1,27 +1,24 @@
use std::fs;
use std::net::TcpStream;
use std::path::Path;
use anyhow::Result;
use async_native_tls::{Certificate, TlsConnector};
use futures::io;
use futures::prelude::*;
use piper::Lock;
use smol::Async;
fn main() -> Result<()> {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("certificate.pem");
let cert = native_tls::Certificate::from_pem(&fs::read(path)?)?;
// Create a TLS connector that is able to connect to localhost:7001
let mut builder = native_tls::TlsConnector::builder();
builder.add_root_certificate(cert);
let connector = async_native_tls::TlsConnector::from(builder);
builder.add_root_certificate(Certificate::from_pem(include_bytes!("../certificate.pem"))?);
let tls = TlsConnector::from(builder);
smol::run(async {
let stdin = smol::reader(std::io::stdin());
let mut stdout = smol::writer(std::io::stdout());
let stream = Async::<TcpStream>::connect("localhost:7001").await?;
let stream = connector.connect("localhost", stream).await?;
let stream = tls.connect("localhost", stream).await?;
println!("Connected to {}", stream.get_ref().get_ref().peer_addr()?);
let stream = Lock::new(stream);

View File

@ -1,12 +1,10 @@
use std::fs;
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use anyhow::Result;
use async_native_tls::{TlsAcceptor, TlsStream};
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use futures::io;
use piper::Lock;
use smol::{blocking, Async, Task};
use smol::{Async, Task};
async fn echo(stream: TlsStream<Async<TcpStream>>) -> Result<()> {
println!("Copying");
@ -16,13 +14,10 @@ async fn echo(stream: TlsStream<Async<TcpStream>>) -> Result<()> {
}
fn main() -> Result<()> {
// TODO: use native_tls::TlsAcceptor to avoid async files; do the same in other examples
let identity = Identity::from_pkcs12(include_bytes!("../identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
smol::run(async {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("identity.pfx");
let identity = blocking!(fs::read(path))?;
let tls = TlsAcceptor::new(&identity[..], "password").await?;
let listener = Async::<TcpListener>::bind("127.0.0.1:7001")?;
println!("Listening on {}", listener.get_ref().local_addr()?);

23
examples/unix-signal.rs Normal file
View File

@ -0,0 +1,23 @@
#[cfg(unix)]
fn main() -> std::io::Result<()> {
use std::os::unix::net::UnixStream;
use futures::prelude::*;
use smol::Async;
smol::run(async {
let (a, mut b) = Async::<UnixStream>::pair()?;
signal_hook::pipe::register(signal_hook::SIGINT, a)?;
println!("Waiting for Ctrl-C");
b.read_exact(&mut [0]).await?;
println!("Done!");
Ok(())
})
}
#[cfg(not(unix))]
fn main() {
println!("This example works only on Unix systems!");
}

View File

@ -3,7 +3,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Result};
use async_native_tls::TlsStream;
use async_native_tls::{Certificate, TlsConnector, TlsStream};
use async_tungstenite::WebSocketStream;
use futures::prelude::*;
use smol::Async;
@ -11,7 +11,7 @@ use tungstenite::handshake::client::Response;
use tungstenite::Message;
use url::Url;
async fn connect(addr: &str) -> Result<(WsStream, Response)> {
async fn connect(addr: &str, tls: TlsConnector) -> Result<(WsStream, Response)> {
// Parse the address.
let url = Url::parse(addr)?;
let host = url.host_str().context("cannot parse host")?.to_string();
@ -26,7 +26,7 @@ async fn connect(addr: &str) -> Result<(WsStream, Response)> {
}
"wss" => {
let stream = Async::<TcpStream>::connect(format!("{}:{}", host, port)).await?;
let stream = async_native_tls::connect(host, stream).await?;
let stream = tls.connect(host, stream).await?;
let (stream, resp) = async_tungstenite::client_async(addr, stream).await?;
Ok((WsStream::Tls(stream), resp))
}
@ -35,8 +35,13 @@ async fn connect(addr: &str) -> Result<(WsStream, Response)> {
}
fn main() -> Result<()> {
// Create a TLS connector that is able to connect to wss://localhost:9001
let mut builder = native_tls::TlsConnector::builder();
builder.add_root_certificate(Certificate::from_pem(include_bytes!("../certificate.pem"))?);
let tls = TlsConnector::from(builder);
smol::run(async {
let (mut stream, resp) = connect("wss://echo.websocket.org").await?;
let (mut stream, resp) = connect("wss://echo.websocket.org", tls).await?;
dbg!(resp);
stream.send(Message::text("Hello!")).await?;

View File

@ -1,17 +1,13 @@
#![recursion_limit = "1024"]
use std::fs;
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use anyhow::{Context as _, Result};
use async_native_tls::{TlsAcceptor, TlsStream};
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use async_tungstenite::WebSocketStream;
use futures::prelude::*;
use smol::{blocking, Async, Task};
use smol::{Async, Task};
use tungstenite::Message;
async fn serve(mut stream: WsStream, host: String) -> Result<()> {
@ -49,20 +45,18 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
}
fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("../identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create a thread pool.
for _ in 0..num_cpus::get_physical().max(1) {
thread::spawn(|| smol::run(future::pending::<()>()));
}
smol::block_on(async {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("identity.pfx");
let identity = blocking!(fs::read(path))?;
let tls = TlsAcceptor::new(&identity[..], "password").await?;
let ws = listen(Async::<TcpListener>::bind("127.0.0.1:9000")?, None);
let wss = listen(Async::<TcpListener>::bind("127.0.0.1:9001")?, Some(tls));
future::try_join(ws, wss).await?;
Ok(())
})
}

View File

@ -32,10 +32,8 @@ use std::time::{Duration, Instant};
#[cfg(unix)]
use std::{
os::unix::{
io::{AsRawFd, RawFd},
net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
},
os::unix::io::{AsRawFd, RawFd},
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
path::Path,
};
@ -161,17 +159,17 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
/// Executes all futures until the main one completes.
pub fn run<T>(future: impl Future<Output = T>) -> T {
let flag = Arc::new(
IoFlag::create()
.and_then(Async::new)
.expect("cannot create waker flag"),
);
let f = flag.clone();
let w = async_task::waker_fn(move || f.get_ref().set());
let cx = &mut Context::from_waker(&w);
EXECUTOR.processor(|proc| {
let flag = Arc::new(
IoFlag::create()
.and_then(Async::new)
.expect("cannot create waker flag"),
);
let f = flag.clone();
let w = async_task::waker_fn(move || f.get_ref().set());
let cx = &mut Context::from_waker(&w);
futures::pin_mut!(future);
loop {
@ -182,9 +180,11 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
}
while !flag.get_ref().get() {
// TODO: separate Processor and Local, call tick() on both here
if proc.tick() {
REACTOR.poll_quick().expect("failure while polling I/O");
} else {
// Block until either the reactor is locked or the flag is set.
block_on(async {
let lock = REACTOR.lock();
let ready = flag.ready();
@ -203,16 +203,15 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
}
thread_local! {
static HITS: Cell<Option<i32>> = Cell::new(None);
static BUDGET: Cell<Option<i32>> = Cell::new(None);
}
// TODO: poll_throttle_read(), poll_throttle(cost: i32), static BUDGET
/// Runs a budgeted task and returns `true` if the budget was completely used up.
fn use_budget(run: impl FnOnce()) -> bool {
HITS.with(|hits| {
hits.set(Some(200));
/// Runs a task and returns `true` if it was throttled.
fn use_throttle(run: impl FnOnce()) -> bool {
BUDGET.with(|budget| {
budget.set(Some(200));
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| run()));
let is_used_up = hits.take() == Some(0);
let is_used_up = budget.take() == Some(0);
match res {
Ok(()) => is_used_up,
@ -222,31 +221,21 @@ fn use_budget(run: impl FnOnce()) -> bool {
}
#[inline]
fn poll_budget(cx: &mut Context<'_>, n: i32) -> Poll<()> {
HITS.with(|hits| match hits.get() {
fn poll_throttle(cx: &mut Context<'_>) -> Poll<()> {
BUDGET.with(|budget| match budget.get() {
None => Poll::Ready(()),
Some(h) if h == 0 => {
Some(b) if b == 0 => {
cx.waker().wake_by_ref();
Poll::Pending
}
Some(h) => {
hits.set(Some(h.saturating_sub(n)));
Some(b) => {
budget.set(Some(b - 1));
Poll::Ready(())
}
})
}
#[inline]
fn poll_budget_read(cx: &mut Context<'_>) -> Poll<()> {
poll_budget(cx, 2)
}
#[inline]
fn poll_budget_write(cx: &mut Context<'_>) -> Poll<()> {
poll_budget(cx, 1)
}
// ----- Local executor -----
// ----- Single-thread executor -----
thread_local! {
/// Holds a queue of thread-local tasks.
@ -298,7 +287,7 @@ impl Local {
match self.pop() {
None => return false,
Some(runnable) => {
use_budget(|| runnable.run());
use_throttle(|| runnable.run());
}
}
}
@ -324,7 +313,7 @@ impl Local {
}
}
// ----- Global executor -----
// ----- Work-stealing executor -----
/// Holds the global task queue.
static EXECUTOR: Lazy<Executor> = Lazy::new(|| Executor::new());
@ -435,7 +424,7 @@ impl Processor {
match self.pop() {
None => return more_local,
Some(runnable) => {
if use_budget(|| runnable.run()) {
if use_throttle(|| runnable.run()) {
self.flush_slot();
}
}
@ -613,7 +602,7 @@ pub fn iter<T: Send + 'static>(
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
futures::ready!(poll_budget_read(cx));
futures::ready!(poll_throttle(cx));
match &mut *self {
State::Idle(iter) => {
@ -657,7 +646,7 @@ pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unp
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
futures::ready!(poll_budget_read(cx));
futures::ready!(poll_throttle(cx));
match &mut *self {
State::Idle(io) => {
@ -722,7 +711,7 @@ pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + U
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
futures::ready!(poll_budget_write(cx));
futures::ready!(poll_throttle(cx));
loop {
match &mut *self {
@ -740,7 +729,7 @@ pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + U
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
futures::ready!(poll_budget_write(cx));
futures::ready!(poll_throttle(cx));
loop {
match &mut *self {
@ -784,6 +773,7 @@ static INTERRUPT: Lazy<Async<IoFlag>> = Lazy::new(|| {
struct Source {
raw: sys::Raw,
index: usize,
// TODO: watchers: piper::Lock<Vec<Waker>>,
readers: piper::Lock<Vec<Waker>>,
writers: piper::Lock<Vec<Waker>>,
}
@ -953,16 +943,13 @@ pub struct Timer {
impl Timer {
/// Fires after the specified duration of time.
pub fn after(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
Timer::at(Instant::now() + dur)
}
/// Fires at the specified instant in time.
pub fn at(dur: Duration) -> Timer {
pub fn at(when: Instant) -> Timer {
Timer {
when: Instant::now() + dur,
when,
inserted: false,
}
}
@ -1039,15 +1026,44 @@ pub struct Async<T> {
impl<T: AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn new(inner: T) -> io::Result<Async<T>> {
// Put the I/O handle into non-blocking mode.
nix::fcntl::fcntl(
inner.as_raw_fd(),
nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::O_NONBLOCK),
)
.map_err(|err| match err {
nix::Error::Sys(code) => code.into(),
err => io::Error::new(io::ErrorKind::Other, Box::new(err)),
})?;
use nix::fcntl::{fcntl, FcntlArg, OFlag};
// Put the I/O handle in non-blocking mode.
let flags = fcntl(inner.as_raw_fd(), FcntlArg::F_GETFL).map_err(io_err)?;
let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
fcntl(inner.as_raw_fd(), FcntlArg::F_SETFL(flags)).map_err(io_err)?;
// Register the I/O handle in the reactor.
Ok(Async {
source: REACTOR.register(sys::Raw::new(&inner))?,
inner: Some(Box::new(inner)),
})
}
}
#[cfg(unix)]
impl<T: AsRawFd> AsRawFd for Async<T> {
fn as_raw_fd(&self) -> RawFd {
self.source.raw.0
}
}
/// Converts a `nix::Error` into `std::io::Error`.
#[cfg(unix)]
fn io_err(err: nix::Error) -> io::Error {
match err {
nix::Error::Sys(code) => code.into(),
err => io::Error::new(io::ErrorKind::Other, Box::new(err)),
}
}
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn new(inner: T) -> io::Result<Async<T>> {
// Put the I/O handle in non-blocking mode.
let socket = unsafe { Socket::from_raw_socket(inner.as_raw_socket()) };
mem::ManuallyDrop::new(socket).set_nonblocking(true)?;
// Register the I/O handle in the reactor.
Ok(Async {
@ -1058,18 +1074,9 @@ impl<T: AsRawFd> Async<T> {
}
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn new(inner: T) -> io::Result<Async<T>> {
// Put the I/O handle into non-blocking mode.
let socket = unsafe { Socket::from_raw_socket(inner.as_raw_socket()) };
mem::ManuallyDrop::new(socket).set_nonblocking(true)?;
// Register the I/O handle in the reactor.
Ok(Async {
source: REACTOR.register(sys::Raw::new(&inner))?,
inner: Some(Box::new(inner)),
})
impl<T: AsRawSocket> AsRawSocket for Async<T> {
fn as_raw_socket(&self) -> RawSocket {
self.source.raw.0
}
}
@ -1091,16 +1098,25 @@ impl<T> Async<T> {
Ok(inner)
}
pub async fn with<R>(&self, try_op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
// TODO: use this in examples
todo!()
}
pub async fn with_mut<R>(
&mut self,
try_op: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
// TODO: use this in examples
todo!()
}
/// Converts a non-blocking read into an async operation.
pub async fn read_with<R>(&self, try_read: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut f = try_read;
let mut inner = self.inner.as_ref().unwrap();
let wakers = &self.source.readers;
future::poll_fn(|cx| {
futures::ready!(poll_budget_read(cx));
Self::poll_io(cx, |s| f(s), &mut inner, wakers)
})
.await
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut inner, wakers)).await
}
/// Converts a non-blocking read into an async operation.
@ -1111,11 +1127,7 @@ impl<T> Async<T> {
let mut f = try_read;
let mut inner = self.inner.as_mut().unwrap();
let wakers = &self.source.readers;
future::poll_fn(|cx| {
futures::ready!(poll_budget_read(cx));
Self::poll_io(cx, |s| f(s), &mut inner, wakers)
})
.await
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut inner, wakers)).await
}
/// Converts a non-blocking write into an async operation.
@ -1123,11 +1135,7 @@ impl<T> Async<T> {
let mut f = try_write;
let mut inner = self.inner.as_ref().unwrap();
let wakers = &self.source.writers;
future::poll_fn(|cx| {
futures::ready!(poll_budget_write(cx));
Self::poll_io(cx, |s| f(s), &mut inner, wakers)
})
.await
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut inner, wakers)).await
}
/// Converts a non-blocking write into an async operation.
@ -1138,11 +1146,7 @@ impl<T> Async<T> {
let mut f = try_write;
let mut inner = self.inner.as_mut().unwrap();
let wakers = &self.source.writers;
future::poll_fn(|cx| {
futures::ready!(poll_budget_write(cx));
Self::poll_io(cx, |s| f(s), &mut inner, wakers)
})
.await
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut inner, wakers)).await
}
fn poll_io<I, R>(
@ -1151,6 +1155,8 @@ impl<T> Async<T> {
inner: &mut I,
wakers: &piper::Lock<Vec<Waker>>,
) -> Poll<io::Result<R>> {
futures::ready!(poll_throttle(cx));
// Attempt the non-blocking operation.
match f(inner) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
@ -1456,8 +1462,10 @@ impl IoFlag {
/// Creates an I/O flag.
fn create() -> io::Result<IoFlag> {
// The only portable way of manually triggering I/O events is to create a socket and
// send/receive dummy data on it. See the following links for more information.
// send/receive dummy data on it. This pattern is also known as "the self-pipe trick".
// See the links below for more information.
//
// https://cr.yp.to/docs/selfpipe.html
// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
// https://gist.github.com/geertj/4325783
@ -1581,7 +1589,7 @@ mod sys {
};
#[derive(Clone, Copy, Debug)]
pub struct Raw(RawFd);
pub struct Raw(pub RawFd);
impl Raw {
pub fn new(s: &impl AsRawFd) -> Raw {
Raw(s.as_raw_fd())
@ -1592,12 +1600,12 @@ mod sys {
impl Reactor {
pub fn create() -> io::Result<Reactor> {
Ok(Reactor(
epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(io_err)?,
epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(super::io_err)?,
))
}
pub fn register(&self, raw: Raw, index: usize) -> io::Result<()> {
let ev = &mut EpollEvent::new(flags(), index as u64);
epoll_ctl(self.0, EpollOp::EpollCtlAdd, raw.0, Some(ev)).map_err(io_err)
epoll_ctl(self.0, EpollOp::EpollCtlAdd, raw.0, Some(ev)).map_err(super::io_err)
}
pub fn reregister(&self, _raw: Raw, _index: usize) -> io::Result<()> {
Ok(())
@ -1612,7 +1620,7 @@ mod sys {
events.len = match epoll_wait(self.0, &mut events.list, timeout_ms) {
Ok(len) => len,
Err(nix::Error::Sys(Errno::EINTR)) => 0,
Err(err) => return Err(io_err(err)),
Err(err) => return Err(super::io_err(err)),
};
Ok(events.len)
}
@ -1620,9 +1628,6 @@ mod sys {
fn flags() -> EpollFlags {
EpollFlags::EPOLLET | EpollFlags::EPOLLIN | EpollFlags::EPOLLOUT | EpollFlags::EPOLLRDHUP
}
fn io_err(err: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
pub struct Events {
list: Box<[EpollEvent]>,
@ -1672,7 +1677,7 @@ mod sys {
use nix::sys::event::{kevent_ts, kqueue, EventFilter, EventFlag, FilterFlag, KEvent};
#[derive(Clone, Copy, Debug)]
pub struct Raw(RawFd);
pub struct Raw(pub RawFd);
impl Raw {
pub fn new(s: &impl AsRawFd) -> Raw {
Raw(s.as_raw_fd())
@ -1682,8 +1687,8 @@ mod sys {
pub struct Reactor(RawFd);
impl Reactor {
pub fn create() -> io::Result<Reactor> {
let fd = kqueue().map_err(io_err)?;
fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)).map_err(io_err)?;
let fd = kqueue().map_err(super::io_err)?;
fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)).map_err(super::io_err)?;
Ok(Reactor(fd))
}
pub fn register(&self, raw: Raw, index: usize) -> io::Result<()> {
@ -1698,7 +1703,7 @@ mod sys {
let mut eventlist = changelist.clone();
match kevent_ts(self.0, &changelist, &mut eventlist, None) {
Ok(_) | Err(nix::Error::Sys(Errno::EINTR)) => {}
Err(err) => return Err(io_err(err)),
Err(err) => return Err(super::io_err(err)),
}
for ev in &eventlist {
// See https://github.com/tokio-rs/mio/issues/582
@ -1725,7 +1730,7 @@ mod sys {
let mut eventlist = changelist.clone();
match kevent_ts(self.0, &changelist, &mut eventlist, None) {
Ok(_) | Err(nix::Error::Sys(Errno::EINTR)) => {}
Err(err) => return Err(io_err(err)),
Err(err) => return Err(super::io_err(err)),
}
for ev in &eventlist {
if ev.data() != 0 && ev.flags().contains(EventFlag::EV_ERROR) {
@ -1743,14 +1748,11 @@ mod sys {
events.len = match kevent_ts(self.0, &[], &mut events.list, timeout) {
Ok(n) => n,
Err(nix::Error::Sys(Errno::EINTR)) => 0,
Err(err) => return Err(io_err(err)),
Err(err) => return Err(super::io_err(err)),
};
Ok(events.len)
}
}
fn io_err(err: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
pub struct Events {
list: Box<[KEvent]>,
@ -1792,7 +1794,7 @@ mod sys {
use wepoll_binding::{Epoll, EventFlag};
#[derive(Clone, Copy, Debug)]
pub struct Raw(RawSocket);
pub struct Raw(pub RawSocket);
impl Raw {
pub fn new(s: &impl AsRawSocket) -> Raw {
Raw(s.as_raw_socket())