1655 lines
53 KiB
Rust
1655 lines
53 KiB
Rust
//! Async I/O and timers.
|
|
//!
|
|
//! This crate provides two tools:
|
|
//!
|
|
//! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
|
|
//! async programs.
|
|
//! * [`Timer`], a future that expires at a point in time.
|
|
//!
|
|
//! For concrete async networking types built on top of this crate, see [`async-net`].
|
|
//!
|
|
//! [many other]: https://github.com/stjepang/async-io/tree/master/examples
|
|
//! [`async-net`]: https://docs.rs/async-net
|
|
//!
|
|
//! # Implementation
|
|
//!
|
|
//! The first time [`Async`] or [`Timer`] is used, a thread named "async-io" will be spawned.
|
|
//! The purpose of this thread is to wait for I/O events reported by the operating system, and then
|
|
//! wake appropriate futures blocked on I/O or timers when they can be resumed.
|
|
//!
|
|
//! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
|
|
//! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [wepoll] on Windows. That
|
|
//! functionality is provided by the [`polling`] crate.
|
|
//!
|
|
//! However, note that you can also process I/O events and wake futures on any thread using the
|
|
//! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
|
|
//! processing I/O events in case no other threads are.
|
|
//!
|
|
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
|
|
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
|
|
//! [event ports]: https://illumos.org/man/port_create
|
|
//! [wepoll]: https://github.com/piscisaureus/wepoll
|
|
//! [`polling`]: https://docs.rs/polling
|
|
//!
|
|
//! # Examples
|
|
//!
|
|
//! Connect to `example.com:80`, or time out after 10 seconds.
|
|
//!
|
|
//! ```
|
|
//! use async_io::{Async, Timer};
|
|
//! use futures_lite::{future::FutureExt, io};
|
|
//!
|
|
//! use std::net::{TcpStream, ToSocketAddrs};
|
|
//! use std::time::Duration;
|
|
//!
|
|
//! # futures_lite::future::block_on(async {
|
|
//! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
|
|
//!
|
|
//! let stream = Async::<TcpStream>::connect(addr).or(async {
|
|
//! Timer::after(Duration::from_secs(10)).await;
|
|
//! Err(io::ErrorKind::TimedOut.into())
|
|
//! })
|
|
//! .await?;
|
|
//! # std::io::Result::Ok(()) });
|
|
//! ```
|
|
|
|
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
|
|
|
|
use std::convert::TryFrom;
|
|
use std::future::Future;
|
|
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
|
|
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::task::{Context, Poll, Waker};
|
|
use std::time::{Duration, Instant};
|
|
|
|
#[cfg(unix)]
|
|
use std::{
|
|
os::unix::io::{AsRawFd, RawFd},
|
|
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
|
|
path::Path,
|
|
};
|
|
|
|
#[cfg(windows)]
|
|
use std::os::windows::io::{AsRawSocket, RawSocket};
|
|
|
|
use futures_lite::io::{AsyncRead, AsyncWrite};
|
|
use futures_lite::stream::{self, Stream};
|
|
use futures_lite::{future, pin, ready};
|
|
|
|
use crate::reactor::{Reactor, Source};
|
|
|
|
mod driver;
|
|
mod reactor;
|
|
|
|
pub use driver::block_on;
|
|
|
|
/// Returns the largest [`Duration`] that can be represented.
///
/// Stand-in for `Duration::MAX`; use that constant once `duration_constants` are stabilized.
fn duration_max() -> Duration {
    // The nanosecond component must stay strictly below one full second.
    const NANOS_PER_SEC: u32 = 1_000_000_000;
    Duration::new(u64::MAX, NANOS_PER_SEC - 1)
}
|
|
|
|
/// A future that expires at a point in time.
|
|
///
|
|
/// Timers are futures that output the [`Instant`] at which they fired.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Sleep for 1 second:
|
|
///
|
|
/// ```
|
|
/// use async_io::Timer;
|
|
/// use std::time::Duration;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// Timer::after(Duration::from_secs(1)).await;
|
|
/// # });
|
|
/// ```
|
|
///
|
|
/// Timeout after 1 second:
|
|
///
|
|
/// ```
|
|
/// use async_io::Timer;
|
|
/// use futures_lite::FutureExt;
|
|
/// use std::time::Duration;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let addrs = async_net::resolve("google.com:80")
|
|
/// .or(async {
|
|
/// Timer::after(Duration::from_secs(10)).await;
|
|
/// Err(std::io::ErrorKind::TimedOut.into())
|
|
/// })
|
|
/// .await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
#[derive(Debug)]
pub struct Timer {
    /// Reactor registration: the timer's ID paired with the waker of the task that last polled
    /// it.
    ///
    /// `None` means the timer is currently not registered in the reactor.
    id_and_waker: Option<(usize, Waker)>,

    /// The instant at which this timer fires next.
    when: Instant,

    /// The interval added to `when` after each firing to schedule the following tick.
    period: Duration,
}
|
|
|
|
impl Timer {
|
|
/// Creates a timer that expires after the given duration of time.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Timer;
|
|
/// use std::time::Duration;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// Timer::after(Duration::from_secs(1)).await;
|
|
/// # });
|
|
/// ```
|
|
pub fn after(duration: Duration) -> Timer {
|
|
Timer::at(Instant::now() + duration)
|
|
}
|
|
|
|
/// Creates a timer that expires at the given time instant.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Timer;
|
|
/// use std::time::{Duration, Instant};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let now = Instant::now();
|
|
/// let when = now + Duration::from_secs(1);
|
|
/// Timer::at(when).await;
|
|
/// # });
|
|
/// ```
|
|
pub fn at(instant: Instant) -> Timer {
|
|
// Use Duration::MAX once duration_constants are stabilized.
|
|
Timer::interval_at(instant, duration_max())
|
|
}
|
|
|
|
/// Sets the timer to expire after the new duration of time.
|
|
///
|
|
/// Note that resetting a timer is different from creating a new timer because
|
|
/// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
|
|
/// that is polling the timer.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Timer;
|
|
/// use std::time::Duration;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let mut t = Timer::after(Duration::from_secs(1));
|
|
/// t.set_after(Duration::from_millis(100));
|
|
/// # });
|
|
/// ```
|
|
pub fn set_after(&mut self, duration: Duration) {
|
|
self.set_at(Instant::now() + duration);
|
|
}
|
|
|
|
/// Sets the timer to expire at the new time instant.
|
|
///
|
|
/// Note that resetting a timer is different from creating a new timer because
|
|
/// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
|
|
/// that is polling the timer.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Timer;
|
|
/// use std::time::{Duration, Instant};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let mut t = Timer::after(Duration::from_secs(1));
|
|
///
|
|
/// let now = Instant::now();
|
|
/// let when = now + Duration::from_secs(1);
|
|
/// t.set_at(when);
|
|
/// # });
|
|
/// ```
|
|
pub fn set_at(&mut self, instant: Instant) {
|
|
if let Some((id, _)) = self.id_and_waker.as_ref() {
|
|
// Deregister the timer from the reactor.
|
|
Reactor::get().remove_timer(self.when, *id);
|
|
}
|
|
|
|
// Update the timeout.
|
|
self.when = instant;
|
|
|
|
if let Some((id, waker)) = self.id_and_waker.as_mut() {
|
|
// Re-register the timer with the new timeout.
|
|
*id = Reactor::get().insert_timer(self.when, waker);
|
|
}
|
|
}
|
|
|
|
/// Creates a timer that ticks every period.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Timer;
|
|
/// use futures_lite::StreamExt;
|
|
/// use std::time::{Duration, Instant};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let period = Duration::from_secs(1);
|
|
/// Timer::interval(period).next().await;
|
|
/// # });
|
|
/// ```
|
|
pub fn interval(period: Duration) -> Timer {
|
|
Timer::interval_at(Instant::now() + period, period)
|
|
}
|
|
|
|
/// Creates a timer that ticks every period, starting at `start`.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Timer;
|
|
/// use futures_lite::StreamExt;
|
|
/// use std::time::{Duration, Instant};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let now = Instant::now();
|
|
/// let period = Duration::from_secs(1);
|
|
/// Timer::interval_at(now, period).next().await;
|
|
/// # });
|
|
/// ```
|
|
pub fn interval_at(start: Instant, period: Duration) -> Timer {
|
|
Timer {
|
|
id_and_waker: None,
|
|
when: start,
|
|
period: period,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for Timer {
|
|
fn drop(&mut self) {
|
|
if let Some((id, _)) = self.id_and_waker.take() {
|
|
// Deregister the timer from the reactor.
|
|
Reactor::get().remove_timer(self.when, id);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Future for Timer {
|
|
type Output = Instant;
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
match self.poll_next(cx) {
|
|
Poll::Ready(Some(when)) => Poll::Ready(when),
|
|
Poll::Pending => Poll::Pending,
|
|
Poll::Ready(None) => unreachable!(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Stream for Timer {
|
|
type Item = Instant;
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
// Check if the timer has already fired.
|
|
if Instant::now() >= self.when {
|
|
if let Some((id, _)) = self.id_and_waker.take() {
|
|
// Deregister the timer from the reactor.
|
|
Reactor::get().remove_timer(self.when, id);
|
|
}
|
|
let when = self.when;
|
|
if let Some(next) = when.checked_add(self.period) {
|
|
self.when = next;
|
|
// Register the timer in the reactor.
|
|
let id = Reactor::get().insert_timer(self.when, cx.waker());
|
|
self.id_and_waker = Some((id, cx.waker().clone()));
|
|
}
|
|
return Poll::Ready(Some(when));
|
|
} else {
|
|
match &self.id_and_waker {
|
|
None => {
|
|
// Register the timer in the reactor.
|
|
let id = Reactor::get().insert_timer(self.when, cx.waker());
|
|
self.id_and_waker = Some((id, cx.waker().clone()));
|
|
}
|
|
Some((id, w)) if !w.will_wake(cx.waker()) => {
|
|
// Deregister the timer from the reactor to remove the old waker.
|
|
Reactor::get().remove_timer(self.when, *id);
|
|
|
|
// Register the timer in the reactor with the new waker.
|
|
let id = Reactor::get().insert_timer(self.when, cx.waker());
|
|
self.id_and_waker = Some((id, cx.waker().clone()));
|
|
}
|
|
Some(_) => {}
|
|
}
|
|
}
|
|
Poll::Pending
|
|
}
|
|
}
|
|
|
|
/// Async adapter for I/O types.
|
|
///
|
|
/// This type puts an I/O handle into non-blocking mode, registers it in
|
|
/// [epoll]/[kqueue]/[event ports]/[wepoll], and then provides an async interface for it.
|
|
///
|
|
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
|
|
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
|
|
/// [event ports]: https://illumos.org/man/port_create
|
|
/// [wepoll]: https://github.com/piscisaureus/wepoll
|
|
///
|
|
/// # Caveats
|
|
///
|
|
/// [`Async`] is a low-level primitive, and as such it comes with some caveats.
|
|
///
|
|
/// For higher-level primitives built on top of [`Async`], look into [`async-net`] or
|
|
/// [`async-process`] (on Unix).
|
|
///
|
|
/// [`async-net`]: https://github.com/stjepang/async-net
|
|
/// [`async-process`]: https://github.com/stjepang/async-process
|
|
///
|
|
/// ### Supported types
|
|
///
|
|
/// [`Async`] supports all networking types, as well as some OS-specific file descriptors like
|
|
/// [timerfd] and [inotify].
|
|
///
|
|
/// However, do not use [`Async`] with types like [`File`][`std::fs::File`],
|
|
/// [`Stdin`][`std::io::Stdin`], [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`]
|
|
/// because all operating systems have issues with them when put in non-blocking mode.
|
|
///
|
|
/// [timerfd]: https://github.com/stjepang/async-io/blob/master/examples/linux-timerfd.rs
|
|
/// [inotify]: https://github.com/stjepang/async-io/blob/master/examples/linux-inotify.rs
|
|
///
|
|
/// ### Concurrent I/O
|
|
///
|
|
/// Note that [`&Async<T>`][`Async`] implements [`AsyncRead`] and [`AsyncWrite`] if `&T`
|
|
/// implements those traits, which means tasks can concurrently read and write using shared
|
|
/// references.
|
|
///
|
|
/// But there is a catch: only one task can read at a time, and only one task can write at a time. It
|
|
/// is okay to have two tasks where one is reading and the other is writing at the same time, but
|
|
/// it is not okay to have two tasks reading at the same time or writing at the same time. If you
|
|
/// try to do that, conflicting tasks will just keep waking each other in turn, thus wasting CPU
|
|
/// time.
|
|
///
|
|
/// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
|
|
/// [`poll_readable()`][`Async::poll_readable()`] and
|
|
/// [`poll_writable()`][`Async::poll_writable()`].
|
|
///
|
|
/// However, any number of tasks can be concurrently calling other methods like
|
|
/// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
|
|
///
|
|
/// ### Closing
|
|
///
|
|
/// Closing the write side of [`Async`] with [`close()`][`futures_lite::AsyncWriteExt::close()`]
|
|
/// simply flushes. If you want to shut down a TCP or Unix socket, use
|
|
/// [`Shutdown`][`std::net::Shutdown`].
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Connect to a server and echo incoming messages back to the server:
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use futures_lite::io;
|
|
/// use std::net::TcpStream;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// // Connect to a local server.
|
|
/// let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
|
|
///
|
|
/// // Echo all messages from the read side of the stream into the write side.
|
|
/// io::copy(&stream, &stream).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
///
|
|
/// You can use either predefined async methods or wrap blocking I/O operations in
|
|
/// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
|
|
/// [`Async::write_with_mut()`]:
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
///
|
|
/// // These two lines are equivalent:
|
|
/// let (stream, addr) = listener.accept().await?;
|
|
/// let (stream, addr) = listener.read_with(|inner| inner.accept()).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
#[derive(Debug)]
|
|
pub struct Async<T> {
|
|
/// A source registered in the reactor.
|
|
source: Arc<Source>,
|
|
|
|
/// The inner I/O handle.
|
|
io: Option<T>,
|
|
}
|
|
|
|
// `Async<T>` is `Unpin` for every `T`. The `AsyncRead`/`AsyncWrite` impls in this file mutably
// dereference `Pin<&mut Self>`, which is only possible when `Self: Unpin`.
impl<T> Unpin for Async<T> {}
|
|
|
|
#[cfg(unix)]
|
|
impl<T: AsRawFd> Async<T> {
|
|
/// Creates an async I/O handle.
|
|
///
|
|
/// This method will put the handle in non-blocking mode and register it in
|
|
/// [epoll]/[kqueue]/[event ports]/[wepoll].
|
|
///
|
|
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
|
|
/// `AsRawSocket`.
|
|
///
|
|
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
|
|
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
|
|
/// [event ports]: https://illumos.org/man/port_create
|
|
/// [wepoll]: https://github.com/piscisaureus/wepoll
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::{SocketAddr, TcpListener};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
|
|
/// let listener = Async::new(listener)?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn new(io: T) -> io::Result<Async<T>> {
|
|
let fd = io.as_raw_fd();
|
|
|
|
// Put the file descriptor in non-blocking mode.
|
|
unsafe {
|
|
let mut res = libc::fcntl(fd, libc::F_GETFL);
|
|
if res != -1 {
|
|
res = libc::fcntl(fd, libc::F_SETFL, res | libc::O_NONBLOCK);
|
|
}
|
|
if res == -1 {
|
|
return Err(io::Error::last_os_error());
|
|
}
|
|
}
|
|
|
|
Ok(Async {
|
|
source: Reactor::get().insert_io(fd)?,
|
|
io: Some(io),
|
|
})
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
impl<T: AsRawFd> AsRawFd for Async<T> {
|
|
fn as_raw_fd(&self) -> RawFd {
|
|
self.source.raw
|
|
}
|
|
}
|
|
|
|
#[cfg(windows)]
|
|
impl<T: AsRawSocket> Async<T> {
|
|
/// Creates an async I/O handle.
|
|
///
|
|
/// This method will put the handle in non-blocking mode and register it in
|
|
/// [epoll]/[kqueue]/[event ports]/[wepoll].
|
|
///
|
|
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
|
|
/// `AsRawSocket`.
|
|
///
|
|
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
|
|
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
|
|
/// [event ports]: https://illumos.org/man/port_create
|
|
/// [wepoll]: https://github.com/piscisaureus/wepoll
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::{SocketAddr, TcpListener};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
|
|
/// let listener = Async::new(listener)?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn new(io: T) -> io::Result<Async<T>> {
|
|
let sock = io.as_raw_socket();
|
|
|
|
// Put the socket in non-blocking mode.
|
|
unsafe {
|
|
use winapi::ctypes;
|
|
use winapi::um::winsock2;
|
|
|
|
let mut nonblocking = true as ctypes::c_ulong;
|
|
let res = winsock2::ioctlsocket(
|
|
sock as winsock2::SOCKET,
|
|
winsock2::FIONBIO,
|
|
&mut nonblocking,
|
|
);
|
|
if res != 0 {
|
|
return Err(io::Error::last_os_error());
|
|
}
|
|
}
|
|
|
|
Ok(Async {
|
|
source: Reactor::get().insert_io(sock)?,
|
|
io: Some(io),
|
|
})
|
|
}
|
|
}
|
|
|
|
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
    /// Returns the raw socket stored in the reactor source.
    fn as_raw_socket(&self) -> RawSocket {
        let registration = &self.source;
        registration.raw
    }
}
|
|
|
|
impl<T> Async<T> {
|
|
/// Gets a reference to the inner I/O handle.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
/// let inner = listener.get_ref();
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn get_ref(&self) -> &T {
    // In the code visible here, `io` is only emptied by `into_inner()` and `drop()`.
    let handle = self.io.as_ref();
    handle.unwrap()
}
|
|
|
|
/// Gets a mutable reference to the inner I/O handle.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
/// let inner = listener.get_mut();
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn get_mut(&mut self) -> &mut T {
    // In the code visible here, `io` is only emptied by `into_inner()` and `drop()`.
    let handle = self.io.as_mut();
    handle.unwrap()
}
|
|
|
|
/// Unwraps the inner I/O handle.
|
|
///
|
|
/// This method will **not** put the I/O handle back into blocking mode.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
/// let inner = listener.into_inner()?;
|
|
///
|
|
/// // Put the listener back into blocking mode.
|
|
/// inner.set_nonblocking(false)?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn into_inner(mut self) -> io::Result<T> {
|
|
let io = self.io.take().unwrap();
|
|
Reactor::get().remove_io(&self.source)?;
|
|
Ok(io)
|
|
}
|
|
|
|
/// Waits until the I/O handle is readable.
|
|
///
|
|
/// This method completes when a read operation on this I/O handle wouldn't block.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
///
|
|
/// // Wait until a client can be accepted.
|
|
/// listener.readable().await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn readable(&self) -> io::Result<()> {
|
|
self.source.readable().await
|
|
}
|
|
|
|
/// Waits until the I/O handle is writable.
|
|
///
|
|
/// This method completes when a write operation on this I/O handle wouldn't block.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::{TcpStream, ToSocketAddrs};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
|
|
/// let stream = Async::<TcpStream>::connect(addr).await?;
|
|
///
|
|
/// // Wait until the stream is writable.
|
|
/// stream.writable().await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn writable(&self) -> io::Result<()> {
|
|
self.source.writable().await
|
|
}
|
|
|
|
/// Polls the I/O handle for readability.
|
|
///
|
|
/// When this method returns [`Poll::Ready`], that means the OS has delivered an event
|
|
/// indicating readability since the last time this task has called the method and received
|
|
/// [`Poll::Pending`].
|
|
///
|
|
/// # Caveats
|
|
///
|
|
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
|
|
/// will just keep waking each other in turn, thus wasting CPU time.
|
|
///
|
|
/// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use futures_lite::future;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
///
|
|
/// // Wait until a client can be accepted.
|
|
/// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
self.source.poll_readable(cx)
|
|
}
|
|
|
|
/// Polls the I/O handle for writability.
|
|
///
|
|
/// When this method returns [`Poll::Ready`], that means the OS has delivered an event
|
|
/// indicating writability since the last time this task has called the method and received
|
|
/// [`Poll::Pending`].
|
|
///
|
|
/// # Caveats
|
|
///
|
|
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
|
|
/// will just keep waking each other in turn, thus wasting CPU time.
|
|
///
|
|
/// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use futures_lite::future;
|
|
/// use std::net::{TcpStream, ToSocketAddrs};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
|
|
/// let stream = Async::<TcpStream>::connect(addr).await?;
|
|
///
|
|
/// // Wait until the stream is writable.
|
|
/// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
self.source.poll_writable(cx)
|
|
}
|
|
|
|
/// Performs a read operation asynchronously.
|
|
///
|
|
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
|
|
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
|
|
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
|
|
/// sends a notification that the I/O handle is readable.
|
|
///
|
|
/// The closure receives a shared reference to the I/O handle.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
///
|
|
/// // Accept a new client asynchronously.
|
|
/// let (stream, addr) = listener.read_with(|l| l.accept()).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
|
|
let mut op = op;
|
|
loop {
|
|
match op(self.get_ref()) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return res,
|
|
}
|
|
optimistic(self.readable()).await?;
|
|
}
|
|
}
|
|
|
|
/// Performs a read operation asynchronously.
|
|
///
|
|
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
|
|
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
|
|
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
|
|
/// sends a notification that the I/O handle is readable.
|
|
///
|
|
/// The closure receives a mutable reference to the I/O handle.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
///
|
|
/// // Accept a new client asynchronously.
|
|
/// let (stream, addr) = listener.read_with_mut(|l| l.accept()).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn read_with_mut<R>(
|
|
&mut self,
|
|
op: impl FnMut(&mut T) -> io::Result<R>,
|
|
) -> io::Result<R> {
|
|
let mut op = op;
|
|
loop {
|
|
match op(self.get_mut()) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return res,
|
|
}
|
|
optimistic(self.readable()).await?;
|
|
}
|
|
}
|
|
|
|
/// Performs a write operation asynchronously.
|
|
///
|
|
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
|
|
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
|
|
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
|
|
/// sends a notification that the I/O handle is writable.
|
|
///
|
|
/// The closure receives a shared reference to the I/O handle.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
|
|
/// socket.get_ref().connect("127.0.0.1:9000")?;
|
|
///
|
|
/// let msg = b"hello";
|
|
/// let len = socket.write_with(|s| s.send(msg)).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
|
|
let mut op = op;
|
|
loop {
|
|
match op(self.get_ref()) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return res,
|
|
}
|
|
optimistic(self.writable()).await?;
|
|
}
|
|
}
|
|
|
|
/// Performs a write operation asynchronously.
|
|
///
|
|
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
|
|
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
|
|
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
|
|
/// sends a notification that the I/O handle is writable.
|
|
///
|
|
/// The closure receives a mutable reference to the I/O handle.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let mut socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
|
|
/// socket.get_ref().connect("127.0.0.1:9000")?;
|
|
///
|
|
/// let msg = b"hello";
|
|
/// let len = socket.write_with_mut(|s| s.send(msg)).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn write_with_mut<R>(
|
|
&mut self,
|
|
op: impl FnMut(&mut T) -> io::Result<R>,
|
|
) -> io::Result<R> {
|
|
let mut op = op;
|
|
loop {
|
|
match op(self.get_mut()) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return res,
|
|
}
|
|
optimistic(self.writable()).await?;
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for Async<T> {
|
|
fn drop(&mut self) {
|
|
if self.io.is_some() {
|
|
// Deregister and ignore errors because destructors should not panic.
|
|
Reactor::get().remove_io(&self.source).ok();
|
|
|
|
// Drop the I/O handle to close it.
|
|
self.io.take();
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T: Read> AsyncRead for Async<T> {
|
|
fn poll_read(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &mut [u8],
|
|
) -> Poll<io::Result<usize>> {
|
|
loop {
|
|
match (&mut *self).get_mut().read(buf) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_readable(cx))?;
|
|
}
|
|
}
|
|
|
|
fn poll_read_vectored(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
bufs: &mut [IoSliceMut<'_>],
|
|
) -> Poll<io::Result<usize>> {
|
|
loop {
|
|
match (&mut *self).get_mut().read_vectored(bufs) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_readable(cx))?;
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> AsyncRead for &Async<T>
|
|
where
|
|
for<'a> &'a T: Read,
|
|
{
|
|
fn poll_read(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &mut [u8],
|
|
) -> Poll<io::Result<usize>> {
|
|
loop {
|
|
match (&*self).get_ref().read(buf) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_readable(cx))?;
|
|
}
|
|
}
|
|
|
|
fn poll_read_vectored(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
bufs: &mut [IoSliceMut<'_>],
|
|
) -> Poll<io::Result<usize>> {
|
|
loop {
|
|
match (&*self).get_ref().read_vectored(bufs) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_readable(cx))?;
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T: Write> AsyncWrite for Async<T> {
|
|
fn poll_write(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &[u8],
|
|
) -> Poll<io::Result<usize>> {
|
|
loop {
|
|
match (&mut *self).get_mut().write(buf) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_writable(cx))?;
|
|
}
|
|
}
|
|
|
|
fn poll_write_vectored(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
bufs: &[IoSlice<'_>],
|
|
) -> Poll<io::Result<usize>> {
|
|
loop {
|
|
match (&mut *self).get_mut().write_vectored(bufs) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_writable(cx))?;
|
|
}
|
|
}
|
|
|
|
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
loop {
|
|
match (&mut *self).get_mut().flush() {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_writable(cx))?;
|
|
}
|
|
}
|
|
|
|
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
self.poll_flush(cx)
|
|
}
|
|
}
|
|
|
|
impl<T> AsyncWrite for &Async<T>
|
|
where
|
|
for<'a> &'a T: Write,
|
|
{
|
|
fn poll_write(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &[u8],
|
|
) -> Poll<io::Result<usize>> {
|
|
loop {
|
|
match (&*self).get_ref().write(buf) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_writable(cx))?;
|
|
}
|
|
}
|
|
|
|
fn poll_write_vectored(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
bufs: &[IoSlice<'_>],
|
|
) -> Poll<io::Result<usize>> {
|
|
loop {
|
|
match (&*self).get_ref().write_vectored(bufs) {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_writable(cx))?;
|
|
}
|
|
}
|
|
|
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
loop {
|
|
match (&*self).get_ref().flush() {
|
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
res => return Poll::Ready(res),
|
|
}
|
|
ready!(self.poll_writable(cx))?;
|
|
}
|
|
}
|
|
|
|
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
self.poll_flush(cx)
|
|
}
|
|
}
|
|
|
|
impl Async<TcpListener> {
|
|
/// Creates a TCP listener bound to the specified address.
|
|
///
|
|
/// Binding with port number 0 will request an available port from the OS.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
|
|
/// println!("Listening on {}", listener.get_ref().local_addr()?);
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
|
|
let addr = addr.into();
|
|
Ok(Async::new(TcpListener::bind(addr)?)?)
|
|
}
|
|
|
|
/// Accepts a new incoming TCP connection.
|
|
///
|
|
/// When a connection is established, it will be returned as a TCP stream together with its
|
|
/// remote address.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
|
|
/// let (stream, addr) = listener.accept().await?;
|
|
/// println!("Accepted client: {}", addr);
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
|
|
let (stream, addr) = self.read_with(|io| io.accept()).await?;
|
|
Ok((Async::new(stream)?, addr))
|
|
}
|
|
|
|
/// Returns a stream of incoming TCP connections.
|
|
///
|
|
/// The stream is infinite, i.e. it never stops with a [`None`].
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use futures_lite::{pin, stream::StreamExt};
|
|
/// use std::net::TcpListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
|
|
/// let incoming = listener.incoming();
|
|
/// pin!(incoming);
|
|
///
|
|
/// while let Some(stream) = incoming.next().await {
|
|
/// let stream = stream?;
|
|
/// println!("Accepted client: {}", stream.get_ref().peer_addr()?);
|
|
/// }
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
|
|
stream::unfold(self, |listener| async move {
|
|
let res = listener.accept().await.map(|(stream, _)| stream);
|
|
Some((res, listener))
|
|
})
|
|
}
|
|
}
|
|
|
|
impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
|
|
type Error = io::Error;
|
|
|
|
fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
|
|
Async::new(listener)
|
|
}
|
|
}
|
|
|
|
impl Async<TcpStream> {
|
|
/// Creates a TCP connection to the specified address.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::{TcpStream, ToSocketAddrs};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
|
|
/// let stream = Async::<TcpStream>::connect(addr).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
|
|
// Begin async connect.
|
|
let stream = Async::new(nb_connect::tcp(addr)?)?;
|
|
|
|
// The stream becomes writable when connected.
|
|
stream.writable().await?;
|
|
|
|
// Check if there was an error while connecting.
|
|
match stream.get_ref().take_error()? {
|
|
None => Ok(stream),
|
|
Some(err) => Err(err),
|
|
}
|
|
}
|
|
|
|
/// Reads data from the stream without removing it from the buffer.
|
|
///
|
|
/// Returns the number of bytes read. Successive calls of this method read the same data.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use futures_lite::{io::AsyncWriteExt, stream::StreamExt};
|
|
/// use std::net::{TcpStream, ToSocketAddrs};
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
|
|
/// let mut stream = Async::<TcpStream>::connect(addr).await?;
|
|
///
|
|
/// stream
|
|
/// .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
|
|
/// .await?;
|
|
///
|
|
/// let mut buf = [0u8; 1024];
|
|
/// let len = stream.peek(&mut buf).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
|
|
self.read_with(|io| io.peek(buf)).await
|
|
}
|
|
}
|
|
|
|
impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
|
|
type Error = io::Error;
|
|
|
|
fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
|
|
Async::new(stream)
|
|
}
|
|
}
|
|
|
|
impl Async<UdpSocket> {
|
|
/// Creates a UDP socket bound to the specified address.
|
|
///
|
|
/// Binding with port number 0 will request an available port from the OS.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
|
|
/// println!("Bound to {}", socket.get_ref().local_addr()?);
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
|
|
let addr = addr.into();
|
|
Ok(Async::new(UdpSocket::bind(addr)?)?)
|
|
}
|
|
|
|
/// Receives a single datagram message.
|
|
///
|
|
/// Returns the number of bytes read and the address the message came from.
|
|
///
|
|
/// This method must be called with a valid byte slice of sufficient size to hold the message.
|
|
/// If the message is too long to fit, excess bytes may get discarded.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
|
|
///
|
|
/// let mut buf = [0u8; 1024];
|
|
/// let (len, addr) = socket.recv_from(&mut buf).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
|
|
self.read_with(|io| io.recv_from(buf)).await
|
|
}
|
|
|
|
/// Receives a single datagram message without removing it from the queue.
|
|
///
|
|
/// Returns the number of bytes read and the address the message came from.
|
|
///
|
|
/// This method must be called with a valid byte slice of sufficient size to hold the message.
|
|
/// If the message is too long to fit, excess bytes may get discarded.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
|
|
///
|
|
/// let mut buf = [0u8; 1024];
|
|
/// let (len, addr) = socket.peek_from(&mut buf).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
|
|
self.read_with(|io| io.peek_from(buf)).await
|
|
}
|
|
|
|
/// Sends data to the specified address.
|
|
///
|
|
/// Returns the number of bytes writen.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
|
|
/// let addr = socket.get_ref().local_addr()?;
|
|
///
|
|
/// let msg = b"hello";
|
|
/// let len = socket.send_to(msg, addr).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
|
|
let addr = addr.into();
|
|
self.write_with(|io| io.send_to(buf, addr)).await
|
|
}
|
|
|
|
/// Receives a single datagram message from the connected peer.
|
|
///
|
|
/// Returns the number of bytes read.
|
|
///
|
|
/// This method must be called with a valid byte slice of sufficient size to hold the message.
|
|
/// If the message is too long to fit, excess bytes may get discarded.
|
|
///
|
|
/// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
|
|
/// This method will fail if the socket is not connected.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
|
|
/// socket.get_ref().connect("127.0.0.1:9000")?;
|
|
///
|
|
/// let mut buf = [0u8; 1024];
|
|
/// let len = socket.recv(&mut buf).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
|
|
self.read_with(|io| io.recv(buf)).await
|
|
}
|
|
|
|
/// Receives a single datagram message from the connected peer without removing it from the
|
|
/// queue.
|
|
///
|
|
/// Returns the number of bytes read and the address the message came from.
|
|
///
|
|
/// This method must be called with a valid byte slice of sufficient size to hold the message.
|
|
/// If the message is too long to fit, excess bytes may get discarded.
|
|
///
|
|
/// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
|
|
/// This method will fail if the socket is not connected.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
|
|
/// socket.get_ref().connect("127.0.0.1:9000")?;
|
|
///
|
|
/// let mut buf = [0u8; 1024];
|
|
/// let len = socket.peek(&mut buf).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
|
|
self.read_with(|io| io.peek(buf)).await
|
|
}
|
|
|
|
/// Sends data to the connected peer.
|
|
///
|
|
/// Returns the number of bytes written.
|
|
///
|
|
/// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
|
|
/// This method will fail if the socket is not connected.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::net::UdpSocket;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
|
|
/// socket.get_ref().connect("127.0.0.1:9000")?;
|
|
///
|
|
/// let msg = b"hello";
|
|
/// let len = socket.send(msg).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
|
|
self.write_with(|io| io.send(buf)).await
|
|
}
|
|
}
|
|
|
|
impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
|
|
type Error = io::Error;
|
|
|
|
fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
|
|
Async::new(socket)
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
impl Async<UnixListener> {
|
|
/// Creates a UDS listener bound to the specified path.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
|
|
/// println!("Listening on {:?}", listener.get_ref().local_addr()?);
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
|
|
let path = path.as_ref().to_owned();
|
|
Ok(Async::new(UnixListener::bind(path)?)?)
|
|
}
|
|
|
|
/// Accepts a new incoming UDS stream connection.
|
|
///
|
|
/// When a connection is established, it will be returned as a stream together with its remote
|
|
/// address.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
|
|
/// let (stream, addr) = listener.accept().await?;
|
|
/// println!("Accepted client: {:?}", addr);
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
|
|
let (stream, addr) = self.read_with(|io| io.accept()).await?;
|
|
Ok((Async::new(stream)?, addr))
|
|
}
|
|
|
|
/// Returns a stream of incoming UDS connections.
|
|
///
|
|
/// The stream is infinite, i.e. it never stops with a [`None`] item.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use futures_lite::{pin, stream::StreamExt};
|
|
/// use std::os::unix::net::UnixListener;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
|
|
/// let incoming = listener.incoming();
|
|
/// pin!(incoming);
|
|
///
|
|
/// while let Some(stream) = incoming.next().await {
|
|
/// let stream = stream?;
|
|
/// println!("Accepted client: {:?}", stream.get_ref().peer_addr()?);
|
|
/// }
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
|
|
stream::unfold(self, |listener| async move {
|
|
let res = listener.accept().await.map(|(stream, _)| stream);
|
|
Some((res, listener))
|
|
})
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
|
|
type Error = io::Error;
|
|
|
|
fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
|
|
Async::new(listener)
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
impl Async<UnixStream> {
|
|
/// Creates a UDS stream connected to the specified path.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixStream;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let stream = Async::<UnixStream>::connect("/tmp/socket").await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
|
|
// Begin async connect.
|
|
let stream = Async::new(nb_connect::unix(path)?)?;
|
|
|
|
// The stream becomes writable when connected.
|
|
stream.writable().await?;
|
|
|
|
// On Linux, it appears the socket may become writable even when connecting fails, so we
|
|
// must do an extra check here and see if the peer address is retrievable.
|
|
stream.get_ref().peer_addr()?;
|
|
Ok(stream)
|
|
}
|
|
|
|
/// Creates an unnamed pair of connected UDS stream sockets.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixStream;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let (stream1, stream2) = Async::<UnixStream>::pair()?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
|
|
let (stream1, stream2) = UnixStream::pair()?;
|
|
Ok((Async::new(stream1)?, Async::new(stream2)?))
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
|
|
type Error = io::Error;
|
|
|
|
fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
|
|
Async::new(stream)
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
impl Async<UnixDatagram> {
|
|
/// Creates a UDS datagram socket bound to the specified path.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixDatagram;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
|
|
let path = path.as_ref().to_owned();
|
|
Ok(Async::new(UnixDatagram::bind(path)?)?)
|
|
}
|
|
|
|
/// Creates a UDS datagram socket not bound to any address.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixDatagram;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UnixDatagram>::unbound()?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn unbound() -> io::Result<Async<UnixDatagram>> {
|
|
Ok(Async::new(UnixDatagram::unbound()?)?)
|
|
}
|
|
|
|
/// Creates an unnamed pair of connected Unix datagram sockets.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixDatagram;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
|
|
let (socket1, socket2) = UnixDatagram::pair()?;
|
|
Ok((Async::new(socket1)?, Async::new(socket2)?))
|
|
}
|
|
|
|
/// Receives data from the socket.
|
|
///
|
|
/// Returns the number of bytes read and the address the message came from.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixDatagram;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
|
|
///
|
|
/// let mut buf = [0u8; 1024];
|
|
/// let (len, addr) = socket.recv_from(&mut buf).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
|
|
self.read_with(|io| io.recv_from(buf)).await
|
|
}
|
|
|
|
/// Sends data to the specified address.
|
|
///
|
|
/// Returns the number of bytes written.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixDatagram;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UnixDatagram>::unbound()?;
|
|
///
|
|
/// let msg = b"hello";
|
|
/// let addr = "/tmp/socket";
|
|
/// let len = socket.send_to(msg, addr).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
|
|
self.write_with(|io| io.send_to(buf, &path)).await
|
|
}
|
|
|
|
/// Receives data from the connected peer.
|
|
///
|
|
/// Returns the number of bytes read and the address the message came from.
|
|
///
|
|
/// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
|
|
/// This method will fail if the socket is not connected.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixDatagram;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
|
|
/// socket.get_ref().connect("/tmp/socket2")?;
|
|
///
|
|
/// let mut buf = [0u8; 1024];
|
|
/// let len = socket.recv(&mut buf).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
|
|
self.read_with(|io| io.recv(buf)).await
|
|
}
|
|
|
|
/// Sends data to the connected peer.
|
|
///
|
|
/// Returns the number of bytes written.
|
|
///
|
|
/// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
|
|
/// This method will fail if the socket is not connected.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use async_io::Async;
|
|
/// use std::os::unix::net::UnixDatagram;
|
|
///
|
|
/// # futures_lite::future::block_on(async {
|
|
/// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
|
|
/// socket.get_ref().connect("/tmp/socket2")?;
|
|
///
|
|
/// let msg = b"hello";
|
|
/// let len = socket.send(msg).await?;
|
|
/// # std::io::Result::Ok(()) });
|
|
/// ```
|
|
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
|
|
self.write_with(|io| io.send(buf)).await
|
|
}
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
|
|
type Error = io::Error;
|
|
|
|
fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
|
|
Async::new(socket)
|
|
}
|
|
}
|
|
|
|
/// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
|
|
async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
|
|
let mut polled = false;
|
|
pin!(fut);
|
|
|
|
future::poll_fn(|cx| {
|
|
if !polled {
|
|
polled = true;
|
|
fut.as_mut().poll(cx)
|
|
} else {
|
|
Poll::Ready(Ok(()))
|
|
}
|
|
})
|
|
.await
|
|
}
|