mirror of https://github.com/stjepang/smol
Simplify reactor code
This commit is contained in:
parent
30427646f6
commit
6bbd398331
|
@ -31,6 +31,7 @@ concurrent-queue = "1.1.1"
|
|||
fastrand = "1.1.0"
|
||||
futures-io = { version = "0.3.5", default-features = false, features = ["std"] }
|
||||
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
|
||||
libc = "0.2.70"
|
||||
once_cell = "1.3.1"
|
||||
scoped-tls = "1.0.0"
|
||||
slab = "0.4.2"
|
||||
|
@ -42,11 +43,9 @@ default-features = false
|
|||
features = ["rt-threaded"]
|
||||
optional = true
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
libc = "0.2.70"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
wepoll-sys-stjepang = "1.0.0"
|
||||
winapi = { version = "0.3.8", features = ["ioapiset"] }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = "0.3"
|
||||
|
|
160
src/reactor.rs
160
src/reactor.rs
|
@ -24,7 +24,7 @@ use std::mem;
|
|||
#[cfg(unix)]
|
||||
use std::os::unix::io::RawFd;
|
||||
#[cfg(windows)]
|
||||
use std::os::windows::io::{FromRawSocket, RawSocket};
|
||||
use std::os::windows::io::RawSocket;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::task::{Poll, Waker};
|
||||
|
@ -34,11 +34,6 @@ use concurrent_queue::ConcurrentQueue;
|
|||
use futures_util::future;
|
||||
use once_cell::sync::Lazy;
|
||||
use slab::Slab;
|
||||
#[cfg(windows)]
|
||||
use socket2::Socket;
|
||||
|
||||
#[cfg(unix)]
|
||||
use crate::sys::fcntl::{fcntl, FcntlArg};
|
||||
|
||||
/// The reactor.
|
||||
///
|
||||
|
@ -101,19 +96,6 @@ impl Reactor {
|
|||
let mut sources = self.sources.lock().unwrap();
|
||||
let vacant = sources.vacant_entry();
|
||||
|
||||
// Put the I/O handle in non-blocking mode.
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let flags = fcntl(raw, FcntlArg::F_GETFL)?;
|
||||
let flags = flags | libc::O_NONBLOCK;
|
||||
fcntl(raw, FcntlArg::F_SETFL(flags))?;
|
||||
}
|
||||
#[cfg(windows)]
|
||||
{
|
||||
let socket = unsafe { Socket::from_raw_socket(raw) };
|
||||
mem::ManuallyDrop::new(socket).set_nonblocking(true)?;
|
||||
}
|
||||
|
||||
// Create a source and register it.
|
||||
let key = vacant.key();
|
||||
self.sys.register(raw, key)?;
|
||||
|
@ -491,6 +473,8 @@ mod sys {
|
|||
Ok(reactor)
|
||||
}
|
||||
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
|
||||
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
|
||||
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
|
||||
let ev = &mut EpollEvent::new(0, key as u64);
|
||||
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, Some(ev))
|
||||
}
|
||||
|
@ -589,7 +573,17 @@ mod sys {
|
|||
use std::time::Duration;
|
||||
|
||||
use crate::sys::event::{kevent_ts, kqueue, KEvent};
|
||||
use crate::sys::fcntl::{fcntl, FcntlArg};
|
||||
|
||||
macro_rules! syscall {
|
||||
($fn:ident $args:tt) => {{
|
||||
let res = unsafe { libc::$fn $args };
|
||||
if res == -1 {
|
||||
Err(std::io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(res)
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
pub struct Reactor {
|
||||
kqueue_fd: RawFd,
|
||||
|
@ -599,7 +593,7 @@ mod sys {
|
|||
impl Reactor {
|
||||
pub fn new() -> io::Result<Reactor> {
|
||||
let kqueue_fd = kqueue()?;
|
||||
fcntl(kqueue_fd, FcntlArg::F_SETFD(libc::FD_CLOEXEC))?;
|
||||
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
|
||||
let (read_stream, write_stream) = UnixStream::pair()?;
|
||||
read_stream.set_nonblocking(true)?;
|
||||
write_stream.set_nonblocking(true)?;
|
||||
|
@ -611,7 +605,9 @@ mod sys {
|
|||
reactor.reregister(reactor.read_stream.as_raw_fd(), !0, true, false)?;
|
||||
Ok(reactor)
|
||||
}
|
||||
pub fn register(&self, _fd: RawFd, _key: usize) -> io::Result<()> {
|
||||
pub fn register(&self, fd: RawFd, _key: usize) -> io::Result<()> {
|
||||
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
|
||||
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
|
||||
|
@ -718,35 +714,58 @@ mod sys {
|
|||
mod sys {
|
||||
use std::convert::TryInto;
|
||||
use std::io;
|
||||
use std::os::raw::c_int;
|
||||
use std::os::windows::io::{AsRawSocket, RawSocket};
|
||||
use std::time::Duration;
|
||||
|
||||
use wepoll_sys_stjepang::*;
|
||||
use wepoll_sys_stjepang as we;
|
||||
use winapi::um::winsock2;
|
||||
|
||||
macro_rules! syscall {
|
||||
($fn:ident $args:tt) => {{
|
||||
let res = unsafe { we::$fn $args };
|
||||
if res == -1 {
|
||||
Err(std::io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(res)
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
pub struct Reactor {
|
||||
handle: HANDLE,
|
||||
handle: we::HANDLE,
|
||||
}
|
||||
unsafe impl Send for Reactor {}
|
||||
unsafe impl Sync for Reactor {}
|
||||
impl Reactor {
|
||||
pub fn new() -> io::Result<Reactor> {
|
||||
let handle = unsafe { epoll_create1(0) };
|
||||
let handle = unsafe { we::epoll_create1(0) };
|
||||
if handle.is_null() {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
Ok(Reactor { handle })
|
||||
}
|
||||
pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
|
||||
let mut ev = epoll_event {
|
||||
events: 0,
|
||||
data: epoll_data { u64: key as u64 },
|
||||
};
|
||||
let ret =
|
||||
unsafe { epoll_ctl(self.handle, EPOLL_CTL_ADD as c_int, sock as SOCKET, &mut ev) };
|
||||
if ret == -1 {
|
||||
unsafe {
|
||||
let mut nonblocking = true as libc::c_ulong;
|
||||
let res = winsock2::ioctlsocket(
|
||||
sock as winsock2::SOCKET,
|
||||
winsock2::FIONBIO,
|
||||
&mut nonblocking,
|
||||
);
|
||||
if res != 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
}
|
||||
let mut ev = we::epoll_event {
|
||||
events: 0,
|
||||
data: we::epoll_data { u64: key as u64 },
|
||||
};
|
||||
syscall!(epoll_ctl(
|
||||
self.handle,
|
||||
we::EPOLL_CTL_ADD as libc::c_int,
|
||||
sock as we::SOCKET,
|
||||
&mut ev,
|
||||
))?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn reregister(
|
||||
|
@ -756,36 +775,32 @@ mod sys {
|
|||
read: bool,
|
||||
write: bool,
|
||||
) -> io::Result<()> {
|
||||
let mut flags = EPOLLONESHOT;
|
||||
let mut flags = we::EPOLLONESHOT;
|
||||
if read {
|
||||
flags |= READ_FLAGS;
|
||||
}
|
||||
if write {
|
||||
flags |= WRITE_FLAGS;
|
||||
}
|
||||
let mut ev = epoll_event {
|
||||
let mut ev = we::epoll_event {
|
||||
events: flags as u32,
|
||||
data: epoll_data { u64: key as u64 },
|
||||
data: we::epoll_data { u64: key as u64 },
|
||||
};
|
||||
let ret =
|
||||
unsafe { epoll_ctl(self.handle, EPOLL_CTL_MOD as c_int, sock as SOCKET, &mut ev) };
|
||||
if ret == -1 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
syscall!(epoll_ctl(
|
||||
self.handle,
|
||||
we::EPOLL_CTL_MOD as libc::c_int,
|
||||
sock as we::SOCKET,
|
||||
&mut ev,
|
||||
))?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
|
||||
let ret = unsafe {
|
||||
epoll_ctl(
|
||||
syscall!(epoll_ctl(
|
||||
self.handle,
|
||||
EPOLL_CTL_DEL as c_int,
|
||||
sock as SOCKET,
|
||||
0 as *mut epoll_event,
|
||||
)
|
||||
};
|
||||
if ret == -1 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
we::EPOLL_CTL_DEL as libc::c_int,
|
||||
sock as we::SOCKET,
|
||||
0 as *mut we::epoll_event,
|
||||
))?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
|
||||
|
@ -798,35 +813,29 @@ mod sys {
|
|||
t.max(Duration::from_millis(1))
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.unwrap_or(c_int::MAX)
|
||||
.unwrap_or(libc::c_int::MAX)
|
||||
}
|
||||
}
|
||||
};
|
||||
let ret = unsafe {
|
||||
epoll_wait(
|
||||
events.len = syscall!(epoll_wait(
|
||||
self.handle,
|
||||
events.list.as_mut_ptr(),
|
||||
events.list.len() as c_int,
|
||||
events.list.len() as libc::c_int,
|
||||
timeout_ms,
|
||||
)
|
||||
};
|
||||
if ret == -1 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
events.len = ret as usize;
|
||||
Ok(ret as usize)
|
||||
))? as usize;
|
||||
Ok(events.len)
|
||||
}
|
||||
pub fn notify(&self) -> io::Result<()> {
|
||||
unsafe {
|
||||
extern "system" {
|
||||
fn PostQueuedCompletionStatus(
|
||||
CompletionPort: HANDLE,
|
||||
dwNumberOfBytesTransferred: u32,
|
||||
dwCompletionKey: usize,
|
||||
lpOverlapped: usize,
|
||||
) -> c_int;
|
||||
let res = winapi::um::ioapiset::PostQueuedCompletionStatus(
|
||||
self.handle,
|
||||
0,
|
||||
0,
|
||||
0 as *mut _,
|
||||
);
|
||||
if res != 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
PostQueuedCompletionStatus(self.handle, 0, 0, 0);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -837,20 +846,21 @@ mod sys {
|
|||
self.0
|
||||
}
|
||||
}
|
||||
const READ_FLAGS: u32 = EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI;
|
||||
const WRITE_FLAGS: u32 = EPOLLOUT | EPOLLHUP | EPOLLERR;
|
||||
const READ_FLAGS: u32 =
|
||||
we::EPOLLIN | we::EPOLLRDHUP | we::EPOLLHUP | we::EPOLLERR | we::EPOLLPRI;
|
||||
const WRITE_FLAGS: u32 = we::EPOLLOUT | we::EPOLLHUP | we::EPOLLERR;
|
||||
|
||||
pub struct Events {
|
||||
list: Box<[epoll_event]>,
|
||||
list: Box<[we::epoll_event]>,
|
||||
len: usize,
|
||||
}
|
||||
unsafe impl Send for Events {}
|
||||
unsafe impl Sync for Events {}
|
||||
impl Events {
|
||||
pub fn new() -> Events {
|
||||
let ev = epoll_event {
|
||||
let ev = we::epoll_event {
|
||||
events: 0,
|
||||
data: epoll_data { u64: 0 },
|
||||
data: we::epoll_data { u64: 0 },
|
||||
};
|
||||
Events {
|
||||
list: vec![ev; 1000].into_boxed_slice(),
|
||||
|
|
32
src/sys.rs
32
src/sys.rs
|
@ -1,35 +1,3 @@
|
|||
#[cfg(unix)]
|
||||
pub mod fcntl {
|
||||
use super::check_err;
|
||||
use std::os::unix::io::RawFd;
|
||||
|
||||
pub type OFlag = libc::c_int;
|
||||
pub type FdFlag = libc::c_int;
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
#[allow(dead_code)]
|
||||
/// Arguments passed to `fcntl`.
|
||||
pub enum FcntlArg {
|
||||
F_GETFL,
|
||||
F_SETFL(OFlag),
|
||||
F_SETFD(FdFlag),
|
||||
}
|
||||
|
||||
/// Thin wrapper around `libc::fcntl`.
|
||||
///
|
||||
/// See [`fcntl(2)`](http://man7.org/linux/man-pages/man2/fcntl.2.html) for details.
|
||||
pub fn fcntl(fd: RawFd, arg: FcntlArg) -> Result<libc::c_int, std::io::Error> {
|
||||
let res = unsafe {
|
||||
match arg {
|
||||
FcntlArg::F_GETFL => libc::fcntl(fd, libc::F_GETFL),
|
||||
FcntlArg::F_SETFL(flag) => libc::fcntl(fd, libc::F_SETFL, flag),
|
||||
FcntlArg::F_SETFD(flag) => libc::fcntl(fd, libc::F_SETFD, flag),
|
||||
}
|
||||
};
|
||||
check_err(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn check_err(res: libc::c_int) -> Result<libc::c_int, std::io::Error> {
|
||||
if res == -1 {
|
||||
|
|
Loading…
Reference in New Issue