From ce51eeb5e0b8103a20a64b9130e5b824f0b1e6a7 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 3 Oct 2023 16:33:07 +0200 Subject: [PATCH] Standardize naming on ObjectId. Include CollectionId and low key in Object struct --- src/db.rs | 14 +++-- src/event_verifier.rs | 8 +-- src/heap.rs | 48 ++++++++-------- src/lib.rs | 9 ++- src/metadata_store.rs | 28 +++++----- src/object_cache.rs | 109 +++++++++++++++++++++--------------- src/tree.rs | 126 +++++++++++++++++++++++------------------- 7 files changed, 191 insertions(+), 151 deletions(-) diff --git a/src/db.rs b/src/db.rs index 82c03656..ab610f7a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -232,11 +232,14 @@ impl Db { collection_id ); - let empty_node = cache.allocate_node(); + let initial_low_key = InlineArray::default(); + + let empty_node = + cache.allocate_node(collection_id, initial_low_key.clone()); let index = Index::default(); - assert!(index.insert(InlineArray::default(), empty_node).is_none()); + assert!(index.insert(initial_low_key, empty_node).is_none()); let tree = Tree::new( collection_id, @@ -482,11 +485,14 @@ impl Db { let collection_id = CollectionId(self.collection_id_allocator.allocate()); - let empty_node = self.cache.allocate_node(); + let initial_low_key = InlineArray::default(); + + let empty_node = + self.cache.allocate_node(collection_id, initial_low_key.clone()); let index = Index::default(); - assert!(index.insert(InlineArray::default(), empty_node).is_none()); + assert!(index.insert(initial_low_key, empty_node).is_none()); let tree = Tree::new( collection_id, diff --git a/src/event_verifier.rs b/src/event_verifier.rs index 5d6310ac..bafecd9f 100644 --- a/src/event_verifier.rs +++ b/src/event_verifier.rs @@ -12,7 +12,7 @@ pub(crate) struct EventVerifier { impl EventVerifier { pub(crate) fn mark_flush( &self, - node_id: ObjectId, + object_id: ObjectId, flush_epoch: NonZeroU64, mutation_count: u64, ) { @@ -20,13 +20,13 @@ impl EventVerifier { let epoch_entry = flush_model.entry(flush_epoch).or_default(); - let last = epoch_entry.insert(node_id, mutation_count); + let last = epoch_entry.insert(object_id, mutation_count); assert_eq!(last, None); } pub(crate) fn mark_unexpected_flush_epoch( &self, - node_id: ObjectId, + object_id: ObjectId, flush_epoch: NonZeroU64, mutation_count: u64, ) { @@ -34,7 +34,7 @@ impl EventVerifier { // assert that this object+mutation count was // already flushed let epoch_entry = flush_model.entry(flush_epoch).or_default(); - let flushed_mutation_count = epoch_entry.get(&node_id); + let flushed_mutation_count = epoch_entry.get(&object_id); println!("checking!"); assert_eq!(Some(&mutation_count), flushed_mutation_count); println!("good!"); diff --git a/src/heap.rs b/src/heap.rs index 178cc19d..3cab9cd9 100644 --- a/src/heap.rs +++ b/src/heap.rs @@ -135,7 +135,7 @@ pub struct Config {} #[derive(Debug)] pub(crate) struct ObjectRecovery { - pub node_id: ObjectId, + pub object_id: ObjectId, pub collection_id: CollectionId, pub metadata: InlineArray, } @@ -295,24 +295,24 @@ pub(crate) fn recover>( let pt = ObjectLocationMap::default(); let mut recovered_nodes = Vec::::with_capacity(recovered_metadata.len()); - let mut node_ids: FnvHashSet = Default::default(); + let mut object_ids: FnvHashSet = Default::default(); let mut slots_per_slab: [FnvHashSet; N_SLABS] = core::array::from_fn(|_| Default::default()); for update_metadata in recovered_metadata { match update_metadata { UpdateMetadata::Store { - node_id, + object_id, collection_id, location, metadata, } => { - node_ids.insert(node_id.0); + 
object_ids.insert(object_id.0); let slab_address = SlabAddress::from(location); slots_per_slab[slab_address.slab_id as usize] .insert(slab_address.slot()); - pt.insert(node_id, slab_address); + pt.insert(object_id, slab_address); recovered_nodes.push(ObjectRecovery { - node_id, + object_id, collection_id, metadata: metadata.clone(), }); @@ -347,7 +347,9 @@ pub(crate) fn recover>( heap: Heap { slabs: Arc::new(slabs.try_into().unwrap()), path: path.into(), - object_id_allocator: Arc::new(Allocator::from_allocated(&node_ids)), + object_id_allocator: Arc::new(Allocator::from_allocated( + &object_ids, + )), pt, global_error: metadata_store.get_global_error_arc(), metadata_store: Arc::new(metadata_store), @@ -632,13 +634,13 @@ fn set_error( #[derive(Debug)] pub(crate) enum Update { Store { - node_id: ObjectId, + object_id: ObjectId, collection_id: CollectionId, metadata: InlineArray, data: Vec, }, Free { - node_id: ObjectId, + object_id: ObjectId, collection_id: CollectionId, }, } @@ -646,22 +648,22 @@ pub(crate) enum Update { #[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] pub(crate) enum UpdateMetadata { Store { - node_id: ObjectId, + object_id: ObjectId, collection_id: CollectionId, metadata: InlineArray, location: NonZeroU64, }, Free { - node_id: ObjectId, + object_id: ObjectId, collection_id: CollectionId, }, } impl UpdateMetadata { - pub fn node_id(&self) -> ObjectId { + pub fn object_id(&self) -> ObjectId { match self { - UpdateMetadata::Store { node_id, .. } - | UpdateMetadata::Free { node_id, .. } => *node_id, + UpdateMetadata::Store { object_id, .. } + | UpdateMetadata::Free { object_id, .. } => *object_id, } } } @@ -739,7 +741,7 @@ impl Heap { let slabs = &self.slabs; let map_closure = |update: Update| match update { - Update::Store { node_id, collection_id, metadata, data } => { + Update::Store { object_id, collection_id, metadata, data } => { let slab_id = slab_for_size(data.len()); let slab = &slabs[usize::from(slab_id)]; let slot = slab.slot_allocator.allocate(); @@ -755,14 +757,14 @@ impl Heap { return Err(e); } Ok(UpdateMetadata::Store { - node_id, + object_id, collection_id, metadata, location: new_location_nzu, }) } - Update::Free { node_id, collection_id } => { - Ok(UpdateMetadata::Free { node_id, collection_id }) + Update::Free { object_id, collection_id } => { + Ok(UpdateMetadata::Free { object_id, collection_id }) } }; @@ -790,15 +792,15 @@ impl Heap { // reclaim previous disk locations for future writes for update_metadata in metadata_batch { let last_address_opt = match update_metadata { - UpdateMetadata::Store { node_id, location, .. } => { - self.pt.insert(node_id, SlabAddress::from(location)) + UpdateMetadata::Store { object_id, location, .. } => { + self.pt.insert(object_id, SlabAddress::from(location)) } - UpdateMetadata::Free { node_id, .. } => { + UpdateMetadata::Free { object_id, .. 
} => { guard.defer_drop(DeferredFree { allocator: self.object_id_allocator.clone(), - freed_slot: node_id.0, + freed_slot: object_id.0, }); - self.pt.remove(node_id) + self.pt.remove(object_id) } }; diff --git a/src/lib.rs b/src/lib.rs index 4bb481a1..0d68dff3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,10 @@ +// TODO store low key and collection ID directly on Object +// TODO make the Arc = #[derive(Debug, Clone)] struct Object { // used for access in heap::Heap - id: ObjectId, + object_id: ObjectId, + collection_id: CollectionId, + low_key: InlineArray, inner: CacheBox, } impl PartialEq for Object { fn eq(&self, other: &Self) -> bool { - self.id == other.id + self.object_id == other.object_id } } diff --git a/src/metadata_store.rs b/src/metadata_store.rs index 2f13608b..fe3a2967 100644 --- a/src/metadata_store.rs +++ b/src/metadata_store.rs @@ -426,12 +426,12 @@ fn serialize_batch(batch: &[UpdateMetadata]) -> Vec { for update_metadata in batch { match update_metadata { UpdateMetadata::Store { - node_id, + object_id, collection_id, metadata, location, } => { - batch_encoder.write_all(&node_id.0.to_le_bytes()).unwrap(); + batch_encoder.write_all(&object_id.0.to_le_bytes()).unwrap(); batch_encoder .write_all(&collection_id.0.to_le_bytes()) .unwrap(); @@ -441,8 +441,8 @@ fn serialize_batch(batch: &[UpdateMetadata]) -> Vec { batch_encoder.write_all(&metadata_len.to_le_bytes()).unwrap(); batch_encoder.write_all(&metadata).unwrap(); } - UpdateMetadata::Free { node_id, collection_id } => { - batch_encoder.write_all(&node_id.0.to_le_bytes()).unwrap(); + UpdateMetadata::Free { object_id, collection_id } => { + batch_encoder.write_all(&object_id.0.to_le_bytes()).unwrap(); batch_encoder .write_all(&collection_id.0.to_le_bytes()) .unwrap(); @@ -508,14 +508,14 @@ fn read_frame( let mut decoder = ZstdDecoder::new(&reusable_frame_buffer[8..len + 8]) .expect("failed to create zstd decoder"); - let mut node_id_buf: [u8; 8] = [0; 8]; + let mut object_id_buf: [u8; 8] = [0; 8]; let mut collection_id_buf: [u8; 8] = [0; 8]; let mut location_buf: [u8; 8] = [0; 8]; let mut metadata_len_buf: [u8; 8] = [0; 8]; let mut metadata_buf = vec![]; loop { let first_read_res = decoder - .read_exact(&mut node_id_buf) + .read_exact(&mut object_id_buf) .and_then(|_| decoder.read_exact(&mut collection_id_buf)) .and_then(|_| decoder.read_exact(&mut location_buf)) .and_then(|_| decoder.read_exact(&mut metadata_len_buf)); @@ -528,7 +528,7 @@ fn read_frame( } } - let node_id = ObjectId(u64::from_le_bytes(node_id_buf)); + let object_id = ObjectId(u64::from_le_bytes(object_id_buf)); let collection_id = CollectionId(u64::from_le_bytes(collection_id_buf)); let location = u64::from_le_bytes(location_buf); @@ -548,13 +548,13 @@ fn read_frame( let metadata = InlineArray::from(&*metadata_buf); ret.push(UpdateMetadata::Store { - node_id, + object_id, collection_id, location: location_nzu, metadata, }); } else { - ret.push(UpdateMetadata::Free { node_id, collection_id }); + ret.push(UpdateMetadata::Free { object_id, collection_id }); } } @@ -576,7 +576,7 @@ fn read_log( while let Ok(frame) = read_frame(&mut file, &mut reusable_frame_buffer) { for update_metadata in frame { - ret.insert(update_metadata.node_id(), update_metadata); + ret.insert(update_metadata.object_id(), update_metadata); } } @@ -599,7 +599,7 @@ fn read_snapshot( let frame: FnvHashMap = raw_frame .into_iter() - .map(|update_metadata| (update_metadata.node_id(), update_metadata)) + .map(|update_metadata| (update_metadata.object_id(), update_metadata)) .collect(); 
log::trace!("recovered {} items in snapshot {}", frame.len(), lsn); @@ -735,11 +735,11 @@ fn read_snapshot_and_apply_logs( for (log_id, log_datum) in log_data_res? { max_log_id = max_log_id.max(log_id); - for (node_id, update_metadata) in log_datum { + for (object_id, update_metadata) in log_datum { if matches!(update_metadata, UpdateMetadata::Store { .. }) { - recovered.insert(node_id, update_metadata); + recovered.insert(object_id, update_metadata); } else { - let _previous = recovered.remove(&node_id); + let _previous = recovered.remove(&object_id); // TODO: assert!(previous.is_some()); } } diff --git a/src/object_cache.rs b/src/object_cache.rs index 24c5d665..cd921bf7 100644 --- a/src/object_cache.rs +++ b/src/object_cache.rs @@ -18,14 +18,14 @@ pub(crate) enum Dirty { collection_id: CollectionId, }, CooperativelySerialized { - node_id: ObjectId, + object_id: ObjectId, collection_id: CollectionId, low_key: InlineArray, data: Arc>, mutation_count: u64, }, MergedAndDeleted { - node_id: ObjectId, + object_id: ObjectId, collection_id: CollectionId, }, } @@ -44,7 +44,7 @@ impl Dirty { pub(crate) struct ObjectCache { pub config: Config, global_error: Arc>, - pub node_id_index: ConcurrentMap< + pub object_id_index: ConcurrentMap< ObjectId, Object, INDEX_FANOUT, @@ -69,15 +69,15 @@ impl ObjectCache { let HeapRecovery { heap, recovered_nodes, was_recovered } = recover(&config.path, LEAF_FANOUT)?; - let (node_id_index, indices) = initialize(&recovered_nodes, &heap); + let (object_id_index, indices) = initialize(&recovered_nodes, &heap); // validate recovery - for ObjectRecovery { node_id, collection_id, metadata } in + for ObjectRecovery { object_id, collection_id, metadata } in recovered_nodes { let index = indices.get(&collection_id).unwrap(); let node = index.get(&metadata).unwrap(); - assert_eq!(node.id, node_id); + assert_eq!(node.object_id, object_id); } if config.cache_capacity_bytes < 256 { @@ -96,7 +96,7 @@ impl ObjectCache { let pc = ObjectCache { config: config.clone(), - node_id_index, + object_id_index, cache_advisor: RefCell::new(CacheAdvisor::new( config.cache_capacity_bytes.max(256), config.entry_cache_percent.min(80), @@ -160,12 +160,21 @@ impl ObjectCache { } } - pub fn allocate_node(&self) -> Object { - let id = ObjectId(self.heap.allocate_object_id()); + pub fn allocate_node( + &self, + collection_id: CollectionId, + low_key: InlineArray, + ) -> Object { + let object_id = ObjectId(self.heap.allocate_object_id()); - let node = Object { id, inner: Arc::new(Some(Box::default()).into()) }; + let node = Object { + object_id, + collection_id, + low_key, + inner: Arc::new(Some(Box::default()).into()), + }; - self.node_id_index.insert(id, node.clone()); + self.object_id_index.insert(object_id, node.clone()); node } @@ -181,7 +190,7 @@ impl ObjectCache { pub fn install_dirty( &self, flush_epoch: FlushEpoch, - node_id: ObjectId, + object_id: ObjectId, dirty: Dirty, ) { // dirty can transition from: @@ -198,7 +207,7 @@ impl ObjectCache { // if the new Dirty is not final, we must assert // that the old value is also not final. - let old_dirty = self.dirty.insert((flush_epoch, node_id), dirty); + let old_dirty = self.dirty.insert((flush_epoch, object_id), dirty); if let Some(old) = old_dirty { assert!(!old.is_final_state(), @@ -214,16 +223,16 @@ impl ObjectCache { // this being called in the destructor. 
pub fn mark_access_and_evict( &self, - node_id: ObjectId, + object_id: ObjectId, size: usize, ) -> io::Result<()> { let mut ca = self.cache_advisor.borrow_mut(); - let to_evict = ca.accessed_reuse_buffer(node_id.0, size); + let to_evict = ca.accessed_reuse_buffer(object_id.0, size); for (node_to_evict, _rough_size) in to_evict { let node = if let Some(n) = - self.node_id_index.get(&ObjectId(*node_to_evict)) + self.object_id_index.get(&ObjectId(*node_to_evict)) { - if n.id.0 != *node_to_evict { + if n.object_id.0 != *node_to_evict { log::warn!("during cache eviction, node to evict did not match current occupant for {:?}", node_to_evict); continue; } @@ -275,12 +284,12 @@ impl ObjectCache { let mut evict_after_flush = vec![]; - for ((dirty_epoch, dirty_node_id), dirty_value_initial_read) in + for ((dirty_epoch, dirty_object_id), dirty_value_initial_read) in self.dirty.range(..flush_boundary) { let dirty_value = self .dirty - .remove(&(dirty_epoch, dirty_node_id)) + .remove(&(dirty_epoch, dirty_object_id)) .expect("violation of flush responsibility"); if let Dirty::NotYetSerialized { .. } = &dirty_value { @@ -304,18 +313,18 @@ impl ObjectCache { } match dirty_value { - Dirty::MergedAndDeleted { node_id, collection_id } => { + Dirty::MergedAndDeleted { object_id, collection_id } => { log::trace!( "MergedAndDeleted for {:?}, adding None to write_batch", - node_id + object_id ); write_batch.push(Update::Free { - node_id: dirty_node_id, + object_id: dirty_object_id, collection_id, }); } Dirty::CooperativelySerialized { - node_id: _, + object_id: _, collection_id, low_key, mutation_count: _, @@ -324,14 +333,14 @@ impl ObjectCache { Arc::make_mut(&mut data); let data = Arc::into_inner(data).unwrap(); write_batch.push(Update::Store { - node_id: dirty_node_id, + object_id: dirty_object_id, collection_id, metadata: low_key, data, }); } Dirty::NotYetSerialized { low_key, collection_id, node } => { - assert_eq!(dirty_node_id, node.id, "mismatched node ID for NotYetSerialized with low key {:?}", low_key); + assert_eq!(dirty_object_id, node.object_id, "mismatched node ID for NotYetSerialized with low key {:?}", low_key); let mut lock = node.inner.write(); let leaf_ref: &mut Leaf = @@ -350,7 +359,7 @@ impl ObjectCache { // mutated the leaf after encountering it being dirty for our epoch, after // storing a CooperativelySerialized in the dirty map. let dirty_value_2_opt = - self.dirty.remove(&(dirty_epoch, dirty_node_id)); + self.dirty.remove(&(dirty_epoch, dirty_object_id)); let dirty_value_2 = if let Some(dv2) = dirty_value_2_opt { @@ -361,7 +370,7 @@ impl ObjectCache { of expected cooperative serialization. leaf in question's \ dirty_flush_epoch is {:?}, our expected key was {:?}. 
node.deleted: {:?}", leaf_ref.dirty_flush_epoch, - (dirty_epoch, dirty_node_id), + (dirty_epoch, dirty_object_id), leaf_ref.deleted, ); @@ -373,11 +382,11 @@ impl ObjectCache { mutation_count: _, mut data, collection_id: ci2, - node_id: ni2, + object_id: ni2, } = dirty_value_2 { - assert_eq!(node.id, ni2); - assert_eq!(node.id, dirty_node_id); + assert_eq!(node.object_id, ni2); + assert_eq!(node.object_id, dirty_object_id); assert_eq!(low_key, low_key_2); assert_eq!(collection_id, ci2); Arc::make_mut(&mut data); @@ -388,7 +397,7 @@ impl ObjectCache { }; write_batch.push(Update::Store { - node_id: dirty_node_id, + object_id: dirty_object_id, collection_id: collection_id, metadata: low_key, data, @@ -437,12 +446,12 @@ impl ObjectCache { let epoch = self.check_into_flush_epoch(); /* - for (node_id, collection_id, low_key) in objects_to_defrag { + for (object_id, collection_id, low_key) in objects_to_defrag { // - let node = if let Some(node) = self.node_id_index.get(&node_id) { + let node = if let Some(node) = self.object_id_index.get(&object_id) { node } else { - log::error!("tried to get node for maintenance but it was not present in node_id_index"); + log::error!("tried to get node for maintenance but it was not present in object_id_index"); continue; }; @@ -455,7 +464,7 @@ impl ObjectCache { epoch.epoch(), node.id, Dirty::CooperativelySerialized { - node_id: node.id, + object_id: node.id, collection_id, low_key, mutation_count: leaf.mutation_count, @@ -486,17 +495,23 @@ fn initialize( ) { let mut trees: HashMap> = HashMap::new(); - let node_id_index: ConcurrentMap< + let object_id_index: ConcurrentMap< ObjectId, Object, INDEX_FANOUT, EBR_LOCAL_GC_BUFFER_SIZE, > = ConcurrentMap::default(); - for ObjectRecovery { node_id, collection_id, metadata } in recovered_nodes { - let node = Object { id: *node_id, inner: Arc::new(None.into()) }; + for ObjectRecovery { object_id, collection_id, metadata } in recovered_nodes + { + let node = Object { + object_id: *object_id, + collection_id: *collection_id, + low_key: metadata.clone(), + inner: Arc::new(None.into()), + }; - assert!(node_id_index.insert(*node_id, node.clone()).is_none()); + assert!(object_id_index.insert(*object_id, node.clone()).is_none()); let tree = trees.entry(*collection_id).or_default(); @@ -508,18 +523,22 @@ fn initialize( let tree = trees.entry(collection_id).or_default(); if tree.is_empty() { - let node_id = ObjectId(heap.allocate_object_id()); + let object_id = ObjectId(heap.allocate_object_id()); + + let initial_low_key = InlineArray::default(); let empty_node = Object { - id: node_id, + object_id, + collection_id, + low_key: initial_low_key.clone(), inner: Arc::new(Some(Box::default()).into()), }; - assert!(node_id_index - .insert(node_id, empty_node.clone()) + assert!(object_id_index + .insert(object_id, empty_node.clone()) .is_none()); - assert!(tree.insert(InlineArray::default(), empty_node).is_none()); + assert!(tree.insert(initial_low_key, empty_node).is_none()); } } @@ -527,5 +546,5 @@ fn initialize( assert!(tree.contains_key(&InlineArray::MIN)); } - (node_id_index, trees) + (object_id_index, trees) } diff --git a/src/tree.rs b/src/tree.rs index 274f173a..73244853 100644 --- a/src/tree.rs +++ b/src/tree.rs @@ -66,7 +66,7 @@ struct LeafReadGuard<'a, const LEAF_FANOUT: usize = 1024> { >, low_key: InlineArray, inner: &'a Tree, - node_id: ObjectId, + object_id: ObjectId, external_cache_access_and_eviction: bool, } @@ -81,7 +81,7 @@ impl<'a, const LEAF_FANOUT: usize> Drop for LeafReadGuard<'a, LEAF_FANOUT> { return; } if let 
Err(e) = - self.inner.cache.mark_access_and_evict(self.node_id, size) + self.inner.cache.mark_access_and_evict(self.object_id, size) { self.inner.set_error(&e); log::error!( @@ -114,7 +114,7 @@ impl<'a, const LEAF_FANOUT: usize> LeafWriteGuard<'a, LEAF_FANOUT> { mut self, ) -> (ObjectId, usize) { self.external_cache_access_and_eviction = true; - (self.node.id, self.leaf_write.as_ref().unwrap().in_memory_size) + (self.node.object_id, self.leaf_write.as_ref().unwrap().in_memory_size) } } @@ -130,7 +130,7 @@ impl<'a, const LEAF_FANOUT: usize> Drop for LeafWriteGuard<'a, LEAF_FANOUT> { return; } if let Err(e) = - self.inner.cache.mark_access_and_evict(self.node.id, size) + self.inner.cache.mark_access_and_evict(self.node.object_id, size) { self.inner.set_error(&e); log::error!("io error while paging out dirty data: {:?}", e); @@ -201,7 +201,7 @@ impl Tree { let (low_key, node) = self.index.get_lte(key).unwrap(); let mut write = node.inner.write_arc(); if write.is_none() { - let leaf_bytes = self.cache.read(node.id.0)?; + let leaf_bytes = self.cache.read(node.object_id.0)?; let leaf: Box> = Leaf::deserialize(&leaf_bytes).unwrap(); *write = Some(leaf); @@ -218,7 +218,7 @@ impl Tree { let size = leaf.in_memory_size; drop(write); log::trace!("key undershoot in page_in"); - self.cache.mark_access_and_evict(node.id, size)?; + self.cache.mark_access_and_evict(node.object_id, size)?; continue; } @@ -228,7 +228,7 @@ impl Tree { let size = leaf.in_memory_size; drop(write); log::trace!("key overshoot in page_in"); - self.cache.mark_access_and_evict(node.id, size)?; + self.cache.mark_access_and_evict(node.object_id, size)?; continue; } @@ -266,13 +266,13 @@ impl Tree { assert!(predecessor.deleted.is_none()); assert_eq!(predecessor.hi.as_ref(), Some(&leaf.lo)); - if leaf_guard.node.id.0 == 0 { + if leaf_guard.node.object_id.0 == 0 { assert!(leaf_guard.low_key.is_empty()); } log::trace!( "deleting empty node id {} with low key {:?} and high key {:?}", - leaf_guard.node.id.0, + leaf_guard.node.object_id.0, leaf.lo, leaf.hi ); @@ -283,21 +283,21 @@ impl Tree { leaf.deleted = Some(merge_epoch); self.index.remove(&leaf_guard.low_key).unwrap(); - self.cache.node_id_index.remove(&leaf_guard.node.id).unwrap(); + self.cache.object_id_index.remove(&leaf_guard.node.object_id).unwrap(); // NB: these updates must "happen" atomically in the same flush epoch self.cache.install_dirty( merge_epoch, - leaf_guard.node.id, + leaf_guard.node.object_id, Dirty::MergedAndDeleted { - node_id: leaf_guard.node.id, + object_id: leaf_guard.node.object_id, collection_id: self.collection_id, }, ); self.cache.install_dirty( merge_epoch, - predecessor_guard.node.id, + predecessor_guard.node.object_id, Dirty::NotYetSerialized { low_key: predecessor.lo.clone(), node: predecessor_guard.node.clone(), @@ -305,13 +305,13 @@ impl Tree { }, ); - let (p_node_id, p_sz) = + let (p_object_id, p_sz) = predecessor_guard.handle_cache_access_and_eviction_externally(); - let (s_node_id, s_sz) = + let (s_object_id, s_sz) = leaf_guard.handle_cache_access_and_eviction_externally(); - self.cache.mark_access_and_evict(p_node_id, p_sz)?; - self.cache.mark_access_and_evict(s_node_id, s_sz)?; + self.cache.mark_access_and_evict(p_object_id, p_sz)?; + self.cache.mark_access_and_evict(s_object_id, s_sz)?; Ok(()) } @@ -362,7 +362,7 @@ impl Tree { read = ArcRwLockWriteGuard::downgrade(write); } - if node.id.0 == 0 { + if node.object_id.0 == 0 { assert!(low_key.is_empty()); } @@ -370,7 +370,7 @@ impl Tree { leaf_read: ManuallyDrop::new(read), inner: self, low_key, - 
node_id: node.id, + object_id: node.object_id, external_cache_access_and_eviction: false, }; @@ -427,7 +427,7 @@ impl Tree { if old_flush_epoch != flush_epoch_guard.epoch() { log::trace!( "cooperatively flushing {:?} with dirty {:?} after checking into {:?}", - node.id, + node.object_id, old_flush_epoch, flush_epoch_guard.epoch() ); @@ -442,19 +442,19 @@ impl Tree { log::trace!( "D adding node {} to dirty {:?}", - node.id.0, + node.object_id.0, old_dirty_epoch ); - if node.id.0 == 0 { + if node.object_id.0 == 0 { assert!(leaf.lo.is_empty()); } self.cache.install_dirty( old_dirty_epoch, - node.id, + node.object_id, Dirty::CooperativelySerialized { - node_id: node.id, + object_id: node.object_id, collection_id: self.collection_id, low_key: leaf.lo.clone(), mutation_count: leaf.mutation_count, @@ -470,9 +470,6 @@ impl Tree { } } - if node.id.0 == 0 { - assert!(low_key.is_empty()); - } Ok(LeafWriteGuard { flush_epoch_guard, leaf_write: ManuallyDrop::new(write), @@ -564,22 +561,23 @@ impl Tree { leaf.in_memory_size.saturating_sub(old_size - new_size); } - let split = leaf.split_if_full(new_epoch, &self.cache); + let split = + leaf.split_if_full(new_epoch, &self.cache, self.collection_id); if split.is_some() || Some(value_ivec) != ret { leaf.mutation_count += 1; leaf.dirty_flush_epoch = Some(new_epoch); log::trace!( "F adding node {} to dirty {:?}", - leaf_guard.node.id.0, + leaf_guard.node.object_id.0, new_epoch ); - if leaf_guard.node.id.0 == 0 { + if leaf_guard.node.object_id.0 == 0 { assert!(leaf_guard.low_key.is_empty()); } self.cache.install_dirty( new_epoch, - leaf_guard.node.id, + leaf_guard.node.object_id, Dirty::NotYetSerialized { collection_id: self.collection_id, node: leaf_guard.node.clone(), @@ -590,19 +588,21 @@ impl Tree { if let Some((split_key, rhs_node)) = split { log::trace!( "G adding new from split {:?} to dirty {:?}", - rhs_node.id, + rhs_node.object_id, new_epoch ); - assert_ne!(rhs_node.id.0, 0); + assert_ne!(rhs_node.object_id.0, 0); assert!(!split_key.is_empty()); - self.cache.node_id_index.insert(rhs_node.id, rhs_node.clone()); + self.cache + .object_id_index + .insert(rhs_node.object_id, rhs_node.clone()); self.index.insert(split_key.clone(), rhs_node.clone()); self.cache.install_dirty( new_epoch, - rhs_node.id, + rhs_node.object_id, Dirty::NotYetSerialized { collection_id: self.collection_id, node: rhs_node, @@ -642,7 +642,7 @@ impl Tree { let key_ref = key.as_ref(); let mut leaf_guard = self.leaf_for_key_mut(key_ref)?; - if leaf_guard.node.id.0 == 0 { + if leaf_guard.node.object_id.0 == 0 { // TODO will break with collections so this is just an early stage scaffolding bit. 
assert!(leaf_guard.low_key.is_empty()); } @@ -662,13 +662,13 @@ impl Tree { log::trace!( "H adding node {} to dirty {:?}", - leaf_guard.node.id.0, + leaf_guard.node.object_id.0, new_epoch ); self.cache.install_dirty( new_epoch, - leaf_guard.node.id, + leaf_guard.node.object_id, Dirty::NotYetSerialized { collection_id: self.collection_id, low_key: leaf_guard.low_key.clone(), @@ -786,7 +786,8 @@ impl Tree { Err(CompareAndSwapError { current, proposed }) }; - let split = leaf.split_if_full(new_epoch, &self.cache); + let split = + leaf.split_if_full(new_epoch, &self.cache, self.collection_id); let split_happened = split.is_some(); if split_happened || ret.is_ok() { leaf.mutation_count += 1; @@ -794,16 +795,16 @@ impl Tree { leaf.dirty_flush_epoch = Some(new_epoch); log::trace!( "A adding node {} to dirty {:?}", - leaf_guard.node.id.0, + leaf_guard.node.object_id.0, new_epoch ); - if leaf_guard.node.id.0 == 0 { + if leaf_guard.node.object_id.0 == 0 { assert!(leaf_guard.low_key.is_empty()); } self.cache.install_dirty( new_epoch, - leaf_guard.node.id, + leaf_guard.node.object_id, Dirty::NotYetSerialized { collection_id: self.collection_id, node: leaf_guard.node.clone(), @@ -814,19 +815,21 @@ impl Tree { if let Some((split_key, rhs_node)) = split { log::trace!( "B adding new from split {:?} to dirty {:?}", - rhs_node.id, + rhs_node.object_id, new_epoch ); self.cache.install_dirty( new_epoch, - rhs_node.id, + rhs_node.object_id, Dirty::NotYetSerialized { collection_id: self.collection_id, node: rhs_node.clone(), low_key: split_key.clone(), }, ); - self.cache.node_id_index.insert(rhs_node.id, rhs_node.clone()); + self.cache + .object_id_index + .insert(rhs_node.object_id, rhs_node.clone()); self.index.insert(split_key, rhs_node); } @@ -1106,7 +1109,7 @@ impl Tree { log::trace!( "cooperatively flushing {:?} with dirty {:?} after checking into {:?}", - node.id, + node.object_id, old_flush_epoch, new_epoch ); @@ -1123,15 +1126,15 @@ impl Tree { log::trace!( "C adding node {} to dirty epoch {:?}", - node.id.0, + node.object_id.0, old_dirty_epoch ); self.cache.install_dirty( old_dirty_epoch, - node.id, + node.object_id, Dirty::CooperativelySerialized { - node_id: node.id, + object_id: node.object_id, collection_id: self.collection_id, mutation_count: leaf_ref.mutation_count, low_key: leaf.lo.clone(), @@ -1167,9 +1170,11 @@ impl Tree { if let Some(value) = value_opt { leaf.data.insert(key, value); - if let Some((split_key, rhs_node)) = - leaf.split_if_full(new_epoch, &self.cache) - { + if let Some((split_key, rhs_node)) = leaf.split_if_full( + new_epoch, + &self.cache, + self.collection_id, + ) { let write = rhs_node.inner.write_arc(); assert!(write.is_some()); @@ -1183,7 +1188,9 @@ impl Tree { // Make splits globally visible for (split_key, rhs_node) in splits { - self.cache.node_id_index.insert(rhs_node.id, rhs_node.clone()); + self.cache + .object_id_index + .insert(rhs_node.object_id, rhs_node.clone()); self.index.insert(split_key, rhs_node); } @@ -1193,10 +1200,10 @@ impl Tree { let leaf = write.as_mut().unwrap(); leaf.dirty_flush_epoch = Some(new_epoch); leaf.mutation_count += 1; - cache_accesses.push((node.id, leaf.in_memory_size)); + cache_accesses.push((node.object_id, leaf.in_memory_size)); self.cache.install_dirty( new_epoch, - node.id, + node.object_id, Dirty::NotYetSerialized { collection_id: self.collection_id, node: node.clone(), @@ -1209,8 +1216,8 @@ impl Tree { drop(acquired_locks); // Perform cache maintenance - for (node_id, size) in cache_accesses { - 
self.cache.mark_access_and_evict(node_id, size)?; + for (object_id, size) in cache_accesses { + self.cache.mark_access_and_evict(object_id, size)?; } Ok(()) @@ -1953,6 +1960,7 @@ impl Leaf { &mut self, new_epoch: FlushEpoch, allocator: &ObjectCache, + collection_id: CollectionId, ) -> Option<(InlineArray, Object)> { if self.data.is_full() { // split @@ -2011,7 +2019,9 @@ impl Leaf { self.set_in_memory_size(); let rhs_node = Object { - id: ObjectId(rhs_id), + object_id: ObjectId(rhs_id), + collection_id, + low_key: split_key.clone(), inner: Arc::new(Some(Box::new(rhs)).into()), };