Implement an algorithm to make this crate no_std, take 3 (#34)
This commit is contained in:
parent
5c1ae634fb
commit
16b3d599fc
|
@ -28,7 +28,12 @@ jobs:
|
|||
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
|
||||
if: startsWith(matrix.rust, 'nightly')
|
||||
run: cargo check -Z features=dev_dep
|
||||
- run: cargo test
|
||||
- run: cargo test --features __test
|
||||
- run: cargo build --no-default-features
|
||||
- name: Install cargo-hack
|
||||
uses: taiki-e/install-action@cargo-hack
|
||||
- run: rustup target add thumbv7m-none-eabi
|
||||
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps
|
||||
|
||||
msrv:
|
||||
runs-on: ubuntu-latest
|
||||
|
@ -65,7 +70,7 @@ jobs:
|
|||
- uses: actions/checkout@v3
|
||||
- name: Install Rust
|
||||
run: rustup toolchain install nightly --component miri && rustup default nightly
|
||||
- run: cargo miri test
|
||||
- run: cargo miri test --features __test
|
||||
env:
|
||||
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation
|
||||
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
|
||||
|
|
12
Cargo.toml
12
Cargo.toml
|
@ -14,8 +14,16 @@ keywords = ["condvar", "eventcount", "wake", "blocking", "park"]
|
|||
categories = ["asynchronous", "concurrency"]
|
||||
exclude = ["/.*"]
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
std = ["parking"]
|
||||
|
||||
# Unstable, test only feature. Do not enable this.
|
||||
__test = []
|
||||
|
||||
[dependencies]
|
||||
parking = "2.0.0"
|
||||
crossbeam-utils = { version = "0.8.12", default-features = false }
|
||||
parking = { version = "2.0.0", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = "0.3.4"
|
||||
|
@ -26,4 +34,4 @@ name = "bench"
|
|||
harness = false
|
||||
|
||||
[lib]
|
||||
bench = false
|
||||
bench = false
|
||||
|
|
|
@ -0,0 +1,226 @@
|
|||
//! The inner mechanism powering the `Event` type.
|
||||
|
||||
use crate::list::{Entry, List};
|
||||
use crate::node::Node;
|
||||
use crate::queue::Queue;
|
||||
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use crate::sync::cell::UnsafeCell;
|
||||
use crate::Task;
|
||||
|
||||
use alloc::vec;
|
||||
use alloc::vec::Vec;
|
||||
|
||||
use core::ops;
|
||||
use core::ptr::NonNull;
|
||||
|
||||
/// Inner state of [`Event`].
|
||||
pub(crate) struct Inner {
|
||||
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
|
||||
///
|
||||
/// If there are no entries, this value is set to `usize::MAX`.
|
||||
pub(crate) notified: AtomicUsize,
|
||||
|
||||
/// A linked list holding registered listeners.
|
||||
list: Mutex<List>,
|
||||
|
||||
/// Queue of nodes waiting to be processed.
|
||||
queue: Queue,
|
||||
|
||||
/// A single cached list entry to avoid allocations on the fast path of the insertion.
|
||||
///
|
||||
/// This field can only be written to when the `cache_used` field in the `list` structure
|
||||
/// is false, or the user has a pointer to the `Entry` identical to this one and that user
|
||||
/// has exclusive access to that `Entry`. An immutable pointer to this field is kept in
|
||||
/// the `list` structure when it is in use.
|
||||
cache: UnsafeCell<Entry>,
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
/// Create a new `Inner`.
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
notified: AtomicUsize::new(core::usize::MAX),
|
||||
list: Mutex::new(List::new()),
|
||||
queue: Queue::new(),
|
||||
cache: UnsafeCell::new(Entry::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Locks the list.
|
||||
pub(crate) fn lock(&self) -> Option<ListGuard<'_>> {
|
||||
self.list.try_lock().map(|guard| ListGuard {
|
||||
inner: self,
|
||||
guard: Some(guard),
|
||||
})
|
||||
}
|
||||
|
||||
/// Push a pending operation to the queue.
|
||||
#[cold]
|
||||
pub(crate) fn push(&self, node: Node) {
|
||||
self.queue.push(node);
|
||||
|
||||
// Acquire and drop the lock to make sure that the queue is flushed.
|
||||
let _guard = self.lock();
|
||||
}
|
||||
|
||||
/// Returns the pointer to the single cached list entry.
|
||||
#[inline(always)]
|
||||
pub(crate) fn cache_ptr(&self) -> NonNull<Entry> {
|
||||
unsafe { NonNull::new_unchecked(self.cache.get()) }
|
||||
}
|
||||
}
|
||||
|
||||
/// The guard returned by [`Inner::lock`].
|
||||
pub(crate) struct ListGuard<'a> {
|
||||
/// Reference to the inner state.
|
||||
inner: &'a Inner,
|
||||
|
||||
/// The locked list.
|
||||
guard: Option<MutexGuard<'a, List>>,
|
||||
}
|
||||
|
||||
impl ListGuard<'_> {
|
||||
#[cold]
|
||||
fn process_nodes_slow(
|
||||
&mut self,
|
||||
start_node: Node,
|
||||
tasks: &mut Vec<Task>,
|
||||
guard: &mut MutexGuard<'_, List>,
|
||||
) {
|
||||
// Process the start node.
|
||||
tasks.extend(start_node.apply(guard, self.inner));
|
||||
|
||||
// Process all remaining nodes.
|
||||
while let Some(node) = self.inner.queue.pop() {
|
||||
tasks.extend(node.apply(guard, self.inner));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ops::Deref for ListGuard<'_> {
|
||||
type Target = List;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.guard.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl ops::DerefMut for ListGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.guard.as_mut().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ListGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
let Self { inner, guard } = self;
|
||||
let mut list = guard.take().unwrap();
|
||||
|
||||
// Tasks to wakeup after releasing the lock.
|
||||
let mut tasks = vec![];
|
||||
|
||||
// Process every node left in the queue.
|
||||
if let Some(start_node) = inner.queue.pop() {
|
||||
self.process_nodes_slow(start_node, &mut tasks, &mut list);
|
||||
}
|
||||
|
||||
// Update the atomic `notified` counter.
|
||||
let notified = if list.notified < list.len {
|
||||
list.notified
|
||||
} else {
|
||||
core::usize::MAX
|
||||
};
|
||||
|
||||
self.inner.notified.store(notified, Ordering::Release);
|
||||
|
||||
// Drop the actual lock.
|
||||
drop(list);
|
||||
|
||||
// Wakeup all tasks.
|
||||
for task in tasks {
|
||||
task.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple mutex type that optimistically assumes that the lock is uncontended.
|
||||
struct Mutex<T> {
|
||||
/// The inner value.
|
||||
value: UnsafeCell<T>,
|
||||
|
||||
/// Whether the mutex is locked.
|
||||
locked: AtomicBool,
|
||||
}
|
||||
|
||||
impl<T> Mutex<T> {
|
||||
/// Create a new mutex.
|
||||
pub(crate) fn new(value: T) -> Self {
|
||||
Self {
|
||||
value: UnsafeCell::new(value),
|
||||
locked: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock the mutex.
|
||||
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
|
||||
// Try to lock the mutex.
|
||||
if self
|
||||
.locked
|
||||
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
// We have successfully locked the mutex.
|
||||
Some(MutexGuard { mutex: self })
|
||||
} else {
|
||||
self.try_lock_slow()
|
||||
}
|
||||
}
|
||||
|
||||
#[cold]
|
||||
fn try_lock_slow(&self) -> Option<MutexGuard<'_, T>> {
|
||||
// Assume that the contention is short-term.
|
||||
// Spin for a while to see if the mutex becomes unlocked.
|
||||
let mut spins = 100u32;
|
||||
|
||||
loop {
|
||||
if self
|
||||
.locked
|
||||
.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
// We have successfully locked the mutex.
|
||||
return Some(MutexGuard { mutex: self });
|
||||
}
|
||||
|
||||
// Use atomic loads instead of compare-exchange.
|
||||
while self.locked.load(Ordering::Relaxed) {
|
||||
// Return None once we've exhausted the number of spins.
|
||||
spins = spins.checked_sub(1)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct MutexGuard<'a, T> {
|
||||
mutex: &'a Mutex<T>,
|
||||
}
|
||||
|
||||
impl<'a, T> Drop for MutexGuard<'a, T> {
|
||||
fn drop(&mut self) {
|
||||
self.mutex.locked.store(false, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> ops::Deref for MutexGuard<'a, T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &T {
|
||||
unsafe { &*self.mutex.value.get() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
|
||||
fn deref_mut(&mut self) -> &mut T {
|
||||
unsafe { &mut *self.mutex.value.get() }
|
||||
}
|
||||
}
|
691
src/lib.rs
691
src/lib.rs
|
@ -60,52 +60,84 @@
|
|||
//! }
|
||||
//! ```
|
||||
|
||||
#![cfg_attr(not(feature = "std"), no_std)]
|
||||
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
|
||||
|
||||
use std::cell::{Cell, UnsafeCell};
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::mem::{self, ManuallyDrop};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::panic::{RefUnwindSafe, UnwindSafe};
|
||||
use std::pin::Pin;
|
||||
use std::ptr::{self, NonNull};
|
||||
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::usize;
|
||||
extern crate alloc;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
extern crate std;
|
||||
|
||||
mod inner;
|
||||
mod list;
|
||||
mod node;
|
||||
mod queue;
|
||||
mod sync;
|
||||
|
||||
use alloc::sync::Arc;
|
||||
|
||||
use core::fmt;
|
||||
use core::future::Future;
|
||||
use core::mem::ManuallyDrop;
|
||||
use core::pin::Pin;
|
||||
use core::ptr::{self, NonNull};
|
||||
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
|
||||
use core::task::{Context, Poll, Waker};
|
||||
use core::usize;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
use std::panic::{RefUnwindSafe, UnwindSafe};
|
||||
#[cfg(feature = "std")]
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use inner::Inner;
|
||||
use list::{Entry, State};
|
||||
use node::Node;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
use parking::Unparker;
|
||||
|
||||
/// Inner state of [`Event`].
|
||||
struct Inner {
|
||||
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
|
||||
///
|
||||
/// If there are no entries, this value is set to `usize::MAX`.
|
||||
notified: AtomicUsize,
|
||||
/// An asynchronous waker or thread unparker that can be used to notify a task or thread.
|
||||
enum Task {
|
||||
/// A waker that can be used to notify a task.
|
||||
Waker(Waker),
|
||||
|
||||
/// A linked list holding registered listeners.
|
||||
list: Mutex<List>,
|
||||
|
||||
/// A single cached list entry to avoid allocations on the fast path of the insertion.
|
||||
cache: UnsafeCell<Entry>,
|
||||
/// An unparker that can be used to notify a thread.
|
||||
#[cfg(feature = "std")]
|
||||
Thread(Unparker),
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
/// Locks the list.
|
||||
fn lock(&self) -> ListGuard<'_> {
|
||||
ListGuard {
|
||||
inner: self,
|
||||
guard: self.list.lock().unwrap(),
|
||||
impl Task {
|
||||
/// Notifies the task or thread.
|
||||
fn wake(self) {
|
||||
match self {
|
||||
Task::Waker(waker) => waker.wake(),
|
||||
#[cfg(feature = "std")]
|
||||
Task::Thread(unparker) => {
|
||||
unparker.unpark();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the pointer to the single cached list entry.
|
||||
#[inline(always)]
|
||||
fn cache_ptr(&self) -> NonNull<Entry> {
|
||||
unsafe { NonNull::new_unchecked(self.cache.get()) }
|
||||
}
|
||||
/// Details of a notification.
|
||||
#[derive(Copy, Clone)]
|
||||
struct Notify {
|
||||
/// The number of listeners to notify.
|
||||
count: usize,
|
||||
|
||||
/// The notification strategy.
|
||||
kind: NotifyKind,
|
||||
}
|
||||
|
||||
/// The strategy for notifying listeners.
|
||||
#[derive(Copy, Clone)]
|
||||
enum NotifyKind {
|
||||
/// Notify non-notified listeners.
|
||||
Notify,
|
||||
|
||||
/// Notify all listeners.
|
||||
NotifyAdditional,
|
||||
}
|
||||
|
||||
/// A synchronization primitive for notifying async tasks and threads.
|
||||
|
@ -139,7 +171,9 @@ pub struct Event {
|
|||
unsafe impl Send for Event {}
|
||||
unsafe impl Sync for Event {}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl UnwindSafe for Event {}
|
||||
#[cfg(feature = "std")]
|
||||
impl RefUnwindSafe for Event {}
|
||||
|
||||
impl Event {
|
||||
|
@ -174,9 +208,33 @@ impl Event {
|
|||
#[cold]
|
||||
pub fn listen(&self) -> EventListener {
|
||||
let inner = self.inner();
|
||||
|
||||
// Try to acquire a lock in the inner list.
|
||||
let entry = unsafe {
|
||||
if let Some(mut lock) = (*inner).lock() {
|
||||
let entry = lock.alloc((*inner).cache_ptr());
|
||||
lock.insert(entry);
|
||||
|
||||
entry
|
||||
} else {
|
||||
// Push entries into the queue indicating that we want to push a listener.
|
||||
let (node, entry) = Node::listener();
|
||||
(*inner).push(node);
|
||||
|
||||
// Indicate that there are nodes waiting to be notified.
|
||||
(*inner)
|
||||
.notified
|
||||
.compare_exchange(usize::MAX, 0, Ordering::AcqRel, Ordering::Relaxed)
|
||||
.ok();
|
||||
|
||||
entry
|
||||
}
|
||||
};
|
||||
|
||||
// Register the listener.
|
||||
let listener = EventListener {
|
||||
inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
|
||||
entry: unsafe { Some((*inner).lock().insert((*inner).cache_ptr())) },
|
||||
entry: Some(entry),
|
||||
};
|
||||
|
||||
// Make sure the listener is registered before whatever happens next.
|
||||
|
@ -222,7 +280,14 @@ impl Event {
|
|||
// Notify if there is at least one unnotified listener and the number of notified
|
||||
// listeners is less than `n`.
|
||||
if inner.notified.load(Ordering::Acquire) < n {
|
||||
inner.lock().notify(n);
|
||||
if let Some(mut lock) = inner.lock() {
|
||||
lock.notify_unnotified(n);
|
||||
} else {
|
||||
inner.push(Node::Notify(Notify {
|
||||
count: n,
|
||||
kind: NotifyKind::Notify,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -266,7 +331,14 @@ impl Event {
|
|||
// Notify if there is at least one unnotified listener and the number of notified
|
||||
// listeners is less than `n`.
|
||||
if inner.notified.load(Ordering::Acquire) < n {
|
||||
inner.lock().notify(n);
|
||||
if let Some(mut lock) = inner.lock() {
|
||||
lock.notify_unnotified(n);
|
||||
} else {
|
||||
inner.push(Node::Notify(Notify {
|
||||
count: n,
|
||||
kind: NotifyKind::Notify,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -309,7 +381,14 @@ impl Event {
|
|||
if let Some(inner) = self.try_inner() {
|
||||
// Notify if there is at least one unnotified listener.
|
||||
if inner.notified.load(Ordering::Acquire) < usize::MAX {
|
||||
inner.lock().notify_additional(n);
|
||||
if let Some(mut lock) = inner.lock() {
|
||||
lock.notify_additional(n);
|
||||
} else {
|
||||
inner.push(Node::Notify(Notify {
|
||||
count: n,
|
||||
kind: NotifyKind::NotifyAdditional,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -354,7 +433,14 @@ impl Event {
|
|||
if let Some(inner) = self.try_inner() {
|
||||
// Notify if there is at least one unnotified listener.
|
||||
if inner.notified.load(Ordering::Acquire) < usize::MAX {
|
||||
inner.lock().notify_additional(n);
|
||||
if let Some(mut lock) = inner.lock() {
|
||||
lock.notify_additional(n);
|
||||
} else {
|
||||
inner.push(Node::Notify(Notify {
|
||||
count: n,
|
||||
kind: NotifyKind::NotifyAdditional,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -376,22 +462,7 @@ impl Event {
|
|||
// Initialize the state if this is its first use.
|
||||
if inner.is_null() {
|
||||
// Allocate on the heap.
|
||||
let new = Arc::new(Inner {
|
||||
notified: AtomicUsize::new(usize::MAX),
|
||||
list: std::sync::Mutex::new(List {
|
||||
head: None,
|
||||
tail: None,
|
||||
start: None,
|
||||
len: 0,
|
||||
notified: 0,
|
||||
cache_used: false,
|
||||
}),
|
||||
cache: UnsafeCell::new(Entry {
|
||||
state: Cell::new(State::Created),
|
||||
prev: Cell::new(None),
|
||||
next: Cell::new(None),
|
||||
}),
|
||||
});
|
||||
let new = Arc::new(Inner::new());
|
||||
// Convert the heap-allocated state into a raw pointer.
|
||||
let new = Arc::into_raw(new) as *mut Inner;
|
||||
|
||||
|
@ -465,9 +536,12 @@ pub struct EventListener {
|
|||
unsafe impl Send for EventListener {}
|
||||
unsafe impl Sync for EventListener {}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl UnwindSafe for EventListener {}
|
||||
#[cfg(feature = "std")]
|
||||
impl RefUnwindSafe for EventListener {}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl EventListener {
|
||||
/// Blocks until a notification is received.
|
||||
///
|
||||
|
@ -529,10 +603,93 @@ impl EventListener {
|
|||
self.wait_internal(Some(deadline))
|
||||
}
|
||||
|
||||
fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
|
||||
// Take out the entry pointer and set it to `None`.
|
||||
let entry = match self.entry.take() {
|
||||
None => unreachable!("cannot wait twice on an `EventListener`"),
|
||||
Some(entry) => entry,
|
||||
};
|
||||
let (parker, unparker) = parking::pair();
|
||||
|
||||
// Wait for the lock to be available.
|
||||
let lock = || {
|
||||
loop {
|
||||
match self.inner.lock() {
|
||||
Some(lock) => return lock,
|
||||
None => {
|
||||
// Wake us up when the lock is free.
|
||||
let unparker = parker.unparker();
|
||||
self.inner.push(Node::Waiting(Task::Thread(unparker)));
|
||||
parker.park()
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Set this listener's state to `Waiting`.
|
||||
{
|
||||
let e = unsafe { entry.as_ref() };
|
||||
|
||||
if e.is_queued() {
|
||||
// Write a task to be woken once the lock is acquired.
|
||||
e.write_task(Task::Thread(unparker));
|
||||
} else {
|
||||
let mut list = lock();
|
||||
|
||||
// If the listener was notified, we're done.
|
||||
match e.state().replace(State::Notified(false)) {
|
||||
State::Notified(_) => {
|
||||
list.remove(entry, self.inner.cache_ptr());
|
||||
return true;
|
||||
}
|
||||
_ => e.state().set(State::Task(Task::Thread(unparker))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until a notification is received or the timeout is reached.
|
||||
loop {
|
||||
match deadline {
|
||||
None => parker.park(),
|
||||
|
||||
Some(deadline) => {
|
||||
// Check for timeout.
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
// Remove the entry and check if notified.
|
||||
let mut list = lock();
|
||||
let state = list.remove(entry, self.inner.cache_ptr());
|
||||
return state.is_notified();
|
||||
}
|
||||
|
||||
// Park until the deadline.
|
||||
parker.park_timeout(deadline - now);
|
||||
}
|
||||
}
|
||||
|
||||
let mut list = lock();
|
||||
let e = unsafe { entry.as_ref() };
|
||||
|
||||
// Do a dummy replace operation in order to take out the state.
|
||||
match e.state().replace(State::Notified(false)) {
|
||||
State::Notified(_) => {
|
||||
// If this listener has been notified, remove it from the list and return.
|
||||
list.remove(entry, self.inner.cache_ptr());
|
||||
return true;
|
||||
}
|
||||
// Otherwise, set the state back to `Waiting`.
|
||||
state => e.state().set(state),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventListener {
|
||||
/// Drops this listener and discards its notification (if any) without notifying another
|
||||
/// active listener.
|
||||
///
|
||||
/// Returns `true` if a notification was discarded.
|
||||
/// Returns `true` if a notification was discarded. Note that this function may spuriously
|
||||
/// return `false` even if a notification was received by the listener.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
|
@ -550,12 +707,22 @@ impl EventListener {
|
|||
pub fn discard(mut self) -> bool {
|
||||
// If this listener has never picked up a notification...
|
||||
if let Some(entry) = self.entry.take() {
|
||||
let mut list = self.inner.lock();
|
||||
// Remove the listener from the list and return `true` if it was notified.
|
||||
if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) {
|
||||
return true;
|
||||
if let Some(mut lock) = self.inner.lock() {
|
||||
let state = lock.remove(entry, self.inner.cache_ptr());
|
||||
|
||||
if let State::Notified(_) = state {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
// Let someone else do it for us.
|
||||
self.inner.push(Node::RemoveListener {
|
||||
listener: entry,
|
||||
propagate: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
|
@ -592,69 +759,6 @@ impl EventListener {
|
|||
pub fn same_event(&self, other: &EventListener) -> bool {
|
||||
ptr::eq::<Inner>(&*self.inner, &*other.inner)
|
||||
}
|
||||
|
||||
fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
|
||||
// Take out the entry pointer and set it to `None`.
|
||||
let entry = match self.entry.take() {
|
||||
None => unreachable!("cannot wait twice on an `EventListener`"),
|
||||
Some(entry) => entry,
|
||||
};
|
||||
let (parker, unparker) = parking::pair();
|
||||
|
||||
// Set this listener's state to `Waiting`.
|
||||
{
|
||||
let mut list = self.inner.lock();
|
||||
let e = unsafe { entry.as_ref() };
|
||||
|
||||
// Do a dummy replace operation in order to take out the state.
|
||||
match e.state.replace(State::Notified(false)) {
|
||||
State::Notified(_) => {
|
||||
// If this listener has been notified, remove it from the list and return.
|
||||
list.remove(entry, self.inner.cache_ptr());
|
||||
return true;
|
||||
}
|
||||
// Otherwise, set the state to `Waiting`.
|
||||
_ => e.state.set(State::Waiting(unparker)),
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until a notification is received or the timeout is reached.
|
||||
loop {
|
||||
match deadline {
|
||||
None => parker.park(),
|
||||
|
||||
Some(deadline) => {
|
||||
// Check for timeout.
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
// Remove the entry and check if notified.
|
||||
return self
|
||||
.inner
|
||||
.lock()
|
||||
.remove(entry, self.inner.cache_ptr())
|
||||
.is_notified();
|
||||
}
|
||||
|
||||
// Park until the deadline.
|
||||
parker.park_timeout(deadline - now);
|
||||
}
|
||||
}
|
||||
|
||||
let mut list = self.inner.lock();
|
||||
let e = unsafe { entry.as_ref() };
|
||||
|
||||
// Do a dummy replace operation in order to take out the state.
|
||||
match e.state.replace(State::Notified(false)) {
|
||||
State::Notified(_) => {
|
||||
// If this listener has been notified, remove it from the list and return.
|
||||
list.remove(entry, self.inner.cache_ptr());
|
||||
return true;
|
||||
}
|
||||
// Otherwise, set the state back to `Waiting`.
|
||||
state => e.state.set(state),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for EventListener {
|
||||
|
@ -666,14 +770,29 @@ impl fmt::Debug for EventListener {
|
|||
impl Future for EventListener {
|
||||
type Output = ();
|
||||
|
||||
#[allow(unreachable_patterns)]
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut list = self.inner.lock();
|
||||
let mut list = match self.inner.lock() {
|
||||
Some(list) => list,
|
||||
None => {
|
||||
// Wait for the lock to be available.
|
||||
self.inner
|
||||
.push(Node::Waiting(Task::Waker(cx.waker().clone())));
|
||||
|
||||
// If the lock is suddenly available, we need to poll again.
|
||||
if let Some(list) = self.inner.lock() {
|
||||
list
|
||||
} else {
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let entry = match self.entry {
|
||||
None => unreachable!("cannot poll a completed `EventListener` future"),
|
||||
Some(entry) => entry,
|
||||
};
|
||||
let state = unsafe { &entry.as_ref().state };
|
||||
let state = unsafe { entry.as_ref().state() };
|
||||
|
||||
// Do a dummy replace operation in order to take out the state.
|
||||
match state.replace(State::Notified(false)) {
|
||||
|
@ -686,17 +805,17 @@ impl Future for EventListener {
|
|||
}
|
||||
State::Created => {
|
||||
// If the listener was just created, put it in the `Polling` state.
|
||||
state.set(State::Polling(cx.waker().clone()));
|
||||
state.set(State::Task(Task::Waker(cx.waker().clone())));
|
||||
}
|
||||
State::Polling(w) => {
|
||||
State::Task(Task::Waker(w)) => {
|
||||
// If the listener was in the `Polling` state, update the waker.
|
||||
if w.will_wake(cx.waker()) {
|
||||
state.set(State::Polling(w));
|
||||
state.set(State::Task(Task::Waker(w)));
|
||||
} else {
|
||||
state.set(State::Polling(cx.waker().clone()));
|
||||
state.set(State::Task(Task::Waker(cx.waker().clone())));
|
||||
}
|
||||
}
|
||||
State::Waiting(_) => {
|
||||
State::Task(_) => {
|
||||
unreachable!("cannot poll and wait on `EventListener` at the same time")
|
||||
}
|
||||
}
|
||||
|
@ -709,265 +828,21 @@ impl Drop for EventListener {
|
|||
fn drop(&mut self) {
|
||||
// If this listener has never picked up a notification...
|
||||
if let Some(entry) = self.entry.take() {
|
||||
let mut list = self.inner.lock();
|
||||
|
||||
// But if a notification was delivered to it...
|
||||
if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) {
|
||||
// Then pass it on to another active listener.
|
||||
if additional {
|
||||
list.notify_additional(1);
|
||||
} else {
|
||||
list.notify(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard holding the linked list locked.
|
||||
struct ListGuard<'a> {
|
||||
/// A reference to [`Event`]'s inner state.
|
||||
inner: &'a Inner,
|
||||
|
||||
/// The actual guard that acquired the linked list.
|
||||
guard: MutexGuard<'a, List>,
|
||||
}
|
||||
|
||||
impl Drop for ListGuard<'_> {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
let list = &mut **self;
|
||||
|
||||
// Update the atomic `notified` counter.
|
||||
let notified = if list.notified < list.len {
|
||||
list.notified
|
||||
} else {
|
||||
usize::MAX
|
||||
};
|
||||
self.inner.notified.store(notified, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for ListGuard<'_> {
|
||||
type Target = List;
|
||||
|
||||
#[inline]
|
||||
fn deref(&self) -> &List {
|
||||
&*self.guard
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for ListGuard<'_> {
|
||||
#[inline]
|
||||
fn deref_mut(&mut self) -> &mut List {
|
||||
&mut *self.guard
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of a listener.
|
||||
enum State {
|
||||
/// It has just been created.
|
||||
Created,
|
||||
|
||||
/// It has received a notification.
|
||||
///
|
||||
/// The `bool` is `true` if this was an "additional" notification.
|
||||
Notified(bool),
|
||||
|
||||
/// An async task is polling it.
|
||||
Polling(Waker),
|
||||
|
||||
/// A thread is blocked on it.
|
||||
Waiting(Unparker),
|
||||
}
|
||||
|
||||
impl State {
|
||||
/// Returns `true` if this is the `Notified` state.
|
||||
#[inline]
|
||||
fn is_notified(&self) -> bool {
|
||||
match self {
|
||||
State::Notified(_) => true,
|
||||
State::Created | State::Polling(_) | State::Waiting(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An entry representing a registered listener.
|
||||
struct Entry {
|
||||
/// The state of this listener.
|
||||
state: Cell<State>,
|
||||
|
||||
/// Previous entry in the linked list.
|
||||
prev: Cell<Option<NonNull<Entry>>>,
|
||||
|
||||
/// Next entry in the linked list.
|
||||
next: Cell<Option<NonNull<Entry>>>,
|
||||
}
|
||||
|
||||
/// A linked list of entries.
|
||||
struct List {
|
||||
/// First entry in the list.
|
||||
head: Option<NonNull<Entry>>,
|
||||
|
||||
/// Last entry in the list.
|
||||
tail: Option<NonNull<Entry>>,
|
||||
|
||||
/// The first unnotified entry in the list.
|
||||
start: Option<NonNull<Entry>>,
|
||||
|
||||
/// Total number of entries in the list.
|
||||
len: usize,
|
||||
|
||||
/// The number of notified entries in the list.
|
||||
notified: usize,
|
||||
|
||||
/// Whether the cached entry is used.
|
||||
cache_used: bool,
|
||||
}
|
||||
|
||||
impl List {
|
||||
/// Inserts a new entry into the list.
|
||||
fn insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> {
|
||||
unsafe {
|
||||
let entry = Entry {
|
||||
state: Cell::new(State::Created),
|
||||
prev: Cell::new(self.tail),
|
||||
next: Cell::new(None),
|
||||
};
|
||||
|
||||
let entry = if self.cache_used {
|
||||
// Allocate an entry that is going to become the new tail.
|
||||
NonNull::new_unchecked(Box::into_raw(Box::new(entry)))
|
||||
} else {
|
||||
// No need to allocate - we can use the cached entry.
|
||||
self.cache_used = true;
|
||||
cache.as_ptr().write(entry);
|
||||
cache
|
||||
};
|
||||
|
||||
// Replace the tail with the new entry.
|
||||
match mem::replace(&mut self.tail, Some(entry)) {
|
||||
None => self.head = Some(entry),
|
||||
Some(t) => t.as_ref().next.set(Some(entry)),
|
||||
}
|
||||
|
||||
// If there were no unnotified entries, this one is the first now.
|
||||
if self.start.is_none() {
|
||||
self.start = self.tail;
|
||||
}
|
||||
|
||||
// Bump the entry count.
|
||||
self.len += 1;
|
||||
|
||||
entry
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes an entry from the list and returns its state.
|
||||
fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
|
||||
unsafe {
|
||||
let prev = entry.as_ref().prev.get();
|
||||
let next = entry.as_ref().next.get();
|
||||
|
||||
// Unlink from the previous entry.
|
||||
match prev {
|
||||
None => self.head = next,
|
||||
Some(p) => p.as_ref().next.set(next),
|
||||
}
|
||||
|
||||
// Unlink from the next entry.
|
||||
match next {
|
||||
None => self.tail = prev,
|
||||
Some(n) => n.as_ref().prev.set(prev),
|
||||
}
|
||||
|
||||
// If this was the first unnotified entry, move the pointer to the next one.
|
||||
if self.start == Some(entry) {
|
||||
self.start = next;
|
||||
}
|
||||
|
||||
// Extract the state.
|
||||
let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
|
||||
// Free the cached entry.
|
||||
self.cache_used = false;
|
||||
entry.as_ref().state.replace(State::Created)
|
||||
} else {
|
||||
// Deallocate the entry.
|
||||
Box::from_raw(entry.as_ptr()).state.into_inner()
|
||||
};
|
||||
|
||||
// Update the counters.
|
||||
if state.is_notified() {
|
||||
self.notified -= 1;
|
||||
}
|
||||
self.len -= 1;
|
||||
|
||||
state
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies a number of entries.
|
||||
#[cold]
|
||||
fn notify(&mut self, mut n: usize) {
|
||||
if n <= self.notified {
|
||||
return;
|
||||
}
|
||||
n -= self.notified;
|
||||
|
||||
while n > 0 {
|
||||
n -= 1;
|
||||
|
||||
// Notify the first unnotified entry.
|
||||
match self.start {
|
||||
None => break,
|
||||
Some(e) => {
|
||||
// Get the entry and move the pointer forward.
|
||||
let e = unsafe { e.as_ref() };
|
||||
self.start = e.next.get();
|
||||
|
||||
// Set the state of this entry to `Notified` and notify.
|
||||
match e.state.replace(State::Notified(false)) {
|
||||
State::Notified(_) => {}
|
||||
State::Created => {}
|
||||
State::Polling(w) => w.wake(),
|
||||
State::Waiting(t) => {
|
||||
t.unpark();
|
||||
}
|
||||
match self.inner.lock() {
|
||||
Some(mut list) => {
|
||||
// But if a notification was delivered to it...
|
||||
if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr())
|
||||
{
|
||||
// Then pass it on to another active listener.
|
||||
list.notify(1, additional);
|
||||
}
|
||||
|
||||
// Update the counter.
|
||||
self.notified += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies a number of additional entries.
|
||||
#[cold]
|
||||
fn notify_additional(&mut self, mut n: usize) {
|
||||
while n > 0 {
|
||||
n -= 1;
|
||||
|
||||
// Notify the first unnotified entry.
|
||||
match self.start {
|
||||
None => break,
|
||||
Some(e) => {
|
||||
// Get the entry and move the pointer forward.
|
||||
let e = unsafe { e.as_ref() };
|
||||
self.start = e.next.get();
|
||||
|
||||
// Set the state of this entry to `Notified` and notify.
|
||||
match e.state.replace(State::Notified(true)) {
|
||||
State::Notified(_) => {}
|
||||
State::Created => {}
|
||||
State::Polling(w) => w.wake(),
|
||||
State::Waiting(t) => {
|
||||
t.unpark();
|
||||
}
|
||||
}
|
||||
|
||||
// Update the counter.
|
||||
self.notified += 1;
|
||||
None => {
|
||||
// Request that someone else do it.
|
||||
self.inner.push(Node::RemoveListener {
|
||||
listener: entry,
|
||||
propagate: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1001,3 +876,45 @@ fn full_fence() {
|
|||
atomic::fence(Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicate that we're using spin-based contention and that we should yield the CPU.
|
||||
#[inline]
|
||||
fn yield_now() {
|
||||
#[cfg(feature = "std")]
|
||||
std::thread::yield_now();
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
#[allow(deprecated)]
|
||||
sync::atomic::spin_loop_hint();
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "__test", test))]
|
||||
impl Event {
|
||||
/// Locks the event.
|
||||
///
|
||||
/// This is useful for simulating contention, but otherwise serves no other purpose for users.
|
||||
/// It is used only in testing.
|
||||
///
|
||||
/// This method and `EventLock` are not part of the public API.
|
||||
#[doc(hidden)]
|
||||
pub fn __lock_event(&self) -> EventLock<'_> {
|
||||
unsafe {
|
||||
EventLock {
|
||||
_lock: (*self.inner()).lock().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "__test", test))]
|
||||
#[doc(hidden)]
|
||||
pub struct EventLock<'a> {
|
||||
_lock: inner::ListGuard<'a>,
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "__test", test))]
|
||||
impl fmt::Debug for EventLock<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.pad("EventLock { .. }")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,351 @@
|
|||
//! The inner list of listeners.
|
||||
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::cell::Cell;
|
||||
use crate::Task;
|
||||
|
||||
use alloc::boxed::Box;
|
||||
|
||||
use core::mem;
|
||||
use core::ptr::{self, NonNull};
|
||||
|
||||
/// The state of a listener.
|
||||
pub(crate) enum State {
|
||||
/// It has just been created.
|
||||
Created,
|
||||
|
||||
/// It has received a notification.
|
||||
///
|
||||
/// The `bool` is `true` if this was an "additional" notification.
|
||||
Notified(bool),
|
||||
|
||||
/// A task is polling it.
|
||||
Task(Task),
|
||||
}
|
||||
|
||||
impl State {
|
||||
/// Returns `true` if this is the `Notified` state.
|
||||
#[inline]
|
||||
pub(crate) fn is_notified(&self) -> bool {
|
||||
match self {
|
||||
State::Notified(_) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An entry representing a registered listener.
|
||||
pub(crate) struct Entry {
|
||||
/// Shared state used to coordinate the listener under contention.
|
||||
///
|
||||
/// This is the only field that can be accessed without the list being locked.
|
||||
shared_state: SharedState,
|
||||
|
||||
/// The state of this listener.
|
||||
state: Cell<State>,
|
||||
|
||||
/// Previous entry in the linked list.
|
||||
prev: Cell<Option<NonNull<Entry>>>,
|
||||
|
||||
/// Next entry in the linked list.
|
||||
next: Cell<Option<NonNull<Entry>>>,
|
||||
}
|
||||
|
||||
struct SharedState {
|
||||
/// Information about this shared state.
|
||||
state: AtomicUsize,
|
||||
|
||||
/// A task to wake up once we are inserted into the list.
|
||||
insert_task: Cell<Option<Task>>,
|
||||
}
|
||||
|
||||
/// A linked list of entries.
|
||||
pub(crate) struct List {
|
||||
/// First entry in the list.
|
||||
head: Option<NonNull<Entry>>,
|
||||
|
||||
/// Last entry in the list.
|
||||
tail: Option<NonNull<Entry>>,
|
||||
|
||||
/// The first unnotified entry in the list.
|
||||
start: Option<NonNull<Entry>>,
|
||||
|
||||
/// Total number of entries in the list.
|
||||
pub(crate) len: usize,
|
||||
|
||||
/// The number of notified entries in the list.
|
||||
pub(crate) notified: usize,
|
||||
|
||||
/// Whether the cached entry is used.
|
||||
cache_used: bool,
|
||||
}
|
||||
|
||||
impl List {
|
||||
/// Create a new, empty list.
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
head: None,
|
||||
tail: None,
|
||||
start: None,
|
||||
len: 0,
|
||||
notified: 0,
|
||||
cache_used: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocate a new entry.
|
||||
pub(crate) unsafe fn alloc(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> {
|
||||
if self.cache_used {
|
||||
// Allocate an entry that is going to become the new tail.
|
||||
NonNull::new_unchecked(Box::into_raw(Box::new(Entry::new())))
|
||||
} else {
|
||||
// No need to allocate - we can use the cached entry.
|
||||
self.cache_used = true;
|
||||
cache.as_ptr().write(Entry::new());
|
||||
cache
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts a new entry into the list.
|
||||
pub(crate) fn insert(&mut self, entry: NonNull<Entry>) {
|
||||
// Replace the tail with the new entry.
|
||||
match mem::replace(&mut self.tail, Some(entry)) {
|
||||
None => self.head = Some(entry),
|
||||
Some(t) => unsafe {
|
||||
t.as_ref().next.set(Some(entry));
|
||||
entry.as_ref().prev.set(Some(t));
|
||||
},
|
||||
}
|
||||
|
||||
// If there were no unnotified entries, this one is the first now.
|
||||
if self.start.is_none() {
|
||||
self.start = self.tail;
|
||||
}
|
||||
|
||||
// Bump the entry count.
|
||||
self.len += 1;
|
||||
}
|
||||
|
||||
/// De-allocate an entry.
|
||||
unsafe fn dealloc(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
|
||||
if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
|
||||
// Free the cached entry.
|
||||
self.cache_used = false;
|
||||
entry.as_ref().state.replace(State::Created)
|
||||
} else {
|
||||
// Deallocate the entry.
|
||||
Box::from_raw(entry.as_ptr()).state.into_inner()
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes an entry from the list and returns its state.
|
||||
pub(crate) fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
|
||||
unsafe {
|
||||
let prev = entry.as_ref().prev.get();
|
||||
let next = entry.as_ref().next.get();
|
||||
|
||||
// Unlink from the previous entry.
|
||||
match prev {
|
||||
None => self.head = next,
|
||||
Some(p) => p.as_ref().next.set(next),
|
||||
}
|
||||
|
||||
// Unlink from the next entry.
|
||||
match next {
|
||||
None => self.tail = prev,
|
||||
Some(n) => n.as_ref().prev.set(prev),
|
||||
}
|
||||
|
||||
// If this was the first unnotified entry, move the pointer to the next one.
|
||||
if self.start == Some(entry) {
|
||||
self.start = next;
|
||||
}
|
||||
|
||||
// Extract the state.
|
||||
let state = entry.as_ref().state.replace(State::Created);
|
||||
|
||||
// Delete the entry.
|
||||
self.dealloc(entry, cache);
|
||||
|
||||
// Update the counters.
|
||||
if state.is_notified() {
|
||||
self.notified = self.notified.saturating_sub(1);
|
||||
}
|
||||
self.len = self.len.saturating_sub(1);
|
||||
|
||||
state
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies a number of entries, either normally or as an additional notification.
|
||||
#[cold]
|
||||
pub(crate) fn notify(&mut self, count: usize, additional: bool) {
|
||||
if additional {
|
||||
self.notify_additional(count);
|
||||
} else {
|
||||
self.notify_unnotified(count);
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies a number of entries.
|
||||
#[cold]
|
||||
pub(crate) fn notify_unnotified(&mut self, mut n: usize) {
|
||||
if n <= self.notified {
|
||||
return;
|
||||
}
|
||||
n -= self.notified;
|
||||
|
||||
while n > 0 {
|
||||
n -= 1;
|
||||
|
||||
// Notify the first unnotified entry.
|
||||
match self.start {
|
||||
None => break,
|
||||
Some(e) => {
|
||||
// Get the entry and move the pointer forward.
|
||||
let e = unsafe { e.as_ref() };
|
||||
self.start = e.next.get();
|
||||
|
||||
// Set the state of this entry to `Notified` and notify.
|
||||
let was_notified = e.notify(false);
|
||||
|
||||
// Update the counter.
|
||||
self.notified += was_notified as usize;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies a number of additional entries.
|
||||
#[cold]
|
||||
pub(crate) fn notify_additional(&mut self, mut n: usize) {
|
||||
while n > 0 {
|
||||
n -= 1;
|
||||
|
||||
// Notify the first unnotified entry.
|
||||
match self.start {
|
||||
None => break,
|
||||
Some(e) => {
|
||||
// Get the entry and move the pointer forward.
|
||||
let e = unsafe { e.as_ref() };
|
||||
self.start = e.next.get();
|
||||
|
||||
// Set the state of this entry to `Notified` and notify.
|
||||
let was_notified = e.notify(true);
|
||||
|
||||
// Update the counter.
|
||||
self.notified += was_notified as usize;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Entry {
|
||||
/// Create a new, empty entry.
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
shared_state: SharedState {
|
||||
state: AtomicUsize::new(0),
|
||||
insert_task: Cell::new(None),
|
||||
},
|
||||
state: Cell::new(State::Created),
|
||||
prev: Cell::new(None),
|
||||
next: Cell::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the state of this entry.
|
||||
pub(crate) fn state(&self) -> &Cell<State> {
|
||||
&self.state
|
||||
}
|
||||
|
||||
/// Tell whether this entry is currently queued.
|
||||
///
|
||||
/// This is only ever used as an optimization for `wait_internal`, hence that fact that
|
||||
/// it is `std`-exclusive
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) fn is_queued(&self) -> bool {
|
||||
self.shared_state.state.load(Ordering::Acquire) & QUEUED != 0
|
||||
}
|
||||
|
||||
/// Write to the temporary task.
|
||||
#[cold]
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) fn write_task(&self, task: Task) {
|
||||
// Acquire the WRITING_STATE lock.
|
||||
let mut state = self
|
||||
.shared_state
|
||||
.state
|
||||
.fetch_or(WRITING_STATE, Ordering::AcqRel);
|
||||
|
||||
// Wait until the WRITING_STATE lock is released.
|
||||
while state & WRITING_STATE != 0 {
|
||||
state = self
|
||||
.shared_state
|
||||
.state
|
||||
.fetch_or(WRITING_STATE, Ordering::AcqRel);
|
||||
crate::yield_now();
|
||||
}
|
||||
|
||||
// Write the task.
|
||||
self.shared_state.insert_task.set(Some(task));
|
||||
|
||||
// Release the WRITING_STATE lock.
|
||||
self.shared_state
|
||||
.state
|
||||
.fetch_and(!WRITING_STATE, Ordering::Release);
|
||||
}
|
||||
|
||||
/// Dequeue the entry.
|
||||
pub(crate) fn dequeue(&self) -> Option<Task> {
|
||||
// Acquire the WRITING_STATE lock.
|
||||
let mut state = self
|
||||
.shared_state
|
||||
.state
|
||||
.fetch_or(WRITING_STATE, Ordering::AcqRel);
|
||||
|
||||
// Wait until the WRITING_STATE lock is released.
|
||||
while state & WRITING_STATE != 0 {
|
||||
state = self
|
||||
.shared_state
|
||||
.state
|
||||
.fetch_or(WRITING_STATE, Ordering::AcqRel);
|
||||
crate::yield_now();
|
||||
}
|
||||
|
||||
// Read the task.
|
||||
let task = self.shared_state.insert_task.take();
|
||||
|
||||
// Release the WRITING_STATE lock and also remove the QUEUED bit.
|
||||
self.shared_state
|
||||
.state
|
||||
.fetch_and(!WRITING_STATE & !QUEUED, Ordering::Release);
|
||||
|
||||
task
|
||||
}
|
||||
|
||||
/// Indicate that this entry has been queued.
|
||||
pub(crate) fn enqueue(&self) {
|
||||
self.shared_state.state.fetch_or(QUEUED, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
/// Indicate that this entry has been notified.
|
||||
#[cold]
|
||||
pub(crate) fn notify(&self, additional: bool) -> bool {
|
||||
match self.state.replace(State::Notified(additional)) {
|
||||
State::Notified(_) => {}
|
||||
State::Created => {}
|
||||
State::Task(w) => w.wake(),
|
||||
}
|
||||
|
||||
// Return whether the notification would have had any effect.
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Set if we are currently queued.
|
||||
const QUEUED: usize = 1 << 0;
|
||||
|
||||
/// Whether or not we are currently writing to the `insert_task` variable, synchronously.
|
||||
const WRITING_STATE: usize = 1 << 1;
|
|
@ -0,0 +1,111 @@
|
|||
//! The node that makes up queues.
|
||||
|
||||
use crate::inner::Inner;
|
||||
use crate::list::{Entry, List, State};
|
||||
use crate::{Notify, NotifyKind, Task};
|
||||
|
||||
use alloc::boxed::Box;
|
||||
use core::ptr::NonNull;
|
||||
|
||||
/// A node in the backup queue.
|
||||
pub(crate) enum Node {
|
||||
/// This node is requesting to add a listener.
|
||||
// For some reason, the MSRV build says this variant is never constructed.
|
||||
#[allow(dead_code)]
|
||||
AddListener {
|
||||
/// The pointer to the listener to add.
|
||||
listener: Option<DistOwnedListener>,
|
||||
},
|
||||
|
||||
/// This node is notifying a listener.
|
||||
Notify(Notify),
|
||||
|
||||
/// This node is removing a listener.
|
||||
RemoveListener {
|
||||
/// The pointer to the listener to remove.
|
||||
listener: NonNull<Entry>,
|
||||
|
||||
/// Whether to propagate notifications to the next listener.
|
||||
propagate: bool,
|
||||
},
|
||||
|
||||
/// We are waiting for the mutex to lock, so they can manipulate it.
|
||||
Waiting(Task),
|
||||
}
|
||||
|
||||
pub(crate) struct DistOwnedListener(NonNull<Entry>);
|
||||
|
||||
impl DistOwnedListener {
|
||||
/// extracts the contained entry pointer from the DOL,
|
||||
/// without calling the DOL Drop handler (such that the returned pointer stays valid)
|
||||
fn take(self) -> NonNull<Entry> {
|
||||
core::mem::ManuallyDrop::new(self).0
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DistOwnedListener {
|
||||
fn drop(&mut self) {
|
||||
drop(unsafe { Box::from_raw(self.0.as_ptr()) });
|
||||
}
|
||||
}
|
||||
|
||||
impl Node {
|
||||
pub(crate) fn listener() -> (Self, NonNull<Entry>) {
|
||||
let entry = Box::into_raw(Box::new(Entry::new()));
|
||||
let entry = unsafe { NonNull::new_unchecked(entry) };
|
||||
(
|
||||
Self::AddListener {
|
||||
listener: Some(DistOwnedListener(entry)),
|
||||
},
|
||||
entry,
|
||||
)
|
||||
}
|
||||
|
||||
/// Indicate that this node has been enqueued.
|
||||
pub(crate) fn enqueue(&self) {
|
||||
if let Node::AddListener {
|
||||
listener: Some(entry),
|
||||
} = self
|
||||
{
|
||||
unsafe { entry.0.as_ref() }.enqueue();
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply the node to the list.
|
||||
pub(crate) fn apply(self, list: &mut List, inner: &Inner) -> Option<Task> {
|
||||
match self {
|
||||
Node::AddListener { mut listener } => {
|
||||
// Add the listener to the list.
|
||||
let entry = listener.take().unwrap().take();
|
||||
list.insert(entry);
|
||||
|
||||
// Dequeue the listener.
|
||||
return unsafe { entry.as_ref().dequeue() };
|
||||
}
|
||||
Node::Notify(Notify { count, kind }) => {
|
||||
// Notify the listener.
|
||||
match kind {
|
||||
NotifyKind::Notify => list.notify_unnotified(count),
|
||||
NotifyKind::NotifyAdditional => list.notify_additional(count),
|
||||
}
|
||||
}
|
||||
Node::RemoveListener {
|
||||
listener,
|
||||
propagate,
|
||||
} => {
|
||||
// Remove the listener from the list.
|
||||
let state = list.remove(listener, inner.cache_ptr());
|
||||
|
||||
if let (true, State::Notified(additional)) = (propagate, state) {
|
||||
// Propagate the notification to the next listener.
|
||||
list.notify(1, additional);
|
||||
}
|
||||
}
|
||||
Node::Waiting(task) => {
|
||||
return Some(task);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
//! The queue of nodes that keeps track of pending operations.
|
||||
|
||||
use crate::node::Node;
|
||||
use crate::sync::atomic::{AtomicPtr, Ordering};
|
||||
|
||||
use crossbeam_utils::CachePadded;
|
||||
|
||||
use alloc::boxed::Box;
|
||||
use core::ptr;
|
||||
|
||||
/// A queue of nodes.
|
||||
pub(crate) struct Queue {
|
||||
/// The head of the queue.
|
||||
head: CachePadded<AtomicPtr<QueueNode>>,
|
||||
|
||||
/// The tail of the queue.
|
||||
tail: CachePadded<AtomicPtr<QueueNode>>,
|
||||
}
|
||||
|
||||
/// A single node in the `Queue`.
|
||||
struct QueueNode {
|
||||
/// The next node in the queue.
|
||||
next: AtomicPtr<QueueNode>,
|
||||
|
||||
/// Associated node data.
|
||||
node: Node,
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
/// Create a new queue.
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
head: CachePadded::new(AtomicPtr::new(ptr::null_mut())),
|
||||
tail: CachePadded::new(AtomicPtr::new(ptr::null_mut())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Push a node to the tail end of the queue.
|
||||
pub(crate) fn push(&self, node: Node) {
|
||||
node.enqueue();
|
||||
let node = Box::into_raw(Box::new(QueueNode {
|
||||
next: AtomicPtr::new(ptr::null_mut()),
|
||||
node,
|
||||
}));
|
||||
|
||||
// Push the node to the tail end of the queue.
|
||||
let mut tail = self.tail.load(Ordering::Relaxed);
|
||||
|
||||
// Get the next() pointer we have to overwrite.
|
||||
let next_ptr = if tail.is_null() {
|
||||
&self.head
|
||||
} else {
|
||||
unsafe { &(*tail).next }
|
||||
};
|
||||
|
||||
loop {
|
||||
match next_ptr.compare_exchange(
|
||||
ptr::null_mut(),
|
||||
node,
|
||||
Ordering::Release,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => {
|
||||
// Either set the tail to the new node, or let whoever beat us have it
|
||||
let _ = self.tail.compare_exchange(
|
||||
tail,
|
||||
node,
|
||||
Ordering::Release,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
Err(next) => tail = next,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pop the oldest node from the head of the queue.
|
||||
pub(crate) fn pop(&self) -> Option<Node> {
|
||||
let mut head = self.head.load(Ordering::Relaxed);
|
||||
|
||||
loop {
|
||||
if head.is_null() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let next = unsafe { (*head).next.load(Ordering::Relaxed) };
|
||||
|
||||
match self
|
||||
.head
|
||||
.compare_exchange(head, next, Ordering::Release, Ordering::Relaxed)
|
||||
{
|
||||
Ok(_) => {
|
||||
// We have successfully popped the head of the queue.
|
||||
let node = unsafe { Box::from_raw(head) };
|
||||
|
||||
// If next is also null, set the tail to null as well.
|
||||
if next.is_null() {
|
||||
let _ = self.tail.compare_exchange(
|
||||
head,
|
||||
ptr::null_mut(),
|
||||
Ordering::Release,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
return Some(node.node);
|
||||
}
|
||||
Err(h) => head = h,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
//! Implementation of synchronization primitives.
|
||||
|
||||
// TODO: portable_atomic or loom implementations
|
||||
|
||||
pub use alloc::sync::Arc;
|
||||
pub use core::cell;
|
||||
pub use core::sync::atomic;
|
|
@ -28,6 +28,7 @@ fn notify() {
|
|||
|
||||
event.notify(2);
|
||||
event.notify(1);
|
||||
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
|
@ -133,13 +134,13 @@ fn drop_non_notified() {
|
|||
let event = Event::new();
|
||||
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
//let mut l2 = event.listen();
|
||||
let l3 = event.listen();
|
||||
|
||||
event.notify(1);
|
||||
drop(l3);
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(!is_notified(&mut l2));
|
||||
//assert!(!is_notified(&mut l2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
//! Tests involving the backup queue used under heavy contention.
|
||||
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
|
||||
use event_listener::{Event, EventListener};
|
||||
use waker_fn::waker_fn;
|
||||
|
||||
fn is_notified(listener: &mut EventListener) -> bool {
|
||||
let waker = waker_fn(|| ());
|
||||
Pin::new(listener)
|
||||
.poll(&mut Context::from_waker(&waker))
|
||||
.is_ready()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_and_notify() {
|
||||
let event = Event::new();
|
||||
|
||||
// Lock to simulate contention.
|
||||
let lock = event.__lock_event();
|
||||
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert!(!is_notified(&mut l1));
|
||||
assert!(!is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
|
||||
event.notify(2);
|
||||
event.notify(1);
|
||||
|
||||
// Unlock to simulate contention being released.
|
||||
drop(lock);
|
||||
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_then_contention() {
|
||||
let event = Event::new();
|
||||
|
||||
// Allow the listeners to be created without contention.
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert!(!is_notified(&mut l1));
|
||||
assert!(!is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
|
||||
// Lock to simulate contention.
|
||||
let lock = event.__lock_event();
|
||||
|
||||
assert!(!is_notified(&mut l1));
|
||||
assert!(!is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
|
||||
event.notify(2);
|
||||
|
||||
// Unlock to simulate contention being released.
|
||||
drop(lock);
|
||||
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
}
|
Loading…
Reference in New Issue