Simplify reactor notification

This commit is contained in:
Stjepan Glavina 2020-06-21 14:48:36 +02:00
parent e260c27734
commit 42bf604d2f
2 changed files with 68 additions and 40 deletions

View File

@ -7,7 +7,6 @@ use std::time::{Duration, Instant};
use once_cell::sync::Lazy;
use slab::Slab;
use crate::io_event::IoEvent;
use crate::reactor::Reactor;
static REGISTRY: Lazy<Mutex<Slab<Unparker>>> = Lazy::new(|| Mutex::new(Slab::new()));
@ -132,8 +131,6 @@ const PARKED: usize = 1;
const POLLING: usize = 2;
const NOTIFIED: usize = 3;
static EVENT: Lazy<IoEvent> = Lazy::new(|| IoEvent::new().unwrap());
struct Inner {
state: AtomicUsize,
lock: Mutex<()>,
@ -153,7 +150,6 @@ impl Inner {
reactor_lock
.react(Some(Duration::from_secs(0)))
.expect("failure while polling I/O");
EVENT.clear();
}
return true;
}
@ -166,7 +162,6 @@ impl Inner {
reactor_lock
.react(Some(Duration::from_secs(0)))
.expect("failure while polling I/O");
EVENT.clear();
}
return false;
}
@ -206,7 +201,6 @@ impl Inner {
drop(m);
reactor_lock.react(None).expect("failure while polling I/O");
EVENT.clear();
m = self.lock.lock().unwrap();
}
@ -231,7 +225,6 @@ impl Inner {
reactor_lock
.react(Some(deadline.saturating_duration_since(Instant::now())))
.expect("failure while polling I/O");
EVENT.clear();
if Instant::now() >= deadline {
break;
@ -274,7 +267,7 @@ impl Inner {
if state == PARKED {
self.cvar.notify_one();
} else {
EVENT.notify();
Reactor::get().notify();
}
}
}

View File

@ -40,8 +40,6 @@ use socket2::Socket;
#[cfg(unix)]
use crate::sys::fcntl::{fcntl, FcntlArg};
use crate::io_event::IoEvent;
/// The reactor.
///
/// Every async I/O handle and every timer is registered here. Invocations of
@ -73,12 +71,6 @@ pub(crate) struct Reactor {
/// When inserting or removing a timer, we don't process it immediately - we just push it into
/// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
timer_ops: ConcurrentQueue<TimerOp>,
/// An I/O event that is triggered when a new timer is registered.
///
/// The reason why this field is lazily created is because `IoEvent`s can be created only after
/// the reactor is fully initialized.
timer_event: Lazy<IoEvent>,
}
impl Reactor {
@ -91,11 +83,15 @@ impl Reactor {
events: piper::Mutex::new(sys::Events::new()),
timers: piper::Mutex::new(BTreeMap::new()),
timer_ops: ConcurrentQueue::bounded(1000),
timer_event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")),
});
&REACTOR
}
/// Notifies the thread blocked on the reactor.
pub fn notify(&self) {
self.sys.notify().expect("failed to notify reactor");
}
/// Registers an I/O source in the reactor.
pub fn insert_io(
&self,
@ -161,7 +157,7 @@ impl Reactor {
}
// Notify that a timer was added.
self.timer_event.notify();
self.notify();
id
}
@ -187,9 +183,6 @@ impl Reactor {
///
/// Returns the duration until the next timer before this method was called.
fn fire_timers(&self) -> Option<Duration> {
// Clear this event because we're about to fire timers.
self.timer_event.clear();
let mut timers = self.timers.lock();
// Process timer operations, but no more than the queue capacity because otherwise we could
@ -478,19 +471,26 @@ mod sys {
use std::os::unix::io::RawFd;
use std::time::Duration;
use once_cell::sync::Lazy;
use crate::io_event::IoEvent;
use crate::sys::epoll::{
epoll_create1, epoll_ctl, epoll_wait, EpollEvent, EpollFlags, EpollOp,
};
pub struct Reactor(RawFd);
pub struct Reactor {
epoll_fd: RawFd,
io_event: Lazy<IoEvent>,
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let epoll_fd = epoll_create1()?;
Ok(Reactor(epoll_fd))
let io_event = Lazy::<IoEvent>::new(|| IoEvent::new().unwrap());
Ok(Reactor { epoll_fd })
}
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
let ev = &mut EpollEvent::new(0, key as u64);
epoll_ctl(self.0, EpollOp::EpollCtlAdd, fd, Some(ev))
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, Some(ev))
}
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut flags = libc::EPOLLONESHOT;
@ -501,10 +501,10 @@ mod sys {
flags |= write_flags();
}
let ev = &mut EpollEvent::new(flags, key as u64);
epoll_ctl(self.0, EpollOp::EpollCtlMod, fd, Some(ev))
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlMod, fd, Some(ev))
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
epoll_ctl(self.0, EpollOp::EpollCtlDel, fd, None)
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlDel, fd, None)
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
@ -517,9 +517,14 @@ mod sys {
})
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
events.len = epoll_wait(self.0, &mut events.list, timeout_ms)?;
events.len = epoll_wait(self.epoll_fd, &mut events.list, timeout_ms)?;
self.io_event.clear();
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
self.io_event.notify();
Ok(())
}
}
fn read_flags() -> EpollFlags {
libc::EPOLLIN | libc::EPOLLRDHUP | libc::EPOLLHUP | libc::EPOLLERR | libc::EPOLLPRI
@ -567,15 +572,25 @@ mod sys {
use std::os::unix::io::RawFd;
use std::time::Duration;
use once_cell::sync::Lazy;
use crate::io_event::IoEvent;
use crate::sys::event::{kevent_ts, kqueue, KEvent};
use crate::sys::fcntl::{fcntl, FcntlArg};
pub struct Reactor(RawFd);
pub struct Reactor {
kqueue_fd: RawFd,
io_event: Lazy<IoEvent>,
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let fd = kqueue()?;
fcntl(fd, FcntlArg::F_SETFD(libc::FD_CLOEXEC))?;
Ok(Reactor(fd))
let kqueue_fd = kqueue()?;
fcntl(kqueue_fd, FcntlArg::F_SETFD(libc::FD_CLOEXEC))?;
let io_event = Lazy::<IoEvent>::new(|| IoEvent::new().unwrap());
Ok(Reactor {
kqueue_fd,
io_event,
})
}
pub fn register(&self, _fd: RawFd, _key: usize) -> io::Result<()> {
Ok(())
@ -599,7 +614,7 @@ mod sys {
KEvent::new(fd as _, libc::EVFILT_WRITE, write_flags, 0, 0, udata),
];
let mut eventlist = changelist;
kevent_ts(self.0, &changelist, &mut eventlist, None)?;
kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;
for ev in &eventlist {
// Explanation for ignoring EPIPE: https://github.com/tokio-rs/mio/issues/582
let (flags, data) = (ev.flags(), ev.data());
@ -620,7 +635,7 @@ mod sys {
KEvent::new(fd as _, libc::EVFILT_READ, flags, 0, 0, 0),
];
let mut eventlist = changelist;
kevent_ts(self.0, &changelist, &mut eventlist, None)?;
kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;
for ev in &eventlist {
let (flags, data) = (ev.flags(), ev.data());
if (flags & libc::EV_ERROR == 1) && data != 0 && data != libc::ENOENT as _ {
@ -634,9 +649,14 @@ mod sys {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
events.len = kevent_ts(self.0, &[], &mut events.list, timeout)?;
events.len = kevent_ts(self.kqueue_fd, &[], &mut events.list, timeout)?;
self.io_event.clear();
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
self.io_event.notify();
Ok(())
}
}
pub struct Events {
@ -678,15 +698,24 @@ mod sys {
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::time::Duration;
use once_cell::sync::Lazy;
use wepoll_binding::{Epoll, EventFlag};
pub struct Reactor(Epoll);
use crate::io_event::IoEvent;
pub struct Reactor {
epoll: Epoll,
io_event: Lazy<IoEvent>,
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
Ok(Reactor(Epoll::new()?))
let epoll = Epoll::new()?;
let io_event = Lazy::<IoEvent>::new(|| IoEvent::new().unwrap());
Ok(Reactor { epoll, io_event })
}
pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
self.0.register(&As(sock), EventFlag::empty(), key as u64)
self.epoll
.register(&As(sock), EventFlag::empty(), key as u64)
}
pub fn reregister(
&self,
@ -702,10 +731,10 @@ mod sys {
if write {
flags |= write_flags();
}
self.0.reregister(&As(sock), flags, key as u64)
self.epoll.reregister(&As(sock), flags, key as u64)
}
pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
self.0.deregister(&As(sock))
self.epoll.deregister(&As(sock))
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout = timeout.map(|t| {
@ -716,7 +745,13 @@ mod sys {
}
});
events.0.clear();
self.0.poll(&mut events.0, timeout)
let res = self.epoll.poll(&mut events.epoll, timeout);
self.io_event.clear();
res
}
pub fn notify(&self) -> io::Result<()> {
self.io_event.notify();
Ok(())
}
}
struct As(RawSocket);