feat: Add a force send function

Closes #44 by adding a "force_send" method. This method can replace an
existing element in the list, in which case that element is returned.
This can be used to make "limited capacity" channels.

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2024-05-05 12:07:45 -07:00 committed by John Nunley
parent 790456a747
commit 3fc1130185
2 changed files with 66 additions and 2 deletions

View File

@ -44,11 +44,10 @@ use core::marker::PhantomPinned;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};
use core::usize;
use alloc::sync::Arc;
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
use futures_core::ready;
@ -286,6 +285,47 @@ impl<T> Sender<T> {
self.send(msg).wait()
}
/// Forcefully push a message into this channel.
///
/// If the channel is full, this method will replace an existing message in the
/// channel and return it as `Ok(Some(value))`. If the channel is closed, this
/// method will return an error.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{bounded, SendError};
///
/// let (s, r) = bounded(3);
///
/// assert_eq!(s.send(1).await, Ok(()));
/// assert_eq!(s.send(2).await, Ok(()));
/// assert_eq!(s.force_send(3), Ok(None));
/// assert_eq!(s.force_send(4), Ok(Some(1)));
///
/// assert_eq!(r.recv().await, Ok(2));
/// assert_eq!(r.recv().await, Ok(3));
/// assert_eq!(r.recv().await, Ok(4));
/// # });
/// ```
pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
match self.channel.queue.force_push(msg) {
Ok(backlog) => {
// Notify a blocked receive operation. If the notified operation gets canceled,
// it will notify another blocked receive operation.
self.channel.recv_ops.notify_additional(1);
// Notify all blocked streams.
self.channel.stream_ops.notify(usize::MAX);
Ok(backlog)
}
Err(ForcePushError(reject)) => Err(SendError(reject)),
}
}
/// Closes the channel.
///
/// Returns `true` if this call has closed the channel and it was not closed already.

View File

@ -184,6 +184,30 @@ fn send() {
.run();
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn force_send() {
let (s, r) = bounded(1);
Parallel::new()
.add(|| {
s.force_send(7).unwrap();
sleep(ms(1000));
s.force_send(8).unwrap();
sleep(ms(1000));
s.force_send(9).unwrap();
sleep(ms(1000));
s.force_send(10).unwrap();
})
.add(|| {
sleep(ms(1500));
assert_eq!(future::block_on(r.recv()), Ok(8));
assert_eq!(future::block_on(r.recv()), Ok(9));
assert_eq!(future::block_on(r.recv()), Ok(10));
})
.run();
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn send_after_close() {