From fb080c2ff45f2b3fa60d49c83bc0b508d45babcc Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 21 Sep 2020 18:58:31 +0200 Subject: [PATCH] Merge crates into a single repo --- .github/workflows/build-and-test.yaml | 1 + .gitignore | 2 +- Cargo.toml | 9 + async-barrier/Cargo.toml | 21 + async-barrier/src/lib.rs | 139 +++++ async-barrier/tests/smoke.rs | 49 ++ async-mutex/Cargo.toml | 24 + async-mutex/benches/async-mutex.rs | 42 ++ async-mutex/benches/async-std.rs | 42 ++ async-mutex/benches/futures-intrusive.rs | 42 ++ async-mutex/benches/futures.rs | 42 ++ async-mutex/benches/tokio.rs | 42 ++ async-mutex/examples/fairness.rs | 38 ++ async-mutex/src/lib.rs | 455 ++++++++++++++++ async-mutex/tests/mutex.rs | 67 +++ async-rwlock/Cargo.toml | 22 + async-rwlock/src/lib.rs | 663 +++++++++++++++++++++++ async-rwlock/tests/std.rs | 153 ++++++ async-rwlock/tests/upgradable.rs | 93 ++++ async-semaphore/Cargo.toml | 19 + async-semaphore/src/lib.rs | 196 +++++++ async-semaphore/tests/semaphore.rs | 83 +++ 22 files changed, 2243 insertions(+), 1 deletion(-) create mode 100644 async-barrier/Cargo.toml create mode 100644 async-barrier/src/lib.rs create mode 100644 async-barrier/tests/smoke.rs create mode 100644 async-mutex/Cargo.toml create mode 100644 async-mutex/benches/async-mutex.rs create mode 100644 async-mutex/benches/async-std.rs create mode 100644 async-mutex/benches/futures-intrusive.rs create mode 100644 async-mutex/benches/futures.rs create mode 100644 async-mutex/benches/tokio.rs create mode 100644 async-mutex/examples/fairness.rs create mode 100644 async-mutex/src/lib.rs create mode 100644 async-mutex/tests/mutex.rs create mode 100644 async-rwlock/Cargo.toml create mode 100644 async-rwlock/src/lib.rs create mode 100644 async-rwlock/tests/std.rs create mode 100644 async-rwlock/tests/upgradable.rs create mode 100644 async-semaphore/Cargo.toml create mode 100644 async-semaphore/src/lib.rs create mode 100644 async-semaphore/tests/semaphore.rs diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml index fd7598b..85c2b77 100644 --- a/.github/workflows/build-and-test.yaml +++ b/.github/workflows/build-and-test.yaml @@ -49,3 +49,4 @@ jobs: uses: actions-rs/cargo@v1 with: command: test + args: --all diff --git a/.gitignore b/.gitignore index 96ef6c0..2c96eb1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -/target +target/ Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index a4d4bdf..3b440ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,12 @@ async-barrier = "1.0.1" async-mutex = "1.3.0" async-rwlock = "1.1.0" async-semaphore = "1.1.0" + +[workspace] +members = [ + ".", + "async-barrier", + "async-mutex", + "async-rwlock", + "async-semaphore", +] diff --git a/async-barrier/Cargo.toml b/async-barrier/Cargo.toml new file mode 100644 index 0000000..e7ddb21 --- /dev/null +++ b/async-barrier/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "async-barrier" +version = "1.0.1" +authors = ["Stjepan Glavina "] +edition = "2018" +description = "An async barrier" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/stjepang/async-lock" +homepage = "https://github.com/stjepang/async-lock" +documentation = "https://docs.rs/async-barrier" +keywords = ["rendezvous", "sync", "async", "synchronize", "synchronization"] +categories = ["asynchronous", "concurrency"] +readme = "../README.md" + +[dependencies] +async-mutex = "1.1.5" +event-listener = "2.4.0" + +[dev-dependencies] +async-channel = "1.4.1" +futures-lite = "1.0.0" diff --git a/async-barrier/src/lib.rs b/async-barrier/src/lib.rs new file mode 100644 index 0000000..bf5d2bb --- /dev/null +++ b/async-barrier/src/lib.rs @@ -0,0 +1,139 @@ +//! An async barrier. +//! +//! This crate is an async version of [`std::sync::Barrier`]. + +#![forbid(unsafe_code)] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +use async_mutex::Mutex; +use event_listener::Event; + +/// A counter to synchronize multiple tasks at the same time. +#[derive(Debug)] +pub struct Barrier { + n: usize, + state: Mutex, + event: Event, +} + +#[derive(Debug)] +struct State { + count: usize, + generation_id: u64, +} + +impl Barrier { + /// Creates a barrier that can block the given number of tasks. + /// + /// A barrier will block `n`-1 tasks which call [`wait()`] and then wake up all tasks + /// at once when the `n`th task calls [`wait()`]. + /// + /// [`wait()`]: `Barrier::wait()` + /// + /// # Examples + /// + /// ``` + /// use async_barrier::Barrier; + /// + /// let barrier = Barrier::new(5); + /// ``` + pub fn new(n: usize) -> Barrier { + Barrier { + n, + state: Mutex::new(State { + count: 0, + generation_id: 0, + }), + event: Event::new(), + } + } + + /// Blocks the current task until all tasks reach this point. + /// + /// Barriers are reusable after all tasks have synchronized, and can be used continuously. + /// + /// Returns a [`BarrierWaitResult`] indicating whether this task is the "leader", meaning the + /// last task to call this method. + /// + /// # Examples + /// + /// ``` + /// use async_barrier::Barrier; + /// use futures_lite::future; + /// use std::sync::Arc; + /// use std::thread; + /// + /// let barrier = Arc::new(Barrier::new(5)); + /// + /// for _ in 0..5 { + /// let b = barrier.clone(); + /// thread::spawn(move || { + /// future::block_on(async { + /// // The same messages will be printed together. + /// // There will NOT be interleaving of "before" and "after". + /// println!("before wait"); + /// b.wait().await; + /// println!("after wait"); + /// }); + /// }); + /// } + /// ``` + pub async fn wait(&self) -> BarrierWaitResult { + let mut state = self.state.lock().await; + let local_gen = state.generation_id; + state.count += 1; + + if state.count < self.n { + while local_gen == state.generation_id && state.count < self.n { + let listener = self.event.listen(); + drop(state); + listener.await; + state = self.state.lock().await; + } + BarrierWaitResult { is_leader: false } + } else { + state.count = 0; + state.generation_id = state.generation_id.wrapping_add(1); + self.event.notify(std::usize::MAX); + BarrierWaitResult { is_leader: true } + } + } +} + +/// Returned by [`Barrier::wait()`] when all tasks have called it. +/// +/// # Examples +/// +/// ``` +/// # futures_lite::future::block_on(async { +/// use async_barrier::Barrier; +/// +/// let barrier = Barrier::new(1); +/// let barrier_wait_result = barrier.wait().await; +/// # }); +/// ``` +#[derive(Debug, Clone)] +pub struct BarrierWaitResult { + is_leader: bool, +} + +impl BarrierWaitResult { + /// Returns `true` if this task was the last to call to [`Barrier::wait()`]. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_barrier::Barrier; + /// use futures_lite::future; + /// + /// let barrier = Barrier::new(2); + /// let (a, b) = future::zip(barrier.wait(), barrier.wait()).await; + /// assert_eq!(a.is_leader(), false); + /// assert_eq!(b.is_leader(), true); + /// # }); + /// ``` + pub fn is_leader(&self) -> bool { + self.is_leader + } +} diff --git a/async-barrier/tests/smoke.rs b/async-barrier/tests/smoke.rs new file mode 100644 index 0000000..f0a382b --- /dev/null +++ b/async-barrier/tests/smoke.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; +use std::thread; + +use async_barrier::Barrier; +use futures_lite::future; + +#[test] +fn smoke() { + future::block_on(async move { + const N: usize = 10; + + let barrier = Arc::new(Barrier::new(N)); + + for _ in 0..10 { + let (tx, rx) = async_channel::unbounded(); + + for _ in 0..N - 1 { + let c = barrier.clone(); + let tx = tx.clone(); + + thread::spawn(move || { + future::block_on(async move { + let res = c.wait().await; + tx.send(res.is_leader()).await.unwrap(); + }) + }); + } + + // At this point, all spawned threads should be blocked, + // so we shouldn't get anything from the cahnnel. + let res = rx.try_recv(); + assert!(match res { + Err(_err) => true, + _ => false, + }); + + let mut leader_found = barrier.wait().await.is_leader(); + + // Now, the barrier is cleared and we should get data. + for _ in 0..N - 1 { + if rx.recv().await.unwrap() { + assert!(!leader_found); + leader_found = true; + } + } + assert!(leader_found); + } + }); +} diff --git a/async-mutex/Cargo.toml b/async-mutex/Cargo.toml new file mode 100644 index 0000000..a028dbe --- /dev/null +++ b/async-mutex/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "async-mutex" +version = "1.3.0" +authors = ["Stjepan Glavina "] +edition = "2018" +description = "Async mutex" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/stjepang/async-lock" +homepage = "https://github.com/stjepang/async-lock" +documentation = "https://docs.rs/async-mutex" +keywords = ["asynchronous", "mutex", "lock", "synchronization"] +categories = ["asynchronous", "concurrency"] +readme = "../README.md" + +[dependencies] +event-listener = "2.0.0" + +[dev-dependencies] +async-std = "1.6.2" +futures = "0.3.5" +futures-intrusive = "0.3.1" +futures-lite = "1.0.0" +smol = "0.1.18" +tokio = { version = "0.2.21", features = ["sync", "parking_lot"] } diff --git a/async-mutex/benches/async-mutex.rs b/async-mutex/benches/async-mutex.rs new file mode 100644 index 0000000..bfbbdc6 --- /dev/null +++ b/async-mutex/benches/async-mutex.rs @@ -0,0 +1,42 @@ +#![feature(test)] + +extern crate test; + +use std::sync::Arc; + +use async_mutex::Mutex; +use async_std::task; +use test::Bencher; + +#[bench] +fn create(b: &mut Bencher) { + b.iter(|| Mutex::new(())); +} + +#[bench] +fn contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(10, 1000))); +} + +#[bench] +fn no_contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(1, 10000))); +} + +async fn run(task: usize, iter: usize) { + let m = Arc::new(Mutex::new(())); + let mut tasks = Vec::new(); + + for _ in 0..task { + let m = m.clone(); + tasks.push(task::spawn(async move { + for _ in 0..iter { + let _ = m.lock().await; + } + })); + } + + for t in tasks { + t.await; + } +} diff --git a/async-mutex/benches/async-std.rs b/async-mutex/benches/async-std.rs new file mode 100644 index 0000000..b159ba1 --- /dev/null +++ b/async-mutex/benches/async-std.rs @@ -0,0 +1,42 @@ +#![feature(test)] + +extern crate test; + +use std::sync::Arc; + +use async_std::sync::Mutex; +use async_std::task; +use test::Bencher; + +#[bench] +fn create(b: &mut Bencher) { + b.iter(|| Mutex::new(())); +} + +#[bench] +fn contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(10, 1000))); +} + +#[bench] +fn no_contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(1, 10000))); +} + +async fn run(task: usize, iter: usize) { + let m = Arc::new(Mutex::new(())); + let mut tasks = Vec::new(); + + for _ in 0..task { + let m = m.clone(); + tasks.push(task::spawn(async move { + for _ in 0..iter { + let _ = m.lock().await; + } + })); + } + + for t in tasks { + t.await; + } +} diff --git a/async-mutex/benches/futures-intrusive.rs b/async-mutex/benches/futures-intrusive.rs new file mode 100644 index 0000000..a8f0155 --- /dev/null +++ b/async-mutex/benches/futures-intrusive.rs @@ -0,0 +1,42 @@ +#![feature(test)] + +extern crate test; + +use std::sync::Arc; + +use async_std::task; +use futures_intrusive::sync::Mutex; +use test::Bencher; + +#[bench] +fn create(b: &mut Bencher) { + b.iter(|| Mutex::new((), true)); +} + +#[bench] +fn contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(10, 1000))); +} + +#[bench] +fn no_contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(1, 10000))); +} + +async fn run(task: usize, iter: usize) { + let m = Arc::new(Mutex::new((), true)); + let mut tasks = Vec::new(); + + for _ in 0..task { + let m = m.clone(); + tasks.push(task::spawn(async move { + for _ in 0..iter { + let _ = m.lock().await; + } + })); + } + + for t in tasks { + t.await; + } +} diff --git a/async-mutex/benches/futures.rs b/async-mutex/benches/futures.rs new file mode 100644 index 0000000..effd2a3 --- /dev/null +++ b/async-mutex/benches/futures.rs @@ -0,0 +1,42 @@ +#![feature(test)] + +extern crate test; + +use std::sync::Arc; + +use async_std::task; +use futures::lock::Mutex; +use test::Bencher; + +#[bench] +fn create(b: &mut Bencher) { + b.iter(|| Mutex::new(())); +} + +#[bench] +fn contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(10, 1000))); +} + +#[bench] +fn no_contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(1, 10000))); +} + +async fn run(task: usize, iter: usize) { + let m = Arc::new(Mutex::new(())); + let mut tasks = Vec::new(); + + for _ in 0..task { + let m = m.clone(); + tasks.push(task::spawn(async move { + for _ in 0..iter { + let _ = m.lock().await; + } + })); + } + + for t in tasks { + t.await; + } +} diff --git a/async-mutex/benches/tokio.rs b/async-mutex/benches/tokio.rs new file mode 100644 index 0000000..ffc182a --- /dev/null +++ b/async-mutex/benches/tokio.rs @@ -0,0 +1,42 @@ +#![feature(test)] + +extern crate test; + +use std::sync::Arc; + +use async_std::task; +use test::Bencher; +use tokio::sync::Mutex; + +#[bench] +fn create(b: &mut Bencher) { + b.iter(|| Mutex::new(())); +} + +#[bench] +fn contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(10, 1000))); +} + +#[bench] +fn no_contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(1, 10000))); +} + +async fn run(task: usize, iter: usize) { + let m = Arc::new(Mutex::new(())); + let mut tasks = Vec::new(); + + for _ in 0..task { + let m = m.clone(); + tasks.push(task::spawn(async move { + for _ in 0..iter { + let _ = m.lock().await; + } + })); + } + + for t in tasks { + t.await; + } +} diff --git a/async-mutex/examples/fairness.rs b/async-mutex/examples/fairness.rs new file mode 100644 index 0000000..6f0d9a4 --- /dev/null +++ b/async-mutex/examples/fairness.rs @@ -0,0 +1,38 @@ +//! Demonstrates fairness properties of the mutex. +//! +//! A number of threads run a loop in which they hold the lock for a little bit and re-acquire it +//! immediately after. In the end we print the number of times each thread acquired the lock. + +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; + +use async_mutex::Mutex; +use smol::Timer; + +fn main() { + let num_threads = 30; + let mut threads = Vec::new(); + let hits = Arc::new(Mutex::new(vec![0; num_threads])); + + for i in 0..num_threads { + let hits = hits.clone(); + threads.push(thread::spawn(move || { + smol::run(async { + let start = Instant::now(); + + while start.elapsed() < Duration::from_secs(1) { + let mut hits = hits.lock().await; + hits[i] += 1; + Timer::after(Duration::from_micros(5000)).await; + } + }) + })); + } + + for t in threads { + t.join().unwrap(); + } + + dbg!(hits); +} diff --git a/async-mutex/src/lib.rs b/async-mutex/src/lib.rs new file mode 100644 index 0000000..b97e9a4 --- /dev/null +++ b/async-mutex/src/lib.rs @@ -0,0 +1,455 @@ +//! An async mutex. +//! +//! The locking mechanism uses eventual fairness to ensure locking will be fair on average without +//! sacrificing performance. This is done by forcing a fair lock whenever a lock operation is +//! starved for longer than 0.5 milliseconds. +//! +//! # Examples +//! +//! ``` +//! # futures_lite::future::block_on(async { +//! use async_mutex::Mutex; +//! +//! let m = Mutex::new(1); +//! +//! let mut guard = m.lock().await; +//! *guard = 2; +//! +//! assert!(m.try_lock().is_none()); +//! drop(guard); +//! assert_eq!(*m.try_lock().unwrap(), 2); +//! # }) +//! ``` + +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +use std::cell::UnsafeCell; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::process; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use std::usize; + +use event_listener::Event; + +/// An async mutex. +pub struct Mutex { + /// Current state of the mutex. + /// + /// The least significant bit is set to 1 if the mutex is locked. + /// The other bits hold the number of starved lock operations. + state: AtomicUsize, + + /// Lock operations waiting for the mutex to be released. + lock_ops: Event, + + /// The value inside the mutex. + data: UnsafeCell, +} + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +impl Mutex { + /// Creates a new async mutex. + /// + /// # Examples + /// + /// ``` + /// use async_mutex::Mutex; + /// + /// let mutex = Mutex::new(0); + /// ``` + pub fn new(data: T) -> Mutex { + Mutex { + state: AtomicUsize::new(0), + lock_ops: Event::new(), + data: UnsafeCell::new(data), + } + } + + /// Consumes the mutex, returning the underlying data. + /// + /// # Examples + /// + /// ``` + /// use async_mutex::Mutex; + /// + /// let mutex = Mutex::new(10); + /// assert_eq!(mutex.into_inner(), 10); + /// ``` + pub fn into_inner(self) -> T { + self.data.into_inner() + } +} + +impl Mutex { + /// Acquires the mutex. + /// + /// Returns a guard that releases the mutex when dropped. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_mutex::Mutex; + /// + /// let mutex = Mutex::new(10); + /// let guard = mutex.lock().await; + /// assert_eq!(*guard, 10); + /// # }) + /// ``` + #[inline] + pub async fn lock(&self) -> MutexGuard<'_, T> { + if let Some(guard) = self.try_lock() { + return guard; + } + self.acquire_slow().await; + MutexGuard(self) + } + + /// Slow path for acquiring the mutex. + #[cold] + async fn acquire_slow(&self) { + // Get the current time. + let start = Instant::now(); + + loop { + // Start listening for events. + let listener = self.lock_ops.listen(); + + // Try locking if nobody is being starved. + match self.state.compare_and_swap(0, 1, Ordering::Acquire) { + // Lock acquired! + 0 => return, + + // Lock is held and nobody is starved. + 1 => {} + + // Somebody is starved. + _ => break, + } + + // Wait for a notification. + listener.await; + + // Try locking if nobody is being starved. + match self.state.compare_and_swap(0, 1, Ordering::Acquire) { + // Lock acquired! + 0 => return, + + // Lock is held and nobody is starved. + 1 => {} + + // Somebody is starved. + _ => { + // Notify the first listener in line because we probably received a + // notification that was meant for a starved task. + self.lock_ops.notify(1); + break; + } + } + + // If waiting for too long, fall back to a fairer locking strategy that will prevent + // newer lock operations from starving us forever. + if start.elapsed() > Duration::from_micros(500) { + break; + } + } + + // Increment the number of starved lock operations. + if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 { + // In case of potential overflow, abort. + process::abort(); + } + + // Decrement the counter when exiting this function. + let _call = CallOnDrop(|| { + self.state.fetch_sub(2, Ordering::Release); + }); + + loop { + // Start listening for events. + let listener = self.lock_ops.listen(); + + // Try locking if nobody else is being starved. + match self.state.compare_and_swap(2, 2 | 1, Ordering::Acquire) { + // Lock acquired! + 2 => return, + + // Lock is held by someone. + s if s % 2 == 1 => {} + + // Lock is available. + _ => { + // Be fair: notify the first listener and then go wait in line. + self.lock_ops.notify(1); + } + } + + // Wait for a notification. + listener.await; + + // Try acquiring the lock without waiting for others. + if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 { + return; + } + } + } + + /// Attempts to acquire the mutex. + /// + /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a + /// guard is returned that releases the mutex when dropped. + /// + /// # Examples + /// + /// ``` + /// use async_mutex::Mutex; + /// + /// let mutex = Mutex::new(10); + /// if let Some(guard) = mutex.try_lock() { + /// assert_eq!(*guard, 10); + /// } + /// # ; + /// ``` + #[inline] + pub fn try_lock(&self) -> Option> { + if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 { + Some(MutexGuard(self)) + } else { + None + } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable + /// borrow statically guarantees the mutex is not already acquired. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_mutex::Mutex; + /// + /// let mut mutex = Mutex::new(0); + /// *mutex.get_mut() = 10; + /// assert_eq!(*mutex.lock().await, 10); + /// # }) + /// ``` + pub fn get_mut(&mut self) -> &mut T { + unsafe { &mut *self.data.get() } + } +} + +impl Mutex { + /// Acquires the mutex and clones a reference to it. + /// + /// Returns an owned guard that releases the mutex when dropped. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_mutex::Mutex; + /// use std::sync::Arc; + /// + /// let mutex = Arc::new(Mutex::new(10)); + /// let guard = mutex.lock_arc().await; + /// assert_eq!(*guard, 10); + /// # }) + /// ``` + #[inline] + pub async fn lock_arc(self: &Arc) -> MutexGuardArc { + if let Some(guard) = self.try_lock_arc() { + return guard; + } + self.acquire_slow().await; + MutexGuardArc(self.clone()) + } + + /// Attempts to acquire the mutex and clone a reference to it. + /// + /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an + /// owned guard is returned that releases the mutex when dropped. + /// + /// # Examples + /// + /// ``` + /// use async_mutex::Mutex; + /// use std::sync::Arc; + /// + /// let mutex = Arc::new(Mutex::new(10)); + /// if let Some(guard) = mutex.try_lock() { + /// assert_eq!(*guard, 10); + /// } + /// # ; + /// ``` + #[inline] + pub fn try_lock_arc(self: &Arc) -> Option> { + if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 { + Some(MutexGuardArc(self.clone())) + } else { + None + } + } +} + +impl fmt::Debug for Mutex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + struct Locked; + impl fmt::Debug for Locked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + match self.try_lock() { + None => f.debug_struct("Mutex").field("data", &Locked).finish(), + Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(), + } + } +} + +impl From for Mutex { + fn from(val: T) -> Mutex { + Mutex::new(val) + } +} + +impl Default for Mutex { + fn default() -> Mutex { + Mutex::new(Default::default()) + } +} + +/// A guard that releases the mutex when dropped. +pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex); + +unsafe impl Send for MutexGuard<'_, T> {} +unsafe impl Sync for MutexGuard<'_, T> {} + +impl<'a, T: ?Sized> MutexGuard<'a, T> { + /// Returns a reference to the mutex a guard came from. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_mutex::{Mutex, MutexGuard}; + /// + /// let mutex = Mutex::new(10i32); + /// let guard = mutex.lock().await; + /// dbg!(MutexGuard::source(&guard)); + /// # }) + /// ``` + pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex { + guard.0 + } +} + +impl Drop for MutexGuard<'_, T> { + fn drop(&mut self) { + // Remove the last bit and notify a waiting lock operation. + self.0.state.fetch_sub(1, Ordering::Release); + self.0.lock_ops.notify(1); + } +} + +impl fmt::Debug for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl Deref for MutexGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.0.data.get() } + } +} + +impl DerefMut for MutexGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.0.data.get() } + } +} + +/// An owned guard that releases the mutex when dropped. +pub struct MutexGuardArc(Arc>); + +unsafe impl Send for MutexGuardArc {} +unsafe impl Sync for MutexGuardArc {} + +impl MutexGuardArc { + /// Returns a reference to the mutex a guard came from. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_mutex::{Mutex, MutexGuardArc}; + /// use std::sync::Arc; + /// + /// let mutex = Arc::new(Mutex::new(10i32)); + /// let guard = mutex.lock_arc().await; + /// dbg!(MutexGuardArc::source(&guard)); + /// # }) + /// ``` + pub fn source(guard: &MutexGuardArc) -> &Arc> { + &guard.0 + } +} + +impl Drop for MutexGuardArc { + fn drop(&mut self) { + // Remove the last bit and notify a waiting lock operation. + self.0.state.fetch_sub(1, Ordering::Release); + self.0.lock_ops.notify(1); + } +} + +impl fmt::Debug for MutexGuardArc { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for MutexGuardArc { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl Deref for MutexGuardArc { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.0.data.get() } + } +} + +impl DerefMut for MutexGuardArc { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.0.data.get() } + } +} + +/// Calls a function when dropped. +struct CallOnDrop(F); + +impl Drop for CallOnDrop { + fn drop(&mut self) { + (self.0)(); + } +} diff --git a/async-mutex/tests/mutex.rs b/async-mutex/tests/mutex.rs new file mode 100644 index 0000000..1e5e787 --- /dev/null +++ b/async-mutex/tests/mutex.rs @@ -0,0 +1,67 @@ +use std::sync::Arc; +use std::thread; + +use async_mutex::Mutex; +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::prelude::*; + +#[test] +fn smoke() { + block_on(async { + let m = Mutex::new(()); + drop(m.lock().await); + drop(m.lock().await); + }) +} + +#[test] +fn try_lock() { + let m = Mutex::new(()); + *m.try_lock().unwrap() = (); +} + +#[test] +fn into_inner() { + let m = Mutex::new(10i32); + assert_eq!(m.into_inner(), 10); +} + +#[test] +fn get_mut() { + let mut m = Mutex::new(10i32); + *m.get_mut() = 20; + assert_eq!(m.into_inner(), 20); +} + +#[test] +fn contention() { + block_on(async { + let (tx, mut rx) = mpsc::unbounded(); + + let tx = Arc::new(tx); + let mutex = Arc::new(Mutex::new(0i32)); + let num_tasks = 100; + + for _ in 0..num_tasks { + let tx = tx.clone(); + let mutex = mutex.clone(); + + thread::spawn(|| { + block_on(async move { + let mut lock = mutex.lock().await; + *lock += 1; + tx.unbounded_send(()).unwrap(); + drop(lock); + }) + }); + } + + for _ in 0..num_tasks { + rx.next().await.unwrap(); + } + + let lock = mutex.lock().await; + assert_eq!(num_tasks, *lock); + }); +} diff --git a/async-rwlock/Cargo.toml b/async-rwlock/Cargo.toml new file mode 100644 index 0000000..211bfa9 --- /dev/null +++ b/async-rwlock/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "async-rwlock" +version = "1.2.1" +authors = ["Stjepan Glavina "] +edition = "2018" +description = "Async reader-writer lock" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/stjepang/async-lock" +homepage = "https://github.com/stjepang/async-lock" +documentation = "https://docs.rs/async-rwlock" +keywords = ["asynchronous", "rwlock", "mutex", "lock", "synchronization"] +categories = ["asynchronous", "concurrency"] +readme = "../README.md" + +[dependencies] +async-mutex = "1.1.5" +event-listener = "2.4.0" + +[dev-dependencies] +async-channel = "1.4.1" +fastrand = "1.3.4" +futures-lite = "1.0.0" diff --git a/async-rwlock/src/lib.rs b/async-rwlock/src/lib.rs new file mode 100644 index 0000000..8258a33 --- /dev/null +++ b/async-rwlock/src/lib.rs @@ -0,0 +1,663 @@ +//! An async reader-writer lock. +//! +//! This type of lock allows multiple readers or one writer at any point in time. +//! +//! The locking strategy is write-preferring, which means writers are never starved. +//! Releasing a write lock wakes the next blocked reader and the next blocked writer. +//! +//! # Examples +//! +//! ``` +//! # futures_lite::future::block_on(async { +//! use async_rwlock::RwLock; +//! +//! let lock = RwLock::new(5); +//! +//! // Multiple read locks can be held at a time. +//! let r1 = lock.read().await; +//! let r2 = lock.read().await; +//! assert_eq!(*r1, 5); +//! assert_eq!(*r2, 5); +//! drop((r1, r2)); +//! +//! // Only one write lock can be held at a time. +//! let mut w = lock.write().await; +//! *w += 1; +//! assert_eq!(*w, 6); +//! # }) +//! ``` + +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +use std::cell::UnsafeCell; +use std::fmt; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::process; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use async_mutex::{Mutex, MutexGuard}; +use event_listener::Event; + +const WRITER_BIT: usize = 1; +const ONE_READER: usize = 2; + +/// An async reader-writer lock. +/// +/// # Examples +/// +/// ``` +/// # futures_lite::future::block_on(async { +/// use async_rwlock::RwLock; +/// +/// let lock = RwLock::new(5); +/// +/// // Multiple read locks can be held at a time. +/// let r1 = lock.read().await; +/// let r2 = lock.read().await; +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// drop((r1, r2)); +/// +/// // Only one write locks can be held at a time. +/// let mut w = lock.write().await; +/// *w += 1; +/// assert_eq!(*w, 6); +/// # }) +/// ``` +pub struct RwLock { + /// Acquired by the writer. + mutex: Mutex<()>, + + /// Event triggered when the last reader is dropped. + no_readers: Event, + + /// Event triggered when the writer is dropped. + no_writer: Event, + + /// Current state of the lock. + /// + /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or + /// trying to acquire it. + /// + /// The upper bits contain the number of currently active readers. Each active reader + /// increments the state by `ONE_READER`. + state: AtomicUsize, + + /// The inner value. + value: UnsafeCell, +} + +unsafe impl Send for RwLock {} +unsafe impl Sync for RwLock {} + +impl RwLock { + /// Creates a new reader-writer lock. + /// + /// # Examples + /// + /// ``` + /// use async_rwlock::RwLock; + /// + /// let lock = RwLock::new(0); + /// ``` + pub fn new(t: T) -> RwLock { + RwLock { + mutex: Mutex::new(()), + no_readers: Event::new(), + no_writer: Event::new(), + state: AtomicUsize::new(0), + value: UnsafeCell::new(t), + } + } + + /// Unwraps the lock and returns the inner value. + /// + /// # Examples + /// + /// ``` + /// use async_rwlock::RwLock; + /// + /// let lock = RwLock::new(5); + /// assert_eq!(lock.into_inner(), 5); + /// ``` + pub fn into_inner(self) -> T { + self.value.into_inner() + } +} + +impl RwLock { + /// Attempts to acquire a read lock. + /// + /// If a read lock could not be acquired at this time, then [`None`] is returned. Otherwise, a + /// guard is returned that releases the lock when dropped. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let reader = lock.read().await; + /// assert_eq!(*reader, 1); + /// + /// assert!(lock.try_read().is_some()); + /// # }) + /// ``` + pub fn try_read(&self) -> Option> { + let mut state = self.state.load(Ordering::Acquire); + + loop { + // If there's a writer holding the lock or attempting to acquire it, we cannot acquire + // a read lock here. + if state & WRITER_BIT != 0 { + return None; + } + + // Make sure the number of readers doesn't overflow. + if state > std::isize::MAX as usize { + process::abort(); + } + + // Increment the number of readers. + match self.state.compare_exchange( + state, + state + ONE_READER, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return Some(RwLockReadGuard(self)), + Err(s) => state = s, + } + } + } + + /// Acquires a read lock. + /// + /// Returns a guard that releases the lock when dropped. + /// + /// Note that attempts to acquire a read lock will block if there are also concurrent attempts + /// to acquire a write lock. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let reader = lock.read().await; + /// assert_eq!(*reader, 1); + /// + /// assert!(lock.try_read().is_some()); + /// # }) + /// ``` + pub async fn read(&self) -> RwLockReadGuard<'_, T> { + let mut state = self.state.load(Ordering::Acquire); + + loop { + if state & WRITER_BIT == 0 { + // Make sure the number of readers doesn't overflow. + if state > std::isize::MAX as usize { + process::abort(); + } + + // If nobody is holding a write lock or attempting to acquire it, increment the + // number of readers. + match self.state.compare_exchange( + state, + state + ONE_READER, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return RwLockReadGuard(self), + Err(s) => state = s, + } + } else { + // Start listening for "no writer" events. + let listener = self.no_writer.listen(); + + // Check again if there's a writer. + if self.state.load(Ordering::SeqCst) & WRITER_BIT != 0 { + // Wait until the writer is dropped. + listener.await; + // Notify the next reader waiting in line. + self.no_writer.notify(1); + } + + // Reload the state. + state = self.state.load(Ordering::Acquire); + } + } + } + + /// Attempts to acquire a read lock with the possiblity to upgrade to a write lock. + /// + /// If a read lock could not be acquired at this time, then [`None`] is returned. Otherwise, a + /// guard is returned that releases the lock when dropped. + /// + /// Upgradable read lock reserves the right to be upgraded to a write lock, which means there + /// can be at most one upgradable read lock at a time. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::{RwLock, RwLockUpgradableReadGuard}; + /// + /// let lock = RwLock::new(1); + /// + /// let reader = lock.upgradable_read().await; + /// assert_eq!(*reader, 1); + /// assert_eq!(*lock.try_read().unwrap(), 1); + /// + /// let mut writer = RwLockUpgradableReadGuard::upgrade(reader).await; + /// *writer = 2; + /// # }) + /// ``` + #[inline] + pub fn try_upgradable_read(&self) -> Option> { + // First try grabbing the mutex. + let lock = self.mutex.try_lock()?; + + let mut state = self.state.load(Ordering::Acquire); + + // Make sure the number of readers doesn't overflow. + if state > std::isize::MAX as usize { + process::abort(); + } + + // Increment the number of readers. + loop { + match self.state.compare_exchange( + state, + state + ONE_READER, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + return Some(RwLockUpgradableReadGuard { + reader: RwLockReadGuard(self), + reserved: lock, + }) + } + Err(s) => state = s, + } + } + } + + /// Attempts to acquire a read lock with the possiblity to upgrade to a write lock. + /// + /// Returns a guard that releases the lock when dropped. + /// + /// Upgradable read lock reserves the right to be upgraded to a write lock, which means there + /// can be at most one upgradable read lock at a time. + /// + /// Note that attempts to acquire an upgradable read lock will block if there are concurrent + /// attempts to acquire another upgradable read lock or a write lock. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::{RwLock, RwLockUpgradableReadGuard}; + /// + /// let lock = RwLock::new(1); + /// + /// let reader = lock.upgradable_read().await; + /// assert_eq!(*reader, 1); + /// assert_eq!(*lock.try_read().unwrap(), 1); + /// + /// let mut writer = RwLockUpgradableReadGuard::upgrade(reader).await; + /// *writer = 2; + /// # }) + /// ``` + pub async fn upgradable_read(&self) -> RwLockUpgradableReadGuard<'_, T> { + // First grab the mutex. + let lock = self.mutex.lock().await; + + let mut state = self.state.load(Ordering::Acquire); + + // Make sure the number of readers doesn't overflow. + if state > std::isize::MAX as usize { + process::abort(); + } + + // Increment the number of readers. + loop { + match self.state.compare_exchange( + state, + state + ONE_READER, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + return RwLockUpgradableReadGuard { + reader: RwLockReadGuard(self), + reserved: lock, + } + } + Err(s) => state = s, + } + } + } + + /// Attempts to acquire a write lock. + /// + /// If a write lock could not be acquired at this time, then [`None`] is returned. Otherwise, a + /// guard is returned that releases the lock when dropped. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// assert!(lock.try_write().is_some()); + /// let reader = lock.read().await; + /// assert!(lock.try_write().is_none()); + /// # }) + /// ``` + pub fn try_write(&self) -> Option> { + // First try grabbing the mutex. + let lock = self.mutex.try_lock()?; + + // If there are no readers, grab the write lock. + if self.state.compare_and_swap(0, WRITER_BIT, Ordering::AcqRel) == 0 { + Some(RwLockWriteGuard(self, lock)) + } else { + None + } + } + + /// Acquires a write lock. + /// + /// Returns a guard that releases the lock when dropped. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let writer = lock.write().await; + /// assert!(lock.try_read().is_none()); + /// # }) + /// ``` + pub async fn write(&self) -> RwLockWriteGuard<'_, T> { + // First grab the mutex. + let lock = self.mutex.lock().await; + + // Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled. + self.state.fetch_or(WRITER_BIT, Ordering::SeqCst); + let guard = RwLockWriteGuard(self, lock); + + // If there are readers, we need to wait for them to finish. + while self.state.load(Ordering::SeqCst) != WRITER_BIT { + // Start listening for "no readers" events. + let listener = self.no_readers.listen(); + + // Check again if there are readers. + if self.state.load(Ordering::Acquire) != WRITER_BIT { + // Wait for the readers to finish. + listener.await; + } + } + + guard + } + + /// Returns a mutable reference to the inner value. + /// + /// Since this call borrows the lock mutably, no actual locking takes place. The mutable borrow + /// statically guarantees no locks exist. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::RwLock; + /// + /// let mut lock = RwLock::new(1); + /// + /// *lock.get_mut() = 2; + /// assert_eq!(*lock.read().await, 2); + /// # }) + /// ``` + pub fn get_mut(&mut self) -> &mut T { + unsafe { &mut *self.value.get() } + } +} + +impl fmt::Debug for RwLock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + struct Locked; + impl fmt::Debug for Locked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + match self.try_read() { + None => f.debug_struct("RwLock").field("value", &Locked).finish(), + Some(guard) => f.debug_struct("RwLock").field("value", &&*guard).finish(), + } + } +} + +impl From for RwLock { + fn from(val: T) -> RwLock { + RwLock::new(val) + } +} + +impl Default for RwLock { + fn default() -> RwLock { + RwLock::new(Default::default()) + } +} + +/// A guard that releases the read lock when dropped. +pub struct RwLockReadGuard<'a, T: ?Sized>(&'a RwLock); + +unsafe impl Send for RwLockReadGuard<'_, T> {} +unsafe impl Sync for RwLockReadGuard<'_, T> {} + +impl Drop for RwLockReadGuard<'_, T> { + fn drop(&mut self) { + // Decrement the number of readers. + if self.0.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER { + // If this was the last reader, trigger the "no readers" event. + self.0.no_readers.notify(1); + } + } +} + +impl fmt::Debug for RwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for RwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl Deref for RwLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.0.value.get() } + } +} + +/// A guard that releases the upgradable read lock when dropped. +pub struct RwLockUpgradableReadGuard<'a, T: ?Sized> { + reader: RwLockReadGuard<'a, T>, + reserved: MutexGuard<'a, ()>, +} + +unsafe impl Send for RwLockUpgradableReadGuard<'_, T> {} +unsafe impl Sync for RwLockUpgradableReadGuard<'_, T> {} + +impl<'a, T: ?Sized> RwLockUpgradableReadGuard<'a, T> { + /// Converts this guard into a write guard. + fn into_writer(self) -> RwLockWriteGuard<'a, T> { + let writer = RwLockWriteGuard(self.reader.0, self.reserved); + mem::forget(self.reader); + writer + } + + /// Attempts to upgrade into a write lock. + /// + /// If a write lock could not be acquired at this time, then [`None`] is returned. Otherwise, + /// an upgraded guard is returned that releases the write lock when dropped. + /// + /// This function can only fail if there are other active read locks. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::{RwLock, RwLockUpgradableReadGuard}; + /// + /// let lock = RwLock::new(1); + /// + /// let reader = lock.upgradable_read().await; + /// assert_eq!(*reader, 1); + /// + /// let reader2 = lock.read().await; + /// let reader = RwLockUpgradableReadGuard::try_upgrade(reader).unwrap_err(); + /// + /// drop(reader2); + /// let writer = RwLockUpgradableReadGuard::try_upgrade(reader).unwrap(); + /// # }) + /// ``` + pub fn try_upgrade(guard: Self) -> Result, Self> { + // If there are no readers, grab the write lock. + if guard + .reader + .0 + .state + .compare_and_swap(ONE_READER, WRITER_BIT, Ordering::AcqRel) + == ONE_READER + { + Ok(guard.into_writer()) + } else { + Err(guard) + } + } + + /// Upgrades into a write lock. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_rwlock::{RwLock, RwLockUpgradableReadGuard}; + /// + /// let lock = RwLock::new(1); + /// + /// let reader = lock.upgradable_read().await; + /// assert_eq!(*reader, 1); + /// + /// let mut writer = RwLockUpgradableReadGuard::upgrade(reader).await; + /// *writer = 2; + /// # }) + /// ``` + pub async fn upgrade(guard: Self) -> RwLockWriteGuard<'a, T> { + // Set `WRITER_BIT` and decrement the number of readers at the same time. + guard + .reader + .0 + .state + .fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst); + + // Convert into a write guard that unsets `WRITER_BIT` in case this future is canceled. + let guard = guard.into_writer(); + + // If there are readers, we need to wait for them to finish. + while guard.0.state.load(Ordering::SeqCst) != WRITER_BIT { + // Start listening for "no readers" events. + let listener = guard.0.no_readers.listen(); + + // Check again if there are readers. + if guard.0.state.load(Ordering::Acquire) != WRITER_BIT { + // Wait for the readers to finish. + listener.await; + } + } + + guard + } +} + +impl fmt::Debug for RwLockUpgradableReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for RwLockUpgradableReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl Deref for RwLockUpgradableReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.reader.0.value.get() } + } +} + +/// A guard that releases the write lock when dropped. +pub struct RwLockWriteGuard<'a, T: ?Sized>(&'a RwLock, MutexGuard<'a, ()>); + +unsafe impl Send for RwLockWriteGuard<'_, T> {} +unsafe impl Sync for RwLockWriteGuard<'_, T> {} + +impl Drop for RwLockWriteGuard<'_, T> { + fn drop(&mut self) { + // Unset `WRITER_BIT`. + self.0.state.fetch_and(!WRITER_BIT, Ordering::SeqCst); + // Trigger the "no writer" event. + self.0.no_writer.notify(1); + } +} + +impl fmt::Debug for RwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for RwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl Deref for RwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.0.value.get() } + } +} + +impl DerefMut for RwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.0.value.get() } + } +} diff --git a/async-rwlock/tests/std.rs b/async-rwlock/tests/std.rs new file mode 100644 index 0000000..249e583 --- /dev/null +++ b/async-rwlock/tests/std.rs @@ -0,0 +1,153 @@ +//! These tests were borrowed from `std::sync::RwLock`. + +use std::future::Future; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread; + +use async_rwlock::RwLock; +use futures_lite::{future, FutureExt}; + +fn spawn(f: impl Future + Send + 'static) -> future::Boxed { + let (s, r) = async_channel::bounded(1); + thread::spawn(move || { + future::block_on(async { + let _ = s.send(f.await).await; + }) + }); + async move { r.recv().await.unwrap() }.boxed() +} + +#[test] +fn smoke() { + future::block_on(async { + let lock = RwLock::new(()); + drop(lock.read().await); + drop(lock.write().await); + drop((lock.read().await, lock.read().await)); + drop(lock.write().await); + }); +} + +#[test] +fn try_write() { + future::block_on(async { + let lock = RwLock::new(0isize); + let read_guard = lock.read().await; + assert!(lock.try_write().is_none()); + drop(read_guard); + }); +} + +#[test] +fn into_inner() { + let lock = RwLock::new(10); + assert_eq!(lock.into_inner(), 10); +} + +#[test] +fn into_inner_and_drop() { + struct Counter(Arc); + + impl Drop for Counter { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + + let cnt = Arc::new(AtomicUsize::new(0)); + let lock = RwLock::new(Counter(cnt.clone())); + assert_eq!(cnt.load(Ordering::SeqCst), 0); + + { + let _inner = lock.into_inner(); + assert_eq!(cnt.load(Ordering::SeqCst), 0); + } + + assert_eq!(cnt.load(Ordering::SeqCst), 1); +} + +#[test] +fn get_mut() { + let mut lock = RwLock::new(10); + *lock.get_mut() = 20; + assert_eq!(lock.into_inner(), 20); +} + +#[test] +fn contention() { + const N: u32 = 10; + const M: usize = 1000; + + let (tx, rx) = async_channel::unbounded(); + let tx = Arc::new(tx); + let rw = Arc::new(RwLock::new(())); + + // Spawn N tasks that randomly acquire the lock M times. + for _ in 0..N { + let tx = tx.clone(); + let rw = rw.clone(); + + spawn(async move { + for _ in 0..M { + if fastrand::u32(..N) == 0 { + drop(rw.write().await); + } else { + drop(rw.read().await); + } + } + tx.send(()).await.unwrap(); + }); + } + + future::block_on(async move { + for _ in 0..N { + rx.recv().await.unwrap(); + } + }); +} + +#[test] +fn writer_and_readers() { + let lock = Arc::new(RwLock::new(0i32)); + let (tx, rx) = async_channel::unbounded(); + + // Spawn a writer task. + spawn({ + let lock = lock.clone(); + async move { + let mut lock = lock.write().await; + for _ in 0..1000 { + let tmp = *lock; + *lock = -1; + future::yield_now().await; + *lock = tmp + 1; + } + tx.send(()).await.unwrap(); + } + }); + + // Readers try to catch the writer in the act. + let mut readers = Vec::new(); + for _ in 0..5 { + let lock = lock.clone(); + readers.push(spawn(async move { + for _ in 0..1000 { + let lock = lock.read().await; + assert!(*lock >= 0); + } + })); + } + + future::block_on(async move { + // Wait for readers to pass their asserts. + for r in readers { + r.await; + } + + // Wait for writer to finish. + rx.recv().await.unwrap(); + let lock = lock.read().await; + assert_eq!(*lock, 1000); + }); +} diff --git a/async-rwlock/tests/upgradable.rs b/async-rwlock/tests/upgradable.rs new file mode 100644 index 0000000..a1e7a53 --- /dev/null +++ b/async-rwlock/tests/upgradable.rs @@ -0,0 +1,93 @@ +use std::sync::Arc; + +use async_rwlock::{RwLock, RwLockUpgradableReadGuard}; +use futures_lite::future; + +#[test] +fn upgrade() { + future::block_on(async { + let lock: RwLock = RwLock::new(0); + + let read_guard = lock.read().await; + let read_guard2 = lock.read().await; + // Should be able to obtain an upgradable lock. + let upgradable_guard = lock.upgradable_read().await; + // Should be able to obtain a read lock when an upgradable lock is active. + let read_guard3 = lock.read().await; + assert_eq!(0, *read_guard3); + drop(read_guard); + drop(read_guard2); + drop(read_guard3); + + // Writers should not pass. + assert!(lock.try_write().is_none()); + + let mut write_guard = RwLockUpgradableReadGuard::try_upgrade(upgradable_guard).expect( + "should be able to upgrade an upgradable lock because there are no more readers", + ); + *write_guard += 1; + drop(write_guard); + + let read_guard = lock.read().await; + assert_eq!(1, *read_guard) + }); +} + +#[test] +fn not_upgrade() { + future::block_on(async { + let mutex: RwLock = RwLock::new(0); + + let read_guard = mutex.read().await; + let read_guard2 = mutex.read().await; + // Should be able to obtain an upgradable lock. + let upgradable_guard = mutex.upgradable_read().await; + // Should be able to obtain a shared lock when an upgradable lock is active. + let read_guard3 = mutex.read().await; + assert_eq!(0, *read_guard3); + drop(read_guard); + drop(read_guard2); + drop(read_guard3); + + // Drop the upgradable lock. + drop(upgradable_guard); + + assert_eq!(0, *(mutex.read().await)); + + // Should be able to acquire a write lock because there are no more readers. + let mut write_guard = mutex.write().await; + *write_guard += 1; + drop(write_guard); + + let read_guard = mutex.read().await; + assert_eq!(1, *read_guard) + }); +} + +#[test] +fn upgradable_with_concurrent_writer() { + future::block_on(async { + let lock: Arc> = Arc::new(RwLock::new(0)); + let lock2 = lock.clone(); + + let upgradable_guard = lock.upgradable_read().await; + + future::or( + async move { + let mut write_guard = lock2.write().await; + *write_guard = 1; + }, + async move { + let mut write_guard = RwLockUpgradableReadGuard::upgrade(upgradable_guard).await; + assert_eq!(*write_guard, 0); + *write_guard = 2; + }, + ) + .await; + + assert_eq!(2, *(lock.write().await)); + + let read_guard = lock.read().await; + assert_eq!(2, *read_guard); + }); +} diff --git a/async-semaphore/Cargo.toml b/async-semaphore/Cargo.toml new file mode 100644 index 0000000..3e9573b --- /dev/null +++ b/async-semaphore/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "async-semaphore" +version = "1.1.0" +authors = ["Stjepan Glavina "] +edition = "2018" +description = "An async semaphore" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/stjepang/async-lock" +homepage = "https://github.com/stjepang/async-lock" +documentation = "https://docs.rs/async-semaphore" +keywords = ["limit", "concurrent", "monitor", "synchronization", "sync"] +categories = ["asynchronous", "concurrency"] +readme = "../README.md" + +[dependencies] +event-listener = "2.4.0" + +[dev-dependencies] +futures-lite = "1.0.0" diff --git a/async-semaphore/src/lib.rs b/async-semaphore/src/lib.rs new file mode 100644 index 0000000..f857459 --- /dev/null +++ b/async-semaphore/src/lib.rs @@ -0,0 +1,196 @@ +//! An async semaphore. +//! +//! A semaphore is a synchronization primitive that limits the number of concurrent operations. + +#![forbid(unsafe_code)] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use event_listener::Event; + +/// A counter for limiting the number of concurrent operations. +#[derive(Debug)] +pub struct Semaphore { + count: AtomicUsize, + event: Event, +} + +impl Semaphore { + /// Creates a new semaphore with a limit of `n` concurrent operations. + /// + /// # Examples + /// + /// ``` + /// use async_semaphore::Semaphore; + /// + /// let s = Semaphore::new(5); + /// ``` + pub fn new(n: usize) -> Semaphore { + Semaphore { + count: AtomicUsize::new(n), + event: Event::new(), + } + } + + /// Attempts to get a permit for a concurrent operation. + /// + /// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, a + /// guard is returned that releases the mutex when dropped. + /// + /// # Examples + /// + /// ``` + /// use async_semaphore::Semaphore; + /// + /// let s = Semaphore::new(2); + /// + /// let g1 = s.try_acquire().unwrap(); + /// let g2 = s.try_acquire().unwrap(); + /// + /// assert!(s.try_acquire().is_none()); + /// drop(g2); + /// assert!(s.try_acquire().is_some()); + /// ``` + pub fn try_acquire(&self) -> Option> { + let mut count = self.count.load(Ordering::Acquire); + loop { + if count == 0 { + return None; + } + + match self.count.compare_exchange_weak( + count, + count - 1, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return Some(SemaphoreGuard(self)), + Err(c) => count = c, + } + } + } + + /// Waits for a permit for a concurrent operation. + /// + /// Returns a guard that releases the permit when dropped. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_semaphore::Semaphore; + /// + /// let s = Semaphore::new(2); + /// let guard = s.acquire().await; + /// # }); + /// ``` + pub async fn acquire(&self) -> SemaphoreGuard<'_> { + let mut listener = None; + + loop { + if let Some(guard) = self.try_acquire() { + return guard; + } + + match listener.take() { + None => listener = Some(self.event.listen()), + Some(l) => l.await, + } + } + } +} + +impl Semaphore { + /// Attempts to get an owned permit for a concurrent operation. + /// + /// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, an + /// owned guard is returned that releases the mutex when dropped. + /// + /// # Examples + /// + /// ``` + /// use async_semaphore::Semaphore; + /// use std::sync::Arc; + /// + /// let s = Arc::new(Semaphore::new(2)); + /// + /// let g1 = s.try_acquire_arc().unwrap(); + /// let g2 = s.try_acquire_arc().unwrap(); + /// + /// assert!(s.try_acquire_arc().is_none()); + /// drop(g2); + /// assert!(s.try_acquire_arc().is_some()); + /// ``` + pub fn try_acquire_arc(self: &Arc) -> Option { + let mut count = self.count.load(Ordering::Acquire); + loop { + if count == 0 { + return None; + } + + match self.count.compare_exchange_weak( + count, + count - 1, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return Some(SemaphoreGuardArc(self.clone())), + Err(c) => count = c, + } + } + } + + /// Waits for an owned permit for a concurrent operation. + /// + /// Returns a guard that releases the permit when dropped. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_semaphore::Semaphore; + /// use std::sync::Arc; + /// + /// let s = Arc::new(Semaphore::new(2)); + /// let guard = s.acquire_arc().await; + /// # }); + /// ``` + pub async fn acquire_arc(self: &Arc) -> SemaphoreGuardArc { + let mut listener = None; + + loop { + if let Some(guard) = self.try_acquire_arc() { + return guard; + } + + match listener.take() { + None => listener = Some(self.event.listen()), + Some(l) => l.await, + } + } + } +} + +/// A guard that releases the acquired permit. +#[derive(Debug)] +pub struct SemaphoreGuard<'a>(&'a Semaphore); + +impl Drop for SemaphoreGuard<'_> { + fn drop(&mut self) { + self.0.count.fetch_add(1, Ordering::AcqRel); + self.0.event.notify(1); + } +} + +/// An owned guard that releases the acquired permit. +#[derive(Debug)] +pub struct SemaphoreGuardArc(Arc); + +impl Drop for SemaphoreGuardArc { + fn drop(&mut self) { + self.0.count.fetch_add(1, Ordering::AcqRel); + self.0.event.notify(1); + } +} diff --git a/async-semaphore/tests/semaphore.rs b/async-semaphore/tests/semaphore.rs new file mode 100644 index 0000000..b8a52e7 --- /dev/null +++ b/async-semaphore/tests/semaphore.rs @@ -0,0 +1,83 @@ +use std::sync::{mpsc, Arc}; +use std::thread; + +use async_semaphore::Semaphore; +use futures_lite::future; + +#[test] +fn try_acquire() { + let s = Semaphore::new(2); + let g1 = s.try_acquire().unwrap(); + let _g2 = s.try_acquire().unwrap(); + + assert!(s.try_acquire().is_none()); + drop(g1); + assert!(s.try_acquire().is_some()); +} + +#[test] +fn stress() { + let s = Arc::new(Semaphore::new(5)); + let (tx, rx) = mpsc::channel::<()>(); + + for _ in 0..50 { + let s = s.clone(); + let tx = tx.clone(); + + thread::spawn(move || { + future::block_on(async { + for _ in 0..10_000 { + s.acquire().await; + } + drop(tx); + }) + }); + } + + drop(tx); + let _ = rx.recv(); + + let _g1 = s.try_acquire().unwrap(); + let g2 = s.try_acquire().unwrap(); + let _g3 = s.try_acquire().unwrap(); + let _g4 = s.try_acquire().unwrap(); + let _g5 = s.try_acquire().unwrap(); + + assert!(s.try_acquire().is_none()); + drop(g2); + assert!(s.try_acquire().is_some()); +} + +#[test] +fn as_mutex() { + let s = Arc::new(Semaphore::new(1)); + let s2 = s.clone(); + let _t = thread::spawn(move || { + future::block_on(async { + let _g = s2.acquire().await; + }); + }); + future::block_on(async { + let _g = s.acquire().await; + }); +} + +#[test] +fn multi_resource() { + let s = Arc::new(Semaphore::new(2)); + let s2 = s.clone(); + let (tx1, rx1) = mpsc::channel(); + let (tx2, rx2) = mpsc::channel(); + let _t = thread::spawn(move || { + future::block_on(async { + let _g = s2.acquire().await; + let _ = rx2.recv(); + tx1.send(()).unwrap(); + }); + }); + future::block_on(async { + let _g = s.acquire().await; + tx2.send(()).unwrap(); + rx1.recv().unwrap(); + }); +}