diff --git a/Cargo.toml b/Cargo.toml index 2915bfa..af7b526 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ optional = true libc = "0.2.70" [target.'cfg(windows)'.dependencies] -wepoll-binding = "2.0.2" +wepoll-sys = "2.0.0" [dev-dependencies] futures = { version = "0.3.5", default-features = false, features = ["std"] } diff --git a/src/reactor.rs b/src/reactor.rs index 475f21c..d1ae330 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -486,7 +486,7 @@ mod sys { pub fn new() -> io::Result { let epoll_fd = epoll_create1()?; let io_event = Lazy::::new(|| IoEvent::new().unwrap()); - Ok(Reactor { epoll_fd }) + Ok(Reactor { epoll_fd, io_event }) } pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> { let ev = &mut EpollEvent::new(0, key as u64); @@ -694,28 +694,43 @@ mod sys { /// Raw bindings to wepoll (Windows). #[cfg(target_os = "windows")] mod sys { + use std::convert::TryInto; use std::io; + use std::os::raw::c_int; use std::os::windows::io::{AsRawSocket, RawSocket}; use std::time::Duration; use once_cell::sync::Lazy; - use wepoll_binding::{Epoll, EventFlag}; + use wepoll_sys::*; use crate::io_event::IoEvent; pub struct Reactor { - epoll: Epoll, + handle: HANDLE, io_event: Lazy, } + unsafe impl Send for Reactor {} + unsafe impl Sync for Reactor {} impl Reactor { pub fn new() -> io::Result { - let epoll = Epoll::new()?; + let handle = unsafe { epoll_create1(0) }; + if handle.is_null() { + return Err(io::Error::last_os_error()); + } let io_event = Lazy::::new(|| IoEvent::new().unwrap()); - Ok(Reactor { epoll, io_event }) + Ok(Reactor { handle, io_event }) } pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> { - self.epoll - .register(&As(sock), EventFlag::empty(), key as u64) + let mut ev = epoll_event { + events: 0, + data: epoll_data { u64: key as u64 }, + }; + let ret = + unsafe { epoll_ctl(self.handle, EPOLL_CTL_ADD as c_int, sock as SOCKET, &mut ev) }; + if ret == -1 { + return Err(io::Error::last_os_error()); + } + Ok(()) } pub fn reregister( &self, @@ -724,30 +739,66 @@ mod sys { read: bool, write: bool, ) -> io::Result<()> { - let mut flags = EventFlag::ONESHOT; + let mut flags = EPOLLONESHOT; if read { flags |= read_flags(); } if write { flags |= write_flags(); } - self.epoll.reregister(&As(sock), flags, key as u64) + let mut ev = epoll_event { + events: flags as u32, + data: epoll_data { u64: key as u64 }, + }; + let ret = + unsafe { epoll_ctl(self.handle, EPOLL_CTL_MOD as c_int, sock as SOCKET, &mut ev) }; + if ret == -1 { + return Err(io::Error::last_os_error()); + } + Ok(()) } pub fn deregister(&self, sock: RawSocket) -> io::Result<()> { - self.epoll.deregister(&As(sock)) + let ret = unsafe { + epoll_ctl( + self.handle, + EPOLL_CTL_DEL as c_int, + sock as SOCKET, + 0 as *mut epoll_event, + ) + }; + if ret == -1 { + return Err(io::Error::last_os_error()); + } + Ok(()) } pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result { - let timeout = timeout.map(|t| { - if t == Duration::from_millis(0) { - t - } else { - t.max(Duration::from_millis(1)) + let timeout_ms = match timeout { + None => -1, + Some(t) => { + if t == Duration::from_millis(0) { + 0 + } else { + t.max(Duration::from_millis(1)) + .as_millis() + .try_into() + .unwrap_or(c_int::MAX) + } } - }); - events.0.clear(); - let res = self.epoll.poll(&mut events.0, timeout); + }; + let ret = unsafe { + epoll_wait( + self.handle, + events.list.as_mut_ptr(), + events.list.len() as c_int, + timeout_ms, + ) + }; + if ret == -1 { + return Err(io::Error::last_os_error()); + } self.io_event.clear(); - res + events.len = ret as usize; + Ok(ret as usize) } pub fn notify(&self) -> io::Result<()> { self.io_event.notify(); @@ -760,23 +811,35 @@ mod sys { self.0 } } - fn read_flags() -> EventFlag { - EventFlag::IN | EventFlag::RDHUP | EventFlag::HUP | EventFlag::ERR | EventFlag::PRI + fn read_flags() -> u32 { + EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI } - fn write_flags() -> EventFlag { - EventFlag::OUT | EventFlag::HUP | EventFlag::ERR + fn write_flags() -> u32 { + EPOLLOUT | EPOLLHUP | EPOLLERR } - pub struct Events(wepoll_binding::Events); + pub struct Events { + list: Box<[epoll_event]>, + len: usize, + } + unsafe impl Send for Events {} + unsafe impl Sync for Events {} impl Events { pub fn new() -> Events { - Events(wepoll_binding::Events::with_capacity(1000)) + let ev = epoll_event { + events: 0, + data: epoll_data { u64: 0 }, + }; + Events { + list: vec![ev; 1000].into_boxed_slice(), + len: 0, + } } pub fn iter(&self) -> impl Iterator + '_ { - self.0.iter().map(|ev| Event { - readable: ev.flags().intersects(read_flags()), - writable: ev.flags().intersects(write_flags()), - key: ev.data() as usize, + self.list[..self.len].iter().map(|ev| Event { + readable: (ev.events & read_flags()) != 0, + writable: (ev.events & write_flags()) != 0, + key: unsafe { ev.data.u64 } as usize, }) } }