polling/src/os/kqueue.rs

252 lines
7.8 KiB
Rust

//! Functionality that is only available for `kqueue`-based platforms.
use crate::sys::mode_to_flags;
use crate::{PollMode, Poller};
use std::io;
use std::process::Child;
use std::time::Duration;
use rustix::event::kqueue;
use super::__private::PollerSealed;
use __private::FilterSealed;
// TODO(notgull): We should also have EVFILT_AIO, EVFILT_VNODE and EVFILT_USER. However, the current
// API makes it difficult to effectively express events from these filters. At the next breaking
// change, we should change `Event` to be a struct with private fields, and encode additional
// information in there.
/// Functionality that is only available for `kqueue`-based platforms.
///
/// `kqueue` is able to monitor much more than just read/write readiness on file descriptors. Using
/// this extension trait, you can monitor for signals, process exits, and more. See the implementors
/// of the [`Filter`] trait for more information.
pub trait PollerKqueueExt<F: Filter>: PollerSealed {
/// Add a filter to the poller.
///
/// This is similar to [`add`][Poller::add], but it allows you to specify a filter instead of
/// a socket. See the implementors of the [`Filter`] trait for more information.
///
/// # Examples
///
/// ```no_run
/// use polling::{Events, Poller, PollMode};
/// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal};
///
/// let poller = Poller::new().unwrap();
///
/// // Register the SIGINT signal.
/// poller.add_filter(Signal(rustix::process::Signal::Int as _), 0, PollMode::Oneshot).unwrap();
///
/// // Wait for the signal.
/// let mut events = Events::new();
/// poller.wait(&mut events, None).unwrap();
/// # let _ = events;
/// ```
fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()>;
/// Modify a filter in the poller.
///
/// This is similar to [`modify`][Poller::modify], but it allows you to specify a filter
/// instead of a socket. See the implementors of the [`Filter`] trait for more information.
///
/// # Examples
///
/// ```no_run
/// use polling::{Events, Poller, PollMode};
/// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal};
///
/// let poller = Poller::new().unwrap();
///
/// // Register the SIGINT signal.
/// poller.add_filter(Signal(rustix::process::Signal::Int as _), 0, PollMode::Oneshot).unwrap();
///
/// // Re-register with a different key.
/// poller.modify_filter(Signal(rustix::process::Signal::Int as _), 1, PollMode::Oneshot).unwrap();
///
/// // Wait for the signal.
/// let mut events = Events::new();
/// poller.wait(&mut events, None).unwrap();
/// # let _ = events;
/// ```
fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()>;
/// Remove a filter from the poller.
///
/// This is used to remove filters that were previously added with
/// [`add_filter`](PollerKqueueExt::add_filter).
///
/// # Examples
///
/// ```no_run
/// use polling::{Poller, PollMode};
/// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal};
///
/// let poller = Poller::new().unwrap();
///
/// // Register the SIGINT signal.
/// poller.add_filter(Signal(rustix::process::Signal::Int as _), 0, PollMode::Oneshot).unwrap();
///
/// // Remove the filter.
/// poller.delete_filter(Signal(rustix::process::Signal::Int as _)).unwrap();
/// ```
fn delete_filter(&self, filter: F) -> io::Result<()>;
}
impl<F: Filter> PollerKqueueExt<F> for Poller {
#[inline(always)]
fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
// No difference between adding and modifying in kqueue.
self.modify_filter(filter, key, mode)
}
fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
// Convert the filter into a kevent.
let event = filter.filter(kqueue::EventFlags::ADD | mode_to_flags(mode), key);
// Modify the filter.
self.poller.submit_changes([event])
}
fn delete_filter(&self, filter: F) -> io::Result<()> {
// Convert the filter into a kevent.
let event = filter.filter(kqueue::EventFlags::DELETE, 0);
// Delete the filter.
self.poller.submit_changes([event])
}
}
/// A filter that can be registered into a `kqueue`.
pub trait Filter: FilterSealed {}
unsafe impl<T: FilterSealed + ?Sized> FilterSealed for &T {
#[inline(always)]
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
(**self).filter(flags, key)
}
}
impl<T: Filter + ?Sized> Filter for &T {}
/// Monitor this signal number.
///
/// No matter what `PollMode` is specified, this filter will always be
/// oneshot-only.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Signal(pub std::os::raw::c_int);
unsafe impl FilterSealed for Signal {
#[inline(always)]
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
kqueue::Event::new(
kqueue::EventFilter::Signal {
signal: rustix::process::Signal::from_raw(self.0).expect("invalid signal number"),
times: 0,
},
flags | kqueue::EventFlags::RECEIPT,
key as _,
)
}
}
impl Filter for Signal {}
/// Monitor a child process.
#[derive(Debug)]
pub struct Process<'a> {
/// The child process to monitor.
child: &'a Child,
/// The operation to monitor.
ops: ProcessOps,
}
/// The operations that a monitored process can perform.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum ProcessOps {
/// The process exited.
Exit,
/// The process was forked.
Fork,
/// The process executed a new process.
Exec,
}
impl<'a> Process<'a> {
/// Monitor a child process.
///
/// # Safety
///
/// Once registered into the `Poller`, the `Child` object must outlive this filter's
/// registration into the poller.
pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self {
Self { child, ops }
}
}
unsafe impl FilterSealed for Process<'_> {
#[inline(always)]
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
let events = match self.ops {
ProcessOps::Exit => kqueue::ProcessEvents::EXIT,
ProcessOps::Fork => kqueue::ProcessEvents::FORK,
ProcessOps::Exec => kqueue::ProcessEvents::EXEC,
};
kqueue::Event::new(
kqueue::EventFilter::Proc {
pid: rustix::process::Pid::from_child(self.child),
flags: events,
},
flags | kqueue::EventFlags::RECEIPT,
key as _,
)
}
}
impl Filter for Process<'_> {}
/// Wait for a timeout to expire.
///
/// Modifying the timeout after it has been added to the poller will reset it.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Timer {
/// Identifier for the timer.
pub id: usize,
/// The timeout to wait for.
pub timeout: Duration,
}
unsafe impl FilterSealed for Timer {
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
kqueue::Event::new(
kqueue::EventFilter::Timer {
ident: self.id as _,
timer: Some(self.timeout),
},
flags | kqueue::EventFlags::RECEIPT,
key as _,
)
}
}
impl Filter for Timer {}
mod __private {
use rustix::event::kqueue;
#[doc(hidden)]
pub unsafe trait FilterSealed {
/// Get the filter for the given event.
///
/// This filter's flags must have `EV_RECEIPT`.
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event;
}
}