Merge pull request #88 from async-raft/12-cache-replicated-entries-and-dont-block

Cache replicated entries & don't block main task w/ replication
This commit is contained in:
Anthony Dodd 2020-11-24 09:46:04 -06:00 committed by GitHub
commit 5835bf4fc6
10 changed files with 199 additions and 62 deletions

View File

@ -4,6 +4,14 @@ This changelog follows the patterns described here: https://keepachangelog.com/e
## [unreleased]
## 0.6.0-alpha.0
### changed
- Implemented [#12](https://github.com/async-raft/async-raft/issues/12). This is a pretty old issue and a pretty solid optimization. The previous implementation of this algorithm went to storage (typically disk) on every pass of replicating entries to the state machine. Now entries are cached as they come in from the leader, and the cache alone is used as the source of data. Only a few simple measures are needed to ensure this is correct, as the leader's entry replication protocol takes care of most of the work for us in this case.
- Updated / clarified the interface for log compaction. See the guide or the updated `do_log_compaction` method docs for more details.
### fixed
- Fixed [#76](https://github.com/async-raft/async-raft/issues/76) by moving the process of replicating log entries to the state machine off the main task, ensuring that the process never blocks the main task. This also includes a few nice optimizations, mentioned above.
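
A minimal, self-contained sketch of the pattern behind these two entries follows. The names here (`Entry`, `Core`, `handle_append_entries`) are hypothetical stand-ins, not the crate's real API: entries arriving from the leader are cached in a `BTreeMap`, and the committed prefix is applied on a spawned task whose `JoinHandle` is polled through a `FuturesOrdered`, so the main Raft loop never blocks on storage.

```rust
use std::collections::BTreeMap;

use futures::stream::{FuturesOrdered, StreamExt};
use tokio::task::JoinHandle;

struct Entry {
    index: u64,
    data: String,
}

#[derive(Default)]
struct Core {
    entries_cache: BTreeMap<u64, Entry>,
    commit_index: u64,
    last_applied: u64,
    // At most one in-flight apply task at a time.
    apply_handle: FuturesOrdered<JoinHandle<Option<u64>>>,
}

impl Core {
    /// Cache the payload of an AppendEntries RPC and, if the cache holds newly committed
    /// entries, apply them to the state machine on a background task.
    fn handle_append_entries(&mut self, entries: Vec<Entry>, leader_commit: u64) {
        // Always update the cache; it is the only source of data for the apply task.
        for entry in entries {
            self.entries_cache.insert(entry.index, entry);
        }
        self.commit_index = leader_commit;
        // Do nothing if an apply task is already running or there is nothing new to apply.
        if !self.apply_handle.is_empty() || self.commit_index <= self.last_applied {
            return;
        }
        // Drain the committed prefix out of the cache.
        let to_apply: Vec<Entry> = (self.last_applied + 1..=self.commit_index)
            .filter_map(|idx| self.entries_cache.remove(&idx))
            .collect();
        if to_apply.is_empty() {
            return;
        }
        // Optimistic update, as in the PR: `last_applied` is in-memory only, and an apply
        // error would shut the node down anyway.
        if let Some(last) = to_apply.last() {
            self.last_applied = last.index;
        }
        // Apply off the main task; the body stands in for `RaftStorage::replicate_to_state_machine`.
        self.apply_handle.push(tokio::spawn(async move {
            let last = to_apply.last().map(|e| e.index);
            for entry in &to_apply {
                println!("applying {}: {}", entry.index, entry.data);
            }
            last
        }));
    }
}

#[tokio::main]
async fn main() {
    let mut core = Core::default();
    let batch = (1..=3).map(|i| Entry { index: i, data: format!("op {i}") }).collect();
    core.handle_append_entries(batch, 3);
    // The real main loop polls this handle in its `select!`; here we simply await it.
    if let Some(Ok(Some(last))) = core.apply_handle.next().await {
        println!("state machine applied through index {last}");
    }
}
```

The real implementation additionally guards the very first apply pass (`initial_replicate_to_state_machine`, which still reads from storage) and clears the cache on leadership changes, as shown in the diffs below.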
## 0.5.5
### changed
- Added `#[derive(Serialize, Deserialize)]` to `RaftMetrics`, `State`.

View File

@ -1,6 +1,6 @@
[package]
name = "async-raft"
version = "0.5.5"
version = "0.6.0-alpha.0"
edition = "2018"
authors = ["Anthony Dodd <Dodd.AnthonyJosiah@gmail.com>"]
categories = ["algorithms", "asynchronous", "data-structures"]

View File

@ -52,7 +52,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if msg_prev_index_is_min || msg_index_and_term_match {
// If this is just a heartbeat, then respond.
if msg.entries.is_empty() {
self.replicate_to_state_machine_if_needed(&mut report_metrics).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
@ -65,7 +65,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// Else, append log entries.
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(&mut report_metrics).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
@ -156,7 +156,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
tracing::trace!("end log consistency check");
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(&mut report_metrics).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
@ -198,20 +198,95 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
Ok(())
}
/// Replicate outstanding logs to the state machine if needed.
#[tracing::instrument(level = "trace", skip(self, report_metrics))]
async fn replicate_to_state_machine_if_needed(&mut self, report_metrics: &mut bool) -> RaftResult<()> {
if self.commit_index > self.last_applied {
// Fetch the series of entries which must be applied to the state machine, and apply them.
let stop = std::cmp::min(self.commit_index, self.last_log_index) + 1;
let entries = self
.storage
.get_log_entries(self.last_applied + 1, stop)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
/// Replicate any outstanding entries to the state machine for which it is safe to do so.
///
/// Very importantly, this routine must not block the main control loop's task, else it
/// may cause the Raft leader to time out requests to this node.
#[tracing::instrument(level = "trace", skip(self))]
async fn replicate_to_state_machine_if_needed(&mut self, entries: Vec<Entry<D>>) {
// Update cache. Always.
for entry in entries {
self.entries_cache.insert(entry.index, entry);
}
// Perform initial replication to state machine if needed.
if !self.has_completed_initial_replication_to_sm {
// Optimistic update, as failures will cause shutdown.
self.has_completed_initial_replication_to_sm = true;
return self.initial_replicate_to_state_machine().await;
}
// If we already have an active replication task, then do nothing.
if !self.replicate_to_sm_handle.is_empty() {
return;
}
// If we don't have any new entries to replicate, then do nothing.
if self.commit_index <= self.last_applied {
return;
}
// If we have no cached entries, then do nothing.
let first_idx = match self.entries_cache.iter().next() {
Some((_, entry)) => entry.index,
None => return,
};
// Drain entries from the beginning of the cache up to commit index.
let mut last_entry_seen: Option<u64> = None;
let entries: Vec<_> = (first_idx..=self.commit_index)
.filter_map(|idx| {
if let Some(entry) = self.entries_cache.remove(&idx) {
last_entry_seen = Some(entry.index);
match entry.payload {
EntryPayload::Normal(inner) => Some((entry.index, inner.data)),
_ => None,
}
} else {
None
}
})
.collect();
// If we actually have some cached entries to apply, then we optimistically update, as
// `self.last_applied` is held in-memory only, and if an error does come up, then
// Raft will go into shutdown.
if let Some(index) = last_entry_seen {
self.last_applied = index;
self.report_metrics();
}
// If we have no data entries to apply, then do nothing.
if entries.is_empty() {
return;
}
// Spawn task to replicate these entries to the state machine.
let storage = self.storage.clone();
let handle = tokio::spawn(async move {
// Create a new vector of references to the entries data ... might have to change this
// interface a bit before 1.0.
let entries_refs: Vec<_> = entries.iter().map(|(k, v)| (k, v)).collect();
storage.replicate_to_state_machine(&entries_refs).await?;
Ok(None)
});
self.replicate_to_sm_handle.push(handle);
}
/// Perform an initial replication of outstanding entries to the state machine.
///
/// This will only be executed once, and only in response to its first payload of entries
/// from the AppendEntries RPC handler.
#[tracing::instrument(level = "trace", skip(self))]
async fn initial_replicate_to_state_machine(&mut self) {
let stop = std::cmp::min(self.commit_index, self.last_log_index) + 1;
let start = self.last_applied + 1;
let storage = self.storage.clone();
// If we already have an active replication task, then do nothing.
if !self.replicate_to_sm_handle.is_empty() {
return;
}
// Fetch the series of entries which must be applied to the state machine, then apply them.
let handle = tokio::spawn(async move {
let mut new_last_applied: Option<u64> = None;
let entries = storage.get_log_entries(start, stop).await?;
if let Some(entry) = entries.last() {
self.last_applied = entry.index;
*report_metrics = true;
new_last_applied = Some(entry.index);
}
let data_entries: Vec<_> = entries
.iter()
@ -221,16 +296,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
})
.collect();
if data_entries.is_empty() {
return Ok(());
return Ok(new_last_applied);
}
self.storage
.replicate_to_state_machine(&data_entries)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
// Request async compaction, if needed.
self.trigger_log_compaction_if_needed();
}
Ok(())
storage.replicate_to_state_machine(&data_entries).await?;
Ok(new_last_applied)
});
self.replicate_to_sm_handle.push(handle);
}
}

View File

@ -56,7 +56,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Check to see if we have any config change logs newer than our commit index. If so, then
// we need to drive the commitment of the config change to the cluster.
let mut pending_config = None; // The inner bool represents `is_in_join_consensus`.
let mut pending_config = None; // The inner bool represents `is_in_joint_consensus`.
if self.core.last_log_index > self.core.commit_index {
let (stale_logs_start, stale_logs_stop) = (self.core.commit_index + 1, self.core.last_log_index + 1);
pending_config = self.core.storage.get_log_entries(stale_logs_start, stale_logs_stop).await
@ -80,8 +80,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.report_metrics();
// Setup any callbacks needed for responding to commitment of a pending config.
if let Some(is_in_join_consensus) = pending_config {
if is_in_join_consensus {
if let Some(is_in_joint_consensus) = pending_config {
if is_in_joint_consensus {
self.joint_consensus_cb.push(rx_payload_committed); // Receiver for when the joint consensus is committed.
} else {
self.uniform_consensus_cb.push(rx_payload_committed); // Receiver for when the uniform consensus is committed.
@ -349,6 +349,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
// Before we can safely apply this entry to the state machine, we need to ensure there is
// no pending task to replicate entries to the state machine. This is an edge case, and would only
// happen once, very early in a new leader's term.
if !self.core.replicate_to_sm_handle.is_empty() {
if let Some(Ok(replicate_to_sm_result)) = self.core.replicate_to_sm_handle.next().await {
self.core.handle_replicate_to_sm_result(replicate_to_sm_result)?;
}
}
// Apply this entry to the state machine and return its data response.
let res = self
.core

View File

@ -23,7 +23,7 @@ use crate::config::{Config, SnapshotPolicy};
use crate::core::client::ClientRequestEntry;
use crate::error::{ChangeConfigError, ClientReadError, ClientWriteError, InitializeError, RaftError, RaftResult};
use crate::metrics::RaftMetrics;
use crate::raft::{ChangeMembershipTx, ClientReadResponseTx, ClientWriteRequest, ClientWriteResponseTx, MembershipConfig, RaftMsg};
use crate::raft::{ChangeMembershipTx, ClientReadResponseTx, ClientWriteRequest, ClientWriteResponseTx, Entry, MembershipConfig, RaftMsg};
use crate::replication::{RaftEvent, ReplicaEvent, ReplicationStream};
use crate::storage::HardState;
use crate::{AppData, AppDataResponse, NodeId, RaftNetwork, RaftStorage};
@ -85,6 +85,25 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
/// This is primarily used in making a determination on when a compaction job needs to be triggered.
snapshot_index: u64,
/// A cache of entries which are waiting to be replicated to the state machine.
///
/// It is important to note that this cache must only be populated from the AppendEntries RPC
/// handler, as these values must only ever represent the entries which have been sent from
/// the current cluster leader.
///
/// Whenever there is a leadership change, this cache will be cleared.
entries_cache: BTreeMap<u64, Entry<D>>,
/// The stream of join handles from state machine replication tasks. There will never be
/// more than one element in this stream at a time.
///
/// This abstraction is needed to ensure that replicating to the state machine does not block
/// the AppendEntries RPC flow, and to ensure that we have a smooth transition to becoming
/// leader without concern over duplicate application of entries to the state machine.
replicate_to_sm_handle: FuturesOrdered<JoinHandle<anyhow::Result<Option<u64>>>>,
/// A bool indicating if this system has performed its initial replication of
/// outstanding entries to the state machine.
has_completed_initial_replication_to_sm: bool,
/// The last time a heartbeat was received.
last_heartbeat: Option<Instant>,
/// The duration until the next election timeout.
@ -121,6 +140,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_log_term: 0,
snapshot_state: None,
snapshot_index: 0,
entries_cache: Default::default(),
replicate_to_sm_handle: FuturesOrdered::new(),
has_completed_initial_replication_to_sm: false,
last_heartbeat: None,
next_election_timeout: None,
tx_compaction,
@ -250,6 +272,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Update the value of the `current_leader` property.
#[tracing::instrument(level = "trace", skip(self))]
fn update_current_leader(&mut self, update: UpdateCurrentLeader) {
self.entries_cache.clear();
match update {
UpdateCurrentLeader::ThisNode => {
self.current_leader = Some(self.id);
@ -324,13 +347,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return;
}
let SnapshotPolicy::LogsSinceLast(threshold) = &self.config.snapshot_policy;
// Make sure we have actual entries for compaction.
let through_index = std::cmp::min(self.commit_index, self.last_log_index);
if through_index == 0 {
// Check to ensure we have actual entries for compaction.
if self.last_applied == 0 || self.last_applied < self.snapshot_index {
return;
}
// If we are below the threshold, then there is nothing to do.
if (through_index - self.snapshot_index) < *threshold {
if (self.last_applied - self.snapshot_index) < *threshold {
return;
}
@ -340,13 +362,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let (chan_tx, _) = broadcast::channel(1);
let mut tx_compaction = self.tx_compaction.clone();
self.snapshot_state = Some(SnapshotState::Snapshotting {
through: through_index,
handle,
sender: chan_tx.clone(),
});
tokio::spawn(
async move {
let res = Abortable::new(storage.do_log_compaction(through_index), reg).await;
let res = Abortable::new(storage.do_log_compaction(), reg).await;
match res {
Ok(res) => match res {
Ok(snapshot) => {
@ -367,6 +388,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
);
}
/// Handle the output of an async task replicating entries to the state machine.
#[tracing::instrument(level = "trace", skip(self, res))]
pub(self) fn handle_replicate_to_sm_result(&mut self, res: anyhow::Result<Option<u64>>) -> RaftResult<()> {
let last_applied_opt = res.map_err(|err| self.map_fatal_storage_error(err))?;
if let Some(last_applied) = last_applied_opt {
self.last_applied = last_applied;
}
self.report_metrics();
self.trigger_log_compaction_if_needed();
Ok(())
}
/// Reject an init config request due to the Raft node being in a state which prohibits the request.
#[tracing::instrument(level = "trace", skip(self, tx))]
fn reject_init_with_config(&self, tx: oneshot::Sender<Result<(), InitializeError>>) {
@ -404,8 +437,6 @@ pub(self) enum UpdateCurrentLeader {
pub(self) enum SnapshotState<S> {
/// The Raft node is compacting itself.
Snapshotting {
/// The last included index of the new snapshot being generated.
through: u64,
/// A handle to abort the compaction process early if needed.
handle: AbortHandle,
/// A sender for notifying any other tasks of the completion of this compaction.
@ -626,6 +657,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
Some(event) = self.replicationrx.next() => self.handle_replica_event(event).await,
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so there is no need to process the error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
@ -777,6 +812,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so there is no need to process the error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
@ -837,6 +876,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so there is no need to process the error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
@ -892,6 +935,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so there is no need to process the error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
}
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}

View File

@ -224,13 +224,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// completion (or cancellation), and respond to the replication stream. The repl stream
// will wait for the completion and will then send another request to fetch the finished snapshot.
// Else we just drop any other state and continue. Leaders never enter `Streaming` state.
if let Some(SnapshotState::Snapshotting { through, handle, sender }) = self.core.snapshot_state.take() {
if let Some(SnapshotState::Snapshotting { handle, sender }) = self.core.snapshot_state.take() {
let mut chan = sender.subscribe();
tokio::spawn(async move {
let _ = chan.recv().await;
drop(tx);
});
self.core.snapshot_state = Some(SnapshotState::Snapshotting { through, handle, sender });
self.core.snapshot_state = Some(SnapshotState::Snapshotting { handle, sender });
return Ok(());
}
@ -239,7 +239,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// will always be found and this block will never even be executed.
//
// If this block is executed, and a snapshot is needed, the repl stream will submit another
// request here shortly, and will hit the above logic where it will await the snapshot complection.
// request here shortly, and will hit the above logic where it will await the snapshot completion.
self.core.trigger_log_compaction_if_needed();
Ok(())
}

View File

@ -155,22 +155,20 @@ where
/// Perform log compaction, returning a handle to the generated snapshot.
///
/// ### `through`
/// The log should be compacted starting from entry `0` and should cover all entries through the
/// index specified by `through`, inclusively. This will always be the `commit_index` of the
/// Raft log at the time of the request.
///
/// ### implementation guide
/// See the [storage chapter of the guide](https://async-raft.github.io/async-raft/storage.html)
/// for details on how to implement this handler.
async fn do_log_compaction(&self, through: u64) -> Result<CurrentSnapshotData<Self::Snapshot>>;
/// When performing log compaction, the compaction can only cover the breadth of the log up to
/// the last applied log, and under write load this value may change quickly. As such, the
/// storage implementation should export/checkpoint/snapshot its state machine, and then use
/// the value of that export's last applied log as the metadata indicating the breadth of the
/// log covered by the snapshot.
async fn do_log_compaction(&self) -> Result<CurrentSnapshotData<Self::Snapshot>>;
/// Create a new blank snapshot, returning a writable handle to the snapshot object along with
/// the ID of the snapshot.
///
/// ### implementation guide
/// See the [storage chapter of the guide]()
/// for details on how to implement this handler.
/// See the [storage chapter of the guide](https://async-raft.github.io/async-raft/storage.html)
/// for details on log compaction / snapshotting.
async fn create_snapshot(&self) -> Result<(String, Box<Self::Snapshot>)>;
/// Finalize the installation of a snapshot which has finished streaming from the cluster leader.

View File

@ -13,10 +13,12 @@ Once you're ready to begin with your implementation, be sure to adhere to the do
For inspiration, have a look at this [repo's `memstore` project](https://github.com/async-raft/async-raft/tree/master/memstore). It is an in-memory implementation of the `RaftStorage` trait, intended for demo and testing purposes.
### compaction / snapshots
This implementation of Raft automatically triggers log compaction based on runtime configuration, using the `RaftStorage::do_log_compaction` method. Additionally, the Raft leader may stream a snapshot over to other nodes if the node is new and needs to be brought up-to-speed, or if a node is lagging behind.
This implementation of Raft automatically triggers log compaction based on runtime configuration, using the `RaftStorage::do_log_compaction` method. Everything related to compaction / snapshots starts with this method. Though snapshots are originally created by the `RaftStorage::do_log_compaction` method, the Raft cluster leader may also stream a snapshot over to another node if that node is new and needs to be brought up to speed, or if it is lagging behind.
Compaction / snapshotting is not optional in this system. It is an integral component of the Raft spec, and `RaftStorage` implementations should implement the compaction / snapshotting related methods carefully, according to the trait's documentation.
When performing log compaction, the compaction can only cover the breadth of the log up to the last applied log, and under write load this value may change quickly. As such, the storage implementation should export/checkpoint/snapshot its state machine, and then use the value of that export's last applied log as the metadata indicating the breadth of the log covered by the snapshot.
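
A toy, self-contained sketch of this rule is shown below; the types are illustrative only and are not part of the `RaftStorage` trait. The snapshot's covered index is taken from the state machine export's own `last_applied_log`, and only entries up through that index are removed from the log.

```rust
use std::collections::BTreeMap;

struct StateMachine {
    last_applied_log: u64,
    kv: BTreeMap<String, String>,
}

struct Snapshot {
    /// Last log index covered by this snapshot.
    index: u64,
    /// Serialized export of the state machine.
    data: Vec<u8>,
}

fn do_log_compaction(sm: &StateMachine, log: &mut BTreeMap<u64, String>) -> Snapshot {
    // 1. Export the state machine and record its last applied index; this value, not the
    //    current end of the log, bounds the snapshot.
    let index = sm.last_applied_log;
    let data = format!("{:?}", sm.kv).into_bytes(); // stand-in for real serialization
    // 2. Drop every log entry covered by the snapshot; entries after `index` remain.
    *log = log.split_off(&(index + 1));
    Snapshot { index, data }
}

fn main() {
    let sm = StateMachine { last_applied_log: 3, kv: BTreeMap::new() };
    // Five entries in the log, but only the first three have been applied so far.
    let mut log: BTreeMap<u64, String> = (1..=5).map(|i| (i, format!("entry {i}"))).collect();
    let snap = do_log_compaction(&sm, &mut log);
    assert_eq!(snap.index, 3);
    assert_eq!(log.len(), 2); // entries 4 and 5 stay in the log for a later snapshot
    println!("snapshot through index {} ({} bytes)", snap.index, snap.data.len());
}
```

The `memstore` implementation in this repository follows the same shape, capturing `last_applied_log` under the state machine's read lock before compacting the log and writing the snapshot pointer.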
----
There is more to learn, so let's keep going. Time to learn about the most central API of this project.

View File

@ -14,7 +14,7 @@ readme = "README.md"
[dependencies]
anyhow = "1.0.32"
async-raft = { version="0.5.0-alpha.0", path="../async-raft" }
async-raft = { version="0.6.0-alpha.0", path="../async-raft" }
serde = { version="1.0.114", features=["derive"] }
serde_json = "1.0.57"
thiserror = "1.0.20"

View File

@ -269,12 +269,13 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}
#[tracing::instrument(level = "trace", skip(self))]
async fn do_log_compaction(&self, through: u64) -> Result<CurrentSnapshotData<Self::Snapshot>> {
let data;
async fn do_log_compaction(&self) -> Result<CurrentSnapshotData<Self::Snapshot>> {
let (data, last_applied_log);
{
// Serialize the data of the state machine.
let sm = self.sm.read().await;
data = serde_json::to_vec(&*sm)?;
last_applied_log = sm.last_applied_log;
} // Release state machine read lock.
let membership_config;
@ -284,7 +285,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
membership_config = log
.values()
.rev()
.skip_while(|entry| entry.index > through)
.skip_while(|entry| entry.index > last_applied_log)
.find_map(|entry| match &entry.payload {
EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
_ => None,
@ -298,14 +299,17 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
let mut log = self.log.write().await;
let mut current_snapshot = self.current_snapshot.write().await;
term = log
.get(&through)
.get(&last_applied_log)
.map(|entry| entry.term)
.ok_or_else(|| anyhow::anyhow!(ERR_INCONSISTENT_LOG))?;
*log = log.split_off(&through);
log.insert(through, Entry::new_snapshot_pointer(through, term, "".into(), membership_config.clone()));
*log = log.split_off(&last_applied_log);
log.insert(
last_applied_log,
Entry::new_snapshot_pointer(last_applied_log, term, "".into(), membership_config.clone()),
);
let snapshot = MemStoreSnapshot {
index: through,
index: last_applied_log,
term,
membership: membership_config.clone(),
data,
@ -317,7 +321,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
tracing::trace!({ snapshot_size = snapshot_bytes.len() }, "log compaction complete");
Ok(CurrentSnapshotData {
term,
index: through,
index: last_applied_log,
membership: membership_config.clone(),
snapshot: Box::new(Cursor::new(snapshot_bytes)),
})