m: Port to event-listener v5.0.0

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2024-02-07 07:48:52 -08:00 committed by GitHub
parent 706f2755dc
commit efdddaa2a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 59 additions and 33 deletions

View File

@ -16,8 +16,8 @@ exclude = ["/.*"]
[dependencies]
concurrent-queue = { version = "2", default-features = false }
event-listener = { version = "4.0.0", default-features = false }
event-listener-strategy = { version = "0.4.0", default-features = false }
event-listener = { version = "5.0.0", default-features = false }
event-listener-strategy = { version = "0.5.0", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
pin-project-lite = "0.2.11"

View File

@ -40,6 +40,7 @@ extern crate alloc;
use core::fmt;
use core::future::Future;
use core::marker::PhantomPinned;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};
@ -52,6 +53,7 @@ use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
use futures_core::ready;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
struct Channel<T> {
/// Inner message queue.
@ -132,8 +134,9 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(),
listener: None,
channel,
_pin: PhantomPinned,
};
(s, r)
}
@ -172,8 +175,9 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(),
listener: None,
channel,
_pin: PhantomPinned,
};
(s, r)
}
@ -247,7 +251,8 @@ impl<T> Sender<T> {
Send::_new(SendInner {
sender: self,
msg: Some(msg),
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
})
}
@ -477,7 +482,7 @@ impl<T> Clone for Sender<T> {
}
}
pin_project_lite::pin_project! {
pin_project! {
/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
@ -491,8 +496,11 @@ pin_project_lite::pin_project! {
channel: Arc<Channel<T>>,
// Listens for a send or close event to unblock this stream.
listener: Option<EventListener>,
// Keeping this type `!Unpin` enables future optimizations.
#[pin]
listener: EventListener,
_pin: PhantomPinned
}
impl<T> PinnedDrop for Receiver<T> {
@ -567,7 +575,8 @@ impl<T> Receiver<T> {
pub fn recv(&self) -> Recv<'_, T> {
Recv::_new(RecvInner {
receiver: self,
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
})
}
@ -787,7 +796,8 @@ impl<T> Clone for Receiver<T> {
Receiver {
channel: self.channel.clone(),
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
}
}
}
@ -800,8 +810,9 @@ impl<T> Stream for Receiver<T> {
// If this stream is listening for events, first wait for a notification.
{
let this = self.as_mut().project();
if this.listener.is_listening() {
ready!(this.listener.poll(cx));
if let Some(listener) = this.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
*this.listener = None;
}
}
@ -810,26 +821,26 @@ impl<T> Stream for Receiver<T> {
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
let mut this = self.project();
this.listener.as_mut().set(EventListener::new());
let this = self.as_mut().project();
*this.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
let mut this = self.project();
this.listener.as_mut().set(EventListener::new());
let this = self.as_mut().project();
*this.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}
// Receiving failed - now start listening for notifications or wait for one.
let mut this = self.as_mut().project();
if this.listener.is_listening() {
let this = self.as_mut().project();
if this.listener.is_some() {
// Go back to the outer loop to wait for a notification.
break;
} else {
this.listener.as_mut().listen(&this.channel.stream_ops);
*this.listener = Some(this.channel.stream_ops.listen());
}
}
}
@ -914,7 +925,8 @@ impl<T> WeakReceiver<T> {
}
Ok(_) => Some(Receiver {
channel: self.channel.clone(),
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
}),
}
}
@ -1084,13 +1096,22 @@ easy_wrapper! {
pub(crate) wait();
}
pin_project_lite::pin_project! {
pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct SendInner<'a, T> {
// Reference to the original sender.
sender: &'a Sender<T>,
// The message to send.
msg: Option<T>,
// Listener waiting on the channel.
listener: Option<EventListener>,
// Keeping this type `!Unpin` enables future optimizations.
#[pin]
listener: EventListener,
_pin: PhantomPinned
}
}
@ -1103,7 +1124,7 @@ impl<'a, T> EventListenerFuture for SendInner<'a, T> {
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Result<(), SendError<T>>> {
let mut this = self.project();
let this = self.project();
loop {
let msg = this.msg.take().unwrap();
@ -1115,11 +1136,11 @@ impl<'a, T> EventListenerFuture for SendInner<'a, T> {
}
// Sending failed - now start listening for notifications or wait for one.
if this.listener.is_listening() {
if this.listener.is_some() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), context));
ready!(S::poll(strategy, &mut *this.listener, context));
} else {
this.listener.as_mut().listen(&this.sender.channel.send_ops);
*this.listener = Some(this.sender.channel.send_ops.listen());
}
}
}
@ -1134,12 +1155,19 @@ easy_wrapper! {
pub(crate) wait();
}
pin_project_lite::pin_project! {
pin_project! {
#[derive(Debug)]
#[project(!Unpin)]
struct RecvInner<'a, T> {
// Reference to the receiver.
receiver: &'a Receiver<T>,
// Listener waiting on the channel.
listener: Option<EventListener>,
// Keeping this type `!Unpin` enables future optimizations.
#[pin]
listener: EventListener,
_pin: PhantomPinned
}
}
@ -1152,7 +1180,7 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
let mut this = self.project();
let this = self.project();
loop {
// Attempt to receive a message.
@ -1163,13 +1191,11 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
}
// Receiving failed - now start listening for notifications or wait for one.
if this.listener.is_listening() {
if this.listener.is_some() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), cx));
ready!(S::poll(strategy, &mut *this.listener, cx));
} else {
this.listener
.as_mut()
.listen(&this.receiver.channel.recv_ops);
*this.listener = Some(this.receiver.channel.recv_ops.listen());
}
}
}