concurrent-queue/src/unbounded.rs

449 lines
15 KiB
Rust

use alloc::boxed::Box;
use core::mem::MaybeUninit;
use core::ptr;
use crossbeam_utils::CachePadded;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};
// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of items a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the queue is closed.
const MARK_BIT: usize = 1;
/// A slot in a block.
struct Slot<T> {
/// The value.
value: UnsafeCell<MaybeUninit<T>>,
/// The state of the slot.
state: AtomicUsize,
}
impl<T> Slot<T> {
#[cfg(not(loom))]
const UNINIT: Slot<T> = Slot {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
};
#[cfg(not(loom))]
fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
[Self::UNINIT; BLOCK_CAP]
}
#[cfg(loom)]
fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
// Repeat this expression 31 times.
// Update if we change BLOCK_CAP
macro_rules! repeat_31 {
($e: expr) => {
[
$e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
$e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
]
};
}
repeat_31!(Slot {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
})
}
/// Waits until a value is written into the slot.
fn wait_write(&self) {
while self.state.load(Ordering::Acquire) & WRITE == 0 {
busy_wait();
}
}
}
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
/// The next block in the linked list.
next: AtomicPtr<Block<T>>,
/// Slots for values.
slots: [Slot<T>; BLOCK_CAP],
}
impl<T> Block<T> {
/// Creates an empty block.
fn new() -> Block<T> {
Block {
next: AtomicPtr::new(ptr::null_mut()),
slots: Slot::uninit_block(),
}
}
/// Waits until the next pointer is set.
fn wait_next(&self) -> *mut Block<T> {
loop {
let next = self.next.load(Ordering::Acquire);
if !next.is_null() {
return next;
}
busy_wait();
}
}
/// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
unsafe fn destroy(this: *mut Block<T>, start: usize) {
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
// begun destruction of the block.
for i in start..BLOCK_CAP - 1 {
let slot = (*this).slots.get_unchecked(i);
// Mark the `DESTROY` bit if a thread is still using the slot.
if slot.state.load(Ordering::Acquire) & READ == 0
&& slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
{
// If a thread is still using the slot, it will continue destruction of the block.
return;
}
}
// No thread is using the block, now it is safe to destroy it.
drop(Box::from_raw(this));
}
}
/// A position in a queue.
struct Position<T> {
/// The index in the queue.
index: AtomicUsize,
/// The block in the linked list.
block: AtomicPtr<Block<T>>,
}
/// An unbounded queue.
pub struct Unbounded<T> {
/// The head of the queue.
head: CachePadded<Position<T>>,
/// The tail of the queue.
tail: CachePadded<Position<T>>,
}
impl<T> Unbounded<T> {
/// Creates a new unbounded queue.
pub fn new() -> Unbounded<T> {
Unbounded {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
tail: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
}
}
/// Pushes an item into the queue.
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
let mut tail = self.tail.index.load(Ordering::Acquire);
let mut block = self.tail.block.load(Ordering::Acquire);
let mut next_block = None;
loop {
// Check if the queue is closed.
if tail & MARK_BIT != 0 {
return Err(PushError::Closed(value));
}
// Calculate the offset of the index into the block.
let offset = (tail >> SHIFT) % LAP;
// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
busy_wait();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
}
// If we're going to have to install the next block, allocate it in advance in order to
// make the wait for other threads as short as possible.
if offset + 1 == BLOCK_CAP && next_block.is_none() {
next_block = Some(Box::new(Block::<T>::new()));
}
// If this is the first value to be pushed into the queue, we need to allocate the
// first block and install it.
if block.is_null() {
let new = Box::into_raw(Box::new(Block::<T>::new()));
if self
.tail
.block
.compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
self.head.block.store(new, Ordering::Release);
block = new;
} else {
next_block = unsafe { Some(Box::from_raw(new)) };
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
}
}
let new_tail = tail + (1 << SHIFT);
// Try advancing the tail forward.
match self.tail.index.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
// If we've reached the end of the block, install the next one.
if offset + 1 == BLOCK_CAP {
let next_block = Box::into_raw(next_block.unwrap());
self.tail.block.store(next_block, Ordering::Release);
self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
(*block).next.store(next_block, Ordering::Release);
}
// Write the value into the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.value.with_mut(|slot| {
slot.write(MaybeUninit::new(value));
});
slot.state.fetch_or(WRITE, Ordering::Release);
return Ok(());
},
Err(t) => {
tail = t;
block = self.tail.block.load(Ordering::Acquire);
}
}
}
}
/// Pops an item from the queue.
pub fn pop(&self) -> Result<T, PopError> {
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);
loop {
// Calculate the offset of the index into the block.
let offset = (head >> SHIFT) % LAP;
// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
busy_wait();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
}
let mut new_head = head + (1 << SHIFT);
if new_head & MARK_BIT == 0 {
crate::full_fence();
let tail = self.tail.index.load(Ordering::Relaxed);
// If the tail equals the head, that means the queue is empty.
if head >> SHIFT == tail >> SHIFT {
// Check if the queue is closed.
if tail & MARK_BIT != 0 {
return Err(PopError::Closed);
} else {
return Err(PopError::Empty);
}
}
// If head and tail are not in the same block, set `MARK_BIT` in head.
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= MARK_BIT;
}
}
// The block can be null here only if the first push operation is in progress.
if block.is_null() {
busy_wait();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
}
// Try moving the head index forward.
match self.head.index.compare_exchange_weak(
head,
new_head,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
// If we've reached the end of the block, move to the next one.
if offset + 1 == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= MARK_BIT;
}
self.head.block.store(next, Ordering::Release);
self.head.index.store(next_index, Ordering::Release);
}
// Read the value.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let value = slot.value.with_mut(|slot| slot.read().assume_init());
// Destroy the block if we've reached the end, or if another thread wanted to
// destroy but couldn't because we were busy reading from the slot.
if offset + 1 == BLOCK_CAP {
Block::destroy(block, 0);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset + 1);
}
return Ok(value);
},
Err(h) => {
head = h;
block = self.head.block.load(Ordering::Acquire);
}
}
}
}
/// Returns the number of items in the queue.
pub fn len(&self) -> usize {
loop {
// Load the tail index, then load the head index.
let mut tail = self.tail.index.load(Ordering::SeqCst);
let mut head = self.head.index.load(Ordering::SeqCst);
// If the tail index didn't change, we've got consistent indices to work with.
if self.tail.index.load(Ordering::SeqCst) == tail {
// Erase the lower bits.
tail &= !((1 << SHIFT) - 1);
head &= !((1 << SHIFT) - 1);
// Fix up indices if they fall onto block ends.
if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
tail = tail.wrapping_add(1 << SHIFT);
}
if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
head = head.wrapping_add(1 << SHIFT);
}
// Rotate indices so that head falls into the first block.
let lap = (head >> SHIFT) / LAP;
tail = tail.wrapping_sub((lap * LAP) << SHIFT);
head = head.wrapping_sub((lap * LAP) << SHIFT);
// Remove the lower bits.
tail >>= SHIFT;
head >>= SHIFT;
// Return the difference minus the number of blocks between tail and head.
return tail - head - tail / LAP;
}
}
}
/// Returns `true` if the queue is empty.
pub fn is_empty(&self) -> bool {
let head = self.head.index.load(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::SeqCst);
head >> SHIFT == tail >> SHIFT
}
/// Returns `true` if the queue is full.
pub fn is_full(&self) -> bool {
false
}
/// Closes the queue.
///
/// Returns `true` if this call closed the queue.
pub fn close(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
tail & MARK_BIT == 0
}
/// Returns `true` if the queue is closed.
pub fn is_closed(&self) -> bool {
self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
}
}
impl<T> Drop for Unbounded<T> {
fn drop(&mut self) {
let Self { head, tail } = self;
let Position { index: head, block } = &mut **head;
head.with_mut(|&mut mut head| {
tail.index.with_mut(|&mut mut tail| {
// Erase the lower bits.
head &= !((1 << SHIFT) - 1);
tail &= !((1 << SHIFT) - 1);
unsafe {
// Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
while head != tail {
let offset = (head >> SHIFT) % LAP;
if offset < BLOCK_CAP {
// Drop the value in the slot.
block.with_mut(|block| {
let slot = (**block).slots.get_unchecked(offset);
slot.value.with_mut(|slot| {
let value = &mut *slot;
value.as_mut_ptr().drop_in_place();
});
});
} else {
// Deallocate the block and move to the next one.
block.with_mut(|block| {
let next_block = (**block).next.with_mut(|next| *next);
drop(Box::from_raw(*block));
*block = next_block;
});
}
head = head.wrapping_add(1 << SHIFT);
}
// Deallocate the last remaining block.
block.with_mut(|block| {
if !block.is_null() {
drop(Box::from_raw(*block));
}
});
}
});
});
}
}