mirror of https://github.com/smol-rs/polling
feat: Add a pipe-based notifier to epoll
In some containers, eventfd is not available as it cannot be implemented securely in some hosts. This commit adds a fallback notifier that uses a pipe instead of eventfd. Closes #122 Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
parent
a521cd2c29
commit
2c279b871c
|
@ -55,6 +55,12 @@ jobs:
|
||||||
# the backend that uses poll, and is not a public API.
|
# the backend that uses poll, and is not a public API.
|
||||||
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_poll_backend
|
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_poll_backend
|
||||||
if: startsWith(matrix.os, 'ubuntu')
|
if: startsWith(matrix.os, 'ubuntu')
|
||||||
|
- run: cargo test
|
||||||
|
env:
|
||||||
|
# Note: This cfg is intended to make it easy for polling developers to test
|
||||||
|
# the backend that uses pipes, and is not a public API.
|
||||||
|
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_epoll_pipe
|
||||||
|
if: startsWith(matrix.os, 'ubuntu')
|
||||||
- run: cargo hack build --feature-powerset --no-dev-deps
|
- run: cargo hack build --feature-powerset --no-dev-deps
|
||||||
- name: Add rust-src
|
- name: Add rust-src
|
||||||
if: startsWith(matrix.rust, 'nightly')
|
if: startsWith(matrix.rust, 'nightly')
|
||||||
|
|
128
src/epoll.rs
128
src/epoll.rs
|
@ -7,7 +7,9 @@ use std::time::Duration;
|
||||||
|
|
||||||
use rustix::event::{epoll, eventfd, EventfdFlags};
|
use rustix::event::{epoll, eventfd, EventfdFlags};
|
||||||
use rustix::fd::OwnedFd;
|
use rustix::fd::OwnedFd;
|
||||||
use rustix::io::{read, write};
|
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
|
||||||
|
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
|
||||||
|
use rustix::pipe::{pipe, pipe_with, PipeFlags};
|
||||||
use rustix::time::{
|
use rustix::time::{
|
||||||
timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
|
timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
|
||||||
Timespec,
|
Timespec,
|
||||||
|
@ -20,8 +22,10 @@ use crate::{Event, PollMode};
|
||||||
pub struct Poller {
|
pub struct Poller {
|
||||||
/// File descriptor for the epoll instance.
|
/// File descriptor for the epoll instance.
|
||||||
epoll_fd: OwnedFd,
|
epoll_fd: OwnedFd,
|
||||||
/// File descriptor for the eventfd that produces notifications.
|
|
||||||
event_fd: OwnedFd,
|
/// Notifier used to wake up epoll.
|
||||||
|
notifier: Notifier,
|
||||||
|
|
||||||
/// File descriptor for the timerfd that produces timeouts.
|
/// File descriptor for the timerfd that produces timeouts.
|
||||||
timer_fd: Option<OwnedFd>,
|
timer_fd: Option<OwnedFd>,
|
||||||
}
|
}
|
||||||
|
@ -34,8 +38,8 @@ impl Poller {
|
||||||
// Use `epoll_create1` with `EPOLL_CLOEXEC`.
|
// Use `epoll_create1` with `EPOLL_CLOEXEC`.
|
||||||
let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;
|
let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;
|
||||||
|
|
||||||
// Set up eventfd and timerfd.
|
// Set up notifier and timerfd.
|
||||||
let event_fd = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
|
let notifier = Notifier::new()?;
|
||||||
let timer_fd = timerfd_create(
|
let timer_fd = timerfd_create(
|
||||||
TimerfdClockId::Monotonic,
|
TimerfdClockId::Monotonic,
|
||||||
TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
|
TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
|
||||||
|
@ -44,7 +48,7 @@ impl Poller {
|
||||||
|
|
||||||
let poller = Poller {
|
let poller = Poller {
|
||||||
epoll_fd,
|
epoll_fd,
|
||||||
event_fd,
|
notifier,
|
||||||
timer_fd,
|
timer_fd,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -58,7 +62,7 @@ impl Poller {
|
||||||
}
|
}
|
||||||
|
|
||||||
poller.add(
|
poller.add(
|
||||||
poller.event_fd.as_raw_fd(),
|
poller.notifier.as_fd().as_raw_fd(),
|
||||||
Event::readable(crate::NOTIFY_KEY),
|
Event::readable(crate::NOTIFY_KEY),
|
||||||
PollMode::Oneshot,
|
PollMode::Oneshot,
|
||||||
)?;
|
)?;
|
||||||
|
@ -66,7 +70,7 @@ impl Poller {
|
||||||
|
|
||||||
tracing::trace!(
|
tracing::trace!(
|
||||||
epoll_fd = ?poller.epoll_fd.as_raw_fd(),
|
epoll_fd = ?poller.epoll_fd.as_raw_fd(),
|
||||||
event_fd = ?poller.event_fd.as_raw_fd(),
|
notifier = ?poller.notifier,
|
||||||
timer_fd = ?poller.timer_fd,
|
timer_fd = ?poller.timer_fd,
|
||||||
"new",
|
"new",
|
||||||
);
|
);
|
||||||
|
@ -201,10 +205,9 @@ impl Poller {
|
||||||
);
|
);
|
||||||
|
|
||||||
// Clear the notification (if received) and re-register interest in it.
|
// Clear the notification (if received) and re-register interest in it.
|
||||||
let mut buf = [0u8; 8];
|
self.notifier.clear();
|
||||||
let _ = read(&self.event_fd, &mut buf);
|
|
||||||
self.modify(
|
self.modify(
|
||||||
self.event_fd.as_fd(),
|
self.notifier.as_fd(),
|
||||||
Event::readable(crate::NOTIFY_KEY),
|
Event::readable(crate::NOTIFY_KEY),
|
||||||
PollMode::Oneshot,
|
PollMode::Oneshot,
|
||||||
)?;
|
)?;
|
||||||
|
@ -216,12 +219,11 @@ impl Poller {
|
||||||
let span = tracing::trace_span!(
|
let span = tracing::trace_span!(
|
||||||
"notify",
|
"notify",
|
||||||
epoll_fd = ?self.epoll_fd.as_raw_fd(),
|
epoll_fd = ?self.epoll_fd.as_raw_fd(),
|
||||||
event_fd = ?self.event_fd.as_raw_fd(),
|
notifier = ?self.notifier,
|
||||||
);
|
);
|
||||||
let _enter = span.enter();
|
let _enter = span.enter();
|
||||||
|
|
||||||
let buf: [u8; 8] = 1u64.to_ne_bytes();
|
self.notifier.notify();
|
||||||
let _ = write(&self.event_fd, &buf);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -243,7 +245,7 @@ impl Drop for Poller {
|
||||||
let span = tracing::trace_span!(
|
let span = tracing::trace_span!(
|
||||||
"drop",
|
"drop",
|
||||||
epoll_fd = ?self.epoll_fd.as_raw_fd(),
|
epoll_fd = ?self.epoll_fd.as_raw_fd(),
|
||||||
event_fd = ?self.event_fd.as_raw_fd(),
|
notifier = ?self.notifier,
|
||||||
timer_fd = ?self.timer_fd
|
timer_fd = ?self.timer_fd
|
||||||
);
|
);
|
||||||
let _enter = span.enter();
|
let _enter = span.enter();
|
||||||
|
@ -251,7 +253,7 @@ impl Drop for Poller {
|
||||||
if let Some(timer_fd) = self.timer_fd.take() {
|
if let Some(timer_fd) = self.timer_fd.take() {
|
||||||
let _ = self.delete(timer_fd.as_fd());
|
let _ = self.delete(timer_fd.as_fd());
|
||||||
}
|
}
|
||||||
let _ = self.delete(self.event_fd.as_fd());
|
let _ = self.delete(self.notifier.as_fd());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -365,3 +367,97 @@ impl EventExtra {
|
||||||
self.flags.contains(epoll::EventFlags::PRI)
|
self.flags.contains(epoll::EventFlags::PRI)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The notifier for Linux.
|
||||||
|
///
|
||||||
|
/// Certain container runtimes do not expose eventfd to the client, as it relies on the host and
|
||||||
|
/// can be used to "escape" the container under certain conditions. Gramine is the prime example,
|
||||||
|
/// see [here](gramine). In this case, fall back to using a pipe.
|
||||||
|
///
|
||||||
|
/// [gramine]: https://gramine.readthedocs.io/en/stable/manifest-syntax.html#allowing-eventfd
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum Notifier {
|
||||||
|
/// The primary notifier, using eventfd.
|
||||||
|
EventFd(OwnedFd),
|
||||||
|
|
||||||
|
/// The fallback notifier, using a pipe.
|
||||||
|
Pipe {
|
||||||
|
/// The read end of the pipe.
|
||||||
|
read_pipe: OwnedFd,
|
||||||
|
|
||||||
|
/// The write end of the pipe.
|
||||||
|
write_pipe: OwnedFd,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Notifier {
|
||||||
|
/// Create a new notifier.
|
||||||
|
fn new() -> io::Result<Self> {
|
||||||
|
// Skip eventfd for testing if necessary.
|
||||||
|
if !cfg!(polling_test_epoll_pipe) {
|
||||||
|
// Try to create an eventfd.
|
||||||
|
match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
|
||||||
|
Ok(fd) => {
|
||||||
|
tracing::trace!("created eventfd for notifier");
|
||||||
|
return Ok(Notifier::EventFd(fd));
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
tracing::warn!(
|
||||||
|
"eventfd() failed with error ({}), falling back to pipe",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (read, write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
|
||||||
|
let (read, write) = pipe()?;
|
||||||
|
fcntl_setfd(&read, fcntl_getfd(&read)? | FdFlags::CLOEXEC)?;
|
||||||
|
fcntl_setfd(&write, fcntl_getfd(&write)? | FdFlags::CLOEXEC)?;
|
||||||
|
io::Result::Ok((read, write))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
fcntl_setfl(&read, fcntl_getfl(&read)? | OFlags::NONBLOCK)?;
|
||||||
|
Ok(Notifier::Pipe {
|
||||||
|
read_pipe: read,
|
||||||
|
write_pipe: write,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The file descriptor to register in the poller.
|
||||||
|
fn as_fd(&self) -> BorrowedFd<'_> {
|
||||||
|
match self {
|
||||||
|
Notifier::EventFd(fd) => fd.as_fd(),
|
||||||
|
Notifier::Pipe {
|
||||||
|
read_pipe: read, ..
|
||||||
|
} => read.as_fd(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Notify the poller.
|
||||||
|
fn notify(&self) {
|
||||||
|
match self {
|
||||||
|
Self::EventFd(fd) => {
|
||||||
|
let buf: [u8; 8] = 1u64.to_ne_bytes();
|
||||||
|
let _ = write(fd, &buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::Pipe { write_pipe, .. } => {
|
||||||
|
write(write_pipe, &[0; 1]).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clear the notification.
|
||||||
|
fn clear(&self) {
|
||||||
|
match self {
|
||||||
|
Self::EventFd(fd) => {
|
||||||
|
let mut buf = [0u8; 8];
|
||||||
|
let _ = read(fd, &mut buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::Pipe { read_pipe, .. } => while read(read_pipe, &mut [0u8; 1024]).is_ok() {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue