event-listener/src/no_std.rs

1435 lines
44 KiB
Rust

//! Implementation of `event-listener` built exclusively on atomics.
//!
//! On `no_std`, we don't have access to `Mutex`, so we can't use intrusive linked lists like the `std`
//! implementation. Normally, we would use a concurrent atomic queue to store listeners, but benchmarks
//! show that using queues in this way is very slow, especially for the single threaded use-case.
//!
//! We've found that it's easier to assume that the `Event` won't be under high contention in most use
//! cases. Therefore, we use a spinlock that protects a linked list of listeners, and fall back to an
//! atomic queue if the lock is contended. Benchmarks show that this is about 20% slower than the std
//! implementation, but still much faster than using a queue.
#[path = "no_std/node.rs"]
mod node;
use node::{Node, NothingProducer, TaskWaiting};
use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
use crate::sync::Arc;
use crate::{RegisterResult, State, Task, TaskRef};
use core::fmt;
use core::marker::PhantomData;
use core::mem;
use core::num::NonZeroUsize;
use core::ops;
use core::pin::Pin;
use alloc::vec::Vec;
impl<T> crate::Inner<T> {
/// Locks the list.
fn try_lock(&self) -> Option<ListGuard<'_, T>> {
self.list.inner.try_lock().map(|guard| ListGuard {
inner: self,
guard: Some(guard),
tasks: alloc::vec![],
})
}
/// Force a queue update.
fn queue_update(&self) {
// Locking and unlocking the mutex will drain the queue if there is no contention.
drop(self.try_lock());
}
pub(crate) fn needs_notification(&self, _limit: usize) -> bool {
// TODO: Figure out a stable way to do this optimization.
true
}
/// Add a new listener to the list.
///
/// Does nothing if the list is already registered.
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
if listener.as_ref().as_pin_ref().is_some() {
// Already inserted.
return;
}
match self.try_lock() {
Some(mut lock) => {
let key = lock.insert(State::Created);
*listener = Some(Listener::HasNode(key));
}
None => {
// Push it to the queue.
let (node, task_waiting) = Node::listener();
self.list.queue.push(node).unwrap();
*listener = Some(Listener::Queued(task_waiting));
// Force a queue update.
self.queue_update();
}
}
}
/// Remove a listener from the list.
pub(crate) fn remove(
&self,
mut listener: Pin<&mut Option<Listener<T>>>,
propagate: bool,
) -> Option<State<T>> {
loop {
let state = match listener.as_mut().take() {
Some(Listener::HasNode(key)) => {
match self.try_lock() {
Some(mut list) => {
// Fast path removal.
list.remove(key, propagate)
}
None => {
// Slow path removal.
// This is why intrusive lists don't work on no_std.
let node = Node::RemoveListener {
listener: key,
propagate,
};
self.list.queue.push(node).unwrap();
// Force a queue update.
self.queue_update();
None
}
}
}
Some(Listener::Queued(tw)) => {
// Make sure it's not added after the queue is drained.
if let Some(key) = tw.cancel() {
// If it was already added, set up our listener and try again.
*listener = Some(Listener::HasNode(key));
continue;
}
None
}
None => None,
_ => unreachable!(),
};
return state;
}
}
/// Notifies a number of entries.
#[cold]
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
match self.try_lock() {
Some(mut guard) => {
// Notify the listeners.
guard.notify(notify)
}
None => {
// Push it to the queue.
let node = Node::Notify(GenericNotify::new(
notify.count(Internal::new()),
notify.is_additional(Internal::new()),
NothingProducer::default(),
));
self.list.queue.push(node).unwrap();
// Force a queue update.
self.queue_update();
// We haven't notified anyone yet.
0
}
}
}
/// Register a task to be notified when the event is triggered.
///
/// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
/// isn't inserted, returns `None`.
pub(crate) fn register(
&self,
mut listener: Pin<&mut Option<Listener<T>>>,
task: TaskRef<'_>,
) -> RegisterResult<T> {
loop {
match listener.as_mut().take() {
Some(Listener::HasNode(key)) => {
*listener = Some(Listener::HasNode(key));
match self.try_lock() {
Some(mut guard) => {
// Fast path registration.
return guard.register(listener, task);
}
None => {
// Wait for the lock.
let node = Node::Waiting(task.into_task());
self.list.queue.push(node).unwrap();
// Force a queue update.
self.queue_update();
return RegisterResult::Registered;
}
}
}
Some(Listener::Queued(task_waiting)) => {
// Force a queue update.
self.queue_update();
// Are we done yet?
match task_waiting.status() {
Some(key) => {
assert!(key.get() != usize::MAX);
// We're inserted now, adjust state.
*listener = Some(Listener::HasNode(key));
}
None => {
// We're still queued, so register the task.
task_waiting.register(task.into_task());
*listener = Some(Listener::Queued(task_waiting));
// Force a queue update.
self.queue_update();
return RegisterResult::Registered;
}
}
}
None => return RegisterResult::NeverInserted,
_ => unreachable!(),
}
}
}
}
#[derive(Debug)]
pub(crate) struct List<T> {
/// The inner list.
inner: Mutex<ListenerSlab<T>>,
/// The queue of pending operations.
queue: concurrent_queue::ConcurrentQueue<Node<T>>,
}
impl<T> List<T> {
pub(super) fn new() -> List<T> {
List {
inner: Mutex::new(ListenerSlab::new()),
queue: concurrent_queue::ConcurrentQueue::unbounded(),
}
}
/// Try to get the total number of listeners without blocking.
pub(super) fn try_total_listeners(&self) -> Option<usize> {
self.inner.try_lock().map(|lock| lock.listeners.len())
}
}
/// The guard returned by [`Inner::lock`].
pub(crate) struct ListGuard<'a, T> {
/// Reference to the inner state.
pub(crate) inner: &'a crate::Inner<T>,
/// The locked list.
pub(crate) guard: Option<MutexGuard<'a, ListenerSlab<T>>>,
/// Tasks to wake up once this guard is dropped.
tasks: Vec<Task>,
}
impl<T> ListGuard<'_, T> {
#[cold]
fn process_nodes_slow(&mut self, start_node: Node<T>) {
let guard = self.guard.as_mut().unwrap();
// Process the start node.
self.tasks.extend(start_node.apply(guard));
// Process all remaining nodes.
while let Ok(node) = self.inner.list.queue.pop() {
self.tasks.extend(node.apply(guard));
}
}
#[inline]
fn process_nodes(&mut self) {
// Process every node left in the queue.
if let Ok(start_node) = self.inner.list.queue.pop() {
self.process_nodes_slow(start_node);
}
}
}
impl<T> ops::Deref for ListGuard<'_, T> {
type Target = ListenerSlab<T>;
fn deref(&self) -> &Self::Target {
self.guard.as_ref().unwrap()
}
}
impl<T> ops::DerefMut for ListGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard.as_mut().unwrap()
}
}
impl<T> Drop for ListGuard<'_, T> {
fn drop(&mut self) {
while self.guard.is_some() {
// Process every node left in the queue.
self.process_nodes();
// Update the atomic `notified` counter.
let list = self.guard.take().unwrap();
let notified = if list.notified < list.len {
list.notified
} else {
core::usize::MAX
};
self.inner.notified.store(notified, Ordering::Release);
// Drop the actual lock.
drop(list);
// Wakeup all tasks.
for task in self.tasks.drain(..) {
task.wake();
}
// There is a deadlock where a node is pushed to the end of the queue after we've finished
// process_nodes() but before we've finished dropping the lock. This can lead to some
// notifications not being properly delivered, or listeners not being added to the list.
// Therefore check before we finish dropping if there is anything left in the queue, and
// if so, lock it again and force a queue update.
if !self.inner.list.queue.is_empty() {
self.guard = self.inner.list.inner.try_lock();
}
}
}
}
/// An entry representing a registered listener.
enum Entry<T> {
/// Contains the listener state.
Listener {
/// The state of the listener.
state: Cell<State<T>>,
/// The previous listener in the list.
prev: Cell<Option<NonZeroUsize>>,
/// The next listener in the list.
next: Cell<Option<NonZeroUsize>>,
},
/// An empty slot that contains the index of the next empty slot.
Empty(NonZeroUsize),
/// Sentinel value.
Sentinel,
}
struct TakenState<'a, T> {
slot: &'a Cell<State<T>>,
state: State<T>,
}
impl<T> Drop for TakenState<'_, T> {
fn drop(&mut self) {
self.slot
.set(mem::replace(&mut self.state, State::NotifiedTaken));
}
}
impl<T> fmt::Debug for TakenState<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.state, f)
}
}
impl<T: PartialEq> PartialEq for TakenState<'_, T> {
fn eq(&self, other: &Self) -> bool {
self.state == other.state
}
}
impl<'a, T> TakenState<'a, T> {
fn new(slot: &'a Cell<State<T>>) -> Self {
let state = slot.replace(State::NotifiedTaken);
Self { slot, state }
}
}
impl<T> fmt::Debug for Entry<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Entry::Listener { state, next, prev } => f
.debug_struct("Listener")
.field("state", &TakenState::new(state))
.field("prev", prev)
.field("next", next)
.finish(),
Entry::Empty(next) => f.debug_tuple("Empty").field(next).finish(),
Entry::Sentinel => f.debug_tuple("Sentinel").finish(),
}
}
}
impl<T: PartialEq> PartialEq for Entry<T> {
fn eq(&self, other: &Entry<T>) -> bool {
match (self, other) {
(
Self::Listener {
state: state1,
prev: prev1,
next: next1,
},
Self::Listener {
state: state2,
prev: prev2,
next: next2,
},
) => {
if TakenState::new(state1) != TakenState::new(state2) {
return false;
}
prev1.get() == prev2.get() && next1.get() == next2.get()
}
(Self::Empty(next1), Self::Empty(next2)) => next1 == next2,
(Self::Sentinel, Self::Sentinel) => true,
_ => false,
}
}
}
impl<T> Entry<T> {
fn state(&self) -> &Cell<State<T>> {
match self {
Entry::Listener { state, .. } => state,
_ => unreachable!(),
}
}
fn prev(&self) -> &Cell<Option<NonZeroUsize>> {
match self {
Entry::Listener { prev, .. } => prev,
_ => unreachable!(),
}
}
fn next(&self) -> &Cell<Option<NonZeroUsize>> {
match self {
Entry::Listener { next, .. } => next,
_ => unreachable!(),
}
}
}
/// A linked list of entries.
pub(crate) struct ListenerSlab<T> {
/// The raw list of entries.
listeners: Vec<Entry<T>>,
/// First entry in the list.
head: Option<NonZeroUsize>,
/// Last entry in the list.
tail: Option<NonZeroUsize>,
/// The first unnotified entry in the list.
start: Option<NonZeroUsize>,
/// The number of notified entries in the list.
notified: usize,
/// The total number of listeners.
len: usize,
/// The index of the first `Empty` entry, or the length of the list plus one if there
/// are no empty entries.
first_empty: NonZeroUsize,
}
impl<T> fmt::Debug for ListenerSlab<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ListenerSlab")
.field("listeners", &self.listeners)
.field("head", &self.head)
.field("tail", &self.tail)
.field("start", &self.start)
.field("notified", &self.notified)
.field("len", &self.len)
.field("first_empty", &self.first_empty)
.finish()
}
}
impl<T> ListenerSlab<T> {
/// Create a new, empty list.
pub(crate) fn new() -> Self {
Self {
listeners: alloc::vec![Entry::Sentinel],
head: None,
tail: None,
start: None,
notified: 0,
len: 0,
first_empty: unsafe { NonZeroUsize::new_unchecked(1) },
}
}
/// Inserts a new entry into the list.
pub(crate) fn insert(&mut self, state: State<T>) -> NonZeroUsize {
// Add the new entry into the list.
let key = {
let entry = Entry::Listener {
state: Cell::new(state),
prev: Cell::new(self.tail),
next: Cell::new(None),
};
let key = self.first_empty;
if self.first_empty.get() == self.listeners.len() {
// No empty entries, so add a new entry.
self.listeners.push(entry);
// SAFETY: Guaranteed to not overflow, since the Vec would have panicked already.
self.first_empty = unsafe { NonZeroUsize::new_unchecked(self.listeners.len()) };
} else {
// There is an empty entry, so replace it.
let slot = &mut self.listeners[key.get()];
let next = match mem::replace(slot, entry) {
Entry::Empty(next) => next,
_ => unreachable!(),
};
self.first_empty = next;
}
key
};
// Replace the tail with the new entry.
match mem::replace(&mut self.tail, Some(key)) {
None => self.head = Some(key),
Some(tail) => {
let tail = &self.listeners[tail.get()];
tail.next().set(Some(key));
}
}
// If there are no listeners that have been notified, then the new listener is the next
// listener to be notified.
if self.start.is_none() {
self.start = Some(key);
}
// Increment the length.
self.len += 1;
key
}
/// Removes an entry from the list and returns its state.
pub(crate) fn remove(&mut self, key: NonZeroUsize, propagate: bool) -> Option<State<T>> {
let entry = &self.listeners[key.get()];
let prev = entry.prev().get();
let next = entry.next().get();
// Unlink from the previous entry.
match prev {
None => self.head = next,
Some(p) => self.listeners[p.get()].next().set(next),
}
// Unlink from the next entry.
match next {
None => self.tail = prev,
Some(n) => self.listeners[n.get()].prev().set(prev),
}
// If this was the first unnotified entry, move the pointer to the next one.
if self.start == Some(key) {
self.start = next;
}
// Extract the state.
let entry = mem::replace(
&mut self.listeners[key.get()],
Entry::Empty(self.first_empty),
);
self.first_empty = key;
let mut state = match entry {
Entry::Listener { state, .. } => state.into_inner(),
_ => unreachable!(),
};
// Update the counters.
if state.is_notified() {
self.notified = self.notified.saturating_sub(1);
if propagate {
// Propagate the notification to the next entry.
let state = mem::replace(&mut state, State::NotifiedTaken);
if let State::Notified { tag, additional } = state {
let tags = {
let mut tag = Some(tag);
move || tag.take().expect("called more than once")
};
self.notify(GenericNotify::new(1, additional, tags));
}
}
}
self.len -= 1;
Some(state)
}
/// Notifies a number of listeners.
#[cold]
pub(crate) fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
let mut n = notify.count(Internal::new());
let is_additional = notify.is_additional(Internal::new());
if !is_additional {
// Make sure we're not notifying more than we have.
if n <= self.notified {
return 0;
}
n -= self.notified;
}
let original_count = n;
while n > 0 {
n -= 1;
// Notify the next entry.
match self.start {
None => return original_count - n - 1,
Some(e) => {
// Get the entry and move the pointer forwards.
let entry = &self.listeners[e.get()];
self.start = entry.next().get();
// Set the state to `Notified` and notify.
let tag = notify.next_tag(Internal::new());
if let State::Task(task) = entry.state().replace(State::Notified {
tag,
additional: is_additional,
}) {
task.wake();
}
// Bump the notified count.
self.notified += 1;
}
}
}
original_count - n
}
/// Register a task to be notified when the event is triggered.
///
/// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
/// isn't inserted, returns `None`.
pub(crate) fn register(
&mut self,
mut listener: Pin<&mut Option<Listener<T>>>,
task: TaskRef<'_>,
) -> RegisterResult<T> {
let key = match *listener {
Some(Listener::HasNode(key)) => key,
_ => return RegisterResult::NeverInserted,
};
let entry = &self.listeners[key.get()];
// Take the state out and check it.
match entry.state().replace(State::NotifiedTaken) {
State::Notified { tag, .. } => {
// The listener was already notified, so we don't need to do anything.
self.remove(key, false);
*listener = None;
RegisterResult::Notified(tag)
}
State::Task(other_task) => {
// Only replace the task if it's not the same as the one we're registering.
if task.will_wake(other_task.as_task_ref()) {
entry.state().set(State::Task(other_task));
} else {
entry.state().set(State::Task(task.into_task()));
}
RegisterResult::Registered
}
_ => {
// Register the task.
entry.state().set(State::Task(task.into_task()));
RegisterResult::Registered
}
}
}
}
pub(crate) enum Listener<T> {
/// The listener has a node inside of the linked list.
HasNode(NonZeroUsize),
/// The listener has an entry in the queue that may or may not have a task waiting.
Queued(Arc<TaskWaiting>),
/// Eat the generic type for consistency.
_EatGenericType(PhantomData<T>),
}
impl<T> fmt::Debug for Listener<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::HasNode(key) => f.debug_tuple("HasNode").field(key).finish(),
Self::Queued(tw) => f.debug_tuple("Queued").field(tw).finish(),
Self::_EatGenericType(_) => unreachable!(),
}
}
}
impl<T> Unpin for Listener<T> {}
impl<T> PartialEq for Listener<T> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::HasNode(a), Self::HasNode(b)) => a == b,
(Self::Queued(a), Self::Queued(b)) => Arc::ptr_eq(a, b),
_ => false,
}
}
}
/// A simple mutex type that optimistically assumes that the lock is uncontended.
pub(crate) struct Mutex<T> {
/// The inner value.
value: UnsafeCell<T>,
/// Whether the mutex is locked.
locked: AtomicBool,
}
impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(lock) = self.try_lock() {
f.debug_tuple("Mutex").field(&*lock).finish()
} else {
f.write_str("Mutex { <locked> }")
}
}
}
impl<T> Mutex<T> {
/// Create a new mutex.
pub(crate) fn new(value: T) -> Self {
Self {
value: UnsafeCell::new(value),
locked: AtomicBool::new(false),
}
}
/// Lock the mutex.
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
// Try to lock the mutex.
if self
.locked
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
// We have successfully locked the mutex.
Some(MutexGuard {
mutex: self,
guard: self.value.get(),
})
} else {
self.try_lock_slow()
}
}
#[cold]
fn try_lock_slow(&self) -> Option<MutexGuard<'_, T>> {
// Assume that the contention is short-term.
// Spin for a while to see if the mutex becomes unlocked.
let mut spins = 100u32;
loop {
if self
.locked
.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
// We have successfully locked the mutex.
return Some(MutexGuard {
mutex: self,
guard: self.value.get(),
});
}
// Use atomic loads instead of compare-exchange.
while self.locked.load(Ordering::Relaxed) {
// Return None once we've exhausted the number of spins.
spins = spins.checked_sub(1)?;
}
}
}
}
pub(crate) struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
guard: ConstPtr<T>,
}
impl<'a, T> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
self.mutex.locked.store(false, Ordering::Release);
}
}
impl<'a, T> ops::Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { self.guard.deref() }
}
}
impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { self.guard.deref_mut() }
}
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]
fn smoke_mutex() {
let mutex = Mutex::new(0);
{
let mut guard = mutex.try_lock().unwrap();
*guard += 1;
}
{
let mut guard = mutex.try_lock().unwrap();
*guard += 1;
}
let guard = mutex.try_lock().unwrap();
assert_eq!(*guard, 2);
}
#[test]
fn smoke_listener_slab() {
let mut listeners = ListenerSlab::<()>::new();
// Insert a few listeners.
let key1 = listeners.insert(State::Created);
let key2 = listeners.insert(State::Created);
let key3 = listeners.insert(State::Created);
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key1));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
// Remove one.
assert_eq!(listeners.remove(key2, false), Some(State::Created));
assert_eq!(listeners.len, 2);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key1));
assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key1)),
next: Cell::new(None),
}
);
}
#[test]
fn listener_slab_notify() {
let mut listeners = ListenerSlab::new();
// Insert a few listeners.
let key1 = listeners.insert(State::Created);
let key2 = listeners.insert(State::Created);
let key3 = listeners.insert(State::Created);
// Notify one.
listeners.notify(GenericNotify::new(1, true, || ()));
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Notified {
additional: true,
tag: ()
}),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
// Remove the notified listener.
assert_eq!(
listeners.remove(key1, false),
Some(State::Notified {
additional: true,
tag: ()
})
);
assert_eq!(listeners.len, 2);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key2));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
}
#[test]
fn listener_slab_register() {
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_fn::waker_fn({
let woken = woken.clone();
move || woken.store(true, Ordering::SeqCst)
});
let mut listeners = ListenerSlab::new();
// Insert a few listeners.
let key1 = listeners.insert(State::Created);
let key2 = listeners.insert(State::Created);
let key3 = listeners.insert(State::Created);
// Register one.
assert_eq!(
listeners.register(
Pin::new(&mut Some(Listener::HasNode(key2))),
TaskRef::Waker(&waker)
),
RegisterResult::Registered
);
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key1));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Task(Task::Waker(waker.clone()))),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
// Notify the listener.
listeners.notify(GenericNotify::new(2, false, || ()));
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 2);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key3));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
assert!(woken.load(Ordering::SeqCst));
assert_eq!(
listeners.register(
Pin::new(&mut Some(Listener::HasNode(key2))),
TaskRef::Waker(&waker)
),
RegisterResult::Notified(())
);
}
#[test]
fn listener_slab_notify_prop() {
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_fn::waker_fn({
let woken = woken.clone();
move || woken.store(true, Ordering::SeqCst)
});
let mut listeners = ListenerSlab::new();
// Insert a few listeners.
let key1 = listeners.insert(State::Created);
let key2 = listeners.insert(State::Created);
let key3 = listeners.insert(State::Created);
// Register one.
assert_eq!(
listeners.register(
Pin::new(&mut Some(Listener::HasNode(key2))),
TaskRef::Waker(&waker)
),
RegisterResult::Registered
);
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key1));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Task(Task::Waker(waker.clone()))),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
// Notify the first listener.
listeners.notify(GenericNotify::new(1, false, || ()));
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Task(Task::Waker(waker.clone()))),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
// Calling notify again should not change anything.
listeners.notify(GenericNotify::new(1, false, || ()));
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Task(Task::Waker(waker.clone()))),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
// Remove the first listener.
assert_eq!(
listeners.remove(key1, false),
Some(State::Notified {
additional: false,
tag: ()
})
);
assert_eq!(listeners.len, 2);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key2));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(*listeners.listeners[2].prev(), Cell::new(None));
assert_eq!(*listeners.listeners[2].next(), Cell::new(Some(key3)));
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
// Notify the second listener.
listeners.notify(GenericNotify::new(1, false, || ()));
assert!(woken.load(Ordering::SeqCst));
assert_eq!(listeners.len, 2);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key2));
assert_eq!(listeners.start, Some(key3));
assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
// Remove and propagate the second listener.
assert_eq!(listeners.remove(key2, true), Some(State::NotifiedTaken));
// The third listener should be notified.
assert_eq!(listeners.len, 1);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key3));
assert_eq!(listeners.start, None);
assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(
listeners.listeners[2],
Entry::Empty(NonZeroUsize::new(1).unwrap())
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(None),
}
);
// Remove the third listener.
assert_eq!(
listeners.remove(key3, false),
Some(State::Notified {
additional: false,
tag: ()
})
);
}
#[test]
fn uncontended_inner() {
let inner = crate::Inner::new();
// Register two listeners.
let (mut listener1, mut listener2, mut listener3) = (None, None, None);
inner.insert(Pin::new(&mut listener1));
inner.insert(Pin::new(&mut listener2));
inner.insert(Pin::new(&mut listener3));
assert_eq!(
listener1,
Some(Listener::HasNode(NonZeroUsize::new(1).unwrap()))
);
assert_eq!(
listener2,
Some(Listener::HasNode(NonZeroUsize::new(2).unwrap()))
);
// Register a waker in the second listener.
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_fn::waker_fn({
let woken = woken.clone();
move || woken.store(true, Ordering::SeqCst)
});
assert_eq!(
inner.register(Pin::new(&mut listener2), TaskRef::Waker(&waker)),
RegisterResult::Registered
);
// Notify the first listener.
inner.notify(GenericNotify::new(1, false, || ()));
assert!(!woken.load(Ordering::SeqCst));
// Another notify should do nothing.
inner.notify(GenericNotify::new(1, false, || ()));
assert!(!woken.load(Ordering::SeqCst));
// Receive the notification.
assert_eq!(
inner.register(Pin::new(&mut listener1), TaskRef::Waker(&waker)),
RegisterResult::Notified(())
);
// First listener is already removed.
assert!(listener1.is_none());
// Notify the second listener.
inner.notify(GenericNotify::new(1, false, || ()));
assert!(woken.load(Ordering::SeqCst));
// Remove the second listener and propagate the notification.
assert_eq!(
inner.remove(Pin::new(&mut listener2), true),
Some(State::NotifiedTaken)
);
// Second listener is already removed.
assert!(listener2.is_none());
// Third listener should be notified.
assert_eq!(
inner.register(Pin::new(&mut listener3), TaskRef::Waker(&waker)),
RegisterResult::Notified(())
);
}
}