mirror of https://github.com/spacejam/sled
Properly fsync slab files after write batches
This commit is contained in:
parent
4e4a3f93c1
commit
1516077a78
48
src/heap.rs
48
src/heap.rs
|
@ -858,7 +858,8 @@ impl Heap {
|
|||
let table = &self.table;
|
||||
|
||||
let heap_bytes_written = AtomicU64::new(0);
|
||||
let heap_files_used = AtomicU64::new(0);
|
||||
let heap_files_used_0_to_63 = AtomicU64::new(0);
|
||||
let heap_files_used_64_to_127 = AtomicU64::new(0);
|
||||
|
||||
let map_closure = |update: Update| match update {
|
||||
Update::Store { object_id, collection_id, metadata, data } => {
|
||||
|
@ -880,11 +881,17 @@ impl Heap {
|
|||
// record stats
|
||||
heap_bytes_written
|
||||
.fetch_add(data_len as u64, Ordering::Release);
|
||||
// N_SLABS is larger than 64, so we tolerate a bit
|
||||
// of sloppiness in this file count stat in order to
|
||||
// minimize overhead of calculating it.
|
||||
let slab_bit = 0b1 << (slab_id % 63);
|
||||
heap_files_used.fetch_or(slab_bit, Ordering::Release);
|
||||
|
||||
if slab_id < 64 {
|
||||
let slab_bit = 0b1 << slab_id;
|
||||
heap_files_used_0_to_63
|
||||
.fetch_or(slab_bit, Ordering::Release);
|
||||
} else {
|
||||
assert!(slab_id < 128);
|
||||
let slab_bit = 0b1 << (slab_id - 64);
|
||||
heap_files_used_64_to_127
|
||||
.fetch_or(slab_bit, Ordering::Release);
|
||||
}
|
||||
|
||||
Ok(UpdateMetadata::Store {
|
||||
object_id,
|
||||
|
@ -905,7 +912,23 @@ impl Heap {
|
|||
|
||||
let before_heap_sync = Instant::now();
|
||||
|
||||
// TODO fsync dirty slabs here
|
||||
for slab_id in 0..N_SLABS {
|
||||
let dirty = if slab_id < 64 {
|
||||
let slab_bit = 0b1 << slab_id;
|
||||
|
||||
heap_files_used_0_to_63.load(Ordering::Acquire) & slab_bit
|
||||
== slab_bit
|
||||
} else {
|
||||
let slab_bit = 0b1 << (slab_id - 64);
|
||||
|
||||
heap_files_used_64_to_127.load(Ordering::Acquire) & slab_bit
|
||||
== slab_bit
|
||||
};
|
||||
|
||||
if dirty {
|
||||
self.slabs[slab_id].sync()?;
|
||||
}
|
||||
}
|
||||
|
||||
let heap_sync_latency = before_heap_sync.elapsed();
|
||||
|
||||
|
@ -1013,11 +1036,16 @@ impl Heap {
|
|||
|
||||
drop(atomicity_mu);
|
||||
|
||||
let heap_files_written_to = u64::from(
|
||||
heap_files_used_0_to_63.load(Ordering::Acquire).count_ones()
|
||||
+ heap_files_used_64_to_127
|
||||
.load(Ordering::Acquire)
|
||||
.count_ones(),
|
||||
);
|
||||
|
||||
let stats = WriteBatchStats {
|
||||
heap_bytes_written: heap_bytes_written.load(Ordering::Acquire),
|
||||
heap_files_written_to: heap_files_used
|
||||
.load(Ordering::Acquire)
|
||||
.count_ones() as u64,
|
||||
heap_files_written_to,
|
||||
heap_write_latency,
|
||||
heap_sync_latency,
|
||||
metadata_bytes_written,
|
||||
|
|
|
@ -33,7 +33,6 @@
|
|||
// TODO list trees test for recovering empty collections
|
||||
// TODO set explicit max key and value sizes w/ corresponding heap
|
||||
// TODO add failpoints to writepath
|
||||
// TODO fsync strictness
|
||||
//
|
||||
// performance
|
||||
// TODO handle prefix encoding
|
||||
|
|
Loading…
Reference in New Issue