feat: Add a loom implementation for event-listener

This commit is contained in:
Jacob Rothstein 2024-03-28 15:24:35 -07:00 committed by John Nunley
parent 58dbfc8bc5
commit f402b7e24c
6 changed files with 345 additions and 45 deletions

View File

@ -114,3 +114,14 @@ jobs:
- uses: rustsec/audit-check@master
with:
token: ${{ secrets.GITHUB_TOKEN }}
loom:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- name: Loom tests
run: RUSTFLAGS="--cfg=loom" cargo test --release --test loom --features loom

View File

@ -18,15 +18,19 @@ exclude = ["/.*"]
default = ["std"]
std = ["concurrent-queue/std", "parking"]
portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]
loom = ["concurrent-queue/loom", "parking?/loom", "dep:loom"]
[dependencies]
concurrent-queue = { version = "2.2.0", default-features = false }
concurrent-queue = { version = "2.4.0", default-features = false }
pin-project-lite = "0.2.12"
portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] }
[target.'cfg(not(target_family = "wasm"))'.dependencies]
parking = { version = "2.0.0", optional = true }
[target.'cfg(loom)'.dependencies]
loom = { version = "0.7", optional = true }
[dependencies.portable_atomic_crate]
package = "portable-atomic"
version = "1.2.0"

View File

@ -105,7 +105,10 @@ use {
};
use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use sync::{Arc, WithMut};
use sync::Arc;
#[cfg(not(loom))]
use sync::WithMut;
use notify::{Internal, NotificationPrivate};
pub use notify::{IntoNotification, Notification};
@ -216,13 +219,20 @@ impl<T> Event<T> {
///
/// let event = Event::<usize>::with_tag();
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(loom)))]
#[inline]
pub const fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
#[cfg(all(feature = "std", loom))]
#[inline]
pub fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
/// Tell whether any listeners are currently notified.
///
@ -543,12 +553,21 @@ impl Event<()> {
/// let event = Event::new();
/// ```
#[inline]
#[cfg(not(loom))]
pub const fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
#[inline]
#[cfg(loom)]
pub fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
/// Notifies a number of active listeners without emitting a `SeqCst` fence.
///
/// The number is allowed to be zero or exceed the current number of listeners.
@ -1119,6 +1138,12 @@ impl<T, B: Borrow<Inner<T>> + Unpin> InnerListener<T, B> {
match deadline {
None => parker.park(),
#[cfg(loom)]
Some(_deadline) => {
panic!("parking does not support timeouts under loom");
}
#[cfg(not(loom))]
Some(deadline) => {
// Make sure we're not timed out already.
let now = Instant::now();
@ -1330,10 +1355,9 @@ const NEVER_INSERTED_PANIC: &str = "\
EventListener was not inserted into the linked list, make sure you're not polling \
EventListener/listener! after it has finished";
#[cfg(not(loom))]
/// Synchronization primitive implementation.
mod sync {
pub(super) use core::cell;
#[cfg(not(feature = "portable-atomic"))]
pub(super) use alloc::sync::Arc;
#[cfg(not(feature = "portable-atomic"))]
@ -1344,7 +1368,7 @@ mod sync {
#[cfg(feature = "portable-atomic")]
pub(super) use portable_atomic_util::Arc;
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(loom)))]
pub(super) use std::sync::{Mutex, MutexGuard};
pub(super) trait WithMut {
@ -1366,6 +1390,51 @@ mod sync {
f(self.get_mut())
}
}
pub(crate) mod cell {
pub(crate) use core::cell::Cell;
/// This newtype around *mut T exists for interoperability with loom::cell::ConstPtr,
/// which works as a guard and performs additional logic to track access scope.
pub(crate) struct ConstPtr<T>(*mut T);
impl<T> ConstPtr<T> {
pub(crate) unsafe fn deref(&self) -> &T {
&*self.0
}
#[allow(unused)] // std code does not need this
pub(crate) unsafe fn deref_mut(&mut self) -> &mut T {
&mut *self.0
}
}
/// This UnsafeCell wrapper exists for interoperability with loom::cell::UnsafeCell, and
/// only contains the interface that is needed for this crate.
#[derive(Debug, Default)]
pub(crate) struct UnsafeCell<T>(core::cell::UnsafeCell<T>);
impl<T> UnsafeCell<T> {
pub(crate) fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(core::cell::UnsafeCell::new(data))
}
pub(crate) fn get(&self) -> ConstPtr<T> {
ConstPtr(self.0.get())
}
#[allow(dead_code)] // no_std does not need this
pub(crate) fn into_inner(self) -> T {
self.0.into_inner()
}
}
}
}
#[cfg(loom)]
/// Synchronization primitive implementation.
mod sync {
pub(super) use loom::cell;
pub(super) use loom::sync::{atomic, Arc, Mutex, MutexGuard};
}
fn __test_send_and_sync() {

View File

@ -16,7 +16,7 @@ use node::{Node, NothingProducer, TaskWaiting};
use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::cell::{Cell, UnsafeCell};
use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
use crate::sync::Arc;
use crate::{RegisterResult, State, Task, TaskRef};
@ -771,7 +771,10 @@ impl<T> Mutex<T> {
.is_ok()
{
// We have successfully locked the mutex.
Some(MutexGuard { mutex: self })
Some(MutexGuard {
mutex: self,
guard: self.value.get(),
})
} else {
self.try_lock_slow()
}
@ -790,7 +793,10 @@ impl<T> Mutex<T> {
.is_ok()
{
// We have successfully locked the mutex.
return Some(MutexGuard { mutex: self });
return Some(MutexGuard {
mutex: self,
guard: self.value.get(),
});
}
// Use atomic loads instead of compare-exchange.
@ -804,6 +810,7 @@ impl<T> Mutex<T> {
pub(crate) struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
guard: ConstPtr<T>,
}
impl<'a, T> Drop for MutexGuard<'a, T> {
@ -816,13 +823,13 @@ impl<'a, T> ops::Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.mutex.value.get() }
unsafe { self.guard.deref() }
}
}
impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.value.get() }
unsafe { self.guard.deref_mut() }
}
}

View File

@ -73,27 +73,27 @@ impl<T> crate::Inner<T> {
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
let mut inner = self.lock();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();
// Get the inner pointer.
&*listener.link.get()
};
{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };
// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}
// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
@ -129,15 +129,12 @@ impl<T> crate::Inner<T> {
task: TaskRef<'_>,
) -> RegisterResult<T> {
let mut inner = self.lock();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
let listener = match listener.as_mut().as_pin_mut() {
Some(listener) => listener,
None => return RegisterResult::NeverInserted,
};
&*listener.link.get()
let entry_guard = match listener.as_mut().as_pin_mut() {
Some(listener) => listener.link.get(),
None => return RegisterResult::NeverInserted,
};
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };
// Take out the state and check it.
match entry.state.replace(State::NotifiedTaken) {
@ -175,12 +172,8 @@ impl<T> Inner<T> {
mut listener: Pin<&mut Option<Listener<T>>>,
propagate: bool,
) -> Option<State<T>> {
let entry = unsafe {
let listener = listener.as_mut().as_pin_mut()?;
// Get the inner pointer.
&*listener.link.get()
};
let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
let entry = unsafe { entry_guard.deref() };
let prev = entry.prev.get();
let next = entry.next.get();
@ -216,7 +209,11 @@ impl<T> Inner<T> {
.into_inner()
};
let mut state = entry.state.into_inner();
// This State::Created is immediately dropped and exists as a workaround for the absence of
// loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();`
//
// refs: https://github.com/tokio-rs/loom/pull/341
let mut state = entry.state.replace(State::Created);
// Update the notified count.
if state.is_notified() {

212
tests/loom.rs Normal file
View File

@ -0,0 +1,212 @@
#![cfg(loom)]
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Context;
use std::usize;
use event_listener::{Event, EventListener};
use waker_fn::waker_fn;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
fn is_notified(listener: &mut EventListener) -> bool {
let waker = waker_fn(|| ());
Pin::new(listener)
.poll(&mut Context::from_waker(&waker))
.is_ready()
}
#[test]
fn notify() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert!(!is_notified(&mut l3));
assert_eq!(event.notify(2), 2);
assert_eq!(event.notify(1), 0);
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
});
}
#[test]
fn notify_additional() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify_additional(1), 1);
assert_eq!(event.notify(1), 0);
assert_eq!(event.notify_additional(1), 1);
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
})
}
#[test]
fn notify_one() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(1), 1);
assert!(is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(1), 1);
assert!(is_notified(&mut l2));
});
}
#[test]
fn notify_all() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
assert!(!is_notified(&mut l1));
assert!(!is_notified(&mut l2));
assert_eq!(event.notify(usize::MAX), 2);
assert!(is_notified(&mut l1));
assert!(is_notified(&mut l2));
});
}
#[test]
fn drop_notified() {
loom::model(|| {
let event = Event::new();
let l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify(1), 1);
drop(l1);
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
});
}
#[test]
fn drop_notified2() {
loom::model(|| {
let event = Event::new();
let l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
assert_eq!(event.notify(2), 2);
drop(l1);
assert!(is_notified(&mut l2));
assert!(!is_notified(&mut l3));
});
}
#[test]
fn drop_notified_additional() {
loom::model(|| {
let event = Event::new();
let l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
let mut l4 = event.listen();
assert_eq!(event.notify_additional(1), 1);
assert_eq!(event.notify(2), 1);
drop(l1);
assert!(is_notified(&mut l2));
assert!(is_notified(&mut l3));
assert!(!is_notified(&mut l4));
});
}
#[test]
fn drop_non_notified() {
loom::model(|| {
let event = Event::new();
let mut l1 = event.listen();
let mut l2 = event.listen();
let l3 = event.listen();
assert_eq!(event.notify(1), 1);
drop(l3);
assert!(is_notified(&mut l1));
assert!(!is_notified(&mut l2));
})
}
#[test]
fn notify_all_fair() {
loom::model(|| {
let event = Event::new();
let v = Arc::new(Mutex::new(vec![]));
let mut l1 = event.listen();
let mut l2 = event.listen();
let mut l3 = event.listen();
let waker1 = {
let v = v.clone();
waker_fn(move || v.lock().unwrap().push(1))
};
let waker2 = {
let v = v.clone();
waker_fn(move || v.lock().unwrap().push(2))
};
let waker3 = {
let v = v.clone();
waker_fn(move || v.lock().unwrap().push(3))
};
assert!(Pin::new(&mut l1)
.poll(&mut Context::from_waker(&waker1))
.is_pending());
assert!(Pin::new(&mut l2)
.poll(&mut Context::from_waker(&waker2))
.is_pending());
assert!(Pin::new(&mut l3)
.poll(&mut Context::from_waker(&waker3))
.is_pending());
assert_eq!(event.notify(usize::MAX), 3);
assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]);
assert!(Pin::new(&mut l1)
.poll(&mut Context::from_waker(&waker1))
.is_ready());
assert!(Pin::new(&mut l2)
.poll(&mut Context::from_waker(&waker2))
.is_ready());
assert!(Pin::new(&mut l3)
.poll(&mut Context::from_waker(&waker3))
.is_ready());
})
}