Compare commits

...

18 Commits

Author SHA1 Message Date
John Nunley f659391c3a
tests: Add further testing
Co-authored-by: Jacob Rothstein <hi@jbr.me>
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:10:06 -07:00
John Nunley 6b73d6995d
feat: Add a way to get the current number of listeners
Mirror of #114

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:08:12 -07:00
John Nunley 34d9dc4374
m: Update fmt::Debug to produce useful output
cc #86, it's harder to get this info on no_std so I've ignored it for
now

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:08:09 -07:00
John Nunley 89c1001d7d
feat: Re-add the is_notified method
cc #48 and #59

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:07:59 -07:00
Alex Touchet cdeb708d60
Fix build badge and update links (#45) 2024-04-15 20:07:53 -07:00
John Nunley bdbd973850
docs: Update documentation
- Update Cargo.toml and CHANGELOG.md
- Add back the smol-rs logo (#63)

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:07:38 -07:00
John Nunley 47cea329e8
bench: Add basic benchmarks
Replay of #31 with minor modifications

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:07:22 -07:00
John Nunley dbfbb3efc8
feat: Add no_std implementation
This commit adds an implementation of the event-listener algorithm built
without locks. This is a replacement of the old no_std backend. It is
written without concurrent-queue and therefore closes #109.

The idea behind this implementation is to store the listeners in "slots"
in an infinitely large list. Then, assign each listener a number and
then use that to wait on each listener.

The list used is similar to the one used by the thread-local crate. It
consists of a list of "buckets" that hold slots. The number of slots
increases in an amortized way. The first slot holds 1, the second slot
holds 2, the third slot holds 4... all the way up to usize::MAX slots.

Indexes are done by having a list of reusable indexes and using those
when possible, only increasing the max index when necessary. This part
of the code could use some work; under contention it's possible to
make some slots unusuable. This can happen under two cases:

- If there is contention on the indexes list when the listener is being
  freed.
- If the slot is still being notified when it is attempted to be reused.

Both of these cases are probably fixable, and should be fixed before
release. Otherwise long running server processes using this code will
run out of memory under heavy loads.

From here the rest of the implementation is an atomic linked list based
on the above primitives. It functions very similarly to the std variant.
The main difference is that the Link structure's waker functions very
similarly to AtomicWaker from the atomic-waker crate. Aside from that
the code isn't very interesting on its own.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:06:54 -07:00
John Nunley 19ef495efb
m: Move internals to std module
This will allow us to create a no-std module

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:06:45 -07:00
John Nunley 5511324183
tests: Add a greater breadth of tests
This also makes sure that MIRI can run here.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:06:26 -07:00
John Nunley 375e5fdba2
feat: Add back notify() returning the event count
This way added earlier as a way of telling how many listeners were woken
up by a notify() call. It's very simple to implement with the mutex
strategy.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:04:57 -07:00
John Nunley 2d62017541
feat: Add back listener! macro
For now it just creates a listener on the heap. We can rework this
later, maybe to include stack listeners. For now it's best to keep it
this way, I feel.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:04:43 -07:00
John Nunley c04af898c0
feat: Add back the Listener trait
The Listener trait was previously used to abstract over heap and stack
based event listeners. However at this point we only have heap based
event listeners. Still, to avoid a breaking change we need to have this
trait.

Other minor fixups include:

- Make the locking ignore poisoning.
- Reduce formatting code

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:04:28 -07:00
John Nunley 5b0b36ae63
feat: Add back portable-atomic feature
This commit adds back a feature that allows one to use portable-atomic
instead of regular atomics.

Kind of useless without no-std, but it's here again.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:03:25 -07:00
John Nunley 3d52d2c8ef
feat: Add back tagged events
This commit re-adds the "notify.rs" file pretty much verbatim. This
allows for tagged notifications to be used, i.e. convey additional
control information along with the notification. The tags are stored in
the links with the notification.

This also adds some tests for the new functionality.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:02:03 -07:00
John Nunley b3a4ddb3b6
feat: Add back tag generic
This commit adds back the <T> parameter on Event and EventListener. So
far nothing is done with the generic parameter. However it will
eventually be used to bring back tagged events.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:01:38 -07:00
Taiki Endo a51a2102a5
chore: Re-apply Taiki's commits
@taiki-e's commits since v2.5.3 aren't the problem, as they only change
CI/framing details. This commit brings back all work by @taiki-e.

Co-authored-by: John Nunley <dev@notgull.net>
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-15 20:01:20 -07:00
John Nunley 6368046898
chore: Revert back to v2.5.3
I'm man enough to know when I've been wrong.

The intrusive-list-based list implementation is riddled with bugs, fails
MIRI, and overall increases the complexity of this crate. I've invested
a lot of time and energy into improving it; effort which I feel has been
wasted. So I figure I'll cut my losses and revert.

This PR undoes every commit made between now and the release of
event-listener version 2.5.3. The next commits will be replaying newer
features in order to avoid a breaking change.

Signed-off-by: John Nunley <dev@notgullnet>
2024-04-15 19:58:35 -07:00
17 changed files with 2437 additions and 3620 deletions

View File

@ -13,7 +13,6 @@ on:
env:
CARGO_INCREMENTAL: 0
CARGO_NET_GIT_FETCH_WITH_CLI: true
CARGO_NET_RETRY: 10
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1
@ -27,72 +26,37 @@ defaults:
jobs:
test:
runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
include:
- rust: stable
- rust: beta
- rust: nightly
- rust: nightly
target: i686-unknown-linux-gnu
os: [ubuntu-latest]
rust: [nightly, beta, stable]
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- name: Install cross-compilation tools
uses: taiki-e/setup-cross-toolchain-action@v1
with:
target: ${{ matrix.target }}
if: matrix.target != ''
- run: cargo build --all --all-targets
- run: cargo build --all --no-default-features --all-targets
- run: cargo build --all --all-features --all-targets
- run: cargo test --all
- run: cargo test --all --release
- run: cargo test --no-default-features --tests
- run: cargo test --no-default-features --tests --release
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: rustup target add thumbv7m-none-eabi
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
run: cargo hack build --all --no-dev-deps
- run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic
- name: Install wasm-pack
uses: taiki-e/install-action@wasm-pack
- run: wasm-pack test --node
- run: wasm-pack test --node --no-default-features
- run: wasm-pack test --node --no-default-features --features portable-atomic
- name: Clone some dependent crates
run: |
git clone https://github.com/smol-rs/event-listener-strategy.git
git clone https://github.com/smol-rs/async-channel.git
git clone https://github.com/smol-rs/async-lock.git
- name: Patch dependent crates
run: |
echo '[patch.crates-io]' >> event-listener-strategy/Cargo.toml
echo 'event-listener = { path = ".." }' >> event-listener-strategy/Cargo.toml
echo '[patch.crates-io]' >> async-channel/Cargo.toml
echo 'event-listener = { path = ".." }' >> async-channel/Cargo.toml
echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-channel/Cargo.toml
echo '[patch.crates-io]' >> async-lock/Cargo.toml
echo 'event-listener = { path = ".." }' >> async-lock/Cargo.toml
echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-lock/Cargo.toml
echo 'async-channel = { path = "../async-channel" }' >> async-lock/Cargo.toml
- name: Test dependent crates
run: |
cargo test --manifest-path=event-listener-strategy/Cargo.toml
cargo test --manifest-path=async-channel/Cargo.toml
cargo test --manifest-path=async-lock/Cargo.toml
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- run: cargo test
msrv:
runs-on: ubuntu-latest
strategy:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.59']
steps:
- uses: actions/checkout@v4
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: cargo hack build --all --rust-version
- run: cargo hack build --all --no-default-features --rust-version
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo build
- run: cargo build --no-default-features
clippy:
runs-on: ubuntu-latest
@ -100,7 +64,9 @@ jobs:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo clippy --all --all-features --all-targets
- run: cargo clippy --all-features --all-targets
- run: cargo clippy --all-targets
- run: cargo clippy --no-default-features --all-targets
fmt:
runs-on: ubuntu-latest
@ -116,34 +82,10 @@ jobs:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup toolchain install nightly --component miri && rustup default nightly
- run: |
echo "MIRIFLAGS=-Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation" >>"${GITHUB_ENV}"
echo "RUSTFLAGS=${RUSTFLAGS} -Z randomize-layout" >>"${GITHUB_ENV}"
- run: cargo miri test --all
- run: cargo miri test --no-default-features --tests
- run: cargo miri test --no-default-features --features portable-atomic --tests
- name: Clone some dependent crates
run: |
git clone https://github.com/smol-rs/event-listener-strategy.git
git clone https://github.com/smol-rs/async-channel.git
git clone https://github.com/smol-rs/async-lock.git
- name: Patch dependent crates
run: |
echo '[patch.crates-io]' >> event-listener-strategy/Cargo.toml
echo 'event-listener = { path = ".." }' >> event-listener-strategy/Cargo.toml
echo '[patch.crates-io]' >> async-channel/Cargo.toml
echo 'event-listener = { path = ".." }' >> async-channel/Cargo.toml
echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-channel/Cargo.toml
echo '[patch.crates-io]' >> async-lock/Cargo.toml
echo 'event-listener = { path = ".." }' >> async-lock/Cargo.toml
echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-lock/Cargo.toml
echo 'async-channel = { path = "../async-channel" }' >> async-lock/Cargo.toml
- name: Test dependent crates
# async-channel isn't included here as it appears to be broken on MIRI.
# See https://github.com/smol-rs/async-channel/issues/85
run: |
cargo miri test --manifest-path=event-listener-strategy/Cargo.toml
cargo miri test --manifest-path=async-lock/Cargo.toml
- run: cargo miri test
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
security_audit:
permissions:
@ -152,19 +94,9 @@ jobs:
issues: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
# https://github.com/rustsec/audit-check/issues/2
- uses: rustsec/audit-check@master
with:
token: ${{ secrets.GITHUB_TOKEN }}
loom:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- name: Loom tests
run: RUSTFLAGS="--cfg=loom" cargo test --release --test loom --features loom

View File

@ -2,11 +2,11 @@
name = "event-listener"
# When publishing a new version:
# - Update CHANGELOG.md
# - Create "v5.x.y" git tag
# - Create "v2.x.y" git tag
version = "5.3.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2021"
rust-version = "1.60"
rust-version = "1.59"
description = "Notify async tasks or threads"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/event-listener"
@ -16,43 +16,22 @@ exclude = ["/.*"]
[features]
default = ["std"]
std = ["concurrent-queue/std", "parking"]
portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]
loom = ["concurrent-queue/loom", "parking?/loom", "dep:loom"]
std = []
portable-atomic = ["portable-atomic-crate", "portable-atomic-util"]
[dependencies]
concurrent-queue = { version = "2.4.0", default-features = false }
pin-project-lite = "0.2.12"
portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] }
[target.'cfg(not(target_family = "wasm"))'.dependencies]
parking = { version = "2.0.0", optional = true }
[target.'cfg(loom)'.dependencies]
loom = { version = "0.7", optional = true }
[dependencies.portable_atomic_crate]
package = "portable-atomic"
version = "1.2.0"
default-features = false
optional = true
portable-atomic-crate = { version = "1.6.0", package = "portable-atomic", optional = true }
portable-atomic-util = { version = "0.1.5", features = ["alloc"], optional = true }
[dev-dependencies]
futures-lite = "2.0.0"
try-lock = "0.2.5"
futures-lite = "2.3.0"
criterion = { version = "0.3.4", default-features = false, features = ["cargo_bench_support"] }
waker-fn = "1"
[dev-dependencies.criterion]
version = "0.5"
default-features = false
features = ["cargo_bench_support"]
[target.'cfg(target_family = "wasm")'.dev-dependencies]
wasm-bindgen-test = "0.3"
[[bench]]
name = "bench"
harness = false
required-features = ["std"]
[lib]
bench = false
bench = false

View File

@ -1,5 +1,3 @@
use std::iter;
use criterion::{criterion_group, criterion_main, Criterion};
use event_listener::{Event, Listener};
@ -8,14 +6,16 @@ const COUNT: usize = 8000;
fn bench_events(c: &mut Criterion) {
c.bench_function("notify_and_wait", |b| {
let ev = Event::new();
let mut handles = Vec::with_capacity(COUNT);
b.iter(|| {
handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT));
let mut handles = Vec::with_capacity(COUNT);
for _ in 0..COUNT {
handles.push(ev.listen());
}
ev.notify(COUNT);
for handle in handles.drain(..) {
for handle in handles {
handle.wait();
}
});

View File

@ -2,25 +2,29 @@
//!
//! This mutex exposes both blocking and async methods for acquiring a lock.
#[cfg(not(target_family = "wasm"))]
mod example {
#![allow(dead_code)]
#![allow(dead_code)]
#[cfg(feature = "std")]
mod ex {
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};
use event_listener::{listener, Event, Listener};
use try_lock::{Locked, TryLock};
use event_listener::{Event, Listener};
/// A simple mutex.
struct Mutex<T> {
/// Set to `true` when the mutex is locked.
locked: AtomicBool,
/// Blocked lock operations.
lock_ops: Event,
/// The inner non-blocking mutex.
data: TryLock<T>,
/// The inner protected data.
data: UnsafeCell<T>,
}
unsafe impl<T: Send> Send for Mutex<T> {}
@ -30,40 +34,49 @@ mod example {
/// Creates a mutex.
fn new(t: T) -> Mutex<T> {
Mutex {
locked: AtomicBool::new(false),
lock_ops: Event::new(),
data: TryLock::new(t),
data: UnsafeCell::new(t),
}
}
/// Attempts to acquire a lock.
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
self.data.try_lock().map(MutexGuard)
if !self.locked.swap(true, Ordering::Acquire) {
Some(MutexGuard(self))
} else {
None
}
}
/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
// Set up an event listener.
listener!(self.lock_ops => listener);
// Try again.
if let Some(guard) = self.try_lock() {
return guard;
// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.wait();
}
}
// Wait for a notification.
listener.wait();
}
}
/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;
loop {
// Attempt grabbing a lock.
@ -71,59 +84,73 @@ mod example {
return Some(guard);
}
// Set up an event listener.
listener!(self.lock_ops => listener);
// Try again.
if let Some(guard) = self.try_lock() {
return Some(guard);
// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.wait_deadline(deadline)?;
}
}
// Wait until a notification is received.
listener.wait_deadline(deadline)?;
}
}
/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
// Set up an event listener.
listener!(self.lock_ops => listener);
// Try again.
if let Some(guard) = self.try_lock() {
return guard;
// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.await;
}
}
// Wait until a notification is received.
listener.await;
}
}
}
/// A guard holding a lock.
struct MutexGuard<'a, T>(Locked<'a, T>);
struct MutexGuard<'a, T>(&'a Mutex<T>);
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
self.0.lock_ops.notify(1);
}
}
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
unsafe { &*self.0.data.get() }
}
}
impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.0
unsafe { &mut *self.0.data.get() }
}
}
pub(super) fn entry() {
pub(crate) fn entry() {
const N: usize = 10;
// A shared counter.
@ -158,13 +185,13 @@ mod example {
}
}
#[cfg(target_family = "wasm")]
mod example {
pub(super) fn entry() {
println!("This example is not supported on wasm yet.");
#[cfg(not(feature = "std"))]
mod ex {
pub(crate) fn entry() {
eprintln!("this example requires the 'std' feature")
}
}
fn main() {
example::entry();
ex::entry();
}

1559
src/lib.rs

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,851 @@
//! Implementation of the linked list using lock-free primitives.
use crate::notify::{GenericNotify, Internal, Notification};
use core::cell::{Cell, UnsafeCell};
use core::cmp::Reverse;
use core::fmt;
use core::hint::spin_loop;
use core::iter;
use core::marker::PhantomData;
use core::mem::{self, MaybeUninit};
use core::num::NonZeroUsize;
use core::ptr::{self, NonNull};
use core::slice;
use core::task::{Context, Poll, Waker};
use alloc::boxed::Box;
use alloc::collections::BinaryHeap;
use crate::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
/// The total number of buckets stored in each thread local.
/// All buckets combined can hold up to `usize::MAX - 1` entries.
const BUCKETS: usize = (usize::BITS - 1) as usize;
/// This listener has no data.
const NEW: usize = 0;
/// This listener is occupied.
const OCCUPIED: usize = 0b00001;
/// This listener is notified.
const NOTIFIED: usize = 0b00010;
/// This listener's notification is additional.
const ADDITIONAL: usize = 0b00100;
/// This listener is in the process of registering a new waker.
const REGISTERING: usize = 0b01000;
/// This listener is in the process of notifying a previous waker.
const NOTIFYING: usize = 0b10000;
/// State of a listener that was just removed.
enum NotificationState {
/// There was no notification.
Unnotified,
/// We were notified with `notify()`
Notified,
/// We were notified with `notify_additional()`
NotifiedAdditional,
}
/// Inner implementation of [`Event`].
pub(crate) struct Inner<T> {
/// List of slots containing listeners.
slots: Slots<T>,
/// Free indexes for listeners.
indexes: Indexes,
/// Head of the linked list.
head: AtomicUsize,
/// End of the linked list.
tail: AtomicUsize,
/// Whether someone is notifying this list.
///
/// Only one task can notify this event at a time. This task is called the "notifier".
notifying: AtomicBool,
/// Number of notifications.
///
/// The notifier will check this for further notifications.
standard_notify: AtomicUsize,
/// Number of additional notifications.
///
/// The notifier will check this for further notifications.
additional_notify: AtomicUsize,
}
impl<T> Inner<T> {
/// Create a new linked list.
pub(crate) fn new() -> Self {
Self {
slots: Slots::new(),
indexes: Indexes::new(),
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
notifying: AtomicBool::new(false),
standard_notify: AtomicUsize::new(0),
additional_notify: AtomicUsize::new(0),
}
}
/// Debug output.
#[inline]
pub(crate) fn debug_fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Event").finish_non_exhaustive()
}
/// We never have enough info to tell this for sure.
pub(crate) fn notifyable(&self, _limit: usize) -> bool {
true
}
/// Tell whether any listeners are currently notified.
#[inline]
pub fn is_notified(&self) -> bool {
let mut head = self.head.load(Ordering::Acquire);
loop {
if head == 0 {
// No entries left.
return false;
}
let slot = self.slots.get(head);
// If this slot isn't occupied, use the next one.
let state = slot.state.load(Ordering::Acquire);
if state & OCCUPIED == 0 {
head = slot.next.load(Ordering::Acquire);
} else {
return state & NOTIFIED != 0;
}
}
}
/// Insert a listener into this linked list.
#[cold]
pub(crate) fn listen(&self) -> Listener<T> {
/// Update some local state and the current slot's tail at once.
struct TailUpdater<'a> {
tail: Cell<usize>,
current_prev: &'a AtomicUsize,
}
impl TailUpdater<'_> {
#[inline]
fn tail(&self) -> usize {
self.tail.get()
}
#[inline]
fn update(&self, new_tail: usize, ordering: Ordering) {
self.tail.set(new_tail);
self.current_prev.store(new_tail, ordering);
}
}
let (index, slot) = loop {
let index = self.indexes.alloc();
let slot = self.slots.get(index);
// If a notification from last time is still being notified, allocate a new ID.
if slot.state.load(Ordering::Acquire) & NOTIFYING != 0 {
continue;
} else {
break (index, slot);
}
};
let state = TailUpdater {
tail: Cell::new(0),
current_prev: &slot.prev,
};
// Set our link's state up.
slot.state.store(OCCUPIED, Ordering::Relaxed);
slot.next.store(0, Ordering::Release);
state.update(self.tail.load(Ordering::Relaxed), Ordering::Relaxed);
// Try appending this new node to the back of the list.
loop {
if state.tail() == 0 {
// The list must be empty; try to make ourselves the head.
let old_head = self
.head
.compare_exchange(0, index, Ordering::AcqRel, Ordering::Acquire)
.unwrap_or_else(|x| x);
if old_head == 0 {
// Success! We are now the head. Set the tail and break.
self.tail.store(index, Ordering::Release);
break;
} else {
// Use this head as our current tail and traverse the list.
state.update(old_head, Ordering::Acquire);
}
} else {
// The tail's end should be 0. Set our index to it.
let tail_slot = self.slots.get(state.tail());
let old_tail = tail_slot
.next
.compare_exchange(0, index, Ordering::AcqRel, Ordering::Acquire)
.unwrap_or_else(|x| x);
if old_tail == 0 {
// Success! We are now inserted. Set the tail to our slot number.
let _ = self.tail.compare_exchange(
state.tail(),
index,
Ordering::AcqRel,
Ordering::Acquire,
);
break;
} else {
// The tail is occupied, move onto the next item and continue.
state.update(old_tail, Ordering::Acquire);
}
}
// We may be trapped here for some time.
spin_loop();
}
Listener {
index: Some(NonZeroUsize::new(index).unwrap()),
_phantom: PhantomData,
}
}
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
// T should always be `()`.
debug_assert_eq!(mem::size_of::<T>(), 0);
debug_assert!(!mem::needs_drop::<T>());
// Get the count and the slot.
let count = notify.count(Internal::new());
let notification_slot = if notify.is_additional(Internal::new()) {
&self.additional_notify
} else {
&self.standard_notify
};
// Bump the count.
notification_slot.fetch_add(count, Ordering::SeqCst);
// Try to become the notifier for a while.
let mut no_lock = true;
for _ in 0..128 {
no_lock = self
.notifying
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.unwrap_or_else(|x| x);
if !no_lock {
break;
}
core::hint::spin_loop();
}
if no_lock {
// We did not capture the lock. The notifier should see the above addition and keep
// going.
return 0;
}
// We have the lock! Make sure to release it when we're done.
let _guard = CallOnDrop(|| self.notifying.store(false, Ordering::Release));
// Notify the actual wakers.
self.notify_locked(count)
}
#[inline]
pub(crate) fn remove(&self, listener: &mut Listener<T>) {
let is_additional = match self.take(listener) {
NotificationState::Unnotified => return,
NotificationState::Notified => false,
NotificationState::NotifiedAdditional => true,
};
// Propagate the notification.
self.notify(GenericNotify::new(1, is_additional, || unreachable!()));
}
#[inline]
pub(crate) fn discard(&self, listener: &mut Listener<T>) -> bool {
!matches!(self.take(listener), NotificationState::Unnotified)
}
#[inline]
pub(crate) fn poll(&self, listener: &mut Listener<T>, cx: &mut Context<'_>) -> Poll<T> {
let index = match listener.index {
Some(index) => index.get(),
None => unreachable!("cannot poll a completed `EventListener` future"),
};
let slot = self.slots.get(index);
// Tell whether or not we have been notified.
if slot.state.load(Ordering::Acquire) & NOTIFIED != 0 {
// We have succeeded!
// SAFETY: T should *always* be ()
debug_assert_eq!(mem::size_of::<T>(), 0);
debug_assert!(!mem::needs_drop::<T>());
self.take(listener);
Poll::Ready(unsafe { mem::zeroed() })
} else {
// Register a waker and wait.
slot.register(cx.waker());
Poll::Pending
}
}
#[inline]
fn take(&self, listener: &mut Listener<T>) -> NotificationState {
let index = match listener.index.take() {
Some(x) => x.get(),
None => return NotificationState::Unnotified,
};
let slot = self.slots.get(index);
// Mark this state as unoccupied.
let state = slot.state.fetch_and(!OCCUPIED, Ordering::AcqRel);
debug_assert_ne!(state & OCCUPIED, 0);
debug_assert_eq!(state & REGISTERING, 0);
let prev = slot.prev.swap(0, Ordering::AcqRel);
let next = slot.next.load(Ordering::Acquire);
// Unlink from the previous entry.
if prev == 0 {
self.head.store(next, Ordering::Release);
} else {
self.slots.get(prev).next.store(next, Ordering::Release);
}
if next == 0 {
self.tail.store(prev, Ordering::Release);
} else {
self.slots.get(next).prev.store(prev, Ordering::Release);
}
// Free the index and let another user take the slot.
self.indexes.free(index);
match (state & NOTIFIED != 0, state & ADDITIONAL != 0) {
(false, _) => NotificationState::Unnotified,
(true, false) => NotificationState::Notified,
(true, true) => NotificationState::NotifiedAdditional,
}
}
fn notify_locked(&self, ret_limit: usize) -> usize {
/// Whether to notify as normal or as additional.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(usize)]
enum NotifyMethod {
Standard = 0,
Additional = 1,
}
impl NotifyMethod {
fn invert(self) -> Self {
match self {
Self::Standard => Self::Additional,
Self::Additional => Self::Standard,
}
}
fn notification_slot<T>(self, inner: &Inner<T>) -> &AtomicUsize {
match self {
Self::Standard => &inner.standard_notify,
Self::Additional => &inner.additional_notify,
}
}
/// Use this method to notify the list.
fn notify<T>(self, inner: &Inner<T>, mut count: usize) -> usize {
let mut notified = 0;
let mut cursor = inner.head.load(Ordering::Acquire);
let mut prev_cursor = cursor;
while cursor != 0 && count > 0 {
// Get the slot at our cursor.
let slot = inner.slots.get(cursor);
// If the entry is in the progress of being destroyed, try again.
let state = slot.state.load(Ordering::Acquire);
if state & OCCUPIED == 0x0 {
if prev_cursor == cursor {
// We've hit a hole, stop notifying.
break;
} else {
// Load the previous next.
core::hint::spin_loop();
cursor = slot.next.load(Ordering::Acquire);
continue;
}
}
prev_cursor = cursor;
// If the entry is already notified, skip it and decrement the count if needed.
if state & NOTIFIED != 0x0 {
cursor = slot.next.load(Ordering::Acquire);
if let NotifyMethod::Standard = self {
count -= 1;
}
continue;
}
// Notify the entry.
slot.notify(self == NotifyMethod::Additional);
// Update counts and move on.
count -= 1;
notified += 1;
cursor = slot.next.load(Ordering::Acquire);
}
notified
}
}
/// Make sure we don't miss a notification.
struct NotifyState {
current: NotifyMethod,
progress_made: [bool; 2],
}
impl NotifyState {
fn new() -> Self {
Self {
current: NotifyMethod::Standard,
progress_made: [true, true],
}
}
#[inline]
fn make_progress(&mut self) {
self.progress_made[self.current as usize] = true;
}
#[inline]
fn next_method(&mut self) -> Option<NotifyMethod> {
// If we have made no progress, break out.
if self.progress_made == [false, false] {
return None;
}
// Replace our current progress with "false" and switch to the inverted.
self.progress_made[self.current as usize] = false;
self.current = self.current.invert();
Some(self.current)
}
}
let mut state = NotifyState::new();
let mut notified = 0;
while let Some(method) = state.next_method() {
// Move out the count.
let count = method.notification_slot(self).swap(0, Ordering::Acquire);
if count > 0 {
// We've made progress in this state.
state.make_progress();
// Notify `count` entries.
notified += method.notify(self, count);
}
}
core::cmp::min(notified, ret_limit)
}
}
/// Inner implementation of [`EventListener`].
pub(crate) struct Listener<T> {
/// Index into the listener.
index: Option<NonZeroUsize>,
/// We don't physically hold a `T`.
_phantom: PhantomData<Box<T>>,
}
/// Inner listener data.
struct Link<T> {
/// Next entry in the list.
next: AtomicUsize,
/// Previous entry in the list.
prev: AtomicUsize,
/// State of the link.
state: AtomicUsize,
/// Slot for the waker.
waker: Cell<Option<Waker>>,
/// `T` should always be `()`.
_phantom: PhantomData<Box<T>>,
}
impl<T> Link<T> {
fn new() -> Self {
Self {
next: AtomicUsize::new(0),
prev: AtomicUsize::new(0),
state: AtomicUsize::new(NEW),
waker: Cell::new(None),
_phantom: PhantomData,
}
}
/// Notify this particular listener.
fn notify(&self, additional: bool) {
let mask = if additional {
NOTIFYING | NOTIFIED | ADDITIONAL
} else {
NOTIFYING | NOTIFIED
};
let old_state = self.state.fetch_or(mask, Ordering::SeqCst);
// Remove the NOTIFYING flag once we're done.
let _guard = CallOnDrop(|| {
self.state.fetch_and(!NOTIFYING, Ordering::Release);
});
// Three possibilities here:
// - NOTIFIED is set. Someone else beat us to it.
// - REGISTERING is not set. In which case we can freely take and wake.
// - REGISTERING is set. In which case it will wake the waker.
if old_state & NOTIFIED != 0 {
// Do nothing.
} else if old_state & REGISTERING == 0 {
// SAFETY: No one else is fighting for this waker.
if let Some(waker) = self.waker.take() {
// In case waking the waker takes a while, make sure the slot is open.
drop(_guard);
waker.wake();
}
} else {
// Do nothing. The task who set REGISTERING will wake the waker.
}
}
/// Register a new waker into this listener.
fn register(&self, waker: &Waker) {
// Set the REGISTERING flag.
let old_state = self.state.fetch_or(REGISTERING, Ordering::SeqCst);
if old_state & NOTIFIED != 0 {
// poll() somehow missed the notification. Wake the event loop and try again.
let _guard = CallOnDrop(|| {
self.state.fetch_and(!REGISTERING, Ordering::SeqCst);
});
waker.wake_by_ref();
return;
}
// Unset REGISTERING before exiting out.
let _guard = CallOnDrop(|| {
let old_state = self.state.fetch_and(!REGISTERING, Ordering::SeqCst);
if old_state & NOTIFIED != 0 {
// There was a notification while we were registering. Wake our waker up.
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
});
// SAFETY: We have exclusive access to `self.waker`.
match self.waker.take() {
Some(w) if waker.will_wake(&w) => {
// Put it back.
self.waker.set(Some(w));
}
_ => self.waker.set(Some(waker.clone())),
}
}
}
/// Atomically expandable list of slots.
struct Slots<T> {
/// Buckets in the list.
buckets: [AtomicPtr<Link<T>>; BUCKETS],
}
impl<T> Slots<T> {
fn new() -> Self {
unsafe {
// Create an empty array.
let mut buckets: [MaybeUninit<AtomicPtr<Link<T>>>; BUCKETS] = {
let raw: MaybeUninit<[AtomicPtr<Link<T>>; BUCKETS]> = MaybeUninit::uninit();
// SAFETY: MaybeUninit<[T; N]> has the same layout as [MaybeUninit<T>; N]
mem::transmute(raw)
};
// Initialize every bucket to null.
for bucket in buckets.iter_mut() {
*bucket = MaybeUninit::new(AtomicPtr::new(ptr::null_mut()))
}
// SAFETY: The array is now fully initialized.
Self {
buckets: mem::transmute(buckets),
}
}
}
/// Get the slot at the given index.
fn get(&self, index: usize) -> &Link<T> {
let bucket = self.bucket(index);
let slot_index = index_to_slot_index(index);
&bucket[slot_index]
}
/// Get the bucket for the index.
#[inline]
fn bucket(&self, index: usize) -> &[Link<T>] {
let bucket_index = index_to_bucket(index);
let size = bucket_index_to_size(bucket_index);
// Load the pointer for the bucket.
let bucket_ptr = unsafe {
// SAFETY: `bucket` will never be less than `BUCKETS`
self.buckets.get_unchecked(bucket_index)
};
let bucket = bucket_ptr.load(Ordering::Acquire);
// If the bucket doesn't exist already, allocate it.
let ptr = match NonNull::new(bucket) {
Some(bucket) => bucket,
None => {
let new_bucket = iter::repeat_with(|| Link::<T>::new())
.take(size)
.collect::<Box<[_]>>();
let new_bucket = Box::into_raw(new_bucket) as *mut Link<T>;
// Try to replace it.
let old_bucket = bucket_ptr
.compare_exchange(
ptr::null_mut(),
new_bucket,
Ordering::AcqRel,
Ordering::Acquire,
)
.unwrap_or_else(|x| x);
match NonNull::new(old_bucket) {
None => unsafe { NonNull::new_unchecked(new_bucket) },
Some(old_bucket) => {
// Drop the newly created bucket and use the old one.
drop(unsafe { Box::from_raw(slice::from_raw_parts_mut(new_bucket, size)) });
old_bucket
}
}
}
};
unsafe { slice::from_raw_parts(ptr.as_ptr(), size) }
}
}
impl<T> Drop for Slots<T> {
fn drop(&mut self) {
// Free every bucket.
for (i, bucket) in self.buckets.iter_mut().enumerate() {
let bucket = bucket.get_mut();
if bucket.is_null() {
return;
}
// Drop the bucket.
let size = bucket_index_to_size(i);
drop(unsafe { Box::from_raw(slice::from_raw_parts_mut(*bucket, size)) });
}
}
}
/// Available indexes into the list.
struct Indexes {
/// List of indexes into our list.
available: Lock<BinaryHeap<Reverse<usize>>>,
/// The highest index we've produced so far.
last: AtomicUsize,
}
impl Indexes {
fn new() -> Self {
Self {
available: Lock::new(BinaryHeap::new()),
last: AtomicUsize::new(1),
}
}
/// Allocate a new index.
fn alloc(&self) -> usize {
self.available
.access(|available| available.pop().map(|Reverse(x)| x))
.flatten()
.unwrap_or_else(|| self.last.fetch_add(1, Ordering::SeqCst))
}
/// Free an index.
fn free(&self, index: usize) {
self.available.access(|available| {
available.push(Reverse(index));
});
}
}
/// An exclusive lock around some information.
struct Lock<T> {
/// Whether we are locked.
is_locked: AtomicBool,
/// The information we are guarding.
data: UnsafeCell<T>,
}
impl<T> Lock<T> {
/// Create a new lock.
fn new(data: T) -> Self {
Self {
is_locked: AtomicBool::new(false),
data: UnsafeCell::new(data),
}
}
/// Access the underlying data.
fn access<R>(&self, f: impl FnOnce(&mut T) -> R) -> Option<R> {
// Lock the spinlock.
if self
.is_locked
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
// Restore on drop.
let _drop = CallOnDrop(|| self.is_locked.store(false, Ordering::Release));
// SAFETY: We have exclusive access.
Some(f(unsafe { &mut *self.data.get() }))
} else {
None
}
}
}
#[inline]
fn bucket_index_to_size(i: usize) -> usize {
1 << i
}
#[inline]
fn index_to_bucket(i: usize) -> usize {
usize::from(usize::BITS as u16) - ((i + 1).leading_zeros() as usize) - 1
}
#[inline]
fn index_to_slot_index(i: usize) -> usize {
let size = bucket_index_to_size(index_to_bucket(i));
i - (size - 1)
}
struct CallOnDrop<F: FnMut()>(F);
impl<F: FnMut()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(not(miri))]
const MAX: usize = 0xFFFF;
#[cfg(miri)]
const MAX: usize = 0xFF;
#[test]
fn lock() {
let lock = Lock::new(());
let x = lock.access(|()| {
assert!(lock.access(|()| unreachable!()).is_none());
});
assert!(x.is_some());
}
#[test]
fn slots() {
let slots = Slots::<()>::new();
// Don't exhaust our memory; only do this many.
for i in 1..MAX {
slots.get(i);
}
}
#[test]
fn indexes_alloc() {
let index = Indexes::new();
let mut last = 0;
for _ in 0..MAX {
let val = index.alloc();
assert_eq!(val, last + 1);
last = val;
}
}
#[test]
fn indexes_realloc() {
let index = Indexes::new();
assert_eq!(index.alloc(), 1);
assert_eq!(index.alloc(), 2);
assert_eq!(index.alloc(), 3);
assert_eq!(index.alloc(), 4);
index.free(3);
index.free(2);
assert_eq!(index.alloc(), 2);
assert_eq!(index.alloc(), 3);
assert_eq!(index.alloc(), 5);
assert_eq!(index.alloc(), 6);
}
#[test]
fn link_notify() {
let link = Link::<()>::new();
let waker = waker_fn::waker_fn(|| ());
link.register(&waker);
assert_eq!(link.state.load(Ordering::SeqCst), NEW);
link.notify(false);
assert_eq!(link.state.load(Ordering::SeqCst), NOTIFIED);
}
#[test]
fn link_notify_additional() {
let link = Link::<()>::new();
let waker = waker_fn::waker_fn(|| ());
link.register(&waker);
assert_eq!(link.state.load(Ordering::SeqCst), NEW);
link.notify(true);
assert_eq!(link.state.load(Ordering::SeqCst), NOTIFIED | ADDITIONAL);
}
}

11
src/linked_list/mod.rs Normal file
View File

@ -0,0 +1,11 @@
//! Linked list of listeners.
#[cfg(feature = "std")]
mod mutex;
#[cfg(feature = "std")]
pub(crate) use mutex::*;
#[cfg(not(feature = "std"))]
mod lock_free;
#[cfg(not(feature = "std"))]
pub(crate) use lock_free::*;

519
src/linked_list/mutex.rs Normal file
View File

@ -0,0 +1,519 @@
//! Implementation of the linked list using standard library mutexes.
use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicUsize, Ordering};
use std::boxed::Box;
use std::cell::{Cell, UnsafeCell};
use std::fmt;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::ptr::{self, NonNull};
use std::sync::{Mutex, MutexGuard, TryLockError};
use std::task::{Context, Poll, Waker};
use std::thread::{self, Thread};
use std::time::Instant;
/// Inner state of [`Event`].
pub(crate) struct Inner<T> {
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
///
/// If there are no entries, this value is set to `usize::MAX`.
notified: AtomicUsize,
/// A linked list holding registered listeners.
list: Mutex<List<T>>,
/// A single cached list entry to avoid allocations on the fast path of the insertion.
cache: UnsafeCell<Entry<T>>,
}
impl<T> Inner<T> {
/// Create a new linked list.
pub(crate) fn new() -> Self {
Inner {
notified: AtomicUsize::new(usize::MAX),
list: Mutex::new(List::<T> {
head: None,
tail: None,
start: None,
len: 0,
notified: 0,
cache_used: false,
}),
cache: UnsafeCell::new(Entry {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(None),
}),
}
}
/// Debug output.
#[inline]
pub(crate) fn debug_fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let guard = match self.list.try_lock() {
Err(TryLockError::WouldBlock) => {
return f
.debug_tuple("Event")
.field(&format_args!("<locked>"))
.finish()
}
Err(TryLockError::Poisoned(err)) => err.into_inner(),
Ok(lock) => lock,
};
f.debug_struct("Event")
.field("listeners_notified", &guard.notified)
.field("listeners_total", &guard.len)
.finish()
}
/// Tell whether there is enough room to notify this list.
#[inline]
pub(crate) fn notifyable(&self, limit: usize) -> bool {
self.notified.load(Ordering::Acquire) < limit
}
/// Tell if any listeners are currently notified.
#[inline]
pub(crate) fn is_notified(&self) -> bool {
self.notified.load(Ordering::Acquire) > 0
}
/// Get the total number of listeners.
#[inline]
pub(crate) fn total_listeners(&self) -> usize {
self.lock().guard.len
}
/// Create a listener for this linked list.
#[cold]
pub(crate) fn listen(&self) -> Listener<T> {
Listener {
entry: Some(self.lock().insert(self.cache_ptr())),
}
}
/// Notify the inner list.
#[cold]
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
self.lock().notify(notify)
}
/// Discard a listener.
#[inline]
pub(crate) fn discard(&self, listener: &mut Listener<T>) -> bool {
// If this listener has never picked up a notification...
if let Some(entry) = listener.entry.take() {
let mut list = self.lock();
// Remove the listener from the list and return `true` if it was notified.
if list.remove(entry, self.cache_ptr()).is_notified() {
return true;
}
}
false
}
/// Poll a listener.
#[inline]
pub(crate) fn poll(&self, listener: &mut Listener<T>, cx: &mut Context<'_>) -> Poll<T> {
let mut list = self.lock();
let entry = match listener.entry {
None => unreachable!("cannot poll a completed `EventListener` future"),
Some(entry) => entry,
};
let state = unsafe { &entry.as_ref().state };
// Do a dummy replace operation in order to take out the state.
match take_state(state) {
State::Notified { .. } => {
// If this listener has been notified, remove it from the list and return.
let state = list.remove(entry, self.cache_ptr());
drop(list);
listener.entry = None;
match state {
State::Notified { tag: Some(tag), .. } => return Poll::Ready(tag),
_ => unreachable!(),
}
}
State::Created => {
// If the listener was just created, put it in the `Polling` state.
state.set(State::Polling(cx.waker().clone()));
}
State::Polling(w) => {
// If the listener was in the `Polling` state, update the waker.
if w.will_wake(cx.waker()) {
state.set(State::Polling(w));
} else {
state.set(State::Polling(cx.waker().clone()));
}
}
State::Waiting(_) => {
unreachable!("cannot poll and wait on `EventListener` at the same time")
}
}
Poll::Pending
}
/// Block on a listener.
#[inline]
pub(crate) fn wait(&self, listener: &mut Listener<T>, deadline: Option<Instant>) -> Option<T> {
// Take out the entry pointer and set it to `None`.
let entry = match listener.entry.take() {
None => unreachable!("cannot wait twice on an `EventListener`"),
Some(entry) => entry,
};
// Set this listener's state to `Waiting`.
{
let mut list = self.lock();
let e = unsafe { entry.as_ref() };
// Do a dummy replace operation in order to take out the state.
match take_state(&e.state) {
State::Notified { .. } => {
// If this listener has been notified, remove it from the list and return.
return list.remove(entry, self.cache_ptr()).take();
}
// Otherwise, set the state to `Waiting`.
_ => e.state.set(State::Waiting(thread::current())),
}
}
// Wait until a notification is received or the timeout is reached.
loop {
match deadline {
None => thread::park(),
Some(deadline) => {
// Check for timeout.
let now = Instant::now();
if now >= deadline {
// Remove the entry and check if notified.
return self.lock().remove(entry, self.cache_ptr()).take();
}
// Park until the deadline.
thread::park_timeout(deadline - now);
}
}
let mut list = self.lock();
let e = unsafe { entry.as_ref() };
// Do a dummy replace operation in order to take out the state.
match take_state(&e.state) {
State::Notified { .. } => {
// If this listener has been notified, remove it from the list and return.
return list.remove(entry, self.cache_ptr()).take();
}
// Otherwise, set the state back to `Waiting`.
state => e.state.set(state),
}
}
}
/// Drop a listener.
#[inline]
pub(crate) fn remove(&self, listener: &mut Listener<T>) {
// If this listener has never picked up a notification...
if let Some(entry) = listener.entry.take() {
let mut list = self.lock();
// But if a notification was delivered to it...
if let State::Notified {
additional,
tag: Some(tag),
} = list.remove(entry, self.cache_ptr())
{
let mut tag = Some(tag);
// Then pass it on to another active listener.
list.notify(GenericNotify::new(1, additional, || tag.take().unwrap()));
}
}
}
/// Locks the list.
fn lock(&self) -> ListGuard<'_, T> {
ListGuard {
inner: self,
guard: self.list.lock().unwrap_or_else(|e| e.into_inner()),
}
}
/// Returns the pointer to the single cached list entry.
#[inline(always)]
fn cache_ptr(&self) -> NonNull<Entry<T>> {
unsafe { NonNull::new_unchecked(self.cache.get()) }
}
}
/// Inner state of the `EventListener`.
pub(crate) struct Listener<T> {
/// A pointer to this listener's entry in the linked list.
entry: Option<NonNull<Entry<T>>>,
}
/// A guard holding the linked list locked.
struct ListGuard<'a, T> {
/// A reference to [`Event`]'s inner state.
inner: &'a Inner<T>,
/// The actual guard that acquired the linked list.
guard: MutexGuard<'a, List<T>>,
}
impl<T> Drop for ListGuard<'_, T> {
#[inline]
fn drop(&mut self) {
let list = &mut **self;
// Update the atomic `notified` counter.
let notified = if list.notified < list.len {
list.notified
} else {
usize::MAX
};
self.inner.notified.store(notified, Ordering::Release);
}
}
impl<T> Deref for ListGuard<'_, T> {
type Target = List<T>;
#[inline]
#[cfg_attr(coverage, coverage(off))]
fn deref(&self) -> &List<T> {
&self.guard
}
}
impl<T> DerefMut for ListGuard<'_, T> {
#[inline]
fn deref_mut(&mut self) -> &mut List<T> {
&mut self.guard
}
}
/// An entry representing a registered listener.
struct Entry<T> {
/// THe state of this listener.
state: Cell<State<T>>,
/// Previous entry in the linked list.
prev: Cell<Option<NonNull<Entry<T>>>>,
/// Next entry in the linked list.
next: Cell<Option<NonNull<Entry<T>>>>,
}
/// A linked list of entries.
struct List<T> {
/// First entry in the list.
head: Option<NonNull<Entry<T>>>,
/// Last entry in the list.
tail: Option<NonNull<Entry<T>>>,
/// The first unnotified entry in the list.
start: Option<NonNull<Entry<T>>>,
/// Total number of entries in the list.
len: usize,
/// The number of notified entries in the list.
notified: usize,
/// Whether the cached entry is used.
cache_used: bool,
}
impl<T> List<T> {
/// Inserts a new entry into the list.
fn insert(&mut self, cache: NonNull<Entry<T>>) -> NonNull<Entry<T>> {
unsafe {
let entry = Entry {
state: Cell::new(State::Created),
prev: Cell::new(self.tail),
next: Cell::new(None),
};
let entry = if self.cache_used {
// Allocate an entry that is going to become the new tail.
NonNull::new_unchecked(Box::into_raw(Box::new(entry)))
} else {
// No need to allocate - we can use the cached entry.
self.cache_used = true;
cache.as_ptr().write(entry);
cache
};
// Replace the tail with the new entry.
match mem::replace(&mut self.tail, Some(entry)) {
None => self.head = Some(entry),
Some(t) => t.as_ref().next.set(Some(entry)),
}
// If there were no unnotified entries, this one is the first now.
if self.start.is_none() {
self.start = self.tail;
}
// Bump the entry count.
self.len += 1;
entry
}
}
/// Removes an entry from the list and returns its state.
fn remove(&mut self, entry: NonNull<Entry<T>>, cache: NonNull<Entry<T>>) -> State<T> {
unsafe {
let prev = entry.as_ref().prev.get();
let next = entry.as_ref().next.get();
// Unlink from the previous entry.
match prev {
None => self.head = next,
Some(p) => p.as_ref().next.set(next),
}
// Unlink from the next entry.
match next {
None => self.tail = prev,
Some(n) => n.as_ref().prev.set(prev),
}
// If this was the first unnotified entry, move the pointer to the next one.
if self.start == Some(entry) {
self.start = next;
}
// Extract the state.
let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
// Free the cached entry.
self.cache_used = false;
entry.as_ref().state.replace(State::Created)
} else {
// Deallocate the entry.
Box::from_raw(entry.as_ptr()).state.replace(State::Created)
};
// Update the counters.
if state.is_notified() {
self.notified -= 1;
}
self.len -= 1;
state
}
}
/// Notifies a number of entries.
#[cold]
fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
let mut n = notify.count(Internal::new());
if !notify.is_additional(Internal::new()) {
if n <= self.notified {
return 0;
}
n -= self.notified;
}
let count = n;
while n > 0 {
n -= 1;
// Notify the first unnotified entry.
match self.start {
None => return count - n - 1,
Some(e) => {
// Get the entry and move the pointer forward.
let e = unsafe { e.as_ref() };
self.start = e.next.get();
// Set the state of this entry to `Notified` and notify.
match e.state.replace(State::Notified {
additional: notify.is_additional(Internal::new()),
tag: Some(notify.next_tag(Internal::new())),
}) {
State::Notified { .. } => {}
State::Created => {}
State::Polling(w) => w.wake(),
State::Waiting(t) => t.unpark(),
}
// Update the counter.
self.notified += 1;
}
}
}
count - n
}
}
/// The state of a listener.
enum State<T> {
/// It has just been created.
Created,
/// It has received a notification.
Notified {
/// This is `true` if this was an "additional" notification.
additional: bool,
/// The tag associated with this event.
tag: Option<T>,
},
/// An async task is polling it.
Polling(Waker),
/// A thread is blocked on it.
Waiting(Thread),
}
impl<T> State<T> {
/// Returns `true` if this is the `Notified` state.
#[inline]
fn is_notified(&self) -> bool {
match self {
State::Notified { .. } => true,
State::Created | State::Polling(_) | State::Waiting(_) => false,
}
}
/// Take the tag out if it exists.
#[inline]
fn take(&mut self) -> Option<T> {
match self {
State::Notified { tag, .. } => Some(tag.take().expect("tag taken twice")),
_ => None,
}
}
}
/// Take the `State` out of the slot without moving out the tag.
#[inline]
fn take_state<T>(state: &Cell<State<T>>) -> State<T> {
match state.replace(State::Notified {
additional: false,
tag: None,
}) {
State::Notified { additional, tag } => {
// Replace the tag so remove() can take it out.
state.replace(State::Notified { additional, tag })
}
state => state,
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,249 +0,0 @@
//! An operation that can be delayed.
//! The node that makes up queues.
use crate::notify::{GenericNotify, Internal, NotificationPrivate, TagProducer};
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::Arc;
use crate::sys::ListenerSlab;
use crate::{State, Task};
use alloc::boxed::Box;
use core::fmt;
use core::marker::PhantomData;
use core::mem;
use core::num::NonZeroUsize;
use core::ptr;
pub(crate) struct NothingProducer<T>(PhantomData<T>);
impl<T> Default for NothingProducer<T> {
fn default() -> Self {
Self(PhantomData)
}
}
impl<T> fmt::Debug for NothingProducer<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NothingProducer").finish()
}
}
impl<T> TagProducer for NothingProducer<T> {
type Tag = T;
fn next_tag(&mut self) -> Self::Tag {
// This has to be a zero-sized type with no drop handler.
assert_eq!(mem::size_of::<Self::Tag>(), 0);
assert!(!mem::needs_drop::<Self::Tag>());
// SAFETY: As this is a ZST without a drop handler, zero is valid.
unsafe { mem::zeroed() }
}
}
/// A node in the backup queue.
pub(crate) enum Node<T> {
/// This node is requesting to add a listener.
// For some reason, the MSRV build says this variant is never constructed.
#[allow(dead_code)]
AddListener {
/// The state of the listener that wants to be added.
task_waiting: Arc<TaskWaiting>,
},
/// This node is notifying a listener.
Notify(GenericNotify<NothingProducer<T>>),
/// This node is removing a listener.
RemoveListener {
/// The ID of the listener to remove.
listener: NonZeroUsize,
/// Whether to propagate notifications to the next listener.
propagate: bool,
},
/// We are waiting for the mutex to lock, so they can manipulate it.
Waiting(Task),
}
impl<T> fmt::Debug for Node<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::AddListener { .. } => f.write_str("AddListener"),
Self::Notify(notify) => f
.debug_struct("Notify")
.field("count", &notify.count(Internal::new()))
.field("is_additional", &notify.is_additional(Internal::new()))
.finish(),
Self::RemoveListener {
listener,
propagate,
} => f
.debug_struct("RemoveListener")
.field("listener", listener)
.field("propagate", propagate)
.finish(),
Self::Waiting(_) => f.write_str("Waiting"),
}
}
}
#[derive(Debug)]
pub(crate) struct TaskWaiting {
/// The task that is being waited on.
task: AtomicCell<Task>,
/// The ID of the new entry.
///
/// This is set to zero when the task is still queued, or usize::MAX when the node should not
/// be added at all.
entry_id: AtomicUsize,
}
impl<T> Node<T> {
pub(crate) fn listener() -> (Self, Arc<TaskWaiting>) {
// Create a new `TaskWaiting` structure.
let task_waiting = Arc::new(TaskWaiting {
task: AtomicCell::new(),
entry_id: AtomicUsize::new(0),
});
(
Self::AddListener {
task_waiting: task_waiting.clone(),
},
task_waiting,
)
}
/// Apply the node to the list.
pub(super) fn apply(self, list: &mut ListenerSlab<T>) -> Option<Task> {
match self {
Node::AddListener { task_waiting } => {
// If we're cancelled, do nothing.
if task_waiting.entry_id.load(Ordering::Relaxed) == usize::MAX {
return task_waiting.task.take().map(|t| *t);
}
// Add a new entry to the list.
let key = list.insert(State::Created);
assert!(key.get() != usize::MAX);
// Send the new key to the listener and wake it if necessary.
let old_value = task_waiting.entry_id.swap(key.get(), Ordering::Release);
// If we're cancelled, remove ourselves from the list.
if old_value == usize::MAX {
list.remove(key, false);
}
return task_waiting.task.take().map(|t| *t);
}
Node::Notify(notify) => {
// Notify the next `count` listeners.
list.notify(notify);
}
Node::RemoveListener {
listener,
propagate,
} => {
// Remove the listener from the list.
list.remove(listener, propagate);
}
Node::Waiting(task) => {
return Some(task);
}
}
None
}
}
impl TaskWaiting {
/// Determine if we are still queued.
///
/// Returns `Some` with the entry ID if we are no longer queued.
pub(crate) fn status(&self) -> Option<NonZeroUsize> {
NonZeroUsize::new(self.entry_id.load(Ordering::Acquire))
}
/// Register a listener.
pub(crate) fn register(&self, task: Task) {
// Set the task.
if let Some(task) = self.task.replace(Some(Box::new(task))) {
task.wake();
}
// If the entry ID is non-zero, then we are no longer queued.
if self.status().is_some() {
// Wake the task.
if let Some(task) = self.task.take() {
task.wake();
}
}
}
/// Mark this listener as cancelled, indicating that it should not be inserted into the list.
///
/// If this listener was already inserted into the list, returns the entry ID. Otherwise returns
/// `None`.
pub(crate) fn cancel(&self) -> Option<NonZeroUsize> {
// Set the entry ID to usize::MAX.
let id = self.entry_id.swap(usize::MAX, Ordering::Release);
// Wake the task.
if let Some(task) = self.task.take() {
task.wake();
}
// Return the entry ID if we were queued.
NonZeroUsize::new(id)
}
}
/// A shared pointer to a value.
///
/// The inner value is a `Box<T>`.
#[derive(Debug)]
struct AtomicCell<T>(AtomicPtr<T>);
impl<T> AtomicCell<T> {
/// Create a new `AtomicCell`.
fn new() -> Self {
Self(AtomicPtr::new(ptr::null_mut()))
}
/// Swap the value out.
fn replace(&self, value: Option<Box<T>>) -> Option<Box<T>> {
let old_value = match value {
Some(value) => self.0.swap(Box::into_raw(value), Ordering::AcqRel),
// Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr
// will never be dereferenced, there is no need to synchronize the store of a null ptr.
None => self.0.swap(ptr::null_mut(), Ordering::Acquire),
};
if old_value.is_null() {
None
} else {
// SAFETY:
// - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory.
// - We've checked that old_value is not null.
// - We do not store invalid pointers other than null in self.0.
Some(unsafe { Box::from_raw(old_value) })
}
}
/// Take the value out.
fn take(&self) -> Option<Box<T>> {
self.replace(None)
}
}
impl<T> Drop for AtomicCell<T> {
fn drop(&mut self) {
self.take();
}
}

View File

@ -1,9 +1,10 @@
//! The `Notification` trait for specifying notification.
use crate::sync::atomic::{self, Ordering};
#[cfg(feature = "std")]
use core::fmt;
use crate::sync::atomic::{self, Ordering};
pub(crate) use __private::Internal;
/// The type of notification to use with an [`Event`].
@ -215,16 +216,8 @@ pub struct TagWith<N: ?Sized, F> {
#[cfg(feature = "std")]
impl<N: fmt::Debug, F> fmt::Debug for TagWith<N, F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
struct Ellipses;
impl fmt::Debug for Ellipses {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("..")
}
}
f.debug_struct("TagWith")
.field("tag", &Ellipses)
.field("tag", &format_args!(".."))
.field("inner", &self.inner)
.finish()
}
@ -342,7 +335,7 @@ impl<T, F: FnMut() -> T> TagProducer for F {
/// into this:
///
/// ```
/// use event_listener::{Event, IntoNotification, Listener};
/// use event_listener::{Event, IntoNotification};
///
/// let event = Event::new();
///
@ -406,7 +399,7 @@ pub trait IntoNotification: __private::Sealed {
/// # Examples
///
/// ```
/// use event_listener::{Event, IntoNotification, Listener};
/// use event_listener::{Event, IntoNotification};
///
/// let event = Event::new();
///
@ -442,7 +435,7 @@ pub trait IntoNotification: __private::Sealed {
/// # Examples
///
/// ```
/// use event_listener::{Event, IntoNotification, Listener};
/// use event_listener::{Event, IntoNotification};
/// use std::sync::atomic::{self, Ordering};
///
/// let event = Event::new();
@ -483,7 +476,7 @@ pub trait IntoNotification: __private::Sealed {
/// # Examples
///
/// ```
/// use event_listener::{IntoNotification, Listener, Event};
/// use event_listener::{IntoNotification, Event, Listener};
///
/// let event = Event::<bool>::with_tag();
///
@ -517,7 +510,7 @@ pub trait IntoNotification: __private::Sealed {
/// # Examples
///
/// ```
/// use event_listener::{IntoNotification, Listener, Event};
/// use event_listener::{IntoNotification, Event, Listener};
///
/// let event = Event::<bool>::with_tag();
///
@ -575,7 +568,7 @@ impl_for_numeric_types! { usize u8 u16 u32 u64 u128 isize i8 i16 i32 i64 i128 }
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
pub(super) fn full_fence() {
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), not(miri), not(loom)))]
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), not(miri)))]
{
use core::{arch::asm, cell::UnsafeCell};
// HACK(stjepang): On x86 architectures there are two different ways of executing
@ -620,3 +613,13 @@ mod __private {
pub trait Sealed {}
impl<N: super::NotificationPrivate + ?Sized> Sealed for N {}
}
#[cfg(test)]
mod tests {
#[test]
#[should_panic]
fn negative_notification_should_panic() {
let event = crate::Event::new();
event.notify(-1);
}
}

View File

@ -1,397 +0,0 @@
//! libstd-based implementation of `event-listener`.
//!
//! This implementation crates an intrusive linked list of listeners.
use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::Ordering;
use crate::sync::cell::{Cell, UnsafeCell};
use crate::sync::{Mutex, MutexGuard};
use crate::{RegisterResult, State, TaskRef};
use core::marker::PhantomPinned;
use core::mem;
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
use core::ptr::NonNull;
pub(super) struct List<T>(Mutex<Inner<T>>);
struct Inner<T> {
/// The head of the linked list.
head: Option<NonNull<Link<T>>>,
/// The tail of the linked list.
tail: Option<NonNull<Link<T>>>,
/// The first unnotified listener.
next: Option<NonNull<Link<T>>>,
/// Total number of listeners.
len: usize,
/// The number of notified listeners.
notified: usize,
}
impl<T> List<T> {
/// Create a new, empty event listener list.
pub(super) fn new() -> Self {
Self(Mutex::new(Inner {
head: None,
tail: None,
next: None,
len: 0,
notified: 0,
}))
}
/// Get the total number of listeners without blocking.
pub(crate) fn try_total_listeners(&self) -> Option<usize> {
self.0.try_lock().ok().map(|list| list.len)
}
/// Get the total number of listeners with blocking.
pub(crate) fn total_listeners(&self) -> usize {
self.0.lock().unwrap_or_else(|e| e.into_inner()).len
}
}
impl<T> crate::Inner<T> {
fn lock(&self) -> ListLock<'_, '_, T> {
ListLock {
inner: self,
lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()),
}
}
/// Whether or not this number of listeners would lead to a notification.
pub(crate) fn needs_notification(&self, limit: usize) -> bool {
self.notified.load(Ordering::Acquire) < limit
}
/// Add a new listener to the list.
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
let mut inner = self.lock();
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();
{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };
// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}
// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
inner.next = inner.tail;
}
// Bump the entry count.
inner.len += 1;
}
/// Remove a listener from the list.
pub(crate) fn remove(
&self,
listener: Pin<&mut Option<Listener<T>>>,
propagate: bool,
) -> Option<State<T>> {
self.lock().remove(listener, propagate)
}
/// Notifies a number of entries.
#[cold]
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
self.lock().notify(notify)
}
/// Register a task to be notified when the event is triggered.
///
/// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
/// isn't inserted, returns `None`.
pub(crate) fn register(
&self,
mut listener: Pin<&mut Option<Listener<T>>>,
task: TaskRef<'_>,
) -> RegisterResult<T> {
let mut inner = self.lock();
let entry_guard = match listener.as_mut().as_pin_mut() {
Some(listener) => listener.link.get(),
None => return RegisterResult::NeverInserted,
};
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };
// Take out the state and check it.
match entry.state.replace(State::NotifiedTaken) {
State::Notified { tag, .. } => {
// We have been notified, remove the listener.
inner.remove(listener, false);
RegisterResult::Notified(tag)
}
State::Task(other_task) => {
// Only replace the task if it's different.
entry.state.set(State::Task({
if !task.will_wake(other_task.as_task_ref()) {
task.into_task()
} else {
other_task
}
}));
RegisterResult::Registered
}
_ => {
// We have not been notified, register the task.
entry.state.set(State::Task(task.into_task()));
RegisterResult::Registered
}
}
}
}
impl<T> Inner<T> {
fn remove(
&mut self,
mut listener: Pin<&mut Option<Listener<T>>>,
propagate: bool,
) -> Option<State<T>> {
let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
let entry = unsafe { entry_guard.deref() };
let prev = entry.prev.get();
let next = entry.next.get();
// Unlink from the previous entry.
match prev {
None => self.head = next,
Some(p) => unsafe {
p.as_ref().next.set(next);
},
}
// Unlink from the next entry.
match next {
None => self.tail = prev,
Some(n) => unsafe {
n.as_ref().prev.set(prev);
},
}
// If this was the first unnotified entry, update the next pointer.
if self.next == Some(entry.into()) {
self.next = next;
}
// The entry is now fully unlinked, so we can now take it out safely.
let entry = unsafe {
listener
.get_unchecked_mut()
.take()
.unwrap()
.link
.into_inner()
};
// This State::Created is immediately dropped and exists as a workaround for the absence of
// loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();`
//
// refs: https://github.com/tokio-rs/loom/pull/341
let mut state = entry.state.replace(State::Created);
// Update the notified count.
if state.is_notified() {
self.notified -= 1;
if propagate {
let state = mem::replace(&mut state, State::NotifiedTaken);
if let State::Notified { additional, tag } = state {
let tags = {
let mut tag = Some(tag);
move || tag.take().expect("tag already taken")
};
self.notify(GenericNotify::new(1, additional, tags));
}
}
}
self.len -= 1;
Some(state)
}
#[cold]
fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
let mut n = notify.count(Internal::new());
let is_additional = notify.is_additional(Internal::new());
if !is_additional {
if n < self.notified {
return 0;
}
n -= self.notified;
}
let original_count = n;
while n > 0 {
n -= 1;
// Notify the next entry.
match self.next {
None => return original_count - n - 1,
Some(e) => {
// Get the entry and move the pointer forwards.
let entry = unsafe { e.as_ref() };
self.next = entry.next.get();
// Set the state to `Notified` and notify.
let tag = notify.next_tag(Internal::new());
if let State::Task(task) = entry.state.replace(State::Notified {
additional: is_additional,
tag,
}) {
task.wake();
}
// Bump the notified count.
self.notified += 1;
}
}
}
original_count - n
}
}
struct ListLock<'a, 'b, T> {
lock: MutexGuard<'a, Inner<T>>,
inner: &'b crate::Inner<T>,
}
impl<T> Deref for ListLock<'_, '_, T> {
type Target = Inner<T>;
fn deref(&self) -> &Self::Target {
&self.lock
}
}
impl<T> DerefMut for ListLock<'_, '_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.lock
}
}
impl<T> Drop for ListLock<'_, '_, T> {
fn drop(&mut self) {
let list = &mut **self;
// Update the notified count.
let notified = if list.notified < list.len {
list.notified
} else {
core::usize::MAX
};
self.inner.notified.store(notified, Ordering::Release);
}
}
pub(crate) struct Listener<T> {
/// The inner link in the linked list.
///
/// # Safety
///
/// This can only be accessed while the central mutex is locked.
link: UnsafeCell<Link<T>>,
/// This listener cannot be moved after being pinned.
_pin: PhantomPinned,
}
struct Link<T> {
/// The current state of the listener.
state: Cell<State<T>>,
/// The previous link in the linked list.
prev: Cell<Option<NonNull<Link<T>>>>,
/// The next link in the linked list.
next: Cell<Option<NonNull<Link<T>>>>,
}
#[cfg(test)]
mod tests {
use super::*;
use futures_lite::pin;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
macro_rules! make_listeners {
($($id:ident),*) => {
$(
let $id = Option::<Listener<()>>::None;
pin!($id);
)*
};
}
#[test]
fn insert() {
let inner = crate::Inner::new();
make_listeners!(listen1, listen2, listen3);
// Register the listeners.
inner.insert(listen1.as_mut());
inner.insert(listen2.as_mut());
inner.insert(listen3.as_mut());
assert_eq!(inner.lock().len, 3);
// Remove one.
assert_eq!(inner.remove(listen2, false), Some(State::Created));
assert_eq!(inner.lock().len, 2);
// Remove another.
assert_eq!(inner.remove(listen1, false), Some(State::Created));
assert_eq!(inner.lock().len, 1);
}
#[test]
fn drop_non_notified() {
let inner = crate::Inner::new();
make_listeners!(listen1, listen2, listen3);
// Register the listeners.
inner.insert(listen1.as_mut());
inner.insert(listen2.as_mut());
inner.insert(listen3.as_mut());
// Notify one.
inner.notify(GenericNotify::new(1, false, || ()));
// Remove one.
inner.remove(listen3, true);
// Remove the rest.
inner.remove(listen1, true);
inner.remove(listen2, true);
}
}

133
tests/counter.rs Normal file
View File

@ -0,0 +1,133 @@
//! A simple test case using a "counter" type.
use event_listener::Event;
use futures_lite::future::{block_on, poll_once};
use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Barrier};
use std::thread;
struct Counter {
counter: AtomicUsize,
/// Signalled once `counter` has been changed.
changed: Event,
}
impl Counter {
fn new() -> Self {
Self {
counter: AtomicUsize::new(0),
changed: Event::new(),
}
}
/// Wait for the counter to be incremented.
async fn change(&self) -> usize {
let original = self.counter.load(Ordering::Acquire);
let mut current = original;
loop {
if current != original {
return current;
}
// Start listening.
let listener = self.changed.listen();
// Try again.
current = self.counter.load(Ordering::Acquire);
if current != original {
return current;
}
// Wait for a change to be notified.
listener.await;
// Update the counter.
current = self.counter.load(Ordering::Acquire);
}
}
/// Increment the counter.
fn increment(&self) {
self.counter.fetch_add(1, Ordering::Relaxed);
self.changed.notify_additional(usize::MAX);
}
}
#[test]
fn counter() {
let counter = Arc::new(Counter::new());
let (send, recv) = mpsc::channel();
thread::spawn({
let counter = counter.clone();
move || {
// Test normal.
recv.recv().unwrap();
counter.increment();
// Test relaxed.
recv.recv().unwrap();
counter.counter.fetch_add(1, Ordering::Relaxed);
fence(Ordering::SeqCst);
counter.changed.notify_additional_relaxed(usize::MAX);
counter.changed.notify_additional_relaxed(usize::MAX);
}
});
thread::spawn(move || {
let waiter = counter.change();
futures_lite::pin!(waiter);
assert!(block_on(poll_once(waiter.as_mut())).is_none());
send.send(()).unwrap();
assert_eq!(block_on(waiter), 1);
let waiter1 = counter.change();
let waiter2 = counter.change();
futures_lite::pin!(waiter1);
futures_lite::pin!(waiter2);
assert!(block_on(poll_once(waiter1.as_mut())).is_none());
assert!(block_on(poll_once(waiter2.as_mut())).is_none());
send.send(()).unwrap();
assert_eq!(block_on(waiter1), 2);
assert_eq!(block_on(waiter2), 2);
});
#[cfg(miri)]
thread::sleep(std::time::Duration::from_secs(5));
}
#[test]
fn simultaneous_notification() {
let thread_count = if cfg!(miri) { 10 } else { 1000 };
let barrier = Arc::new(Barrier::new(thread_count + 1));
let counter = Arc::new(Counter::new());
for _ in 0..thread_count {
let barrier = barrier.clone();
let counter = counter.clone();
thread::spawn(move || {
// Wait for a listener to be created.
barrier.wait();
// Notify all at the same time.
counter.increment();
});
}
// Wait for a notification.
let listener = counter.change();
futures_lite::pin!(listener);
assert!(block_on(poll_once(listener.as_mut())).is_none());
// Signal the other threads.
barrier.wait();
// Wait to be notified.
thread::sleep(std::time::Duration::from_secs(3));
block_on(listener);
}

View File

@ -1,212 +0,0 @@
#![cfg(loom)]
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Context;
use std::usize;
use event_listener::{Event, EventListener};
use waker_fn::waker_fn;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
fn is_notified(listener: &mut EventListener) -> bool {
let waker = waker_fn(|| ());
Pin::new(listener)
.poll(&mut Context::from_waker(&waker))
.is_ready()
}
#[test]
fn notify() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert!(!is_notified(&mut l3));
assert_eq!(event.notify(2), 2);
assert_eq!(event.notify(1), 0);
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
});
}
#[test]
fn notify_additional() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify_additional(1), 1);
assert_eq!(event.notify(1), 0);
assert_eq!(event.notify_additional(1), 1);
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
})
}
#[test]
fn notify_one() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(1), 1);
assert!(is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(1), 1);
assert!(is_notified(&mut l2));
});
}
#[test]
fn notify_all() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(usize::MAX), 2);
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
});
}
#[test]
fn drop_notified() {
loom::model(|| {
let event = Event::new();
let l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify(1), 1);
drop(l1);
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
});
}
#[test]
fn drop_notified2() {
loom::model(|| {
let event = Event::new();
let l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify(2), 2);
drop(l1);
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
});
}
#[test]
fn drop_notified_additional() {
loom::model(|| {
let event = Event::new();
let l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
let mut l4 = event.listen();
assert_eq!(event.notify_additional(1), 1);
assert_eq!(event.notify(2), 1);
drop(l1);
assert!(is_notified(&mut l2));
assert!(is_notified(&mut l3));
assert!(!is_notified(&mut l4));
});
}
#[test]
fn drop_non_notified() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
let l3 = event.listen();
assert_eq!(event.notify(1), 1);
drop(l3);
assert!(is_notified(&mut l1));
assert!(!is_notified(&mut l2));
})
}
#[test]
fn notify_all_fair() {
loom::model(|| {
let event = Event::new();
let v = Arc::new(Mutex::new(vec![]));
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
let waker1 = {
let v = v.clone();
waker_fn(move || v.lock().unwrap().push(1))
};
let waker2 = {
let v = v.clone();
waker_fn(move || v.lock().unwrap().push(2))
};
let waker3 = {
let v = v.clone();
waker_fn(move || v.lock().unwrap().push(3))
};
assert!(Pin::new(&mut l1)
.poll(&mut Context::from_waker(&waker1))
.is_pending());
assert!(Pin::new(&mut l2)
.poll(&mut Context::from_waker(&waker2))
.is_pending());
assert!(Pin::new(&mut l3)
.poll(&mut Context::from_waker(&waker3))
.is_pending());
assert_eq!(event.notify(usize::MAX), 3);
assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]);
assert!(Pin::new(&mut l1)
.poll(&mut Context::from_waker(&waker1))
.is_ready());
assert!(Pin::new(&mut l2)
.poll(&mut Context::from_waker(&waker2))
.is_ready());
assert!(Pin::new(&mut l3)
.poll(&mut Context::from_waker(&waker3))
.is_ready());
})
}

View File

@ -4,12 +4,9 @@ use std::sync::{Arc, Mutex};
use std::task::Context;
use std::usize;
use event_listener::{Event, EventListener};
use event_listener::{Event, EventListener, Listener};
use waker_fn::waker_fn;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
fn is_notified(listener: &mut EventListener) -> bool {
let waker = waker_fn(|| ());
Pin::new(listener)
@ -17,6 +14,20 @@ fn is_notified(listener: &mut EventListener) -> bool {
.is_ready()
}
#[test]
fn debug() {
let event = Event::new();
let fmt = format!("{:?}", &event);
assert!(fmt.contains("Event"));
let listener = event.listen();
let fmt = format!("{:?}", &listener);
assert!(fmt.contains("EventListener"));
let fmt = format!("{:?}", &event);
assert!(fmt.contains("Event"));
}
#[test]
fn notify() {
let event = Event::new();
@ -25,13 +36,15 @@ fn notify() {
let mut l2 = event.listen();
let mut l3 = event.listen();
assert!(!event.is_notified());
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert!(!is_notified(&mut l3));
assert_eq!(event.notify(2), 2);
assert!(event.is_notified());
assert_eq!(event.notify(1), 0);
assert!(event.is_notified());
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
@ -45,9 +58,54 @@ fn notify_additional() {
let mut l2 = event.listen();
let mut l3 = event.listen();
assert!(!event.is_notified());
assert_eq!(event.notify_additional(1), 1);
assert_eq!(event.notify(1), 0);
assert_eq!(event.notify_additional(1), 1);
assert!(event.is_notified());
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
}
#[test]
fn notify_zero() {
let event = Event::new();
assert_eq!(event.notify(1), 0);
assert!(!event.is_notified());
}
#[test]
fn notify_relaxed() {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert!(!is_notified(&mut l3));
assert_eq!(event.notify_relaxed(2), 2);
assert_eq!(event.notify_relaxed(1), 0);
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
}
#[test]
fn notify_additional_relaxed() {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify_additional_relaxed(1), 1);
assert_eq!(event.notify_relaxed(1), 0);
assert_eq!(event.notify_additional_relaxed(1), 1);
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
@ -81,8 +139,10 @@ fn notify_all() {
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert!(!event.is_notified());
assert_eq!(event.notify(usize::MAX), 2);
assert!(event.is_notified());
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
}
@ -95,7 +155,7 @@ fn drop_notified() {
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify(1), 1);
event.notify(1);
drop(l1);
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
@ -146,6 +206,55 @@ fn drop_non_notified() {
assert!(!is_notified(&mut l2));
}
#[test]
fn discard() {
let event = Event::default();
let l1 = event.listen();
assert!(!l1.discard());
let l1 = event.listen();
event.notify(1);
assert!(l1.discard());
let l1 = event.listen();
event.notify_additional(1);
assert!(l1.discard());
let mut l1 = event.listen();
event.notify(1);
assert!(is_notified(&mut l1));
assert!(!l1.discard());
}
#[test]
fn same_event() {
let e1 = Event::new();
let e2 = Event::new();
let l1 = e1.listen();
let l2 = e1.listen();
let l3 = e2.listen();
assert!(l1.listens_to(&e1));
assert!(!l1.listens_to(&e2));
assert!(l1.same_event(&l2));
assert!(!l1.same_event(&l3));
}
#[test]
#[should_panic = "cannot poll a completed `EventListener` future"]
fn poll_twice() {
let event = Event::new();
let mut l1 = event.listen();
event.notify(1);
assert!(is_notified(&mut l1));
// Panic here.
is_notified(&mut l1);
}
#[test]
fn notify_all_fair() {
let event = Event::new();

128
tests/park.rs Normal file
View File

@ -0,0 +1,128 @@
//! Test the wait() family of methods.
#![cfg(feature = "std")]
use event_listener::{Event, EventListener, IntoNotification, Listener};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::thread;
use std::time::{Duration, Instant};
use waker_fn::waker_fn;
fn is_notified(listener: &mut EventListener) -> bool {
let waker = waker_fn(|| ());
Pin::new(listener)
.poll(&mut Context::from_waker(&waker))
.is_ready()
}
#[test]
fn total_listeners() {
let event = Event::new();
assert_eq!(event.total_listeners(), 0);
let listener = event.listen();
assert_eq!(event.total_listeners(), 1);
drop(listener);
assert_eq!(event.total_listeners(), 0);
}
#[test]
fn wait() {
let event = Event::new();
let listener = event.listen();
assert_eq!(event.notify(1), 1);
listener.wait();
}
#[test]
fn wait_tag() {
let event = Event::with_tag();
let listener = event.listen();
assert_eq!(event.notify(1.tag(5i32)), 1);
assert_eq!(listener.wait(), 5i32);
}
#[test]
fn wait_timeout() {
let event = Event::new();
let listener = event.listen();
assert_eq!(event.notify(1), 1);
assert_eq!(listener.wait_timeout(Duration::from_millis(50)), Some(()));
}
#[test]
fn wait_deadline() {
let event = Event::new();
let listener = event.listen();
assert_eq!(event.notify(1), 1);
assert_eq!(
listener.wait_deadline(Instant::now() + Duration::from_millis(50)),
Some(())
);
}
#[test]
fn wait_timeout_expiry() {
let event = Event::new();
let listener = event.listen();
let start = Instant::now();
assert_eq!(listener.wait_timeout(Duration::from_millis(200)), None);
assert!(Instant::now().duration_since(start) >= Duration::from_millis(200));
}
#[test]
fn unpark() {
let event = Arc::new(Event::new());
let listener = event.listen();
thread::spawn({
let event = event.clone();
move || {
thread::sleep(Duration::from_millis(100));
event.notify(1);
}
});
listener.wait();
}
#[test]
fn unpark_timeout() {
let event = Arc::new(Event::new());
let listener = event.listen();
thread::spawn({
let event = event.clone();
move || {
thread::sleep(Duration::from_millis(100));
event.notify(1);
}
});
let x = listener.wait_timeout(Duration::from_millis(200));
assert!(x.is_some());
}
#[test]
#[should_panic = "cannot wait twice on an `EventListener`"]
fn wait_twice() {
let event = Event::new();
let mut listener = event.listen();
event.notify(1);
assert!(is_notified(&mut listener));
// Panic here.
listener.wait();
}

120
tests/tag.rs Normal file
View File

@ -0,0 +1,120 @@
//! Tests relating to tagging.
#![cfg(feature = "std")]
use std::future::Future;
use std::panic;
use std::pin::Pin;
use std::task::{Context, Poll};
use event_listener::{Event, EventListener, IntoNotification};
use waker_fn::waker_fn;
fn notified<T>(listener: &mut EventListener<T>) -> Option<T> {
let waker = waker_fn(|| ());
match Pin::new(listener).poll(&mut Context::from_waker(&waker)) {
Poll::Ready(tag) => Some(tag),
Poll::Pending => None,
}
}
#[test]
fn notify_tag() {
let event = Event::with_tag();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert!(notified(&mut l1).is_none());
assert!(notified(&mut l2).is_none());
assert!(notified(&mut l3).is_none());
assert_eq!(event.notify(2.tag(true)), 2);
assert_eq!(event.notify(1.tag(false)), 0);
assert_eq!(notified(&mut l1), Some(true));
assert_eq!(notified(&mut l2), Some(true));
assert!(notified(&mut l3).is_none());
}
#[test]
fn notify_additional_tag() {
let event = Event::with_tag();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify(1.additional().tag(1)), 1);
assert_eq!(event.notify(1.tag(2)), 0);
assert_eq!(event.notify(1.additional().tag(3)), 1);
assert_eq!(notified(&mut l1), Some(1));
assert_eq!(notified(&mut l2), Some(3));
assert!(notified(&mut l3).is_none());
}
#[test]
fn notify_with_tag() {
let event = Event::with_tag();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert!(notified(&mut l1).is_none());
assert!(notified(&mut l2).is_none());
assert!(notified(&mut l3).is_none());
let mut i = 0usize;
event.notify(2.tag_with(|| {
i += 1;
i
}));
assert_eq!(notified(&mut l1), Some(1));
assert_eq!(notified(&mut l2), Some(2));
assert!(notified(&mut l3).is_none());
}
#[test]
fn drop_notify_with_tag() {
let event = Event::with_tag();
let l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify(1.tag_with(|| 5i32)), 1);
drop(l1);
assert_eq!(notified(&mut l2), Some(5));
assert!(notified(&mut l3).is_none());
}
#[test]
fn panic_with_tag() {
let event = Event::with_tag();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
let mut l4 = event.listen();
let p = panic::catch_unwind(|| {
let mut i = 0usize;
event.notify(4.tag_with(|| {
i += 1;
if i > 2 {
panic!();
}
i
}));
});
assert!(p.is_err());
assert_eq!(notified(&mut l1), Some(1));
assert_eq!(notified(&mut l2), Some(2));
assert!(notified(&mut l3).is_none());
assert!(notified(&mut l4).is_none());
}