diff --git a/src/reactor.rs b/src/reactor.rs index edd5166..26e4a62 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -25,7 +25,7 @@ use std::mem; use std::os::unix::io::RawFd; #[cfg(windows)] use std::os::windows::io::{FromRawSocket, RawSocket}; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Poll, Waker}; use std::time::{Duration, Instant}; @@ -53,7 +53,7 @@ pub(crate) struct Reactor { sys: sys::Reactor, /// Ticker bumped before polling. - ticker: AtomicU64, + ticker: AtomicUsize, /// Registered sources. sources: piper::Mutex>>, @@ -86,7 +86,7 @@ impl Reactor { pub fn get() -> &'static Reactor { static REACTOR: Lazy = Lazy::new(|| Reactor { sys: sys::Reactor::new().expect("cannot initialize I/O event notification"), - ticker: AtomicU64::new(0), + ticker: AtomicUsize::new(0), sources: piper::Mutex::new(Slab::new()), events: piper::Mutex::new(sys::Events::new()), timers: piper::Mutex::new(BTreeMap::new()), @@ -359,10 +359,10 @@ pub(crate) struct Source { #[derive(Debug)] struct Wakers { /// Last reactor tick that delivered a readability event. - tick_readable: u64, + tick_readable: usize, /// Last reactor tick that delivered a writability event. - tick_writable: u64, + tick_writable: usize, /// Tasks waiting for the next readability event. readers: Vec, @@ -383,14 +383,16 @@ impl Source { /// Waits until the I/O source is readable. pub(crate) async fn readable(&self) -> io::Result<()> { - let mut tick = None; + let mut ticks = None; future::poll_fn(|cx| { let mut wakers = self.wakers.lock(); // Check if the reactor has delivered a readability event. - if let Some(tick) = tick { - if wakers.tick_readable > tick { + if let Some((a, b)) = ticks { + // If `tick_readable` has changed to a value other than the old reactor tick, that + // means a newer reactor tick has delivered a readability event. + if wakers.tick_readable != a && wakers.tick_readable != b { return Poll::Ready(Ok(())); } } @@ -410,9 +412,12 @@ impl Source { wakers.readers.push(cx.waker().clone()); } - // Remember the current reactor tick. - if tick.is_none() { - tick = Some(Reactor::get().ticker.load(Ordering::SeqCst)); + // Remember the current ticks. + if ticks.is_none() { + ticks = Some(( + Reactor::get().ticker.load(Ordering::SeqCst), + wakers.tick_readable, + )); } Poll::Pending @@ -422,14 +427,16 @@ impl Source { /// Waits until the I/O source is writable. pub(crate) async fn writable(&self) -> io::Result<()> { - let mut tick = None; + let mut ticks = None; future::poll_fn(|cx| { let mut wakers = self.wakers.lock(); // Check if the reactor has delivered a writability event. - if let Some(tick) = tick { - if wakers.tick_writable > tick { + if let Some((a, b)) = ticks { + // If `tick_writable` has changed to a value other than the old reactor tick, that + // means a newer reactor tick has delivered a writability event. + if wakers.tick_writable != a && wakers.tick_writable != b { return Poll::Ready(Ok(())); } } @@ -449,9 +456,12 @@ impl Source { wakers.writers.push(cx.waker().clone()); } - // Remember the current reactor tick. - if tick.is_none() { - tick = Some(Reactor::get().ticker.load(Ordering::SeqCst)); + // Remember the current ticks. + if ticks.is_none() { + ticks = Some(( + Reactor::get().ticker.load(Ordering::SeqCst), + wakers.tick_writable, + )); } Poll::Pending