Initial commit

This commit is contained in:
Stjepan Glavina 2020-05-16 19:44:50 +02:00
commit 049f9a779b
13 changed files with 1341 additions and 0 deletions

1
.github/FUNDING.yml vendored Normal file
View File

@ -0,0 +1 @@
github: stjepang

44
.github/workflows/build-and-test.yaml vendored Normal file
View File

@ -0,0 +1,44 @@
name: Build and test
on:
push:
branches:
- master
pull_request:
jobs:
build_and_test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
rust: [nightly, beta, stable]
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environnement
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS')
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- name: Set current week of the year in environnement
if: startsWith(matrix.os, 'windows')
run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)"
- name: Install latest ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
profile: minimal
override: true
- name: Run cargo check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --benches --bins --examples --tests --all-features
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test

26
.github/workflows/lint.yaml vendored Normal file
View File

@ -0,0 +1,26 @@
name: Lint
on:
push:
branches:
- master
pull_request:
jobs:
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environnement
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
components: clippy
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features -- -W clippy::all

20
.github/workflows/security.yaml vendored Normal file
View File

@ -0,0 +1,20 @@
name: Security audit
on:
push:
branches:
- master
pull_request:
jobs:
security_audit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environnement
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- uses: actions-rs/audit-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

20
CHANGELOG.md Normal file
View File

@ -0,0 +1,20 @@
# Version 0.3.2
- Make `Blocking` implement `Send` in more cases.
# Version 0.3.1
- Add `Blocking::with_mut()`.
# Version 0.3.0
- Remove `Blocking::spawn()`.
- Implement `Future` for `Blocking` only when the inner type is a `FnOnce`.
# Version 0.2.0
- Initial version
# Version 0.1.0
- Reserved crate name

6
Cargo.lock generated Normal file
View File

@ -0,0 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "event-listener"
version = "0.1.0"

13
Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "event-listener"
version = "1.0.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "Notify async tasks or threads"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/stjepang/event-listener"
homepage = "https://github.com/stjepang/event-listener"
documentation = "https://docs.rs/event-listener"
keywords = ["condvar", "eventcount", "wake", "blocking", "park"]
categories = ["asynchronous", "concurrency"]
readme = "README.md"

201
LICENSE-APACHE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

23
LICENSE-MIT Normal file
View File

@ -0,0 +1,23 @@
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

86
README.md Normal file
View File

@ -0,0 +1,86 @@
# event-listener
[![Build](https://github.com/stjepang/event-listener/workflows/Build%20and%20test/badge.svg)](
https://github.com/stjepang/event-listener/actions)
[![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)](
https://github.com/stjepang/event-listener)
[![Cargo](https://img.shields.io/crates/v/event-listener.svg)](
https://crates.io/crates/event-listener)
[![Documentation](https://docs.rs/event-listener/badge.svg)](
https://docs.rs/event-listener)
Notify async tasks or threads.
This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
You can use this crate to turn non-blocking data structures into async or blocking data
structures. See a [simple mutex] implementation that exposes an async and a blocking interface
for acquiring locks.
[eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
[simple mutex]: TODO
## Examples
Wait until another thread sets a boolean flag:
```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use event_listener::Event;
let flag = Arc::new(AtomicBool::new(false));
let event = Arc::new(Event::new());
// Spawn a thread that will set the flag after 1 second.
thread::spawn({
let flag = flag.clone();
let event = event.clone();
move || {
// Wait for a second.
thread::sleep(Duration::from_secs(1));
// Set the flag.
flag.store(true, Ordering::SeqCst);
// Notify all listeners that the flag has been set.
event.notify_all();
}
});
// Wait until the flag is set.
loop {
// Check the flag.
if flag.load(Ordering::SeqCst) {
break;
}
// Start listening for events.
let listener = event.listen();
// Check the flag again after creating the listener.
if flag.load(Ordering::SeqCst) {
break;
}
// Wait for a notification and continue the loop.
listener.wait();
}
```
## License
Licensed under either of
* Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
at your option.
#### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.

178
examples/mutex.rs Normal file
View File

@ -0,0 +1,178 @@
//! A simple mutex implementation.
//!
//! This mutex exposes both blocking and async methods for acquiring a lock.
#![allow(dead_code)]
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};
use event_listener::Event;
/// A simple mutex.
struct Mutex<T> {
/// Set to `true` when the mutex is locked.
locked: AtomicBool,
/// Blocked lock operations.
lock_ops: Event,
/// The inner protected data.
data: UnsafeCell<T>,
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
impl<T> Mutex<T> {
/// Creates a mutex.
fn new(t: T) -> Mutex<T> {
Mutex {
locked: AtomicBool::new(false),
lock_ops: Event::new(),
data: UnsafeCell::new(t),
}
}
/// Attempts to acquire a lock.
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if self.locked.swap(true, Ordering::Acquire) {
Some(MutexGuard(self))
} else {
None
}
}
/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.wait();
}
}
}
}
/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return Some(guard);
}
// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
if !l.wait_deadline(deadline) {
return None;
}
}
}
}
}
/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.await;
}
}
}
}
}
/// A guard holding a lock.
struct MutexGuard<'a, T>(&'a Mutex<T>);
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
}
}
fn main() {
const N: usize = 10;
// A shared counter.
let counter = Arc::new(Mutex::new(0));
// A channel that signals when all threads are done.
let (tx, rx) = mpsc::channel();
// Spawn a bunch of threads incrementing the counter.
for _ in 0..N {
let counter = counter.clone();
let tx = tx.clone();
thread::spawn(move || {
let mut counter = counter.lock();
*counter += 1;
// If this is the last increment, signal that we're done.
if *counter == N {
tx.send(()).unwrap();
}
});
}
// Wait until the last thread increments the counter.
rx.recv().unwrap();
// The counter must equal the number of threads.
assert_eq!(*counter.lock(), N);
println!("Done!");
}

722
src/lib.rs Normal file
View File

@ -0,0 +1,722 @@
//! Notify async tasks or threads.
//!
//! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
//!
//! You can use this crate to turn non-blocking data structures into async or blocking data
//! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
//! for acquiring locks.
//!
//! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
//! [simple mutex]: TODO
//!
//! # Examples
//!
//! Wait until another thread sets a boolean flag:
//!
//! ```
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//! use std::thread;
//! use std::time::Duration;
//! use event_listener::Event;
//!
//! let flag = Arc::new(AtomicBool::new(false));
//! let event = Arc::new(Event::new());
//!
//! // Spawn a thread that will set the flag after 1 second.
//! thread::spawn({
//! let flag = flag.clone();
//! let event = event.clone();
//! move || {
//! // Wait for a second.
//! thread::sleep(Duration::from_secs(1));
//!
//! // Set the flag.
//! flag.store(true, Ordering::SeqCst);
//!
//! // Notify all listeners that the flag has been set.
//! event.notify_all();
//! }
//! });
//!
//! // Wait until the flag is set.
//! loop {
//! // Check the flag.
//! if flag.load(Ordering::SeqCst) {
//! break;
//! }
//!
//! // Start listening for events.
//! let listener = event.listen();
//!
//! // Check the flag again after creating the listener.
//! if flag.load(Ordering::SeqCst) {
//! break;
//! }
//!
//! // Wait for a notification and continue the loop.
//! listener.wait();
//! }
//! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::mem::{self, ManuallyDrop};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};
/// A bit set inside [`Event`] when there is at least one listener that has already been notified.
const NOTIFIED: usize = 1 << 0;
/// A bit set inside [`Event`] when there is at least one notifiable listener.
const NOTIFIABLE: usize = 1 << 1;
/// Inner state of [`Event`].
struct Inner {
/// Holds bits [`NOTIFIED`] and [`NOTIFIABLE`].
flags: AtomicUsize,
/// A linked list holding registered listeners.
list: Mutex<List>,
}
impl Inner {
/// Locks the list.
fn lock(&self) -> ListGuard<'_> {
ListGuard {
inner: self,
guard: self.list.lock().unwrap(),
}
}
}
/// A synchronization primitive for notifying async tasks and threads.
///
/// Listeners can be registered using [`Event::listen()`]. There are two ways of notifying
/// listeners:
///
/// 1. [`Event::notify_one()`] notifies one listener.
/// 2. [`Event::notify_all()`] notifies all listeners.
///
/// If there are no active listeners at the time a notification is sent, it simply gets lost.
///
/// Note that [`Event::notify_one()`] does not notify one *additional* listener - it only makes
/// sure *at least* one listener among the active ones is notified.
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener.
///
/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
pub struct Event {
/// A pointer to heap-allocated inner state.
///
/// This pointer is initially null and gets lazily initialized on first use. Semantically, it
/// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
/// reference count.
inner: AtomicPtr<Inner>,
}
unsafe impl Send for Event {}
unsafe impl Sync for Event {}
impl Event {
/// Creates a new [`Event`].
///
/// # Examples
///
/// ```
/// use event_listener::Event;
///
/// let event = Event::new();
/// ```
#[inline]
pub fn new() -> Event {
Event {
inner: AtomicPtr::default(),
}
}
/// Returns a guard listening for a notification.
///
/// # Examples
///
/// ```
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener = event.listen();
/// ```
#[cold]
pub fn listen(&self) -> EventListener {
let inner = self.inner();
let listener = EventListener {
inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
entry: Some(inner.lock().insert()),
};
// Make sure the listener is registered before whatever happens next.
full_fence();
listener
}
/// Notifies a single active listener.
///
/// Note that this does not notify one *additional* listener - it only makes sure *at least*
/// one listener among the active ones is notified.
///
/// # Examples
///
/// ```
/// use event_listener::Event;
///
/// let event = Event::new();
///
/// // This notification gets lost because there are no listeners.
/// event.notify_one();
///
/// let listener1 = event.listen();
/// let listener2 = event.listen();
///
/// // Notifies just one of `listener1` and `listener2`.
/// //
/// // Listener queueing is fair, which means `listener1` gets notified
/// // here since it was the first to start listening.
/// event.notify_one();
/// ```
#[inline]
pub fn notify_one(&self) {
let inner = self.inner();
// Make sure the notification comes after whatever triggered it.
full_fence();
// Notify if no active listeners have been notified and there is at least one listener.
let flags = inner.flags.load(Ordering::Relaxed);
if flags & NOTIFIED == 0 && flags & NOTIFIABLE != 0 {
inner.lock().notify(false);
}
}
/// Notifies all active listeners.
///
/// # Examples
///
/// ```
/// use event_listener::Event;
///
/// let event = Event::new();
///
/// // This notification gets lost because there are no listeners.
/// event.notify_all();
///
/// let listener1 = event.listen();
/// let listener2 = event.listen();
///
/// // Both `listener1` and `listener2` get notified.
/// event.notify_all();
/// ```
#[inline]
pub fn notify_all(&self) {
let inner = self.inner();
// Make sure the notification comes after whatever triggered it.
full_fence();
// Notify if there is at least one listener.
if inner.flags.load(Ordering::Relaxed) & NOTIFIABLE != 0 {
inner.lock().notify(true);
}
}
/// Returns a reference to the inner state.
fn inner(&self) -> &Inner {
let mut inner = self.inner.load(Ordering::Acquire);
// Initialize the state if this is its first use.
if inner.is_null() {
// Allocate on the heap.
let new = Arc::new(Inner {
flags: AtomicUsize::new(0),
list: Mutex::new(List {
head: None,
tail: None,
len: 0,
notifiable: 0,
}),
});
// Convert the heap-allocated state into a raw pointer.
let new = Arc::into_raw(new) as *mut Inner;
// Attempt to replace the null-pointer with the new state pointer.
inner = self.inner.compare_and_swap(inner, new, Ordering::AcqRel);
// Check if the old pointer value was indeed null.
if inner.is_null() {
// If yes, then use the new state pointer.
inner = new;
} else {
// If not, that means a concurrent operation has initialized the state.
// In that case, use the old pointer and deallocate the new one.
unsafe {
drop(Arc::from_raw(new));
}
}
}
unsafe { &*inner }
}
}
impl Drop for Event {
#[inline]
fn drop(&mut self) {
let inner: *mut Inner = *self.inner.get_mut();
// If the state pointer has been initialized, deallocate it.
if !inner.is_null() {
unsafe {
drop(Arc::from_raw(inner));
}
}
}
}
impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Event { .. }")
}
}
impl Default for Event {
fn default() -> Event {
Event::new()
}
}
/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener.
pub struct EventListener {
/// A reference to [`Event`]'s inner state.
inner: Arc<Inner>,
/// A pointer to this listener's entry in the linked list.
entry: Option<NonNull<Entry>>,
}
unsafe impl Send for EventListener {}
unsafe impl Sync for EventListener {}
impl EventListener {
/// Blocks until a notification is received.
///
/// # Examples
///
/// ```
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener = event.listen();
///
/// // Notify `listener`.
/// event.notify_one();
///
/// // Receive the notification.
/// listener.wait();
/// ```
pub fn wait(self) {
self.wait_internal(None);
}
/// Blocks until a notification is received or a timeout is reached.
///
/// Returns `true` if a notification was received.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener = event.listen();
///
/// // There are no notification so this times out.
/// assert!(!listener.wait_timeout(Duration::from_secs(1)));
/// ```
pub fn wait_timeout(self, timeout: Duration) -> bool {
self.wait_internal(Some(Instant::now() + timeout))
}
/// Blocks until a notification is received or a deadline is reached.
///
/// Returns `true` if a notification was received.
///
/// # Examples
///
/// ```
/// use std::time::{Duration, Instant};
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener = event.listen();
///
/// // There are no notification so this times out.
/// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
/// ```
pub fn wait_deadline(self, deadline: Instant) -> bool {
self.wait_internal(Some(deadline))
}
fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
// Take out the entry pointer and set it to `None`.
let entry = match self.entry.take() {
None => unreachable!("cannot wait twice on an `EventListener`"),
Some(entry) => entry,
};
// Set this listener's state to `Waiting`.
{
let mut list = self.inner.lock();
let e = unsafe { entry.as_ref() };
// Do a dummy replace operation in order to take out the state.
match e.state.replace(State::Notified) {
State::Notified => {
// If this listener has been notified, remove it from the list and return.
list.remove(entry);
return true;
}
// Otherwise, set the state to `Waiting`.
_ => e.state.set(State::Waiting(thread::current())),
}
}
// Wait until a notification is received or the timeout is reached.
loop {
match deadline {
None => thread::park(),
Some(deadline) => {
// Check for timeout.
let now = Instant::now();
if now >= deadline {
return false;
}
// Park until the deadline.
thread::park_timeout(deadline - now);
}
}
let mut list = self.inner.lock();
let e = unsafe { entry.as_ref() };
// Do a dummy replace operation in order to take out the state.
match e.state.replace(State::Notified) {
State::Notified => {
// If this listener has been notified, remove it from the list and return.
list.remove(entry);
return true;
}
// Otherwise, set the state back to `Waiting`.
state => e.state.set(state),
}
}
}
}
impl fmt::Debug for EventListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("EventListener { .. }")
}
}
impl Future for EventListener {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut list = self.inner.lock();
let entry = match self.entry {
None => unreachable!("cannot poll a completed `EventListener` future"),
Some(entry) => entry,
};
let state = unsafe { &entry.as_ref().state };
// Do a dummy replace operation in order to take out the state.
match state.replace(State::Notified) {
State::Notified => {
// If this listener has been notified, remove it from the list and return.
list.remove(entry);
drop(list);
self.entry = None;
return Poll::Ready(());
}
State::Created => {
// If the listener was just created, put it in the `Polling` state.
state.set(State::Polling(cx.waker().clone()));
}
State::Polling(w) => {
// If the listener was in the `Pooling` state, keep it.
state.set(State::Polling(w));
}
State::Waiting(_) => {
unreachable!("cannot poll and wait on `EventListener` at the same time")
}
}
Poll::Pending
}
}
impl Drop for EventListener {
fn drop(&mut self) {
// If this listener has never picked up a notification...
if let Some(entry) = self.entry.take() {
let mut list = self.inner.lock();
// But if a notification was delivered to it...
if list.remove(entry).is_notified() {
// Then pass it on to another active listener.
list.notify(false);
}
}
}
}
/// A guard holding the linked list locked.
struct ListGuard<'a> {
/// A reference to [`Event`]'s inner state.
inner: &'a Inner,
/// The actual guard that acquired the linked list.
guard: MutexGuard<'a, List>,
}
impl Drop for ListGuard<'_> {
#[inline]
fn drop(&mut self) {
let list = &mut **self;
let mut flags = 0;
// Set the `NOTIFIED` flag if there is at least one notified listener.
if list.len - list.notifiable > 0 {
flags |= NOTIFIED;
}
// Set the `NOTIFIABLE` flag if there is at least one notifiable listener.
if list.notifiable > 0 {
flags |= NOTIFIABLE;
}
self.inner.flags.store(flags, Ordering::Release);
}
}
impl Deref for ListGuard<'_> {
type Target = List;
#[inline]
fn deref(&self) -> &List {
&*self.guard
}
}
impl DerefMut for ListGuard<'_> {
#[inline]
fn deref_mut(&mut self) -> &mut List {
&mut *self.guard
}
}
/// The state of a listener.
enum State {
/// It has just been created.
Created,
/// It has received a notification.
Notified,
/// An async task is polling it.
Polling(Waker),
/// A thread is blocked on it.
Waiting(Thread),
}
impl State {
/// Returns `true` if this is the `Notified` state.
#[inline]
fn is_notified(&self) -> bool {
match self {
State::Notified => true,
State::Created | State::Polling(_) | State::Waiting(_) => false,
}
}
}
/// An entry representing a registered listener.
struct Entry {
/// THe state of this listener.
state: Cell<State>,
/// Previous entry in the linked list.
prev: Cell<Option<NonNull<Entry>>>,
/// Next entry in the linked list.
next: Cell<Option<NonNull<Entry>>>,
}
/// A linked list of entries.
struct List {
/// First entry in the list.
head: Option<NonNull<Entry>>,
/// Last entry in the list.
tail: Option<NonNull<Entry>>,
/// Total number of entries in the list.
len: usize,
/// Number of notifiable entries in the list.
///
/// Notifiable entries are those that haven't been notified yet.
notifiable: usize,
}
impl List {
/// Inserts a new entry into the list.
fn insert(&mut self) -> NonNull<Entry> {
unsafe {
// Allocate an entry that is going to become the new tail.
let entry = NonNull::new_unchecked(Box::into_raw(Box::new(Entry {
state: Cell::new(State::Created),
prev: Cell::new(self.tail),
next: Cell::new(None),
})));
// Replace the tail with the new entry.
match mem::replace(&mut self.tail, Some(entry)) {
None => self.head = Some(entry),
Some(t) => t.as_ref().next.set(Some(entry)),
}
// Bump the total count and the count of notifiable entries.
self.len += 1;
self.notifiable += 1;
entry
}
}
/// Removes an entry from the list and returns its state.
fn remove(&mut self, entry: NonNull<Entry>) -> State {
unsafe {
let prev = entry.as_ref().prev.get();
let next = entry.as_ref().next.get();
// Unlink from the previous entry.
match prev {
None => self.head = next,
Some(p) => p.as_ref().next.set(next),
}
// Unlink from the next entry.
match next {
None => self.tail = prev,
Some(n) => n.as_ref().prev.set(prev),
}
// Deallocate and extract the state.
let entry = Box::from_raw(entry.as_ptr());
let state = entry.state.into_inner();
// Update the counters.
if !state.is_notified() {
self.notifiable -= 1;
}
self.len -= 1;
state
}
}
/// Notifies an entry.
#[cold]
fn notify(&mut self, notify_all: bool) {
let mut entry = self.tail;
// Iterate over the entries in the list.
while let Some(e) = entry {
let e = unsafe { e.as_ref() };
// Set the state of this entry to `Notified`.
let state = e.state.replace(State::Notified);
let is_notified = state.is_notified();
// Wake the task or unpark the thread.
match state {
State::Notified => {}
State::Created => {}
State::Polling(w) => w.wake(),
State::Waiting(t) => t.unpark(),
}
// Update the count of notifiable entries.
if !is_notified {
self.notifiable -= 1;
}
// If all entries need to be notified, go to the next one.
if notify_all {
entry = e.prev.get();
} else {
break;
}
}
}
}
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
// HACK(stjepang): On x86 architectures there are two different ways of executing
// a `SeqCst` fence.
//
// 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
// 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
//
// Both instructions have the effect of a full barrier, but empirical benchmarks have shown
// that the second one is sometimes a bit faster.
//
// The ideal solution here would be to use inline assembly, but we're instead creating a
// temporary atomic variable and compare-and-exchanging its value. No sane compiler to
// x86 platforms is going to optimize this away.
let a = AtomicUsize::new(0);
a.compare_and_swap(0, 1, Ordering::SeqCst);
} else {
atomic::fence(Ordering::SeqCst);
}
}