This commit is contained in:
Stjepan Glavina 2020-02-10 01:45:32 +01:00
parent dcb9974392
commit 04ce20b535
1 changed files with 215 additions and 178 deletions

View File

@ -6,12 +6,18 @@ compile_error!("smol does not support this target OS");
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::convert::TryInto;
use std::fmt::Debug;
use std::future::Future;
use std::io::{self, Read, Write};
use std::mem;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
#[cfg(unix)]
use std::os::unix::{
io::AsRawFd,
net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
};
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
use std::panic::catch_unwind;
use std::path::Path;
use std::pin::Pin;
@ -21,15 +27,6 @@ use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
#[cfg(unix)]
use std::os::unix::{
io::{AsRawFd, RawFd},
net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
};
#[cfg(target_os = "windows")]
use std::os::windows::io::{AsRawSocket, RawSocket};
use crossbeam_channel as channel;
use crossbeam_utils::sync::Parker;
use futures_core::stream::Stream;
@ -42,19 +39,185 @@ use parking_lot::{Condvar, Mutex};
use slab::Slab;
use socket2::{Domain, Protocol, Socket, Type};
#[cfg(target_os = "linux")]
use nix::sys::epoll::{
epoll_create1, epoll_ctl, epoll_wait, EpollCreateFlags, EpollEvent, EpollFlags, EpollOp,
};
#[cfg(target_os = "windows")]
use wepoll_binding::{Epoll, EventFlag, Events};
// TODO: fix unwraps
// TODO: if epoll/kqueue/wepoll gets EINTR, then retry - or maybe just call notify()
// TODO: catch panics in wake() and Waker::drop()
// TODO: readme for inspiration: https://github.com/piscisaureus/wepoll
#[cfg(target_os = "linux")]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::time::Duration;
use nix::sys::epoll;
#[derive(Clone, Copy)]
pub struct RawSource(RawFd);
impl RawSource {
pub fn new(s: &impl AsRawFd) -> RawSource {
RawSource(s.as_raw_fd())
}
}
pub struct Poll(RawFd);
impl Poll {
pub fn create() -> io::Result<Poll> {
epoll::epoll_create1(epoll::EpollCreateFlags::EPOLL_CLOEXEC)
.map(Poll)
.map_err(io_err)
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
let ev = &mut epoll::EpollEvent::new(flags(), index as u64);
epoll::epoll_ctl(self.0, epoll::EpollOp::EpollCtlAdd, source.0, Some(ev))
.map_err(io_err)
}
pub fn reregister(&self, source: RawSource, index: usize) -> io::Result<()> {
let ev = &mut epoll::EpollEvent::new(flags(), index as u64);
epoll::epoll_ctl(self.0, epoll::EpollOp::EpollCtlMod, source.0, Some(ev))
.map_err(io_err)
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
epoll::epoll_ctl(self.0, epoll::EpollOp::EpollCtlDel, source.0, None).map_err(io_err)
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
let n = epoll::epoll_wait(self.0, &mut events.list, timeout_ms).map_err(io_err)?;
events.len = n;
Ok(n)
}
}
pub struct Events {
list: Box<[epoll::EpollEvent]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
Events {
list: vec![epoll::EpollEvent::empty(); 1000].into_boxed_slice(),
len: 0,
}
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
is_read: ev.events() != epoll::EpollFlags::EPOLLOUT,
is_write: ev.events() != epoll::EpollFlags::EPOLLIN,
index: ev.data() as usize,
})
}
}
pub struct Event {
pub is_read: bool,
pub is_write: bool,
pub index: usize,
}
#[inline]
fn flags() -> epoll::EpollFlags {
epoll::EpollFlags::EPOLLONESHOT
| epoll::EpollFlags::EPOLLIN
| epoll::EpollFlags::EPOLLOUT
| epoll::EpollFlags::EPOLLRDHUP
}
fn io_err(err: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
}
#[cfg(target_os = "windows")]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::time::Duration;
use wepoll_binding as wepoll;
#[derive(Clone, Copy)]
pub struct RawSource(RawSocket);
impl RawSource {
pub fn new(s: &impl AsRawSocket) -> RawSource {
RawSource(s.as_raw_socket())
}
}
impl AsRawSocket for RawSource {
fn as_raw_socket(&self) -> RawSocket {
self.0
}
}
pub struct Poll(wepoll::Epoll);
impl Poll {
pub fn create() -> io::Result<Poll> {
wepoll::Epoll::new().map(Poll)
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.register(source, flags(), index as u64)
}
pub fn reregister(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.reregister(source, flags(), index as u64)?
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
self.0.deregister(source)
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
events.0.clear();
self.0.poll(events, timeout)
}
}
pub struct Events(wepoll_binding::Events);
impl Events {
pub fn new() -> Events {
Events(wepoll_binding::Events::with_capacity(1000))
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.0.iter().map(|ev| Event {
is_read: ev.flags() != wepoll::EventFlag::OUT,
is_write: ev.flags() != wepoll::EventFlag::IN,
index: ev.data() as usize,
})
}
}
pub struct Event {
pub is_read: bool,
pub is_write: bool,
pub index: usize,
}
#[inline]
fn flags() -> wepoll::EventFlag {
wepoll::EventFlag::ONESHOT
| wepoll::EventFlag::IN
| wepoll::EventFlag::OUT
| wepoll::EventFlag::RDHUP
}
}
// ----- Poller -----
struct Poller {
@ -92,7 +255,7 @@ impl Poller {
sock2.set_recv_buffer_size(1)?;
let registry = Registry::create()?;
registry.register(&sock2)?;
registry.register(sys::RawSource::new(&sock2))?;
Ok(Poller {
registry,
@ -157,104 +320,37 @@ impl Poller {
// ----- Registry -----
struct Entry {
#[cfg(unix)]
fd: RawFd,
#[cfg(windows)]
socket: RawSocket,
source: sys::RawSource,
index: usize,
readers: Mutex<Vec<Waker>>,
writers: Mutex<Vec<Waker>>,
}
#[cfg(unix)]
impl AsRawFd for Entry {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}
#[cfg(windows)]
impl AsRawSocket for Entry {
fn as_raw_socket(&self) -> RawSocket {
self.socket
}
}
#[cfg(target_os = "linux")]
struct Registry {
epoll: RawFd,
events: Mutex<Box<[EpollEvent]>>,
io_handles: Mutex<Slab<Arc<Entry>>>,
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
}
#[cfg(target_os = "windows")]
struct Registry {
epoll: Epoll,
events: Mutex<Events>,
poll: sys::Poll,
events: Mutex<sys::Events>,
io_handles: Mutex<Slab<Arc<Entry>>>,
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
}
impl Registry {
#[cfg(target_os = "linux")]
fn create() -> io::Result<Registry> {
Ok(Registry {
epoll: epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(io_err)?,
events: Mutex::new(vec![EpollEvent::empty(); 1000].into_boxed_slice()),
poll: sys::Poll::create()?,
events: Mutex::new(sys::Events::new()),
io_handles: Mutex::new(Slab::new()),
timers: Mutex::new(BTreeMap::new()),
})
}
#[cfg(target_os = "windows")]
fn create() -> io::Result<Registry> {
Ok(Registry {
epoll: Epoll::new()?,
events: Mutex::new(Events::with_capacity(1000)),
io_handles: Mutex::new(Slab::new()),
timers: Mutex::new(BTreeMap::new()),
})
}
fn register(
&self,
#[cfg(unix)] source: &dyn AsRawFd,
#[cfg(windows)] source: &dyn RawSocket,
) -> io::Result<Arc<Entry>> {
fn register(&self, source: sys::RawSource) -> io::Result<Arc<Entry>> {
let mut io_handles = self.io_handles.lock();
let vacant = io_handles.vacant_entry();
let index = vacant.key();
#[cfg(target_os = "linux")]
epoll_ctl(
self.epoll,
EpollOp::EpollCtlAdd,
source.as_raw_fd(),
Some(&mut EpollEvent::new(
EpollFlags::EPOLLONESHOT
| EpollFlags::EPOLLIN
| EpollFlags::EPOLLOUT
| EpollFlags::EPOLLRDHUP,
index as u64,
)),
)
.map_err(io_err)?;
#[cfg(target_os = "windows")]
self.epoll.register(
source,
EventFlag::ONESHOT | EventFlag::IN | EventFlag::OUT | EventFlag::RDHUP,
index as u64,
)?;
self.poll.register(source, index)?;
let entry = Arc::new(Entry {
#[cfg(unix)]
fd: source.as_raw_fd(),
#[cfg(windows)]
socket: source.as_raw_socket(),
source,
index,
readers: Mutex::new(Vec::new()),
writers: Mutex::new(Vec::new()),
@ -267,14 +363,7 @@ impl Registry {
fn deregister(&self, entry: &Entry) -> io::Result<()> {
let mut io_handles = self.io_handles.lock();
io_handles.remove(entry.index);
#[cfg(target_os = "linux")]
epoll_ctl(self.epoll, EpollOp::EpollCtlDel, entry.as_raw_fd(), None).map_err(io_err)?;
#[cfg(target_os = "windows")]
self.epoll.deregister(entry)?;
Ok(())
self.poll.deregister(entry.source)
}
fn wait_io(&self, timeout: Option<Duration>) -> io::Result<()> {
@ -287,77 +376,29 @@ impl Registry {
self.events.lock()
};
#[cfg(target_os = "linux")]
let (n, iter) = {
let timeout_ms = timeout
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
let n = epoll_wait(self.epoll, &mut events, timeout_ms).map_err(io_err)?;
(n, &events[..n])
};
#[cfg(target_os = "windows")]
let (n, iter) = {
events.clear();
let n = self.epoll.poll(&mut events, timeout)?;
(n, events.iter())
};
if self.poll.poll(&mut events, timeout)? == 0 {
return Ok(());
}
let mut wakers = VecDeque::new();
if n > 0 {
let io_handles = self.io_handles.lock();
let io_handles = self.io_handles.lock();
for ev in iter {
#[cfg(target_os = "linux")]
let (is_read, is_write, index) = (
ev.events() != EpollFlags::EPOLLOUT,
ev.events() != EpollFlags::EPOLLIN,
ev.data() as usize,
);
#[cfg(target_os = "windows")]
let (is_read, is_write, index) = (
ev.flags() != EventFlag::OUT,
ev.flags() != EventFlag::IN,
ev.data() as usize,
);
// In order to minimize latencies, wake writers before readers.
// Source: https://twitter.com/kingprotty/status/1222152589405384705?s=19
if let Some(entry) = io_handles.get(index) {
if is_read {
for w in entry.readers.lock().drain(..) {
wakers.push_back(w);
}
for ev in events.iter() {
// In order to minimize latencies, wake writers before readers.
// Source: https://twitter.com/kingprotty/status/1222152589405384705?s=19
if let Some(entry) = io_handles.get(ev.index) {
if ev.is_read {
for w in entry.readers.lock().drain(..) {
wakers.push_back(w);
}
if is_write {
for w in entry.writers.lock().drain(..) {
wakers.push_front(w);
}
}
#[cfg(target_os = "linux")]
epoll_ctl(
self.epoll,
EpollOp::EpollCtlMod,
entry.fd,
Some(&mut EpollEvent::new(
EpollFlags::EPOLLONESHOT
| EpollFlags::EPOLLIN
| EpollFlags::EPOLLOUT
| EpollFlags::EPOLLRDHUP,
entry.index as u64,
)),
)
.map_err(io_err)?;
#[cfg(target_os = "windows")]
self.epoll.reregister(
entry,
EventFlag::ONESHOT | EventFlag::IN | EventFlag::OUT | EventFlag::RDHUP,
entry.index as u64,
)?;
}
if ev.is_write {
for w in entry.writers.lock().drain(..) {
wakers.push_front(w);
}
}
self.poll.reregister(entry.source, entry.index)?;
}
}
@ -388,12 +429,6 @@ impl Registry {
}
}
/// Converts any error into an I/O error.
#[cfg(unix)]
fn io_err(err: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
// ----- Executor -----
struct Executor {
@ -815,7 +850,7 @@ impl<T: AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> io::Result<Async<T>> {
Ok(Async {
entry: POLLER.registry.register(&source)?,
entry: POLLER.registry.register(sys::RawSource::new(&source))?,
source: Box::new(source),
})
}
@ -826,7 +861,7 @@ impl<T: AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> io::Result<Async<T>> {
Ok(Async {
entry: POLLER.registry.register(&source)?,
entry: POLLER.registry.register(sys::RawSource::new(&source))?,
source: Box::new(source),
})
}
@ -1008,7 +1043,7 @@ impl Async<TcpListener> {
}
/// Returns a stream over incoming connections.
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Unpin + '_ {
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + Unpin + '_ {
Box::pin(stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
Some((res, listener))
@ -1140,7 +1175,9 @@ impl Async<UnixListener> {
}
/// Returns a stream over incoming connections.
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Unpin + '_ {
pub fn incoming(
&self,
) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + Unpin + '_ {
Box::pin(stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
Some((res, listener))