From ff4c96fe4dc72f4233e17e016c36051b2f6e0aa5 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 12 Feb 2024 06:27:42 -0800 Subject: [PATCH] m: Port to event-listener v5.0.0 cc smol-rs/event-listener#105 Signed-off-by: John Nunley --- .github/workflows/ci.yml | 5 ++++ Cargo.toml | 6 ++--- src/barrier.rs | 11 ++++----- src/lib.rs | 1 + src/mutex.rs | 29 ++++++++++++----------- src/once_cell.rs | 27 +++++++++++----------- src/rwlock/futures.rs | 11 ++++++--- src/rwlock/raw.rs | 50 +++++++++++++++++++++++----------------- src/semaphore.rs | 33 ++++++++++++++++---------- tests/barrier.rs | 1 + tests/rwlock.rs | 6 +++-- 11 files changed, 105 insertions(+), 75 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6b8673e..fd0c397 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,6 +58,11 @@ jobs: msrv: runs-on: ubuntu-latest + strategy: + matrix: + # When updating this, the reminder to update the minimum supported + # Rust version in Cargo.toml. + rust: ['1.61'] steps: - uses: actions/checkout@v4 - name: Install cargo-hack diff --git a/Cargo.toml b/Cargo.toml index 8d4b683..88a24be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,8 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -event-listener = { version = "4.0.0", default-features = false } -event-listener-strategy = { version = "0.4.0", default-features = false } +event-listener = { version = "5.0.0", default-features = false } +event-listener-strategy = { version = "0.5.0", default-features = false } pin-project-lite = "0.2.11" [features] @@ -24,7 +24,7 @@ default = ["std"] std = ["event-listener/std", "event-listener-strategy/std"] [dev-dependencies] -async-channel = "2.0.0" +async-channel = "2.2.0" fastrand = "2.0.0" futures-lite = "2.0.0" waker-fn = "1.1.0" diff --git a/src/barrier.rs b/src/barrier.rs index 519838c..8d06fc1 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -82,7 +82,7 @@ impl Barrier { BarrierWait::_new(BarrierWaitInner { barrier: self, lock: Some(self.state.lock()), - evl: EventListener::new(), + evl: None, state: WaitState::Initial, }) } @@ -148,8 +148,7 @@ pin_project_lite::pin_project! { lock: Option>, // An event listener for the `barrier.event` event. - #[pin] - evl: EventListener, + evl: Option, // The current state of the future. state: WaitState, @@ -200,7 +199,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> { if state.count < this.barrier.n { // We need to wait for the event. - this.evl.as_mut().listen(&this.barrier.event); + *this.evl = Some(this.barrier.event.listen()); *this.state = WaitState::Waiting { local_gen }; } else { // We are the last one. @@ -212,7 +211,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> { } WaitState::Waiting { local_gen } => { - ready!(strategy.poll(this.evl.as_mut(), cx)); + ready!(strategy.poll(this.evl, cx)); // We are now re-acquiring the mutex. this.lock.as_mut().set(Some(this.barrier.state.lock())); @@ -233,7 +232,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> { if *local_gen == state.generation_id && state.count < this.barrier.n { // We need to wait for the event again. - this.evl.as_mut().listen(&this.barrier.event); + *this.evl = Some(this.barrier.event.listen()); *this.state = WaitState::Waiting { local_gen: *local_gen, }; diff --git a/src/lib.rs b/src/lib.rs index e0aadde..6d64aa3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,7 @@ macro_rules! ready { /// Pins a variable on the stack. /// /// TODO: Drop in favor of `core::pin::pin`, once MSRV is bumped to 1.68. +#[cfg(all(feature = "std", not(target_family = "wasm")))] macro_rules! pin { ($($x:ident),* $(,)?) => { $( diff --git a/src/mutex.rs b/src/mutex.rs index 5bad082..4dc6bac 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -1,7 +1,7 @@ use core::borrow::Borrow; use core::cell::UnsafeCell; use core::fmt; -use core::marker::PhantomData; +use core::marker::{PhantomData, PhantomPinned}; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::sync::atomic::{AtomicUsize, Ordering}; @@ -445,8 +445,7 @@ pin_project_lite::pin_project! { mutex: Option, // The event listener waiting on the mutex. - #[pin] - listener: EventListener, + listener: Option, // The point at which the mutex lock was started. start: Start, @@ -457,6 +456,10 @@ pin_project_lite::pin_project! { // Capture the `T` lifetime. #[pin] _marker: PhantomData, + + // Keeping this type `!Unpin` enables future optimizations. + #[pin] + _pin: PhantomPinned } impl>> PinnedDrop for AcquireSlow { @@ -477,18 +480,16 @@ impl>> AcquireSlow { /// Create a new `AcquireSlow` future. #[cold] fn new(mutex: B) -> Self { - // Create a new instance of the listener. - let listener = { EventListener::new() }; - AcquireSlow { mutex: Some(mutex), - listener, + listener: None, start: Start { #[cfg(all(feature = "std", not(target_family = "wasm")))] start: None, }, starved: false, _marker: PhantomData, + _pin: PhantomPinned, } } @@ -517,7 +518,7 @@ impl>> EventListenerFuture for AcquireSlow strategy: &mut S, context: &mut S::Context, ) -> Poll { - let mut this = self.as_mut().project(); + let this = self.as_mut().project(); #[cfg(all(feature = "std", not(target_family = "wasm")))] let start = *this.start.start.get_or_insert_with(Instant::now); let mutex = Borrow::>::borrow( @@ -528,8 +529,8 @@ impl>> EventListenerFuture for AcquireSlow if !*this.starved { loop { // Start listening for events. - if !this.listener.is_listening() { - this.listener.as_mut().listen(&mutex.lock_ops); + if this.listener.is_none() { + *this.listener = Some(mutex.lock_ops.listen()); // Try locking if nobody is being starved. match mutex @@ -547,7 +548,7 @@ impl>> EventListenerFuture for AcquireSlow _ => break, } } else { - ready!(strategy.poll(this.listener.as_mut(), context)); + ready!(strategy.poll(this.listener, context)); // Try locking if nobody is being starved. match mutex @@ -591,9 +592,9 @@ impl>> EventListenerFuture for AcquireSlow // Fairer locking loop. loop { - if !this.listener.is_listening() { + if this.listener.is_none() { // Start listening for events. - this.listener.as_mut().listen(&mutex.lock_ops); + *this.listener = Some(mutex.lock_ops.listen()); // Try locking if nobody else is being starved. match mutex @@ -615,7 +616,7 @@ impl>> EventListenerFuture for AcquireSlow } } else { // Wait for a notification. - ready!(strategy.poll(this.listener.as_mut(), context)); + ready!(strategy.poll(this.listener, context)); // Try acquiring the lock without waiting for others. if mutex.state.fetch_or(1, Ordering::Acquire) % 2 == 0 { diff --git a/src/once_cell.rs b/src/once_cell.rs index 4aaeb9f..8d9485d 100644 --- a/src/once_cell.rs +++ b/src/once_cell.rs @@ -9,9 +9,12 @@ use core::sync::atomic::{AtomicUsize, Ordering}; #[cfg(all(feature = "std", not(target_family = "wasm")))] use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -use event_listener::{Event, EventListener}; +use event_listener::Event; use event_listener_strategy::{NonBlocking, Strategy}; +#[cfg(all(feature = "std", not(target_family = "wasm")))] +use event_listener::Listener; + /// The current state of the `OnceCell`. #[derive(Copy, Clone, PartialEq, Eq)] #[repr(usize)] @@ -274,9 +277,7 @@ impl OnceCell { } // Slow path: wait for the value to be initialized. - let listener = EventListener::new(); - pin!(listener); - listener.as_mut().listen(&self.passive_waiters); + event_listener::listener!(self.passive_waiters => listener); // Try again. if let Some(value) = self.get() { @@ -329,9 +330,7 @@ impl OnceCell { } // Slow path: wait for the value to be initialized. - let listener = EventListener::new(); - pin!(listener); - listener.as_mut().listen(&self.passive_waiters); + event_listener::listener!(self.passive_waiters => listener); // Try again. if let Some(value) = self.get() { @@ -503,10 +502,11 @@ impl OnceCell { /// ``` #[cfg(all(feature = "std", not(target_family = "wasm")))] pub fn get_or_init_blocking(&self, closure: impl FnOnce() -> T + Unpin) -> &T { - match self.get_or_try_init_blocking(move || { + let result = self.get_or_try_init_blocking(move || { let result: Result = Ok(closure()); result - }) { + }); + match result { Ok(value) => value, Err(infallible) => match infallible {}, } @@ -591,8 +591,7 @@ impl OnceCell { strategy: &mut impl for<'a> Strategy<'a>, ) -> Result<(), E> { // The event listener we're currently waiting on. - let event_listener = EventListener::new(); - pin!(event_listener); + let mut event_listener = None; let mut closure = Some(closure); @@ -611,10 +610,10 @@ impl OnceCell { // but we do not have the ability to initialize it. // // We need to wait the initialization to complete. - if event_listener.is_listening() { - strategy.wait(event_listener.as_mut()).await; + if let Some(listener) = event_listener.take() { + strategy.wait(listener).await; } else { - event_listener.as_mut().listen(&self.active_initializers); + event_listener = Some(self.active_initializers.listen()); } } State::Uninitialized => { diff --git a/src/rwlock/futures.rs b/src/rwlock/futures.rs index c759157..3904c81 100644 --- a/src/rwlock/futures.rs +++ b/src/rwlock/futures.rs @@ -426,13 +426,18 @@ pin_project_lite::pin_project! { impl PinnedDrop for UpgradeArcInner { fn drop(this: Pin<&mut Self>) { let this = this.project(); - if !this.raw.is_ready() { + let is_ready = this.raw.is_ready(); + + // SAFETY: The drop impl for raw assumes that it is pinned. + unsafe { + ManuallyDrop::drop(this.raw.get_unchecked_mut()); + } + + if !is_ready { // SAFETY: we drop the `Arc` (decrementing the reference count) // only if this future was cancelled before returning an // upgraded lock. unsafe { - // SAFETY: The drop impl for raw assumes that it is pinned. - ManuallyDrop::drop(this.raw.get_unchecked_mut()); ManuallyDrop::drop(this.lock); }; } diff --git a/src/rwlock/raw.rs b/src/rwlock/raw.rs index 88583c8..6e96a53 100644 --- a/src/rwlock/raw.rs +++ b/src/rwlock/raw.rs @@ -6,6 +6,7 @@ //! the locking code only once, and also lets us make //! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`. +use core::marker::PhantomPinned; use core::mem::forget; use core::pin::Pin; use core::sync::atomic::{AtomicUsize, Ordering}; @@ -86,7 +87,8 @@ impl RawRwLock { RawRead { lock: self, state: self.state.load(Ordering::Acquire), - listener: EventListener::new(), + listener: None, + _pin: PhantomPinned, } } @@ -161,7 +163,7 @@ impl RawRwLock { pub(super) fn write(&self) -> RawWrite<'_> { RawWrite { lock: self, - no_readers: EventListener::new(), + no_readers: None, state: WriteState::Acquiring { lock: self.mutex.lock(), }, @@ -193,7 +195,8 @@ impl RawRwLock { RawUpgrade { lock: Some(self), - listener: EventListener::new(), + listener: None, + _pin: PhantomPinned, } } @@ -292,8 +295,11 @@ pin_project_lite::pin_project! { state: usize, // The listener for the "no writers" event. + listener: Option, + + // Making this type `!Unpin` enables future optimizations. #[pin] - listener: EventListener, + _pin: PhantomPinned } } @@ -305,7 +311,7 @@ impl<'a> EventListenerFuture for RawRead<'a> { strategy: &mut S, cx: &mut S::Context, ) -> Poll<()> { - let mut this = self.project(); + let this = self.project(); loop { if *this.state & WRITER_BIT == 0 { @@ -327,14 +333,14 @@ impl<'a> EventListenerFuture for RawRead<'a> { } } else { // Start listening for "no writer" events. - let load_ordering = if !this.listener.is_listening() { - this.listener.as_mut().listen(&this.lock.no_writer); + let load_ordering = if this.listener.is_none() { + *this.listener = Some(this.lock.no_writer.listen()); // Make sure there really is no writer. Ordering::SeqCst } else { // Wait for the writer to finish. - ready!(strategy.poll(this.listener.as_mut(), cx)); + ready!(strategy.poll(this.listener, cx)); // Notify the next reader waiting in list. this.lock.no_writer.notify(1); @@ -409,8 +415,7 @@ pin_project_lite::pin_project! { pub(super) lock: &'a RawRwLock, // Our listener for the "no readers" event. - #[pin] - no_readers: EventListener, + no_readers: Option, // Current state fof this future. #[pin] @@ -473,12 +478,12 @@ impl<'a> EventListenerFuture for RawWrite<'a> { } // Start waiting for the readers to finish. - this.no_readers.as_mut().listen(&this.lock.no_readers); + *this.no_readers = Some(this.lock.no_readers.listen()); this.state.as_mut().set(WriteState::WaitingReaders); } WriteStateProj::WaitingReaders => { - let load_ordering = if this.no_readers.is_listening() { + let load_ordering = if this.no_readers.is_some() { Ordering::Acquire } else { Ordering::SeqCst @@ -492,12 +497,12 @@ impl<'a> EventListenerFuture for RawWrite<'a> { } // Wait for the readers to finish. - if !this.no_readers.is_listening() { + if this.no_readers.is_none() { // Register a listener. - this.no_readers.as_mut().listen(&this.lock.no_readers); + *this.no_readers = Some(this.lock.no_readers.listen()); } else { // Wait for the readers to finish. - ready!(strategy.poll(this.no_readers.as_mut(), cx)); + ready!(strategy.poll(this.no_readers, cx)); }; } WriteStateProj::Acquired => panic!("Write lock already acquired"), @@ -513,8 +518,11 @@ pin_project_lite::pin_project! { lock: Option<&'a RawRwLock>, // The event listener we are waiting on. + listener: Option, + + // Keeping this future `!Unpin` enables future optimizations. #[pin] - listener: EventListener, + _pin: PhantomPinned } impl PinnedDrop for RawUpgrade<'_> { @@ -539,12 +547,12 @@ impl<'a> EventListenerFuture for RawUpgrade<'a> { strategy: &mut S, cx: &mut S::Context, ) -> Poll<&'a RawRwLock> { - let mut this = self.project(); + let this = self.project(); let lock = this.lock.expect("cannot poll future after completion"); // If there are readers, we need to wait for them to finish. loop { - let load_ordering = if this.listener.is_listening() { + let load_ordering = if this.listener.is_some() { Ordering::Acquire } else { Ordering::SeqCst @@ -557,12 +565,12 @@ impl<'a> EventListenerFuture for RawUpgrade<'a> { } // If there are readers, wait for them to finish. - if !this.listener.is_listening() { + if this.listener.is_none() { // Start listening for "no readers" events. - this.listener.as_mut().listen(&lock.no_readers); + *this.listener = Some(lock.no_readers.listen()); } else { // Wait for the readers to finish. - ready!(strategy.poll(this.listener.as_mut(), cx)); + ready!(strategy.poll(this.listener, cx)); }; } diff --git a/src/semaphore.rs b/src/semaphore.rs index 4f28055..cd9aa7a 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -1,4 +1,5 @@ use core::fmt; +use core::marker::PhantomPinned; use core::mem; use core::pin::Pin; use core::sync::atomic::{AtomicUsize, Ordering}; @@ -88,7 +89,8 @@ impl Semaphore { pub fn acquire(&self) -> Acquire<'_> { Acquire::_new(AcquireInner { semaphore: self, - listener: EventListener::new(), + listener: None, + _pin: PhantomPinned, }) } @@ -176,7 +178,8 @@ impl Semaphore { pub fn acquire_arc(self: &Arc) -> AcquireArc { AcquireArc::_new(AcquireArcInner { semaphore: self.clone(), - listener: EventListener::new(), + listener: None, + _pin: PhantomPinned, }) } @@ -246,8 +249,11 @@ pin_project_lite::pin_project! { semaphore: &'a Semaphore, // The listener waiting on the semaphore. + listener: Option, + + // Keeping this future `!Unpin` enables future optimizations. #[pin] - listener: EventListener, + _pin: PhantomPinned } } @@ -265,17 +271,17 @@ impl<'a> EventListenerFuture for AcquireInner<'a> { strategy: &mut S, cx: &mut S::Context, ) -> Poll { - let mut this = self.project(); + let this = self.project(); loop { match this.semaphore.try_acquire() { Some(guard) => return Poll::Ready(guard), None => { // Wait on the listener. - if !this.listener.is_listening() { - this.listener.as_mut().listen(&this.semaphore.event); + if this.listener.is_none() { + *this.listener = Some(this.semaphore.event.listen()); } else { - ready!(strategy.poll(this.listener.as_mut(), cx)); + ready!(strategy.poll(this.listener, cx)); } } } @@ -296,8 +302,11 @@ pin_project_lite::pin_project! { semaphore: Arc, // The listener waiting on the semaphore. + listener: Option, + + // Keeping this future `!Unpin` enables future optimizations. #[pin] - listener: EventListener, + _pin: PhantomPinned } } @@ -315,17 +324,17 @@ impl EventListenerFuture for AcquireArcInner { strategy: &mut S, cx: &mut S::Context, ) -> Poll { - let mut this = self.project(); + let this = self.project(); loop { match this.semaphore.try_acquire_arc() { Some(guard) => return Poll::Ready(guard), None => { // Wait on the listener. - if !this.listener.is_listening() { - this.listener.as_mut().listen(&this.semaphore.event); + if this.listener.is_none() { + *this.listener = Some(this.semaphore.event.listen()); } else { - ready!(strategy.poll(this.listener.as_mut(), cx)); + ready!(strategy.poll(this.listener, cx)); } } } diff --git a/tests/barrier.rs b/tests/barrier.rs index 657478c..b79209a 100644 --- a/tests/barrier.rs +++ b/tests/barrier.rs @@ -47,6 +47,7 @@ fn smoke() { #[cfg(all(feature = "std", not(target_family = "wasm")))] #[test] +#[cfg_attr(miri, ignore)] fn smoke_blocking() { future::block_on(async move { const N: usize = 10; diff --git a/tests/rwlock.rs b/tests/rwlock.rs index fd38ea0..7ce0b7c 100644 --- a/tests/rwlock.rs +++ b/tests/rwlock.rs @@ -120,7 +120,8 @@ fn get_mut() { assert_eq!(lock.into_inner(), 20); } -#[cfg(not(target_family = "wasm"))] +// Miri bug; this works when async is replaced with blocking +#[cfg(not(any(target_family = "wasm", miri)))] #[test] fn contention() { const N: u32 = 10; @@ -154,7 +155,8 @@ fn contention() { }); } -#[cfg(not(target_family = "wasm"))] +// Miri bug; this works when async is replaced with blocking +#[cfg(not(any(target_family = "wasm", miri)))] #[test] fn contention_arc() { const N: u32 = 10;