Check in initial GC object rewriting logic

Tyler Neely 2023-10-28 20:27:12 +02:00
parent b765c2fa1c
commit af4ec89316
9 changed files with 473 additions and 238 deletions

View File

@ -20,6 +20,7 @@ trait Databench: Clone + Send {
const NAME: &'static str;
const PATH: &'static str;
fn open() -> Self;
fn remove_generic(&self, key: &[u8]);
fn insert_generic(&self, key: &[u8], value: &[u8]);
fn get_generic(&self, key: &[u8]) -> Option<Self::READ>;
fn flush_generic(&self);
@ -47,6 +48,9 @@ impl Databench for Db {
fn insert_generic(&self, key: &[u8], value: &[u8]) {
self.insert(key, value).unwrap();
}
fn remove_generic(&self, key: &[u8]) {
self.remove(key).unwrap();
}
fn get_generic(&self, key: &[u8]) -> Option<Self::READ> {
self.get(key).unwrap()
}
@ -284,6 +288,56 @@ fn inserts<D: Databench>(store: &D) -> Vec<InsertStats> {
ret
}
fn removes<D: Databench>(store: &D) -> Vec<RemoveStats> {
println!("{} removals", D::NAME);
let mut i = 0_u32;
let factory = move || {
i += 1;
(store.clone(), i - 1)
};
let f = |state: (D, u32)| {
let (store, offset) = state;
let start = N_WRITES_PER_THREAD * offset;
let end = N_WRITES_PER_THREAD * (offset + 1);
for i in start..end {
let k: &[u8] = &i.to_be_bytes();
store.remove_generic(k);
}
};
let mut ret = vec![];
for concurrency in CONCURRENCY {
let remove_elapsed =
execute_lockstep_concurrent(factory, f, *concurrency);
let flush_timer = Instant::now();
store.flush_generic();
let wps = (N_WRITES_PER_THREAD * *concurrency as u32) as u64
* 1_000_000_u64
/ u64::try_from(remove_elapsed.as_micros().max(1))
.unwrap_or(u64::MAX);
ret.push(RemoveStats {
thread_count: *concurrency,
removes_per_second: wps,
});
println!(
"{} removes/s with {concurrency} threads over {:?}, then {:?} to flush {}",
wps.to_formatted_string(&Locale::en),
remove_elapsed,
flush_timer.elapsed(),
D::NAME,
);
}
ret
}
fn gets<D: Databench>(store: &D) -> Vec<GetStats> {
println!("{} reads", D::NAME);
@ -371,22 +425,34 @@ struct GetStats {
gets_per_second: u64,
}
#[derive(Debug, Clone, Copy)]
struct RemoveStats {
thread_count: usize,
removes_per_second: u64,
}
#[allow(unused)]
#[derive(Debug, Clone)]
struct Stats {
disk_space: u64,
post_insert_disk_space: u64,
post_remove_disk_space: u64,
allocated_memory: usize,
freed_memory: usize,
resident_memory: usize,
insert_stats: Vec<InsertStats>,
get_stats: Vec<GetStats>,
remove_stats: Vec<RemoveStats>,
}
impl Stats {
fn print_report(&self) {
println!(
"bytes on disk: {}",
self.disk_space.to_formatted_string(&Locale::en)
"bytes on disk after inserts: {}",
self.post_insert_disk_space.to_formatted_string(&Locale::en)
);
println!(
"bytes on disk after removes: {}",
self.post_remove_disk_space.to_formatted_string(&Locale::en)
);
println!(
"bytes in memory: {}",
@ -406,6 +472,13 @@ impl Stats {
stats.gets_per_second.to_formatted_string(&Locale::en)
);
}
for stats in &self.remove_stats {
println!(
"{} threads {} removes per second",
stats.thread_count,
stats.removes_per_second.to_formatted_string(&Locale::en)
);
}
}
}
@ -418,15 +491,21 @@ fn bench<D: Databench>() -> Stats {
store.flush_generic();
println!("final flush took {:?} for {}", before_flush.elapsed(), D::NAME);
let post_insert_disk_space = du(D::PATH.as_ref()).unwrap();
let get_stats = gets(&store);
let remove_stats = removes(&store);
Stats {
disk_space: du(D::PATH.as_ref()).unwrap(),
post_insert_disk_space,
post_remove_disk_space: du(D::PATH.as_ref()).unwrap(),
allocated_memory: allocated(),
freed_memory: freed(),
resident_memory: resident(),
insert_stats,
get_stats,
remove_stats,
}
}

View File

@ -37,6 +37,9 @@ pub struct Config {
/// `Config::tmp`, and will remove the storage directory
/// when the final Arc drops.
pub tempdir_deleter: Option<Arc<TempDir>>,
/// A float between 0.0 and 1.0 that sets the target fraction of live data
/// in each heap file. When a file's live ratio falls below this value, GC
/// attempts to recompact it.
pub target_heap_file_fill_ratio: f32,
}
impl Default for Config {
@ -48,6 +51,7 @@ impl Default for Config {
entry_cache_percent: 20,
zstd_compression_level: 3,
tempdir_deleter: None,
target_heap_file_fill_ratio: 0.9,
}
}
}
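
A minimal usage sketch for the new knob (illustrative, not part of this commit), assuming Config keeps public fields and a Default impl as shown above:

let mut config = Config::default();
// Hypothetical tuning: tolerate up to 20% dead space per heap file before
// GC rewrites its live objects; the shipped default above is 0.9.
config.target_heap_file_fill_ratio = 0.8;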

View File

@ -13,8 +13,8 @@ use fs2::FileExt as _;
use parking_lot::Mutex;
use rayon::prelude::*;
use crate::object_location_map::ObjectLocationMap;
use crate::{Allocator, CollectionId, DeferredFree, MetadataStore, ObjectId};
use crate::object_location_mapper::ObjectLocationMapper;
use crate::{CollectionId, Config, DeferredFree, MetadataStore, ObjectId};
const WARN: &str = "DO_NOT_PUT_YOUR_FILES_HERE";
pub(crate) const N_SLABS: usize = 78;
@ -131,9 +131,6 @@ pub use inline_array::InlineArray;
#[derive(Debug, Clone)]
pub struct Stats {}
#[derive(Debug, Clone)]
pub struct Config {}
#[derive(Debug)]
pub(crate) struct ObjectRecovery {
pub object_id: ObjectId,
@ -262,6 +259,7 @@ impl PersistentSettings {
pub(crate) fn recover<P: AsRef<Path>>(
path: P,
leaf_fanout: usize,
config: &Config,
) -> io::Result<HeapRecovery> {
let path = path.as_ref();
log::trace!("recovering Heap at {:?}", path);
@ -293,25 +291,22 @@ pub(crate) fn recover<P: AsRef<Path>>(
let (metadata_store, recovered_metadata) =
MetadataStore::recover(path.join("metadata"))?;
let table = ObjectLocationMap::default();
let table = ObjectLocationMapper::new(
&recovered_metadata,
config.target_heap_file_fill_ratio,
);
let mut recovered_nodes =
Vec::<ObjectRecovery>::with_capacity(recovered_metadata.len());
let mut object_ids: FnvHashSet<u64> = Default::default();
let mut slots_per_slab: [FnvHashSet<u64>; N_SLABS] =
core::array::from_fn(|_| Default::default());
for update_metadata in recovered_metadata {
match update_metadata {
UpdateMetadata::Store {
object_id,
collection_id,
location,
location: _,
metadata,
} => {
object_ids.insert(object_id.0);
let slab_address = SlabAddress::from(location);
slots_per_slab[slab_address.slab_id as usize]
.insert(slab_address.slot());
table.insert(object_id, slab_address);
recovered_nodes.push(ObjectRecovery {
object_id,
collection_id,
@ -333,13 +328,7 @@ pub(crate) fn recover<P: AsRef<Path>>(
let file = fallible!(slab_opts.open(slab_path));
slabs.push(Slab {
slot_size,
file,
slot_allocator: Arc::new(Allocator::from_allocated(
&slots_per_slab[i],
)),
})
slabs.push(Slab { slot_size, file })
}
log::info!("recovery of Heap at {:?} complete", path);
@ -348,9 +337,6 @@ pub(crate) fn recover<P: AsRef<Path>>(
heap: Heap {
slabs: Arc::new(slabs.try_into().unwrap()),
path: path.into(),
object_id_allocator: Arc::new(Allocator::from_allocated(
&object_ids,
)),
table,
global_error: metadata_store.get_global_error_arc(),
metadata_store: Arc::new(metadata_store),
@ -363,14 +349,14 @@ pub(crate) fn recover<P: AsRef<Path>>(
})
}
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) struct SlabAddress {
slab_id: u8,
slab_slot: [u8; 7],
}
impl SlabAddress {
fn from_slab_slot(slab: u8, slot: u64) -> SlabAddress {
pub(crate) fn from_slab_slot(slab: u8, slot: u64) -> SlabAddress {
let slot_bytes = slot.to_be_bytes();
assert_eq!(slot_bytes[0], 0);
@ -381,11 +367,13 @@ impl SlabAddress {
}
}
pub fn slab(&self) -> u8 {
#[inline]
pub const fn slab(&self) -> u8 {
self.slab_id
}
pub fn slot(&self) -> u64 {
#[inline]
pub const fn slot(&self) -> u64 {
u64::from_be_bytes([
0,
self.slab_slot[0],
@ -512,7 +500,6 @@ mod sys_io {
struct Slab {
file: fs::File,
slot_size: usize,
slot_allocator: Arc<Allocator>,
}
impl Slab {
@ -674,8 +661,7 @@ impl UpdateMetadata {
pub(crate) struct Heap {
path: PathBuf,
slabs: Arc<[Slab; N_SLABS]>,
table: ObjectLocationMap,
object_id_allocator: Arc<Allocator>,
table: ObjectLocationMapper,
metadata_store: Arc<MetadataStore>,
free_ebr: Ebr<DeferredFree, 1>,
global_error: Arc<AtomicPtr<(io::ErrorKind, String)>>,
@ -739,25 +725,26 @@ impl Heap {
pub fn write_batch(&self, batch: Vec<Update>) -> io::Result<()> {
self.check_error()?;
let atomicity_mu = self.write_batch_atomicity_mutex.lock();
let atomicity_mu = self.write_batch_atomicity_mutex.try_lock()
.expect("write_batch called concurrently! major correctness assumption violated");
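// write_batch is assumed to have a single caller at a time; using try_lock
// makes a violation of that assumption panic immediately instead of
// silently serializing concurrent calls.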
let mut guard = self.free_ebr.pin();
let slabs = &self.slabs;
let table = &self.table;
let map_closure = |update: Update| match update {
Update::Store { object_id, collection_id, metadata, data } => {
let slab_id = slab_for_size(data.len());
let slab = &slabs[usize::from(slab_id)];
let slot = slab.slot_allocator.allocate();
let new_location = SlabAddress::from_slab_slot(slab_id, slot);
let new_location = table.allocate_slab_slot(slab_id);
let new_location_nzu: NonZeroU64 = new_location.into();
let complete_durability_pipeline =
maybe!(slab.write(slot, data));
maybe!(slab.write(new_location.slot(), data));
if let Err(e) = complete_durability_pipeline {
// can immediately free slot as the
slab.slot_allocator.free(slot);
table.free_slab_slot(new_location);
return Err(e);
}
Ok(UpdateMetadata::Store {
@ -801,7 +788,7 @@ impl Heap {
}
UpdateMetadata::Free { object_id, .. } => {
guard.defer_drop(DeferredFree {
allocator: self.object_id_allocator.clone(),
allocator: self.table.clone_object_id_allocator_arc(),
freed_slot: object_id.0,
});
self.table.remove(object_id)
@ -810,9 +797,9 @@ impl Heap {
if let Some(last_address) = last_address_opt {
guard.defer_drop(DeferredFree {
allocator: self.slabs[usize::from(last_address.slab_id)]
.slot_allocator
.clone(),
allocator: self
.table
.clone_slab_allocator_arc(last_address.slab_id),
freed_slot: last_address.slot(),
});
}
@ -823,66 +810,11 @@ impl Heap {
Ok(())
}
pub fn allocate_object_id(&self) -> u64 {
self.object_id_allocator.allocate()
pub fn allocate_object_id(&self) -> ObjectId {
self.table.allocate_object_id()
}
pub fn maintenance(&self) {
let atomicity_mu = self.write_batch_atomicity_mutex.lock();
let mut guard = self.free_ebr.pin();
let to_defrag = self.table.objects_to_defrag();
let mut metadata_batch = vec![];
let mut allocations_to_free_on_failure = vec![];
for (object_id, old_location) in to_defrag {
let slab_id = old_location.slab();
let slab = &self.slabs[usize::from(slab_id)];
let fragmented_slot = old_location.slot();
// read
let data = match slab.read(fragmented_slot, &mut guard) {
Ok(data) => data,
Err(e) => {
// this doesn't need to propagate the error
// because this work is not correctness
// critical for anything, it's just a space
// optimization if it happens to work out.
log::error!("failed to read object during GC: {e:?}");
return;
}
};
// allocate new slot
let compacted_slot = slab.slot_allocator.allocate();
allocations_to_free_on_failure.push((slab_id, compacted_slot));
let new_location =
SlabAddress::from_slab_slot(slab_id, compacted_slot);
let new_location_nzu: NonZeroU64 = new_location.into();
let complete_durability_pipeline =
maybe!(slab.write(compacted_slot, data));
match complete_durability_pipeline {
Ok(()) => {
//
}
Err(e) => {
// free the whole batch and return
}
}
}
// try to write the metadata batch, freeing all allocated ids if we failed.
if let Err(e) = self.metadata_store.insert_batch(&metadata_batch) {
self.set_error(&e);
}
drop(atomicity_mu);
pub(crate) fn objects_to_defrag(&self) -> FnvHashSet<ObjectId> {
self.table.objects_to_defrag()
}
}

View File

@ -18,6 +18,46 @@ pub(crate) struct Allocator {
}
impl Allocator {
/// Intended primarily for heap slab slot allocators when performing GC.
///
/// If the allocator's space is fragmented beyond the desired fill ratio,
/// this returns the range of slot offsets (min inclusive, max exclusive)
/// whose occupants may be copied into earlier free slots in order to
/// reach the desired fill ratio.
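///
/// For example (illustrative numbers, not from this commit): if 100 slots
/// have ever been handed out and 30 of them are currently free, 70 are live
/// and the actual fill ratio is 0.7. With a desired ratio of 0.9, the cutoff
/// is (70.0 / 0.9) as u64 = 77, so Some((77, 100)) is returned and occupied
/// slots in 77..100 become candidates for being copied down.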
pub fn fragmentation_cutoff(
&self,
desired_ratio: f32,
) -> Option<(u64, u64)> {
let next_to_allocate = self.next_to_allocate.load(Ordering::Acquire);
if next_to_allocate == 0 {
return None;
}
let mut free = self.free_and_pending.lock();
while let Some(free_id) = self.free_queue.pop() {
free.push(Reverse(free_id));
}
let live_objects = next_to_allocate - free.len() as u64;
let actual_ratio = live_objects as f32 / next_to_allocate as f32;
log::trace!(
"fragmented_slots actual ratio: {actual_ratio}, free len: {}",
free.len()
);
if desired_ratio <= actual_ratio {
return None;
}
// calculate theoretical cut-off point, return everything past that
let min = (live_objects as f32 / desired_ratio) as u64;
let max = next_to_allocate;
assert!(min < max);
Some((min, max))
}
pub fn from_allocated(allocated: &FnvHashSet<u64>) -> Allocator {
let mut heap = BinaryHeap::<Reverse<u64>>::default();
let max = allocated.iter().copied().max();
@ -35,6 +75,16 @@ impl Allocator {
}
}
pub fn max_allocated(&self) -> Option<u64> {
let next = self.next_to_allocate.load(Ordering::Acquire);
if next == 0 {
None
} else {
Some(next - 1)
}
}
pub fn allocate(&self) -> u64 {
let mut free = self.free_and_pending.lock();
while let Some(free_id) = self.free_queue.pop() {

View File

@ -1,3 +1,9 @@
// TODO event log assertion for testing heap location bidirectional referential integrity
// TODO make ObjectId wrap NonZeroU64 so that a 0 in the slab tenant PageTable more clearly
// means the slot is unoccupied
// TODO ensure nothing "from the future" gets copied into earlier epochs during GC
// not sure if that's possible though?
// TODO concurrent serialization of NotYetSerialized dirty objects
// TODO collection_id on page_in checks - it needs to be pinned w/ heap's EBR?
// TODO after defrag, reduce self.tip while popping the max items in the free list
// TODO store low key and collection ID directly on Object
@ -19,6 +25,7 @@
// TODO measure space savings vs cost of zstd in metadata store
// TODO corrupted data extraction binary
// TODO if the crash_iter test panics, the test doesn't fail as expected
// TODO remove/make conditionally compiled for testing the explicit process aborts
mod config;
mod db;
@ -27,7 +34,7 @@ mod heap;
mod id_allocator;
mod metadata_store;
mod object_cache;
mod object_location_map;
mod object_location_mapper;
mod tree;
#[cfg(any(

View File

@ -67,7 +67,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
bool,
)> {
let HeapRecovery { heap, recovered_nodes, was_recovered } =
recover(&config.path, LEAF_FANOUT)?;
recover(&config.path, LEAF_FANOUT, config)?;
let (object_id_index, indices) = initialize(&recovered_nodes, &heap);
@ -165,7 +165,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
collection_id: CollectionId,
low_key: InlineArray,
) -> Object<LEAF_FANOUT> {
let object_id = ObjectId(self.heap.allocate_object_id());
let object_id = self.heap.allocate_object_id();
let node = Object {
object_id,
@ -179,7 +179,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
node
}
pub fn allocate_object_id(&self) -> u64 {
pub fn allocate_object_id(&self) -> ObjectId {
self.heap.allocate_object_id()
}
@ -278,6 +278,8 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
let flush_through_epoch: FlushEpoch =
this_vacant_notifier.wait_for_complete();
let mut objects_to_defrag = self.heap.objects_to_defrag();
log::trace!("performing flush");
let flush_boundary = (flush_through_epoch.increment(), ObjectId::MIN);
@ -287,6 +289,8 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
for ((dirty_epoch, dirty_object_id), dirty_value_initial_read) in
self.dirty.range(..flush_boundary)
{
objects_to_defrag.remove(&dirty_object_id);
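// Objects that are dirty in this flush epoch will be handled by the
// write batch below, so drop them from the defrag set rather than
// rewriting them a second time.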
let dirty_value = self
.dirty
.remove(&(dirty_epoch, dirty_object_id))
@ -412,6 +416,35 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
}
}
log::info!(
"objects to defrag (after flush loop): {}",
objects_to_defrag.len()
);
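// Anything still flagged for defragmentation was not rewritten by the
// flush loop above, so read it back and append it to the write batch;
// write_batch relocates it into a freshly allocated slot, and the old
// slot is freed through the deferred-free path.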
for fragmented_object_id in objects_to_defrag {
let object_opt = self.object_id_index.get(&fragmented_object_id);
let object = if let Some(object) = object_opt {
object
} else {
panic!("defragmenting object not found in object_id_index: {fragmented_object_id:?}");
};
let data = match self.heap.read(fragmented_object_id) {
Ok(data) => data,
Err(e) => {
log::error!("failed to read object during GC: {e:?}");
continue;
}
};
write_batch.push(Update::Store {
object_id: fragmented_object_id,
collection_id: object.collection_id,
metadata: object.low_key,
data,
});
}
let written_count = write_batch.len();
if written_count > 0 {
self.heap.write_batch(write_batch)?;
@ -436,14 +469,8 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
}
}
self.maintenance();
Ok(())
}
// maintenance is infallible because if it fails, it doesn't invalidate
// the durability already achieved earlier.
fn maintenance(&self) {}
}
fn initialize<const LEAF_FANOUT: usize>(
@ -488,7 +515,7 @@ fn initialize<const LEAF_FANOUT: usize>(
let tree = trees.entry(collection_id).or_default();
if tree.is_empty() {
let object_id = ObjectId(heap.allocate_object_id());
let object_id = heap.allocate_object_id();
let initial_low_key = InlineArray::default();

View File

@ -1,121 +0,0 @@
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use pagetable::PageTable;
use crate::{
heap::{SlabAddress, N_SLABS},
ObjectId,
};
#[derive(Clone)]
struct SlabTenancy {
inner: Arc<[PageTable<AtomicU64>; N_SLABS]>,
}
impl Default for SlabTenancy {
fn default() -> SlabTenancy {
SlabTenancy {
inner: Arc::new(core::array::from_fn(|_| PageTable::default())),
}
}
}
#[derive(Clone, Default)]
pub(crate) struct ObjectLocationMap {
object_id_to_location: PageTable<AtomicU64>,
location_to_object_id: SlabTenancy,
}
impl ObjectLocationMap {
pub(crate) fn get_location_for_object(
&self,
object_id: ObjectId,
) -> crate::SlabAddress {
let location_u64 =
self.object_id_to_location.get(object_id.0).load(Ordering::Acquire);
let nzu = NonZeroU64::new(location_u64)
.expect("node location metadata not present in pagetable");
SlabAddress::from(nzu)
}
/// Returns the previous address for this object, if it is vacating one.
///
/// # Panics
///
/// Asserts that the new location is actually unoccupied. This is a major
/// correctness violation if that isn't true.
pub(crate) fn insert(
&self,
object_id: ObjectId,
new_location: SlabAddress,
) -> Option<SlabAddress> {
// insert into object_id_to_location
let location_nzu: NonZeroU64 = new_location.into();
let location_u64 = location_nzu.get();
let last_u64 = self
.object_id_to_location
.get(object_id.0)
.swap(location_u64, Ordering::Release);
let last_address_opt = if let Some(nzu) = NonZeroU64::new(last_u64) {
let last_address = SlabAddress::from(nzu);
Some(last_address)
} else {
None
};
// insert into location_to_object_id
let slab = new_location.slab();
let slot = new_location.slot();
let last_oid_at_location = self.location_to_object_id.inner
[usize::from(slab)]
.get(slot)
.swap(object_id.0, Ordering::Release);
assert_eq!(0, last_oid_at_location);
last_address_opt
}
/// Unmaps an object and returns its location.
///
/// # Panics
///
/// Asserts that the object was actually stored in a location.
pub(crate) fn remove(&self, object_id: ObjectId) -> Option<SlabAddress> {
let last_u64 = self
.object_id_to_location
.get(object_id.0)
.swap(0, Ordering::Release);
if let Some(nzu) = NonZeroU64::new(last_u64) {
let last_address = SlabAddress::from(nzu);
let slab = last_address.slab();
let slot = last_address.slot();
let last_oid_at_location = self.location_to_object_id.inner
[usize::from(slab)]
.get(slot)
.swap(0, Ordering::Release);
assert_eq!(object_id.0, last_oid_at_location);
Some(last_address)
} else {
None
}
}
pub(crate) fn objects_to_defrag(&self) -> Vec<(ObjectId, SlabAddress)> {
// TODO
//todo!()
vec![]
}
}

View File

@ -0,0 +1,257 @@
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use fnv::FnvHashSet;
use pagetable::PageTable;
use crate::{
heap::{SlabAddress, UpdateMetadata, N_SLABS},
Allocator, ObjectId,
};
#[derive(Default)]
struct SlabTenancy {
slot_to_object_id: PageTable<AtomicU64>,
slot_allocator: Arc<Allocator>,
}
impl SlabTenancy {
// returns (ObjectId, slot index) pairs
fn objects_to_defrag(
&self,
target_fill_ratio: f32,
) -> Vec<(ObjectId, u64)> {
let (frag_min, frag_max) = if let Some(frag) =
self.slot_allocator.fragmentation_cutoff(target_fill_ratio)
{
frag
} else {
return vec![];
};
let mut ret = vec![];
for fragmented_slot in frag_min..frag_max {
let object_id = self
.slot_to_object_id
.get(fragmented_slot)
.load(Ordering::Acquire);
if object_id != 0 {
ret.push((ObjectId(object_id), fragmented_slot));
}
}
ret
}
}
#[derive(Clone)]
pub(crate) struct ObjectLocationMapper {
object_id_to_location: PageTable<AtomicU64>,
slab_tenancies: Arc<[SlabTenancy; N_SLABS]>,
object_id_allocator: Arc<Allocator>,
target_fill_ratio: f32,
}
impl ObjectLocationMapper {
pub(crate) fn new(
recovered_metadata: &[UpdateMetadata],
target_fill_ratio: f32,
) -> ObjectLocationMapper {
let mut ret = ObjectLocationMapper {
object_id_to_location: PageTable::default(),
slab_tenancies: Arc::new(core::array::from_fn(|_| {
SlabTenancy::default()
})),
object_id_allocator: Arc::default(),
target_fill_ratio,
};
let mut object_ids: FnvHashSet<u64> = Default::default();
let mut slots_per_slab: [FnvHashSet<u64>; N_SLABS] =
core::array::from_fn(|_| Default::default());
for update_metadata in recovered_metadata {
match update_metadata {
UpdateMetadata::Store {
object_id,
collection_id: _,
location,
metadata: _,
} => {
object_ids.insert(object_id.0);
let slab_address = SlabAddress::from(*location);
slots_per_slab[slab_address.slab() as usize]
.insert(slab_address.slot());
ret.insert(*object_id, slab_address);
}
UpdateMetadata::Free { .. } => {
unreachable!()
}
}
}
ret.object_id_allocator =
Arc::new(Allocator::from_allocated(&object_ids));
let slabs = Arc::get_mut(&mut ret.slab_tenancies).unwrap();
for i in 0..N_SLABS {
let slab = &mut slabs[i];
slab.slot_allocator =
Arc::new(Allocator::from_allocated(&slots_per_slab[i]));
}
ret
}
pub(crate) fn clone_object_id_allocator_arc(&self) -> Arc<Allocator> {
self.object_id_allocator.clone()
}
pub(crate) fn allocate_object_id(&self) -> ObjectId {
ObjectId(self.object_id_allocator.allocate())
}
pub(crate) fn clone_slab_allocator_arc(
&self,
slab_id: u8,
) -> Arc<Allocator> {
self.slab_tenancies[usize::from(slab_id)].slot_allocator.clone()
}
pub(crate) fn allocate_slab_slot(&self, slab_id: u8) -> SlabAddress {
let slot =
self.slab_tenancies[usize::from(slab_id)].slot_allocator.allocate();
SlabAddress::from_slab_slot(slab_id, slot)
}
pub(crate) fn free_slab_slot(&self, slab_address: SlabAddress) {
self.slab_tenancies[usize::from(slab_address.slab())]
.slot_allocator
.free(slab_address.slot())
}
pub(crate) fn get_location_for_object(
&self,
object_id: ObjectId,
) -> crate::SlabAddress {
let location_u64 =
self.object_id_to_location.get(object_id.0).load(Ordering::Acquire);
let nzu = NonZeroU64::new(location_u64)
.expect("node location metadata not present in pagetable");
SlabAddress::from(nzu)
}
/// Returns the previous address for this object, if it is vacating one.
///
/// # Panics
///
/// Asserts that the new location is actually unoccupied. This is a major
/// correctness violation if that isn't true.
pub(crate) fn insert(
&self,
object_id: ObjectId,
new_location: SlabAddress,
) -> Option<SlabAddress> {
// insert into object_id_to_location
let location_nzu: NonZeroU64 = new_location.into();
let location_u64 = location_nzu.get();
let last_u64 = self
.object_id_to_location
.get(object_id.0)
.swap(location_u64, Ordering::Release);
let last_address_opt = if let Some(nzu) = NonZeroU64::new(last_u64) {
let last_address = SlabAddress::from(nzu);
Some(last_address)
} else {
None
};
// insert into slab_tenancies
let slab = new_location.slab();
let slot = new_location.slot();
let last_oid_at_location = self.slab_tenancies[usize::from(slab)]
.slot_to_object_id
.get(slot)
.swap(object_id.0, Ordering::Release);
assert_eq!(0, last_oid_at_location);
last_address_opt
}
/// Unmaps an object and returns its previous location, if it had one.
///
/// # Panics
///
/// If the object had a location, asserts that the slab's reverse mapping
/// pointed back at this object before clearing it.
pub(crate) fn remove(&self, object_id: ObjectId) -> Option<SlabAddress> {
let last_u64 = self
.object_id_to_location
.get(object_id.0)
.swap(0, Ordering::Release);
if let Some(nzu) = NonZeroU64::new(last_u64) {
let last_address = SlabAddress::from(nzu);
let slab = last_address.slab();
let slot = last_address.slot();
let last_oid_at_location = self.slab_tenancies[usize::from(slab)]
.slot_to_object_id
.get(slot)
.swap(0, Ordering::Release);
assert_eq!(object_id.0, last_oid_at_location);
Some(last_address)
} else {
None
}
}
pub(crate) fn objects_to_defrag(&self) -> FnvHashSet<ObjectId> {
let mut ret = FnvHashSet::default();
for slab_id in 0..N_SLABS {
let slab = &self.slab_tenancies[usize::from(slab_id)];
for (object_id, slot) in
slab.objects_to_defrag(self.target_fill_ratio)
{
let sa = SlabAddress::from_slab_slot(
u8::try_from(slab_id).unwrap(),
slot,
);
let rt_sa = if let Some(rt_raw_sa) = NonZeroU64::new(
self.object_id_to_location
.get(object_id.0)
.load(Ordering::Acquire),
) {
SlabAddress::from(rt_raw_sa)
} else {
// object has been removed but its slot has not yet been freed,
// hopefully due to a deferred write
// TODO test that with a testing event log
continue;
};
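// Only report the object as fragmented if the slab's reverse mapping
// still agrees with the authoritative object_id_to_location entry;
// if they differ, the object has already been relocated.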
if sa == rt_sa {
let newly_inserted = ret.insert(object_id);
assert!(newly_inserted, "{object_id:?} present multiple times across slab objects_to_defrag");
}
}
}
ret
}
}

View File

@ -2000,7 +2000,7 @@ impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
let rhs_id = allocator.allocate_object_id();
log::trace!(
"split leaf {:?} at split key: {:?} into new node {} at {:?}",
"split leaf {:?} at split key: {:?} into new {:?} at {:?}",
self.lo,
split_key,
rhs_id,
@ -2024,7 +2024,7 @@ impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
self.set_in_memory_size();
let rhs_node = Object {
object_id: ObjectId(rhs_id),
object_id: rhs_id,
collection_id,
low_key: split_key.clone(),
inner: Arc::new(Some(Box::new(rhs)).into()),