Bump to event-listener v3.0.0
This commit makes async-channel use the new release of event-listener. Highlights include a marked increase in efficiency and no_std support. Supersedes #54 Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
parent
4cae9cb0cb
commit
51ab1273b4
|
@ -17,8 +17,12 @@ exclude = ["/.*"]
|
|||
[dependencies]
|
||||
concurrent-queue = "2"
|
||||
event-listener = "2.4.0"
|
||||
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" }
|
||||
futures-core = "0.3.5"
|
||||
|
||||
[dev-dependencies]
|
||||
easy-parallel = "3"
|
||||
futures-lite = "1"
|
||||
|
||||
[patch.crates-io]
|
||||
event-listener = { git = "https://github.com/smol-rs/event-listener" }
|
||||
|
|
155
src/lib.rs
155
src/lib.rs
|
@ -47,6 +47,7 @@ use std::usize;
|
|||
|
||||
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
|
||||
use event_listener::{Event, EventListener};
|
||||
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
|
||||
use futures_core::stream::Stream;
|
||||
|
||||
struct Channel<T> {
|
||||
|
@ -240,11 +241,11 @@ impl<T> Sender<T> {
|
|||
/// # });
|
||||
/// ```
|
||||
pub fn send(&self, msg: T) -> Send<'_, T> {
|
||||
Send {
|
||||
Send::_new(SendInner {
|
||||
sender: self,
|
||||
listener: None,
|
||||
msg: Some(msg),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Sends a message into this channel using the blocking strategy.
|
||||
|
@ -485,7 +486,11 @@ pub struct Receiver<T> {
|
|||
channel: Arc<Channel<T>>,
|
||||
|
||||
/// Listens for a send or close event to unblock this stream.
|
||||
listener: Option<EventListener>,
|
||||
///
|
||||
/// TODO: This is pinned and boxed because `Receiver<T>` is `Unpin` and the newest version
|
||||
/// of `event_listener::EventListener` is not. At the next major release, we can remove the
|
||||
/// `Pin<Box<>>` and make `Receiver<T>` `!Unpin`.
|
||||
listener: Option<Pin<Box<EventListener>>>,
|
||||
}
|
||||
|
||||
impl<T> Receiver<T> {
|
||||
|
@ -546,10 +551,10 @@ impl<T> Receiver<T> {
|
|||
/// # });
|
||||
/// ```
|
||||
pub fn recv(&self) -> Recv<'_, T> {
|
||||
Recv {
|
||||
Recv::_new(RecvInner {
|
||||
receiver: self,
|
||||
listener: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Receives a message from the channel using the blocking strategy.
|
||||
|
@ -1059,20 +1064,34 @@ impl fmt::Display for TryRecvError {
|
|||
}
|
||||
}
|
||||
|
||||
/// A future returned by [`Sender::send()`].
|
||||
easy_wrapper! {
|
||||
/// A future returned by [`Sender::send()`].
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
|
||||
pub(crate) wait();
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct Send<'a, T> {
|
||||
struct SendInner<'a, T> {
|
||||
sender: &'a Sender<T>,
|
||||
listener: Option<EventListener>,
|
||||
/// TODO: This is pinned and boxed because `Send<T>` is `Unpin` and the newest version of
|
||||
/// `event_listener::EventListener` is not. At the next breaking release of this crate, we can
|
||||
/// remove the `Pin<Box<>>` and make `Send<T>` `!Unpin`.
|
||||
listener: Option<Pin<Box<EventListener>>>,
|
||||
msg: Option<T>,
|
||||
}
|
||||
|
||||
impl<'a, T> Send<'a, T> {
|
||||
impl<'a, T> Unpin for SendInner<'a, T> {}
|
||||
|
||||
impl<'a, T> EventListenerFuture for SendInner<'a, T> {
|
||||
type Output = Result<(), SendError<T>>;
|
||||
|
||||
/// Run this future with the given `Strategy`.
|
||||
fn run_with_strategy<S: Strategy>(
|
||||
&mut self,
|
||||
cx: &mut S::Context,
|
||||
fn poll_with_strategy<'x, S: Strategy<'x>>(
|
||||
mut self: Pin<&'x mut Self>,
|
||||
strategy: &mut S,
|
||||
context: &mut S::Context,
|
||||
) -> Poll<Result<(), SendError<T>>> {
|
||||
loop {
|
||||
let msg = self.msg.take().unwrap();
|
||||
|
@ -1084,55 +1103,50 @@ impl<'a, T> Send<'a, T> {
|
|||
}
|
||||
|
||||
// Sending failed - now start listening for notifications or wait for one.
|
||||
match self.listener.take() {
|
||||
match self.listener.as_mut() {
|
||||
None => {
|
||||
// Start listening and then try sending again.
|
||||
self.listener = Some(self.sender.channel.send_ops.listen());
|
||||
}
|
||||
Some(l) => {
|
||||
// Poll using the given strategy
|
||||
if let Err(l) = S::poll(l, cx) {
|
||||
self.listener = Some(l);
|
||||
if let Poll::Pending = S::poll(strategy, l.as_mut(), context) {
|
||||
return Poll::Pending;
|
||||
} else {
|
||||
self.listener = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run using the blocking strategy.
|
||||
fn wait(mut self) -> Result<(), SendError<T>> {
|
||||
match self.run_with_strategy::<Blocking>(&mut ()) {
|
||||
Poll::Ready(res) => res,
|
||||
Poll::Pending => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Unpin for Send<'a, T> {}
|
||||
|
||||
impl<'a, T> Future for Send<'a, T> {
|
||||
type Output = Result<(), SendError<T>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.run_with_strategy::<NonBlocking<'_>>(cx)
|
||||
}
|
||||
easy_wrapper! {
|
||||
/// A future returned by [`Receiver::recv()`].
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
|
||||
pub(crate) wait();
|
||||
}
|
||||
|
||||
/// A future returned by [`Receiver::recv()`].
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct Recv<'a, T> {
|
||||
struct RecvInner<'a, T> {
|
||||
receiver: &'a Receiver<T>,
|
||||
listener: Option<EventListener>,
|
||||
/// TODO: This is pinned and boxed because `Recv<T>` is `Unpin` and the newest version of
|
||||
/// `event_listener::EventListener` is not. At the next breaking release of this crate, we can
|
||||
/// remove the `Pin<Box<>>` and make `Recv<T>` `!Unpin`.
|
||||
listener: Option<Pin<Box<EventListener>>>,
|
||||
}
|
||||
|
||||
impl<'a, T> Unpin for Recv<'a, T> {}
|
||||
impl<'a, T> Unpin for RecvInner<'a, T> {}
|
||||
|
||||
impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
|
||||
type Output = Result<T, RecvError>;
|
||||
|
||||
impl<'a, T> Recv<'a, T> {
|
||||
/// Run this future with the given `Strategy`.
|
||||
fn run_with_strategy<S: Strategy>(
|
||||
&mut self,
|
||||
fn poll_with_strategy<'x, S: Strategy<'x>>(
|
||||
mut self: Pin<&'x mut Self>,
|
||||
strategy: &mut S,
|
||||
cx: &mut S::Context,
|
||||
) -> Poll<Result<T, RecvError>> {
|
||||
loop {
|
||||
|
@ -1144,73 +1158,20 @@ impl<'a, T> Recv<'a, T> {
|
|||
}
|
||||
|
||||
// Receiving failed - now start listening for notifications or wait for one.
|
||||
match self.listener.take() {
|
||||
match self.listener.as_mut() {
|
||||
None => {
|
||||
// Start listening and then try receiving again.
|
||||
self.listener = Some(self.receiver.channel.recv_ops.listen());
|
||||
}
|
||||
Some(l) => {
|
||||
// Poll using the given strategy.
|
||||
if let Err(l) = S::poll(l, cx) {
|
||||
self.listener = Some(l);
|
||||
if let Poll::Pending = S::poll(strategy, l.as_mut(), cx) {
|
||||
return Poll::Pending;
|
||||
} else {
|
||||
self.listener = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run with the blocking strategy.
|
||||
fn wait(mut self) -> Result<T, RecvError> {
|
||||
match self.run_with_strategy::<Blocking>(&mut ()) {
|
||||
Poll::Ready(res) => res,
|
||||
Poll::Pending => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Future for Recv<'a, T> {
|
||||
type Output = Result<T, RecvError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.run_with_strategy::<NonBlocking<'_>>(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// A strategy used to poll an `EventListener`.
|
||||
trait Strategy {
|
||||
/// Context needed to be provided to the `poll` method.
|
||||
type Context;
|
||||
|
||||
/// Polls the given `EventListener`.
|
||||
///
|
||||
/// Returns the `EventListener` back if it was not completed; otherwise,
|
||||
/// returns `Ok(())`.
|
||||
fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>;
|
||||
}
|
||||
|
||||
/// Non-blocking strategy for use in asynchronous code.
|
||||
struct NonBlocking<'a>(&'a mut ());
|
||||
|
||||
impl<'a> Strategy for NonBlocking<'a> {
|
||||
type Context = Context<'a>;
|
||||
|
||||
fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> {
|
||||
match Pin::new(&mut evl).poll(cx) {
|
||||
Poll::Ready(()) => Ok(()),
|
||||
Poll::Pending => Err(evl),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocking strategy for use in synchronous code.
|
||||
struct Blocking;
|
||||
|
||||
impl Strategy for Blocking {
|
||||
type Context = ();
|
||||
|
||||
fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> {
|
||||
evl.wait();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue