A large number of improvements towards on-disk file GC

Author: Tyler Neely
Date: 2023-11-12 00:00:42 +01:00
parent fe48530576
commit cb44c1f70f
9 changed files with 115 additions and 34 deletions

View File

@ -35,7 +35,7 @@ bincode = "1.3.3"
cache-advisor = "1.0.16"
concurrent-map = { version = "5.0.31", features = ["serde"] }
crc32fast = "1.3.2"
ebr = "0.2.9"
ebr = "0.2.11"
inline-array = { version = "0.1.12", features = ["serde", "concurrent_map_minimum"] }
fs2 = "0.4.3"
log = "0.4.19"

View File

@ -37,6 +37,10 @@ pub(crate) struct Completion {
}
impl Completion {
    pub fn epoch(&self) -> FlushEpoch {
        self.epoch
    }
    pub fn new(epoch: FlushEpoch) -> Completion {
        Completion { mu: Default::default(), cv: Default::default(), epoch }
    }
@ -107,7 +111,7 @@ pub(crate) struct EpochTracker {
#[derive(Clone, Debug)]
pub(crate) struct FlushEpochTracker {
    active_ebr: ebr::Ebr<Box<EpochTracker>>,
    active_ebr: ebr::Ebr<Box<EpochTracker>, 16, 16>,
    inner: Arc<FlushEpochInner>,
}
@ -223,6 +227,10 @@ impl FlushEpochTracker {
            }
        }
    }
    pub fn manually_advance_epoch(&self) {
        self.active_ebr.manually_advance_epoch();
    }
}
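
A minimal sketch of why the new manual epoch advancement matters: deferred frees are only reclaimed once the EBR epoch moves past every guard that could still observe them, and a quiet system may never advance on its own. Only manually_advance_epoch() and the 16, 16 const parameters are taken from this commit; Ebr::default(), pin(), and defer_drop() are assumptions about the ebr 0.2.11 API.

use ebr::Ebr;

fn nudge_reclamation() {
    let mut ebr: Ebr<Box<u64>, 16, 16> = Ebr::default();

    {
        // Pin a guard and schedule a drop for after all earlier guards retire.
        let mut guard = ebr.pin();
        guard.defer_drop(Box::new(42));
    }

    // With no further pin/unpin traffic, nudge the epoch forward so the
    // deferred drop can actually run, mirroring what the flush path now does.
    ebr.manually_advance_epoch();
}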
#[test]

View File

@ -130,11 +130,13 @@ pub use inline_array::InlineArray;
#[derive(Debug, Clone, Copy)]
pub struct Stats {
    pub flushes: u64,
    pub objects_allocated: u64,
    pub objects_freed: u64,
    pub heap_slots_allocated: u64,
    pub heap_slots_freed: u64,
    pub compacted_heap_slots: u64,
    pub tree_leaves_merged: u64,
}
#[derive(Debug)]
@ -512,7 +514,7 @@ impl Slab {
    fn read(
        &self,
        slot: u64,
        _guard: &mut Guard<'_, DeferredFree, 1>,
        _guard: &mut Guard<'_, DeferredFree, 16, 16>,
    ) -> io::Result<Vec<u8>> {
        let mut data = Vec::with_capacity(self.slot_size);
        unsafe {
@ -669,7 +671,7 @@ pub(crate) struct Heap {
    slabs: Arc<[Slab; N_SLABS]>,
    table: ObjectLocationMapper,
    metadata_store: Arc<MetadataStore>,
    free_ebr: Ebr<DeferredFree, 1>,
    free_ebr: Ebr<DeferredFree, 16, 16>,
    global_error: Arc<AtomicPtr<(io::ErrorKind, String)>>,
    #[allow(unused)]
    directory_lock: Arc<fs::File>,
@ -708,6 +710,10 @@ impl Heap {
        set_error(&self.global_error, error);
    }
    pub fn manually_advance_epoch(&self) {
        self.free_ebr.manually_advance_epoch();
    }
    pub fn stats(&self) -> Stats {
        self.table.stats()
    }
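
A hypothetical read-side sketch of the expanded Stats: the field names come from the struct above, and stats() is the same accessor exercised by the tree_gc test at the bottom of this commit.

fn report_gc_progress<const FANOUT: usize>(db: &sled::Db<FANOUT>) {
    let stats = db.stats();
    println!(
        "flushes: {}, heap slots allocated/freed: {}/{}, \
         compacted slots: {}, leaves merged: {}",
        stats.flushes,
        stats.heap_slots_allocated,
        stats.heap_slots_freed,
        stats.compacted_heap_slots,
        stats.tree_leaves_merged,
    );
}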

View File

@ -79,16 +79,6 @@ impl Allocator {
        }
    }
    pub fn max_allocated(&self) -> Option<u64> {
        let next = self.next_to_allocate.load(Ordering::Acquire);
        if next == 0 {
            None
        } else {
            Some(next - 1)
        }
    }
    pub fn allocate(&self) -> u64 {
        self.allocation_counter.fetch_add(1, Ordering::Relaxed);
        let mut free = self.free_and_pending.lock();

View File

@ -1,4 +1,6 @@
// TODO event log assertion for testing heap location bidirectional referential integrity
// TODO temporary trees for transactional in-memory coordination
// TODO event log assertion for testing heap location bidirectional referential integrity,
// particularly in the object location mapper.
// TODO make ObjectId wrap NonZeroU64 so it's more clear when slab tenant PageTable has a 0 that
// it's unoccupied
// TODO ensure nothing "from the future" gets copied into earlier epochs during GC
@ -26,6 +28,9 @@
// TODO corrupted data extraction binary
// TODO if the crash_iter test panics, the test doesn't fail as expected
// TODO remove/make conditionally compiled for testing the explicit process aborts
// TODO make EBR and index fanout consts as small as possible to reduce memory usage
// TODO make leaf fanout as small as possible while retaining perf
// TODO dynamically sized fanouts for reducing fragmentation
mod config;
mod db;
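
A sketch of the ObjectId / NonZeroU64 idea from the TODO above; it is not implemented in this commit, but it shows why the niche helps: a raw 0 loaded from a slab tenant PageTable becomes unrepresentable as a real id.

use std::num::NonZeroU64;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ObjectId(NonZeroU64);

impl ObjectId {
    // 0 means "unoccupied" in the slab tenancy table, so it maps to None here
    // instead of masquerading as a valid object id.
    pub fn from_raw(raw: u64) -> Option<ObjectId> {
        NonZeroU64::new(raw).map(ObjectId)
    }

    pub fn get(self) -> u64 {
        self.0.get()
    }
}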

View File

@ -55,6 +55,8 @@ pub(crate) struct ObjectCache<const LEAF_FANOUT: usize> {
    flush_epoch: FlushEpochTracker,
    dirty: ConcurrentMap<(FlushEpoch, ObjectId), Dirty<LEAF_FANOUT>, 4>,
    compacted_heap_slots: Arc<AtomicU64>,
    pub(super) tree_leaves_merged: Arc<AtomicU64>,
    pub(super) flushes: Arc<AtomicU64>,
}
impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
@ -109,6 +111,8 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
            #[cfg(feature = "for-internal-testing-only")]
            event_verifier: Arc::default(),
            compacted_heap_slots: Arc::default(),
            tree_leaves_merged: Arc::default(),
            flushes: Arc::default(),
        };
        Ok((pc, indices, was_recovered))
@ -127,6 +131,8 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
            compacted_heap_slots: self
                .compacted_heap_slots
                .load(Ordering::Acquire),
            tree_leaves_merged: self.tree_leaves_merged.load(Ordering::Acquire),
            flushes: self.flushes.load(Ordering::Acquire),
            ..self.heap.stats()
        }
    }
@ -240,12 +246,12 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
                self.object_id_index.get(&ObjectId(*node_to_evict))
            {
                if n.object_id.0 != *node_to_evict {
                    log::warn!("during cache eviction, node to evict did not match current occupant for {:?}", node_to_evict);
                    log::debug!("during cache eviction, node to evict did not match current occupant for {:?}", node_to_evict);
                    continue;
                }
                n
            } else {
                log::warn!("during cache eviction, unable to find node to evict for {:?}", node_to_evict);
                log::debug!("during cache eviction, unable to find node to evict for {:?}", node_to_evict);
                continue;
            };
@ -278,17 +284,21 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
            forward_flush_notifier,
        ) = self.flush_epoch.roll_epoch_forward();
        log::trace!("waiting for previous flush to complete");
        log::trace!(
            "waiting for previous flush of {:?} to complete",
            previous_flush_complete_notifier.epoch()
        );
        previous_flush_complete_notifier.wait_for_complete();
        log::trace!("waiting for our epoch to become vacant");
        log::trace!(
            "waiting for our epoch {:?} to become vacant",
            this_vacant_notifier.epoch()
        );
        let flush_through_epoch: FlushEpoch =
            this_vacant_notifier.wait_for_complete();
        let mut objects_to_defrag = self.heap.objects_to_defrag();
        log::trace!("performing flush");
        let flush_boundary = (flush_through_epoch.increment(), ObjectId::MIN);
        let mut evict_after_flush = vec![];
@ -423,21 +433,33 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
            }
        }
        log::info!(
            "objects to defrag (after flush loop): {}",
            objects_to_defrag.len()
        );
        if !objects_to_defrag.is_empty() {
            log::debug!(
                "objects to defrag (after flush loop): {}",
                objects_to_defrag.len()
            );
        }
        self.compacted_heap_slots
            .fetch_add(objects_to_defrag.len() as u64, Ordering::Relaxed);
        for fragmented_object_id in objects_to_defrag {
            let object_opt = self.object_id_index.get(&fragmented_object_id);
            let object = if let Some(object) = object_opt {
                object
            } else {
                panic!("defragmenting object not found in object_id_index: {fragmented_object_id:?}");
                log::debug!("defragmenting object not found in object_id_index: {fragmented_object_id:?}");
                continue;
            };
            if let Some(ref inner) = *object.inner.read() {
                if let Some(dirty) = inner.dirty_flush_epoch {
                    assert!(dirty > flush_through_epoch);
                    // This object will be rewritten anyway when its dirty epoch gets flushed
                    continue;
                }
            }
            let data = match self.heap.read(fragmented_object_id) {
                Ok(data) => data,
                Err(e) => {
@ -478,6 +500,11 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
            }
        }
        self.flush_epoch.manually_advance_epoch();
        self.heap.manually_advance_epoch();
        self.flushes.fetch_add(1, Ordering::Release);
        Ok(())
    }
}
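
The dirty-epoch check added to the defrag loop above reduces to a small rule; a self-contained sketch of that decision, with hypothetical names:

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct FlushEpoch(u64);

// An object that is already dirty at a later flush epoch will be rewritten and
// relocated by that flush anyway, so copying it forward now would be wasted IO.
// Clean objects must be explicitly rewritten to vacate their fragmented slots.
fn should_defrag_now(
    dirty_flush_epoch: Option<FlushEpoch>,
    flush_through_epoch: FlushEpoch,
) -> bool {
    match dirty_flush_epoch {
        Some(dirty) => {
            // Dirty state can only belong to an epoch after the one being flushed.
            assert!(dirty > flush_through_epoch);
            false
        }
        None => true,
    }
}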

View File

@ -127,6 +127,8 @@ impl ObjectLocationMapper {
            heap_slots_allocated,
            heap_slots_freed,
            compacted_heap_slots: 0,
            tree_leaves_merged: 0,
            flushes: 0,
        }
    }
@ -201,12 +203,12 @@ impl ObjectLocationMapper {
        let slab = new_location.slab();
        let slot = new_location.slot();
        let last_oid_at_location = self.slab_tenancies[usize::from(slab)]
        let _last_oid_at_location = self.slab_tenancies[usize::from(slab)]
            .slot_to_object_id
            .get(slot)
            .swap(object_id.0, Ordering::Release);
        assert_eq!(0, last_oid_at_location);
        // TODO add debug event verifier here assert_eq!(0, last_oid_at_location);
        last_address_opt
    }

View File

@ -287,6 +287,12 @@ impl<const LEAF_FANOUT: usize> Tree<LEAF_FANOUT> {
        leaf.deleted = Some(merge_epoch);
        leaf_guard
            .inner
            .cache
            .tree_leaves_merged
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.index.remove(&leaf_guard.low_key).unwrap();
        self.cache.object_id_index.remove(&leaf_guard.node.object_id).unwrap();

View File

@ -26,7 +26,7 @@ use tree::{
};
const N_THREADS: usize = 10;
const N_PER_THREAD: usize = 1000;
const N_PER_THREAD: usize = 1_000;
const N: usize = N_THREADS * N_PER_THREAD; // NB N should be multiple of N_THREADS
const SPACE: usize = N;
@ -1119,31 +1119,68 @@ fn tree_gc() {
    common::setup_logger();
    let config = Config::tmp().unwrap().flush_every_ms(Some(1));
    let config = Config::tmp().unwrap().flush_every_ms(None);
    let t: sled::Db<FANOUT> = config.open().unwrap();
    for i in 0..N {
        let k = kv(i);
        t.insert(&k, k.clone()).unwrap();
    }
    for _ in 0..100 {
        t.flush().unwrap();
    }
    let size_on_disk_after_inserts = t.size_on_disk().unwrap();
    for i in 0..N {
        let k = kv(i);
        t.insert(&k, k.clone()).unwrap();
    }
    for _ in 0..100 {
        t.flush().unwrap();
    }
    let size_on_disk_after_rewrites = t.size_on_disk().unwrap();
    for i in 0..N {
        let k = kv(i);
        assert_eq!(t.get(&*k).unwrap(), Some(k.clone().into()), "{k:?}");
        t.remove(&*k).unwrap();
    }
    for _ in 0..256 {
    for _ in 0..100 {
        t.flush().unwrap();
        t.stats();
    }
    let size_on_disk_after_deletes = t.size_on_disk().unwrap();
    let stats = t.stats();
    assert!(stats.objects_allocated >= (N / FANOUT) as u64, "{stats:?}");
    assert!(stats.objects_freed >= (N / 2) as u64, "{stats:?}");
    assert!(
        stats.objects_freed >= (stats.objects_allocated / 2) as u64,
        "{stats:?}"
    );
    assert!(stats.heap_slots_allocated >= (N / FANOUT) as u64, "{stats:?}");
    assert!(stats.heap_slots_freed >= (N / 2) as u64, "{stats:?}");
    assert!(
        stats.heap_slots_freed >= (stats.heap_slots_allocated / 2) as u64,
        "{stats:?}"
    );
    // TODO test this after we implement file truncation
    // let expected_max_size = size_on_disk_after_inserts / 100;
    // assert!(size_on_disk_after_deletes <= expected_max_size);
    println!(
        "after writing {N} items and removing them, our stats are: \n{stats:?}. \
        disk size went from {}kb after inserts to {}kb after rewriting to {}kb after deletes",
        size_on_disk_after_inserts / 1024,
        size_on_disk_after_rewrites / 1024,
        size_on_disk_after_deletes / 1024,
    );
}
/*