// 1.0 blockers
//
// bugs
// * tree predecessor holds lock on successor and tries to get it for predecessor. This will
//   deadlock if used concurrently with write batches, which acquire locks lexicographically.
//   * add merges to iterator test and assert it deadlocks
//   * alternative is to merge right, not left
// * page-out needs to be deferred until after any flush of the dirty epoch
//   * need to remove max_unflushed_epoch after flushing it
//   * can't send reliable page-out request backwards from 7->6
//   * re-locking every mutex in a writebatch feels bad
//   * need to signal stability status forward
//     * maybe we already are
//     * can make dirty_flush_epoch atomic and CAS it to 0 after flush
//       (sketched in the comment block below these notes)
//     * can change dirty_flush_epoch to unflushed_epoch
//     * can always set mutation_count to max dirty flush epoch
//       * this feels nice, we can lazily update a global stable flushed counter
//   * can get rid of dirty_flush_epoch and page_out_on_flush?
//     * or at least dirty_flush_epoch
//   * dirty_flush_epoch really means "hasn't yet been cooperatively serialized @ F.E."
//   * interesting metrics:
//     * whether dirty for some epoch
//     * whether cooperatively serialized for some epoch
//     * whether fully flushed for some epoch
//     * clean -> dirty -> {maybe coop} -> flushed
//     * for page-out, we only care if it's stable or if we need to add it to
//       a page-out priority queue
//
// reliability
// TODO make all writes wrapped in a Tearable wrapper that splits writes
//      and can possibly crash based on a counter.
// TODO test concurrent drop_tree when other threads are still using it
// TODO list trees test for recovering empty collections
// TODO set explicit max key and value sizes w/ corresponding heap
// TODO add failpoints to writepath
//
// performance
// TODO handle prefix encoding
// TODO (minor) remove cache access for removed node in merge function
// TODO index+log hybrid - tinylsm key -> object location
//
// features
// TODO multi-collection batch
//
// misc
// TODO skim inlining output of RUSTFLAGS="-Cremark=all -Cdebuginfo=1"
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~ 1.0 cutoff ~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// post-1.0 improvements
//
// reliability
// TODO bug hiding: if the crash_iter test panics, the test doesn't fail as expected
// TODO event log assertion for testing heap location bidirectional referential integrity,
//      particularly in the object location mapper.
// TODO ensure nothing "from the future" gets copied into earlier epochs during GC
// TODO collection_id on page_in checks - it needs to be pinned w/ heap's EBR?
// TODO put aborts behind feature flags for hard crashes
// TODO re-enable transaction tests in test_tree.rs
//
// performance
// TODO force writers to flush when some number of dirty epochs have built up
// TODO serialize flush batch in parallel
// TODO concurrent serialization of NotYetSerialized dirty objects
// TODO make the Arc<...>
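//
// A minimal sketch (hypothetical, not part of the current code) of the
// "make dirty_flush_epoch atomic and CAS it to 0 after flush" idea from
// the notes above. The names `PageFlushState`, `unflushed_epoch`,
// `mark_dirty`, and `mark_flushed` are assumptions for illustration only:
//
//     use std::sync::atomic::{AtomicU64, Ordering};
//
//     struct PageFlushState {
//         // 0 means "stable / fully flushed"; any other value is the
//         // most recent epoch at which this object was dirtied.
//         unflushed_epoch: AtomicU64,
//     }
//
//     impl PageFlushState {
//         fn mark_dirty(&self, epoch: u64) {
//             // monotonically advance to the dirtying epoch
//             self.unflushed_epoch.fetch_max(epoch, Ordering::Release);
//         }
//
//         // After flushing `epoch`, try to CAS the marker back to 0 so
//         // that page-out can treat the object as stable. A failed CAS
//         // means the object was re-dirtied at a later epoch and must
//         // wait for a future flush.
//         fn mark_flushed(&self, epoch: u64) -> bool {
//             self.unflushed_epoch
//                 .compare_exchange(epoch, 0, Ordering::AcqRel, Ordering::Relaxed)
//                 .is_ok()
//         }
//     }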
//! `sled` is an embedded database with
//! an API that is similar to a `BTreeMap<[u8], [u8]>`,
//! but with several additional capabilities for
//! assisting creators of stateful systems.
//!
//! It is fully thread-safe, and all operations are
//! atomic. Multiple `Tree`s with isolated keyspaces
//! are supported with the
//! [`Db::open_tree`](struct.Db.html#method.open_tree) method.
//!
//! `sled` is built by experienced database engineers
//! who think users should spend less time tuning and
//! working against high-friction APIs. Expect
//! significant ergonomic and performance improvements
//! over time. Most surprises are bugs, so please
//! [let us know](mailto:tylerneely@gmail.com?subject=sled%20sucks!!!)
//! if something is high friction.
//!
//! # Examples
//!
//! ```
//! # let _ = std::fs::remove_dir_all("my_db");
//! let db: sled::Db = sled::open("my_db").unwrap();
//!
//! // insert and get
//! db.insert(b"yo!", b"v1");
//! assert_eq!(&db.get(b"yo!").unwrap().unwrap(), b"v1");
//!
//! // Atomic compare-and-swap.
//! db.compare_and_swap(
//!     b"yo!",      // key
//!     Some(b"v1"), // old value, None for not present
//!     Some(b"v2"), // new value, None for delete
//! )
//! .unwrap();
//!
//! // Iterates over key-value pairs, starting at the given key.
//! let scan_key: &[u8] = b"a non-present key before yo!";
//! let mut iter = db.range(scan_key..);
//! assert_eq!(&iter.next().unwrap().unwrap().0, b"yo!");
//! assert!(iter.next().is_none());
//!
//! db.remove(b"yo!");
//! assert!(db.get(b"yo!").unwrap().is_none());
//!
//! let other_tree: sled::Tree = db.open_tree(b"cool db facts").unwrap();
//! other_tree.insert(
//!     b"k1",
//!     &b"a Db acts like a Tree due to implementing Deref<Target = Tree>"[..]
//! ).unwrap();
//! # let _ = std::fs::remove_dir_all("my_db");
//! ```
mod config;
mod db;
mod flush_epoch;
mod heap;
mod id_allocator;
mod leaf;
mod metadata_store;
mod object_cache;
mod object_location_mapper;
mod tree;

#[cfg(any(
    feature = "testing_shred_allocator",
    feature = "testing_count_allocator"
))]
pub mod alloc;

#[cfg(feature = "for-internal-testing-only")]
mod event_verifier;

// Injects pseudo-random scheduling jitter in debug builds
// to shake out concurrency bugs.
#[inline]
fn debug_delay() {
    #[cfg(debug_assertions)]
    {
        let rand =
            std::time::SystemTime::UNIX_EPOCH.elapsed().unwrap().as_nanos();

        if rand % 128 > 100 {
            for _ in 0..rand % 16 {
                std::thread::yield_now();
            }
        }
    }
}

pub use crate::config::Config;
pub use crate::db::Db;
pub use crate::tree::{Batch, Iter, Tree};
pub use inline_array::InlineArray;

const NAME_MAPPING_COLLECTION_ID: CollectionId = CollectionId(0);
const DEFAULT_COLLECTION_ID: CollectionId = CollectionId(1);
const INDEX_FANOUT: usize = 64;
const EBR_LOCAL_GC_BUFFER_SIZE: usize = 128;

use std::collections::BTreeMap;
use std::num::NonZeroU64;
use std::ops::Bound;
use std::sync::Arc;

use parking_lot::RwLock;

use crate::flush_epoch::{
    FlushEpoch, FlushEpochGuard, FlushEpochTracker, FlushInvariants,
};
use crate::heap::{
    recover, Heap, HeapRecovery, HeapStats, ObjectRecovery, SlabAddress,
    Update, WriteBatchStats,
};
use crate::id_allocator::{Allocator, DeferredFree};
use crate::leaf::Leaf;
use crate::metadata_store::MetadataStore;
use crate::object_cache::{CacheStats, Dirty, FlushStats, ObjectCache};

/// Opens a `Db` with a default configuration at the
/// specified path. This will create a new storage
/// directory at the specified path if it does
/// not already exist. You can use the `Db::was_recovered`
/// method to determine if your database was recovered
/// from a previous instance.
pub fn open<P: AsRef<std::path::Path>>(path: P) -> std::io::Result<Db> {
    Config::new().path(path).open()
}

#[derive(Debug, Copy, Clone)]
pub struct Stats {
    pub cache: CacheStats,
}

/// Compare and swap result.
///
/// `Ok(Ok(CompareAndSwapSuccess))` means the operation finished
/// successfully. `Ok(Err(CompareAndSwapError))` means the operation
/// failed to install the new value; the error contains the current
/// and proposed values. `Err(_)` means the underlying system
/// encountered an IO error.
pub type CompareAndSwapResult = std::io::Result<
    std::result::Result<CompareAndSwapSuccess, CompareAndSwapError>,
>;

type Index<const LEAF_FANOUT: usize> = concurrent_map::ConcurrentMap<
    InlineArray,
    Object<LEAF_FANOUT>,
    INDEX_FANOUT,
    EBR_LOCAL_GC_BUFFER_SIZE,
>;
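// A minimal sketch (hypothetical usage, not part of this module) of
// consuming the doubly-nested `CompareAndSwapResult` defined above,
// inside a function returning `std::io::Result<()>`; `db` is assumed
// to be an already-opened `Db`:
//
//     match db.compare_and_swap(b"key", Some(b"old"), Some(b"new"))? {
//         Ok(success) => {
//             // The swap was installed. `success.previous_value`
//             // holds what was stored beforehand.
//         }
//         Err(CompareAndSwapError { current, proposed }) => {
//             // The swap failed: `current` did not match the expected
//             // old value, so `proposed` was not installed.
//         }
//     }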
/// Compare and swap error.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapError {
    /// The current value which caused your CAS to fail.
    pub current: Option<InlineArray>,
    /// Returned value that was proposed unsuccessfully.
    pub proposed: Option<InlineArray>,
}

/// Compare and swap success.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapSuccess {
    /// The new value which was successfully installed.
    pub new_value: Option<InlineArray>,
    /// Returned value that was previously stored.
    pub previous_value: Option<InlineArray>,
}

impl std::fmt::Display for CompareAndSwapError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Compare and swap conflict")
    }
}

impl std::error::Error for CompareAndSwapError {}

#[derive(
    Debug,
    Clone,
    Copy,
    serde::Serialize,
    serde::Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
)]
struct ObjectId(NonZeroU64);

impl ObjectId {
    fn new(from: u64) -> Option<ObjectId> {
        NonZeroU64::new(from).map(ObjectId)
    }
}

impl std::ops::Deref for ObjectId {
    type Target = u64;

    fn deref(&self) -> &u64 {
        let self_ref: &NonZeroU64 = &self.0;

        // NonZeroU64 is repr(transparent) where it wraps a u64
        // so it is guaranteed to match the binary layout. This
        // makes it safe to cast a reference to one as a reference
        // to the other like this. See the commented sketch after
        // the `Object` impl below for a usage example.
        let self_ptr: *const NonZeroU64 = self_ref as *const _;
        let reference: *const u64 = self_ptr as *const u64;

        unsafe { &*reference }
    }
}

impl concurrent_map::Minimum for ObjectId {
    const MIN: ObjectId = ObjectId(NonZeroU64::MIN);
}

#[derive(
    Debug,
    Clone,
    Copy,
    serde::Serialize,
    serde::Deserialize,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
)]
pub struct CollectionId(u64);

impl concurrent_map::Minimum for CollectionId {
    const MIN: CollectionId = CollectionId(u64::MIN);
}

#[derive(Debug, Clone)]
struct CacheBox<const LEAF_FANOUT: usize> {
    leaf: Option<Box<Leaf<LEAF_FANOUT>>>,
    #[allow(unused)]
    logged_index: BTreeMap<InlineArray, LogValue>,
}

#[allow(unused)]
#[derive(Debug, Clone)]
struct LogValue {
    location: SlabAddress,
    value: Option<InlineArray>,
}

#[derive(Debug, Clone)]
struct Object<const LEAF_FANOUT: usize> {
    object_id: ObjectId,
    collection_id: CollectionId,
    low_key: InlineArray,
    inner: Arc<RwLock<CacheBox<LEAF_FANOUT>>>,
}

impl<const LEAF_FANOUT: usize> PartialEq for Object<LEAF_FANOUT> {
    fn eq(&self, other: &Self) -> bool {
        self.object_id == other.object_id
    }
}
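// A minimal sketch (hypothetical test, not part of the original source)
// exercising the `ObjectId` deref cast explained above: because
// `NonZeroU64` is `repr(transparent)` over `u64`, the reborrowed
// reference observes the same bits as the wrapped integer.
//
//     #[test]
//     fn object_id_deref_matches_inner_value() {
//         let oid = ObjectId::new(42).unwrap();
//         assert_eq!(*oid, 42);
//     }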
/// Stored on `Db` and `Tree` in an Arc, so that when the
/// last "high-level" struct is dropped, the flusher thread
/// is cleaned up.
struct ShutdownDropper<const LEAF_FANOUT: usize> {
    shutdown_sender: parking_lot::Mutex<
        std::sync::mpsc::Sender<std::sync::mpsc::Sender<()>>,
    >,
    cache: parking_lot::Mutex<ObjectCache<LEAF_FANOUT>>,
}

impl<const LEAF_FANOUT: usize> Drop for ShutdownDropper<LEAF_FANOUT> {
    fn drop(&mut self) {
        let (tx, rx) = std::sync::mpsc::channel();
        log::debug!("sending shutdown signal to flusher");
        if self.shutdown_sender.lock().send(tx).is_ok() {
            if let Err(e) = rx.recv() {
                log::error!("failed to shut down flusher thread: {:?}", e);
            } else {
                log::debug!("flush thread successfully terminated");
            }
        } else {
            log::debug!(
                "failed to shut down flusher, manually flushing ObjectCache"
            );
            let cache = self.cache.lock();
            if let Err(e) = cache.flush() {
                log::error!(
                    "Db flusher encountered error while flushing: {:?}",
                    e
                );
                cache.set_error(&e);
            }
        }
    }
}

// Applies `f` to the value inside a `Bound`, preserving the bound's kind.
fn map_bound<T, U, F: FnOnce(T) -> U>(bound: Bound<T>, f: F) -> Bound<U> {
    match bound {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Included(x) => Bound::Included(f(x)),
        Bound::Excluded(x) => Bound::Excluded(f(x)),
    }
}

const fn _assert_public_types_send_sync() {
    use std::fmt::Debug;

    const fn _assert_send<S: Send>() {}

    const fn _assert_send_sync<S: Send + Sync + Debug>() {}

    /*
    _assert_send::<Subscriber>();
    _assert_send_sync::<Event>();
    _assert_send_sync::<Mode>();
    _assert_send_sync::<Tree>();
    */

    _assert_send::<Db>();

    _assert_send_sync::<Batch>();
    _assert_send_sync::<InlineArray>();
    _assert_send_sync::<Config>();
    _assert_send_sync::<CompareAndSwapSuccess>();
    _assert_send_sync::<CompareAndSwapError>();
}
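// A minimal sketch (hypothetical and standalone; `spawn_flusher` and the
// timeout are assumptions, not this crate's API) of the shutdown
// handshake performed by `ShutdownDropper::drop` above: the dropper
// sends a fresh ack `Sender` through the long-lived shutdown channel,
// and the flusher thread acks after its final flush, unblocking the
// dropper's `rx.recv()`.
//
//     use std::sync::mpsc;
//     use std::time::Duration;
//
//     fn spawn_flusher() -> mpsc::Sender<mpsc::Sender<()>> {
//         let (shutdown_tx, shutdown_rx) = mpsc::channel::<mpsc::Sender<()>>();
//         std::thread::spawn(move || loop {
//             match shutdown_rx.recv_timeout(Duration::from_millis(100)) {
//                 // Shutdown requested: flush once more, then ack so the
//                 // dropper's recv() returns Ok(()) rather than an error.
//                 Ok(ack) => {
//                     // ... final flush would happen here ...
//                     let _ = ack.send(());
//                     return;
//                 }
//                 // No shutdown yet: perform periodic flush work.
//                 Err(mpsc::RecvTimeoutError::Timeout) => {}
//                 // All droppers are gone without a handshake; exit.
//                 Err(mpsc::RecvTimeoutError::Disconnected) => return,
//             }
//         });
//         shutdown_tx
//     }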