This commit is contained in:
Stjepan Glavina 2020-09-27 22:15:45 +02:00
parent 06d015e9b2
commit 86b340b50f
1 changed files with 59 additions and 139 deletions

View File

@ -20,6 +20,9 @@ use polling::{Event, Poller};
use vec_arena::Arena;
use waker_fn::waker_fn;
const READ: usize = 0;
const WRITE: usize = 1;
/// The reactor.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
@ -280,18 +283,7 @@ impl Reactor {
let source = Arc::new(Source {
raw,
key,
state: Mutex::new(State {
read: Direction {
tick: 0,
waker: None,
wakers: Vec::new(),
},
write: Direction {
tick: 0,
waker: None,
wakers: Vec::new(),
},
}),
state: Default::default(),
});
sources.insert(source.clone());
@ -466,28 +458,24 @@ impl ReactorLock<'_> {
if let Some(source) = sources.get(ev.key) {
let mut state = source.state.lock().unwrap();
// Wake writers if a writability event was emitted.
if ev.writable {
state.write.tick = tick;
state.write.drain_into(&mut wakers);
}
// Wake readers if a readability event was emitted.
if ev.readable {
state.read.tick = tick;
state.read.drain_into(&mut wakers);
// Collect wakers if a writability event was emitted.
for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
if emitted {
state[dir].tick = tick;
state[dir].drain_into(&mut wakers);
}
}
// Re-register if there are still writers or readers. The can happen if
// e.g. we were previously interested in both readability and writability,
// but only one of them was emitted.
if !(state.write.is_empty() && state.read.is_empty()) {
if !state[READ].is_empty() || !state[WRITE].is_empty() {
self.reactor.poller.interest(
source.raw,
Event {
key: source.key,
readable: !state.read.is_empty(),
writable: !state.write.is_empty(),
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
}
@ -536,21 +524,11 @@ pub(crate) struct Source {
key: usize,
/// Inner state with registered wakers.
state: Mutex<State>,
}
/// Inner state with registered wakers.
#[derive(Debug)]
struct State {
/// State of the read direction.
read: Direction,
/// State of the write direction.
write: Direction,
state: Mutex<[Direction; 2]>,
}
/// A read or write direction.
#[derive(Debug)]
#[derive(Debug, Default)]
struct Direction {
/// Last reactor tick that delivered an event.
tick: usize,
@ -577,40 +555,53 @@ impl Direction {
}
impl Source {
/// Registers a waker from `AsyncRead`.
/// Registers a waker from `AsyncRead` or `AsyncWrite`.
///
/// If a different waker is already registered, it gets replaced and woken.
pub(crate) fn register_reader(&self, waker: &Waker) -> io::Result<()> {
fn register(&self, dir: usize, waker: &Waker) -> io::Result<()> {
let mut state = self.state.lock().unwrap();
let is_empty = (state[READ].is_empty(), state[WRITE].is_empty());
// If there are no other readers, re-register in the reactor.
if state.read.is_empty() {
if let Some(w) = state[dir].waker.take() {
if w.will_wake(waker) {
state[dir].waker = Some(w);
return Ok(());
}
panic::catch_unwind(|| w.wake()).ok();
}
state[dir].waker = Some(waker.clone());
// Update interest in this I/O handle.
if is_empty != (state[READ].is_empty(), state[WRITE].is_empty()) {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: true,
writable: !state.write.is_empty(),
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
}
if let Some(w) = state.read.waker.take() {
if w.will_wake(waker) {
state.read.waker = Some(w);
return Ok(());
}
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| w.wake()).ok();
}
state.read.waker = Some(waker.clone());
Ok(())
}
/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
/// Registers a waker from `AsyncRead`.
///
/// If a different waker is already registered, it gets replaced and woken.
pub(crate) fn register_reader(&self, waker: &Waker) -> io::Result<()> {
self.register(READ, waker)
}
/// Registers a waker from `AsyncWrite`.
///
/// If a different waker is already registered, it gets replaced and woken.
pub(crate) fn register_writer(&self, waker: &Waker) -> io::Result<()> {
self.register(WRITE, waker)
}
/// Waits until the I/O source is readable or writable.
async fn ready(&self, dir: usize) -> io::Result<()> {
let mut ticks = None;
future::poll_fn(|cx| {
@ -620,27 +611,27 @@ impl Source {
if let Some((a, b)) = ticks {
// If `state.read.tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered a readability event.
if state.read.tick != a && state.read.tick != b {
if state[dir].tick != a && state[dir].tick != b {
log::trace!("readable: fd={}", self.raw);
return Poll::Ready(Ok(()));
}
}
let is_empty = (state.read.is_empty(), state.write.is_empty());
let is_empty = (state[READ].is_empty(), state[WRITE].is_empty());
// Register the current task's waker if not present already.
if state.read.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
state.read.wakers.push(cx.waker().clone());
if state[dir].wakers.iter().all(|w| !w.will_wake(cx.waker())) {
state[dir].wakers.push(cx.waker().clone());
}
// Update interest in this I/O handle.
if is_empty != (state.read.is_empty(), state.write.is_empty()) {
if is_empty != (state[READ].is_empty(), state[WRITE].is_empty()) {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: !state.read.is_empty(),
writable: !state.write.is_empty(),
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
}
@ -649,7 +640,7 @@ impl Source {
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
state.read.tick,
state[READ].tick,
));
}
@ -658,85 +649,14 @@ impl Source {
.await
}
/// Registers a waker from `AsyncWrite`.
///
/// If a different waker is already registered, it gets replaced and woken.
pub(crate) fn register_writer(&self, waker: &Waker) -> io::Result<()> {
let mut state = self.state.lock().unwrap();
// If there are no other writers, re-register in the reactor.
if state.write.is_empty() {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: !state.read.is_empty(),
writable: true,
},
)?;
}
if let Some(w) = state.write.waker.take() {
if w.will_wake(waker) {
state.write.waker = Some(w);
return Ok(());
}
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| w.wake()).ok();
}
state.write.waker = Some(waker.clone());
Ok(())
/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
self.ready(READ).await
}
/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
let mut ticks = None;
future::poll_fn(|cx| {
let mut state = self.state.lock().unwrap();
// Check if the reactor has delivered a writability event.
if let Some((a, b)) = ticks {
// If `state.write.tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered a writability event.
if state.write.tick != a && state.write.tick != b {
log::trace!("writable: fd={}", self.raw);
return Poll::Ready(Ok(()));
}
}
let is_empty = (state.read.is_empty(), state.write.is_empty());
// Register the current task's waker if not present already.
if state.write.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
state.write.wakers.push(cx.waker().clone());
}
// Update interest in this I/O handle.
if is_empty != (state.read.is_empty(), state.write.is_empty()) {
Reactor::get().poller.interest(
self.raw,
Event {
key: self.key,
readable: !state.read.is_empty(),
writable: !state.write.is_empty(),
},
)?;
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
state.write.tick,
));
}
Poll::Pending
})
.await
self.ready(WRITE).await
}
}