mirror of https://github.com/smol-rs/polling
m: Use EVFILT_USER instead of a self-pipe on kqueue where supported (#73)
* Use EVFILT_USER instead of a self-pipe * Fix kqueue flags
This commit is contained in:
parent
729b5ee071
commit
27f23a9384
253
src/kqueue.rs
253
src/kqueue.rs
|
@ -1,9 +1,8 @@
|
|||
//! Bindings to kqueue (macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
use std::io;
|
||||
use std::mem;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::os::unix::net::UnixStream;
|
||||
use std::ptr;
|
||||
use std::time::Duration;
|
||||
|
||||
|
@ -17,10 +16,12 @@ use crate::{Event, PollMode};
|
|||
pub struct Poller {
|
||||
/// File descriptor for the kqueue instance.
|
||||
kqueue_fd: RawFd,
|
||||
/// Read side of a pipe for consuming notifications.
|
||||
read_stream: UnixStream,
|
||||
/// Write side of a pipe for producing notifications.
|
||||
write_stream: UnixStream,
|
||||
|
||||
/// Notification pipe for waking up the poller.
|
||||
///
|
||||
/// On platforms that support `EVFILT_USER`, this uses that to wake up the poller. Otherwise, it
|
||||
/// uses a pipe.
|
||||
notify: notify::Notify,
|
||||
}
|
||||
|
||||
impl Poller {
|
||||
|
@ -30,31 +31,15 @@ impl Poller {
|
|||
let kqueue_fd = syscall!(kqueue())?;
|
||||
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
|
||||
|
||||
// Set up the notification pipe.
|
||||
let (read_stream, write_stream) = UnixStream::pair()?;
|
||||
read_stream.set_nonblocking(true)?;
|
||||
write_stream.set_nonblocking(true)?;
|
||||
|
||||
let poller = Poller {
|
||||
kqueue_fd,
|
||||
read_stream,
|
||||
write_stream,
|
||||
notify: notify::Notify::new()?,
|
||||
};
|
||||
poller.add(
|
||||
poller.read_stream.as_raw_fd(),
|
||||
Event {
|
||||
key: crate::NOTIFY_KEY,
|
||||
readable: true,
|
||||
writable: false,
|
||||
},
|
||||
PollMode::Oneshot,
|
||||
)?;
|
||||
|
||||
log::trace!(
|
||||
"new: kqueue_fd={}, read_stream={:?}",
|
||||
kqueue_fd,
|
||||
poller.read_stream
|
||||
);
|
||||
// Register the notification pipe.
|
||||
poller.notify.register(&poller)?;
|
||||
|
||||
log::trace!("new: kqueue_fd={}", kqueue_fd,);
|
||||
Ok(poller)
|
||||
}
|
||||
|
||||
|
@ -76,7 +61,7 @@ impl Poller {
|
|||
|
||||
/// Modifies an existing file descriptor.
|
||||
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
|
||||
if fd != self.read_stream.as_raw_fd() {
|
||||
if !self.notify.has_fd(fd) {
|
||||
log::trace!("add: kqueue_fd={}, fd={}, ev={:?}", self.kqueue_fd, fd, ev);
|
||||
}
|
||||
|
||||
|
@ -116,18 +101,33 @@ impl Poller {
|
|||
];
|
||||
|
||||
// Apply changes.
|
||||
self.submit_changes(changelist)
|
||||
}
|
||||
|
||||
/// Submit one or more changes to the kernel queue and check to see if they succeeded.
|
||||
fn submit_changes<A>(&self, changelist: A) -> io::Result<()>
|
||||
where
|
||||
A: Copy + AsRef<[libc::kevent]> + AsMut<[libc::kevent]>,
|
||||
{
|
||||
let mut eventlist = changelist;
|
||||
syscall!(kevent(
|
||||
self.kqueue_fd,
|
||||
changelist.as_ptr() as *const libc::kevent,
|
||||
changelist.len() as _,
|
||||
eventlist.as_mut_ptr() as *mut libc::kevent,
|
||||
eventlist.len() as _,
|
||||
ptr::null(),
|
||||
))?;
|
||||
|
||||
// Apply changes.
|
||||
{
|
||||
let changelist = changelist.as_ref();
|
||||
let eventlist = eventlist.as_mut();
|
||||
|
||||
syscall!(kevent(
|
||||
self.kqueue_fd,
|
||||
changelist.as_ptr() as *const libc::kevent,
|
||||
changelist.len() as _,
|
||||
eventlist.as_mut_ptr() as *mut libc::kevent,
|
||||
eventlist.len() as _,
|
||||
ptr::null(),
|
||||
))?;
|
||||
}
|
||||
|
||||
// Check for errors.
|
||||
for ev in &eventlist {
|
||||
for &ev in eventlist.as_ref() {
|
||||
// Explanation for ignoring EPIPE: https://github.com/tokio-rs/mio/issues/582
|
||||
if (ev.flags & libc::EV_ERROR) != 0
|
||||
&& ev.data != 0
|
||||
|
@ -175,16 +175,7 @@ impl Poller {
|
|||
log::trace!("new events: kqueue_fd={}, res={}", self.kqueue_fd, res);
|
||||
|
||||
// Clear the notification (if received) and re-register interest in it.
|
||||
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
|
||||
self.modify(
|
||||
self.read_stream.as_raw_fd(),
|
||||
Event {
|
||||
key: crate::NOTIFY_KEY,
|
||||
readable: true,
|
||||
writable: false,
|
||||
},
|
||||
PollMode::Oneshot,
|
||||
)?;
|
||||
self.notify.reregister(self)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -192,7 +183,7 @@ impl Poller {
|
|||
/// Sends a notification to wake up the current or next `wait()` call.
|
||||
pub fn notify(&self) -> io::Result<()> {
|
||||
log::trace!("notify: kqueue_fd={}", self.kqueue_fd);
|
||||
let _ = (&self.write_stream).write(&[1]);
|
||||
self.notify.notify(self).ok();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -214,7 +205,7 @@ impl AsFd for Poller {
|
|||
impl Drop for Poller {
|
||||
fn drop(&mut self) {
|
||||
log::trace!("drop: kqueue_fd={}", self.kqueue_fd);
|
||||
let _ = self.delete(self.read_stream.as_raw_fd());
|
||||
let _ = self.notify.deregister(self);
|
||||
let _ = syscall!(close(self.kqueue_fd));
|
||||
}
|
||||
}
|
||||
|
@ -250,3 +241,167 @@ impl Events {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(
|
||||
target_os = "freebsd",
|
||||
target_os = "dragonfly",
|
||||
target_os = "macos",
|
||||
target_os = "ios",
|
||||
target_os = "tvos",
|
||||
target_os = "watchos",
|
||||
))]
|
||||
mod notify {
|
||||
use super::Poller;
|
||||
use std::os::unix::io::RawFd;
|
||||
use std::{io, mem};
|
||||
|
||||
/// A notification pipe.
|
||||
///
|
||||
/// This implementation uses `EVFILT_USER` to avoid allocating a pipe.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct Notify;
|
||||
|
||||
impl Notify {
|
||||
/// Creates a new notification pipe.
|
||||
pub(super) fn new() -> io::Result<Self> {
|
||||
Ok(Self)
|
||||
}
|
||||
|
||||
/// Registers this notification pipe in the `Poller`.
|
||||
pub(super) fn register(&self, poller: &Poller) -> io::Result<()> {
|
||||
// Register an EVFILT_USER event.
|
||||
poller.submit_changes([libc::kevent {
|
||||
ident: 0,
|
||||
filter: libc::EVFILT_USER,
|
||||
flags: libc::EV_ADD | libc::EV_RECEIPT | libc::EV_CLEAR,
|
||||
udata: crate::NOTIFY_KEY as _,
|
||||
..unsafe { mem::zeroed() }
|
||||
}])
|
||||
}
|
||||
|
||||
/// Reregister this notification pipe in the `Poller`.
|
||||
pub(super) fn reregister(&self, _poller: &Poller) -> io::Result<()> {
|
||||
// We don't need to do anything, it's already registered as EV_CLEAR.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Notifies the `Poller`.
|
||||
pub(super) fn notify(&self, poller: &Poller) -> io::Result<()> {
|
||||
// Trigger the EVFILT_USER event.
|
||||
poller.submit_changes([libc::kevent {
|
||||
ident: 0,
|
||||
filter: libc::EVFILT_USER,
|
||||
flags: libc::EV_ADD | libc::EV_RECEIPT,
|
||||
fflags: libc::NOTE_TRIGGER,
|
||||
udata: crate::NOTIFY_KEY as _,
|
||||
..unsafe { mem::zeroed() }
|
||||
}])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deregisters this notification pipe from the `Poller`.
|
||||
pub(super) fn deregister(&self, poller: &Poller) -> io::Result<()> {
|
||||
// Deregister the EVFILT_USER event.
|
||||
poller.submit_changes([libc::kevent {
|
||||
ident: 0,
|
||||
filter: libc::EVFILT_USER,
|
||||
flags: libc::EV_RECEIPT | libc::EV_DELETE,
|
||||
udata: crate::NOTIFY_KEY as _,
|
||||
..unsafe { mem::zeroed() }
|
||||
}])
|
||||
}
|
||||
|
||||
/// Whether this raw file descriptor is associated with this pipe.
|
||||
pub(super) fn has_fd(&self, _fd: RawFd) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(
|
||||
target_os = "freebsd",
|
||||
target_os = "dragonfly",
|
||||
target_os = "macos",
|
||||
target_os = "ios",
|
||||
target_os = "tvos",
|
||||
target_os = "watchos",
|
||||
)))]
|
||||
mod notify {
|
||||
use super::Poller;
|
||||
use crate::{Event, PollMode, NOTIFY_KEY};
|
||||
use std::io::{self, prelude::*};
|
||||
use std::os::unix::{
|
||||
io::{AsRawFd, RawFd},
|
||||
net::UnixStream,
|
||||
};
|
||||
|
||||
/// A notification pipe.
|
||||
///
|
||||
/// This implementation uses a pipe to send notifications.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct Notify {
|
||||
/// The read end of the pipe.
|
||||
read_stream: UnixStream,
|
||||
|
||||
/// The write end of the pipe.
|
||||
write_stream: UnixStream,
|
||||
}
|
||||
|
||||
impl Notify {
|
||||
/// Creates a new notification pipe.
|
||||
pub(super) fn new() -> io::Result<Self> {
|
||||
let (read_stream, write_stream) = UnixStream::pair()?;
|
||||
read_stream.set_nonblocking(true)?;
|
||||
write_stream.set_nonblocking(true)?;
|
||||
|
||||
Ok(Self {
|
||||
read_stream,
|
||||
write_stream,
|
||||
})
|
||||
}
|
||||
|
||||
/// Registers this notification pipe in the `Poller`.
|
||||
pub(super) fn register(&self, poller: &Poller) -> io::Result<()> {
|
||||
// Register the read end of this pipe.
|
||||
poller.add(
|
||||
self.read_stream.as_raw_fd(),
|
||||
Event::readable(NOTIFY_KEY),
|
||||
PollMode::Oneshot,
|
||||
)
|
||||
}
|
||||
|
||||
/// Reregister this notification pipe in the `Poller`.
|
||||
pub(super) fn reregister(&self, poller: &Poller) -> io::Result<()> {
|
||||
// Clear out the notification.
|
||||
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
|
||||
|
||||
// Reregister the read end of this pipe.
|
||||
poller.modify(
|
||||
self.read_stream.as_raw_fd(),
|
||||
Event::readable(NOTIFY_KEY),
|
||||
PollMode::Oneshot,
|
||||
)
|
||||
}
|
||||
|
||||
/// Notifies the `Poller`.
|
||||
#[allow(clippy::unused_io_amount)]
|
||||
pub(super) fn notify(&self, _poller: &Poller) -> io::Result<()> {
|
||||
// Write to the write end of the pipe
|
||||
(&self.write_stream).write(&[1])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deregisters this notification pipe from the `Poller`.
|
||||
pub(super) fn deregister(&self, poller: &Poller) -> io::Result<()> {
|
||||
// Deregister the read end of the pipe.
|
||||
poller.delete(self.read_stream.as_raw_fd())
|
||||
}
|
||||
|
||||
/// Whether this raw file descriptor is associated with this pipe.
|
||||
pub(super) fn has_fd(&self, fd: RawFd) -> bool {
|
||||
self.read_stream.as_raw_fd() == fd
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue