From 86b778074c2c0637b05cc1f9a562c8f50118cf3f Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 19 Dec 2023 16:44:34 -0800 Subject: [PATCH] feat: Move EventListener back onto the heap Minimal amount of changes to make EventListener a heap-allocated type again. The existence of the EventListener implies that it is already listening; accordingly the new() and listen() methods on EventListener have been removed. cc #104 Signed-off-by: John Nunley --- benches/bench.rs | 19 +-- examples/mutex.rs | 8 +- src/lib.rs | 313 ++++++++++------------------------------------ src/notify.rs | 8 +- tests/notify.rs | 73 +++++------ 5 files changed, 112 insertions(+), 309 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 55557a6..b9b136d 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -1,31 +1,22 @@ use std::iter; -use std::pin::Pin; use criterion::{criterion_group, criterion_main, Criterion}; -use event_listener::{Event, EventListener}; +use event_listener::Event; const COUNT: usize = 8000; fn bench_events(c: &mut Criterion) { c.bench_function("notify_and_wait", |b| { let ev = Event::new(); - let mut handles = iter::repeat_with(EventListener::new) - .take(COUNT) - .collect::>(); + let mut handles = Vec::with_capacity(COUNT); b.iter(|| { - for handle in &mut handles { - // SAFETY: The handle is not moved out. - let listener = unsafe { Pin::new_unchecked(handle) }; - listener.listen(&ev); - } + handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT)); ev.notify(COUNT); - for handle in &mut handles { - // SAFETY: The handle is not moved out. - let listener = unsafe { Pin::new_unchecked(handle) }; - listener.wait(); + for handle in handles.drain(..) { + handle.wait(); } }); }); diff --git a/examples/mutex.rs b/examples/mutex.rs index a2b543c..bde5ccc 100644 --- a/examples/mutex.rs +++ b/examples/mutex.rs @@ -65,9 +65,9 @@ mod example { // Start listening and then try locking again. listener = Some(self.lock_ops.listen()); } - Some(mut l) => { + Some(l) => { // Wait until a notification is received. - l.as_mut().wait(); + l.wait(); } } } @@ -90,9 +90,9 @@ mod example { // Start listening and then try locking again. listener = Some(self.lock_ops.listen()); } - Some(mut l) => { + Some(l) => { // Wait until a notification is received. - l.as_mut().wait_deadline(deadline)?; + l.wait_deadline(deadline)?; } } } diff --git a/src/lib.rs b/src/lib.rs index 969f807..3c2e362 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,7 +56,7 @@ //! } //! //! // Wait for a notification and continue the loop. -//! listener.as_mut().wait(); +//! listener.wait(); //! } //! ``` //! @@ -264,12 +264,12 @@ impl Event { /// /// The above example is equivalent to this code: /// - /// ``` + /// ```no_compile /// use event_listener::{Event, EventListener}; /// /// let event = Event::new(); /// let mut listener = Box::pin(EventListener::new()); - /// listener.as_mut().listen(&event); + /// listener.listen(&event); /// ``` /// /// It creates a new listener, pins it to the heap, and inserts it into the linked list @@ -279,10 +279,18 @@ impl Event { /// allocated. However, users of this `new` method must be careful to ensure that the /// [`EventListener`] is `listen`ing before waiting on it; panics may occur otherwise. #[cold] - pub fn listen(&self) -> Pin>> { - let mut listener = Box::pin(EventListener::new()); - listener.as_mut().listen(self); - listener + pub fn listen(&self) -> EventListener { + let inner = ManuallyDrop::new(unsafe { Arc::from_raw(self.inner()) }); + + // Allocate the listener on the heap and insert it. + let mut listener = Box::pin(Listener { + event: Arc::clone(&inner), + listener: None, + }); + inner.insert(listener.as_mut().project().listener); + + // Return the listener. + EventListener { listener } } /// Notifies a number of active listeners. @@ -712,94 +720,25 @@ impl Drop for Event { } } -pin_project_lite::pin_project! { - /// A guard waiting for a notification from an [`Event`]. - /// - /// There are two ways for a listener to wait for a notification: - /// - /// 1. In an asynchronous manner using `.await`. - /// 2. In a blocking manner by calling [`EventListener::wait()`] on it. - /// - /// If a notified listener is dropped without receiving a notification, dropping will notify - /// another active listener. Whether one *additional* listener will be notified depends on what - /// kind of notification was delivered. - /// - /// The listener is not registered into the linked list inside of the [`Event`] by default if - /// it is created via the `new()` method. It needs to be pinned first before being inserted - /// using the `listen()` method. After the listener has begun `listen`ing, the user can - /// `await` it like a future or call `wait()` to block the current thread until it is notified. - /// - /// ## Examples - /// - /// ``` - /// use event_listener::{Event, EventListener}; - /// use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; - /// use std::thread; - /// use std::time::Duration; - /// - /// // Some flag to wait on. - /// let flag = Arc::new(AtomicBool::new(false)); - /// - /// // Create an event to wait on. - /// let event = Arc::new(Event::new()); - /// - /// thread::spawn({ - /// let flag = flag.clone(); - /// let event = event.clone(); - /// move || { - /// thread::sleep(Duration::from_secs(2)); - /// flag.store(true, Ordering::SeqCst); - /// - /// // Wake up the listener. - /// event.notify_additional(std::usize::MAX); - /// } - /// }); - /// - /// let listener = EventListener::new(); - /// - /// // Make sure that the event listener is pinned before doing anything else. - /// // - /// // We pin the listener to the stack here, as it lets us avoid a heap allocation. - /// futures_lite::pin!(listener); - /// - /// // Wait for the flag to become ready. - /// loop { - /// if flag.load(Ordering::Acquire) { - /// // We are done. - /// break; - /// } - /// - /// if listener.is_listening() { - /// // We are inserted into the linked list and we can now wait. - /// listener.as_mut().wait(); - /// } else { - /// // We need to insert ourselves into the list. Since this insertion is an atomic - /// // operation, we should check the flag again before waiting. - /// listener.as_mut().listen(&event); - /// } - /// } - /// ``` - /// - /// The above example is equivalent to the one provided in the crate level example. However, - /// it has some advantages. By directly creating the listener with `EventListener::new()`, - /// we have control over how the listener is handled in memory. We take advantage of this by - /// pinning the `listener` variable to the stack using the [`futures_lite::pin`] macro. In - /// contrast, `Event::listen` binds the listener to the heap. - /// - /// However, this additional power comes with additional responsibility. By default, the - /// event listener is created in an "uninserted" state. This property means that any - /// notifications delivered to the [`Event`] by default will not wake up this listener. - /// Before any notifications can be received, the `listen()` method must be called on - /// `EventListener` to insert it into the list of listeners. After a `.await` or a `wait()` - /// call has completed, `listen()` must be called again if the user is still interested in - /// any events. - /// - /// [`futures_lite::pin`]: https://docs.rs/futures-lite/latest/futures_lite/macro.pin.html - #[project(!Unpin)] // implied by Listener, but can generate better docs - pub struct EventListener { - #[pin] - listener: Listener>>, - } +/// A guard waiting for a notification from an [`Event`]. +/// +/// There are two ways for a listener to wait for a notification: +/// +/// 1. In an asynchronous manner using `.await`. +/// 2. In a blocking manner by calling [`EventListener::wait()`] on it. +/// +/// If a notified listener is dropped without receiving a notification, dropping will notify +/// another active listener. Whether one *additional* listener will be notified depends on what +/// kind of notification was delivered. +/// +/// The listener is not registered into the linked list inside of the [`Event`] by default if +/// it is created via the `new()` method. It needs to be pinned first before being inserted +/// using the `listen()` method. After the listener has begun `listen`ing, the user can +/// `await` it like a future or call `wait()` to block the current thread until it is notified. +/// +/// This structure allocates the listener on the heap. +pub struct EventListener { + listener: Pin>>>>, } unsafe impl Send for EventListener {} @@ -807,104 +746,15 @@ unsafe impl Sync for EventListener {} impl core::panic::UnwindSafe for EventListener {} impl core::panic::RefUnwindSafe for EventListener {} - -impl Default for EventListener { - fn default() -> Self { - Self::new() - } -} +impl Unpin for EventListener {} impl fmt::Debug for EventListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("EventListener") - .field("listening", &self.is_listening()) - .finish() + f.debug_struct("EventListener").finish_non_exhaustive() } } impl EventListener { - /// Create a new `EventListener` that will wait for a notification from the given [`Event`]. - /// - /// This function does not register the `EventListener` into the linked list of listeners - /// contained within the [`Event`]. Make sure to call `listen` before `await`ing on - /// this future or calling `wait()`. - /// - /// ## Examples - /// - /// ``` - /// use event_listener::{Event, EventListener}; - /// - /// let event = Event::new(); - /// let listener = EventListener::new(); - /// - /// // Make sure that the listener is pinned and listening before doing anything else. - /// let mut listener = Box::pin(listener); - /// listener.as_mut().listen(&event); - /// ``` - pub fn new() -> Self { - Self { - listener: Listener { - event: None, - listener: None, - }, - } - } - - /// Register this listener into the given [`Event`]. - /// - /// This method can only be called after the listener has been pinned, and must be called before - /// the listener is polled. - /// - /// Notifications that exist when this function is called will be discarded. - pub fn listen(mut self: Pin<&mut Self>, event: &Event) { - let inner = { - let inner = event.inner(); - unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) } - }; - - let ListenerProject { - event, - mut listener, - } = self.as_mut().project().listener.project(); - - // If an event is already registered, make sure to remove it. - if let Some(current_event) = event.as_ref() { - current_event.remove(listener.as_mut(), false); - } - - let inner = event.insert(inner); - inner.insert(listener); - - // Make sure the listener is registered before whatever happens next. - notify::full_fence(); - } - - /// Tell if this [`EventListener`] is currently listening for a notification. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, EventListener}; - /// - /// let event = Event::new(); - /// let mut listener = Box::pin(EventListener::new()); - /// - /// // The listener starts off not listening. - /// assert!(!listener.is_listening()); - /// - /// // After listen() is called, the listener is listening. - /// listener.as_mut().listen(&event); - /// assert!(listener.is_listening()); - /// - /// // Once the future is notified, the listener is no longer listening. - /// event.notify(1); - /// listener.as_mut().wait(); - /// assert!(!listener.is_listening()); - /// ``` - pub fn is_listening(&self) -> bool { - self.listener.listener.is_some() - } - /// Blocks until a notification is received. /// /// # Examples @@ -919,11 +769,11 @@ impl EventListener { /// event.notify(1); /// /// // Receive the notification. - /// listener.as_mut().wait(); + /// listener.wait(); /// ``` #[cfg(all(feature = "std", not(target_family = "wasm")))] - pub fn wait(self: Pin<&mut Self>) -> T { - self.listener().wait_internal(None).unwrap() + pub fn wait(mut self) -> T { + self.listener.as_mut().wait_internal(None).unwrap() } /// Blocks until a notification is received or a timeout is reached. @@ -940,11 +790,12 @@ impl EventListener { /// let mut listener = event.listen(); /// /// // There are no notification so this times out. - /// assert!(listener.as_mut().wait_timeout(Duration::from_secs(1)).is_none()); + /// assert!(listener.wait_timeout(Duration::from_secs(1)).is_none()); /// ``` #[cfg(all(feature = "std", not(target_family = "wasm")))] - pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> Option { - self.listener() + pub fn wait_timeout(mut self, timeout: Duration) -> Option { + self.listener + .as_mut() .wait_internal(Instant::now().checked_add(timeout)) } @@ -962,11 +813,11 @@ impl EventListener { /// let mut listener = event.listen(); /// /// // There are no notification so this times out. - /// assert!(listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)).is_none()); + /// assert!(listener.wait_deadline(Instant::now() + Duration::from_secs(1)).is_none()); /// ``` #[cfg(all(feature = "std", not(target_family = "wasm")))] - pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> Option { - self.listener().wait_internal(Some(deadline)) + pub fn wait_deadline(mut self, deadline: Instant) -> Option { + self.listener.as_mut().wait_internal(Some(deadline)) } /// Drops this listener and discards its notification (if any) without notifying another @@ -975,6 +826,7 @@ impl EventListener { /// Returns `true` if a notification was discarded. /// /// # Examples + /// /// ``` /// use event_listener::Event; /// @@ -984,11 +836,11 @@ impl EventListener { /// /// event.notify(1); /// - /// assert!(listener1.as_mut().discard()); - /// assert!(!listener2.as_mut().discard()); + /// assert!(listener1.discard()); + /// assert!(!listener2.discard()); /// ``` - pub fn discard(self: Pin<&mut Self>) -> bool { - self.project().listener.discard() + pub fn discard(mut self) -> bool { + self.listener.as_mut().discard() } /// Returns `true` if this listener listens to the given `Event`. @@ -1005,11 +857,7 @@ impl EventListener { /// ``` #[inline] pub fn listens_to(&self, event: &Event) -> bool { - if let Some(inner) = &self.listener.event { - return ptr::eq::>(&**inner, event.inner.load(Ordering::Acquire)); - } - - false + ptr::eq::>(&*self.listener.event, event.inner.load(Ordering::Acquire)) } /// Returns `true` if both listeners listen to the same `Event`. @@ -1026,27 +874,15 @@ impl EventListener { /// assert!(listener1.same_event(&listener2)); /// ``` pub fn same_event(&self, other: &EventListener) -> bool { - if let (Some(inner1), Some(inner2)) = (self.inner(), other.inner()) { - return ptr::eq::>(&**inner1, &**inner2); - } - - false - } - - fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener>>> { - self.project().listener - } - - fn inner(&self) -> Option<&Arc>> { - self.listener.event.as_ref() + ptr::eq::>(&*self.listener.event, &*other.listener.event) } } impl Future for EventListener { type Output = T; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.listener().poll_internal(cx) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.listener.as_mut().poll_internal(cx) } } @@ -1058,7 +894,7 @@ pin_project_lite::pin_project! { B: Unpin, { // The reference to the original event. - event: Option, + event: B, // The inner state of the listener. // @@ -1075,9 +911,7 @@ pin_project_lite::pin_project! { fn drop(mut this: Pin<&mut Self>) { // If we're being dropped, we need to remove ourself from the list. let this = this.project(); - if let Some(inner) = this.event { - (*inner).borrow().remove(this.listener, true); - } + (*this.event).borrow().remove(this.listener, true); } } } @@ -1110,7 +944,8 @@ impl> + Unpin> Listener { // If the pair isn't accessible, we may be being called in a destructor. // Just create a new pair. let (parker, unparker) = parking::pair(); - self.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker)) + self.as_mut() + .wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker)) }) } @@ -1123,11 +958,7 @@ impl> + Unpin> Listener { unparker: TaskRef<'_>, ) -> Option { let mut this = self.project(); - let inner = (*this - .event - .as_ref() - .expect("must listen() on event listener before waiting")) - .borrow(); + let inner = (*this.event).borrow(); // Set the listener's state to `Task`. if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() { @@ -1146,7 +977,7 @@ impl> + Unpin> Listener { if now >= deadline { // Remove our entry and check if we were notified. return inner - .remove(this.listener, false) + .remove(this.listener.as_mut(), false) .expect("We never removed ourself from the list") .notified(); } @@ -1165,28 +996,20 @@ impl> + Unpin> Listener { /// active listener. fn discard(self: Pin<&mut Self>) -> bool { let this = self.project(); - - if let Some(inner) = this.event.as_ref() { - (*inner) - .borrow() - .remove(this.listener, false) - .map_or(false, |state| state.is_notified()) - } else { - false - } + (*this.event) + .borrow() + .remove(this.listener, false) + .map_or(false, |state| state.is_notified()) } /// Poll this listener for a notification. fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - let inner = match &this.event { - Some(inner) => (*inner).borrow(), - None => panic!(""), - }; + let this = self.project(); + let inner = (*this.event).borrow(); // Try to register the listener. match inner - .register(this.listener.as_mut(), TaskRef::Waker(cx.waker())) + .register(this.listener, TaskRef::Waker(cx.waker())) .notified() { Some(tag) => { diff --git a/src/notify.rs b/src/notify.rs index 9484668..c35151c 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -494,8 +494,8 @@ pub trait IntoNotification: __private::Sealed { /// event.notify(1.additional().tag(true)); /// event.notify(1.additional().tag(false)); /// - /// assert_eq!(listener1.as_mut().wait(), true); - /// assert_eq!(listener2.as_mut().wait(), false); + /// assert_eq!(listener1.wait(), true); + /// assert_eq!(listener2.wait(), false); /// ``` #[cfg(feature = "std")] fn tag(self, tag: T) -> Tag @@ -528,8 +528,8 @@ pub trait IntoNotification: __private::Sealed { /// event.notify(1.additional().tag_with(|| true)); /// event.notify(1.additional().tag_with(|| false)); /// - /// assert_eq!(listener1.as_mut().wait(), true); - /// assert_eq!(listener2.as_mut().wait(), false); + /// assert_eq!(listener1.wait(), true); + /// assert_eq!(listener2.wait(), false); /// ``` #[cfg(feature = "std")] fn tag_with(self, tag: F) -> TagWith diff --git a/tests/notify.rs b/tests/notify.rs index 319b2b6..490a492 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -10,9 +10,11 @@ use waker_fn::waker_fn; #[cfg(target_family = "wasm")] use wasm_bindgen_test::wasm_bindgen_test as test; -fn is_notified(listener: Pin<&mut EventListener>) -> bool { +fn is_notified(listener: &mut EventListener) -> bool { let waker = waker_fn(|| ()); - listener.poll(&mut Context::from_waker(&waker)).is_ready() + Pin::new(listener) + .poll(&mut Context::from_waker(&waker)) + .is_ready() } #[test] @@ -23,16 +25,16 @@ fn notify() { let mut l2 = event.listen(); let mut l3 = event.listen(); - assert!(!is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); assert_eq!(event.notify(2), 2); assert_eq!(event.notify(1), 0); - assert!(is_notified(l1.as_mut())); - assert!(is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); } #[test] @@ -47,9 +49,9 @@ fn notify_additional() { assert_eq!(event.notify(1), 0); assert_eq!(event.notify_additional(1), 1); - assert!(is_notified(l1.as_mut())); - assert!(is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); } #[test] @@ -59,15 +61,15 @@ fn notify_one() { let mut l1 = event.listen(); let mut l2 = event.listen(); - assert!(!is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); assert_eq!(event.notify(1), 1); - assert!(is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); assert_eq!(event.notify(1), 1); - assert!(is_notified(l2.as_mut())); + assert!(is_notified(&mut l2)); } #[test] @@ -77,12 +79,12 @@ fn notify_all() { let mut l1 = event.listen(); let mut l2 = event.listen(); - assert!(!is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); assert_eq!(event.notify(usize::MAX), 2); - assert!(is_notified(l1.as_mut())); - assert!(is_notified(l2.as_mut())); + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); } #[test] @@ -95,8 +97,8 @@ fn drop_notified() { assert_eq!(event.notify(1), 1); drop(l1); - assert!(is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); } #[test] @@ -109,8 +111,8 @@ fn drop_notified2() { assert_eq!(event.notify(2), 2); drop(l1); - assert!(is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); } #[test] @@ -125,9 +127,9 @@ fn drop_notified_additional() { assert_eq!(event.notify_additional(1), 1); assert_eq!(event.notify(2), 1); drop(l1); - assert!(is_notified(l2.as_mut())); - assert!(is_notified(l3.as_mut())); - assert!(!is_notified(l4.as_mut())); + assert!(is_notified(&mut l2)); + assert!(is_notified(&mut l3)); + assert!(!is_notified(&mut l4)); } #[test] @@ -140,8 +142,8 @@ fn drop_non_notified() { assert_eq!(event.notify(1), 1); drop(l3); - assert!(is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); } #[test] @@ -189,16 +191,3 @@ fn notify_all_fair() { .poll(&mut Context::from_waker(&waker3)) .is_ready()); } - -#[test] -fn more_than_one_event() { - let event = Event::new(); - let event2 = Event::new(); - - let mut listener = Box::pin(EventListener::<()>::new()); - listener.as_mut().listen(&event); - listener.as_mut().listen(&event2); - - drop(listener); - event.notify(1); -}