polling/src/kqueue.rs

235 lines
7.0 KiB
Rust
Raw Normal View History

2020-08-06 13:02:59 +00:00
//! Bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
use std::io::{self, Read, Write};
use std::mem;
2020-08-06 13:02:59 +00:00
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::ptr;
use std::time::Duration;
#[cfg(not(polling_no_io_safety))]
use std::os::unix::io::{AsFd, BorrowedFd};
2020-08-06 13:02:59 +00:00
use crate::Event;
/// A poller backed by a kqueue instance.
///
/// In addition to the kqueue descriptor, it owns both ends of a `UnixStream` pair that is
/// used to deliver wake-up notifications to `wait()`.
#[derive(Debug)]
pub struct Poller {
    /// Raw file descriptor of the kqueue instance.
    kqueue_fd: RawFd,

    /// Receiving end of the notification stream pair; notifications are consumed from it.
    read_stream: UnixStream,

    /// Sending end of the notification stream pair; notifications are produced into it.
    write_stream: UnixStream,
}
impl Poller {
/// Creates a new poller.
pub fn new() -> io::Result<Poller> {
// Create a kqueue instance.
let kqueue_fd = syscall!(kqueue())?;
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
// Set up the notification pipe.
let (read_stream, write_stream) = UnixStream::pair()?;
read_stream.set_nonblocking(true)?;
write_stream.set_nonblocking(true)?;
2020-10-02 14:40:09 +00:00
2020-08-06 13:02:59 +00:00
let poller = Poller {
kqueue_fd,
read_stream,
write_stream,
};
poller.add(
2020-08-06 13:02:59 +00:00
poller.read_stream.as_raw_fd(),
Event {
2020-09-01 04:48:09 +00:00
key: crate::NOTIFY_KEY,
2020-08-06 13:02:59 +00:00
readable: true,
writable: false,
},
)?;
2020-09-03 10:55:46 +00:00
log::trace!(
"new: kqueue_fd={}, read_stream={:?}",
kqueue_fd,
poller.read_stream
);
2020-08-06 13:02:59 +00:00
Ok(poller)
}
/// Adds a new file descriptor.
///
/// There is no separate registration step in this backend: adding a descriptor is the
/// same operation as modifying its interest, so this simply forwards to `modify()`.
///
/// # Errors
///
/// Propagates any error returned by `modify()`.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
    let interest = ev;
    self.modify(fd, interest)
}
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
2020-08-29 15:34:45 +00:00
if fd != self.read_stream.as_raw_fd() {
log::trace!("add: kqueue_fd={}, fd={}, ev={:?}", self.kqueue_fd, fd, ev);
2020-08-29 15:34:45 +00:00
}
2020-08-29 14:00:59 +00:00
2020-10-02 14:40:09 +00:00
let read_flags = if ev.readable {
libc::EV_ADD | libc::EV_ONESHOT
2020-08-06 13:02:59 +00:00
} else {
2020-10-02 14:40:09 +00:00
libc::EV_DELETE
};
let write_flags = if ev.writable {
libc::EV_ADD | libc::EV_ONESHOT
2020-08-06 13:02:59 +00:00
} else {
2020-10-02 14:40:09 +00:00
libc::EV_DELETE
};
2020-08-06 13:02:59 +00:00
// A list of changes for kqueue.
let changelist = [
libc::kevent {
ident: fd as _,
filter: libc::EVFILT_READ,
2020-10-02 14:40:09 +00:00
flags: read_flags | libc::EV_RECEIPT,
2020-08-06 13:02:59 +00:00
udata: ev.key as _,
..unsafe { mem::zeroed() }
2020-08-06 13:02:59 +00:00
},
libc::kevent {
ident: fd as _,
filter: libc::EVFILT_WRITE,
2020-10-02 14:40:09 +00:00
flags: write_flags | libc::EV_RECEIPT,
2020-08-06 13:02:59 +00:00
udata: ev.key as _,
..unsafe { mem::zeroed() }
2020-08-06 13:02:59 +00:00
},
];
// Apply changes.
let mut eventlist = changelist;
syscall!(kevent(
self.kqueue_fd,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as _,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as _,
ptr::null(),
))?;
// Check for errors.
for ev in &eventlist {
// Explanation for ignoring EPIPE: https://github.com/tokio-rs/mio/issues/582
if (ev.flags & libc::EV_ERROR) != 0
&& ev.data != 0
&& ev.data != libc::ENOENT as _
&& ev.data != libc::EPIPE as _
{
return Err(io::Error::from_raw_os_error(ev.data as _));
}
}
Ok(())
}
2020-10-02 14:40:09 +00:00
/// Deletes a file descriptor.
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
2020-10-02 14:40:09 +00:00
// Simply delete interest in the file descriptor.
self.modify(fd, Event::none(0))
2020-08-06 13:02:59 +00:00
}
/// Waits for I/O events with an optional timeout.
2020-08-14 12:38:02 +00:00
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
2020-09-03 10:55:46 +00:00
log::trace!("wait: kqueue_fd={}, timeout={:?}", self.kqueue_fd, timeout);
2020-08-29 14:00:59 +00:00
2020-08-06 13:02:59 +00:00
// Convert the `Duration` to `libc::timespec`.
let timeout = timeout.map(|t| libc::timespec {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
// Wait for I/O events.
let changelist = [];
let eventlist = &mut events.list;
let res = syscall!(kevent(
self.kqueue_fd,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as _,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as _,
match &timeout {
None => ptr::null(),
Some(t) => t,
}
))?;
events.len = res as usize;
2020-08-29 14:00:59 +00:00
log::trace!("new events: kqueue_fd={}, res={}", self.kqueue_fd, res);
2020-08-06 13:02:59 +00:00
// Clear the notification (if received) and re-register interest in it.
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.modify(
2020-08-06 13:02:59 +00:00
self.read_stream.as_raw_fd(),
Event {
2020-09-01 04:48:09 +00:00
key: crate::NOTIFY_KEY,
2020-08-06 13:02:59 +00:00
readable: true,
writable: false,
},
)?;
2020-08-14 12:38:02 +00:00
Ok(())
2020-08-06 13:02:59 +00:00
}
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
2020-09-03 10:55:46 +00:00
log::trace!("notify: kqueue_fd={}", self.kqueue_fd);
2020-08-06 13:02:59 +00:00
let _ = (&self.write_stream).write(&[1]);
Ok(())
}
}
impl AsRawFd for Poller {
fn as_raw_fd(&self) -> RawFd {
self.kqueue_fd
}
}
#[cfg(not(polling_no_io_safety))]
impl AsFd for Poller {
    /// Borrows the kqueue file descriptor for as long as the poller itself is borrowed.
    fn as_fd(&self) -> BorrowedFd<'_> {
        let raw = self.kqueue_fd;
        // SAFETY: the returned lifetime is tied to `self`, and the descriptor is only
        // closed when the poller is dropped.
        unsafe { BorrowedFd::borrow_raw(raw) }
    }
}
2020-08-06 13:02:59 +00:00
impl Drop for Poller {
fn drop(&mut self) {
2020-09-03 10:55:46 +00:00
log::trace!("drop: kqueue_fd={}", self.kqueue_fd);
let _ = self.delete(self.read_stream.as_raw_fd());
2020-08-06 13:02:59 +00:00
let _ = syscall!(close(self.kqueue_fd));
}
}
/// A list of reported I/O events.
pub struct Events {
2022-08-21 12:09:43 +00:00
list: Box<[libc::kevent; 1024]>,
2020-08-06 13:02:59 +00:00
len: usize,
}
// NOTE(review): presumably required because `libc::kevent` contains a raw-pointer `udata`
// field, which would otherwise make `Events` `!Send` — confirm against the `libc` definition.
// Within this file `udata` is only ever used to carry an integer key (`ev.key as _` on the
// way in, `ev.udata as usize` on the way out) and is never dereferenced.
unsafe impl Send for Events {}
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
let ev: libc::kevent = unsafe { mem::zeroed() };
2022-08-21 12:09:43 +00:00
let list = Box::new([ev; 1024]);
2020-08-06 13:02:59 +00:00
let len = 0;
Events { list, len }
}
/// Iterates over I/O events.
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
// On some platforms, closing the read end of a pipe wakes up writers, but the
// event is reported as EVFILT_READ with the EV_EOF flag.
//
// https://github.com/golang/go/commit/23aad448b1e3f7c3b4ba2af90120bde91ac865b4
self.list[..self.len].iter().map(|ev| Event {
key: ev.udata as usize,
readable: ev.filter == libc::EVFILT_READ,
writable: ev.filter == libc::EVFILT_WRITE
|| (ev.filter == libc::EVFILT_READ && (ev.flags & libc::EV_EOF) != 0),
})
}
}