Compare commits
45 Commits
Author | SHA1 | Date |
---|---|---|
John Nunley | e874f701f8 | |
John Nunley | 5b74dc8acc | |
John Nunley | 05e7bff8e5 | |
John Nunley | 576965a8a5 | |
James Liu | 89a64f8c3b | |
John Nunley | 59e93fc952 | |
John Nunley | c407467c20 | |
John Nunley | ff53a68d8c | |
Taiki Endo | d49453323c | |
Taiki Endo | 93ee058b7f | |
Taiki Endo | e7c2115f62 | |
Taiki Endo | 9f6f5d3188 | |
dependabot[bot] | 3b08720796 | |
John Nunley | 71302ab09c | |
John Nunley | 79b9292d10 | |
John Nunley | ba51f6e942 | |
Irine | c0e1098aa0 | |
John Nunley | 22b5e83c4f | |
Taiki Endo | 381d6360e1 | |
MrEconomical | 04f3a1eecc | |
Taiki Endo | cbdf9e88e1 | |
dependabot[bot] | 2d309371f8 | |
John Nunley | b660d795a4 | |
dependabot[bot] | c1d2c77b1c | |
dependabot[bot] | 2cd3dbad14 | |
John Nunley | e3fef3f0ae | |
Taiki Endo | 07c3e4d5b9 | |
Taiki Endo | d66e007b3a | |
John Nunley | f877490dcb | |
Taiki Endo | a96abb3467 | |
Taiki Endo | db25fe1573 | |
Taiki Endo | b5463b2f5e | |
dependabot[bot] | 54e7d94998 | |
Taiki Endo | 3383b2125a | |
Taiki Endo | 1ff6001c68 | |
Taiki Endo | 6be67b375d | |
Taiki Endo | d14af85906 | |
John Nunley | 3d653aac40 | |
jtnunley | 34334a15dd | |
John Nunley | 8c42b8dd9d | |
John Nunley | 06c99537c2 | |
John Nunley | 9553e6fa92 | |
John Nunley | d3bf5a5424 | |
Taiki Endo | 60354f9ea4 | |
Greg Morenz | f8eca83b5f |
|
@ -1 +0,0 @@
|
|||
msrv = "1.38"
|
|
@ -0,0 +1,9 @@
|
|||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: cargo
|
||||
directory: /
|
||||
schedule:
|
||||
interval: weekly
|
||||
commit-message:
|
||||
prefix: ''
|
||||
labels: []
|
|
@ -1,16 +1,29 @@
|
|||
name: CI
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
schedule:
|
||||
- cron: '0 2 * * *'
|
||||
- cron: '0 2 * * 0'
|
||||
|
||||
env:
|
||||
RUSTFLAGS: -D warnings
|
||||
CARGO_INCREMENTAL: 0
|
||||
CARGO_NET_GIT_FETCH_WITH_CLI: true
|
||||
CARGO_NET_RETRY: 10
|
||||
CARGO_TERM_COLOR: always
|
||||
RUST_BACKTRACE: 1
|
||||
RUSTFLAGS: -D warnings
|
||||
RUSTDOCFLAGS: -D warnings
|
||||
RUSTUP_MAX_RETRIES: 10
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
|
||||
jobs:
|
||||
test:
|
||||
|
@ -21,32 +34,45 @@ jobs:
|
|||
os: [ubuntu-latest]
|
||||
rust: [nightly, beta, stable]
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
|
||||
- run: rustup target add thumbv7m-none-eabi
|
||||
- run: rustup target add wasm32-unknown-unknown
|
||||
- name: Install cargo-hack and wasm-pack
|
||||
uses: taiki-e/install-action@v2
|
||||
with:
|
||||
tool: cargo-hack,wasm-pack
|
||||
- run: cargo build --all --all-features --all-targets
|
||||
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
|
||||
if: startsWith(matrix.rust, 'nightly')
|
||||
run: cargo check -Z features=dev_dep
|
||||
- run: cargo hack build --feature-powerset --no-dev-deps
|
||||
- run: cargo hack build --feature-powerset --no-dev-deps --target thumbv7m-none-eabi --skip std,default
|
||||
- run: cargo test
|
||||
- run: cargo test --features portable-atomic
|
||||
- name: Run with Loom enabled
|
||||
run: cargo test --test loom --features loom
|
||||
env:
|
||||
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg loom
|
||||
LOOM_MAX_PREEMPTIONS: 2
|
||||
- name: Check WASM tests
|
||||
run: cargo build --target wasm32-unknown-unknown
|
||||
- run: wasm-pack test --node
|
||||
- run: wasm-pack test --node --no-default-features
|
||||
- run: wasm-pack test --node --no-default-features --features portable-atomic
|
||||
|
||||
msrv:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
# When updating this, the reminder to update the minimum supported
|
||||
# Rust version in Cargo.toml and .clippy.toml.
|
||||
rust: ['1.38']
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Install Rust
|
||||
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
|
||||
- run: cargo build
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install cargo-hack
|
||||
uses: taiki-e/install-action@cargo-hack
|
||||
- run: cargo hack build --rust-version
|
||||
- run: cargo hack build --features portable-atomic --rust-version
|
||||
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --rust-version
|
||||
|
||||
clippy:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update stable
|
||||
- run: cargo clippy --all-features --all-targets
|
||||
|
@ -54,7 +80,7 @@ jobs:
|
|||
fmt:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update stable
|
||||
- run: cargo fmt --all --check
|
||||
|
@ -62,7 +88,7 @@ jobs:
|
|||
miri:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup toolchain install nightly --component miri && rustup default nightly
|
||||
- run: cargo miri test
|
||||
|
@ -70,10 +96,32 @@ jobs:
|
|||
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation
|
||||
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
|
||||
|
||||
security_audit:
|
||||
loom:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions-rs/audit-check@v1
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust
|
||||
run: rustup update stable
|
||||
- name: Loom tests
|
||||
run: cargo test --release --test loom --features loom
|
||||
env:
|
||||
RUSTFLAGS: "--cfg=loom"
|
||||
LOOM_MAX_PREEMPTIONS: 4
|
||||
- name: Loom tests without default features
|
||||
run: cargo test --release --test loom --features loom --no-default-features
|
||||
env:
|
||||
RUSTFLAGS: "--cfg=loom"
|
||||
LOOM_MAX_PREEMPTIONS: 4
|
||||
|
||||
security_audit:
|
||||
permissions:
|
||||
checks: write
|
||||
contents: read
|
||||
issues: write
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
# https://github.com/rustsec/audit-check/issues/2
|
||||
- uses: rustsec/audit-check@master
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
name: Release
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
|
@ -10,7 +13,7 @@ jobs:
|
|||
if: github.repository_owner == 'smol-rs'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- uses: taiki-e/create-gh-release-action@v1
|
||||
with:
|
||||
changelog: CHANGELOG.md
|
||||
|
|
30
CHANGELOG.md
30
CHANGELOG.md
|
@ -1,3 +1,33 @@
|
|||
# Version 2.5.0
|
||||
|
||||
- Add a `force_push` method that can be used to add an element to the queue by displacing another. (#58)
|
||||
- Make `ConcurrentQueue::unbounded()` into a `const` function. (#67)
|
||||
- Fix a compilation error in the Loom implementation. (#65)
|
||||
|
||||
# Version 2.4.0
|
||||
|
||||
- Remove unnecessary heap allocations from inside of the `ConcurrentQueue` type. (#53)
|
||||
|
||||
# Version 2.3.0
|
||||
|
||||
- Implement `UnwindSafe` without libstd. (#49)
|
||||
- Bump `fastrand` to `v2.0.0`. (#43)
|
||||
- Use inline assembly in the `full_fence` function. (#47)
|
||||
|
||||
# Version 2.2.0
|
||||
|
||||
- Add the try_iter method. (#36)
|
||||
|
||||
# Version 2.1.0
|
||||
|
||||
- Update `portable-atomic` to 1.0. (#33)
|
||||
|
||||
# Version 2.0.0
|
||||
|
||||
- Add support for the `portable-atomic` and `loom` crates. (#27)
|
||||
- **Breaking:** Add an `std` feature that can be disabled to use this crate on `no_std` platforms. (#22)
|
||||
- Replace usage of `cache-padded` with `crossbeam-utils`. (#26)
|
||||
|
||||
# Version 1.2.4
|
||||
|
||||
- Fix fence on x86 and miri. (#18)
|
||||
|
|
39
Cargo.toml
39
Cargo.toml
|
@ -2,11 +2,15 @@
|
|||
name = "concurrent-queue"
|
||||
# When publishing a new version:
|
||||
# - Update CHANGELOG.md
|
||||
# - Create "v1.x.y" git tag
|
||||
version = "1.2.4"
|
||||
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
|
||||
edition = "2018"
|
||||
rust-version = "1.38"
|
||||
# - Create "v2.x.y" git tag
|
||||
version = "2.5.0"
|
||||
authors = [
|
||||
"Stjepan Glavina <stjepang@gmail.com>",
|
||||
"Taiki Endo <te316e89@gmail.com>",
|
||||
"John Nunley <dev@notgull.net>"
|
||||
]
|
||||
edition = "2021"
|
||||
rust-version = "1.60"
|
||||
description = "Concurrent multi-producer multi-consumer queue"
|
||||
license = "Apache-2.0 OR MIT"
|
||||
repository = "https://github.com/smol-rs/concurrent-queue"
|
||||
|
@ -14,9 +18,30 @@ keywords = ["channel", "mpmc", "spsc", "spmc", "mpsc"]
|
|||
categories = ["concurrency"]
|
||||
exclude = ["/.*"]
|
||||
|
||||
[lib]
|
||||
bench = false
|
||||
|
||||
[dependencies]
|
||||
cache-padded = "1.1.1"
|
||||
crossbeam-utils = { version = "0.8.11", default-features = false }
|
||||
portable-atomic = { version = "1", default-features = false, optional = true }
|
||||
|
||||
# Enables loom testing. This feature is permanently unstable and the API may
|
||||
# change at any time.
|
||||
[target.'cfg(loom)'.dependencies]
|
||||
loom = { version = "0.7", optional = true }
|
||||
|
||||
[[bench]]
|
||||
name = "bench"
|
||||
harness = false
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = { version = "0.5", features = ["cargo_bench_support"], default-features = false }
|
||||
easy-parallel = "3.1.0"
|
||||
fastrand = "1.3.3"
|
||||
fastrand = "2.0.0"
|
||||
|
||||
[target.'cfg(target_family = "wasm")'.dev-dependencies]
|
||||
wasm-bindgen-test = "0.3"
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
std = []
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
use std::{any::type_name, fmt::Debug};
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, PopError};
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
use easy_parallel::Parallel;
|
||||
|
||||
const COUNT: usize = 100_000;
|
||||
const THREADS: usize = 7;
|
||||
|
||||
fn spsc<T: Default + std::fmt::Debug + Send>(recv: &ConcurrentQueue<T>, send: &ConcurrentQueue<T>) {
|
||||
Parallel::new()
|
||||
.add(|| loop {
|
||||
match recv.pop() {
|
||||
Ok(_) => (),
|
||||
Err(PopError::Empty) => (),
|
||||
Err(PopError::Closed) => break,
|
||||
}
|
||||
})
|
||||
.add(|| {
|
||||
for _ in 0..COUNT {
|
||||
send.push(T::default()).unwrap();
|
||||
}
|
||||
send.close();
|
||||
})
|
||||
.run();
|
||||
}
|
||||
|
||||
fn mpsc<T: Default + std::fmt::Debug + Send>(recv: &ConcurrentQueue<T>, send: &ConcurrentQueue<T>) {
|
||||
Parallel::new()
|
||||
.each(0..THREADS, |_| {
|
||||
for _ in 0..COUNT {
|
||||
send.push(T::default()).unwrap();
|
||||
}
|
||||
})
|
||||
.add(|| {
|
||||
let mut recieved = 0;
|
||||
while recieved < THREADS * COUNT {
|
||||
match recv.pop() {
|
||||
Ok(_) => recieved += 1,
|
||||
Err(PopError::Empty) => (),
|
||||
Err(PopError::Closed) => unreachable!(),
|
||||
}
|
||||
}
|
||||
})
|
||||
.run();
|
||||
}
|
||||
|
||||
fn single_thread<T: Default + std::fmt::Debug>(
|
||||
recv: &ConcurrentQueue<T>,
|
||||
send: &ConcurrentQueue<T>,
|
||||
) {
|
||||
for _ in 0..COUNT {
|
||||
send.push(T::default()).unwrap();
|
||||
}
|
||||
for _ in 0..COUNT {
|
||||
recv.pop().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Because we can't pass generic functions as const parameters.
|
||||
macro_rules! bench_all(
|
||||
($name:ident, $f:ident) => {
|
||||
fn $name(c: &mut Criterion) {
|
||||
fn helper<T: Default + Debug + Send>(c: &mut Criterion) {
|
||||
let name = format!("unbounded_{}_{}", stringify!($f), type_name::<T>());
|
||||
|
||||
c.bench_function(&name, |b| b.iter(|| {
|
||||
let q = ConcurrentQueue::unbounded();
|
||||
$f::<T>(black_box(&q), black_box(&q));
|
||||
}));
|
||||
|
||||
let name = format!("bounded_{}_{}", stringify!($f), type_name::<T>());
|
||||
|
||||
c.bench_function(&name, |b| b.iter(|| {
|
||||
let q = ConcurrentQueue::bounded(THREADS * COUNT);
|
||||
$f::<T>(black_box(&q), black_box(&q));
|
||||
}));
|
||||
}
|
||||
helper::<u8>(c);
|
||||
helper::<u16>(c);
|
||||
helper::<u32>(c);
|
||||
helper::<u64>(c);
|
||||
helper::<u128>(c);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
bench_all!(bench_spsc, spsc);
|
||||
bench_all!(bench_mpsc, mpsc);
|
||||
bench_all!(bench_single_thread, single_thread);
|
||||
|
||||
criterion_group!(generic_group, bench_single_thread, bench_spsc, bench_mpsc);
|
||||
criterion_main!(generic_group);
|
200
src/bounded.rs
200
src/bounded.rs
|
@ -1,11 +1,13 @@
|
|||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::thread;
|
||||
use alloc::{boxed::Box, vec::Vec};
|
||||
use core::mem::MaybeUninit;
|
||||
|
||||
use cache_padded::CachePadded;
|
||||
use crossbeam_utils::CachePadded;
|
||||
|
||||
use crate::{PopError, PushError};
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::cell::UnsafeCell;
|
||||
#[allow(unused_imports)]
|
||||
use crate::sync::prelude::*;
|
||||
use crate::{busy_wait, ForcePushError, PopError, PushError};
|
||||
|
||||
/// A slot in a queue.
|
||||
struct Slot<T> {
|
||||
|
@ -81,6 +83,74 @@ impl<T> Bounded<T> {
|
|||
|
||||
/// Attempts to push an item into the queue.
|
||||
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
|
||||
self.push_or_else(value, |value, tail, _, _| {
|
||||
let head = self.head.load(Ordering::Relaxed);
|
||||
|
||||
// If the head lags one lap behind the tail as well...
|
||||
if head.wrapping_add(self.one_lap) == tail {
|
||||
// ...then the queue is full.
|
||||
Err(PushError::Full(value))
|
||||
} else {
|
||||
Ok(value)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Pushes an item into the queue, displacing another item if needed.
|
||||
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
|
||||
let result = self.push_or_else(value, |value, tail, new_tail, slot| {
|
||||
let head = tail.wrapping_sub(self.one_lap);
|
||||
let new_head = new_tail.wrapping_sub(self.one_lap);
|
||||
|
||||
// Try to move the head.
|
||||
if self
|
||||
.head
|
||||
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
// Move the tail.
|
||||
self.tail.store(new_tail, Ordering::SeqCst);
|
||||
|
||||
// Swap out the old value.
|
||||
// SAFETY: We know this is initialized, since it's covered by the current queue.
|
||||
let old = unsafe {
|
||||
slot.value
|
||||
.with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init())
|
||||
};
|
||||
|
||||
// Update the stamp.
|
||||
slot.stamp.store(tail + 1, Ordering::Release);
|
||||
|
||||
// Return a PushError.
|
||||
Err(PushError::Full(old))
|
||||
} else {
|
||||
Ok(value)
|
||||
}
|
||||
});
|
||||
|
||||
match result {
|
||||
Ok(()) => Ok(None),
|
||||
Err(PushError::Full(old_value)) => Ok(Some(old_value)),
|
||||
Err(PushError::Closed(value)) => Err(ForcePushError(value)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to push an item into the queue, running a closure on failure.
|
||||
///
|
||||
/// `fail` is run when there is no more room left in the tail of the queue. The parameters of
|
||||
/// this function are as follows:
|
||||
///
|
||||
/// - The item that failed to push.
|
||||
/// - The value of `self.tail` before the new value would be inserted.
|
||||
/// - The value of `self.tail` after the new value would be inserted.
|
||||
/// - The slot that we attempted to push into.
|
||||
///
|
||||
/// If `fail` returns `Ok(val)`, we will try pushing `val` to the head of the queue. Otherwise,
|
||||
/// this function will return the error.
|
||||
fn push_or_else<F>(&self, mut value: T, mut fail: F) -> Result<(), PushError<T>>
|
||||
where
|
||||
F: FnMut(T, usize, usize, &Slot<T>) -> Result<T, PushError<T>>,
|
||||
{
|
||||
let mut tail = self.tail.load(Ordering::Relaxed);
|
||||
|
||||
loop {
|
||||
|
@ -93,22 +163,23 @@ impl<T> Bounded<T> {
|
|||
let index = tail & (self.mark_bit - 1);
|
||||
let lap = tail & !(self.one_lap - 1);
|
||||
|
||||
// Calculate the new location of the tail.
|
||||
let new_tail = if index + 1 < self.buffer.len() {
|
||||
// Same lap, incremented index.
|
||||
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
|
||||
tail + 1
|
||||
} else {
|
||||
// One lap forward, index wraps around to zero.
|
||||
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
|
||||
lap.wrapping_add(self.one_lap)
|
||||
};
|
||||
|
||||
// Inspect the corresponding slot.
|
||||
let slot = &self.buffer[index];
|
||||
let stamp = slot.stamp.load(Ordering::Acquire);
|
||||
|
||||
// If the tail and the stamp match, we may attempt to push.
|
||||
if tail == stamp {
|
||||
let new_tail = if index + 1 < self.buffer.len() {
|
||||
// Same lap, incremented index.
|
||||
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
|
||||
tail + 1
|
||||
} else {
|
||||
// One lap forward, index wraps around to zero.
|
||||
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
|
||||
lap.wrapping_add(self.one_lap)
|
||||
};
|
||||
|
||||
// Try moving the tail.
|
||||
match self.tail.compare_exchange_weak(
|
||||
tail,
|
||||
|
@ -118,9 +189,9 @@ impl<T> Bounded<T> {
|
|||
) {
|
||||
Ok(_) => {
|
||||
// Write the value into the slot and update the stamp.
|
||||
unsafe {
|
||||
slot.value.get().write(MaybeUninit::new(value));
|
||||
}
|
||||
slot.value.with_mut(|slot| unsafe {
|
||||
slot.write(MaybeUninit::new(value));
|
||||
});
|
||||
slot.stamp.store(tail + 1, Ordering::Release);
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -130,18 +201,18 @@ impl<T> Bounded<T> {
|
|||
}
|
||||
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
|
||||
crate::full_fence();
|
||||
let head = self.head.load(Ordering::Relaxed);
|
||||
|
||||
// If the head lags one lap behind the tail as well...
|
||||
if head.wrapping_add(self.one_lap) == tail {
|
||||
// ...then the queue is full.
|
||||
return Err(PushError::Full(value));
|
||||
}
|
||||
// We've failed to push; run our failure closure.
|
||||
value = fail(value, tail, new_tail, slot)?;
|
||||
|
||||
// Loom complains if there isn't an explicit busy wait here.
|
||||
#[cfg(loom)]
|
||||
busy_wait();
|
||||
|
||||
tail = self.tail.load(Ordering::Relaxed);
|
||||
} else {
|
||||
// Yield because we need to wait for the stamp to get updated.
|
||||
thread::yield_now();
|
||||
busy_wait();
|
||||
tail = self.tail.load(Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
@ -181,7 +252,9 @@ impl<T> Bounded<T> {
|
|||
) {
|
||||
Ok(_) => {
|
||||
// Read the value from the slot and update the stamp.
|
||||
let value = unsafe { slot.value.get().read().assume_init() };
|
||||
let value = slot
|
||||
.value
|
||||
.with_mut(|slot| unsafe { slot.read().assume_init() });
|
||||
slot.stamp
|
||||
.store(head.wrapping_add(self.one_lap), Ordering::Release);
|
||||
return Ok(value);
|
||||
|
@ -204,10 +277,14 @@ impl<T> Bounded<T> {
|
|||
}
|
||||
}
|
||||
|
||||
// Loom complains if there isn't a busy-wait here.
|
||||
#[cfg(loom)]
|
||||
busy_wait();
|
||||
|
||||
head = self.head.load(Ordering::Relaxed);
|
||||
} else {
|
||||
// Yield because we need to wait for the stamp to get updated.
|
||||
thread::yield_now();
|
||||
busy_wait();
|
||||
head = self.head.load(Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
@ -284,37 +361,48 @@ impl<T> Bounded<T> {
|
|||
impl<T> Drop for Bounded<T> {
|
||||
fn drop(&mut self) {
|
||||
// Get the index of the head.
|
||||
let head = *self.head.get_mut();
|
||||
let tail = *self.tail.get_mut();
|
||||
let Self {
|
||||
head,
|
||||
tail,
|
||||
buffer,
|
||||
mark_bit,
|
||||
..
|
||||
} = self;
|
||||
|
||||
let hix = head & (self.mark_bit - 1);
|
||||
let tix = tail & (self.mark_bit - 1);
|
||||
let mark_bit = *mark_bit;
|
||||
|
||||
let len = if hix < tix {
|
||||
tix - hix
|
||||
} else if hix > tix {
|
||||
self.buffer.len() - hix + tix
|
||||
} else if (tail & !self.mark_bit) == head {
|
||||
0
|
||||
} else {
|
||||
self.buffer.len()
|
||||
};
|
||||
head.with_mut(|&mut head| {
|
||||
tail.with_mut(|&mut tail| {
|
||||
let hix = head & (mark_bit - 1);
|
||||
let tix = tail & (mark_bit - 1);
|
||||
|
||||
// Loop over all slots that hold a value and drop them.
|
||||
for i in 0..len {
|
||||
// Compute the index of the next slot holding a value.
|
||||
let index = if hix + i < self.buffer.len() {
|
||||
hix + i
|
||||
} else {
|
||||
hix + i - self.buffer.len()
|
||||
};
|
||||
let len = if hix < tix {
|
||||
tix - hix
|
||||
} else if hix > tix {
|
||||
buffer.len() - hix + tix
|
||||
} else if (tail & !mark_bit) == head {
|
||||
0
|
||||
} else {
|
||||
buffer.len()
|
||||
};
|
||||
|
||||
// Drop the value in the slot.
|
||||
let slot = &self.buffer[index];
|
||||
unsafe {
|
||||
let value = &mut *slot.value.get();
|
||||
value.as_mut_ptr().drop_in_place();
|
||||
}
|
||||
}
|
||||
// Loop over all slots that hold a value and drop them.
|
||||
for i in 0..len {
|
||||
// Compute the index of the next slot holding a value.
|
||||
let index = if hix + i < buffer.len() {
|
||||
hix + i
|
||||
} else {
|
||||
hix + i - buffer.len()
|
||||
};
|
||||
|
||||
// Drop the value in the slot.
|
||||
let slot = &buffer[index];
|
||||
slot.value.with_mut(|slot| unsafe {
|
||||
let value = &mut *slot;
|
||||
value.as_mut_ptr().drop_in_place();
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
252
src/lib.rs
252
src/lib.rs
|
@ -24,25 +24,75 @@
|
|||
//! assert_eq!(q.pop(), Ok(2));
|
||||
//! ```
|
||||
//!
|
||||
//! # Features
|
||||
//!
|
||||
//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will
|
||||
//! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this
|
||||
//! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow
|
||||
//! this crate to be used on `no_std` platforms at the potential expense of more busy waiting.
|
||||
//!
|
||||
//! There is also a `portable-atomic` feature, which uses a polyfill from the
|
||||
//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them.
|
||||
//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it.
|
||||
//! Note that even with this feature enabled, `concurrent-queue` still requires a global allocator
|
||||
//! to be available. See the documentation for the [`std::alloc::GlobalAlloc`] trait for more
|
||||
//! information.
|
||||
//!
|
||||
//! [Bounded]: `ConcurrentQueue::bounded()`
|
||||
//! [Unbounded]: `ConcurrentQueue::unbounded()`
|
||||
//! [closed]: `ConcurrentQueue::close()`
|
||||
//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
|
||||
//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg
|
||||
|
||||
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
|
||||
#![no_std]
|
||||
#![doc(
|
||||
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
|
||||
)]
|
||||
#![doc(
|
||||
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
|
||||
)]
|
||||
|
||||
extern crate alloc;
|
||||
#[cfg(feature = "std")]
|
||||
extern crate std;
|
||||
|
||||
use core::fmt;
|
||||
use core::panic::{RefUnwindSafe, UnwindSafe};
|
||||
use sync::atomic::{self, Ordering};
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
use std::error;
|
||||
use std::fmt;
|
||||
use std::panic::{RefUnwindSafe, UnwindSafe};
|
||||
use std::sync::atomic::{self, AtomicUsize, Ordering};
|
||||
|
||||
use crate::bounded::Bounded;
|
||||
use crate::single::Single;
|
||||
use crate::sync::busy_wait;
|
||||
use crate::unbounded::Unbounded;
|
||||
|
||||
mod bounded;
|
||||
mod single;
|
||||
mod unbounded;
|
||||
|
||||
mod sync;
|
||||
|
||||
/// Make the given function const if the given condition is true.
|
||||
macro_rules! const_fn {
|
||||
(
|
||||
const_if: #[cfg($($cfg:tt)+)];
|
||||
$(#[$($attr:tt)*])*
|
||||
$vis:vis const fn $($rest:tt)*
|
||||
) => {
|
||||
#[cfg($($cfg)+)]
|
||||
$(#[$($attr)*])*
|
||||
$vis const fn $($rest)*
|
||||
#[cfg(not($($cfg)+))]
|
||||
$(#[$($attr)*])*
|
||||
$vis fn $($rest)*
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) use const_fn;
|
||||
|
||||
/// A concurrent queue.
|
||||
///
|
||||
/// # Examples
|
||||
|
@ -68,10 +118,11 @@ unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
|
|||
impl<T> UnwindSafe for ConcurrentQueue<T> {}
|
||||
impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum Inner<T> {
|
||||
Single(Single<T>),
|
||||
Bounded(Box<Bounded<T>>),
|
||||
Unbounded(Box<Unbounded<T>>),
|
||||
Bounded(Bounded<T>),
|
||||
Unbounded(Unbounded<T>),
|
||||
}
|
||||
|
||||
impl<T> ConcurrentQueue<T> {
|
||||
|
@ -94,22 +145,25 @@ impl<T> ConcurrentQueue<T> {
|
|||
if cap == 1 {
|
||||
ConcurrentQueue(Inner::Single(Single::new()))
|
||||
} else {
|
||||
ConcurrentQueue(Inner::Bounded(Box::new(Bounded::new(cap))))
|
||||
ConcurrentQueue(Inner::Bounded(Bounded::new(cap)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new unbounded queue.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use concurrent_queue::ConcurrentQueue;
|
||||
///
|
||||
/// let q = ConcurrentQueue::<i32>::unbounded();
|
||||
/// ```
|
||||
pub fn unbounded() -> ConcurrentQueue<T> {
|
||||
ConcurrentQueue(Inner::Unbounded(Box::new(Unbounded::new())))
|
||||
}
|
||||
const_fn!(
|
||||
const_if: #[cfg(not(loom))];
|
||||
/// Creates a new unbounded queue.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use concurrent_queue::ConcurrentQueue;
|
||||
///
|
||||
/// let q = ConcurrentQueue::<i32>::unbounded();
|
||||
/// ```
|
||||
pub const fn unbounded() -> ConcurrentQueue<T> {
|
||||
ConcurrentQueue(Inner::Unbounded(Unbounded::new()))
|
||||
}
|
||||
);
|
||||
|
||||
/// Attempts to push an item into the queue.
|
||||
///
|
||||
|
@ -148,6 +202,54 @@ impl<T> ConcurrentQueue<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Push an element into the queue, potentially displacing another element.
|
||||
///
|
||||
/// Attempts to push an element into the queue. If the queue is full, one item from the
|
||||
/// queue is replaced with the provided item. The displaced item is returned as `Some(T)`.
|
||||
/// If the queue is closed, an error is returned.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use concurrent_queue::{ConcurrentQueue, ForcePushError, PushError};
|
||||
///
|
||||
/// let q = ConcurrentQueue::bounded(3);
|
||||
///
|
||||
/// // We can push to the queue.
|
||||
/// for i in 1..=3 {
|
||||
/// assert_eq!(q.force_push(i), Ok(None));
|
||||
/// }
|
||||
///
|
||||
/// // Push errors because the queue is now full.
|
||||
/// assert_eq!(q.push(4), Err(PushError::Full(4)));
|
||||
///
|
||||
/// // Pushing a new value replaces the old ones.
|
||||
/// assert_eq!(q.force_push(5), Ok(Some(1)));
|
||||
/// assert_eq!(q.force_push(6), Ok(Some(2)));
|
||||
///
|
||||
/// // Close the queue to stop further pushes.
|
||||
/// q.close();
|
||||
///
|
||||
/// // Pushing will return an error.
|
||||
/// assert_eq!(q.force_push(7), Err(ForcePushError(7)));
|
||||
///
|
||||
/// // Popping items will return the force-pushed ones.
|
||||
/// assert_eq!(q.pop(), Ok(3));
|
||||
/// assert_eq!(q.pop(), Ok(5));
|
||||
/// assert_eq!(q.pop(), Ok(6));
|
||||
/// ```
|
||||
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
|
||||
match &self.0 {
|
||||
Inner::Single(q) => q.force_push(value),
|
||||
Inner::Bounded(q) => q.force_push(value),
|
||||
Inner::Unbounded(q) => match q.push(value) {
|
||||
Ok(()) => Ok(None),
|
||||
Err(PushError::Closed(value)) => Err(ForcePushError(value)),
|
||||
Err(PushError::Full(_)) => unreachable!(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to pop an item from the queue.
|
||||
///
|
||||
/// If the queue is empty, an error is returned.
|
||||
|
@ -181,6 +283,35 @@ impl<T> ConcurrentQueue<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Get an iterator over the items in the queue.
|
||||
///
|
||||
/// The iterator will continue until the queue is empty or closed. It will never block;
|
||||
/// if the queue is empty, the iterator will return `None`. If new items are pushed into
|
||||
/// the queue, the iterator may return `Some` in the future after returning `None`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use concurrent_queue::ConcurrentQueue;
|
||||
///
|
||||
/// let q = ConcurrentQueue::bounded(5);
|
||||
/// q.push(1).unwrap();
|
||||
/// q.push(2).unwrap();
|
||||
/// q.push(3).unwrap();
|
||||
///
|
||||
/// let mut iter = q.try_iter();
|
||||
/// assert_eq!(iter.by_ref().sum::<i32>(), 6);
|
||||
/// assert_eq!(iter.next(), None);
|
||||
///
|
||||
/// // Pushing more items will make them available to the iterator.
|
||||
/// q.push(4).unwrap();
|
||||
/// assert_eq!(iter.next(), Some(4));
|
||||
/// assert_eq!(iter.next(), None);
|
||||
/// ```
|
||||
pub fn try_iter(&self) -> TryIter<'_, T> {
|
||||
TryIter { queue: self }
|
||||
}
|
||||
|
||||
/// Returns `true` if the queue is empty.
|
||||
///
|
||||
/// # Examples
|
||||
|
@ -339,6 +470,31 @@ impl<T> fmt::Debug for ConcurrentQueue<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// An iterator that pops items from a [`ConcurrentQueue`].
|
||||
///
|
||||
/// This iterator will never block; it will return `None` once the queue has
|
||||
/// been exhausted. Calling `next` after `None` may yield `Some(item)` if more items
|
||||
/// are pushed to the queue.
|
||||
#[must_use = "iterators are lazy and do nothing unless consumed"]
|
||||
#[derive(Clone)]
|
||||
pub struct TryIter<'a, T> {
|
||||
queue: &'a ConcurrentQueue<T>,
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for TryIter<'_, T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_tuple("Iter").field(&self.queue).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Iterator for TryIter<'_, T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.queue.pop().ok()
|
||||
}
|
||||
}
|
||||
|
||||
/// Error which occurs when popping from an empty queue.
|
||||
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||
pub enum PopError {
|
||||
|
@ -367,6 +523,7 @@ impl PopError {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl error::Error for PopError {}
|
||||
|
||||
impl fmt::Debug for PopError {
|
||||
|
@ -423,6 +580,7 @@ impl<T> PushError<T> {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl<T: fmt::Debug> error::Error for PushError<T> {}
|
||||
|
||||
impl<T: fmt::Debug> fmt::Debug for PushError<T> {
|
||||
|
@ -443,30 +601,60 @@ impl<T> fmt::Display for PushError<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Error that occurs when force-pushing into a full queue.
|
||||
#[derive(Clone, Copy, PartialEq, Eq)]
|
||||
pub struct ForcePushError<T>(pub T);
|
||||
|
||||
impl<T> ForcePushError<T> {
|
||||
/// Return the inner value that failed to be force-pushed.
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: fmt::Debug> fmt::Debug for ForcePushError<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_tuple("ForcePushError").field(&self.0).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Display for ForcePushError<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Closed")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl<T: fmt::Debug> error::Error for ForcePushError<T> {}
|
||||
|
||||
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
|
||||
#[inline]
|
||||
fn full_fence() {
|
||||
if cfg!(all(
|
||||
any(target_arch = "x86", target_arch = "x86_64"),
|
||||
not(miri)
|
||||
)) {
|
||||
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), not(miri), not(loom)))]
|
||||
{
|
||||
use core::{arch::asm, cell::UnsafeCell};
|
||||
// HACK(stjepang): On x86 architectures there are two different ways of executing
|
||||
// a `SeqCst` fence.
|
||||
//
|
||||
// 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
|
||||
// 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
|
||||
// 2. A `lock <op>` instruction.
|
||||
//
|
||||
// Both instructions have the effect of a full barrier, but empirical benchmarks have shown
|
||||
// that the second one is sometimes a bit faster.
|
||||
//
|
||||
// The ideal solution here would be to use inline assembly, but we're instead creating a
|
||||
// temporary atomic variable and compare-and-exchanging its value. No sane compiler to
|
||||
// x86 platforms is going to optimize this away.
|
||||
atomic::compiler_fence(Ordering::SeqCst);
|
||||
let a = AtomicUsize::new(0);
|
||||
let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
|
||||
atomic::compiler_fence(Ordering::SeqCst);
|
||||
} else {
|
||||
let a = UnsafeCell::new(0_usize);
|
||||
// It is common to use `lock or` here, but when using a local variable, `lock not`, which
|
||||
// does not change the flag, should be slightly more efficient.
|
||||
// Refs: https://www.felixcloutier.com/x86/not
|
||||
unsafe {
|
||||
#[cfg(target_pointer_width = "64")]
|
||||
asm!("lock not qword ptr [{0}]", in(reg) a.get(), options(nostack, preserves_flags));
|
||||
#[cfg(target_pointer_width = "32")]
|
||||
asm!("lock not dword ptr [{0:e}]", in(reg) a.get(), options(nostack, preserves_flags));
|
||||
}
|
||||
return;
|
||||
}
|
||||
#[allow(unreachable_code)]
|
||||
{
|
||||
atomic::fence(Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::thread;
|
||||
use core::mem::MaybeUninit;
|
||||
use core::ptr;
|
||||
|
||||
use crate::{PopError, PushError};
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::cell::UnsafeCell;
|
||||
#[allow(unused_imports)]
|
||||
use crate::sync::prelude::*;
|
||||
use crate::{busy_wait, ForcePushError, PopError, PushError};
|
||||
|
||||
const LOCKED: usize = 1 << 0;
|
||||
const PUSHED: usize = 1 << 1;
|
||||
|
@ -34,7 +36,9 @@ impl<T> Single<T> {
|
|||
|
||||
if state == 0 {
|
||||
// Write the value and unlock.
|
||||
unsafe { self.slot.get().write(MaybeUninit::new(value)) }
|
||||
self.slot.with_mut(|slot| unsafe {
|
||||
slot.write(MaybeUninit::new(value));
|
||||
});
|
||||
self.state.fetch_and(!LOCKED, Ordering::Release);
|
||||
Ok(())
|
||||
} else if state & CLOSED != 0 {
|
||||
|
@ -44,6 +48,59 @@ impl<T> Single<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Attempts to push an item into the queue, displacing another if necessary.
|
||||
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
|
||||
// Attempt to lock the slot.
|
||||
let mut state = 0;
|
||||
|
||||
loop {
|
||||
// Lock the slot.
|
||||
let prev = self
|
||||
.state
|
||||
.compare_exchange(state, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
|
||||
.unwrap_or_else(|x| x);
|
||||
|
||||
if prev & CLOSED != 0 {
|
||||
return Err(ForcePushError(value));
|
||||
}
|
||||
|
||||
if prev == state {
|
||||
// If the value was pushed, swap out the value.
|
||||
let prev_value = if prev & PUSHED == 0 {
|
||||
// SAFETY: write is safe because we have locked the state.
|
||||
self.slot.with_mut(|slot| unsafe {
|
||||
slot.write(MaybeUninit::new(value));
|
||||
});
|
||||
None
|
||||
} else {
|
||||
// SAFETY: replace is safe because we have locked the state, and
|
||||
// assume_init is safe because we have checked that the value was pushed.
|
||||
let prev_value = unsafe {
|
||||
self.slot.with_mut(move |slot| {
|
||||
ptr::replace(slot, MaybeUninit::new(value)).assume_init()
|
||||
})
|
||||
};
|
||||
Some(prev_value)
|
||||
};
|
||||
|
||||
// We can unlock the slot now.
|
||||
self.state.fetch_and(!LOCKED, Ordering::Release);
|
||||
|
||||
// Return the old value.
|
||||
return Ok(prev_value);
|
||||
}
|
||||
|
||||
// Try to go for the current (pushed) state.
|
||||
if prev & LOCKED == 0 {
|
||||
state = prev;
|
||||
} else {
|
||||
// State is locked.
|
||||
busy_wait();
|
||||
state = prev & !LOCKED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to pop an item from the queue.
|
||||
pub fn pop(&self) -> Result<T, PopError> {
|
||||
let mut state = PUSHED;
|
||||
|
@ -61,7 +118,9 @@ impl<T> Single<T> {
|
|||
|
||||
if prev == state {
|
||||
// Read the value and unlock.
|
||||
let value = unsafe { self.slot.get().read().assume_init() };
|
||||
let value = self
|
||||
.slot
|
||||
.with_mut(|slot| unsafe { slot.read().assume_init() });
|
||||
self.state.fetch_and(!LOCKED, Ordering::Release);
|
||||
return Ok(value);
|
||||
}
|
||||
|
@ -77,7 +136,7 @@ impl<T> Single<T> {
|
|||
if prev & LOCKED == 0 {
|
||||
state = prev;
|
||||
} else {
|
||||
thread::yield_now();
|
||||
busy_wait();
|
||||
state = prev & !LOCKED;
|
||||
}
|
||||
}
|
||||
|
@ -85,11 +144,7 @@ impl<T> Single<T> {
|
|||
|
||||
/// Returns the number of items in the queue.
|
||||
pub fn len(&self) -> usize {
|
||||
if self.state.load(Ordering::SeqCst) & PUSHED == 0 {
|
||||
0
|
||||
} else {
|
||||
1
|
||||
}
|
||||
usize::from(self.state.load(Ordering::SeqCst) & PUSHED != 0)
|
||||
}
|
||||
|
||||
/// Returns `true` if the queue is empty.
|
||||
|
@ -119,11 +174,14 @@ impl<T> Single<T> {
|
|||
impl<T> Drop for Single<T> {
|
||||
fn drop(&mut self) {
|
||||
// Drop the value in the slot.
|
||||
if *self.state.get_mut() & PUSHED != 0 {
|
||||
unsafe {
|
||||
let value = &mut *self.slot.get();
|
||||
value.as_mut_ptr().drop_in_place();
|
||||
let Self { state, slot } = self;
|
||||
state.with_mut(|state| {
|
||||
if *state & PUSHED != 0 {
|
||||
slot.with_mut(|slot| unsafe {
|
||||
let value = &mut *slot;
|
||||
value.as_mut_ptr().drop_in_place();
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
//! Synchronization facade to choose between `core` primitives and `loom` primitives.
|
||||
|
||||
#[cfg(all(feature = "portable-atomic", not(loom)))]
|
||||
mod sync_impl {
|
||||
pub(crate) use core::cell;
|
||||
pub(crate) use portable_atomic as atomic;
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
pub(crate) use atomic::hint::spin_loop;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) use std::thread::yield_now;
|
||||
}
|
||||
|
||||
#[cfg(all(not(feature = "portable-atomic"), not(loom)))]
|
||||
mod sync_impl {
|
||||
pub(crate) use core::cell;
|
||||
pub(crate) use core::sync::atomic;
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
#[inline]
|
||||
pub(crate) fn spin_loop() {
|
||||
#[allow(deprecated)]
|
||||
atomic::spin_loop_hint();
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) use std::thread::yield_now;
|
||||
}
|
||||
|
||||
#[cfg(loom)]
|
||||
mod sync_impl {
|
||||
pub(crate) use loom::cell;
|
||||
|
||||
pub(crate) mod atomic {
|
||||
pub(crate) use loom::sync::atomic::*;
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
pub(crate) use loom::hint::spin_loop;
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) use loom::thread::yield_now;
|
||||
}
|
||||
|
||||
pub(crate) use sync_impl::*;
|
||||
|
||||
/// Notify the CPU that we are currently busy-waiting.
|
||||
#[inline]
|
||||
pub(crate) fn busy_wait() {
|
||||
#[cfg(feature = "std")]
|
||||
yield_now();
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
spin_loop();
|
||||
}
|
||||
|
||||
#[cfg(loom)]
|
||||
pub(crate) mod prelude {}
|
||||
|
||||
#[cfg(not(loom))]
|
||||
pub(crate) mod prelude {
|
||||
use super::{atomic, cell};
|
||||
|
||||
/// Emulate `loom::UnsafeCell`'s API.
|
||||
pub(crate) trait UnsafeCellExt {
|
||||
type Value;
|
||||
|
||||
fn with_mut<R, F>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(*mut Self::Value) -> R;
|
||||
}
|
||||
|
||||
impl<T> UnsafeCellExt for cell::UnsafeCell<T> {
|
||||
type Value = T;
|
||||
|
||||
fn with_mut<R, F>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(*mut Self::Value) -> R,
|
||||
{
|
||||
f(self.get())
|
||||
}
|
||||
}
|
||||
|
||||
/// Emulate `loom::Atomic*`'s API.
|
||||
pub(crate) trait AtomicExt {
|
||||
type Value;
|
||||
|
||||
fn with_mut<R, F>(&mut self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut Self::Value) -> R;
|
||||
}
|
||||
|
||||
impl AtomicExt for atomic::AtomicUsize {
|
||||
type Value = usize;
|
||||
|
||||
fn with_mut<R, F>(&mut self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut Self::Value) -> R,
|
||||
{
|
||||
f(self.get_mut())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AtomicExt for atomic::AtomicPtr<T> {
|
||||
type Value = *mut T;
|
||||
|
||||
fn with_mut<R, F>(&mut self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut Self::Value) -> R,
|
||||
{
|
||||
f(self.get_mut())
|
||||
}
|
||||
}
|
||||
}
|
156
src/unbounded.rs
156
src/unbounded.rs
|
@ -1,12 +1,15 @@
|
|||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
|
||||
use std::thread;
|
||||
use alloc::boxed::Box;
|
||||
use core::mem::MaybeUninit;
|
||||
use core::ptr;
|
||||
|
||||
use cache_padded::CachePadded;
|
||||
use crossbeam_utils::CachePadded;
|
||||
|
||||
use crate::{PopError, PushError};
|
||||
use crate::const_fn;
|
||||
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
|
||||
use crate::sync::cell::UnsafeCell;
|
||||
#[allow(unused_imports)]
|
||||
use crate::sync::prelude::*;
|
||||
use crate::{busy_wait, PopError, PushError};
|
||||
|
||||
// Bits indicating the state of a slot:
|
||||
// * If a value has been written into the slot, `WRITE` is set.
|
||||
|
@ -37,15 +40,40 @@ struct Slot<T> {
|
|||
}
|
||||
|
||||
impl<T> Slot<T> {
|
||||
#[cfg(not(loom))]
|
||||
const UNINIT: Slot<T> = Slot {
|
||||
value: UnsafeCell::new(MaybeUninit::uninit()),
|
||||
state: AtomicUsize::new(0),
|
||||
};
|
||||
|
||||
#[cfg(not(loom))]
|
||||
fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
|
||||
[Self::UNINIT; BLOCK_CAP]
|
||||
}
|
||||
|
||||
#[cfg(loom)]
|
||||
fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
|
||||
// Repeat this expression 31 times.
|
||||
// Update if we change BLOCK_CAP
|
||||
macro_rules! repeat_31 {
|
||||
($e: expr) => {
|
||||
[
|
||||
$e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
|
||||
$e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
|
||||
]
|
||||
};
|
||||
}
|
||||
|
||||
repeat_31!(Slot {
|
||||
value: UnsafeCell::new(MaybeUninit::uninit()),
|
||||
state: AtomicUsize::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
/// Waits until a value is written into the slot.
|
||||
fn wait_write(&self) {
|
||||
while self.state.load(Ordering::Acquire) & WRITE == 0 {
|
||||
thread::yield_now();
|
||||
busy_wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -66,7 +94,7 @@ impl<T> Block<T> {
|
|||
fn new() -> Block<T> {
|
||||
Block {
|
||||
next: AtomicPtr::new(ptr::null_mut()),
|
||||
slots: [Slot::UNINIT; BLOCK_CAP],
|
||||
slots: Slot::uninit_block(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -77,7 +105,7 @@ impl<T> Block<T> {
|
|||
if !next.is_null() {
|
||||
return next;
|
||||
}
|
||||
thread::yield_now();
|
||||
busy_wait();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,19 +149,22 @@ pub struct Unbounded<T> {
|
|||
}
|
||||
|
||||
impl<T> Unbounded<T> {
|
||||
/// Creates a new unbounded queue.
|
||||
pub fn new() -> Unbounded<T> {
|
||||
Unbounded {
|
||||
head: CachePadded::new(Position {
|
||||
block: AtomicPtr::new(ptr::null_mut()),
|
||||
index: AtomicUsize::new(0),
|
||||
}),
|
||||
tail: CachePadded::new(Position {
|
||||
block: AtomicPtr::new(ptr::null_mut()),
|
||||
index: AtomicUsize::new(0),
|
||||
}),
|
||||
const_fn!(
|
||||
const_if: #[cfg(not(loom))];
|
||||
/// Creates a new unbounded queue.
|
||||
pub const fn new() -> Unbounded<T> {
|
||||
Unbounded {
|
||||
head: CachePadded::new(Position {
|
||||
block: AtomicPtr::new(ptr::null_mut()),
|
||||
index: AtomicUsize::new(0),
|
||||
}),
|
||||
tail: CachePadded::new(Position {
|
||||
block: AtomicPtr::new(ptr::null_mut()),
|
||||
index: AtomicUsize::new(0),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
/// Pushes an item into the queue.
|
||||
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
|
||||
|
@ -152,7 +183,7 @@ impl<T> Unbounded<T> {
|
|||
|
||||
// If we reached the end of the block, wait until the next one is installed.
|
||||
if offset == BLOCK_CAP {
|
||||
thread::yield_now();
|
||||
busy_wait();
|
||||
tail = self.tail.index.load(Ordering::Acquire);
|
||||
block = self.tail.block.load(Ordering::Acquire);
|
||||
continue;
|
||||
|
@ -205,7 +236,9 @@ impl<T> Unbounded<T> {
|
|||
|
||||
// Write the value into the slot.
|
||||
let slot = (*block).slots.get_unchecked(offset);
|
||||
slot.value.get().write(MaybeUninit::new(value));
|
||||
slot.value.with_mut(|slot| {
|
||||
slot.write(MaybeUninit::new(value));
|
||||
});
|
||||
slot.state.fetch_or(WRITE, Ordering::Release);
|
||||
return Ok(());
|
||||
},
|
||||
|
@ -228,7 +261,7 @@ impl<T> Unbounded<T> {
|
|||
|
||||
// If we reached the end of the block, wait until the next one is installed.
|
||||
if offset == BLOCK_CAP {
|
||||
thread::yield_now();
|
||||
busy_wait();
|
||||
head = self.head.index.load(Ordering::Acquire);
|
||||
block = self.head.block.load(Ordering::Acquire);
|
||||
continue;
|
||||
|
@ -258,7 +291,7 @@ impl<T> Unbounded<T> {
|
|||
|
||||
// The block can be null here only if the first push operation is in progress.
|
||||
if block.is_null() {
|
||||
thread::yield_now();
|
||||
busy_wait();
|
||||
head = self.head.index.load(Ordering::Acquire);
|
||||
block = self.head.block.load(Ordering::Acquire);
|
||||
continue;
|
||||
|
@ -287,7 +320,7 @@ impl<T> Unbounded<T> {
|
|||
// Read the value.
|
||||
let slot = (*block).slots.get_unchecked(offset);
|
||||
slot.wait_write();
|
||||
let value = slot.value.get().read().assume_init();
|
||||
let value = slot.value.with_mut(|slot| slot.read().assume_init());
|
||||
|
||||
// Destroy the block if we've reached the end, or if another thread wanted to
|
||||
// destroy but couldn't because we were busy reading from the slot.
|
||||
|
@ -371,38 +404,49 @@ impl<T> Unbounded<T> {
|
|||
|
||||
impl<T> Drop for Unbounded<T> {
|
||||
fn drop(&mut self) {
|
||||
let mut head = *self.head.index.get_mut();
|
||||
let mut tail = *self.tail.index.get_mut();
|
||||
let mut block = *self.head.block.get_mut();
|
||||
let Self { head, tail } = self;
|
||||
let Position { index: head, block } = &mut **head;
|
||||
|
||||
// Erase the lower bits.
|
||||
head &= !((1 << SHIFT) - 1);
|
||||
tail &= !((1 << SHIFT) - 1);
|
||||
head.with_mut(|&mut mut head| {
|
||||
tail.index.with_mut(|&mut mut tail| {
|
||||
// Erase the lower bits.
|
||||
head &= !((1 << SHIFT) - 1);
|
||||
tail &= !((1 << SHIFT) - 1);
|
||||
|
||||
unsafe {
|
||||
// Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
|
||||
while head != tail {
|
||||
let offset = (head >> SHIFT) % LAP;
|
||||
unsafe {
|
||||
// Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
|
||||
while head != tail {
|
||||
let offset = (head >> SHIFT) % LAP;
|
||||
|
||||
if offset < BLOCK_CAP {
|
||||
// Drop the value in the slot.
|
||||
let slot = (*block).slots.get_unchecked(offset);
|
||||
let value = &mut *slot.value.get();
|
||||
value.as_mut_ptr().drop_in_place();
|
||||
} else {
|
||||
// Deallocate the block and move to the next one.
|
||||
let next = *(*block).next.get_mut();
|
||||
drop(Box::from_raw(block));
|
||||
block = next;
|
||||
if offset < BLOCK_CAP {
|
||||
// Drop the value in the slot.
|
||||
block.with_mut(|block| {
|
||||
let slot = (**block).slots.get_unchecked(offset);
|
||||
slot.value.with_mut(|slot| {
|
||||
let value = &mut *slot;
|
||||
value.as_mut_ptr().drop_in_place();
|
||||
});
|
||||
});
|
||||
} else {
|
||||
// Deallocate the block and move to the next one.
|
||||
block.with_mut(|block| {
|
||||
let next_block = (**block).next.with_mut(|next| *next);
|
||||
drop(Box::from_raw(*block));
|
||||
*block = next_block;
|
||||
});
|
||||
}
|
||||
|
||||
head = head.wrapping_add(1 << SHIFT);
|
||||
}
|
||||
|
||||
// Deallocate the last remaining block.
|
||||
block.with_mut(|block| {
|
||||
if !block.is_null() {
|
||||
drop(Box::from_raw(*block));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
head = head.wrapping_add(1 << SHIFT);
|
||||
}
|
||||
|
||||
// Deallocate the last remaining block.
|
||||
if !block.is_null() {
|
||||
drop(Box::from_raw(block));
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
124
tests/bounded.rs
124
tests/bounded.rs
|
@ -1,9 +1,14 @@
|
|||
#![allow(clippy::bool_assert_comparison)]
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use easy_parallel::Parallel;
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
|
||||
use easy_parallel::Parallel;
|
||||
#[cfg(target_family = "wasm")]
|
||||
use wasm_bindgen_test::wasm_bindgen_test as test;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
|
@ -58,6 +63,7 @@ fn len_empty_full() {
|
|||
assert_eq!(q.is_full(), false);
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn len() {
|
||||
const COUNT: usize = if cfg!(miri) { 50 } else { 25_000 };
|
||||
|
@ -130,6 +136,33 @@ fn close() {
|
|||
assert_eq!(q.pop(), Err(PopError::Closed));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn force_push() {
|
||||
let q = ConcurrentQueue::<i32>::bounded(5);
|
||||
|
||||
for i in 1..=5 {
|
||||
assert_eq!(q.force_push(i), Ok(None));
|
||||
}
|
||||
|
||||
assert!(!q.is_closed());
|
||||
for i in 6..=10 {
|
||||
assert_eq!(q.force_push(i), Ok(Some(i - 5)));
|
||||
}
|
||||
assert_eq!(q.pop(), Ok(6));
|
||||
assert_eq!(q.force_push(11), Ok(None));
|
||||
for i in 12..=15 {
|
||||
assert_eq!(q.force_push(i), Ok(Some(i - 5)));
|
||||
}
|
||||
|
||||
assert!(q.close());
|
||||
assert_eq!(q.force_push(40), Err(ForcePushError(40)));
|
||||
for i in 11..=15 {
|
||||
assert_eq!(q.pop(), Ok(i));
|
||||
}
|
||||
assert_eq!(q.pop(), Err(PopError::Closed));
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn spsc() {
|
||||
const COUNT: usize = if cfg!(miri) { 100 } else { 100_000 };
|
||||
|
@ -156,6 +189,7 @@ fn spsc() {
|
|||
.run();
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn mpmc() {
|
||||
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
|
||||
|
@ -187,6 +221,7 @@ fn mpmc() {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn drops() {
|
||||
const RUNS: usize = if cfg!(miri) { 10 } else { 100 };
|
||||
|
@ -235,6 +270,7 @@ fn drops() {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn linearizable() {
|
||||
const COUNT: usize = if cfg!(miri) { 500 } else { 25_000 };
|
||||
|
@ -243,11 +279,93 @@ fn linearizable() {
|
|||
let q = ConcurrentQueue::bounded(THREADS);
|
||||
|
||||
Parallel::new()
|
||||
.each(0..THREADS, |_| {
|
||||
.each(0..THREADS / 2, |_| {
|
||||
for _ in 0..COUNT {
|
||||
while q.push(0).is_err() {}
|
||||
q.pop().unwrap();
|
||||
}
|
||||
})
|
||||
.each(0..THREADS / 2, |_| {
|
||||
for _ in 0..COUNT {
|
||||
if q.force_push(0).unwrap().is_none() {
|
||||
q.pop().unwrap();
|
||||
}
|
||||
}
|
||||
})
|
||||
.run();
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn spsc_ring_buffer() {
|
||||
const COUNT: usize = if cfg!(miri) { 200 } else { 100_000 };
|
||||
|
||||
let t = AtomicUsize::new(1);
|
||||
let q = ConcurrentQueue::<usize>::bounded(3);
|
||||
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
|
||||
|
||||
Parallel::new()
|
||||
.add(|| loop {
|
||||
match t.load(Ordering::SeqCst) {
|
||||
0 if q.is_empty() => break,
|
||||
|
||||
_ => {
|
||||
while let Ok(n) = q.pop() {
|
||||
v[n].fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.add(|| {
|
||||
for i in 0..COUNT {
|
||||
if let Ok(Some(n)) = q.force_push(i) {
|
||||
v[n].fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
t.fetch_sub(1, Ordering::SeqCst);
|
||||
})
|
||||
.run();
|
||||
|
||||
for c in v {
|
||||
assert_eq!(c.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn mpmc_ring_buffer() {
|
||||
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
|
||||
const THREADS: usize = 4;
|
||||
|
||||
let t = AtomicUsize::new(THREADS);
|
||||
let q = ConcurrentQueue::<usize>::bounded(3);
|
||||
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
|
||||
|
||||
Parallel::new()
|
||||
.each(0..THREADS, |_| loop {
|
||||
match t.load(Ordering::SeqCst) {
|
||||
0 if q.is_empty() => break,
|
||||
|
||||
_ => {
|
||||
while let Ok(n) = q.pop() {
|
||||
v[n].fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.each(0..THREADS, |_| {
|
||||
for i in 0..COUNT {
|
||||
if let Ok(Some(n)) = q.force_push(i) {
|
||||
v[n].fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
t.fetch_sub(1, Ordering::SeqCst);
|
||||
})
|
||||
.run();
|
||||
|
||||
for c in v {
|
||||
assert_eq!(c.load(Ordering::SeqCst), THREADS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,307 @@
|
|||
#![cfg(loom)]
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
|
||||
use loom::sync::atomic::{AtomicUsize, Ordering};
|
||||
use loom::sync::{Arc, Condvar, Mutex};
|
||||
use loom::thread;
|
||||
|
||||
#[cfg(target_family = "wasm")]
|
||||
use wasm_bindgen_test::wasm_bindgen_test as test;
|
||||
|
||||
/// A basic MPMC channel based on a ConcurrentQueue and loom primitives.
|
||||
struct Channel<T> {
|
||||
/// The queue used to contain items.
|
||||
queue: ConcurrentQueue<T>,
|
||||
|
||||
/// The number of senders.
|
||||
senders: AtomicUsize,
|
||||
|
||||
/// The number of receivers.
|
||||
receivers: AtomicUsize,
|
||||
|
||||
/// The event that is signaled when a new item is pushed.
|
||||
push_event: Event,
|
||||
|
||||
/// The event that is signaled when a new item is popped.
|
||||
pop_event: Event,
|
||||
}
|
||||
|
||||
/// The sending side of a channel.
|
||||
struct Sender<T> {
|
||||
/// The channel.
|
||||
channel: Arc<Channel<T>>,
|
||||
}
|
||||
|
||||
/// The receiving side of a channel.
|
||||
struct Receiver<T> {
|
||||
/// The channel.
|
||||
channel: Arc<Channel<T>>,
|
||||
}
|
||||
|
||||
/// Create a new pair of senders/receivers based on a queue.
|
||||
fn pair<T>(queue: ConcurrentQueue<T>) -> (Sender<T>, Receiver<T>) {
|
||||
let channel = Arc::new(Channel {
|
||||
queue,
|
||||
senders: AtomicUsize::new(1),
|
||||
receivers: AtomicUsize::new(1),
|
||||
push_event: Event::new(),
|
||||
pop_event: Event::new(),
|
||||
});
|
||||
|
||||
(
|
||||
Sender {
|
||||
channel: channel.clone(),
|
||||
},
|
||||
Receiver { channel },
|
||||
)
|
||||
}
|
||||
|
||||
impl<T> Clone for Sender<T> {
|
||||
fn clone(&self) -> Self {
|
||||
self.channel.senders.fetch_add(1, Ordering::SeqCst);
|
||||
Sender {
|
||||
channel: self.channel.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Sender<T> {
|
||||
fn drop(&mut self) {
|
||||
if self.channel.senders.fetch_sub(1, Ordering::SeqCst) == 1 {
|
||||
// Close the channel and notify the receivers.
|
||||
self.channel.queue.close();
|
||||
self.channel.push_event.signal_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Receiver<T> {
|
||||
fn clone(&self) -> Self {
|
||||
self.channel.receivers.fetch_add(1, Ordering::SeqCst);
|
||||
Receiver {
|
||||
channel: self.channel.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Receiver<T> {
|
||||
fn drop(&mut self) {
|
||||
if self.channel.receivers.fetch_sub(1, Ordering::SeqCst) == 1 {
|
||||
// Close the channel and notify the senders.
|
||||
self.channel.queue.close();
|
||||
self.channel.pop_event.signal_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Sender<T> {
|
||||
/// Send a value.
|
||||
///
|
||||
/// Returns an error with the value if the channel is closed.
|
||||
fn send(&self, mut value: T) -> Result<(), T> {
|
||||
loop {
|
||||
match self.channel.queue.push(value) {
|
||||
Ok(()) => {
|
||||
// Notify a single receiver.
|
||||
self.channel.push_event.signal();
|
||||
return Ok(());
|
||||
}
|
||||
Err(PushError::Closed(val)) => return Err(val),
|
||||
Err(PushError::Full(val)) => {
|
||||
// Wait for a receiver to pop an item.
|
||||
value = val;
|
||||
self.channel.pop_event.wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a value forcefully.
|
||||
fn force_send(&self, value: T) -> Result<Option<T>, T> {
|
||||
match self.channel.queue.force_push(value) {
|
||||
Ok(bumped) => {
|
||||
self.channel.push_event.signal();
|
||||
Ok(bumped)
|
||||
}
|
||||
|
||||
Err(ForcePushError(val)) => Err(val),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Receiver<T> {
|
||||
/// Channel capacity.
|
||||
fn capacity(&self) -> Option<usize> {
|
||||
self.channel.queue.capacity()
|
||||
}
|
||||
|
||||
/// Receive a value.
|
||||
///
|
||||
/// Returns an error if the channel is closed.
|
||||
fn recv(&self) -> Result<T, ()> {
|
||||
loop {
|
||||
match self.channel.queue.pop() {
|
||||
Ok(value) => {
|
||||
// Notify a single sender.
|
||||
self.channel.pop_event.signal();
|
||||
return Ok(value);
|
||||
}
|
||||
Err(PopError::Closed) => return Err(()),
|
||||
Err(PopError::Empty) => {
|
||||
// Wait for a sender to push an item.
|
||||
self.channel.push_event.wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An event that can be waited on and then signaled.
|
||||
struct Event {
|
||||
/// The condition variable used to wait on the event.
|
||||
condvar: Condvar,
|
||||
|
||||
/// The mutex used to protect the event.
|
||||
///
|
||||
/// Inside is the event's state. The first bit is used to indicate if the
|
||||
/// notify_one method was called. The second bit is used to indicate if the
|
||||
/// notify_all method was called.
|
||||
mutex: Mutex<usize>,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
/// Create a new event.
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
condvar: Condvar::new(),
|
||||
mutex: Mutex::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the event to be signaled.
|
||||
fn wait(&self) {
|
||||
let mut state = self.mutex.lock().unwrap();
|
||||
|
||||
loop {
|
||||
if *state & 0b11 != 0 {
|
||||
// The event was signaled.
|
||||
*state &= !0b01;
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for the event to be signaled.
|
||||
state = self.condvar.wait(state).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
/// Signal the event.
|
||||
fn signal(&self) {
|
||||
let mut state = self.mutex.lock().unwrap();
|
||||
*state |= 1;
|
||||
drop(state);
|
||||
|
||||
self.condvar.notify_one();
|
||||
}
|
||||
|
||||
/// Signal the event, but notify all waiters.
|
||||
fn signal_all(&self) {
|
||||
let mut state = self.mutex.lock().unwrap();
|
||||
*state |= 3;
|
||||
drop(state);
|
||||
|
||||
self.condvar.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper to run tests on all three queues.
|
||||
fn run_test<F: Fn(ConcurrentQueue<usize>, usize) + Send + Sync + Clone + 'static>(f: F) {
|
||||
// The length of a loom test seems to increase exponentially the higher this number is.
|
||||
const LIMIT: usize = 4;
|
||||
|
||||
let fc = f.clone();
|
||||
loom::model(move || {
|
||||
fc(ConcurrentQueue::bounded(1), LIMIT);
|
||||
});
|
||||
|
||||
let fc = f.clone();
|
||||
loom::model(move || {
|
||||
fc(ConcurrentQueue::bounded(LIMIT / 2), LIMIT);
|
||||
});
|
||||
|
||||
loom::model(move || {
|
||||
f(ConcurrentQueue::unbounded(), LIMIT);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spsc() {
|
||||
run_test(|q, limit| {
|
||||
// Create a new pair of senders/receivers.
|
||||
let (tx, rx) = pair(q);
|
||||
|
||||
// Push each onto a thread and run them.
|
||||
let handle = thread::spawn(move || {
|
||||
for i in 0..limit {
|
||||
if tx.send(i).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut recv_values = vec![];
|
||||
|
||||
loop {
|
||||
match rx.recv() {
|
||||
Ok(value) => recv_values.push(value),
|
||||
Err(()) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Values may not be in order.
|
||||
recv_values.sort_unstable();
|
||||
assert_eq!(recv_values, (0..limit).collect::<Vec<_>>());
|
||||
|
||||
// Join the handle before we exit.
|
||||
handle.join().unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spsc_force() {
|
||||
run_test(|q, limit| {
|
||||
// Create a new pair of senders/receivers.
|
||||
let (tx, rx) = pair(q);
|
||||
|
||||
// Push each onto a thread and run them.
|
||||
let handle = thread::spawn(move || {
|
||||
for i in 0..limit {
|
||||
if tx.force_send(i).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut recv_values = vec![];
|
||||
|
||||
loop {
|
||||
match rx.recv() {
|
||||
Ok(value) => recv_values.push(value),
|
||||
Err(()) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Values may not be in order.
|
||||
recv_values.sort_unstable();
|
||||
let cap = rx.capacity().unwrap_or(usize::MAX);
|
||||
for (left, right) in (0..limit)
|
||||
.rev()
|
||||
.take(cap)
|
||||
.zip(recv_values.into_iter().rev())
|
||||
{
|
||||
assert_eq!(left, right);
|
||||
}
|
||||
|
||||
// Join the handle before we exit.
|
||||
handle.join().unwrap();
|
||||
});
|
||||
}
|
112
tests/single.rs
112
tests/single.rs
|
@ -1,9 +1,14 @@
|
|||
#![allow(clippy::bool_assert_comparison)]
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use easy_parallel::Parallel;
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
|
||||
use easy_parallel::Parallel;
|
||||
#[cfg(target_family = "wasm")]
|
||||
use wasm_bindgen_test::wasm_bindgen_test as test;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
|
@ -60,6 +65,22 @@ fn close() {
|
|||
assert_eq!(q.pop(), Err(PopError::Closed));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn force_push() {
|
||||
let q = ConcurrentQueue::<i32>::bounded(1);
|
||||
assert_eq!(q.force_push(10), Ok(None));
|
||||
|
||||
assert!(!q.is_closed());
|
||||
assert_eq!(q.force_push(20), Ok(Some(10)));
|
||||
assert_eq!(q.force_push(30), Ok(Some(20)));
|
||||
|
||||
assert!(q.close());
|
||||
assert_eq!(q.force_push(40), Err(ForcePushError(40)));
|
||||
assert_eq!(q.pop(), Ok(30));
|
||||
assert_eq!(q.pop(), Err(PopError::Closed));
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn spsc() {
|
||||
const COUNT: usize = if cfg!(miri) { 100 } else { 100_000 };
|
||||
|
@ -86,6 +107,7 @@ fn spsc() {
|
|||
.run();
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn mpmc() {
|
||||
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
|
||||
|
@ -117,6 +139,7 @@ fn mpmc() {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn drops() {
|
||||
const RUNS: usize = if cfg!(miri) { 20 } else { 100 };
|
||||
|
@ -165,6 +188,7 @@ fn drops() {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn linearizable() {
|
||||
const COUNT: usize = if cfg!(miri) { 500 } else { 25_000 };
|
||||
|
@ -173,11 +197,93 @@ fn linearizable() {
|
|||
let q = ConcurrentQueue::bounded(1);
|
||||
|
||||
Parallel::new()
|
||||
.each(0..THREADS, |_| {
|
||||
.each(0..THREADS / 2, |_| {
|
||||
for _ in 0..COUNT {
|
||||
while q.push(0).is_err() {}
|
||||
q.pop().unwrap();
|
||||
}
|
||||
})
|
||||
.each(0..THREADS / 2, |_| {
|
||||
for _ in 0..COUNT {
|
||||
if q.force_push(0).unwrap().is_none() {
|
||||
q.pop().unwrap();
|
||||
}
|
||||
}
|
||||
})
|
||||
.run();
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn spsc_ring_buffer() {
|
||||
const COUNT: usize = if cfg!(miri) { 200 } else { 100_000 };
|
||||
|
||||
let t = AtomicUsize::new(1);
|
||||
let q = ConcurrentQueue::<usize>::bounded(1);
|
||||
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
|
||||
|
||||
Parallel::new()
|
||||
.add(|| loop {
|
||||
match t.load(Ordering::SeqCst) {
|
||||
0 if q.is_empty() => break,
|
||||
|
||||
_ => {
|
||||
while let Ok(n) = q.pop() {
|
||||
v[n].fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.add(|| {
|
||||
for i in 0..COUNT {
|
||||
if let Ok(Some(n)) = q.force_push(i) {
|
||||
v[n].fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
t.fetch_sub(1, Ordering::SeqCst);
|
||||
})
|
||||
.run();
|
||||
|
||||
for c in v {
|
||||
assert_eq!(c.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn mpmc_ring_buffer() {
|
||||
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
|
||||
const THREADS: usize = 4;
|
||||
|
||||
let t = AtomicUsize::new(THREADS);
|
||||
let q = ConcurrentQueue::<usize>::bounded(1);
|
||||
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
|
||||
|
||||
Parallel::new()
|
||||
.each(0..THREADS, |_| loop {
|
||||
match t.load(Ordering::SeqCst) {
|
||||
0 if q.is_empty() => break,
|
||||
|
||||
_ => {
|
||||
while let Ok(n) = q.pop() {
|
||||
v[n].fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.each(0..THREADS, |_| {
|
||||
for i in 0..COUNT {
|
||||
if let Ok(Some(n)) = q.force_push(i) {
|
||||
v[n].fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
t.fetch_sub(1, Ordering::SeqCst);
|
||||
})
|
||||
.run();
|
||||
|
||||
for c in v {
|
||||
assert_eq!(c.load(Ordering::SeqCst), THREADS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,14 @@
|
|||
#![allow(clippy::bool_assert_comparison)]
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use easy_parallel::Parallel;
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
|
||||
use easy_parallel::Parallel;
|
||||
#[cfg(target_family = "wasm")]
|
||||
use wasm_bindgen_test::wasm_bindgen_test as test;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
|
@ -69,6 +74,7 @@ fn close() {
|
|||
assert_eq!(q.pop(), Err(PopError::Closed));
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn spsc() {
|
||||
const COUNT: usize = if cfg!(miri) { 100 } else { 100_000 };
|
||||
|
@ -95,6 +101,7 @@ fn spsc() {
|
|||
.run();
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn mpmc() {
|
||||
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
|
||||
|
@ -126,6 +133,7 @@ fn mpmc() {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
#[test]
|
||||
fn drops() {
|
||||
const RUNS: usize = if cfg!(miri) { 20 } else { 100 };
|
||||
|
|
Loading…
Reference in New Issue