Fix errors

This commit is contained in:
Stjepan Glavina 2020-05-21 23:47:17 +02:00
parent 6d4bffa124
commit 26a3208342
1 changed files with 68 additions and 84 deletions

View File

@ -25,7 +25,7 @@ use std::mem;
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, RawSocket};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Poll, Waker};
use std::time::{Duration, Instant};
@ -51,8 +51,11 @@ pub(crate) struct Reactor {
/// Raw bindings to epoll/kqueue/wepoll.
sys: sys::Reactor,
/// Registered I/O sources.
sources: piper::Mutex<Sources>,
/// Ticker bumped when polling or registering a source.
ticker: AtomicU64,
/// Registered sources.
sources: piper::Mutex<Slab<Arc<Source>>>,
/// Temporary storage for I/O events when polling the reactor.
events: piper::Lock<sys::Events>,
@ -77,27 +80,13 @@ pub(crate) struct Reactor {
timer_event: Lazy<IoEvent>,
}
/// Registered I/O sources.
struct Sources {
/// Incremented on registration.
///
/// This is used to tag I/O handles in order to distinguish between new and old sources that
/// use the same index into the table.
tick: u32,
/// The table of registered I/O sources.
table: Slab<Arc<Source>>,
}
impl Reactor {
/// Returns a reference to the reactor.
pub fn get() -> &'static Reactor {
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
sources: piper::Mutex::new(Sources {
tick: 0,
table: Slab::new(),
}),
ticker: AtomicU64::new(0),
sources: piper::Mutex::new(Slab::new()),
events: piper::Lock::new(sys::Events::new()),
timers: piper::Mutex::new(BTreeMap::new()),
timer_ops: ArrayQueue::new(1000),
@ -113,23 +102,7 @@ impl Reactor {
#[cfg(windows)] raw: RawSocket,
) -> io::Result<Arc<Source>> {
let mut sources = self.sources.lock();
// Bump the ticker.
let tick = sources.tick;
sources.tick = tick.wrapping_add(1);
// Find a vacant entry in the table.
let vacant = sources.table.vacant_entry();
let index = vacant.key();
assert!(
index < u32::max_value() as usize,
"too many registered I/O handles"
);
// The key identifying this I/O source.
//
// Lower 32 bits contain the index into the table and the upper 32 bits contain the tag.
let key = index as u64 | ((tick as u64) << 32);
let vacant = sources.vacant_entry();
// Put the I/O handle in non-blocking mode.
#[cfg(unix)]
@ -145,29 +118,27 @@ impl Reactor {
}
// Create a source and register it.
let key = vacant.key();
self.sys.register(raw, key)?;
let source = Arc::new(Source {
raw,
key,
reg_tick: self.ticker.fetch_add(1, Ordering::SeqCst),
wakers: piper::Mutex::new(Wakers {
tick_readable: 0,
tick_writable: 0,
ticker_readable: 0,
ticker_writable: 0,
readers: Vec::new(),
writers: Vec::new(),
}),
});
self.sys.register(raw, source.key)?;
Ok(vacant.insert(source).clone())
}
/// Deregisters an I/O source from the reactor.
pub fn remove_io(&self, source: &Source) -> io::Result<()> {
let mut sources = self.sources.lock();
// Lower 32 bits of the key contain the index into the table.
let index = source.key as u32 as usize;
sources.table.remove(index);
sources.remove(source.key);
self.sys.deregister(source.raw)
}
@ -297,6 +268,9 @@ impl ReactorLock<'_> {
};
loop {
// Bump the ticker before polling I/O.
let tick = self.reactor.ticker.fetch_add(1, Ordering::SeqCst);
// Block on I/O events.
match self.reactor.sys.wait(&mut self.events, timeout) {
// The timeout was hit so fire ready timers.
@ -311,19 +285,15 @@ impl ReactorLock<'_> {
let sources = self.reactor.sources.lock();
for ev in self.events.iter() {
// Lower 32 bits of the key contain the index into the table.
let index = ev.key as u32 as usize;
// Check if there is a source in the table at this index.
if let Some(source) = sources.table.get(index) {
// Check if the key matches. If it doesn't that means we got a stale
// event for an old source that was previously stored at this index.
if source.key == ev.key {
let mut wakers = source.wakers.lock();
// Check if there is a source in the table with this key.
if let Some(source) = sources.get(ev.key) {
let mut wakers = source.wakers.lock();
// Check if the source was registered before polling.
if source.reg_tick < tick {
// Wake readers if a readability event was emitted.
if ev.readable {
wakers.tick_readable += 1;
wakers.ticker_readable += 1;
for w in wakers.readers.drain(..) {
w.wake();
}
@ -331,11 +301,25 @@ impl ReactorLock<'_> {
// Wake writers if a writability event was emitted.
if ev.writable {
wakers.tick_writable += 1;
wakers.ticker_writable += 1;
for w in wakers.writers.drain(..) {
w.wake();
}
}
} else {
// If the source was potentially registered during or after
// polling, we can't be sure whether this event belongs to the
// source currently registered at this index or the source that was
// previously at this index.
//
// Just to be on the safe side, let's reregister and wait for the
// next round of events.
self.reactor.sys.reregister(
source.raw,
source.key,
!wakers.readers.is_empty(),
!wakers.writers.is_empty(),
)?;
}
}
}
@ -370,11 +354,11 @@ pub(crate) struct Source {
#[cfg(windows)]
pub(crate) raw: RawSocket,
/// The ID of this source obtained during registration.
///
/// Lower 32 bits contain the index into the table of sources. Upper 32 bits is just an extra
/// tag to distinguish between new and old sources that use the same index.
key: u64,
/// The reactor tick when this source was registered.
reg_tick: u64,
/// The key of this source obtained during registration.
key: usize,
/// Tasks interested in events on this source.
wakers: piper::Mutex<Wakers>,
@ -384,10 +368,10 @@ pub(crate) struct Source {
#[derive(Debug)]
struct Wakers {
/// Number of delivered readability events.
tick_readable: u64,
ticker_readable: u64,
/// Number of delivered writability events.
tick_writable: u64,
ticker_writable: u64,
/// Tasks waiting for the next readability event.
readers: Vec<Waker>,
@ -430,11 +414,11 @@ impl Source {
}
// Record the readability tick to wait for.
tick = Some(wakers.tick_readable + 1);
tick = Some(wakers.ticker_readable + 1);
Poll::Pending
}
Some(tick) => {
if self.wakers.lock().tick_readable < tick {
if self.wakers.lock().ticker_readable < tick {
Poll::Pending
} else {
Poll::Ready(Ok(()))
@ -468,11 +452,11 @@ impl Source {
}
// Record the writability tick to wait for.
tick = Some(wakers.tick_writable + 1);
tick = Some(wakers.ticker_writable + 1);
Poll::Pending
}
Some(tick) => {
if self.wakers.lock().tick_writable < tick {
if self.wakers.lock().ticker_writable < tick {
Poll::Pending
} else {
Poll::Ready(Ok(()))
@ -512,11 +496,11 @@ mod sys {
let epoll_fd = epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(io_err)?;
Ok(Reactor(epoll_fd))
}
pub fn register(&self, fd: RawFd, key: u64) -> io::Result<()> {
let ev = &mut EpollEvent::new(EpollFlags::empty(), key);
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
let ev = &mut EpollEvent::new(EpollFlags::empty(), key as u64);
epoll_ctl(self.0, EpollOp::EpollCtlAdd, fd, Some(ev)).map_err(io_err)
}
pub fn reregister(&self, fd: RawFd, key: u64, read: bool, write: bool) -> io::Result<()> {
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut flags = EpollFlags::EPOLLONESHOT;
if read {
flags |= read_flags();
@ -524,7 +508,7 @@ mod sys {
if write {
flags |= write_flags();
}
let ev = &mut EpollEvent::new(flags, key);
let ev = &mut EpollEvent::new(flags, key as u64);
epoll_ctl(self.0, EpollOp::EpollCtlMod, fd, Some(ev)).map_err(io_err)
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
@ -566,14 +550,14 @@ mod sys {
self.list[..self.len].iter().map(|ev| Event {
readable: ev.events().intersects(read_flags()),
writable: ev.events().intersects(write_flags()),
key: ev.data(),
key: ev.data() as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: u64,
pub key: usize,
}
}
@ -605,10 +589,10 @@ mod sys {
fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)).map_err(io_err)?;
Ok(Reactor(fd))
}
pub fn register(&self, fd: RawFd, key: u64) -> io::Result<()> {
pub fn register(&self, _fd: RawFd, _key: usize) -> io::Result<()> {
Ok(())
}
pub fn reregister(&self, fd: RawFd, key: u64, read: bool, write: bool) -> io::Result<()> {
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut read_flags = EventFlag::EV_ONESHOT | EventFlag::EV_RECEIPT;
let mut write_flags = EventFlag::EV_ONESHOT | EventFlag::EV_RECEIPT;
if read {
@ -621,7 +605,7 @@ mod sys {
} else {
write_flags |= EventFlag::EV_DELETE;
}
let udata = key;
let udata = key as _;
let changelist = [
KEvent::new(
fd as _,
@ -698,14 +682,14 @@ mod sys {
self.list[..self.len].iter().map(|ev| Event {
readable: ev.filter() != EventFilter::EVFILT_WRITE,
writable: ev.filter() != EventFilter::EVFILT_READ,
key: ev.data(),
key: ev.data() as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: u64,
pub key: usize,
}
}
@ -723,13 +707,13 @@ mod sys {
pub fn new() -> io::Result<Reactor> {
Ok(Reactor(Epoll::new()?))
}
pub fn register(&self, sock: RawSocket, key: u64) -> io::Result<()> {
self.0.register(&As(sock), EventFlag::empty(), key);
pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
self.0.register(&As(sock), EventFlag::empty(), key as u64)
}
pub fn reregister(
&self,
sock: RawSocket,
key: u64,
key: usize,
read: bool,
write: bool,
) -> io::Result<()> {
@ -740,7 +724,7 @@ mod sys {
if write {
flags |= write_flags();
}
self.0.reregister(&As(sock), flags, key)
self.0.reregister(&As(sock), flags, key as u64)
}
pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
self.0.deregister(&As(sock))
@ -779,13 +763,13 @@ mod sys {
self.0.iter().map(|ev| Event {
readable: ev.flags().intersects(read_flags()),
writable: ev.flags().intersects(write_flags()),
index: ev.data(),
key: ev.data() as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: u64,
pub key: usize,
}
}