Compare commits

...

4 Commits

2 changed files with 31 additions and 29 deletions

View File

@ -38,6 +38,10 @@ impl<const LEAF_FANOUT: usize> Default for Leaf<LEAF_FANOUT> {
}
impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
pub(crate) const fn is_empty(&self) -> bool {
self.data.is_empty()
}
pub(crate) fn set_dirty_epoch(&mut self, epoch: FlushEpoch) {
if let Some(current_epoch) = self.dirty_flush_epoch {
assert!(current_epoch <= epoch);

View File

@ -1208,7 +1208,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
pub fn range<K, R>(&self, range: R) -> Iter<LEAF_FANOUT>
where
K: AsRef<[u8]>,
K: AsRef<[u8]> + ?Sized,
R: RangeBounds<K>,
{
let start: Bound<InlineArray> =
@ -1334,21 +1334,6 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
let old_dirty_epoch = leaf.dirty_flush_epoch.take().unwrap();
leaf.max_unflushed_epoch = Some(old_dirty_epoch);
#[cfg(feature = "for-internal-testing-only")]
{
self.cache.event_verifier.mark(
node.object_id,
old_dirty_epoch,
event_verifier::State::CleanPagedIn,
concat!(
file!(),
':',
line!(),
"batch-cooperative-serialization"
),
);
}
// be extra-explicit about serialized bytes
let leaf_ref: &Leaf<LEAF_FANOUT> = &*leaf;
@ -1361,6 +1346,21 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
old_dirty_epoch
);
#[cfg(feature = "for-internal-testing-only")]
{
self.cache.event_verifier.mark(
node.object_id,
old_dirty_epoch,
event_verifier::State::CooperativelySerialized,
concat!(
file!(),
':',
line!(),
":batch-cooperative-serialization"
),
);
}
self.cache.install_dirty(
old_dirty_epoch,
node.object_id,
@ -1373,16 +1373,6 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
},
);
#[cfg(feature = "for-internal-testing-only")]
{
self.cache.event_verifier.mark(
node.object_id,
old_dirty_epoch,
event_verifier::State::Dirty,
concat!(file!(), ':', line!(), ":apply-batch"),
);
}
assert!(
old_flush_epoch < flush_epoch_guard.epoch(),
"flush epochs somehow became unlinked"
@ -1391,16 +1381,19 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
}
let mut splits: Vec<(InlineArray, Object<LEAF_FANOUT>)> = vec![];
let mut merges: BTreeMap<InlineArray, Object<LEAF_FANOUT>> =
BTreeMap::new();
// Insert and split when full
for (key, value_opt) in batch.writes {
let range = ..=&key;
let (_lo, (ref mut w, _id)) = acquired_locks
let (lo, (ref mut w, object)) = acquired_locks
.range_mut::<InlineArray, _>(range)
.next_back()
.unwrap();
let leaf = w.leaf.as_mut().unwrap();
assert_eq!(lo, &leaf.lo);
assert!(leaf.lo <= key);
if let Some(hi) = &leaf.hi {
assert!(hi > &key);
@ -1408,6 +1401,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
if let Some(value) = value_opt {
leaf.insert(key, value);
merges.remove(lo);
if let Some((split_key, rhs_node)) = leaf.split_if_full(
new_epoch,
@ -1422,6 +1416,10 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
}
} else {
leaf.remove(&key);
if leaf.is_empty() {
merges.insert(lo.clone(), object.clone());
}
}
}
@ -1762,7 +1760,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
range: R,
) -> io::Result<Option<(InlineArray, InlineArray)>>
where
K: AsRef<[u8]>,
K: AsRef<[u8]> + ?Sized,
R: Clone + RangeBounds<K>,
{
loop {
@ -1871,7 +1869,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
range: R,
) -> io::Result<Option<(InlineArray, InlineArray)>>
where
K: AsRef<[u8]>,
K: AsRef<[u8]> + ?Sized,
R: Clone + RangeBounds<K>,
{
loop {