feat: Add explicit web support

This commit adds WASM compilation support to this crate. The main thing
is that the wait() family of APIs are removed in WASM mode, as blocking
is not allowed in WASM.

In addition, tests are added to CI to support WASM.

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2023-08-26 12:53:39 -07:00 committed by GitHub
parent 85ca6d3feb
commit c278371cfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 201 additions and 165 deletions

View File

@ -59,6 +59,11 @@ jobs:
run: cargo hack build --all --no-dev-deps
- run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic
- name: Install wasm-pack
uses: taiki-e/install-action@wasm-pack
- run: wasm-pack test --node
- run: wasm-pack test --node --no-default-features
- run: wasm-pack test --node --no-default-features --features portable-atomic
msrv:
runs-on: ubuntu-latest

View File

@ -21,10 +21,12 @@ portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]
[dependencies]
concurrent-queue = { version = "2.2.0", default-features = false }
parking = { version = "2.0.0", optional = true }
pin-project-lite = "0.2.12"
portable-atomic-util = { version = "0.1.2", default-features = false, optional = true, features = ["alloc"] }
[target.'cfg(not(target_family = "wasm"))'.dependencies]
parking = { version = "2.0.0", optional = true }
[dependencies.portable_atomic_crate]
package = "portable-atomic"
version = "1.2.0"
@ -40,6 +42,9 @@ version = "0.4.0"
default-features = false
features = ["cargo_bench_support"]
[target.'cfg(target_family = "wasm")'.dev-dependencies]
wasm-bindgen-test = "0.3"
[[bench]]
name = "bench"
harness = false

View File

@ -2,182 +2,196 @@
//!
//! This mutex exposes both blocking and async methods for acquiring a lock.
#![allow(dead_code)]
#[cfg(not(target_family = "wasm"))]
mod example {
#![allow(dead_code)]
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};
use event_listener::Event;
use event_listener::Event;
/// A simple mutex.
struct Mutex<T> {
/// Set to `true` when the mutex is locked.
locked: AtomicBool,
/// A simple mutex.
struct Mutex<T> {
/// Set to `true` when the mutex is locked.
locked: AtomicBool,
/// Blocked lock operations.
lock_ops: Event,
/// Blocked lock operations.
lock_ops: Event,
/// The inner protected data.
data: UnsafeCell<T>,
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
impl<T> Mutex<T> {
/// Creates a mutex.
fn new(t: T) -> Mutex<T> {
Mutex {
locked: AtomicBool::new(false),
lock_ops: Event::new(),
data: UnsafeCell::new(t),
}
/// The inner protected data.
data: UnsafeCell<T>,
}
/// Attempts to acquire a lock.
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if !self.locked.swap(true, Ordering::Acquire) {
Some(MutexGuard(self))
} else {
None
}
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
impl<T> Mutex<T> {
/// Creates a mutex.
fn new(t: T) -> Mutex<T> {
Mutex {
locked: AtomicBool::new(false),
lock_ops: Event::new(),
data: UnsafeCell::new(t),
}
}
// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
/// Attempts to acquire a lock.
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if !self.locked.swap(true, Ordering::Acquire) {
Some(MutexGuard(self))
} else {
None
}
}
/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
Some(mut l) => {
// Wait until a notification is received.
l.as_mut().wait();
// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(mut l) => {
// Wait until a notification is received.
l.as_mut().wait();
}
}
}
}
/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return Some(guard);
}
// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(mut l) => {
// Wait until a notification is received.
l.as_mut().wait_deadline(deadline)?;
}
}
}
}
/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.await;
}
}
}
}
}
/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;
/// A guard holding a lock.
struct MutexGuard<'a, T>(&'a Mutex<T>);
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return Some(guard);
}
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(mut l) => {
// Wait until a notification is received.
l.as_mut().wait_deadline(deadline)?;
}
}
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
self.0.lock_ops.notify(1);
}
}
/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.await;
}
}
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
}
}
pub(super) fn entry() {
const N: usize = 10;
// A shared counter.
let counter = Arc::new(Mutex::new(0));
// A channel that signals when all threads are done.
let (tx, rx) = mpsc::channel();
// Spawn a bunch of threads incrementing the counter.
for _ in 0..N {
let counter = counter.clone();
let tx = tx.clone();
thread::spawn(move || {
let mut counter = counter.lock();
*counter += 1;
// If this is the last increment, signal that we're done.
if *counter == N {
tx.send(()).unwrap();
}
});
}
// Wait until the last thread increments the counter.
rx.recv().unwrap();
// The counter must equal the number of threads.
assert_eq!(*counter.lock(), N);
println!("Done!");
}
}
/// A guard holding a lock.
struct MutexGuard<'a, T>(&'a Mutex<T>);
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
self.0.lock_ops.notify(1);
}
}
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
#[cfg(target_family = "wasm")]
mod example {
pub(super) fn entry() {
println!("This example is not supported on wasm yet.");
}
}
fn main() {
const N: usize = 10;
// A shared counter.
let counter = Arc::new(Mutex::new(0));
// A channel that signals when all threads are done.
let (tx, rx) = mpsc::channel();
// Spawn a bunch of threads incrementing the counter.
for _ in 0..N {
let counter = counter.clone();
let tx = tx.clone();
thread::spawn(move || {
let mut counter = counter.lock();
*counter += 1;
// If this is the last increment, signal that we're done.
if *counter == N {
tx.send(()).unwrap();
}
});
}
// Wait until the last thread increments the counter.
rx.recv().unwrap();
// The counter must equal the number of threads.
assert_eq!(*counter.lock(), N);
println!("Done!");
example::entry();
}

View File

@ -94,10 +94,11 @@ use core::pin::Pin;
use core::ptr;
use core::task::{Context, Poll, Waker};
#[cfg(feature = "std")]
use parking::{Parker, Unparker};
#[cfg(feature = "std")]
use std::time::{Duration, Instant};
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
use {
parking::{Parker, Unparker},
std::time::{Duration, Instant},
};
use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use sync::{Arc, WithMut};
@ -719,7 +720,7 @@ impl<T> EventListener<T> {
/// // Receive the notification.
/// listener.as_mut().wait();
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
pub fn wait(self: Pin<&mut Self>) -> T {
self.listener().wait_internal(None).unwrap()
}
@ -740,7 +741,7 @@ impl<T> EventListener<T> {
/// // There are no notification so this times out.
/// assert!(listener.as_mut().wait_timeout(Duration::from_secs(1)).is_none());
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> Option<T> {
self.listener()
.wait_internal(Instant::now().checked_add(timeout))
@ -762,7 +763,7 @@ impl<T> EventListener<T> {
/// // There are no notification so this times out.
/// assert!(listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)).is_none());
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> Option<T> {
self.listener().wait_internal(Some(deadline))
}
@ -881,7 +882,7 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
}
/// Wait until the provided deadline.
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> Option<T> {
use std::cell::RefCell;
@ -915,7 +916,7 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
}
/// Wait until the provided deadline using the specified parker/unparker pair.
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
fn wait_with_parker(
self: Pin<&mut Self>,
deadline: Option<Instant>,
@ -1077,7 +1078,7 @@ enum Task {
Waker(Waker),
/// An unparker that wakes up a thread.
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
Unparker(Unparker),
}
@ -1085,7 +1086,7 @@ impl Task {
fn as_task_ref(&self) -> TaskRef<'_> {
match self {
Self::Waker(waker) => TaskRef::Waker(waker),
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
Self::Unparker(unparker) => TaskRef::Unparker(unparker),
}
}
@ -1093,7 +1094,7 @@ impl Task {
fn wake(self) {
match self {
Self::Waker(waker) => waker.wake(),
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
Self::Unparker(unparker) => {
unparker.unpark();
}
@ -1114,7 +1115,7 @@ enum TaskRef<'a> {
Waker(&'a Waker),
/// An unparker that wakes up a thread.
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
Unparker(&'a Unparker),
}
@ -1124,7 +1125,7 @@ impl TaskRef<'_> {
fn will_wake(self, other: Self) -> bool {
match (self, other) {
(Self::Waker(a), Self::Waker(b)) => a.will_wake(b),
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
(Self::Unparker(_), Self::Unparker(_)) => {
// TODO: Use unreleased will_unpark API.
false
@ -1137,7 +1138,7 @@ impl TaskRef<'_> {
fn into_task(self) -> Task {
match self {
Self::Waker(waker) => Task::Waker(waker.clone()),
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(target_family = "wasm"),))]
Self::Unparker(unparker) => Task::Unparker(unparker.clone()),
}
}

View File

@ -829,6 +829,9 @@ mod tests {
use super::*;
use crate::Task;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]
fn smoke_mutex() {
let mutex = Mutex::new(0);

View File

@ -339,6 +339,9 @@ mod tests {
use super::*;
use futures_lite::pin;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
macro_rules! make_listeners {
($($id:ident),*) => {
$(

View File

@ -1,8 +1,10 @@
//! Testing of the `easy_wrapper!` macro.
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
use std::{marker::PhantomData, pin::Pin, task::Poll};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]
fn easy_wrapper_generics() {

View File

@ -7,6 +7,9 @@ use std::usize;
use event_listener::{Event, EventListener};
use waker_fn::waker_fn;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
fn is_notified(listener: Pin<&mut EventListener>) -> bool {
let waker = waker_fn(|| ());
listener.poll(&mut Context::from_waker(&waker)).is_ready()