Use a proper lock implementation (#214)

* use a proper, reentrancy-safe lock implementation

* impl downgrade
Joel 2022-05-28 13:19:37 +02:00 committed by GitHub
parent 5fbb68f188
commit 3236196d01
3 changed files with 353 additions and 47 deletions
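
The new lock plugs into lock_api, so the downgrade support added here is reachable through the standard guard API. A minimal usage sketch, assuming only the RwLock alias and the RawRwLockDowngrade impl added in this commit; the counter function itself is a hypothetical example, not part of the diff:

use lock_api::RwLockWriteGuard;

// `RwLock<T>` is the alias from this commit: lock_api::RwLock<RawRwLock, T>.
fn bump(counter: &RwLock<usize>) {
    let mut guard = counter.write(); // parks on contention instead of spinning indefinitely
    *guard += 1;
    // Atomically trade the exclusive guard for a shared one; parked readers
    // are woken by RawRwLockDowngrade::downgrade.
    let shared = RwLockWriteGuard::downgrade(guard);
    assert!(*shared >= 1);
}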


@@ -18,6 +18,7 @@ raw-api = []
[dependencies]
lock_api = "0.4.7"
parking_lot_core = "0.9.3"
hashbrown = { version = "0.12.0", default-features = false }
serde = { version = "1.0.136", optional = true, features = ["derive"] }
cfg-if = "1.0.0"


@@ -1,76 +1,300 @@
use core::sync::atomic::{AtomicUsize, Ordering};
use parking_lot_core::{ParkToken, SpinWait, UnparkToken};
pub type RwLock<T> = lock_api::RwLock<RawRwLock, T>;
pub type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawRwLock, T>;
pub type RwLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawRwLock, T>;
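// Layout of `state` (one usize): bit 0 flags parked readers, bit 1 flags parked
// writers, and the remaining bits count readers in steps of ONE_READER. A writer
// sets every non-flag bit at once, so `state & ONE_WRITER == ONE_WRITER` means
// the lock is held exclusively.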
const READERS_PARKED: usize = 0b0001;
const WRITERS_PARKED: usize = 0b0010;
const ONE_READER: usize = 0b0100;
const ONE_WRITER: usize = !(READERS_PARKED | WRITERS_PARKED);
pub struct RawRwLock {
state: AtomicUsize,
}
unsafe impl lock_api::RawRwLock for RawRwLock {
#[allow(clippy::declare_interior_mutable_const)]
const INIT: Self = Self {
state: AtomicUsize::new(0),
};
type GuardMarker = lock_api::GuardNoSend;
#[inline]
fn try_lock_exclusive(&self) -> bool {
self.state
.compare_exchange(0, ONE_WRITER, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
#[inline]
fn lock_exclusive(&self) {
if self
.state
.compare_exchange_weak(0, ONE_WRITER, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
self.lock_exclusive_slow();
}
}
#[inline]
unsafe fn unlock_exclusive(&self) {
if self
.state
.compare_exchange(ONE_WRITER, 0, Ordering::Release, Ordering::Relaxed)
.is_err()
{
self.unlock_exclusive_slow();
}
}
#[inline]
fn try_lock_shared(&self) -> bool {
self.try_lock_shared_fast() || self.try_lock_shared_slow()
}
#[inline]
fn lock_shared(&self) {
if !self.try_lock_shared_fast() {
self.lock_shared_slow();
}
}
#[inline]
unsafe fn unlock_shared(&self) {
let state = self.state.fetch_sub(ONE_READER, Ordering::Release);
if state == (ONE_READER | WRITERS_PARKED) {
self.unlock_shared_slow();
}
}
}
unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
#[inline]
unsafe fn downgrade(&self) {
let state = self
.state
.fetch_and(ONE_READER | WRITERS_PARKED, Ordering::Release);
if state & READERS_PARKED != 0 {
parking_lot_core::unpark_all((self as *const _ as usize) + 1, UnparkToken(0));
}
}
}
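// Slow paths below park through parking_lot_core: writers park on the lock's own
// address, readers park on that address plus one, so readers can be woken en masse
// (unpark_all) independently of waking a single writer (unpark_one).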
impl RawRwLock {
#[cold]
fn lock_exclusive_slow(&self) {
let mut acquire_with = 0;
loop {
let mut spin = SpinWait::new();
let mut state = self.state.load(Ordering::Relaxed);
loop {
while state & ONE_WRITER == 0 {
match self.state.compare_exchange_weak(
state,
state | ONE_WRITER | acquire_with,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return,
Err(e) => state = e,
}
}
if state & WRITERS_PARKED == 0 {
if spin.spin() {
state = self.state.load(Ordering::Relaxed);
continue;
}
if let Err(e) = self.state.compare_exchange_weak(
state,
state | WRITERS_PARKED,
Ordering::Relaxed,
Ordering::Relaxed,
) {
state = e;
continue;
}
}
let _ = unsafe {
parking_lot_core::park(
self as *const _ as usize,
|| {
let state = self.state.load(Ordering::Relaxed);
(state & ONE_WRITER != 0) && (state & WRITERS_PARKED != 0)
},
|| {},
|_, _| {},
ParkToken(0),
None,
)
};
acquire_with = WRITERS_PARKED;
break;
}
}
}
#[cold]
fn unlock_exclusive_slow(&self) {
let state = self.state.load(Ordering::Relaxed);
assert_eq!(state & ONE_WRITER, ONE_WRITER);
let mut parked = state & (READERS_PARKED | WRITERS_PARKED);
assert_ne!(parked, 0);
if parked != (READERS_PARKED | WRITERS_PARKED) {
if let Err(new_state) =
self.state
.compare_exchange(state, 0, Ordering::Release, Ordering::Relaxed)
{
assert_eq!(new_state, ONE_WRITER | READERS_PARKED | WRITERS_PARKED);
parked = READERS_PARKED | WRITERS_PARKED;
}
}
if parked == (READERS_PARKED | WRITERS_PARKED) {
self.state.store(WRITERS_PARKED, Ordering::Release);
parked = READERS_PARKED;
}
if parked == READERS_PARKED {
return unsafe {
parking_lot_core::unpark_all((self as *const _ as usize) + 1, UnparkToken(0));
};
}
assert_eq!(parked, WRITERS_PARKED);
unsafe {
parking_lot_core::unpark_one(self as *const _ as usize, |_| UnparkToken(0));
}
}
#[inline(always)]
fn try_lock_shared_fast(&self) -> bool {
let state = self.state.load(Ordering::Relaxed);
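// checked_add catches reader-count overflow; if the sum has every ONE_WRITER bit
// set, the lock is either write-held or the count saturated, so return false.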
if let Some(new_state) = state.checked_add(ONE_READER) {
if new_state & ONE_WRITER != ONE_WRITER {
return self
.state
.compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
.is_ok();
}
}
false
}
#[cold]
fn try_lock_shared_slow(&self) -> bool {
let mut state = self.state.load(Ordering::Relaxed);
while let Some(new_state) = state.checked_add(ONE_READER) {
if new_state & ONE_WRITER == ONE_WRITER {
break;
}
match self.state.compare_exchange_weak(
state,
new_state,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(e) => state = e,
}
}
false
}
#[cold]
fn lock_shared_slow(&self) {
loop {
let mut spin = SpinWait::new();
let mut state = self.state.load(Ordering::Relaxed);
loop {
let mut backoff = SpinWait::new();
while let Some(new_state) = state.checked_add(ONE_READER) {
assert_ne!(
new_state & ONE_WRITER,
ONE_WRITER,
"reader count overflowed",
);
if self
.state
.compare_exchange_weak(
state,
new_state,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
return;
}
backoff.spin_no_yield();
state = self.state.load(Ordering::Relaxed);
}
if state & READERS_PARKED == 0 {
if spin.spin() {
state = self.state.load(Ordering::Relaxed);
continue;
}
if let Err(e) = self.state.compare_exchange_weak(
state,
state | READERS_PARKED,
Ordering::Relaxed,
Ordering::Relaxed,
) {
state = e;
continue;
}
}
let _ = unsafe {
parking_lot_core::park(
(self as *const _ as usize) + 1,
|| {
let state = self.state.load(Ordering::Relaxed);
(state & ONE_WRITER == ONE_WRITER) && (state & READERS_PARKED != 0)
},
|| {},
|_, _| {},
ParkToken(0),
None,
)
};
break;
}
}
}
#[cold]
fn unlock_shared_slow(&self) {
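// Only reached when unlock_shared saw exactly ONE_READER | WRITERS_PARKED: the last
// reader just left and a writer is parked. If no new reader has taken the lock in
// the meantime, clear the state and wake one parked writer.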
if self
.state
.compare_exchange(WRITERS_PARKED, 0, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
unsafe {
parking_lot_core::unpark_one(self as *const _ as usize, |_| UnparkToken(0));
}
}
}
}

src/table.rs (new file, 81 additions)

@@ -0,0 +1,81 @@
use super::u128::AtomicU128;
use std::borrow::Borrow;
use std::cell::UnsafeCell;
use std::hash::{BuildHasher, Hash, Hasher};
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
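// Each slot's control word packs everything into one AtomicU64: bit 63 marks a
// tombstone (deleted entry), bit 62 marks an allocated slot, and the low 62 bits
// hold the address of the slot's (K, V) pair.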
const TOMBSTONE_BIT: u64 = 1 << 63;
const ALLOCATED_BIT: u64 = 1 << 62;
const POINTER_MASK: u64 = 0x3FFFFFFFFFFFFFFF;
fn hash<S, K>(hasher: &S, key: &K) -> u64
where
S: BuildHasher,
K: Hash,
{
let mut hasher = hasher.build_hasher();
key.hash(&mut hasher);
hasher.finish()
}
struct Slot<K, V> {
data: AtomicU64,
pair: UnsafeCell<MaybeUninit<(K, V)>>,
}
pub struct Table<K, V, S> {
hash: Arc<S>,
slots: Box<[Slot<K, V>]>,
mask: usize,
}
impl<K, V, S> Table<K, V, S>
where
K: Eq + Hash,
S: BuildHasher,
{
pub fn new(hasher: Arc<S>, capacity: usize) -> Self {
debug_assert!(capacity.is_power_of_two());
let slots = (0..capacity)
.map(|_| Slot {
data: AtomicU64::new(0),
pair: UnsafeCell::new(MaybeUninit::uninit()),
})
.collect::<Vec<_>>();
Table {
hash: hasher,
slots: slots.into_boxed_slice(),
mask: capacity - 1,
}
}
pub fn get<Q>(&self, key: &Q) -> Option<*mut (K, V)>
where
K: Borrow<Q>,
Q: Eq + Hash,
{
let hash = hash(&*self.hash, key);
let mut idx = hash as usize & self.mask;
let mut i = 0;
loop {
let slot = &self.slots[idx];
let data = slot.data.load(Ordering::Relaxed);
let ptr = (data & POINTER_MASK) as *mut (K, V);
if !ptr.is_null() {
let stored = unsafe { (*ptr).0.borrow() };
if stored == key {
return Some(ptr);
}
} else if data & TOMBSTONE_BIT != TOMBSTONE_BIT || i > self.mask {
return None;
}
idx = (idx + 1) & self.mask;
i += 1;
}
}
}
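
For reference, the lookup in Table::get is plain linear probing over a power-of-two table. Below is a standalone sketch of the same hash-and-mask walk, written against a Vec so it compiles on its own; ToyTable and probe are illustrative names and are not part of the diff:

use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

struct ToyTable<K, V> {
    slots: Vec<Option<(K, V)>>, // length is a power of two
    mask: usize,                // length - 1
    hasher: RandomState,
}

impl<K: Eq + Hash, V> ToyTable<K, V> {
    fn probe(&self, key: &K) -> Option<&V> {
        let mut h = self.hasher.build_hasher();
        key.hash(&mut h);
        let mut idx = h.finish() as usize & self.mask;
        for _ in 0..=self.mask {
            match &self.slots[idx] {
                Some((k, v)) if k == key => return Some(v),   // hit
                None => return None,                          // empty slot ends the probe chain
                _ => idx = (idx + 1) & self.mask,             // mismatch: step to the next slot
            }
        }
        None // every slot visited without a match
    }
}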