Compare commits

...

73 Commits

Author SHA1 Message Date
John Nunley e874f701f8
v2.5.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-26 07:25:42 -07:00
John Nunley 5b74dc8acc
Merge fetch_and and the prev_value check
cc https://github.com/smol-rs/concurrent-queue/pull/58#discussion_r1574686897

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-25 22:53:55 -07:00
John Nunley 05e7bff8e5
tests: Add more tests for force_push
This commit adds more tests for the force_push functionality. These
tests are copied from the corresponding crossbeam implementation.

We also add a clone of the "spsc" test that uses force_push.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-22 19:01:51 -07:00
John Nunley 576965a8a5 tests: Move test to bounded.rs
Previously the force_push test was placed in unbounded.rs by accident

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-13 23:40:59 -07:00
James Liu 89a64f8c3b
feat: Make unbounded a const function
This PR makes `ConcurrentQueue::unbounded` a const function. It'd be great if `bounded` could be `const` as well, but that would likely require static memory allocation support in const functions, which the compiler does not currently allow. This change enables the executor in https://github.com/smol-rs/async-executor/pull/112 to be constructed directly in a const context (i.e. static/thread_local variable initialization without OnceLock). It might also allow unbounded `async_channel`s to be constructed in a similar context.

Co-authored-by: Taiki Endo <te316e89@gmail.com>
2024-04-13 09:01:26 -07:00
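The pattern this commit enables can be sketched with a minimal std-only stand-in (the `Counter` type below is hypothetical, not the crate's API): a `const` constructor lets the value live directly in a `static`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Minimal stand-in type (not the real crate) showing what a `const`
// constructor buys you: the value can be placed directly in a `static`,
// with no `OnceLock` or lazy initialization at first use.
struct Counter {
    len: AtomicUsize,
}

impl Counter {
    // `AtomicUsize::new` is `const`, so this constructor can be too.
    const fn new() -> Self {
        Counter { len: AtomicUsize::new(0) }
    }

    fn push(&self) -> usize {
        self.len.fetch_add(1, Ordering::Relaxed) + 1
    }

    fn get(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }
}

// Initialized at compile time; usable from any thread at program start.
static GLOBAL: Counter = Counter::new();

fn main() {
    GLOBAL.push();
    GLOBAL.push();
    assert_eq!(GLOBAL.get(), 2);
}
```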
John Nunley 59e93fc952
ci: Test loom under no-default-features
This commit adds loom tests to CI with --no-default-features, then also
fixes a compile error that was introduced in a new version of loom.

Closes #64
2024-03-31 11:09:52 -07:00
John Nunley c407467c20
feat: Add an overflow push method
In some cases a "lossy" queue for data is desired, such as an event
queue where more recent events should be prioritized over older ones and
infinite storage is impractical. This commit adds a method called
"force_push" which enables this usage.

Bounded queue code is partially derived from the following commit:
bd75c3c45e

cc smol-rs/async-channel#44
2024-03-30 16:38:05 -07:00
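The "lossy" semantics described above can be modeled with a mutex-guarded deque. This is only a behavioral sketch under the assumption that a full queue displaces its oldest element; the real implementation is lock-free and the `Lossy` type here is invented for illustration.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// A bounded queue whose push never fails: when full, it evicts the
// oldest element and returns it to the caller.
struct Lossy<T> {
    cap: usize,
    items: Mutex<VecDeque<T>>,
}

impl<T> Lossy<T> {
    fn new(cap: usize) -> Self {
        Lossy { cap, items: Mutex::new(VecDeque::new()) }
    }

    // Returns the displaced element, if any.
    fn force_push(&self, value: T) -> Option<T> {
        let mut q = self.items.lock().unwrap();
        let old = if q.len() == self.cap { q.pop_front() } else { None };
        q.push_back(value);
        old
    }

    fn pop(&self) -> Option<T> {
        self.items.lock().unwrap().pop_front()
    }
}

fn main() {
    let q = Lossy::new(2);
    assert_eq!(q.force_push(1), None);
    assert_eq!(q.force_push(2), None);
    // Queue is full: the oldest event (1) is displaced by the newest (3).
    assert_eq!(q.force_push(3), Some(1));
    assert_eq!(q.pop(), Some(2));
}
```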
John Nunley ff53a68d8c ci: Add loom test to CI
I'm unsure why this wasn't added to begin with. This adds Loom testing
to the CI with a low number of max pre-emptions, in order to avoid
making the test take forever.

cc https://github.com/smol-rs/event-listener/pull/126#issue-2214269916

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-30 07:40:49 -07:00
Taiki Endo d49453323c Remove dead code
```
error: method `with` is never used
  --> src/sync.rs:65:12
   |
62 |     pub(crate) trait UnsafeCellExt {
   |                      ------------- method in this trait
...
65 |         fn with<R, F>(&self, f: F) -> R
   |            ^^^^
   |
   = note: `-D dead-code` implied by `-D warnings`
   = help: to override `-D warnings` add `#[allow(dead_code)]`
```
2024-03-06 02:19:43 +09:00
Taiki Endo 93ee058b7f Migrate to Rust 2021 2024-01-07 01:57:36 +09:00
Taiki Endo e7c2115f62 Relax MSRV to 1.60
https://github.com/crossbeam-rs/crossbeam/pull/1056
2024-01-07 01:57:36 +09:00
Taiki Endo 9f6f5d3188 ci: Use cargo-hack's --rust-version flag for msrv check
This respects the rust-version field in Cargo.toml, so it removes the
need to manage the MSRV in both the CI file and Cargo.toml.
2024-01-07 01:57:36 +09:00
dependabot[bot] 3b08720796
Update criterion requirement from 0.4.0 to 0.5 (#42)
Signed-off-by: dependabot[bot] <support@github.com>
2024-01-07 01:51:40 +09:00
John Nunley 71302ab09c
tests: Add WASM testing
This commit ensures this crate builds and works on WASM.

Signed-off-by: John Nunley <dev@notgull.net>
2023-12-16 11:20:15 -08:00
John Nunley 79b9292d10
m: Bump MSRV to 1.61
Due to crossbeam-rs/crossbeam#1037

Signed-off-by: John Nunley <dev@notgull.net>
2023-12-16 09:04:58 -08:00
John Nunley ba51f6e942
v2.4.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-12-02 10:59:27 -08:00
Irine c0e1098aa0
bugfix: Remove the heap allocation from non-single queues 2023-11-15 18:08:17 -08:00
John Nunley 22b5e83c4f
v2.3.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-25 10:22:00 -07:00
Taiki Endo 381d6360e1 Update actions/checkout action to v4 2023-09-10 18:25:36 +09:00
MrEconomical 04f3a1eecc
feat: Implement UnwindSafe on core (#49) 2023-08-25 21:15:31 +09:00
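Both `UnwindSafe` and `RefUnwindSafe` live in `core::panic`, so the markers can be provided without libstd. A sketch with a hypothetical `Slot` type (not the crate's actual code):

```rust
use core::cell::UnsafeCell;
use core::panic::{RefUnwindSafe, UnwindSafe};

// A slot with interior mutability. `UnsafeCell` is `UnwindSafe` but not
// `RefUnwindSafe`, so the latter marker must be supplied manually once
// we judge that a panic cannot leave the slot in a broken state.
struct Slot {
    value: UnsafeCell<usize>,
}

// Manual opt-in: invariants hold across unwinds (illustrative claim).
impl RefUnwindSafe for Slot {}

// Compile-time check that a type carries both markers.
fn check<T: UnwindSafe + RefUnwindSafe>() {}

fn main() {
    check::<Slot>();
    let _slot = Slot { value: UnsafeCell::new(0) };
}
```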
Taiki Endo cbdf9e88e1
bugfix: Use inline assembly in full_fence
This commit bumps the MSRV to 1.59
2023-08-13 14:33:40 -07:00
dependabot[bot] 2d309371f8
deps: Update loom requirement from 0.6 to 0.7
Updates the requirements on [loom](https://github.com/tokio-rs/loom) to permit the latest version.
- [Release notes](https://github.com/tokio-rs/loom/releases)
- [Changelog](https://github.com/tokio-rs/loom/blob/master/CHANGELOG.md)
- [Commits](https://github.com/tokio-rs/loom/compare/v0.6.0...v0.7.0)

---
updated-dependencies:
- dependency-name: loom
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-07 16:33:30 -07:00
John Nunley b660d795a4
Add smol-rs logo (#45) 2023-07-17 14:33:21 +09:00
dependabot[bot] c1d2c77b1c Update loom requirement from 0.5 to 0.6
Updates the requirements on [loom](https://github.com/tokio-rs/loom) to permit the latest version.
- [Release notes](https://github.com/tokio-rs/loom/releases)
- [Changelog](https://github.com/tokio-rs/loom/blob/master/CHANGELOG.md)
- [Commits](https://github.com/tokio-rs/loom/compare/v0.5.0...v0.6.0)

---
updated-dependencies:
- dependency-name: loom
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-07-04 12:24:37 +09:00
dependabot[bot] 2cd3dbad14 Update fastrand requirement from 1.3.3 to 2.0.0
Updates the requirements on [fastrand](https://github.com/smol-rs/fastrand) to permit the latest version.
- [Release notes](https://github.com/smol-rs/fastrand/releases)
- [Changelog](https://github.com/smol-rs/fastrand/blob/master/CHANGELOG.md)
- [Commits](https://github.com/smol-rs/fastrand/compare/v1.3.3...v2.0.0)

---
updated-dependencies:
- dependency-name: fastrand
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-11 16:08:55 +09:00
John Nunley e3fef3f0ae
v2.2.0 (#38) 2023-04-07 10:53:32 -07:00
Taiki Endo 07c3e4d5b9 Minimize GITHUB_TOKEN permissions
Refs: https://github.blog/changelog/2021-04-20-github-actions-control-permissions-for-github_token
2023-03-26 16:30:28 +09:00
Taiki Endo d66e007b3a Set CARGO_NET_GIT_FETCH_WITH_CLI=true in CI 2023-03-26 16:30:05 +09:00
John Nunley f877490dcb
feat: Add the try_iter method (#36) 2023-03-06 19:22:48 -08:00
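The semantics of a `try_iter`-style method can be seen on std's own `mpsc::Receiver::try_iter`, which this sketch uses as a stand-in: it yields only the items already queued and stops rather than blocking.

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // `try_iter` drains what is currently available and then ends,
    // even though the sender is still open.
    let drained: Vec<i32> = rx.try_iter().collect();
    assert_eq!(drained, vec![1, 2]);

    // Nothing new was sent, so a second pass is immediately empty.
    assert_eq!(rx.try_iter().count(), 0);
}
```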
Taiki Endo a96abb3467 Release 2.1.0 2023-01-15 23:36:42 +09:00
Taiki Endo db25fe1573 Update CI config 2023-01-15 23:36:42 +09:00
Taiki Endo b5463b2f5e Update portable-atomic to 1.0 2023-01-15 23:14:59 +09:00
dependabot[bot] 54e7d94998 Update criterion requirement from 0.3.4 to 0.4.0
Updates the requirements on [criterion](https://github.com/bheisler/criterion.rs) to permit the latest version.
- [Release notes](https://github.com/bheisler/criterion.rs/releases)
- [Changelog](https://github.com/bheisler/criterion.rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/bheisler/criterion.rs/compare/0.3.4...0.4.0)

---
updated-dependencies:
- dependency-name: criterion
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-12-28 12:42:51 +09:00
Taiki Endo 3383b2125a Enable dependabot update for Rust 2022-12-28 12:26:33 +09:00
Taiki Endo 1ff6001c68 Clean up CI config 2022-12-28 12:26:33 +09:00
Taiki Endo 6be67b375d Fix clippy::bool_to_int_with_if warning
```
warning: boolean to int conversion using if
  --> src/single.rs:93:9
   |
93 | /         if self.state.load(Ordering::SeqCst) & PUSHED == 0 {
94 | |             0
95 | |         } else {
96 | |             1
97 | |         }
   | |_________^ help: replace with from: `usize::from(self.state.load(Ordering::SeqCst) & PUSHED != 0)`
   |
   = note: `(self.state.load(Ordering::SeqCst) & PUSHED != 0) as usize` or `(self.state.load(Ordering::SeqCst) & PUSHED != 0).into()` can also be valid options
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_to_int_with_if
   = note: `#[warn(clippy::bool_to_int_with_if)]` on by default
```
2022-12-16 21:08:10 +09:00
Taiki Endo d14af85906 Remove msrv field from .clippy.toml
Since Rust 1.64, Clippy respects `rust-version` field in Cargo.toml.
rust-lang/rust@b776fb8
2022-11-27 15:59:17 +09:00
John Nunley 3d653aac40
Merge pull request #29 from smol-rs/2.0.0 2022-11-08 14:59:05 -08:00
jtnunley 34334a15dd v2.0.0 2022-11-08 09:57:13 -08:00
John Nunley 8c42b8dd9d
Add alternate implementations of synchronization primitives (#27) 2022-10-07 06:48:10 -07:00
John Nunley 06c99537c2
replace cache_padded with crossbeam-utils (#26) 2022-09-06 09:22:39 -07:00
John Nunley 9553e6fa92
Add benchmarks for bounded queues (#24) 2022-08-25 02:09:34 +09:00
John Nunley d3bf5a5424
Make it so this crate can be `no_std` (#22) 2022-08-25 02:09:13 +09:00
Taiki Endo 60354f9ea4 Format benches/bench.rs with rustfmt 2022-08-24 22:44:10 +09:00
Greg Morenz f8eca83b5f Add benchmarks for unbounded queues 2022-08-24 22:36:42 +09:00
Taiki Endo 812a733720 Release 1.2.4 2022-07-27 11:25:01 +09:00
Taiki Endo 434e8e9b2d Run more tests with Miri 2022-07-22 00:32:17 +09:00
Taiki Endo 9de877f9e0 Do not copy data before dropping 2022-07-22 00:03:27 +09:00
Taiki Endo d1218a02e0 Use get_mut instead of atomic load in Drop impls 2022-07-22 00:03:27 +09:00
Taiki Endo d61005fc79 Do not use x86 specific fence on Miri 2022-07-20 23:18:24 +09:00
Taiki Endo bc4f426618 Use compiler_fence in full_fence on x86 2022-07-20 23:18:24 +09:00
Taiki Endo 194cf156b1 Revert "Fix fence on non-x86 arch and miri (#16)"
This reverts commit 54df36a543.
2022-07-20 23:18:24 +09:00
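The idea behind the x86 `full_fence` commits above can be sketched as follows. This is a simplified illustration, not the crate's exact code: on x86/x86_64 a sequentially consistent fence can be approximated with a locked read-modify-write on a dummy location, which is often cheaper than `mfence`, with `compiler_fence` preventing compiler reordering around it.

```rust
use std::sync::atomic::{compiler_fence, fence, AtomicUsize, Ordering};

// Illustrative full fence: dummy locked RMW on x86, plain SeqCst fence
// elsewhere. The real crate selects the path with `cfg` attributes.
fn full_fence() {
    if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
        static DUMMY: AtomicUsize = AtomicUsize::new(0);
        compiler_fence(Ordering::SeqCst);
        // The locked instruction acts as a full hardware barrier.
        let _ = DUMMY.compare_exchange(0, 0, Ordering::SeqCst, Ordering::SeqCst);
        compiler_fence(Ordering::SeqCst);
    } else {
        fence(Ordering::SeqCst);
    }
}

fn main() {
    // No observable effect single-threaded; it only orders memory accesses.
    full_fence();
}
```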
Taiki Endo b8363bab2d Release 1.2.3 2022-07-17 22:35:42 +09:00
Taiki Endo 8497007235 Ignore clippy::bool_assert_comparison lint in tests
```
warning: used `assert_eq!` with a literal bool
  --> tests/bounded.rs:37:5
   |
37 |     assert_eq!(q.is_empty(), true);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = note: `#[warn(clippy::bool_assert_comparison)]` on by default
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/bounded.rs:38:5
   |
38 |     assert_eq!(q.is_full(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/bounded.rs:43:5
   |
43 |     assert_eq!(q.is_empty(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/bounded.rs:44:5
   |
44 |     assert_eq!(q.is_full(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/bounded.rs:49:5
   |
49 |     assert_eq!(q.is_empty(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/bounded.rs:50:5
   |
50 |     assert_eq!(q.is_full(), true);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/bounded.rs:55:5
   |
55 |     assert_eq!(q.is_empty(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/bounded.rs:56:5
   |
56 |     assert_eq!(q.is_full(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/unbounded.rs:22:5
   |
22 |     assert_eq!(q.is_empty(), true);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = note: `#[warn(clippy::bool_assert_comparison)]` on by default
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/unbounded.rs:27:5
   |
27 |     assert_eq!(q.is_empty(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/unbounded.rs:32:5
   |
32 |     assert_eq!(q.is_empty(), true);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/single.rs:29:5
   |
29 |     assert_eq!(q.is_empty(), true);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = note: `#[warn(clippy::bool_assert_comparison)]` on by default
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/single.rs:30:5
   |
30 |     assert_eq!(q.is_full(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/single.rs:35:5
   |
35 |     assert_eq!(q.is_empty(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/single.rs:36:5
   |
36 |     assert_eq!(q.is_full(), true);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/single.rs:41:5
   |
41 |     assert_eq!(q.is_empty(), true);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison

warning: used `assert_eq!` with a literal bool
  --> tests/single.rs:42:5
   |
42 |     assert_eq!(q.is_full(), false);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `assert!(..)`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bool_assert_comparison
```
2022-07-17 22:34:00 +09:00
Taiki Endo 6b7a6a0584 Apply clippy to all targets 2022-07-17 22:34:00 +09:00
Taiki Endo 54df36a543
Fix fence on non-x86 arch and miri (#16) 2022-07-17 22:33:35 +09:00
Taiki Endo 83ec31aac4 Update CI config 2022-07-03 01:13:27 +09:00
Taiki Endo 37585c0eb6 Update MIRIFLAGS 2022-05-01 15:06:48 +09:00
Taiki Endo 8d3e3a860e Update actions/checkout action to v3 2022-05-01 14:12:02 +09:00
Taiki Endo 29dc872e05 Create GitHub release automatically 2022-01-08 18:43:05 +09:00
Taiki Endo 407b9e2942 Clean up CI config 2022-01-08 18:09:30 +09:00
Taiki Endo b6f39c7aa8 Run Miri on CI 2022-01-08 18:09:30 +09:00
Taiki Endo edb1210403
Merge pull request #10 from smol-rs/readme
Remove readme field from Cargo.toml
2021-02-14 19:51:43 +09:00
Taiki Endo c97ffb7296 Remove readme field from Cargo.toml 2021-02-14 19:44:08 +09:00
Taiki Endo bb80164b90
Merge pull request #9 from smol-rs/badge
Update license badge to match Cargo.toml
2021-02-14 13:53:25 +09:00
Taiki Endo 4a7acf9b9e Update license badge to match Cargo.toml 2021-02-14 13:37:47 +09:00
Taiki Endo 858f4f0607
Merge pull request #8 from taiki-e/url
Update URLs
2020-12-26 23:59:17 +09:00
Taiki Endo 8ea5d6f404 Update URLs 2020-12-26 23:47:13 +09:00
Taiki Endo 52573f74f2
Merge pull request #7 from taiki-e/compare_and_swap
Replace deprecated compare_and_swap with compare_exchange
2020-12-24 21:32:45 +09:00
Taiki Endo cefe5efc42 Replace deprecated compare_and_swap with compare_exchange 2020-12-24 21:28:40 +09:00
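The mechanical translation behind this commit: the deprecated `compare_and_swap(old, new, ord)` returned the previous value, while `compare_exchange` returns a `Result` and takes separate success/failure orderings, so `unwrap_or_else(|v| v)` recovers the old return convention.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let a = AtomicUsize::new(5);

    // Succeeds: the value was 5, becomes 6, and Ok(5) is returned.
    let prev = a
        .compare_exchange(5, 6, Ordering::SeqCst, Ordering::SeqCst)
        .unwrap_or_else(|v| v);
    assert_eq!(prev, 5);

    // Fails: the value is now 6, nothing changes, Err(6) is returned.
    let prev = a
        .compare_exchange(5, 7, Ordering::SeqCst, Ordering::SeqCst)
        .unwrap_or_else(|v| v);
    assert_eq!(prev, 6);
    assert_eq!(a.load(Ordering::SeqCst), 6);
}
```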
Taiki Endo b5bf0b250e
Merge pull request #6 from taiki-e/ci
Fix CI
2020-12-24 18:35:34 +09:00
Taiki Endo 3aece12ce3 Fix CI 2020-12-24 18:33:15 +09:00
Stjepan Glavina 444cae15a5
. 2020-11-30 12:27:08 +01:00
20 changed files with 1575 additions and 284 deletions

1
.github/FUNDING.yml vendored

@@ -1 +0,0 @@
github: stjepang

9
.github/dependabot.yml vendored Normal file

@@ -0,0 +1,9 @@
version: 2
updates:
- package-ecosystem: cargo
directory: /
schedule:
interval: weekly
commit-message:
prefix: ''
labels: []


@@ -1,51 +0,0 @@
name: Build and test
on:
push:
branches:
- master
pull_request:
jobs:
build_and_test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
rust: [nightly, beta, stable]
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environment
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS')
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- name: Set current week of the year in environment
if: startsWith(matrix.os, 'windows')
run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)"
- name: Install latest ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
profile: minimal
override: true
- name: Run cargo check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --examples --tests --all-features
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
uses: actions-rs/cargo@v1
with:
command: check
args: -Z features=dev_dep
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test

127
.github/workflows/ci.yml vendored Normal file

@@ -0,0 +1,127 @@
name: CI
permissions:
contents: read
on:
pull_request:
push:
branches:
- master
schedule:
- cron: '0 2 * * 0'
env:
CARGO_INCREMENTAL: 0
CARGO_NET_GIT_FETCH_WITH_CLI: true
CARGO_NET_RETRY: 10
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1
RUSTFLAGS: -D warnings
RUSTDOCFLAGS: -D warnings
RUSTUP_MAX_RETRIES: 10
defaults:
run:
shell: bash
jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
rust: [nightly, beta, stable]
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: rustup target add thumbv7m-none-eabi
- run: rustup target add wasm32-unknown-unknown
- name: Install cargo-hack and wasm-pack
uses: taiki-e/install-action@v2
with:
tool: cargo-hack,wasm-pack
- run: cargo build --all --all-features --all-targets
- run: cargo hack build --feature-powerset --no-dev-deps
- run: cargo hack build --feature-powerset --no-dev-deps --target thumbv7m-none-eabi --skip std,default
- run: cargo test
- run: cargo test --features portable-atomic
- name: Run with Loom enabled
run: cargo test --test loom --features loom
env:
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg loom
LOOM_MAX_PREEMPTIONS: 2
- name: Check WASM tests
run: cargo build --target wasm32-unknown-unknown
- run: wasm-pack test --node
- run: wasm-pack test --node --no-default-features
- run: wasm-pack test --node --no-default-features --features portable-atomic
msrv:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: cargo hack build --rust-version
- run: cargo hack build --features portable-atomic --rust-version
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --rust-version
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo clippy --all-features --all-targets
fmt:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo fmt --all --check
miri:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup toolchain install nightly --component miri && rustup default nightly
- run: cargo miri test
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
loom:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- name: Loom tests
run: cargo test --release --test loom --features loom
env:
RUSTFLAGS: "--cfg=loom"
LOOM_MAX_PREEMPTIONS: 4
- name: Loom tests without default features
run: cargo test --release --test loom --features loom --no-default-features
env:
RUSTFLAGS: "--cfg=loom"
LOOM_MAX_PREEMPTIONS: 4
security_audit:
permissions:
checks: write
contents: read
issues: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# https://github.com/rustsec/audit-check/issues/2
- uses: rustsec/audit-check@master
with:
token: ${{ secrets.GITHUB_TOKEN }}


@@ -1,26 +0,0 @@
name: Lint
on:
push:
branches:
- master
pull_request:
jobs:
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environment
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
components: clippy
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features -- -W clippy::all

22
.github/workflows/release.yml vendored Normal file

@@ -0,0 +1,22 @@
name: Release
permissions:
contents: write
on:
push:
tags:
- v[0-9]+.*
jobs:
create-release:
if: github.repository_owner == 'smol-rs'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: taiki-e/create-gh-release-action@v1
with:
changelog: CHANGELOG.md
branch: master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}


@@ -1,20 +0,0 @@
name: Security audit
on:
push:
branches:
- master
pull_request:
jobs:
security_audit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environment
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- uses: actions-rs/audit-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}


@@ -1,3 +1,44 @@
# Version 2.5.0
- Add a `force_push` method that can be used to add an element to the queue by displacing another. (#58)
- Make `ConcurrentQueue::unbounded()` into a `const` function. (#67)
- Fix a compilation error in the Loom implementation. (#65)
# Version 2.4.0
- Remove unnecessary heap allocations from inside of the `ConcurrentQueue` type. (#53)
# Version 2.3.0
- Implement `UnwindSafe` without libstd. (#49)
- Bump `fastrand` to `v2.0.0`. (#43)
- Use inline assembly in the `full_fence` function. (#47)
# Version 2.2.0
- Add the try_iter method. (#36)
# Version 2.1.0
- Update `portable-atomic` to 1.0. (#33)
# Version 2.0.0
- Add support for the `portable-atomic` and `loom` crates. (#27)
- **Breaking:** Add an `std` feature that can be disabled to use this crate on `no_std` platforms. (#22)
- Replace usage of `cache-padded` with `crossbeam-utils`. (#26)
# Version 1.2.4
- Fix fence on x86 and miri. (#18)
- Revert 1.2.3. (#18)
# Version 1.2.3
**Note:** This release has been yanked, see #17 for details.
- Fix fence on non-x86 architectures and miri. (#16)
# Version 1.2.2
- Add a special, efficient `bounded(1)` implementation.


@@ -1,20 +1,47 @@
[package]
name = "concurrent-queue"
version = "1.2.2"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
# When publishing a new version:
# - Update CHANGELOG.md
# - Create "v2.x.y" git tag
version = "2.5.0"
authors = [
"Stjepan Glavina <stjepang@gmail.com>",
"Taiki Endo <te316e89@gmail.com>",
"John Nunley <dev@notgull.net>"
]
edition = "2021"
rust-version = "1.60"
description = "Concurrent multi-producer multi-consumer queue"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/stjepang/concurrent-queue"
homepage = "https://github.com/stjepang/concurrent-queue"
documentation = "https://docs.rs/concurrent-queue"
repository = "https://github.com/smol-rs/concurrent-queue"
keywords = ["channel", "mpmc", "spsc", "spmc", "mpsc"]
categories = ["concurrency"]
readme = "README.md"
exclude = ["/.*"]
[lib]
bench = false
[dependencies]
cache-padded = "1.1.1"
crossbeam-utils = { version = "0.8.11", default-features = false }
portable-atomic = { version = "1", default-features = false, optional = true }
# Enables loom testing. This feature is permanently unstable and the API may
# change at any time.
[target.'cfg(loom)'.dependencies]
loom = { version = "0.7", optional = true }
[[bench]]
name = "bench"
harness = false
[dev-dependencies]
criterion = { version = "0.5", features = ["cargo_bench_support"], default-features = false }
easy-parallel = "3.1.0"
fastrand = "1.3.3"
fastrand = "2.0.0"
[target.'cfg(target_family = "wasm")'.dev-dependencies]
wasm-bindgen-test = "0.3"
[features]
default = ["std"]
std = []


@@ -1,9 +1,9 @@
# concurrent-queue
[![Build](https://github.com/stjepang/concurrent-queue/workflows/Build%20and%20test/badge.svg)](
https://github.com/stjepang/concurrent-queue/actions)
[![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)](
https://github.com/stjepang/concurrent-queue)
[![Build](https://github.com/smol-rs/concurrent-queue/workflows/Build%20and%20test/badge.svg)](
https://github.com/smol-rs/concurrent-queue/actions)
[![License](https://img.shields.io/badge/license-Apache--2.0_OR_MIT-blue.svg)](
https://github.com/smol-rs/concurrent-queue)
[![Cargo](https://img.shields.io/crates/v/concurrent-queue.svg)](
https://crates.io/crates/concurrent-queue)
[![Documentation](https://docs.rs/concurrent-queue/badge.svg)](

93
benches/bench.rs Normal file

@@ -0,0 +1,93 @@
use std::{any::type_name, fmt::Debug};
use concurrent_queue::{ConcurrentQueue, PopError};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use easy_parallel::Parallel;
const COUNT: usize = 100_000;
const THREADS: usize = 7;
fn spsc<T: Default + std::fmt::Debug + Send>(recv: &ConcurrentQueue<T>, send: &ConcurrentQueue<T>) {
Parallel::new()
.add(|| loop {
match recv.pop() {
Ok(_) => (),
Err(PopError::Empty) => (),
Err(PopError::Closed) => break,
}
})
.add(|| {
for _ in 0..COUNT {
send.push(T::default()).unwrap();
}
send.close();
})
.run();
}
fn mpsc<T: Default + std::fmt::Debug + Send>(recv: &ConcurrentQueue<T>, send: &ConcurrentQueue<T>) {
Parallel::new()
.each(0..THREADS, |_| {
for _ in 0..COUNT {
send.push(T::default()).unwrap();
}
})
.add(|| {
let mut received = 0;
while received < THREADS * COUNT {
match recv.pop() {
Ok(_) => received += 1,
Err(PopError::Empty) => (),
Err(PopError::Closed) => unreachable!(),
}
}
})
.run();
}
fn single_thread<T: Default + std::fmt::Debug>(
recv: &ConcurrentQueue<T>,
send: &ConcurrentQueue<T>,
) {
for _ in 0..COUNT {
send.push(T::default()).unwrap();
}
for _ in 0..COUNT {
recv.pop().unwrap();
}
}
// Because we can't pass generic functions as const parameters.
macro_rules! bench_all(
($name:ident, $f:ident) => {
fn $name(c: &mut Criterion) {
fn helper<T: Default + Debug + Send>(c: &mut Criterion) {
let name = format!("unbounded_{}_{}", stringify!($f), type_name::<T>());
c.bench_function(&name, |b| b.iter(|| {
let q = ConcurrentQueue::unbounded();
$f::<T>(black_box(&q), black_box(&q));
}));
let name = format!("bounded_{}_{}", stringify!($f), type_name::<T>());
c.bench_function(&name, |b| b.iter(|| {
let q = ConcurrentQueue::bounded(THREADS * COUNT);
$f::<T>(black_box(&q), black_box(&q));
}));
}
helper::<u8>(c);
helper::<u16>(c);
helper::<u32>(c);
helper::<u64>(c);
helper::<u128>(c);
}
}
);
bench_all!(bench_spsc, spsc);
bench_all!(bench_mpsc, mpsc);
bench_all!(bench_single_thread, single_thread);
criterion_group!(generic_group, bench_single_thread, bench_spsc, bench_mpsc);
criterion_main!(generic_group);


@@ -1,11 +1,13 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use alloc::{boxed::Box, vec::Vec};
use core::mem::MaybeUninit;
use cache_padded::CachePadded;
use crossbeam_utils::CachePadded;
use crate::{PopError, PushError};
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, ForcePushError, PopError, PushError};
/// A slot in a queue.
struct Slot<T> {
@@ -81,6 +83,74 @@ impl<T> Bounded<T> {
/// Attempts to push an item into the queue.
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
self.push_or_else(value, |value, tail, _, _| {
let head = self.head.load(Ordering::Relaxed);
// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
Err(PushError::Full(value))
} else {
Ok(value)
}
})
}
/// Pushes an item into the queue, displacing another item if needed.
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
let result = self.push_or_else(value, |value, tail, new_tail, slot| {
let head = tail.wrapping_sub(self.one_lap);
let new_head = new_tail.wrapping_sub(self.one_lap);
// Try to move the head.
if self
.head
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
// Move the tail.
self.tail.store(new_tail, Ordering::SeqCst);
// Swap out the old value.
// SAFETY: We know this is initialized, since it's covered by the current queue.
let old = unsafe {
slot.value
.with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init())
};
// Update the stamp.
slot.stamp.store(tail + 1, Ordering::Release);
// Return a PushError.
Err(PushError::Full(old))
} else {
Ok(value)
}
});
match result {
Ok(()) => Ok(None),
Err(PushError::Full(old_value)) => Ok(Some(old_value)),
Err(PushError::Closed(value)) => Err(ForcePushError(value)),
}
}
/// Attempts to push an item into the queue, running a closure on failure.
///
/// `fail` is run when there is no more room left in the tail of the queue. The parameters of
/// this function are as follows:
///
/// - The item that failed to push.
/// - The value of `self.tail` before the new value would be inserted.
/// - The value of `self.tail` after the new value would be inserted.
/// - The slot that we attempted to push into.
///
/// If `fail` returns `Ok(val)`, we will try pushing `val` to the head of the queue. Otherwise,
/// this function will return the error.
fn push_or_else<F>(&self, mut value: T, mut fail: F) -> Result<(), PushError<T>>
where
F: FnMut(T, usize, usize, &Slot<T>) -> Result<T, PushError<T>>,
{
let mut tail = self.tail.load(Ordering::Relaxed);
loop {
@@ -93,22 +163,23 @@ impl<T> Bounded<T> {
let index = tail & (self.mark_bit - 1);
let lap = tail & !(self.one_lap - 1);
// Calculate the new location of the tail.
let new_tail = if index + 1 < self.buffer.len() {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Inspect the corresponding slot.
let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire);
// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.buffer.len() {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
@ -118,9 +189,9 @@ impl<T> Bounded<T> {
) {
Ok(_) => {
// Write the value into the slot and update the stamp.
unsafe {
slot.value.get().write(MaybeUninit::new(value));
}
slot.value.with_mut(|slot| unsafe {
slot.write(MaybeUninit::new(value));
});
slot.stamp.store(tail + 1, Ordering::Release);
return Ok(());
}
@ -130,18 +201,18 @@ impl<T> Bounded<T> {
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
crate::full_fence();
let head = self.head.load(Ordering::Relaxed);
// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
return Err(PushError::Full(value));
}
// We've failed to push; run our failure closure.
value = fail(value, tail, new_tail, slot)?;
// Loom complains if there isn't an explicit busy wait here.
#[cfg(loom)]
busy_wait();
tail = self.tail.load(Ordering::Relaxed);
} else {
// Yield because we need to wait for the stamp to get updated.
thread::yield_now();
busy_wait();
tail = self.tail.load(Ordering::Relaxed);
}
}
@ -181,7 +252,9 @@ impl<T> Bounded<T> {
) {
Ok(_) => {
// Read the value from the slot and update the stamp.
let value = unsafe { slot.value.get().read().assume_init() };
let value = slot
.value
.with_mut(|slot| unsafe { slot.read().assume_init() });
slot.stamp
.store(head.wrapping_add(self.one_lap), Ordering::Release);
return Ok(value);
@ -204,10 +277,14 @@ impl<T> Bounded<T> {
}
}
// Loom complains if there isn't a busy-wait here.
#[cfg(loom)]
busy_wait();
head = self.head.load(Ordering::Relaxed);
} else {
// Yield because we need to wait for the stamp to get updated.
thread::yield_now();
busy_wait();
head = self.head.load(Ordering::Relaxed);
}
}
@ -284,23 +361,48 @@ impl<T> Bounded<T> {
impl<T> Drop for Bounded<T> {
fn drop(&mut self) {
// Get the index of the head.
let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
let Self {
head,
tail,
buffer,
mark_bit,
..
} = self;
// Loop over all slots that hold a value and drop them.
for i in 0..self.len() {
// Compute the index of the next slot holding a value.
let index = if hix + i < self.buffer.len() {
hix + i
} else {
hix + i - self.buffer.len()
};
let mark_bit = *mark_bit;
// Drop the value in the slot.
let slot = &self.buffer[index];
unsafe {
let value = slot.value.get().read().assume_init();
drop(value);
}
}
head.with_mut(|&mut head| {
tail.with_mut(|&mut tail| {
let hix = head & (mark_bit - 1);
let tix = tail & (mark_bit - 1);
let len = if hix < tix {
tix - hix
} else if hix > tix {
buffer.len() - hix + tix
} else if (tail & !mark_bit) == head {
0
} else {
buffer.len()
};
// Loop over all slots that hold a value and drop them.
for i in 0..len {
// Compute the index of the next slot holding a value.
let index = if hix + i < buffer.len() {
hix + i
} else {
hix + i - buffer.len()
};
// Drop the value in the slot.
let slot = &buffer[index];
slot.value.with_mut(|slot| unsafe {
let value = &mut *slot;
value.as_mut_ptr().drop_in_place();
});
}
});
});
}
}


@ -24,25 +24,75 @@
//! assert_eq!(q.pop(), Ok(2));
//! ```
//!
//! # Features
//!
//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will
//! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this
//! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow
//! this crate to be used on `no_std` platforms at the potential expense of more busy waiting.
//!
//! There is also a `portable-atomic` feature, which uses a polyfill from the
//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them.
//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it.
//! Note that even with this feature enabled, `concurrent-queue` still requires a global allocator
//! to be available. See the documentation for the [`std::alloc::GlobalAlloc`] trait for more
//! information.
//!
//! [Bounded]: `ConcurrentQueue::bounded()`
//! [Unbounded]: `ConcurrentQueue::unbounded()`
//! [closed]: `ConcurrentQueue::close()`
//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![no_std]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use core::fmt;
use core::panic::{RefUnwindSafe, UnwindSafe};
use sync::atomic::{self, Ordering};
#[cfg(feature = "std")]
use std::error;
use std::fmt;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::{self, AtomicUsize, Ordering};
use crate::bounded::Bounded;
use crate::single::Single;
use crate::sync::busy_wait;
use crate::unbounded::Unbounded;
mod bounded;
mod single;
mod unbounded;
mod sync;
/// Make the given function const if the given condition is true.
macro_rules! const_fn {
(
const_if: #[cfg($($cfg:tt)+)];
$(#[$($attr:tt)*])*
$vis:vis const fn $($rest:tt)*
) => {
#[cfg($($cfg)+)]
$(#[$($attr)*])*
$vis const fn $($rest)*
#[cfg(not($($cfg)+))]
$(#[$($attr)*])*
$vis fn $($rest)*
};
}
pub(crate) use const_fn;
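The `const_fn!` macro above can be exercised standalone. The sketch below copies the macro verbatim and drives it with `cfg(not(any()))` (an always-true predicate, chosen here purely for illustration) so the `const` branch is emitted, which in turn lets the generated function initialize a `static`:

```rust
// Standalone copy of the `const_fn!` pattern: the same function body is
// emitted twice, once as `const fn` when the cfg condition holds and once as
// a plain `fn` otherwise.
macro_rules! const_fn {
    (
        const_if: #[cfg($($cfg:tt)+)];
        $(#[$($attr:tt)*])*
        $vis:vis const fn $($rest:tt)*
    ) => {
        #[cfg($($cfg)+)]
        $(#[$($attr)*])*
        $vis const fn $($rest)*
        #[cfg(not($($cfg)+))]
        $(#[$($attr)*])*
        $vis fn $($rest)*
    };
}

// `cfg(not(any()))` is always true, so the `const` branch is emitted here.
const_fn! {
    const_if: #[cfg(not(any()))];
    pub const fn answer() -> u32 {
        42
    }
}

// Legal only because `answer` came out as a `const fn`.
static ANSWER: u32 = answer();

fn main() {
    assert_eq!(ANSWER, 42);
    println!("{}", ANSWER);
}
```

In the crate itself the condition is `#[cfg(not(loom))]`, because loom's atomics cannot be constructed in const contexts.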
/// A concurrent queue.
///
/// # Examples
@ -68,10 +118,11 @@ unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
impl<T> UnwindSafe for ConcurrentQueue<T> {}
impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
#[allow(clippy::large_enum_variant)]
enum Inner<T> {
Single(Single<T>),
Bounded(Box<Bounded<T>>),
Unbounded(Box<Unbounded<T>>),
Bounded(Bounded<T>),
Unbounded(Unbounded<T>),
}
impl<T> ConcurrentQueue<T> {
@ -94,22 +145,25 @@ impl<T> ConcurrentQueue<T> {
if cap == 1 {
ConcurrentQueue(Inner::Single(Single::new()))
} else {
ConcurrentQueue(Inner::Bounded(Box::new(Bounded::new(cap))))
ConcurrentQueue(Inner::Bounded(Bounded::new(cap)))
}
}
/// Creates a new unbounded queue.
///
/// # Examples
///
/// ```
/// use concurrent_queue::ConcurrentQueue;
///
/// let q = ConcurrentQueue::<i32>::unbounded();
/// ```
pub fn unbounded() -> ConcurrentQueue<T> {
ConcurrentQueue(Inner::Unbounded(Box::new(Unbounded::new())))
}
const_fn!(
const_if: #[cfg(not(loom))];
/// Creates a new unbounded queue.
///
/// # Examples
///
/// ```
/// use concurrent_queue::ConcurrentQueue;
///
/// let q = ConcurrentQueue::<i32>::unbounded();
/// ```
pub const fn unbounded() -> ConcurrentQueue<T> {
ConcurrentQueue(Inner::Unbounded(Unbounded::new()))
}
);
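What the `const` constructor buys callers is direct placement in a `static`, with no `OnceLock` or lazy wrapper. The sketch below uses a hypothetical `ToyQueue` stand-in (std-only, so it stays self-contained) in the role that `ConcurrentQueue::unbounded` plays after this change:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for an unbounded queue with a `const` constructor.
struct ToyQueue {
    pushed: AtomicUsize,
}

impl ToyQueue {
    // Being `const fn` is what allows the `static` below to exist.
    const fn unbounded() -> Self {
        ToyQueue {
            pushed: AtomicUsize::new(0),
        }
    }

    // Returns the total number of pushes so far, including this one.
    fn push(&self) -> usize {
        self.pushed.fetch_add(1, Ordering::SeqCst) + 1
    }
}

// No lazy initialization needed: the constructor runs at compile time.
static QUEUE: ToyQueue = ToyQueue::unbounded();

fn main() {
    assert_eq!(QUEUE.push(), 1);
    assert_eq!(QUEUE.push(), 2);
    println!("pushed twice");
}
```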
/// Attempts to push an item into the queue.
///
@ -148,6 +202,54 @@ impl<T> ConcurrentQueue<T> {
}
}
/// Push an element into the queue, potentially displacing another element.
///
/// Attempts to push an element into the queue. If the queue is full, one item from the
/// queue is replaced with the provided item. The displaced item is returned as `Some(T)`.
/// If the queue is closed, an error is returned.
///
/// # Examples
///
/// ```
/// use concurrent_queue::{ConcurrentQueue, ForcePushError, PushError};
///
/// let q = ConcurrentQueue::bounded(3);
///
/// // We can push to the queue.
/// for i in 1..=3 {
/// assert_eq!(q.force_push(i), Ok(None));
/// }
///
/// // Push errors because the queue is now full.
/// assert_eq!(q.push(4), Err(PushError::Full(4)));
///
/// // Pushing a new value replaces the old ones.
/// assert_eq!(q.force_push(5), Ok(Some(1)));
/// assert_eq!(q.force_push(6), Ok(Some(2)));
///
/// // Close the queue to stop further pushes.
/// q.close();
///
/// // Pushing will return an error.
/// assert_eq!(q.force_push(7), Err(ForcePushError(7)));
///
/// // Popping items will return the force-pushed ones.
/// assert_eq!(q.pop(), Ok(3));
/// assert_eq!(q.pop(), Ok(5));
/// assert_eq!(q.pop(), Ok(6));
/// ```
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
match &self.0 {
Inner::Single(q) => q.force_push(value),
Inner::Bounded(q) => q.force_push(value),
Inner::Unbounded(q) => match q.push(value) {
Ok(()) => Ok(None),
Err(PushError::Closed(value)) => Err(ForcePushError(value)),
Err(PushError::Full(_)) => unreachable!(),
},
}
}
/// Attempts to pop an item from the queue.
///
/// If the queue is empty, an error is returned.
@ -181,6 +283,35 @@ impl<T> ConcurrentQueue<T> {
}
}
/// Get an iterator over the items in the queue.
///
/// The iterator will continue until the queue is empty or closed. It will never block;
/// if the queue is empty, the iterator will return `None`. If new items are pushed into
/// the queue, the iterator may return `Some` in the future after returning `None`.
///
/// # Examples
///
/// ```
/// use concurrent_queue::ConcurrentQueue;
///
/// let q = ConcurrentQueue::bounded(5);
/// q.push(1).unwrap();
/// q.push(2).unwrap();
/// q.push(3).unwrap();
///
/// let mut iter = q.try_iter();
/// assert_eq!(iter.by_ref().sum::<i32>(), 6);
/// assert_eq!(iter.next(), None);
///
/// // Pushing more items will make them available to the iterator.
/// q.push(4).unwrap();
/// assert_eq!(iter.next(), Some(4));
/// assert_eq!(iter.next(), None);
/// ```
pub fn try_iter(&self) -> TryIter<'_, T> {
TryIter { queue: self }
}
/// Returns `true` if the queue is empty.
///
/// # Examples
@ -339,6 +470,31 @@ impl<T> fmt::Debug for ConcurrentQueue<T> {
}
}
/// An iterator that pops items from a [`ConcurrentQueue`].
///
/// This iterator will never block; it will return `None` once the queue has
/// been exhausted. Calling `next` after `None` may yield `Some(item)` if more items
/// are pushed to the queue.
#[must_use = "iterators are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct TryIter<'a, T> {
queue: &'a ConcurrentQueue<T>,
}
impl<T> fmt::Debug for TryIter<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Iter").field(&self.queue).finish()
}
}
impl<T> Iterator for TryIter<'_, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.queue.pop().ok()
}
}
/// Error which occurs when popping from an empty queue.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum PopError {
@ -367,6 +523,7 @@ impl PopError {
}
}
#[cfg(feature = "std")]
impl error::Error for PopError {}
impl fmt::Debug for PopError {
@ -423,6 +580,7 @@ impl<T> PushError<T> {
}
}
#[cfg(feature = "std")]
impl<T: fmt::Debug> error::Error for PushError<T> {}
impl<T: fmt::Debug> fmt::Debug for PushError<T> {
@ -443,25 +601,60 @@ impl<T> fmt::Display for PushError<T> {
}
}
/// Error that occurs when force-pushing into a closed queue.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct ForcePushError<T>(pub T);
impl<T> ForcePushError<T> {
/// Return the inner value that failed to be force-pushed.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T: fmt::Debug> fmt::Debug for ForcePushError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ForcePushError").field(&self.0).finish()
}
}
impl<T> fmt::Display for ForcePushError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Closed")
}
}
#[cfg(feature = "std")]
impl<T: fmt::Debug> error::Error for ForcePushError<T> {}
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), not(miri), not(loom)))]
{
use core::{arch::asm, cell::UnsafeCell};
// HACK(stjepang): On x86 architectures there are two different ways of executing
// a `SeqCst` fence.
//
// 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
// 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
// 2. A `lock <op>` instruction.
//
// Both instructions have the effect of a full barrier, but empirical benchmarks have shown
// that the second one is sometimes a bit faster.
//
// The ideal solution here would be to use inline assembly, but we're instead creating a
// temporary atomic variable and compare-and-exchanging its value. No sane compiler
// targeting x86 is going to optimize this away.
let a = AtomicUsize::new(0);
a.compare_and_swap(0, 1, Ordering::SeqCst);
} else {
let a = UnsafeCell::new(0_usize);
// It is common to use `lock or` here, but when using a local variable, `lock not`, which
// does not change the flags, should be slightly more efficient.
// Refs: https://www.felixcloutier.com/x86/not
unsafe {
#[cfg(target_pointer_width = "64")]
asm!("lock not qword ptr [{0}]", in(reg) a.get(), options(nostack, preserves_flags));
#[cfg(target_pointer_width = "32")]
asm!("lock not dword ptr [{0:e}]", in(reg) a.get(), options(nostack, preserves_flags));
}
return;
}
#[allow(unreachable_code)]
{
atomic::fence(Ordering::SeqCst);
}
}
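The two portable strategies that `full_fence` chooses between on non-x86 targets (the inline-asm `lock not` path above is x86-only) can be sketched with std alone. Both functions below are illustrative stand-ins, not the crate's implementation:

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};

// Strategy 1: a plain SeqCst fence (compiles to `mfence` on x86).
fn fence_via_instruction() {
    fence(Ordering::SeqCst);
}

// Strategy 2: a SeqCst read-modify-write on a throwaway atomic, which is
// also a full barrier on x86. `compare_exchange` replaces the deprecated
// `compare_and_swap` that older versions of this code used.
fn fence_via_rmw() -> usize {
    let a = AtomicUsize::new(0);
    let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
    a.load(Ordering::SeqCst)
}

fn main() {
    fence_via_instruction();
    assert_eq!(fence_via_rmw(), 1);
    println!("both fences executed");
}
```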


@ -1,9 +1,11 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use core::mem::MaybeUninit;
use core::ptr;
use crate::{PopError, PushError};
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, ForcePushError, PopError, PushError};
const LOCKED: usize = 1 << 0;
const PUSHED: usize = 1 << 1;
@ -29,11 +31,14 @@ impl<T> Single<T> {
// Lock and fill the slot.
let state = self
.state
.compare_and_swap(0, LOCKED | PUSHED, Ordering::SeqCst);
.compare_exchange(0, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
.unwrap_or_else(|x| x);
if state == 0 {
// Write the value and unlock.
unsafe { self.slot.get().write(MaybeUninit::new(value)) }
self.slot.with_mut(|slot| unsafe {
slot.write(MaybeUninit::new(value));
});
self.state.fetch_and(!LOCKED, Ordering::Release);
Ok(())
} else if state & CLOSED != 0 {
@ -43,18 +48,79 @@ impl<T> Single<T> {
}
}
/// Attempts to push an item into the queue, displacing another if necessary.
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
// Attempt to lock the slot.
let mut state = 0;
loop {
// Lock the slot.
let prev = self
.state
.compare_exchange(state, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
.unwrap_or_else(|x| x);
if prev & CLOSED != 0 {
return Err(ForcePushError(value));
}
if prev == state {
// If the value was pushed, swap out the value.
let prev_value = if prev & PUSHED == 0 {
// SAFETY: write is safe because we have locked the state.
self.slot.with_mut(|slot| unsafe {
slot.write(MaybeUninit::new(value));
});
None
} else {
// SAFETY: replace is safe because we have locked the state, and
// assume_init is safe because we have checked that the value was pushed.
let prev_value = unsafe {
self.slot.with_mut(move |slot| {
ptr::replace(slot, MaybeUninit::new(value)).assume_init()
})
};
Some(prev_value)
};
// We can unlock the slot now.
self.state.fetch_and(!LOCKED, Ordering::Release);
// Return the old value.
return Ok(prev_value);
}
// Try to go for the current (pushed) state.
if prev & LOCKED == 0 {
state = prev;
} else {
// State is locked.
busy_wait();
state = prev & !LOCKED;
}
}
}
/// Attempts to pop an item from the queue.
pub fn pop(&self) -> Result<T, PopError> {
let mut state = PUSHED;
loop {
// Lock and empty the slot.
let prev =
self.state
.compare_and_swap(state, (state | LOCKED) & !PUSHED, Ordering::SeqCst);
let prev = self
.state
.compare_exchange(
state,
(state | LOCKED) & !PUSHED,
Ordering::SeqCst,
Ordering::SeqCst,
)
.unwrap_or_else(|x| x);
if prev == state {
// Read the value and unlock.
let value = unsafe { self.slot.get().read().assume_init() };
let value = self
.slot
.with_mut(|slot| unsafe { slot.read().assume_init() });
self.state.fetch_and(!LOCKED, Ordering::Release);
return Ok(value);
}
@ -70,7 +136,7 @@ impl<T> Single<T> {
if prev & LOCKED == 0 {
state = prev;
} else {
thread::yield_now();
busy_wait();
state = prev & !LOCKED;
}
}
@ -78,11 +144,7 @@ impl<T> Single<T> {
/// Returns the number of items in the queue.
pub fn len(&self) -> usize {
if self.state.load(Ordering::SeqCst) & PUSHED == 0 {
0
} else {
1
}
usize::from(self.state.load(Ordering::SeqCst) & PUSHED != 0)
}
/// Returns `true` if the queue is empty.
@ -112,9 +174,14 @@ impl<T> Single<T> {
impl<T> Drop for Single<T> {
fn drop(&mut self) {
// Drop the value in the slot.
if *self.state.get_mut() & PUSHED != 0 {
let value = unsafe { self.slot.get().read().assume_init() };
drop(value);
}
let Self { state, slot } = self;
state.with_mut(|state| {
if *state & PUSHED != 0 {
slot.with_mut(|slot| unsafe {
let value = &mut *slot;
value.as_mut_ptr().drop_in_place();
});
}
});
}
}

src/sync.rs (new file)

@ -0,0 +1,114 @@
//! Synchronization facade to choose between `core` primitives and `loom` primitives.
#[cfg(all(feature = "portable-atomic", not(loom)))]
mod sync_impl {
pub(crate) use core::cell;
pub(crate) use portable_atomic as atomic;
#[cfg(not(feature = "std"))]
pub(crate) use atomic::hint::spin_loop;
#[cfg(feature = "std")]
pub(crate) use std::thread::yield_now;
}
#[cfg(all(not(feature = "portable-atomic"), not(loom)))]
mod sync_impl {
pub(crate) use core::cell;
pub(crate) use core::sync::atomic;
#[cfg(not(feature = "std"))]
#[inline]
pub(crate) fn spin_loop() {
#[allow(deprecated)]
atomic::spin_loop_hint();
}
#[cfg(feature = "std")]
pub(crate) use std::thread::yield_now;
}
#[cfg(loom)]
mod sync_impl {
pub(crate) use loom::cell;
pub(crate) mod atomic {
pub(crate) use loom::sync::atomic::*;
}
#[cfg(not(feature = "std"))]
pub(crate) use loom::hint::spin_loop;
#[cfg(feature = "std")]
pub(crate) use loom::thread::yield_now;
}
pub(crate) use sync_impl::*;
/// Notify the CPU that we are currently busy-waiting.
#[inline]
pub(crate) fn busy_wait() {
#[cfg(feature = "std")]
yield_now();
#[cfg(not(feature = "std"))]
spin_loop();
}
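The `busy_wait` facade above picks `yield_now` with `std` and `spin_loop` without it. A minimal std-only sketch of the same idea, combined into a small backoff (spin a few times, then yield) and used to wait on a flag set by another thread — the 16-spin threshold is an arbitrary illustrative choice, not the crate's behavior:

```rust
use std::hint::spin_loop;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Illustrative stand-in for the crate's `busy_wait`.
fn busy_wait(spins: &mut u32) {
    if *spins < 16 {
        *spins += 1;
        spin_loop(); // the no_std fallback: a CPU pause hint
    } else {
        thread::yield_now(); // the std path: let another thread run
    }
}

// Spins/yields until a second thread sets the flag; returns its final value.
fn wait_for_flag() -> bool {
    let flag = Arc::new(AtomicBool::new(false));
    let setter = {
        let flag = Arc::clone(&flag);
        thread::spawn(move || flag.store(true, Ordering::Release))
    };
    let mut spins = 0;
    while !flag.load(Ordering::Acquire) {
        busy_wait(&mut spins);
    }
    setter.join().unwrap();
    flag.load(Ordering::Acquire)
}

fn main() {
    assert!(wait_for_flag());
    println!("flag observed");
}
```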
#[cfg(loom)]
pub(crate) mod prelude {}
#[cfg(not(loom))]
pub(crate) mod prelude {
use super::{atomic, cell};
/// Emulate `loom::UnsafeCell`'s API.
pub(crate) trait UnsafeCellExt {
type Value;
fn with_mut<R, F>(&self, f: F) -> R
where
F: FnOnce(*mut Self::Value) -> R;
}
impl<T> UnsafeCellExt for cell::UnsafeCell<T> {
type Value = T;
fn with_mut<R, F>(&self, f: F) -> R
where
F: FnOnce(*mut Self::Value) -> R,
{
f(self.get())
}
}
/// Emulate `loom::Atomic*`'s API.
pub(crate) trait AtomicExt {
type Value;
fn with_mut<R, F>(&mut self, f: F) -> R
where
F: FnOnce(&mut Self::Value) -> R;
}
impl AtomicExt for atomic::AtomicUsize {
type Value = usize;
fn with_mut<R, F>(&mut self, f: F) -> R
where
F: FnOnce(&mut Self::Value) -> R,
{
f(self.get_mut())
}
}
impl<T> AtomicExt for atomic::AtomicPtr<T> {
type Value = *mut T;
fn with_mut<R, F>(&mut self, f: F) -> R
where
F: FnOnce(&mut Self::Value) -> R,
{
f(self.get_mut())
}
}
}
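Outside of loom, the `UnsafeCellExt` shim simply hands the closure the raw pointer from `UnsafeCell::get`, so the same call sites compile against both loom's cell and the core one. A self-contained sketch of that shim against `std::cell::UnsafeCell`:

```rust
use std::cell::UnsafeCell;

// Standalone copy of the `with_mut` emulation for non-loom builds.
trait UnsafeCellExt {
    type Value;

    fn with_mut<R, F>(&self, f: F) -> R
    where
        F: FnOnce(*mut Self::Value) -> R;
}

impl<T> UnsafeCellExt for UnsafeCell<T> {
    type Value = T;

    fn with_mut<R, F>(&self, f: F) -> R
    where
        F: FnOnce(*mut T) -> R,
    {
        // Non-loom path: just expose the raw pointer.
        f(self.get())
    }
}

fn demo() -> i32 {
    let cell = UnsafeCell::new(1);
    // SAFETY: no other reference to the cell's contents exists here.
    cell.with_mut(|p| unsafe { *p += 41 });
    cell.with_mut(|p| unsafe { *p })
}

fn main() {
    assert_eq!(demo(), 42);
    println!("{}", demo());
}
```

Under loom, the same `with_mut` name resolves to loom's own method, which additionally checks for concurrent access.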


@ -1,12 +1,15 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::thread;
use alloc::boxed::Box;
use core::mem::MaybeUninit;
use core::ptr;
use cache_padded::CachePadded;
use crossbeam_utils::CachePadded;
use crate::{PopError, PushError};
use crate::const_fn;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};
// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
@ -37,15 +40,40 @@ struct Slot<T> {
}
impl<T> Slot<T> {
#[cfg(not(loom))]
const UNINIT: Slot<T> = Slot {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
};
#[cfg(not(loom))]
fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
[Self::UNINIT; BLOCK_CAP]
}
#[cfg(loom)]
fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
// Repeat this expression 31 times.
// Update this if BLOCK_CAP changes.
macro_rules! repeat_31 {
($e: expr) => {
[
$e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
$e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
]
};
}
repeat_31!(Slot {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
})
}
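The `Slot::UNINIT` associated const exists because `[expr; N]` requires `expr` to be `Copy` or a constant, and `UnsafeCell`/`AtomicUsize` are not `Copy`; loom needs the `repeat_31!` macro instead because its types cannot be built in const contexts. A standalone sketch of the non-loom trick (with a smaller array length than the crate's `BLOCK_CAP` of 31, for brevity):

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicUsize;

// Illustrative block size; the crate uses 31.
const BLOCK_CAP: usize = 4;

struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    state: AtomicUsize,
}

impl<T> Slot<T> {
    // Promoting the initializer to an associated const makes the
    // `[Slot::UNINIT; BLOCK_CAP]` repeat legal even though Slot is not Copy.
    const UNINIT: Slot<T> = Slot {
        value: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };
}

fn uninit_block<T>() -> [Slot<T>; BLOCK_CAP] {
    [Slot::UNINIT; BLOCK_CAP]
}

fn main() {
    let block: [Slot<u32>; BLOCK_CAP] = uninit_block();
    assert_eq!(block.len(), BLOCK_CAP);
    println!("allocated {} slots", block.len());
}
```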
/// Waits until a value is written into the slot.
fn wait_write(&self) {
while self.state.load(Ordering::Acquire) & WRITE == 0 {
thread::yield_now();
busy_wait();
}
}
}
@ -66,7 +94,7 @@ impl<T> Block<T> {
fn new() -> Block<T> {
Block {
next: AtomicPtr::new(ptr::null_mut()),
slots: [Slot::UNINIT; BLOCK_CAP],
slots: Slot::uninit_block(),
}
}
@ -77,7 +105,7 @@ impl<T> Block<T> {
if !next.is_null() {
return next;
}
thread::yield_now();
busy_wait();
}
}
@ -121,19 +149,22 @@ pub struct Unbounded<T> {
}
impl<T> Unbounded<T> {
/// Creates a new unbounded queue.
pub fn new() -> Unbounded<T> {
Unbounded {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
tail: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
const_fn!(
const_if: #[cfg(not(loom))];
/// Creates a new unbounded queue.
pub const fn new() -> Unbounded<T> {
Unbounded {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
tail: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
}
}
}
);
/// Pushes an item into the queue.
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
@ -152,7 +183,7 @@ impl<T> Unbounded<T> {
// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
thread::yield_now();
busy_wait();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
@ -172,8 +203,8 @@ impl<T> Unbounded<T> {
if self
.tail
.block
.compare_and_swap(block, new, Ordering::Release)
== block
.compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
self.head.block.store(new, Ordering::Release);
block = new;
@ -205,7 +236,9 @@ impl<T> Unbounded<T> {
// Write the value into the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.value.get().write(MaybeUninit::new(value));
slot.value.with_mut(|slot| {
slot.write(MaybeUninit::new(value));
});
slot.state.fetch_or(WRITE, Ordering::Release);
return Ok(());
},
@ -228,7 +261,7 @@ impl<T> Unbounded<T> {
// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
thread::yield_now();
busy_wait();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
@ -258,7 +291,7 @@ impl<T> Unbounded<T> {
// The block can be null here only if the first push operation is in progress.
if block.is_null() {
thread::yield_now();
busy_wait();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
@ -287,7 +320,7 @@ impl<T> Unbounded<T> {
// Read the value.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let value = slot.value.get().read().assume_init();
let value = slot.value.with_mut(|slot| slot.read().assume_init());
// Destroy the block if we've reached the end, or if another thread wanted to
// destroy but couldn't because we were busy reading from the slot.
@ -371,38 +404,49 @@ impl<T> Unbounded<T> {
impl<T> Drop for Unbounded<T> {
fn drop(&mut self) {
let mut head = self.head.index.load(Ordering::Relaxed);
let mut tail = self.tail.index.load(Ordering::Relaxed);
let mut block = self.head.block.load(Ordering::Relaxed);
let Self { head, tail } = self;
let Position { index: head, block } = &mut **head;
// Erase the lower bits.
head &= !((1 << SHIFT) - 1);
tail &= !((1 << SHIFT) - 1);
head.with_mut(|&mut mut head| {
tail.index.with_mut(|&mut mut tail| {
// Erase the lower bits.
head &= !((1 << SHIFT) - 1);
tail &= !((1 << SHIFT) - 1);
unsafe {
// Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
while head != tail {
let offset = (head >> SHIFT) % LAP;
unsafe {
// Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
while head != tail {
let offset = (head >> SHIFT) % LAP;
if offset < BLOCK_CAP {
// Drop the value in the slot.
let slot = (*block).slots.get_unchecked(offset);
let value = slot.value.get().read().assume_init();
drop(value);
} else {
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Relaxed);
drop(Box::from_raw(block));
block = next;
if offset < BLOCK_CAP {
// Drop the value in the slot.
block.with_mut(|block| {
let slot = (**block).slots.get_unchecked(offset);
slot.value.with_mut(|slot| {
let value = &mut *slot;
value.as_mut_ptr().drop_in_place();
});
});
} else {
// Deallocate the block and move to the next one.
block.with_mut(|block| {
let next_block = (**block).next.with_mut(|next| *next);
drop(Box::from_raw(*block));
*block = next_block;
});
}
head = head.wrapping_add(1 << SHIFT);
}
// Deallocate the last remaining block.
block.with_mut(|block| {
if !block.is_null() {
drop(Box::from_raw(*block));
}
});
}
head = head.wrapping_add(1 << SHIFT);
}
// Deallocate the last remaining block.
if !block.is_null() {
drop(Box::from_raw(block));
}
}
});
});
}
}


@ -1,7 +1,14 @@
#![allow(clippy::bool_assert_comparison)]
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
#[cfg(not(target_family = "wasm"))]
use easy_parallel::Parallel;
#[cfg(not(target_family = "wasm"))]
use std::sync::atomic::{AtomicUsize, Ordering};
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use easy_parallel::Parallel;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]
fn smoke() {
@ -56,10 +63,11 @@ fn len_empty_full() {
assert_eq!(q.is_full(), false);
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn len() {
const COUNT: usize = 25_000;
const CAP: usize = 1000;
const COUNT: usize = if cfg!(miri) { 50 } else { 25_000 };
const CAP: usize = if cfg!(miri) { 50 } else { 1000 };
let q = ConcurrentQueue::bounded(CAP);
assert_eq!(q.len(), 0);
@ -128,9 +136,36 @@ fn close() {
assert_eq!(q.pop(), Err(PopError::Closed));
}
#[test]
fn force_push() {
let q = ConcurrentQueue::<i32>::bounded(5);
for i in 1..=5 {
assert_eq!(q.force_push(i), Ok(None));
}
assert!(!q.is_closed());
for i in 6..=10 {
assert_eq!(q.force_push(i), Ok(Some(i - 5)));
}
assert_eq!(q.pop(), Ok(6));
assert_eq!(q.force_push(11), Ok(None));
for i in 12..=15 {
assert_eq!(q.force_push(i), Ok(Some(i - 5)));
}
assert!(q.close());
assert_eq!(q.force_push(40), Err(ForcePushError(40)));
for i in 11..=15 {
assert_eq!(q.pop(), Ok(i));
}
assert_eq!(q.pop(), Err(PopError::Closed));
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc() {
const COUNT: usize = 100_000;
const COUNT: usize = if cfg!(miri) { 100 } else { 100_000 };
let q = ConcurrentQueue::bounded(3);
@ -154,9 +189,10 @@ fn spsc() {
.run();
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn mpmc() {
const COUNT: usize = 25_000;
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
const THREADS: usize = 4;
let q = ConcurrentQueue::<usize>::bounded(3);
@ -185,9 +221,11 @@ fn mpmc() {
}
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn drops() {
const RUNS: usize = 100;
const RUNS: usize = if cfg!(miri) { 10 } else { 100 };
const STEPS: usize = if cfg!(miri) { 100 } else { 10_000 };
static DROPS: AtomicUsize = AtomicUsize::new(0);
@ -201,7 +239,7 @@ fn drops() {
}
for _ in 0..RUNS {
let steps = fastrand::usize(..10_000);
let steps = fastrand::usize(..STEPS);
let additional = fastrand::usize(..50);
DROPS.store(0, Ordering::SeqCst);
@ -232,19 +270,102 @@ fn drops() {
}
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn linearizable() {
const COUNT: usize = 25_000;
const COUNT: usize = if cfg!(miri) { 500 } else { 25_000 };
const THREADS: usize = 4;
let q = ConcurrentQueue::bounded(THREADS);
Parallel::new()
.each(0..THREADS, |_| {
.each(0..THREADS / 2, |_| {
for _ in 0..COUNT {
while q.push(0).is_err() {}
q.pop().unwrap();
}
})
.each(0..THREADS / 2, |_| {
for _ in 0..COUNT {
if q.force_push(0).unwrap().is_none() {
q.pop().unwrap();
}
}
})
.run();
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc_ring_buffer() {
const COUNT: usize = if cfg!(miri) { 200 } else { 100_000 };
let t = AtomicUsize::new(1);
let q = ConcurrentQueue::<usize>::bounded(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
Parallel::new()
.add(|| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,
_ => {
while let Ok(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
})
.add(|| {
for i in 0..COUNT {
if let Ok(Some(n)) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
t.fetch_sub(1, Ordering::SeqCst);
})
.run();
for c in v {
assert_eq!(c.load(Ordering::SeqCst), 1);
}
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn mpmc_ring_buffer() {
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
const THREADS: usize = 4;
let t = AtomicUsize::new(THREADS);
let q = ConcurrentQueue::<usize>::bounded(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
Parallel::new()
.each(0..THREADS, |_| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,
_ => {
while let Ok(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
})
.each(0..THREADS, |_| {
for i in 0..COUNT {
if let Ok(Some(n)) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
t.fetch_sub(1, Ordering::SeqCst);
})
.run();
for c in v {
assert_eq!(c.load(Ordering::SeqCst), THREADS);
}
}

tests/loom.rs (new file)

@ -0,0 +1,307 @@
#![cfg(loom)]
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
use loom::sync::atomic::{AtomicUsize, Ordering};
use loom::sync::{Arc, Condvar, Mutex};
use loom::thread;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
/// A basic MPMC channel based on a ConcurrentQueue and loom primitives.
struct Channel<T> {
/// The queue used to contain items.
queue: ConcurrentQueue<T>,
/// The number of senders.
senders: AtomicUsize,
/// The number of receivers.
receivers: AtomicUsize,
/// The event that is signaled when a new item is pushed.
push_event: Event,
/// The event that is signaled when a new item is popped.
pop_event: Event,
}
/// The sending side of a channel.
struct Sender<T> {
/// The channel.
channel: Arc<Channel<T>>,
}
/// The receiving side of a channel.
struct Receiver<T> {
/// The channel.
channel: Arc<Channel<T>>,
}
/// Create a new pair of senders/receivers based on a queue.
fn pair<T>(queue: ConcurrentQueue<T>) -> (Sender<T>, Receiver<T>) {
let channel = Arc::new(Channel {
queue,
senders: AtomicUsize::new(1),
receivers: AtomicUsize::new(1),
push_event: Event::new(),
pop_event: Event::new(),
});
(
Sender {
channel: channel.clone(),
},
Receiver { channel },
)
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.channel.senders.fetch_add(1, Ordering::SeqCst);
Sender {
channel: self.channel.clone(),
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
if self.channel.senders.fetch_sub(1, Ordering::SeqCst) == 1 {
// Close the channel and notify the receivers.
self.channel.queue.close();
self.channel.push_event.signal_all();
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
self.channel.receivers.fetch_add(1, Ordering::SeqCst);
Receiver {
channel: self.channel.clone(),
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if self.channel.receivers.fetch_sub(1, Ordering::SeqCst) == 1 {
// Close the channel and notify the senders.
self.channel.queue.close();
self.channel.pop_event.signal_all();
}
}
}
impl<T> Sender<T> {
/// Send a value.
///
/// Returns an error with the value if the channel is closed.
fn send(&self, mut value: T) -> Result<(), T> {
loop {
match self.channel.queue.push(value) {
Ok(()) => {
// Notify a single receiver.
self.channel.push_event.signal();
return Ok(());
}
Err(PushError::Closed(val)) => return Err(val),
Err(PushError::Full(val)) => {
// Wait for a receiver to pop an item.
value = val;
self.channel.pop_event.wait();
}
}
}
}
/// Send a value forcefully.
fn force_send(&self, value: T) -> Result<Option<T>, T> {
match self.channel.queue.force_push(value) {
Ok(bumped) => {
self.channel.push_event.signal();
Ok(bumped)
}
Err(ForcePushError(val)) => Err(val),
}
}
}
impl<T> Receiver<T> {
/// Channel capacity.
fn capacity(&self) -> Option<usize> {
self.channel.queue.capacity()
}
/// Receive a value.
///
/// Returns an error if the channel is closed.
fn recv(&self) -> Result<T, ()> {
loop {
match self.channel.queue.pop() {
Ok(value) => {
// Notify a single sender.
self.channel.pop_event.signal();
return Ok(value);
}
Err(PopError::Closed) => return Err(()),
Err(PopError::Empty) => {
// Wait for a sender to push an item.
self.channel.push_event.wait();
}
}
}
}
}
/// An event that can be waited on and then signaled.
struct Event {
/// The condition variable used to wait on the event.
condvar: Condvar,
/// The mutex used to protect the event.
///
/// Inside is the event's state. The first bit is used to indicate if the
/// notify_one method was called. The second bit is used to indicate if the
/// notify_all method was called.
mutex: Mutex<usize>,
}
impl Event {
/// Create a new event.
fn new() -> Self {
Self {
condvar: Condvar::new(),
mutex: Mutex::new(0),
}
}
/// Wait for the event to be signaled.
fn wait(&self) {
let mut state = self.mutex.lock().unwrap();
loop {
if *state & 0b11 != 0 {
// The event was signaled.
*state &= !0b01;
return;
}
// Wait for the event to be signaled.
state = self.condvar.wait(state).unwrap();
}
}
/// Signal the event.
fn signal(&self) {
let mut state = self.mutex.lock().unwrap();
*state |= 1;
drop(state);
self.condvar.notify_one();
}
/// Signal the event, but notify all waiters.
fn signal_all(&self) {
let mut state = self.mutex.lock().unwrap();
*state |= 3;
drop(state);
self.condvar.notify_all();
}
}
/// Wrapper to run tests on all three queues.
fn run_test<F: Fn(ConcurrentQueue<usize>, usize) + Send + Sync + Clone + 'static>(f: F) {
// The runtime of a loom test seems to grow exponentially with this number.
const LIMIT: usize = 4;
let fc = f.clone();
loom::model(move || {
fc(ConcurrentQueue::bounded(1), LIMIT);
});
let fc = f.clone();
loom::model(move || {
fc(ConcurrentQueue::bounded(LIMIT / 2), LIMIT);
});
loom::model(move || {
f(ConcurrentQueue::unbounded(), LIMIT);
});
}
#[test]
fn spsc() {
run_test(|q, limit| {
// Create a new pair of senders/receivers.
let (tx, rx) = pair(q);
// Push each onto a thread and run them.
let handle = thread::spawn(move || {
for i in 0..limit {
if tx.send(i).is_err() {
break;
}
}
});
let mut recv_values = vec![];
loop {
match rx.recv() {
Ok(value) => recv_values.push(value),
Err(()) => break,
}
}
// Values may not be in order.
recv_values.sort_unstable();
assert_eq!(recv_values, (0..limit).collect::<Vec<_>>());
// Join the handle before we exit.
handle.join().unwrap();
});
}
#[test]
fn spsc_force() {
run_test(|q, limit| {
// Create a new pair of senders/receivers.
let (tx, rx) = pair(q);
// Push each onto a thread and run them.
let handle = thread::spawn(move || {
for i in 0..limit {
if tx.force_send(i).is_err() {
break;
}
}
});
let mut recv_values = vec![];
loop {
match rx.recv() {
Ok(value) => recv_values.push(value),
Err(()) => break,
}
}
// Values may not be in order.
recv_values.sort_unstable();
let cap = rx.capacity().unwrap_or(usize::MAX);
for (left, right) in (0..limit)
.rev()
.take(cap)
.zip(recv_values.into_iter().rev())
{
assert_eq!(left, right);
}
// Join the handle before we exit.
handle.join().unwrap();
});
}
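The `Event` used by the harness above packs its state into two bits. A minimal std-only sketch of the same protocol (using plain `std::sync` rather than loom's instrumented types, so it is an illustration rather than something loom can model-check) looks like this:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Sketch of the two-bit Event protocol: bit 0 records a signal() aimed at
/// one waiter and is consumed by whichever waiter sees it first; bit 1
/// records signal_all() and is never cleared, so every later wait() returns
/// immediately.
struct Event {
    condvar: Condvar,
    mutex: Mutex<usize>,
}

impl Event {
    fn new() -> Self {
        Event {
            condvar: Condvar::new(),
            mutex: Mutex::new(0),
        }
    }

    fn wait(&self) {
        let mut state = self.mutex.lock().unwrap();
        loop {
            if *state & 0b11 != 0 {
                // Consume the one-shot bit but keep the broadcast bit.
                *state &= !0b01;
                return;
            }
            state = self.condvar.wait(state).unwrap();
        }
    }

    fn signal_all(&self) {
        *self.mutex.lock().unwrap() |= 0b10;
        self.condvar.notify_all();
    }
}

fn main() {
    let event = Arc::new(Event::new());
    let waiter = {
        let event = event.clone();
        thread::spawn(move || event.wait())
    };
    event.signal_all();
    waiter.join().unwrap();
    // Because the broadcast bit is sticky, further waits do not block.
    event.wait();
}
```

The sticky broadcast bit is what lets `Drop for Sender` wake every receiver at once without a lost-wakeup race: a receiver that checks the queue after the close still sees the bit set.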

tests/bounded.rs

@ -1,7 +1,14 @@
#![allow(clippy::bool_assert_comparison)]
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
#[cfg(not(target_family = "wasm"))]
use easy_parallel::Parallel;
#[cfg(not(target_family = "wasm"))]
use std::sync::atomic::{AtomicUsize, Ordering};
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use easy_parallel::Parallel;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]
fn smoke() {
@ -58,9 +65,25 @@ fn close() {
assert_eq!(q.pop(), Err(PopError::Closed));
}
#[test]
fn force_push() {
let q = ConcurrentQueue::<i32>::bounded(1);
assert_eq!(q.force_push(10), Ok(None));
assert!(!q.is_closed());
assert_eq!(q.force_push(20), Ok(Some(10)));
assert_eq!(q.force_push(30), Ok(Some(20)));
assert!(q.close());
assert_eq!(q.force_push(40), Err(ForcePushError(40)));
assert_eq!(q.pop(), Ok(30));
assert_eq!(q.pop(), Err(PopError::Closed));
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc() {
const COUNT: usize = 100_000;
const COUNT: usize = if cfg!(miri) { 100 } else { 100_000 };
let q = ConcurrentQueue::bounded(1);
@ -84,9 +107,10 @@ fn spsc() {
.run();
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn mpmc() {
const COUNT: usize = 25_000;
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
const THREADS: usize = 1;
let q = ConcurrentQueue::<usize>::bounded(THREADS);
@ -115,9 +139,11 @@ fn mpmc() {
}
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn drops() {
const RUNS: usize = 100;
const RUNS: usize = if cfg!(miri) { 20 } else { 100 };
const STEPS: usize = if cfg!(miri) { 100 } else { 10_000 };
static DROPS: AtomicUsize = AtomicUsize::new(0);
@ -131,7 +157,7 @@ fn drops() {
}
for _ in 0..RUNS {
let steps = fastrand::usize(..10_000);
let steps = fastrand::usize(..STEPS);
let additional = fastrand::usize(0..=1);
DROPS.store(0, Ordering::SeqCst);
@ -162,19 +188,102 @@ fn drops() {
}
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn linearizable() {
const COUNT: usize = 25_000;
const COUNT: usize = if cfg!(miri) { 500 } else { 25_000 };
const THREADS: usize = 4;
let q = ConcurrentQueue::bounded(1);
Parallel::new()
.each(0..THREADS, |_| {
.each(0..THREADS / 2, |_| {
for _ in 0..COUNT {
while q.push(0).is_err() {}
q.pop().unwrap();
}
})
.each(0..THREADS / 2, |_| {
for _ in 0..COUNT {
if q.force_push(0).unwrap().is_none() {
q.pop().unwrap();
}
}
})
.run();
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc_ring_buffer() {
const COUNT: usize = if cfg!(miri) { 200 } else { 100_000 };
let t = AtomicUsize::new(1);
let q = ConcurrentQueue::<usize>::bounded(1);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
Parallel::new()
.add(|| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,
_ => {
while let Ok(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
})
.add(|| {
for i in 0..COUNT {
if let Ok(Some(n)) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
t.fetch_sub(1, Ordering::SeqCst);
})
.run();
for c in v {
assert_eq!(c.load(Ordering::SeqCst), 1);
}
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn mpmc_ring_buffer() {
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
const THREADS: usize = 4;
let t = AtomicUsize::new(THREADS);
let q = ConcurrentQueue::<usize>::bounded(1);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
Parallel::new()
.each(0..THREADS, |_| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,
_ => {
while let Ok(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
})
.each(0..THREADS, |_| {
for i in 0..COUNT {
if let Ok(Some(n)) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
t.fetch_sub(1, Ordering::SeqCst);
})
.run();
for c in v {
assert_eq!(c.load(Ordering::SeqCst), THREADS);
}
}
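The displacement semantics these ring-buffer tests rely on can be sketched with a toy mutex-based single slot (a hypothetical stand-in for the crate's lock-free `bounded(1)` queue, not its real implementation):

```rust
use std::sync::Mutex;

/// Toy single-slot "ring buffer" illustrating force-push semantics:
/// pushing into a full slot displaces and returns the oldest value.
struct Slot<T> {
    inner: Mutex<(Option<T>, bool)>, // (value, closed)
}

impl<T> Slot<T> {
    fn new() -> Self {
        Slot {
            inner: Mutex::new((None, false)),
        }
    }

    /// Push, displacing any existing value. Returns Err(value) if closed.
    fn force_push(&self, value: T) -> Result<Option<T>, T> {
        let mut guard = self.inner.lock().unwrap();
        if guard.1 {
            return Err(value);
        }
        Ok(guard.0.replace(value))
    }

    fn pop(&self) -> Option<T> {
        self.inner.lock().unwrap().0.take()
    }

    fn close(&self) {
        self.inner.lock().unwrap().1 = true;
    }
}

fn main() {
    let s = Slot::new();
    assert_eq!(s.force_push(1), Ok(None)); // slot was empty
    assert_eq!(s.force_push(2), Ok(Some(1))); // 1 is displaced and handed back
    assert_eq!(s.pop(), Some(2));
    s.close();
    assert_eq!(s.force_push(3), Err(3)); // closed: value handed back in the error
}
```

This is why `mpmc_ring_buffer` can assert each value is seen exactly `THREADS` times: every forced push either reaches a popper or hands the displaced value back to the pusher, so each producer accounts for exactly one observation of every index it writes.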

tests/unbounded.rs

@ -1,7 +1,14 @@
use std::sync::atomic::{AtomicUsize, Ordering};
#![allow(clippy::bool_assert_comparison)]
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
#[cfg(not(target_family = "wasm"))]
use easy_parallel::Parallel;
#[cfg(not(target_family = "wasm"))]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]
fn smoke() {
@ -67,9 +74,10 @@ fn close() {
assert_eq!(q.pop(), Err(PopError::Closed));
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc() {
const COUNT: usize = 100_000;
const COUNT: usize = if cfg!(miri) { 100 } else { 100_000 };
let q = ConcurrentQueue::unbounded();
@ -93,9 +101,10 @@ fn spsc() {
.run();
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn mpmc() {
const COUNT: usize = 25_000;
const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 };
const THREADS: usize = 4;
let q = ConcurrentQueue::<usize>::unbounded();
@ -124,8 +133,12 @@ fn mpmc() {
}
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn drops() {
const RUNS: usize = if cfg!(miri) { 20 } else { 100 };
const STEPS: usize = if cfg!(miri) { 100 } else { 10_000 };
static DROPS: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, PartialEq)]
@ -137,8 +150,8 @@ fn drops() {
}
}
for _ in 0..100 {
let steps = fastrand::usize(0..10_000);
for _ in 0..RUNS {
let steps = fastrand::usize(0..STEPS);
let additional = fastrand::usize(0..1000);
DROPS.store(0, Ordering::SeqCst);