poll_readable() and poll_writable() (#38)

This commit is contained in:
Stjepan Glavina 2020-11-09 15:41:51 +01:00 committed by GitHub
parent 2d35be1ad6
commit af26f326bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 192 additions and 102 deletions

View File

@ -76,7 +76,7 @@ use std::os::windows::io::{AsRawSocket, RawSocket};
use futures_lite::io::{AsyncRead, AsyncWrite};
use futures_lite::stream::{self, Stream};
use futures_lite::{future, pin};
use futures_lite::{future, pin, ready};
use crate::reactor::{Reactor, Source};
@ -310,9 +310,12 @@ impl Future for Timer {
/// try to do that, conflicting tasks will just keep waking each other in turn, thus wasting CPU
/// time.
///
/// This caveat only applies to [`AsyncRead`] and [`AsyncWrite`]. Any number of tasks can be
/// concurrently calling other methods like [`readable()`][`Async::readable()`] or
/// [`read_with()`][`Async::read_with()`].
/// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
/// [`poll_readable()`][`Async::poll_readable()`] and
/// [`poll_writable()`][`Async::poll_writable()`].
///
/// However, any number of tasks can be concurrently calling other methods like
/// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
///
/// ### Closing
///
@ -369,7 +372,7 @@ impl<T> Unpin for Async<T> {}
impl<T: AsRawFd> Async<T> {
/// Creates an async I/O handle.
///
/// This function will put the handle in non-blocking mode and register it in
/// This method will put the handle in non-blocking mode and register it in
/// [epoll]/[kqueue]/[event ports]/[wepoll].
///
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
@ -423,7 +426,7 @@ impl<T: AsRawFd> AsRawFd for Async<T> {
impl<T: AsRawSocket> Async<T> {
/// Creates an async I/O handle.
///
/// This function will put the handle in non-blocking mode and register it in
/// This method will put the handle in non-blocking mode and register it in
/// [epoll]/[kqueue]/[event ports]/[wepoll].
///
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
@ -539,7 +542,7 @@ impl<T> Async<T> {
/// Waits until the I/O handle is readable.
///
/// This function completes when a read operation on this I/O handle wouldn't block.
/// This method completes when a read operation on this I/O handle wouldn't block.
///
/// # Examples
///
@ -560,7 +563,7 @@ impl<T> Async<T> {
/// Waits until the I/O handle is writable.
///
/// This function completes when a write operation on this I/O handle wouldn't block.
/// This method completes when a write operation on this I/O handle wouldn't block.
///
/// # Examples
///
@ -580,9 +583,72 @@ impl<T> Async<T> {
self.source.writable().await
}
/// Polls the I/O handle for readability.
///
/// When this method returns [`Poll::Ready`], that means the OS has delivered an event
/// indicating readability since the last time this task has called the method and received
/// [`Poll::Pending`].
///
/// # Caveats
///
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
/// will just keep waking each other in turn, thus wasting CPU time.
///
/// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method.
///
/// # Examples
///
/// ```no_run
/// use async_io::Async;
/// use futures_lite::future;
/// use std::net::TcpListener;
///
/// # futures_lite::future::block_on(async {
/// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
///
/// // Wait until a client can be accepted.
/// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.source.poll_readable(cx)
}
/// Polls the I/O handle for writability.
///
/// When this method returns [`Poll::Ready`], that means the OS has delivered an event
/// indicating writability since the last time this task has called the method and received
/// [`Poll::Pending`].
///
/// # Caveats
///
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
/// will just keep waking each other in turn, thus wasting CPU time.
///
/// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method.
///
/// # Examples
///
/// ```
/// use async_io::Async;
/// use futures_lite::future;
/// use std::net::{TcpStream, ToSocketAddrs};
///
/// # futures_lite::future::block_on(async {
/// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
/// let stream = Async::<TcpStream>::connect(addr).await?;
///
/// // Wait until the stream is writable.
/// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.source.poll_writable(cx)
}
/// Performs a read operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This function
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is readable.
@ -615,7 +681,7 @@ impl<T> Async<T> {
/// Performs a read operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This function
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is readable.
@ -651,7 +717,7 @@ impl<T> Async<T> {
/// Performs a write operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This function
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is writable.
@ -685,7 +751,7 @@ impl<T> Async<T> {
/// Performs a write operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This function
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is writable.
@ -739,12 +805,13 @@ impl<T: Read> AsyncRead for Async<T> {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match (&mut *self).get_mut().read(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&mut *self).get_mut().read(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_readable(cx))?;
}
self.source.register_reader(cx.waker())?;
Poll::Pending
}
fn poll_read_vectored(
@ -752,12 +819,13 @@ impl<T: Read> AsyncRead for Async<T> {
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
match (&mut *self).get_mut().read_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&mut *self).get_mut().read_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_readable(cx))?;
}
self.source.register_reader(cx.waker())?;
Poll::Pending
}
}
@ -770,12 +838,13 @@ where
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match (&*self).get_ref().read(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&*self).get_ref().read(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_readable(cx))?;
}
self.source.register_reader(cx.waker())?;
Poll::Pending
}
fn poll_read_vectored(
@ -783,12 +852,13 @@ where
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
match (&*self).get_ref().read_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&*self).get_ref().read_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_readable(cx))?;
}
self.source.register_reader(cx.waker())?;
Poll::Pending
}
}
@ -798,12 +868,13 @@ impl<T: Write> AsyncWrite for Async<T> {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match (&mut *self).get_mut().write(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&mut *self).get_mut().write(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_writable(cx))?;
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_write_vectored(
@ -811,21 +882,23 @@ impl<T: Write> AsyncWrite for Async<T> {
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
match (&mut *self).get_mut().write_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&mut *self).get_mut().write_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_writable(cx))?;
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match (&mut *self).get_mut().flush() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&mut *self).get_mut().flush() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_writable(cx))?;
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -842,12 +915,13 @@ where
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match (&*self).get_ref().write(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&*self).get_ref().write(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_writable(cx))?;
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_write_vectored(
@ -855,21 +929,23 @@ where
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
match (&*self).get_ref().write_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&*self).get_ref().write_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_writable(cx))?;
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match (&*self).get_ref().flush() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
loop {
match (&*self).get_ref().flush() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
ready!(self.poll_writable(cx))?;
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {

View File

@ -8,7 +8,7 @@ use std::os::windows::io::RawSocket;
use std::panic;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Poll, Waker};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use concurrent_queue::ConcurrentQueue;
@ -353,10 +353,15 @@ struct Direction {
/// Last reactor tick that delivered an event.
tick: usize,
/// Waker stored by `AsyncRead` or `AsyncWrite`.
/// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
ticks: Option<(usize, usize)>,
/// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
waker: Option<Waker>,
/// Wakers of tasks waiting for the next event.
///
/// Registered by `Async::readable()` and `Async::writable()`.
wakers: Arena<Option<Waker>>,
}
@ -380,45 +385,45 @@ impl Direction {
}
impl Source {
/// Registers a waker from `AsyncRead`.
/// Polls the I/O source for readability.
pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_ready(READ, cx)
}
/// Polls the I/O source for writability.
pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_ready(WRITE, cx)
}
/// Registers a waker from `poll_readable()` or `poll_writable()`.
///
/// If a different waker is already registered, it gets replaced and woken.
pub(crate) fn register_reader(&self, waker: &Waker) -> io::Result<()> {
self.register(READ, waker)
}
/// Registers a waker from `AsyncWrite`.
///
/// If a different waker is already registered, it gets replaced and woken.
pub(crate) fn register_writer(&self, waker: &Waker) -> io::Result<()> {
self.register(WRITE, waker)
}
/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
self.ready(READ).await
}
/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
self.ready(WRITE).await
}
/// Registers a waker from `AsyncRead` or `AsyncWrite`.
///
/// If a different waker is already registered, it gets replaced and woken.
fn register(&self, dir: usize, waker: &Waker) -> io::Result<()> {
fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut state = self.state.lock().unwrap();
// Check if the reactor has delivered an event.
if let Some((a, b)) = state[dir].ticks {
// If `state[dir].tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered an event.
if state[dir].tick != a && state[dir].tick != b {
state[dir].ticks = None;
return Poll::Ready(Ok(()));
}
}
let was_empty = state[dir].is_empty();
// Register the current task's waker.
if let Some(w) = state[dir].waker.take() {
if w.will_wake(waker) {
if w.will_wake(cx.waker()) {
state[dir].waker = Some(w);
return Ok(());
return Poll::Pending;
}
// Wake the previous waker because it's going to get replaced.
panic::catch_unwind(|| w.wake()).ok();
}
state[dir].waker = Some(waker.clone());
state[dir].waker = Some(cx.waker().clone());
state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
// Update interest in this I/O handle.
if was_empty {
@ -432,6 +437,20 @@ impl Source {
)?;
}
Poll::Pending
}
/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
self.ready(READ).await?;
log::trace!("readable: fd={}", self.raw);
Ok(())
}
/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
self.ready(WRITE).await?;
log::trace!("writable: fd={}", self.raw);
Ok(())
}
@ -444,12 +463,11 @@ impl Source {
future::poll_fn(|cx| {
let mut state = self.state.lock().unwrap();
// Check if the reactor has delivered a readability event.
// Check if the reactor has delivered an event.
if let Some((a, b)) = ticks {
// If `state.read.tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered a readability event.
// If `state[dir].tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered an event.
if state[dir].tick != a && state[dir].tick != b {
log::trace!("readable: fd={}", self.raw);
return Poll::Ready(Ok(()));
}
}
@ -466,6 +484,7 @@ impl Source {
state[dir].wakers.remove(i);
}));
index = Some(i);
ticks = Some((Reactor::get().ticker(), state[dir].tick));
i
}
};
@ -483,11 +502,6 @@ impl Source {
)?;
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((Reactor::get().ticker(), state[dir].tick));
}
Poll::Pending
})
.await