mirror of https://github.com/smol-rs/polling
feat: Add support for waiting on waitable handles
* Add support for polling waitable handles * Add a smoke test * Fix failing tests * Rebase on latest master Signed-off-by: John Nunley <dev@notgull.net> * Update semantics for new system Signed-off-by: John Nunley <dev@notgull.net> * Forgot about doctests Signed-off-by: John Nunley <dev@notgull.net> --------- Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
parent
6d13def8ab
commit
c6a0890627
|
@ -38,6 +38,7 @@ version = "0.48"
|
|||
features = [
|
||||
"Win32_Foundation",
|
||||
"Win32_Networking_WinSock",
|
||||
"Win32_Security",
|
||||
"Win32_Storage_FileSystem",
|
||||
"Win32_System_IO",
|
||||
"Win32_System_LibraryLoader",
|
||||
|
|
|
@ -536,6 +536,9 @@ impl<T: fmt::Debug> fmt::Debug for IoStatusBlock<T> {
|
|||
}
|
||||
}
|
||||
|
||||
unsafe impl<T: Send> Send for IoStatusBlock<T> {}
|
||||
unsafe impl<T: Sync> Sync for IoStatusBlock<T> {}
|
||||
|
||||
impl<T> From<T> for IoStatusBlock<T> {
|
||||
fn from(data: T) -> Self {
|
||||
Self {
|
||||
|
|
411
src/iocp/mod.rs
411
src/iocp/mod.rs
|
@ -30,7 +30,14 @@ mod port;
|
|||
|
||||
use afd::{base_socket, Afd, AfdPollInfo, AfdPollMask, HasAfdInfo, IoStatusBlock};
|
||||
use port::{IoCompletionPort, OverlappedEntry};
|
||||
use windows_sys::Win32::Foundation::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, STATUS_CANCELLED};
|
||||
|
||||
use windows_sys::Win32::Foundation::{
|
||||
BOOLEAN, ERROR_INVALID_HANDLE, ERROR_IO_PENDING, STATUS_CANCELLED,
|
||||
};
|
||||
use windows_sys::Win32::System::Threading::{
|
||||
RegisterWaitForSingleObject, UnregisterWait, INFINITE, WT_EXECUTELONGFUNCTION,
|
||||
WT_EXECUTEONLYONCE,
|
||||
};
|
||||
|
||||
use crate::{Event, PollMode};
|
||||
|
||||
|
@ -39,9 +46,12 @@ use pin_project_lite::pin_project;
|
|||
|
||||
use std::cell::UnsafeCell;
|
||||
use std::collections::hash_map::{Entry, HashMap};
|
||||
use std::convert::TryFrom;
|
||||
use std::ffi::c_void;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::marker::PhantomPinned;
|
||||
use std::mem::{forget, MaybeUninit};
|
||||
use std::os::windows::io::{
|
||||
AsHandle, AsRawHandle, AsRawSocket, BorrowedHandle, BorrowedSocket, RawHandle, RawSocket,
|
||||
};
|
||||
|
@ -61,7 +71,7 @@ macro_rules! lock {
|
|||
#[derive(Debug)]
|
||||
pub(super) struct Poller {
|
||||
/// The I/O completion port.
|
||||
port: IoCompletionPort<Packet>,
|
||||
port: Arc<IoCompletionPort<Packet>>,
|
||||
|
||||
/// List of currently active AFD instances.
|
||||
///
|
||||
|
@ -72,6 +82,9 @@ pub(super) struct Poller {
|
|||
/// The state of the sources registered with this poller.
|
||||
sources: RwLock<HashMap<RawSocket, Packet>>,
|
||||
|
||||
/// The state of the waitable handles registered with this poller.
|
||||
waitables: RwLock<HashMap<RawHandle, Packet>>,
|
||||
|
||||
/// Sockets with pending updates.
|
||||
pending_updates: ConcurrentQueue<Packet>,
|
||||
|
||||
|
@ -110,9 +123,10 @@ impl Poller {
|
|||
tracing::trace!(handle = ?port, "new");
|
||||
|
||||
Ok(Poller {
|
||||
port,
|
||||
port: Arc::new(port),
|
||||
afd: Mutex::new(vec![]),
|
||||
sources: RwLock::new(HashMap::new()),
|
||||
waitables: RwLock::new(HashMap::new()),
|
||||
pending_updates: ConcurrentQueue::bounded(1024),
|
||||
polling: AtomicBool::new(false),
|
||||
packets: Mutex::new(Vec::with_capacity(1024)),
|
||||
|
@ -268,6 +282,124 @@ impl Poller {
|
|||
source.begin_delete()
|
||||
}
|
||||
|
||||
/// Add a new waitable to the poller.
|
||||
pub(super) fn add_waitable(
|
||||
&self,
|
||||
handle: RawHandle,
|
||||
interest: Event,
|
||||
mode: PollMode,
|
||||
) -> io::Result<()> {
|
||||
tracing::trace!(
|
||||
"add_waitable: handle={:?}, waitable={:p}, ev={:?}",
|
||||
self.port,
|
||||
handle,
|
||||
interest
|
||||
);
|
||||
|
||||
// We don't support edge-triggered events.
|
||||
if matches!(mode, PollMode::Edge | PollMode::EdgeOneshot) {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"edge-triggered events are not supported",
|
||||
));
|
||||
}
|
||||
|
||||
// Create a new packet.
|
||||
let handle_state = {
|
||||
let state = WaitableState {
|
||||
handle,
|
||||
port: Arc::downgrade(&self.port),
|
||||
interest,
|
||||
mode,
|
||||
status: WaitableStatus::Idle,
|
||||
};
|
||||
|
||||
Arc::pin(IoStatusBlock::from(PacketInner::Waitable {
|
||||
handle: Mutex::new(state),
|
||||
}))
|
||||
};
|
||||
|
||||
// Keep track of the source in the poller.
|
||||
{
|
||||
let mut sources = lock!(self.waitables.write());
|
||||
|
||||
match sources.entry(handle) {
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(Pin::<Arc<_>>::clone(&handle_state));
|
||||
}
|
||||
|
||||
Entry::Occupied(_) => {
|
||||
return Err(io::Error::from(io::ErrorKind::AlreadyExists));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the packet.
|
||||
self.update_packet(handle_state)
|
||||
}
|
||||
|
||||
/// Update a waitable in the poller.
|
||||
pub(crate) fn modify_waitable(
|
||||
&self,
|
||||
waitable: RawHandle,
|
||||
interest: Event,
|
||||
mode: PollMode,
|
||||
) -> io::Result<()> {
|
||||
tracing::trace!(
|
||||
"modify_waitable: handle={:?}, waitable={:p}, ev={:?}",
|
||||
self.port,
|
||||
waitable,
|
||||
interest
|
||||
);
|
||||
|
||||
// We don't support edge-triggered events.
|
||||
if matches!(mode, PollMode::Edge | PollMode::EdgeOneshot) {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"edge-triggered events are not supported",
|
||||
));
|
||||
}
|
||||
|
||||
// Get a reference to the source.
|
||||
let source = {
|
||||
let sources = lock!(self.waitables.read());
|
||||
|
||||
sources
|
||||
.get(&waitable)
|
||||
.cloned()
|
||||
.ok_or_else(|| io::Error::from(io::ErrorKind::NotFound))?
|
||||
};
|
||||
|
||||
// Set the new event.
|
||||
if source.as_ref().set_events(interest, mode) {
|
||||
self.update_packet(source)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a waitable from the poller.
|
||||
pub(super) fn remove_waitable(&self, waitable: RawHandle) -> io::Result<()> {
|
||||
tracing::trace!("remove: handle={:?}, waitable={:p}", self.port, waitable);
|
||||
|
||||
// Get a reference to the source.
|
||||
let source = {
|
||||
let mut sources = lock!(self.waitables.write());
|
||||
|
||||
match sources.remove(&waitable) {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
// If the source has already been removed, then we can just return.
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Indicate to the source that it is being deleted.
|
||||
// This cancels any ongoing AFD_IOCTL_POLL operations.
|
||||
source.begin_delete()
|
||||
}
|
||||
|
||||
/// Wait for events.
|
||||
pub(super) fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
|
||||
let span = tracing::trace_span!(
|
||||
|
@ -510,6 +642,11 @@ pin_project! {
|
|||
socket: Mutex<SocketState>
|
||||
},
|
||||
|
||||
/// A packet for a waitable handle.
|
||||
Waitable {
|
||||
handle: Mutex<WaitableState>
|
||||
},
|
||||
|
||||
/// A custom event sent by the user.
|
||||
Custom {
|
||||
event: Event,
|
||||
|
@ -520,6 +657,9 @@ pin_project! {
|
|||
}
|
||||
}
|
||||
|
||||
unsafe impl Send for PacketInner {}
|
||||
unsafe impl Sync for PacketInner {}
|
||||
|
||||
impl fmt::Debug for PacketInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
|
@ -530,6 +670,9 @@ impl fmt::Debug for PacketInner {
|
|||
.field("packet", &"..")
|
||||
.field("socket", socket)
|
||||
.finish(),
|
||||
Self::Waitable { handle } => {
|
||||
f.debug_struct("Waitable").field("handle", handle).finish()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -548,28 +691,86 @@ impl PacketUnwrapped {
|
|||
///
|
||||
/// Returns `true` if we need to be updated.
|
||||
fn set_events(self: Pin<&Self>, interest: Event, mode: PollMode) -> bool {
|
||||
let mut socket = match self.socket_state() {
|
||||
Some(s) => s,
|
||||
None => return false,
|
||||
};
|
||||
match self.data().project_ref() {
|
||||
PacketInnerProj::Socket { socket, .. } => {
|
||||
let mut socket = lock!(socket.lock());
|
||||
socket.interest = interest;
|
||||
socket.mode = mode;
|
||||
socket.interest_error = true;
|
||||
|
||||
socket.interest = interest;
|
||||
socket.mode = mode;
|
||||
socket.interest_error = true;
|
||||
|
||||
match socket.status {
|
||||
SocketStatus::Polling { readable, writable } => {
|
||||
(interest.readable && !readable) || (interest.writable && !writable)
|
||||
match socket.status {
|
||||
SocketStatus::Polling { readable, writable } => {
|
||||
(interest.readable && !readable) || (interest.writable && !writable)
|
||||
}
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
_ => true,
|
||||
PacketInnerProj::Waitable { handle } => {
|
||||
let mut handle = lock!(handle.lock());
|
||||
|
||||
// Set the new interest.
|
||||
handle.interest = interest;
|
||||
handle.mode = mode;
|
||||
|
||||
// Update if there is no ongoing wait.
|
||||
handle.status.is_idle()
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the socket and install the new status in AFD.
|
||||
fn update(self: Pin<Arc<Self>>) -> io::Result<()> {
|
||||
let mut socket = match self.as_ref().socket_state() {
|
||||
Some(s) => s,
|
||||
None => return Err(io::Error::new(io::ErrorKind::Other, "invalid socket state")),
|
||||
let mut socket = match self.as_ref().data().project_ref() {
|
||||
PacketInnerProj::Socket { socket, .. } => lock!(socket.lock()),
|
||||
PacketInnerProj::Waitable { handle } => {
|
||||
let mut handle = lock!(handle.lock());
|
||||
|
||||
// If there is no interests, or if we have been cancelled, we don't need to update.
|
||||
if !handle.interest.readable && !handle.interest.writable {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// If we are idle, we need to update.
|
||||
if !handle.status.is_idle() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Start a new wait.
|
||||
let packet = self.clone();
|
||||
let wait_handle = WaitHandle::new(
|
||||
handle.handle,
|
||||
move || {
|
||||
let mut handle = match packet.as_ref().data().project_ref() {
|
||||
PacketInnerProj::Waitable { handle } => lock!(handle.lock()),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
// Try to get the IOCP.
|
||||
let iocp = match handle.port.upgrade() {
|
||||
Some(iocp) => iocp,
|
||||
None => return,
|
||||
};
|
||||
|
||||
// Set us back into the idle state.
|
||||
handle.status = WaitableStatus::Idle;
|
||||
|
||||
// Push this packet.
|
||||
drop(handle);
|
||||
if let Err(e) = iocp.post(0, 0, packet) {
|
||||
tracing::error!("failed to post completion packet: {}", e);
|
||||
}
|
||||
},
|
||||
None,
|
||||
false,
|
||||
)?;
|
||||
|
||||
// Set the new status.
|
||||
handle.status = WaitableStatus::Waiting(wait_handle);
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
_ => return Err(io::Error::new(io::ErrorKind::Other, "invalid socket state")),
|
||||
};
|
||||
|
||||
// If we are waiting on a delete, just return, dropping the packet.
|
||||
|
@ -653,6 +854,21 @@ impl PacketUnwrapped {
|
|||
// The poller was notified.
|
||||
return Ok(FeedEventResult::Notified);
|
||||
}
|
||||
PacketInnerProj::Waitable { handle } => {
|
||||
let mut handle = lock!(handle.lock());
|
||||
let event = handle.interest;
|
||||
|
||||
// Clear the events if we are in one-shot mode.
|
||||
if matches!(handle.mode, PollMode::Oneshot) {
|
||||
handle.interest = Event::none(handle.interest.key);
|
||||
}
|
||||
|
||||
// Submit for an update.
|
||||
drop(handle);
|
||||
poller.update_packet(self)?;
|
||||
|
||||
return Ok(FeedEventResult::Event(event));
|
||||
}
|
||||
};
|
||||
|
||||
let mut socket_state = lock!(socket.lock());
|
||||
|
@ -735,10 +951,19 @@ impl PacketUnwrapped {
|
|||
/// Begin deleting this socket.
|
||||
fn begin_delete(self: Pin<Arc<Self>>) -> io::Result<()> {
|
||||
// If we aren't already being deleted, start deleting.
|
||||
let mut socket = self
|
||||
.as_ref()
|
||||
.socket_state()
|
||||
.expect("can't delete packet that doesn't belong to a socket");
|
||||
let mut socket = match self.as_ref().data().project_ref() {
|
||||
PacketInnerProj::Socket { socket, .. } => lock!(socket.lock()),
|
||||
PacketInnerProj::Waitable { handle } => {
|
||||
let mut handle = lock!(handle.lock());
|
||||
|
||||
// Set the status to be cancelled. This drops the wait handle and prevents
|
||||
// any further updates.
|
||||
handle.status = WaitableStatus::Cancelled;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
_ => panic!("can't delete packet that doesn't belong to a socket"),
|
||||
};
|
||||
if !socket.waiting_on_delete {
|
||||
socket.waiting_on_delete = true;
|
||||
|
||||
|
@ -765,17 +990,6 @@ impl PacketUnwrapped {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn socket_state(self: Pin<&Self>) -> Option<MutexGuard<'_, SocketState>> {
|
||||
let inner = self.data().project_ref();
|
||||
|
||||
let state = match inner {
|
||||
PacketInnerProj::Socket { socket, .. } => socket,
|
||||
_ => return None,
|
||||
};
|
||||
|
||||
Some(lock!(state.lock()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-socket state.
|
||||
|
@ -826,6 +1040,43 @@ enum SocketStatus {
|
|||
Cancelled,
|
||||
}
|
||||
|
||||
/// Per-waitable handle state.
|
||||
#[derive(Debug)]
|
||||
struct WaitableState {
|
||||
/// The handle that this state is for.
|
||||
handle: RawHandle,
|
||||
|
||||
/// The IO completion port that this handle is registered with.
|
||||
port: Weak<IoCompletionPort<Packet>>,
|
||||
|
||||
/// The event that this handle will report.
|
||||
interest: Event,
|
||||
|
||||
/// The current poll mode.
|
||||
mode: PollMode,
|
||||
|
||||
/// The status of this waitable.
|
||||
status: WaitableStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum WaitableStatus {
|
||||
/// We are not polling.
|
||||
Idle,
|
||||
|
||||
/// We are waiting on this handle to become signaled.
|
||||
Waiting(WaitHandle),
|
||||
|
||||
/// This handle has been cancelled.
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl WaitableStatus {
|
||||
fn is_idle(&self) -> bool {
|
||||
matches!(self, WaitableStatus::Idle)
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of calling `feed_event`.
|
||||
#[derive(Debug)]
|
||||
enum FeedEventResult {
|
||||
|
@ -839,6 +1090,77 @@ enum FeedEventResult {
|
|||
Notified,
|
||||
}
|
||||
|
||||
/// A handle for an ongoing wait operation.
|
||||
#[derive(Debug)]
|
||||
struct WaitHandle(RawHandle);
|
||||
|
||||
impl Drop for WaitHandle {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
UnregisterWait(self.0 as _);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WaitHandle {
|
||||
/// Wait for a waitable handle to become signaled.
|
||||
fn new<F>(
|
||||
handle: RawHandle,
|
||||
callback: F,
|
||||
timeout: Option<Duration>,
|
||||
long_wait: bool,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: FnOnce() + Send + Sync + 'static,
|
||||
{
|
||||
// Make sure a panic in the callback doesn't propagate to the OS.
|
||||
struct AbortOnDrop;
|
||||
|
||||
impl Drop for AbortOnDrop {
|
||||
fn drop(&mut self) {
|
||||
std::process::abort();
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "system" fn wait_callback<F: FnOnce() + Send + Sync + 'static>(
|
||||
context: *mut c_void,
|
||||
_timer_fired: BOOLEAN,
|
||||
) {
|
||||
let _guard = AbortOnDrop;
|
||||
let callback = Box::from_raw(context as *mut F);
|
||||
callback();
|
||||
|
||||
// We executed without panicking, so don't abort.
|
||||
forget(_guard);
|
||||
}
|
||||
|
||||
let mut wait_handle = MaybeUninit::<RawHandle>::uninit();
|
||||
|
||||
let mut flags = WT_EXECUTEONLYONCE;
|
||||
if long_wait {
|
||||
flags |= WT_EXECUTELONGFUNCTION;
|
||||
}
|
||||
|
||||
let res = unsafe {
|
||||
RegisterWaitForSingleObject(
|
||||
wait_handle.as_mut_ptr().cast::<_>(),
|
||||
handle as _,
|
||||
Some(wait_callback::<F>),
|
||||
Box::into_raw(Box::new(callback)) as _,
|
||||
timeout.map_or(INFINITE, dur2timeout),
|
||||
flags,
|
||||
)
|
||||
};
|
||||
|
||||
if res == 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
|
||||
let wait_handle = unsafe { wait_handle.assume_init() };
|
||||
Ok(Self(wait_handle))
|
||||
}
|
||||
}
|
||||
|
||||
fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask {
|
||||
use afd::AfdPollMask as AfdPoll;
|
||||
|
||||
|
@ -884,6 +1206,29 @@ fn afd_mask_to_event(mask: afd::AfdPollMask) -> (bool, bool) {
|
|||
(readable, writable)
|
||||
}
|
||||
|
||||
// Implementation taken from https://github.com/rust-lang/rust/blob/db5476571d9b27c862b95c1e64764b0ac8980e23/src/libstd/sys/windows/mod.rs
|
||||
fn dur2timeout(dur: Duration) -> u32 {
|
||||
// Note that a duration is a (u64, u32) (seconds, nanoseconds) pair, and the
|
||||
// timeouts in windows APIs are typically u32 milliseconds. To translate, we
|
||||
// have two pieces to take care of:
|
||||
//
|
||||
// * Nanosecond precision is rounded up
|
||||
// * Greater than u32::MAX milliseconds (50 days) is rounded up to INFINITE
|
||||
// (never time out).
|
||||
dur.as_secs()
|
||||
.checked_mul(1000)
|
||||
.and_then(|ms| ms.checked_add((dur.subsec_nanos() as u64) / 1_000_000))
|
||||
.and_then(|ms| {
|
||||
if dur.subsec_nanos() % 1_000_000 > 0 {
|
||||
ms.checked_add(1)
|
||||
} else {
|
||||
Some(ms)
|
||||
}
|
||||
})
|
||||
.and_then(|x| u32::try_from(x).ok())
|
||||
.unwrap_or(INFINITE)
|
||||
}
|
||||
|
||||
struct CallOnDrop<F: FnMut()>(F);
|
||||
|
||||
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
//! A safe wrapper around the Windows I/O API.
|
||||
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
use super::dur2timeout;
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
|
@ -296,29 +298,6 @@ impl<T: CompletionHandle> Drop for OverlappedEntry<T> {
|
|||
}
|
||||
}
|
||||
|
||||
// Implementation taken from https://github.com/rust-lang/rust/blob/db5476571d9b27c862b95c1e64764b0ac8980e23/src/libstd/sys/windows/mod.rs
|
||||
fn dur2timeout(dur: Duration) -> u32 {
|
||||
// Note that a duration is a (u64, u32) (seconds, nanoseconds) pair, and the
|
||||
// timeouts in windows APIs are typically u32 milliseconds. To translate, we
|
||||
// have two pieces to take care of:
|
||||
//
|
||||
// * Nanosecond precision is rounded up
|
||||
// * Greater than u32::MAX milliseconds (50 days) is rounded up to INFINITE
|
||||
// (never time out).
|
||||
dur.as_secs()
|
||||
.checked_mul(1000)
|
||||
.and_then(|ms| ms.checked_add((dur.subsec_nanos() as u64) / 1_000_000))
|
||||
.and_then(|ms| {
|
||||
if dur.subsec_nanos() % 1_000_000 > 0 {
|
||||
ms.checked_add(1)
|
||||
} else {
|
||||
Some(ms)
|
||||
}
|
||||
})
|
||||
.and_then(|x| u32::try_from(x).ok())
|
||||
.unwrap_or(INFINITE)
|
||||
}
|
||||
|
||||
struct CallOnDrop<F: FnMut()>(F);
|
||||
|
||||
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
||||
|
|
203
src/os/iocp.rs
203
src/os/iocp.rs
|
@ -3,8 +3,11 @@
|
|||
pub use crate::sys::CompletionPacket;
|
||||
|
||||
use super::__private::PollerSealed;
|
||||
use crate::Poller;
|
||||
use crate::{Event, PollMode, Poller};
|
||||
|
||||
use std::io;
|
||||
use std::os::windows::io::{AsRawHandle, RawHandle};
|
||||
use std::os::windows::prelude::{AsHandle, BorrowedHandle};
|
||||
|
||||
/// Extension trait for the [`Poller`] type that provides functionality specific to IOCP-based
|
||||
/// platforms.
|
||||
|
@ -43,10 +46,208 @@ pub trait PollerIocpExt: PollerSealed {
|
|||
/// # Ok(()) }
|
||||
/// ```
|
||||
fn post(&self, packet: CompletionPacket) -> io::Result<()>;
|
||||
|
||||
/// Add a waitable handle to this poller.
|
||||
///
|
||||
/// Some handles in Windows are "waitable", which means that they emit a "readiness" signal
|
||||
/// after some event occurs. This function can be used to wait for such events to occur
|
||||
/// on a handle. This function can be used in addition to regular socket polling.
|
||||
///
|
||||
/// Waitable objects include the following:
|
||||
///
|
||||
/// - Console inputs
|
||||
/// - Waitable events
|
||||
/// - Mutexes
|
||||
/// - Processes
|
||||
/// - Semaphores
|
||||
/// - Threads
|
||||
/// - Timer
|
||||
///
|
||||
/// Once the object has been signalled, the poller will emit the `interest` event.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The added handle must not be dropped before it is deleted.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use polling::{Poller, Event, PollMode};
|
||||
/// use polling::os::iocp::PollerIocpExt;
|
||||
///
|
||||
/// use std::process::Command;
|
||||
///
|
||||
/// // Spawn a new process.
|
||||
/// let mut child = Command::new("echo")
|
||||
/// .arg("Hello, world!")
|
||||
/// .spawn()
|
||||
/// .unwrap();
|
||||
///
|
||||
/// // Create a new poller.
|
||||
/// let poller = Poller::new().unwrap();
|
||||
///
|
||||
/// // Add the child process to the poller.
|
||||
/// unsafe {
|
||||
/// poller.add_waitable(&child, Event::all(0), PollMode::Oneshot).unwrap();
|
||||
/// }
|
||||
///
|
||||
/// // Wait for the child process to exit.
|
||||
/// let mut events = vec![];
|
||||
/// poller.wait(&mut events, None).unwrap();
|
||||
///
|
||||
/// assert_eq!(events.len(), 1);
|
||||
/// assert_eq!(events[0], Event::all(0));
|
||||
/// ```
|
||||
unsafe fn add_waitable(
|
||||
&self,
|
||||
handle: impl AsRawWaitable,
|
||||
interest: Event,
|
||||
mode: PollMode,
|
||||
) -> io::Result<()>;
|
||||
|
||||
/// Modify an existing waitable handle.
|
||||
///
|
||||
/// This function can be used to change the emitted event and/or mode of an existing waitable
|
||||
/// handle. The handle must have been previously added to the poller using [`add_waitable`].
|
||||
///
|
||||
/// [`add_waitable`]: Self::add_waitable
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use polling::{Poller, Event, PollMode};
|
||||
/// use polling::os::iocp::PollerIocpExt;
|
||||
///
|
||||
/// use std::process::Command;
|
||||
///
|
||||
/// // Spawn a new process.
|
||||
/// let mut child = Command::new("echo")
|
||||
/// .arg("Hello, world!")
|
||||
/// .spawn()
|
||||
/// .unwrap();
|
||||
///
|
||||
/// // Create a new poller.
|
||||
/// let poller = Poller::new().unwrap();
|
||||
///
|
||||
/// // Add the child process to the poller.
|
||||
/// unsafe {
|
||||
/// poller.add_waitable(&child, Event::all(0), PollMode::Oneshot).unwrap();
|
||||
/// }
|
||||
///
|
||||
/// // Wait for the child process to exit.
|
||||
/// let mut events = vec![];
|
||||
/// poller.wait(&mut events, None).unwrap();
|
||||
///
|
||||
/// assert_eq!(events.len(), 1);
|
||||
/// assert_eq!(events[0], Event::all(0));
|
||||
///
|
||||
/// // Modify the waitable handle.
|
||||
/// poller.modify_waitable(&child, Event::readable(0), PollMode::Oneshot).unwrap();
|
||||
/// ```
|
||||
fn modify_waitable(
|
||||
&self,
|
||||
handle: impl AsWaitable,
|
||||
interest: Event,
|
||||
mode: PollMode,
|
||||
) -> io::Result<()>;
|
||||
|
||||
/// Remove a waitable handle from this poller.
|
||||
///
|
||||
/// This function can be used to remove a waitable handle from the poller. The handle must
|
||||
/// have been previously added to the poller using [`add_waitable`].
|
||||
///
|
||||
/// [`add_waitable`]: Self::add_waitable
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use polling::{Poller, Event, PollMode};
|
||||
/// use polling::os::iocp::PollerIocpExt;
|
||||
///
|
||||
/// use std::process::Command;
|
||||
///
|
||||
/// // Spawn a new process.
|
||||
/// let mut child = Command::new("echo")
|
||||
/// .arg("Hello, world!")
|
||||
/// .spawn()
|
||||
/// .unwrap();
|
||||
///
|
||||
/// // Create a new poller.
|
||||
/// let poller = Poller::new().unwrap();
|
||||
///
|
||||
/// // Add the child process to the poller.
|
||||
/// unsafe {
|
||||
/// poller.add_waitable(&child, Event::all(0), PollMode::Oneshot).unwrap();
|
||||
/// }
|
||||
///
|
||||
/// // Wait for the child process to exit.
|
||||
/// let mut events = vec![];
|
||||
/// poller.wait(&mut events, None).unwrap();
|
||||
///
|
||||
/// assert_eq!(events.len(), 1);
|
||||
/// assert_eq!(events[0], Event::all(0));
|
||||
///
|
||||
/// // Remove the waitable handle.
|
||||
/// poller.remove_waitable(&child).unwrap();
|
||||
/// ```
|
||||
fn remove_waitable(&self, handle: impl AsWaitable) -> io::Result<()>;
|
||||
}
|
||||
|
||||
impl PollerIocpExt for Poller {
|
||||
fn post(&self, packet: CompletionPacket) -> io::Result<()> {
|
||||
self.poller.post(packet)
|
||||
}
|
||||
|
||||
unsafe fn add_waitable(
|
||||
&self,
|
||||
handle: impl AsRawWaitable,
|
||||
event: Event,
|
||||
mode: PollMode,
|
||||
) -> io::Result<()> {
|
||||
self.poller
|
||||
.add_waitable(handle.as_raw_handle(), event, mode)
|
||||
}
|
||||
|
||||
fn modify_waitable(
|
||||
&self,
|
||||
handle: impl AsWaitable,
|
||||
interest: Event,
|
||||
mode: PollMode,
|
||||
) -> io::Result<()> {
|
||||
self.poller
|
||||
.modify_waitable(handle.as_waitable().as_raw_handle(), interest, mode)
|
||||
}
|
||||
|
||||
fn remove_waitable(&self, handle: impl AsWaitable) -> io::Result<()> {
|
||||
self.poller
|
||||
.remove_waitable(handle.as_waitable().as_raw_handle())
|
||||
}
|
||||
}
|
||||
|
||||
/// A type that represents a waitable handle.
|
||||
pub trait AsRawWaitable {
|
||||
/// Returns the raw handle of this waitable.
|
||||
fn as_raw_handle(&self) -> RawHandle;
|
||||
}
|
||||
|
||||
impl AsRawWaitable for RawHandle {
|
||||
fn as_raw_handle(&self) -> RawHandle {
|
||||
*self
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsRawHandle + ?Sized> AsRawWaitable for &T {
|
||||
fn as_raw_handle(&self) -> RawHandle {
|
||||
AsRawHandle::as_raw_handle(*self)
|
||||
}
|
||||
}
|
||||
|
||||
/// A type that represents a waitable handle.
|
||||
pub trait AsWaitable: AsHandle {
|
||||
/// Returns the raw handle of this waitable.
|
||||
fn as_waitable(&self) -> BorrowedHandle<'_> {
|
||||
self.as_handle()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsHandle + ?Sized> AsWaitable for T {}
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
//! Tests for the waitable polling on Windows.
|
||||
|
||||
#![cfg(windows)]
|
||||
|
||||
use polling::os::iocp::PollerIocpExt;
|
||||
use polling::{Event, PollMode, Poller};
|
||||
|
||||
use windows_sys::Win32::Foundation::CloseHandle;
|
||||
use windows_sys::Win32::System::Threading::{CreateEventW, ResetEvent, SetEvent};
|
||||
|
||||
use std::io;
|
||||
use std::os::windows::io::{AsRawHandle, RawHandle};
|
||||
use std::os::windows::prelude::{AsHandle, BorrowedHandle};
|
||||
use std::time::Duration;
|
||||
|
||||
/// A basic wrapper around the Windows event object.
|
||||
struct EventHandle(RawHandle);
|
||||
|
||||
impl Drop for EventHandle {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
CloseHandle(self.0 as _);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventHandle {
|
||||
fn new(manual_reset: bool) -> io::Result<Self> {
|
||||
let handle = unsafe {
|
||||
CreateEventW(
|
||||
std::ptr::null_mut(),
|
||||
manual_reset as _,
|
||||
false as _,
|
||||
std::ptr::null(),
|
||||
)
|
||||
};
|
||||
|
||||
if handle == 0 {
|
||||
Err(io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(Self(handle as _))
|
||||
}
|
||||
}
|
||||
|
||||
/// Reset the event object.
|
||||
fn reset(&self) -> io::Result<()> {
|
||||
if unsafe { ResetEvent(self.0 as _) } != 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(io::Error::last_os_error())
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the event object.
|
||||
fn set(&self) -> io::Result<()> {
|
||||
if unsafe { SetEvent(self.0 as _) } != 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(io::Error::last_os_error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRawHandle for EventHandle {
|
||||
fn as_raw_handle(&self) -> RawHandle {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl AsHandle for EventHandle {
|
||||
fn as_handle(&self) -> BorrowedHandle<'_> {
|
||||
unsafe { BorrowedHandle::borrow_raw(self.0) }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let poller = Poller::new().unwrap();
|
||||
|
||||
let event = EventHandle::new(true).unwrap();
|
||||
|
||||
unsafe {
|
||||
poller
|
||||
.add_waitable(&event, Event::all(0), PollMode::Oneshot)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let mut events = vec![];
|
||||
poller
|
||||
.wait(&mut events, Some(Duration::from_millis(100)))
|
||||
.unwrap();
|
||||
|
||||
assert!(events.is_empty());
|
||||
|
||||
// Signal the event.
|
||||
event.set().unwrap();
|
||||
|
||||
poller
|
||||
.wait(&mut events, Some(Duration::from_millis(100)))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_eq!(events[0], Event::all(0));
|
||||
|
||||
// Interest should be cleared.
|
||||
events.clear();
|
||||
poller
|
||||
.wait(&mut events, Some(Duration::from_millis(100)))
|
||||
.unwrap();
|
||||
|
||||
assert!(events.is_empty());
|
||||
|
||||
// If we modify the waitable, it should be added again.
|
||||
poller
|
||||
.modify_waitable(&event, Event::all(0), PollMode::Oneshot)
|
||||
.unwrap();
|
||||
|
||||
events.clear();
|
||||
poller
|
||||
.wait(&mut events, Some(Duration::from_millis(100)))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_eq!(events[0], Event::all(0));
|
||||
|
||||
// If we reset the event, it should not be signaled.
|
||||
event.reset().unwrap();
|
||||
poller
|
||||
.modify_waitable(&event, Event::all(0), PollMode::Oneshot)
|
||||
.unwrap();
|
||||
|
||||
events.clear();
|
||||
poller
|
||||
.wait(&mut events, Some(Duration::from_millis(100)))
|
||||
.unwrap();
|
||||
|
||||
assert!(events.is_empty());
|
||||
}
|
Loading…
Reference in New Issue