Merge pull request #175 from stjepang/cleanup-sys

Cleanup sys module
This commit is contained in:
Stjepan Glavina 2020-06-21 23:16:07 +02:00 committed by GitHub
commit 56b65e4b8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 122 additions and 190 deletions

View File

@ -13,7 +13,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
rust: [nightly, beta, stable]
rust: [nightly, beta, stable, 1.39.0]
steps:
- uses: actions/checkout@v2
@ -32,7 +32,14 @@ jobs:
profile: minimal
override: true
- name: Run basic cargo check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --all-features
- name: Run cargo check
if: startsWith(matrix.rust, '1.39.0') == false
uses: actions-rs/cargo@v1
with:
command: check
@ -46,6 +53,7 @@ jobs:
args: -Z features=dev_dep
- name: Run cargo test
if: startsWith(matrix.rust, '1.39.0') == false
uses: actions-rs/cargo@v1
with:
command: test

View File

@ -31,6 +31,7 @@ concurrent-queue = "1.1.1"
fastrand = "1.1.0"
futures-io = { version = "0.3.5", default-features = false, features = ["std"] }
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
libc = "0.2.70"
once_cell = "1.3.1"
scoped-tls = "1.0.0"
slab = "0.4.2"
@ -42,11 +43,9 @@ default-features = false
features = ["rt-threaded"]
optional = true
[target.'cfg(unix)'.dependencies]
libc = "0.2.70"
[target.'cfg(windows)'.dependencies]
wepoll-sys-stjepang = "1.0.0"
winapi = { version = "0.3.8", features = ["ioapiset"] }
[dev-dependencies]
criterion = "0.3"

View File

@ -24,7 +24,7 @@ use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, RawSocket};
use std::os::windows::io::RawSocket;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Poll, Waker};
@ -34,11 +34,6 @@ use concurrent_queue::ConcurrentQueue;
use futures_util::future;
use once_cell::sync::Lazy;
use slab::Slab;
#[cfg(windows)]
use socket2::Socket;
#[cfg(unix)]
use crate::sys::fcntl::{fcntl, FcntlArg};
/// The reactor.
///
@ -101,19 +96,6 @@ impl Reactor {
let mut sources = self.sources.lock().unwrap();
let vacant = sources.vacant_entry();
// Put the I/O handle in non-blocking mode.
#[cfg(unix)]
{
let flags = fcntl(raw, FcntlArg::F_GETFL)?;
let flags = flags | libc::O_NONBLOCK;
fcntl(raw, FcntlArg::F_SETFL(flags))?;
}
#[cfg(windows)]
{
let socket = unsafe { Socket::from_raw_socket(raw) };
mem::ManuallyDrop::new(socket).set_nonblocking(true)?;
}
// Create a source and register it.
let key = vacant.key();
self.sys.register(raw, key)?;
@ -459,7 +441,6 @@ impl Source {
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::raw::c_void;
use std::os::unix::io::RawFd;
use std::time::Duration;
@ -492,6 +473,8 @@ mod sys {
Ok(reactor)
}
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
let ev = &mut EpollEvent::new(0, key as u64);
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, Some(ev))
}
@ -525,7 +508,7 @@ mod sys {
let mut buf = [0u8; 8];
let _ = syscall!(read(
self.event_fd,
&mut buf[0] as *mut u8 as *mut c_void,
&mut buf[0] as *mut u8 as *mut libc::c_void,
buf.len()
));
self.reregister(self.event_fd, !0, true, false)?;
@ -536,7 +519,7 @@ mod sys {
let buf: [u8; 8] = 1u64.to_ne_bytes();
let _ = syscall!(write(
self.event_fd,
&buf[0] as *const u8 as *const c_void,
&buf[0] as *const u8 as *const libc::c_void,
buf.len()
));
Ok(())
@ -590,7 +573,17 @@ mod sys {
use std::time::Duration;
use crate::sys::event::{kevent_ts, kqueue, KEvent};
use crate::sys::fcntl::{fcntl, FcntlArg};
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { libc::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
pub struct Reactor {
kqueue_fd: RawFd,
@ -600,7 +593,7 @@ mod sys {
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let kqueue_fd = kqueue()?;
fcntl(kqueue_fd, FcntlArg::F_SETFD(libc::FD_CLOEXEC))?;
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
let (read_stream, write_stream) = UnixStream::pair()?;
read_stream.set_nonblocking(true)?;
write_stream.set_nonblocking(true)?;
@ -612,7 +605,9 @@ mod sys {
reactor.reregister(reactor.read_stream.as_raw_fd(), !0, true, false)?;
Ok(reactor)
}
pub fn register(&self, _fd: RawFd, _key: usize) -> io::Result<()> {
pub fn register(&self, fd: RawFd, _key: usize) -> io::Result<()> {
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
Ok(())
}
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
@ -719,35 +714,58 @@ mod sys {
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::raw::c_int;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::time::Duration;
use wepoll_sys_stjepang::*;
use wepoll_sys_stjepang as we;
use winapi::um::winsock2;
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { we::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
pub struct Reactor {
handle: HANDLE,
handle: we::HANDLE,
}
unsafe impl Send for Reactor {}
unsafe impl Sync for Reactor {}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let handle = unsafe { epoll_create1(0) };
let handle = unsafe { we::epoll_create1(0) };
if handle.is_null() {
return Err(io::Error::last_os_error());
}
Ok(Reactor { handle })
}
pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
let mut ev = epoll_event {
events: 0,
data: epoll_data { u64: key as u64 },
};
let ret =
unsafe { epoll_ctl(self.handle, EPOLL_CTL_ADD as c_int, sock as SOCKET, &mut ev) };
if ret == -1 {
return Err(io::Error::last_os_error());
unsafe {
let mut nonblocking = true as libc::c_ulong;
let res = winsock2::ioctlsocket(
sock as winsock2::SOCKET,
winsock2::FIONBIO,
&mut nonblocking,
);
if res != 0 {
return Err(io::Error::last_os_error());
}
}
let mut ev = we::epoll_event {
events: 0,
data: we::epoll_data { u64: key as u64 },
};
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_ADD as libc::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
}
pub fn reregister(
@ -757,36 +775,32 @@ mod sys {
read: bool,
write: bool,
) -> io::Result<()> {
let mut flags = EPOLLONESHOT;
let mut flags = we::EPOLLONESHOT;
if read {
flags |= READ_FLAGS;
}
if write {
flags |= WRITE_FLAGS;
}
let mut ev = epoll_event {
let mut ev = we::epoll_event {
events: flags as u32,
data: epoll_data { u64: key as u64 },
data: we::epoll_data { u64: key as u64 },
};
let ret =
unsafe { epoll_ctl(self.handle, EPOLL_CTL_MOD as c_int, sock as SOCKET, &mut ev) };
if ret == -1 {
return Err(io::Error::last_os_error());
}
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_MOD as libc::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
}
pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
let ret = unsafe {
epoll_ctl(
self.handle,
EPOLL_CTL_DEL as c_int,
sock as SOCKET,
0 as *mut epoll_event,
)
};
if ret == -1 {
return Err(io::Error::last_os_error());
}
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_DEL as libc::c_int,
sock as we::SOCKET,
0 as *mut we::epoll_event,
))?;
Ok(())
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
@ -799,35 +813,27 @@ mod sys {
t.max(Duration::from_millis(1))
.as_millis()
.try_into()
.unwrap_or(c_int::MAX)
.unwrap_or(libc::c_int::max_value())
}
}
};
let ret = unsafe {
epoll_wait(
self.handle,
events.list.as_mut_ptr(),
events.list.len() as c_int,
timeout_ms,
)
};
if ret == -1 {
return Err(io::Error::last_os_error());
}
events.len = ret as usize;
Ok(ret as usize)
events.len = syscall!(epoll_wait(
self.handle,
events.list.as_mut_ptr(),
events.list.len() as libc::c_int,
timeout_ms,
))? as usize;
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
unsafe {
extern "system" {
fn PostQueuedCompletionStatus(
CompletionPort: HANDLE,
dwNumberOfBytesTransferred: u32,
dwCompletionKey: usize,
lpOverlapped: usize,
) -> c_int;
}
PostQueuedCompletionStatus(self.handle, 0, 0, 0);
// This errors if a notification has already been posted, but that's okay.
winapi::um::ioapiset::PostQueuedCompletionStatus(
self.handle as winapi::um::winnt::HANDLE,
0,
0,
0 as *mut _,
);
}
Ok(())
}
@ -838,20 +844,21 @@ mod sys {
self.0
}
}
const READ_FLAGS: u32 = EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI;
const WRITE_FLAGS: u32 = EPOLLOUT | EPOLLHUP | EPOLLERR;
const READ_FLAGS: u32 =
we::EPOLLIN | we::EPOLLRDHUP | we::EPOLLHUP | we::EPOLLERR | we::EPOLLPRI;
const WRITE_FLAGS: u32 = we::EPOLLOUT | we::EPOLLHUP | we::EPOLLERR;
pub struct Events {
list: Box<[epoll_event]>,
list: Box<[we::epoll_event]>,
len: usize,
}
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
impl Events {
pub fn new() -> Events {
let ev = epoll_event {
let ev = we::epoll_event {
events: 0,
data: epoll_data { u64: 0 },
data: we::epoll_data { u64: 0 },
};
Events {
list: vec![ev; 1000].into_boxed_slice(),

View File

@ -1,35 +1,3 @@
#[cfg(unix)]
pub mod fcntl {
use super::check_err;
use std::os::unix::io::RawFd;
pub type OFlag = libc::c_int;
pub type FdFlag = libc::c_int;
#[allow(non_camel_case_types)]
#[allow(dead_code)]
/// Arguments passed to `fcntl`.
pub enum FcntlArg {
F_GETFL,
F_SETFL(OFlag),
F_SETFD(FdFlag),
}
/// Thin wrapper around `libc::fcntl`.
///
/// See [`fcntl(2)`](http://man7.org/linux/man-pages/man2/fcntl.2.html) for details.
pub fn fcntl(fd: RawFd, arg: FcntlArg) -> Result<libc::c_int, std::io::Error> {
let res = unsafe {
match arg {
FcntlArg::F_GETFL => libc::fcntl(fd, libc::F_GETFL),
FcntlArg::F_SETFL(flag) => libc::fcntl(fd, libc::F_SETFL, flag),
FcntlArg::F_SETFD(flag) => libc::fcntl(fd, libc::F_SETFD, flag),
}
};
check_err(res)
}
}
#[cfg(unix)]
fn check_err(res: libc::c_int) -> Result<libc::c_int, std::io::Error> {
if res == -1 {
@ -193,59 +161,6 @@ pub mod epoll {
use super::check_err;
use std::os::unix::io::RawFd;
#[macro_use]
mod dlsym {
// Based on https://github.com/tokio-rs/mio/blob/v0.6.x/src/sys/unix/dlsym.rs
// I feel very sad including this code, but I have not found a better way
// to check for the existence of a symbol in Rust.
use std::marker;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
macro_rules! dlsym {
(fn $name:ident($($t:ty),*) -> $ret:ty) => (
#[allow(bad_style)]
static $name: $crate::sys::epoll::dlsym::DlSym<unsafe extern fn($($t),*) -> $ret> =
$crate::sys::epoll::dlsym::DlSym {
name: concat!(stringify!($name), "\0"),
addr: std::sync::atomic::AtomicUsize::new(0),
_marker: std::marker::PhantomData,
};
)
}
pub struct DlSym<F> {
pub name: &'static str,
pub addr: AtomicUsize,
pub _marker: marker::PhantomData<F>,
}
impl<F> DlSym<F> {
pub fn get(&self) -> Option<&F> {
assert_eq!(mem::size_of::<F>(), mem::size_of::<usize>());
unsafe {
if self.addr.load(Ordering::SeqCst) == 0 {
self.addr.store(fetch(self.name), Ordering::SeqCst);
}
if self.addr.load(Ordering::SeqCst) == 1 {
None
} else {
mem::transmute::<&AtomicUsize, Option<&F>>(&self.addr)
}
}
}
}
unsafe fn fetch(name: &str) -> usize {
assert_eq!(name.as_bytes()[name.len() - 1], 0);
match libc::dlsym(libc::RTLD_DEFAULT, name.as_ptr() as *const _) as usize {
0 => 1,
n => n,
}
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(i32)]
pub enum EpollOp {
@ -257,36 +172,39 @@ pub mod epoll {
pub type EpollFlags = libc::c_int;
pub fn epoll_create1() -> Result<RawFd, std::io::Error> {
// According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
// 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
// so we use it instead.
// According to libuv, `EPOLL_CLOEXEC` is not defined on Android API < 21.
// But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform, so we use it instead.
#[cfg(target_os = "android")]
const CLOEXEC: libc::c_int = libc::O_CLOEXEC;
#[cfg(not(target_os = "android"))]
const CLOEXEC: libc::c_int = libc::EPOLL_CLOEXEC;
let fd = unsafe {
// Emulate epoll_create1 if not available.
// Check if the `epoll_create1` symbol is available on this platform.
let ptr = libc::dlsym(
libc::RTLD_DEFAULT,
"epoll_create1\0".as_ptr() as *const libc::c_char,
);
dlsym!(fn epoll_create1(libc::c_int) -> libc::c_int);
match epoll_create1.get() {
Some(epoll_create1_fn) => check_err(epoll_create1_fn(CLOEXEC))?,
None => {
let fd = check_err(libc::epoll_create(1024))?;
drop(set_cloexec(fd));
fd
}
if ptr.is_null() {
// If not, use `epoll_create` and manually set `CLOEXEC`.
let fd = check_err(libc::epoll_create(1024))?;
let flags = libc::fcntl(fd, libc::F_GETFD);
libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
fd
} else {
// Use `epoll_create1` with `CLOEXEC`.
let epoll_create1 = std::mem::transmute::<
*mut libc::c_void,
unsafe extern "C" fn(libc::c_int) -> libc::c_int,
>(ptr);
check_err(epoll_create1(CLOEXEC))?
}
};
Ok(fd)
}
unsafe fn set_cloexec(fd: libc::c_int) -> Result<(), std::io::Error> {
let flags = libc::fcntl(fd, libc::F_GETFD);
check_err(libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC)).map(|_| ())
}
pub fn epoll_ctl<'a, T>(
epfd: RawFd,
op: EpollOp,