This commit is contained in:
Stjepan Glavina 2020-09-27 21:55:12 +02:00
parent 671f0353c4
commit 06d015e9b2
2 changed files with 44 additions and 85 deletions

View File

@ -590,15 +590,12 @@ impl<T> Async<T> {
pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
loop {
// If there are no blocked readers, attempt the read operation.
if !self.source.readers_registered() {
// Yield with some small probability - this improves fairness.
future::poll_fn(|cx| maybe_yield(cx)).await;
// Yield with some small probability - this improves fairness.
future::poll_fn(|cx| maybe_yield(cx)).await;
match op(self.get_ref()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
match op(self.get_ref()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
// Wait until the I/O handle becomes readable.
@ -634,15 +631,12 @@ impl<T> Async<T> {
) -> io::Result<R> {
let mut op = op;
loop {
// If there are no blocked readers, attempt the read operation.
if !self.source.readers_registered() {
// Yield with some small probability - this improves fairness.
future::poll_fn(|cx| maybe_yield(cx)).await;
// Yield with some small probability - this improves fairness.
future::poll_fn(|cx| maybe_yield(cx)).await;
match op(self.get_mut()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
match op(self.get_mut()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
// Wait until the I/O handle becomes readable.
@ -676,15 +670,12 @@ impl<T> Async<T> {
pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
loop {
// If there are no blocked readers, attempt the write operation.
if !self.source.writers_registered() {
// Yield with some small probability - this improves fairness.
future::poll_fn(|cx| maybe_yield(cx)).await;
// Yield with some small probability - this improves fairness.
future::poll_fn(|cx| maybe_yield(cx)).await;
match op(self.get_ref()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
match op(self.get_ref()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
// Wait until the I/O handle becomes writable.
@ -721,15 +712,12 @@ impl<T> Async<T> {
) -> io::Result<R> {
let mut op = op;
loop {
// If there are no blocked readers, attempt the write operation.
if !self.source.writers_registered() {
// Yield with some small probability - this improves fairness.
future::poll_fn(|cx| maybe_yield(cx)).await;
// Yield with some small probability - this improves fairness.
future::poll_fn(|cx| maybe_yield(cx)).await;
match op(self.get_mut()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
match op(self.get_mut()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
// Wait until the I/O handle becomes writable.

View File

@ -7,7 +7,7 @@ use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::panic;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::thread;
@ -292,7 +292,6 @@ impl Reactor {
wakers: Vec::new(),
},
}),
wakers_registered: AtomicU8::new(0),
});
sources.insert(source.clone());
@ -471,18 +470,12 @@ impl ReactorLock<'_> {
if ev.writable {
state.write.tick = tick;
state.write.drain_into(&mut wakers);
source
.wakers_registered
.fetch_and(!WRITERS_REGISTERED, Ordering::SeqCst);
}
// Wake readers if a readability event was emitted.
if ev.readable {
state.read.tick = tick;
state.read.drain_into(&mut wakers);
source
.wakers_registered
.fetch_and(!READERS_REGISTERED, Ordering::SeqCst);
}
// Re-register if there are still writers or readers. The can happen if
@ -544,16 +537,8 @@ pub(crate) struct Source {
/// Inner state with registered wakers.
state: Mutex<State>,
/// Whether there are wakers interested in events on this source.
///
/// Hold two bits of information: `READERS_REGISTERED` and `WRITERS_REGISTERED`.
wakers_registered: AtomicU8,
}
const READERS_REGISTERED: u8 = 1 << 0;
const WRITERS_REGISTERED: u8 = 1 << 1;
/// Inner state with registered wakers.
#[derive(Debug)]
struct State {
@ -608,8 +593,6 @@ impl Source {
writable: !state.write.is_empty(),
},
)?;
self.wakers_registered
.fetch_or(READERS_REGISTERED, Ordering::SeqCst);
}
if let Some(w) = state.read.waker.take() {
@ -643,25 +626,25 @@ impl Source {
}
}
// If there are no other readers, re-register in the reactor.
if state.read.is_empty() {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: true,
writable: !state.write.is_empty(),
},
)?;
self.wakers_registered
.fetch_or(READERS_REGISTERED, Ordering::SeqCst);
}
let is_empty = (state.read.is_empty(), state.write.is_empty());
// Register the current task's waker if not present already.
if state.read.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
state.read.wakers.push(cx.waker().clone());
}
// Update interest in this I/O handle.
if is_empty != (state.read.is_empty(), state.write.is_empty()) {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: !state.read.is_empty(),
writable: !state.write.is_empty(),
},
)?;
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
@ -675,11 +658,6 @@ impl Source {
.await
}
/// Returns `true` if there is at least one registered reader.
pub(crate) fn readers_registered(&self) -> bool {
self.wakers_registered.load(Ordering::SeqCst) & READERS_REGISTERED != 0
}
/// Registers a waker from `AsyncWrite`.
///
/// If a different waker is already registered, it gets replaced and woken.
@ -696,8 +674,6 @@ impl Source {
writable: true,
},
)?;
self.wakers_registered
.fetch_or(WRITERS_REGISTERED, Ordering::SeqCst);
}
if let Some(w) = state.write.waker.take() {
@ -731,23 +707,23 @@ impl Source {
}
}
// If there are no other writers, re-register in the reactor.
if state.write.is_empty() {
let is_empty = (state.read.is_empty(), state.write.is_empty());
// Register the current task's waker if not present already.
if state.write.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
state.write.wakers.push(cx.waker().clone());
}
// Update interest in this I/O handle.
if is_empty != (state.read.is_empty(), state.write.is_empty()) {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: !state.read.is_empty(),
writable: true,
writable: !state.write.is_empty(),
},
)?;
self.wakers_registered
.fetch_or(WRITERS_REGISTERED, Ordering::SeqCst);
}
// Register the current task's waker if not present already.
if state.write.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
state.write.wakers.push(cx.waker().clone());
}
// Remember the current ticks.
@ -762,11 +738,6 @@ impl Source {
})
.await
}
/// Returns `true` if there is at least one registered writer.
pub(crate) fn writers_registered(&self) -> bool {
self.wakers_registered.load(Ordering::SeqCst) & WRITERS_REGISTERED != 0
}
}
/// Runs a closure when dropped.