Add notion of max unflushed epoch to leaves

This commit is contained in:
Tyler Neely 2024-03-10 07:51:01 +01:00
parent 16c108de0d
commit 6905c87dd4
6 changed files with 47 additions and 19 deletions

View File

@ -14,6 +14,8 @@ pub(crate) struct Leaf<const LEAF_FANOUT: usize> {
pub page_out_on_flush: Option<FlushEpoch>,
#[serde(skip)]
pub deleted: Option<FlushEpoch>,
#[serde(skip)]
pub max_unflushed_epoch: Option<FlushEpoch>,
}
impl<const LEAF_FANOUT: usize> Default for Leaf<LEAF_FANOUT> {
@ -30,6 +32,7 @@ impl<const LEAF_FANOUT: usize> Default for Leaf<LEAF_FANOUT> {
mutation_count: 0,
page_out_on_flush: None,
deleted: None,
max_unflushed_epoch: None,
}
}
}

View File

@ -6,6 +6,7 @@
// * add merges to iterator test and assert it deadlocks
// * alternative is to merge right, not left
// * page-out needs to be deferred until after any flush of the dirty epoch
// * need to remove max_unflushed_epoch after flushing it
// * can't send reliable page-out request backwards from 7->6
// * re-locking every mutex in a writebatch feels bad
// * need to signal stability status forward

View File

@ -424,7 +424,11 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
if let Some(dirty_epoch) = leaf.dirty_flush_epoch {
// We can't page out this leaf until it has been
// flushed, because its changes are not yet durable.
leaf.page_out_on_flush = Some(dirty_epoch);
leaf.page_out_on_flush =
leaf.page_out_on_flush.max(Some(dirty_epoch));
} else if let Some(max_unflushed_epoch) = leaf.max_unflushed_epoch {
leaf.page_out_on_flush =
leaf.page_out_on_flush.max(Some(max_unflushed_epoch));
} else {
#[cfg(feature = "for-internal-testing-only")]
{
@ -603,7 +607,9 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
}
assert!(deleted_at > flush_through_epoch);
}
leaf_ref.dirty_flush_epoch.take();
leaf_ref.max_unflushed_epoch =
leaf_ref.dirty_flush_epoch.take();
#[cfg(feature = "for-internal-testing-only")]
{

View File

@ -542,6 +542,8 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
let (low_key, mut write, node) = self.page_in(key, reader_epoch)?;
// by checking into an epoch after acquiring the node mutex, we
// avoid inversions where progress may be observed to go backwards.
let flush_epoch_guard = self.cache.check_into_flush_epoch();
let leaf = write.leaf.as_mut().unwrap();
@ -571,11 +573,8 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
assert!(old_dirty_epoch < flush_epoch_guard.epoch());
// cooperatively serialize and put into dirty
//leaf.dirty_flush_epoch.take().unwrap();
// can't make many assertions about the page out epoch
leaf.max_unflushed_epoch = leaf.dirty_flush_epoch.take();
leaf.page_out_on_flush.take();
log::trace!(
"starting cooperatively serializing {:?} for {:?} because we want to use it in {:?}",
node.object_id, old_dirty_epoch, flush_epoch_guard.epoch(),
@ -623,10 +622,8 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
node.object_id, old_dirty_epoch, flush_epoch_guard.epoch(),
);
// TODO this won't hold w/ concurrent flushes
assert_eq!(
old_dirty_epoch.increment(),
flush_epoch_guard.epoch(),
assert!(
old_dirty_epoch < flush_epoch_guard.epoch(),
"flush epochs somehow became unlinked"
);
}
@ -1324,17 +1321,18 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
continue;
}
assert_eq!(old_flush_epoch.increment(), new_epoch);
assert!(old_flush_epoch < new_epoch);
log::trace!(
"cooperatively flushing {:?} with dirty {:?} after checking into {:?}",
node.object_id,
old_flush_epoch,
new_epoch
);
"cooperatively flushing {:?} with dirty {:?} after checking into {:?}",
node.object_id,
old_flush_epoch,
new_epoch
);
// cooperatively serialize and put into dirty
let old_dirty_epoch = leaf.dirty_flush_epoch.take().unwrap();
leaf.max_unflushed_epoch = Some(old_dirty_epoch);
#[cfg(feature = "for-internal-testing-only")]
{
@ -1385,9 +1383,8 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
);
}
assert_eq!(
old_flush_epoch.increment(),
flush_epoch_guard.epoch(),
assert!(
old_flush_epoch < flush_epoch_guard.epoch(),
"flush epochs somehow became unlinked"
);
}
@ -2265,6 +2262,7 @@ impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
mutation_count: 0,
page_out_on_flush: None,
deleted: None,
max_unflushed_epoch: None,
};
rhs.set_in_memory_size();

View File

@ -30,6 +30,20 @@ fn concurrent_batch_atomicity() {
let mut threads = vec![];
let flusher_barrier = Arc::new(Barrier::new(CONCURRENCY));
for tn in 0..CONCURRENCY {
let db = db.clone();
let barrier = flusher_barrier.clone();
let thread = thread::Builder::new()
.name(format!("t(thread: {} flusher)", tn))
.spawn(move || {
db.flush().unwrap();
barrier.wait();
})
.expect("should be able to spawn thread");
threads.push(thread);
}
let barrier = Arc::new(Barrier::new(CONCURRENCY + 1));
for thread_number in 0..CONCURRENCY {
let db = db.clone();

View File

@ -370,12 +370,15 @@ fn concurrent_tree_ops() {
($t:ident, $f:expr) => {
let mut threads = vec![];
let flusher_barrier = Arc::new(Barrier::new(N_THREADS));
for tn in 0..N_THREADS {
let tree = $t.clone();
let barrier = flusher_barrier.clone();
let thread = thread::Builder::new()
.name(format!("t(thread: {} flusher)", tn))
.spawn(move || {
tree.flush().unwrap();
barrier.wait();
})
.expect("should be able to spawn thread");
threads.push(thread);
@ -534,12 +537,15 @@ fn concurrent_tree_iter() -> io::Result<()> {
let mut threads: Vec<thread::JoinHandle<io::Result<()>>> = vec![];
let flusher_barrier = Arc::new(Barrier::new(N_THREADS));
for tn in 0..N_THREADS {
let tree = t.clone();
let barrier = flusher_barrier.clone();
let thread = thread::Builder::new()
.name(format!("t(thread: {} flusher)", tn))
.spawn(move || {
tree.flush().unwrap();
barrier.wait();
Ok(())
})
.expect("should be able to spawn thread");