Replace libc with rustix in some backends (#108)

This commit is contained in:
John Nunley 2023-04-23 07:26:29 -07:00 committed by GitHub
parent d3a171b88b
commit 8d8d2efcc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 401 additions and 554 deletions

View File

@ -203,7 +203,7 @@ jobs:
os: [ubuntu-latest, windows-latest]
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.47']
rust: ['1.48']
steps:
- uses: actions/checkout@v3
- name: Install Rust

View File

@ -6,7 +6,7 @@ name = "polling"
version = "2.8.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
rust-version = "1.47"
rust-version = "1.48"
description = "Portable interface to epoll, kqueue, event ports, and IOCP"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/polling"
@ -30,6 +30,7 @@ autocfg = "1"
[target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies]
libc = "0.2.77"
rustix = { version = "0.37.11", features = ["process", "time", "fs", "std"], default-features = false }
[target.'cfg(windows)'.dependencies]
bitflags = "1.3.2"

View File

@ -3,9 +3,15 @@
use std::convert::TryInto;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::ptr;
use std::time::Duration;
use rustix::fd::OwnedFd;
use rustix::io::{epoll, eventfd, read, write, EventfdFlags};
use rustix::time::{
timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
Timespec,
};
#[cfg(not(polling_no_io_safety))]
use std::os::unix::io::{AsFd, BorrowedFd};
@ -15,11 +21,11 @@ use crate::{Event, PollMode};
#[derive(Debug)]
pub struct Poller {
/// File descriptor for the epoll instance.
epoll_fd: RawFd,
epoll_fd: OwnedFd,
/// File descriptor for the eventfd that produces notifications.
event_fd: RawFd,
event_fd: OwnedFd,
/// File descriptor for the timerfd that produces timeouts.
timer_fd: Option<RawFd>,
timer_fd: Option<OwnedFd>,
}
impl Poller {
@ -28,36 +34,14 @@ impl Poller {
// Create an epoll instance.
//
// Use `epoll_create1` with `EPOLL_CLOEXEC`.
let epoll_fd = syscall!(syscall(
libc::SYS_epoll_create1,
libc::EPOLL_CLOEXEC as libc::c_int
))
.map(|fd| fd as libc::c_int)
.or_else(|e| {
match e.raw_os_error() {
Some(libc::ENOSYS) => {
// If `epoll_create1` is not implemented, use `epoll_create`
// and manually set `FD_CLOEXEC`.
let fd = syscall!(epoll_create(1024))?;
if let Ok(flags) = syscall!(fcntl(fd, libc::F_GETFD)) {
let _ = syscall!(fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC));
}
Ok(fd)
}
_ => Err(e),
}
})?;
let epoll_fd = epoll::epoll_create(epoll::CreateFlags::CLOEXEC)?;
// Set up eventfd and timerfd.
let event_fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
let timer_fd = syscall!(syscall(
libc::SYS_timerfd_create,
libc::CLOCK_MONOTONIC as libc::c_int,
(libc::TFD_CLOEXEC | libc::TFD_NONBLOCK) as libc::c_int,
))
.map(|fd| fd as libc::c_int)
let event_fd = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
let timer_fd = timerfd_create(
TimerfdClockId::Monotonic,
TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
)
.ok();
let poller = Poller {
@ -66,12 +50,16 @@ impl Poller {
timer_fd,
};
if let Some(timer_fd) = timer_fd {
poller.add(timer_fd, Event::none(crate::NOTIFY_KEY), PollMode::Oneshot)?;
if let Some(ref timer_fd) = poller.timer_fd {
poller.add(
timer_fd.as_raw_fd(),
Event::none(crate::NOTIFY_KEY),
PollMode::Oneshot,
)?;
}
poller.add(
event_fd,
poller.event_fd.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
@ -82,9 +70,9 @@ impl Poller {
log::trace!(
"new: epoll_fd={}, event_fd={}, timer_fd={:?}",
epoll_fd,
event_fd,
timer_fd
poller.epoll_fd.as_raw_fd(),
poller.event_fd.as_raw_fd(),
poller.timer_fd
);
Ok(poller)
}
@ -101,29 +89,65 @@ impl Poller {
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
log::trace!("add: epoll_fd={}, fd={}, ev={:?}", self.epoll_fd, fd, ev);
self.ctl(libc::EPOLL_CTL_ADD, fd, Some((ev, mode)))
log::trace!(
"add: epoll_fd={}, fd={}, ev={:?}",
self.epoll_fd.as_raw_fd(),
fd,
ev
);
epoll::epoll_add(
&self.epoll_fd,
unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
ev.key as u64,
epoll_flags(&ev, mode),
)?;
Ok(())
}
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
log::trace!("modify: epoll_fd={}, fd={}, ev={:?}", self.epoll_fd, fd, ev);
self.ctl(libc::EPOLL_CTL_MOD, fd, Some((ev, mode)))
log::trace!(
"modify: epoll_fd={}, fd={}, ev={:?}",
self.epoll_fd.as_raw_fd(),
fd,
ev
);
epoll::epoll_mod(
&self.epoll_fd,
unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
ev.key as u64,
epoll_flags(&ev, mode),
)?;
Ok(())
}
/// Deletes a file descriptor.
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
log::trace!("remove: epoll_fd={}, fd={}", self.epoll_fd, fd);
self.ctl(libc::EPOLL_CTL_DEL, fd, None)
log::trace!("remove: epoll_fd={}, fd={}", self.epoll_fd.as_raw_fd(), fd);
epoll::epoll_del(&self.epoll_fd, unsafe {
rustix::fd::BorrowedFd::borrow_raw(fd)
})?;
Ok(())
}
/// Waits for I/O events with an optional timeout.
#[allow(clippy::needless_update)]
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
log::trace!("wait: epoll_fd={}, timeout={:?}", self.epoll_fd, timeout);
log::trace!(
"wait: epoll_fd={}, timeout={:?}",
self.epoll_fd.as_raw_fd(),
timeout
);
if let Some(timer_fd) = self.timer_fd {
if let Some(ref timer_fd) = self.timer_fd {
// Configure the timeout using timerfd.
let new_val = libc::itimerspec {
let new_val = Itimerspec {
it_interval: TS_ZERO,
it_value: match timeout {
None => TS_ZERO,
@ -134,18 +158,14 @@ impl Poller {
ts
}
},
..unsafe { std::mem::zeroed() }
};
syscall!(timerfd_settime(
timer_fd as libc::c_int,
0 as libc::c_int,
&new_val as *const libc::itimerspec,
ptr::null_mut() as *mut libc::itimerspec
))?;
timerfd_settime(timer_fd, TimerfdTimerFlags::empty(), &new_val)?;
// Set interest in timerfd.
self.modify(
timer_fd,
timer_fd.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
@ -156,7 +176,7 @@ impl Poller {
}
// Timeout in milliseconds for epoll.
let timeout_ms = match (self.timer_fd, timeout) {
let timeout_ms = match (&self.timer_fd, timeout) {
(_, Some(t)) if t == Duration::from_secs(0) => 0,
(None, Some(t)) => {
// Round up to a whole millisecond.
@ -170,24 +190,18 @@ impl Poller {
};
// Wait for I/O events.
let res = syscall!(epoll_wait(
self.epoll_fd,
events.list.as_mut_ptr() as *mut libc::epoll_event,
events.list.len() as libc::c_int,
timeout_ms as libc::c_int,
))?;
events.len = res as usize;
log::trace!("new events: epoll_fd={}, res={}", self.epoll_fd, res);
epoll::epoll_wait(&self.epoll_fd, &mut events.list, timeout_ms)?;
log::trace!(
"new events: epoll_fd={}, res={}",
self.epoll_fd.as_raw_fd(),
events.list.len()
);
// Clear the notification (if received) and re-register interest in it.
let mut buf = [0u8; 8];
let _ = syscall!(read(
self.event_fd,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len()
));
let _ = read(&self.event_fd, &mut buf);
self.modify(
self.event_fd,
self.event_fd.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
@ -202,62 +216,26 @@ impl Poller {
pub fn notify(&self) -> io::Result<()> {
log::trace!(
"notify: epoll_fd={}, event_fd={}",
self.epoll_fd,
self.event_fd
self.epoll_fd.as_raw_fd(),
self.event_fd.as_raw_fd()
);
let buf: [u8; 8] = 1u64.to_ne_bytes();
let _ = syscall!(write(
self.event_fd,
buf.as_ptr() as *const libc::c_void,
buf.len()
));
Ok(())
}
/// Passes arguments to `epoll_ctl`.
fn ctl(&self, op: libc::c_int, fd: RawFd, ev: Option<(Event, PollMode)>) -> io::Result<()> {
let mut ev = ev.map(|(ev, mode)| {
let mut flags = match mode {
PollMode::Oneshot => libc::EPOLLONESHOT,
PollMode::Level => 0,
PollMode::Edge => libc::EPOLLET,
PollMode::EdgeOneshot => libc::EPOLLONESHOT | libc::EPOLLET,
};
if ev.readable {
flags |= read_flags();
}
if ev.writable {
flags |= write_flags();
}
libc::epoll_event {
events: flags as _,
u64: ev.key as u64,
}
});
syscall!(epoll_ctl(
self.epoll_fd,
op,
fd,
ev.as_mut()
.map(|ev| ev as *mut libc::epoll_event)
.unwrap_or(ptr::null_mut()),
))?;
let _ = write(&self.event_fd, &buf);
Ok(())
}
}
impl AsRawFd for Poller {
fn as_raw_fd(&self) -> RawFd {
self.epoll_fd
self.epoll_fd.as_raw_fd()
}
}
#[cfg(not(polling_no_io_safety))]
impl AsFd for Poller {
fn as_fd(&self) -> BorrowedFd<'_> {
// SAFETY: lifetime is bound by "self"
unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
self.epoll_fd.as_fd()
}
}
@ -265,39 +243,53 @@ impl Drop for Poller {
fn drop(&mut self) {
log::trace!(
"drop: epoll_fd={}, event_fd={}, timer_fd={:?}",
self.epoll_fd,
self.event_fd,
self.epoll_fd.as_raw_fd(),
self.event_fd.as_raw_fd(),
self.timer_fd
);
if let Some(timer_fd) = self.timer_fd {
let _ = self.delete(timer_fd);
let _ = syscall!(close(timer_fd));
if let Some(timer_fd) = self.timer_fd.take() {
let _ = self.delete(timer_fd.as_raw_fd());
}
let _ = self.delete(self.event_fd);
let _ = syscall!(close(self.event_fd));
let _ = syscall!(close(self.epoll_fd));
let _ = self.delete(self.event_fd.as_raw_fd());
}
}
/// `timespec` value that equals zero.
const TS_ZERO: libc::timespec =
unsafe { std::mem::transmute([0u8; std::mem::size_of::<libc::timespec>()]) };
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
/// Get the EPOLL flags for the interest.
fn epoll_flags(interest: &Event, mode: PollMode) -> epoll::EventFlags {
let mut flags = match mode {
PollMode::Oneshot => epoll::EventFlags::ONESHOT,
PollMode::Level => epoll::EventFlags::empty(),
PollMode::Edge => epoll::EventFlags::ET,
PollMode::EdgeOneshot => epoll::EventFlags::ET | epoll::EventFlags::ONESHOT,
};
if interest.readable {
flags |= read_flags();
}
if interest.writable {
flags |= write_flags();
}
flags
}
/// Epoll flags for all possible readability events.
fn read_flags() -> libc::c_int {
libc::EPOLLIN | libc::EPOLLRDHUP | libc::EPOLLHUP | libc::EPOLLERR | libc::EPOLLPRI
fn read_flags() -> epoll::EventFlags {
use epoll::EventFlags as Epoll;
Epoll::IN | Epoll::HUP | Epoll::ERR | Epoll::PRI
}
/// Epoll flags for all possible writability events.
fn write_flags() -> libc::c_int {
libc::EPOLLOUT | libc::EPOLLHUP | libc::EPOLLERR
fn write_flags() -> epoll::EventFlags {
use epoll::EventFlags as Epoll;
Epoll::OUT | Epoll::HUP | Epoll::ERR
}
/// A list of reported I/O events.
pub struct Events {
list: Box<[libc::epoll_event; 1024]>,
len: usize,
list: epoll::EventVec,
}
unsafe impl Send for Events {}
@ -305,18 +297,17 @@ unsafe impl Send for Events {}
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
let ev = libc::epoll_event { events: 0, u64: 0 };
let list = Box::new([ev; 1024]);
let len = 0;
Events { list, len }
Events {
list: epoll::EventVec::with_capacity(1024),
}
}
/// Iterates over I/O events.
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
key: ev.u64 as usize,
readable: (ev.events as libc::c_int & read_flags()) != 0,
writable: (ev.events as libc::c_int & write_flags()) != 0,
self.list.iter().map(|(flags, data)| Event {
key: data as usize,
readable: flags.intersects(read_flags()),
writable: flags.intersects(write_flags()),
})
}
}

View File

@ -1,21 +1,22 @@
//! Bindings to kqueue (macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
use std::io;
use std::mem;
use std::os::unix::io::{AsRawFd, RawFd};
use std::ptr;
use std::time::Duration;
#[cfg(not(polling_no_io_safety))]
use std::os::unix::io::{AsFd, BorrowedFd};
use rustix::fd::OwnedFd;
use rustix::io::{fcntl_setfd, kqueue, Errno, FdFlags};
use crate::{Event, PollMode};
/// Interface to kqueue.
#[derive(Debug)]
pub struct Poller {
/// File descriptor for the kqueue instance.
kqueue_fd: RawFd,
kqueue_fd: OwnedFd,
/// Notification pipe for waking up the poller.
///
@ -28,8 +29,8 @@ impl Poller {
/// Creates a new poller.
pub fn new() -> io::Result<Poller> {
// Create a kqueue instance.
let kqueue_fd = syscall!(kqueue())?;
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
let kqueue_fd = kqueue::kqueue()?;
fcntl_setfd(&kqueue_fd, FdFlags::CLOEXEC)?;
let poller = Poller {
kqueue_fd,
@ -39,7 +40,7 @@ impl Poller {
// Register the notification pipe.
poller.notify.register(&poller)?;
log::trace!("new: kqueue_fd={}", kqueue_fd,);
log::trace!("new: kqueue_fd={:?}", poller.kqueue_fd);
Ok(poller)
}
@ -62,38 +63,39 @@ impl Poller {
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
if !self.notify.has_fd(fd) {
log::trace!("add: kqueue_fd={}, fd={}, ev={:?}", self.kqueue_fd, fd, ev);
log::trace!(
"add: kqueue_fd={:?}, fd={}, ev={:?}",
self.kqueue_fd,
fd,
ev
);
}
let mode_flags = mode_to_flags(mode);
let read_flags = if ev.readable {
libc::EV_ADD | mode_flags
kqueue::EventFlags::ADD | mode_flags
} else {
libc::EV_DELETE
kqueue::EventFlags::DELETE
};
let write_flags = if ev.writable {
libc::EV_ADD | mode_flags
kqueue::EventFlags::ADD | mode_flags
} else {
libc::EV_DELETE
kqueue::EventFlags::DELETE
};
// A list of changes for kqueue.
let changelist = [
libc::kevent {
ident: fd as _,
filter: libc::EVFILT_READ,
flags: read_flags | libc::EV_RECEIPT,
udata: ev.key as _,
..unsafe { mem::zeroed() }
},
libc::kevent {
ident: fd as _,
filter: libc::EVFILT_WRITE,
flags: write_flags | libc::EV_RECEIPT,
udata: ev.key as _,
..unsafe { mem::zeroed() }
},
kqueue::Event::new(
kqueue::EventFilter::Read(fd),
read_flags | kqueue::EventFlags::RECEIPT,
ev.key as _,
),
kqueue::Event::new(
kqueue::EventFilter::Write(fd),
write_flags | kqueue::EventFlags::RECEIPT,
ev.key as _,
),
];
// Apply changes.
@ -103,34 +105,31 @@ impl Poller {
/// Submit one or more changes to the kernel queue and check to see if they succeeded.
pub(crate) fn submit_changes<A>(&self, changelist: A) -> io::Result<()>
where
A: Copy + AsRef<[libc::kevent]> + AsMut<[libc::kevent]>,
A: Copy + AsRef<[kqueue::Event]> + AsMut<[kqueue::Event]>,
{
let mut eventlist = changelist;
let mut eventlist = Vec::with_capacity(changelist.as_ref().len());
// Apply changes.
{
let changelist = changelist.as_ref();
let eventlist = eventlist.as_mut();
syscall!(kevent(
self.kqueue_fd,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as _,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as _,
ptr::null(),
))?;
unsafe {
kqueue::kevent(&self.kqueue_fd, changelist, &mut eventlist, None)?;
}
}
// Check for errors.
for &ev in eventlist.as_ref() {
for &ev in &eventlist {
// TODO: Once the data field is exposed in rustix, use that.
let data = unsafe { (*(&ev as *const kqueue::Event as *const libc::kevent)).data };
// Explanation for ignoring EPIPE: https://github.com/tokio-rs/mio/issues/582
if (ev.flags & libc::EV_ERROR) != 0
&& ev.data != 0
&& ev.data != libc::ENOENT as _
&& ev.data != libc::EPIPE as _
if (ev.flags().contains(kqueue::EventFlags::ERROR))
&& data != 0
&& data != Errno::NOENT.raw_os_error() as _
&& data != Errno::PIPE.raw_os_error() as _
{
return Err(io::Error::from_raw_os_error(ev.data as _));
return Err(io::Error::from_raw_os_error(data as _));
}
}
@ -145,30 +144,18 @@ impl Poller {
/// Waits for I/O events with an optional timeout.
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
log::trace!("wait: kqueue_fd={}, timeout={:?}", self.kqueue_fd, timeout);
// Convert the `Duration` to `libc::timespec`.
let timeout = timeout.map(|t| libc::timespec {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
log::trace!(
"wait: kqueue_fd={:?}, timeout={:?}",
self.kqueue_fd,
timeout
);
// Wait for I/O events.
let changelist = [];
let eventlist = &mut events.list;
let res = syscall!(kevent(
self.kqueue_fd,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as _,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as _,
match &timeout {
None => ptr::null(),
Some(t) => t,
}
))?;
events.len = res as usize;
log::trace!("new events: kqueue_fd={}, res={}", self.kqueue_fd, res);
let res = unsafe { kqueue::kevent(&self.kqueue_fd, &changelist, eventlist, timeout)? };
log::trace!("new events: kqueue_fd={:?}, res={}", self.kqueue_fd, res);
// Clear the notification (if received) and re-register interest in it.
self.notify.reregister(self)?;
@ -178,7 +165,7 @@ impl Poller {
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
log::trace!("notify: kqueue_fd={}", self.kqueue_fd);
log::trace!("notify: kqueue_fd={:?}", self.kqueue_fd);
self.notify.notify(self).ok();
Ok(())
}
@ -186,30 +173,27 @@ impl Poller {
impl AsRawFd for Poller {
fn as_raw_fd(&self) -> RawFd {
self.kqueue_fd
self.kqueue_fd.as_raw_fd()
}
}
#[cfg(not(polling_no_io_safety))]
impl AsFd for Poller {
fn as_fd(&self) -> BorrowedFd<'_> {
// SAFETY: lifetime is bound by "self"
unsafe { BorrowedFd::borrow_raw(self.kqueue_fd) }
self.kqueue_fd.as_fd()
}
}
impl Drop for Poller {
fn drop(&mut self) {
log::trace!("drop: kqueue_fd={}", self.kqueue_fd);
log::trace!("drop: kqueue_fd={:?}", self.kqueue_fd);
let _ = self.notify.deregister(self);
let _ = syscall!(close(self.kqueue_fd));
}
}
/// A list of reported I/O events.
pub struct Events {
list: Box<[libc::kevent; 1024]>,
len: usize,
list: Vec<kqueue::Event>,
}
unsafe impl Send for Events {}
@ -217,56 +201,45 @@ unsafe impl Send for Events {}
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
let ev: libc::kevent = unsafe { mem::zeroed() };
let list = Box::new([ev; 1024]);
let len = 0;
Events { list, len }
Events {
list: Vec::with_capacity(1024),
}
}
/// Iterates over I/O events.
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
const READABLES: &[FilterName] = &[
libc::EVFILT_READ,
libc::EVFILT_VNODE,
libc::EVFILT_PROC,
libc::EVFILT_SIGNAL,
libc::EVFILT_TIMER,
];
// On some platforms, closing the read end of a pipe wakes up writers, but the
// event is reported as EVFILT_READ with the EV_EOF flag.
//
// https://github.com/golang/go/commit/23aad448b1e3f7c3b4ba2af90120bde91ac865b4
self.list[..self.len].iter().map(|ev| Event {
key: ev.udata as usize,
readable: READABLES.contains(&ev.filter),
writable: ev.filter == libc::EVFILT_WRITE
|| (ev.filter == libc::EVFILT_READ && (ev.flags & libc::EV_EOF) != 0),
self.list.iter().map(|ev| Event {
key: ev.udata() as usize,
readable: matches!(
ev.filter(),
kqueue::EventFilter::Read(..)
| kqueue::EventFilter::Vnode { .. }
| kqueue::EventFilter::Proc { .. }
| kqueue::EventFilter::Signal { .. }
| kqueue::EventFilter::Timer { .. }
),
writable: matches!(ev.filter(), kqueue::EventFilter::Write(..))
|| (matches!(ev.filter(), kqueue::EventFilter::Read(..))
&& (ev.flags().intersects(kqueue::EventFlags::EOF))),
})
}
}
pub(crate) fn mode_to_flags(mode: PollMode) -> FilterFlags {
pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags {
use kqueue::EventFlags as EV;
match mode {
PollMode::Oneshot => libc::EV_ONESHOT,
PollMode::Level => 0,
PollMode::Edge => libc::EV_CLEAR,
PollMode::EdgeOneshot => libc::EV_ONESHOT | libc::EV_CLEAR,
PollMode::Oneshot => EV::ONESHOT,
PollMode::Level => EV::empty(),
PollMode::Edge => EV::CLEAR,
PollMode::EdgeOneshot => EV::ONESHOT | EV::CLEAR,
}
}
#[cfg(target_os = "netbsd")]
pub(crate) type FilterFlags = u32;
#[cfg(not(target_os = "netbsd"))]
pub(crate) type FilterFlags = libc::c_ushort;
#[cfg(target_os = "netbsd")]
pub(crate) type FilterName = u32;
#[cfg(not(target_os = "netbsd"))]
pub(crate) type FilterName = libc::c_short;
#[cfg(any(
target_os = "freebsd",
target_os = "dragonfly",
@ -277,8 +250,9 @@ pub(crate) type FilterName = libc::c_short;
))]
mod notify {
use super::Poller;
use rustix::io::kqueue;
use std::io;
use std::os::unix::io::RawFd;
use std::{io, mem};
/// A notification pipe.
///
@ -295,13 +269,15 @@ mod notify {
/// Registers this notification pipe in the `Poller`.
pub(super) fn register(&self, poller: &Poller) -> io::Result<()> {
// Register an EVFILT_USER event.
poller.submit_changes([libc::kevent {
ident: 0,
filter: libc::EVFILT_USER,
flags: libc::EV_ADD | libc::EV_RECEIPT | libc::EV_CLEAR,
udata: crate::NOTIFY_KEY as _,
..unsafe { mem::zeroed() }
}])
poller.submit_changes([kqueue::Event::new(
kqueue::EventFilter::User {
ident: 0,
flags: kqueue::UserFlags::empty(),
user_flags: kqueue::UserDefinedFlags::new(0),
},
kqueue::EventFlags::ADD | kqueue::EventFlags::RECEIPT | kqueue::EventFlags::CLEAR,
crate::NOTIFY_KEY as _,
)])
}
/// Reregister this notification pipe in the `Poller`.
@ -313,14 +289,15 @@ mod notify {
/// Notifies the `Poller`.
pub(super) fn notify(&self, poller: &Poller) -> io::Result<()> {
// Trigger the EVFILT_USER event.
poller.submit_changes([libc::kevent {
ident: 0,
filter: libc::EVFILT_USER,
flags: libc::EV_ADD | libc::EV_RECEIPT,
fflags: libc::NOTE_TRIGGER,
udata: crate::NOTIFY_KEY as _,
..unsafe { mem::zeroed() }
}])?;
poller.submit_changes([kqueue::Event::new(
kqueue::EventFilter::User {
ident: 0,
flags: kqueue::UserFlags::TRIGGER,
user_flags: kqueue::UserDefinedFlags::new(0),
},
kqueue::EventFlags::ADD | kqueue::EventFlags::RECEIPT,
crate::NOTIFY_KEY as _,
)])?;
Ok(())
}
@ -328,13 +305,15 @@ mod notify {
/// Deregisters this notification pipe from the `Poller`.
pub(super) fn deregister(&self, poller: &Poller) -> io::Result<()> {
// Deregister the EVFILT_USER event.
poller.submit_changes([libc::kevent {
ident: 0,
filter: libc::EVFILT_USER,
flags: libc::EV_RECEIPT | libc::EV_DELETE,
udata: crate::NOTIFY_KEY as _,
..unsafe { mem::zeroed() }
}])
poller.submit_changes([kqueue::Event::new(
kqueue::EventFilter::User {
ident: 0,
flags: kqueue::UserFlags::empty(),
user_flags: kqueue::UserDefinedFlags::new(0),
},
kqueue::EventFlags::DELETE | kqueue::EventFlags::RECEIPT,
crate::NOTIFY_KEY as _,
)])
}
/// Whether this raw file descriptor is associated with this pipe.

View File

@ -64,19 +64,6 @@ use std::usize;
use cfg_if::cfg_if;
/// Calls a libc function and results in `io::Result`.
#[cfg(unix)]
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { libc::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
cfg_if! {
// Note: This cfg is intended to make it easy for polling developers to test
// the backend that uses poll, and is not a public API.

View File

@ -1,12 +1,13 @@
//! Functionality that is only available for `kqueue`-based platforms.
use crate::sys::{mode_to_flags, FilterFlags};
use crate::sys::mode_to_flags;
use crate::{PollMode, Poller};
use std::convert::TryInto;
use std::io;
use std::process::Child;
use std::time::Duration;
use std::{io, mem};
use rustix::io::kqueue;
use super::__private::PollerSealed;
use __private::FilterSealed;
@ -102,7 +103,7 @@ impl<F: Filter> PollerKqueueExt<F> for Poller {
fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
// Convert the filter into a kevent.
let event = filter.filter(libc::EV_ADD | mode_to_flags(mode), key);
let event = filter.filter(kqueue::EventFlags::ADD | mode_to_flags(mode), key);
// Modify the filter.
self.poller.submit_changes([event])
@ -110,7 +111,7 @@ impl<F: Filter> PollerKqueueExt<F> for Poller {
fn delete_filter(&self, filter: F) -> io::Result<()> {
// Convert the filter into a kevent.
let event = filter.filter(libc::EV_DELETE, 0);
let event = filter.filter(kqueue::EventFlags::DELETE, 0);
// Delete the filter.
self.poller.submit_changes([event])
@ -122,7 +123,7 @@ pub trait Filter: FilterSealed {}
unsafe impl<T: FilterSealed + ?Sized> FilterSealed for &T {
#[inline(always)]
fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent {
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
(**self).filter(flags, key)
}
}
@ -142,14 +143,15 @@ pub type c_int = i32;
unsafe impl FilterSealed for Signal {
#[inline(always)]
fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent {
libc::kevent {
ident: self.0 as _,
filter: libc::EVFILT_SIGNAL,
flags: flags | libc::EV_RECEIPT,
udata: key as _,
..unsafe { mem::zeroed() }
}
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
kqueue::Event::new(
kqueue::EventFilter::Signal {
signal: rustix::process::Signal::from_raw(self.0).expect("invalid signal number"),
times: 0,
},
flags | kqueue::EventFlags::RECEIPT,
key as _,
)
}
}
@ -188,21 +190,21 @@ impl<'a> Process<'a> {
unsafe impl FilterSealed for Process<'_> {
#[inline(always)]
fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent {
let fflags = match self.ops {
ProcessOps::Exit => libc::NOTE_EXIT,
ProcessOps::Fork => libc::NOTE_FORK,
ProcessOps::Exec => libc::NOTE_EXEC,
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
let events = match self.ops {
ProcessOps::Exit => kqueue::ProcessEvents::EXIT,
ProcessOps::Fork => kqueue::ProcessEvents::FORK,
ProcessOps::Exec => kqueue::ProcessEvents::EXEC,
};
libc::kevent {
ident: self.child.id() as _,
filter: libc::EVFILT_PROC,
flags: flags | libc::EV_RECEIPT,
fflags,
udata: key as _,
..unsafe { mem::zeroed() }
}
kqueue::Event::new(
kqueue::EventFilter::Proc {
pid: rustix::process::Pid::from_child(self.child),
flags: events,
},
flags | kqueue::EventFlags::RECEIPT,
key as _,
)
}
}
@ -221,82 +223,28 @@ pub struct Timer {
}
unsafe impl FilterSealed for Timer {
fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent {
// Figure out the granularity of the timer.
let (fflags, data) = {
#[cfg(not(any(target_os = "dragonfly", target_os = "netbsd", target_os = "openbsd")))]
{
let subsec_nanos = self.timeout.subsec_nanos();
match (subsec_nanos % 1_000, subsec_nanos % 1_000_000, subsec_nanos) {
(_, _, 0) => (
libc::NOTE_SECONDS,
self.timeout.as_secs().try_into().expect("too many seconds"),
),
(_, 0, _) => (
// Note: 0 by default means milliseconds.
0,
self.timeout
.as_millis()
.try_into()
.expect("too many milliseconds"),
),
(0, _, _) => (
libc::NOTE_USECONDS,
self.timeout
.as_micros()
.try_into()
.expect("too many microseconds"),
),
(_, _, _) => (
libc::NOTE_NSECONDS,
self.timeout
.as_nanos()
.try_into()
.expect("too many nanoseconds"),
),
}
}
#[cfg(any(target_os = "dragonfly", target_os = "netbsd", target_os = "openbsd"))]
{
// OpenBSD/Dragonfly/NetBSD only supports milliseconds.
// NetBSD 10 supports NOTE_SECONDS et al, once Rust drops support for
// NetBSD 9 we can use the same code as above.
// See also: https://github.com/rust-lang/libc/pull/3080
(
0,
self.timeout
.as_millis()
.try_into()
.expect("too many milliseconds"),
)
}
};
#[allow(clippy::needless_update)]
libc::kevent {
ident: self.id as _,
filter: libc::EVFILT_TIMER,
flags: flags | libc::EV_RECEIPT,
fflags,
data,
udata: key as _,
..unsafe { mem::zeroed() }
}
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
kqueue::Event::new(
kqueue::EventFilter::Timer {
ident: self.id as _,
timer: Some(self.timeout),
},
flags | kqueue::EventFlags::RECEIPT,
key as _,
)
}
}
impl Filter for Timer {}
mod __private {
use crate::sys::FilterFlags;
use rustix::io::kqueue;
#[doc(hidden)]
pub unsafe trait FilterSealed {
/// Get the filter for the given event.
///
/// This filter's flags must have `EV_RECEIPT`.
fn filter(&self, flags: FilterFlags, key: usize) -> libc::kevent;
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event;
}
}

View File

@ -2,14 +2,20 @@
use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt::{self, Debug, Formatter};
use std::io;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd};
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
use rustix::io::{
fcntl_getfd, fcntl_setfd, pipe, pipe_with, poll, read, write, FdFlags, PipeFlags, PollFd,
PollFlags,
};
// std::os::unix doesn't exist on Fuchsia
use libc::c_int as RawFd;
type RawFd = std::os::raw::c_int;
use crate::{Event, PollMode};
@ -21,13 +27,13 @@ pub struct Poller {
/// The file descriptor of the read half of the notify pipe. This is also stored as the first
/// file descriptor in `fds.poll_fds`.
notify_read: RawFd,
notify_read: OwnedFd,
/// The file descriptor of the write half of the notify pipe.
///
/// Data is written to this to wake up the current instance of `wait`, which can occur when the
/// user notifies it (in which case `notified` would have been set) or when an operation needs
/// to occur (in which case `waiting_operations` would have been incremented).
notify_write: RawFd,
notify_write: OwnedFd,
/// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the
/// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero
@ -49,27 +55,12 @@ struct Fds {
///
/// The first file descriptor is always present and is used to notify the poller. It is also
/// stored in `notify_read`.
poll_fds: Vec<PollFd>,
poll_fds: Vec<PollFd<'static>>,
/// The map of each file descriptor to data associated with it. This does not include the file
/// descriptors `notify_read` or `notify_write`.
fd_data: HashMap<RawFd, FdData>,
}
/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the
/// `extra_traits` feature of `libc`.
#[repr(transparent)]
struct PollFd(libc::pollfd);
impl Debug for PollFd {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("pollfd")
.field("fd", &self.0.fd)
.field("events", &self.0.events)
.field("revents", &self.0.revents)
.finish()
}
}
/// Data associated with a file descriptor in a poller.
#[derive(Debug)]
struct FdData {
@ -85,34 +76,36 @@ impl Poller {
/// Creates a new poller.
pub fn new() -> io::Result<Poller> {
// Create the notification pipe.
let mut notify_pipe = [0; 2];
syscall!(pipe(notify_pipe.as_mut_ptr()))?;
let (notify_read, notify_write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
let (notify_read, notify_write) = pipe()?;
fcntl_setfd(&notify_read, fcntl_getfd(&notify_read)? | FdFlags::CLOEXEC)?;
fcntl_setfd(
&notify_write,
fcntl_getfd(&notify_write)? | FdFlags::CLOEXEC,
)?;
io::Result::Ok((notify_read, notify_write))
})?;
// Put the reading side into non-blocking mode.
let notify_read_flags = syscall!(fcntl(notify_pipe[0], libc::F_GETFL))?;
syscall!(fcntl(
notify_pipe[0],
libc::F_SETFL,
notify_read_flags | libc::O_NONBLOCK
))?;
fcntl_setfl(&notify_read, fcntl_getfl(&notify_read)? | OFlags::NONBLOCK)?;
log::trace!(
"new: notify_read={}, notify_write={}",
notify_pipe[0],
notify_pipe[1]
"new: notify_read={:?}, notify_write={:?}",
notify_read,
notify_write,
);
Ok(Self {
fds: Mutex::new(Fds {
poll_fds: vec![PollFd(libc::pollfd {
fd: notify_pipe[0],
events: libc::POLLRDNORM,
revents: 0,
})],
poll_fds: vec![PollFd::from_borrowed_fd(
// SAFETY: `read` will remain valid until we drop `self`.
unsafe { BorrowedFd::borrow_raw(notify_read.as_raw_fd()) },
PollFlags::RDNORM,
)],
fd_data: HashMap::new(),
}),
notify_read: notify_pipe[0],
notify_write: notify_pipe[1],
notify_read,
notify_write,
waiting_operations: AtomicUsize::new(0),
operations_complete: Condvar::new(),
notified: AtomicBool::new(false),
@ -131,12 +124,12 @@ impl Poller {
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
if fd == self.notify_read || fd == self.notify_write {
if fd == self.notify_read.as_raw_fd() || fd == self.notify_write.as_raw_fd() {
return Err(io::Error::from(io::ErrorKind::InvalidInput));
}
log::trace!(
"add: notify_read={}, fd={}, ev={:?}",
"add: notify_read={:?}, fd={}, ev={:?}",
self.notify_read,
fd,
ev
@ -157,11 +150,11 @@ impl Poller {
},
);
fds.poll_fds.push(PollFd(libc::pollfd {
fd,
events: poll_events(ev),
revents: 0,
}));
fds.poll_fds.push(PollFd::from_borrowed_fd(
// SAFETY: Until we have I/O safety, assume that `fd` is valid forever.
unsafe { BorrowedFd::borrow_raw(fd) },
poll_events(ev),
));
Ok(())
})
@ -170,7 +163,7 @@ impl Poller {
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
log::trace!(
"modify: notify_read={}, fd={}, ev={:?}",
"modify: notify_read={:?}, fd={}, ev={:?}",
self.notify_read,
fd,
ev
@ -180,7 +173,8 @@ impl Poller {
let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?;
data.key = ev.key;
let poll_fds_index = data.poll_fds_index;
fds.poll_fds[poll_fds_index].0.events = poll_events(ev);
fds.poll_fds[poll_fds_index] =
PollFd::from_borrowed_fd(unsafe { BorrowedFd::borrow_raw(fd) }, poll_events(ev));
data.remove = cvt_mode_as_remove(mode)?;
Ok(())
@ -189,14 +183,14 @@ impl Poller {
/// Deletes a file descriptor.
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
log::trace!("delete: notify_read={}, fd={}", self.notify_read, fd);
log::trace!("delete: notify_read={:?}, fd={}", self.notify_read, fd);
self.modify_fds(|fds| {
let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?;
fds.poll_fds.swap_remove(data.poll_fds_index);
if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) {
fds.fd_data
.get_mut(&swapped_pollfd.0.fd)
.get_mut(&swapped_pollfd.as_fd().as_raw_fd())
.unwrap()
.poll_fds_index = data.poll_fds_index;
}
@ -208,7 +202,7 @@ impl Poller {
/// Waits for I/O events with an optional timeout.
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
log::trace!(
"wait: notify_read={}, timeout={:?}",
"wait: notify_read={:?}, timeout={:?}",
self.notify_read,
timeout
);
@ -233,21 +227,33 @@ impl Poller {
fds = self.operations_complete.wait(fds).unwrap();
}
// Convert the timeout to milliseconds.
let timeout_ms = deadline
.map(|deadline| {
let timeout = deadline.saturating_duration_since(Instant::now());
// Round up to a whole millisecond.
let mut ms = timeout.as_millis().try_into().unwrap_or(std::u64::MAX);
if Duration::from_millis(ms) < timeout {
ms = ms.saturating_add(1);
}
ms.try_into().unwrap_or(std::i32::MAX)
})
.unwrap_or(-1);
// Perform the poll.
let num_events = poll(&mut fds.poll_fds, deadline)?;
let notified = fds.poll_fds[0].0.revents != 0;
let num_events = poll(&mut fds.poll_fds, timeout_ms)?;
let notified = !fds.poll_fds[0].revents().is_empty();
let num_fd_events = if notified { num_events - 1 } else { num_events };
log::trace!(
"new events: notify_read={}, num={}",
"new events: notify_read={:?}, num={}",
self.notify_read,
num_events
);
// Read all notifications.
if notified {
while syscall!(read(self.notify_read, &mut [0; 64] as *mut _ as *mut _, 64)).is_ok()
{
}
while read(&self.notify_read, &mut [0; 64]).is_ok() {}
}
// If the only event that occurred during polling was notification and it wasn't to
@ -263,17 +269,20 @@ impl Poller {
events.inner.reserve(num_fd_events);
for fd_data in fds.fd_data.values_mut() {
let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index];
if poll_fd.revents != 0 {
let poll_fd = &mut fds.poll_fds[fd_data.poll_fds_index];
if !poll_fd.revents().is_empty() {
// Store event
events.inner.push(Event {
key: fd_data.key,
readable: poll_fd.revents & READ_REVENTS != 0,
writable: poll_fd.revents & WRITE_REVENTS != 0,
readable: poll_fd.revents().intersects(read_events()),
writable: poll_fd.revents().intersects(write_events()),
});
// Remove interest if necessary
if fd_data.remove {
poll_fd.events = 0;
*poll_fd = PollFd::from_borrowed_fd(
unsafe { BorrowedFd::borrow_raw(poll_fd.as_fd().as_raw_fd()) },
PollFlags::empty(),
);
}
if events.inner.len() == num_fd_events {
@ -291,7 +300,7 @@ impl Poller {
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
log::trace!("notify: notify_read={}", self.notify_read);
log::trace!("notify: notify_read={:?}", self.notify_read);
if !self.notified.swap(true, Ordering::SeqCst) {
self.notify_inner()?;
@ -326,44 +335,45 @@ impl Poller {
/// Wake the current thread that is calling `wait`.
fn notify_inner(&self) -> io::Result<()> {
syscall!(write(self.notify_write, &0_u8 as *const _ as *const _, 1))?;
write(&self.notify_write, &[0; 1])?;
Ok(())
}
/// Remove a notification created by `notify_inner`.
fn pop_notification(&self) -> io::Result<()> {
syscall!(read(self.notify_read, &mut [0; 1] as *mut _ as *mut _, 1))?;
read(&self.notify_read, &mut [0; 1])?;
Ok(())
}
}
impl Drop for Poller {
fn drop(&mut self) {
log::trace!("drop: notify_read={}", self.notify_read);
let _ = syscall!(close(self.notify_read));
let _ = syscall!(close(self.notify_write));
log::trace!("drop: notify_read={:?}", self.notify_read);
}
}
/// Get the input poll events for the given event.
fn poll_events(ev: Event) -> libc::c_short {
fn poll_events(ev: Event) -> PollFlags {
(if ev.readable {
libc::POLLIN | libc::POLLPRI
PollFlags::IN | PollFlags::PRI
} else {
0
PollFlags::empty()
}) | (if ev.writable {
libc::POLLOUT | libc::POLLWRBAND
PollFlags::OUT | PollFlags::WRBAND
} else {
0
PollFlags::empty()
})
}
/// Returned poll events for reading.
const READ_REVENTS: libc::c_short = libc::POLLIN | libc::POLLPRI | libc::POLLHUP | libc::POLLERR;
fn read_events() -> PollFlags {
PollFlags::IN | PollFlags::PRI | PollFlags::HUP | PollFlags::ERR
}
/// Returned poll events for writing.
const WRITE_REVENTS: libc::c_short =
libc::POLLOUT | libc::POLLWRBAND | libc::POLLHUP | libc::POLLERR;
fn write_events() -> PollFlags {
PollFlags::OUT | PollFlags::WRBAND | PollFlags::HUP | PollFlags::ERR
}
/// A list of reported I/O events.
pub struct Events {
@ -382,36 +392,6 @@ impl Events {
}
}
/// Helper function to call poll.
fn poll(fds: &mut [PollFd], deadline: Option<Instant>) -> io::Result<usize> {
loop {
// Convert the timeout to milliseconds.
let timeout_ms = deadline
.map(|deadline| {
let timeout = deadline.saturating_duration_since(Instant::now());
// Round up to a whole millisecond.
let mut ms = timeout.as_millis().try_into().unwrap_or(std::u64::MAX);
if Duration::from_millis(ms) < timeout {
ms = ms.saturating_add(1);
}
ms.try_into().unwrap_or(std::i32::MAX)
})
.unwrap_or(-1);
match syscall!(poll(
fds.as_mut_ptr() as *mut libc::pollfd,
fds.len() as libc::nfds_t,
timeout_ms,
)) {
Ok(num_events) => break Ok(num_events as usize),
// poll returns EAGAIN if we can retry it.
Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue,
Err(e) => return Err(e),
}
}
}
fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
match mode {
PollMode::Oneshot => Ok(true),

View File

@ -2,27 +2,29 @@
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::ptr;
use std::time::Duration;
#[cfg(not(polling_no_io_safety))]
use std::os::unix::io::{AsFd, BorrowedFd};
use rustix::fd::OwnedFd;
use rustix::io::{fcntl_getfd, fcntl_setfd, port, FdFlags, PollFlags};
use crate::{Event, PollMode};
/// Interface to event ports.
#[derive(Debug)]
pub struct Poller {
/// File descriptor for the port instance.
port_fd: RawFd,
port_fd: OwnedFd,
}
impl Poller {
/// Creates a new poller.
pub fn new() -> io::Result<Poller> {
let port_fd = syscall!(port_create())?;
let flags = syscall!(fcntl(port_fd, libc::F_GETFD))?;
syscall!(fcntl(port_fd, libc::F_SETFD, flags | libc::FD_CLOEXEC))?;
let port_fd = port::port_create()?;
let flags = fcntl_getfd(&port_fd)?;
fcntl_setfd(&port_fd, flags | FdFlags::CLOEXEC)?;
Ok(Poller { port_fd })
}
@ -45,12 +47,12 @@ impl Poller {
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
let mut flags = 0;
let mut flags = PollFlags::empty();
if ev.readable {
flags |= libc::POLLIN;
flags |= read_flags();
}
if ev.writable {
flags |= libc::POLLOUT;
flags |= write_flags();
}
if mode != PollMode::Oneshot {
@ -59,27 +61,20 @@ impl Poller {
));
}
syscall!(port_associate(
self.port_fd,
libc::PORT_SOURCE_FD,
fd as _,
flags as _,
ev.key as _,
))?;
unsafe {
port::port_associate_fd(&self.port_fd, fd, flags, ev.key as _)?;
}
Ok(())
}
/// Deletes a file descriptor.
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
if let Err(e) = syscall!(port_dissociate(
self.port_fd,
libc::PORT_SOURCE_FD,
fd as usize,
)) {
match e.raw_os_error().unwrap() {
libc::ENOENT => return Ok(()),
_ => return Err(e),
let result = unsafe { port::port_dissociate_fd(&self.port_fd, fd) };
if let Err(e) = result {
match e {
rustix::io::Errno::NOENT => return Ok(()),
_ => return Err(e.into()),
}
}
@ -88,47 +83,28 @@ impl Poller {
/// Waits for I/O events with an optional timeout.
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
let mut timeout = timeout.map(|t| libc::timespec {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
let mut nget: u32 = 1;
// Wait for I/O events.
let res = syscall!(port_getn(
self.port_fd,
events.list.as_mut_ptr() as *mut libc::port_event,
events.list.len() as libc::c_uint,
&mut nget as _,
match &mut timeout {
None => ptr::null_mut(),
Some(t) => t,
}
));
let res = port::port_getn(&self.port_fd, &mut events.list, 1, timeout);
// Event ports sets the return value to -1 and returns ETIME on timer expire. The number of
// returned events is stored in nget, but in our case it should always be 0 since we set
// nget to 1 initially.
let nevents = match res {
Err(e) => match e.raw_os_error().unwrap() {
libc::ETIME => 0,
_ => return Err(e),
},
Ok(_) => nget as usize,
};
events.len = nevents;
if let Err(e) = res {
match e {
rustix::io::Errno::TIME => {}
_ => return Err(e.into()),
}
}
Ok(())
}
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
const PORT_SOURCE_USER: i32 = 3;
// Use port_send to send a notification to the port.
syscall!(port_send(
self.port_fd,
libc::PORT_SOURCE_USER,
crate::NOTIFY_KEY as _
))?;
port::port_send(&self.port_fd, PORT_SOURCE_USER, crate::NOTIFY_KEY as _)?;
Ok(())
}
@ -136,38 +112,30 @@ impl Poller {
impl AsRawFd for Poller {
fn as_raw_fd(&self) -> RawFd {
self.port_fd
self.port_fd.as_raw_fd()
}
}
#[cfg(not(polling_no_io_safety))]
impl AsFd for Poller {
fn as_fd(&self) -> BorrowedFd<'_> {
// SAFETY: lifetime is bound by self
unsafe { BorrowedFd::borrow_raw(self.port_fd) }
}
}
impl Drop for Poller {
fn drop(&mut self) {
let _ = syscall!(close(self.port_fd));
self.port_fd.as_fd()
}
}
/// Poll flags for all possible readability events.
fn read_flags() -> libc::c_short {
libc::POLLIN | libc::POLLHUP | libc::POLLERR | libc::POLLPRI
fn read_flags() -> PollFlags {
PollFlags::IN | PollFlags::HUP | PollFlags::ERR | PollFlags::PRI
}
/// Poll flags for all possible writability events.
fn write_flags() -> libc::c_short {
libc::POLLOUT | libc::POLLHUP | libc::POLLERR
fn write_flags() -> PollFlags {
PollFlags::OUT | PollFlags::HUP | PollFlags::ERR
}
/// A list of reported I/O events.
pub struct Events {
list: Box<[libc::port_event; 1024]>,
len: usize,
list: Vec<port::Event>,
}
unsafe impl Send for Events {}
@ -175,24 +143,17 @@ unsafe impl Send for Events {}
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
let ev = libc::port_event {
portev_events: 0,
portev_source: 0,
portev_pad: 0,
portev_object: 0,
portev_user: 0 as _,
};
let list = Box::new([ev; 1024]);
let len = 0;
Events { list, len }
Events {
list: Vec::with_capacity(1024),
}
}
/// Iterates over I/O events.
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
key: ev.portev_user as _,
readable: (ev.portev_events & read_flags() as libc::c_int) != 0,
writable: (ev.portev_events & write_flags() as libc::c_int) != 0,
self.list.iter().map(|ev| Event {
key: ev.userdata() as usize,
readable: PollFlags::from_bits_truncate(ev.events() as _).intersects(read_flags()),
writable: PollFlags::from_bits_truncate(ev.events() as _).intersects(write_flags()),
})
}
}