refactor: use libc for kqueue

This commit is contained in:
dignifiedquire 2020-05-24 16:35:15 +02:00
parent f647532400
commit 63ffbc575c
2 changed files with 163 additions and 44 deletions

View File

@ -538,16 +538,14 @@ mod sys {
use std::os::unix::io::RawFd;
use std::time::Duration;
use crate::sys::event::{kevent_ts, kqueue, EventFilter, EventFlag, FilterFlag, KEvent};
use crate::sys::event::{kevent_ts, kqueue, FilterFlag, KEvent};
use crate::sys::fcntl::{fcntl, FcntlArg};
use crate::sys::libc;
use super::io_err;
pub struct Reactor(RawFd);
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let fd = kqueue().map_err(io_err)?;
let fd = kqueue()?;
fcntl(fd, FcntlArg::F_SETFD(libc::FD_CLOEXEC))?;
Ok(Reactor(fd))
}
@ -555,43 +553,29 @@ mod sys {
Ok(())
}
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut read_flags = EventFlag::EV_ONESHOT | EventFlag::EV_RECEIPT;
let mut write_flags = EventFlag::EV_ONESHOT | EventFlag::EV_RECEIPT;
let mut read_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
let mut write_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
if read {
read_flags |= EventFlag::EV_ADD;
read_flags |= libc::EV_ADD;
} else {
read_flags |= EventFlag::EV_DELETE;
read_flags |= libc::EV_DELETE;
}
if write {
write_flags |= EventFlag::EV_ADD;
write_flags |= libc::EV_ADD;
} else {
write_flags |= EventFlag::EV_DELETE;
write_flags |= libc::EV_DELETE;
}
let udata = key as _;
let changelist = [
KEvent::new(
fd as _,
EventFilter::EVFILT_READ,
read_flags,
FFLAGS,
0,
udata,
),
KEvent::new(
fd as _,
EventFilter::EVFILT_WRITE,
write_flags,
FFLAGS,
0,
udata,
),
KEvent::new(fd as _, libc::EVFILT_READ, read_flags, FFLAGS, 0, udata),
KEvent::new(fd as _, libc::EVFILT_WRITE, write_flags, FFLAGS, 0, udata),
];
let mut eventlist = changelist;
kevent_ts(self.0, &changelist, &mut eventlist, None).map_err(io_err)?;
kevent_ts(self.0, &changelist, &mut eventlist, None)?;
for ev in &eventlist {
// Explanation for ignoring EPIPE: https://github.com/tokio-rs/mio/issues/582
let (flags, data) = (ev.flags(), ev.data());
if flags.contains(EventFlag::EV_ERROR)
if (flags & libc::EV_ERROR) == 1
&& data != 0
&& data != libc::ENOENT as _
&& data != libc::EPIPE as _
@ -602,16 +586,16 @@ mod sys {
Ok(())
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
let flags = EventFlag::EV_RECEIPT | EventFlag::EV_DELETE;
let flags = libc::EV_RECEIPT | libc::EV_DELETE;
let changelist = [
KEvent::new(fd as _, EventFilter::EVFILT_WRITE, flags, FFLAGS, 0, 0),
KEvent::new(fd as _, EventFilter::EVFILT_READ, flags, FFLAGS, 0, 0),
KEvent::new(fd as _, libc::EVFILT_WRITE, flags, FFLAGS, 0, 0),
KEvent::new(fd as _, libc::EVFILT_READ, flags, FFLAGS, 0, 0),
];
let mut eventlist = changelist;
kevent_ts(self.0, &changelist, &mut eventlist, None).map_err(io_err)?;
kevent_ts(self.0, &changelist, &mut eventlist, None)?;
for ev in &eventlist {
let (flags, data) = (ev.flags(), ev.data());
if flags.contains(EventFlag::EV_ERROR) && data != 0 && data != libc::ENOENT as _ {
if (flags & libc::EV_ERROR == 1) && data != 0 && data != libc::ENOENT as _ {
return Err(io::Error::from_raw_os_error(data as _));
}
}
@ -622,11 +606,11 @@ mod sys {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
events.len = kevent_ts(self.0, &[], &mut events.list, timeout).map_err(io_err)?;
events.len = kevent_ts(self.0, &[], &mut events.list, timeout)?;
Ok(events.len)
}
}
const FFLAGS: FilterFlag = FilterFlag::empty();
const FFLAGS: FilterFlag = 0;
pub struct Events {
list: Box<[KEvent]>,
@ -634,16 +618,16 @@ mod sys {
}
impl Events {
pub fn new() -> Events {
let flags = EventFlag::empty();
let event = KEvent::new(0, EventFilter::EVFILT_USER, flags, FFLAGS, 0, 0);
let flags = 0;
let event = KEvent::new(0, libc::EVFILT_USER, flags, FFLAGS, 0, 0);
let list = vec![event; 1000].into_boxed_slice();
let len = 0;
Events { list, len }
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
readable: ev.filter() == EventFilter::EVFILT_READ,
writable: ev.filter() == EventFilter::EVFILT_WRITE,
readable: ev.filter() == libc::EVFILT_READ,
writable: ev.filter() == libc::EVFILT_WRITE,
key: ev.udata() as usize,
})
}
@ -670,7 +654,7 @@ mod sys {
Ok(Reactor(Epoll::new()?))
}
pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
self.0.register(&As(sock), EventFlag::empty(), key as u64)
self.0.register(&As(sock), 0, key as u64)
}
pub fn reregister(
&self,
@ -679,7 +663,7 @@ mod sys {
read: bool,
write: bool,
) -> io::Result<()> {
let mut flags = EventFlag::ONESHOT;
let mut flags = libc::ONESHOT;
if read {
flags |= read_flags();
}
@ -710,10 +694,10 @@ mod sys {
}
}
fn read_flags() -> EventFlag {
EventFlag::IN | EventFlag::RDHUP
libc::IN | libc::RDHUP
}
fn write_flags() -> EventFlag {
EventFlag::OUT
libc::OUT
}
pub struct Events(wepoll_binding::Events);

View File

@ -106,7 +106,142 @@ fn check_err(res: libc::c_int) -> Result<libc::c_int, std::io::Error> {
))]
/// Kqueue.
pub mod event {
pub use nix::sys::event::{kevent_ts, kqueue, EventFilter, EventFlag, FilterFlag, KEvent};
use super::check_err;
use std::os::unix::io::RawFd;
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "dragonfly",
target_os = "openbsd"
))]
#[allow(non_camel_case_types)]
type type_of_nchanges = libc::c_int;
#[cfg(target_os = "netbsd")]
#[allow(non_camel_case_types)]
type type_of_nchanges = libc::size_t;
#[cfg(target_os = "netbsd")]
#[allow(non_camel_case_types)]
type type_of_event_filter = u32;
#[cfg(not(target_os = "netbsd"))]
#[allow(non_camel_case_types)]
type type_of_event_filter = i16;
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "openbsd"
))]
#[allow(non_camel_case_types)]
type type_of_udata = *mut libc::c_void;
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
#[allow(non_camel_case_types)]
type type_of_data = libc::intptr_t;
#[cfg(any(target_os = "netbsd"))]
#[allow(non_camel_case_types)]
type type_of_udata = libc::intptr_t;
#[cfg(any(target_os = "netbsd", target_os = "openbsd"))]
#[allow(non_camel_case_types)]
type type_of_data = libc::int64_t;
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(C)]
pub struct KEvent(libc::kevent);
unsafe impl Send for KEvent {}
impl KEvent {
pub fn new(
ident: libc::uintptr_t,
filter: EventFilter,
flags: EventFlag,
fflags: FilterFlag,
data: libc::intptr_t,
udata: libc::intptr_t,
) -> KEvent {
KEvent(libc::kevent {
ident,
filter: filter as type_of_event_filter,
flags,
fflags,
data: data as type_of_data,
udata: udata as type_of_udata,
})
}
pub fn filter(&self) -> EventFilter {
unsafe { std::mem::transmute(self.0.filter as type_of_event_filter) }
}
pub fn flags(&self) -> EventFlag {
self.0.flags
}
pub fn data(&self) -> libc::intptr_t {
self.0.data as libc::intptr_t
}
pub fn udata(&self) -> libc::intptr_t {
self.0.udata as libc::intptr_t
}
}
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "openbsd"
))]
pub type EventFlag = u16;
#[cfg(any(target_os = "netbsd"))]
pub type EventFlag = u32;
pub type FilterFlag = u32;
#[cfg(target_os = "netbsd")]
pub type EventFilter = u32;
#[cfg(not(target_os = "netbsd"))]
pub type EventFilter = i16;
pub fn kqueue() -> Result<RawFd, std::io::Error> {
let res = unsafe { libc::kqueue() };
check_err(res)
}
pub fn kevent_ts(
kq: RawFd,
changelist: &[KEvent],
eventlist: &mut [KEvent],
timeout_opt: Option<libc::timespec>,
) -> Result<usize, std::io::Error> {
let res = unsafe {
libc::kevent(
kq,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as type_of_nchanges,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as type_of_nchanges,
if let Some(ref timeout) = timeout_opt {
timeout as *const libc::timespec
} else {
std::ptr::null()
},
)
};
check_err(res).map(|r| r as usize)
}
}
#[cfg(unix)]