feat: Move EventListener back onto the heap

Minimal amount of changes to make EventListener a heap-allocated type
again. The existence of the EventListener implies that it is already
listening; accordingly the new() and listen() methods on EventListener
have been removed.

cc #104

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2023-12-19 16:44:34 -08:00 committed by John Nunley
parent ac18bdf617
commit 86b778074c
5 changed files with 112 additions and 309 deletions

View File

@ -1,31 +1,22 @@
use std::iter;
use std::pin::Pin;
use criterion::{criterion_group, criterion_main, Criterion};
use event_listener::{Event, EventListener};
use event_listener::Event;
const COUNT: usize = 8000;
fn bench_events(c: &mut Criterion) {
c.bench_function("notify_and_wait", |b| {
let ev = Event::new();
let mut handles = iter::repeat_with(EventListener::new)
.take(COUNT)
.collect::<Vec<_>>();
let mut handles = Vec::with_capacity(COUNT);
b.iter(|| {
for handle in &mut handles {
// SAFETY: The handle is not moved out.
let listener = unsafe { Pin::new_unchecked(handle) };
listener.listen(&ev);
}
handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT));
ev.notify(COUNT);
for handle in &mut handles {
// SAFETY: The handle is not moved out.
let listener = unsafe { Pin::new_unchecked(handle) };
listener.wait();
for handle in handles.drain(..) {
handle.wait();
}
});
});

View File

@ -65,9 +65,9 @@ mod example {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(mut l) => {
Some(l) => {
// Wait until a notification is received.
l.as_mut().wait();
l.wait();
}
}
}
@ -90,9 +90,9 @@ mod example {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(mut l) => {
Some(l) => {
// Wait until a notification is received.
l.as_mut().wait_deadline(deadline)?;
l.wait_deadline(deadline)?;
}
}
}

View File

@ -56,7 +56,7 @@
//! }
//!
//! // Wait for a notification and continue the loop.
//! listener.as_mut().wait();
//! listener.wait();
//! }
//! ```
//!
@ -264,12 +264,12 @@ impl<T> Event<T> {
///
/// The above example is equivalent to this code:
///
/// ```
/// ```no_compile
/// use event_listener::{Event, EventListener};
///
/// let event = Event::new();
/// let mut listener = Box::pin(EventListener::new());
/// listener.as_mut().listen(&event);
/// listener.listen(&event);
/// ```
///
/// It creates a new listener, pins it to the heap, and inserts it into the linked list
@ -279,10 +279,18 @@ impl<T> Event<T> {
/// allocated. However, users of this `new` method must be careful to ensure that the
/// [`EventListener`] is `listen`ing before waiting on it; panics may occur otherwise.
#[cold]
pub fn listen(&self) -> Pin<Box<EventListener<T>>> {
let mut listener = Box::pin(EventListener::new());
listener.as_mut().listen(self);
listener
pub fn listen(&self) -> EventListener<T> {
let inner = ManuallyDrop::new(unsafe { Arc::from_raw(self.inner()) });
// Allocate the listener on the heap and insert it.
let mut listener = Box::pin(Listener {
event: Arc::clone(&inner),
listener: None,
});
inner.insert(listener.as_mut().project().listener);
// Return the listener.
EventListener { listener }
}
/// Notifies a number of active listeners.
@ -712,94 +720,25 @@ impl<T> Drop for Event<T> {
}
}
pin_project_lite::pin_project! {
/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
///
/// The listener is not registered into the linked list inside of the [`Event`] by default if
/// it is created via the `new()` method. It needs to be pinned first before being inserted
/// using the `listen()` method. After the listener has begun `listen`ing, the user can
/// `await` it like a future or call `wait()` to block the current thread until it is notified.
///
/// ## Examples
///
/// ```
/// use event_listener::{Event, EventListener};
/// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
/// use std::thread;
/// use std::time::Duration;
///
/// // Some flag to wait on.
/// let flag = Arc::new(AtomicBool::new(false));
///
/// // Create an event to wait on.
/// let event = Arc::new(Event::new());
///
/// thread::spawn({
/// let flag = flag.clone();
/// let event = event.clone();
/// move || {
/// thread::sleep(Duration::from_secs(2));
/// flag.store(true, Ordering::SeqCst);
///
/// // Wake up the listener.
/// event.notify_additional(std::usize::MAX);
/// }
/// });
///
/// let listener = EventListener::new();
///
/// // Make sure that the event listener is pinned before doing anything else.
/// //
/// // We pin the listener to the stack here, as it lets us avoid a heap allocation.
/// futures_lite::pin!(listener);
///
/// // Wait for the flag to become ready.
/// loop {
/// if flag.load(Ordering::Acquire) {
/// // We are done.
/// break;
/// }
///
/// if listener.is_listening() {
/// // We are inserted into the linked list and we can now wait.
/// listener.as_mut().wait();
/// } else {
/// // We need to insert ourselves into the list. Since this insertion is an atomic
/// // operation, we should check the flag again before waiting.
/// listener.as_mut().listen(&event);
/// }
/// }
/// ```
///
/// The above example is equivalent to the one provided in the crate level example. However,
/// it has some advantages. By directly creating the listener with `EventListener::new()`,
/// we have control over how the listener is handled in memory. We take advantage of this by
/// pinning the `listener` variable to the stack using the [`futures_lite::pin`] macro. In
/// contrast, `Event::listen` binds the listener to the heap.
///
/// However, this additional power comes with additional responsibility. By default, the
/// event listener is created in an "uninserted" state. This property means that any
/// notifications delivered to the [`Event`] by default will not wake up this listener.
/// Before any notifications can be received, the `listen()` method must be called on
/// `EventListener` to insert it into the list of listeners. After a `.await` or a `wait()`
/// call has completed, `listen()` must be called again if the user is still interested in
/// any events.
///
/// [`futures_lite::pin`]: https://docs.rs/futures-lite/latest/futures_lite/macro.pin.html
#[project(!Unpin)] // implied by Listener, but can generate better docs
pub struct EventListener<T = ()> {
#[pin]
listener: Listener<T, Arc<Inner<T>>>,
}
/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
///
/// The listener is not registered into the linked list inside of the [`Event`] by default if
/// it is created via the `new()` method. It needs to be pinned first before being inserted
/// using the `listen()` method. After the listener has begun `listen`ing, the user can
/// `await` it like a future or call `wait()` to block the current thread until it is notified.
///
/// This structure allocates the listener on the heap.
pub struct EventListener<T = ()> {
listener: Pin<Box<Listener<T, Arc<Inner<T>>>>>,
}
unsafe impl<T: Send> Send for EventListener<T> {}
@ -807,104 +746,15 @@ unsafe impl<T: Send> Sync for EventListener<T> {}
impl<T> core::panic::UnwindSafe for EventListener<T> {}
impl<T> core::panic::RefUnwindSafe for EventListener<T> {}
impl<T> Default for EventListener<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Unpin for EventListener<T> {}
impl<T> fmt::Debug for EventListener<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventListener")
.field("listening", &self.is_listening())
.finish()
f.debug_struct("EventListener").finish_non_exhaustive()
}
}
impl<T> EventListener<T> {
/// Create a new `EventListener` that will wait for a notification from the given [`Event`].
///
/// This function does not register the `EventListener` into the linked list of listeners
/// contained within the [`Event`]. Make sure to call `listen` before `await`ing on
/// this future or calling `wait()`.
///
/// ## Examples
///
/// ```
/// use event_listener::{Event, EventListener};
///
/// let event = Event::new();
/// let listener = EventListener::new();
///
/// // Make sure that the listener is pinned and listening before doing anything else.
/// let mut listener = Box::pin(listener);
/// listener.as_mut().listen(&event);
/// ```
pub fn new() -> Self {
Self {
listener: Listener {
event: None,
listener: None,
},
}
}
/// Register this listener into the given [`Event`].
///
/// This method can only be called after the listener has been pinned, and must be called before
/// the listener is polled.
///
/// Notifications that exist when this function is called will be discarded.
pub fn listen(mut self: Pin<&mut Self>, event: &Event<T>) {
let inner = {
let inner = event.inner();
unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }
};
let ListenerProject {
event,
mut listener,
} = self.as_mut().project().listener.project();
// If an event is already registered, make sure to remove it.
if let Some(current_event) = event.as_ref() {
current_event.remove(listener.as_mut(), false);
}
let inner = event.insert(inner);
inner.insert(listener);
// Make sure the listener is registered before whatever happens next.
notify::full_fence();
}
/// Tell if this [`EventListener`] is currently listening for a notification.
///
/// # Examples
///
/// ```
/// use event_listener::{Event, EventListener};
///
/// let event = Event::new();
/// let mut listener = Box::pin(EventListener::new());
///
/// // The listener starts off not listening.
/// assert!(!listener.is_listening());
///
/// // After listen() is called, the listener is listening.
/// listener.as_mut().listen(&event);
/// assert!(listener.is_listening());
///
/// // Once the future is notified, the listener is no longer listening.
/// event.notify(1);
/// listener.as_mut().wait();
/// assert!(!listener.is_listening());
/// ```
pub fn is_listening(&self) -> bool {
self.listener.listener.is_some()
}
/// Blocks until a notification is received.
///
/// # Examples
@ -919,11 +769,11 @@ impl<T> EventListener<T> {
/// event.notify(1);
///
/// // Receive the notification.
/// listener.as_mut().wait();
/// listener.wait();
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait(self: Pin<&mut Self>) -> T {
self.listener().wait_internal(None).unwrap()
pub fn wait(mut self) -> T {
self.listener.as_mut().wait_internal(None).unwrap()
}
/// Blocks until a notification is received or a timeout is reached.
@ -940,11 +790,12 @@ impl<T> EventListener<T> {
/// let mut listener = event.listen();
///
/// // There are no notification so this times out.
/// assert!(listener.as_mut().wait_timeout(Duration::from_secs(1)).is_none());
/// assert!(listener.wait_timeout(Duration::from_secs(1)).is_none());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> Option<T> {
self.listener()
pub fn wait_timeout(mut self, timeout: Duration) -> Option<T> {
self.listener
.as_mut()
.wait_internal(Instant::now().checked_add(timeout))
}
@ -962,11 +813,11 @@ impl<T> EventListener<T> {
/// let mut listener = event.listen();
///
/// // There are no notification so this times out.
/// assert!(listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)).is_none());
/// assert!(listener.wait_deadline(Instant::now() + Duration::from_secs(1)).is_none());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> Option<T> {
self.listener().wait_internal(Some(deadline))
pub fn wait_deadline(mut self, deadline: Instant) -> Option<T> {
self.listener.as_mut().wait_internal(Some(deadline))
}
/// Drops this listener and discards its notification (if any) without notifying another
@ -975,6 +826,7 @@ impl<T> EventListener<T> {
/// Returns `true` if a notification was discarded.
///
/// # Examples
///
/// ```
/// use event_listener::Event;
///
@ -984,11 +836,11 @@ impl<T> EventListener<T> {
///
/// event.notify(1);
///
/// assert!(listener1.as_mut().discard());
/// assert!(!listener2.as_mut().discard());
/// assert!(listener1.discard());
/// assert!(!listener2.discard());
/// ```
pub fn discard(self: Pin<&mut Self>) -> bool {
self.project().listener.discard()
pub fn discard(mut self) -> bool {
self.listener.as_mut().discard()
}
/// Returns `true` if this listener listens to the given `Event`.
@ -1005,11 +857,7 @@ impl<T> EventListener<T> {
/// ```
#[inline]
pub fn listens_to(&self, event: &Event<T>) -> bool {
if let Some(inner) = &self.listener.event {
return ptr::eq::<Inner<T>>(&**inner, event.inner.load(Ordering::Acquire));
}
false
ptr::eq::<Inner<T>>(&*self.listener.event, event.inner.load(Ordering::Acquire))
}
/// Returns `true` if both listeners listen to the same `Event`.
@ -1026,27 +874,15 @@ impl<T> EventListener<T> {
/// assert!(listener1.same_event(&listener2));
/// ```
pub fn same_event(&self, other: &EventListener<T>) -> bool {
if let (Some(inner1), Some(inner2)) = (self.inner(), other.inner()) {
return ptr::eq::<Inner<T>>(&**inner1, &**inner2);
}
false
}
fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener<T, Arc<Inner<T>>>> {
self.project().listener
}
fn inner(&self) -> Option<&Arc<Inner<T>>> {
self.listener.event.as_ref()
ptr::eq::<Inner<T>>(&*self.listener.event, &*other.listener.event)
}
}
impl<T> Future for EventListener<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.listener().poll_internal(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.listener.as_mut().poll_internal(cx)
}
}
@ -1058,7 +894,7 @@ pin_project_lite::pin_project! {
B: Unpin,
{
// The reference to the original event.
event: Option<B>,
event: B,
// The inner state of the listener.
//
@ -1075,9 +911,7 @@ pin_project_lite::pin_project! {
fn drop(mut this: Pin<&mut Self>) {
// If we're being dropped, we need to remove ourself from the list.
let this = this.project();
if let Some(inner) = this.event {
(*inner).borrow().remove(this.listener, true);
}
(*this.event).borrow().remove(this.listener, true);
}
}
}
@ -1110,7 +944,8 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
// If the pair isn't accessible, we may be being called in a destructor.
// Just create a new pair.
let (parker, unparker) = parking::pair();
self.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker))
self.as_mut()
.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker))
})
}
@ -1123,11 +958,7 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
unparker: TaskRef<'_>,
) -> Option<T> {
let mut this = self.project();
let inner = (*this
.event
.as_ref()
.expect("must listen() on event listener before waiting"))
.borrow();
let inner = (*this.event).borrow();
// Set the listener's state to `Task`.
if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
@ -1146,7 +977,7 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
if now >= deadline {
// Remove our entry and check if we were notified.
return inner
.remove(this.listener, false)
.remove(this.listener.as_mut(), false)
.expect("We never removed ourself from the list")
.notified();
}
@ -1165,28 +996,20 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
/// active listener.
fn discard(self: Pin<&mut Self>) -> bool {
let this = self.project();
if let Some(inner) = this.event.as_ref() {
(*inner)
.borrow()
.remove(this.listener, false)
.map_or(false, |state| state.is_notified())
} else {
false
}
(*this.event)
.borrow()
.remove(this.listener, false)
.map_or(false, |state| state.is_notified())
}
/// Poll this listener for a notification.
fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut this = self.project();
let inner = match &this.event {
Some(inner) => (*inner).borrow(),
None => panic!(""),
};
let this = self.project();
let inner = (*this.event).borrow();
// Try to register the listener.
match inner
.register(this.listener.as_mut(), TaskRef::Waker(cx.waker()))
.register(this.listener, TaskRef::Waker(cx.waker()))
.notified()
{
Some(tag) => {

View File

@ -494,8 +494,8 @@ pub trait IntoNotification: __private::Sealed {
/// event.notify(1.additional().tag(true));
/// event.notify(1.additional().tag(false));
///
/// assert_eq!(listener1.as_mut().wait(), true);
/// assert_eq!(listener2.as_mut().wait(), false);
/// assert_eq!(listener1.wait(), true);
/// assert_eq!(listener2.wait(), false);
/// ```
#[cfg(feature = "std")]
fn tag<T: Clone>(self, tag: T) -> Tag<Self::Notify, T>
@ -528,8 +528,8 @@ pub trait IntoNotification: __private::Sealed {
/// event.notify(1.additional().tag_with(|| true));
/// event.notify(1.additional().tag_with(|| false));
///
/// assert_eq!(listener1.as_mut().wait(), true);
/// assert_eq!(listener2.as_mut().wait(), false);
/// assert_eq!(listener1.wait(), true);
/// assert_eq!(listener2.wait(), false);
/// ```
#[cfg(feature = "std")]
fn tag_with<T, F>(self, tag: F) -> TagWith<Self::Notify, F>

View File

@ -10,9 +10,11 @@ use waker_fn::waker_fn;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
fn is_notified(listener: Pin<&mut EventListener>) -> bool {
fn is_notified(listener: &mut EventListener) -> bool {
let waker = waker_fn(|| ());
listener.poll(&mut Context::from_waker(&waker)).is_ready()
Pin::new(listener)
.poll(&mut Context::from_waker(&waker))
.is_ready()
}
#[test]
@ -23,16 +25,16 @@ fn notify() {
let mut l2 = event.listen();
let mut l3 = event.listen();
assert!(!is_notified(l1.as_mut()));
assert!(!is_notified(l2.as_mut()));
assert!(!is_notified(l3.as_mut()));
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert!(!is_notified(&mut l3));
assert_eq!(event.notify(2), 2);
assert_eq!(event.notify(1), 0);
assert!(is_notified(l1.as_mut()));
assert!(is_notified(l2.as_mut()));
assert!(!is_notified(l3.as_mut()));
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
}
#[test]
@ -47,9 +49,9 @@ fn notify_additional() {
assert_eq!(event.notify(1), 0);
assert_eq!(event.notify_additional(1), 1);
assert!(is_notified(l1.as_mut()));
assert!(is_notified(l2.as_mut()));
assert!(!is_notified(l3.as_mut()));
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
}
#[test]
@ -59,15 +61,15 @@ fn notify_one() {
let mut l1 = event.listen();
let mut l2 = event.listen();
assert!(!is_notified(l1.as_mut()));
assert!(!is_notified(l2.as_mut()));
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(1), 1);
assert!(is_notified(l1.as_mut()));
assert!(!is_notified(l2.as_mut()));
assert!(is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(1), 1);
assert!(is_notified(l2.as_mut()));
assert!(is_notified(&mut l2));
}
#[test]
@ -77,12 +79,12 @@ fn notify_all() {
let mut l1 = event.listen();
let mut l2 = event.listen();
assert!(!is_notified(l1.as_mut()));
assert!(!is_notified(l2.as_mut()));
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(usize::MAX), 2);
assert!(is_notified(l1.as_mut()));
assert!(is_notified(l2.as_mut()));
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
}
#[test]
@ -95,8 +97,8 @@ fn drop_notified() {
assert_eq!(event.notify(1), 1);
drop(l1);
assert!(is_notified(l2.as_mut()));
assert!(!is_notified(l3.as_mut()));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
}
#[test]
@ -109,8 +111,8 @@ fn drop_notified2() {
assert_eq!(event.notify(2), 2);
drop(l1);
assert!(is_notified(l2.as_mut()));
assert!(!is_notified(l3.as_mut()));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
}
#[test]
@ -125,9 +127,9 @@ fn drop_notified_additional() {
assert_eq!(event.notify_additional(1), 1);
assert_eq!(event.notify(2), 1);
drop(l1);
assert!(is_notified(l2.as_mut()));
assert!(is_notified(l3.as_mut()));
assert!(!is_notified(l4.as_mut()));
assert!(is_notified(&mut l2));
assert!(is_notified(&mut l3));
assert!(!is_notified(&mut l4));
}
#[test]
@ -140,8 +142,8 @@ fn drop_non_notified() {
assert_eq!(event.notify(1), 1);
drop(l3);
assert!(is_notified(l1.as_mut()));
assert!(!is_notified(l2.as_mut()));
assert!(is_notified(&mut l1));
assert!(!is_notified(&mut l2));
}
#[test]
@ -189,16 +191,3 @@ fn notify_all_fair() {
.poll(&mut Context::from_waker(&waker3))
.is_ready());
}
#[test]
fn more_than_one_event() {
let event = Event::new();
let event2 = Event::new();
let mut listener = Box::pin(EventListener::<()>::new());
listener.as_mut().listen(&event);
listener.as_mut().listen(&event2);
drop(listener);
event.notify(1);
}