breaking: Extract the Events struct and make the Event struct opaque

* Add better documentation for the IOCP module

* Extract Events from Poller

This prevents the need to have an intermediate buffer to read events
from, reducing the need for an allocation and a copy. This is a breaking
change.

* Add event extra information

Foundation for more details later on.

* Add PRI and HUP events
* Fix various failing tests

- Make sure that waitable handles interact properly with the new
  infrastructure
- Fix failing doctests

* Review comments

- Make set_* take a boolean for the value of the flag
- Make Events !Sync
- Fix visibility modifiers
- Inline more methods
- Use a better strategy for testing

* Move completion packets into the Events buffer

This removes one of the mutexes that we have to lock.

* Review comments

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2023-08-14 10:03:20 -07:00 committed by GitHub
parent e42664d57e
commit a521cd2c29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 878 additions and 160 deletions

View File

@ -1,7 +1,7 @@
use std::io;
use std::net::TcpListener;
use polling::{Event, Poller};
use polling::{Event, Events, Poller};
fn main() -> io::Result<()> {
let l1 = TcpListener::bind("127.0.0.1:8001")?;
@ -19,12 +19,12 @@ fn main() -> io::Result<()> {
println!(" $ nc 127.0.0.1 8001");
println!(" $ nc 127.0.0.1 8002");
let mut events = Vec::new();
let mut events = Events::new();
loop {
events.clear();
poller.wait(&mut events, None)?;
for ev in &events {
for ev in events.iter() {
match ev.key {
1 => {
println!("Accept on l1");

View File

@ -13,7 +13,7 @@
))]
mod example {
use polling::os::kqueue::{PollerKqueueExt, Signal};
use polling::{PollMode, Poller};
use polling::{Events, PollMode, Poller};
pub(super) fn main2() {
// Create a poller.
@ -23,7 +23,7 @@ mod example {
let sigint = Signal(libc::SIGINT);
poller.add_filter(sigint, 1, PollMode::Oneshot).unwrap();
let mut events = vec![];
let mut events = Events::new();
println!("Press Ctrl+C to exit...");
@ -32,7 +32,7 @@ mod example {
poller.wait(&mut events, None).unwrap();
// Process events.
for ev in events.drain(..) {
for ev in events.iter() {
match ev.key {
1 => {
println!("SIGINT received");
@ -41,6 +41,8 @@ mod example {
_ => unreachable!(),
}
}
events.clear();
}
}
}

View File

@ -59,11 +59,7 @@ impl Poller {
poller.add(
poller.event_fd.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
writable: false,
},
Event::readable(crate::NOTIFY_KEY),
PollMode::Oneshot,
)?;
}
@ -106,7 +102,7 @@ impl Poller {
&self.epoll_fd,
unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
epoll::EventData::new_u64(ev.key as u64),
epoll_flags(&ev, mode),
epoll_flags(&ev, mode) | ev.extra.flags,
)?;
Ok(())
@ -126,7 +122,7 @@ impl Poller {
&self.epoll_fd,
fd,
epoll::EventData::new_u64(ev.key as u64),
epoll_flags(&ev, mode),
epoll_flags(&ev, mode) | ev.extra.flags,
)?;
Ok(())
@ -177,11 +173,7 @@ impl Poller {
// Set interest in timerfd.
self.modify(
timer_fd.as_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
writable: false,
},
Event::readable(crate::NOTIFY_KEY),
PollMode::Oneshot,
)?;
}
@ -213,11 +205,7 @@ impl Poller {
let _ = read(&self.event_fd, &mut buf);
self.modify(
self.event_fd.as_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
writable: false,
},
Event::readable(crate::NOTIFY_KEY),
PollMode::Oneshot,
)?;
Ok(())
@ -308,9 +296,9 @@ unsafe impl Send for Events {}
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
list: epoll::EventVec::with_capacity(1024),
list: epoll::EventVec::with_capacity(cap),
}
}
@ -322,7 +310,58 @@ impl Events {
key: ev.data.u64() as usize,
readable: flags.intersects(read_flags()),
writable: flags.intersects(write_flags()),
extra: EventExtra { flags },
}
})
}
/// Clear the list.
pub fn clear(&mut self) {
self.list.clear();
}
/// Get the capacity of the list.
pub fn capacity(&self) -> usize {
self.list.capacity()
}
}
/// Extra information about this event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventExtra {
flags: epoll::EventFlags,
}
impl EventExtra {
/// Create an empty version of the data.
#[inline]
pub fn empty() -> EventExtra {
EventExtra {
flags: epoll::EventFlags::empty(),
}
}
/// Add the interrupt flag to this event.
#[inline]
pub fn set_hup(&mut self, active: bool) {
self.flags.set(epoll::EventFlags::HUP, active);
}
/// Add the priority flag to this event.
#[inline]
pub fn set_pri(&mut self, active: bool) {
self.flags.set(epoll::EventFlags::PRI, active);
}
/// Tell if the interrupt flag is set.
#[inline]
pub fn is_hup(&self) -> bool {
self.flags.contains(epoll::EventFlags::HUP)
}
/// Tell if the priority flag is set.
#[inline]
pub fn is_pri(&self) -> bool {
self.flags.contains(epoll::EventFlags::PRI)
}
}

View File

@ -66,7 +66,7 @@ impl AfdPollInfo {
}
}
#[derive(Default, Copy, Clone)]
#[derive(Default, Copy, Clone, PartialEq, Eq)]
#[repr(transparent)]
pub(super) struct AfdPollMask(u32);
@ -89,6 +89,44 @@ impl AfdPollMask {
pub(crate) fn intersects(self, other: AfdPollMask) -> bool {
(self.0 & other.0) != 0
}
/// Sets a flag.
pub(crate) fn set(&mut self, other: AfdPollMask, value: bool) {
if value {
*self |= other;
} else {
self.0 &= !other.0;
}
}
}
impl fmt::Debug for AfdPollMask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
const FLAGS: &[(&str, AfdPollMask)] = &[
("RECEIVE", AfdPollMask::RECEIVE),
("RECEIVE_EXPEDITED", AfdPollMask::RECEIVE_EXPEDITED),
("SEND", AfdPollMask::SEND),
("DISCONNECT", AfdPollMask::DISCONNECT),
("ABORT", AfdPollMask::ABORT),
("LOCAL_CLOSE", AfdPollMask::LOCAL_CLOSE),
("ACCEPT", AfdPollMask::ACCEPT),
("CONNECT_FAIL", AfdPollMask::CONNECT_FAIL),
];
let mut first = true;
for (name, value) in FLAGS {
if self.intersects(*value) {
if !first {
write!(f, " | ")?;
}
first = false;
write!(f, "{}", name)?;
}
}
Ok(())
}
}
impl ops::BitOr for AfdPollMask {
@ -105,6 +143,20 @@ impl ops::BitOrAssign for AfdPollMask {
}
}
impl ops::BitAnd for AfdPollMask {
type Output = Self;
fn bitand(self, rhs: Self) -> Self {
AfdPollMask(self.0 & rhs.0)
}
}
impl ops::BitAndAssign for AfdPollMask {
fn bitand_assign(&mut self, rhs: Self) {
self.0 &= rhs.0;
}
}
pub(super) trait HasAfdInfo {
fn afd_info(self: Pin<&Self>) -> Pin<&UnsafeCell<AfdPollInfo>>;
}

View File

@ -75,26 +75,42 @@ pub(super) struct Poller {
/// List of currently active AFD instances.
///
/// Weak references are kept here so that the AFD handle is automatically dropped
/// when the last associated socket is dropped.
/// AFD acts as the actual source of the socket events. It's essentially running `WSAPoll` on
/// the sockets and then posting the events to the IOCP.
///
/// AFD instances can be keyed to an unlimited number of sockets. However, each AFD instance
/// polls their sockets linearly. Therefore, it is best to limit the number of sockets each AFD
/// instance is responsible for. The limit of 32 is chosen because that's what `wepoll` uses.
///
/// Weak references are kept here so that the AFD handle is automatically dropped when the last
/// associated socket is dropped.
afd: Mutex<Vec<Weak<Afd<Packet>>>>,
/// The state of the sources registered with this poller.
///
/// Each source is keyed by its raw socket ID.
sources: RwLock<HashMap<RawSocket, Packet>>,
/// The state of the waitable handles registered with this poller.
waitables: RwLock<HashMap<RawHandle, Packet>>,
/// Sockets with pending updates.
///
/// This list contains packets with sockets that need to have their AFD state adjusted by
/// calling the `update()` function on them. It's best to queue up packets as they need to
/// be updated and then run all of the updates before we start waiting on the IOCP, rather than
/// updating them as we come. If we're waiting on the IOCP updates should be run immediately.
pending_updates: ConcurrentQueue<Packet>,
/// Are we currently polling?
///
/// This indicates whether or not we are blocking on the IOCP, and is used to determine
/// whether pending updates should be run immediately or queued.
polling: AtomicBool,
/// A list of completion packets.
packets: Mutex<Vec<OverlappedEntry<Packet>>>,
/// The packet used to notify the poller.
///
/// This is a special-case packet that is used to wake up the poller when it is waiting.
notifier: Packet,
}
@ -119,7 +135,6 @@ impl Poller {
)))?;
let port = IoCompletionPort::new(0)?;
tracing::trace!(handle = ?port, "new");
Ok(Poller {
@ -129,7 +144,6 @@ impl Poller {
waitables: RwLock::new(HashMap::new()),
pending_updates: ConcurrentQueue::bounded(1024),
polling: AtomicBool::new(false),
packets: Mutex::new(Vec::with_capacity(1024)),
notifier: Arc::pin(
PacketInner::Wakeup {
_pinned: PhantomPinned,
@ -178,6 +192,7 @@ impl Poller {
// Create a new packet.
let socket_state = {
// Create a new socket state and assign an AFD handle to it.
let state = SocketState {
socket,
base_socket: base_socket(socket)?,
@ -189,6 +204,7 @@ impl Poller {
status: SocketStatus::Idle,
};
// We wrap this socket state in a Packet so the IOCP can use it.
Arc::pin(IoStatusBlock::from(PacketInner::Socket {
packet: UnsafeCell::new(AfdPollInfo::default()),
socket: Mutex::new(state),
@ -249,6 +265,7 @@ impl Poller {
// Set the new event.
if source.as_ref().set_events(interest, mode) {
// The packet needs to be updated.
self.update_packet(source)?;
}
@ -264,7 +281,7 @@ impl Poller {
);
let _enter = span.enter();
// Get a reference to the source.
// Remove the source from our associative map.
let source = {
let mut sources = lock!(self.sources.write());
@ -409,8 +426,8 @@ impl Poller {
);
let _enter = span.enter();
// Make sure we have a consistent timeout.
let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout));
let mut packets = lock!(self.packets.lock());
let mut notified = false;
events.packets.clear();
@ -421,6 +438,7 @@ impl Poller {
let was_polling = self.polling.swap(true, Ordering::SeqCst);
debug_assert!(!was_polling);
// Even if we panic, we want to make sure we indicate that polling has stopped.
let guard = CallOnDrop(|| {
let was_polling = self.polling.swap(false, Ordering::SeqCst);
debug_assert!(was_polling);
@ -433,7 +451,7 @@ impl Poller {
let timeout = deadline.map(|t| t.saturating_duration_since(Instant::now()));
// Wait for I/O events.
let len = self.port.wait(&mut packets, timeout)?;
let len = self.port.wait(&mut events.completions, timeout)?;
tracing::trace!(
handle = ?self.port,
res = ?len,
@ -443,7 +461,7 @@ impl Poller {
drop(guard);
// Process all of the events.
for entry in packets.drain(..) {
for entry in events.completions.drain(..) {
let packet = entry.into_packet();
// Feed the event into the packet.
@ -500,18 +518,22 @@ impl Poller {
// If we failed to queue the update, we need to drain the queue first.
self.drain_update_queue(true)?;
// Loop back and try again.
}
}
/// Drain the update queue.
fn drain_update_queue(&self, limit: bool) -> io::Result<()> {
// Determine how many packets to process.
let max = if limit {
// Only drain the queue's capacity, since this could in theory run forever.
self.pending_updates.capacity().unwrap()
} else {
// Less of a concern if we're draining the queue prior to a poll operation.
std::usize::MAX
};
// Only drain the queue's capacity, since this could in theory run forever.
self.pending_updates
.try_iter()
.take(max)
@ -519,11 +541,14 @@ impl Poller {
}
/// Get a handle to the AFD reference.
///
/// This finds an AFD handle with less than 32 associated sockets, or creates a new one if
/// one does not exist.
fn afd_handle(&self) -> io::Result<Arc<Afd<Packet>>> {
const AFD_MAX_SIZE: usize = 32;
// Crawl the list and see if there are any existing AFD instances that we can use.
// Remove any unused AFD pointers.
// While we're here, remove any unused AFD pointers.
let mut afd_handles = lock!(self.afd.lock());
let mut i = 0;
while i < afd_handles.len() {
@ -561,7 +586,7 @@ impl Poller {
// Register the AFD instance with the I/O completion port.
self.port.register(&*afd, true)?;
// Insert a weak pointer to the AFD instance into the list.
// Insert a weak pointer to the AFD instance into the list for other sockets.
afd_handles.push(Arc::downgrade(&afd));
Ok(afd)
@ -584,22 +609,77 @@ impl AsHandle for Poller {
pub(super) struct Events {
/// List of IOCP packets.
packets: Vec<Event>,
/// Buffer for completion packets.
completions: Vec<OverlappedEntry<Packet>>,
}
unsafe impl Send for Events {}
impl Events {
/// Creates an empty list of events.
pub(super) fn new() -> Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
packets: Vec::with_capacity(1024),
packets: Vec::with_capacity(cap),
completions: Vec::with_capacity(cap),
}
}
/// Iterate over I/O events.
pub(super) fn iter(&self) -> impl Iterator<Item = Event> + '_ {
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.packets.iter().copied()
}
/// Clear the list of events.
pub fn clear(&mut self) {
self.packets.clear();
}
/// The capacity of the list of events.
pub fn capacity(&self) -> usize {
self.packets.capacity()
}
}
/// Extra information about an event.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct EventExtra {
/// Flags associated with this event.
flags: AfdPollMask,
}
impl EventExtra {
/// Create a new, empty version of this struct.
#[inline]
pub fn empty() -> EventExtra {
EventExtra {
flags: AfdPollMask::empty(),
}
}
/// Is this a HUP event?
#[inline]
pub fn is_hup(&self) -> bool {
self.flags.intersects(AfdPollMask::ABORT)
}
/// Is this a PRI event?
#[inline]
pub fn is_pri(&self) -> bool {
self.flags.intersects(AfdPollMask::RECEIVE_EXPEDITED)
}
/// Set up a listener for HUP events.
#[inline]
pub fn set_hup(&mut self, active: bool) {
self.flags.set(AfdPollMask::ABORT, active);
}
/// Set up a listener for PRI events.
#[inline]
pub fn set_pri(&mut self, active: bool) {
self.flags.set(AfdPollMask::RECEIVE_EXPEDITED, active);
}
}
/// A packet used to wake up the poller with an event.
@ -624,6 +704,8 @@ impl CompletionPacket {
}
/// The type of our completion packet.
///
/// It needs to be pinned, since it contains data that is expected by IOCP not to be moved.
type Packet = Pin<Arc<PacketUnwrapped>>;
type PacketUnwrapped = IoStatusBlock<PacketInner>;
@ -698,9 +780,11 @@ impl PacketUnwrapped {
socket.mode = mode;
socket.interest_error = true;
// If there was a change, indicate that we need an update.
match socket.status {
SocketStatus::Polling { readable, writable } => {
(interest.readable && !readable) || (interest.writable && !writable)
SocketStatus::Polling { flags } => {
let our_flags = event_to_afd_mask(socket.interest, socket.interest_error);
our_flags != flags
}
_ => true,
}
@ -715,11 +799,18 @@ impl PacketUnwrapped {
// Update if there is no ongoing wait.
handle.status.is_idle()
}
_ => false,
_ => true,
}
}
/// Update the socket and install the new status in AFD.
///
/// This function does one of the following:
///
/// - Nothing, if the packet is waiting on being dropped anyways.
/// - Cancels the ongoing poll, if we want to poll for different events than we are currently
/// polling for.
/// - Starts a new AFD_POLL operation, if we are not currently polling.
fn update(self: Pin<Arc<Self>>) -> io::Result<()> {
let mut socket = match self.as_ref().data().project_ref() {
PacketInnerProj::Socket { socket, .. } => lock!(socket.lock()),
@ -780,12 +871,11 @@ impl PacketUnwrapped {
// Check the current status.
match socket.status {
SocketStatus::Polling { readable, writable } => {
SocketStatus::Polling { flags } => {
// If we need to poll for events aside from what we are currently polling, we need
// to update the packet. Cancel the ongoing poll.
if (socket.interest.readable && !readable)
|| (socket.interest.writable && !writable)
{
let our_flags = event_to_afd_mask(socket.interest, socket.interest_error);
if our_flags != flags {
return self.cancel(socket);
}
@ -801,15 +891,8 @@ impl PacketUnwrapped {
SocketStatus::Idle => {
// Start a new poll.
let result = socket.afd.poll(
self.clone(),
socket.base_socket,
event_to_afd_mask(
socket.interest.readable,
socket.interest.writable,
socket.interest_error,
),
);
let mask = event_to_afd_mask(socket.interest, socket.interest_error);
let result = socket.afd.poll(self.clone(), socket.base_socket, mask);
match result {
Ok(()) => {}
@ -830,10 +913,7 @@ impl PacketUnwrapped {
}
// We are now polling for the current events.
socket.status = SocketStatus::Polling {
readable: socket.interest.readable,
writable: socket.interest.writable,
};
socket.status = SocketStatus::Polling { flags: mask };
Ok(())
}
@ -841,6 +921,9 @@ impl PacketUnwrapped {
}
/// This socket state was notified; see if we need to update it.
///
/// This indicates that this packet was indicated as "ready" by the IOCP and needs to be
/// processed.
fn feed_event(self: Pin<Arc<Self>>, poller: &Poller) -> io::Result<FeedEventResult> {
let inner = self.as_ref().data().project_ref();
@ -902,6 +985,7 @@ impl PacketUnwrapped {
// Check in on the AFD data.
let afd_data = &*afd_info.get();
// There was at least one event.
if afd_data.handle_count() >= 1 {
let events = afd_data.events();
@ -917,6 +1001,7 @@ impl PacketUnwrapped {
let (readable, writable) = afd_mask_to_event(events);
event.readable = readable;
event.writable = writable;
event.extra.flags = events;
}
}
}
@ -928,7 +1013,13 @@ impl PacketUnwrapped {
// If this event doesn't have anything that interests us, don't return or
// update the oneshot state.
let return_value = if event.readable || event.writable {
let return_value = if event.readable
|| event.writable
|| event
.extra
.flags
.intersects(socket_state.interest.extra.flags)
{
// If we are in oneshot mode, remove the interest.
if matches!(socket_state.mode, PollMode::Oneshot) {
socket_state.interest = Event::none(socket_state.interest.key);
@ -1028,11 +1119,8 @@ enum SocketStatus {
/// We are currently polling these events.
Polling {
/// We are currently polling for readable events.
readable: bool,
/// We are currently polling for writable events.
writable: bool,
/// The flags we are currently polling for.
flags: AfdPollMask,
},
/// The last poll operation was cancelled, and we're waiting for it to
@ -1161,7 +1249,15 @@ impl WaitHandle {
}
}
fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask {
/// Translate an event to the mask expected by AFD.
#[inline]
fn event_to_afd_mask(event: Event, error: bool) -> afd::AfdPollMask {
event_properties_to_afd_mask(event.readable, event.writable, error) | event.extra.flags
}
/// Translate an event to the mask expected by AFD.
#[inline]
fn event_properties_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask {
use afd::AfdPollMask as AfdPoll;
let mut mask = AfdPoll::empty();
@ -1182,6 +1278,8 @@ fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPol
mask
}
/// Convert the mask reported by AFD to an event.
#[inline]
fn afd_mask_to_event(mask: afd::AfdPollMask) -> (bool, bool) {
use afd::AfdPollMask as AfdPoll;

View File

@ -221,9 +221,9 @@ unsafe impl Send for Events {}
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
list: Vec::with_capacity(1024),
list: Vec::with_capacity(cap),
}
}
@ -246,8 +246,55 @@ impl Events {
writable: matches!(ev.filter(), kqueue::EventFilter::Write(..))
|| (matches!(ev.filter(), kqueue::EventFilter::Read(..))
&& (ev.flags().intersects(kqueue::EventFlags::EOF))),
extra: EventExtra,
})
}
/// Clears the list.
pub fn clear(&mut self) {
self.list.clear();
}
/// Get the capacity of the list.
pub fn capacity(&self) -> usize {
self.list.capacity()
}
}
/// Extra information associated with an event.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct EventExtra;
impl EventExtra {
/// Create a new, empty version of this struct.
#[inline]
pub fn empty() -> EventExtra {
EventExtra
}
/// Set the interrupt flag.
#[inline]
pub fn set_hup(&mut self, _value: bool) {
// No-op.
}
/// Set the priority flag.
#[inline]
pub fn set_pri(&mut self, _value: bool) {
// No-op.
}
/// Is the interrupt flag set?
#[inline]
pub fn is_hup(&self) -> bool {
false
}
/// Is the priority flag set?
#[inline]
pub fn is_pri(&self) -> bool {
false
}
}
pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags {

View File

@ -18,7 +18,7 @@
//! # Examples
//!
//! ```no_run
//! use polling::{Event, Poller};
//! use polling::{Event, Events, Poller};
//! use std::net::TcpListener;
//!
//! // Create a TCP listener.
@ -33,13 +33,13 @@
//! }
//!
//! // The event loop.
//! let mut events = Vec::new();
//! let mut events = Events::new();
//! loop {
//! // Wait for at least one I/O event.
//! events.clear();
//! poller.wait(&mut events, None)?;
//!
//! for ev in &events {
//! for ev in events.iter() {
//! if ev.key == key {
//! // Perform a non-blocking accept operation.
//! socket.accept()?;
@ -65,8 +65,11 @@
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
use std::cell::Cell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Duration;
@ -131,6 +134,8 @@ pub struct Event {
pub readable: bool,
/// Can it do a write operation without blocking?
pub writable: bool,
/// System-specific event data.
extra: sys::EventExtra,
}
/// The mode in which the poller waits for I/O events.
@ -186,6 +191,7 @@ impl Event {
key,
readable: true,
writable: true,
extra: sys::EventExtra::empty(),
}
}
@ -197,6 +203,7 @@ impl Event {
key,
readable: true,
writable: false,
extra: sys::EventExtra::empty(),
}
}
@ -208,6 +215,7 @@ impl Event {
key,
readable: false,
writable: true,
extra: sys::EventExtra::empty(),
}
}
@ -219,14 +227,140 @@ impl Event {
key,
readable: false,
writable: false,
extra: sys::EventExtra::empty(),
}
}
/// Add interruption events to this interest.
///
/// This usually indicates that the file descriptor or socket has been closed. It corresponds
/// to the `EPOLLHUP` and `POLLHUP` events.
///
/// Interruption events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this function is a no-op.
#[inline]
pub fn set_interrupt(&mut self, active: bool) {
self.extra.set_hup(active);
}
/// Add interruption events to this interest.
///
/// This usually indicates that the file descriptor or socket has been closed. It corresponds
/// to the `EPOLLHUP` and `POLLHUP` events.
///
/// Interruption events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this function is a no-op.
#[inline]
pub fn with_interrupt(mut self) -> Self {
self.set_interrupt(true);
self
}
/// Add priority events to this interest.
///
/// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and
/// `POLLPRI` events.
///
/// Priority events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this function is a no-op.
#[inline]
pub fn set_priority(&mut self, active: bool) {
self.extra.set_pri(active);
}
/// Add priority events to this interest.
///
/// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and
/// `POLLPRI` events.
///
/// Priority events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this function is a no-op.
#[inline]
pub fn with_priority(mut self) -> Self {
self.set_priority(true);
self
}
/// Tell if this event is the result of an interrupt notification.
///
/// This usually indicates that the file descriptor or socket has been closed. It corresponds
/// to the `EPOLLHUP` and `POLLHUP` events.
///
/// Interruption events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this always returns `false`.
#[inline]
pub fn is_interrupt(&self) -> bool {
self.extra.is_hup()
}
/// Tell if this event is the result of a priority notification.
///
/// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and
/// `POLLPRI` events.
///
/// Priority events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this always returns `false`.
#[inline]
pub fn is_priority(&self) -> bool {
self.extra.is_pri()
}
/// Remove any extra information from this event.
#[inline]
pub fn clear_extra(&mut self) {
self.extra = sys::EventExtra::empty();
}
/// Get a version of this event with no extra information.
///
/// This is useful for comparing events with `==`.
#[inline]
pub fn with_no_extra(mut self) -> Self {
self.clear_extra();
self
}
}
/// Waits for I/O events.
pub struct Poller {
poller: sys::Poller,
events: Mutex<sys::Events>,
lock: Mutex<()>,
notified: AtomicBool,
}
@ -244,7 +378,7 @@ impl Poller {
pub fn new() -> io::Result<Poller> {
Ok(Poller {
poller: sys::Poller::new()?,
events: Mutex::new(sys::Events::new()),
lock: Mutex::new(()),
notified: AtomicBool::new(false),
})
}
@ -492,7 +626,7 @@ impl Poller {
/// # Examples
///
/// ```
/// use polling::{Event, Poller};
/// use polling::{Event, Events, Poller};
/// use std::net::TcpListener;
/// use std::time::Duration;
///
@ -505,26 +639,24 @@ impl Poller {
/// poller.add(&socket, Event::all(key))?;
/// }
///
/// let mut events = Vec::new();
/// let mut events = Events::new();
/// let n = poller.wait(&mut events, Some(Duration::from_secs(1)))?;
/// poller.delete(&socket)?;
/// # std::io::Result::Ok(())
/// ```
pub fn wait(&self, events: &mut Vec<Event>, timeout: Option<Duration>) -> io::Result<usize> {
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let span = tracing::trace_span!("Poller::wait", ?timeout);
let _enter = span.enter();
if let Ok(mut lock) = self.events.try_lock() {
if let Ok(_lock) = self.lock.try_lock() {
// Wait for I/O events.
self.poller.wait(&mut lock, timeout)?;
self.poller.wait(&mut events.events, timeout)?;
// Clear the notification, if any.
self.notified.swap(false, Ordering::SeqCst);
// Collect events.
let len = events.len();
events.extend(lock.iter().filter(|ev| ev.key != usize::MAX));
Ok(events.len() - len)
// Indicate number of events.
Ok(events.len())
} else {
tracing::trace!("wait: skipping because another thread is already waiting on I/O");
Ok(0)
@ -541,14 +673,14 @@ impl Poller {
/// # Examples
///
/// ```
/// use polling::Poller;
/// use polling::{Events, Poller};
///
/// let poller = Poller::new()?;
///
/// // Notify the poller.
/// poller.notify()?;
///
/// let mut events = Vec::new();
/// let mut events = Events::new();
/// poller.wait(&mut events, None)?; // wakes up immediately
/// assert!(events.is_empty());
/// # std::io::Result::Ok(())
@ -568,6 +700,165 @@ impl Poller {
}
}
/// A container for I/O events.
pub struct Events {
events: sys::Events,
/// This is intended to be used from &mut, thread locally, so we should make it !Sync
/// for consistency with the rest of the API.
_not_sync: PhantomData<Cell<()>>,
}
impl Default for Events {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl Events {
/// Create a new container for events, using the default capacity.
///
/// The default capacity is 1024.
///
/// # Examples
///
/// ```
/// use polling::Events;
///
/// let events = Events::new();
/// ```
#[inline]
pub fn new() -> Self {
// ESP-IDF has a low amount of RAM, so we use a smaller default capacity.
#[cfg(target_os = "espidf")]
const DEFAULT_CAPACITY: usize = 32;
#[cfg(not(target_os = "espidf"))]
const DEFAULT_CAPACITY: usize = 1024;
Self::with_capacity(NonZeroUsize::new(DEFAULT_CAPACITY).unwrap())
}
/// Create a new container with the provided capacity.
///
/// # Examples
///
/// ```
/// use polling::Events;
/// use std::num::NonZeroUsize;
///
/// let capacity = NonZeroUsize::new(1024).unwrap();
/// let events = Events::with_capacity(capacity);
/// ```
#[inline]
pub fn with_capacity(capacity: NonZeroUsize) -> Self {
Self {
events: sys::Events::with_capacity(capacity.get()),
_not_sync: PhantomData,
}
}
/// Create a new iterator over I/O events.
///
/// This returns all of the events in the container, excluding the notification event.
///
/// # Examples
///
/// ```
/// use polling::{Event, Events, Poller};
/// use std::time::Duration;
///
/// # fn main() -> std::io::Result<()> {
/// let poller = Poller::new()?;
/// let mut events = Events::new();
///
/// poller.wait(&mut events, Some(Duration::from_secs(0)))?;
/// assert!(events.iter().next().is_none());
/// # Ok(()) }
/// ```
#[inline]
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.events.iter().filter(|ev| ev.key != NOTIFY_KEY)
}
/// Delete all of the events in the container.
///
/// # Examples
///
/// ```no_run
/// use polling::{Event, Events, Poller};
///
/// # fn main() -> std::io::Result<()> {
/// let poller = Poller::new()?;
/// let mut events = Events::new();
///
/// /* register some sources */
///
/// poller.wait(&mut events, None)?;
///
/// events.clear();
/// # Ok(()) }
/// ```
#[inline]
pub fn clear(&mut self) {
self.events.clear();
}
/// Returns the number of events in the container.
///
/// # Examples
///
/// ```
/// use polling::Events;
///
/// let events = Events::new();
/// assert_eq!(events.len(), 0);
/// ```
#[inline]
pub fn len(&self) -> usize {
self.iter().count()
}
/// Returns `true` if the container contains no events.
///
/// # Examples
///
/// ```
/// use polling::Events;
///
/// let events = Events::new();
/// assert!(events.is_empty());
/// ```
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Get the total capacity of the list.
///
/// # Examples
///
/// ```
/// use polling::Events;
/// use std::num::NonZeroUsize;
///
/// let cap = NonZeroUsize::new(10).unwrap();
/// let events = Events::with_capacity(std::num::NonZeroUsize::new(10).unwrap());
/// assert_eq!(events.capacity(), cap);
/// ```
#[inline]
pub fn capacity(&self) -> NonZeroUsize {
NonZeroUsize::new(self.events.capacity()).unwrap()
}
}
impl fmt::Debug for Events {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Events { .. }")
}
}
#[cfg(all(
any(
target_os = "linux",
@ -712,3 +1003,17 @@ cfg_if! {
fn unsupported_error(err: impl Into<String>) -> io::Error {
io::Error::new(io::ErrorKind::Unsupported, err.into())
}
fn _assert_send_and_sync() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<Poller>();
assert_sync::<Poller>();
assert_send::<Event>();
assert_sync::<Event>();
assert_send::<Events>();
// Events can be !Sync
}

View File

@ -19,7 +19,7 @@ pub trait PollerIocpExt: PollerSealed {
/// # Examples
///
/// ```rust
/// use polling::{Poller, Event};
/// use polling::{Poller, Event, Events};
/// use polling::os::iocp::{CompletionPacket, PollerIocpExt};
///
/// use std::thread;
@ -39,7 +39,7 @@ pub trait PollerIocpExt: PollerSealed {
/// });
///
/// // Wait for the event.
/// let mut events = vec![];
/// let mut events = Events::new();
/// poller.wait(&mut events, None)?;
///
/// assert_eq!(events.len(), 1);
@ -72,7 +72,7 @@ pub trait PollerIocpExt: PollerSealed {
/// # Examples
///
/// ```no_run
/// use polling::{Poller, Event, PollMode};
/// use polling::{Poller, Event, Events, PollMode};
/// use polling::os::iocp::PollerIocpExt;
///
/// use std::process::Command;
@ -92,11 +92,11 @@ pub trait PollerIocpExt: PollerSealed {
/// }
///
/// // Wait for the child process to exit.
/// let mut events = vec![];
/// let mut events = Events::new();
/// poller.wait(&mut events, None).unwrap();
///
/// assert_eq!(events.len(), 1);
/// assert_eq!(events[0], Event::all(0));
/// assert_eq!(events.iter().next().unwrap(), Event::all(0));
/// ```
unsafe fn add_waitable(
&self,
@ -115,7 +115,7 @@ pub trait PollerIocpExt: PollerSealed {
/// # Examples
///
/// ```no_run
/// use polling::{Poller, Event, PollMode};
/// use polling::{Poller, Event, Events, PollMode};
/// use polling::os::iocp::PollerIocpExt;
///
/// use std::process::Command;
@ -135,11 +135,11 @@ pub trait PollerIocpExt: PollerSealed {
/// }
///
/// // Wait for the child process to exit.
/// let mut events = vec![];
/// let mut events = Events::new();
/// poller.wait(&mut events, None).unwrap();
///
/// assert_eq!(events.len(), 1);
/// assert_eq!(events[0], Event::all(0));
/// assert_eq!(events.iter().next().unwrap(), Event::all(0));
///
/// // Modify the waitable handle.
/// poller.modify_waitable(&child, Event::readable(0), PollMode::Oneshot).unwrap();
@ -161,7 +161,7 @@ pub trait PollerIocpExt: PollerSealed {
/// # Examples
///
/// ```no_run
/// use polling::{Poller, Event, PollMode};
/// use polling::{Poller, Event, Events, PollMode};
/// use polling::os::iocp::PollerIocpExt;
///
/// use std::process::Command;
@ -181,11 +181,11 @@ pub trait PollerIocpExt: PollerSealed {
/// }
///
/// // Wait for the child process to exit.
/// let mut events = vec![];
/// let mut events = Events::new();
/// poller.wait(&mut events, None).unwrap();
///
/// assert_eq!(events.len(), 1);
/// assert_eq!(events[0], Event::all(0));
/// assert_eq!(events.iter().next().unwrap(), Event::all(0));
///
/// // Remove the waitable handle.
/// poller.remove_waitable(&child).unwrap();

View File

@ -31,7 +31,7 @@ pub trait PollerKqueueExt<F: Filter>: PollerSealed {
/// # Examples
///
/// ```no_run
/// use polling::{Poller, PollMode};
/// use polling::{Events, Poller, PollMode};
/// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal};
///
/// let poller = Poller::new().unwrap();
@ -40,7 +40,7 @@ pub trait PollerKqueueExt<F: Filter>: PollerSealed {
/// poller.add_filter(Signal(libc::SIGINT), 0, PollMode::Oneshot).unwrap();
///
/// // Wait for the signal.
/// let mut events = vec![];
/// let mut events = Events::new();
/// poller.wait(&mut events, None).unwrap();
/// # let _ = events;
/// ```
@ -54,7 +54,7 @@ pub trait PollerKqueueExt<F: Filter>: PollerSealed {
/// # Examples
///
/// ```no_run
/// use polling::{Poller, PollMode};
/// use polling::{Events, Poller, PollMode};
/// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal};
///
/// let poller = Poller::new().unwrap();
@ -66,7 +66,7 @@ pub trait PollerKqueueExt<F: Filter>: PollerSealed {
/// poller.modify_filter(Signal(libc::SIGINT), 1, PollMode::Oneshot).unwrap();
///
/// // Wait for the signal.
/// let mut events = vec![];
/// let mut events = Events::new();
/// poller.wait(&mut events, None).unwrap();
/// # let _ = events;
/// ```
@ -183,7 +183,12 @@ pub enum ProcessOps {
impl<'a> Process<'a> {
/// Monitor a child process.
pub fn new(child: &'a Child, ops: ProcessOps) -> Self {
///
/// # Safety
///
/// Once registered into the `Poller`, the `Child` object must outlive this filter's
/// registration into the poller.
pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self {
Self { child, ops }
}
}

View File

@ -268,10 +268,12 @@ impl Poller {
let poll_fd = &mut fds.poll_fds[fd_data.poll_fds_index];
if !poll_fd.revents().is_empty() {
// Store event
let revents = poll_fd.revents();
events.inner.push(Event {
key: fd_data.key,
readable: poll_fd.revents().intersects(read_events()),
writable: poll_fd.revents().intersects(write_events()),
readable: revents.intersects(read_events()),
writable: revents.intersects(write_events()),
extra: EventExtra { flags: revents },
});
// Remove interest if necessary
if fd_data.remove {
@ -364,14 +366,67 @@ pub struct Events {
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
Self { inner: Vec::new() }
pub fn with_capacity(cap: usize) -> Events {
Self {
inner: Vec::with_capacity(cap),
}
}
/// Iterates over I/O events.
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.inner.iter().copied()
}
/// Clear the list.
pub fn clear(&mut self) {
self.inner.clear();
}
/// Get the capacity of the list.
pub fn capacity(&self) -> usize {
self.inner.capacity()
}
}
/// Extra information associated with an event.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct EventExtra {
/// Flags associated with this event.
flags: PollFlags,
}
impl EventExtra {
/// Creates an empty set of extra information.
#[inline]
pub fn empty() -> Self {
Self {
flags: PollFlags::empty(),
}
}
/// Set the interrupt flag.
#[inline]
pub fn set_hup(&mut self, value: bool) {
self.flags.set(PollFlags::HUP, value);
}
/// Set the priority flag.
#[inline]
pub fn set_pri(&mut self, value: bool) {
self.flags.set(PollFlags::PRI, value);
}
/// Is this an interrupt event?
#[inline]
pub fn is_hup(&self) -> bool {
self.flags.contains(PollFlags::HUP)
}
/// Is this a priority event?
#[inline]
pub fn is_pri(&self) -> bool {
self.flags.contains(PollFlags::PRI)
}
}
fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {

View File

@ -181,18 +181,73 @@ unsafe impl Send for Events {}
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
list: Vec::with_capacity(1024),
list: Vec::with_capacity(cap),
}
}
/// Iterates over I/O events.
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list.iter().map(|ev| Event {
key: ev.userdata() as usize,
readable: PollFlags::from_bits_truncate(ev.events() as _).intersects(read_flags()),
writable: PollFlags::from_bits_truncate(ev.events() as _).intersects(write_flags()),
self.list.iter().map(|ev| {
let flags = PollFlags::from_bits_truncate(ev.events() as _);
Event {
key: ev.userdata() as usize,
readable: flags.intersects(read_flags()),
writable: flags.intersects(write_flags()),
extra: EventExtra { flags },
}
})
}
/// Clear the list.
pub fn clear(&mut self) {
self.list.clear();
}
/// Get the capacity of the list.
pub fn capacity(&self) -> usize {
self.list.capacity()
}
}
/// Extra information associated with an event.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct EventExtra {
/// Flags associated with this event.
flags: PollFlags,
}
impl EventExtra {
/// Create a new, empty version of this struct.
#[inline]
pub fn empty() -> EventExtra {
EventExtra {
flags: PollFlags::empty(),
}
}
/// Set the interrupt flag.
#[inline]
pub fn set_hup(&mut self, value: bool) {
self.flags.set(PollFlags::HUP, value);
}
/// Set the priority flag.
#[inline]
pub fn set_pri(&mut self, value: bool) {
self.flags.set(PollFlags::PRI, value);
}
/// Is this an interrupt event?
#[inline]
pub fn is_hup(&self) -> bool {
self.flags.contains(PollFlags::HUP)
}
/// Is this a priority event?
#[inline]
pub fn is_pri(&self) -> bool {
self.flags.contains(PollFlags::PRI)
}
}

View File

@ -4,14 +4,14 @@ use std::thread;
use std::time::Duration;
use easy_parallel::Parallel;
use polling::{Event, Poller};
use polling::{Event, Events, Poller};
#[test]
fn concurrent_add() -> io::Result<()> {
let (reader, mut writer) = tcp_pair()?;
let poller = Poller::new()?;
let mut events = Vec::new();
let mut events = Events::new();
let result = Parallel::new()
.add(|| {
@ -33,7 +33,11 @@ fn concurrent_add() -> io::Result<()> {
poller.delete(&reader)?;
result?;
assert_eq!(events, [Event::readable(0)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(0)
);
Ok(())
}
@ -46,7 +50,7 @@ fn concurrent_modify() -> io::Result<()> {
poller.add(&reader, Event::none(0))?;
}
let mut events = Vec::new();
let mut events = Events::new();
Parallel::new()
.add(|| {
@ -63,7 +67,11 @@ fn concurrent_modify() -> io::Result<()> {
.into_iter()
.collect::<io::Result<()>>()?;
assert_eq!(events, [Event::readable(0)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(0)
);
Ok(())
}

View File

@ -1,4 +1,4 @@
use polling::{Event, Poller};
use polling::{Event, Events, Poller};
use std::io::{self, Write};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;
@ -12,7 +12,7 @@ fn basic_io() {
}
// Nothing should be available at first.
let mut events = vec![];
let mut events = Events::new();
assert_eq!(
poller
.wait(&mut events, Some(Duration::from_secs(0)))
@ -29,8 +29,12 @@ fn basic_io() {
.unwrap(),
1
);
assert_eq!(&*events, &[Event::readable(1)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(1)
);
poller.delete(&read).unwrap();
}

View File

@ -7,6 +7,8 @@ use std::io::{self, prelude::*};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;
use polling::Events;
#[test]
fn many_connections() {
// Create 100 connections.
@ -25,7 +27,7 @@ fn many_connections() {
}
}
let mut events = vec![];
let mut events = Events::new();
while !connections.is_empty() {
// Choose a random connection to write to.
let i = fastrand::usize(..connections.len());
@ -40,10 +42,12 @@ fn many_connections() {
.unwrap();
// Check that the connection is readable.
assert_eq!(events.len(), 1, "events: {:?}", events);
assert_eq!(events[0].key, id);
assert!(events[0].readable);
assert!(!events[0].writable);
let current_events = events.iter().collect::<Vec<_>>();
assert_eq!(current_events.len(), 1, "events: {:?}", current_events);
assert_eq!(
current_events[0].with_no_extra(),
polling::Event::readable(id)
);
// Read the byte from the connection.
let mut buf = [0];

View File

@ -3,16 +3,18 @@ use std::thread;
use std::time::Duration;
use easy_parallel::Parallel;
use polling::Events;
use polling::Poller;
#[test]
fn simple() -> io::Result<()> {
let poller = Poller::new()?;
let mut events = Vec::new();
let mut events = Events::new();
for _ in 0..10 {
poller.notify()?;
poller.wait(&mut events, None)?;
assert!(events.is_empty());
}
Ok(())
@ -21,7 +23,7 @@ fn simple() -> io::Result<()> {
#[test]
fn concurrent() -> io::Result<()> {
let poller = Poller::new()?;
let mut events = Vec::new();
let mut events = Events::new();
for _ in 0..2 {
Parallel::new()

View File

@ -6,7 +6,7 @@ use std::io::{self, prelude::*};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;
use polling::{Event, PollMode, Poller};
use polling::{Event, Events, PollMode, Poller};
#[test]
fn level_triggered() {
@ -34,12 +34,16 @@ fn level_triggered() {
writer.write_all(&data).unwrap();
// A "readable" notification should be delivered.
let mut events = Vec::new();
let mut events = Events::new();
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(reader_token)
);
// If we read some of the data, the notification should still be available.
reader.read_exact(&mut [0; 3]).unwrap();
@ -47,7 +51,12 @@ fn level_triggered() {
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(reader_token)
);
// If we read the rest of the data, the notification should be gone.
reader.read_exact(&mut [0; 2]).unwrap();
@ -56,7 +65,7 @@ fn level_triggered() {
.wait(&mut events, Some(Duration::from_secs(0)))
.unwrap();
assert_eq!(events, []);
assert!(events.is_empty());
// After modifying the stream and sending more data, it should be oneshot.
poller
@ -71,7 +80,11 @@ fn level_triggered() {
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(reader_token)
);
// After reading, the notification should vanish.
reader.read(&mut [0; 5]).unwrap();
@ -80,7 +93,7 @@ fn level_triggered() {
.wait(&mut events, Some(Duration::from_secs(0)))
.unwrap();
assert_eq!(events, []);
assert!(events.is_empty());
}
#[test]
@ -123,12 +136,16 @@ fn edge_triggered() {
writer.write_all(&data).unwrap();
// A "readable" notification should be delivered.
let mut events = Vec::new();
let mut events = Events::new();
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(reader_token)
);
// If we read some of the data, the notification should not still be available.
reader.read_exact(&mut [0; 3]).unwrap();
@ -136,7 +153,7 @@ fn edge_triggered() {
poller
.wait(&mut events, Some(Duration::from_secs(0)))
.unwrap();
assert_eq!(events, []);
assert!(events.is_empty());
// If we write more data, a notification should be delivered.
writer.write_all(&data).unwrap();
@ -144,7 +161,12 @@ fn edge_triggered() {
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(reader_token)
);
// After modifying the stream and sending more data, it should be oneshot.
poller
@ -157,7 +179,11 @@ fn edge_triggered() {
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(reader_token)
);
}
#[test]
@ -206,12 +232,16 @@ fn edge_oneshot_triggered() {
writer.write_all(&data).unwrap();
// A "readable" notification should be delivered.
let mut events = Vec::new();
let mut events = Events::new();
poller
.wait(&mut events, Some(Duration::from_secs(10)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(reader_token)
);
// If we read some of the data, the notification should not still be available.
reader.read_exact(&mut [0; 3]).unwrap();
@ -219,7 +249,7 @@ fn edge_oneshot_triggered() {
poller
.wait(&mut events, Some(Duration::from_secs(0)))
.unwrap();
assert_eq!(events, []);
assert!(events.is_empty());
// If we modify to re-enable the notification, it should be delivered.
poller
@ -233,7 +263,12 @@ fn edge_oneshot_triggered() {
poller
.wait(&mut events, Some(Duration::from_secs(0)))
.unwrap();
assert_eq!(events, [Event::readable(reader_token)]);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(reader_token)
);
}
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {

View File

@ -1,12 +1,12 @@
use std::io;
use std::time::{Duration, Instant};
use polling::Poller;
use polling::{Events, Poller};
#[test]
fn below_ms() -> io::Result<()> {
let poller = Poller::new()?;
let mut events = Vec::new();
let mut events = Events::new();
let dur = Duration::from_micros(100);
let margin = Duration::from_micros(500);
@ -42,7 +42,7 @@ fn below_ms() -> io::Result<()> {
#[test]
fn above_ms() -> io::Result<()> {
let poller = Poller::new()?;
let mut events = Vec::new();
let mut events = Events::new();
let dur = Duration::from_micros(3_100);
let margin = Duration::from_micros(500);

View File

@ -1,12 +1,12 @@
use std::io;
use std::time::{Duration, Instant};
use polling::Poller;
use polling::{Events, Poller};
#[test]
fn twice() -> io::Result<()> {
let poller = Poller::new()?;
let mut events = Vec::new();
let mut events = Events::new();
for _ in 0..2 {
let start = Instant::now();
@ -22,7 +22,7 @@ fn twice() -> io::Result<()> {
#[test]
fn non_blocking() -> io::Result<()> {
let poller = Poller::new()?;
let mut events = Vec::new();
let mut events = Events::new();
for _ in 0..100 {
poller.wait(&mut events, Some(Duration::from_secs(0)))?;

View File

@ -3,7 +3,7 @@
#![cfg(windows)]
use polling::os::iocp::{CompletionPacket, PollerIocpExt};
use polling::{Event, Poller};
use polling::{Event, Events, Poller};
use std::sync::Arc;
use std::thread;
@ -12,7 +12,7 @@ use std::time::Duration;
#[test]
fn post_smoke() {
let poller = Poller::new().unwrap();
let mut events = Vec::new();
let mut events = Events::new();
poller
.post(CompletionPacket::new(Event::readable(1)))
@ -20,13 +20,16 @@ fn post_smoke() {
poller.wait(&mut events, None).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0], Event::readable(1));
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(1)
);
}
#[test]
fn post_multithread() {
let poller = Arc::new(Poller::new().unwrap());
let mut events = Vec::new();
let mut events = Events::new();
thread::spawn({
let poller = Arc::clone(&poller);
@ -47,7 +50,11 @@ fn post_multithread() {
.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events.pop(), Some(Event::writable(i)));
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::writable(i)
);
events.clear();
}
poller

View File

@ -3,7 +3,7 @@
#![cfg(windows)]
use polling::os::iocp::PollerIocpExt;
use polling::{Event, PollMode, Poller};
use polling::{Event, Events, PollMode, Poller};
use windows_sys::Win32::Foundation::CloseHandle;
use windows_sys::Win32::System::Threading::{CreateEventW, ResetEvent, SetEvent};
@ -85,7 +85,7 @@ fn smoke() {
.unwrap();
}
let mut events = vec![];
let mut events = Events::new();
poller
.wait(&mut events, Some(Duration::from_millis(100)))
.unwrap();
@ -100,7 +100,7 @@ fn smoke() {
.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0], Event::all(0));
assert_eq!(events.iter().next().unwrap().with_no_extra(), Event::all(0));
// Interest should be cleared.
events.clear();
@ -121,7 +121,7 @@ fn smoke() {
.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0], Event::all(0));
assert_eq!(events.iter().next().unwrap().with_no_extra(), Event::all(0));
// If we reset the event, it should not be signaled.
event.reset().unwrap();