More comments

Stjepan Glavina 2020-04-17 10:42:37 +02:00
parent dc6e936c75
commit 28a09ca2bb
1 changed file with 183 additions and 34 deletions


@ -60,24 +60,24 @@
//!
//! Look inside the [examples] directory for more.
//!
//! Those examples show how to read a [file][read-file] or
//! [directory][read-directory], [spawn][process-run] a process or read its
//! [output][process-output], use timers to [sleep][timer-sleep] or set a
//! [timeout][timer-timeout], catch the [Ctrl-C][ctrl-c] signal and [gracefully shut down] TODO,
//! The examples show how to read a [file][read-file] or [directory][read-directory],
//! [spawn][process-run] a process and read its [output][process-output], use timers to
//! [sleep][timer-sleep] or set a [timeout][timer-timeout], catch the [Ctrl-C][ctrl-c] signal, and
//! redirect standard [input to output][stdin-to-stdout].
//!
//! They include a simple TCP [client][tcp-client]/[server][tcp-server], a chat
//! [client][chat-client]/[server][chat-server] over TCP, a [web crawler][web-crawler], a simple
//! They also include a [web crawler][web-crawler], a simple TCP
//! [client][tcp-client]/[server][tcp-server], a TCP chat
//! [client][chat-client]/[server][chat-server], a simple
//! TLS [client][tls-client]/[server][tls-server], a simple
//! HTTP+TLS [client][simple-client]/[server][simple-server], a [hyper]
//! [client][hyper-client]/[server][hyper-server], an [async-h1]
//! [client][async-h1-client]/[server][async-h1-server], a WebSocket+TLS
//! [client][async-h1-client]/[server][async-h1-server], and a WebSocket+TLS
//! [client][websocket-client]/[server][websocket-server].
//!
//! Even non-async libraries can be plugged into this runtime. See examples for [inotify],
//! Even non-async libraries can be plugged into this runtime: see how to use [inotify],
//! [timerfd], [signal-hook], and [uds_windows].
//!
//! You can also mix this runtime with [async-std] and [tokio], or use runtime-specific
//! Finally, you can mix this runtime with [async-std] and [tokio], or use runtime-specific
//! libraries like [reqwest].
//!
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
@ -394,7 +394,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
})
}
/// Runs executors and the reactor.
/// Runs executors and polls the reactor.
///
/// TODO a thread-pool example with num_cpus::get().max(1)
/// TODO a stoppable thread-pool with channels
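///
/// A sketch of the thread-pool idea from the first TODO above (the `smol::run` path, the
/// `num_cpus` crate, and the shape of the entry point are assumptions for illustration):
///
/// ```no_run
/// use futures::future;
/// use std::thread;
///
/// // One executor thread per core; each thread runs the executors and polls the reactor.
/// for _ in 0..num_cpus::get().max(1) {
///     thread::spawn(|| smol::run(future::pending::<()>()));
/// }
/// ```
///
/// A stoppable variant, as in the second TODO, could instead block each thread on a future
/// that completes when a shutdown message arrives over a channel.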
@ -583,6 +583,7 @@ impl ThreadLocalExecutor {
/// Executes a batch of tasks and returns `true` if there are more tasks to run.
fn execute(&self) -> bool {
// Execute 4 series of 50 tasks.
for _ in 0..4 {
for _ in 0..50 {
// Find the next task to run.
@ -766,6 +767,7 @@ struct Worker {
impl Worker {
/// Executes a batch of tasks and returns `true` if there are more tasks to run.
fn execute(&self) -> bool {
// Execute 4 series of 50 tasks.
for _ in 0..4 {
for _ in 0..50 {
// Find the next task to run.
@ -1192,6 +1194,8 @@ impl Future for Timer {
/// Async I/O.
///
/// TODO
/// TODO: suggest using Shared to split
/// TODO: suggest using Lock if Async<T> is not splittable
#[derive(Debug)]
pub struct Async<T> {
/// A source registered in the reactor.
@ -1308,10 +1312,11 @@ impl<T> Async<T> {
mut op: impl FnMut() -> io::Result<R>,
source: &Source,
) -> Poll<io::Result<R>> {
// Throttle if the current task has done too many I/O operations without yielding.
// Throttle if the current task did too many I/O operations without yielding.
futures::ready!(poll_throttle(cx));
loop {
// This number is bumped just before I/O notifications are delivered, while the waker list is locked.
let tick = source.tick.load(Ordering::Acquire);
// Attempt the non-blocking operation.
@ -1322,29 +1327,34 @@ impl<T> Async<T> {
// Lock the waker list and retry the non-blocking operation.
let mut wakers = source.wakers.lock();
// If the current task is already registered, return.
if wakers.iter().any(|w| w.will_wake(cx.waker())) {
break;
return Poll::Pending;
}
// If there were no new notifications, register and return.
if source.tick.load(Ordering::Acquire) == tick {
wakers.push(cx.waker().clone());
break;
return Poll::Pending;
}
}
Poll::Pending
}
}
impl<T> Drop for Async<T> {
fn drop(&mut self) {
if self.io.is_some() {
// Destructors should not panic.
// Deregister and ignore errors because destructors should not panic.
let _ = Reactor::get().deregister(&self.source);
// Drop the I/O handle to close it.
self.io.take();
}
}
}
/// Pins a future and then polls it.
fn poll_future<T>(cx: &mut Context<'_>, fut: impl Future<Output = T>) -> Poll<T> {
futures::pin_mut!(fut);
fut.poll(cx)
@ -1414,21 +1424,27 @@ where
impl Async<TcpListener> {
/// Creates a listener bound to the specified address.
///
/// TODO
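///
/// A usage sketch (the address, the `smol::run` wrapper, and error handling are illustrative):
///
/// ```no_run
/// use smol::Async;
/// use std::net::TcpListener;
///
/// fn main() -> std::io::Result<()> {
///     smol::run(async {
///         // Bind the listener and accept one incoming connection.
///         let listener = Async::<TcpListener>::bind("127.0.0.1:8000")?;
///         let (_stream, peer_addr) = listener.accept().await?;
///         println!("accepted a connection from {}", peer_addr);
///         Ok(())
///     })
/// }
/// ```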
pub fn bind<A: ToString>(addr: A) -> io::Result<Async<TcpListener>> {
let addr = addr
.to_string()
.parse::<SocketAddr>()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
TcpListener::bind(addr).and_then(Async::new)
Ok(Async::new(TcpListener::bind(addr)?)?)
}
/// Accepts a new incoming connection.
///
/// TODO
pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
let (stream, addr) = self.with(|io| io.accept()).await?;
Ok((Async::new(stream)?, addr))
}
/// Returns a stream over incoming connections.
///
/// TODO
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + Unpin + '_ {
Box::pin(stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
@ -1439,6 +1455,8 @@ impl Async<TcpListener> {
impl Async<TcpStream> {
/// Connects to the specified address.
///
/// TODO
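///
/// A usage sketch (the address and the assumption that `Async<TcpStream>` implements
/// `futures::io::AsyncWrite` are illustrative):
///
/// ```no_run
/// use futures::io::AsyncWriteExt;
/// use smol::Async;
/// use std::net::TcpStream;
///
/// fn main() -> std::io::Result<()> {
///     smol::run(async {
///         // Connect and send a minimal HTTP request.
///         let mut stream = Async::<TcpStream>::connect("example.com:80").await?;
///         let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
///         stream.write_all(req).await?;
///         Ok(())
///     })
/// }
/// ```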
pub async fn connect<A: ToString>(addr: A) -> io::Result<Async<TcpStream>> {
let addr = addr.to_string();
let addr = Task::blocking(async move {
@ -1461,20 +1479,22 @@ impl Async<TcpStream> {
let _ = socket.connect(&addr.into());
let stream = Async::new(socket.into_tcp_stream())?;
// Wait for connect to complete.
let wait_connect = |mut stream: &TcpStream| match stream.write(&[]) {
// Waits until the stream becomes writable.
let wait_writable = |mut stream: &TcpStream| match stream.write(&[]) {
Err(err) if err.kind() == io::ErrorKind::NotConnected => {
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
res => res.map(|_| ()),
};
// The stream becomes writable when connected.
stream.with(|io| wait_connect(io)).await?;
stream.with(|io| wait_writable(io)).await?;
Ok(stream)
}
/// Receives data from the stream without removing it from the buffer.
///
/// TODO
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.with(|io| io.peek(buf)).await
}
@ -1482,41 +1502,55 @@ impl Async<TcpStream> {
impl Async<UdpSocket> {
/// Creates a socket bound to the specified address.
///
/// TODO
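///
/// A usage sketch covering `bind`, `send_to`, and `recv_from` (the addresses are illustrative):
///
/// ```no_run
/// use smol::Async;
/// use std::net::{SocketAddr, UdpSocket};
///
/// fn main() -> std::io::Result<()> {
///     smol::run(async {
///         let socket = Async::<UdpSocket>::bind("127.0.0.1:8000")?;
///
///         let peer: SocketAddr = "127.0.0.1:9000".parse().unwrap();
///         socket.send_to(b"hello", peer).await?;
///
///         let mut buf = [0u8; 1024];
///         let (n, addr) = socket.recv_from(&mut buf).await?;
///         println!("received {} bytes from {}", n, addr);
///         Ok(())
///     })
/// }
/// ```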
pub fn bind<A: ToString>(addr: A) -> io::Result<Async<UdpSocket>> {
let addr = addr
.to_string()
.parse::<SocketAddr>()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
UdpSocket::bind(addr).and_then(Async::new)
Ok(Async::new(UdpSocket::bind(addr)?)?)
}
/// Sends data to the specified address.
///
/// TODO
pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
let addr = addr.into();
self.with(|io| io.send_to(buf, addr)).await
}
/// Sends data to the socket's peer.
///
/// TODO
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.with(|io| io.send(buf)).await
}
/// Receives data from the socket.
///
/// TODO
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.with(|io| io.recv_from(buf)).await
}
/// Receives data from the socket's peer.
///
/// TODO
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.with(|io| io.recv(buf)).await
}
/// Receives data without removing it from the buffer.
///
/// TODO
pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.with(|io| io.peek_from(buf)).await
}
/// Receives data from the socket's peer without removing it from the buffer.
///
/// TODO
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.with(|io| io.peek(buf)).await
}
@ -1525,18 +1559,24 @@ impl Async<UdpSocket> {
#[cfg(unix)]
impl Async<UnixListener> {
/// Creates a listener bound to the specified path.
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
///
/// TODO
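///
/// A usage sketch (the socket path is illustrative; Unix only):
///
/// ```no_run
/// use smol::Async;
/// use std::os::unix::net::UnixListener;
///
/// fn main() -> std::io::Result<()> {
///     smol::run(async {
///         let listener = Async::<UnixListener>::bind("/tmp/socket")?;
///         let (_stream, peer_addr) = listener.accept().await?;
///         println!("accepted a connection from {:?}", peer_addr);
///         Ok(())
///     })
/// }
/// ```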
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
let path = path.as_ref().to_owned();
blocking!(UnixListener::bind(path)).and_then(Async::new)
Ok(Async::new(UnixListener::bind(path)?)?)
}
/// Accepts a new incoming connection.
///
/// TODO
pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
let (stream, addr) = self.with(|io| io.accept()).await?;
Ok((Async::new(stream)?, addr))
}
/// Returns a stream over incoming connections.
///
/// TODO
pub fn incoming(
&self,
) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + Unpin + '_ {
@ -1550,12 +1590,16 @@ impl Async<UnixListener> {
#[cfg(unix)]
impl Async<UnixStream> {
/// Connects to the specified path.
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
///
/// TODO
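///
/// A usage sketch (the socket path is illustrative; Unix only):
///
/// ```no_run
/// use smol::Async;
/// use std::os::unix::net::UnixStream;
///
/// fn main() -> std::io::Result<()> {
///     let _stream = Async::<UnixStream>::connect("/tmp/socket")?;
///     Ok(())
/// }
/// ```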
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
let path = path.as_ref().to_owned();
blocking!(UnixStream::connect(path)).and_then(Async::new)
Ok(Async::new(UnixStream::connect(path)?)?)
}
/// Creates an unnamed pair of connected streams.
///
/// TODO
pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
let (stream1, stream2) = UnixStream::pair()?;
Ok((Async::new(stream1)?, Async::new(stream2)?))
@ -1565,38 +1609,52 @@ impl Async<UnixStream> {
#[cfg(unix)]
impl Async<UnixDatagram> {
/// Creates a socket bound to the specified path.
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
///
/// TODO
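///
/// A usage sketch covering `bind`, `send_to`, and `recv_from` (the paths are illustrative;
/// Unix only):
///
/// ```no_run
/// use smol::Async;
/// use std::os::unix::net::UnixDatagram;
///
/// fn main() -> std::io::Result<()> {
///     smol::run(async {
///         let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
///         socket.send_to(b"hello", "/tmp/socket2").await?;
///
///         let mut buf = [0u8; 1024];
///         let (n, peer_addr) = socket.recv_from(&mut buf).await?;
///         println!("received {} bytes from {:?}", n, peer_addr);
///         Ok(())
///     })
/// }
/// ```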
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
let path = path.as_ref().to_owned();
blocking!(UnixDatagram::bind(path)).and_then(Async::new)
Ok(Async::new(UnixDatagram::bind(path)?)?)
}
/// Creates a socket not bound to any address.
///
/// TODO
pub fn unbound() -> io::Result<Async<UnixDatagram>> {
UnixDatagram::unbound().and_then(Async::new)
Ok(Async::new(UnixDatagram::unbound()?)?)
}
/// Creates an unnamed pair of connected sockets.
///
/// TODO
pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
let (socket1, socket2) = UnixDatagram::pair()?;
Ok((Async::new(socket1)?, Async::new(socket2)?))
}
/// Sends data to the specified address.
///
/// TODO
pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
self.with(|io| io.send_to(buf, &path)).await
}
/// Sends data to the socket's peer.
///
/// TODO
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.with(|io| io.send(buf)).await
}
/// Receives data from the socket.
///
/// TODO
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
self.with(|io| io.recv_from(buf)).await
}
/// Receives data from the socket's peer.
///
/// TODO
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.with(|io| io.recv(buf)).await
}
@ -1607,11 +1665,14 @@ impl Async<UnixDatagram> {
/// A boolean flag that is set whenever a thread-local task is woken by another thread.
///
/// Every time this flag's value is changed, an I/O event is triggered.
///
/// TODO
#[derive(Clone)]
struct IoEvent {
pipe: Arc<SelfPipe>,
}
/// TODO
impl IoEvent {
fn create() -> io::Result<IoEvent> {
Ok(IoEvent {
@ -1635,12 +1696,15 @@ impl IoEvent {
/// A boolean flag that triggers I/O events whenever changed.
///
/// https://cr.yp.to/docs/selfpipe.html
///
/// TODO
struct SelfPipe {
flag: AtomicBool,
writer: Socket,
reader: Async<Socket>,
}
/// TODO
impl SelfPipe {
/// Creates a self-pipe.
fn create() -> io::Result<SelfPipe> {
@ -1655,7 +1719,7 @@ impl SelfPipe {
}
/// Sets the flag to `true`.
// TODO: rename to raise() as in "raise a signal"?
// TODO: rename to raise() as in "raise a signal"? or even better: emit() or notify()
fn set(&self) {
// Publish all in-memory changes before setting the flag.
atomic::fence(Ordering::SeqCst);
@ -1696,6 +1760,7 @@ impl SelfPipe {
}
}
/// TODO
#[cfg(unix)]
fn pipe() -> io::Result<(Socket, Socket)> {
let (sock1, sock2) = Socket::pair(Domain::unix(), Type::stream(), None)?;
@ -1704,6 +1769,7 @@ fn pipe() -> io::Result<(Socket, Socket)> {
Ok((sock1, sock2))
}
/// TODO
#[cfg(windows)]
fn pipe() -> io::Result<(Socket, Socket)> {
// TODO The only portable way of manually triggering I/O events is to create a socket and
@ -1735,6 +1801,8 @@ fn pipe() -> io::Result<(Socket, Socket)> {
// ---------- Blocking executor ----------
/// TODO: docs
/// A thread pool for blocking tasks.
struct BlockingExecutor {
state: Mutex<State>,
@ -1827,6 +1895,8 @@ impl BlockingExecutor {
}
/// Spawns blocking code onto a thread.
///
/// TODO
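///
/// A usage sketch (assuming the macro awaits the spawned blocking task and must therefore be
/// used inside async code, as the call sites elsewhere in this file suggest):
///
/// ```no_run
/// fn main() -> std::io::Result<()> {
///     smol::run(async {
///         // Run the blocking file read on a separate thread.
///         let contents = smol::blocking!(std::fs::read_to_string("Cargo.toml"))?;
///         println!("{} bytes", contents.len());
///         Ok(())
///     })
/// }
/// ```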
#[macro_export]
macro_rules! blocking {
($($expr:tt)*) => {
@ -1835,11 +1905,16 @@ macro_rules! blocking {
}
/// Creates an iterator that runs on a thread.
///
/// TODO
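///
/// A usage sketch (the `StreamExt` import provides `next()`):
///
/// ```no_run
/// use futures::stream::StreamExt;
///
/// fn main() {
///     smol::run(async {
///         // Run the iterator on a thread and consume its items as a stream.
///         let mut stream = smol::iter(vec![1, 2, 3].into_iter());
///         while let Some(item) = stream.next().await {
///             println!("{}", item);
///         }
///     })
/// }
/// ```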
pub fn iter<T: Send + 'static>(
iter: impl Iterator<Item = T> + Send + 'static,
) -> impl Stream<Item = T> + Send + Unpin + 'static {
/// Current state of the iterator.
enum State<T, I> {
/// The iterator is idle.
Idle(Option<I>),
/// The iterator is running in a blocking task and sending items into a channel.
Busy(piper::Receiver<T>, Task<I>),
}
@ -1854,24 +1929,39 @@ pub fn iter<T: Send + 'static>(
match &mut *self {
State::Idle(iter) => {
// If idle, take the iterator out to run it on a blocking task.
let mut iter = iter.take().unwrap();
let (sender, receiver) = piper::chan(8 * 1024);
// This channel capacity seems to work well in practice. If it's too low, there
// will be too much synchronization between tasks. If too high, memory
// consumption increases.
let (sender, receiver) = piper::chan(8 * 1024); // 8192 items
// Spawn a blocking task that runs the iterator and returns it when done.
let task = Task::blocking(async move {
for item in &mut iter {
sender.send(item).await;
}
iter
});
// Move into the busy state and poll again.
*self = State::Busy(receiver, task);
self.poll_next(cx)
}
State::Busy(receiver, task) => {
// Poll the channel.
let opt = futures::ready!(Pin::new(receiver).poll_next(cx));
// If the channel is closed, retrieve the iterator back from the blocking task.
// This is not really a required step, but it's cleaner to drop the iterator on
// the same thread that created it.
if opt.is_none() {
// At the end of stream, retrieve the iterator back.
// Poll the task to retrieve the iterator.
let iter = futures::ready!(Pin::new(task).poll(cx));
*self = State::Idle(Some(iter));
}
Poll::Ready(opt)
}
}
@ -1883,8 +1973,11 @@ pub fn iter<T: Send + 'static>(
/// Creates a reader that runs on a thread.
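///
/// A usage sketch (the file name and the `AsyncReadExt` import for `read_to_string` are
/// illustrative):
///
/// ```no_run
/// use futures::io::AsyncReadExt;
///
/// fn main() -> std::io::Result<()> {
///     smol::run(async {
///         // Wrap a blocking `Read` handle and read from it asynchronously.
///         let mut file = smol::reader(std::fs::File::open("foo.txt")?);
///         let mut contents = String::new();
///         file.read_to_string(&mut contents).await?;
///         Ok(())
///     })
/// }
/// ```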
pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unpin + 'static {
/// Current state of the reader.
enum State<T> {
/// The reader is idle.
Idle(Option<T>),
/// The reader is running in a blocking task and sending bytes into a pipe.
Busy(piper::Reader, Task<(io::Result<()>, T)>),
}
@ -1899,29 +1992,49 @@ pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unp
match &mut *self {
State::Idle(io) => {
// If idle, take the I/O handle out to read it on a blocking task.
let mut io = io.take().unwrap();
// This pipe capacity seems to work well in practice. If it's too low, there
// will be too much synchronization between tasks. If too high, memory
// consumption increases.
let (reader, mut writer) = piper::pipe(8 * 1024 * 1024); // 8 MB
// Spawn a blocking task that reads and returns the I/O handle when done.
let task = Task::blocking(async move {
// Copy bytes from the I/O handle into the pipe until the pipe is closed or
// an error occurs.
let res = futures::io::copy(&mut io, &mut writer).await;
(res.map(drop), io)
});
// Move into the busy state and poll again.
*self = State::Busy(reader, task);
self.poll_read(cx, buf)
}
State::Busy(reader, task) => {
// Poll the pipe.
let n = futures::ready!(Pin::new(reader).poll_read(cx, buf))?;
// If the pipe is closed, retrieve the I/O handle back from the blocking task.
// This is not really a required step, but it's cleaner to drop the handle on
// the same thread that created it.
if n == 0 {
// At the end of stream, retrieve the reader back.
// Poll the task to retrieve the I/O handle.
let (res, io) = futures::ready!(Pin::new(task).poll(cx));
// Make sure to move into the idle state before reporting errors.
*self = State::Idle(Some(io));
res?;
}
Poll::Ready(Ok(n))
}
}
}
}
// It's okay to treat the `Read` type as `AsyncRead` because it's only read from inside a
// blocking task.
let io = Box::pin(AllowStdIo::new(reader));
State::Idle(Some(io))
}
@ -1932,22 +2045,36 @@ pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unp
///
/// TODO
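///
/// A usage sketch (the file name and the `AsyncWriteExt` import are illustrative; the explicit
/// `close()` is what flushes the inner writer):
///
/// ```no_run
/// use futures::io::AsyncWriteExt;
///
/// fn main() -> std::io::Result<()> {
///     smol::run(async {
///         // Wrap a blocking `Write` handle and write to it asynchronously.
///         let mut file = smol::writer(std::fs::File::create("foo.txt")?);
///         file.write_all(b"hello").await?;
///         file.close().await?;
///         Ok(())
///     })
/// }
/// ```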
pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + Unpin + 'static {
/// Current state of the writer.
enum State<T> {
/// The writer is idle.
Idle(Option<T>),
/// The writer is running in a blocking task and receiving bytes from a pipe.
Busy(Option<piper::Writer>, Task<(io::Result<()>, T)>),
}
impl<T: AsyncWrite + Send + Unpin + 'static> State<T> {
/// Starts a blocking task.
fn start(&mut self) {
if let State::Idle(io) = self {
// If idle, take the I/O handle out to write on a blocking task.
let mut io = io.take().unwrap();
// This pipe capacity seems to work well in practice. If it's too low, there will
// be too much synchronization between tasks. If too high, memory consumption
// increases.
let (reader, writer) = piper::pipe(8 * 1024 * 1024); // 8 MB
// Spawn a blocking task that writes and returns the I/O handle when done.
let task = Task::blocking(async move {
// Copy bytes from the pipe into the I/O handle until the pipe is closed or an
// error occurs. Flush the I/O handle at the end.
match futures::io::copy(reader, &mut io).await {
Ok(_) => (io.flush().await, io),
Err(err) => (Err(err), io),
}
});
// Move into the busy state.
*self = State::Busy(Some(writer), task);
}
}
@ -1964,14 +2091,22 @@ pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + U
loop {
match &mut *self {
// The writer is idle and closed.
State::Idle(None) => return Poll::Ready(Ok(0)),
// The writer is idle and open - start a blocking task.
State::Idle(Some(_)) => self.start(),
// The task is flushing and in the process of stopping.
State::Busy(None, task) => {
// The writing end of the pipe is closed, so await the task.
// Poll the task to retrieve the I/O handle.
let (res, io) = futures::ready!(Pin::new(task).poll(cx));
// Make sure to move into the idle state before reporting errors.
*self = State::Idle(Some(io));
res?;
}
// The writer is busy - write more bytes into the pipe.
State::Busy(Some(writer), _) => return Pin::new(writer).poll_write(cx, buf),
}
}
@ -1983,12 +2118,22 @@ pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + U
loop {
match &mut *self {
// The writer is idle and closed.
State::Idle(None) => return Poll::Ready(Ok(())),
// The writer is idle and open - start a blocking task.
State::Idle(Some(_)) => self.start(),
// The task is busy.
State::Busy(writer, task) => {
// Close the writing end of the pipe and await the task.
// Drop the writer to close the pipe. This stops the `futures::io::copy`
// operation in the task, after which the task flushes the I/O handle and
// returns it.
writer.take();
// Poll the task to retrieve the I/O handle.
let (res, io) = futures::ready!(Pin::new(task).poll(cx));
// Make sure to move into the idle state before reporting errors.
*self = State::Idle(Some(io));
return Poll::Ready(res);
}
@ -1997,13 +2142,17 @@ pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + U
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// Flush and then drop the I/O handle.
// First, make sure the I/O handle is flushed.
futures::ready!(Pin::new(&mut *self).poll_flush(cx))?;
// Then move into the idle state with no I/O handle, thus dropping it.
*self = State::Idle(None);
Poll::Ready(Ok(()))
}
}
// It's okay to treat the `Write` type as `AsyncWrite` because it's only written to inside a
// blocking task.
let io = AllowStdIo::new(writer);
State::Idle(Some(io))
}