Special waker for AsyncRead/AsyncWrite

Stjepan Glavina 2020-09-27 21:20:28 +02:00
parent e49452d7f0
commit 671f0353c4
2 changed files with 233 additions and 112 deletions


@@ -77,7 +77,7 @@ use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};
use futures_lite::io::{AsyncRead, AsyncWrite};
use futures_lite::stream::{self, Stream};
use futures_lite::{future, pin};
use futures_lite::{future, pin, ready};
use crate::reactor::{Reactor, Source};
@@ -593,7 +593,7 @@ impl<T> Async<T> {
// If there are no blocked readers, attempt the read operation.
if !self.source.readers_registered() {
// Yield with some small probability - this improves fairness.
maybe_yield().await;
future::poll_fn(|cx| maybe_yield(cx)).await;
match op(self.get_ref()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
@@ -637,7 +637,7 @@ impl<T> Async<T> {
// If there are no blocked readers, attempt the read operation.
if !self.source.readers_registered() {
// Yield with some small probability - this improves fairness.
maybe_yield().await;
future::poll_fn(|cx| maybe_yield(cx)).await;
match op(self.get_mut()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
@@ -679,7 +679,7 @@ impl<T> Async<T> {
// If there are no blocked writers, attempt the write operation.
if !self.source.writers_registered() {
// Yield with some small probability - this improves fairness.
maybe_yield().await;
future::poll_fn(|cx| maybe_yield(cx)).await;
match op(self.get_ref()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
@@ -724,7 +724,7 @@ impl<T> Async<T> {
// If there are no blocked writers, attempt the write operation.
if !self.source.writers_registered() {
// Yield with some small probability - this improves fairness.
maybe_yield().await;
future::poll_fn(|cx| maybe_yield(cx)).await;
match op(self.get_mut()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
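All four hunks above patch the same retry loop, shared by read_with, read_with_mut, write_with, and write_with_mut. For context, a minimal sketch of that loop as it reads after this change (simplified from the surrounding code; op is the caller-supplied closure):

    loop {
        // If there are no blocked readers, attempt the read operation.
        if !self.source.readers_registered() {
            // Yield with some small probability - this improves fairness.
            future::poll_fn(|cx| maybe_yield(cx)).await;

            // Anything other than WouldBlock is a final result.
            match op(self.get_ref()) {
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                res => return res,
            }
        }

        // Wait until the I/O handle becomes readable, then try again.
        self.source.readable().await?;
    }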
@@ -756,7 +756,7 @@ impl<T: Read> AsyncRead for Async<T> {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.read_with_mut(|io| io.read(buf)))
ready!(maybe_yield(cx));
match (&mut *self).get_mut().read(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_reader(cx.waker())?;
Poll::Pending
}
fn poll_read_vectored(
@@ -764,7 +770,13 @@ impl<T: Read> AsyncRead for Async<T> {
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.read_with_mut(|io| io.read_vectored(bufs)))
ready!(maybe_yield(cx));
match (&mut *self).get_mut().read_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_reader(cx.waker())?;
Poll::Pending
}
}
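Since Async<T> implements AsyncRead whenever T: Read, this new poll_read path is what ordinary reads go through. A hedged usage sketch (the AsyncReadExt extension trait comes from futures_lite; the function name is illustrative, not part of this diff):

    use futures_lite::io::AsyncReadExt;

    async fn read_some(stream: &mut Async<std::net::TcpStream>) -> std::io::Result<usize> {
        let mut buf = [0u8; 1024];
        // poll_read first attempts a nonblocking read; on WouldBlock it
        // stores the task's waker via register_reader and returns Pending.
        stream.read(&mut buf).await
    }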
@@ -777,7 +789,13 @@ where
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.read_with(|io| (&*io).read(buf)))
ready!(maybe_yield(cx));
match (&*self).get_ref().read(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_reader(cx.waker())?;
Poll::Pending
}
fn poll_read_vectored(
@@ -785,7 +803,13 @@ where
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.read_with(|io| (&*io).read_vectored(bufs)))
ready!(maybe_yield(cx));
match (&*self).get_ref().read_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_reader(cx.waker())?;
Poll::Pending
}
}
@@ -795,7 +819,13 @@ impl<T: Write> AsyncWrite for Async<T> {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.write_with_mut(|io| io.write(buf)))
ready!(maybe_yield(cx));
match (&mut *self).get_mut().write(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_write_vectored(
@@ -803,11 +833,23 @@ impl<T: Write> AsyncWrite for Async<T> {
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.write_with_mut(|io| io.write_vectored(bufs)))
ready!(maybe_yield(cx));
match (&mut *self).get_mut().write_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
poll_future(cx, self.write_with_mut(|io| io.flush()))
ready!(maybe_yield(cx));
match (&mut *self).get_mut().flush() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@@ -824,7 +866,13 @@ where
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.write_with(|io| (&*io).write(buf)))
ready!(maybe_yield(cx));
match (&*self).get_ref().write(buf) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_write_vectored(
@@ -832,11 +880,23 @@ where
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.write_with(|io| (&*io).write_vectored(bufs)))
ready!(maybe_yield(cx));
match (&*self).get_ref().write_vectored(bufs) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
poll_future(cx, self.write_with(|io| (&*io).flush()))
ready!(maybe_yield(cx));
match (&*self).get_ref().flush() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
self.source.register_writer(cx.waker())?;
Poll::Pending
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@@ -1462,12 +1522,6 @@ impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::Uni
}
}
/// Polls a future once.
fn poll_future<T>(cx: &mut Context<'_>, fut: impl Future<Output = T>) -> Poll<T> {
pin!(fut);
fut.poll(cx)
}
/// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
let mut polled = false;
@@ -1484,9 +1538,16 @@ async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()>
.await
}
/// Yield with some small probability.
async fn maybe_yield() {
/// Yields with some small probability.
///
/// If a task is doing a lot of I/O and never gets blocked on it, it may keep the executor busy
/// forever without giving other tasks a chance to run. To prevent this kind of task starvation,
/// I/O operations yield randomly even if they are ready.
fn maybe_yield(cx: &mut Context<'_>) -> Poll<()> {
if fastrand::usize(..100) == 0 {
future::yield_now().await;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
}
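Returning Poll::Pending immediately after wake_by_ref() is the standard self-yield trick: the task is instantly rescheduled, but goes to the back of the executor's run queue, giving other tasks a turn. Callers adapt the function into a future with poll_fn, as the earlier hunks show; a minimal sketch of such a call site (the function name is hypothetical):

    use futures_lite::future;

    async fn fair_attempt() {
        // With probability 1/100 this suspends the task for one scheduler
        // turn; otherwise it resolves immediately.
        future::poll_fn(|cx| maybe_yield(cx)).await;
        // ... perform the nonblocking I/O attempt here ...
    }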


@@ -280,11 +280,17 @@ impl Reactor {
let source = Arc::new(Source {
raw,
key,
wakers: Mutex::new(Wakers {
tick_readable: 0,
tick_writable: 0,
readers: Vec::new(),
writers: Vec::new(),
state: Mutex::new(State {
read: Direction {
tick: 0,
waker: None,
wakers: Vec::new(),
},
write: Direction {
tick: 0,
waker: None,
wakers: Vec::new(),
},
}),
wakers_registered: AtomicU8::new(0),
});
@@ -459,37 +465,36 @@ impl ReactorLock<'_> {
for ev in self.events.iter() {
// Check if there is a source in the table with this key.
if let Some(source) = sources.get(ev.key) {
let mut w = source.wakers.lock().unwrap();
// Wake readers if a readability event was emitted.
if ev.readable {
w.tick_readable = tick;
wakers.append(&mut w.readers);
source
.wakers_registered
.fetch_and(!READERS_REGISTERED, Ordering::SeqCst);
}
let mut state = source.state.lock().unwrap();
// Wake writers if a writability event was emitted.
if ev.writable {
w.tick_writable = tick;
wakers.append(&mut w.writers);
state.write.tick = tick;
state.write.drain_into(&mut wakers);
source
.wakers_registered
.fetch_and(!WRITERS_REGISTERED, Ordering::SeqCst);
}
// Re-register if there are still writers or
// readers. This can happen if e.g. we were
// previously interested in both readability and
// writability, but only one of them was emitted.
if !(w.writers.is_empty() && w.readers.is_empty()) {
// Wake readers if a readability event was emitted.
if ev.readable {
state.read.tick = tick;
state.read.drain_into(&mut wakers);
source
.wakers_registered
.fetch_and(!READERS_REGISTERED, Ordering::SeqCst);
}
// Re-register if there are still writers or readers. This can happen if
// e.g. we were previously interested in both readability and writability,
// but only one of them was emitted.
if !(state.write.is_empty() && state.read.is_empty()) {
self.reactor.poller.interest(
source.raw,
Event {
key: source.key,
readable: !w.readers.is_empty(),
writable: !w.writers.is_empty(),
readable: !state.read.is_empty(),
writable: !state.write.is_empty(),
},
)?;
}
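The drain_into calls above are where the new single-waker slot and the existing waker list converge: both kinds of registration are collected into one batch per direction. A sketch of the delivery sequence for a readability event, mirroring the hunk above (simplified):

    // Under the source's state lock, stamp the tick and collect wakers.
    let mut wakers = Vec::new();
    let mut state = source.state.lock().unwrap();
    state.read.tick = tick;
    state.read.drain_into(&mut wakers);
    drop(state);

    // Wake outside the lock so tasks that immediately re-register
    // don't contend with the reactor holding the state lock.
    for waker in wakers {
        waker.wake();
    }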
@@ -537,8 +542,8 @@ pub(crate) struct Source {
/// The key of this source obtained during registration.
key: usize,
/// Tasks interested in events on this source.
wakers: Mutex<Wakers>,
/// Inner state with registered wakers.
state: Mutex<State>,
/// Whether there are wakers interested in events on this source.
///
@@ -549,48 +554,103 @@ pub(crate) struct Source {
const READERS_REGISTERED: u8 = 1 << 0;
const WRITERS_REGISTERED: u8 = 1 << 1;
/// Tasks interested in events on a source.
/// Inner state with registered wakers.
#[derive(Debug)]
struct Wakers {
/// Last reactor tick that delivered a readability event.
tick_readable: usize,
struct State {
/// State of the read direction.
read: Direction,
/// Last reactor tick that delivered a writability event.
tick_writable: usize,
/// State of the write direction.
write: Direction,
}
/// Tasks waiting for the next readability event.
readers: Vec<Waker>,
/// A read or write direction.
#[derive(Debug)]
struct Direction {
/// Last reactor tick that delivered an event.
tick: usize,
/// Tasks waiting for the next writability event.
writers: Vec<Waker>,
/// Waker stored by `AsyncRead` or `AsyncWrite`.
waker: Option<Waker>,
/// Wakers of tasks waiting for the next event.
wakers: Vec<Waker>,
}
impl Direction {
/// Returns `true` if there are no wakers interested in this direction.
fn is_empty(&self) -> bool {
self.waker.is_none() && self.wakers.is_empty()
}
fn drain_into(&mut self, dst: &mut Vec<Waker>) {
if let Some(w) = self.waker.take() {
dst.push(w);
}
dst.append(&mut self.wakers);
}
}
impl Source {
/// Registers a waker from `AsyncRead`.
///
/// If a different waker is already registered, it gets replaced and woken.
pub(crate) fn register_reader(&self, waker: &Waker) -> io::Result<()> {
let mut state = self.state.lock().unwrap();
// If there are no other readers, re-register in the reactor.
if state.read.is_empty() {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: true,
writable: !state.write.is_empty(),
},
)?;
self.wakers_registered
.fetch_or(READERS_REGISTERED, Ordering::SeqCst);
}
if let Some(w) = state.read.waker.take() {
if w.will_wake(waker) {
state.read.waker = Some(w);
return Ok(());
}
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| w.wake()).ok();
}
state.read.waker = Some(waker.clone());
Ok(())
}
/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
let mut ticks = None;
future::poll_fn(|cx| {
let mut w = self.wakers.lock().unwrap();
let mut state = self.state.lock().unwrap();
// Check if the reactor has delivered a readability event.
if let Some((a, b)) = ticks {
// If `tick_readable` has changed to a value other than the old reactor tick, that
// means a newer reactor tick has delivered a readability event.
if w.tick_readable != a && w.tick_readable != b {
// If `state.read.tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered a readability event.
if state.read.tick != a && state.read.tick != b {
log::trace!("readable: fd={}", self.raw);
return Poll::Ready(Ok(()));
}
}
// If there are no other readers, re-register in the reactor.
if w.readers.is_empty() {
if state.read.is_empty() {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: true,
writable: !w.writers.is_empty(),
writable: !state.write.is_empty(),
},
)?;
self.wakers_registered
@@ -598,19 +658,15 @@ impl Source {
}
// Register the current task's waker if not present already.
if w.readers.iter().all(|w| !w.will_wake(cx.waker())) {
w.readers.push(cx.waker().clone());
if limit_waker_list(&mut w.readers) {
self.wakers_registered
.fetch_and(!READERS_REGISTERED, Ordering::SeqCst);
}
if state.read.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
state.read.wakers.push(cx.waker().clone());
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
w.tick_readable,
state.read.tick,
));
}
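The two remembered ticks are the subtle part of readable(). Registration records a, the reactor's current tick, and b, the tick of the last delivered readability event; readiness is only reported once state.read.tick differs from both, i.e. once a strictly newer reactor iteration has delivered an event. Roughly, an event stamped with tick a could belong to the iteration already in flight when the waker was registered, so it cannot prove fresh readiness. A worked timeline under those assumptions:

    // Register during reactor tick 7; last readability event was at tick 5.
    // Remember ticks = (a = 7, b = 5).
    //
    // Re-poll, state.read.tick == 5 (== b): no new event, keep waiting.
    // Re-poll, state.read.tick == 7 (== a): event from the in-flight tick,
    //     possibly predating registration - keep waiting and re-register.
    // Re-poll, state.read.tick == 8: 8 != 7 && 8 != 5, so the event is
    //     provably fresh - return Poll::Ready(Ok(())).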
@@ -624,30 +680,64 @@ impl Source {
self.wakers_registered.load(Ordering::SeqCst) & READERS_REGISTERED != 0
}
/// Registers a waker from `AsyncWrite`.
///
/// If a different waker is already registered, it gets replaced and woken.
pub(crate) fn register_writer(&self, waker: &Waker) -> io::Result<()> {
let mut state = self.state.lock().unwrap();
// If there are no other writers, re-register in the reactor.
if state.write.is_empty() {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: !state.read.is_empty(),
writable: true,
},
)?;
self.wakers_registered
.fetch_or(WRITERS_REGISTERED, Ordering::SeqCst);
}
if let Some(w) = state.write.waker.take() {
if w.will_wake(waker) {
state.write.waker = Some(w);
return Ok(());
}
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| w.wake()).ok();
}
state.write.waker = Some(waker.clone());
Ok(())
}
/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
let mut ticks = None;
future::poll_fn(|cx| {
let mut w = self.wakers.lock().unwrap();
let mut state = self.state.lock().unwrap();
// Check if the reactor has delivered a writability event.
if let Some((a, b)) = ticks {
// If `tick_writable` has changed to a value other than the old reactor tick, that
// means a newer reactor tick has delivered a writability event.
if w.tick_writable != a && w.tick_writable != b {
// If `state.write.tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered a writability event.
if state.write.tick != a && state.write.tick != b {
log::trace!("writable: fd={}", self.raw);
return Poll::Ready(Ok(()));
}
}
// If there are no other writers, re-register in the reactor.
if w.writers.is_empty() {
if state.write.is_empty() {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: !w.readers.is_empty(),
readable: !state.read.is_empty(),
writable: true,
},
)?;
@@ -656,19 +746,15 @@ impl Source {
}
// Register the current task's waker if not present already.
if w.writers.iter().all(|w| !w.will_wake(cx.waker())) {
w.writers.push(cx.waker().clone());
if limit_waker_list(&mut w.writers) {
self.wakers_registered
.fetch_and(!WRITERS_REGISTERED, Ordering::SeqCst);
}
if state.write.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
state.write.wakers.push(cx.waker().clone());
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
w.tick_writable,
state.write.tick,
));
}
@@ -683,32 +769,6 @@ impl Source {
}
}
/// Wakes up all wakers in the list if it grew too big and returns whether it did.
///
/// The waker list keeps growing in pathological cases where a single async I/O handle has lots of
/// different reader or writer tasks. If the number of interested wakers crosses some threshold, we
/// clear the list and wake all of them at once.
///
/// This strategy prevents memory leaks by bounding the number of stored wakers. However, since all
/// wakers get woken, tasks might simply re-register their interest again, thus creating an
/// infinite loop and burning CPU cycles forever.
///
/// However, we don't worry about such scenarios because it's very unlikely to have more than two
/// actually concurrent tasks operating on a single async I/O handle. If we happen to cross the
/// aforementioned threshold, we have bigger problems to worry about.
fn limit_waker_list(wakers: &mut Vec<Waker>) -> bool {
if wakers.len() > 50 {
log::trace!("limit_waker_list: clearing the list");
for waker in wakers.drain(..) {
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| waker.wake()).ok();
}
true
} else {
false
}
}
/// Runs a closure when dropped.
struct CallOnDrop<F: Fn()>(F);