mirror of https://github.com/spacejam/sled
Remove INDEX_FANOUT and EBR const generics from Db. Fix flush safety bug
This commit is contained in:
parent
6cdea88f6c
commit
65a8a1a5ec
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "sled"
|
name = "sled"
|
||||||
version = "1.0.0-alpha.114"
|
version = "1.0.0-alpha.115"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
authors = ["Tyler Neely <tylerneely@gmail.com>"]
|
authors = ["Tyler Neely <tylerneely@gmail.com>"]
|
||||||
documentation = "https://docs.rs/sled/"
|
documentation = "https://docs.rs/sled/"
|
||||||
|
|
|
@ -72,21 +72,9 @@ impl Config {
|
||||||
(zstd_compression_level, i32, "The zstd compression level to use when writing data to disk. Defaults to 3.")
|
(zstd_compression_level, i32, "The zstd compression level to use when writing data to disk. Defaults to 3.")
|
||||||
);
|
);
|
||||||
|
|
||||||
pub fn open<
|
pub fn open<const LEAF_FANOUT: usize>(
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
>(
|
|
||||||
&self,
|
&self,
|
||||||
) -> io::Result<Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>>
|
) -> io::Result<Db<LEAF_FANOUT>> {
|
||||||
{
|
|
||||||
if INDEX_FANOUT < 4 {
|
|
||||||
return Err(annotate!(io::Error::new(
|
|
||||||
io::ErrorKind::Unsupported,
|
|
||||||
"Db's INDEX_FANOUT const generic must be 4 or greater."
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if LEAF_FANOUT < 3 {
|
if LEAF_FANOUT < 3 {
|
||||||
return Err(annotate!(io::Error::new(
|
return Err(annotate!(io::Error::new(
|
||||||
io::ErrorKind::Unsupported,
|
io::ErrorKind::Unsupported,
|
||||||
|
|
312
src/db.rs
312
src/db.rs
|
@ -22,13 +22,12 @@ use stack_map::StackMap;
|
||||||
|
|
||||||
use crate::*;
|
use crate::*;
|
||||||
|
|
||||||
|
const INDEX_FANOUT: usize = 64;
|
||||||
|
const EBR_LOCAL_GC_BUFFER_SIZE: usize = 128;
|
||||||
|
|
||||||
/// sled 1.0
|
/// sled 1.0
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Db<
|
pub struct Db<const LEAF_FANOUT: usize = 1024> {
|
||||||
const INDEX_FANOUT: usize = 64,
|
|
||||||
const LEAF_FANOUT: usize = 1024,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize = 128,
|
|
||||||
> {
|
|
||||||
global_error: Arc<AtomicPtr<(io::ErrorKind, String)>>,
|
global_error: Arc<AtomicPtr<(io::ErrorKind, String)>>,
|
||||||
config: Config,
|
config: Config,
|
||||||
high_level_rc: Arc<()>,
|
high_level_rc: Arc<()>,
|
||||||
|
@ -52,20 +51,12 @@ pub struct Db<
|
||||||
event_verifier: Arc<crate::event_verifier::EventVerifier>,
|
event_verifier: Arc<crate::event_verifier::EventVerifier>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<const LEAF_FANOUT: usize> fmt::Debug for Db<LEAF_FANOUT> {
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> fmt::Debug for Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
w.debug_struct(&format!(
|
w.debug_struct(&format!("Db<{}>", LEAF_FANOUT,))
|
||||||
"Db<{}, {}, {}>",
|
.field("global_error", &self.check_error())
|
||||||
INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE
|
.field("data", &format!("{:?}", self.iter().collect::<Vec<_>>()))
|
||||||
))
|
.finish()
|
||||||
.field("global_error", &self.check_error())
|
|
||||||
.field("data", &format!("{:?}", self.iter().collect::<Vec<_>>()))
|
|
||||||
.finish()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,15 +79,11 @@ struct Leaf<const LEAF_FANOUT: usize> {
|
||||||
in_memory_size: usize,
|
in_memory_size: usize,
|
||||||
mutation_count: u64,
|
mutation_count: u64,
|
||||||
#[serde(skip)]
|
#[serde(skip)]
|
||||||
page_out_on_flush: bool,
|
page_out_on_flush: Option<NonZeroU64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn flusher<
|
fn flusher<const LEAF_FANOUT: usize>(
|
||||||
const INDEX_FANOUT: usize,
|
db: Db<LEAF_FANOUT>,
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
>(
|
|
||||||
db: Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>,
|
|
||||||
shutdown_signal: mpsc::Receiver<mpsc::Sender<()>>,
|
shutdown_signal: mpsc::Receiver<mpsc::Sender<()>>,
|
||||||
flush_every_ms: usize,
|
flush_every_ms: usize,
|
||||||
) {
|
) {
|
||||||
|
@ -216,7 +203,7 @@ impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
|
||||||
in_memory_size: 0,
|
in_memory_size: 0,
|
||||||
data,
|
data,
|
||||||
mutation_count: 0,
|
mutation_count: 0,
|
||||||
page_out_on_flush: false,
|
page_out_on_flush: None,
|
||||||
};
|
};
|
||||||
rhs.set_in_memory_size();
|
rhs.set_in_memory_size();
|
||||||
|
|
||||||
|
@ -236,28 +223,16 @@ impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
struct LeafReadGuard<
|
struct LeafReadGuard<'a, const LEAF_FANOUT: usize = 1024> {
|
||||||
'a,
|
|
||||||
const INDEX_FANOUT: usize = 64,
|
|
||||||
const LEAF_FANOUT: usize = 1024,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize = 128,
|
|
||||||
> {
|
|
||||||
leaf_read: ManuallyDrop<
|
leaf_read: ManuallyDrop<
|
||||||
ArcRwLockReadGuard<RawRwLock, Option<Box<Leaf<LEAF_FANOUT>>>>,
|
ArcRwLockReadGuard<RawRwLock, Option<Box<Leaf<LEAF_FANOUT>>>>,
|
||||||
>,
|
>,
|
||||||
low_key: InlineArray,
|
low_key: InlineArray,
|
||||||
inner: &'a Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>,
|
inner: &'a Db<LEAF_FANOUT>,
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<'a, const LEAF_FANOUT: usize> Drop for LeafReadGuard<'a, LEAF_FANOUT> {
|
||||||
'a,
|
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> Drop
|
|
||||||
for LeafReadGuard<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let size = self.leaf_read.as_ref().unwrap().in_memory_size;
|
let size = self.leaf_read.as_ref().unwrap().in_memory_size;
|
||||||
// we must drop our mutex before calling mark_access_and_evict
|
// we must drop our mutex before calling mark_access_and_evict
|
||||||
|
@ -276,41 +251,23 @@ impl<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct LeafWriteGuard<
|
struct LeafWriteGuard<'a, const LEAF_FANOUT: usize = 1024> {
|
||||||
'a,
|
|
||||||
const INDEX_FANOUT: usize = 64,
|
|
||||||
const LEAF_FANOUT: usize = 1024,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize = 128,
|
|
||||||
> {
|
|
||||||
leaf_write: ManuallyDrop<
|
leaf_write: ManuallyDrop<
|
||||||
ArcRwLockWriteGuard<RawRwLock, Option<Box<Leaf<LEAF_FANOUT>>>>,
|
ArcRwLockWriteGuard<RawRwLock, Option<Box<Leaf<LEAF_FANOUT>>>>,
|
||||||
>,
|
>,
|
||||||
flush_epoch_guard: FlushEpochGuard<'a>,
|
flush_epoch_guard: FlushEpochGuard<'a>,
|
||||||
low_key: InlineArray,
|
low_key: InlineArray,
|
||||||
inner: &'a Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>,
|
inner: &'a Db<LEAF_FANOUT>,
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<'a, const LEAF_FANOUT: usize> LeafWriteGuard<'a, LEAF_FANOUT> {
|
||||||
'a,
|
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> LeafWriteGuard<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
fn epoch(&self) -> NonZeroU64 {
|
fn epoch(&self) -> NonZeroU64 {
|
||||||
self.flush_epoch_guard.epoch()
|
self.flush_epoch_guard.epoch()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<'a, const LEAF_FANOUT: usize> Drop for LeafWriteGuard<'a, LEAF_FANOUT> {
|
||||||
'a,
|
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> Drop
|
|
||||||
for LeafWriteGuard<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let size = self.leaf_write.as_ref().unwrap().in_memory_size;
|
let size = self.leaf_write.as_ref().unwrap().in_memory_size;
|
||||||
|
|
||||||
|
@ -325,12 +282,7 @@ impl<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<const LEAF_FANOUT: usize> Drop for Db<LEAF_FANOUT> {
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> Drop for Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if Arc::strong_count(&self.high_level_rc) == 2 {
|
if Arc::strong_count(&self.high_level_rc) == 2 {
|
||||||
if let Some(shutdown_sender) = self.shutdown_sender.take() {
|
if let Some(shutdown_sender) = self.shutdown_sender.take() {
|
||||||
|
@ -371,12 +323,7 @@ fn map_bound<T, U, F: FnOnce(T) -> U>(bound: Bound<T>, f: F) -> Bound<U> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<const LEAF_FANOUT: usize> Db<LEAF_FANOUT> {
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
// This is only pub for an extra assertion during testing.
|
// This is only pub for an extra assertion during testing.
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn check_error(&self) -> io::Result<()> {
|
pub fn check_error(&self) -> io::Result<()> {
|
||||||
|
@ -439,9 +386,7 @@ impl<
|
||||||
fn leaf_for_key<'a>(
|
fn leaf_for_key<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
key: &[u8],
|
key: &[u8],
|
||||||
) -> io::Result<
|
) -> io::Result<LeafReadGuard<'a, LEAF_FANOUT>> {
|
||||||
LeafReadGuard<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>,
|
|
||||||
> {
|
|
||||||
loop {
|
loop {
|
||||||
let (low_key, node) = self.index.get_lte(key).unwrap();
|
let (low_key, node) = self.index.get_lte(key).unwrap();
|
||||||
let mut read = node.inner.read_arc();
|
let mut read = node.inner.read_arc();
|
||||||
|
@ -494,10 +439,7 @@ impl<
|
||||||
self.was_recovered
|
self.was_recovered
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn open_with_config(
|
pub fn open_with_config(config: &Config) -> io::Result<Db<LEAF_FANOUT>> {
|
||||||
config: &Config,
|
|
||||||
) -> io::Result<Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>>
|
|
||||||
{
|
|
||||||
let (store, index_data) = heap::recover(&config.path)?;
|
let (store, index_data) = heap::recover(&config.path)?;
|
||||||
|
|
||||||
let first_id_opt = if index_data.is_empty() {
|
let first_id_opt = if index_data.is_empty() {
|
||||||
|
@ -555,7 +497,16 @@ impl<
|
||||||
let db = ret.clone();
|
let db = ret.clone();
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
ret.shutdown_sender = Some(tx);
|
ret.shutdown_sender = Some(tx);
|
||||||
std::thread::spawn(move || flusher(db, rx, flush_every_ms));
|
let spawn_res = std::thread::Builder::new()
|
||||||
|
.name("sled_flusher".into())
|
||||||
|
.spawn(move || flusher(db, rx, flush_every_ms));
|
||||||
|
|
||||||
|
if let Err(e) = spawn_res {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("unable to spawn flusher thread for sled database: {:?}", e)
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
@ -605,9 +556,7 @@ impl<
|
||||||
fn leaf_for_key_mut<'a>(
|
fn leaf_for_key_mut<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
key: &[u8],
|
key: &[u8],
|
||||||
) -> io::Result<
|
) -> io::Result<LeafWriteGuard<'a, LEAF_FANOUT>> {
|
||||||
LeafWriteGuard<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>,
|
|
||||||
> {
|
|
||||||
let (low_key, mut write, node_id) = self.page_in(key)?;
|
let (low_key, mut write, node_id) = self.page_in(key)?;
|
||||||
|
|
||||||
let flush_epoch_guard = self.flush_epoch.check_in();
|
let flush_epoch_guard = self.flush_epoch.check_in();
|
||||||
|
@ -626,10 +575,13 @@ impl<
|
||||||
|
|
||||||
if let Some(old_flush_epoch) = leaf.dirty_flush_epoch {
|
if let Some(old_flush_epoch) = leaf.dirty_flush_epoch {
|
||||||
if old_flush_epoch != flush_epoch_guard.epoch() {
|
if old_flush_epoch != flush_epoch_guard.epoch() {
|
||||||
|
/*
|
||||||
|
* TODO flush responsibility
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
old_flush_epoch.get() + 1,
|
old_flush_epoch.get() + 1,
|
||||||
flush_epoch_guard.epoch().get()
|
flush_epoch_guard.epoch().get()
|
||||||
);
|
);
|
||||||
|
*/
|
||||||
|
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"cooperatively flushing {:?} with dirty epoch {} after checking into epoch {}",
|
"cooperatively flushing {:?} with dirty epoch {} after checking into epoch {}",
|
||||||
|
@ -689,10 +641,10 @@ impl<
|
||||||
}
|
}
|
||||||
let leaf: &mut Leaf<LEAF_FANOUT> = write.as_mut().unwrap();
|
let leaf: &mut Leaf<LEAF_FANOUT> = write.as_mut().unwrap();
|
||||||
|
|
||||||
if leaf.dirty_flush_epoch.is_some() {
|
if let Some(dirty_epoch) = leaf.dirty_flush_epoch {
|
||||||
// We can't page out this leaf until it has been
|
// We can't page out this leaf until it has been
|
||||||
// flushed, because its changes are not yet durable.
|
// flushed, because its changes are not yet durable.
|
||||||
leaf.page_out_on_flush = true;
|
leaf.page_out_on_flush = Some(dirty_epoch);
|
||||||
} else {
|
} else {
|
||||||
*write = None;
|
*write = None;
|
||||||
}
|
}
|
||||||
|
@ -708,7 +660,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// db.insert(&[0], vec![0])?;
|
/// db.insert(&[0], vec![0])?;
|
||||||
/// assert_eq!(db.get(&[0]).unwrap(), Some(sled::InlineArray::from(vec![0])));
|
/// assert_eq!(db.get(&[0]).unwrap(), Some(sled::InlineArray::from(vec![0])));
|
||||||
/// assert!(db.get(&[1]).unwrap().is_none());
|
/// assert!(db.get(&[1]).unwrap().is_none());
|
||||||
|
@ -741,7 +693,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// assert_eq!(db.insert(&[1, 2, 3], vec![0]).unwrap(), None);
|
/// assert_eq!(db.insert(&[1, 2, 3], vec![0]).unwrap(), None);
|
||||||
/// assert_eq!(db.insert(&[1, 2, 3], vec![1]).unwrap(), Some(sled::InlineArray::from(&[0])));
|
/// assert_eq!(db.insert(&[1, 2, 3], vec![1]).unwrap(), Some(sled::InlineArray::from(&[0])));
|
||||||
/// # Ok(()) }
|
/// # Ok(()) }
|
||||||
|
@ -815,7 +767,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// db.insert(&[1], vec![1]);
|
/// db.insert(&[1], vec![1]);
|
||||||
/// assert_eq!(db.remove(&[1]).unwrap(), Some(sled::InlineArray::from(vec![1])));
|
/// assert_eq!(db.remove(&[1]).unwrap(), Some(sled::InlineArray::from(vec![1])));
|
||||||
/// assert!(db.remove(&[1]).unwrap().is_none());
|
/// assert!(db.remove(&[1]).unwrap().is_none());
|
||||||
|
@ -874,31 +826,34 @@ impl<
|
||||||
|
|
||||||
let (
|
let (
|
||||||
previous_flush_complete_notifier,
|
previous_flush_complete_notifier,
|
||||||
previous_vacant_notifier,
|
this_vacant_notifier,
|
||||||
forward_flush_notifier,
|
forward_flush_notifier,
|
||||||
) = self.flush_epoch.roll_epoch_forward();
|
) = self.flush_epoch.roll_epoch_forward();
|
||||||
|
|
||||||
previous_flush_complete_notifier.wait_for_complete();
|
previous_flush_complete_notifier.wait_for_complete();
|
||||||
let flush_through_epoch: NonZeroU64 =
|
let flush_through_epoch: NonZeroU64 =
|
||||||
previous_vacant_notifier.wait_for_complete();
|
this_vacant_notifier.wait_for_complete();
|
||||||
|
|
||||||
let flush_boundary = (
|
let flush_boundary = (
|
||||||
NonZeroU64::new(flush_through_epoch.get() + 1).unwrap(),
|
NonZeroU64::new(flush_through_epoch.get() + 1).unwrap(),
|
||||||
InlineArray::default(),
|
InlineArray::default(),
|
||||||
);
|
);
|
||||||
for ((epoch, low_key), _) in self.dirty.range(..flush_boundary) {
|
|
||||||
|
let mut evict_after_flush = vec![];
|
||||||
|
|
||||||
|
for ((dirty_epoch, low_key), _) in self.dirty.range(..flush_boundary) {
|
||||||
if let Some(node) = self.index.get(&*low_key) {
|
if let Some(node) = self.index.get(&*low_key) {
|
||||||
let mut lock = node.inner.write();
|
let mut lock = node.inner.write();
|
||||||
|
|
||||||
let serialized_value_opt = self
|
let serialized_value_opt = self
|
||||||
.dirty
|
.dirty
|
||||||
.remove(&(epoch, low_key.clone()))
|
.remove(&(dirty_epoch, low_key.clone()))
|
||||||
.expect("violation of flush responsibility");
|
.expect("violation of flush responsibility");
|
||||||
|
|
||||||
if epoch != flush_through_epoch {
|
if dirty_epoch != flush_through_epoch {
|
||||||
log::warn!(
|
log::warn!(
|
||||||
"encountered unexpected flush epoch {} for object {} while flushing epoch {}",
|
"encountered unexpected flush epoch {} for object {} while flushing epoch {}",
|
||||||
epoch,
|
dirty_epoch,
|
||||||
node.id.0,
|
node.id.0,
|
||||||
flush_through_epoch,
|
flush_through_epoch,
|
||||||
);
|
);
|
||||||
|
@ -908,11 +863,11 @@ impl<
|
||||||
lock.as_ref().unwrap().mutation_count;
|
lock.as_ref().unwrap().mutation_count;
|
||||||
self.event_verifier.mark_unexpected_flush_epoch(
|
self.event_verifier.mark_unexpected_flush_epoch(
|
||||||
node.id,
|
node.id,
|
||||||
epoch,
|
dirty_epoch,
|
||||||
mutation_count,
|
mutation_count,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
assert_eq!(epoch, flush_through_epoch);
|
// assert_eq!(dirty_epoch, flush_through_epoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "for-internal-testing-only")]
|
#[cfg(feature = "for-internal-testing-only")]
|
||||||
|
@ -920,41 +875,31 @@ impl<
|
||||||
let mutation_count = lock.as_ref().unwrap().mutation_count;
|
let mutation_count = lock.as_ref().unwrap().mutation_count;
|
||||||
self.event_verifier.mark_flush(
|
self.event_verifier.mark_flush(
|
||||||
node.id,
|
node.id,
|
||||||
epoch,
|
dirty_epoch,
|
||||||
mutation_count,
|
mutation_count,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(
|
let leaf_bytes: Vec<u8> = if let Some(mut serialized_value) =
|
||||||
epoch,
|
serialized_value_opt
|
||||||
flush_through_epoch,
|
{
|
||||||
"{:?} is dirty for old epoch {}",
|
Arc::make_mut(&mut serialized_value.1);
|
||||||
node.id,
|
Arc::into_inner(serialized_value.1).unwrap()
|
||||||
epoch.get()
|
} else {
|
||||||
);
|
let leaf_ref: &mut Leaf<LEAF_FANOUT> =
|
||||||
|
lock.as_mut().unwrap();
|
||||||
|
// ugly but basically free
|
||||||
|
let bytes =
|
||||||
|
leaf_ref.serialize(self.config.zstd_compression_level);
|
||||||
|
|
||||||
let leaf_bytes: Vec<u8> =
|
if leaf_ref.page_out_on_flush == Some(flush_through_epoch) {
|
||||||
if let Some(mut serialized_value) = serialized_value_opt {
|
// page_out_on_flush is set to false
|
||||||
Arc::make_mut(&mut serialized_value.1);
|
// on page-in due to serde(skip)
|
||||||
Arc::into_inner(serialized_value.1).unwrap()
|
evict_after_flush.push(node.clone());
|
||||||
} else {
|
}
|
||||||
let leaf_ref: &mut Leaf<LEAF_FANOUT> =
|
|
||||||
lock.as_mut().unwrap();
|
|
||||||
let dirty_epoch =
|
|
||||||
leaf_ref.dirty_flush_epoch.take().unwrap();
|
|
||||||
assert_eq!(epoch, dirty_epoch);
|
|
||||||
// ugly but basically free
|
|
||||||
let bytes = leaf_ref
|
|
||||||
.serialize(self.config.zstd_compression_level);
|
|
||||||
|
|
||||||
if leaf_ref.page_out_on_flush {
|
bytes
|
||||||
// page_out_on_flush is set to false
|
};
|
||||||
// on page-in due to serde(skip)
|
|
||||||
*lock = None;
|
|
||||||
}
|
|
||||||
|
|
||||||
bytes
|
|
||||||
};
|
|
||||||
|
|
||||||
drop(lock);
|
drop(lock);
|
||||||
// println!("node id {} is dirty", node.id.0);
|
// println!("node id {} is dirty", node.id.0);
|
||||||
|
@ -981,6 +926,16 @@ impl<
|
||||||
flush_through_epoch.get()
|
flush_through_epoch.get()
|
||||||
);
|
);
|
||||||
forward_flush_notifier.mark_complete();
|
forward_flush_notifier.mark_complete();
|
||||||
|
|
||||||
|
for node_to_evict in evict_after_flush {
|
||||||
|
let mut lock = node_to_evict.inner.write();
|
||||||
|
if let Some(ref mut leaf) = *lock {
|
||||||
|
if leaf.page_out_on_flush == Some(flush_through_epoch) {
|
||||||
|
*lock = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self.store.maintenance()?;
|
self.store.maintenance()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1008,7 +963,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// // unique creation
|
/// // unique creation
|
||||||
/// assert!(
|
/// assert!(
|
||||||
/// db.compare_and_swap(&[1], None as Option<&[u8]>, Some(&[10])).unwrap().is_ok(),
|
/// db.compare_and_swap(&[1], None as Option<&[u8]>, Some(&[10])).unwrap().is_ok(),
|
||||||
|
@ -1131,7 +1086,7 @@ impl<
|
||||||
/// use sled::{Config, InlineArray};
|
/// use sled::{Config, InlineArray};
|
||||||
///
|
///
|
||||||
/// let config = Config::tmp().unwrap();
|
/// let config = Config::tmp().unwrap();
|
||||||
/// let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// let db: sled::Db<1024> = config.open()?;
|
||||||
///
|
///
|
||||||
/// fn u64_to_ivec(number: u64) -> InlineArray {
|
/// fn u64_to_ivec(number: u64) -> InlineArray {
|
||||||
/// InlineArray::from(number.to_be_bytes().to_vec())
|
/// InlineArray::from(number.to_be_bytes().to_vec())
|
||||||
|
@ -1204,7 +1159,7 @@ impl<
|
||||||
/// use sled::{Config, InlineArray};
|
/// use sled::{Config, InlineArray};
|
||||||
///
|
///
|
||||||
/// let config = Config::tmp().unwrap();
|
/// let config = Config::tmp().unwrap();
|
||||||
/// let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// let db: sled::Db<1024> = config.open()?;
|
||||||
///
|
///
|
||||||
/// fn u64_to_ivec(number: u64) -> InlineArray {
|
/// fn u64_to_ivec(number: u64) -> InlineArray {
|
||||||
/// InlineArray::from(number.to_be_bytes().to_vec())
|
/// InlineArray::from(number.to_be_bytes().to_vec())
|
||||||
|
@ -1258,9 +1213,7 @@ impl<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iter(
|
pub fn iter(&self) -> Iter<LEAF_FANOUT> {
|
||||||
&self,
|
|
||||||
) -> Iter<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE> {
|
|
||||||
Iter {
|
Iter {
|
||||||
prefetched: VecDeque::new(),
|
prefetched: VecDeque::new(),
|
||||||
prefetched_back: VecDeque::new(),
|
prefetched_back: VecDeque::new(),
|
||||||
|
@ -1273,10 +1226,7 @@ impl<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn range<K, R>(
|
pub fn range<K, R>(&self, range: R) -> Iter<LEAF_FANOUT>
|
||||||
&self,
|
|
||||||
range: R,
|
|
||||||
) -> Iter<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
where
|
where
|
||||||
K: AsRef<[u8]>,
|
K: AsRef<[u8]>,
|
||||||
R: RangeBounds<K>,
|
R: RangeBounds<K>,
|
||||||
|
@ -1317,7 +1267,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let _ = std::fs::remove_dir_all("batch_doctest");
|
/// # let _ = std::fs::remove_dir_all("batch_doctest");
|
||||||
/// # let db: sled::Db<64, 1024, 128> = sled::open("batch_doctest")?;
|
/// # let db: sled::Db<1024> = sled::open("batch_doctest")?;
|
||||||
/// db.insert("key_0", "val_0")?;
|
/// db.insert("key_0", "val_0")?;
|
||||||
///
|
///
|
||||||
/// let mut batch = sled::Batch::default();
|
/// let mut batch = sled::Batch::default();
|
||||||
|
@ -1408,9 +1358,13 @@ impl<
|
||||||
node_id.0,
|
node_id.0,
|
||||||
dirty_epoch
|
dirty_epoch
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TODO
|
||||||
assert!(self
|
assert!(self
|
||||||
.dirty
|
.dirty
|
||||||
.contains_key(&(dirty_epoch, leaf.lo.clone())));
|
.contains_key(&(dirty_epoch, leaf.lo.clone())));
|
||||||
|
*/
|
||||||
|
|
||||||
self.dirty.insert(
|
self.dirty.insert(
|
||||||
(dirty_epoch, leaf.lo.clone()),
|
(dirty_epoch, leaf.lo.clone()),
|
||||||
|
@ -1490,7 +1444,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// db.insert(&[0], vec![0])?;
|
/// db.insert(&[0], vec![0])?;
|
||||||
/// assert!(db.contains_key(&[0])?);
|
/// assert!(db.contains_key(&[0])?);
|
||||||
/// assert!(!db.contains_key(&[1])?);
|
/// assert!(!db.contains_key(&[1])?);
|
||||||
|
@ -1516,7 +1470,7 @@ impl<
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// use sled::InlineArray;
|
/// use sled::InlineArray;
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// for i in 0..10 {
|
/// for i in 0..10 {
|
||||||
/// db.insert(&[i], vec![i])
|
/// db.insert(&[i], vec![i])
|
||||||
/// .expect("should write successfully");
|
/// .expect("should write successfully");
|
||||||
|
@ -1568,7 +1522,7 @@ impl<
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// use sled::InlineArray;
|
/// use sled::InlineArray;
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// for i in 0..10 {
|
/// for i in 0..10 {
|
||||||
/// db.insert(&[i], vec![i])?;
|
/// db.insert(&[i], vec![i])?;
|
||||||
/// }
|
/// }
|
||||||
|
@ -1618,7 +1572,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// use sled::InlineArray;
|
/// use sled::InlineArray;
|
||||||
/// db.insert(&[0, 0, 0], vec![0, 0, 0])?;
|
/// db.insert(&[0, 0, 0], vec![0, 0, 0])?;
|
||||||
/// db.insert(&[0, 0, 1], vec![0, 0, 1])?;
|
/// db.insert(&[0, 0, 1], vec![0, 0, 1])?;
|
||||||
|
@ -1648,10 +1602,7 @@ impl<
|
||||||
/// assert!(r.next().is_none());
|
/// assert!(r.next().is_none());
|
||||||
/// # Ok(()) }
|
/// # Ok(()) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn scan_prefix<'a, P>(
|
pub fn scan_prefix<'a, P>(&'a self, prefix: P) -> Iter<'a, LEAF_FANOUT>
|
||||||
&'a self,
|
|
||||||
prefix: P,
|
|
||||||
) -> Iter<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
where
|
where
|
||||||
P: AsRef<[u8]>,
|
P: AsRef<[u8]>,
|
||||||
{
|
{
|
||||||
|
@ -1687,7 +1638,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// db.insert(&[0], vec![0])?;
|
/// db.insert(&[0], vec![0])?;
|
||||||
/// db.insert(&[1], vec![10])?;
|
/// db.insert(&[1], vec![10])?;
|
||||||
/// db.insert(&[2], vec![20])?;
|
/// db.insert(&[2], vec![20])?;
|
||||||
|
@ -1739,7 +1690,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
///
|
///
|
||||||
/// let data = vec![
|
/// let data = vec![
|
||||||
/// (b"key 1", b"value 1"),
|
/// (b"key 1", b"value 1"),
|
||||||
|
@ -1801,7 +1752,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// db.insert(&[0], vec![0])?;
|
/// db.insert(&[0], vec![0])?;
|
||||||
/// db.insert(&[1], vec![10])?;
|
/// db.insert(&[1], vec![10])?;
|
||||||
/// db.insert(&[2], vec![20])?;
|
/// db.insert(&[2], vec![20])?;
|
||||||
|
@ -1853,7 +1804,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
///
|
///
|
||||||
/// let data = vec![
|
/// let data = vec![
|
||||||
/// (b"key 1", b"value 1"),
|
/// (b"key 1", b"value 1"),
|
||||||
|
@ -1912,7 +1863,7 @@ impl<
|
||||||
/// ```
|
/// ```
|
||||||
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
/// # let config = sled::Config::tmp().unwrap();
|
/// # let config = sled::Config::tmp().unwrap();
|
||||||
/// # let db: sled::Db<64, 1024, 128> = config.open()?;
|
/// # let db: sled::Db<1024> = config.open()?;
|
||||||
/// db.insert(b"a", vec![0]);
|
/// db.insert(b"a", vec![0]);
|
||||||
/// db.insert(b"b", vec![1]);
|
/// db.insert(b"b", vec![1]);
|
||||||
/// assert_eq!(db.len(), 2);
|
/// assert_eq!(db.len(), 2);
|
||||||
|
@ -1965,13 +1916,8 @@ impl<
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub struct Iter<
|
pub struct Iter<'a, const LEAF_FANOUT: usize> {
|
||||||
'a,
|
inner: &'a Db<LEAF_FANOUT>,
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> {
|
|
||||||
inner: &'a Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>,
|
|
||||||
bounds: (Bound<InlineArray>, Bound<InlineArray>),
|
bounds: (Bound<InlineArray>, Bound<InlineArray>),
|
||||||
next_calls: usize,
|
next_calls: usize,
|
||||||
next_back_calls: usize,
|
next_back_calls: usize,
|
||||||
|
@ -1981,14 +1927,7 @@ pub struct Iter<
|
||||||
prefetched_back: VecDeque<(InlineArray, InlineArray)>,
|
prefetched_back: VecDeque<(InlineArray, InlineArray)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<'a, const LEAF_FANOUT: usize> Iterator for Iter<'a, LEAF_FANOUT> {
|
||||||
'a,
|
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> Iterator
|
|
||||||
for Iter<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
type Item = io::Result<(InlineArray, InlineArray)>;
|
type Item = io::Result<(InlineArray, InlineArray)>;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
@ -2024,13 +1963,8 @@ impl<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<'a, const LEAF_FANOUT: usize> DoubleEndedIterator
|
||||||
'a,
|
for Iter<'a, LEAF_FANOUT>
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> DoubleEndedIterator
|
|
||||||
for Iter<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
{
|
||||||
fn next_back(&mut self) -> Option<Self::Item> {
|
fn next_back(&mut self) -> Option<Self::Item> {
|
||||||
self.next_back_calls += 1;
|
self.next_back_calls += 1;
|
||||||
|
@ -2108,13 +2042,7 @@ impl<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<'a, const LEAF_FANOUT: usize> Iter<'a, LEAF_FANOUT> {
|
||||||
'a,
|
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> Iter<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
pub fn keys(
|
pub fn keys(
|
||||||
self,
|
self,
|
||||||
) -> impl 'a + DoubleEndedIterator<Item = io::Result<InlineArray>> {
|
) -> impl 'a + DoubleEndedIterator<Item = io::Result<InlineArray>> {
|
||||||
|
@ -2128,17 +2056,9 @@ impl<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<'a, const LEAF_FANOUT: usize> IntoIterator for &'a Db<LEAF_FANOUT> {
|
||||||
'a,
|
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
> IntoIterator
|
|
||||||
for &'a Db<INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>
|
|
||||||
{
|
|
||||||
type Item = io::Result<(InlineArray, InlineArray)>;
|
type Item = io::Result<(InlineArray, InlineArray)>;
|
||||||
type IntoIter =
|
type IntoIter = Iter<'a, LEAF_FANOUT>;
|
||||||
Iter<'a, INDEX_FANOUT, LEAF_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE>;
|
|
||||||
|
|
||||||
fn into_iter(self) -> Self::IntoIter {
|
fn into_iter(self) -> Self::IntoIter {
|
||||||
self.iter()
|
self.iter()
|
||||||
|
@ -2156,7 +2076,7 @@ impl<
|
||||||
/// use sled::{Batch, open};
|
/// use sled::{Batch, open};
|
||||||
///
|
///
|
||||||
/// # let _ = std::fs::remove_dir_all("batch_db_2");
|
/// # let _ = std::fs::remove_dir_all("batch_db_2");
|
||||||
/// let db: sled::Db<64, 1024, 128> = open("batch_db_2")?;
|
/// let db: sled::Db<1024> = open("batch_db_2")?;
|
||||||
/// db.insert("key_0", "val_0")?;
|
/// db.insert("key_0", "val_0")?;
|
||||||
///
|
///
|
||||||
/// let mut batch = Batch::default();
|
/// let mut batch = Batch::default();
|
||||||
|
@ -2203,11 +2123,7 @@ impl Batch {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn initialize<
|
fn initialize<const LEAF_FANOUT: usize>(
|
||||||
const INDEX_FANOUT: usize,
|
|
||||||
const LEAF_FANOUT: usize,
|
|
||||||
const EBR_LOCAL_GC_BUFFER_SIZE: usize,
|
|
||||||
>(
|
|
||||||
index_data: &[(u64, InlineArray)],
|
index_data: &[(u64, InlineArray)],
|
||||||
first_id_opt: Option<u64>,
|
first_id_opt: Option<u64>,
|
||||||
) -> ConcurrentMap<
|
) -> ConcurrentMap<
|
||||||
|
@ -2228,7 +2144,7 @@ fn initialize<
|
||||||
data: StackMap::new(),
|
data: StackMap::new(),
|
||||||
in_memory_size: mem::size_of::<Leaf<LEAF_FANOUT>>(),
|
in_memory_size: mem::size_of::<Leaf<LEAF_FANOUT>>(),
|
||||||
mutation_count: 0,
|
mutation_count: 0,
|
||||||
page_out_on_flush: false,
|
page_out_on_flush: None,
|
||||||
};
|
};
|
||||||
let first_node = Node {
|
let first_node = Node {
|
||||||
id: NodeId(first_id),
|
id: NodeId(first_id),
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
|
// TODO allow waiting flusher to start collecting dirty pages
|
||||||
|
// TODO unset page_out_on_flush when we send serialized bytes for a page to dirty
|
||||||
|
// TODO serialize flush batch in parallel
|
||||||
// TODO add failpoints to writepath
|
// TODO add failpoints to writepath
|
||||||
// TODO ensure that any time something is added to dirty, that there was a live
|
// TODO ensure that any time something is added to dirty, that there was a live
|
||||||
// flush epoch already in-scope
|
// flush epoch already in-scope
|
||||||
// TODO re-enable transaction tests in test_tree.rs
|
// TODO re-enable transaction tests in test_tree.rs
|
||||||
// TODO free empty leaves with try_lock on left sibling, set hi key, remove from indexes, store deletion in metadata_store
|
// TODO free empty leaves with try_lock on left sibling, set hi key, remove from indexes, store deletion in metadata_store
|
||||||
// TODO name all spawned maintenance threads and rayon threadpool
|
|
||||||
// TODO heap maintenance w/ speculative write followed by CAS in pt
|
// TODO heap maintenance w/ speculative write followed by CAS in pt
|
||||||
// maybe with global writer lock that controls flushers too
|
// maybe with global writer lock that controls flushers too
|
||||||
// TODO set explicit max key and value sizes w/ corresponding heap
|
// TODO set explicit max key and value sizes w/ corresponding heap
|
||||||
|
|
|
@ -108,7 +108,13 @@ fn worker(
|
||||||
inner.global_error.load(Ordering::Acquire);
|
inner.global_error.load(Ordering::Acquire);
|
||||||
|
|
||||||
if !err_ptr.is_null() {
|
if !err_ptr.is_null() {
|
||||||
log::error!("compaction thread prematurely terminating after global error set");
|
let deref: &(io::ErrorKind, String) = unsafe { &*err_ptr };
|
||||||
|
let error = io::Error::new(deref.0, deref.1.clone());
|
||||||
|
|
||||||
|
log::error!(
|
||||||
|
"compaction thread terminating after global error set to {:?}",
|
||||||
|
error
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,7 +131,7 @@ fn worker(
|
||||||
match write_res {
|
match write_res {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
set_error(&inner.global_error, &e);
|
set_error(&inner.global_error, &e);
|
||||||
log::error!("log compactor thread encountered error - setting global fatal error and shutting down compactions");
|
log::error!("log compactor thread encountered error: {:?} - setting global fatal error and shutting down compactions", e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Ok(recovery) => {
|
Ok(recovery) => {
|
||||||
|
@ -292,13 +298,26 @@ impl MetadataStore {
|
||||||
};
|
};
|
||||||
|
|
||||||
let worker_inner = inner.clone();
|
let worker_inner = inner.clone();
|
||||||
std::thread::spawn(move || {
|
|
||||||
worker(
|
let spawn_res = std::thread::Builder::new()
|
||||||
rx,
|
.name("sled_flusher".into())
|
||||||
recovery.id_for_next_log.checked_sub(1).unwrap(),
|
.spawn(move || {
|
||||||
worker_inner,
|
worker(
|
||||||
)
|
rx,
|
||||||
});
|
recovery.id_for_next_log.checked_sub(1).unwrap(),
|
||||||
|
worker_inner,
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Err(e) = spawn_res {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!(
|
||||||
|
"unable to spawn metadata compactor thread for sled database: {:?}",
|
||||||
|
e
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
Ok((MetadataStore { inner, is_shut_down: false }, recovery.recovered))
|
Ok((MetadataStore { inner, is_shut_down: false }, recovery.recovered))
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ use sled::{Config, Db as SledDb};
|
||||||
const CONCURRENCY: usize = 32;
|
const CONCURRENCY: usize = 32;
|
||||||
const N_KEYS: usize = 1024;
|
const N_KEYS: usize = 1024;
|
||||||
|
|
||||||
type Db = SledDb<8, 8, 8>;
|
type Db = SledDb<8>;
|
||||||
|
|
||||||
fn batch_writer(db: Db, barrier: Arc<Barrier>, thread_number: usize) {
|
fn batch_writer(db: Db, barrier: Arc<Barrier>, thread_number: usize) {
|
||||||
barrier.wait();
|
barrier.wait();
|
||||||
|
|
|
@ -13,7 +13,7 @@ use sled::{Config, Db as SledDb};
|
||||||
|
|
||||||
use common::cleanup;
|
use common::cleanup;
|
||||||
|
|
||||||
type Db = SledDb<8, 8, 8>;
|
type Db = SledDb<8>;
|
||||||
|
|
||||||
const TEST_ENV_VAR: &str = "SLED_CRASH_TEST";
|
const TEST_ENV_VAR: &str = "SLED_CRASH_TEST";
|
||||||
const N_TESTS: usize = 100;
|
const N_TESTS: usize = 100;
|
||||||
|
|
|
@ -7,7 +7,7 @@ mod common;
|
||||||
fn size_leak() -> io::Result<()> {
|
fn size_leak() -> io::Result<()> {
|
||||||
common::setup_logger();
|
common::setup_logger();
|
||||||
|
|
||||||
let tree: sled::Db<64, 1024, 128> =
|
let tree: sled::Db<1024> =
|
||||||
sled::Config::tmp()?.flush_every_ms(None).open()?;
|
sled::Config::tmp()?.flush_every_ms(None).open()?;
|
||||||
|
|
||||||
for _ in 0..10_000 {
|
for _ in 0..10_000 {
|
||||||
|
|
|
@ -18,7 +18,7 @@ use quickcheck::{Gen, QuickCheck};
|
||||||
// use sled::transaction::*;
|
// use sled::transaction::*;
|
||||||
use sled::{Config, Db as SledDb};
|
use sled::{Config, Db as SledDb};
|
||||||
|
|
||||||
type Db = SledDb<4, 3, 1>;
|
type Db = SledDb<3>;
|
||||||
|
|
||||||
use tree::{
|
use tree::{
|
||||||
prop_tree_matches_btreemap, Key,
|
prop_tree_matches_btreemap, Key,
|
||||||
|
@ -223,7 +223,7 @@ fn varied_compression_ratios() {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_pop_first() -> io::Result<()> {
|
fn test_pop_first() -> io::Result<()> {
|
||||||
let config = sled::Config::tmp().unwrap();
|
let config = sled::Config::tmp().unwrap();
|
||||||
let db: sled::Db<64, 4, 128> = config.open()?;
|
let db: sled::Db<4> = config.open()?;
|
||||||
db.insert(&[0], vec![0])?;
|
db.insert(&[0], vec![0])?;
|
||||||
db.insert(&[1], vec![10])?;
|
db.insert(&[1], vec![10])?;
|
||||||
db.insert(&[2], vec![20])?;
|
db.insert(&[2], vec![20])?;
|
||||||
|
@ -246,7 +246,7 @@ fn test_pop_first() -> io::Result<()> {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_pop_last_in_range() -> io::Result<()> {
|
fn test_pop_last_in_range() -> io::Result<()> {
|
||||||
let config = sled::Config::tmp().unwrap();
|
let config = sled::Config::tmp().unwrap();
|
||||||
let db: sled::Db<64, 4, 128> = config.open()?;
|
let db: sled::Db<4> = config.open()?;
|
||||||
|
|
||||||
let data = vec![
|
let data = vec![
|
||||||
(b"key 1", b"value 1"),
|
(b"key 1", b"value 1"),
|
||||||
|
@ -1030,7 +1030,7 @@ fn tree_range() {
|
||||||
common::setup_logger();
|
common::setup_logger();
|
||||||
|
|
||||||
let config = Config::tmp().unwrap().flush_every_ms(Some(1));
|
let config = Config::tmp().unwrap().flush_every_ms(Some(1));
|
||||||
let t: sled::Db<5, 7, 11> = config.open().unwrap();
|
let t: sled::Db<7> = config.open().unwrap();
|
||||||
|
|
||||||
t.insert(b"0", vec![0]).unwrap();
|
t.insert(b"0", vec![0]).unwrap();
|
||||||
t.insert(b"1", vec![10]).unwrap();
|
t.insert(b"1", vec![10]).unwrap();
|
||||||
|
@ -1081,14 +1081,14 @@ fn recover_tree() {
|
||||||
|
|
||||||
let config = Config::tmp().unwrap().flush_every_ms(Some(1));
|
let config = Config::tmp().unwrap().flush_every_ms(Some(1));
|
||||||
|
|
||||||
let t: sled::Db<5, 7, 128> = config.open().unwrap();
|
let t: sled::Db<7> = config.open().unwrap();
|
||||||
for i in 0..N_PER_THREAD {
|
for i in 0..N_PER_THREAD {
|
||||||
let k = kv(i);
|
let k = kv(i);
|
||||||
t.insert(&k, k.clone()).unwrap();
|
t.insert(&k, k.clone()).unwrap();
|
||||||
}
|
}
|
||||||
drop(t);
|
drop(t);
|
||||||
|
|
||||||
let t: sled::Db<5, 7, 128> = config.open().unwrap();
|
let t: sled::Db<7> = config.open().unwrap();
|
||||||
for i in 0..N_PER_THREAD {
|
for i in 0..N_PER_THREAD {
|
||||||
let k = kv(i as usize);
|
let k = kv(i as usize);
|
||||||
assert_eq!(t.get(&*k).unwrap().unwrap(), k);
|
assert_eq!(t.get(&*k).unwrap().unwrap(), k);
|
||||||
|
@ -1096,7 +1096,7 @@ fn recover_tree() {
|
||||||
}
|
}
|
||||||
drop(t);
|
drop(t);
|
||||||
|
|
||||||
let t: sled::Db<5, 7, 128> = config.open().unwrap();
|
let t: sled::Db<7> = config.open().unwrap();
|
||||||
for i in 0..N_PER_THREAD {
|
for i in 0..N_PER_THREAD {
|
||||||
let k = kv(i as usize);
|
let k = kv(i as usize);
|
||||||
assert!(t.get(&*k).unwrap().is_none());
|
assert!(t.get(&*k).unwrap().is_none());
|
||||||
|
|
|
@ -5,7 +5,7 @@ use rand_distr::{Distribution, Gamma};
|
||||||
|
|
||||||
use sled::{Config, Db as SledDb, InlineArray};
|
use sled::{Config, Db as SledDb, InlineArray};
|
||||||
|
|
||||||
type Db = SledDb<4, 3, 1>;
|
type Db = SledDb<3>;
|
||||||
|
|
||||||
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
|
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
|
||||||
pub struct Key(pub Vec<u8>);
|
pub struct Key(pub Vec<u8>);
|
||||||
|
|
Loading…
Reference in New Issue