Merge crates into a single repo

This commit is contained in:
Stjepan Glavina 2020-09-21 18:58:31 +02:00
parent 70de794cf7
commit fb080c2ff4
22 changed files with 2243 additions and 1 deletions

View File

@ -49,3 +49,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all

2
.gitignore vendored
View File

@ -1,2 +1,2 @@
/target
target/
Cargo.lock

View File

@ -17,3 +17,12 @@ async-barrier = "1.0.1"
async-mutex = "1.3.0"
async-rwlock = "1.1.0"
async-semaphore = "1.1.0"
[workspace]
members = [
".",
"async-barrier",
"async-mutex",
"async-rwlock",
"async-semaphore",
]

21
async-barrier/Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "async-barrier"
version = "1.0.1"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "An async barrier"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/stjepang/async-lock"
homepage = "https://github.com/stjepang/async-lock"
documentation = "https://docs.rs/async-barrier"
keywords = ["rendezvous", "sync", "async", "synchronize", "synchronization"]
categories = ["asynchronous", "concurrency"]
readme = "../README.md"
[dependencies]
async-mutex = "1.1.5"
event-listener = "2.4.0"
[dev-dependencies]
async-channel = "1.4.1"
futures-lite = "1.0.0"

139
async-barrier/src/lib.rs Normal file
View File

@ -0,0 +1,139 @@
//! An async barrier.
//!
//! This crate is an async version of [`std::sync::Barrier`].
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use async_mutex::Mutex;
use event_listener::Event;
/// A counter to synchronize multiple tasks at the same time.
#[derive(Debug)]
pub struct Barrier {
n: usize,
state: Mutex<State>,
event: Event,
}
#[derive(Debug)]
struct State {
count: usize,
generation_id: u64,
}
impl Barrier {
/// Creates a barrier that can block the given number of tasks.
///
/// A barrier will block `n`-1 tasks which call [`wait()`] and then wake up all tasks
/// at once when the `n`th task calls [`wait()`].
///
/// [`wait()`]: `Barrier::wait()`
///
/// # Examples
///
/// ```
/// use async_barrier::Barrier;
///
/// let barrier = Barrier::new(5);
/// ```
pub fn new(n: usize) -> Barrier {
Barrier {
n,
state: Mutex::new(State {
count: 0,
generation_id: 0,
}),
event: Event::new(),
}
}
/// Blocks the current task until all tasks reach this point.
///
/// Barriers are reusable after all tasks have synchronized, and can be used continuously.
///
/// Returns a [`BarrierWaitResult`] indicating whether this task is the "leader", meaning the
/// last task to call this method.
///
/// # Examples
///
/// ```
/// use async_barrier::Barrier;
/// use futures_lite::future;
/// use std::sync::Arc;
/// use std::thread;
///
/// let barrier = Arc::new(Barrier::new(5));
///
/// for _ in 0..5 {
/// let b = barrier.clone();
/// thread::spawn(move || {
/// future::block_on(async {
/// // The same messages will be printed together.
/// // There will NOT be interleaving of "before" and "after".
/// println!("before wait");
/// b.wait().await;
/// println!("after wait");
/// });
/// });
/// }
/// ```
pub async fn wait(&self) -> BarrierWaitResult {
let mut state = self.state.lock().await;
let local_gen = state.generation_id;
state.count += 1;
if state.count < self.n {
while local_gen == state.generation_id && state.count < self.n {
let listener = self.event.listen();
drop(state);
listener.await;
state = self.state.lock().await;
}
BarrierWaitResult { is_leader: false }
} else {
state.count = 0;
state.generation_id = state.generation_id.wrapping_add(1);
self.event.notify(std::usize::MAX);
BarrierWaitResult { is_leader: true }
}
}
}
/// Returned by [`Barrier::wait()`] when all tasks have called it.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_barrier::Barrier;
///
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait().await;
/// # });
/// ```
#[derive(Debug, Clone)]
pub struct BarrierWaitResult {
is_leader: bool,
}
impl BarrierWaitResult {
/// Returns `true` if this task was the last to call to [`Barrier::wait()`].
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_barrier::Barrier;
/// use futures_lite::future;
///
/// let barrier = Barrier::new(2);
/// let (a, b) = future::zip(barrier.wait(), barrier.wait()).await;
/// assert_eq!(a.is_leader(), false);
/// assert_eq!(b.is_leader(), true);
/// # });
/// ```
pub fn is_leader(&self) -> bool {
self.is_leader
}
}

View File

@ -0,0 +1,49 @@
use std::sync::Arc;
use std::thread;
use async_barrier::Barrier;
use futures_lite::future;
#[test]
fn smoke() {
future::block_on(async move {
const N: usize = 10;
let barrier = Arc::new(Barrier::new(N));
for _ in 0..10 {
let (tx, rx) = async_channel::unbounded();
for _ in 0..N - 1 {
let c = barrier.clone();
let tx = tx.clone();
thread::spawn(move || {
future::block_on(async move {
let res = c.wait().await;
tx.send(res.is_leader()).await.unwrap();
})
});
}
// At this point, all spawned threads should be blocked,
// so we shouldn't get anything from the cahnnel.
let res = rx.try_recv();
assert!(match res {
Err(_err) => true,
_ => false,
});
let mut leader_found = barrier.wait().await.is_leader();
// Now, the barrier is cleared and we should get data.
for _ in 0..N - 1 {
if rx.recv().await.unwrap() {
assert!(!leader_found);
leader_found = true;
}
}
assert!(leader_found);
}
});
}

24
async-mutex/Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "async-mutex"
version = "1.3.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "Async mutex"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/stjepang/async-lock"
homepage = "https://github.com/stjepang/async-lock"
documentation = "https://docs.rs/async-mutex"
keywords = ["asynchronous", "mutex", "lock", "synchronization"]
categories = ["asynchronous", "concurrency"]
readme = "../README.md"
[dependencies]
event-listener = "2.0.0"
[dev-dependencies]
async-std = "1.6.2"
futures = "0.3.5"
futures-intrusive = "0.3.1"
futures-lite = "1.0.0"
smol = "0.1.18"
tokio = { version = "0.2.21", features = ["sync", "parking_lot"] }

View File

@ -0,0 +1,42 @@
#![feature(test)]
extern crate test;
use std::sync::Arc;
use async_mutex::Mutex;
use async_std::task;
use test::Bencher;
#[bench]
fn create(b: &mut Bencher) {
b.iter(|| Mutex::new(()));
}
#[bench]
fn contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(10, 1000)));
}
#[bench]
fn no_contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(1, 10000)));
}
async fn run(task: usize, iter: usize) {
let m = Arc::new(Mutex::new(()));
let mut tasks = Vec::new();
for _ in 0..task {
let m = m.clone();
tasks.push(task::spawn(async move {
for _ in 0..iter {
let _ = m.lock().await;
}
}));
}
for t in tasks {
t.await;
}
}

View File

@ -0,0 +1,42 @@
#![feature(test)]
extern crate test;
use std::sync::Arc;
use async_std::sync::Mutex;
use async_std::task;
use test::Bencher;
#[bench]
fn create(b: &mut Bencher) {
b.iter(|| Mutex::new(()));
}
#[bench]
fn contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(10, 1000)));
}
#[bench]
fn no_contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(1, 10000)));
}
async fn run(task: usize, iter: usize) {
let m = Arc::new(Mutex::new(()));
let mut tasks = Vec::new();
for _ in 0..task {
let m = m.clone();
tasks.push(task::spawn(async move {
for _ in 0..iter {
let _ = m.lock().await;
}
}));
}
for t in tasks {
t.await;
}
}

View File

@ -0,0 +1,42 @@
#![feature(test)]
extern crate test;
use std::sync::Arc;
use async_std::task;
use futures_intrusive::sync::Mutex;
use test::Bencher;
#[bench]
fn create(b: &mut Bencher) {
b.iter(|| Mutex::new((), true));
}
#[bench]
fn contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(10, 1000)));
}
#[bench]
fn no_contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(1, 10000)));
}
async fn run(task: usize, iter: usize) {
let m = Arc::new(Mutex::new((), true));
let mut tasks = Vec::new();
for _ in 0..task {
let m = m.clone();
tasks.push(task::spawn(async move {
for _ in 0..iter {
let _ = m.lock().await;
}
}));
}
for t in tasks {
t.await;
}
}

View File

@ -0,0 +1,42 @@
#![feature(test)]
extern crate test;
use std::sync::Arc;
use async_std::task;
use futures::lock::Mutex;
use test::Bencher;
#[bench]
fn create(b: &mut Bencher) {
b.iter(|| Mutex::new(()));
}
#[bench]
fn contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(10, 1000)));
}
#[bench]
fn no_contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(1, 10000)));
}
async fn run(task: usize, iter: usize) {
let m = Arc::new(Mutex::new(()));
let mut tasks = Vec::new();
for _ in 0..task {
let m = m.clone();
tasks.push(task::spawn(async move {
for _ in 0..iter {
let _ = m.lock().await;
}
}));
}
for t in tasks {
t.await;
}
}

View File

@ -0,0 +1,42 @@
#![feature(test)]
extern crate test;
use std::sync::Arc;
use async_std::task;
use test::Bencher;
use tokio::sync::Mutex;
#[bench]
fn create(b: &mut Bencher) {
b.iter(|| Mutex::new(()));
}
#[bench]
fn contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(10, 1000)));
}
#[bench]
fn no_contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(1, 10000)));
}
async fn run(task: usize, iter: usize) {
let m = Arc::new(Mutex::new(()));
let mut tasks = Vec::new();
for _ in 0..task {
let m = m.clone();
tasks.push(task::spawn(async move {
for _ in 0..iter {
let _ = m.lock().await;
}
}));
}
for t in tasks {
t.await;
}
}

View File

@ -0,0 +1,38 @@
//! Demonstrates fairness properties of the mutex.
//!
//! A number of threads run a loop in which they hold the lock for a little bit and re-acquire it
//! immediately after. In the end we print the number of times each thread acquired the lock.
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use async_mutex::Mutex;
use smol::Timer;
fn main() {
let num_threads = 30;
let mut threads = Vec::new();
let hits = Arc::new(Mutex::new(vec![0; num_threads]));
for i in 0..num_threads {
let hits = hits.clone();
threads.push(thread::spawn(move || {
smol::run(async {
let start = Instant::now();
while start.elapsed() < Duration::from_secs(1) {
let mut hits = hits.lock().await;
hits[i] += 1;
Timer::after(Duration::from_micros(5000)).await;
}
})
}));
}
for t in threads {
t.join().unwrap();
}
dbg!(hits);
}

455
async-mutex/src/lib.rs Normal file
View File

@ -0,0 +1,455 @@
//! An async mutex.
//!
//! The locking mechanism uses eventual fairness to ensure locking will be fair on average without
//! sacrificing performance. This is done by forcing a fair lock whenever a lock operation is
//! starved for longer than 0.5 milliseconds.
//!
//! # Examples
//!
//! ```
//! # futures_lite::future::block_on(async {
//! use async_mutex::Mutex;
//!
//! let m = Mutex::new(1);
//!
//! let mut guard = m.lock().await;
//! *guard = 2;
//!
//! assert!(m.try_lock().is_none());
//! drop(guard);
//! assert_eq!(*m.try_lock().unwrap(), 2);
//! # })
//! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::usize;
use event_listener::Event;
/// An async mutex.
pub struct Mutex<T: ?Sized> {
/// Current state of the mutex.
///
/// The least significant bit is set to 1 if the mutex is locked.
/// The other bits hold the number of starved lock operations.
state: AtomicUsize,
/// Lock operations waiting for the mutex to be released.
lock_ops: Event,
/// The value inside the mutex.
data: UnsafeCell<T>,
}
unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
impl<T> Mutex<T> {
/// Creates a new async mutex.
///
/// # Examples
///
/// ```
/// use async_mutex::Mutex;
///
/// let mutex = Mutex::new(0);
/// ```
pub fn new(data: T) -> Mutex<T> {
Mutex {
state: AtomicUsize::new(0),
lock_ops: Event::new(),
data: UnsafeCell::new(data),
}
}
/// Consumes the mutex, returning the underlying data.
///
/// # Examples
///
/// ```
/// use async_mutex::Mutex;
///
/// let mutex = Mutex::new(10);
/// assert_eq!(mutex.into_inner(), 10);
/// ```
pub fn into_inner(self) -> T {
self.data.into_inner()
}
}
impl<T: ?Sized> Mutex<T> {
/// Acquires the mutex.
///
/// Returns a guard that releases the mutex when dropped.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_mutex::Mutex;
///
/// let mutex = Mutex::new(10);
/// let guard = mutex.lock().await;
/// assert_eq!(*guard, 10);
/// # })
/// ```
#[inline]
pub async fn lock(&self) -> MutexGuard<'_, T> {
if let Some(guard) = self.try_lock() {
return guard;
}
self.acquire_slow().await;
MutexGuard(self)
}
/// Slow path for acquiring the mutex.
#[cold]
async fn acquire_slow(&self) {
// Get the current time.
let start = Instant::now();
loop {
// Start listening for events.
let listener = self.lock_ops.listen();
// Try locking if nobody is being starved.
match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
// Lock acquired!
0 => return,
// Lock is held and nobody is starved.
1 => {}
// Somebody is starved.
_ => break,
}
// Wait for a notification.
listener.await;
// Try locking if nobody is being starved.
match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
// Lock acquired!
0 => return,
// Lock is held and nobody is starved.
1 => {}
// Somebody is starved.
_ => {
// Notify the first listener in line because we probably received a
// notification that was meant for a starved task.
self.lock_ops.notify(1);
break;
}
}
// If waiting for too long, fall back to a fairer locking strategy that will prevent
// newer lock operations from starving us forever.
if start.elapsed() > Duration::from_micros(500) {
break;
}
}
// Increment the number of starved lock operations.
if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
// In case of potential overflow, abort.
process::abort();
}
// Decrement the counter when exiting this function.
let _call = CallOnDrop(|| {
self.state.fetch_sub(2, Ordering::Release);
});
loop {
// Start listening for events.
let listener = self.lock_ops.listen();
// Try locking if nobody else is being starved.
match self.state.compare_and_swap(2, 2 | 1, Ordering::Acquire) {
// Lock acquired!
2 => return,
// Lock is held by someone.
s if s % 2 == 1 => {}
// Lock is available.
_ => {
// Be fair: notify the first listener and then go wait in line.
self.lock_ops.notify(1);
}
}
// Wait for a notification.
listener.await;
// Try acquiring the lock without waiting for others.
if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
return;
}
}
}
/// Attempts to acquire the mutex.
///
/// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
/// guard is returned that releases the mutex when dropped.
///
/// # Examples
///
/// ```
/// use async_mutex::Mutex;
///
/// let mutex = Mutex::new(10);
/// if let Some(guard) = mutex.try_lock() {
/// assert_eq!(*guard, 10);
/// }
/// # ;
/// ```
#[inline]
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
Some(MutexGuard(self))
} else {
None
}
}
/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
/// borrow statically guarantees the mutex is not already acquired.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_mutex::Mutex;
///
/// let mut mutex = Mutex::new(0);
/// *mutex.get_mut() = 10;
/// assert_eq!(*mutex.lock().await, 10);
/// # })
/// ```
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.data.get() }
}
}
impl<T: ?Sized> Mutex<T> {
/// Acquires the mutex and clones a reference to it.
///
/// Returns an owned guard that releases the mutex when dropped.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_mutex::Mutex;
/// use std::sync::Arc;
///
/// let mutex = Arc::new(Mutex::new(10));
/// let guard = mutex.lock_arc().await;
/// assert_eq!(*guard, 10);
/// # })
/// ```
#[inline]
pub async fn lock_arc(self: &Arc<Self>) -> MutexGuardArc<T> {
if let Some(guard) = self.try_lock_arc() {
return guard;
}
self.acquire_slow().await;
MutexGuardArc(self.clone())
}
/// Attempts to acquire the mutex and clone a reference to it.
///
/// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
/// owned guard is returned that releases the mutex when dropped.
///
/// # Examples
///
/// ```
/// use async_mutex::Mutex;
/// use std::sync::Arc;
///
/// let mutex = Arc::new(Mutex::new(10));
/// if let Some(guard) = mutex.try_lock() {
/// assert_eq!(*guard, 10);
/// }
/// # ;
/// ```
#[inline]
pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
Some(MutexGuardArc(self.clone()))
} else {
None
}
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
struct Locked;
impl fmt::Debug for Locked {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<locked>")
}
}
match self.try_lock() {
None => f.debug_struct("Mutex").field("data", &Locked).finish(),
Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
}
}
}
impl<T> From<T> for Mutex<T> {
fn from(val: T) -> Mutex<T> {
Mutex::new(val)
}
}
impl<T: Default + ?Sized> Default for Mutex<T> {
fn default() -> Mutex<T> {
Mutex::new(Default::default())
}
}
/// A guard that releases the mutex when dropped.
pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
impl<'a, T: ?Sized> MutexGuard<'a, T> {
/// Returns a reference to the mutex a guard came from.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_mutex::{Mutex, MutexGuard};
///
/// let mutex = Mutex::new(10i32);
/// let guard = mutex.lock().await;
/// dbg!(MutexGuard::source(&guard));
/// # })
/// ```
pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
guard.0
}
}
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
// Remove the last bit and notify a waiting lock operation.
self.0.state.fetch_sub(1, Ordering::Release);
self.0.lock_ops.notify(1);
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
}
}
/// An owned guard that releases the mutex when dropped.
pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);
unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
impl<T: ?Sized> MutexGuardArc<T> {
/// Returns a reference to the mutex a guard came from.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_mutex::{Mutex, MutexGuardArc};
/// use std::sync::Arc;
///
/// let mutex = Arc::new(Mutex::new(10i32));
/// let guard = mutex.lock_arc().await;
/// dbg!(MutexGuardArc::source(&guard));
/// # })
/// ```
pub fn source(guard: &MutexGuardArc<T>) -> &Arc<Mutex<T>> {
&guard.0
}
}
impl<T: ?Sized> Drop for MutexGuardArc<T> {
fn drop(&mut self) {
// Remove the last bit and notify a waiting lock operation.
self.0.state.fetch_sub(1, Ordering::Release);
self.0.lock_ops.notify(1);
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}
impl<T: ?Sized> Deref for MutexGuardArc<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
}
}
/// Calls a function when dropped.
struct CallOnDrop<F: Fn()>(F);
impl<F: Fn()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}

View File

@ -0,0 +1,67 @@
use std::sync::Arc;
use std::thread;
use async_mutex::Mutex;
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::prelude::*;
#[test]
fn smoke() {
block_on(async {
let m = Mutex::new(());
drop(m.lock().await);
drop(m.lock().await);
})
}
#[test]
fn try_lock() {
let m = Mutex::new(());
*m.try_lock().unwrap() = ();
}
#[test]
fn into_inner() {
let m = Mutex::new(10i32);
assert_eq!(m.into_inner(), 10);
}
#[test]
fn get_mut() {
let mut m = Mutex::new(10i32);
*m.get_mut() = 20;
assert_eq!(m.into_inner(), 20);
}
#[test]
fn contention() {
block_on(async {
let (tx, mut rx) = mpsc::unbounded();
let tx = Arc::new(tx);
let mutex = Arc::new(Mutex::new(0i32));
let num_tasks = 100;
for _ in 0..num_tasks {
let tx = tx.clone();
let mutex = mutex.clone();
thread::spawn(|| {
block_on(async move {
let mut lock = mutex.lock().await;
*lock += 1;
tx.unbounded_send(()).unwrap();
drop(lock);
})
});
}
for _ in 0..num_tasks {
rx.next().await.unwrap();
}
let lock = mutex.lock().await;
assert_eq!(num_tasks, *lock);
});
}

22
async-rwlock/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "async-rwlock"
version = "1.2.1"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "Async reader-writer lock"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/stjepang/async-lock"
homepage = "https://github.com/stjepang/async-lock"
documentation = "https://docs.rs/async-rwlock"
keywords = ["asynchronous", "rwlock", "mutex", "lock", "synchronization"]
categories = ["asynchronous", "concurrency"]
readme = "../README.md"
[dependencies]
async-mutex = "1.1.5"
event-listener = "2.4.0"
[dev-dependencies]
async-channel = "1.4.1"
fastrand = "1.3.4"
futures-lite = "1.0.0"

663
async-rwlock/src/lib.rs Normal file
View File

@ -0,0 +1,663 @@
//! An async reader-writer lock.
//!
//! This type of lock allows multiple readers or one writer at any point in time.
//!
//! The locking strategy is write-preferring, which means writers are never starved.
//! Releasing a write lock wakes the next blocked reader and the next blocked writer.
//!
//! # Examples
//!
//! ```
//! # futures_lite::future::block_on(async {
//! use async_rwlock::RwLock;
//!
//! let lock = RwLock::new(5);
//!
//! // Multiple read locks can be held at a time.
//! let r1 = lock.read().await;
//! let r2 = lock.read().await;
//! assert_eq!(*r1, 5);
//! assert_eq!(*r2, 5);
//! drop((r1, r2));
//!
//! // Only one write lock can be held at a time.
//! let mut w = lock.write().await;
//! *w += 1;
//! assert_eq!(*w, 6);
//! # })
//! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::cell::UnsafeCell;
use std::fmt;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_mutex::{Mutex, MutexGuard};
use event_listener::Event;
const WRITER_BIT: usize = 1;
const ONE_READER: usize = 2;
/// An async reader-writer lock.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::RwLock;
///
/// let lock = RwLock::new(5);
///
/// // Multiple read locks can be held at a time.
/// let r1 = lock.read().await;
/// let r2 = lock.read().await;
/// assert_eq!(*r1, 5);
/// assert_eq!(*r2, 5);
/// drop((r1, r2));
///
/// // Only one write locks can be held at a time.
/// let mut w = lock.write().await;
/// *w += 1;
/// assert_eq!(*w, 6);
/// # })
/// ```
pub struct RwLock<T: ?Sized> {
/// Acquired by the writer.
mutex: Mutex<()>,
/// Event triggered when the last reader is dropped.
no_readers: Event,
/// Event triggered when the writer is dropped.
no_writer: Event,
/// Current state of the lock.
///
/// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or
/// trying to acquire it.
///
/// The upper bits contain the number of currently active readers. Each active reader
/// increments the state by `ONE_READER`.
state: AtomicUsize,
/// The inner value.
value: UnsafeCell<T>,
}
unsafe impl<T: Send + ?Sized> Send for RwLock<T> {}
unsafe impl<T: Send + Sync + ?Sized> Sync for RwLock<T> {}
impl<T> RwLock<T> {
/// Creates a new reader-writer lock.
///
/// # Examples
///
/// ```
/// use async_rwlock::RwLock;
///
/// let lock = RwLock::new(0);
/// ```
pub fn new(t: T) -> RwLock<T> {
RwLock {
mutex: Mutex::new(()),
no_readers: Event::new(),
no_writer: Event::new(),
state: AtomicUsize::new(0),
value: UnsafeCell::new(t),
}
}
/// Unwraps the lock and returns the inner value.
///
/// # Examples
///
/// ```
/// use async_rwlock::RwLock;
///
/// let lock = RwLock::new(5);
/// assert_eq!(lock.into_inner(), 5);
/// ```
pub fn into_inner(self) -> T {
self.value.into_inner()
}
}
impl<T: ?Sized> RwLock<T> {
/// Attempts to acquire a read lock.
///
/// If a read lock could not be acquired at this time, then [`None`] is returned. Otherwise, a
/// guard is returned that releases the lock when dropped.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::RwLock;
///
/// let lock = RwLock::new(1);
///
/// let reader = lock.read().await;
/// assert_eq!(*reader, 1);
///
/// assert!(lock.try_read().is_some());
/// # })
/// ```
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
let mut state = self.state.load(Ordering::Acquire);
loop {
// If there's a writer holding the lock or attempting to acquire it, we cannot acquire
// a read lock here.
if state & WRITER_BIT != 0 {
return None;
}
// Make sure the number of readers doesn't overflow.
if state > std::isize::MAX as usize {
process::abort();
}
// Increment the number of readers.
match self.state.compare_exchange(
state,
state + ONE_READER,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return Some(RwLockReadGuard(self)),
Err(s) => state = s,
}
}
}
/// Acquires a read lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// Note that attempts to acquire a read lock will block if there are also concurrent attempts
/// to acquire a write lock.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::RwLock;
///
/// let lock = RwLock::new(1);
///
/// let reader = lock.read().await;
/// assert_eq!(*reader, 1);
///
/// assert!(lock.try_read().is_some());
/// # })
/// ```
pub async fn read(&self) -> RwLockReadGuard<'_, T> {
let mut state = self.state.load(Ordering::Acquire);
loop {
if state & WRITER_BIT == 0 {
// Make sure the number of readers doesn't overflow.
if state > std::isize::MAX as usize {
process::abort();
}
// If nobody is holding a write lock or attempting to acquire it, increment the
// number of readers.
match self.state.compare_exchange(
state,
state + ONE_READER,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return RwLockReadGuard(self),
Err(s) => state = s,
}
} else {
// Start listening for "no writer" events.
let listener = self.no_writer.listen();
// Check again if there's a writer.
if self.state.load(Ordering::SeqCst) & WRITER_BIT != 0 {
// Wait until the writer is dropped.
listener.await;
// Notify the next reader waiting in line.
self.no_writer.notify(1);
}
// Reload the state.
state = self.state.load(Ordering::Acquire);
}
}
}
/// Attempts to acquire a read lock with the possiblity to upgrade to a write lock.
///
/// If a read lock could not be acquired at this time, then [`None`] is returned. Otherwise, a
/// guard is returned that releases the lock when dropped.
///
/// Upgradable read lock reserves the right to be upgraded to a write lock, which means there
/// can be at most one upgradable read lock at a time.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::{RwLock, RwLockUpgradableReadGuard};
///
/// let lock = RwLock::new(1);
///
/// let reader = lock.upgradable_read().await;
/// assert_eq!(*reader, 1);
/// assert_eq!(*lock.try_read().unwrap(), 1);
///
/// let mut writer = RwLockUpgradableReadGuard::upgrade(reader).await;
/// *writer = 2;
/// # })
/// ```
#[inline]
pub fn try_upgradable_read(&self) -> Option<RwLockUpgradableReadGuard<'_, T>> {
// First try grabbing the mutex.
let lock = self.mutex.try_lock()?;
let mut state = self.state.load(Ordering::Acquire);
// Make sure the number of readers doesn't overflow.
if state > std::isize::MAX as usize {
process::abort();
}
// Increment the number of readers.
loop {
match self.state.compare_exchange(
state,
state + ONE_READER,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
return Some(RwLockUpgradableReadGuard {
reader: RwLockReadGuard(self),
reserved: lock,
})
}
Err(s) => state = s,
}
}
}
/// Attempts to acquire a read lock with the possiblity to upgrade to a write lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// Upgradable read lock reserves the right to be upgraded to a write lock, which means there
/// can be at most one upgradable read lock at a time.
///
/// Note that attempts to acquire an upgradable read lock will block if there are concurrent
/// attempts to acquire another upgradable read lock or a write lock.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::{RwLock, RwLockUpgradableReadGuard};
///
/// let lock = RwLock::new(1);
///
/// let reader = lock.upgradable_read().await;
/// assert_eq!(*reader, 1);
/// assert_eq!(*lock.try_read().unwrap(), 1);
///
/// let mut writer = RwLockUpgradableReadGuard::upgrade(reader).await;
/// *writer = 2;
/// # })
/// ```
pub async fn upgradable_read(&self) -> RwLockUpgradableReadGuard<'_, T> {
// First grab the mutex.
let lock = self.mutex.lock().await;
let mut state = self.state.load(Ordering::Acquire);
// Make sure the number of readers doesn't overflow.
if state > std::isize::MAX as usize {
process::abort();
}
// Increment the number of readers.
loop {
match self.state.compare_exchange(
state,
state + ONE_READER,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
return RwLockUpgradableReadGuard {
reader: RwLockReadGuard(self),
reserved: lock,
}
}
Err(s) => state = s,
}
}
}
/// Attempts to acquire a write lock.
///
/// If a write lock could not be acquired at this time, then [`None`] is returned. Otherwise, a
/// guard is returned that releases the lock when dropped.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::RwLock;
///
/// let lock = RwLock::new(1);
///
/// assert!(lock.try_write().is_some());
/// let reader = lock.read().await;
/// assert!(lock.try_write().is_none());
/// # })
/// ```
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
// First try grabbing the mutex.
let lock = self.mutex.try_lock()?;
// If there are no readers, grab the write lock.
if self.state.compare_and_swap(0, WRITER_BIT, Ordering::AcqRel) == 0 {
Some(RwLockWriteGuard(self, lock))
} else {
None
}
}
/// Acquires a write lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::RwLock;
///
/// let lock = RwLock::new(1);
///
/// let writer = lock.write().await;
/// assert!(lock.try_read().is_none());
/// # })
/// ```
pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
// First grab the mutex.
let lock = self.mutex.lock().await;
// Set `WRITER_BIT` and create a guard that unsets it in case this future is canceled.
self.state.fetch_or(WRITER_BIT, Ordering::SeqCst);
let guard = RwLockWriteGuard(self, lock);
// If there are readers, we need to wait for them to finish.
while self.state.load(Ordering::SeqCst) != WRITER_BIT {
// Start listening for "no readers" events.
let listener = self.no_readers.listen();
// Check again if there are readers.
if self.state.load(Ordering::Acquire) != WRITER_BIT {
// Wait for the readers to finish.
listener.await;
}
}
guard
}
/// Returns a mutable reference to the inner value.
///
/// Since this call borrows the lock mutably, no actual locking takes place. The mutable borrow
/// statically guarantees no locks exist.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::RwLock;
///
/// let mut lock = RwLock::new(1);
///
/// *lock.get_mut() = 2;
/// assert_eq!(*lock.read().await, 2);
/// # })
/// ```
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.value.get() }
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for RwLock<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
struct Locked;
impl fmt::Debug for Locked {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<locked>")
}
}
match self.try_read() {
None => f.debug_struct("RwLock").field("value", &Locked).finish(),
Some(guard) => f.debug_struct("RwLock").field("value", &&*guard).finish(),
}
}
}
impl<T> From<T> for RwLock<T> {
fn from(val: T) -> RwLock<T> {
RwLock::new(val)
}
}
impl<T: Default + ?Sized> Default for RwLock<T> {
fn default() -> RwLock<T> {
RwLock::new(Default::default())
}
}
/// A guard that releases the read lock when dropped.
pub struct RwLockReadGuard<'a, T: ?Sized>(&'a RwLock<T>);
unsafe impl<T: Sync + ?Sized> Send for RwLockReadGuard<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for RwLockReadGuard<'_, T> {}
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
fn drop(&mut self) {
// Decrement the number of readers.
if self.0.state.fetch_sub(ONE_READER, Ordering::SeqCst) & !WRITER_BIT == ONE_READER {
// If this was the last reader, trigger the "no readers" event.
self.0.no_readers.notify(1);
}
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for RwLockReadGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: fmt::Display + ?Sized> fmt::Display for RwLockReadGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.value.get() }
}
}
/// A guard that releases the upgradable read lock when dropped.
pub struct RwLockUpgradableReadGuard<'a, T: ?Sized> {
reader: RwLockReadGuard<'a, T>,
reserved: MutexGuard<'a, ()>,
}
unsafe impl<T: Send + Sync + ?Sized> Send for RwLockUpgradableReadGuard<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for RwLockUpgradableReadGuard<'_, T> {}
impl<'a, T: ?Sized> RwLockUpgradableReadGuard<'a, T> {
/// Converts this guard into a write guard.
fn into_writer(self) -> RwLockWriteGuard<'a, T> {
let writer = RwLockWriteGuard(self.reader.0, self.reserved);
mem::forget(self.reader);
writer
}
/// Attempts to upgrade into a write lock.
///
/// If a write lock could not be acquired at this time, then [`None`] is returned. Otherwise,
/// an upgraded guard is returned that releases the write lock when dropped.
///
/// This function can only fail if there are other active read locks.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::{RwLock, RwLockUpgradableReadGuard};
///
/// let lock = RwLock::new(1);
///
/// let reader = lock.upgradable_read().await;
/// assert_eq!(*reader, 1);
///
/// let reader2 = lock.read().await;
/// let reader = RwLockUpgradableReadGuard::try_upgrade(reader).unwrap_err();
///
/// drop(reader2);
/// let writer = RwLockUpgradableReadGuard::try_upgrade(reader).unwrap();
/// # })
/// ```
pub fn try_upgrade(guard: Self) -> Result<RwLockWriteGuard<'a, T>, Self> {
// If there are no readers, grab the write lock.
if guard
.reader
.0
.state
.compare_and_swap(ONE_READER, WRITER_BIT, Ordering::AcqRel)
== ONE_READER
{
Ok(guard.into_writer())
} else {
Err(guard)
}
}
/// Upgrades into a write lock.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_rwlock::{RwLock, RwLockUpgradableReadGuard};
///
/// let lock = RwLock::new(1);
///
/// let reader = lock.upgradable_read().await;
/// assert_eq!(*reader, 1);
///
/// let mut writer = RwLockUpgradableReadGuard::upgrade(reader).await;
/// *writer = 2;
/// # })
/// ```
pub async fn upgrade(guard: Self) -> RwLockWriteGuard<'a, T> {
// Set `WRITER_BIT` and decrement the number of readers at the same time.
guard
.reader
.0
.state
.fetch_sub(ONE_READER - WRITER_BIT, Ordering::SeqCst);
// Convert into a write guard that unsets `WRITER_BIT` in case this future is canceled.
let guard = guard.into_writer();
// If there are readers, we need to wait for them to finish.
while guard.0.state.load(Ordering::SeqCst) != WRITER_BIT {
// Start listening for "no readers" events.
let listener = guard.0.no_readers.listen();
// Check again if there are readers.
if guard.0.state.load(Ordering::Acquire) != WRITER_BIT {
// Wait for the readers to finish.
listener.await;
}
}
guard
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for RwLockUpgradableReadGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: fmt::Display + ?Sized> fmt::Display for RwLockUpgradableReadGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}
impl<T: ?Sized> Deref for RwLockUpgradableReadGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.reader.0.value.get() }
}
}
/// A guard that releases the write lock when dropped.
pub struct RwLockWriteGuard<'a, T: ?Sized>(&'a RwLock<T>, MutexGuard<'a, ()>);
unsafe impl<T: Send + ?Sized> Send for RwLockWriteGuard<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for RwLockWriteGuard<'_, T> {}
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
fn drop(&mut self) {
// Unset `WRITER_BIT`.
self.0.state.fetch_and(!WRITER_BIT, Ordering::SeqCst);
// Trigger the "no writer" event.
self.0.no_writer.notify(1);
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for RwLockWriteGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: fmt::Display + ?Sized> fmt::Display for RwLockWriteGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}
impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.value.get() }
}
}
impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.value.get() }
}
}

153
async-rwlock/tests/std.rs Normal file
View File

@ -0,0 +1,153 @@
//! These tests were borrowed from `std::sync::RwLock`.
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use async_rwlock::RwLock;
use futures_lite::{future, FutureExt};
fn spawn<T: Send + 'static>(f: impl Future<Output = T> + Send + 'static) -> future::Boxed<T> {
let (s, r) = async_channel::bounded(1);
thread::spawn(move || {
future::block_on(async {
let _ = s.send(f.await).await;
})
});
async move { r.recv().await.unwrap() }.boxed()
}
#[test]
fn smoke() {
future::block_on(async {
let lock = RwLock::new(());
drop(lock.read().await);
drop(lock.write().await);
drop((lock.read().await, lock.read().await));
drop(lock.write().await);
});
}
#[test]
fn try_write() {
future::block_on(async {
let lock = RwLock::new(0isize);
let read_guard = lock.read().await;
assert!(lock.try_write().is_none());
drop(read_guard);
});
}
#[test]
fn into_inner() {
let lock = RwLock::new(10);
assert_eq!(lock.into_inner(), 10);
}
#[test]
fn into_inner_and_drop() {
struct Counter(Arc<AtomicUsize>);
impl Drop for Counter {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let cnt = Arc::new(AtomicUsize::new(0));
let lock = RwLock::new(Counter(cnt.clone()));
assert_eq!(cnt.load(Ordering::SeqCst), 0);
{
let _inner = lock.into_inner();
assert_eq!(cnt.load(Ordering::SeqCst), 0);
}
assert_eq!(cnt.load(Ordering::SeqCst), 1);
}
#[test]
fn get_mut() {
let mut lock = RwLock::new(10);
*lock.get_mut() = 20;
assert_eq!(lock.into_inner(), 20);
}
#[test]
fn contention() {
const N: u32 = 10;
const M: usize = 1000;
let (tx, rx) = async_channel::unbounded();
let tx = Arc::new(tx);
let rw = Arc::new(RwLock::new(()));
// Spawn N tasks that randomly acquire the lock M times.
for _ in 0..N {
let tx = tx.clone();
let rw = rw.clone();
spawn(async move {
for _ in 0..M {
if fastrand::u32(..N) == 0 {
drop(rw.write().await);
} else {
drop(rw.read().await);
}
}
tx.send(()).await.unwrap();
});
}
future::block_on(async move {
for _ in 0..N {
rx.recv().await.unwrap();
}
});
}
#[test]
fn writer_and_readers() {
let lock = Arc::new(RwLock::new(0i32));
let (tx, rx) = async_channel::unbounded();
// Spawn a writer task.
spawn({
let lock = lock.clone();
async move {
let mut lock = lock.write().await;
for _ in 0..1000 {
let tmp = *lock;
*lock = -1;
future::yield_now().await;
*lock = tmp + 1;
}
tx.send(()).await.unwrap();
}
});
// Readers try to catch the writer in the act.
let mut readers = Vec::new();
for _ in 0..5 {
let lock = lock.clone();
readers.push(spawn(async move {
for _ in 0..1000 {
let lock = lock.read().await;
assert!(*lock >= 0);
}
}));
}
future::block_on(async move {
// Wait for readers to pass their asserts.
for r in readers {
r.await;
}
// Wait for writer to finish.
rx.recv().await.unwrap();
let lock = lock.read().await;
assert_eq!(*lock, 1000);
});
}

View File

@ -0,0 +1,93 @@
use std::sync::Arc;
use async_rwlock::{RwLock, RwLockUpgradableReadGuard};
use futures_lite::future;
#[test]
fn upgrade() {
future::block_on(async {
let lock: RwLock<i32> = RwLock::new(0);
let read_guard = lock.read().await;
let read_guard2 = lock.read().await;
// Should be able to obtain an upgradable lock.
let upgradable_guard = lock.upgradable_read().await;
// Should be able to obtain a read lock when an upgradable lock is active.
let read_guard3 = lock.read().await;
assert_eq!(0, *read_guard3);
drop(read_guard);
drop(read_guard2);
drop(read_guard3);
// Writers should not pass.
assert!(lock.try_write().is_none());
let mut write_guard = RwLockUpgradableReadGuard::try_upgrade(upgradable_guard).expect(
"should be able to upgrade an upgradable lock because there are no more readers",
);
*write_guard += 1;
drop(write_guard);
let read_guard = lock.read().await;
assert_eq!(1, *read_guard)
});
}
#[test]
fn not_upgrade() {
future::block_on(async {
let mutex: RwLock<i32> = RwLock::new(0);
let read_guard = mutex.read().await;
let read_guard2 = mutex.read().await;
// Should be able to obtain an upgradable lock.
let upgradable_guard = mutex.upgradable_read().await;
// Should be able to obtain a shared lock when an upgradable lock is active.
let read_guard3 = mutex.read().await;
assert_eq!(0, *read_guard3);
drop(read_guard);
drop(read_guard2);
drop(read_guard3);
// Drop the upgradable lock.
drop(upgradable_guard);
assert_eq!(0, *(mutex.read().await));
// Should be able to acquire a write lock because there are no more readers.
let mut write_guard = mutex.write().await;
*write_guard += 1;
drop(write_guard);
let read_guard = mutex.read().await;
assert_eq!(1, *read_guard)
});
}
#[test]
fn upgradable_with_concurrent_writer() {
future::block_on(async {
let lock: Arc<RwLock<i32>> = Arc::new(RwLock::new(0));
let lock2 = lock.clone();
let upgradable_guard = lock.upgradable_read().await;
future::or(
async move {
let mut write_guard = lock2.write().await;
*write_guard = 1;
},
async move {
let mut write_guard = RwLockUpgradableReadGuard::upgrade(upgradable_guard).await;
assert_eq!(*write_guard, 0);
*write_guard = 2;
},
)
.await;
assert_eq!(2, *(lock.write().await));
let read_guard = lock.read().await;
assert_eq!(2, *read_guard);
});
}

View File

@ -0,0 +1,19 @@
[package]
name = "async-semaphore"
version = "1.1.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "An async semaphore"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/stjepang/async-lock"
homepage = "https://github.com/stjepang/async-lock"
documentation = "https://docs.rs/async-semaphore"
keywords = ["limit", "concurrent", "monitor", "synchronization", "sync"]
categories = ["asynchronous", "concurrency"]
readme = "../README.md"
[dependencies]
event-listener = "2.4.0"
[dev-dependencies]
futures-lite = "1.0.0"

196
async-semaphore/src/lib.rs Normal file
View File

@ -0,0 +1,196 @@
//! An async semaphore.
//!
//! A semaphore is a synchronization primitive that limits the number of concurrent operations.
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use event_listener::Event;
/// A counter for limiting the number of concurrent operations.
#[derive(Debug)]
pub struct Semaphore {
count: AtomicUsize,
event: Event,
}
impl Semaphore {
/// Creates a new semaphore with a limit of `n` concurrent operations.
///
/// # Examples
///
/// ```
/// use async_semaphore::Semaphore;
///
/// let s = Semaphore::new(5);
/// ```
pub fn new(n: usize) -> Semaphore {
Semaphore {
count: AtomicUsize::new(n),
event: Event::new(),
}
}
/// Attempts to get a permit for a concurrent operation.
///
/// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, a
/// guard is returned that releases the mutex when dropped.
///
/// # Examples
///
/// ```
/// use async_semaphore::Semaphore;
///
/// let s = Semaphore::new(2);
///
/// let g1 = s.try_acquire().unwrap();
/// let g2 = s.try_acquire().unwrap();
///
/// assert!(s.try_acquire().is_none());
/// drop(g2);
/// assert!(s.try_acquire().is_some());
/// ```
pub fn try_acquire(&self) -> Option<SemaphoreGuard<'_>> {
let mut count = self.count.load(Ordering::Acquire);
loop {
if count == 0 {
return None;
}
match self.count.compare_exchange_weak(
count,
count - 1,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return Some(SemaphoreGuard(self)),
Err(c) => count = c,
}
}
}
/// Waits for a permit for a concurrent operation.
///
/// Returns a guard that releases the permit when dropped.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_semaphore::Semaphore;
///
/// let s = Semaphore::new(2);
/// let guard = s.acquire().await;
/// # });
/// ```
pub async fn acquire(&self) -> SemaphoreGuard<'_> {
let mut listener = None;
loop {
if let Some(guard) = self.try_acquire() {
return guard;
}
match listener.take() {
None => listener = Some(self.event.listen()),
Some(l) => l.await,
}
}
}
}
impl Semaphore {
/// Attempts to get an owned permit for a concurrent operation.
///
/// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, an
/// owned guard is returned that releases the mutex when dropped.
///
/// # Examples
///
/// ```
/// use async_semaphore::Semaphore;
/// use std::sync::Arc;
///
/// let s = Arc::new(Semaphore::new(2));
///
/// let g1 = s.try_acquire_arc().unwrap();
/// let g2 = s.try_acquire_arc().unwrap();
///
/// assert!(s.try_acquire_arc().is_none());
/// drop(g2);
/// assert!(s.try_acquire_arc().is_some());
/// ```
pub fn try_acquire_arc(self: &Arc<Self>) -> Option<SemaphoreGuardArc> {
let mut count = self.count.load(Ordering::Acquire);
loop {
if count == 0 {
return None;
}
match self.count.compare_exchange_weak(
count,
count - 1,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return Some(SemaphoreGuardArc(self.clone())),
Err(c) => count = c,
}
}
}
/// Waits for an owned permit for a concurrent operation.
///
/// Returns a guard that releases the permit when dropped.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_semaphore::Semaphore;
/// use std::sync::Arc;
///
/// let s = Arc::new(Semaphore::new(2));
/// let guard = s.acquire_arc().await;
/// # });
/// ```
pub async fn acquire_arc(self: &Arc<Self>) -> SemaphoreGuardArc {
let mut listener = None;
loop {
if let Some(guard) = self.try_acquire_arc() {
return guard;
}
match listener.take() {
None => listener = Some(self.event.listen()),
Some(l) => l.await,
}
}
}
}
/// A guard that releases the acquired permit.
#[derive(Debug)]
pub struct SemaphoreGuard<'a>(&'a Semaphore);
impl Drop for SemaphoreGuard<'_> {
fn drop(&mut self) {
self.0.count.fetch_add(1, Ordering::AcqRel);
self.0.event.notify(1);
}
}
/// An owned guard that releases the acquired permit.
#[derive(Debug)]
pub struct SemaphoreGuardArc(Arc<Semaphore>);
impl Drop for SemaphoreGuardArc {
fn drop(&mut self) {
self.0.count.fetch_add(1, Ordering::AcqRel);
self.0.event.notify(1);
}
}

View File

@ -0,0 +1,83 @@
use std::sync::{mpsc, Arc};
use std::thread;
use async_semaphore::Semaphore;
use futures_lite::future;
#[test]
fn try_acquire() {
let s = Semaphore::new(2);
let g1 = s.try_acquire().unwrap();
let _g2 = s.try_acquire().unwrap();
assert!(s.try_acquire().is_none());
drop(g1);
assert!(s.try_acquire().is_some());
}
#[test]
fn stress() {
let s = Arc::new(Semaphore::new(5));
let (tx, rx) = mpsc::channel::<()>();
for _ in 0..50 {
let s = s.clone();
let tx = tx.clone();
thread::spawn(move || {
future::block_on(async {
for _ in 0..10_000 {
s.acquire().await;
}
drop(tx);
})
});
}
drop(tx);
let _ = rx.recv();
let _g1 = s.try_acquire().unwrap();
let g2 = s.try_acquire().unwrap();
let _g3 = s.try_acquire().unwrap();
let _g4 = s.try_acquire().unwrap();
let _g5 = s.try_acquire().unwrap();
assert!(s.try_acquire().is_none());
drop(g2);
assert!(s.try_acquire().is_some());
}
#[test]
fn as_mutex() {
let s = Arc::new(Semaphore::new(1));
let s2 = s.clone();
let _t = thread::spawn(move || {
future::block_on(async {
let _g = s2.acquire().await;
});
});
future::block_on(async {
let _g = s.acquire().await;
});
}
#[test]
fn multi_resource() {
let s = Arc::new(Semaphore::new(2));
let s2 = s.clone();
let (tx1, rx1) = mpsc::channel();
let (tx2, rx2) = mpsc::channel();
let _t = thread::spawn(move || {
future::block_on(async {
let _g = s2.acquire().await;
let _ = rx2.recv();
tx1.send(()).unwrap();
});
});
future::block_on(async {
let _g = s.acquire().await;
tx2.send(()).unwrap();
rx1.recv().unwrap();
});
}