Replace wepoll-binding with wepoll-sys

This commit is contained in:
Stjepan Glavina 2020-06-21 15:32:56 +02:00
parent d27dd3c5cb
commit ecd832eb31
2 changed files with 93 additions and 30 deletions

View File

@ -47,7 +47,7 @@ optional = true
libc = "0.2.70" libc = "0.2.70"
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
wepoll-binding = "2.0.2" wepoll-sys = "2.0.0"
[dev-dependencies] [dev-dependencies]
futures = { version = "0.3.5", default-features = false, features = ["std"] } futures = { version = "0.3.5", default-features = false, features = ["std"] }

View File

@ -486,7 +486,7 @@ mod sys {
pub fn new() -> io::Result<Reactor> { pub fn new() -> io::Result<Reactor> {
let epoll_fd = epoll_create1()?; let epoll_fd = epoll_create1()?;
let io_event = Lazy::<IoEvent>::new(|| IoEvent::new().unwrap()); let io_event = Lazy::<IoEvent>::new(|| IoEvent::new().unwrap());
Ok(Reactor { epoll_fd }) Ok(Reactor { epoll_fd, io_event })
} }
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> { pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
let ev = &mut EpollEvent::new(0, key as u64); let ev = &mut EpollEvent::new(0, key as u64);
@ -694,28 +694,43 @@ mod sys {
/// Raw bindings to wepoll (Windows). /// Raw bindings to wepoll (Windows).
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod sys { mod sys {
use std::convert::TryInto;
use std::io; use std::io;
use std::os::raw::c_int;
use std::os::windows::io::{AsRawSocket, RawSocket}; use std::os::windows::io::{AsRawSocket, RawSocket};
use std::time::Duration; use std::time::Duration;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use wepoll_binding::{Epoll, EventFlag}; use wepoll_sys::*;
use crate::io_event::IoEvent; use crate::io_event::IoEvent;
pub struct Reactor { pub struct Reactor {
epoll: Epoll, handle: HANDLE,
io_event: Lazy<IoEvent>, io_event: Lazy<IoEvent>,
} }
unsafe impl Send for Reactor {}
unsafe impl Sync for Reactor {}
impl Reactor { impl Reactor {
pub fn new() -> io::Result<Reactor> { pub fn new() -> io::Result<Reactor> {
let epoll = Epoll::new()?; let handle = unsafe { epoll_create1(0) };
if handle.is_null() {
return Err(io::Error::last_os_error());
}
let io_event = Lazy::<IoEvent>::new(|| IoEvent::new().unwrap()); let io_event = Lazy::<IoEvent>::new(|| IoEvent::new().unwrap());
Ok(Reactor { epoll, io_event }) Ok(Reactor { handle, io_event })
} }
pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> { pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
self.epoll let mut ev = epoll_event {
.register(&As(sock), EventFlag::empty(), key as u64) events: 0,
data: epoll_data { u64: key as u64 },
};
let ret =
unsafe { epoll_ctl(self.handle, EPOLL_CTL_ADD as c_int, sock as SOCKET, &mut ev) };
if ret == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
} }
pub fn reregister( pub fn reregister(
&self, &self,
@ -724,30 +739,66 @@ mod sys {
read: bool, read: bool,
write: bool, write: bool,
) -> io::Result<()> { ) -> io::Result<()> {
let mut flags = EventFlag::ONESHOT; let mut flags = EPOLLONESHOT;
if read { if read {
flags |= read_flags(); flags |= read_flags();
} }
if write { if write {
flags |= write_flags(); flags |= write_flags();
} }
self.epoll.reregister(&As(sock), flags, key as u64) let mut ev = epoll_event {
events: flags as u32,
data: epoll_data { u64: key as u64 },
};
let ret =
unsafe { epoll_ctl(self.handle, EPOLL_CTL_MOD as c_int, sock as SOCKET, &mut ev) };
if ret == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
} }
pub fn deregister(&self, sock: RawSocket) -> io::Result<()> { pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
self.epoll.deregister(&As(sock)) let ret = unsafe {
epoll_ctl(
self.handle,
EPOLL_CTL_DEL as c_int,
sock as SOCKET,
0 as *mut epoll_event,
)
};
if ret == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
} }
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> { pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout = timeout.map(|t| { let timeout_ms = match timeout {
if t == Duration::from_millis(0) { None => -1,
t Some(t) => {
} else { if t == Duration::from_millis(0) {
t.max(Duration::from_millis(1)) 0
} else {
t.max(Duration::from_millis(1))
.as_millis()
.try_into()
.unwrap_or(c_int::MAX)
}
} }
}); };
events.0.clear(); let ret = unsafe {
let res = self.epoll.poll(&mut events.0, timeout); epoll_wait(
self.handle,
events.list.as_mut_ptr(),
events.list.len() as c_int,
timeout_ms,
)
};
if ret == -1 {
return Err(io::Error::last_os_error());
}
self.io_event.clear(); self.io_event.clear();
res events.len = ret as usize;
Ok(ret as usize)
} }
pub fn notify(&self) -> io::Result<()> { pub fn notify(&self) -> io::Result<()> {
self.io_event.notify(); self.io_event.notify();
@ -760,23 +811,35 @@ mod sys {
self.0 self.0
} }
} }
fn read_flags() -> EventFlag { fn read_flags() -> u32 {
EventFlag::IN | EventFlag::RDHUP | EventFlag::HUP | EventFlag::ERR | EventFlag::PRI EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI
} }
fn write_flags() -> EventFlag { fn write_flags() -> u32 {
EventFlag::OUT | EventFlag::HUP | EventFlag::ERR EPOLLOUT | EPOLLHUP | EPOLLERR
} }
pub struct Events(wepoll_binding::Events); pub struct Events {
list: Box<[epoll_event]>,
len: usize,
}
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
impl Events { impl Events {
pub fn new() -> Events { pub fn new() -> Events {
Events(wepoll_binding::Events::with_capacity(1000)) let ev = epoll_event {
events: 0,
data: epoll_data { u64: 0 },
};
Events {
list: vec![ev; 1000].into_boxed_slice(),
len: 0,
}
} }
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ { pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.0.iter().map(|ev| Event { self.list[..self.len].iter().map(|ev| Event {
readable: ev.flags().intersects(read_flags()), readable: (ev.events & read_flags()) != 0,
writable: ev.flags().intersects(write_flags()), writable: (ev.events & write_flags()) != 0,
key: ev.data() as usize, key: unsafe { ev.data.u64 } as usize,
}) })
} }
} }