From 6bbd398331d90dd1bad4d9d730e5917fbb3ed8a9 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 21 Jun 2020 22:11:14 +0200 Subject: [PATCH] Simplify reactor code --- Cargo.toml | 5 +- src/reactor.rs | 170 ++++++++++++++++++++++++++----------------------- src/sys.rs | 32 ---------- 3 files changed, 92 insertions(+), 115 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c9792c6..159b834 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ concurrent-queue = "1.1.1" fastrand = "1.1.0" futures-io = { version = "0.3.5", default-features = false, features = ["std"] } futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] } +libc = "0.2.70" once_cell = "1.3.1" scoped-tls = "1.0.0" slab = "0.4.2" @@ -42,11 +43,9 @@ default-features = false features = ["rt-threaded"] optional = true -[target.'cfg(unix)'.dependencies] -libc = "0.2.70" - [target.'cfg(windows)'.dependencies] wepoll-sys-stjepang = "1.0.0" +winapi = { version = "0.3.8", features = ["ioapiset"] } [dev-dependencies] criterion = "0.3" diff --git a/src/reactor.rs b/src/reactor.rs index 0fc8a20..c141db2 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -24,7 +24,7 @@ use std::mem; #[cfg(unix)] use std::os::unix::io::RawFd; #[cfg(windows)] -use std::os::windows::io::{FromRawSocket, RawSocket}; +use std::os::windows::io::RawSocket; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::{Poll, Waker}; @@ -34,11 +34,6 @@ use concurrent_queue::ConcurrentQueue; use futures_util::future; use once_cell::sync::Lazy; use slab::Slab; -#[cfg(windows)] -use socket2::Socket; - -#[cfg(unix)] -use crate::sys::fcntl::{fcntl, FcntlArg}; /// The reactor. /// @@ -101,19 +96,6 @@ impl Reactor { let mut sources = self.sources.lock().unwrap(); let vacant = sources.vacant_entry(); - // Put the I/O handle in non-blocking mode. - #[cfg(unix)] - { - let flags = fcntl(raw, FcntlArg::F_GETFL)?; - let flags = flags | libc::O_NONBLOCK; - fcntl(raw, FcntlArg::F_SETFL(flags))?; - } - #[cfg(windows)] - { - let socket = unsafe { Socket::from_raw_socket(raw) }; - mem::ManuallyDrop::new(socket).set_nonblocking(true)?; - } - // Create a source and register it. let key = vacant.key(); self.sys.register(raw, key)?; @@ -491,6 +473,8 @@ mod sys { Ok(reactor) } pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> { + let flags = syscall!(fcntl(fd, libc::F_GETFL))?; + syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?; let ev = &mut EpollEvent::new(0, key as u64); epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, Some(ev)) } @@ -589,7 +573,17 @@ mod sys { use std::time::Duration; use crate::sys::event::{kevent_ts, kqueue, KEvent}; - use crate::sys::fcntl::{fcntl, FcntlArg}; + + macro_rules! syscall { + ($fn:ident $args:tt) => {{ + let res = unsafe { libc::$fn $args }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; + } pub struct Reactor { kqueue_fd: RawFd, @@ -599,7 +593,7 @@ mod sys { impl Reactor { pub fn new() -> io::Result { let kqueue_fd = kqueue()?; - fcntl(kqueue_fd, FcntlArg::F_SETFD(libc::FD_CLOEXEC))?; + syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?; let (read_stream, write_stream) = UnixStream::pair()?; read_stream.set_nonblocking(true)?; write_stream.set_nonblocking(true)?; @@ -611,7 +605,9 @@ mod sys { reactor.reregister(reactor.read_stream.as_raw_fd(), !0, true, false)?; Ok(reactor) } - pub fn register(&self, _fd: RawFd, _key: usize) -> io::Result<()> { + pub fn register(&self, fd: RawFd, _key: usize) -> io::Result<()> { + let flags = syscall!(fcntl(fd, libc::F_GETFL))?; + syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?; Ok(()) } pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> { @@ -718,35 +714,58 @@ mod sys { mod sys { use std::convert::TryInto; use std::io; - use std::os::raw::c_int; use std::os::windows::io::{AsRawSocket, RawSocket}; use std::time::Duration; - use wepoll_sys_stjepang::*; + use wepoll_sys_stjepang as we; + use winapi::um::winsock2; + + macro_rules! syscall { + ($fn:ident $args:tt) => {{ + let res = unsafe { we::$fn $args }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; + } pub struct Reactor { - handle: HANDLE, + handle: we::HANDLE, } unsafe impl Send for Reactor {} unsafe impl Sync for Reactor {} impl Reactor { pub fn new() -> io::Result { - let handle = unsafe { epoll_create1(0) }; + let handle = unsafe { we::epoll_create1(0) }; if handle.is_null() { return Err(io::Error::last_os_error()); } Ok(Reactor { handle }) } pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> { - let mut ev = epoll_event { - events: 0, - data: epoll_data { u64: key as u64 }, - }; - let ret = - unsafe { epoll_ctl(self.handle, EPOLL_CTL_ADD as c_int, sock as SOCKET, &mut ev) }; - if ret == -1 { - return Err(io::Error::last_os_error()); + unsafe { + let mut nonblocking = true as libc::c_ulong; + let res = winsock2::ioctlsocket( + sock as winsock2::SOCKET, + winsock2::FIONBIO, + &mut nonblocking, + ); + if res != 0 { + return Err(io::Error::last_os_error()); + } } + let mut ev = we::epoll_event { + events: 0, + data: we::epoll_data { u64: key as u64 }, + }; + syscall!(epoll_ctl( + self.handle, + we::EPOLL_CTL_ADD as libc::c_int, + sock as we::SOCKET, + &mut ev, + ))?; Ok(()) } pub fn reregister( @@ -756,36 +775,32 @@ mod sys { read: bool, write: bool, ) -> io::Result<()> { - let mut flags = EPOLLONESHOT; + let mut flags = we::EPOLLONESHOT; if read { flags |= READ_FLAGS; } if write { flags |= WRITE_FLAGS; } - let mut ev = epoll_event { + let mut ev = we::epoll_event { events: flags as u32, - data: epoll_data { u64: key as u64 }, + data: we::epoll_data { u64: key as u64 }, }; - let ret = - unsafe { epoll_ctl(self.handle, EPOLL_CTL_MOD as c_int, sock as SOCKET, &mut ev) }; - if ret == -1 { - return Err(io::Error::last_os_error()); - } + syscall!(epoll_ctl( + self.handle, + we::EPOLL_CTL_MOD as libc::c_int, + sock as we::SOCKET, + &mut ev, + ))?; Ok(()) } pub fn deregister(&self, sock: RawSocket) -> io::Result<()> { - let ret = unsafe { - epoll_ctl( - self.handle, - EPOLL_CTL_DEL as c_int, - sock as SOCKET, - 0 as *mut epoll_event, - ) - }; - if ret == -1 { - return Err(io::Error::last_os_error()); - } + syscall!(epoll_ctl( + self.handle, + we::EPOLL_CTL_DEL as libc::c_int, + sock as we::SOCKET, + 0 as *mut we::epoll_event, + ))?; Ok(()) } pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result { @@ -798,35 +813,29 @@ mod sys { t.max(Duration::from_millis(1)) .as_millis() .try_into() - .unwrap_or(c_int::MAX) + .unwrap_or(libc::c_int::MAX) } } }; - let ret = unsafe { - epoll_wait( - self.handle, - events.list.as_mut_ptr(), - events.list.len() as c_int, - timeout_ms, - ) - }; - if ret == -1 { - return Err(io::Error::last_os_error()); - } - events.len = ret as usize; - Ok(ret as usize) + events.len = syscall!(epoll_wait( + self.handle, + events.list.as_mut_ptr(), + events.list.len() as libc::c_int, + timeout_ms, + ))? as usize; + Ok(events.len) } pub fn notify(&self) -> io::Result<()> { unsafe { - extern "system" { - fn PostQueuedCompletionStatus( - CompletionPort: HANDLE, - dwNumberOfBytesTransferred: u32, - dwCompletionKey: usize, - lpOverlapped: usize, - ) -> c_int; + let res = winapi::um::ioapiset::PostQueuedCompletionStatus( + self.handle, + 0, + 0, + 0 as *mut _, + ); + if res != 0 { + return Err(io::Error::last_os_error()); } - PostQueuedCompletionStatus(self.handle, 0, 0, 0); } Ok(()) } @@ -837,20 +846,21 @@ mod sys { self.0 } } - const READ_FLAGS: u32 = EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI; - const WRITE_FLAGS: u32 = EPOLLOUT | EPOLLHUP | EPOLLERR; + const READ_FLAGS: u32 = + we::EPOLLIN | we::EPOLLRDHUP | we::EPOLLHUP | we::EPOLLERR | we::EPOLLPRI; + const WRITE_FLAGS: u32 = we::EPOLLOUT | we::EPOLLHUP | we::EPOLLERR; pub struct Events { - list: Box<[epoll_event]>, + list: Box<[we::epoll_event]>, len: usize, } unsafe impl Send for Events {} unsafe impl Sync for Events {} impl Events { pub fn new() -> Events { - let ev = epoll_event { + let ev = we::epoll_event { events: 0, - data: epoll_data { u64: 0 }, + data: we::epoll_data { u64: 0 }, }; Events { list: vec![ev; 1000].into_boxed_slice(), diff --git a/src/sys.rs b/src/sys.rs index c355212..336ae90 100644 --- a/src/sys.rs +++ b/src/sys.rs @@ -1,35 +1,3 @@ -#[cfg(unix)] -pub mod fcntl { - use super::check_err; - use std::os::unix::io::RawFd; - - pub type OFlag = libc::c_int; - pub type FdFlag = libc::c_int; - - #[allow(non_camel_case_types)] - #[allow(dead_code)] - /// Arguments passed to `fcntl`. - pub enum FcntlArg { - F_GETFL, - F_SETFL(OFlag), - F_SETFD(FdFlag), - } - - /// Thin wrapper around `libc::fcntl`. - /// - /// See [`fcntl(2)`](http://man7.org/linux/man-pages/man2/fcntl.2.html) for details. - pub fn fcntl(fd: RawFd, arg: FcntlArg) -> Result { - let res = unsafe { - match arg { - FcntlArg::F_GETFL => libc::fcntl(fd, libc::F_GETFL), - FcntlArg::F_SETFL(flag) => libc::fcntl(fd, libc::F_SETFL, flag), - FcntlArg::F_SETFD(flag) => libc::fcntl(fd, libc::F_SETFD, flag), - } - }; - check_err(res) - } -} - #[cfg(unix)] fn check_err(res: libc::c_int) -> Result { if res == -1 {