EventListener is now used through pinning
The EventListener for the upcoming libstd-based implementation needs to be pinned, so this commit sets up the infrastructure for the pinned EventListener. This is a breaking change.
This commit is contained in:
parent
c659cf84f3
commit
996ee4d4f9
|
@ -15,8 +15,8 @@ fn bench_events(c: &mut Criterion) {
|
|||
|
||||
ev.notify(COUNT);
|
||||
|
||||
for handle in handles {
|
||||
handle.wait();
|
||||
for mut handle in handles {
|
||||
handle.as_mut().wait();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
|
@ -63,9 +63,9 @@ impl<T> Mutex<T> {
|
|||
// Start listening and then try locking again.
|
||||
listener = Some(self.lock_ops.listen());
|
||||
}
|
||||
Some(l) => {
|
||||
Some(mut l) => {
|
||||
// Wait until a notification is received.
|
||||
l.wait();
|
||||
l.as_mut().wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -88,9 +88,9 @@ impl<T> Mutex<T> {
|
|||
// Start listening and then try locking again.
|
||||
listener = Some(self.lock_ops.listen());
|
||||
}
|
||||
Some(l) => {
|
||||
Some(mut l) => {
|
||||
// Wait until a notification is received.
|
||||
if !l.wait_deadline(deadline) {
|
||||
if !l.as_mut().wait_deadline(deadline) {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
|
177
src/lib.rs
177
src/lib.rs
|
@ -48,7 +48,7 @@
|
|||
//! }
|
||||
//!
|
||||
//! // Start listening for events.
|
||||
//! let listener = event.listen();
|
||||
//! let mut listener = event.listen();
|
||||
//!
|
||||
//! // Check the flag again after creating the listener.
|
||||
//! if flag.load(Ordering::SeqCst) {
|
||||
|
@ -56,7 +56,7 @@
|
|||
//! }
|
||||
//!
|
||||
//! // Wait for a notification and continue the loop.
|
||||
//! listener.wait();
|
||||
//! listener.as_mut().wait();
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
|
@ -76,6 +76,7 @@ use alloc::sync::Arc;
|
|||
use core::borrow::Borrow;
|
||||
use core::fmt;
|
||||
use core::future::Future;
|
||||
use core::marker::PhantomPinned;
|
||||
use core::mem::ManuallyDrop;
|
||||
use core::pin::Pin;
|
||||
use core::ptr;
|
||||
|
@ -173,19 +174,9 @@ impl Event {
|
|||
/// let listener = event.listen();
|
||||
/// ```
|
||||
#[cold]
|
||||
pub fn listen(&self) -> EventListener {
|
||||
let inner = self.inner();
|
||||
|
||||
// Register the listener.
|
||||
let mut listener = EventListener(Listener {
|
||||
event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
|
||||
listener: None,
|
||||
});
|
||||
|
||||
listener.0.event.insert(&mut listener.0.listener);
|
||||
|
||||
// Make sure the listener is registered before whatever happens next.
|
||||
full_fence();
|
||||
pub fn listen(&self) -> Pin<Box<EventListener>> {
|
||||
let mut listener = Box::pin(EventListener::new(self));
|
||||
listener.as_mut().listen();
|
||||
listener
|
||||
}
|
||||
|
||||
|
@ -452,13 +443,31 @@ impl fmt::Debug for EventListener {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl UnwindSafe for EventListener {}
|
||||
#[cfg(feature = "std")]
|
||||
impl RefUnwindSafe for EventListener {}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl EventListener {
|
||||
/// Create a new `EventListener` that will wait for a notification from the given [`Event`].
|
||||
pub fn new(event: &Event) -> Self {
|
||||
let inner = event.inner();
|
||||
|
||||
let listener = Listener {
|
||||
event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
|
||||
listener: None,
|
||||
_pin: PhantomPinned,
|
||||
};
|
||||
|
||||
Self(listener)
|
||||
}
|
||||
|
||||
/// Register this listener into the given [`Event`].
|
||||
///
|
||||
/// This method can only be called after the listener has been pinned, and must be called before
|
||||
/// the listener is polled.
|
||||
pub fn listen(self: Pin<&mut Self>) {
|
||||
self.listener().insert();
|
||||
|
||||
// Make sure the listener is registered before whatever happens next.
|
||||
full_fence();
|
||||
}
|
||||
|
||||
/// Blocks until a notification is received.
|
||||
///
|
||||
/// # Examples
|
||||
|
@ -467,16 +476,17 @@ impl EventListener {
|
|||
/// use event_listener::Event;
|
||||
///
|
||||
/// let event = Event::new();
|
||||
/// let listener = event.listen();
|
||||
/// let mut listener = event.listen();
|
||||
///
|
||||
/// // Notify `listener`.
|
||||
/// event.notify(1);
|
||||
///
|
||||
/// // Receive the notification.
|
||||
/// listener.wait();
|
||||
/// listener.as_mut().wait();
|
||||
/// ```
|
||||
pub fn wait(self) {
|
||||
self.0.wait_internal(None);
|
||||
#[cfg(feature = "std")]
|
||||
pub fn wait(self: Pin<&mut Self>) {
|
||||
self.listener().wait_internal(None);
|
||||
}
|
||||
|
||||
/// Blocks until a notification is received or a timeout is reached.
|
||||
|
@ -490,13 +500,15 @@ impl EventListener {
|
|||
/// use event_listener::Event;
|
||||
///
|
||||
/// let event = Event::new();
|
||||
/// let listener = event.listen();
|
||||
/// let mut listener = event.listen();
|
||||
///
|
||||
/// // There are no notification so this times out.
|
||||
/// assert!(!listener.wait_timeout(Duration::from_secs(1)));
|
||||
/// assert!(!listener.as_mut().wait_timeout(Duration::from_secs(1)));
|
||||
/// ```
|
||||
pub fn wait_timeout(self, timeout: Duration) -> bool {
|
||||
self.0.wait_internal(Instant::now().checked_add(timeout))
|
||||
#[cfg(feature = "std")]
|
||||
pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> bool {
|
||||
self.listener()
|
||||
.wait_internal(Instant::now().checked_add(timeout))
|
||||
}
|
||||
|
||||
/// Blocks until a notification is received or a deadline is reached.
|
||||
|
@ -510,39 +522,36 @@ impl EventListener {
|
|||
/// use event_listener::Event;
|
||||
///
|
||||
/// let event = Event::new();
|
||||
/// let listener = event.listen();
|
||||
/// let mut listener = event.listen();
|
||||
///
|
||||
/// // There are no notification so this times out.
|
||||
/// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
|
||||
/// assert!(!listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)));
|
||||
/// ```
|
||||
#[cfg(feature = "std")]
|
||||
pub fn wait_deadline(self, deadline: Instant) -> bool {
|
||||
self.0.wait_internal(Some(deadline))
|
||||
pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> bool {
|
||||
self.listener().wait_internal(Some(deadline))
|
||||
}
|
||||
}
|
||||
|
||||
impl EventListener {
|
||||
/// Drops this listener and discards its notification (if any) without notifying another
|
||||
/// active listener.
|
||||
///
|
||||
/// Returns `true` if a notification was discarded. Note that this function may spuriously
|
||||
/// return `false` even if a notification was received by the listener.
|
||||
/// Returns `true` if a notification was discarded.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// use event_listener::Event;
|
||||
///
|
||||
/// let event = Event::new();
|
||||
/// let listener1 = event.listen();
|
||||
/// let listener2 = event.listen();
|
||||
/// let mut listener1 = event.listen();
|
||||
/// let mut listener2 = event.listen();
|
||||
///
|
||||
/// event.notify(1);
|
||||
///
|
||||
/// assert!(listener1.discard());
|
||||
/// assert!(!listener2.discard());
|
||||
/// assert!(listener1.as_mut().discard());
|
||||
/// assert!(!listener2.as_mut().discard());
|
||||
/// ```
|
||||
pub fn discard(self) -> bool {
|
||||
self.0.discard()
|
||||
pub fn discard(self: Pin<&mut Self>) -> bool {
|
||||
self.listener().discard()
|
||||
}
|
||||
|
||||
/// Returns `true` if this listener listens to the given `Event`.
|
||||
|
@ -579,8 +588,8 @@ impl EventListener {
|
|||
ptr::eq::<Inner>(&**self.inner(), &**other.inner())
|
||||
}
|
||||
|
||||
fn listener(&mut self) -> &mut Listener<Arc<Inner>> {
|
||||
&mut self.0
|
||||
fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener<Arc<Inner>>> {
|
||||
unsafe { self.map_unchecked_mut(|this| &mut this.0) }
|
||||
}
|
||||
|
||||
fn inner(&self) -> &Arc<Inner> {
|
||||
|
@ -591,7 +600,7 @@ impl EventListener {
|
|||
impl Future for EventListener {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.listener().poll_internal(cx)
|
||||
}
|
||||
}
|
||||
|
@ -602,15 +611,36 @@ struct Listener<B: Borrow<Inner> + Unpin> {
|
|||
|
||||
/// The inner state of the listener.
|
||||
listener: Option<sys::Listener>,
|
||||
|
||||
/// Enforce pinning.
|
||||
_pin: PhantomPinned,
|
||||
}
|
||||
|
||||
unsafe impl<B: Borrow<Inner> + Unpin + Send> Send for Listener<B> {}
|
||||
unsafe impl<B: Borrow<Inner> + Unpin + Sync> Sync for Listener<B> {}
|
||||
|
||||
impl<B: Borrow<Inner> + Unpin> Listener<B> {
|
||||
/// Pin-project this listener.
|
||||
fn project(self: Pin<&mut Self>) -> (&Inner, Pin<&mut Option<sys::Listener>>) {
|
||||
// SAFETY: `event` is `Unpin`, and `listener`'s pin status is preserved
|
||||
unsafe {
|
||||
let Listener {
|
||||
event, listener, ..
|
||||
} = self.get_unchecked_mut();
|
||||
|
||||
((*event).borrow(), Pin::new_unchecked(listener))
|
||||
}
|
||||
}
|
||||
|
||||
/// Register this listener with the event.
|
||||
fn insert(self: Pin<&mut Self>) {
|
||||
let (inner, listener) = self.project();
|
||||
inner.insert(listener);
|
||||
}
|
||||
|
||||
/// Wait until the provided deadline.
|
||||
#[cfg(feature = "std")]
|
||||
fn wait_internal(self, deadline: Option<Instant>) -> bool {
|
||||
fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> bool {
|
||||
use std::cell::RefCell;
|
||||
|
||||
std::thread_local! {
|
||||
|
@ -619,9 +649,9 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
|
|||
}
|
||||
|
||||
// Try to borrow the thread-local parker/unparker pair.
|
||||
let mut this = Some(self);
|
||||
PARKER
|
||||
.try_with({
|
||||
let this = self.as_mut();
|
||||
|parker| {
|
||||
let mut pair = parker
|
||||
.try_borrow_mut()
|
||||
|
@ -631,33 +661,29 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
|
|||
(parker, Task::Unparker(unparker))
|
||||
});
|
||||
|
||||
this.take()
|
||||
.unwrap()
|
||||
.wait_with_parker(deadline, parker, unparker.as_task_ref())
|
||||
this.wait_with_parker(deadline, parker, unparker.as_task_ref())
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
// If the pair isn't accessible, we may be being called in a destructor.
|
||||
// Just create a new pair.
|
||||
let (parker, unparker) = parking::pair();
|
||||
this.take().unwrap().wait_with_parker(
|
||||
deadline,
|
||||
&parker,
|
||||
TaskRef::Unparker(&unparker),
|
||||
)
|
||||
self.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker))
|
||||
})
|
||||
}
|
||||
|
||||
/// Wait until the provided deadline using the specified parker/unparker pair.
|
||||
#[cfg(feature = "std")]
|
||||
fn wait_with_parker(
|
||||
mut self,
|
||||
self: Pin<&mut Self>,
|
||||
deadline: Option<Instant>,
|
||||
parker: &parking::Parker,
|
||||
unparker: TaskRef<'_>,
|
||||
) -> bool {
|
||||
let (inner, mut listener) = self.project();
|
||||
|
||||
// Set the listener's state to `Task`.
|
||||
match self.event.borrow().register(&mut self.listener, unparker) {
|
||||
match inner.register(listener.as_mut(), unparker) {
|
||||
Some(true) => {
|
||||
// We were already notified, so we don't need to park.
|
||||
return true;
|
||||
|
@ -683,10 +709,8 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
|
|||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
// Remove our entry and check if we were notified.
|
||||
return self
|
||||
.event
|
||||
.borrow()
|
||||
.remove(&mut self.listener, false)
|
||||
return inner
|
||||
.remove(listener, false)
|
||||
.expect("We never removed ourself from the list")
|
||||
.is_notified();
|
||||
}
|
||||
|
@ -694,10 +718,8 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
|
|||
}
|
||||
|
||||
// See if we were notified.
|
||||
if self
|
||||
.event
|
||||
.borrow()
|
||||
.register(&mut self.listener, unparker)
|
||||
if inner
|
||||
.register(listener.as_mut(), unparker)
|
||||
.expect("We never removed ourself from the list")
|
||||
{
|
||||
return true;
|
||||
|
@ -707,21 +729,20 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
|
|||
|
||||
/// Drops this listener and discards its notification (if any) without notifying another
|
||||
/// active listener.
|
||||
fn discard(mut self) -> bool {
|
||||
self.event
|
||||
.borrow()
|
||||
.remove(&mut self.listener, false)
|
||||
fn discard(self: Pin<&mut Self>) -> bool {
|
||||
let (inner, listener) = self.project();
|
||||
|
||||
inner
|
||||
.remove(listener, false)
|
||||
.map_or(false, |state| state.is_notified())
|
||||
}
|
||||
|
||||
/// Poll this listener for a notification.
|
||||
fn poll_internal(&mut self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
||||
let (inner, mut listener) = self.project();
|
||||
|
||||
// Try to register the listener.
|
||||
match self
|
||||
.event
|
||||
.borrow()
|
||||
.register(&mut self.listener, TaskRef::Waker(cx.waker()))
|
||||
{
|
||||
match inner.register(listener.as_mut(), TaskRef::Waker(cx.waker())) {
|
||||
Some(true) => {
|
||||
// We were already notified, so we don't need to park.
|
||||
Poll::Ready(())
|
||||
|
@ -743,7 +764,9 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
|
|||
impl<B: Borrow<Inner> + Unpin> Drop for Listener<B> {
|
||||
fn drop(&mut self) {
|
||||
// If we're being dropped, we need to remove ourself from the list.
|
||||
self.event.borrow().remove(&mut self.listener, true);
|
||||
let (inner, listener) = unsafe { Pin::new_unchecked(self).project() };
|
||||
|
||||
inner.remove(listener, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ use core::fmt;
|
|||
use core::mem;
|
||||
use core::num::NonZeroUsize;
|
||||
use core::ops;
|
||||
use core::pin::Pin;
|
||||
|
||||
use alloc::vec::Vec;
|
||||
|
||||
|
@ -33,8 +34,8 @@ impl crate::Inner {
|
|||
/// Add a new listener to the list.
|
||||
///
|
||||
/// Does nothing if the list is already registered.
|
||||
pub(crate) fn insert(&self, listener: &mut Option<Listener>) {
|
||||
if listener.is_some() {
|
||||
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener>>) {
|
||||
if listener.as_ref().as_pin_ref().is_some() {
|
||||
// Already inserted.
|
||||
return;
|
||||
}
|
||||
|
@ -55,8 +56,12 @@ impl crate::Inner {
|
|||
}
|
||||
|
||||
/// Remove a listener from the list.
|
||||
pub(crate) fn remove(&self, listener: &mut Option<Listener>, propogate: bool) -> Option<State> {
|
||||
let state = match listener.take() {
|
||||
pub(crate) fn remove(
|
||||
&self,
|
||||
mut listener: Pin<&mut Option<Listener>>,
|
||||
propogate: bool,
|
||||
) -> Option<State> {
|
||||
let state = match listener.as_mut().take() {
|
||||
Some(Listener::HasNode(key)) => {
|
||||
match self.try_lock() {
|
||||
Some(mut list) => {
|
||||
|
@ -117,11 +122,11 @@ impl crate::Inner {
|
|||
/// isn't inserted, returns `None`.
|
||||
pub(crate) fn register(
|
||||
&self,
|
||||
listener: &mut Option<Listener>,
|
||||
mut listener: Pin<&mut Option<Listener>>,
|
||||
task: TaskRef<'_>,
|
||||
) -> Option<bool> {
|
||||
loop {
|
||||
match listener.take() {
|
||||
match listener.as_mut().take() {
|
||||
Some(Listener::HasNode(key)) => {
|
||||
*listener = Some(Listener::HasNode(key));
|
||||
match self.try_lock() {
|
||||
|
@ -570,7 +575,7 @@ impl ListenerSlab {
|
|||
/// isn't inserted, returns `None`.
|
||||
pub(crate) fn register(
|
||||
&mut self,
|
||||
listener: &mut Option<Listener>,
|
||||
mut listener: Pin<&mut Option<Listener>>,
|
||||
task: TaskRef<'_>,
|
||||
) -> Option<bool> {
|
||||
let key = match *listener {
|
||||
|
|
|
@ -199,7 +199,7 @@ pub trait EventListenerFuture {
|
|||
///
|
||||
/// This function should use the `Strategy::poll` method to poll the future, and proceed
|
||||
/// based on the result.
|
||||
fn poll_with_strategy<S: Strategy>(
|
||||
fn poll_with_strategy<'a, S: Strategy<'a>>(
|
||||
self: Pin<&mut Self>,
|
||||
strategy: &mut S,
|
||||
context: &mut S::Context,
|
||||
|
@ -334,49 +334,46 @@ impl<F: EventListenerFuture + ?Sized> Future for FutureWrapper<F> {
|
|||
/// wait_on(listener, &mut NonBlocking::default()).await;
|
||||
/// # });
|
||||
/// ```
|
||||
pub trait Strategy {
|
||||
pub trait Strategy<'a> {
|
||||
/// The context needed to poll the future.
|
||||
type Context: ?Sized;
|
||||
|
||||
/// The future returned by the [`Strategy::wait`] method.
|
||||
type Future: Future;
|
||||
type Future: Future + 'a;
|
||||
|
||||
/// Poll the event listener until it is ready.
|
||||
fn poll(
|
||||
&mut self,
|
||||
event_listener: EventListener,
|
||||
event_listener: Pin<&mut EventListener>,
|
||||
context: &mut Self::Context,
|
||||
) -> Result<(), EventListener>;
|
||||
) -> Poll<()>;
|
||||
|
||||
/// Wait for the event listener to become ready.
|
||||
fn wait(&mut self, evl: EventListener) -> Self::Future;
|
||||
fn wait(&mut self, evl: Pin<&'a mut EventListener>) -> Self::Future;
|
||||
}
|
||||
|
||||
/// A strategy that uses polling to efficiently wait for an event.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
|
||||
pub struct NonBlocking<'a> {
|
||||
_marker: PhantomData<Context<'a>>,
|
||||
pub struct NonBlocking<'a, 'b> {
|
||||
_marker: PhantomData<(Context<'a>, Pin<&'b mut EventListener>)>,
|
||||
}
|
||||
|
||||
impl<'a> Strategy for NonBlocking<'a> {
|
||||
impl<'a, 'b> Strategy<'b> for NonBlocking<'a, 'b> {
|
||||
type Context = Context<'a>;
|
||||
type Future = EventListener;
|
||||
type Future = Pin<&'b mut EventListener>;
|
||||
|
||||
#[inline]
|
||||
fn wait(&mut self, evl: EventListener) -> Self::Future {
|
||||
fn wait(&mut self, evl: Self::Future) -> Self::Future {
|
||||
evl
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn poll(
|
||||
&mut self,
|
||||
mut event_listener: EventListener,
|
||||
event_listener: Pin<&mut EventListener>,
|
||||
context: &mut Self::Context,
|
||||
) -> Result<(), EventListener> {
|
||||
match Pin::new(&mut event_listener).poll(context) {
|
||||
Poll::Ready(()) => Ok(()),
|
||||
Poll::Pending => Err(event_listener),
|
||||
}
|
||||
) -> Poll<()> {
|
||||
event_listener.poll(context)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -388,12 +385,12 @@ pub struct Blocking {
|
|||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl Strategy for Blocking {
|
||||
impl<'a> Strategy<'a> for Blocking {
|
||||
type Context = ();
|
||||
type Future = Ready;
|
||||
|
||||
#[inline]
|
||||
fn wait(&mut self, evl: EventListener) -> Self::Future {
|
||||
fn wait(&mut self, evl: Pin<&mut EventListener>) -> Self::Future {
|
||||
evl.wait();
|
||||
Ready { _private: () }
|
||||
}
|
||||
|
@ -401,11 +398,11 @@ impl Strategy for Blocking {
|
|||
#[inline]
|
||||
fn poll(
|
||||
&mut self,
|
||||
event_listener: EventListener,
|
||||
event_listener: Pin<&mut EventListener>,
|
||||
_context: &mut Self::Context,
|
||||
) -> Result<(), EventListener> {
|
||||
) -> Poll<()> {
|
||||
event_listener.wait();
|
||||
Ok(())
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::usize;
|
|||
use event_listener::{Event, EventListener};
|
||||
use waker_fn::waker_fn;
|
||||
|
||||
fn is_notified(listener: &mut EventListener) -> bool {
|
||||
fn is_notified(listener: &mut Pin<Box<EventListener>>) -> bool {
|
||||
let waker = waker_fn(|| ());
|
||||
Pin::new(listener)
|
||||
.poll(&mut Context::from_waker(&waker))
|
||||
|
|
Loading…
Reference in New Issue