Merge pull request #102 from async-raft/98-race-condition-for-heartbeats-and-client-read

Ensure heartbeats are never processed by log consistency algorithm
This commit is contained in:
Anthony Dodd 2021-01-20 08:02:44 -06:00 committed by GitHub
commit cd3eb346f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 65 additions and 44 deletions

View File

@ -5,6 +5,10 @@ This changelog follows the patterns described here: https://keepachangelog.com/e
## [unreleased]
## async-raft 0.6.0-alpha.2 && memstore 0.6.0-alpha.2
### fixed
- Fixed [#98](https://github.com/async-raft/async-raft/issues/98) where heartbeats were being passed along into the log consistency check algorithm. This had the potential to cause a Raft node to go into shutdown under some circumstances.
- Fixed a bug where the timestamp of the last received heartbeat from a leader was not being stored, resulting in degraded cluster stability under some circumstances.
### changed
- **BREAKING:** this introduces a `RaftStorage::ShutdownError`associated type. This allows for the Raft system to differentiate between fatal storage errors which should cause the system to shutdown vs errors which should be propagated back to the client for application specific error handling. These changes only apply to the `RaftStorage::apply_entry_to_state_machine` method.
- A small change to Raft startup semantics. When a node comes online and successfully recoveres state (the node was already part of a cluster), the node will start with a 30 second election timeout, ensuring that it does not disrupt a running cluster.

View File

@ -23,7 +23,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
// Update election timeout.
self.update_next_election_timeout();
self.update_next_election_timeout(true);
let mut report_metrics = false;
self.commit_index = msg.leader_commit; // The value for `self.commit_index` is only updated here when not the leader.
@ -45,25 +45,24 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.set_target_state(State::Follower);
}
// If this is just a heartbeat, then respond.
if msg.entries.is_empty() {
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
return Ok(AppendEntriesResponse {
term: self.current_term,
success: true,
conflict_opt: None,
});
}
// If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
// log info, then replication is g2g.
let msg_prev_index_is_min = msg.prev_log_index == u64::min_value();
let msg_index_and_term_match = (msg.prev_log_index == self.last_log_index) && (msg.prev_log_term == self.last_log_term);
if msg_prev_index_is_min || msg_index_and_term_match {
// If this is just a heartbeat, then respond.
if msg.entries.is_empty() {
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {
self.report_metrics();
}
return Ok(AppendEntriesResponse {
term: self.current_term,
success: true,
conflict_opt: None,
});
}
// Else, append log entries.
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
if report_metrics {

View File

@ -21,7 +21,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
// Update election timeout.
self.update_next_election_timeout();
self.update_next_election_timeout(true);
// Update current term if needed.
let mut report_metrics = false;

View File

@ -58,8 +58,10 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
commit_index: u64,
/// The index of the highest log entry which has been applied to the local state machine.
///
/// Is initialized to 0, increases following the `commit_index` as logs are
/// applied to the state machine (via the storage interface).
/// Is initialized to 0 for a pristine node; else, for nodes with existing state it is
/// is initialized to the value returned from the `RaftStorage::get_initial_state` on startup.
/// This value increases following the `commit_index` as logs are applied to the state
/// machine (via the storage interface).
last_applied: u64,
/// The current term.
///
@ -271,9 +273,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
/// Set a value for the next election timeout.
///
/// If `heartbeat=true`, then also update the value of `last_heartbeat`.
#[tracing::instrument(level = "trace", skip(self))]
fn update_next_election_timeout(&mut self) {
self.next_election_timeout = Some(Instant::now() + Duration::from_millis(self.config.new_rand_election_timeout()));
fn update_next_election_timeout(&mut self, heartbeat: bool) {
let now = Instant::now();
self.next_election_timeout = Some(now + Duration::from_millis(self.config.new_rand_election_timeout()));
if heartbeat {
self.last_heartbeat = Some(now);
}
}
/// Update the value of the `current_leader` property.
@ -359,7 +367,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return;
}
// If we are below the threshold, then there is nothing to do.
if (self.last_applied - self.snapshot_index) < *threshold {
let is_below_threshold = self
.last_applied
.checked_sub(self.snapshot_index)
.map(|diff| diff < *threshold)
.unwrap_or(false);
if is_below_threshold {
return;
}
@ -769,7 +782,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
// Setup new term.
self.core.update_next_election_timeout(); // Generates a new rand value within range.
self.core.update_next_election_timeout(false); // Generates a new rand value within range.
self.core.current_term += 1;
self.core.voted_for = Some(self.core.id);
self.core.update_current_leader(UpdateCurrentLeader::Unknown);

View File

@ -43,7 +43,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// do vote checking after this.
if msg.term > self.current_term {
self.update_current_term(msg.term, None);
self.update_next_election_timeout();
self.update_next_election_timeout(false);
self.set_target_state(State::Follower);
self.save_hard_state().await?;
}
@ -62,6 +62,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
});
}
// TODO: add hook for PreVote optimization here. If the RPC is a PreVote, then at this
// point we can respond to the candidate telling them that we would vote for them.
// Candidate's log is up-to-date so handle voting conditions.
match &self.voted_for {
// This node has already voted for the candidate.
@ -78,7 +81,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
None => {
self.voted_for = Some(msg.candidate_id);
self.set_target_state(State::Follower);
self.update_next_election_timeout();
self.update_next_election_timeout(false);
self.save_hard_state().await?;
tracing::trace!({candidate=msg.candidate_id, msg.term}, "voted for candidate");
Ok(VoteResponse {

View File

@ -246,7 +246,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
// If running at line rate, and our buffered outbound requests have accumulated too
// much, we need to purge and transition to a lagging state. The target is not able to
// replicate data fast enough.
if (self.last_log_index - self.match_index) > self.config.replication_lag_threshold {
let is_lagging = self
.last_log_index
.checked_sub(self.match_index)
.map(|diff| diff > self.config.replication_lag_threshold)
.unwrap_or(false);
if is_lagging {
self.target_state = TargetReplState::Lagging;
}
}
@ -345,7 +350,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
pub(self) fn needs_snapshot(&self) -> bool {
match &self.config.snapshot_policy {
SnapshotPolicy::LogsSinceLast(threshold) => {
if self.commit_index > self.match_index && (self.commit_index - self.match_index) >= *threshold {
let needs_snap = self
.commit_index
.checked_sub(self.match_index)
.map(|diff| diff >= *threshold)
.unwrap_or(false);
if needs_snap {
tracing::trace!("snapshot needed");
true
} else {
@ -767,7 +777,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::Snapshot>) -> RaftResult<()> {
let mut offset = 0;
self.core.last_log_index = snapshot.index;
self.core.next_index = snapshot.index + 1;
self.core.match_index = snapshot.index;
self.core.match_term = snapshot.term;

View File

@ -74,22 +74,15 @@ async fn compaction() -> Result<()> {
.expect("failed to modify cluster membership");
delay_for(Duration::from_secs(5)).await; // Wait to ensure metrics are updated (this is way more than enough).
router.assert_stable_cluster(Some(1), Some(502)).await; // We expect index to be 500 + 2 (joint & uniform config change entries).
router
.assert_storage_state(
1,
502,
None,
500,
Some((
500.into(),
1,
MembershipConfig {
members: hashset![0u64],
members_after_consensus: None,
},
)),
)
.await;
let expected_snap = Some((
500.into(),
1,
MembershipConfig {
members: hashset![0u64],
members_after_consensus: None,
},
));
router.assert_storage_state(1, 502, None, 500, expected_snap).await;
// -------------------------------- ^^^^ this value is None because non-voters do not vote.
Ok(())

View File

@ -46,7 +46,7 @@ async fn stepdown() -> Result<()> {
router.new_raft_node(2).await;
router.new_raft_node(3).await;
router.change_membership(orig_leader, hashset![1, 2, 3]).await?;
delay_for(Duration::from_secs(1)).await; // Give time for step down metrics to flow through.
delay_for(Duration::from_secs(5)).await; // Give time for step down metrics to flow through.
// Assert on the state of the old leader.
{