From a521cd2c292c4cb4befe40491c15d267b34990fe Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 14 Aug 2023 10:03:20 -0700 Subject: [PATCH] breaking: Extract the Events struct and make the Event struct opaque * Add better documentation for the IOCP module * Extract Events from Poller This prevents the need to have an intermediate buffer to read events from, reducing the need for an allocation and a copy. This is a breaking change. * Add event extra information Foundation for more details later on. * Add PRI and HUP events * Fix various failing tests - Make sure that waitable handles interact properly with the new infrastructure - Fix failing doctests * Review comments - Make set_* take a boolean for the value of the flag - Make Events !Sync - Fix visibility modifiers - Inline more methods - Use a better strategy for testing * Move completion packets into the Events buffer This removes one of the mutexes that we have to lock. * Review comments Signed-off-by: John Nunley --- examples/two-listeners.rs | 6 +- examples/wait-signal.rs | 8 +- src/epoll.rs | 77 +++++-- src/iocp/afd.rs | 54 ++++- src/iocp/mod.rs | 186 +++++++++++++---- src/kqueue.rs | 51 ++++- src/lib.rs | 337 +++++++++++++++++++++++++++++-- src/os/iocp.rs | 22 +- src/os/kqueue.rs | 15 +- src/poll.rs | 63 +++++- src/port.rs | 67 +++++- tests/concurrent_modification.rs | 18 +- tests/io.rs | 10 +- tests/many_connections.rs | 14 +- tests/notify.rs | 6 +- tests/other_modes.rs | 67 ++++-- tests/precision.rs | 6 +- tests/timeout.rs | 6 +- tests/windows_post.rs | 17 +- tests/windows_waitable.rs | 8 +- 20 files changed, 878 insertions(+), 160 deletions(-) diff --git a/examples/two-listeners.rs b/examples/two-listeners.rs index 02b2339..bf54eee 100644 --- a/examples/two-listeners.rs +++ b/examples/two-listeners.rs @@ -1,7 +1,7 @@ use std::io; use std::net::TcpListener; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; fn main() -> io::Result<()> { let l1 = TcpListener::bind("127.0.0.1:8001")?; @@ -19,12 +19,12 @@ fn main() -> io::Result<()> { println!(" $ nc 127.0.0.1 8001"); println!(" $ nc 127.0.0.1 8002"); - let mut events = Vec::new(); + let mut events = Events::new(); loop { events.clear(); poller.wait(&mut events, None)?; - for ev in &events { + for ev in events.iter() { match ev.key { 1 => { println!("Accept on l1"); diff --git a/examples/wait-signal.rs b/examples/wait-signal.rs index 02a4c5d..4a645af 100644 --- a/examples/wait-signal.rs +++ b/examples/wait-signal.rs @@ -13,7 +13,7 @@ ))] mod example { use polling::os::kqueue::{PollerKqueueExt, Signal}; - use polling::{PollMode, Poller}; + use polling::{Events, PollMode, Poller}; pub(super) fn main2() { // Create a poller. @@ -23,7 +23,7 @@ mod example { let sigint = Signal(libc::SIGINT); poller.add_filter(sigint, 1, PollMode::Oneshot).unwrap(); - let mut events = vec![]; + let mut events = Events::new(); println!("Press Ctrl+C to exit..."); @@ -32,7 +32,7 @@ mod example { poller.wait(&mut events, None).unwrap(); // Process events. - for ev in events.drain(..) { + for ev in events.iter() { match ev.key { 1 => { println!("SIGINT received"); @@ -41,6 +41,8 @@ mod example { _ => unreachable!(), } } + + events.clear(); } } } diff --git a/src/epoll.rs b/src/epoll.rs index 22e994d..6d39f9f 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -59,11 +59,7 @@ impl Poller { poller.add( poller.event_fd.as_raw_fd(), - Event { - key: crate::NOTIFY_KEY, - readable: true, - writable: false, - }, + Event::readable(crate::NOTIFY_KEY), PollMode::Oneshot, )?; } @@ -106,7 +102,7 @@ impl Poller { &self.epoll_fd, unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) }, epoll::EventData::new_u64(ev.key as u64), - epoll_flags(&ev, mode), + epoll_flags(&ev, mode) | ev.extra.flags, )?; Ok(()) @@ -126,7 +122,7 @@ impl Poller { &self.epoll_fd, fd, epoll::EventData::new_u64(ev.key as u64), - epoll_flags(&ev, mode), + epoll_flags(&ev, mode) | ev.extra.flags, )?; Ok(()) @@ -177,11 +173,7 @@ impl Poller { // Set interest in timerfd. self.modify( timer_fd.as_fd(), - Event { - key: crate::NOTIFY_KEY, - readable: true, - writable: false, - }, + Event::readable(crate::NOTIFY_KEY), PollMode::Oneshot, )?; } @@ -213,11 +205,7 @@ impl Poller { let _ = read(&self.event_fd, &mut buf); self.modify( self.event_fd.as_fd(), - Event { - key: crate::NOTIFY_KEY, - readable: true, - writable: false, - }, + Event::readable(crate::NOTIFY_KEY), PollMode::Oneshot, )?; Ok(()) @@ -308,9 +296,9 @@ unsafe impl Send for Events {} impl Events { /// Creates an empty list. - pub fn new() -> Events { + pub fn with_capacity(cap: usize) -> Events { Events { - list: epoll::EventVec::with_capacity(1024), + list: epoll::EventVec::with_capacity(cap), } } @@ -322,7 +310,58 @@ impl Events { key: ev.data.u64() as usize, readable: flags.intersects(read_flags()), writable: flags.intersects(write_flags()), + extra: EventExtra { flags }, } }) } + + /// Clear the list. + pub fn clear(&mut self) { + self.list.clear(); + } + + /// Get the capacity of the list. + pub fn capacity(&self) -> usize { + self.list.capacity() + } +} + +/// Extra information about this event. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct EventExtra { + flags: epoll::EventFlags, +} + +impl EventExtra { + /// Create an empty version of the data. + #[inline] + pub fn empty() -> EventExtra { + EventExtra { + flags: epoll::EventFlags::empty(), + } + } + + /// Add the interrupt flag to this event. + #[inline] + pub fn set_hup(&mut self, active: bool) { + self.flags.set(epoll::EventFlags::HUP, active); + } + + /// Add the priority flag to this event. + #[inline] + pub fn set_pri(&mut self, active: bool) { + self.flags.set(epoll::EventFlags::PRI, active); + } + + /// Tell if the interrupt flag is set. + #[inline] + pub fn is_hup(&self) -> bool { + self.flags.contains(epoll::EventFlags::HUP) + } + + /// Tell if the priority flag is set. + #[inline] + pub fn is_pri(&self) -> bool { + self.flags.contains(epoll::EventFlags::PRI) + } } diff --git a/src/iocp/afd.rs b/src/iocp/afd.rs index 4fb1bf8..faf3bab 100644 --- a/src/iocp/afd.rs +++ b/src/iocp/afd.rs @@ -66,7 +66,7 @@ impl AfdPollInfo { } } -#[derive(Default, Copy, Clone)] +#[derive(Default, Copy, Clone, PartialEq, Eq)] #[repr(transparent)] pub(super) struct AfdPollMask(u32); @@ -89,6 +89,44 @@ impl AfdPollMask { pub(crate) fn intersects(self, other: AfdPollMask) -> bool { (self.0 & other.0) != 0 } + + /// Sets a flag. + pub(crate) fn set(&mut self, other: AfdPollMask, value: bool) { + if value { + *self |= other; + } else { + self.0 &= !other.0; + } + } +} + +impl fmt::Debug for AfdPollMask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + const FLAGS: &[(&str, AfdPollMask)] = &[ + ("RECEIVE", AfdPollMask::RECEIVE), + ("RECEIVE_EXPEDITED", AfdPollMask::RECEIVE_EXPEDITED), + ("SEND", AfdPollMask::SEND), + ("DISCONNECT", AfdPollMask::DISCONNECT), + ("ABORT", AfdPollMask::ABORT), + ("LOCAL_CLOSE", AfdPollMask::LOCAL_CLOSE), + ("ACCEPT", AfdPollMask::ACCEPT), + ("CONNECT_FAIL", AfdPollMask::CONNECT_FAIL), + ]; + + let mut first = true; + for (name, value) in FLAGS { + if self.intersects(*value) { + if !first { + write!(f, " | ")?; + } + + first = false; + write!(f, "{}", name)?; + } + } + + Ok(()) + } } impl ops::BitOr for AfdPollMask { @@ -105,6 +143,20 @@ impl ops::BitOrAssign for AfdPollMask { } } +impl ops::BitAnd for AfdPollMask { + type Output = Self; + + fn bitand(self, rhs: Self) -> Self { + AfdPollMask(self.0 & rhs.0) + } +} + +impl ops::BitAndAssign for AfdPollMask { + fn bitand_assign(&mut self, rhs: Self) { + self.0 &= rhs.0; + } +} + pub(super) trait HasAfdInfo { fn afd_info(self: Pin<&Self>) -> Pin<&UnsafeCell>; } diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index 8d725a7..1597d43 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -75,26 +75,42 @@ pub(super) struct Poller { /// List of currently active AFD instances. /// - /// Weak references are kept here so that the AFD handle is automatically dropped - /// when the last associated socket is dropped. + /// AFD acts as the actual source of the socket events. It's essentially running `WSAPoll` on + /// the sockets and then posting the events to the IOCP. + /// + /// AFD instances can be keyed to an unlimited number of sockets. However, each AFD instance + /// polls their sockets linearly. Therefore, it is best to limit the number of sockets each AFD + /// instance is responsible for. The limit of 32 is chosen because that's what `wepoll` uses. + /// + /// Weak references are kept here so that the AFD handle is automatically dropped when the last + /// associated socket is dropped. afd: Mutex>>>, /// The state of the sources registered with this poller. + /// + /// Each source is keyed by its raw socket ID. sources: RwLock>, /// The state of the waitable handles registered with this poller. waitables: RwLock>, /// Sockets with pending updates. + /// + /// This list contains packets with sockets that need to have their AFD state adjusted by + /// calling the `update()` function on them. It's best to queue up packets as they need to + /// be updated and then run all of the updates before we start waiting on the IOCP, rather than + /// updating them as we come. If we're waiting on the IOCP updates should be run immediately. pending_updates: ConcurrentQueue, /// Are we currently polling? + /// + /// This indicates whether or not we are blocking on the IOCP, and is used to determine + /// whether pending updates should be run immediately or queued. polling: AtomicBool, - /// A list of completion packets. - packets: Mutex>>, - /// The packet used to notify the poller. + /// + /// This is a special-case packet that is used to wake up the poller when it is waiting. notifier: Packet, } @@ -119,7 +135,6 @@ impl Poller { )))?; let port = IoCompletionPort::new(0)?; - tracing::trace!(handle = ?port, "new"); Ok(Poller { @@ -129,7 +144,6 @@ impl Poller { waitables: RwLock::new(HashMap::new()), pending_updates: ConcurrentQueue::bounded(1024), polling: AtomicBool::new(false), - packets: Mutex::new(Vec::with_capacity(1024)), notifier: Arc::pin( PacketInner::Wakeup { _pinned: PhantomPinned, @@ -178,6 +192,7 @@ impl Poller { // Create a new packet. let socket_state = { + // Create a new socket state and assign an AFD handle to it. let state = SocketState { socket, base_socket: base_socket(socket)?, @@ -189,6 +204,7 @@ impl Poller { status: SocketStatus::Idle, }; + // We wrap this socket state in a Packet so the IOCP can use it. Arc::pin(IoStatusBlock::from(PacketInner::Socket { packet: UnsafeCell::new(AfdPollInfo::default()), socket: Mutex::new(state), @@ -249,6 +265,7 @@ impl Poller { // Set the new event. if source.as_ref().set_events(interest, mode) { + // The packet needs to be updated. self.update_packet(source)?; } @@ -264,7 +281,7 @@ impl Poller { ); let _enter = span.enter(); - // Get a reference to the source. + // Remove the source from our associative map. let source = { let mut sources = lock!(self.sources.write()); @@ -409,8 +426,8 @@ impl Poller { ); let _enter = span.enter(); + // Make sure we have a consistent timeout. let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout)); - let mut packets = lock!(self.packets.lock()); let mut notified = false; events.packets.clear(); @@ -421,6 +438,7 @@ impl Poller { let was_polling = self.polling.swap(true, Ordering::SeqCst); debug_assert!(!was_polling); + // Even if we panic, we want to make sure we indicate that polling has stopped. let guard = CallOnDrop(|| { let was_polling = self.polling.swap(false, Ordering::SeqCst); debug_assert!(was_polling); @@ -433,7 +451,7 @@ impl Poller { let timeout = deadline.map(|t| t.saturating_duration_since(Instant::now())); // Wait for I/O events. - let len = self.port.wait(&mut packets, timeout)?; + let len = self.port.wait(&mut events.completions, timeout)?; tracing::trace!( handle = ?self.port, res = ?len, @@ -443,7 +461,7 @@ impl Poller { drop(guard); // Process all of the events. - for entry in packets.drain(..) { + for entry in events.completions.drain(..) { let packet = entry.into_packet(); // Feed the event into the packet. @@ -500,18 +518,22 @@ impl Poller { // If we failed to queue the update, we need to drain the queue first. self.drain_update_queue(true)?; + + // Loop back and try again. } } /// Drain the update queue. fn drain_update_queue(&self, limit: bool) -> io::Result<()> { + // Determine how many packets to process. let max = if limit { + // Only drain the queue's capacity, since this could in theory run forever. self.pending_updates.capacity().unwrap() } else { + // Less of a concern if we're draining the queue prior to a poll operation. std::usize::MAX }; - // Only drain the queue's capacity, since this could in theory run forever. self.pending_updates .try_iter() .take(max) @@ -519,11 +541,14 @@ impl Poller { } /// Get a handle to the AFD reference. + /// + /// This finds an AFD handle with less than 32 associated sockets, or creates a new one if + /// one does not exist. fn afd_handle(&self) -> io::Result>> { const AFD_MAX_SIZE: usize = 32; // Crawl the list and see if there are any existing AFD instances that we can use. - // Remove any unused AFD pointers. + // While we're here, remove any unused AFD pointers. let mut afd_handles = lock!(self.afd.lock()); let mut i = 0; while i < afd_handles.len() { @@ -561,7 +586,7 @@ impl Poller { // Register the AFD instance with the I/O completion port. self.port.register(&*afd, true)?; - // Insert a weak pointer to the AFD instance into the list. + // Insert a weak pointer to the AFD instance into the list for other sockets. afd_handles.push(Arc::downgrade(&afd)); Ok(afd) @@ -584,22 +609,77 @@ impl AsHandle for Poller { pub(super) struct Events { /// List of IOCP packets. packets: Vec, + + /// Buffer for completion packets. + completions: Vec>, } unsafe impl Send for Events {} impl Events { /// Creates an empty list of events. - pub(super) fn new() -> Events { + pub fn with_capacity(cap: usize) -> Events { Events { - packets: Vec::with_capacity(1024), + packets: Vec::with_capacity(cap), + completions: Vec::with_capacity(cap), } } /// Iterate over I/O events. - pub(super) fn iter(&self) -> impl Iterator + '_ { + pub fn iter(&self) -> impl Iterator + '_ { self.packets.iter().copied() } + + /// Clear the list of events. + pub fn clear(&mut self) { + self.packets.clear(); + } + + /// The capacity of the list of events. + pub fn capacity(&self) -> usize { + self.packets.capacity() + } +} + +/// Extra information about an event. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EventExtra { + /// Flags associated with this event. + flags: AfdPollMask, +} + +impl EventExtra { + /// Create a new, empty version of this struct. + #[inline] + pub fn empty() -> EventExtra { + EventExtra { + flags: AfdPollMask::empty(), + } + } + + /// Is this a HUP event? + #[inline] + pub fn is_hup(&self) -> bool { + self.flags.intersects(AfdPollMask::ABORT) + } + + /// Is this a PRI event? + #[inline] + pub fn is_pri(&self) -> bool { + self.flags.intersects(AfdPollMask::RECEIVE_EXPEDITED) + } + + /// Set up a listener for HUP events. + #[inline] + pub fn set_hup(&mut self, active: bool) { + self.flags.set(AfdPollMask::ABORT, active); + } + + /// Set up a listener for PRI events. + #[inline] + pub fn set_pri(&mut self, active: bool) { + self.flags.set(AfdPollMask::RECEIVE_EXPEDITED, active); + } } /// A packet used to wake up the poller with an event. @@ -624,6 +704,8 @@ impl CompletionPacket { } /// The type of our completion packet. +/// +/// It needs to be pinned, since it contains data that is expected by IOCP not to be moved. type Packet = Pin>; type PacketUnwrapped = IoStatusBlock; @@ -698,9 +780,11 @@ impl PacketUnwrapped { socket.mode = mode; socket.interest_error = true; + // If there was a change, indicate that we need an update. match socket.status { - SocketStatus::Polling { readable, writable } => { - (interest.readable && !readable) || (interest.writable && !writable) + SocketStatus::Polling { flags } => { + let our_flags = event_to_afd_mask(socket.interest, socket.interest_error); + our_flags != flags } _ => true, } @@ -715,11 +799,18 @@ impl PacketUnwrapped { // Update if there is no ongoing wait. handle.status.is_idle() } - _ => false, + _ => true, } } /// Update the socket and install the new status in AFD. + /// + /// This function does one of the following: + /// + /// - Nothing, if the packet is waiting on being dropped anyways. + /// - Cancels the ongoing poll, if we want to poll for different events than we are currently + /// polling for. + /// - Starts a new AFD_POLL operation, if we are not currently polling. fn update(self: Pin>) -> io::Result<()> { let mut socket = match self.as_ref().data().project_ref() { PacketInnerProj::Socket { socket, .. } => lock!(socket.lock()), @@ -780,12 +871,11 @@ impl PacketUnwrapped { // Check the current status. match socket.status { - SocketStatus::Polling { readable, writable } => { + SocketStatus::Polling { flags } => { // If we need to poll for events aside from what we are currently polling, we need // to update the packet. Cancel the ongoing poll. - if (socket.interest.readable && !readable) - || (socket.interest.writable && !writable) - { + let our_flags = event_to_afd_mask(socket.interest, socket.interest_error); + if our_flags != flags { return self.cancel(socket); } @@ -801,15 +891,8 @@ impl PacketUnwrapped { SocketStatus::Idle => { // Start a new poll. - let result = socket.afd.poll( - self.clone(), - socket.base_socket, - event_to_afd_mask( - socket.interest.readable, - socket.interest.writable, - socket.interest_error, - ), - ); + let mask = event_to_afd_mask(socket.interest, socket.interest_error); + let result = socket.afd.poll(self.clone(), socket.base_socket, mask); match result { Ok(()) => {} @@ -830,10 +913,7 @@ impl PacketUnwrapped { } // We are now polling for the current events. - socket.status = SocketStatus::Polling { - readable: socket.interest.readable, - writable: socket.interest.writable, - }; + socket.status = SocketStatus::Polling { flags: mask }; Ok(()) } @@ -841,6 +921,9 @@ impl PacketUnwrapped { } /// This socket state was notified; see if we need to update it. + /// + /// This indicates that this packet was indicated as "ready" by the IOCP and needs to be + /// processed. fn feed_event(self: Pin>, poller: &Poller) -> io::Result { let inner = self.as_ref().data().project_ref(); @@ -902,6 +985,7 @@ impl PacketUnwrapped { // Check in on the AFD data. let afd_data = &*afd_info.get(); + // There was at least one event. if afd_data.handle_count() >= 1 { let events = afd_data.events(); @@ -917,6 +1001,7 @@ impl PacketUnwrapped { let (readable, writable) = afd_mask_to_event(events); event.readable = readable; event.writable = writable; + event.extra.flags = events; } } } @@ -928,7 +1013,13 @@ impl PacketUnwrapped { // If this event doesn't have anything that interests us, don't return or // update the oneshot state. - let return_value = if event.readable || event.writable { + let return_value = if event.readable + || event.writable + || event + .extra + .flags + .intersects(socket_state.interest.extra.flags) + { // If we are in oneshot mode, remove the interest. if matches!(socket_state.mode, PollMode::Oneshot) { socket_state.interest = Event::none(socket_state.interest.key); @@ -1028,11 +1119,8 @@ enum SocketStatus { /// We are currently polling these events. Polling { - /// We are currently polling for readable events. - readable: bool, - - /// We are currently polling for writable events. - writable: bool, + /// The flags we are currently polling for. + flags: AfdPollMask, }, /// The last poll operation was cancelled, and we're waiting for it to @@ -1161,7 +1249,15 @@ impl WaitHandle { } } -fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask { +/// Translate an event to the mask expected by AFD. +#[inline] +fn event_to_afd_mask(event: Event, error: bool) -> afd::AfdPollMask { + event_properties_to_afd_mask(event.readable, event.writable, error) | event.extra.flags +} + +/// Translate an event to the mask expected by AFD. +#[inline] +fn event_properties_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask { use afd::AfdPollMask as AfdPoll; let mut mask = AfdPoll::empty(); @@ -1182,6 +1278,8 @@ fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPol mask } +/// Convert the mask reported by AFD to an event. +#[inline] fn afd_mask_to_event(mask: afd::AfdPollMask) -> (bool, bool) { use afd::AfdPollMask as AfdPoll; diff --git a/src/kqueue.rs b/src/kqueue.rs index 46191b8..5ed6d09 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -221,9 +221,9 @@ unsafe impl Send for Events {} impl Events { /// Creates an empty list. - pub fn new() -> Events { + pub fn with_capacity(cap: usize) -> Events { Events { - list: Vec::with_capacity(1024), + list: Vec::with_capacity(cap), } } @@ -246,8 +246,55 @@ impl Events { writable: matches!(ev.filter(), kqueue::EventFilter::Write(..)) || (matches!(ev.filter(), kqueue::EventFilter::Read(..)) && (ev.flags().intersects(kqueue::EventFlags::EOF))), + extra: EventExtra, }) } + + /// Clears the list. + pub fn clear(&mut self) { + self.list.clear(); + } + + /// Get the capacity of the list. + pub fn capacity(&self) -> usize { + self.list.capacity() + } +} + +/// Extra information associated with an event. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EventExtra; + +impl EventExtra { + /// Create a new, empty version of this struct. + #[inline] + pub fn empty() -> EventExtra { + EventExtra + } + + /// Set the interrupt flag. + #[inline] + pub fn set_hup(&mut self, _value: bool) { + // No-op. + } + + /// Set the priority flag. + #[inline] + pub fn set_pri(&mut self, _value: bool) { + // No-op. + } + + /// Is the interrupt flag set? + #[inline] + pub fn is_hup(&self) -> bool { + false + } + + /// Is the priority flag set? + #[inline] + pub fn is_pri(&self) -> bool { + false + } } pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags { diff --git a/src/lib.rs b/src/lib.rs index 188ab87..ea2e7ad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,7 @@ //! # Examples //! //! ```no_run -//! use polling::{Event, Poller}; +//! use polling::{Event, Events, Poller}; //! use std::net::TcpListener; //! //! // Create a TCP listener. @@ -33,13 +33,13 @@ //! } //! //! // The event loop. -//! let mut events = Vec::new(); +//! let mut events = Events::new(); //! loop { //! // Wait for at least one I/O event. //! events.clear(); //! poller.wait(&mut events, None)?; //! -//! for ev in &events { +//! for ev in events.iter() { //! if ev.key == key { //! // Perform a non-blocking accept operation. //! socket.accept()?; @@ -65,8 +65,11 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +use std::cell::Cell; use std::fmt; use std::io; +use std::marker::PhantomData; +use std::num::NonZeroUsize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::time::Duration; @@ -131,6 +134,8 @@ pub struct Event { pub readable: bool, /// Can it do a write operation without blocking? pub writable: bool, + /// System-specific event data. + extra: sys::EventExtra, } /// The mode in which the poller waits for I/O events. @@ -186,6 +191,7 @@ impl Event { key, readable: true, writable: true, + extra: sys::EventExtra::empty(), } } @@ -197,6 +203,7 @@ impl Event { key, readable: true, writable: false, + extra: sys::EventExtra::empty(), } } @@ -208,6 +215,7 @@ impl Event { key, readable: false, writable: true, + extra: sys::EventExtra::empty(), } } @@ -219,14 +227,140 @@ impl Event { key, readable: false, writable: false, + extra: sys::EventExtra::empty(), } } + + /// Add interruption events to this interest. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + #[inline] + pub fn set_interrupt(&mut self, active: bool) { + self.extra.set_hup(active); + } + + /// Add interruption events to this interest. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + #[inline] + pub fn with_interrupt(mut self) -> Self { + self.set_interrupt(true); + self + } + + /// Add priority events to this interest. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + #[inline] + pub fn set_priority(&mut self, active: bool) { + self.extra.set_pri(active); + } + + /// Add priority events to this interest. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + #[inline] + pub fn with_priority(mut self) -> Self { + self.set_priority(true); + self + } + + /// Tell if this event is the result of an interrupt notification. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this always returns `false`. + #[inline] + pub fn is_interrupt(&self) -> bool { + self.extra.is_hup() + } + + /// Tell if this event is the result of a priority notification. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this always returns `false`. + #[inline] + pub fn is_priority(&self) -> bool { + self.extra.is_pri() + } + + /// Remove any extra information from this event. + #[inline] + pub fn clear_extra(&mut self) { + self.extra = sys::EventExtra::empty(); + } + + /// Get a version of this event with no extra information. + /// + /// This is useful for comparing events with `==`. + #[inline] + pub fn with_no_extra(mut self) -> Self { + self.clear_extra(); + self + } } /// Waits for I/O events. pub struct Poller { poller: sys::Poller, - events: Mutex, + lock: Mutex<()>, notified: AtomicBool, } @@ -244,7 +378,7 @@ impl Poller { pub fn new() -> io::Result { Ok(Poller { poller: sys::Poller::new()?, - events: Mutex::new(sys::Events::new()), + lock: Mutex::new(()), notified: AtomicBool::new(false), }) } @@ -492,7 +626,7 @@ impl Poller { /// # Examples /// /// ``` - /// use polling::{Event, Poller}; + /// use polling::{Event, Events, Poller}; /// use std::net::TcpListener; /// use std::time::Duration; /// @@ -505,26 +639,24 @@ impl Poller { /// poller.add(&socket, Event::all(key))?; /// } /// - /// let mut events = Vec::new(); + /// let mut events = Events::new(); /// let n = poller.wait(&mut events, Some(Duration::from_secs(1)))?; /// poller.delete(&socket)?; /// # std::io::Result::Ok(()) /// ``` - pub fn wait(&self, events: &mut Vec, timeout: Option) -> io::Result { + pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result { let span = tracing::trace_span!("Poller::wait", ?timeout); let _enter = span.enter(); - if let Ok(mut lock) = self.events.try_lock() { + if let Ok(_lock) = self.lock.try_lock() { // Wait for I/O events. - self.poller.wait(&mut lock, timeout)?; + self.poller.wait(&mut events.events, timeout)?; // Clear the notification, if any. self.notified.swap(false, Ordering::SeqCst); - // Collect events. - let len = events.len(); - events.extend(lock.iter().filter(|ev| ev.key != usize::MAX)); - Ok(events.len() - len) + // Indicate number of events. + Ok(events.len()) } else { tracing::trace!("wait: skipping because another thread is already waiting on I/O"); Ok(0) @@ -541,14 +673,14 @@ impl Poller { /// # Examples /// /// ``` - /// use polling::Poller; + /// use polling::{Events, Poller}; /// /// let poller = Poller::new()?; /// /// // Notify the poller. /// poller.notify()?; /// - /// let mut events = Vec::new(); + /// let mut events = Events::new(); /// poller.wait(&mut events, None)?; // wakes up immediately /// assert!(events.is_empty()); /// # std::io::Result::Ok(()) @@ -568,6 +700,165 @@ impl Poller { } } +/// A container for I/O events. +pub struct Events { + events: sys::Events, + + /// This is intended to be used from &mut, thread locally, so we should make it !Sync + /// for consistency with the rest of the API. + _not_sync: PhantomData>, +} + +impl Default for Events { + #[inline] + fn default() -> Self { + Self::new() + } +} + +impl Events { + /// Create a new container for events, using the default capacity. + /// + /// The default capacity is 1024. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// + /// let events = Events::new(); + /// ``` + #[inline] + pub fn new() -> Self { + // ESP-IDF has a low amount of RAM, so we use a smaller default capacity. + #[cfg(target_os = "espidf")] + const DEFAULT_CAPACITY: usize = 32; + + #[cfg(not(target_os = "espidf"))] + const DEFAULT_CAPACITY: usize = 1024; + + Self::with_capacity(NonZeroUsize::new(DEFAULT_CAPACITY).unwrap()) + } + + /// Create a new container with the provided capacity. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// use std::num::NonZeroUsize; + /// + /// let capacity = NonZeroUsize::new(1024).unwrap(); + /// let events = Events::with_capacity(capacity); + /// ``` + #[inline] + pub fn with_capacity(capacity: NonZeroUsize) -> Self { + Self { + events: sys::Events::with_capacity(capacity.get()), + _not_sync: PhantomData, + } + } + + /// Create a new iterator over I/O events. + /// + /// This returns all of the events in the container, excluding the notification event. + /// + /// # Examples + /// + /// ``` + /// use polling::{Event, Events, Poller}; + /// use std::time::Duration; + /// + /// # fn main() -> std::io::Result<()> { + /// let poller = Poller::new()?; + /// let mut events = Events::new(); + /// + /// poller.wait(&mut events, Some(Duration::from_secs(0)))?; + /// assert!(events.iter().next().is_none()); + /// # Ok(()) } + /// ``` + #[inline] + pub fn iter(&self) -> impl Iterator + '_ { + self.events.iter().filter(|ev| ev.key != NOTIFY_KEY) + } + + /// Delete all of the events in the container. + /// + /// # Examples + /// + /// ```no_run + /// use polling::{Event, Events, Poller}; + /// + /// # fn main() -> std::io::Result<()> { + /// let poller = Poller::new()?; + /// let mut events = Events::new(); + /// + /// /* register some sources */ + /// + /// poller.wait(&mut events, None)?; + /// + /// events.clear(); + /// # Ok(()) } + /// ``` + #[inline] + pub fn clear(&mut self) { + self.events.clear(); + } + + /// Returns the number of events in the container. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// + /// let events = Events::new(); + /// assert_eq!(events.len(), 0); + /// ``` + #[inline] + pub fn len(&self) -> usize { + self.iter().count() + } + + /// Returns `true` if the container contains no events. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// + /// let events = Events::new(); + /// assert!(events.is_empty()); + /// ``` + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Get the total capacity of the list. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// use std::num::NonZeroUsize; + /// + /// let cap = NonZeroUsize::new(10).unwrap(); + /// let events = Events::with_capacity(std::num::NonZeroUsize::new(10).unwrap()); + /// assert_eq!(events.capacity(), cap); + /// ``` + #[inline] + pub fn capacity(&self) -> NonZeroUsize { + NonZeroUsize::new(self.events.capacity()).unwrap() + } +} + +impl fmt::Debug for Events { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Events { .. }") + } +} + #[cfg(all( any( target_os = "linux", @@ -712,3 +1003,17 @@ cfg_if! { fn unsupported_error(err: impl Into) -> io::Error { io::Error::new(io::ErrorKind::Unsupported, err.into()) } + +fn _assert_send_and_sync() { + fn assert_send() {} + fn assert_sync() {} + + assert_send::(); + assert_sync::(); + + assert_send::(); + assert_sync::(); + + assert_send::(); + // Events can be !Sync +} diff --git a/src/os/iocp.rs b/src/os/iocp.rs index 625606c..bc37843 100644 --- a/src/os/iocp.rs +++ b/src/os/iocp.rs @@ -19,7 +19,7 @@ pub trait PollerIocpExt: PollerSealed { /// # Examples /// /// ```rust - /// use polling::{Poller, Event}; + /// use polling::{Poller, Event, Events}; /// use polling::os::iocp::{CompletionPacket, PollerIocpExt}; /// /// use std::thread; @@ -39,7 +39,7 @@ pub trait PollerIocpExt: PollerSealed { /// }); /// /// // Wait for the event. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None)?; /// /// assert_eq!(events.len(), 1); @@ -72,7 +72,7 @@ pub trait PollerIocpExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, Event, PollMode}; + /// use polling::{Poller, Event, Events, PollMode}; /// use polling::os::iocp::PollerIocpExt; /// /// use std::process::Command; @@ -92,11 +92,11 @@ pub trait PollerIocpExt: PollerSealed { /// } /// /// // Wait for the child process to exit. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// /// assert_eq!(events.len(), 1); - /// assert_eq!(events[0], Event::all(0)); + /// assert_eq!(events.iter().next().unwrap(), Event::all(0)); /// ``` unsafe fn add_waitable( &self, @@ -115,7 +115,7 @@ pub trait PollerIocpExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, Event, PollMode}; + /// use polling::{Poller, Event, Events, PollMode}; /// use polling::os::iocp::PollerIocpExt; /// /// use std::process::Command; @@ -135,11 +135,11 @@ pub trait PollerIocpExt: PollerSealed { /// } /// /// // Wait for the child process to exit. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// /// assert_eq!(events.len(), 1); - /// assert_eq!(events[0], Event::all(0)); + /// assert_eq!(events.iter().next().unwrap(), Event::all(0)); /// /// // Modify the waitable handle. /// poller.modify_waitable(&child, Event::readable(0), PollMode::Oneshot).unwrap(); @@ -161,7 +161,7 @@ pub trait PollerIocpExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, Event, PollMode}; + /// use polling::{Poller, Event, Events, PollMode}; /// use polling::os::iocp::PollerIocpExt; /// /// use std::process::Command; @@ -181,11 +181,11 @@ pub trait PollerIocpExt: PollerSealed { /// } /// /// // Wait for the child process to exit. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// /// assert_eq!(events.len(), 1); - /// assert_eq!(events[0], Event::all(0)); + /// assert_eq!(events.iter().next().unwrap(), Event::all(0)); /// /// // Remove the waitable handle. /// poller.remove_waitable(&child).unwrap(); diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index 684bd3e..c3db033 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -31,7 +31,7 @@ pub trait PollerKqueueExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, PollMode}; + /// use polling::{Events, Poller, PollMode}; /// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal}; /// /// let poller = Poller::new().unwrap(); @@ -40,7 +40,7 @@ pub trait PollerKqueueExt: PollerSealed { /// poller.add_filter(Signal(libc::SIGINT), 0, PollMode::Oneshot).unwrap(); /// /// // Wait for the signal. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// # let _ = events; /// ``` @@ -54,7 +54,7 @@ pub trait PollerKqueueExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, PollMode}; + /// use polling::{Events, Poller, PollMode}; /// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal}; /// /// let poller = Poller::new().unwrap(); @@ -66,7 +66,7 @@ pub trait PollerKqueueExt: PollerSealed { /// poller.modify_filter(Signal(libc::SIGINT), 1, PollMode::Oneshot).unwrap(); /// /// // Wait for the signal. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// # let _ = events; /// ``` @@ -183,7 +183,12 @@ pub enum ProcessOps { impl<'a> Process<'a> { /// Monitor a child process. - pub fn new(child: &'a Child, ops: ProcessOps) -> Self { + /// + /// # Safety + /// + /// Once registered into the `Poller`, the `Child` object must outlive this filter's + /// registration into the poller. + pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self { Self { child, ops } } } diff --git a/src/poll.rs b/src/poll.rs index b6e31ee..5199f87 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -268,10 +268,12 @@ impl Poller { let poll_fd = &mut fds.poll_fds[fd_data.poll_fds_index]; if !poll_fd.revents().is_empty() { // Store event + let revents = poll_fd.revents(); events.inner.push(Event { key: fd_data.key, - readable: poll_fd.revents().intersects(read_events()), - writable: poll_fd.revents().intersects(write_events()), + readable: revents.intersects(read_events()), + writable: revents.intersects(write_events()), + extra: EventExtra { flags: revents }, }); // Remove interest if necessary if fd_data.remove { @@ -364,14 +366,67 @@ pub struct Events { impl Events { /// Creates an empty list. - pub fn new() -> Events { - Self { inner: Vec::new() } + pub fn with_capacity(cap: usize) -> Events { + Self { + inner: Vec::with_capacity(cap), + } } /// Iterates over I/O events. pub fn iter(&self) -> impl Iterator + '_ { self.inner.iter().copied() } + + /// Clear the list. + pub fn clear(&mut self) { + self.inner.clear(); + } + + /// Get the capacity of the list. + pub fn capacity(&self) -> usize { + self.inner.capacity() + } +} + +/// Extra information associated with an event. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EventExtra { + /// Flags associated with this event. + flags: PollFlags, +} + +impl EventExtra { + /// Creates an empty set of extra information. + #[inline] + pub fn empty() -> Self { + Self { + flags: PollFlags::empty(), + } + } + + /// Set the interrupt flag. + #[inline] + pub fn set_hup(&mut self, value: bool) { + self.flags.set(PollFlags::HUP, value); + } + + /// Set the priority flag. + #[inline] + pub fn set_pri(&mut self, value: bool) { + self.flags.set(PollFlags::PRI, value); + } + + /// Is this an interrupt event? + #[inline] + pub fn is_hup(&self) -> bool { + self.flags.contains(PollFlags::HUP) + } + + /// Is this a priority event? + #[inline] + pub fn is_pri(&self) -> bool { + self.flags.contains(PollFlags::PRI) + } } fn cvt_mode_as_remove(mode: PollMode) -> io::Result { diff --git a/src/port.rs b/src/port.rs index bd55b15..039c851 100644 --- a/src/port.rs +++ b/src/port.rs @@ -181,18 +181,73 @@ unsafe impl Send for Events {} impl Events { /// Creates an empty list. - pub fn new() -> Events { + pub fn with_capacity(cap: usize) -> Events { Events { - list: Vec::with_capacity(1024), + list: Vec::with_capacity(cap), } } /// Iterates over I/O events. pub fn iter(&self) -> impl Iterator + '_ { - self.list.iter().map(|ev| Event { - key: ev.userdata() as usize, - readable: PollFlags::from_bits_truncate(ev.events() as _).intersects(read_flags()), - writable: PollFlags::from_bits_truncate(ev.events() as _).intersects(write_flags()), + self.list.iter().map(|ev| { + let flags = PollFlags::from_bits_truncate(ev.events() as _); + Event { + key: ev.userdata() as usize, + readable: flags.intersects(read_flags()), + writable: flags.intersects(write_flags()), + extra: EventExtra { flags }, + } }) } + + /// Clear the list. + pub fn clear(&mut self) { + self.list.clear(); + } + + /// Get the capacity of the list. + pub fn capacity(&self) -> usize { + self.list.capacity() + } +} + +/// Extra information associated with an event. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EventExtra { + /// Flags associated with this event. + flags: PollFlags, +} + +impl EventExtra { + /// Create a new, empty version of this struct. + #[inline] + pub fn empty() -> EventExtra { + EventExtra { + flags: PollFlags::empty(), + } + } + + /// Set the interrupt flag. + #[inline] + pub fn set_hup(&mut self, value: bool) { + self.flags.set(PollFlags::HUP, value); + } + + /// Set the priority flag. + #[inline] + pub fn set_pri(&mut self, value: bool) { + self.flags.set(PollFlags::PRI, value); + } + + /// Is this an interrupt event? + #[inline] + pub fn is_hup(&self) -> bool { + self.flags.contains(PollFlags::HUP) + } + + /// Is this a priority event? + #[inline] + pub fn is_pri(&self) -> bool { + self.flags.contains(PollFlags::PRI) + } } diff --git a/tests/concurrent_modification.rs b/tests/concurrent_modification.rs index 8cf6691..0797b0f 100644 --- a/tests/concurrent_modification.rs +++ b/tests/concurrent_modification.rs @@ -4,14 +4,14 @@ use std::thread; use std::time::Duration; use easy_parallel::Parallel; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; #[test] fn concurrent_add() -> io::Result<()> { let (reader, mut writer) = tcp_pair()?; let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); let result = Parallel::new() .add(|| { @@ -33,7 +33,11 @@ fn concurrent_add() -> io::Result<()> { poller.delete(&reader)?; result?; - assert_eq!(events, [Event::readable(0)]); + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(0) + ); Ok(()) } @@ -46,7 +50,7 @@ fn concurrent_modify() -> io::Result<()> { poller.add(&reader, Event::none(0))?; } - let mut events = Vec::new(); + let mut events = Events::new(); Parallel::new() .add(|| { @@ -63,7 +67,11 @@ fn concurrent_modify() -> io::Result<()> { .into_iter() .collect::>()?; - assert_eq!(events, [Event::readable(0)]); + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(0) + ); Ok(()) } diff --git a/tests/io.rs b/tests/io.rs index 10b3d48..2e6ce04 100644 --- a/tests/io.rs +++ b/tests/io.rs @@ -1,4 +1,4 @@ -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; use std::io::{self, Write}; use std::net::{TcpListener, TcpStream}; use std::time::Duration; @@ -12,7 +12,7 @@ fn basic_io() { } // Nothing should be available at first. - let mut events = vec![]; + let mut events = Events::new(); assert_eq!( poller .wait(&mut events, Some(Duration::from_secs(0))) @@ -29,8 +29,12 @@ fn basic_io() { .unwrap(), 1 ); - assert_eq!(&*events, &[Event::readable(1)]); + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(1) + ); poller.delete(&read).unwrap(); } diff --git a/tests/many_connections.rs b/tests/many_connections.rs index 41d640b..6a74c9e 100644 --- a/tests/many_connections.rs +++ b/tests/many_connections.rs @@ -7,6 +7,8 @@ use std::io::{self, prelude::*}; use std::net::{TcpListener, TcpStream}; use std::time::Duration; +use polling::Events; + #[test] fn many_connections() { // Create 100 connections. @@ -25,7 +27,7 @@ fn many_connections() { } } - let mut events = vec![]; + let mut events = Events::new(); while !connections.is_empty() { // Choose a random connection to write to. let i = fastrand::usize(..connections.len()); @@ -40,10 +42,12 @@ fn many_connections() { .unwrap(); // Check that the connection is readable. - assert_eq!(events.len(), 1, "events: {:?}", events); - assert_eq!(events[0].key, id); - assert!(events[0].readable); - assert!(!events[0].writable); + let current_events = events.iter().collect::>(); + assert_eq!(current_events.len(), 1, "events: {:?}", current_events); + assert_eq!( + current_events[0].with_no_extra(), + polling::Event::readable(id) + ); // Read the byte from the connection. let mut buf = [0]; diff --git a/tests/notify.rs b/tests/notify.rs index a5ca481..7dff0c3 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -3,16 +3,18 @@ use std::thread; use std::time::Duration; use easy_parallel::Parallel; +use polling::Events; use polling::Poller; #[test] fn simple() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); for _ in 0..10 { poller.notify()?; poller.wait(&mut events, None)?; + assert!(events.is_empty()); } Ok(()) @@ -21,7 +23,7 @@ fn simple() -> io::Result<()> { #[test] fn concurrent() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); for _ in 0..2 { Parallel::new() diff --git a/tests/other_modes.rs b/tests/other_modes.rs index 12ef1bc..407e42b 100644 --- a/tests/other_modes.rs +++ b/tests/other_modes.rs @@ -6,7 +6,7 @@ use std::io::{self, prelude::*}; use std::net::{TcpListener, TcpStream}; use std::time::Duration; -use polling::{Event, PollMode, Poller}; +use polling::{Event, Events, PollMode, Poller}; #[test] fn level_triggered() { @@ -34,12 +34,16 @@ fn level_triggered() { writer.write_all(&data).unwrap(); // A "readable" notification should be delivered. - let mut events = Vec::new(); + let mut events = Events::new(); poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // If we read some of the data, the notification should still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -47,7 +51,12 @@ fn level_triggered() { poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // If we read the rest of the data, the notification should be gone. reader.read_exact(&mut [0; 2]).unwrap(); @@ -56,7 +65,7 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, []); + assert!(events.is_empty()); // After modifying the stream and sending more data, it should be oneshot. poller @@ -71,7 +80,11 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // After reading, the notification should vanish. reader.read(&mut [0; 5]).unwrap(); @@ -80,7 +93,7 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, []); + assert!(events.is_empty()); } #[test] @@ -123,12 +136,16 @@ fn edge_triggered() { writer.write_all(&data).unwrap(); // A "readable" notification should be delivered. - let mut events = Vec::new(); + let mut events = Events::new(); poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // If we read some of the data, the notification should not still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -136,7 +153,7 @@ fn edge_triggered() { poller .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, []); + assert!(events.is_empty()); // If we write more data, a notification should be delivered. writer.write_all(&data).unwrap(); @@ -144,7 +161,12 @@ fn edge_triggered() { poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // After modifying the stream and sending more data, it should be oneshot. poller @@ -157,7 +179,11 @@ fn edge_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); } #[test] @@ -206,12 +232,16 @@ fn edge_oneshot_triggered() { writer.write_all(&data).unwrap(); // A "readable" notification should be delivered. - let mut events = Vec::new(); + let mut events = Events::new(); poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // If we read some of the data, the notification should not still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -219,7 +249,7 @@ fn edge_oneshot_triggered() { poller .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, []); + assert!(events.is_empty()); // If we modify to re-enable the notification, it should be delivered. poller @@ -233,7 +263,12 @@ fn edge_oneshot_triggered() { poller .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + + assert_eq!(events.len(), 1); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); } fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { diff --git a/tests/precision.rs b/tests/precision.rs index de5d605..1776fa8 100644 --- a/tests/precision.rs +++ b/tests/precision.rs @@ -1,12 +1,12 @@ use std::io; use std::time::{Duration, Instant}; -use polling::Poller; +use polling::{Events, Poller}; #[test] fn below_ms() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); let dur = Duration::from_micros(100); let margin = Duration::from_micros(500); @@ -42,7 +42,7 @@ fn below_ms() -> io::Result<()> { #[test] fn above_ms() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); let dur = Duration::from_micros(3_100); let margin = Duration::from_micros(500); diff --git a/tests/timeout.rs b/tests/timeout.rs index abf902c..62763a1 100644 --- a/tests/timeout.rs +++ b/tests/timeout.rs @@ -1,12 +1,12 @@ use std::io; use std::time::{Duration, Instant}; -use polling::Poller; +use polling::{Events, Poller}; #[test] fn twice() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); for _ in 0..2 { let start = Instant::now(); @@ -22,7 +22,7 @@ fn twice() -> io::Result<()> { #[test] fn non_blocking() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); for _ in 0..100 { poller.wait(&mut events, Some(Duration::from_secs(0)))?; diff --git a/tests/windows_post.rs b/tests/windows_post.rs index 488fab3..33c3b6c 100644 --- a/tests/windows_post.rs +++ b/tests/windows_post.rs @@ -3,7 +3,7 @@ #![cfg(windows)] use polling::os::iocp::{CompletionPacket, PollerIocpExt}; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; use std::sync::Arc; use std::thread; @@ -12,7 +12,7 @@ use std::time::Duration; #[test] fn post_smoke() { let poller = Poller::new().unwrap(); - let mut events = Vec::new(); + let mut events = Events::new(); poller .post(CompletionPacket::new(Event::readable(1))) @@ -20,13 +20,16 @@ fn post_smoke() { poller.wait(&mut events, None).unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events[0], Event::readable(1)); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(1) + ); } #[test] fn post_multithread() { let poller = Arc::new(Poller::new().unwrap()); - let mut events = Vec::new(); + let mut events = Events::new(); thread::spawn({ let poller = Arc::clone(&poller); @@ -47,7 +50,11 @@ fn post_multithread() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.pop(), Some(Event::writable(i))); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::writable(i) + ); + events.clear(); } poller diff --git a/tests/windows_waitable.rs b/tests/windows_waitable.rs index bd17a72..dcd0edc 100644 --- a/tests/windows_waitable.rs +++ b/tests/windows_waitable.rs @@ -3,7 +3,7 @@ #![cfg(windows)] use polling::os::iocp::PollerIocpExt; -use polling::{Event, PollMode, Poller}; +use polling::{Event, Events, PollMode, Poller}; use windows_sys::Win32::Foundation::CloseHandle; use windows_sys::Win32::System::Threading::{CreateEventW, ResetEvent, SetEvent}; @@ -85,7 +85,7 @@ fn smoke() { .unwrap(); } - let mut events = vec![]; + let mut events = Events::new(); poller .wait(&mut events, Some(Duration::from_millis(100))) .unwrap(); @@ -100,7 +100,7 @@ fn smoke() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events[0], Event::all(0)); + assert_eq!(events.iter().next().unwrap().with_no_extra(), Event::all(0)); // Interest should be cleared. events.clear(); @@ -121,7 +121,7 @@ fn smoke() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events[0], Event::all(0)); + assert_eq!(events.iter().next().unwrap().with_no_extra(), Event::all(0)); // If we reset the event, it should not be signaled. event.reset().unwrap();