// mirror of https://github.com/spacejam/sled
// 1.0 blockers
|
|
//
|
|
// bugs
|
|
// * tree predecessor holds lock on successor and tries to get it for predecessor. This will
|
|
// deadlock if used concurrently with write batches, which acquire locks lexicographically.
|
|
// * add merges to iterator test and assert it deadlocks
|
|
// * alternative is to merge right, not left
|
|
// * page-out needs to be deferred until after any flush of the dirty epoch
|
|
// * need to remove max_unflushed_epoch after flushing it
|
|
// * can't send reliable page-out request backwards from 7->6
|
|
// * re-locking every mutex in a writebatch feels bad
|
|
// * need to signal stability status forward
|
|
// * maybe we already are
|
|
// * can make dirty_flush_epoch atomic and CAS it to 0 after flush
|
|
// * can change dirty_flush_epoch to unflushed_epoch
|
|
// * can always set mutation_count to max dirty flush epoch
|
|
// * this feels nice, we can lazily update a global stable flushed counter
|
|
// * can get rid of dirty_flush_epoch and page_out_on_flush?
|
|
// * or at least dirty_flush_epoch
|
|
// * dirty_flush_epoch really means "hasn't yet been cooperatively serialized @ F.E."
|
|
// * interesting metrics:
|
|
// * whether dirty for some epoch
|
|
// * whether cooperatively serialized for some epoch
|
|
// * whether fully flushed for some epoch
|
|
// * clean -> dirty -> {maybe coop} -> flushed
|
|
// * for page-out, we only care if it's stable or if we need to add it to
|
|
// a page-out priority queue
|
|
//
|
|
// reliability
|
|
// TODO make all writes wrapped in a Tearable wrapper that splits writes
|
|
// and can possibly crash based on a counter.
|
|
// TODO test concurrent drop_tree when other threads are still using it
|
|
// TODO list trees test for recovering empty collections
|
|
// TODO set explicit max key and value sizes w/ corresponding heap
|
|
// TODO add failpoints to writepath
|
|
//
|
|
// performance
|
|
// TODO handle prefix encoding
|
|
// TODO (minor) remove cache access for removed node in merge function
|
|
// TODO index+log hybrid - tinylsm key -> object location
|
|
//
|
|
// features
|
|
// TODO multi-collection batch
|
|
//
|
|
// misc
|
|
// TODO skim inlining output of RUSTFLAGS="-Cremark=all -Cdebuginfo=1"
|
|
//
|
|
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~ 1.0 cutoff ~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
//
|
|
// post-1.0 improvements
|
|
//
|
|
// reliability
|
|
// TODO bug hiding: if the crash_iter test panics, the test doesn't fail as expected
|
|
// TODO event log assertion for testing heap location bidirectional referential integrity,
|
|
// particularly in the object location mapper.
|
|
// TODO ensure nothing "from the future" gets copied into earlier epochs during GC
|
|
// TODO collection_id on page_in checks - it needs to be pinned w/ heap's EBR?
|
|
// TODO put aborts behind feature flags for hard crashes
|
|
// TODO re-enable transaction tests in test_tree.rs
|
|
//
|
|
// performance
|
|
// TODO force writers to flush when some number of dirty epochs have built up
|
|
// TODO serialize flush batch in parallel
|
|
// TODO concurrent serialization of NotYetSerialized dirty objects
|
|
// TODO make the Arc<Option<Box<Leaf just a single pointer chase w/ custom container
|
|
// TODO allow waiting flusher to start collecting dirty pages as soon
|
|
// as it is evacuated - just wait until last flush is done before
|
|
// we persist the batch
|
|
// TODO measure space savings vs cost of zstd in metadata store
|
|
// TODO make EBR and index fanout consts as small as possible to reduce memory usage
|
|
// TODO make leaf fanout as small as possible while retaining perf
|
|
// TODO dynamically sized fanouts for reducing fragmentation
|
|
//
|
|
// features
|
|
// TODO transactions
|
|
// TODO implement create exclusive
|
|
// TODO temporary trees for transactional in-memory coordination
|
|
// TODO corrupted data extraction binary
|
|
//
|
|
|
|
//! `sled` is a high-performance embedded database with
|
|
//! an API that is similar to a `BTreeMap<[u8], [u8]>`,
|
|
//! but with several additional capabilities for
|
|
//! assisting creators of stateful systems.
|
|
//!
|
|
//! It is fully thread-safe, and all operations are
|
|
//! atomic. Multiple `Tree`s with isolated keyspaces
|
|
//! are supported with the
|
|
//! [`Db::open_tree`](struct.Db.html#method.open_tree) method.
|
|
//!
|
|
//! `sled` is built by experienced database engineers
|
|
//! who think users should spend less time tuning and
|
|
//! working against high-friction APIs. Expect
|
|
//! significant ergonomic and performance improvements
|
|
//! over time. Most surprises are bugs, so please
|
|
//! [let us know](mailto:tylerneely@gmail.com?subject=sled%20sucks!!!)
|
|
//! if something is high friction.
|
|
//!
|
|
//! # Examples
|
|
//!
|
|
//! ```
|
|
//! # let _ = std::fs::remove_dir_all("my_db");
|
|
//! let db: sled::Db = sled::open("my_db").unwrap();
|
|
//!
|
|
//! // insert and get
|
|
//! db.insert(b"yo!", b"v1");
|
|
//! assert_eq!(&db.get(b"yo!").unwrap().unwrap(), b"v1");
|
|
//!
|
|
//! // Atomic compare-and-swap.
|
|
//! db.compare_and_swap(
|
|
//! b"yo!", // key
|
|
//! Some(b"v1"), // old value, None for not present
|
|
//! Some(b"v2"), // new value, None for delete
|
|
//! )
|
|
//! .unwrap();
|
|
//!
|
|
//! // Iterates over key-value pairs, starting at the given key.
|
|
//! let scan_key: &[u8] = b"a non-present key before yo!";
|
|
//! let mut iter = db.range(scan_key..);
|
|
//! assert_eq!(&iter.next().unwrap().unwrap().0, b"yo!");
|
|
//! assert!(iter.next().is_none());
|
|
//!
|
|
//! db.remove(b"yo!");
|
|
//! assert!(db.get(b"yo!").unwrap().is_none());
|
|
//!
|
|
//! let other_tree: sled::Tree = db.open_tree(b"cool db facts").unwrap();
|
|
//! other_tree.insert(
|
|
//! b"k1",
|
|
//! &b"a Db acts like a Tree due to implementing Deref<Target = Tree>"[..]
|
|
//! ).unwrap();
|
|
//! # let _ = std::fs::remove_dir_all("my_db");
|
|
//! ```
|
|
mod config;
|
|
mod db;
|
|
mod flush_epoch;
|
|
mod heap;
|
|
mod id_allocator;
|
|
mod leaf;
|
|
mod metadata_store;
|
|
mod object_cache;
|
|
mod object_location_mapper;
|
|
mod tree;
|
|
|
|
#[cfg(any(
|
|
feature = "testing_shred_allocator",
|
|
feature = "testing_count_allocator"
|
|
))]
|
|
pub mod alloc;
|
|
|
|
#[cfg(feature = "for-internal-testing-only")]
|
|
mod event_verifier;
|
|
|
|
#[inline]
fn debug_delay() {
    // In debug builds, occasionally yield to the scheduler a
    // pseudo-random number of times. This perturbs thread interleavings
    // to help shake out concurrency bugs during testing. Release builds
    // compile to a no-op.
    #[cfg(debug_assertions)]
    {
        let entropy = std::time::SystemTime::UNIX_EPOCH
            .elapsed()
            .unwrap()
            .as_nanos();

        // Trigger on roughly 27/128 of calls.
        if entropy % 128 > 100 {
            let yields = entropy % 16;
            for _ in 0..yields {
                std::thread::yield_now();
            }
        }
    }
}
|
|
|
|
pub use crate::config::Config;
|
|
pub use crate::db::Db;
|
|
pub use crate::tree::{Batch, Iter, Tree};
|
|
pub use inline_array::InlineArray;
|
|
|
|
// Reserved collection holding the tree-name -> collection mapping.
const NAME_MAPPING_COLLECTION_ID: CollectionId = CollectionId(0);
// Collection backing the default `Tree` that `Db` derefs to.
const DEFAULT_COLLECTION_ID: CollectionId = CollectionId(1);
// Fanout of the in-memory `Index` concurrent map (see `Index` alias below).
const INDEX_FANOUT: usize = 64;
// Local GC buffer size used by the index's epoch-based reclamation.
const EBR_LOCAL_GC_BUFFER_SIZE: usize = 128;
|
|
|
|
use std::collections::BTreeMap;
|
|
use std::num::NonZeroU64;
|
|
use std::ops::Bound;
|
|
use std::sync::Arc;
|
|
|
|
use parking_lot::RwLock;
|
|
|
|
use crate::flush_epoch::{
|
|
FlushEpoch, FlushEpochGuard, FlushEpochTracker, FlushInvariants,
|
|
};
|
|
use crate::heap::{
|
|
recover, Heap, HeapRecovery, HeapStats, ObjectRecovery, SlabAddress,
|
|
Update, WriteBatchStats,
|
|
};
|
|
use crate::id_allocator::{Allocator, DeferredFree};
|
|
use crate::leaf::Leaf;
|
|
use crate::metadata_store::MetadataStore;
|
|
use crate::object_cache::{CacheStats, Dirty, FlushStats, ObjectCache};
|
|
|
|
/// Opens a `Db` with a default configuration at the
|
|
/// specified path. This will create a new storage
|
|
/// directory at the specified path if it does
|
|
/// not already exist. You can use the `Db::was_recovered`
|
|
/// method to determine if your database was recovered
|
|
/// from a previous instance.
|
|
pub fn open<P: AsRef<std::path::Path>>(path: P) -> std::io::Result<Db> {
|
|
Config::new().path(path).open()
|
|
}
|
|
|
|
/// Aggregated runtime statistics for a `Db`.
#[derive(Debug, Copy, Clone)]
pub struct Stats {
    /// Statistics reported by the object cache.
    pub cache: CacheStats,
}
|
|
|
|
/// Compare and swap result.
///
/// It returns `Ok(Ok(CompareAndSwapSuccess))` if the operation finishes
/// successfully,
/// - `Ok(Err(CompareAndSwapError { current, proposed }))` if the operation
///   failed to install the proposed value because the expected `old` value
///   did not match. `CompareAndSwapError` contains the current and
///   proposed values.
/// - `Err(std::io::Error)` if an IO-level failure prevented the operation
///   from completing.
pub type CompareAndSwapResult = std::io::Result<
    std::result::Result<CompareAndSwapSuccess, CompareAndSwapError>,
>;
|
|
|
|
/// The in-memory index: a concurrent ordered map from a leaf's low key
/// to its cached `Object`, with fanout and EBR buffer sizes fixed by the
/// crate-level constants.
type Index<const LEAF_FANOUT: usize> = concurrent_map::ConcurrentMap<
    InlineArray,
    Object<LEAF_FANOUT>,
    INDEX_FANOUT,
    EBR_LOCAL_GC_BUFFER_SIZE,
>;
|
|
|
|
/// Compare and swap error, returned when the expected `old` value did
/// not match what was actually stored.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapError {
    /// The current value which caused your CAS to fail.
    pub current: Option<InlineArray>,
    /// Returned value that was proposed unsuccessfully.
    pub proposed: Option<InlineArray>,
}
|
|
|
|
/// Compare and swap success, carrying both the newly installed value and
/// the value it replaced.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapSuccess {
    /// The current value which was successfully installed.
    pub new_value: Option<InlineArray>,
    /// Returned value that was previously stored.
    pub previous_value: Option<InlineArray>,
}
|
|
|
|
impl std::fmt::Display for CompareAndSwapError {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
write!(f, "Compare and swap conflict")
|
|
}
|
|
}
|
|
|
|
// Marker impl; the `Debug` derive and `Display` impl satisfy the trait's
// supertrait requirements.
impl std::error::Error for CompareAndSwapError {}
|
|
|
|
/// Non-zero identifier for a cached object. Wrapping `NonZeroU64` lets
/// `Option<ObjectId>` occupy the same 8 bytes as `ObjectId` itself
/// (niche optimization).
#[derive(
    Debug,
    Clone,
    Copy,
    serde::Serialize,
    serde::Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
)]
struct ObjectId(NonZeroU64);
|
|
|
|
impl ObjectId {
|
|
fn new(from: u64) -> Option<ObjectId> {
|
|
NonZeroU64::new(from).map(ObjectId)
|
|
}
|
|
}
|
|
|
|
impl std::ops::Deref for ObjectId {
    type Target = u64;

    // Exposes the raw id as `&u64` without copying.
    fn deref(&self) -> &u64 {
        let self_ref: &NonZeroU64 = &self.0;

        // SAFETY: NonZeroU64 is repr(transparent) where it wraps a u64
        // so it is guaranteed to match the binary layout. This
        // makes it safe to cast a reference to one as a reference
        // to the other like this.
        let self_ptr: *const NonZeroU64 = self_ref as *const _;
        let reference: *const u64 = self_ptr as *const u64;

        unsafe { &*reference }
    }
}
|
|
|
|
// Required for use as a `concurrent_map` key; the smallest valid id is 1.
impl concurrent_map::Minimum for ObjectId {
    const MIN: ObjectId = ObjectId(NonZeroU64::MIN);
}
|
|
|
|
/// Identifier for a collection — an isolated keyspace such as a `Tree`.
/// Ids 0 and 1 are reserved (see `NAME_MAPPING_COLLECTION_ID` and
/// `DEFAULT_COLLECTION_ID`).
#[derive(
    Debug,
    Clone,
    Copy,
    serde::Serialize,
    serde::Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
)]
pub struct CollectionId(u64);
|
|
|
|
// Required for use as a `concurrent_map` key.
impl concurrent_map::Minimum for CollectionId {
    const MIN: CollectionId = CollectionId(u64::MIN);
}
|
|
|
|
/// Interior of an `Object`'s lock: the cached leaf plus a log-derived
/// value index.
#[derive(Debug, Clone)]
struct CacheBox<const LEAF_FANOUT: usize> {
    // NOTE(review): presumably `None` when the leaf has been paged out
    // of memory — confirm against the object cache's page-out path.
    leaf: Option<Box<Leaf<LEAF_FANOUT>>>,
    #[allow(unused)]
    logged_index: BTreeMap<InlineArray, LogValue>,
}
|
|
|
|
/// A value's last known location in the heap's slab storage, paired with
/// the value itself.
#[allow(unused)]
#[derive(Debug, Clone)]
struct LogValue {
    // Slab address where this value was written.
    location: SlabAddress,
    // NOTE(review): `None` likely represents a deletion — confirm with
    // the write path before relying on this.
    value: Option<InlineArray>,
}
|
|
|
|
/// A cached tree node. Cloning is cheap: the leaf contents are shared
/// behind `Arc<RwLock<..>>`.
#[derive(Debug, Clone)]
struct Object<const LEAF_FANOUT: usize> {
    // Unique identity; equality of `Object`s compares only this field.
    object_id: ObjectId,
    // Which collection (tree) this node belongs to.
    collection_id: CollectionId,
    // Inclusive lower bound of the keys this node is responsible for.
    low_key: InlineArray,
    // Shared, lock-protected leaf state (possibly paged out).
    inner: Arc<RwLock<CacheBox<LEAF_FANOUT>>>,
}
|
|
|
|
impl<const LEAF_FANOUT: usize> PartialEq for Object<LEAF_FANOUT> {
    // Identity-based equality: two `Object`s are equal iff they share an
    // `object_id`; cached leaf contents and `low_key` are ignored.
    fn eq(&self, other: &Self) -> bool {
        self.object_id == other.object_id
    }
}
|
|
|
|
/// Stored on `Db` and `Tree` in an Arc, so that when the
/// last "high-level" struct is dropped, the flusher thread
/// is cleaned up.
struct ShutdownDropper<const LEAF_FANOUT: usize> {
    // Channel to the flusher thread; we send it a one-shot reply channel
    // and wait on the reply to confirm shutdown (see the `Drop` impl).
    shutdown_sender: parking_lot::Mutex<
        std::sync::mpsc::Sender<std::sync::mpsc::Sender<()>>,
    >,
    // Fallback handle used to flush directly if the flusher thread is
    // already gone.
    cache: parking_lot::Mutex<object_cache::ObjectCache<LEAF_FANOUT>>,
}
|
|
|
|
impl<const LEAF_FANOUT: usize> Drop for ShutdownDropper<LEAF_FANOUT> {
|
|
fn drop(&mut self) {
|
|
let (tx, rx) = std::sync::mpsc::channel();
|
|
log::debug!("sending shutdown signal to flusher");
|
|
if self.shutdown_sender.lock().send(tx).is_ok() {
|
|
if let Err(e) = rx.recv() {
|
|
log::error!("failed to shut down flusher thread: {:?}", e);
|
|
} else {
|
|
log::debug!("flush thread successfully terminated");
|
|
}
|
|
} else {
|
|
log::debug!(
|
|
"failed to shut down flusher, manually flushing ObjectCache"
|
|
);
|
|
let cache = self.cache.lock();
|
|
if let Err(e) = cache.flush() {
|
|
log::error!(
|
|
"Db flusher encountered error while flushing: {:?}",
|
|
e
|
|
);
|
|
cache.set_error(&e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Applies `f` to the value carried by `bound`, preserving whether the
/// bound is inclusive, exclusive, or unbounded.
fn map_bound<T, U, F: FnOnce(T) -> U>(bound: Bound<T>, f: F) -> Bound<U> {
    use Bound::{Excluded, Included, Unbounded};

    match bound {
        Included(v) => Included(f(v)),
        Excluded(v) => Excluded(f(v)),
        Unbounded => Unbounded,
    }
}
|
|
|
|
// Compile-time check that the crate's public types satisfy `Send`/`Sync`
// (plus `Clone` + `Debug`). Never called at runtime; a regression fails
// the build.
const fn _assert_public_types_send_sync() {
    use std::fmt::Debug;

    const fn _assert_send<S: Send + Clone + Debug>() {}

    const fn _assert_send_sync<S: Send + Sync + Clone + Debug>() {}

    /*
    _assert_send::<Subscriber>();
    _assert_send_sync::<Event>();
    _assert_send_sync::<Mode>();
    _assert_send_sync::<Tree>();
    */

    // NOTE(review): `Db` is only asserted `Send`, not `Sync` — confirm
    // whether that is intentional.
    _assert_send::<Db>();

    _assert_send_sync::<Batch>();
    _assert_send_sync::<InlineArray>();
    _assert_send_sync::<Config>();
    _assert_send_sync::<CompareAndSwapSuccess>();
    _assert_send_sync::<CompareAndSwapError>();
}
|