Compare commits
33 Commits
dce5c466bd
...
3eabd637af
Author | SHA1 | Date |
---|---|---|
John Nunley | 3eabd637af | |
John Nunley | ed02521ed0 | |
John Nunley | 6e24922493 | |
John Nunley | e4116bb9d7 | |
John Nunley | bae7aef8f4 | |
John Nunley | eba1ae5311 | |
John Nunley | d6bd4750f9 | |
John Nunley | da7b6f8b56 | |
John Nunley | 8dbb5f5081 | |
John Nunley | 0608dff353 | |
Alex Touchet | b2e8e5cb3e | |
John Nunley | dbdd28742d | |
John Nunley | a777426828 | |
John Nunley | 7c1dabd9b6 | |
John Nunley | 70da197003 | |
John Nunley | d98976d5e7 | |
John Nunley | d7d8589784 | |
John Nunley | 6c3146d6c9 | |
John Nunley | fbe8f595cf | |
John Nunley | eec2f6cfa7 | |
John Nunley | 7963c1bf37 | |
John Nunley | cbba22b72a | |
John Nunley | 051c8d084a | |
John Nunley | 3a608ee50e | |
John Nunley | f2aa168daa | |
John Nunley | f35a87342b | |
John Nunley | b6bc34091f | |
John Nunley | c1a0d08aeb | |
John Nunley | 80752853f9 | |
John Nunley | 625c4fd6fb | |
John Nunley | 93f9670e9e | |
Taiki Endo | eef0e86a2f | |
John Nunley | 648bbfa776 |
|
@ -13,7 +13,6 @@ on:
|
|||
|
||||
env:
|
||||
CARGO_INCREMENTAL: 0
|
||||
CARGO_NET_GIT_FETCH_WITH_CLI: true
|
||||
CARGO_NET_RETRY: 10
|
||||
CARGO_TERM_COLOR: always
|
||||
RUST_BACKTRACE: 1
|
||||
|
@ -27,72 +26,37 @@ defaults:
|
|||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
- rust: stable
|
||||
- rust: beta
|
||||
- rust: nightly
|
||||
- rust: nightly
|
||||
target: i686-unknown-linux-gnu
|
||||
os: [ubuntu-latest]
|
||||
rust: [nightly, beta, stable]
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
|
||||
- name: Install cross-compilation tools
|
||||
uses: taiki-e/setup-cross-toolchain-action@v1
|
||||
with:
|
||||
target: ${{ matrix.target }}
|
||||
if: matrix.target != ''
|
||||
- run: cargo build --all --all-targets
|
||||
- run: cargo build --all --no-default-features --all-targets
|
||||
- run: cargo build --all --all-features --all-targets
|
||||
- run: cargo test --all
|
||||
- run: cargo test --all --release
|
||||
- run: cargo test --no-default-features --tests
|
||||
- run: cargo test --no-default-features --tests --release
|
||||
- name: Install cargo-hack
|
||||
uses: taiki-e/install-action@cargo-hack
|
||||
- run: rustup target add thumbv7m-none-eabi
|
||||
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
|
||||
run: cargo hack build --all --no-dev-deps
|
||||
- run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps
|
||||
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic
|
||||
- name: Install wasm-pack
|
||||
uses: taiki-e/install-action@wasm-pack
|
||||
- run: wasm-pack test --node
|
||||
- run: wasm-pack test --node --no-default-features
|
||||
- run: wasm-pack test --node --no-default-features --features portable-atomic
|
||||
- name: Clone some dependent crates
|
||||
run: |
|
||||
git clone https://github.com/smol-rs/event-listener-strategy.git
|
||||
git clone https://github.com/smol-rs/async-channel.git
|
||||
git clone https://github.com/smol-rs/async-lock.git
|
||||
- name: Patch dependent crates
|
||||
run: |
|
||||
echo '[patch.crates-io]' >> event-listener-strategy/Cargo.toml
|
||||
echo 'event-listener = { path = ".." }' >> event-listener-strategy/Cargo.toml
|
||||
echo '[patch.crates-io]' >> async-channel/Cargo.toml
|
||||
echo 'event-listener = { path = ".." }' >> async-channel/Cargo.toml
|
||||
echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-channel/Cargo.toml
|
||||
echo '[patch.crates-io]' >> async-lock/Cargo.toml
|
||||
echo 'event-listener = { path = ".." }' >> async-lock/Cargo.toml
|
||||
echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-lock/Cargo.toml
|
||||
echo 'async-channel = { path = "../async-channel" }' >> async-lock/Cargo.toml
|
||||
- name: Test dependent crates
|
||||
run: |
|
||||
cargo test --manifest-path=event-listener-strategy/Cargo.toml
|
||||
cargo test --manifest-path=async-channel/Cargo.toml
|
||||
cargo test --manifest-path=async-lock/Cargo.toml
|
||||
if: startsWith(matrix.rust, 'nightly')
|
||||
run: cargo check -Z features=dev_dep
|
||||
- run: cargo test
|
||||
|
||||
msrv:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
# When updating this, the reminder to update the minimum supported
|
||||
# Rust version in Cargo.toml.
|
||||
rust: ['1.59']
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install cargo-hack
|
||||
uses: taiki-e/install-action@cargo-hack
|
||||
- run: cargo hack build --all --rust-version
|
||||
- run: cargo hack build --all --no-default-features --rust-version
|
||||
- name: Install Rust
|
||||
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
|
||||
- run: cargo build
|
||||
- run: cargo build --no-default-features
|
||||
|
||||
clippy:
|
||||
runs-on: ubuntu-latest
|
||||
|
@ -100,7 +64,9 @@ jobs:
|
|||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update stable
|
||||
- run: cargo clippy --all --all-features --all-targets
|
||||
- run: cargo clippy --all-features --all-targets
|
||||
- run: cargo clippy --all-targets
|
||||
- run: cargo clippy --no-default-features --all-targets
|
||||
|
||||
fmt:
|
||||
runs-on: ubuntu-latest
|
||||
|
@ -116,34 +82,10 @@ jobs:
|
|||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup toolchain install nightly --component miri && rustup default nightly
|
||||
- run: |
|
||||
echo "MIRIFLAGS=-Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation" >>"${GITHUB_ENV}"
|
||||
echo "RUSTFLAGS=${RUSTFLAGS} -Z randomize-layout" >>"${GITHUB_ENV}"
|
||||
- run: cargo miri test --all
|
||||
- run: cargo miri test --no-default-features --tests
|
||||
- run: cargo miri test --no-default-features --features portable-atomic --tests
|
||||
- name: Clone some dependent crates
|
||||
run: |
|
||||
git clone https://github.com/smol-rs/event-listener-strategy.git
|
||||
git clone https://github.com/smol-rs/async-channel.git
|
||||
git clone https://github.com/smol-rs/async-lock.git
|
||||
- name: Patch dependent crates
|
||||
run: |
|
||||
echo '[patch.crates-io]' >> event-listener-strategy/Cargo.toml
|
||||
echo 'event-listener = { path = ".." }' >> event-listener-strategy/Cargo.toml
|
||||
echo '[patch.crates-io]' >> async-channel/Cargo.toml
|
||||
echo 'event-listener = { path = ".." }' >> async-channel/Cargo.toml
|
||||
echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-channel/Cargo.toml
|
||||
echo '[patch.crates-io]' >> async-lock/Cargo.toml
|
||||
echo 'event-listener = { path = ".." }' >> async-lock/Cargo.toml
|
||||
echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-lock/Cargo.toml
|
||||
echo 'async-channel = { path = "../async-channel" }' >> async-lock/Cargo.toml
|
||||
- name: Test dependent crates
|
||||
# async-channel isn't included here as it appears to be broken on MIRI.
|
||||
# See https://github.com/smol-rs/async-channel/issues/85
|
||||
run: |
|
||||
cargo miri test --manifest-path=event-listener-strategy/Cargo.toml
|
||||
cargo miri test --manifest-path=async-lock/Cargo.toml
|
||||
- run: cargo miri test
|
||||
env:
|
||||
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation
|
||||
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
|
||||
|
||||
security_audit:
|
||||
permissions:
|
||||
|
@ -152,7 +94,7 @@ jobs:
|
|||
issues: write
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v3
|
||||
# https://github.com/rustsec/audit-check/issues/2
|
||||
- uses: rustsec/audit-check@master
|
||||
with:
|
||||
|
@ -166,5 +108,5 @@ jobs:
|
|||
run: rustup update stable
|
||||
- name: Loom tests
|
||||
run: RUSTFLAGS="--cfg=loom" cargo test --release --test loom --features loom
|
||||
|
||||
|
||||
- name: Loom tests for lock-free
|
||||
run: RUSTFLAGS="--cfg=loom" cargo test --no-default-features --release --test loom --features loom
|
||||
|
|
42
Cargo.toml
42
Cargo.toml
|
@ -2,11 +2,11 @@
|
|||
name = "event-listener"
|
||||
# When publishing a new version:
|
||||
# - Update CHANGELOG.md
|
||||
# - Create "v5.x.y" git tag
|
||||
# - Create "v2.x.y" git tag
|
||||
version = "5.3.0"
|
||||
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
|
||||
edition = "2021"
|
||||
rust-version = "1.60"
|
||||
rust-version = "1.59"
|
||||
description = "Notify async tasks or threads"
|
||||
license = "Apache-2.0 OR MIT"
|
||||
repository = "https://github.com/smol-rs/event-listener"
|
||||
|
@ -16,43 +16,25 @@ exclude = ["/.*"]
|
|||
|
||||
[features]
|
||||
default = ["std"]
|
||||
std = ["concurrent-queue/std", "parking"]
|
||||
portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]
|
||||
loom = ["concurrent-queue/loom", "parking?/loom", "dep:loom"]
|
||||
std = []
|
||||
portable-atomic = ["portable-atomic-crate", "portable-atomic-util"]
|
||||
|
||||
[dependencies]
|
||||
concurrent-queue = { version = "2.4.0", default-features = false }
|
||||
pin-project-lite = "0.2.12"
|
||||
portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] }
|
||||
portable-atomic-crate = { version = "1.6.0", package = "portable-atomic", optional = true }
|
||||
portable-atomic-util = { version = "0.1.5", features = ["alloc"], optional = true }
|
||||
|
||||
[target.'cfg(not(target_family = "wasm"))'.dependencies]
|
||||
parking = { version = "2.0.0", optional = true }
|
||||
[dev-dependencies]
|
||||
futures-lite = "2.3.0"
|
||||
criterion = { version = "0.3.4", default-features = false, features = ["cargo_bench_support"] }
|
||||
waker-fn = "1"
|
||||
|
||||
[target.'cfg(loom)'.dependencies]
|
||||
loom = { version = "0.7", optional = true }
|
||||
|
||||
[dependencies.portable_atomic_crate]
|
||||
package = "portable-atomic"
|
||||
version = "1.2.0"
|
||||
default-features = false
|
||||
optional = true
|
||||
|
||||
[dev-dependencies]
|
||||
futures-lite = "2.0.0"
|
||||
try-lock = "0.2.5"
|
||||
waker-fn = "1"
|
||||
|
||||
[dev-dependencies.criterion]
|
||||
version = "0.5"
|
||||
default-features = false
|
||||
features = ["cargo_bench_support"]
|
||||
|
||||
[target.'cfg(target_family = "wasm")'.dev-dependencies]
|
||||
wasm-bindgen-test = "0.3"
|
||||
|
||||
[[bench]]
|
||||
name = "bench"
|
||||
harness = false
|
||||
required-features = ["std"]
|
||||
|
||||
[lib]
|
||||
bench = false
|
||||
bench = false
|
|
@ -1,5 +1,3 @@
|
|||
use std::iter;
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use event_listener::{Event, Listener};
|
||||
|
||||
|
@ -8,14 +6,16 @@ const COUNT: usize = 8000;
|
|||
fn bench_events(c: &mut Criterion) {
|
||||
c.bench_function("notify_and_wait", |b| {
|
||||
let ev = Event::new();
|
||||
let mut handles = Vec::with_capacity(COUNT);
|
||||
|
||||
b.iter(|| {
|
||||
handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT));
|
||||
let mut handles = Vec::with_capacity(COUNT);
|
||||
|
||||
for _ in 0..COUNT {
|
||||
handles.push(ev.listen());
|
||||
}
|
||||
|
||||
ev.notify(COUNT);
|
||||
|
||||
for handle in handles.drain(..) {
|
||||
for handle in handles {
|
||||
handle.wait();
|
||||
}
|
||||
});
|
||||
|
|
|
@ -2,25 +2,29 @@
|
|||
//!
|
||||
//! This mutex exposes both blocking and async methods for acquiring a lock.
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
mod example {
|
||||
#![allow(dead_code)]
|
||||
#![allow(dead_code)]
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
mod ex {
|
||||
use std::cell::UnsafeCell;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{mpsc, Arc};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use event_listener::{listener, Event, Listener};
|
||||
use try_lock::{Locked, TryLock};
|
||||
use event_listener::{Event, Listener};
|
||||
|
||||
/// A simple mutex.
|
||||
struct Mutex<T> {
|
||||
/// Set to `true` when the mutex is locked.
|
||||
locked: AtomicBool,
|
||||
|
||||
/// Blocked lock operations.
|
||||
lock_ops: Event,
|
||||
|
||||
/// The inner non-blocking mutex.
|
||||
data: TryLock<T>,
|
||||
/// The inner protected data.
|
||||
data: UnsafeCell<T>,
|
||||
}
|
||||
|
||||
unsafe impl<T: Send> Send for Mutex<T> {}
|
||||
|
@ -30,40 +34,49 @@ mod example {
|
|||
/// Creates a mutex.
|
||||
fn new(t: T) -> Mutex<T> {
|
||||
Mutex {
|
||||
locked: AtomicBool::new(false),
|
||||
lock_ops: Event::new(),
|
||||
data: TryLock::new(t),
|
||||
data: UnsafeCell::new(t),
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to acquire a lock.
|
||||
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
|
||||
self.data.try_lock().map(MutexGuard)
|
||||
if !self.locked.swap(true, Ordering::Acquire) {
|
||||
Some(MutexGuard(self))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until a lock is acquired.
|
||||
fn lock(&self) -> MutexGuard<'_, T> {
|
||||
let mut listener = None;
|
||||
|
||||
loop {
|
||||
// Attempt grabbing a lock.
|
||||
if let Some(guard) = self.try_lock() {
|
||||
return guard;
|
||||
}
|
||||
|
||||
// Set up an event listener.
|
||||
listener!(self.lock_ops => listener);
|
||||
|
||||
// Try again.
|
||||
if let Some(guard) = self.try_lock() {
|
||||
return guard;
|
||||
// Set up an event listener or wait for a notification.
|
||||
match listener.take() {
|
||||
None => {
|
||||
// Start listening and then try locking again.
|
||||
listener = Some(self.lock_ops.listen());
|
||||
}
|
||||
Some(l) => {
|
||||
// Wait until a notification is received.
|
||||
l.wait();
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for a notification.
|
||||
listener.wait();
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until a lock is acquired or the timeout is reached.
|
||||
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
|
||||
let deadline = Instant::now() + timeout;
|
||||
let mut listener = None;
|
||||
|
||||
loop {
|
||||
// Attempt grabbing a lock.
|
||||
|
@ -71,59 +84,73 @@ mod example {
|
|||
return Some(guard);
|
||||
}
|
||||
|
||||
// Set up an event listener.
|
||||
listener!(self.lock_ops => listener);
|
||||
|
||||
// Try again.
|
||||
if let Some(guard) = self.try_lock() {
|
||||
return Some(guard);
|
||||
// Set up an event listener or wait for an event.
|
||||
match listener.take() {
|
||||
None => {
|
||||
// Start listening and then try locking again.
|
||||
listener = Some(self.lock_ops.listen());
|
||||
}
|
||||
Some(l) => {
|
||||
// Wait until a notification is received.
|
||||
l.wait_deadline(deadline)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until a notification is received.
|
||||
listener.wait_deadline(deadline)?;
|
||||
}
|
||||
}
|
||||
|
||||
/// Acquires a lock asynchronously.
|
||||
async fn lock_async(&self) -> MutexGuard<'_, T> {
|
||||
let mut listener = None;
|
||||
|
||||
loop {
|
||||
// Attempt grabbing a lock.
|
||||
if let Some(guard) = self.try_lock() {
|
||||
return guard;
|
||||
}
|
||||
|
||||
// Set up an event listener.
|
||||
listener!(self.lock_ops => listener);
|
||||
|
||||
// Try again.
|
||||
if let Some(guard) = self.try_lock() {
|
||||
return guard;
|
||||
// Set up an event listener or wait for an event.
|
||||
match listener.take() {
|
||||
None => {
|
||||
// Start listening and then try locking again.
|
||||
listener = Some(self.lock_ops.listen());
|
||||
}
|
||||
Some(l) => {
|
||||
// Wait until a notification is received.
|
||||
l.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until a notification is received.
|
||||
listener.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard holding a lock.
|
||||
struct MutexGuard<'a, T>(Locked<'a, T>);
|
||||
struct MutexGuard<'a, T>(&'a Mutex<T>);
|
||||
|
||||
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
|
||||
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
|
||||
|
||||
impl<T> Drop for MutexGuard<'_, T> {
|
||||
fn drop(&mut self) {
|
||||
self.0.locked.store(false, Ordering::Release);
|
||||
self.0.lock_ops.notify(1);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for MutexGuard<'_, T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &T {
|
||||
&self.0
|
||||
unsafe { &*self.0.data.get() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for MutexGuard<'_, T> {
|
||||
fn deref_mut(&mut self) -> &mut T {
|
||||
&mut self.0
|
||||
unsafe { &mut *self.0.data.get() }
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn entry() {
|
||||
pub(crate) fn entry() {
|
||||
const N: usize = 10;
|
||||
|
||||
// A shared counter.
|
||||
|
@ -158,13 +185,13 @@ mod example {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(target_family = "wasm")]
|
||||
mod example {
|
||||
pub(super) fn entry() {
|
||||
println!("This example is not supported on wasm yet.");
|
||||
#[cfg(not(feature = "std"))]
|
||||
mod ex {
|
||||
pub(crate) fn entry() {
|
||||
eprintln!("this example requires the 'std' feature")
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
example::entry();
|
||||
ex::entry();
|
||||
}
|
||||
|
|
1611
src/lib.rs
1611
src/lib.rs
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,867 @@
|
|||
//! Implementation of the linked list using lock-free primitives.
|
||||
|
||||
use crate::loom::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
|
||||
use crate::loom::cell::{Cell, UnsafeCell};
|
||||
use crate::notify::{GenericNotify, Internal, Notification};
|
||||
|
||||
#[cfg(not(loom))]
|
||||
use core::hint::spin_loop;
|
||||
#[cfg(loom)]
|
||||
use loom::hint::spin_loop;
|
||||
|
||||
use core::cmp::Reverse;
|
||||
use core::fmt;
|
||||
use core::iter;
|
||||
use core::marker::PhantomData;
|
||||
use core::mem::{self, MaybeUninit};
|
||||
use core::num::NonZeroUsize;
|
||||
use core::ptr::{self, NonNull};
|
||||
use core::slice;
|
||||
use core::task::{Context, Poll, Waker};
|
||||
|
||||
use alloc::boxed::Box;
|
||||
use alloc::collections::BinaryHeap;
|
||||
|
||||
/// The total number of buckets stored in each thread local.
|
||||
/// All buckets combined can hold up to `usize::MAX - 1` entries.
|
||||
const BUCKETS: usize = (usize::BITS - 1) as usize;
|
||||
|
||||
/// This listener has no data.
|
||||
const NEW: usize = 0;
|
||||
|
||||
/// This listener is occupied.
|
||||
const OCCUPIED: usize = 0b00001;
|
||||
|
||||
/// This listener is notified.
|
||||
const NOTIFIED: usize = 0b00010;
|
||||
|
||||
/// This listener's notification is additional.
|
||||
const ADDITIONAL: usize = 0b00100;
|
||||
|
||||
/// This listener is in the process of registering a new waker.
|
||||
const REGISTERING: usize = 0b01000;
|
||||
|
||||
/// This listener is in the process of notifying a previous waker.
|
||||
const NOTIFYING: usize = 0b10000;
|
||||
|
||||
/// State of a listener that was just removed.
|
||||
enum NotificationState {
|
||||
/// There was no notification.
|
||||
Unnotified,
|
||||
|
||||
/// We were notified with `notify()`
|
||||
Notified,
|
||||
|
||||
/// We were notified with `notify_additional()`
|
||||
NotifiedAdditional,
|
||||
}
|
||||
|
||||
/// Inner implementation of [`Event`].
|
||||
pub(crate) struct Inner<T> {
|
||||
/// List of slots containing listeners.
|
||||
slots: Slots<T>,
|
||||
|
||||
/// Free indexes for listeners.
|
||||
indexes: Indexes,
|
||||
|
||||
/// Head of the linked list.
|
||||
head: AtomicUsize,
|
||||
|
||||
/// End of the linked list.
|
||||
tail: AtomicUsize,
|
||||
|
||||
/// Whether someone is notifying this list.
|
||||
///
|
||||
/// Only one task can notify this event at a time. This task is called the "notifier".
|
||||
notifying: AtomicBool,
|
||||
|
||||
/// Number of notifications.
|
||||
///
|
||||
/// The notifier will check this for further notifications.
|
||||
standard_notify: AtomicUsize,
|
||||
|
||||
/// Number of additional notifications.
|
||||
///
|
||||
/// The notifier will check this for further notifications.
|
||||
additional_notify: AtomicUsize,
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
/// Create a new linked list.
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
slots: Slots::new(),
|
||||
indexes: Indexes::new(),
|
||||
head: AtomicUsize::new(0),
|
||||
tail: AtomicUsize::new(0),
|
||||
notifying: AtomicBool::new(false),
|
||||
standard_notify: AtomicUsize::new(0),
|
||||
additional_notify: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Debug output.
|
||||
#[inline]
|
||||
pub(crate) fn debug_fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Event").finish_non_exhaustive()
|
||||
}
|
||||
|
||||
/// We never have enough info to tell this for sure.
|
||||
pub(crate) fn notifyable(&self, _limit: usize) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// Tell whether any listeners are currently notified.
|
||||
#[inline]
|
||||
pub fn is_notified(&self) -> bool {
|
||||
let mut head = self.head.load(Ordering::Acquire);
|
||||
|
||||
loop {
|
||||
if head == 0 {
|
||||
// No entries left.
|
||||
return false;
|
||||
}
|
||||
let slot = self.slots.get(head);
|
||||
|
||||
// If this slot isn't occupied, use the next one.
|
||||
let state = slot.state.load(Ordering::Acquire);
|
||||
if state & OCCUPIED == 0 {
|
||||
head = slot.next.load(Ordering::Acquire);
|
||||
} else {
|
||||
return state & NOTIFIED != 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a listener into this linked list.
|
||||
#[cold]
|
||||
pub(crate) fn listen(&self) -> Listener<T> {
|
||||
/// Update some local state and the current slot's tail at once.
|
||||
struct TailUpdater<'a> {
|
||||
tail: Cell<usize>,
|
||||
current_prev: &'a AtomicUsize,
|
||||
}
|
||||
|
||||
impl TailUpdater<'_> {
|
||||
#[inline]
|
||||
fn tail(&self) -> usize {
|
||||
self.tail.get()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn update(&self, new_tail: usize, ordering: Ordering) {
|
||||
self.tail.set(new_tail);
|
||||
self.current_prev.store(new_tail, ordering);
|
||||
}
|
||||
}
|
||||
|
||||
let (index, slot) = loop {
|
||||
let index = self.indexes.alloc();
|
||||
let slot = self.slots.get(index);
|
||||
|
||||
// If a notification from last time is still being notified, allocate a new ID.
|
||||
if slot.state.load(Ordering::Acquire) & NOTIFYING != 0 {
|
||||
continue;
|
||||
} else {
|
||||
break (index, slot);
|
||||
}
|
||||
};
|
||||
|
||||
let state = TailUpdater {
|
||||
tail: Cell::new(0),
|
||||
current_prev: &slot.prev,
|
||||
};
|
||||
|
||||
// Set our link's state up.
|
||||
slot.state.store(OCCUPIED, Ordering::Relaxed);
|
||||
slot.next.store(0, Ordering::Release);
|
||||
state.update(self.tail.load(Ordering::Relaxed), Ordering::Relaxed);
|
||||
|
||||
// Try appending this new node to the back of the list.
|
||||
loop {
|
||||
if state.tail() == 0 {
|
||||
// The list must be empty; try to make ourselves the head.
|
||||
let old_head = self
|
||||
.head
|
||||
.compare_exchange(0, index, Ordering::AcqRel, Ordering::Acquire)
|
||||
.unwrap_or_else(|x| x);
|
||||
|
||||
if old_head == 0 {
|
||||
// Success! We are now the head. Set the tail and break.
|
||||
self.tail.store(index, Ordering::Release);
|
||||
break;
|
||||
} else {
|
||||
// Use this head as our current tail and traverse the list.
|
||||
state.update(old_head, Ordering::Acquire);
|
||||
}
|
||||
} else {
|
||||
// The tail's end should be 0. Set our index to it.
|
||||
let tail_slot = self.slots.get(state.tail());
|
||||
let old_tail = tail_slot
|
||||
.next
|
||||
.compare_exchange(0, index, Ordering::AcqRel, Ordering::Acquire)
|
||||
.unwrap_or_else(|x| x);
|
||||
|
||||
if old_tail == 0 {
|
||||
// Success! We are now inserted. Set the tail to our slot number.
|
||||
let _ = self.tail.compare_exchange(
|
||||
state.tail(),
|
||||
index,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Acquire,
|
||||
);
|
||||
break;
|
||||
} else {
|
||||
// The tail is occupied, move onto the next item and continue.
|
||||
state.update(old_tail, Ordering::Acquire);
|
||||
}
|
||||
}
|
||||
|
||||
// We may be trapped here for some time.
|
||||
spin_loop();
|
||||
}
|
||||
|
||||
Listener {
|
||||
index: Some(NonZeroUsize::new(index).unwrap()),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
|
||||
// T should always be `()`.
|
||||
debug_assert_eq!(mem::size_of::<T>(), 0);
|
||||
debug_assert!(!mem::needs_drop::<T>());
|
||||
|
||||
// Get the count and the slot.
|
||||
let count = notify.count(Internal::new());
|
||||
let notification_slot = if notify.is_additional(Internal::new()) {
|
||||
&self.additional_notify
|
||||
} else {
|
||||
&self.standard_notify
|
||||
};
|
||||
|
||||
// Bump the count.
|
||||
notification_slot.fetch_add(count, Ordering::SeqCst);
|
||||
|
||||
// Try to become the notifier for a while.
|
||||
let mut no_lock = true;
|
||||
for _ in 0..128 {
|
||||
no_lock = self
|
||||
.notifying
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||||
.unwrap_or_else(|x| x);
|
||||
if !no_lock {
|
||||
break;
|
||||
}
|
||||
|
||||
core::hint::spin_loop();
|
||||
}
|
||||
|
||||
if no_lock {
|
||||
// We did not capture the lock. The notifier should see the above addition and keep
|
||||
// going.
|
||||
return 0;
|
||||
}
|
||||
|
||||
// We have the lock! Make sure to release it when we're done.
|
||||
let _guard = CallOnDrop(|| self.notifying.store(false, Ordering::Release));
|
||||
|
||||
// Notify the actual wakers.
|
||||
self.notify_locked(count)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn remove(&self, listener: &mut Listener<T>) {
|
||||
let is_additional = match self.take(listener) {
|
||||
NotificationState::Unnotified => return,
|
||||
NotificationState::Notified => false,
|
||||
NotificationState::NotifiedAdditional => true,
|
||||
};
|
||||
|
||||
// Propagate the notification.
|
||||
self.notify(GenericNotify::new(1, is_additional, || unreachable!()));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn discard(&self, listener: &mut Listener<T>) -> bool {
|
||||
!matches!(self.take(listener), NotificationState::Unnotified)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn poll(&self, listener: &mut Listener<T>, cx: &mut Context<'_>) -> Poll<T> {
|
||||
let index = match listener.index {
|
||||
Some(index) => index.get(),
|
||||
None => unreachable!("cannot poll a completed `EventListener` future"),
|
||||
};
|
||||
let slot = self.slots.get(index);
|
||||
|
||||
// Tell whether or not we have been notified.
|
||||
if slot.state.load(Ordering::Acquire) & NOTIFIED != 0 {
|
||||
// We have succeeded!
|
||||
// SAFETY: T should *always* be ()
|
||||
debug_assert_eq!(mem::size_of::<T>(), 0);
|
||||
debug_assert!(!mem::needs_drop::<T>());
|
||||
self.take(listener);
|
||||
Poll::Ready(unsafe { mem::zeroed() })
|
||||
} else {
|
||||
// Register a waker and wait.
|
||||
slot.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn take(&self, listener: &mut Listener<T>) -> NotificationState {
|
||||
let index = match listener.index.take() {
|
||||
Some(x) => x.get(),
|
||||
None => return NotificationState::Unnotified,
|
||||
};
|
||||
let slot = self.slots.get(index);
|
||||
|
||||
// Mark this state as unoccupied.
|
||||
let state = slot.state.fetch_and(!OCCUPIED, Ordering::AcqRel);
|
||||
debug_assert_ne!(state & OCCUPIED, 0);
|
||||
debug_assert_eq!(state & REGISTERING, 0);
|
||||
|
||||
let prev = slot.prev.swap(0, Ordering::AcqRel);
|
||||
let next = slot.next.load(Ordering::Acquire);
|
||||
|
||||
// Unlink from the previous entry.
|
||||
if prev == 0 {
|
||||
self.head.store(next, Ordering::Release);
|
||||
} else {
|
||||
self.slots.get(prev).next.store(next, Ordering::Release);
|
||||
}
|
||||
if next == 0 {
|
||||
self.tail.store(prev, Ordering::Release);
|
||||
} else {
|
||||
self.slots.get(next).prev.store(prev, Ordering::Release);
|
||||
}
|
||||
|
||||
// Free the index and let another user take the slot.
|
||||
self.indexes.free(index);
|
||||
|
||||
match (state & NOTIFIED != 0, state & ADDITIONAL != 0) {
|
||||
(false, _) => NotificationState::Unnotified,
|
||||
(true, false) => NotificationState::Notified,
|
||||
(true, true) => NotificationState::NotifiedAdditional,
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_locked(&self, ret_limit: usize) -> usize {
|
||||
/// Whether to notify as normal or as additional.
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
#[repr(usize)]
|
||||
enum NotifyMethod {
|
||||
Standard = 0,
|
||||
Additional = 1,
|
||||
}
|
||||
|
||||
impl NotifyMethod {
|
||||
fn invert(self) -> Self {
|
||||
match self {
|
||||
Self::Standard => Self::Additional,
|
||||
Self::Additional => Self::Standard,
|
||||
}
|
||||
}
|
||||
|
||||
fn notification_slot<T>(self, inner: &Inner<T>) -> &AtomicUsize {
|
||||
match self {
|
||||
Self::Standard => &inner.standard_notify,
|
||||
Self::Additional => &inner.additional_notify,
|
||||
}
|
||||
}
|
||||
|
||||
/// Use this method to notify the list.
|
||||
fn notify<T>(self, inner: &Inner<T>, mut count: usize) -> usize {
|
||||
let mut notified = 0;
|
||||
let mut cursor = inner.head.load(Ordering::Acquire);
|
||||
let mut prev_cursor = cursor;
|
||||
|
||||
while cursor != 0 && count > 0 {
|
||||
// Get the slot at our cursor.
|
||||
let slot = inner.slots.get(cursor);
|
||||
|
||||
// If the entry is in the progress of being destroyed, try again.
|
||||
let state = slot.state.load(Ordering::Acquire);
|
||||
if state & OCCUPIED == 0x0 {
|
||||
if prev_cursor == cursor {
|
||||
// We've hit a hole, stop notifying.
|
||||
break;
|
||||
} else {
|
||||
// Load the previous next.
|
||||
core::hint::spin_loop();
|
||||
cursor = slot.next.load(Ordering::Acquire);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
prev_cursor = cursor;
|
||||
|
||||
// If the entry is already notified, skip it and decrement the count if needed.
|
||||
if state & NOTIFIED != 0x0 {
|
||||
cursor = slot.next.load(Ordering::Acquire);
|
||||
if let NotifyMethod::Standard = self {
|
||||
count -= 1;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Notify the entry.
|
||||
slot.notify(self == NotifyMethod::Additional);
|
||||
|
||||
// Update counts and move on.
|
||||
count -= 1;
|
||||
notified += 1;
|
||||
cursor = slot.next.load(Ordering::Acquire);
|
||||
}
|
||||
|
||||
notified
|
||||
}
|
||||
}
|
||||
|
||||
/// Make sure we don't miss a notification.
|
||||
struct NotifyState {
|
||||
current: NotifyMethod,
|
||||
progress_made: [bool; 2],
|
||||
}
|
||||
|
||||
impl NotifyState {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
current: NotifyMethod::Standard,
|
||||
progress_made: [true, true],
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn make_progress(&mut self) {
|
||||
self.progress_made[self.current as usize] = true;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn next_method(&mut self) -> Option<NotifyMethod> {
|
||||
// If we have made no progress, break out.
|
||||
if self.progress_made == [false, false] {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Replace our current progress with "false" and switch to the inverted.
|
||||
self.progress_made[self.current as usize] = false;
|
||||
self.current = self.current.invert();
|
||||
|
||||
Some(self.current)
|
||||
}
|
||||
}
|
||||
|
||||
let mut state = NotifyState::new();
|
||||
let mut notified = 0;
|
||||
|
||||
while let Some(method) = state.next_method() {
|
||||
// Move out the count.
|
||||
let count = method.notification_slot(self).swap(0, Ordering::Acquire);
|
||||
if count > 0 {
|
||||
// We've made progress in this state.
|
||||
state.make_progress();
|
||||
|
||||
// Notify `count` entries.
|
||||
notified += method.notify(self, count);
|
||||
}
|
||||
}
|
||||
|
||||
core::cmp::min(notified, ret_limit)
|
||||
}
|
||||
}
|
||||
|
||||
/// Inner implementation of [`EventListener`].
|
||||
pub(crate) struct Listener<T> {
|
||||
/// Index into the listener.
|
||||
index: Option<NonZeroUsize>,
|
||||
|
||||
/// We don't physically hold a `T`.
|
||||
_phantom: PhantomData<Box<T>>,
|
||||
}
|
||||
|
||||
/// Inner listener data.
|
||||
struct Link<T> {
|
||||
/// Next entry in the list.
|
||||
next: AtomicUsize,
|
||||
|
||||
/// Previous entry in the list.
|
||||
prev: AtomicUsize,
|
||||
|
||||
/// State of the link.
|
||||
state: AtomicUsize,
|
||||
|
||||
/// Slot for the waker.
|
||||
waker: Cell<Option<Waker>>,
|
||||
|
||||
/// `T` should always be `()`.
|
||||
_phantom: PhantomData<Box<T>>,
|
||||
}
|
||||
|
||||
impl<T> Link<T> {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
next: AtomicUsize::new(0),
|
||||
prev: AtomicUsize::new(0),
|
||||
state: AtomicUsize::new(NEW),
|
||||
waker: Cell::new(None),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify this particular listener.
|
||||
fn notify(&self, additional: bool) {
|
||||
let mask = if additional {
|
||||
NOTIFYING | NOTIFIED | ADDITIONAL
|
||||
} else {
|
||||
NOTIFYING | NOTIFIED
|
||||
};
|
||||
let old_state = self.state.fetch_or(mask, Ordering::SeqCst);
|
||||
|
||||
// Remove the NOTIFYING flag once we're done.
|
||||
let _guard = CallOnDrop(|| {
|
||||
self.state.fetch_and(!NOTIFYING, Ordering::Release);
|
||||
});
|
||||
|
||||
// Three possibilities here:
|
||||
// - NOTIFIED is set. Someone else beat us to it.
|
||||
// - REGISTERING is not set. In which case we can freely take and wake.
|
||||
// - REGISTERING is set. In which case it will wake the waker.
|
||||
if old_state & NOTIFIED != 0 {
|
||||
// Do nothing.
|
||||
} else if old_state & REGISTERING == 0 {
|
||||
// SAFETY: No one else is fighting for this waker.
|
||||
if let Some(waker) = self.waker.take() {
|
||||
// In case waking the waker takes a while, make sure the slot is open.
|
||||
drop(_guard);
|
||||
waker.wake();
|
||||
}
|
||||
} else {
|
||||
// Do nothing. The task who set REGISTERING will wake the waker.
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a new waker into this listener.
|
||||
fn register(&self, waker: &Waker) {
|
||||
// Set the REGISTERING flag.
|
||||
let old_state = self.state.fetch_or(REGISTERING, Ordering::SeqCst);
|
||||
if old_state & NOTIFIED != 0 {
|
||||
// poll() somehow missed the notification. Wake the event loop and try again.
|
||||
let _guard = CallOnDrop(|| {
|
||||
self.state.fetch_and(!REGISTERING, Ordering::SeqCst);
|
||||
});
|
||||
waker.wake_by_ref();
|
||||
return;
|
||||
}
|
||||
|
||||
// Unset REGISTERING before exiting out.
|
||||
let _guard = CallOnDrop(|| {
|
||||
let old_state = self.state.fetch_and(!REGISTERING, Ordering::SeqCst);
|
||||
if old_state & NOTIFIED != 0 {
|
||||
// There was a notification while we were registering. Wake our waker up.
|
||||
if let Some(waker) = self.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// SAFETY: We have exclusive access to `self.waker`.
|
||||
match self.waker.take() {
|
||||
Some(w) if waker.will_wake(&w) => {
|
||||
// Put it back.
|
||||
self.waker.set(Some(w));
|
||||
}
|
||||
|
||||
_ => self.waker.set(Some(waker.clone())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Atomically expandable list of slots.
|
||||
struct Slots<T> {
|
||||
/// Buckets in the list.
|
||||
buckets: [AtomicPtr<Link<T>>; BUCKETS],
|
||||
}
|
||||
|
||||
impl<T> Slots<T> {
|
||||
fn new() -> Self {
|
||||
unsafe {
|
||||
// Create an empty array.
|
||||
let mut buckets: [MaybeUninit<AtomicPtr<Link<T>>>; BUCKETS] = {
|
||||
let raw: MaybeUninit<[AtomicPtr<Link<T>>; BUCKETS]> = MaybeUninit::uninit();
|
||||
// SAFETY: MaybeUninit<[T; N]> has the same layout as [MaybeUninit<T>; N]
|
||||
mem::transmute(raw)
|
||||
};
|
||||
|
||||
// Initialize every bucket to null.
|
||||
for bucket in buckets.iter_mut() {
|
||||
*bucket = MaybeUninit::new(AtomicPtr::new(ptr::null_mut()))
|
||||
}
|
||||
|
||||
// SAFETY: The array is now fully initialized.
|
||||
Self {
|
||||
buckets: mem::transmute(buckets),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the slot at the given index.
|
||||
fn get(&self, index: usize) -> &Link<T> {
|
||||
let bucket = self.bucket(index);
|
||||
let slot_index = index_to_slot_index(index);
|
||||
|
||||
&bucket[slot_index]
|
||||
}
|
||||
|
||||
/// Get the bucket for the index.
|
||||
#[inline]
|
||||
fn bucket(&self, index: usize) -> &[Link<T>] {
|
||||
let bucket_index = index_to_bucket(index);
|
||||
let size = bucket_index_to_size(bucket_index);
|
||||
|
||||
// Load the pointer for the bucket.
|
||||
let bucket_ptr = unsafe {
|
||||
// SAFETY: `bucket` will never be less than `BUCKETS`
|
||||
self.buckets.get_unchecked(bucket_index)
|
||||
};
|
||||
let bucket = bucket_ptr.load(Ordering::Acquire);
|
||||
|
||||
// If the bucket doesn't exist already, allocate it.
|
||||
let ptr = match NonNull::new(bucket) {
|
||||
Some(bucket) => bucket,
|
||||
|
||||
None => {
|
||||
let new_bucket = iter::repeat_with(|| Link::<T>::new())
|
||||
.take(size)
|
||||
.collect::<Box<[_]>>();
|
||||
let new_bucket = Box::into_raw(new_bucket) as *mut Link<T>;
|
||||
|
||||
// Try to replace it.
|
||||
let old_bucket = bucket_ptr
|
||||
.compare_exchange(
|
||||
ptr::null_mut(),
|
||||
new_bucket,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Acquire,
|
||||
)
|
||||
.unwrap_or_else(|x| x);
|
||||
|
||||
match NonNull::new(old_bucket) {
|
||||
None => unsafe { NonNull::new_unchecked(new_bucket) },
|
||||
Some(old_bucket) => {
|
||||
// Drop the newly created bucket and use the old one.
|
||||
drop(unsafe { Box::from_raw(slice::from_raw_parts_mut(new_bucket, size)) });
|
||||
old_bucket
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
unsafe { slice::from_raw_parts(ptr.as_ptr(), size) }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Slots<T> {
|
||||
fn drop(&mut self) {
|
||||
// Free every bucket.
|
||||
for (i, bucket) in self.buckets.iter_mut().enumerate() {
|
||||
crate::loom::ptr_with_mut(bucket, |bucket| {
|
||||
let bucket = *bucket;
|
||||
if bucket.is_null() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Drop the bucket.
|
||||
let size = bucket_index_to_size(i);
|
||||
drop(unsafe { Box::from_raw(slice::from_raw_parts_mut(bucket, size)) });
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Available indexes into the list.
|
||||
struct Indexes {
|
||||
/// List of indexes into our list.
|
||||
available: Lock<BinaryHeap<Reverse<usize>>>,
|
||||
|
||||
/// The highest index we've produced so far.
|
||||
last: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Indexes {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
available: Lock::new(BinaryHeap::new()),
|
||||
last: AtomicUsize::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocate a new index.
|
||||
fn alloc(&self) -> usize {
|
||||
self.available
|
||||
.access(|available| available.pop().map(|Reverse(x)| x))
|
||||
.flatten()
|
||||
.unwrap_or_else(|| self.last.fetch_add(1, Ordering::SeqCst))
|
||||
}
|
||||
|
||||
/// Free an index.
|
||||
fn free(&self, index: usize) {
|
||||
self.available.access(|available| {
|
||||
available.push(Reverse(index));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// An exclusive lock around some information.
|
||||
struct Lock<T> {
|
||||
/// Whether we are locked.
|
||||
is_locked: AtomicBool,
|
||||
|
||||
/// The information we are guarding.
|
||||
data: UnsafeCell<T>,
|
||||
}
|
||||
|
||||
impl<T> Lock<T> {
|
||||
/// Create a new lock.
|
||||
fn new(data: T) -> Self {
|
||||
Self {
|
||||
is_locked: AtomicBool::new(false),
|
||||
data: UnsafeCell::new(data),
|
||||
}
|
||||
}
|
||||
|
||||
/// Access the underlying data.
|
||||
fn access<R>(&self, f: impl FnOnce(&mut T) -> R) -> Option<R> {
|
||||
// Lock the spinlock.
|
||||
if self
|
||||
.is_locked
|
||||
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
// Restore on drop.
|
||||
let _drop = CallOnDrop(|| self.is_locked.store(false, Ordering::Release));
|
||||
|
||||
// SAFETY: We have exclusive access.
|
||||
Some(cell_with_mut(&self.data, |ptr| f(unsafe { &mut *ptr })))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(loom))]
|
||||
fn cell_with_mut<T, R>(cell: &UnsafeCell<T>, f: impl FnOnce(*mut T) -> R) -> R {
|
||||
f(cell.get())
|
||||
}
|
||||
|
||||
#[cfg(loom)]
|
||||
fn cell_with_mut<T, R>(cell: &UnsafeCell<T>, f: impl FnOnce(*mut T) -> R) -> R {
|
||||
cell.with_mut(f)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn bucket_index_to_size(i: usize) -> usize {
|
||||
1 << i
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn index_to_bucket(i: usize) -> usize {
|
||||
usize::from(usize::BITS as u16) - ((i + 1).leading_zeros() as usize) - 1
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn index_to_slot_index(i: usize) -> usize {
|
||||
let size = bucket_index_to_size(index_to_bucket(i));
|
||||
i - (size - 1)
|
||||
}
|
||||
|
||||
struct CallOnDrop<F: FnMut()>(F);
|
||||
|
||||
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
||||
fn drop(&mut self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[cfg(not(miri))]
|
||||
const MAX: usize = 0xFFFF;
|
||||
#[cfg(miri)]
|
||||
const MAX: usize = 0xFF;
|
||||
|
||||
#[test]
|
||||
fn lock() {
|
||||
let lock = Lock::new(());
|
||||
|
||||
let x = lock.access(|()| {
|
||||
assert!(lock.access(|()| unreachable!()).is_none());
|
||||
});
|
||||
assert!(x.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn slots() {
|
||||
let slots = Slots::<()>::new();
|
||||
|
||||
// Don't exhaust our memory; only do this many.
|
||||
for i in 1..MAX {
|
||||
slots.get(i);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn indexes_alloc() {
|
||||
let index = Indexes::new();
|
||||
let mut last = 0;
|
||||
|
||||
for _ in 0..MAX {
|
||||
let val = index.alloc();
|
||||
assert_eq!(val, last + 1);
|
||||
last = val;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn indexes_realloc() {
|
||||
let index = Indexes::new();
|
||||
|
||||
assert_eq!(index.alloc(), 1);
|
||||
assert_eq!(index.alloc(), 2);
|
||||
assert_eq!(index.alloc(), 3);
|
||||
assert_eq!(index.alloc(), 4);
|
||||
|
||||
index.free(3);
|
||||
index.free(2);
|
||||
|
||||
assert_eq!(index.alloc(), 2);
|
||||
assert_eq!(index.alloc(), 3);
|
||||
assert_eq!(index.alloc(), 5);
|
||||
assert_eq!(index.alloc(), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn link_notify() {
|
||||
let link = Link::<()>::new();
|
||||
let waker = waker_fn::waker_fn(|| ());
|
||||
|
||||
link.register(&waker);
|
||||
assert_eq!(link.state.load(Ordering::SeqCst), NEW);
|
||||
link.notify(false);
|
||||
assert_eq!(link.state.load(Ordering::SeqCst), NOTIFIED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn link_notify_additional() {
|
||||
let link = Link::<()>::new();
|
||||
let waker = waker_fn::waker_fn(|| ());
|
||||
|
||||
link.register(&waker);
|
||||
assert_eq!(link.state.load(Ordering::SeqCst), NEW);
|
||||
link.notify(true);
|
||||
assert_eq!(link.state.load(Ordering::SeqCst), NOTIFIED | ADDITIONAL);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
//! Linked list of listeners.
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
mod mutex;
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) use mutex::*;
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
mod lock_free;
|
||||
#[cfg(not(feature = "std"))]
|
||||
pub(crate) use lock_free::*;
|
|
@ -0,0 +1,522 @@
|
|||
//! Implementation of the linked list using standard library mutexes.
|
||||
|
||||
use crate::loom::atomic::{AtomicUsize, Ordering};
|
||||
use crate::loom::cell::Cell;
|
||||
use crate::notify::{GenericNotify, Internal, Notification};
|
||||
|
||||
use std::boxed::Box;
|
||||
use std::fmt;
|
||||
use std::mem;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::ptr::{self, NonNull};
|
||||
use std::sync::{Mutex, MutexGuard, TryLockError};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use std::thread::{self, Thread};
|
||||
use std::time::Instant;
|
||||
use std::usize;
|
||||
|
||||
/// Inner state of [`Event`].
|
||||
pub(crate) struct Inner<T> {
|
||||
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
|
||||
///
|
||||
/// If there are no entries, this value is set to `usize::MAX`.
|
||||
notified: AtomicUsize,
|
||||
|
||||
/// A linked list holding registered listeners.
|
||||
list: Mutex<List<T>>,
|
||||
|
||||
/// A single cached list entry to avoid allocations on the fast path of the insertion.
|
||||
// TODO: Add ability to use loom::cell::UnsafeCell
|
||||
cache: std::cell::UnsafeCell<Entry<T>>,
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
/// Create a new linked list.
|
||||
pub(crate) fn new() -> Self {
|
||||
Inner {
|
||||
notified: AtomicUsize::new(usize::MAX),
|
||||
list: std::sync::Mutex::new(List::<T> {
|
||||
head: None,
|
||||
tail: None,
|
||||
start: None,
|
||||
len: 0,
|
||||
notified: 0,
|
||||
cache_used: false,
|
||||
}),
|
||||
cache: std::cell::UnsafeCell::new(Entry {
|
||||
state: Cell::new(State::Created),
|
||||
prev: Cell::new(None),
|
||||
next: Cell::new(None),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Debug output.
|
||||
#[inline]
|
||||
pub(crate) fn debug_fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let guard = match self.list.try_lock() {
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
return f
|
||||
.debug_tuple("Event")
|
||||
.field(&format_args!("<locked>"))
|
||||
.finish()
|
||||
}
|
||||
|
||||
Err(TryLockError::Poisoned(err)) => err.into_inner(),
|
||||
|
||||
Ok(lock) => lock,
|
||||
};
|
||||
|
||||
f.debug_struct("Event")
|
||||
.field("listeners_notified", &guard.notified)
|
||||
.field("listeners_total", &guard.len)
|
||||
.finish()
|
||||
}
|
||||
|
||||
/// Tell whether there is enough room to notify this list.
|
||||
#[inline]
|
||||
pub(crate) fn notifyable(&self, limit: usize) -> bool {
|
||||
self.notified.load(Ordering::Acquire) < limit
|
||||
}
|
||||
|
||||
/// Tell if any listeners are currently notified.
|
||||
#[inline]
|
||||
pub(crate) fn is_notified(&self) -> bool {
|
||||
self.notified.load(Ordering::Acquire) > 0
|
||||
}
|
||||
|
||||
/// Get the total number of listeners.
|
||||
#[inline]
|
||||
pub(crate) fn total_listeners(&self) -> usize {
|
||||
self.lock().guard.len
|
||||
}
|
||||
|
||||
/// Create a listener for this linked list.
|
||||
#[cold]
|
||||
pub(crate) fn listen(&self) -> Listener<T> {
|
||||
Listener {
|
||||
entry: Some(self.lock().insert(self.cache_ptr())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify the inner list.
|
||||
#[cold]
|
||||
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
|
||||
self.lock().notify(notify)
|
||||
}
|
||||
|
||||
/// Discard a listener.
|
||||
#[inline]
|
||||
pub(crate) fn discard(&self, listener: &mut Listener<T>) -> bool {
|
||||
// If this listener has never picked up a notification...
|
||||
if let Some(entry) = listener.entry.take() {
|
||||
let mut list = self.lock();
|
||||
// Remove the listener from the list and return `true` if it was notified.
|
||||
if list.remove(entry, self.cache_ptr()).is_notified() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Poll a listener.
|
||||
#[inline]
|
||||
pub(crate) fn poll(&self, listener: &mut Listener<T>, cx: &mut Context<'_>) -> Poll<T> {
|
||||
let mut list = self.lock();
|
||||
|
||||
let entry = match listener.entry {
|
||||
None => unreachable!("cannot poll a completed `EventListener` future"),
|
||||
Some(entry) => entry,
|
||||
};
|
||||
let state = unsafe { &entry.as_ref().state };
|
||||
|
||||
// Do a dummy replace operation in order to take out the state.
|
||||
match take_state(state) {
|
||||
State::Notified { .. } => {
|
||||
// If this listener has been notified, remove it from the list and return.
|
||||
let state = list.remove(entry, self.cache_ptr());
|
||||
drop(list);
|
||||
listener.entry = None;
|
||||
|
||||
match state {
|
||||
State::Notified { tag: Some(tag), .. } => return Poll::Ready(tag),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
State::Created => {
|
||||
// If the listener was just created, put it in the `Polling` state.
|
||||
state.set(State::Polling(cx.waker().clone()));
|
||||
}
|
||||
State::Polling(w) => {
|
||||
// If the listener was in the `Polling` state, update the waker.
|
||||
if w.will_wake(cx.waker()) {
|
||||
state.set(State::Polling(w));
|
||||
} else {
|
||||
state.set(State::Polling(cx.waker().clone()));
|
||||
}
|
||||
}
|
||||
State::Waiting(_) => {
|
||||
unreachable!("cannot poll and wait on `EventListener` at the same time")
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Block on a listener.
|
||||
#[inline]
|
||||
pub(crate) fn wait(&self, listener: &mut Listener<T>, deadline: Option<Instant>) -> Option<T> {
|
||||
// Take out the entry pointer and set it to `None`.
|
||||
let entry = match listener.entry.take() {
|
||||
None => unreachable!("cannot wait twice on an `EventListener`"),
|
||||
Some(entry) => entry,
|
||||
};
|
||||
|
||||
// Set this listener's state to `Waiting`.
|
||||
{
|
||||
let mut list = self.lock();
|
||||
let e = unsafe { entry.as_ref() };
|
||||
|
||||
// Do a dummy replace operation in order to take out the state.
|
||||
match take_state(&e.state) {
|
||||
State::Notified { .. } => {
|
||||
// If this listener has been notified, remove it from the list and return.
|
||||
return list.remove(entry, self.cache_ptr()).take();
|
||||
}
|
||||
// Otherwise, set the state to `Waiting`.
|
||||
_ => e.state.set(State::Waiting(thread::current())),
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until a notification is received or the timeout is reached.
|
||||
loop {
|
||||
match deadline {
|
||||
None => thread::park(),
|
||||
|
||||
Some(deadline) => {
|
||||
// Check for timeout.
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
// Remove the entry and check if notified.
|
||||
return self.lock().remove(entry, self.cache_ptr()).take();
|
||||
}
|
||||
|
||||
// Park until the deadline.
|
||||
thread::park_timeout(deadline - now);
|
||||
}
|
||||
}
|
||||
|
||||
let mut list = self.lock();
|
||||
let e = unsafe { entry.as_ref() };
|
||||
|
||||
// Do a dummy replace operation in order to take out the state.
|
||||
match take_state(&e.state) {
|
||||
State::Notified { .. } => {
|
||||
// If this listener has been notified, remove it from the list and return.
|
||||
return list.remove(entry, self.cache_ptr()).take();
|
||||
}
|
||||
// Otherwise, set the state back to `Waiting`.
|
||||
state => e.state.set(state),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop a listener.
|
||||
#[inline]
|
||||
pub(crate) fn remove(&self, listener: &mut Listener<T>) {
|
||||
// If this listener has never picked up a notification...
|
||||
if let Some(entry) = listener.entry.take() {
|
||||
let mut list = self.lock();
|
||||
|
||||
// But if a notification was delivered to it...
|
||||
if let State::Notified {
|
||||
additional,
|
||||
tag: Some(tag),
|
||||
} = list.remove(entry, self.cache_ptr())
|
||||
{
|
||||
let mut tag = Some(tag);
|
||||
|
||||
// Then pass it on to another active listener.
|
||||
list.notify(GenericNotify::new(1, additional, || tag.take().unwrap()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Locks the list.
|
||||
fn lock(&self) -> ListGuard<'_, T> {
|
||||
ListGuard {
|
||||
inner: self,
|
||||
guard: self.list.lock().unwrap_or_else(|e| e.into_inner()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the pointer to the single cached list entry.
|
||||
#[inline(always)]
|
||||
fn cache_ptr(&self) -> NonNull<Entry<T>> {
|
||||
unsafe { NonNull::new_unchecked(self.cache.get()) }
|
||||
}
|
||||
}
|
||||
|
||||
/// Inner state of the `EventListener`.
|
||||
pub(crate) struct Listener<T> {
|
||||
/// A pointer to this listener's entry in the linked list.
|
||||
entry: Option<NonNull<Entry<T>>>,
|
||||
}
|
||||
|
||||
/// A guard holding the linked list locked.
|
||||
struct ListGuard<'a, T> {
|
||||
/// A reference to [`Event`]'s inner state.
|
||||
inner: &'a Inner<T>,
|
||||
|
||||
/// The actual guard that acquired the linked list.
|
||||
guard: MutexGuard<'a, List<T>>,
|
||||
}
|
||||
|
||||
impl<T> Drop for ListGuard<'_, T> {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
let list = &mut **self;
|
||||
|
||||
// Update the atomic `notified` counter.
|
||||
let notified = if list.notified < list.len {
|
||||
list.notified
|
||||
} else {
|
||||
usize::MAX
|
||||
};
|
||||
self.inner.notified.store(notified, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for ListGuard<'_, T> {
|
||||
type Target = List<T>;
|
||||
|
||||
#[inline]
|
||||
#[cfg_attr(coverage, coverage(off))]
|
||||
fn deref(&self) -> &List<T> {
|
||||
&self.guard
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for ListGuard<'_, T> {
|
||||
#[inline]
|
||||
fn deref_mut(&mut self) -> &mut List<T> {
|
||||
&mut self.guard
|
||||
}
|
||||
}
|
||||
|
||||
/// An entry representing a registered listener.
|
||||
struct Entry<T> {
|
||||
/// THe state of this listener.
|
||||
state: Cell<State<T>>,
|
||||
|
||||
/// Previous entry in the linked list.
|
||||
prev: Cell<Option<NonNull<Entry<T>>>>,
|
||||
|
||||
/// Next entry in the linked list.
|
||||
next: Cell<Option<NonNull<Entry<T>>>>,
|
||||
}
|
||||
|
||||
/// A linked list of entries.
|
||||
struct List<T> {
|
||||
/// First entry in the list.
|
||||
head: Option<NonNull<Entry<T>>>,
|
||||
|
||||
/// Last entry in the list.
|
||||
tail: Option<NonNull<Entry<T>>>,
|
||||
|
||||
/// The first unnotified entry in the list.
|
||||
start: Option<NonNull<Entry<T>>>,
|
||||
|
||||
/// Total number of entries in the list.
|
||||
len: usize,
|
||||
|
||||
/// The number of notified entries in the list.
|
||||
notified: usize,
|
||||
|
||||
/// Whether the cached entry is used.
|
||||
cache_used: bool,
|
||||
}
|
||||
|
||||
impl<T> List<T> {
|
||||
/// Inserts a new entry into the list.
|
||||
fn insert(&mut self, cache: NonNull<Entry<T>>) -> NonNull<Entry<T>> {
|
||||
unsafe {
|
||||
let entry = Entry {
|
||||
state: Cell::new(State::Created),
|
||||
prev: Cell::new(self.tail),
|
||||
next: Cell::new(None),
|
||||
};
|
||||
|
||||
let entry = if self.cache_used {
|
||||
// Allocate an entry that is going to become the new tail.
|
||||
NonNull::new_unchecked(Box::into_raw(Box::new(entry)))
|
||||
} else {
|
||||
// No need to allocate - we can use the cached entry.
|
||||
self.cache_used = true;
|
||||
cache.as_ptr().write(entry);
|
||||
cache
|
||||
};
|
||||
|
||||
// Replace the tail with the new entry.
|
||||
match mem::replace(&mut self.tail, Some(entry)) {
|
||||
None => self.head = Some(entry),
|
||||
Some(t) => t.as_ref().next.set(Some(entry)),
|
||||
}
|
||||
|
||||
// If there were no unnotified entries, this one is the first now.
|
||||
if self.start.is_none() {
|
||||
self.start = self.tail;
|
||||
}
|
||||
|
||||
// Bump the entry count.
|
||||
self.len += 1;
|
||||
|
||||
entry
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes an entry from the list and returns its state.
|
||||
fn remove(&mut self, entry: NonNull<Entry<T>>, cache: NonNull<Entry<T>>) -> State<T> {
|
||||
unsafe {
|
||||
let prev = entry.as_ref().prev.get();
|
||||
let next = entry.as_ref().next.get();
|
||||
|
||||
// Unlink from the previous entry.
|
||||
match prev {
|
||||
None => self.head = next,
|
||||
Some(p) => p.as_ref().next.set(next),
|
||||
}
|
||||
|
||||
// Unlink from the next entry.
|
||||
match next {
|
||||
None => self.tail = prev,
|
||||
Some(n) => n.as_ref().prev.set(prev),
|
||||
}
|
||||
|
||||
// If this was the first unnotified entry, move the pointer to the next one.
|
||||
if self.start == Some(entry) {
|
||||
self.start = next;
|
||||
}
|
||||
|
||||
// Extract the state.
|
||||
let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
|
||||
// Free the cached entry.
|
||||
self.cache_used = false;
|
||||
entry.as_ref().state.replace(State::Created)
|
||||
} else {
|
||||
// Deallocate the entry.
|
||||
Box::from_raw(entry.as_ptr()).state.replace(State::Created)
|
||||
};
|
||||
|
||||
// Update the counters.
|
||||
if state.is_notified() {
|
||||
self.notified -= 1;
|
||||
}
|
||||
self.len -= 1;
|
||||
|
||||
state
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies a number of entries.
|
||||
#[cold]
|
||||
fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
|
||||
let mut n = notify.count(Internal::new());
|
||||
|
||||
if !notify.is_additional(Internal::new()) {
|
||||
if n <= self.notified {
|
||||
return 0;
|
||||
}
|
||||
n -= self.notified;
|
||||
}
|
||||
|
||||
let count = n;
|
||||
|
||||
while n > 0 {
|
||||
n -= 1;
|
||||
|
||||
// Notify the first unnotified entry.
|
||||
match self.start {
|
||||
None => return count - n - 1,
|
||||
Some(e) => {
|
||||
// Get the entry and move the pointer forward.
|
||||
let e = unsafe { e.as_ref() };
|
||||
self.start = e.next.get();
|
||||
|
||||
// Set the state of this entry to `Notified` and notify.
|
||||
match e.state.replace(State::Notified {
|
||||
additional: notify.is_additional(Internal::new()),
|
||||
tag: Some(notify.next_tag(Internal::new())),
|
||||
}) {
|
||||
State::Notified { .. } => {}
|
||||
State::Created => {}
|
||||
State::Polling(w) => w.wake(),
|
||||
State::Waiting(t) => t.unpark(),
|
||||
}
|
||||
|
||||
// Update the counter.
|
||||
self.notified += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
count - n
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of a listener.
|
||||
enum State<T> {
|
||||
/// It has just been created.
|
||||
Created,
|
||||
|
||||
/// It has received a notification.
|
||||
Notified {
|
||||
/// This is `true` if this was an "additional" notification.
|
||||
additional: bool,
|
||||
|
||||
/// The tag associated with this event.
|
||||
tag: Option<T>,
|
||||
},
|
||||
|
||||
/// An async task is polling it.
|
||||
Polling(Waker),
|
||||
|
||||
/// A thread is blocked on it.
|
||||
Waiting(Thread),
|
||||
}
|
||||
|
||||
impl<T> State<T> {
|
||||
/// Returns `true` if this is the `Notified` state.
|
||||
#[inline]
|
||||
fn is_notified(&self) -> bool {
|
||||
match self {
|
||||
State::Notified { .. } => true,
|
||||
State::Created | State::Polling(_) | State::Waiting(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Take the tag out if it exists.
|
||||
#[inline]
|
||||
fn take(&mut self) -> Option<T> {
|
||||
match self {
|
||||
State::Notified { tag, .. } => Some(tag.take().expect("tag taken twice")),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Take the `State` out of the slot without moving out the tag.
|
||||
#[inline]
|
||||
fn take_state<T>(state: &Cell<State<T>>) -> State<T> {
|
||||
match state.replace(State::Notified {
|
||||
additional: false,
|
||||
tag: None,
|
||||
}) {
|
||||
State::Notified { additional, tag } => {
|
||||
// Replace the tag so remove() can take it out.
|
||||
state.replace(State::Notified { additional, tag })
|
||||
}
|
||||
|
||||
state => state,
|
||||
}
|
||||
}
|
1434
src/no_std.rs
1434
src/no_std.rs
File diff suppressed because it is too large
Load Diff
|
@ -1,249 +0,0 @@
|
|||
//! An operation that can be delayed.
|
||||
|
||||
//! The node that makes up queues.
|
||||
|
||||
use crate::notify::{GenericNotify, Internal, NotificationPrivate, TagProducer};
|
||||
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
|
||||
use crate::sync::Arc;
|
||||
use crate::sys::ListenerSlab;
|
||||
use crate::{State, Task};
|
||||
|
||||
use alloc::boxed::Box;
|
||||
|
||||
use core::fmt;
|
||||
use core::marker::PhantomData;
|
||||
use core::mem;
|
||||
use core::num::NonZeroUsize;
|
||||
use core::ptr;
|
||||
|
||||
pub(crate) struct NothingProducer<T>(PhantomData<T>);
|
||||
|
||||
impl<T> Default for NothingProducer<T> {
|
||||
fn default() -> Self {
|
||||
Self(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for NothingProducer<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("NothingProducer").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TagProducer for NothingProducer<T> {
|
||||
type Tag = T;
|
||||
|
||||
fn next_tag(&mut self) -> Self::Tag {
|
||||
// This has to be a zero-sized type with no drop handler.
|
||||
assert_eq!(mem::size_of::<Self::Tag>(), 0);
|
||||
assert!(!mem::needs_drop::<Self::Tag>());
|
||||
|
||||
// SAFETY: As this is a ZST without a drop handler, zero is valid.
|
||||
unsafe { mem::zeroed() }
|
||||
}
|
||||
}
|
||||
|
||||
/// A node in the backup queue.
|
||||
pub(crate) enum Node<T> {
|
||||
/// This node is requesting to add a listener.
|
||||
// For some reason, the MSRV build says this variant is never constructed.
|
||||
#[allow(dead_code)]
|
||||
AddListener {
|
||||
/// The state of the listener that wants to be added.
|
||||
task_waiting: Arc<TaskWaiting>,
|
||||
},
|
||||
|
||||
/// This node is notifying a listener.
|
||||
Notify(GenericNotify<NothingProducer<T>>),
|
||||
|
||||
/// This node is removing a listener.
|
||||
RemoveListener {
|
||||
/// The ID of the listener to remove.
|
||||
listener: NonZeroUsize,
|
||||
|
||||
/// Whether to propagate notifications to the next listener.
|
||||
propagate: bool,
|
||||
},
|
||||
|
||||
/// We are waiting for the mutex to lock, so they can manipulate it.
|
||||
Waiting(Task),
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for Node<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::AddListener { .. } => f.write_str("AddListener"),
|
||||
Self::Notify(notify) => f
|
||||
.debug_struct("Notify")
|
||||
.field("count", ¬ify.count(Internal::new()))
|
||||
.field("is_additional", ¬ify.is_additional(Internal::new()))
|
||||
.finish(),
|
||||
Self::RemoveListener {
|
||||
listener,
|
||||
propagate,
|
||||
} => f
|
||||
.debug_struct("RemoveListener")
|
||||
.field("listener", listener)
|
||||
.field("propagate", propagate)
|
||||
.finish(),
|
||||
Self::Waiting(_) => f.write_str("Waiting"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TaskWaiting {
|
||||
/// The task that is being waited on.
|
||||
task: AtomicCell<Task>,
|
||||
|
||||
/// The ID of the new entry.
|
||||
///
|
||||
/// This is set to zero when the task is still queued, or usize::MAX when the node should not
|
||||
/// be added at all.
|
||||
entry_id: AtomicUsize,
|
||||
}
|
||||
|
||||
impl<T> Node<T> {
|
||||
pub(crate) fn listener() -> (Self, Arc<TaskWaiting>) {
|
||||
// Create a new `TaskWaiting` structure.
|
||||
let task_waiting = Arc::new(TaskWaiting {
|
||||
task: AtomicCell::new(),
|
||||
entry_id: AtomicUsize::new(0),
|
||||
});
|
||||
|
||||
(
|
||||
Self::AddListener {
|
||||
task_waiting: task_waiting.clone(),
|
||||
},
|
||||
task_waiting,
|
||||
)
|
||||
}
|
||||
|
||||
/// Apply the node to the list.
|
||||
pub(super) fn apply(self, list: &mut ListenerSlab<T>) -> Option<Task> {
|
||||
match self {
|
||||
Node::AddListener { task_waiting } => {
|
||||
// If we're cancelled, do nothing.
|
||||
if task_waiting.entry_id.load(Ordering::Relaxed) == usize::MAX {
|
||||
return task_waiting.task.take().map(|t| *t);
|
||||
}
|
||||
|
||||
// Add a new entry to the list.
|
||||
let key = list.insert(State::Created);
|
||||
assert!(key.get() != usize::MAX);
|
||||
|
||||
// Send the new key to the listener and wake it if necessary.
|
||||
let old_value = task_waiting.entry_id.swap(key.get(), Ordering::Release);
|
||||
|
||||
// If we're cancelled, remove ourselves from the list.
|
||||
if old_value == usize::MAX {
|
||||
list.remove(key, false);
|
||||
}
|
||||
|
||||
return task_waiting.task.take().map(|t| *t);
|
||||
}
|
||||
Node::Notify(notify) => {
|
||||
// Notify the next `count` listeners.
|
||||
list.notify(notify);
|
||||
}
|
||||
Node::RemoveListener {
|
||||
listener,
|
||||
propagate,
|
||||
} => {
|
||||
// Remove the listener from the list.
|
||||
list.remove(listener, propagate);
|
||||
}
|
||||
Node::Waiting(task) => {
|
||||
return Some(task);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskWaiting {
|
||||
/// Determine if we are still queued.
|
||||
///
|
||||
/// Returns `Some` with the entry ID if we are no longer queued.
|
||||
pub(crate) fn status(&self) -> Option<NonZeroUsize> {
|
||||
NonZeroUsize::new(self.entry_id.load(Ordering::Acquire))
|
||||
}
|
||||
|
||||
/// Register a listener.
|
||||
pub(crate) fn register(&self, task: Task) {
|
||||
// Set the task.
|
||||
if let Some(task) = self.task.replace(Some(Box::new(task))) {
|
||||
task.wake();
|
||||
}
|
||||
|
||||
// If the entry ID is non-zero, then we are no longer queued.
|
||||
if self.status().is_some() {
|
||||
// Wake the task.
|
||||
if let Some(task) = self.task.take() {
|
||||
task.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark this listener as cancelled, indicating that it should not be inserted into the list.
|
||||
///
|
||||
/// If this listener was already inserted into the list, returns the entry ID. Otherwise returns
|
||||
/// `None`.
|
||||
pub(crate) fn cancel(&self) -> Option<NonZeroUsize> {
|
||||
// Set the entry ID to usize::MAX.
|
||||
let id = self.entry_id.swap(usize::MAX, Ordering::Release);
|
||||
|
||||
// Wake the task.
|
||||
if let Some(task) = self.task.take() {
|
||||
task.wake();
|
||||
}
|
||||
|
||||
// Return the entry ID if we were queued.
|
||||
NonZeroUsize::new(id)
|
||||
}
|
||||
}
|
||||
|
||||
/// A shared pointer to a value.
|
||||
///
|
||||
/// The inner value is a `Box<T>`.
|
||||
#[derive(Debug)]
|
||||
struct AtomicCell<T>(AtomicPtr<T>);
|
||||
|
||||
impl<T> AtomicCell<T> {
|
||||
/// Create a new `AtomicCell`.
|
||||
fn new() -> Self {
|
||||
Self(AtomicPtr::new(ptr::null_mut()))
|
||||
}
|
||||
|
||||
/// Swap the value out.
|
||||
fn replace(&self, value: Option<Box<T>>) -> Option<Box<T>> {
|
||||
let old_value = match value {
|
||||
Some(value) => self.0.swap(Box::into_raw(value), Ordering::AcqRel),
|
||||
// Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr
|
||||
// will never be dereferenced, there is no need to synchronize the store of a null ptr.
|
||||
None => self.0.swap(ptr::null_mut(), Ordering::Acquire),
|
||||
};
|
||||
|
||||
if old_value.is_null() {
|
||||
None
|
||||
} else {
|
||||
// SAFETY:
|
||||
// - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory.
|
||||
// - We've checked that old_value is not null.
|
||||
// - We do not store invalid pointers other than null in self.0.
|
||||
Some(unsafe { Box::from_raw(old_value) })
|
||||
}
|
||||
}
|
||||
|
||||
/// Take the value out.
|
||||
fn take(&self) -> Option<Box<T>> {
|
||||
self.replace(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for AtomicCell<T> {
|
||||
fn drop(&mut self) {
|
||||
self.take();
|
||||
}
|
||||
}
|
|
@ -1,9 +1,10 @@
|
|||
//! The `Notification` trait for specifying notification.
|
||||
|
||||
use crate::sync::atomic::{self, Ordering};
|
||||
#[cfg(feature = "std")]
|
||||
use core::fmt;
|
||||
|
||||
use crate::loom::atomic::{self, Ordering};
|
||||
|
||||
pub(crate) use __private::Internal;
|
||||
|
||||
/// The type of notification to use with an [`Event`].
|
||||
|
@ -215,16 +216,8 @@ pub struct TagWith<N: ?Sized, F> {
|
|||
#[cfg(feature = "std")]
|
||||
impl<N: fmt::Debug, F> fmt::Debug for TagWith<N, F> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
struct Ellipses;
|
||||
|
||||
impl fmt::Debug for Ellipses {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str("..")
|
||||
}
|
||||
}
|
||||
|
||||
f.debug_struct("TagWith")
|
||||
.field("tag", &Ellipses)
|
||||
.field("tag", &format_args!(".."))
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
|
@ -342,7 +335,7 @@ impl<T, F: FnMut() -> T> TagProducer for F {
|
|||
/// into this:
|
||||
///
|
||||
/// ```
|
||||
/// use event_listener::{Event, IntoNotification, Listener};
|
||||
/// use event_listener::{Event, IntoNotification};
|
||||
///
|
||||
/// let event = Event::new();
|
||||
///
|
||||
|
@ -406,7 +399,7 @@ pub trait IntoNotification: __private::Sealed {
|
|||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use event_listener::{Event, IntoNotification, Listener};
|
||||
/// use event_listener::{Event, IntoNotification};
|
||||
///
|
||||
/// let event = Event::new();
|
||||
///
|
||||
|
@ -442,7 +435,7 @@ pub trait IntoNotification: __private::Sealed {
|
|||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use event_listener::{Event, IntoNotification, Listener};
|
||||
/// use event_listener::{Event, IntoNotification};
|
||||
/// use std::sync::atomic::{self, Ordering};
|
||||
///
|
||||
/// let event = Event::new();
|
||||
|
@ -483,7 +476,7 @@ pub trait IntoNotification: __private::Sealed {
|
|||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use event_listener::{IntoNotification, Listener, Event};
|
||||
/// use event_listener::{IntoNotification, Event, Listener};
|
||||
///
|
||||
/// let event = Event::<bool>::with_tag();
|
||||
///
|
||||
|
@ -517,7 +510,7 @@ pub trait IntoNotification: __private::Sealed {
|
|||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use event_listener::{IntoNotification, Listener, Event};
|
||||
/// use event_listener::{IntoNotification, Event, Listener};
|
||||
///
|
||||
/// let event = Event::<bool>::with_tag();
|
||||
///
|
||||
|
@ -620,3 +613,13 @@ mod __private {
|
|||
pub trait Sealed {}
|
||||
impl<N: super::NotificationPrivate + ?Sized> Sealed for N {}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn negative_notification_should_panic() {
|
||||
let event = crate::Event::new();
|
||||
event.notify(-1);
|
||||
}
|
||||
}
|
||||
|
|
397
src/std.rs
397
src/std.rs
|
@ -1,397 +0,0 @@
|
|||
//! libstd-based implementation of `event-listener`.
|
||||
//!
|
||||
//! This implementation crates an intrusive linked list of listeners.
|
||||
|
||||
use crate::notify::{GenericNotify, Internal, Notification};
|
||||
use crate::sync::atomic::Ordering;
|
||||
use crate::sync::cell::{Cell, UnsafeCell};
|
||||
use crate::sync::{Mutex, MutexGuard};
|
||||
use crate::{RegisterResult, State, TaskRef};
|
||||
|
||||
use core::marker::PhantomPinned;
|
||||
use core::mem;
|
||||
use core::ops::{Deref, DerefMut};
|
||||
use core::pin::Pin;
|
||||
use core::ptr::NonNull;
|
||||
|
||||
pub(super) struct List<T>(Mutex<Inner<T>>);
|
||||
|
||||
struct Inner<T> {
|
||||
/// The head of the linked list.
|
||||
head: Option<NonNull<Link<T>>>,
|
||||
|
||||
/// The tail of the linked list.
|
||||
tail: Option<NonNull<Link<T>>>,
|
||||
|
||||
/// The first unnotified listener.
|
||||
next: Option<NonNull<Link<T>>>,
|
||||
|
||||
/// Total number of listeners.
|
||||
len: usize,
|
||||
|
||||
/// The number of notified listeners.
|
||||
notified: usize,
|
||||
}
|
||||
|
||||
impl<T> List<T> {
|
||||
/// Create a new, empty event listener list.
|
||||
pub(super) fn new() -> Self {
|
||||
Self(Mutex::new(Inner {
|
||||
head: None,
|
||||
tail: None,
|
||||
next: None,
|
||||
len: 0,
|
||||
notified: 0,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get the total number of listeners without blocking.
|
||||
pub(crate) fn try_total_listeners(&self) -> Option<usize> {
|
||||
self.0.try_lock().ok().map(|list| list.len)
|
||||
}
|
||||
|
||||
/// Get the total number of listeners with blocking.
|
||||
pub(crate) fn total_listeners(&self) -> usize {
|
||||
self.0.lock().unwrap_or_else(|e| e.into_inner()).len
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> crate::Inner<T> {
|
||||
fn lock(&self) -> ListLock<'_, '_, T> {
|
||||
ListLock {
|
||||
inner: self,
|
||||
lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether or not this number of listeners would lead to a notification.
|
||||
pub(crate) fn needs_notification(&self, limit: usize) -> bool {
|
||||
self.notified.load(Ordering::Acquire) < limit
|
||||
}
|
||||
|
||||
/// Add a new listener to the list.
|
||||
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
|
||||
let mut inner = self.lock();
|
||||
|
||||
listener.as_mut().set(Some(Listener {
|
||||
link: UnsafeCell::new(Link {
|
||||
state: Cell::new(State::Created),
|
||||
prev: Cell::new(inner.tail),
|
||||
next: Cell::new(None),
|
||||
}),
|
||||
_pin: PhantomPinned,
|
||||
}));
|
||||
let listener = listener.as_pin_mut().unwrap();
|
||||
|
||||
{
|
||||
let entry_guard = listener.link.get();
|
||||
// SAFETY: We are locked, so we can access the inner `link`.
|
||||
let entry = unsafe { entry_guard.deref() };
|
||||
|
||||
// Replace the tail with the new entry.
|
||||
match mem::replace(&mut inner.tail, Some(entry.into())) {
|
||||
None => inner.head = Some(entry.into()),
|
||||
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
|
||||
};
|
||||
}
|
||||
|
||||
// If there are no unnotified entries, this is the first one.
|
||||
if inner.next.is_none() {
|
||||
inner.next = inner.tail;
|
||||
}
|
||||
|
||||
// Bump the entry count.
|
||||
inner.len += 1;
|
||||
}
|
||||
|
||||
/// Remove a listener from the list.
|
||||
pub(crate) fn remove(
|
||||
&self,
|
||||
listener: Pin<&mut Option<Listener<T>>>,
|
||||
propagate: bool,
|
||||
) -> Option<State<T>> {
|
||||
self.lock().remove(listener, propagate)
|
||||
}
|
||||
|
||||
/// Notifies a number of entries.
|
||||
#[cold]
|
||||
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
|
||||
self.lock().notify(notify)
|
||||
}
|
||||
|
||||
/// Register a task to be notified when the event is triggered.
|
||||
///
|
||||
/// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
|
||||
/// isn't inserted, returns `None`.
|
||||
pub(crate) fn register(
|
||||
&self,
|
||||
mut listener: Pin<&mut Option<Listener<T>>>,
|
||||
task: TaskRef<'_>,
|
||||
) -> RegisterResult<T> {
|
||||
let mut inner = self.lock();
|
||||
let entry_guard = match listener.as_mut().as_pin_mut() {
|
||||
Some(listener) => listener.link.get(),
|
||||
None => return RegisterResult::NeverInserted,
|
||||
};
|
||||
// SAFETY: We are locked, so we can access the inner `link`.
|
||||
let entry = unsafe { entry_guard.deref() };
|
||||
|
||||
// Take out the state and check it.
|
||||
match entry.state.replace(State::NotifiedTaken) {
|
||||
State::Notified { tag, .. } => {
|
||||
// We have been notified, remove the listener.
|
||||
inner.remove(listener, false);
|
||||
RegisterResult::Notified(tag)
|
||||
}
|
||||
|
||||
State::Task(other_task) => {
|
||||
// Only replace the task if it's different.
|
||||
entry.state.set(State::Task({
|
||||
if !task.will_wake(other_task.as_task_ref()) {
|
||||
task.into_task()
|
||||
} else {
|
||||
other_task
|
||||
}
|
||||
}));
|
||||
|
||||
RegisterResult::Registered
|
||||
}
|
||||
|
||||
_ => {
|
||||
// We have not been notified, register the task.
|
||||
entry.state.set(State::Task(task.into_task()));
|
||||
RegisterResult::Registered
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
fn remove(
|
||||
&mut self,
|
||||
mut listener: Pin<&mut Option<Listener<T>>>,
|
||||
propagate: bool,
|
||||
) -> Option<State<T>> {
|
||||
let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
|
||||
let entry = unsafe { entry_guard.deref() };
|
||||
|
||||
let prev = entry.prev.get();
|
||||
let next = entry.next.get();
|
||||
|
||||
// Unlink from the previous entry.
|
||||
match prev {
|
||||
None => self.head = next,
|
||||
Some(p) => unsafe {
|
||||
p.as_ref().next.set(next);
|
||||
},
|
||||
}
|
||||
|
||||
// Unlink from the next entry.
|
||||
match next {
|
||||
None => self.tail = prev,
|
||||
Some(n) => unsafe {
|
||||
n.as_ref().prev.set(prev);
|
||||
},
|
||||
}
|
||||
|
||||
// If this was the first unnotified entry, update the next pointer.
|
||||
if self.next == Some(entry.into()) {
|
||||
self.next = next;
|
||||
}
|
||||
|
||||
// The entry is now fully unlinked, so we can now take it out safely.
|
||||
let entry = unsafe {
|
||||
listener
|
||||
.get_unchecked_mut()
|
||||
.take()
|
||||
.unwrap()
|
||||
.link
|
||||
.into_inner()
|
||||
};
|
||||
|
||||
// This State::Created is immediately dropped and exists as a workaround for the absence of
|
||||
// loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();`
|
||||
//
|
||||
// refs: https://github.com/tokio-rs/loom/pull/341
|
||||
let mut state = entry.state.replace(State::Created);
|
||||
|
||||
// Update the notified count.
|
||||
if state.is_notified() {
|
||||
self.notified -= 1;
|
||||
|
||||
if propagate {
|
||||
let state = mem::replace(&mut state, State::NotifiedTaken);
|
||||
if let State::Notified { additional, tag } = state {
|
||||
let tags = {
|
||||
let mut tag = Some(tag);
|
||||
move || tag.take().expect("tag already taken")
|
||||
};
|
||||
self.notify(GenericNotify::new(1, additional, tags));
|
||||
}
|
||||
}
|
||||
}
|
||||
self.len -= 1;
|
||||
|
||||
Some(state)
|
||||
}
|
||||
|
||||
#[cold]
|
||||
fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
|
||||
let mut n = notify.count(Internal::new());
|
||||
let is_additional = notify.is_additional(Internal::new());
|
||||
|
||||
if !is_additional {
|
||||
if n < self.notified {
|
||||
return 0;
|
||||
}
|
||||
n -= self.notified;
|
||||
}
|
||||
|
||||
let original_count = n;
|
||||
while n > 0 {
|
||||
n -= 1;
|
||||
|
||||
// Notify the next entry.
|
||||
match self.next {
|
||||
None => return original_count - n - 1,
|
||||
|
||||
Some(e) => {
|
||||
// Get the entry and move the pointer forwards.
|
||||
let entry = unsafe { e.as_ref() };
|
||||
self.next = entry.next.get();
|
||||
|
||||
// Set the state to `Notified` and notify.
|
||||
let tag = notify.next_tag(Internal::new());
|
||||
if let State::Task(task) = entry.state.replace(State::Notified {
|
||||
additional: is_additional,
|
||||
tag,
|
||||
}) {
|
||||
task.wake();
|
||||
}
|
||||
|
||||
// Bump the notified count.
|
||||
self.notified += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
original_count - n
|
||||
}
|
||||
}
|
||||
|
||||
struct ListLock<'a, 'b, T> {
|
||||
lock: MutexGuard<'a, Inner<T>>,
|
||||
inner: &'b crate::Inner<T>,
|
||||
}
|
||||
|
||||
impl<T> Deref for ListLock<'_, '_, T> {
|
||||
type Target = Inner<T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.lock
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for ListLock<'_, '_, T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.lock
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for ListLock<'_, '_, T> {
|
||||
fn drop(&mut self) {
|
||||
let list = &mut **self;
|
||||
|
||||
// Update the notified count.
|
||||
let notified = if list.notified < list.len {
|
||||
list.notified
|
||||
} else {
|
||||
core::usize::MAX
|
||||
};
|
||||
|
||||
self.inner.notified.store(notified, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Listener<T> {
|
||||
/// The inner link in the linked list.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This can only be accessed while the central mutex is locked.
|
||||
link: UnsafeCell<Link<T>>,
|
||||
|
||||
/// This listener cannot be moved after being pinned.
|
||||
_pin: PhantomPinned,
|
||||
}
|
||||
|
||||
struct Link<T> {
|
||||
/// The current state of the listener.
|
||||
state: Cell<State<T>>,
|
||||
|
||||
/// The previous link in the linked list.
|
||||
prev: Cell<Option<NonNull<Link<T>>>>,
|
||||
|
||||
/// The next link in the linked list.
|
||||
next: Cell<Option<NonNull<Link<T>>>>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures_lite::pin;
|
||||
|
||||
#[cfg(target_family = "wasm")]
|
||||
use wasm_bindgen_test::wasm_bindgen_test as test;
|
||||
|
||||
macro_rules! make_listeners {
|
||||
($($id:ident),*) => {
|
||||
$(
|
||||
let $id = Option::<Listener<()>>::None;
|
||||
pin!($id);
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert() {
|
||||
let inner = crate::Inner::new();
|
||||
make_listeners!(listen1, listen2, listen3);
|
||||
|
||||
// Register the listeners.
|
||||
inner.insert(listen1.as_mut());
|
||||
inner.insert(listen2.as_mut());
|
||||
inner.insert(listen3.as_mut());
|
||||
|
||||
assert_eq!(inner.lock().len, 3);
|
||||
|
||||
// Remove one.
|
||||
assert_eq!(inner.remove(listen2, false), Some(State::Created));
|
||||
assert_eq!(inner.lock().len, 2);
|
||||
|
||||
// Remove another.
|
||||
assert_eq!(inner.remove(listen1, false), Some(State::Created));
|
||||
assert_eq!(inner.lock().len, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_non_notified() {
|
||||
let inner = crate::Inner::new();
|
||||
make_listeners!(listen1, listen2, listen3);
|
||||
|
||||
// Register the listeners.
|
||||
inner.insert(listen1.as_mut());
|
||||
inner.insert(listen2.as_mut());
|
||||
inner.insert(listen3.as_mut());
|
||||
|
||||
// Notify one.
|
||||
inner.notify(GenericNotify::new(1, false, || ()));
|
||||
|
||||
// Remove one.
|
||||
inner.remove(listen3, true);
|
||||
|
||||
// Remove the rest.
|
||||
inner.remove(listen1, true);
|
||||
inner.remove(listen2, true);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,133 @@
|
|||
//! A simple test case using a "counter" type.
|
||||
|
||||
use event_listener::Event;
|
||||
use futures_lite::future::{block_on, poll_once};
|
||||
|
||||
use std::sync::atomic::{fence, AtomicUsize, Ordering};
|
||||
use std::sync::{mpsc, Arc, Barrier};
|
||||
use std::thread;
|
||||
|
||||
struct Counter {
|
||||
counter: AtomicUsize,
|
||||
|
||||
/// Signalled once `counter` has been changed.
|
||||
changed: Event,
|
||||
}
|
||||
|
||||
impl Counter {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
counter: AtomicUsize::new(0),
|
||||
changed: Event::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the counter to be incremented.
|
||||
async fn change(&self) -> usize {
|
||||
let original = self.counter.load(Ordering::Acquire);
|
||||
let mut current = original;
|
||||
|
||||
loop {
|
||||
if current != original {
|
||||
return current;
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
let listener = self.changed.listen();
|
||||
|
||||
// Try again.
|
||||
current = self.counter.load(Ordering::Acquire);
|
||||
if current != original {
|
||||
return current;
|
||||
}
|
||||
|
||||
// Wait for a change to be notified.
|
||||
listener.await;
|
||||
|
||||
// Update the counter.
|
||||
current = self.counter.load(Ordering::Acquire);
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment the counter.
|
||||
fn increment(&self) {
|
||||
self.counter.fetch_add(1, Ordering::Relaxed);
|
||||
self.changed.notify_additional(usize::MAX);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn counter() {
|
||||
let counter = Arc::new(Counter::new());
|
||||
let (send, recv) = mpsc::channel();
|
||||
|
||||
thread::spawn({
|
||||
let counter = counter.clone();
|
||||
move || {
|
||||
// Test normal.
|
||||
recv.recv().unwrap();
|
||||
counter.increment();
|
||||
|
||||
// Test relaxed.
|
||||
recv.recv().unwrap();
|
||||
counter.counter.fetch_add(1, Ordering::Relaxed);
|
||||
fence(Ordering::SeqCst);
|
||||
counter.changed.notify_additional_relaxed(usize::MAX);
|
||||
counter.changed.notify_additional_relaxed(usize::MAX);
|
||||
}
|
||||
});
|
||||
|
||||
thread::spawn(move || {
|
||||
let waiter = counter.change();
|
||||
futures_lite::pin!(waiter);
|
||||
|
||||
assert!(block_on(poll_once(waiter.as_mut())).is_none());
|
||||
send.send(()).unwrap();
|
||||
assert_eq!(block_on(waiter), 1);
|
||||
|
||||
let waiter1 = counter.change();
|
||||
let waiter2 = counter.change();
|
||||
futures_lite::pin!(waiter1);
|
||||
futures_lite::pin!(waiter2);
|
||||
|
||||
assert!(block_on(poll_once(waiter1.as_mut())).is_none());
|
||||
assert!(block_on(poll_once(waiter2.as_mut())).is_none());
|
||||
send.send(()).unwrap();
|
||||
assert_eq!(block_on(waiter1), 2);
|
||||
assert_eq!(block_on(waiter2), 2);
|
||||
});
|
||||
|
||||
#[cfg(miri)]
|
||||
thread::sleep(std::time::Duration::from_secs(5));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn simultaneous_notification() {
|
||||
let thread_count = if cfg!(miri) { 10 } else { 1000 };
|
||||
let barrier = Arc::new(Barrier::new(thread_count + 1));
|
||||
let counter = Arc::new(Counter::new());
|
||||
|
||||
for _ in 0..thread_count {
|
||||
let barrier = barrier.clone();
|
||||
let counter = counter.clone();
|
||||
thread::spawn(move || {
|
||||
// Wait for a listener to be created.
|
||||
barrier.wait();
|
||||
|
||||
// Notify all at the same time.
|
||||
counter.increment();
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for a notification.
|
||||
let listener = counter.change();
|
||||
futures_lite::pin!(listener);
|
||||
assert!(block_on(poll_once(listener.as_mut())).is_none());
|
||||
|
||||
// Signal the other threads.
|
||||
barrier.wait();
|
||||
|
||||
// Wait to be notified.
|
||||
thread::sleep(std::time::Duration::from_secs(3));
|
||||
block_on(listener);
|
||||
}
|
121
tests/notify.rs
121
tests/notify.rs
|
@ -4,12 +4,9 @@ use std::sync::{Arc, Mutex};
|
|||
use std::task::Context;
|
||||
use std::usize;
|
||||
|
||||
use event_listener::{Event, EventListener};
|
||||
use event_listener::{Event, EventListener, Listener};
|
||||
use waker_fn::waker_fn;
|
||||
|
||||
#[cfg(target_family = "wasm")]
|
||||
use wasm_bindgen_test::wasm_bindgen_test as test;
|
||||
|
||||
fn is_notified(listener: &mut EventListener) -> bool {
|
||||
let waker = waker_fn(|| ());
|
||||
Pin::new(listener)
|
||||
|
@ -17,6 +14,20 @@ fn is_notified(listener: &mut EventListener) -> bool {
|
|||
.is_ready()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn debug() {
|
||||
let event = Event::new();
|
||||
let fmt = format!("{:?}", &event);
|
||||
assert!(fmt.contains("Event"));
|
||||
|
||||
let listener = event.listen();
|
||||
let fmt = format!("{:?}", &listener);
|
||||
assert!(fmt.contains("EventListener"));
|
||||
|
||||
let fmt = format!("{:?}", &event);
|
||||
assert!(fmt.contains("Event"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notify() {
|
||||
let event = Event::new();
|
||||
|
@ -25,13 +36,15 @@ fn notify() {
|
|||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert!(!event.is_notified());
|
||||
assert!(!is_notified(&mut l1));
|
||||
assert!(!is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
|
||||
assert_eq!(event.notify(2), 2);
|
||||
assert!(event.is_notified());
|
||||
assert_eq!(event.notify(1), 0);
|
||||
|
||||
assert!(event.is_notified());
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
|
@ -45,9 +58,54 @@ fn notify_additional() {
|
|||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert!(!event.is_notified());
|
||||
assert_eq!(event.notify_additional(1), 1);
|
||||
assert_eq!(event.notify(1), 0);
|
||||
assert_eq!(event.notify_additional(1), 1);
|
||||
assert!(event.is_notified());
|
||||
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notify_zero() {
|
||||
let event = Event::new();
|
||||
assert_eq!(event.notify(1), 0);
|
||||
assert!(!event.is_notified());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notify_relaxed() {
|
||||
let event = Event::new();
|
||||
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert!(!is_notified(&mut l1));
|
||||
assert!(!is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
|
||||
assert_eq!(event.notify_relaxed(2), 2);
|
||||
assert_eq!(event.notify_relaxed(1), 0);
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notify_additional_relaxed() {
|
||||
let event = Event::new();
|
||||
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert_eq!(event.notify_additional_relaxed(1), 1);
|
||||
assert_eq!(event.notify_relaxed(1), 0);
|
||||
assert_eq!(event.notify_additional_relaxed(1), 1);
|
||||
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(is_notified(&mut l2));
|
||||
|
@ -81,8 +139,10 @@ fn notify_all() {
|
|||
|
||||
assert!(!is_notified(&mut l1));
|
||||
assert!(!is_notified(&mut l2));
|
||||
assert!(!event.is_notified());
|
||||
|
||||
assert_eq!(event.notify(usize::MAX), 2);
|
||||
assert!(event.is_notified());
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(is_notified(&mut l2));
|
||||
}
|
||||
|
@ -95,7 +155,7 @@ fn drop_notified() {
|
|||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert_eq!(event.notify(1), 1);
|
||||
event.notify(1);
|
||||
drop(l1);
|
||||
assert!(is_notified(&mut l2));
|
||||
assert!(!is_notified(&mut l3));
|
||||
|
@ -146,6 +206,55 @@ fn drop_non_notified() {
|
|||
assert!(!is_notified(&mut l2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn discard() {
|
||||
let event = Event::default();
|
||||
|
||||
let l1 = event.listen();
|
||||
assert!(!l1.discard());
|
||||
|
||||
let l1 = event.listen();
|
||||
event.notify(1);
|
||||
assert!(l1.discard());
|
||||
|
||||
let l1 = event.listen();
|
||||
event.notify_additional(1);
|
||||
assert!(l1.discard());
|
||||
|
||||
let mut l1 = event.listen();
|
||||
event.notify(1);
|
||||
assert!(is_notified(&mut l1));
|
||||
assert!(!l1.discard());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn same_event() {
|
||||
let e1 = Event::new();
|
||||
let e2 = Event::new();
|
||||
|
||||
let l1 = e1.listen();
|
||||
let l2 = e1.listen();
|
||||
let l3 = e2.listen();
|
||||
|
||||
assert!(l1.listens_to(&e1));
|
||||
assert!(!l1.listens_to(&e2));
|
||||
assert!(l1.same_event(&l2));
|
||||
assert!(!l1.same_event(&l3));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic = "cannot poll a completed `EventListener` future"]
|
||||
fn poll_twice() {
|
||||
let event = Event::new();
|
||||
let mut l1 = event.listen();
|
||||
event.notify(1);
|
||||
|
||||
assert!(is_notified(&mut l1));
|
||||
|
||||
// Panic here.
|
||||
is_notified(&mut l1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notify_all_fair() {
|
||||
let event = Event::new();
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
//! Test the wait() family of methods.
|
||||
|
||||
#![cfg(feature = "std")]
|
||||
|
||||
use event_listener::{Event, EventListener, IntoNotification, Listener};
|
||||
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use waker_fn::waker_fn;
|
||||
|
||||
fn is_notified(listener: &mut EventListener) -> bool {
|
||||
let waker = waker_fn(|| ());
|
||||
Pin::new(listener)
|
||||
.poll(&mut Context::from_waker(&waker))
|
||||
.is_ready()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn total_listeners() {
|
||||
let event = Event::new();
|
||||
assert_eq!(event.total_listeners(), 0);
|
||||
|
||||
let listener = event.listen();
|
||||
assert_eq!(event.total_listeners(), 1);
|
||||
|
||||
drop(listener);
|
||||
assert_eq!(event.total_listeners(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait() {
|
||||
let event = Event::new();
|
||||
let listener = event.listen();
|
||||
|
||||
assert_eq!(event.notify(1), 1);
|
||||
listener.wait();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_tag() {
|
||||
let event = Event::with_tag();
|
||||
let listener = event.listen();
|
||||
|
||||
assert_eq!(event.notify(1.tag(5i32)), 1);
|
||||
assert_eq!(listener.wait(), 5i32);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_timeout() {
|
||||
let event = Event::new();
|
||||
let listener = event.listen();
|
||||
|
||||
assert_eq!(event.notify(1), 1);
|
||||
assert_eq!(listener.wait_timeout(Duration::from_millis(50)), Some(()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_deadline() {
|
||||
let event = Event::new();
|
||||
let listener = event.listen();
|
||||
|
||||
assert_eq!(event.notify(1), 1);
|
||||
assert_eq!(
|
||||
listener.wait_deadline(Instant::now() + Duration::from_millis(50)),
|
||||
Some(())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_timeout_expiry() {
|
||||
let event = Event::new();
|
||||
let listener = event.listen();
|
||||
|
||||
let start = Instant::now();
|
||||
assert_eq!(listener.wait_timeout(Duration::from_millis(200)), None);
|
||||
assert!(Instant::now().duration_since(start) >= Duration::from_millis(200));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unpark() {
|
||||
let event = Arc::new(Event::new());
|
||||
let listener = event.listen();
|
||||
|
||||
thread::spawn({
|
||||
let event = event.clone();
|
||||
move || {
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
event.notify(1);
|
||||
}
|
||||
});
|
||||
|
||||
listener.wait();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unpark_timeout() {
|
||||
let event = Arc::new(Event::new());
|
||||
let listener = event.listen();
|
||||
|
||||
thread::spawn({
|
||||
let event = event.clone();
|
||||
move || {
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
event.notify(1);
|
||||
}
|
||||
});
|
||||
|
||||
let x = listener.wait_timeout(Duration::from_millis(200));
|
||||
assert!(x.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic = "cannot wait twice on an `EventListener`"]
|
||||
fn wait_twice() {
|
||||
let event = Event::new();
|
||||
let mut listener = event.listen();
|
||||
event.notify(1);
|
||||
|
||||
assert!(is_notified(&mut listener));
|
||||
|
||||
// Panic here.
|
||||
listener.wait();
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
//! Tests relating to tagging.
|
||||
|
||||
#![cfg(feature = "std")]
|
||||
|
||||
use std::future::Future;
|
||||
use std::panic;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use event_listener::{Event, EventListener, IntoNotification};
|
||||
use waker_fn::waker_fn;
|
||||
|
||||
fn notified<T>(listener: &mut EventListener<T>) -> Option<T> {
|
||||
let waker = waker_fn(|| ());
|
||||
match Pin::new(listener).poll(&mut Context::from_waker(&waker)) {
|
||||
Poll::Ready(tag) => Some(tag),
|
||||
Poll::Pending => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notify_tag() {
|
||||
let event = Event::with_tag();
|
||||
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert!(notified(&mut l1).is_none());
|
||||
assert!(notified(&mut l2).is_none());
|
||||
assert!(notified(&mut l3).is_none());
|
||||
|
||||
assert_eq!(event.notify(2.tag(true)), 2);
|
||||
assert_eq!(event.notify(1.tag(false)), 0);
|
||||
assert_eq!(notified(&mut l1), Some(true));
|
||||
assert_eq!(notified(&mut l2), Some(true));
|
||||
assert!(notified(&mut l3).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notify_additional_tag() {
|
||||
let event = Event::with_tag();
|
||||
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert_eq!(event.notify(1.additional().tag(1)), 1);
|
||||
assert_eq!(event.notify(1.tag(2)), 0);
|
||||
assert_eq!(event.notify(1.additional().tag(3)), 1);
|
||||
|
||||
assert_eq!(notified(&mut l1), Some(1));
|
||||
assert_eq!(notified(&mut l2), Some(3));
|
||||
assert!(notified(&mut l3).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notify_with_tag() {
|
||||
let event = Event::with_tag();
|
||||
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert!(notified(&mut l1).is_none());
|
||||
assert!(notified(&mut l2).is_none());
|
||||
assert!(notified(&mut l3).is_none());
|
||||
|
||||
let mut i = 0usize;
|
||||
|
||||
event.notify(2.tag_with(|| {
|
||||
i += 1;
|
||||
i
|
||||
}));
|
||||
assert_eq!(notified(&mut l1), Some(1));
|
||||
assert_eq!(notified(&mut l2), Some(2));
|
||||
assert!(notified(&mut l3).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_notify_with_tag() {
|
||||
let event = Event::with_tag();
|
||||
|
||||
let l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
|
||||
assert_eq!(event.notify(1.tag_with(|| 5i32)), 1);
|
||||
drop(l1);
|
||||
assert_eq!(notified(&mut l2), Some(5));
|
||||
assert!(notified(&mut l3).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn panic_with_tag() {
|
||||
let event = Event::with_tag();
|
||||
|
||||
let mut l1 = event.listen();
|
||||
let mut l2 = event.listen();
|
||||
let mut l3 = event.listen();
|
||||
let mut l4 = event.listen();
|
||||
|
||||
let p = panic::catch_unwind(|| {
|
||||
let mut i = 0usize;
|
||||
|
||||
event.notify(4.tag_with(|| {
|
||||
i += 1;
|
||||
if i > 2 {
|
||||
panic!();
|
||||
}
|
||||
i
|
||||
}));
|
||||
});
|
||||
assert!(p.is_err());
|
||||
|
||||
assert_eq!(notified(&mut l1), Some(1));
|
||||
assert_eq!(notified(&mut l2), Some(2));
|
||||
assert!(notified(&mut l3).is_none());
|
||||
assert!(notified(&mut l4).is_none());
|
||||
}
|
Loading…
Reference in New Issue