diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d5fb135..8e0bb39 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,8 +29,14 @@ jobs: if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - run: cargo test + - run: cargo test --features portable-atomic - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack + - name: Run with Loom enabled + run: cargo test --test loom --features loom + env: + RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg loom + LOOM_MAX_PREEMPTIONS: 2 - run: rustup target add thumbv7m-none-eabi - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps @@ -46,6 +52,7 @@ jobs: - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - run: cargo build + - run: cargo build --features portable-atomic - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack - run: rustup target add thumbv7m-none-eabi diff --git a/Cargo.toml b/Cargo.toml index 73abc7f..7715cb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,12 @@ bench = false [dependencies] crossbeam-utils = { version = "0.8.11", default-features = false } +portable-atomic = { version = "0.3", default-features = false, optional = true } + +# Enables loom testing. This feature is permanently unstable and the API may +# change at any time. +[target.'cfg(loom)'.dependencies] +loom = { version = "0.5", optional = true } [[bench]] name = "bench" diff --git a/src/bounded.rs b/src/bounded.rs index 2ed2809..8e0b9f4 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -1,10 +1,12 @@ use alloc::{boxed::Box, vec::Vec}; -use core::cell::UnsafeCell; use core::mem::MaybeUninit; -use core::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_utils::CachePadded; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::cell::UnsafeCell; +#[allow(unused_imports)] +use crate::sync::prelude::*; use crate::{busy_wait, PopError, PushError}; /// A slot in a queue. @@ -118,9 +120,9 @@ impl Bounded { ) { Ok(_) => { // Write the value into the slot and update the stamp. - unsafe { - slot.value.get().write(MaybeUninit::new(value)); - } + slot.value.with_mut(|slot| unsafe { + slot.write(MaybeUninit::new(value)); + }); slot.stamp.store(tail + 1, Ordering::Release); return Ok(()); } @@ -138,6 +140,10 @@ impl Bounded { return Err(PushError::Full(value)); } + // Loom complains if there isn't an explicit busy wait here. + #[cfg(loom)] + busy_wait(); + tail = self.tail.load(Ordering::Relaxed); } else { // Yield because we need to wait for the stamp to get updated. @@ -181,7 +187,9 @@ impl Bounded { ) { Ok(_) => { // Read the value from the slot and update the stamp. - let value = unsafe { slot.value.get().read().assume_init() }; + let value = slot + .value + .with_mut(|slot| unsafe { slot.read().assume_init() }); slot.stamp .store(head.wrapping_add(self.one_lap), Ordering::Release); return Ok(value); @@ -204,6 +212,10 @@ impl Bounded { } } + // Loom complains if there isn't a busy-wait here. + #[cfg(loom)] + busy_wait(); + head = self.head.load(Ordering::Relaxed); } else { // Yield because we need to wait for the stamp to get updated. @@ -284,37 +296,48 @@ impl Bounded { impl Drop for Bounded { fn drop(&mut self) { // Get the index of the head. - let head = *self.head.get_mut(); - let tail = *self.tail.get_mut(); + let Self { + head, + tail, + buffer, + mark_bit, + .. + } = self; - let hix = head & (self.mark_bit - 1); - let tix = tail & (self.mark_bit - 1); + let mark_bit = *mark_bit; - let len = if hix < tix { - tix - hix - } else if hix > tix { - self.buffer.len() - hix + tix - } else if (tail & !self.mark_bit) == head { - 0 - } else { - self.buffer.len() - }; + head.with_mut(|&mut head| { + tail.with_mut(|&mut tail| { + let hix = head & (mark_bit - 1); + let tix = tail & (mark_bit - 1); - // Loop over all slots that hold a value and drop them. - for i in 0..len { - // Compute the index of the next slot holding a value. - let index = if hix + i < self.buffer.len() { - hix + i - } else { - hix + i - self.buffer.len() - }; + let len = if hix < tix { + tix - hix + } else if hix > tix { + buffer.len() - hix + tix + } else if (tail & !mark_bit) == head { + 0 + } else { + buffer.len() + }; - // Drop the value in the slot. - let slot = &self.buffer[index]; - unsafe { - let value = &mut *slot.value.get(); - value.as_mut_ptr().drop_in_place(); - } - } + // Loop over all slots that hold a value and drop them. + for i in 0..len { + // Compute the index of the next slot holding a value. + let index = if hix + i < buffer.len() { + hix + i + } else { + hix + i - buffer.len() + }; + + // Drop the value in the slot. + let slot = &buffer[index]; + slot.value.with_mut(|slot| unsafe { + let value = &mut *slot; + value.as_mut_ptr().drop_in_place(); + }); + } + }); + }); } } diff --git a/src/lib.rs b/src/lib.rs index 6531abe..5f64fde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,14 +26,23 @@ //! //! # Features //! -//! `concurrent-queue` used an `std` default feature. With this feature enabled, this crate will +//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will //! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this //! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow //! this crate to be used on `no_std` platforms at the potential expense of more busy waiting. //! +//! There is also a `portable-atomic` feature, which uses a polyfill from the +//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them. +//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it on +//! single-threaded targets. Note that even with this feature enabled, `concurrent-queue` still +//! requires a global allocator to be available. See the documentation for the +//! [`std::alloc::GlobalAlloc`] trait for more information. +//! //! [Bounded]: `ConcurrentQueue::bounded()` //! [Unbounded]: `ConcurrentQueue::unbounded()` //! [closed]: `ConcurrentQueue::close()` +//! [`portable-atomic`]: https://crates.io/crates/portable-atomic +//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![no_std] @@ -44,7 +53,7 @@ extern crate std; use alloc::boxed::Box; use core::fmt; -use core::sync::atomic::{self, AtomicUsize, Ordering}; +use sync::atomic::{self, AtomicUsize, Ordering}; #[cfg(feature = "std")] use std::error; @@ -53,12 +62,15 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; use crate::bounded::Bounded; use crate::single::Single; +use crate::sync::busy_wait; use crate::unbounded::Unbounded; mod bounded; mod single; mod unbounded; +mod sync; + /// A concurrent queue. /// /// # Examples @@ -463,24 +475,13 @@ impl fmt::Display for PushError { } } -/// Notify the CPU that we are currently busy-waiting. -#[inline] -fn busy_wait() { - #[cfg(feature = "std")] - std::thread::yield_now(); - // Use the deprecated `spin_loop_hint` here in order to - // avoid bumping the MSRV. - #[allow(deprecated)] - #[cfg(not(feature = "std"))] - core::sync::atomic::spin_loop_hint() -} - /// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. #[inline] fn full_fence() { if cfg!(all( any(target_arch = "x86", target_arch = "x86_64"), - not(miri) + not(miri), + not(loom) )) { // HACK(stjepang): On x86 architectures there are two different ways of executing // a `SeqCst` fence. diff --git a/src/single.rs b/src/single.rs index 06ab766..c7748b3 100644 --- a/src/single.rs +++ b/src/single.rs @@ -1,7 +1,9 @@ -use core::cell::UnsafeCell; use core::mem::MaybeUninit; -use core::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::cell::UnsafeCell; +#[allow(unused_imports)] +use crate::sync::prelude::*; use crate::{busy_wait, PopError, PushError}; const LOCKED: usize = 1 << 0; @@ -33,7 +35,9 @@ impl Single { if state == 0 { // Write the value and unlock. - unsafe { self.slot.get().write(MaybeUninit::new(value)) } + self.slot.with_mut(|slot| unsafe { + slot.write(MaybeUninit::new(value)); + }); self.state.fetch_and(!LOCKED, Ordering::Release); Ok(()) } else if state & CLOSED != 0 { @@ -60,7 +64,9 @@ impl Single { if prev == state { // Read the value and unlock. - let value = unsafe { self.slot.get().read().assume_init() }; + let value = self + .slot + .with_mut(|slot| unsafe { slot.read().assume_init() }); self.state.fetch_and(!LOCKED, Ordering::Release); return Ok(value); } @@ -118,11 +124,14 @@ impl Single { impl Drop for Single { fn drop(&mut self) { // Drop the value in the slot. - if *self.state.get_mut() & PUSHED != 0 { - unsafe { - let value = &mut *self.slot.get(); - value.as_mut_ptr().drop_in_place(); + let Self { state, slot } = self; + state.with_mut(|state| { + if *state & PUSHED != 0 { + slot.with_mut(|slot| unsafe { + let value = &mut *slot; + value.as_mut_ptr().drop_in_place(); + }); } - } + }); } } diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 0000000..ca283bb --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,123 @@ +//! Synchronization facade to choose between `core` primitives and `loom` primitives. + +#[cfg(all(feature = "portable-atomic", not(loom)))] +mod sync_impl { + pub(crate) use core::cell; + pub(crate) use portable_atomic as atomic; + + #[cfg(not(feature = "std"))] + pub(crate) use atomic::hint::spin_loop; + + #[cfg(feature = "std")] + pub(crate) use std::thread::yield_now; +} + +#[cfg(all(not(feature = "portable-atomic"), not(loom)))] +mod sync_impl { + pub(crate) use core::cell; + pub(crate) use core::sync::atomic; + + #[cfg(not(feature = "std"))] + #[inline] + pub(crate) fn spin_loop() { + #[allow(deprecated)] + atomic::spin_loop_hint(); + } + + #[cfg(feature = "std")] + pub(crate) use std::thread::yield_now; +} + +#[cfg(loom)] +mod sync_impl { + pub(crate) use loom::cell; + + pub(crate) mod atomic { + pub(crate) use core::sync::atomic::compiler_fence; + pub(crate) use loom::sync::atomic::*; + } + + pub(crate) use loom::thread::yield_now; +} + +pub(crate) use sync_impl::*; + +/// Notify the CPU that we are currently busy-waiting. +#[inline] +pub(crate) fn busy_wait() { + #[cfg(feature = "std")] + yield_now(); + + #[cfg(not(feature = "std"))] + spin_loop(); +} + +#[cfg(loom)] +pub(crate) mod prelude {} + +#[cfg(not(loom))] +pub(crate) mod prelude { + use super::{atomic, cell}; + + /// Emulate `loom::UnsafeCell`'s API. + pub(crate) trait UnsafeCellExt { + type Value; + + fn with(&self, f: F) -> R + where + F: FnOnce(*const Self::Value) -> R; + + fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut Self::Value) -> R; + } + + impl UnsafeCellExt for cell::UnsafeCell { + type Value = T; + + fn with(&self, f: F) -> R + where + F: FnOnce(*const Self::Value) -> R, + { + f(self.get()) + } + + fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut Self::Value) -> R, + { + f(self.get()) + } + } + + /// Emulate `loom::Atomic*`'s API. + pub(crate) trait AtomicExt { + type Value; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Value) -> R; + } + + impl AtomicExt for atomic::AtomicUsize { + type Value = usize; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Value) -> R, + { + f(self.get_mut()) + } + } + + impl AtomicExt for atomic::AtomicPtr { + type Value = *mut T; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Value) -> R, + { + f(self.get_mut()) + } + } +} diff --git a/src/unbounded.rs b/src/unbounded.rs index 1815f5d..a84f856 100644 --- a/src/unbounded.rs +++ b/src/unbounded.rs @@ -1,11 +1,13 @@ use alloc::boxed::Box; -use core::cell::UnsafeCell; use core::mem::MaybeUninit; use core::ptr; -use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use crossbeam_utils::CachePadded; +use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use crate::sync::cell::UnsafeCell; +#[allow(unused_imports)] +use crate::sync::prelude::*; use crate::{busy_wait, PopError, PushError}; // Bits indicating the state of a slot: @@ -37,11 +39,36 @@ struct Slot { } impl Slot { + #[cfg(not(loom))] const UNINIT: Slot = Slot { value: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(0), }; + #[cfg(not(loom))] + fn uninit_block() -> [Slot; BLOCK_CAP] { + [Self::UNINIT; BLOCK_CAP] + } + + #[cfg(loom)] + fn uninit_block() -> [Slot; BLOCK_CAP] { + // Repeat this expression 31 times. + // Update if we change BLOCK_CAP + macro_rules! repeat_31 { + ($e: expr) => { + [ + $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, + $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, + ] + }; + } + + repeat_31!(Slot { + value: UnsafeCell::new(MaybeUninit::uninit()), + state: AtomicUsize::new(0), + }) + } + /// Waits until a value is written into the slot. fn wait_write(&self) { while self.state.load(Ordering::Acquire) & WRITE == 0 { @@ -66,7 +93,7 @@ impl Block { fn new() -> Block { Block { next: AtomicPtr::new(ptr::null_mut()), - slots: [Slot::UNINIT; BLOCK_CAP], + slots: Slot::uninit_block(), } } @@ -205,7 +232,9 @@ impl Unbounded { // Write the value into the slot. let slot = (*block).slots.get_unchecked(offset); - slot.value.get().write(MaybeUninit::new(value)); + slot.value.with_mut(|slot| { + slot.write(MaybeUninit::new(value)); + }); slot.state.fetch_or(WRITE, Ordering::Release); return Ok(()); }, @@ -287,7 +316,7 @@ impl Unbounded { // Read the value. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let value = slot.value.get().read().assume_init(); + let value = slot.value.with_mut(|slot| slot.read().assume_init()); // Destroy the block if we've reached the end, or if another thread wanted to // destroy but couldn't because we were busy reading from the slot. @@ -371,38 +400,49 @@ impl Unbounded { impl Drop for Unbounded { fn drop(&mut self) { - let mut head = *self.head.index.get_mut(); - let mut tail = *self.tail.index.get_mut(); - let mut block = *self.head.block.get_mut(); + let Self { head, tail } = self; + let Position { index: head, block } = &mut **head; - // Erase the lower bits. - head &= !((1 << SHIFT) - 1); - tail &= !((1 << SHIFT) - 1); + head.with_mut(|&mut mut head| { + tail.index.with_mut(|&mut mut tail| { + // Erase the lower bits. + head &= !((1 << SHIFT) - 1); + tail &= !((1 << SHIFT) - 1); - unsafe { - // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. - while head != tail { - let offset = (head >> SHIFT) % LAP; + unsafe { + // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. + while head != tail { + let offset = (head >> SHIFT) % LAP; - if offset < BLOCK_CAP { - // Drop the value in the slot. - let slot = (*block).slots.get_unchecked(offset); - let value = &mut *slot.value.get(); - value.as_mut_ptr().drop_in_place(); - } else { - // Deallocate the block and move to the next one. - let next = *(*block).next.get_mut(); - drop(Box::from_raw(block)); - block = next; + if offset < BLOCK_CAP { + // Drop the value in the slot. + block.with_mut(|block| { + let slot = (**block).slots.get_unchecked(offset); + slot.value.with_mut(|slot| { + let value = &mut *slot; + value.as_mut_ptr().drop_in_place(); + }); + }); + } else { + // Deallocate the block and move to the next one. + block.with_mut(|block| { + let next_block = (**block).next.with_mut(|next| *next); + drop(Box::from_raw(*block)); + *block = next_block; + }); + } + + head = head.wrapping_add(1 << SHIFT); + } + + // Deallocate the last remaining block. + block.with_mut(|block| { + if !block.is_null() { + drop(Box::from_raw(*block)); + } + }); } - - head = head.wrapping_add(1 << SHIFT); - } - - // Deallocate the last remaining block. - if !block.is_null() { - drop(Box::from_raw(block)); - } - } + }); + }); } } diff --git a/tests/loom.rs b/tests/loom.rs new file mode 100644 index 0000000..e05bd4a --- /dev/null +++ b/tests/loom.rs @@ -0,0 +1,247 @@ +#![cfg(loom)] + +use concurrent_queue::{ConcurrentQueue, PopError, PushError}; +use loom::sync::atomic::{AtomicUsize, Ordering}; +use loom::sync::{Arc, Condvar, Mutex}; +use loom::thread; + +/// A basic MPMC channel based on a ConcurrentQueue and loom primitives. +struct Channel { + /// The queue used to contain items. + queue: ConcurrentQueue, + + /// The number of senders. + senders: AtomicUsize, + + /// The number of receivers. + receivers: AtomicUsize, + + /// The event that is signaled when a new item is pushed. + push_event: Event, + + /// The event that is signaled when a new item is popped. + pop_event: Event, +} + +/// The sending side of a channel. +struct Sender { + /// The channel. + channel: Arc>, +} + +/// The receiving side of a channel. +struct Receiver { + /// The channel. + channel: Arc>, +} + +/// Create a new pair of senders/receivers based on a queue. +fn pair(queue: ConcurrentQueue) -> (Sender, Receiver) { + let channel = Arc::new(Channel { + queue, + senders: AtomicUsize::new(1), + receivers: AtomicUsize::new(1), + push_event: Event::new(), + pop_event: Event::new(), + }); + + ( + Sender { + channel: channel.clone(), + }, + Receiver { channel }, + ) +} + +impl Clone for Sender { + fn clone(&self) -> Self { + self.channel.senders.fetch_add(1, Ordering::SeqCst); + Sender { + channel: self.channel.clone(), + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + if self.channel.senders.fetch_sub(1, Ordering::SeqCst) == 1 { + // Close the channel and notify the receivers. + self.channel.queue.close(); + self.channel.push_event.signal_all(); + } + } +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + self.channel.receivers.fetch_add(1, Ordering::SeqCst); + Receiver { + channel: self.channel.clone(), + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + if self.channel.receivers.fetch_sub(1, Ordering::SeqCst) == 1 { + // Close the channel and notify the senders. + self.channel.queue.close(); + self.channel.pop_event.signal_all(); + } + } +} + +impl Sender { + /// Send a value. + /// + /// Returns an error with the value if the channel is closed. + fn send(&self, mut value: T) -> Result<(), T> { + loop { + match self.channel.queue.push(value) { + Ok(()) => { + // Notify a single receiver. + self.channel.push_event.signal(); + return Ok(()); + } + Err(PushError::Closed(val)) => return Err(val), + Err(PushError::Full(val)) => { + // Wait for a receiver to pop an item. + value = val; + self.channel.pop_event.wait(); + } + } + } + } +} + +impl Receiver { + /// Receive a value. + /// + /// Returns an error if the channel is closed. + fn recv(&self) -> Result { + loop { + match self.channel.queue.pop() { + Ok(value) => { + // Notify a single sender. + self.channel.pop_event.signal(); + return Ok(value); + } + Err(PopError::Closed) => return Err(()), + Err(PopError::Empty) => { + // Wait for a sender to push an item. + self.channel.push_event.wait(); + } + } + } + } +} + +/// An event that can be waited on and then signaled. +struct Event { + /// The condition variable used to wait on the event. + condvar: Condvar, + + /// The mutex used to protect the event. + /// + /// Inside is the event's state. The first bit is used to indicate if the + /// notify_one method was called. The second bit is used to indicate if the + /// notify_all method was called. + mutex: Mutex, +} + +impl Event { + /// Create a new event. + fn new() -> Self { + Self { + condvar: Condvar::new(), + mutex: Mutex::new(0), + } + } + + /// Wait for the event to be signaled. + fn wait(&self) { + let mut state = self.mutex.lock().unwrap(); + + loop { + if *state & 0b11 != 0 { + // The event was signaled. + *state &= !0b01; + return; + } + + // Wait for the event to be signaled. + state = self.condvar.wait(state).unwrap(); + } + } + + /// Signal the event. + fn signal(&self) { + let mut state = self.mutex.lock().unwrap(); + *state |= 1; + drop(state); + + self.condvar.notify_one(); + } + + /// Signal the event, but notify all waiters. + fn signal_all(&self) { + let mut state = self.mutex.lock().unwrap(); + *state |= 3; + drop(state); + + self.condvar.notify_all(); + } +} + +/// Wrapper to run tests on all three queues. +fn run_test, usize) + Send + Sync + Clone + 'static>(f: F) { + // The length of a loom test seems to increase exponentially the higher this number is. + const LIMIT: usize = 4; + + let fc = f.clone(); + loom::model(move || { + fc(ConcurrentQueue::bounded(1), LIMIT); + }); + + let fc = f.clone(); + loom::model(move || { + fc(ConcurrentQueue::bounded(LIMIT / 2), LIMIT); + }); + + loom::model(move || { + f(ConcurrentQueue::unbounded(), LIMIT); + }); +} + +#[test] +fn spsc() { + run_test(|q, limit| { + // Create a new pair of senders/receivers. + let (tx, rx) = pair(q); + + // Push each onto a thread and run them. + let handle = thread::spawn(move || { + for i in 0..limit { + if tx.send(i).is_err() { + break; + } + } + }); + + let mut recv_values = vec![]; + + loop { + match rx.recv() { + Ok(value) => recv_values.push(value), + Err(()) => break, + } + } + + // Values may not be in order. + recv_values.sort_unstable(); + assert_eq!(recv_values, (0..limit).collect::>()); + + // Join the handle before we exit. + handle.join().unwrap(); + }); +}