Standardize naming on ObjectId. Include CollectionId and low key in Object struct

Tyler Neely 2023-10-03 16:33:07 +02:00
parent 4b1aea0e26
commit ce51eeb5e0
7 changed files with 191 additions and 151 deletions
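
For orientation before the per-file hunks: the commit renames the object identifier field from `id`/`node_id` to `object_id` everywhere, and makes each `Object` carry its `CollectionId` and low key directly, so `allocate_node` now takes both up front. The following is a self-contained toy sketch of that shape, reconstructed from the hunks below rather than excerpted from the crate; the types are hypothetical stand-ins (plain `Vec<u8>` instead of the crate's `InlineArray`, no cache box or heap).

// Toy sketch only: simplified stand-ins for the crate's own types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ObjectId(u64);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CollectionId(u64);

// Stand-in for the crate's InlineArray.
type InlineArray = Vec<u8>;

// After this commit, Object names its identifier `object_id` and also
// carries the collection it belongs to and its low key.
#[derive(Debug, Clone)]
struct Object {
    object_id: ObjectId,
    collection_id: CollectionId,
    low_key: InlineArray,
}

// Mirrors the new allocate_node(collection_id, low_key) shape: the caller
// must now supply both pieces of context when a node is created.
fn allocate_node(
    next_object_id: u64,
    collection_id: CollectionId,
    low_key: InlineArray,
) -> Object {
    Object { object_id: ObjectId(next_object_id), collection_id, low_key }
}

fn main() {
    let initial_low_key = InlineArray::default();
    let empty_node = allocate_node(0, CollectionId(0), initial_low_key.clone());
    assert_eq!(empty_node.object_id, ObjectId(0));
    assert_eq!(empty_node.low_key, initial_low_key);
    println!("{:?}", empty_node);
}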

View File

@ -232,11 +232,14 @@ impl<const LEAF_FANOUT: usize> Db<LEAF_FANOUT> {
collection_id
);
let empty_node = cache.allocate_node();
let initial_low_key = InlineArray::default();
let empty_node =
cache.allocate_node(collection_id, initial_low_key.clone());
let index = Index::default();
assert!(index.insert(InlineArray::default(), empty_node).is_none());
assert!(index.insert(initial_low_key, empty_node).is_none());
let tree = Tree::new(
collection_id,
@ -482,11 +485,14 @@ impl<const LEAF_FANOUT: usize> Db<LEAF_FANOUT> {
let collection_id =
CollectionId(self.collection_id_allocator.allocate());
let empty_node = self.cache.allocate_node();
let initial_low_key = InlineArray::default();
let empty_node =
self.cache.allocate_node(collection_id, initial_low_key.clone());
let index = Index::default();
assert!(index.insert(InlineArray::default(), empty_node).is_none());
assert!(index.insert(initial_low_key, empty_node).is_none());
let tree = Tree::new(
collection_id,

View File

@ -12,7 +12,7 @@ pub(crate) struct EventVerifier {
impl EventVerifier {
pub(crate) fn mark_flush(
&self,
node_id: ObjectId,
object_id: ObjectId,
flush_epoch: NonZeroU64,
mutation_count: u64,
) {
@ -20,13 +20,13 @@ impl EventVerifier {
let epoch_entry = flush_model.entry(flush_epoch).or_default();
let last = epoch_entry.insert(node_id, mutation_count);
let last = epoch_entry.insert(object_id, mutation_count);
assert_eq!(last, None);
}
pub(crate) fn mark_unexpected_flush_epoch(
&self,
node_id: ObjectId,
object_id: ObjectId,
flush_epoch: NonZeroU64,
mutation_count: u64,
) {
@ -34,7 +34,7 @@ impl EventVerifier {
// assert that this object+mutation count was
// already flushed
let epoch_entry = flush_model.entry(flush_epoch).or_default();
let flushed_mutation_count = epoch_entry.get(&node_id);
let flushed_mutation_count = epoch_entry.get(&object_id);
println!("checking!");
assert_eq!(Some(&mutation_count), flushed_mutation_count);
println!("good!");

View File

@ -135,7 +135,7 @@ pub struct Config {}
#[derive(Debug)]
pub(crate) struct ObjectRecovery {
pub node_id: ObjectId,
pub object_id: ObjectId,
pub collection_id: CollectionId,
pub metadata: InlineArray,
}
@ -295,24 +295,24 @@ pub(crate) fn recover<P: AsRef<Path>>(
let pt = ObjectLocationMap::default();
let mut recovered_nodes =
Vec::<ObjectRecovery>::with_capacity(recovered_metadata.len());
let mut node_ids: FnvHashSet<u64> = Default::default();
let mut object_ids: FnvHashSet<u64> = Default::default();
let mut slots_per_slab: [FnvHashSet<u64>; N_SLABS] =
core::array::from_fn(|_| Default::default());
for update_metadata in recovered_metadata {
match update_metadata {
UpdateMetadata::Store {
node_id,
object_id,
collection_id,
location,
metadata,
} => {
node_ids.insert(node_id.0);
object_ids.insert(object_id.0);
let slab_address = SlabAddress::from(location);
slots_per_slab[slab_address.slab_id as usize]
.insert(slab_address.slot());
pt.insert(node_id, slab_address);
pt.insert(object_id, slab_address);
recovered_nodes.push(ObjectRecovery {
node_id,
object_id,
collection_id,
metadata: metadata.clone(),
});
@ -347,7 +347,9 @@ pub(crate) fn recover<P: AsRef<Path>>(
heap: Heap {
slabs: Arc::new(slabs.try_into().unwrap()),
path: path.into(),
object_id_allocator: Arc::new(Allocator::from_allocated(&node_ids)),
object_id_allocator: Arc::new(Allocator::from_allocated(
&object_ids,
)),
pt,
global_error: metadata_store.get_global_error_arc(),
metadata_store: Arc::new(metadata_store),
@ -632,13 +634,13 @@ fn set_error(
#[derive(Debug)]
pub(crate) enum Update {
Store {
node_id: ObjectId,
object_id: ObjectId,
collection_id: CollectionId,
metadata: InlineArray,
data: Vec<u8>,
},
Free {
node_id: ObjectId,
object_id: ObjectId,
collection_id: CollectionId,
},
}
@ -646,22 +648,22 @@ pub(crate) enum Update {
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
pub(crate) enum UpdateMetadata {
Store {
node_id: ObjectId,
object_id: ObjectId,
collection_id: CollectionId,
metadata: InlineArray,
location: NonZeroU64,
},
Free {
node_id: ObjectId,
object_id: ObjectId,
collection_id: CollectionId,
},
}
impl UpdateMetadata {
pub fn node_id(&self) -> ObjectId {
pub fn object_id(&self) -> ObjectId {
match self {
UpdateMetadata::Store { node_id, .. }
| UpdateMetadata::Free { node_id, .. } => *node_id,
UpdateMetadata::Store { object_id, .. }
| UpdateMetadata::Free { object_id, .. } => *object_id,
}
}
}
@ -739,7 +741,7 @@ impl Heap {
let slabs = &self.slabs;
let map_closure = |update: Update| match update {
Update::Store { node_id, collection_id, metadata, data } => {
Update::Store { object_id, collection_id, metadata, data } => {
let slab_id = slab_for_size(data.len());
let slab = &slabs[usize::from(slab_id)];
let slot = slab.slot_allocator.allocate();
@ -755,14 +757,14 @@ impl Heap {
return Err(e);
}
Ok(UpdateMetadata::Store {
node_id,
object_id,
collection_id,
metadata,
location: new_location_nzu,
})
}
Update::Free { node_id, collection_id } => {
Ok(UpdateMetadata::Free { node_id, collection_id })
Update::Free { object_id, collection_id } => {
Ok(UpdateMetadata::Free { object_id, collection_id })
}
};
@ -790,15 +792,15 @@ impl Heap {
// reclaim previous disk locations for future writes
for update_metadata in metadata_batch {
let last_address_opt = match update_metadata {
UpdateMetadata::Store { node_id, location, .. } => {
self.pt.insert(node_id, SlabAddress::from(location))
UpdateMetadata::Store { object_id, location, .. } => {
self.pt.insert(object_id, SlabAddress::from(location))
}
UpdateMetadata::Free { node_id, .. } => {
UpdateMetadata::Free { object_id, .. } => {
guard.defer_drop(DeferredFree {
allocator: self.object_id_allocator.clone(),
freed_slot: node_id.0,
freed_slot: object_id.0,
});
self.pt.remove(node_id)
self.pt.remove(object_id)
}
};

View File

@ -1,9 +1,10 @@
// TODO store low key and collection ID directly on Object
// TODO make the Arc<Option<Box<Leaf just a single pointer chase w/ custom container
// TODO heap maintenance w/ speculative write followed by CAS in pt
// maybe with global writer lock that controls flushers too
// TODO implement create exclusive
// TODO test concurrent drop_tree when other threads are still using it
// TODO list trees test for recovering empty collections
// TODO make node_id_index private on ObjectCache
// TODO put aborts behind feature flags for hard crashes
// TODO allow waiting flusher to start collecting dirty pages as soon
// as it is evacuated - just wait until last flush is done before
@ -169,13 +170,15 @@ type CacheBox<const LEAF_FANOUT: usize> =
#[derive(Debug, Clone)]
struct Object<const LEAF_FANOUT: usize> {
// used for access in heap::Heap
id: ObjectId,
object_id: ObjectId,
collection_id: CollectionId,
low_key: InlineArray,
inner: CacheBox<LEAF_FANOUT>,
}
impl<const LEAF_FANOUT: usize> PartialEq for Object<LEAF_FANOUT> {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
self.object_id == other.object_id
}
}

View File

@ -426,12 +426,12 @@ fn serialize_batch(batch: &[UpdateMetadata]) -> Vec<u8> {
for update_metadata in batch {
match update_metadata {
UpdateMetadata::Store {
node_id,
object_id,
collection_id,
metadata,
location,
} => {
batch_encoder.write_all(&node_id.0.to_le_bytes()).unwrap();
batch_encoder.write_all(&object_id.0.to_le_bytes()).unwrap();
batch_encoder
.write_all(&collection_id.0.to_le_bytes())
.unwrap();
@ -441,8 +441,8 @@ fn serialize_batch(batch: &[UpdateMetadata]) -> Vec<u8> {
batch_encoder.write_all(&metadata_len.to_le_bytes()).unwrap();
batch_encoder.write_all(&metadata).unwrap();
}
UpdateMetadata::Free { node_id, collection_id } => {
batch_encoder.write_all(&node_id.0.to_le_bytes()).unwrap();
UpdateMetadata::Free { object_id, collection_id } => {
batch_encoder.write_all(&object_id.0.to_le_bytes()).unwrap();
batch_encoder
.write_all(&collection_id.0.to_le_bytes())
.unwrap();
@ -508,14 +508,14 @@ fn read_frame(
let mut decoder = ZstdDecoder::new(&reusable_frame_buffer[8..len + 8])
.expect("failed to create zstd decoder");
let mut node_id_buf: [u8; 8] = [0; 8];
let mut object_id_buf: [u8; 8] = [0; 8];
let mut collection_id_buf: [u8; 8] = [0; 8];
let mut location_buf: [u8; 8] = [0; 8];
let mut metadata_len_buf: [u8; 8] = [0; 8];
let mut metadata_buf = vec![];
loop {
let first_read_res = decoder
.read_exact(&mut node_id_buf)
.read_exact(&mut object_id_buf)
.and_then(|_| decoder.read_exact(&mut collection_id_buf))
.and_then(|_| decoder.read_exact(&mut location_buf))
.and_then(|_| decoder.read_exact(&mut metadata_len_buf));
@ -528,7 +528,7 @@ fn read_frame(
}
}
let node_id = ObjectId(u64::from_le_bytes(node_id_buf));
let object_id = ObjectId(u64::from_le_bytes(object_id_buf));
let collection_id = CollectionId(u64::from_le_bytes(collection_id_buf));
let location = u64::from_le_bytes(location_buf);
@ -548,13 +548,13 @@ fn read_frame(
let metadata = InlineArray::from(&*metadata_buf);
ret.push(UpdateMetadata::Store {
node_id,
object_id,
collection_id,
location: location_nzu,
metadata,
});
} else {
ret.push(UpdateMetadata::Free { node_id, collection_id });
ret.push(UpdateMetadata::Free { object_id, collection_id });
}
}
@ -576,7 +576,7 @@ fn read_log(
while let Ok(frame) = read_frame(&mut file, &mut reusable_frame_buffer) {
for update_metadata in frame {
ret.insert(update_metadata.node_id(), update_metadata);
ret.insert(update_metadata.object_id(), update_metadata);
}
}
@ -599,7 +599,7 @@ fn read_snapshot(
let frame: FnvHashMap<ObjectId, UpdateMetadata> = raw_frame
.into_iter()
.map(|update_metadata| (update_metadata.node_id(), update_metadata))
.map(|update_metadata| (update_metadata.object_id(), update_metadata))
.collect();
log::trace!("recovered {} items in snapshot {}", frame.len(), lsn);
@ -735,11 +735,11 @@ fn read_snapshot_and_apply_logs(
for (log_id, log_datum) in log_data_res? {
max_log_id = max_log_id.max(log_id);
for (node_id, update_metadata) in log_datum {
for (object_id, update_metadata) in log_datum {
if matches!(update_metadata, UpdateMetadata::Store { .. }) {
recovered.insert(node_id, update_metadata);
recovered.insert(object_id, update_metadata);
} else {
let _previous = recovered.remove(&node_id);
let _previous = recovered.remove(&object_id);
// TODO: assert!(previous.is_some());
}
}

View File

@ -18,14 +18,14 @@ pub(crate) enum Dirty<const LEAF_FANOUT: usize> {
collection_id: CollectionId,
},
CooperativelySerialized {
node_id: ObjectId,
object_id: ObjectId,
collection_id: CollectionId,
low_key: InlineArray,
data: Arc<Vec<u8>>,
mutation_count: u64,
},
MergedAndDeleted {
node_id: ObjectId,
object_id: ObjectId,
collection_id: CollectionId,
},
}
@ -44,7 +44,7 @@ impl<const LEAF_FANOUT: usize> Dirty<LEAF_FANOUT> {
pub(crate) struct ObjectCache<const LEAF_FANOUT: usize> {
pub config: Config,
global_error: Arc<AtomicPtr<(io::ErrorKind, String)>>,
pub node_id_index: ConcurrentMap<
pub object_id_index: ConcurrentMap<
ObjectId,
Object<LEAF_FANOUT>,
INDEX_FANOUT,
@ -69,15 +69,15 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
let HeapRecovery { heap, recovered_nodes, was_recovered } =
recover(&config.path, LEAF_FANOUT)?;
let (node_id_index, indices) = initialize(&recovered_nodes, &heap);
let (object_id_index, indices) = initialize(&recovered_nodes, &heap);
// validate recovery
for ObjectRecovery { node_id, collection_id, metadata } in
for ObjectRecovery { object_id, collection_id, metadata } in
recovered_nodes
{
let index = indices.get(&collection_id).unwrap();
let node = index.get(&metadata).unwrap();
assert_eq!(node.id, node_id);
assert_eq!(node.object_id, object_id);
}
if config.cache_capacity_bytes < 256 {
@ -96,7 +96,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
let pc = ObjectCache {
config: config.clone(),
node_id_index,
object_id_index,
cache_advisor: RefCell::new(CacheAdvisor::new(
config.cache_capacity_bytes.max(256),
config.entry_cache_percent.min(80),
@ -160,12 +160,21 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
}
}
pub fn allocate_node(&self) -> Object<LEAF_FANOUT> {
let id = ObjectId(self.heap.allocate_object_id());
pub fn allocate_node(
&self,
collection_id: CollectionId,
low_key: InlineArray,
) -> Object<LEAF_FANOUT> {
let object_id = ObjectId(self.heap.allocate_object_id());
let node = Object { id, inner: Arc::new(Some(Box::default()).into()) };
let node = Object {
object_id,
collection_id,
low_key,
inner: Arc::new(Some(Box::default()).into()),
};
self.node_id_index.insert(id, node.clone());
self.object_id_index.insert(object_id, node.clone());
node
}
@ -181,7 +190,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
pub fn install_dirty(
&self,
flush_epoch: FlushEpoch,
node_id: ObjectId,
object_id: ObjectId,
dirty: Dirty<LEAF_FANOUT>,
) {
// dirty can transition from:
@ -198,7 +207,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
// if the new Dirty is not final, we must assert
// that the old value is also not final.
let old_dirty = self.dirty.insert((flush_epoch, node_id), dirty);
let old_dirty = self.dirty.insert((flush_epoch, object_id), dirty);
if let Some(old) = old_dirty {
assert!(!old.is_final_state(),
@ -214,16 +223,16 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
// this being called in the destructor.
pub fn mark_access_and_evict(
&self,
node_id: ObjectId,
object_id: ObjectId,
size: usize,
) -> io::Result<()> {
let mut ca = self.cache_advisor.borrow_mut();
let to_evict = ca.accessed_reuse_buffer(node_id.0, size);
let to_evict = ca.accessed_reuse_buffer(object_id.0, size);
for (node_to_evict, _rough_size) in to_evict {
let node = if let Some(n) =
self.node_id_index.get(&ObjectId(*node_to_evict))
self.object_id_index.get(&ObjectId(*node_to_evict))
{
if n.id.0 != *node_to_evict {
if n.object_id.0 != *node_to_evict {
log::warn!("during cache eviction, node to evict did not match current occupant for {:?}", node_to_evict);
continue;
}
@ -275,12 +284,12 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
let mut evict_after_flush = vec![];
for ((dirty_epoch, dirty_node_id), dirty_value_initial_read) in
for ((dirty_epoch, dirty_object_id), dirty_value_initial_read) in
self.dirty.range(..flush_boundary)
{
let dirty_value = self
.dirty
.remove(&(dirty_epoch, dirty_node_id))
.remove(&(dirty_epoch, dirty_object_id))
.expect("violation of flush responsibility");
if let Dirty::NotYetSerialized { .. } = &dirty_value {
@ -304,18 +313,18 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
}
match dirty_value {
Dirty::MergedAndDeleted { node_id, collection_id } => {
Dirty::MergedAndDeleted { object_id, collection_id } => {
log::trace!(
"MergedAndDeleted for {:?}, adding None to write_batch",
node_id
object_id
);
write_batch.push(Update::Free {
node_id: dirty_node_id,
object_id: dirty_object_id,
collection_id,
});
}
Dirty::CooperativelySerialized {
node_id: _,
object_id: _,
collection_id,
low_key,
mutation_count: _,
@ -324,14 +333,14 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
Arc::make_mut(&mut data);
let data = Arc::into_inner(data).unwrap();
write_batch.push(Update::Store {
node_id: dirty_node_id,
object_id: dirty_object_id,
collection_id,
metadata: low_key,
data,
});
}
Dirty::NotYetSerialized { low_key, collection_id, node } => {
assert_eq!(dirty_node_id, node.id, "mismatched node ID for NotYetSerialized with low key {:?}", low_key);
assert_eq!(dirty_object_id, node.object_id, "mismatched node ID for NotYetSerialized with low key {:?}", low_key);
let mut lock = node.inner.write();
let leaf_ref: &mut Leaf<LEAF_FANOUT> =
@ -350,7 +359,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
// mutated the leaf after encountering it being dirty for our epoch, after
// storing a CooperativelySerialized in the dirty map.
let dirty_value_2_opt =
self.dirty.remove(&(dirty_epoch, dirty_node_id));
self.dirty.remove(&(dirty_epoch, dirty_object_id));
let dirty_value_2 = if let Some(dv2) = dirty_value_2_opt
{
@ -361,7 +370,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
of expected cooperative serialization. leaf in question's \
dirty_flush_epoch is {:?}, our expected key was {:?}. node.deleted: {:?}",
leaf_ref.dirty_flush_epoch,
(dirty_epoch, dirty_node_id),
(dirty_epoch, dirty_object_id),
leaf_ref.deleted,
);
@ -373,11 +382,11 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
mutation_count: _,
mut data,
collection_id: ci2,
node_id: ni2,
object_id: ni2,
} = dirty_value_2
{
assert_eq!(node.id, ni2);
assert_eq!(node.id, dirty_node_id);
assert_eq!(node.object_id, ni2);
assert_eq!(node.object_id, dirty_object_id);
assert_eq!(low_key, low_key_2);
assert_eq!(collection_id, ci2);
Arc::make_mut(&mut data);
@ -388,7 +397,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
};
write_batch.push(Update::Store {
node_id: dirty_node_id,
object_id: dirty_object_id,
collection_id: collection_id,
metadata: low_key,
data,
@ -437,12 +446,12 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
let epoch = self.check_into_flush_epoch();
/*
for (node_id, collection_id, low_key) in objects_to_defrag {
for (object_id, collection_id, low_key) in objects_to_defrag {
//
let node = if let Some(node) = self.node_id_index.get(&node_id) {
let node = if let Some(node) = self.object_id_index.get(&object_id) {
node
} else {
log::error!("tried to get node for maintenance but it was not present in node_id_index");
log::error!("tried to get node for maintenance but it was not present in object_id_index");
continue;
};
@ -455,7 +464,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
epoch.epoch(),
node.id,
Dirty::CooperativelySerialized {
node_id: node.id,
object_id: node.id,
collection_id,
low_key,
mutation_count: leaf.mutation_count,
@ -486,17 +495,23 @@ fn initialize<const LEAF_FANOUT: usize>(
) {
let mut trees: HashMap<CollectionId, Index<LEAF_FANOUT>> = HashMap::new();
let node_id_index: ConcurrentMap<
let object_id_index: ConcurrentMap<
ObjectId,
Object<LEAF_FANOUT>,
INDEX_FANOUT,
EBR_LOCAL_GC_BUFFER_SIZE,
> = ConcurrentMap::default();
for ObjectRecovery { node_id, collection_id, metadata } in recovered_nodes {
let node = Object { id: *node_id, inner: Arc::new(None.into()) };
for ObjectRecovery { object_id, collection_id, metadata } in recovered_nodes
{
let node = Object {
object_id: *object_id,
collection_id: *collection_id,
low_key: metadata.clone(),
inner: Arc::new(None.into()),
};
assert!(node_id_index.insert(*node_id, node.clone()).is_none());
assert!(object_id_index.insert(*object_id, node.clone()).is_none());
let tree = trees.entry(*collection_id).or_default();
@ -508,18 +523,22 @@ fn initialize<const LEAF_FANOUT: usize>(
let tree = trees.entry(collection_id).or_default();
if tree.is_empty() {
let node_id = ObjectId(heap.allocate_object_id());
let object_id = ObjectId(heap.allocate_object_id());
let initial_low_key = InlineArray::default();
let empty_node = Object {
id: node_id,
object_id,
collection_id,
low_key: initial_low_key.clone(),
inner: Arc::new(Some(Box::default()).into()),
};
assert!(node_id_index
.insert(node_id, empty_node.clone())
assert!(object_id_index
.insert(object_id, empty_node.clone())
.is_none());
assert!(tree.insert(InlineArray::default(), empty_node).is_none());
assert!(tree.insert(initial_low_key, empty_node).is_none());
}
}
@ -527,5 +546,5 @@ fn initialize<const LEAF_FANOUT: usize>(
assert!(tree.contains_key(&InlineArray::MIN));
}
(node_id_index, trees)
(object_id_index, trees)
}

View File

@ -66,7 +66,7 @@ struct LeafReadGuard<'a, const LEAF_FANOUT: usize = 1024> {
>,
low_key: InlineArray,
inner: &'a Tree<LEAF_FANOUT>,
node_id: ObjectId,
object_id: ObjectId,
external_cache_access_and_eviction: bool,
}
@ -81,7 +81,7 @@ impl<'a, const LEAF_FANOUT: usize> Drop for LeafReadGuard<'a, LEAF_FANOUT> {
return;
}
if let Err(e) =
self.inner.cache.mark_access_and_evict(self.node_id, size)
self.inner.cache.mark_access_and_evict(self.object_id, size)
{
self.inner.set_error(&e);
log::error!(
@ -114,7 +114,7 @@ impl<'a, const LEAF_FANOUT: usize> LeafWriteGuard<'a, LEAF_FANOUT> {
mut self,
) -> (ObjectId, usize) {
self.external_cache_access_and_eviction = true;
(self.node.id, self.leaf_write.as_ref().unwrap().in_memory_size)
(self.node.object_id, self.leaf_write.as_ref().unwrap().in_memory_size)
}
}
@ -130,7 +130,7 @@ impl<'a, const LEAF_FANOUT: usize> Drop for LeafWriteGuard<'a, LEAF_FANOUT> {
return;
}
if let Err(e) =
self.inner.cache.mark_access_and_evict(self.node.id, size)
self.inner.cache.mark_access_and_evict(self.node.object_id, size)
{
self.inner.set_error(&e);
log::error!("io error while paging out dirty data: {:?}", e);
@ -201,7 +201,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
let (low_key, node) = self.index.get_lte(key).unwrap();
let mut write = node.inner.write_arc();
if write.is_none() {
let leaf_bytes = self.cache.read(node.id.0)?;
let leaf_bytes = self.cache.read(node.object_id.0)?;
let leaf: Box<Leaf<LEAF_FANOUT>> =
Leaf::deserialize(&leaf_bytes).unwrap();
*write = Some(leaf);
@ -218,7 +218,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
let size = leaf.in_memory_size;
drop(write);
log::trace!("key undershoot in page_in");
self.cache.mark_access_and_evict(node.id, size)?;
self.cache.mark_access_and_evict(node.object_id, size)?;
continue;
}
@ -228,7 +228,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
let size = leaf.in_memory_size;
drop(write);
log::trace!("key overshoot in page_in");
self.cache.mark_access_and_evict(node.id, size)?;
self.cache.mark_access_and_evict(node.object_id, size)?;
continue;
}
@ -266,13 +266,13 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
assert!(predecessor.deleted.is_none());
assert_eq!(predecessor.hi.as_ref(), Some(&leaf.lo));
if leaf_guard.node.id.0 == 0 {
if leaf_guard.node.object_id.0 == 0 {
assert!(leaf_guard.low_key.is_empty());
}
log::trace!(
"deleting empty node id {} with low key {:?} and high key {:?}",
leaf_guard.node.id.0,
leaf_guard.node.object_id.0,
leaf.lo,
leaf.hi
);
@ -283,21 +283,21 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
leaf.deleted = Some(merge_epoch);
self.index.remove(&leaf_guard.low_key).unwrap();
self.cache.node_id_index.remove(&leaf_guard.node.id).unwrap();
self.cache.object_id_index.remove(&leaf_guard.node.object_id).unwrap();
// NB: these updates must "happen" atomically in the same flush epoch
self.cache.install_dirty(
merge_epoch,
leaf_guard.node.id,
leaf_guard.node.object_id,
Dirty::MergedAndDeleted {
node_id: leaf_guard.node.id,
object_id: leaf_guard.node.object_id,
collection_id: self.collection_id,
},
);
self.cache.install_dirty(
merge_epoch,
predecessor_guard.node.id,
predecessor_guard.node.object_id,
Dirty::NotYetSerialized {
low_key: predecessor.lo.clone(),
node: predecessor_guard.node.clone(),
@ -305,13 +305,13 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
},
);
let (p_node_id, p_sz) =
let (p_object_id, p_sz) =
predecessor_guard.handle_cache_access_and_eviction_externally();
let (s_node_id, s_sz) =
let (s_object_id, s_sz) =
leaf_guard.handle_cache_access_and_eviction_externally();
self.cache.mark_access_and_evict(p_node_id, p_sz)?;
self.cache.mark_access_and_evict(s_node_id, s_sz)?;
self.cache.mark_access_and_evict(p_object_id, p_sz)?;
self.cache.mark_access_and_evict(s_object_id, s_sz)?;
Ok(())
}
@ -362,7 +362,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
read = ArcRwLockWriteGuard::downgrade(write);
}
if node.id.0 == 0 {
if node.object_id.0 == 0 {
assert!(low_key.is_empty());
}
@ -370,7 +370,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
leaf_read: ManuallyDrop::new(read),
inner: self,
low_key,
node_id: node.id,
object_id: node.object_id,
external_cache_access_and_eviction: false,
};
@ -427,7 +427,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
if old_flush_epoch != flush_epoch_guard.epoch() {
log::trace!(
"cooperatively flushing {:?} with dirty {:?} after checking into {:?}",
node.id,
node.object_id,
old_flush_epoch,
flush_epoch_guard.epoch()
);
@ -442,19 +442,19 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
log::trace!(
"D adding node {} to dirty {:?}",
node.id.0,
node.object_id.0,
old_dirty_epoch
);
if node.id.0 == 0 {
if node.object_id.0 == 0 {
assert!(leaf.lo.is_empty());
}
self.cache.install_dirty(
old_dirty_epoch,
node.id,
node.object_id,
Dirty::CooperativelySerialized {
node_id: node.id,
object_id: node.object_id,
collection_id: self.collection_id,
low_key: leaf.lo.clone(),
mutation_count: leaf.mutation_count,
@ -470,9 +470,6 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
}
}
if node.id.0 == 0 {
assert!(low_key.is_empty());
}
Ok(LeafWriteGuard {
flush_epoch_guard,
leaf_write: ManuallyDrop::new(write),
@ -564,22 +561,23 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
leaf.in_memory_size.saturating_sub(old_size - new_size);
}
let split = leaf.split_if_full(new_epoch, &self.cache);
let split =
leaf.split_if_full(new_epoch, &self.cache, self.collection_id);
if split.is_some() || Some(value_ivec) != ret {
leaf.mutation_count += 1;
leaf.dirty_flush_epoch = Some(new_epoch);
log::trace!(
"F adding node {} to dirty {:?}",
leaf_guard.node.id.0,
leaf_guard.node.object_id.0,
new_epoch
);
if leaf_guard.node.id.0 == 0 {
if leaf_guard.node.object_id.0 == 0 {
assert!(leaf_guard.low_key.is_empty());
}
self.cache.install_dirty(
new_epoch,
leaf_guard.node.id,
leaf_guard.node.object_id,
Dirty::NotYetSerialized {
collection_id: self.collection_id,
node: leaf_guard.node.clone(),
@ -590,19 +588,21 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
if let Some((split_key, rhs_node)) = split {
log::trace!(
"G adding new from split {:?} to dirty {:?}",
rhs_node.id,
rhs_node.object_id,
new_epoch
);
assert_ne!(rhs_node.id.0, 0);
assert_ne!(rhs_node.object_id.0, 0);
assert!(!split_key.is_empty());
self.cache.node_id_index.insert(rhs_node.id, rhs_node.clone());
self.cache
.object_id_index
.insert(rhs_node.object_id, rhs_node.clone());
self.index.insert(split_key.clone(), rhs_node.clone());
self.cache.install_dirty(
new_epoch,
rhs_node.id,
rhs_node.object_id,
Dirty::NotYetSerialized {
collection_id: self.collection_id,
node: rhs_node,
@ -642,7 +642,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
let key_ref = key.as_ref();
let mut leaf_guard = self.leaf_for_key_mut(key_ref)?;
if leaf_guard.node.id.0 == 0 {
if leaf_guard.node.object_id.0 == 0 {
// TODO will break with collections so this is just an early stage scaffolding bit.
assert!(leaf_guard.low_key.is_empty());
}
@ -662,13 +662,13 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
log::trace!(
"H adding node {} to dirty {:?}",
leaf_guard.node.id.0,
leaf_guard.node.object_id.0,
new_epoch
);
self.cache.install_dirty(
new_epoch,
leaf_guard.node.id,
leaf_guard.node.object_id,
Dirty::NotYetSerialized {
collection_id: self.collection_id,
low_key: leaf_guard.low_key.clone(),
@ -786,7 +786,8 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
Err(CompareAndSwapError { current, proposed })
};
let split = leaf.split_if_full(new_epoch, &self.cache);
let split =
leaf.split_if_full(new_epoch, &self.cache, self.collection_id);
let split_happened = split.is_some();
if split_happened || ret.is_ok() {
leaf.mutation_count += 1;
@ -794,16 +795,16 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
leaf.dirty_flush_epoch = Some(new_epoch);
log::trace!(
"A adding node {} to dirty {:?}",
leaf_guard.node.id.0,
leaf_guard.node.object_id.0,
new_epoch
);
if leaf_guard.node.id.0 == 0 {
if leaf_guard.node.object_id.0 == 0 {
assert!(leaf_guard.low_key.is_empty());
}
self.cache.install_dirty(
new_epoch,
leaf_guard.node.id,
leaf_guard.node.object_id,
Dirty::NotYetSerialized {
collection_id: self.collection_id,
node: leaf_guard.node.clone(),
@ -814,19 +815,21 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
if let Some((split_key, rhs_node)) = split {
log::trace!(
"B adding new from split {:?} to dirty {:?}",
rhs_node.id,
rhs_node.object_id,
new_epoch
);
self.cache.install_dirty(
new_epoch,
rhs_node.id,
rhs_node.object_id,
Dirty::NotYetSerialized {
collection_id: self.collection_id,
node: rhs_node.clone(),
low_key: split_key.clone(),
},
);
self.cache.node_id_index.insert(rhs_node.id, rhs_node.clone());
self.cache
.object_id_index
.insert(rhs_node.object_id, rhs_node.clone());
self.index.insert(split_key, rhs_node);
}
@ -1106,7 +1109,7 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
log::trace!(
"cooperatively flushing {:?} with dirty {:?} after checking into {:?}",
node.id,
node.object_id,
old_flush_epoch,
new_epoch
);
@ -1123,15 +1126,15 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
log::trace!(
"C adding node {} to dirty epoch {:?}",
node.id.0,
node.object_id.0,
old_dirty_epoch
);
self.cache.install_dirty(
old_dirty_epoch,
node.id,
node.object_id,
Dirty::CooperativelySerialized {
node_id: node.id,
object_id: node.object_id,
collection_id: self.collection_id,
mutation_count: leaf_ref.mutation_count,
low_key: leaf.lo.clone(),
@ -1167,9 +1170,11 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
if let Some(value) = value_opt {
leaf.data.insert(key, value);
if let Some((split_key, rhs_node)) =
leaf.split_if_full(new_epoch, &self.cache)
{
if let Some((split_key, rhs_node)) = leaf.split_if_full(
new_epoch,
&self.cache,
self.collection_id,
) {
let write = rhs_node.inner.write_arc();
assert!(write.is_some());
@ -1183,7 +1188,9 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
// Make splits globally visible
for (split_key, rhs_node) in splits {
self.cache.node_id_index.insert(rhs_node.id, rhs_node.clone());
self.cache
.object_id_index
.insert(rhs_node.object_id, rhs_node.clone());
self.index.insert(split_key, rhs_node);
}
@ -1193,10 +1200,10 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
let leaf = write.as_mut().unwrap();
leaf.dirty_flush_epoch = Some(new_epoch);
leaf.mutation_count += 1;
cache_accesses.push((node.id, leaf.in_memory_size));
cache_accesses.push((node.object_id, leaf.in_memory_size));
self.cache.install_dirty(
new_epoch,
node.id,
node.object_id,
Dirty::NotYetSerialized {
collection_id: self.collection_id,
node: node.clone(),
@ -1209,8 +1216,8 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
drop(acquired_locks);
// Perform cache maintenance
for (node_id, size) in cache_accesses {
self.cache.mark_access_and_evict(node_id, size)?;
for (object_id, size) in cache_accesses {
self.cache.mark_access_and_evict(object_id, size)?;
}
Ok(())
@ -1953,6 +1960,7 @@ impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
&mut self,
new_epoch: FlushEpoch,
allocator: &ObjectCache<LEAF_FANOUT>,
collection_id: CollectionId,
) -> Option<(InlineArray, Object<LEAF_FANOUT>)> {
if self.data.is_full() {
// split
@ -2011,7 +2019,9 @@ impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
self.set_in_memory_size();
let rhs_node = Object {
id: ObjectId(rhs_id),
object_id: ObjectId(rhs_id),
collection_id,
low_key: split_key.clone(),
inner: Arc::new(Some(Box::new(rhs)).into()),
};
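
The last hunk above is why `split_if_full` gained a `collection_id` parameter: the right-hand sibling it constructs is a full `Object` and now has to carry its collection and its low key (the split key). Below is a self-contained toy sketch of that flow with hypothetical simplified types; the real method lives on `Leaf`, takes the `ObjectCache` as its allocator, and works on the crate's own leaf representation, none of which is reproduced here.

// Toy sketch only: a leaf-like Object with inline data, not the crate's types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ObjectId(u64);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CollectionId(u64);

#[derive(Debug, Clone)]
struct Object {
    object_id: ObjectId,
    collection_id: CollectionId,
    low_key: Vec<u8>,
    data: Vec<(Vec<u8>, Vec<u8>)>, // sorted key/value pairs, toy stand-in for a leaf
}

// If the node holds more than `fanout` entries, move the upper half into a new
// Object whose low_key is the split key and whose collection_id is inherited
// from the caller - which is why the real signature now threads collection_id.
fn split_if_full(
    node: &mut Object,
    fanout: usize,
    next_object_id: u64,
    collection_id: CollectionId,
) -> Option<(Vec<u8>, Object)> {
    if node.data.len() <= fanout {
        return None;
    }
    let split_at = node.data.len() / 2;
    let rhs_data = node.data.split_off(split_at);
    let split_key = rhs_data[0].0.clone();
    let rhs = Object {
        object_id: ObjectId(next_object_id),
        collection_id,
        low_key: split_key.clone(),
        data: rhs_data,
    };
    Some((split_key, rhs))
}

fn main() {
    let mut node = Object {
        object_id: ObjectId(1),
        collection_id: CollectionId(7),
        low_key: vec![],
        data: (0u8..6).map(|i| (vec![i], vec![i])).collect(),
    };
    let collection_id = node.collection_id;
    if let Some((split_key, rhs)) = split_if_full(&mut node, 4, 2, collection_id) {
        assert_eq!(rhs.low_key, split_key);
        assert_eq!(rhs.collection_id, CollectionId(7));
        println!("split at {:?}, new rhs node {:?}", split_key, rhs.object_id);
    }
}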