Add level and edge triggered modes to the poller (#59)

* Add level and edge triggered modes to the poller

* Refractor error handling

* Add tests for new modes
This commit is contained in:
John Nunley 2022-12-30 14:43:47 -08:00 committed by GitHub
parent bc56d1fb38
commit 181acc67d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 422 additions and 56 deletions

View File

@ -9,7 +9,7 @@ use std::time::Duration;
#[cfg(not(polling_no_io_safety))]
use std::os::unix::io::{AsFd, BorrowedFd};
use crate::Event;
use crate::{Event, PollMode};
/// Interface to epoll.
#[derive(Debug)]
@ -67,7 +67,7 @@ impl Poller {
};
if let Some(timer_fd) = timer_fd {
poller.add(timer_fd, Event::none(crate::NOTIFY_KEY))?;
poller.add(timer_fd, Event::none(crate::NOTIFY_KEY), PollMode::Oneshot)?;
}
poller.add(
@ -77,6 +77,7 @@ impl Poller {
readable: true,
writable: false,
},
PollMode::Oneshot,
)?;
log::trace!(
@ -88,16 +89,26 @@ impl Poller {
Ok(poller)
}
/// Whether this poller supports level-triggered events.
pub fn supports_level(&self) -> bool {
true
}
/// Whether the poller supports edge-triggered events.
pub fn supports_edge(&self) -> bool {
true
}
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
pub fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
log::trace!("add: epoll_fd={}, fd={}, ev={:?}", self.epoll_fd, fd, ev);
self.ctl(libc::EPOLL_CTL_ADD, fd, Some(ev))
self.ctl(libc::EPOLL_CTL_ADD, fd, Some((ev, mode)))
}
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
log::trace!("modify: epoll_fd={}, fd={}, ev={:?}", self.epoll_fd, fd, ev);
self.ctl(libc::EPOLL_CTL_MOD, fd, Some(ev))
self.ctl(libc::EPOLL_CTL_MOD, fd, Some((ev, mode)))
}
/// Deletes a file descriptor.
@ -140,6 +151,7 @@ impl Poller {
readable: true,
writable: false,
},
PollMode::Oneshot,
)?;
}
@ -181,6 +193,7 @@ impl Poller {
readable: true,
writable: false,
},
PollMode::Oneshot,
)?;
Ok(())
}
@ -203,9 +216,13 @@ impl Poller {
}
/// Passes arguments to `epoll_ctl`.
fn ctl(&self, op: libc::c_int, fd: RawFd, ev: Option<Event>) -> io::Result<()> {
let mut ev = ev.map(|ev| {
let mut flags = libc::EPOLLONESHOT;
fn ctl(&self, op: libc::c_int, fd: RawFd, ev: Option<(Event, PollMode)>) -> io::Result<()> {
let mut ev = ev.map(|(ev, mode)| {
let mut flags = match mode {
PollMode::Oneshot => libc::EPOLLONESHOT,
PollMode::Level => 0,
PollMode::Edge => libc::EPOLLET,
};
if ev.readable {
flags |= read_flags();
}

View File

@ -10,7 +10,7 @@ use std::time::Duration;
#[cfg(not(polling_no_io_safety))]
use std::os::unix::io::{AsFd, BorrowedFd};
use crate::Event;
use crate::{Event, PollMode};
/// Interface to kqueue.
#[derive(Debug)]
@ -47,6 +47,7 @@ impl Poller {
readable: true,
writable: false,
},
PollMode::Oneshot,
)?;
log::trace!(
@ -57,25 +58,41 @@ impl Poller {
Ok(poller)
}
/// Whether this poller supports level-triggered events.
pub fn supports_level(&self) -> bool {
true
}
/// Whether this poller supports edge-triggered events.
pub fn supports_edge(&self) -> bool {
true
}
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
pub fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
// File descriptors don't need to be added explicitly, so just modify the interest.
self.modify(fd, ev)
self.modify(fd, ev, mode)
}
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
if fd != self.read_stream.as_raw_fd() {
log::trace!("add: kqueue_fd={}, fd={}, ev={:?}", self.kqueue_fd, fd, ev);
}
let mode_flags = match mode {
PollMode::Oneshot => libc::EV_ONESHOT,
PollMode::Level => 0,
PollMode::Edge => libc::EV_CLEAR,
};
let read_flags = if ev.readable {
libc::EV_ADD | libc::EV_ONESHOT
libc::EV_ADD | mode_flags
} else {
libc::EV_DELETE
};
let write_flags = if ev.writable {
libc::EV_ADD | libc::EV_ONESHOT
libc::EV_ADD | mode_flags
} else {
libc::EV_DELETE
};
@ -127,7 +144,7 @@ impl Poller {
/// Deletes a file descriptor.
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
// Simply delete interest in the file descriptor.
self.modify(fd, Event::none(0))
self.modify(fd, Event::none(0), PollMode::Oneshot)
}
/// Waits for I/O events with an optional timeout.
@ -166,6 +183,7 @@ impl Poller {
readable: true,
writable: false,
},
PollMode::Oneshot,
)?;
Ok(())

View File

@ -8,8 +8,10 @@
//! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems
//! - [wepoll](https://github.com/piscisaureus/wepoll): Windows, Wine (version 7.13+)
//!
//! Polling is done in oneshot mode, which means interest in I/O events needs to be re-enabled
//! after an event is delivered if we're interested in the next event of the same kind.
//! By default, polling is done in oneshot mode, which means interest in I/O events needs to
//! be re-enabled after an event is delivered if we're interested in the next event of the same
//! kind. However, level and edge triggered modes are also available for certain operating
//! systems. See the documentation of the [`PollMode`] type for more information.
//!
//! Only one thread can be waiting for I/O events at a time.
//!
@ -132,6 +134,40 @@ pub struct Event {
pub writable: bool,
}
/// The mode in which the poller waits for I/O events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum PollMode {
/// Poll in oneshot mode.
///
/// In this mode, the poller will only deliver one event per file descriptor or socket.
/// Once an event has been delivered, interest in the event needs to be re-enabled
/// by calling `Poller::modify` or `Poller::add`.
///
/// This is the default mode.
Oneshot,
/// Poll in level-triggered mode.
///
/// Once an event has been delivered, polling will continue to deliver that event
/// until interest in the event is disabled by calling `Poller::modify` or `Poller::delete`.
///
/// Not all operating system support this mode. Trying to register a file descriptor with
/// this mode in an unsupported operating system will raise an error. You can check if
/// the operating system supports this mode by calling `Poller::supports_level`.
Level,
/// Poll in edge-triggered mode.
///
/// Once an event has been delivered, polling will not deliver that event again unless
/// a new event occurs.
///
/// Not all operating system support this mode. Trying to register a file descriptor with
/// this mode in an unsupported operating system will raise an error. You can check if
/// the operating system supports this mode by calling `Poller::supports_edge`.
Edge,
}
impl Event {
/// All kinds of events (readable and writable).
///
@ -204,6 +240,16 @@ impl Poller {
})
}
/// Tell whether or not this `Poller` supports level-triggered polling.
pub fn supports_level(&self) -> bool {
self.poller.supports_level()
}
/// Tell whether or not this `Poller` supports edge-triggered polling.
pub fn supports_edge(&self) -> bool {
self.poller.supports_edge()
}
/// Adds a file descriptor or socket to the poller.
///
/// A file descriptor or socket is considered readable or writable when a read or write
@ -248,13 +294,31 @@ impl Poller {
/// # std::io::Result::Ok(())
/// ```
pub fn add(&self, source: impl Source, interest: Event) -> io::Result<()> {
self.add_with_mode(source, interest, PollMode::Oneshot)
}
/// Adds a file descriptor or socket to the poller in the specified mode.
///
/// This is identical to the `add()` function, but allows specifying the
/// polling mode to use for this socket.
///
/// # Errors
///
/// If the operating system does not support the specified mode, this function
/// will return an error.
pub fn add_with_mode(
&self,
source: impl Source,
interest: Event,
mode: PollMode,
) -> io::Result<()> {
if interest.key == NOTIFY_KEY {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"the key is not allowed to be `usize::MAX`",
));
}
self.poller.add(source.raw(), interest)
self.poller.add(source.raw(), interest, mode)
}
/// Modifies the interest in a file descriptor or socket.
@ -326,13 +390,37 @@ impl Poller {
/// # std::io::Result::Ok(())
/// ```
pub fn modify(&self, source: impl Source, interest: Event) -> io::Result<()> {
self.modify_with_mode(source, interest, PollMode::Oneshot)
}
/// Modifies interest in a file descriptor or socket to the poller, but with the specified
/// mode.
///
/// This is identical to the `modify()` function, but allows specifying the polling mode
/// to use for this socket.
///
/// # Performance Notes
///
/// This function can be used to change a source from one polling mode to another. However,
/// on some platforms, this switch can cause delays in the delivery of events.
///
/// # Errors
///
/// If the operating system does not support the specified mode, this function will return
/// an error.
pub fn modify_with_mode(
&self,
source: impl Source,
interest: Event,
mode: PollMode,
) -> io::Result<()> {
if interest.key == NOTIFY_KEY {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"the key is not allowed to be `usize::MAX`",
));
}
self.poller.modify(source.raw(), interest)
self.poller.modify(source.raw(), interest, mode)
}
/// Removes a file descriptor or socket from the poller.
@ -577,3 +665,14 @@ cfg_if! {
}
}
}
#[allow(unused)]
fn unsupported_error(err: impl Into<String>) -> io::Error {
io::Error::new(
#[cfg(not(polling_no_unsupported_error_kind))]
io::ErrorKind::Unsupported,
#[cfg(polling_no_unsupported_error_kind)]
io::ErrorKind::Other,
err.into(),
)
}

View File

@ -11,7 +11,7 @@ use std::time::{Duration, Instant};
// std::os::unix doesn't exist on Fuchsia
use libc::c_int as RawFd;
use crate::Event;
use crate::{Event, PollMode};
/// Interface to poll.
#[derive(Debug)]
@ -77,6 +77,8 @@ struct FdData {
poll_fds_index: usize,
/// The key of the `Event` associated with this file descriptor.
key: usize,
/// Whether to remove this file descriptor from the poller on the next call to `wait`.
remove: bool,
}
impl Poller {
@ -117,8 +119,18 @@ impl Poller {
})
}
/// Whether this poller supports level-triggered events.
pub fn supports_level(&self) -> bool {
true
}
/// Whether the poller supports edge-triggered events.
pub fn supports_edge(&self) -> bool {
false
}
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
pub fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
if fd == self.notify_read || fd == self.notify_write {
return Err(io::Error::from(io::ErrorKind::InvalidInput));
}
@ -141,6 +153,7 @@ impl Poller {
FdData {
poll_fds_index,
key: ev.key,
remove: cvt_mode_as_remove(mode)?,
},
);
@ -155,7 +168,7 @@ impl Poller {
}
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
log::trace!(
"modify: notify_read={}, fd={}, ev={:?}",
self.notify_read,
@ -168,6 +181,7 @@ impl Poller {
data.key = ev.key;
let poll_fds_index = data.poll_fds_index;
fds.poll_fds[poll_fds_index].0.events = poll_events(ev);
data.remove = cvt_mode_as_remove(mode)?;
Ok(())
})
@ -257,8 +271,10 @@ impl Poller {
readable: poll_fd.revents & READ_REVENTS != 0,
writable: poll_fd.revents & WRITE_REVENTS != 0,
});
// Remove interest
poll_fd.events = 0;
// Remove interest if necessary
if fd_data.remove {
poll_fd.events = 0;
}
if events.inner.len() == num_fd_events {
break;
@ -299,7 +315,7 @@ impl Poller {
let _ = self.pop_notification();
}
let res = f(&mut *fds);
let res = f(&mut fds);
if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 {
self.operations_complete.notify_one();
@ -395,3 +411,13 @@ fn poll(fds: &mut [PollFd], deadline: Option<Instant>) -> io::Result<usize> {
}
}
}
fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
match mode {
PollMode::Oneshot => Ok(true),
PollMode::Level => Ok(false),
PollMode::Edge => Err(crate::unsupported_error(
"edge-triggered I/O events are not supported in poll()",
)),
}
}

View File

@ -9,7 +9,7 @@ use std::time::Duration;
#[cfg(not(polling_no_io_safety))]
use std::os::unix::io::{AsFd, BorrowedFd};
use crate::Event;
use crate::{Event, PollMode};
/// Interface to event ports.
#[derive(Debug)]
@ -46,19 +46,30 @@ impl Poller {
readable: true,
writable: false,
},
PollMode::Oneshot,
)?;
Ok(poller)
}
/// Whether this poller supports level-triggered events.
pub fn supports_level(&self) -> bool {
false
}
/// Whether this poller supports edge-triggered events.
pub fn supports_edge(&self) -> bool {
false
}
/// Adds a file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
pub fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
// File descriptors don't need to be added explicitly, so just modify the interest.
self.modify(fd, ev)
self.modify(fd, ev, mode)
}
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
pub fn modify(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
let mut flags = 0;
if ev.readable {
flags |= libc::POLLIN;
@ -67,6 +78,12 @@ impl Poller {
flags |= libc::POLLOUT;
}
if let PollMode::Edge | PollMode::Level = mode {
return Err(crate::unsupported_error(
"this kind of event is not supported with event ports",
));
}
syscall!(port_associate(
self.port_fd,
libc::PORT_SOURCE_FD,
@ -74,6 +91,7 @@ impl Poller {
flags as _,
ev.key as _,
))?;
Ok(())
}
@ -134,6 +152,7 @@ impl Poller {
readable: true,
writable: false,
},
PollMode::Oneshot,
)?;
Ok(())

View File

@ -13,7 +13,7 @@ use std::os::windows::io::{AsHandle, BorrowedHandle};
use wepoll_ffi as we;
use crate::Event;
use crate::{Event, PollMode};
/// Calls a wepoll function and results in `io::Result`.
macro_rules! wepoll {
@ -42,11 +42,7 @@ impl Poller {
pub fn new() -> io::Result<Poller> {
let handle = unsafe { we::epoll_create1(0) };
if handle.is_null() {
return Err(io::Error::new(
#[cfg(not(polling_no_unsupported_error_kind))]
io::ErrorKind::Unsupported,
#[cfg(polling_no_unsupported_error_kind)]
io::ErrorKind::Other,
return Err(crate::unsupported_error(
format!(
"Failed to initialize Wepoll: {}\nThis usually only happens for old Windows or Wine.",
io::Error::last_os_error()
@ -58,21 +54,31 @@ impl Poller {
Ok(Poller { handle, notified })
}
/// Whether this poller supports level-triggered events.
pub fn supports_level(&self) -> bool {
true
}
/// Whether this poller supports edge-triggered events.
pub fn supports_edge(&self) -> bool {
false
}
/// Adds a socket.
pub fn add(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
pub fn add(&self, sock: RawSocket, ev: Event, mode: PollMode) -> io::Result<()> {
log::trace!("add: handle={:?}, sock={}, ev={:?}", self.handle, sock, ev);
self.ctl(we::EPOLL_CTL_ADD, sock, Some(ev))
self.ctl(we::EPOLL_CTL_ADD, sock, Some((ev, mode)))
}
/// Modifies a socket.
pub fn modify(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
pub fn modify(&self, sock: RawSocket, ev: Event, mode: PollMode) -> io::Result<()> {
log::trace!(
"modify: handle={:?}, sock={}, ev={:?}",
self.handle,
sock,
ev
);
self.ctl(we::EPOLL_CTL_MOD, sock, Some(ev))
self.ctl(we::EPOLL_CTL_MOD, sock, Some((ev, mode)))
}
/// Deletes a socket.
@ -150,22 +156,33 @@ impl Poller {
}
/// Passes arguments to `epoll_ctl`.
fn ctl(&self, op: u32, sock: RawSocket, ev: Option<Event>) -> io::Result<()> {
let mut ev = ev.map(|ev| {
let mut flags = we::EPOLLONESHOT;
if ev.readable {
flags |= READ_FLAGS;
}
if ev.writable {
flags |= WRITE_FLAGS;
}
we::epoll_event {
events: flags as u32,
data: we::epoll_data {
u64_: ev.key as u64,
},
}
});
fn ctl(&self, op: u32, sock: RawSocket, ev: Option<(Event, PollMode)>) -> io::Result<()> {
let mut ev = ev
.map(|(ev, mode)| {
let mut flags = match mode {
PollMode::Level => 0,
PollMode::Oneshot => we::EPOLLONESHOT,
PollMode::Edge => {
return Err(crate::unsupported_error(
"edge-triggered events are not supported with wepoll",
));
}
};
if ev.readable {
flags |= READ_FLAGS;
}
if ev.writable {
flags |= WRITE_FLAGS;
}
Ok(we::epoll_event {
events: flags as u32,
data: we::epoll_data {
u64_: ev.key as u64,
},
})
})
.transpose()?;
wepoll!(epoll_ctl(
self.handle,
op as c_int,

170
tests/other_modes.rs Normal file
View File

@ -0,0 +1,170 @@
//! Tests for level triggered and edge triggered mode.
#![allow(clippy::unused_io_amount)]
use std::io::{self, prelude::*};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;
use polling::{Event, PollMode, Poller};
#[test]
fn level_triggered() {
// Create our streams.
let (mut reader, mut writer) = tcp_pair().unwrap();
let reader_token = 1;
// Create our poller and register our streams.
let poller = Poller::new().unwrap();
if poller
.add_with_mode(&reader, Event::readable(reader_token), PollMode::Level)
.is_err()
{
// Only panic if we're on a platform that should support level mode.
cfg_if::cfg_if! {
if #[cfg(any(target_os = "solaris", target_os = "illumos"))] {
return;
} else {
panic!("Level mode should be supported on this platform");
}
}
}
// Write some data to the writer.
let data = [1, 2, 3, 4, 5];
writer.write_all(&data).unwrap();
// A "readable" notification should be delivered.
let mut events = Vec::new();
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
// If we read some of the data, the notification should still be available.
reader.read_exact(&mut [0; 3]).unwrap();
events.clear();
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
// If we read the rest of the data, the notification should be gone.
reader.read_exact(&mut [0; 2]).unwrap();
events.clear();
poller
.wait(&mut events, Some(Duration::from_secs(0)))
.unwrap();
assert_eq!(events, []);
// After modifying the stream and sending more data, it should be oneshot.
poller
.modify_with_mode(&reader, Event::readable(reader_token), PollMode::Oneshot)
.unwrap();
writer.write(&data).unwrap();
events.clear();
// BUG: Somehow, the notification here is delayed?
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
// After reading, the notification should vanish.
reader.read(&mut [0; 5]).unwrap();
events.clear();
poller
.wait(&mut events, Some(Duration::from_secs(0)))
.unwrap();
assert_eq!(events, []);
}
#[test]
fn edge_triggered() {
// Create our streams.
let (mut reader, mut writer) = tcp_pair().unwrap();
let reader_token = 1;
// Create our poller and register our streams.
let poller = Poller::new().unwrap();
if poller
.add_with_mode(&reader, Event::readable(reader_token), PollMode::Edge)
.is_err()
{
// Only panic if we're on a platform that should support level mode.
cfg_if::cfg_if! {
if #[cfg(all(
any(
target_os = "linux",
target_os = "android",
target_os = "macos",
target_os = "ios",
target_os = "tvos",
target_os = "watchos",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly"
),
not(polling_test_poll_backend)
))] {
panic!("Edge mode should be supported on this platform");
} else {
return;
}
}
}
// Write some data to the writer.
let data = [1, 2, 3, 4, 5];
writer.write_all(&data).unwrap();
// A "readable" notification should be delivered.
let mut events = Vec::new();
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
// If we read some of the data, the notification should not still be available.
reader.read_exact(&mut [0; 3]).unwrap();
events.clear();
poller
.wait(&mut events, Some(Duration::from_secs(0)))
.unwrap();
assert_eq!(events, []);
// If we write more data, a notification should be delivered.
writer.write_all(&data).unwrap();
events.clear();
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
// After modifying the stream and sending more data, it should be oneshot.
poller
.modify_with_mode(&reader, Event::readable(reader_token), PollMode::Oneshot)
.unwrap();
writer.write_all(&data).unwrap();
events.clear();
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
}
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
let listener = TcpListener::bind("127.0.0.1:0")?;
let a = TcpStream::connect(listener.local_addr()?)?;
let (b, _) = listener.accept()?;
Ok((a, b))
}