// mirror of https://github.com/spacejam/sled
use std::collections::{BTreeMap, VecDeque};
use std::fmt;
use std::hint;
use std::io;
use std::mem::{self, ManuallyDrop};
use std::ops;
use std::ops::Bound;
use std::ops::RangeBounds;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

use concurrent_map::Minimum;
use inline_array::InlineArray;
use parking_lot::{
    lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard},
    RawRwLock,
};

use crate::*;

#[derive(Clone)]
pub struct Tree<const LEAF_FANOUT: usize = 1024> {
    collection_id: CollectionId,
    cache: ObjectCache<LEAF_FANOUT>,
    index: Index<LEAF_FANOUT>,
    _shutdown_dropper: Arc<ShutdownDropper<LEAF_FANOUT>>,
}

impl<const LEAF_FANOUT: usize> Drop for Tree<LEAF_FANOUT> {
    fn drop(&mut self) {
        if self.cache.config.flush_every_ms.is_none() {
            if let Err(e) = self.flush() {
                log::error!("failed to flush Db on Drop: {e:?}");
            }
        } else {
            // otherwise, it is expected that the flusher thread will
            // flush while shutting down the final Db/Tree instance
        }
    }
}

impl<const LEAF_FANOUT: usize> fmt::Debug for Tree<LEAF_FANOUT> {
    fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
        let alternate = w.alternate();

        let mut debug_struct = w.debug_struct(&format!("Db<{}>", LEAF_FANOUT));

        if alternate {
            debug_struct
                .field("global_error", &self.check_error())
                .field(
                    "data",
                    &format!("{:?}", self.iter().collect::<Vec<_>>()),
                )
                .finish()
        } else {
            debug_struct.field("global_error", &self.check_error()).finish()
        }
    }
}

#[must_use]
struct LeafReadGuard<'a, const LEAF_FANOUT: usize = 1024> {
    leaf_read:
        ManuallyDrop<ArcRwLockReadGuard<RawRwLock, CacheBox<LEAF_FANOUT>>>,
    low_key: InlineArray,
    inner: &'a Tree<LEAF_FANOUT>,
    object_id: ObjectId,
    external_cache_access_and_eviction: bool,
}

impl<'a, const LEAF_FANOUT: usize> Drop for LeafReadGuard<'a, LEAF_FANOUT> {
    fn drop(&mut self) {
        let size = self.leaf_read.leaf.as_ref().unwrap().in_memory_size;
        // we must drop our mutex before calling mark_access_and_evict
        unsafe {
            ManuallyDrop::drop(&mut self.leaf_read);
        }
        if self.external_cache_access_and_eviction {
            return;
        }

        let current_epoch = self.inner.cache.current_flush_epoch();

        if let Err(e) = self.inner.cache.mark_access_and_evict(
            self.object_id,
            size,
            current_epoch,
        ) {
            self.inner.set_error(&e);
            log::error!(
                "io error while paging out dirty data: {:?} \
                 for guard of leaf with low key {:?}",
                e,
                self.low_key
            );
        }
    }
}

struct LeafWriteGuard<'a, const LEAF_FANOUT: usize = 1024> {
    leaf_write:
        ManuallyDrop<ArcRwLockWriteGuard<RawRwLock, CacheBox<LEAF_FANOUT>>>,
    flush_epoch_guard: FlushEpochGuard<'a>,
    low_key: InlineArray,
    inner: &'a Tree<LEAF_FANOUT>,
    node: Object<LEAF_FANOUT>,
    external_cache_access_and_eviction: bool,
}

impl<'a, const LEAF_FANOUT: usize> LeafWriteGuard<'a, LEAF_FANOUT> {
    fn epoch(&self) -> FlushEpoch {
        self.flush_epoch_guard.epoch()
    }

    fn handle_cache_access_and_eviction_externally(
        mut self,
    ) -> (ObjectId, usize) {
        self.external_cache_access_and_eviction = true;
        (
            self.node.object_id,
            self.leaf_write.leaf.as_ref().unwrap().in_memory_size,
        )
    }
}

impl<'a, const LEAF_FANOUT: usize> Drop for LeafWriteGuard<'a, LEAF_FANOUT> {
    fn drop(&mut self) {
        let size = self.leaf_write.leaf.as_ref().unwrap().in_memory_size;

        // we must drop our mutex before calling mark_access_and_evict
        unsafe {
            ManuallyDrop::drop(&mut self.leaf_write);
        }
        if self.external_cache_access_and_eviction {
            return;
        }

        if let Err(e) = self.inner.cache.mark_access_and_evict(
            self.node.object_id,
            size,
            self.epoch(),
        ) {
            self.inner.set_error(&e);
            log::error!("io error while paging out dirty data: {:?}", e);
        }
    }
}

impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
    pub(crate) fn new(
        collection_id: CollectionId,
        cache: ObjectCache<LEAF_FANOUT>,
        index: Index<LEAF_FANOUT>,
        _shutdown_dropper: Arc<ShutdownDropper<LEAF_FANOUT>>,
    ) -> Tree<LEAF_FANOUT> {
        Tree { collection_id, cache, index, _shutdown_dropper }
    }

    // This is only pub for an extra assertion during testing.
    #[doc(hidden)]
    pub fn check_error(&self) -> io::Result<()> {
        self.cache.check_error()
    }

    fn set_error(&self, error: &io::Error) {
        self.cache.set_error(error)
    }
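
    /// Returns a snapshot of storage statistics from the
    /// underlying object cache.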
    pub fn storage_stats(&self) -> Stats {
        Stats { cache: self.cache.stats() }
    }

    /// Synchronously flushes all dirty IO buffers and calls
    /// fsync. If this succeeds, it is guaranteed that all
    /// previous writes will be recovered if the system
    /// crashes. Returns statistics about the flush work
    /// performed during this call.
    ///
    /// Flushing can take quite a lot of time, and you should
    /// measure the performance impact of using it on
    /// realistic sustained workloads running on realistic
    /// hardware.
    ///
    /// This is called automatically on drop of the last open Db
    /// instance.
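    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same temporary-database
    /// setup as the other examples in this module:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// let _stats = db.flush()?;
    /// # Ok(()) }
    /// ```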
    pub fn flush(&self) -> io::Result<FlushStats> {
        self.cache.flush()
    }
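
    // `page_in` retries until the leaf it locks actually covers the
    // search key: concurrent splits, merges, and deletions can move
    // ownership of a key between the index lookup and the mutex
    // acquisition, in which case the lock is dropped and the search
    // is retried from the top.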
    fn page_in(
        &self,
        key: &[u8],
        flush_epoch: FlushEpoch,
    ) -> io::Result<(
        InlineArray,
        ArcRwLockWriteGuard<RawRwLock, CacheBox<LEAF_FANOUT>>,
        Object<LEAF_FANOUT>,
    )> {
        let before_read_io = Instant::now();

        let mut read_loops: u64 = 0;
        loop {
            let _heap_pin = self.cache.heap_object_id_pin();

            let (low_key, node) = self.index.get_lte(key).unwrap();
            if node.collection_id != self.collection_id {
                log::trace!("retry due to mismatched collection id in page_in");

                hint::spin_loop();

                continue;
            }

            let mut write = node.inner.write_arc();
            if write.leaf.is_none() {
                self.cache
                    .read_stats
                    .cache_misses
                    .fetch_add(1, Ordering::Relaxed);

                let leaf_bytes =
                    if let Some(read_res) = self.cache.read(node.object_id) {
                        read_res?
                    } else {
                        // this particular object ID is not present
                        read_loops += 1;
                        // TODO change this assertion
                        debug_assert!(
                            read_loops < 10_000_000_000,
                            "search key: {:?} node key: {:?} object id: {:?}",
                            key,
                            low_key,
                            node.object_id
                        );

                        hint::spin_loop();

                        continue;
                    };

                let read_io_latency_us =
                    u64::try_from(before_read_io.elapsed().as_micros())
                        .unwrap();
                self.cache
                    .read_stats
                    .max_read_io_latency_us
                    .fetch_max(read_io_latency_us, Ordering::Relaxed);
                self.cache
                    .read_stats
                    .sum_read_io_latency_us
                    .fetch_add(read_io_latency_us, Ordering::Relaxed);

                let before_deserialization = Instant::now();

                let leaf: Box<Leaf<LEAF_FANOUT>> =
                    Leaf::deserialize(&leaf_bytes).unwrap();

                let deserialization_latency_us =
                    u64::try_from(before_deserialization.elapsed().as_micros())
                        .unwrap();
                self.cache
                    .read_stats
                    .max_deserialization_latency_us
                    .fetch_max(deserialization_latency_us, Ordering::Relaxed);
                self.cache
                    .read_stats
                    .sum_deserialization_latency_us
                    .fetch_add(deserialization_latency_us, Ordering::Relaxed);

                #[cfg(feature = "for-internal-testing-only")]
                {
                    self.cache.event_verifier.mark(
                        node.object_id,
                        FlushEpoch::MIN,
                        event_verifier::State::CleanPagedIn,
                        concat!(file!(), ':', line!(), ":page-in"),
                    );
                }

                write.leaf = Some(leaf);
            } else {
                self.cache
                    .read_stats
                    .cache_hits
                    .fetch_add(1, Ordering::Relaxed);
            }
            let leaf = write.leaf.as_mut().unwrap();

            if leaf.deleted.is_some() {
                log::trace!("retry due to deleted node in page_in");
                drop(write);

                hint::spin_loop();

                continue;
            }

            if &*leaf.lo > key {
                let size = leaf.in_memory_size;
                drop(write);
                log::trace!("key undershoot in page_in");
                self.cache.mark_access_and_evict(
                    node.object_id,
                    size,
                    flush_epoch,
                )?;

                hint::spin_loop();

                continue;
            }

            if let Some(ref hi) = leaf.hi {
                if &**hi <= key {
                    let size = leaf.in_memory_size;
                    drop(write);
                    log::trace!("key overshoot in page_in");
                    self.cache.mark_access_and_evict(
                        node.object_id,
                        size,
                        flush_epoch,
                    )?;

                    hint::spin_loop();

                    continue;
                }
            }
            return Ok((low_key, write, node));
        }
    }

    // NB: must never be called without having already added the empty leaf
    // operations to a normal flush epoch. This function acquires the lock
    // for the left sibling so that the empty leaf's hi key can be given
    // to the left sibling, but for this to happen atomically, the act of
    // moving left must "happen" in the same flush epoch. By "pushing" the
    // merge left potentially into a future flush epoch, any deletions that the
    // leaf had applied that may have been a part of a previous batch would also
    // be pushed into the future flush epoch, which would break the crash
    // atomicity of the batch if the updates were not flushed in the same epoch
    // as the rest of the batch. So, this is why we potentially separate the
    // flush of the left merge from the flush of the operations that caused
    // the leaf to empty in the first place.
    fn merge_leaf_into_left_sibling<'a>(
        &'a self,
        mut leaf_guard: LeafWriteGuard<'a, LEAF_FANOUT>,
    ) -> io::Result<()> {
        let mut predecessor_guard = self.predecessor_leaf_mut(&leaf_guard)?;

        assert!(predecessor_guard.epoch() >= leaf_guard.epoch());

        let merge_epoch = leaf_guard.epoch().max(predecessor_guard.epoch());

        let leaf = leaf_guard.leaf_write.leaf.as_mut().unwrap();
        let predecessor = predecessor_guard.leaf_write.leaf.as_mut().unwrap();

        assert!(leaf.deleted.is_none());
        assert!(leaf.data.is_empty());
        assert!(predecessor.deleted.is_none());
        assert_eq!(predecessor.hi.as_ref(), Some(&leaf.lo));

        log::trace!(
            "deleting empty node id {} with low key {:?} and high key {:?}",
            leaf_guard.node.object_id.0,
            leaf.lo,
            leaf.hi
        );

        predecessor.hi = leaf.hi.clone();
        predecessor.set_dirty_epoch(merge_epoch);

        leaf.deleted = Some(merge_epoch);

        leaf_guard
            .inner
            .cache
            .tree_leaves_merged
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        self.index.remove(&leaf_guard.low_key).unwrap();
        self.cache.object_id_index.remove(&leaf_guard.node.object_id).unwrap();

        // NB: these updates must "happen" atomically in the same flush epoch
        self.cache.install_dirty(
            merge_epoch,
            leaf_guard.node.object_id,
            Dirty::MergedAndDeleted {
                object_id: leaf_guard.node.object_id,
                collection_id: self.collection_id,
            },
        );

        #[cfg(feature = "for-internal-testing-only")]
        {
            self.cache.event_verifier.mark(
                leaf_guard.node.object_id,
                merge_epoch,
                event_verifier::State::Unallocated,
                concat!(file!(), ':', line!(), ":merged"),
            );
        }

        self.cache.install_dirty(
            merge_epoch,
            predecessor_guard.node.object_id,
            Dirty::NotYetSerialized {
                low_key: predecessor.lo.clone(),
                node: predecessor_guard.node.clone(),
                collection_id: self.collection_id,
            },
        );

        #[cfg(feature = "for-internal-testing-only")]
        {
            self.cache.event_verifier.mark(
                predecessor_guard.node.object_id,
                merge_epoch,
                event_verifier::State::Dirty,
                concat!(file!(), ':', line!(), ":merged-into"),
            );
        }

        let (p_object_id, p_sz) =
            predecessor_guard.handle_cache_access_and_eviction_externally();
        let (s_object_id, s_sz) =
            leaf_guard.handle_cache_access_and_eviction_externally();

        self.cache.mark_access_and_evict(p_object_id, p_sz, merge_epoch)?;

        // TODO maybe we don't need to do this, since we're removing it
        self.cache.mark_access_and_evict(s_object_id, s_sz, merge_epoch)?;

        Ok(())
    }

    fn predecessor_leaf_mut<'a>(
        &'a self,
        successor: &LeafWriteGuard<'a, LEAF_FANOUT>,
    ) -> io::Result<LeafWriteGuard<'a, LEAF_FANOUT>> {
        assert_ne!(successor.low_key, &*InlineArray::MIN);

        loop {
            let search_key = self
                .index
                .range::<InlineArray, _>(..&successor.low_key)
                .next_back()
                .unwrap()
                .0;

            assert!(search_key < successor.low_key);

            // TODO this can probably deadlock
            let node = self.leaf_for_key_mut(&search_key)?;

            let leaf = node.leaf_write.leaf.as_ref().unwrap();

            assert!(leaf.lo < successor.low_key);

            let leaf_hi = leaf.hi.as_ref().expect(
                "we hold the successor mutex, so the \
                 predecessor should have a hi key",
            );

            if leaf_hi > &successor.low_key {
                let still_in_index = self.index.get(&successor.low_key);
                panic!(
                    "somehow, predecessor high key of {:?} \
                     is greater than successor low key of {:?}. current index presence: {:?} \n \
                     predecessor: {:?} \n successor: {:?}",
                    leaf_hi,
                    successor.low_key,
                    still_in_index,
                    leaf,
                    successor.leaf_write.leaf.as_ref().unwrap(),
                );
            }
            if leaf_hi != &successor.low_key {
                continue;
            }
            return Ok(node);
        }
    }

    fn leaf_for_key<'a>(
        &'a self,
        key: &[u8],
    ) -> io::Result<LeafReadGuard<'a, LEAF_FANOUT>> {
        loop {
            let (low_key, node) = self.index.get_lte(key).unwrap();
            let mut read = node.inner.read_arc();

            if read.leaf.is_none() {
                drop(read);
                let (read_low_key, write, _node) =
                    self.page_in(key, self.cache.current_flush_epoch())?;
                assert!(&*read_low_key <= key);
                read = ArcRwLockWriteGuard::downgrade(write);
            } else {
                self.cache
                    .read_stats
                    .cache_hits
                    .fetch_add(1, Ordering::Relaxed);
            }

            let leaf_guard = LeafReadGuard {
                leaf_read: ManuallyDrop::new(read),
                inner: self,
                low_key,
                object_id: node.object_id,
                external_cache_access_and_eviction: false,
            };

            let leaf = leaf_guard.leaf_read.leaf.as_ref().unwrap();

            if leaf.deleted.is_some() {
                log::trace!("retry due to deleted node in leaf_for_key");
                drop(leaf_guard);
                hint::spin_loop();
                continue;
            }
            if &*leaf.lo > key {
                log::trace!("key undershoot in leaf_for_key");
                drop(leaf_guard);
                hint::spin_loop();
                continue;
            }
            if let Some(ref hi) = leaf.hi {
                if &**hi <= key {
                    log::trace!("key overshoot on leaf_for_key");
                    // cache maintenance occurs in Drop for LeafReadGuard
                    drop(leaf_guard);
                    hint::spin_loop();
                    continue;
                }
            }

            return Ok(leaf_guard);
        }
    }

    fn leaf_for_key_mut<'a>(
        &'a self,
        key: &[u8],
    ) -> io::Result<LeafWriteGuard<'a, LEAF_FANOUT>> {
        let reader_epoch = self.cache.current_flush_epoch();

        let (low_key, mut write, node) = self.page_in(key, reader_epoch)?;

        // by checking into an epoch after acquiring the node mutex, we
        // avoid inversions where progress may be observed to go backwards.
        let flush_epoch_guard = self.cache.check_into_flush_epoch();

        let leaf = write.leaf.as_mut().unwrap();

        // NB: these invariants should be enforced in page_in
        assert!(leaf.deleted.is_none());
        assert!(&*leaf.lo <= key);
        if let Some(ref hi) = leaf.hi {
            assert!(
                &**hi > key,
                "while retrieving the leaf for key {:?} \
                 we pulled a leaf with hi key of {:?}",
                key,
                hi
            );
        }

        if let Some(old_dirty_epoch) = leaf.dirty_flush_epoch {
            if old_dirty_epoch != flush_epoch_guard.epoch() {
                log::trace!(
                    "cooperatively flushing {:?} with dirty {:?} after checking into {:?}",
                    node.object_id,
                    old_dirty_epoch,
                    flush_epoch_guard.epoch()
                );

                assert!(old_dirty_epoch < flush_epoch_guard.epoch());

                // cooperatively serialize and put into dirty
                leaf.max_unflushed_epoch = leaf.dirty_flush_epoch.take();
                leaf.page_out_on_flush.take();
                log::trace!(
                    "starting cooperatively serializing {:?} for {:?} because we want to use it in {:?}",
                    node.object_id, old_dirty_epoch, flush_epoch_guard.epoch(),
                );
                #[cfg(feature = "for-internal-testing-only")]
                {
                    self.cache.event_verifier.mark(
                        node.object_id,
                        old_dirty_epoch,
                        event_verifier::State::CooperativelySerialized,
                        concat!(
                            file!(),
                            ':',
                            line!(),
                            ":cooperative-serialize"
                        ),
                    );
                }

                // be extra-explicit about serialized bytes
                let leaf_ref: &Leaf<LEAF_FANOUT> = &*leaf;

                let serialized = leaf_ref
                    .serialize(self.cache.config.zstd_compression_level);

                log::trace!(
                    "D adding node {} to dirty {:?}",
                    node.object_id.0,
                    old_dirty_epoch
                );

                self.cache.install_dirty(
                    old_dirty_epoch,
                    node.object_id,
                    Dirty::CooperativelySerialized {
                        object_id: node.object_id,
                        collection_id: self.collection_id,
                        low_key: leaf.lo.clone(),
                        mutation_count: leaf.mutation_count,
                        data: Arc::new(serialized),
                    },
                );
                log::trace!(
                    "finished cooperatively serializing {:?} for {:?} because we want to use it in {:?}",
                    node.object_id, old_dirty_epoch, flush_epoch_guard.epoch(),
                );

                assert!(
                    old_dirty_epoch < flush_epoch_guard.epoch(),
                    "flush epochs somehow became unlinked"
                );
            }
        }

        Ok(LeafWriteGuard {
            flush_epoch_guard,
            leaf_write: ManuallyDrop::new(write),
            inner: self,
            low_key,
            node,
            external_cache_access_and_eviction: false,
        })
    }

    /// Retrieve a value from the `Tree` if it exists.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// assert_eq!(db.get(&[0]).unwrap(), Some(sled::InlineArray::from(vec![0])));
    /// assert!(db.get(&[1]).unwrap().is_none());
    /// # Ok(()) }
    /// ```
    pub fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> io::Result<Option<InlineArray>> {
        self.check_error()?;

        let key_ref = key.as_ref();

        let leaf_guard = self.leaf_for_key(key_ref)?;

        let leaf = leaf_guard.leaf_read.leaf.as_ref().unwrap();

        if let Some(ref hi) = leaf.hi {
            assert!(&**hi > key_ref);
        }

        Ok(leaf.get(key_ref).cloned())
    }

    /// Insert a key to a new value, returning the last value if it
    /// was set.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// assert_eq!(db.insert(&[1, 2, 3], vec![0]).unwrap(), None);
    /// assert_eq!(db.insert(&[1, 2, 3], vec![1]).unwrap(), Some(sled::InlineArray::from(&[0])));
    /// # Ok(()) }
    /// ```
    #[doc(alias = "set")]
    #[doc(alias = "put")]
    pub fn insert<K, V>(
        &self,
        key: K,
        value: V,
    ) -> io::Result<Option<InlineArray>>
    where
        K: AsRef<[u8]>,
        V: Into<InlineArray>,
    {
        self.check_error()?;

        let key_ref = key.as_ref();

        let value_ivec = value.into();
        let mut leaf_guard = self.leaf_for_key_mut(key_ref)?;
        let new_epoch = leaf_guard.epoch();

        let leaf = leaf_guard.leaf_write.leaf.as_mut().unwrap();

        let ret = leaf.insert(key_ref.into(), value_ivec.clone());

        let old_size =
            ret.as_ref().map(|v| key_ref.len() + v.len()).unwrap_or(0);
        let new_size = key_ref.len() + value_ivec.len();

        if new_size > old_size {
            leaf.in_memory_size += new_size - old_size;
        } else {
            leaf.in_memory_size =
                leaf.in_memory_size.saturating_sub(old_size - new_size);
        }

        let split =
            leaf.split_if_full(new_epoch, &self.cache, self.collection_id);
        if split.is_some() || Some(value_ivec) != ret {
            leaf.mutation_count += 1;
            leaf.set_dirty_epoch(new_epoch);
            log::trace!(
                "F adding node {} to dirty {:?}",
                leaf_guard.node.object_id.0,
                new_epoch
            );

            self.cache.install_dirty(
                new_epoch,
                leaf_guard.node.object_id,
                Dirty::NotYetSerialized {
                    collection_id: self.collection_id,
                    node: leaf_guard.node.clone(),
                    low_key: leaf_guard.low_key.clone(),
                },
            );

            #[cfg(feature = "for-internal-testing-only")]
            {
                self.cache.event_verifier.mark(
                    leaf_guard.node.object_id,
                    new_epoch,
                    event_verifier::State::Dirty,
                    concat!(file!(), ':', line!(), ":insert"),
                );
            }
        }
        if let Some((split_key, rhs_node)) = split {
            assert_eq!(leaf.hi.as_ref().unwrap(), &split_key);
            log::trace!(
                "G adding new from split {:?} to dirty {:?}",
                rhs_node.object_id,
                new_epoch
            );

            assert_ne!(rhs_node.object_id, leaf_guard.node.object_id);
            assert!(!split_key.is_empty());

            let rhs_object_id = rhs_node.object_id;

            self.cache.install_dirty(
                new_epoch,
                rhs_object_id,
                Dirty::NotYetSerialized {
                    collection_id: self.collection_id,
                    node: rhs_node.clone(),
                    low_key: split_key.clone(),
                },
            );

            #[cfg(feature = "for-internal-testing-only")]
            {
                self.cache.event_verifier.mark(
                    rhs_object_id,
                    new_epoch,
                    event_verifier::State::Dirty,
                    concat!(file!(), ':', line!(), ":insert-split"),
                );
            }

            // NB only make the new node reachable via the index after
            // we marked it as dirty, as from this point on, any other
            // thread may cooperatively deserialize it and maybe conflict
            // with that previous NotYetSerialized marker.
            self.cache
                .object_id_index
                .insert(rhs_node.object_id, rhs_node.clone());
            let prev = self.index.insert(split_key, rhs_node);
            assert!(prev.is_none());
        }

        // this is for clarity, that leaf_guard is held while
        // inserting into dirty with its guarded epoch
        drop(leaf_guard);

        Ok(ret)
    }

    /// Delete a value, returning the old value if it existed.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[1], vec![1])?;
    /// assert_eq!(db.remove(&[1]).unwrap(), Some(sled::InlineArray::from(vec![1])));
    /// assert!(db.remove(&[1]).unwrap().is_none());
    /// # Ok(()) }
    /// ```
    #[doc(alias = "delete")]
    #[doc(alias = "del")]
    pub fn remove<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> io::Result<Option<InlineArray>> {
        self.check_error()?;

        let key_ref = key.as_ref();

        let mut leaf_guard = self.leaf_for_key_mut(key_ref)?;

        let new_epoch = leaf_guard.epoch();

        let leaf = leaf_guard.leaf_write.leaf.as_mut().unwrap();

        assert!(leaf.deleted.is_none());

        let ret = leaf.remove(key_ref);

        if ret.is_some() {
            leaf.mutation_count += 1;

            leaf.set_dirty_epoch(new_epoch);

            log::trace!(
                "H adding node {} to dirty {:?}",
                leaf_guard.node.object_id.0,
                new_epoch
            );

            self.cache.install_dirty(
                new_epoch,
                leaf_guard.node.object_id,
                Dirty::NotYetSerialized {
                    collection_id: self.collection_id,
                    low_key: leaf_guard.low_key.clone(),
                    node: leaf_guard.node.clone(),
                },
            );

            #[cfg(feature = "for-internal-testing-only")]
            {
                self.cache.event_verifier.mark(
                    leaf_guard.node.object_id,
                    new_epoch,
                    event_verifier::State::Dirty,
                    concat!(file!(), ':', line!(), ":remove"),
                );
            }

            if leaf.data.is_empty() && leaf_guard.low_key != InlineArray::MIN {
                self.merge_leaf_into_left_sibling(leaf_guard)?;
            }
        }

        Ok(ret)
    }

    /// Compare and swap. Capable of unique creation, conditional modification,
    /// or deletion. If old is `None`, this will only set the value if it
    /// doesn't exist yet. If new is `None`, will delete the value if old is
    /// correct. If both old and new are `Some`, will modify the value if
    /// old is correct.
    ///
    /// It returns `Ok(Ok(CompareAndSwapSuccess { new_value, previous_value }))` if the operation finishes successfully.
    ///
    /// If it fails it returns:
    /// - `Ok(Err(CompareAndSwapError { current, proposed }))` if no IO
    ///   error was encountered but the operation
    ///   failed to specify the correct current value. `CompareAndSwapError` contains
    ///   current and proposed values.
    /// - `Err(io::Error)` if there was a high-level IO problem that prevented
    ///   the operation from logically progressing. This is usually fatal and
    ///   will prevent future requests from functioning, and requires the
    ///   administrator to fix the system issue before restarting.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// // unique creation
    /// assert!(
    ///     db.compare_and_swap(&[1], None as Option<&[u8]>, Some(&[10])).unwrap().is_ok(),
    /// );
    ///
    /// // conditional modification
    /// assert!(
    ///     db.compare_and_swap(&[1], Some(&[10]), Some(&[20])).unwrap().is_ok(),
    /// );
    ///
    /// // failed conditional modification -- the current value is returned in
    /// // the error variant
    /// let operation = db.compare_and_swap(&[1], Some(&[30]), Some(&[40]));
    /// assert!(operation.is_ok()); // the operation succeeded
    /// let modification = operation.unwrap();
    /// assert!(modification.is_err());
    /// let actual_value = modification.unwrap_err();
    /// assert_eq!(actual_value.current.map(|ivec| ivec.to_vec()), Some(vec![20]));
    ///
    /// // conditional deletion
    /// assert!(
    ///     db.compare_and_swap(&[1], Some(&[20]), None as Option<&[u8]>).unwrap().is_ok(),
    /// );
    /// assert!(db.get(&[1]).unwrap().is_none());
    /// # Ok(()) }
    /// ```
    #[doc(alias = "cas")]
    #[doc(alias = "tas")]
    #[doc(alias = "test_and_swap")]
    #[doc(alias = "compare_and_set")]
    pub fn compare_and_swap<K, OV, NV>(
        &self,
        key: K,
        old: Option<OV>,
        new: Option<NV>,
    ) -> CompareAndSwapResult
    where
        K: AsRef<[u8]>,
        OV: AsRef<[u8]>,
        NV: Into<InlineArray>,
    {
        self.check_error()?;

        let key_ref = key.as_ref();

        let mut leaf_guard = self.leaf_for_key_mut(key_ref)?;
        let new_epoch = leaf_guard.epoch();

        let proposed: Option<InlineArray> = new.map(Into::into);

        let leaf = leaf_guard.leaf_write.leaf.as_mut().unwrap();

        let current = leaf.get(key_ref).cloned();

        let previous_matches = match (old, &current) {
            (None, None) => true,
            (Some(conditional), Some(current))
                if conditional.as_ref() == current.as_ref() =>
            {
                true
            }
            _ => false,
        };

        let ret = if previous_matches {
            if let Some(ref new_value) = proposed {
                leaf.insert(key_ref.into(), new_value.clone())
            } else {
                leaf.remove(key_ref)
            };

            Ok(CompareAndSwapSuccess {
                new_value: proposed,
                previous_value: current,
            })
        } else {
            Err(CompareAndSwapError { current, proposed })
        };

        let split =
            leaf.split_if_full(new_epoch, &self.cache, self.collection_id);
        let split_happened = split.is_some();
        if split_happened || ret.is_ok() {
            leaf.mutation_count += 1;

            leaf.set_dirty_epoch(new_epoch);
            log::trace!(
                "A adding node {} to dirty {:?}",
                leaf_guard.node.object_id.0,
                new_epoch
            );

            self.cache.install_dirty(
                new_epoch,
                leaf_guard.node.object_id,
                Dirty::NotYetSerialized {
                    collection_id: self.collection_id,
                    node: leaf_guard.node.clone(),
                    low_key: leaf_guard.low_key.clone(),
                },
            );

            #[cfg(feature = "for-internal-testing-only")]
            {
                self.cache.event_verifier.mark(
                    leaf_guard.node.object_id,
                    new_epoch,
                    event_verifier::State::Dirty,
                    concat!(file!(), ':', line!(), ":cas"),
                );
            }
        }
        if let Some((split_key, rhs_node)) = split {
            log::trace!(
                "B adding new from split {:?} to dirty {:?}",
                rhs_node.object_id,
                new_epoch
            );
            self.cache.install_dirty(
                new_epoch,
                rhs_node.object_id,
                Dirty::NotYetSerialized {
                    collection_id: self.collection_id,
                    node: rhs_node.clone(),
                    low_key: split_key.clone(),
                },
            );

            #[cfg(feature = "for-internal-testing-only")]
            {
                self.cache.event_verifier.mark(
                    rhs_node.object_id,
                    new_epoch,
                    event_verifier::State::Dirty,
                    concat!(file!(), ':', line!(), ":cas-split"),
                );
            }

            // NB only make the new node reachable via the index after
            // we marked it as dirty, as from this point on, any other
            // thread may cooperatively deserialize it and maybe conflict
            // with that previous NotYetSerialized marker.
            self.cache
                .object_id_index
                .insert(rhs_node.object_id, rhs_node.clone());
            let prev = self.index.insert(split_key, rhs_node);
            assert!(prev.is_none());
        }

        if leaf.data.is_empty() && leaf_guard.low_key != InlineArray::MIN {
            assert!(!split_happened);
            self.merge_leaf_into_left_sibling(leaf_guard)?;
        }

        Ok(ret)
    }

    /// Fetch the value, apply a function to it and return the result.
    ///
    /// # Note
    ///
    /// This may call the function multiple times if the value has been
    /// changed by other threads in the meantime.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use sled::{Config, InlineArray};
    ///
    /// let config = Config::tmp().unwrap();
    /// let db: sled::Db<1024> = config.open()?;
    ///
    /// fn u64_to_ivec(number: u64) -> InlineArray {
    ///     InlineArray::from(number.to_be_bytes().to_vec())
    /// }
    ///
    /// let zero = u64_to_ivec(0);
    /// let one = u64_to_ivec(1);
    /// let two = u64_to_ivec(2);
    /// let three = u64_to_ivec(3);
    ///
    /// fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
    ///     let number = match old {
    ///         Some(bytes) => {
    ///             let array: [u8; 8] = bytes.try_into().unwrap();
    ///             let number = u64::from_be_bytes(array);
    ///             number + 1
    ///         }
    ///         None => 0,
    ///     };
    ///
    ///     Some(number.to_be_bytes().to_vec())
    /// }
    ///
    /// assert_eq!(db.update_and_fetch("counter", increment).unwrap(), Some(zero));
    /// assert_eq!(db.update_and_fetch("counter", increment).unwrap(), Some(one));
    /// assert_eq!(db.update_and_fetch("counter", increment).unwrap(), Some(two));
    /// assert_eq!(db.update_and_fetch("counter", increment).unwrap(), Some(three));
    /// # Ok(()) }
    /// ```
    pub fn update_and_fetch<K, V, F>(
        &self,
        key: K,
        mut f: F,
    ) -> io::Result<Option<InlineArray>>
    where
        K: AsRef<[u8]>,
        F: FnMut(Option<&[u8]>) -> Option<V>,
        V: Into<InlineArray>,
    {
        let key_ref = key.as_ref();
        let mut current = self.get(key_ref)?;

        loop {
            let tmp = current.as_ref().map(AsRef::as_ref);
            let next = f(tmp).map(Into::into);
            match self.compare_and_swap::<_, _, InlineArray>(
                key_ref,
                tmp,
                next.clone(),
            )? {
                Ok(_) => return Ok(next),
                Err(CompareAndSwapError { current: cur, .. }) => {
                    current = cur;
                }
            }
        }
    }

    /// Fetch the value, apply a function to it and return the previous value.
    ///
    /// # Note
    ///
    /// This may call the function multiple times if the value has been
    /// changed by other threads in the meantime.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use sled::{Config, InlineArray};
    ///
    /// let config = Config::tmp().unwrap();
    /// let db: sled::Db<1024> = config.open()?;
    ///
    /// fn u64_to_ivec(number: u64) -> InlineArray {
    ///     InlineArray::from(number.to_be_bytes().to_vec())
    /// }
    ///
    /// let zero = u64_to_ivec(0);
    /// let one = u64_to_ivec(1);
    /// let two = u64_to_ivec(2);
    ///
    /// fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
    ///     let number = match old {
    ///         Some(bytes) => {
    ///             let array: [u8; 8] = bytes.try_into().unwrap();
    ///             let number = u64::from_be_bytes(array);
    ///             number + 1
    ///         }
    ///         None => 0,
    ///     };
    ///
    ///     Some(number.to_be_bytes().to_vec())
    /// }
    ///
    /// assert_eq!(db.fetch_and_update("counter", increment).unwrap(), None);
    /// assert_eq!(db.fetch_and_update("counter", increment).unwrap(), Some(zero));
    /// assert_eq!(db.fetch_and_update("counter", increment).unwrap(), Some(one));
    /// assert_eq!(db.fetch_and_update("counter", increment).unwrap(), Some(two));
    /// # Ok(()) }
    /// ```
    pub fn fetch_and_update<K, V, F>(
        &self,
        key: K,
        mut f: F,
    ) -> io::Result<Option<InlineArray>>
    where
        K: AsRef<[u8]>,
        F: FnMut(Option<&[u8]>) -> Option<V>,
        V: Into<InlineArray>,
    {
        let key_ref = key.as_ref();
        let mut current = self.get(key_ref)?;

        loop {
            let tmp = current.as_ref().map(AsRef::as_ref);
            let next = f(tmp);
            match self.compare_and_swap(key_ref, tmp, next)? {
                Ok(_) => return Ok(current),
                Err(CompareAndSwapError { current: cur, .. }) => {
                    current = cur;
                }
            }
        }
    }
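
    /// Iterate over all key-value pairs in the `Tree` in
    /// lexicographic key order.
    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same temporary-database
    /// setup as the other examples in this module:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[1], vec![10])?;
    /// db.insert(&[2], vec![20])?;
    /// let mut iter = db.iter();
    /// assert_eq!(iter.next().unwrap()?.0, sled::InlineArray::from(&[1]));
    /// assert_eq!(iter.next().unwrap()?.0, sled::InlineArray::from(&[2]));
    /// assert!(iter.next().is_none());
    /// # Ok(()) }
    /// ```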
    pub fn iter(&self) -> Iter<LEAF_FANOUT> {
        Iter {
            prefetched: VecDeque::new(),
            prefetched_back: VecDeque::new(),
            next_fetch: Some(InlineArray::MIN),
            next_back_last_lo: None,
            next_calls: 0,
            next_back_calls: 0,
            inner: self.clone(),
            bounds: (Bound::Unbounded, Bound::Unbounded),
        }
    }
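
    /// Iterate over the key-value pairs whose keys fall within
    /// the given range.
    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same temporary-database
    /// setup as the other examples in this module:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// db.insert(&[1], vec![10])?;
    /// db.insert(&[2], vec![20])?;
    /// let start: &[u8] = &[1];
    /// let end: &[u8] = &[2];
    /// let mut r = db.range(start..end);
    /// assert_eq!(r.next().unwrap()?.0, sled::InlineArray::from(&[1]));
    /// assert!(r.next().is_none());
    /// # Ok(()) }
    /// ```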
    pub fn range<K, R>(&self, range: R) -> Iter<LEAF_FANOUT>
    where
        K: AsRef<[u8]>,
        R: RangeBounds<K>,
    {
        let start: Bound<InlineArray> =
            map_bound(range.start_bound(), |b| InlineArray::from(b.as_ref()));
        let end: Bound<InlineArray> =
            map_bound(range.end_bound(), |b| InlineArray::from(b.as_ref()));

        let next_fetch = Some(match &start {
            Bound::Included(b) | Bound::Excluded(b) => b.clone(),
            Bound::Unbounded => InlineArray::MIN,
        });

        Iter {
            prefetched: VecDeque::new(),
            prefetched_back: VecDeque::new(),
            next_fetch,
            next_back_last_lo: None,
            next_calls: 0,
            next_back_calls: 0,
            inner: self.clone(),
            bounds: (start, end),
        }
    }

    /// Create a new batched update that is applied
    /// atomically. Readers will see all updates
    /// at an atomic instant, and if the database crashes,
    /// either 0% or 100% of the full batch will be recovered,
    /// but never a partial batch. If a `flush` operation succeeds
    /// after this, it is guaranteed that 100% of the batch will be
    /// visible, unless later concurrent updates changed the values
    /// before the flush.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let _ = std::fs::remove_dir_all("batch_doctest");
    /// # let db: sled::Db<1024> = sled::open("batch_doctest")?;
    /// db.insert("key_0", "val_0")?;
    ///
    /// let mut batch = sled::Batch::default();
    /// batch.insert("key_a", "val_a");
    /// batch.insert("key_b", "val_b");
    /// batch.insert("key_c", "val_c");
    /// batch.remove("key_0");
    ///
    /// db.apply_batch(batch)?;
    /// // key_0 no longer exists, and key_a, key_b, and key_c
    /// // now do exist.
    /// # let _ = std::fs::remove_dir_all("batch_doctest");
    /// # Ok(()) }
    /// ```
    pub fn apply_batch(&self, batch: Batch) -> io::Result<()> {
        // NB: we rely on lexicographic lock acquisition
        // by iterating over the batch's BTreeMap to avoid
        // deadlocks during 2PL
        let mut acquired_locks: BTreeMap<
            InlineArray,
            (
                ArcRwLockWriteGuard<RawRwLock, CacheBox<LEAF_FANOUT>>,
                Object<LEAF_FANOUT>,
            ),
        > = BTreeMap::new();

        // Phase 1: lock acquisition
        let mut last: Option<(
            InlineArray,
            ArcRwLockWriteGuard<RawRwLock, CacheBox<LEAF_FANOUT>>,
            Object<LEAF_FANOUT>,
        )> = None;

        for key in batch.writes.keys() {
            if let Some((_lo, w, _id)) = &last {
                let leaf = w.leaf.as_ref().unwrap();
                assert!(&leaf.lo <= key);
                if let Some(hi) = &leaf.hi {
                    if hi <= key {
                        let (lo, w, n) = last.take().unwrap();
                        acquired_locks.insert(lo, (w, n));
                    }
                }
            }
            if last.is_none() {
                // TODO evaluate whether this is correct, as page_in performs
                // cache maintenance internally if it over/undershoots due to
                // concurrent modifications.
                last =
                    Some(self.page_in(key, self.cache.current_flush_epoch())?);
            }
        }

        if let Some((lo, w, id)) = last.take() {
            acquired_locks.insert(lo, (w, id));
        }

        // NB: add the flush epoch at the end of the lock acquisition
        // process when all locks have been acquired, to avoid situations
        // where a leaf is already dirty with an epoch "from the future".
        let flush_epoch_guard = self.cache.check_into_flush_epoch();
        let new_epoch = flush_epoch_guard.epoch();

        // Flush any leaves that are dirty from a previous flush epoch
        // before performing operations.
        for (write, node) in acquired_locks.values_mut() {
            let leaf = write.leaf.as_mut().unwrap();
            if let Some(old_flush_epoch) = leaf.dirty_flush_epoch {
                if old_flush_epoch == new_epoch {
                    // no need to cooperatively flush
                    continue;
                }

                assert!(old_flush_epoch < new_epoch);

                log::trace!(
                    "cooperatively flushing {:?} with dirty {:?} after checking into {:?}",
                    node.object_id,
                    old_flush_epoch,
                    new_epoch
                );

                // cooperatively serialize and put into dirty
                let old_dirty_epoch = leaf.dirty_flush_epoch.take().unwrap();
                leaf.max_unflushed_epoch = Some(old_dirty_epoch);

                // be extra-explicit about serialized bytes
                let leaf_ref: &Leaf<LEAF_FANOUT> = &*leaf;

                let serialized = leaf_ref
                    .serialize(self.cache.config.zstd_compression_level);

                log::trace!(
                    "C adding node {} to dirty epoch {:?}",
                    node.object_id.0,
                    old_dirty_epoch
                );

                #[cfg(feature = "for-internal-testing-only")]
                {
                    self.cache.event_verifier.mark(
                        node.object_id,
                        old_dirty_epoch,
                        event_verifier::State::CooperativelySerialized,
                        concat!(
                            file!(),
                            ':',
                            line!(),
                            ":batch-cooperative-serialization"
                        ),
                    );
                }

                self.cache.install_dirty(
                    old_dirty_epoch,
                    node.object_id,
                    Dirty::CooperativelySerialized {
                        object_id: node.object_id,
                        collection_id: self.collection_id,
                        mutation_count: leaf_ref.mutation_count,
                        low_key: leaf.lo.clone(),
                        data: Arc::new(serialized),
                    },
                );

                assert!(
                    old_flush_epoch < flush_epoch_guard.epoch(),
                    "flush epochs somehow became unlinked"
                );
            }
        }

        let mut splits: Vec<(InlineArray, Object<LEAF_FANOUT>)> = vec![];
        let mut merges: BTreeMap<InlineArray, Object<LEAF_FANOUT>> =
            BTreeMap::new();

        // Insert and split when full
        for (key, value_opt) in batch.writes {
            let range = ..=&key;
            let (lo, (ref mut w, object)) = acquired_locks
                .range_mut::<InlineArray, _>(range)
                .next_back()
                .unwrap();
            let leaf = w.leaf.as_mut().unwrap();

            assert_eq!(lo, &leaf.lo);
            assert!(leaf.lo <= key);
            if let Some(hi) = &leaf.hi {
                assert!(hi > &key);
            }

            if let Some(value) = value_opt {
                leaf.insert(key, value);
                merges.remove(lo);

                merges.remove(&leaf.lo);

                if let Some((split_key, rhs_node)) = leaf.split_if_full(
                    new_epoch,
                    &self.cache,
                    self.collection_id,
                ) {
                    let write = rhs_node.inner.write_arc();
                    assert!(write.leaf.is_some());

                    splits.push((split_key.clone(), rhs_node.clone()));
                    acquired_locks.insert(split_key, (write, rhs_node));
                }
            } else {
                leaf.remove(&key);

                if leaf.is_empty() {
                    assert_eq!(leaf.lo, lo);
                    merges.insert(leaf.lo.clone(), object.clone());
                }
            }
        }

        // Make splits globally visible
        for (split_key, rhs_node) in splits {
            self.cache
                .object_id_index
                .insert(rhs_node.object_id, rhs_node.clone());
            self.index.insert(split_key, rhs_node);
        }

        // Add all written leaves to dirty and prepare to mark cache accesses
        let mut cache_accesses = Vec::with_capacity(acquired_locks.len());
        for (low_key, (write, node)) in &mut acquired_locks {
            let leaf = write.leaf.as_mut().unwrap();
            leaf.set_dirty_epoch(new_epoch);
            leaf.mutation_count += 1;
            cache_accesses.push((node.object_id, leaf.in_memory_size));
            self.cache.install_dirty(
                new_epoch,
                node.object_id,
                Dirty::NotYetSerialized {
                    collection_id: self.collection_id,
                    node: node.clone(),
                    low_key: low_key.clone(),
                },
            );

            #[cfg(feature = "for-internal-testing-only")]
            {
                self.cache.event_verifier.mark(
                    node.object_id,
                    new_epoch,
                    event_verifier::State::Dirty,
                    concat!(file!(), ':', line!(), ":apply-batch"),
                );
            }
        }

        // Drop locks
        drop(acquired_locks);

        // Perform cache maintenance
        for (object_id, size) in cache_accesses {
            self.cache.mark_access_and_evict(object_id, size, new_epoch)?;
        }

        Ok(())
    }

    /// Returns `true` if the `Tree` contains a value for
    /// the specified key.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// assert!(db.contains_key(&[0])?);
    /// assert!(!db.contains_key(&[1])?);
    /// # Ok(()) }
    /// ```
    pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> io::Result<bool> {
        self.get(key).map(|v| v.is_some())
    }

    /// Retrieve the key and value before the provided key,
    /// if one exists.
    ///
    /// # Note
    /// The order follows the Ord implementation for `Vec<u8>`:
    ///
    /// `[] < [0] < [255] < [255, 0] < [255, 255] ...`
    ///
    /// To retain the ordering of numerical types use big endian representation
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use sled::InlineArray;
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// for i in 0..10 {
    ///     db.insert(&[i], vec![i])
    ///         .expect("should write successfully");
    /// }
    ///
    /// assert!(db.get_lt(&[]).unwrap().is_none());
    /// assert!(db.get_lt(&[0]).unwrap().is_none());
    /// assert_eq!(
    ///     db.get_lt(&[1]).unwrap(),
    ///     Some((InlineArray::from(&[0]), InlineArray::from(&[0])))
    /// );
    /// assert_eq!(
    ///     db.get_lt(&[9]).unwrap(),
    ///     Some((InlineArray::from(&[8]), InlineArray::from(&[8])))
    /// );
    /// assert_eq!(
    ///     db.get_lt(&[10]).unwrap(),
    ///     Some((InlineArray::from(&[9]), InlineArray::from(&[9])))
    /// );
    /// assert_eq!(
    ///     db.get_lt(&[255]).unwrap(),
    ///     Some((InlineArray::from(&[9]), InlineArray::from(&[9])))
    /// );
    /// # Ok(()) }
    /// ```
    pub fn get_lt<K>(
        &self,
        key: K,
    ) -> io::Result<Option<(InlineArray, InlineArray)>>
    where
        K: AsRef<[u8]>,
    {
        self.range(..key).next_back().transpose()
    }

    /// Retrieve the next key and value from the `Tree` after the
    /// provided key.
    ///
    /// # Note
    /// The order follows the Ord implementation for `Vec<u8>`:
    ///
    /// `[] < [0] < [255] < [255, 0] < [255, 255] ...`
    ///
    /// To retain the ordering of numerical types use big endian representation
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use sled::InlineArray;
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// for i in 0..10 {
    ///     db.insert(&[i], vec![i])?;
    /// }
    ///
    /// assert_eq!(
    ///     db.get_gt(&[]).unwrap(),
    ///     Some((InlineArray::from(&[0]), InlineArray::from(&[0])))
    /// );
    /// assert_eq!(
    ///     db.get_gt(&[0]).unwrap(),
    ///     Some((InlineArray::from(&[1]), InlineArray::from(&[1])))
    /// );
    /// assert_eq!(
    ///     db.get_gt(&[1]).unwrap(),
    ///     Some((InlineArray::from(&[2]), InlineArray::from(&[2])))
    /// );
    /// assert_eq!(
    ///     db.get_gt(&[8]).unwrap(),
    ///     Some((InlineArray::from(&[9]), InlineArray::from(&[9])))
    /// );
    /// assert!(db.get_gt(&[9]).unwrap().is_none());
    ///
    /// db.insert(500u16.to_be_bytes(), vec![10])?;
    /// assert_eq!(
    ///     db.get_gt(&499u16.to_be_bytes()).unwrap(),
    ///     Some((InlineArray::from(&500u16.to_be_bytes()), InlineArray::from(&[10])))
    /// );
    /// # Ok(()) }
    /// ```
    pub fn get_gt<K>(
        &self,
        key: K,
    ) -> io::Result<Option<(InlineArray, InlineArray)>>
    where
        K: AsRef<[u8]>,
    {
        self.range((ops::Bound::Excluded(key), ops::Bound::Unbounded))
            .next()
            .transpose()
    }

    /// Create an iterator over tuples of keys and values
    /// where all keys start with the given prefix.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// use sled::InlineArray;
    /// db.insert(&[0, 0, 0], vec![0, 0, 0])?;
    /// db.insert(&[0, 0, 1], vec![0, 0, 1])?;
    /// db.insert(&[0, 0, 2], vec![0, 0, 2])?;
    /// db.insert(&[0, 0, 3], vec![0, 0, 3])?;
    /// db.insert(&[0, 1, 0], vec![0, 1, 0])?;
    /// db.insert(&[0, 1, 1], vec![0, 1, 1])?;
    ///
    /// let prefix: &[u8] = &[0, 0];
    /// let mut r = db.scan_prefix(prefix);
    /// assert_eq!(
    ///     r.next().unwrap().unwrap(),
    ///     (InlineArray::from(&[0, 0, 0]), InlineArray::from(&[0, 0, 0]))
    /// );
    /// assert_eq!(
    ///     r.next().unwrap().unwrap(),
    ///     (InlineArray::from(&[0, 0, 1]), InlineArray::from(&[0, 0, 1]))
    /// );
    /// assert_eq!(
    ///     r.next().unwrap().unwrap(),
    ///     (InlineArray::from(&[0, 0, 2]), InlineArray::from(&[0, 0, 2]))
    /// );
    /// assert_eq!(
    ///     r.next().unwrap().unwrap(),
    ///     (InlineArray::from(&[0, 0, 3]), InlineArray::from(&[0, 0, 3]))
    /// );
    /// assert!(r.next().is_none());
    /// # Ok(()) }
    /// ```
    pub fn scan_prefix<'a, P>(&'a self, prefix: P) -> Iter<LEAF_FANOUT>
    where
        P: AsRef<[u8]>,
    {
        let prefix_ref = prefix.as_ref();
        let mut upper = prefix_ref.to_vec();

        while let Some(last) = upper.pop() {
            if last < u8::MAX {
                upper.push(last + 1);
                return self.range(prefix_ref..&upper);
            }
        }

        self.range(prefix..)
    }

    /// Returns the first key and value in the `Tree`, or
    /// `None` if the `Tree` is empty.
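    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same temporary-database
    /// setup as the other examples in this module:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[1], vec![10])?;
    /// db.insert(&[2], vec![20])?;
    /// assert_eq!(db.first()?.unwrap().0, sled::InlineArray::from(&[1]));
    /// # Ok(()) }
    /// ```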
    pub fn first(&self) -> io::Result<Option<(InlineArray, InlineArray)>> {
        self.iter().next().transpose()
    }

    /// Returns the last key and value in the `Tree`, or
    /// `None` if the `Tree` is empty.
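    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same temporary-database
    /// setup as the other examples in this module:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[1], vec![10])?;
    /// db.insert(&[2], vec![20])?;
    /// assert_eq!(db.last()?.unwrap().0, sled::InlineArray::from(&[2]));
    /// # Ok(()) }
    /// ```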
    pub fn last(&self) -> io::Result<Option<(InlineArray, InlineArray)>> {
        self.iter().next_back().transpose()
    }

    /// Atomically removes the maximum item in the `Tree` instance.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// db.insert(&[1], vec![10])?;
    /// db.insert(&[2], vec![20])?;
    /// db.insert(&[3], vec![30])?;
    /// db.insert(&[4], vec![40])?;
    /// db.insert(&[5], vec![50])?;
    ///
    /// assert_eq!(&db.pop_last()?.unwrap().0, &[5]);
    /// assert_eq!(&db.pop_last()?.unwrap().0, &[4]);
    /// assert_eq!(&db.pop_last()?.unwrap().0, &[3]);
    /// assert_eq!(&db.pop_last()?.unwrap().0, &[2]);
    /// assert_eq!(&db.pop_last()?.unwrap().0, &[1]);
    /// assert_eq!(&db.pop_last()?.unwrap().0, &[0]);
    /// assert_eq!(db.pop_last()?, None);
    /// # Ok(()) }
    /// ```
    pub fn pop_last(&self) -> io::Result<Option<(InlineArray, InlineArray)>> {
        loop {
            if let Some(first_res) = self.iter().next_back() {
                let first = first_res?;
                if self
                    .compare_and_swap::<_, _, &[u8]>(
                        &first.0,
                        Some(&first.1),
                        None,
                    )?
                    .is_ok()
                {
                    log::trace!("pop_last removed item {:?}", first);
                    return Ok(Some(first));
                }
                // try again
            } else {
                log::trace!("pop_last removed nothing from empty tree");
                return Ok(None);
            }
        }
    }

    /// Pops the last kv pair in the provided range, or returns `Ok(None)` if nothing
    /// exists within that range.
    ///
    /// # Panics
    ///
    /// This will panic if the provided range's `end_bound() == Bound::Excluded(K::MIN)`.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    ///
    /// let data = vec![
    ///     (b"key 1", b"value 1"),
    ///     (b"key 2", b"value 2"),
    ///     (b"key 3", b"value 3")
    /// ];
    ///
    /// for (k, v) in data {
    ///     db.insert(k, v).unwrap();
    /// }
    ///
    /// let r1 = db.pop_last_in_range(b"key 1".as_ref()..=b"key 3").unwrap();
    /// assert_eq!(Some((b"key 3".into(), b"value 3".into())), r1);
    ///
    /// let r2 = db.pop_last_in_range(b"key 1".as_ref()..b"key 3").unwrap();
    /// assert_eq!(Some((b"key 2".into(), b"value 2".into())), r2);
    ///
    /// let r3 = db.pop_last_in_range(b"key 4".as_ref()..).unwrap();
    /// assert!(r3.is_none());
    ///
    /// let r4 = db.pop_last_in_range(b"key 2".as_ref()..=b"key 3").unwrap();
    /// assert!(r4.is_none());
    ///
    /// let r5 = db.pop_last_in_range(b"key 0".as_ref()..=b"key 3").unwrap();
    /// assert_eq!(Some((b"key 1".into(), b"value 1".into())), r5);
    ///
    /// let r6 = db.pop_last_in_range(b"key 0".as_ref()..=b"key 3").unwrap();
    /// assert!(r6.is_none());
    /// # Ok (()) }
    /// ```
    pub fn pop_last_in_range<K, R>(
        &self,
        range: R,
    ) -> io::Result<Option<(InlineArray, InlineArray)>>
    where
        K: AsRef<[u8]>,
        R: Clone + RangeBounds<K>,
    {
        loop {
            let mut r = self.range(range.clone());
            let (k, v) = if let Some(kv_res) = r.next_back() {
                kv_res?
            } else {
                return Ok(None);
            };
            if self
                .compare_and_swap(&k, Some(&v), None as Option<InlineArray>)?
                .is_ok()
            {
                return Ok(Some((k, v)));
            }
        }
    }

    /// Atomically removes the minimum item in the `Tree` instance.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// db.insert(&[1], vec![10])?;
    /// db.insert(&[2], vec![20])?;
    /// db.insert(&[3], vec![30])?;
    /// db.insert(&[4], vec![40])?;
    /// db.insert(&[5], vec![50])?;
    ///
    /// assert_eq!(&db.pop_first()?.unwrap().0, &[0]);
    /// assert_eq!(&db.pop_first()?.unwrap().0, &[1]);
    /// assert_eq!(&db.pop_first()?.unwrap().0, &[2]);
    /// assert_eq!(&db.pop_first()?.unwrap().0, &[3]);
    /// assert_eq!(&db.pop_first()?.unwrap().0, &[4]);
    /// assert_eq!(&db.pop_first()?.unwrap().0, &[5]);
    /// assert_eq!(db.pop_first()?, None);
    /// # Ok(()) }
    /// ```
    pub fn pop_first(&self) -> io::Result<Option<(InlineArray, InlineArray)>> {
        loop {
            if let Some(first_res) = self.iter().next() {
                let first = first_res?;
                if self
                    .compare_and_swap::<_, _, &[u8]>(
                        &first.0,
                        Some(&first.1),
                        None,
                    )?
                    .is_ok()
                {
                    log::trace!("pop_first removed item {:?}", first);
                    return Ok(Some(first));
                }
                // try again
            } else {
                log::trace!("pop_first removed nothing from empty tree");
                return Ok(None);
            }
        }
    }

    /// Pops the first kv pair in the provided range, or returns `Ok(None)` if nothing
    /// exists within that range.
    ///
    /// # Panics
    ///
    /// This will panic if the provided range's `end_bound() == Bound::Excluded(K::MIN)`.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    ///
    /// let data = vec![
    ///     (b"key 1", b"value 1"),
    ///     (b"key 2", b"value 2"),
    ///     (b"key 3", b"value 3")
    /// ];
    ///
    /// for (k, v) in data {
    ///     db.insert(k, v).unwrap();
    /// }
    ///
    /// let r1 = db.pop_first_in_range("key 1".as_ref()..="key 3").unwrap();
    /// assert_eq!(Some((b"key 1".into(), b"value 1".into())), r1);
    ///
    /// let r2 = db.pop_first_in_range("key 1".as_ref().."key 3").unwrap();
    /// assert_eq!(Some((b"key 2".into(), b"value 2".into())), r2);
    ///
    /// let r3_res: std::io::Result<Vec<_>> = db.range(b"key 4".as_ref()..).collect();
    /// let r3: Vec<_> = r3_res.unwrap();
    /// assert!(r3.is_empty());
    ///
    /// let r4 = db.pop_first_in_range("key 2".as_ref()..="key 3").unwrap();
    /// assert_eq!(Some((b"key 3".into(), b"value 3".into())), r4);
    /// # Ok (()) }
    /// ```
    pub fn pop_first_in_range<K, R>(
        &self,
        range: R,
    ) -> io::Result<Option<(InlineArray, InlineArray)>>
    where
        K: AsRef<[u8]>,
        R: Clone + RangeBounds<K>,
    {
        loop {
            let mut r = self.range(range.clone());
            let (k, v) = if let Some(kv_res) = r.next() {
                kv_res?
            } else {
                return Ok(None);
            };
            if self
                .compare_and_swap(&k, Some(&v), None as Option<InlineArray>)?
                .is_ok()
            {
                return Ok(Some((k, v)));
            }
        }
    }

    /// Returns the number of elements in this tree.
    ///
    /// Beware: performs a full O(n) scan under the hood.
    ///
    /// # Examples
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(b"a", vec![0])?;
    /// db.insert(b"b", vec![1])?;
    /// assert_eq!(db.len(), 2);
    /// # Ok(()) }
    /// ```
    pub fn len(&self) -> usize {
        self.iter().count()
    }

    /// Returns `true` if the `Tree` contains no elements.
    ///
    /// This is O(1), as we only need to see if an iterator
    /// returns anything for the first call to `next()`.
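    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same temporary-database
    /// setup as the other examples in this module:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// assert!(db.is_empty()?);
    /// db.insert(&[0], vec![0])?;
    /// assert!(!db.is_empty()?);
    /// # Ok(()) }
    /// ```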
    pub fn is_empty(&self) -> io::Result<bool> {
        if let Some(res) = self.iter().next() {
            res?;
            Ok(false)
        } else {
            Ok(true)
        }
    }

    /// Clears the `Tree`, removing all values.
    ///
    /// Note that this is not atomic.
    ///
    /// Beware: performs a full O(n) scan under the hood.
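    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same temporary-database
    /// setup as the other examples in this module:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// db.clear()?;
    /// assert!(db.is_empty()?);
    /// # Ok(()) }
    /// ```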
    pub fn clear(&self) -> io::Result<()> {
        for k in self.iter().keys() {
            let key = k?;
            let _old = self.remove(key)?;
        }
        Ok(())
    }

    /// Returns the CRC32 of all keys and values
    /// in this Tree.
    ///
    /// This is O(N) and locks the underlying tree
    /// for the duration of the entire scan.
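    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same temporary-database
    /// setup as the other examples in this module; the checksum
    /// is deterministic for a given tree state:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::tmp().unwrap();
    /// # let db: sled::Db<1024> = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// let checksum_a = db.checksum()?;
    /// let checksum_b = db.checksum()?;
    /// assert_eq!(checksum_a, checksum_b);
    /// # Ok(()) }
    /// ```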
    pub fn checksum(&self) -> io::Result<u32> {
        let mut hasher = crc32fast::Hasher::new();
        for kv_res in self.iter() {
            let (k, v) = kv_res?;
            hasher.update(&k);
            hasher.update(&v);
        }
        Ok(hasher.finalize())
    }
}

#[allow(unused)]
pub struct Iter<const LEAF_FANOUT: usize> {
    inner: Tree<LEAF_FANOUT>,
    bounds: (Bound<InlineArray>, Bound<InlineArray>),
    next_calls: usize,
    next_back_calls: usize,
    next_fetch: Option<InlineArray>,
    next_back_last_lo: Option<InlineArray>,
    prefetched: VecDeque<(InlineArray, InlineArray)>,
    prefetched_back: VecDeque<(InlineArray, InlineArray)>,
}

impl<const LEAF_FANOUT: usize> Iterator for Iter<LEAF_FANOUT> {
    type Item = io::Result<(InlineArray, InlineArray)>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_calls += 1;
        while self.prefetched.is_empty() {
            let search_key = if let Some(last) = &self.next_fetch {
                last.clone()
            } else {
                return None;
            };

            let node = match self.inner.leaf_for_key(&search_key) {
                Ok(n) => n,
                Err(e) => return Some(Err(e)),
            };

            let leaf = node.leaf_read.leaf.as_ref().unwrap();

            if let Some(leaf_hi) = &leaf.hi {
                if leaf_hi <= &search_key {
                    // concurrent merge, retry
                    log::trace!("undershot in iterator, retrying search");
                    continue;
                }
            }

            if leaf.lo > search_key {
                // concurrent successor split, retry
                log::trace!("overshot in iterator, retrying search");
                continue;
            }

            for (k, v) in leaf.data.iter() {
                if self.bounds.contains(k) && &search_key <= k {
                    self.prefetched.push_back((k.clone(), v.clone()));
                }
            }

            self.next_fetch = leaf.hi.clone();
        }

        self.prefetched.pop_front().map(Ok)
    }
}

impl<const LEAF_FANOUT: usize> DoubleEndedIterator for Iter<LEAF_FANOUT> {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.next_back_calls += 1;
        while self.prefetched_back.is_empty() {
            let search_key: InlineArray = if let Some(last) =
                &self.next_back_last_lo
            {
                if !self.bounds.contains(last) || last == &InlineArray::MIN {
                    return None;
                }
                self.inner
                    .index
                    .range::<InlineArray, _>(..last)
                    .next_back()
                    .unwrap()
                    .0
            } else {
                match &self.bounds.1 {
                    Bound::Included(k) => k.clone(),
                    Bound::Excluded(k) if k == &InlineArray::MIN => {
                        InlineArray::MIN
                    }
                    Bound::Excluded(k) => self.inner.index.get_lt(k).unwrap().0,
                    Bound::Unbounded => self.inner.index.last().unwrap().0,
                }
            };

            let node = match self.inner.leaf_for_key(&search_key) {
                Ok(n) => n,
                Err(e) => return Some(Err(e)),
            };

            let leaf = node.leaf_read.leaf.as_ref().unwrap();

            if leaf.lo > search_key {
                // concurrent successor split, retry
                log::trace!("overshot in reverse iterator, retrying search");
                continue;
            }

            // determine if we undershot our target due to concurrent modifications
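            // i.e. the leaf we landed on no longer extends up to the
            // position we aimed for, because a concurrent split moved the
            // items we want into a successor leaf, so retry the search.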
            let undershot =
                match (&leaf.hi, &self.next_back_last_lo, &self.bounds.1) {
                    (Some(leaf_hi), Some(last_lo), _) => leaf_hi < last_lo,
                    (Some(_leaf_hi), None, Bound::Unbounded) => true,
                    (Some(leaf_hi), None, Bound::Included(bound_key)) => {
                        leaf_hi <= bound_key
                    }
                    (Some(leaf_hi), None, Bound::Excluded(bound_key)) => {
                        leaf_hi < bound_key
                    }
                    (None, _, _) => false,
                };

            if undershot {
                log::trace!(
                    "undershoot detected in reverse iterator with \
                    (leaf_hi, next_back_last_lo, self.bounds.1) being {:?}",
                    (&leaf.hi, &self.next_back_last_lo, &self.bounds.1)
                );
                continue;
            }

            for (k, v) in leaf.data.iter() {
                if self.bounds.contains(k) {
                    let beneath_last_lo =
                        if let Some(last_lo) = &self.next_back_last_lo {
                            k < last_lo
                        } else {
                            true
                        };
                    if beneath_last_lo {
                        self.prefetched_back.push_back((k.clone(), v.clone()));
                    }
                }
            }
            self.next_back_last_lo = Some(leaf.lo.clone());
        }

        self.prefetched_back.pop_back().map(Ok)
    }
}

impl<const LEAF_FANOUT: usize> Iter<LEAF_FANOUT> {
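    /// Consumes the iterator, yielding only the key half of
    /// each key-value pair.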
    pub fn keys(
        self,
    ) -> impl DoubleEndedIterator<Item = io::Result<InlineArray>> {
        self.into_iter().map(|kv_res| kv_res.map(|(k, _v)| k))
    }

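    /// Consumes the iterator, yielding only the value half of
    /// each key-value pair.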
    pub fn values(
        self,
    ) -> impl DoubleEndedIterator<Item = io::Result<InlineArray>> {
        self.into_iter().map(|kv_res| kv_res.map(|(_k, v)| v))
    }
}

impl<const LEAF_FANOUT: usize> IntoIterator for &Tree<LEAF_FANOUT> {
    type Item = io::Result<(InlineArray, InlineArray)>;
    type IntoIter = Iter<LEAF_FANOUT>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

/// A batch of updates that will
/// be applied atomically to the
/// Tree.
///
/// # Examples
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use sled::{Batch, open};
///
/// # let _ = std::fs::remove_dir_all("batch_db_2");
/// let db: sled::Db<1024> = open("batch_db_2")?;
/// db.insert("key_0", "val_0")?;
///
/// let mut batch = Batch::default();
/// batch.insert("key_a", "val_a");
/// batch.insert("key_b", "val_b");
/// batch.insert("key_c", "val_c");
/// batch.remove("key_0");
///
/// db.apply_batch(batch)?;
/// // key_0 no longer exists, and key_a, key_b, and key_c
/// // now do exist.
/// # let _ = std::fs::remove_dir_all("batch_db_2");
/// # Ok(()) }
/// ```
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Batch {
    pub(crate) writes:
        std::collections::BTreeMap<InlineArray, Option<InlineArray>>,
}

impl Batch {
    /// Set a key to a new value
    pub fn insert<K, V>(&mut self, key: K, value: V)
    where
        K: Into<InlineArray>,
        V: Into<InlineArray>,
    {
        self.writes.insert(key.into(), Some(value.into()));
    }

    /// Remove a key
    pub fn remove<K>(&mut self, key: K)
    where
        K: Into<InlineArray>,
    {
        self.writes.insert(key.into(), None);
    }

    /// Get a value if it is present in the `Batch`.
    /// `Some(None)` means it's present as a deletion.
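    ///
    /// # Examples
    ///
    /// ```
    /// let mut batch = sled::Batch::default();
    /// batch.insert("k1", "v1");
    /// batch.remove("k2");
    /// assert_eq!(batch.get("k1"), Some(Some(&"v1".into())));
    /// assert_eq!(batch.get("k2"), Some(None));
    /// assert_eq!(batch.get("k3"), None);
    /// ```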
    pub fn get<K: AsRef<[u8]>>(&self, k: K) -> Option<Option<&InlineArray>> {
        let inner = self.writes.get(k.as_ref())?;
        Some(inner.as_ref())
    }
}

impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
    pub fn serialize(&self, zstd_compression_level: i32) -> Vec<u8> {
        let mut ret = vec![];

        let mut zstd_enc =
            zstd::stream::Encoder::new(&mut ret, zstd_compression_level)
                .unwrap();

        bincode::serialize_into(&mut zstd_enc, self).unwrap();

        zstd_enc.finish().unwrap();

        ret
    }

    fn deserialize(buf: &[u8]) -> io::Result<Box<Leaf<LEAF_FANOUT>>> {
        let zstd_decoded = zstd::stream::decode_all(buf).unwrap();
        let mut leaf: Box<Leaf<LEAF_FANOUT>> =
            bincode::deserialize(&zstd_decoded).unwrap();

        // use decompressed buffer length as a cheap proxy for in-memory size for now
        leaf.in_memory_size = zstd_decoded.len();

        Ok(leaf)
    }

    fn set_in_memory_size(&mut self) {
        self.in_memory_size = mem::size_of::<Leaf<LEAF_FANOUT>>()
            + self.hi.as_ref().map(|h| h.len()).unwrap_or(0)
            + self.lo.len()
            + self.data.iter().map(|(k, v)| k.len() + v.len()).sum::<usize>();
    }

    fn split_if_full(
        &mut self,
        new_epoch: FlushEpoch,
        allocator: &ObjectCache<LEAF_FANOUT>,
        collection_id: CollectionId,
    ) -> Option<(InlineArray, Object<LEAF_FANOUT>)> {
        if self.data.is_full() {
            // split
            let split_offset = if self.lo.is_empty() {
                // split left-most shard almost at the beginning for
                // optimizing downward-growing workloads
                1
            } else if self.hi.is_none() {
                // split right-most shard almost at the end for
                // optimizing upward-growing workloads
                self.data.len() - 2
            } else {
                self.data.len() / 2
            };

            let data = self.data.split_off(split_offset);

            let left_max = &self.data.last().unwrap().0;
            let right_min = &data.first().unwrap().0;

            // suffix truncation attempts to shrink the split key
            // so that shorter keys bubble up into the index
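            // e.g. with left_max = [1, 2, 3] and right_min = [1, 3, 9, 9],
            // only the first byte is shared, so the split key becomes
            // [1, 3]: still greater than left_max and no greater than
            // right_min, but shorter than the full right_min key.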
            let splitpoint_length = right_min
                .iter()
                .zip(left_max.iter())
                .take_while(|(a, b)| a == b)
                .count()
                + 1;

            let split_key = InlineArray::from(&right_min[..splitpoint_length]);

            let rhs_id = allocator.allocate_object_id(new_epoch);

            log::trace!(
                "split leaf {:?} at split key: {:?} into new {:?} at {:?}",
                self.lo,
                split_key,
                rhs_id,
                new_epoch,
            );

            let mut rhs = Leaf {
                dirty_flush_epoch: Some(new_epoch),
                hi: self.hi.clone(),
                lo: split_key.clone(),
                prefix_length: 0,
                in_memory_size: 0,
                data,
                mutation_count: 0,
                page_out_on_flush: None,
                deleted: None,
                max_unflushed_epoch: None,
            };
            rhs.set_in_memory_size();

            self.hi = Some(split_key.clone());
            self.set_in_memory_size();

            assert_eq!(self.hi.as_ref().unwrap(), &split_key);
            assert_eq!(rhs.lo, &split_key);

            let rhs_node = Object {
                object_id: rhs_id,
                collection_id,
                low_key: split_key.clone(),
                inner: Arc::new(RwLock::new(CacheBox {
                    leaf: Some(Box::new(rhs)).into(),
                    logged_index: BTreeMap::default(),
                })),
            };

            return Some((split_key, rhs_node));
        }

        None
    }
}