Add special bounded(1) implementation

This commit is contained in:
Stjepan Glavina 2020-08-05 18:57:58 +02:00
parent 83223b50a5
commit ee83323156
4 changed files with 321 additions and 6 deletions

View File

@ -36,9 +36,11 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::{self, AtomicUsize, Ordering};
use crate::bounded::Bounded;
use crate::single::Single;
use crate::unbounded::Unbounded;
mod bounded;
mod single;
mod unbounded;
/// A concurrent queue.
@ -67,8 +69,9 @@ impl<T> UnwindSafe for ConcurrentQueue<T> {}
impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
enum Inner<T> {
Bounded(Bounded<T>),
Unbounded(Unbounded<T>),
Single(Single<T>),
Bounded(Box<Bounded<T>>),
Unbounded(Box<Unbounded<T>>),
}
impl<T> ConcurrentQueue<T> {
@ -88,7 +91,11 @@ impl<T> ConcurrentQueue<T> {
/// let q = ConcurrentQueue::<i32>::bounded(100);
/// ```
pub fn bounded(cap: usize) -> ConcurrentQueue<T> {
ConcurrentQueue(Inner::Bounded(Bounded::new(cap)))
if cap == 1 {
ConcurrentQueue(Inner::Single(Single::new()))
} else {
ConcurrentQueue(Inner::Bounded(Box::new(Bounded::new(cap))))
}
}
/// Creates a new unbounded queue.
@ -101,7 +108,7 @@ impl<T> ConcurrentQueue<T> {
/// let q = ConcurrentQueue::<i32>::unbounded();
/// ```
pub fn unbounded() -> ConcurrentQueue<T> {
ConcurrentQueue(Inner::Unbounded(Unbounded::new()))
ConcurrentQueue(Inner::Unbounded(Box::new(Unbounded::new())))
}
/// Attempts to push an item into the queue.
@ -135,6 +142,7 @@ impl<T> ConcurrentQueue<T> {
/// ```
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
match &self.0 {
Inner::Single(q) => q.push(value),
Inner::Bounded(q) => q.push(value),
Inner::Unbounded(q) => q.push(value),
}
@ -167,6 +175,7 @@ impl<T> ConcurrentQueue<T> {
/// ```
pub fn pop(&self) -> Result<T, PopError> {
match &self.0 {
Inner::Single(q) => q.pop(),
Inner::Bounded(q) => q.pop(),
Inner::Unbounded(q) => q.pop(),
}
@ -187,6 +196,7 @@ impl<T> ConcurrentQueue<T> {
/// ```
pub fn is_empty(&self) -> bool {
match &self.0 {
Inner::Single(q) => q.is_empty(),
Inner::Bounded(q) => q.is_empty(),
Inner::Unbounded(q) => q.is_empty(),
}
@ -209,6 +219,7 @@ impl<T> ConcurrentQueue<T> {
/// ```
pub fn is_full(&self) -> bool {
match &self.0 {
Inner::Single(q) => q.is_full(),
Inner::Bounded(q) => q.is_full(),
Inner::Unbounded(q) => q.is_full(),
}
@ -232,6 +243,7 @@ impl<T> ConcurrentQueue<T> {
/// ```
pub fn len(&self) -> usize {
match &self.0 {
Inner::Single(q) => q.len(),
Inner::Bounded(q) => q.len(),
Inner::Unbounded(q) => q.len(),
}
@ -254,6 +266,7 @@ impl<T> ConcurrentQueue<T> {
/// ```
pub fn capacity(&self) -> Option<usize> {
match &self.0 {
Inner::Single(_) => Some(1),
Inner::Bounded(q) => Some(q.capacity()),
Inner::Unbounded(_) => None,
}
@ -288,6 +301,7 @@ impl<T> ConcurrentQueue<T> {
/// ```
pub fn close(&self) -> bool {
match &self.0 {
Inner::Single(q) => q.close(),
Inner::Bounded(q) => q.close(),
Inner::Unbounded(q) => q.close(),
}
@ -308,6 +322,7 @@ impl<T> ConcurrentQueue<T> {
/// ```
pub fn is_closed(&self) -> bool {
match &self.0 {
Inner::Single(q) => q.is_closed(),
Inner::Bounded(q) => q.is_closed(),
Inner::Unbounded(q) => q.is_closed(),
}

120
src/single.rs Normal file
View File

@ -0,0 +1,120 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use crate::{PopError, PushError};
const LOCKED: usize = 1 << 0;
const PUSHED: usize = 1 << 1;
const CLOSED: usize = 1 << 2;
/// A single-element queue.
pub struct Single<T> {
state: AtomicUsize,
slot: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Single<T> {
/// Creates a new single-element queue.
pub fn new() -> Single<T> {
Single {
state: AtomicUsize::new(0),
slot: UnsafeCell::new(MaybeUninit::uninit()),
}
}
/// Attempts to push an item into the queue.
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
// Lock and fill the slot.
let state = self
.state
.compare_and_swap(0, LOCKED | PUSHED, Ordering::SeqCst);
if state == 0 {
// Write the value and unlock.
unsafe { self.slot.get().write(MaybeUninit::new(value)) }
self.state.fetch_and(!LOCKED, Ordering::Release);
Ok(())
} else if state & CLOSED != 0 {
Err(PushError::Closed(value))
} else {
Err(PushError::Full(value))
}
}
/// Attempts to pop an item from the queue.
pub fn pop(&self) -> Result<T, PopError> {
let mut state = PUSHED;
loop {
// Lock and empty the slot.
let prev =
self.state
.compare_and_swap(state, (state | LOCKED) & !PUSHED, Ordering::SeqCst);
if prev == state {
// Read the value and unlock.
let value = unsafe { self.slot.get().read().assume_init() };
self.state.fetch_and(!LOCKED, Ordering::Release);
return Ok(value);
}
if prev & PUSHED == 0 {
if prev & CLOSED == 0 {
return Err(PopError::Empty);
} else {
return Err(PopError::Closed);
}
}
if prev & LOCKED == 0 {
state = prev;
} else {
thread::yield_now();
state = prev & !LOCKED;
}
}
}
/// Returns the number of items in the queue.
pub fn len(&self) -> usize {
if self.state.load(Ordering::SeqCst) & PUSHED == 0 {
0
} else {
1
}
}
/// Returns `true` if the queue is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Returns `true` if the queue is full.
pub fn is_full(&self) -> bool {
self.len() == 1
}
/// Closes the queue.
///
/// Returns `true` if this call closed the queue.
pub fn close(&self) -> bool {
let state = self.state.fetch_or(CLOSED, Ordering::SeqCst);
state & CLOSED == 0
}
/// Returns `true` if the queue is closed.
pub fn is_closed(&self) -> bool {
self.state.load(Ordering::SeqCst) & CLOSED != 0
}
}
impl<T> Drop for Single<T> {
fn drop(&mut self) {
// Drop the value in the slot.
if *self.state.get_mut() & PUSHED != 0 {
let value = unsafe { self.slot.get().read().assume_init() };
drop(value);
}
}
}

View File

@ -5,7 +5,7 @@ use easy_parallel::Parallel;
#[test]
fn smoke() {
let q = ConcurrentQueue::bounded(1);
let q = ConcurrentQueue::bounded(2);
q.push(7).unwrap();
assert_eq!(q.pop(), Ok(7));
@ -114,7 +114,7 @@ fn len() {
#[test]
fn close() {
let q = ConcurrentQueue::bounded(1);
let q = ConcurrentQueue::bounded(2);
assert_eq!(q.push(10), Ok(()));
assert!(!q.is_closed());

180
tests/single.rs Normal file
View File

@ -0,0 +1,180 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use easy_parallel::Parallel;
#[test]
fn smoke() {
let q = ConcurrentQueue::bounded(1);
q.push(7).unwrap();
assert_eq!(q.pop(), Ok(7));
q.push(8).unwrap();
assert_eq!(q.pop(), Ok(8));
assert!(q.pop().is_err());
}
#[test]
fn capacity() {
let q = ConcurrentQueue::<i32>::bounded(1);
assert_eq!(q.capacity(), Some(1));
}
#[test]
fn len_empty_full() {
let q = ConcurrentQueue::bounded(1);
assert_eq!(q.len(), 0);
assert_eq!(q.is_empty(), true);
assert_eq!(q.is_full(), false);
q.push(()).unwrap();
assert_eq!(q.len(), 1);
assert_eq!(q.is_empty(), false);
assert_eq!(q.is_full(), true);
q.pop().unwrap();
assert_eq!(q.len(), 0);
assert_eq!(q.is_empty(), true);
assert_eq!(q.is_full(), false);
}
#[test]
fn close() {
let q = ConcurrentQueue::<i32>::bounded(1);
assert_eq!(q.push(10), Ok(()));
assert!(!q.is_closed());
assert!(q.close());
assert!(q.is_closed());
assert!(!q.close());
assert_eq!(q.push(20), Err(PushError::Closed(20)));
assert_eq!(q.pop(), Ok(10));
assert_eq!(q.pop(), Err(PopError::Closed));
}
#[test]
fn spsc() {
const COUNT: usize = 100_000;
let q = ConcurrentQueue::bounded(1);
Parallel::new()
.add(|| {
for i in 0..COUNT {
loop {
if let Ok(x) = q.pop() {
assert_eq!(x, i);
break;
}
}
}
assert!(q.pop().is_err());
})
.add(|| {
for i in 0..COUNT {
while q.push(i).is_err() {}
}
})
.run();
}
#[test]
fn mpmc() {
const COUNT: usize = 25_000;
const THREADS: usize = 1;
let q = ConcurrentQueue::<usize>::bounded(THREADS);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
Parallel::new()
.each(0..THREADS, |_| {
for _ in 0..COUNT {
let n = loop {
if let Ok(x) = q.pop() {
break x;
}
};
v[n].fetch_add(1, Ordering::SeqCst);
}
})
.each(0..THREADS, |_| {
for i in 0..COUNT {
while q.push(i).is_err() {}
}
})
.run();
for c in v {
assert_eq!(c.load(Ordering::SeqCst), THREADS);
}
}
#[test]
fn drops() {
const RUNS: usize = 100;
static DROPS: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, PartialEq)]
struct DropCounter;
impl Drop for DropCounter {
fn drop(&mut self) {
DROPS.fetch_add(1, Ordering::SeqCst);
}
}
for _ in 0..RUNS {
let steps = fastrand::usize(..10_000);
let additional = fastrand::usize(0..=1);
DROPS.store(0, Ordering::SeqCst);
let q = ConcurrentQueue::bounded(1);
Parallel::new()
.add(|| {
for _ in 0..steps {
while q.pop().is_err() {}
}
})
.add(|| {
for _ in 0..steps {
while q.push(DropCounter).is_err() {
DROPS.fetch_sub(1, Ordering::SeqCst);
}
}
})
.run();
for _ in 0..additional {
q.push(DropCounter).unwrap();
}
assert_eq!(DROPS.load(Ordering::SeqCst), steps);
drop(q);
assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
}
}
#[test]
fn linearizable() {
const COUNT: usize = 25_000;
const THREADS: usize = 4;
let q = ConcurrentQueue::bounded(1);
Parallel::new()
.each(0..THREADS, |_| {
for _ in 0..COUNT {
while q.push(0).is_err() {}
q.pop().unwrap();
}
})
.run();
}