Change AtomicU64 to AtomicUsize

This commit is contained in:
Stjepan Glavina 2020-06-21 14:17:26 +02:00
parent 84ba785ccf
commit e260c27734
1 changed files with 27 additions and 17 deletions

View File

@ -25,7 +25,7 @@ use std::mem;
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, RawSocket};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Poll, Waker};
use std::time::{Duration, Instant};
@ -53,7 +53,7 @@ pub(crate) struct Reactor {
sys: sys::Reactor,
/// Ticker bumped before polling.
ticker: AtomicU64,
ticker: AtomicUsize,
/// Registered sources.
sources: piper::Mutex<Slab<Arc<Source>>>,
@ -86,7 +86,7 @@ impl Reactor {
pub fn get() -> &'static Reactor {
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
ticker: AtomicU64::new(0),
ticker: AtomicUsize::new(0),
sources: piper::Mutex::new(Slab::new()),
events: piper::Mutex::new(sys::Events::new()),
timers: piper::Mutex::new(BTreeMap::new()),
@ -359,10 +359,10 @@ pub(crate) struct Source {
#[derive(Debug)]
struct Wakers {
/// Last reactor tick that delivered a readability event.
tick_readable: u64,
tick_readable: usize,
/// Last reactor tick that delivered a writability event.
tick_writable: u64,
tick_writable: usize,
/// Tasks waiting for the next readability event.
readers: Vec<Waker>,
@ -383,14 +383,16 @@ impl Source {
/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
let mut tick = None;
let mut ticks = None;
future::poll_fn(|cx| {
let mut wakers = self.wakers.lock();
// Check if the reactor has delivered a readability event.
if let Some(tick) = tick {
if wakers.tick_readable > tick {
if let Some((a, b)) = ticks {
// If `tick_readable` has changed to a value other than the old reactor tick, that
// means a newer reactor tick has delivered a readability event.
if wakers.tick_readable != a && wakers.tick_readable != b {
return Poll::Ready(Ok(()));
}
}
@ -410,9 +412,12 @@ impl Source {
wakers.readers.push(cx.waker().clone());
}
// Remember the current reactor tick.
if tick.is_none() {
tick = Some(Reactor::get().ticker.load(Ordering::SeqCst));
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
wakers.tick_readable,
));
}
Poll::Pending
@ -422,14 +427,16 @@ impl Source {
/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
let mut tick = None;
let mut ticks = None;
future::poll_fn(|cx| {
let mut wakers = self.wakers.lock();
// Check if the reactor has delivered a writability event.
if let Some(tick) = tick {
if wakers.tick_writable > tick {
if let Some((a, b)) = ticks {
// If `tick_writable` has changed to a value other than the old reactor tick, that
// means a newer reactor tick has delivered a writability event.
if wakers.tick_writable != a && wakers.tick_writable != b {
return Poll::Ready(Ok(()));
}
}
@ -449,9 +456,12 @@ impl Source {
wakers.writers.push(cx.waker().clone());
}
// Remember the current reactor tick.
if tick.is_none() {
tick = Some(Reactor::get().ticker.load(Ordering::SeqCst));
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
wakers.tick_writable,
));
}
Poll::Pending