diff --git a/fuzz/fuzz_targets/fuzz_model.rs b/fuzz/fuzz_targets/fuzz_model.rs index ea1b3515..4af51125 100644 --- a/fuzz/fuzz_targets/fuzz_model.rs +++ b/fuzz/fuzz_targets/fuzz_model.rs @@ -8,7 +8,7 @@ use arbitrary::Arbitrary; use sled::{Config, Db as SledDb, InlineArray}; -type Db = SledDb<4, 3, 11>; +type Db = SledDb<3>; const KEYSPACE: u64 = 128; diff --git a/src/config.rs b/src/config.rs index 0422315c..f4655619 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,6 +7,18 @@ use tempdir::TempDir; use crate::Db; +macro_rules! builder { + ($(($name:ident, $t:ty, $desc:expr)),*) => { + $( + #[doc=$desc] + pub fn $name(mut self, to: $t) -> Self { + self.$name = to; + self + } + )* + } +} + #[derive(Debug, Clone)] pub struct Config { /// The base directory for storing the database. diff --git a/src/db.rs b/src/db.rs index a6d20538..4da5f479 100644 --- a/src/db.rs +++ b/src/db.rs @@ -553,7 +553,7 @@ impl Db { if config.cache_capacity_bytes < 256 { log::debug!( "Db configured to have Config.cache_capacity_bytes \ - of under 256, so we will use the minimum of 256 bytes" + of under 256, so we will use the minimum of 256 bytes instead" ); } if config.entry_cache_percent > 80 { @@ -1184,7 +1184,10 @@ impl Db { "MergedAndDeleted for {:?}, adding None to write_batch", node_id ); - write_batch.push((dirty_node_id, None)); + write_batch.push(heap::Update::Free { + node_id: dirty_node_id, + collection_id: CollectionId::MIN, + }); } Dirty::CooperativelySerialized { low_key, @@ -1193,7 +1196,12 @@ impl Db { } => { Arc::make_mut(&mut data); let data = Arc::into_inner(data).unwrap(); - write_batch.push((dirty_node_id, Some((low_key, data)))); + write_batch.push(heap::Update::Store { + node_id: dirty_node_id, + collection_id: CollectionId::MIN, + metadata: low_key, + data, + }); } Dirty::NotYetSerialized { low_key, node } => { assert_eq!(dirty_node_id, node.id, "mismatched node ID for NotYetSerialized with low key {:?}", low_key); @@ -1247,7 +1255,12 @@ impl Db { } }; - write_batch.push((dirty_node_id, Some((low_key, data)))); + write_batch.push(heap::Update::Store { + node_id: dirty_node_id, + collection_id: CollectionId::MIN, + metadata: low_key, + data, + }); if leaf_ref.page_out_on_flush == Some(flush_through_epoch) { // page_out_on_flush is set to false @@ -2519,7 +2532,7 @@ impl Batch { } fn initialize( - index_data: &[(NodeId, InlineArray)], + index_data: &[(NodeId, CollectionId, InlineArray)], first_id_opt: Option, ) -> ConcurrentMap< InlineArray, @@ -2551,8 +2564,8 @@ fn initialize( let ret = ConcurrentMap::default(); - for (id, low_key) in index_data { - let node = Node { id: *id, inner: Arc::new(None.into()) }; + for (node_id, collection_id, low_key) in index_data { + let node = Node { id: *node_id, inner: Arc::new(None.into()) }; ret.insert(low_key.clone(), node); } diff --git a/src/heap.rs b/src/heap.rs index c5ef5baf..f155f648 100644 --- a/src/heap.rs +++ b/src/heap.rs @@ -8,6 +8,7 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; +use concurrent_map::Minimum; use crossbeam_queue::SegQueue; use ebr::{Ebr, Guard}; use fault_injection::{annotate, fallible, maybe}; @@ -17,7 +18,7 @@ use pagetable::PageTable; use rayon::prelude::*; use crate::metadata_store::MetadataStore; -use crate::NodeId; +use crate::{CollectionId, NodeId}; const WARN: &str = "DO_NOT_PUT_YOUR_FILES_HERE"; const N_SLABS: usize = 78; @@ -141,7 +142,7 @@ pub struct Config { pub(crate) fn recover>( storage_directory: P, -) -> io::Result<(Heap, Vec<(NodeId, InlineArray)>)> { +) -> io::Result<(Heap, Vec<(NodeId, CollectionId, InlineArray)>)> { Heap::recover(&Config { path: storage_directory.as_ref().into() }) } @@ -344,6 +345,11 @@ struct Slab { } impl Slab { + fn maintenance(&self) -> io::Result { + // TODO compact + Ok(0) + } + fn read( &self, slot: u64, @@ -472,6 +478,43 @@ fn set_error( } } +#[derive(Debug)] +pub(crate) enum Update { + Store { + node_id: NodeId, + collection_id: CollectionId, + metadata: InlineArray, + data: Vec, + }, + Free { + node_id: NodeId, + collection_id: CollectionId, + }, +} + +#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] +pub(crate) enum UpdateMetadata { + Store { + node_id: NodeId, + collection_id: CollectionId, + metadata: InlineArray, + location: NonZeroU64, + }, + Free { + node_id: NodeId, + collection_id: CollectionId, + }, +} + +impl UpdateMetadata { + pub fn node_id(&self) -> NodeId { + match self { + UpdateMetadata::Store { node_id, .. } + | UpdateMetadata::Free { node_id, .. } => *node_id, + } + } +} + #[derive(Clone)] pub(crate) struct Heap { config: Config, @@ -519,7 +562,7 @@ impl Heap { pub fn recover( config: &Config, - ) -> io::Result<(Heap, Vec<(NodeId, InlineArray)>)> { + ) -> io::Result<(Heap, Vec<(NodeId, CollectionId, InlineArray)>)> { log::trace!("recovering Heap at {:?}", config.path); let slabs_dir = config.path.join("slabs"); @@ -543,19 +586,32 @@ impl Heap { MetadataStore::recover(config.path.join("metadata"))?; let pt = PageTable::::default(); - let mut user_data = Vec::<(NodeId, InlineArray)>::with_capacity( - recovered_metadata.len(), - ); - let mut object_ids: FnvHashSet = Default::default(); + let mut user_data = + Vec::<(NodeId, CollectionId, InlineArray)>::with_capacity( + recovered_metadata.len(), + ); + let mut node_ids: FnvHashSet = Default::default(); let mut slots_per_slab: [FnvHashSet; N_SLABS] = core::array::from_fn(|_| Default::default()); - for (k, location, data) in recovered_metadata { - object_ids.insert(k.0); - let slab_address = SlabAddress::from(location); - slots_per_slab[slab_address.slab_id as usize] - .insert(slab_address.slot()); - pt.get(k.0).store(location.get(), Ordering::Relaxed); - user_data.push((k, data.clone())); + for update_metadata in recovered_metadata { + match update_metadata { + UpdateMetadata::Store { + node_id, + collection_id, + location, + metadata, + } => { + node_ids.insert(node_id.0); + let slab_address = SlabAddress::from(location); + slots_per_slab[slab_address.slab_id as usize] + .insert(slab_address.slot()); + pt.get(node_id.0).store(location.get(), Ordering::Relaxed); + user_data.push((node_id, collection_id, metadata.clone())); + } + UpdateMetadata::Free { .. } => { + unreachable!() + } + } } let mut slabs = vec![]; @@ -582,7 +638,7 @@ impl Heap { slabs: Arc::new(slabs.try_into().unwrap()), config: config.clone(), object_id_allocator: Arc::new(Allocator::from_allocated( - &object_ids, + &node_ids, )), pt, global_error: metadata_store.get_global_error_arc(), @@ -595,7 +651,10 @@ impl Heap { } pub fn maintenance(&self) -> io::Result { - // TODO + for slab in self.slabs.iter() { + slab.maintenance()?; + } + Ok(0) } @@ -633,89 +692,72 @@ impl Heap { } } - pub fn write_batch(&self, batch: I) -> io::Result<()> - where - I: Sized - + IntoIterator)>)>, - { + pub fn write_batch(&self, batch: Vec) -> io::Result<()> { self.check_error()?; let mut guard = self.free_ebr.pin(); - let batch: Vec<(NodeId, Option<(InlineArray, Vec)>)> = batch - .into_iter() - //.map(|(key, val_opt)| (key, val_opt.map(|(user_data, b)| (user_data, b.as_ref())))) - .collect(); - let slabs = &self.slabs; - let metadata_batch_res: io::Result< - Vec<(NodeId, Option<(NonZeroU64, InlineArray)>)>, - > = batch - .into_par_iter() - .map( - |(object_id, val_opt): ( - NodeId, - Option<(InlineArray, Vec)>, - )| { - let new_meta = if let Some((user_data, bytes)) = val_opt { - let slab_id = slab_for_size(bytes.len()); - let slab = &slabs[usize::from(slab_id)]; - let slot = slab.slot_allocator.allocate(); - let new_location = - SlabAddress::from_slab_slot(slab_id, slot); - let new_location_nzu: NonZeroU64 = new_location.into(); - let complete_durability_pipeline = - maybe!(slab.write(slot, bytes)); + let map_closure = |update: Update| match update { + Update::Store { node_id, collection_id, metadata, data } => { + let slab_id = slab_for_size(data.len()); + let slab = &slabs[usize::from(slab_id)]; + let slot = slab.slot_allocator.allocate(); + let new_location = SlabAddress::from_slab_slot(slab_id, slot); + let new_location_nzu: NonZeroU64 = new_location.into(); - if let Err(e) = complete_durability_pipeline { - // can immediately free slot as the - slab.slot_allocator.free(slot); - return Err(e); - } - Some((new_location_nzu, user_data)) - } else { - None - }; + let complete_durability_pipeline = + maybe!(slab.write(slot, data)); - Ok((object_id, new_meta)) - }, - ) - .collect(); + if let Err(e) = complete_durability_pipeline { + // can immediately free slot as the + slab.slot_allocator.free(slot); + return Err(e); + } + Ok(UpdateMetadata::Store { + node_id, + collection_id, + metadata, + location: new_location_nzu, + }) + } + Update::Free { node_id, collection_id } => { + Ok(UpdateMetadata::Free { node_id, collection_id }) + } + }; + + let metadata_batch_res: io::Result> = + batch.into_par_iter().map(map_closure).collect(); let metadata_batch = match metadata_batch_res { - Ok(mb) => mb, + Ok(mut mb) => { + // TODO evaluate impact : cost ratio of this sort + mb.par_sort_unstable(); + mb + } Err(e) => { self.set_error(&e); return Err(e); } }; - if let Err(e) = self.metadata_store.insert_batch(metadata_batch.clone()) - { + // make metadata durable + if let Err(e) = self.metadata_store.insert_batch(&metadata_batch) { self.set_error(&e); - - // this is very cold, so it's fine if it's not fast - for (_object_id, value_opt) in metadata_batch { - let (new_location_u64, _user_data) = value_opt.unwrap(); - let new_location = SlabAddress::from(new_location_u64); - let slab_id = new_location.slab_id; - let slab = &self.slabs[usize::from(slab_id)]; - slab.slot_allocator.free(new_location.slot()); - } return Err(e); } - // now we can update in-memory metadata - - for (object_id, value_opt) in metadata_batch { - let new_location = if let Some((nl, _user_data)) = value_opt { - nl.get() - } else { - 0 + // reclaim previous disk locations for future writes + for update_metadata in metadata_batch { + let (node_id, new_location) = match update_metadata { + UpdateMetadata::Store { node_id, location, .. } => { + (node_id, location.get()) + } + UpdateMetadata::Free { node_id, .. } => (node_id, 0), }; let last_u64 = - self.pt.get(object_id.0).swap(new_location, Ordering::Release); + self.pt.get(node_id.0).swap(new_location, Ordering::Release); if let Some(nzu) = NonZeroU64::new(last_u64) { let last_address = SlabAddress::from(nzu); @@ -736,14 +778,19 @@ impl Heap { self.object_id_allocator.allocate() } - #[allow(unused)] - pub fn free(&self, object_id: NodeId) -> io::Result<()> { + //#[allow(unused)] + pub fn free(&self, node_id: NodeId) -> io::Result<()> { let mut guard = self.free_ebr.pin(); - if let Err(e) = self.metadata_store.insert_batch([(object_id, None)]) { + if let Err(e) = + self.metadata_store.insert_batch(&[UpdateMetadata::Free { + node_id, + collection_id: CollectionId::MIN, + }]) + { self.set_error(&e); return Err(e); } - let last_u64 = self.pt.get(object_id.0).swap(0, Ordering::Release); + let last_u64 = self.pt.get(node_id.0).swap(0, Ordering::Release); if let Some(nzu) = NonZeroU64::new(last_u64) { let last_address = SlabAddress::from(nzu); diff --git a/src/lib.rs b/src/lib.rs index 6e380c04..0d686a48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ +// TODO remove all Drop logic that checks Arc::strong_count, all are race conditions +// TODO move dirty tracking, cache to shared level +// TODO write actual CollectionId instead of MIN in Db::flush // TODO put aborts behind feature flags for hard crashes -// TODO implement tree node merges when batches remove items // TODO heap maintenance w/ speculative write followed by CAS in pt // maybe with global writer lock that controls flushers too // TODO allow waiting flusher to start collecting dirty pages as soon @@ -10,29 +12,13 @@ // TODO re-enable transaction tests in test_tree.rs // TODO set explicit max key and value sizes w/ corresponding heap // TODO skim inlining output of RUSTFLAGS="-Cremark=all -Cdebuginfo=1" - -// NB: this macro must appear before the following mod statements -// for it to be usable within them. One of the few places where -// this sort of ordering matters in Rust. -macro_rules! builder { - ($(($name:ident, $t:ty, $desc:expr)),*) => { - $( - #[doc=$desc] - pub fn $name(mut self, to: $t) -> Self { - self.$name = to; - self - } - )* - } -} +// TODO measure space savings vs cost of zstd in metadata store mod config; mod db; mod flush_epoch; mod heap; -// mod meta_node; mod metadata_store; -// mod varint; #[cfg(any( feature = "testing_shred_allocator", @@ -115,6 +101,24 @@ impl concurrent_map::Minimum for NodeId { const MIN: NodeId = NodeId(u64::MIN); } +#[derive( + Debug, + Clone, + Copy, + serde::Serialize, + serde::Deserialize, + PartialOrd, + Ord, + PartialEq, + Eq, + Hash, +)] +struct CollectionId(u64); + +impl concurrent_map::Minimum for CollectionId { + const MIN: CollectionId = CollectionId(u64::MIN); +} + const fn _assert_public_types_send_sync() { use std::fmt::Debug; diff --git a/src/metadata_store.rs b/src/metadata_store.rs index a102d049..1a99fac1 100644 --- a/src/metadata_store.rs +++ b/src/metadata_store.rs @@ -17,7 +17,7 @@ use rayon::prelude::*; use zstd::stream::read::Decoder as ZstdDecoder; use zstd::stream::write::Encoder as ZstdEncoder; -use crate::NodeId; +use crate::{heap::UpdateMetadata, CollectionId, NodeId}; const WARN: &str = "DO_NOT_PUT_YOUR_FILES_HERE"; const TMP_SUFFIX: &str = ".tmp"; @@ -49,7 +49,7 @@ impl Drop for MetadataStore { } struct Recovery { - recovered: Vec<(NodeId, NonZeroU64, InlineArray)>, + recovered: Vec, id_for_next_log: u64, snapshot_size: u64, } @@ -259,7 +259,7 @@ impl MetadataStore { // Metadata writer MetadataStore, // Metadata - node id, value, user data - Vec<(NodeId, NonZeroU64, InlineArray)>, + Vec, )> { use fs2::FileExt; @@ -346,12 +346,7 @@ impl MetadataStore { /// Write a batch of metadata. `None` for the second half of the outer tuple represents a /// deletion. - pub fn insert_batch< - I: IntoIterator)>, - >( - &self, - batch: I, - ) -> io::Result<()> { + pub fn insert_batch(&self, batch: &[UpdateMetadata]) -> io::Result<()> { self.check_error()?; let batch_bytes = serialize_batch(batch); @@ -415,11 +410,7 @@ impl MetadataStore { } } -fn serialize_batch< - I: IntoIterator)>, ->( - batch: I, -) -> Vec { +fn serialize_batch(batch: &[UpdateMetadata]) -> Vec { // we initialize the vector to contain placeholder bytes for the frame length let batch_bytes = 0_u64.to_le_bytes().to_vec(); @@ -432,19 +423,34 @@ fn serialize_batch< // LE encoded crc32 of length + payload raw bytes, XOR 0xAF to make non-zero in empty case let mut batch_encoder = ZstdEncoder::new(batch_bytes, ZSTD_LEVEL).unwrap(); - for (k, v_opt) in batch { - batch_encoder.write_all(&k.0.to_le_bytes()).unwrap(); - if let Some((v, user_data)) = v_opt { - batch_encoder.write_all(&v.get().to_le_bytes()).unwrap(); + for update_metadata in batch { + match update_metadata { + UpdateMetadata::Store { + node_id, + collection_id, + metadata, + location, + } => { + batch_encoder.write_all(&node_id.0.to_le_bytes()).unwrap(); + batch_encoder + .write_all(&collection_id.0.to_le_bytes()) + .unwrap(); + batch_encoder.write_all(&location.get().to_le_bytes()).unwrap(); - let user_data_len: u64 = user_data.len() as u64; - batch_encoder.write_all(&user_data_len.to_le_bytes()).unwrap(); - batch_encoder.write_all(&user_data).unwrap(); - } else { - // v - batch_encoder.write_all(&0_u64.to_le_bytes()).unwrap(); - // user data len - batch_encoder.write_all(&0_u64.to_le_bytes()).unwrap(); + let metadata_len: u64 = metadata.len() as u64; + batch_encoder.write_all(&metadata_len.to_le_bytes()).unwrap(); + batch_encoder.write_all(&metadata).unwrap(); + } + UpdateMetadata::Free { node_id, collection_id } => { + batch_encoder.write_all(&node_id.0.to_le_bytes()).unwrap(); + batch_encoder + .write_all(&collection_id.0.to_le_bytes()) + .unwrap(); + // heap location + batch_encoder.write_all(&0_u64.to_le_bytes()).unwrap(); + // metadata len + batch_encoder.write_all(&0_u64.to_le_bytes()).unwrap(); + } } } @@ -463,7 +469,7 @@ fn serialize_batch< fn read_frame( file: &mut fs::File, reusable_frame_buffer: &mut Vec, -) -> io::Result> { +) -> io::Result> { let mut frame_size_buf: [u8; 8] = [0; 8]; // TODO only break if UnexpectedEof, otherwise propagate fallible!(file.read_exact(&mut frame_size_buf)); @@ -502,12 +508,18 @@ fn read_frame( let mut decoder = ZstdDecoder::new(&reusable_frame_buffer[8..len + 8]) .expect("failed to create zstd decoder"); - let mut k_buf: [u8; 8] = [0; 8]; - let mut v_buf: [u8; 8] = [0; 8]; - let mut user_data_len_buf: [u8; 8] = [0; 8]; - let mut user_data_buf = vec![]; + let mut node_id_buf: [u8; 8] = [0; 8]; + let mut collection_id_buf: [u8; 8] = [0; 8]; + let mut location_buf: [u8; 8] = [0; 8]; + let mut metadata_len_buf: [u8; 8] = [0; 8]; + let mut metadata_buf = vec![]; loop { - let first_read_res = decoder.read_exact(&mut k_buf); + let first_read_res = decoder + .read_exact(&mut node_id_buf) + .and_then(|_| decoder.read_exact(&mut collection_id_buf)) + .and_then(|_| decoder.read_exact(&mut location_buf)) + .and_then(|_| decoder.read_exact(&mut metadata_len_buf)); + if let Err(e) = first_read_res { if e.kind() != io::ErrorKind::UnexpectedEof { return Err(e); @@ -515,30 +527,35 @@ fn read_frame( break; } } - decoder - .read_exact(&mut v_buf) - .expect("we expect reads from crc-verified buffers to succeed"); - decoder - .read_exact(&mut user_data_len_buf) - .expect("we expect reads from crc-verified buffers to succeed"); - let k = u64::from_le_bytes(k_buf); - let v = u64::from_le_bytes(v_buf); + let node_id = NodeId(u64::from_le_bytes(node_id_buf)); + let collection_id = CollectionId(u64::from_le_bytes(collection_id_buf)); + let location = u64::from_le_bytes(location_buf); - let user_data_len_raw = u64::from_le_bytes(user_data_len_buf); - let user_data_len = usize::try_from(user_data_len_raw).unwrap(); - user_data_buf.reserve(user_data_len); + let metadata_len_raw = u64::from_le_bytes(metadata_len_buf); + let metadata_len = usize::try_from(metadata_len_raw).unwrap(); + + metadata_buf.reserve(metadata_len); unsafe { - user_data_buf.set_len(user_data_len); + metadata_buf.set_len(metadata_len); } decoder - .read_exact(&mut user_data_buf) + .read_exact(&mut metadata_buf) .expect("we expect reads from crc-verified buffers to succeed"); - let user_data = InlineArray::from(&*user_data_buf); + if let Some(location_nzu) = NonZeroU64::new(location) { + let metadata = InlineArray::from(&*metadata_buf); - ret.push((NodeId(k), (v, user_data))); + ret.push(UpdateMetadata::Store { + node_id, + collection_id, + location: location_nzu, + metadata, + }); + } else { + ret.push(UpdateMetadata::Free { node_id, collection_id }); + } } Ok(ret) @@ -549,7 +566,7 @@ fn read_frame( fn read_log( directory_path: &Path, lsn: u64, -) -> io::Result> { +) -> io::Result> { log::trace!("reading log {lsn}"); let mut ret = FnvHashMap::default(); @@ -558,8 +575,8 @@ fn read_log( let mut reusable_frame_buffer: Vec = vec![]; while let Ok(frame) = read_frame(&mut file, &mut reusable_frame_buffer) { - for (k, v) in frame { - ret.insert(k, v); + for update_metadata in frame { + ret.insert(update_metadata.node_id(), update_metadata); } } @@ -572,7 +589,7 @@ fn read_log( fn read_snapshot( directory_path: &Path, lsn: u64, -) -> io::Result<(FnvHashMap, u64)> { +) -> io::Result<(FnvHashMap, u64)> { log::trace!("reading snapshot {lsn}"); let mut reusable_frame_buffer: Vec = vec![]; let mut file = @@ -580,11 +597,9 @@ fn read_snapshot( let size = fallible!(file.metadata()).len(); let raw_frame = read_frame(&mut file, &mut reusable_frame_buffer)?; - let frame: FnvHashMap = raw_frame + let frame: FnvHashMap = raw_frame .into_iter() - .map(|(k, (v, user_data))| { - (k, (NonZeroU64::new(v).unwrap(), user_data)) - }) + .map(|update_metadata| (update_metadata.node_id(), update_metadata)) .collect(); log::trace!("recovered {} items in snapshot {}", frame.len(), lsn); @@ -698,7 +713,7 @@ fn read_snapshot_and_apply_logs( let mut max_log_id = snapshot_id_opt.unwrap_or(0); let log_data_res: io::Result< - Vec<(u64, FnvHashMap)>, + Vec<(u64, FnvHashMap)>, > = (&log_ids) //.iter().collect::>()) .into_par_iter() .map(move |log_id| { @@ -706,13 +721,13 @@ fn read_snapshot_and_apply_logs( assert!(*log_id > snapshot_id); } - let log_datum = read_log(path, *log_id)?; + let log_data = read_log(path, *log_id)?; - Ok((*log_id, log_datum)) + Ok((*log_id, log_data)) }) .collect(); - let mut recovered: FnvHashMap = + let mut recovered: FnvHashMap = snapshot_rx.recv().unwrap()?; log::trace!("recovered snapshot contains {recovered:?}"); @@ -720,29 +735,25 @@ fn read_snapshot_and_apply_logs( for (log_id, log_datum) in log_data_res? { max_log_id = max_log_id.max(log_id); - for (k, (v, user_data)) in log_datum { - log::trace!("recovery of log contained location {v:?} for object id {k:?} user_data {user_data:?}"); - if v == 0 { - recovered.remove(&k); + for (node_id, update_metadata) in log_datum { + if matches!(update_metadata, UpdateMetadata::Store { .. }) { + recovered.insert(node_id, update_metadata); } else { - recovered.insert(k, (NonZeroU64::new(v).unwrap(), user_data)); + let previous = recovered.remove(&node_id); + // TODO: assert!(previous.is_some()); } } } - let mut recovered: Vec<(NodeId, NonZeroU64, InlineArray)> = recovered + let mut recovered: Vec = recovered .into_iter() - .map(|(k, (v, user_data))| (k, v, user_data)) + .map(|(node_id, update_metadata)| update_metadata) .collect(); recovered.par_sort_unstable(); // write fresh snapshot with recovered data - let new_snapshot_data = serialize_batch( - recovered - .iter() - .map(|(k, v, user_data)| (*k, Some((*v, user_data.clone())))), - ); + let new_snapshot_data = serialize_batch(&recovered); let snapshot_size = new_snapshot_data.len() as u64; let new_snapshot_tmp_path = snapshot_path(path, max_log_id, true);