This commit is contained in:
Stjepan Glavina 2020-02-10 02:37:36 +01:00
parent 04ce20b535
commit 1c404a210c
2 changed files with 181 additions and 201 deletions

View File

@ -6,7 +6,7 @@
* Complete - Fully featured and ready for production.
* Documented - Simple code, easy to understand and modify.
* Portable - Works on Linux, Android, macOS, iOS, and Windows. (TODO: only Linux right now)
* Lightweight - Small dependencies, relies on epoll/kqueue/WSApoll.
* Lightweight - Small dependencies, relies on epoll/kqueue/WSAPoll.
## Features

View File

@ -44,193 +44,19 @@ use socket2::{Domain, Protocol, Socket, Type};
// TODO: catch panics in wake() and Waker::drop()
// TODO: readme for inspiration: https://github.com/piscisaureus/wepoll
#[cfg(target_os = "linux")]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::time::Duration;
// ----- Reactor -----
use nix::sys::epoll;
#[derive(Clone, Copy)]
pub struct RawSource(RawFd);
impl RawSource {
pub fn new(s: &impl AsRawFd) -> RawSource {
RawSource(s.as_raw_fd())
}
}
pub struct Poll(RawFd);
impl Poll {
pub fn create() -> io::Result<Poll> {
epoll::epoll_create1(epoll::EpollCreateFlags::EPOLL_CLOEXEC)
.map(Poll)
.map_err(io_err)
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
let ev = &mut epoll::EpollEvent::new(flags(), index as u64);
epoll::epoll_ctl(self.0, epoll::EpollOp::EpollCtlAdd, source.0, Some(ev))
.map_err(io_err)
}
pub fn reregister(&self, source: RawSource, index: usize) -> io::Result<()> {
let ev = &mut epoll::EpollEvent::new(flags(), index as u64);
epoll::epoll_ctl(self.0, epoll::EpollOp::EpollCtlMod, source.0, Some(ev))
.map_err(io_err)
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
epoll::epoll_ctl(self.0, epoll::EpollOp::EpollCtlDel, source.0, None).map_err(io_err)
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
let n = epoll::epoll_wait(self.0, &mut events.list, timeout_ms).map_err(io_err)?;
events.len = n;
Ok(n)
}
}
pub struct Events {
list: Box<[epoll::EpollEvent]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
Events {
list: vec![epoll::EpollEvent::empty(); 1000].into_boxed_slice(),
len: 0,
}
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
is_read: ev.events() != epoll::EpollFlags::EPOLLOUT,
is_write: ev.events() != epoll::EpollFlags::EPOLLIN,
index: ev.data() as usize,
})
}
}
pub struct Event {
pub is_read: bool,
pub is_write: bool,
pub index: usize,
}
#[inline]
fn flags() -> epoll::EpollFlags {
epoll::EpollFlags::EPOLLONESHOT
| epoll::EpollFlags::EPOLLIN
| epoll::EpollFlags::EPOLLOUT
| epoll::EpollFlags::EPOLLRDHUP
}
fn io_err(err: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
}
#[cfg(target_os = "windows")]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::time::Duration;
use wepoll_binding as wepoll;
#[derive(Clone, Copy)]
pub struct RawSource(RawSocket);
impl RawSource {
pub fn new(s: &impl AsRawSocket) -> RawSource {
RawSource(s.as_raw_socket())
}
}
impl AsRawSocket for RawSource {
fn as_raw_socket(&self) -> RawSocket {
self.0
}
}
pub struct Poll(wepoll::Epoll);
impl Poll {
pub fn create() -> io::Result<Poll> {
wepoll::Epoll::new().map(Poll)
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.register(source, flags(), index as u64)
}
pub fn reregister(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.reregister(source, flags(), index as u64)?
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
self.0.deregister(source)
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
events.0.clear();
self.0.poll(events, timeout)
}
}
pub struct Events(wepoll_binding::Events);
impl Events {
pub fn new() -> Events {
Events(wepoll_binding::Events::with_capacity(1000))
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.0.iter().map(|ev| Event {
is_read: ev.flags() != wepoll::EventFlag::OUT,
is_write: ev.flags() != wepoll::EventFlag::IN,
index: ev.data() as usize,
})
}
}
pub struct Event {
pub is_read: bool,
pub is_write: bool,
pub index: usize,
}
#[inline]
fn flags() -> wepoll::EventFlag {
wepoll::EventFlag::ONESHOT
| wepoll::EventFlag::IN
| wepoll::EventFlag::OUT
| wepoll::EventFlag::RDHUP
}
}
// ----- Poller -----
struct Poller {
struct Reactor {
registry: Registry,
flag: AtomicBool,
socket_notify: Socket,
socket_wakeup: Socket,
}
static POLLER: Lazy<Poller> = Lazy::new(|| Poller::create().expect("cannot create poller"));
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor::create().expect("cannot create reactor"));
impl Poller {
fn create() -> io::Result<Poller> {
impl Reactor {
fn create() -> io::Result<Reactor> {
// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
// https://github.com/mhils/backports.socketpair/blob/master/backports/socketpair/__init__.py
// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
@ -257,7 +83,7 @@ impl Poller {
let registry = Registry::create()?;
registry.register(sys::RawSource::new(&sock2))?;
Ok(Poller {
Ok(Reactor {
registry,
flag: AtomicBool::new(false),
socket_notify: sock1,
@ -327,7 +153,7 @@ struct Entry {
}
struct Registry {
poll: sys::Poll,
poller: sys::Poller,
events: Mutex<sys::Events>,
io_handles: Mutex<Slab<Arc<Entry>>>,
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
@ -336,7 +162,7 @@ struct Registry {
impl Registry {
fn create() -> io::Result<Registry> {
Ok(Registry {
poll: sys::Poll::create()?,
poller: sys::Poller::create()?,
events: Mutex::new(sys::Events::new()),
io_handles: Mutex::new(Slab::new()),
timers: Mutex::new(BTreeMap::new()),
@ -347,7 +173,7 @@ impl Registry {
let mut io_handles = self.io_handles.lock();
let vacant = io_handles.vacant_entry();
let index = vacant.key();
self.poll.register(source, index)?;
self.poller.register(source, index)?;
let entry = Arc::new(Entry {
source,
@ -363,7 +189,7 @@ impl Registry {
fn deregister(&self, entry: &Entry) -> io::Result<()> {
let mut io_handles = self.io_handles.lock();
io_handles.remove(entry.index);
self.poll.deregister(entry.source)
self.poller.deregister(entry.source)
}
fn wait_io(&self, timeout: Option<Duration>) -> io::Result<()> {
@ -376,7 +202,7 @@ impl Registry {
self.events.lock()
};
if self.poll.poll(&mut events, timeout)? == 0 {
if self.poller.poll(&mut events, timeout)? == 0 {
return Ok(());
}
@ -384,9 +210,11 @@ impl Registry {
let io_handles = self.io_handles.lock();
for ev in events.iter() {
// In order to minimize latencies, wake writers before readers.
// Source: https://twitter.com/kingprotty/status/1222152589405384705?s=19
if let Some(entry) = io_handles.get(ev.index) {
self.poller.reregister(entry.source, entry.index)?;
// In order to minimize latencies, wake writers before readers.
// Source: https://twitter.com/kingprotty/status/1222152589405384705?s=19
if ev.is_read {
for w in entry.readers.lock().drain(..) {
wakers.push_back(w);
@ -397,8 +225,6 @@ impl Registry {
wakers.push_front(w);
}
}
self.poll.reregister(entry.source, entry.index)?;
}
}
@ -463,7 +289,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
let ready = ready.clone();
move || {
if !ready.swap(true, Ordering::SeqCst) {
POLLER.interrupt();
REACTOR.interrupt();
let _m = EXECUTOR.mutex.lock();
EXECUTOR.cvar.notify_all();
}
@ -479,7 +305,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
while !ready.load(Ordering::SeqCst) {
if runs >= 64 {
runs = 0;
POLLER.poll_quick().unwrap();
REACTOR.poll_quick().unwrap();
}
match EXECUTOR.receiver.try_recv() {
@ -491,7 +317,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
Err(_) => {
runs = 0;
fails += 1;
POLLER.poll_quick().unwrap();
REACTOR.poll_quick().unwrap();
if fails <= 1 {
continue;
@ -527,7 +353,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
})
};
POLLER.poll().unwrap();
REACTOR.poll().unwrap();
}
}
}
@ -555,7 +381,7 @@ impl<T: Send + 'static> Task<T> {
// Create a runnable and schedule it for execution.
let schedule = |runnable| {
EXECUTOR.queue.send(runnable).unwrap();
POLLER.interrupt();
REACTOR.interrupt();
};
let (runnable, handle) = async_task::spawn(future, schedule, ());
runnable.schedule();
@ -798,7 +624,7 @@ impl Timer {
impl Drop for Timer {
fn drop(&mut self) {
let id = self as *mut Timer as usize;
POLLER.registry.timers.lock().remove(&(self.when, id));
REACTOR.registry.timers.lock().remove(&(self.when, id));
self.inserted = false;
}
}
@ -808,7 +634,7 @@ impl Future for Timer {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let id = &mut *self as *mut Timer as usize;
let mut timers = POLLER.registry.timers.lock();
let mut timers = REACTOR.registry.timers.lock();
if Instant::now() >= self.when {
timers.remove(&(self.when, id));
@ -829,7 +655,7 @@ impl Future for Timer {
if is_earliest {
drop(timers);
POLLER.interrupt();
REACTOR.interrupt();
}
}
@ -850,7 +676,7 @@ impl<T: AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> io::Result<Async<T>> {
Ok(Async {
entry: POLLER.registry.register(sys::RawSource::new(&source))?,
entry: REACTOR.registry.register(sys::RawSource::new(&source))?,
source: Box::new(source),
})
}
@ -861,7 +687,7 @@ impl<T: AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> io::Result<Async<T>> {
Ok(Async {
entry: POLLER.registry.register(sys::RawSource::new(&source))?,
entry: REACTOR.registry.register(sys::RawSource::new(&source))?,
source: Box::new(source),
})
}
@ -947,7 +773,7 @@ impl<T> Async<T> {
impl<T> Drop for Async<T> {
fn drop(&mut self) {
POLLER.registry.deregister(&self.entry).unwrap();
REACTOR.registry.deregister(&self.entry).unwrap();
}
}
@ -1247,3 +1073,157 @@ impl Async<UnixDatagram> {
self.read_with(|source| source.recv(buf)).await
}
}
// ----- Linux (epoll) -----
#[cfg(target_os = "linux")]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::time::Duration;
use nix::sys::epoll::{
epoll_create1, epoll_ctl, epoll_wait, EpollCreateFlags, EpollEvent, EpollFlags, EpollOp,
};
#[derive(Clone, Copy)]
pub struct RawSource(RawFd);
impl RawSource {
pub fn new(s: &impl AsRawFd) -> RawSource {
RawSource(s.as_raw_fd())
}
}
pub struct Poller(RawFd);
impl Poller {
pub fn create() -> io::Result<Poller> {
epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC)
.map(Poller)
.map_err(io_err)
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
let ev = &mut EpollEvent::new(flags(), index as u64);
epoll_ctl(self.0, EpollOp::EpollCtlAdd, source.0, Some(ev)).map_err(io_err)
}
pub fn reregister(&self, _source: RawSource, _index: usize) -> io::Result<()> {
Ok(())
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
epoll_ctl(self.0, EpollOp::EpollCtlDel, source.0, None).map_err(io_err)
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
events.len = epoll_wait(self.0, &mut events.list, timeout_ms).map_err(io_err)?;
Ok(events.len)
}
}
pub struct Events {
list: Box<[EpollEvent]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
Events {
list: vec![EpollEvent::empty(); 1000].into_boxed_slice(),
len: 0,
}
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
is_read: ev.events() != EpollFlags::EPOLLOUT,
is_write: ev.events() != EpollFlags::EPOLLIN,
index: ev.data() as usize,
})
}
}
pub struct Event {
pub is_read: bool,
pub is_write: bool,
pub index: usize,
}
#[inline]
fn flags() -> EpollFlags {
EpollFlags::EPOLLET | EpollFlags::EPOLLIN | EpollFlags::EPOLLOUT | EpollFlags::EPOLLRDHUP
}
fn io_err(err: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
}
// ----- Windows (WSAPoll) -----
#[cfg(target_os = "windows")]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::time::Duration;
use wepoll_binding as wepoll;
#[derive(Clone, Copy)]
pub struct RawSource(RawSocket);
impl RawSource {
pub fn new(s: &impl AsRawSocket) -> RawSource {
RawSource(s.as_raw_socket())
}
}
impl AsRawSocket for RawSource {
fn as_raw_socket(&self) -> RawSocket {
self.0
}
}
pub struct Poller(wepoll::Epoll);
impl Poller {
pub fn create() -> io::Result<Poller> {
wepoll::Epoll::new().map(Poller)
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.register(source, flags(), index as u64)
}
pub fn reregister(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.reregister(source, flags(), index as u64)?
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
self.0.deregister(source)
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
events.0.clear();
self.0.poll(events, timeout)
}
}
pub struct Events(wepoll::Events);
impl Events {
pub fn new() -> Events {
Events(wepoll::Events::with_capacity(1000))
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.0.iter().map(|ev| Event {
is_read: ev.flags() != wepoll::EventFlag::OUT,
is_write: ev.flags() != wepoll::EventFlag::IN,
index: ev.data() as usize,
})
}
}
pub struct Event {
pub is_read: bool,
pub is_write: bool,
pub index: usize,
}
#[inline]
fn flags() -> wepoll::EventFlag {
use wepoll::EventFlag::*;
ONESHOT | IN | OUT | RDHUP
}
}