Improve startup semantics for recovered/restarted nodes.

This adds `30s + rand_election_timeout` to a node's initial
next_election_timeout when the node has recovered state after a restart.
This helps to ensure that the node does not trigger a new election and
begin driving up the Raft term before it can even properly communicate
with peers.

Fixed a few clippy lints.
This commit is contained in:
Anthony Dodd 2021-01-08 08:09:18 -06:00
parent 886f46dbe0
commit 2ede56fb38
No known key found for this signature in database
GPG Key ID: 6E0613E0F653DBC0
14 changed files with 51 additions and 48 deletions

View File

@ -79,7 +79,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
default: true
- name: Build | Release Mode
uses: actions-rs/cargo@v1
with:

View File

@ -4,6 +4,11 @@ This changelog follows the patterns described here: https://keepachangelog.com/e
## [unreleased]
## async-raft 0.6.0-alpha.2 && memstore 0.6.0-alpha.2
### changed
- **BREAKING:** this introduces a `RaftStorage::ShutdownError`associated type. This allows for the Raft system to differentiate between fatal storage errors which should cause the system to shutdown vs errors which should be propagated back to the client for application specific error handling. These changes only apply to the `RaftStorage::apply_entry_to_state_machine` method.
- A small change to Raft startup semantics. When a node comes online and successfully recoveres state (the node was already part of a cluster), the node will start with a 30 second election timeout, ensuring that it does not disrupt a running cluster.
## async-raft 0.6.0-alpha.1
### changed
- The `Raft` type can now be cloned. The clone is very cheap and helps to facilitate async workflows while feeding client requests and Raft RPCs into the Raft instance.

View File

@ -22,9 +22,10 @@ Blazing fast Rust, a modern consensus protocol, and a reliable async runtime —
[The guide](https://async-raft.github.io/async-raft) is the best place to get started, followed by [the docs](https://docs.rs/async-raft/latest/async_raft/) for more in-depth details.
This crate differs from other Raft implementations in that:
- It is fully reactive and embraces the async ecosystem. It is driven by actual Raft related events taking place in the system as opposed to being driven by a `tick` operation. Batching of messages during replication is still used whenever possible for maximum throughput.
- It is fully reactive and embraces the async ecosystem. It is driven by actual Raft events taking place in the system as opposed to being driven by a `tick` operation. Batching of messages during replication is still used whenever possible for maximum throughput.
- Storage and network integration is well defined via two traits `RaftStorage` & `RaftNetwork`. This provides applications maximum flexibility in being able to choose their storage and networking mediums. See the [storage](https://async-raft.github.io/async-raft/storage.html) & [network](https://async-raft.github.io/async-raft/network.html) chapters of the guide for more details.
- All interaction with the Raft node is well defined via a single public `Raft` type, which is used to spawn the Raft async task, and to interact with that task. The API for this system is clear and concise. See the [raft](https://async-raft.github.io/async-raft/raft.html) chapter in the guide.
- Log replication is fully pipelined and batched for optimal performance. Log replication also uses a congestion control mechanism to help keep nodes up-to-date as efficiently as possible.
- It fully supports dynamic cluster membership changes according to the Raft spec. See the [`dynamic membership`](https://async-raft.github.io/async-raft/dynamic-membership.html) chapter in the guide. With full support for leader stepdown, and non-voter syncing.
- Details on initial cluster formation, and how to effectively do so from an application's perspective, are discussed in the [cluster formation](https://async-raft.github.io/async-raft/cluster-formation.html) chapter in the guide.
- Automatic log compaction with snapshots, as well as snapshot streaming from the leader node to follower nodes is fully supported and configurable.

View File

@ -1,6 +1,6 @@
[package]
name = "async-raft"
version = "0.6.0-alpha.1"
version = "0.6.0-alpha.2"
edition = "2018"
authors = ["Anthony Dodd <Dodd.AnthonyJosiah@gmail.com>"]
categories = ["algorithms", "asynchronous", "data-structures"]
@ -24,7 +24,7 @@ serde = { version="1", features=["derive"] }
thiserror = "1.0.20"
tokio = { version="0.2", default-features=false, features=["fs", "io-util", "macros", "rt-core", "rt-threaded", "stream", "sync", "time"] }
tracing = "0.1"
tracing-futures = { version="0.2.4", features=["tokio"] }
tracing-futures = "0.2.4"
[dev-dependencies]
maplit = "1.0.2"

View File

@ -104,13 +104,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// consensus. Else, new nodes need to first be brought up-to-speed.
//
// Here, all we do is check to see which nodes still need to be synced, which determines
// we can proceed.
let diff = members.difference(&self.core.membership.members).cloned().collect::<Vec<_>>();
let awaiting = diff
.into_iter()
.filter(|new_node| match self.non_voters.get(&new_node) {
Some(node) if node.is_ready_to_join => false,
Some(_) => true,
// if we can proceed.
let mut awaiting = HashSet::new();
for new_node in members.difference(&self.core.membership.members) {
match self.non_voters.get(&new_node) {
// Node is ready to join.
Some(node) if node.is_ready_to_join => continue,
// Node has repl stream, but is not yet ready to join.
Some(_) => (),
// Node does not yet have a repl stream, spawn one.
None => {
// Spawn a replication stream for the new member. Track state as a non-voter so that it
// can be updated to be added to the cluster config once it has been brought up-to-date.
@ -123,10 +125,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
tx: None,
},
);
true
}
})
.collect::<HashSet<_>>();
}
awaiting.insert(*new_node);
}
// If there are new nodes which need to sync, then we need to wait until they are synced.
// Once they've finished, this routine will be called again to progress further.
if !awaiting.is_empty() {

View File

@ -281,7 +281,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "trace", skip(self, req))]
pub(super) async fn client_request_post_commit(&mut self, req: ClientRequestEntry<D, R>) {
match req.tx {
// If this is a client response channel, then it means that we are dealing with
ClientOrInternalResponseTx::Client(tx) => match &req.entry.payload {
EntryPayload::Normal(inner) => match self.apply_entry_to_state_machine(&req.entry.index, &inner.data).await {
Ok(data) => {

View File

@ -75,7 +75,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
id,
snapshot,
});
return Ok(InstallSnapshotResponse { term: self.current_term });
Ok(InstallSnapshotResponse { term: self.current_term })
}
#[tracing::instrument(level = "trace", skip(self, req, offset, snapshot))]
@ -104,7 +104,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
} else {
self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot });
}
return Ok(InstallSnapshotResponse { term: self.current_term });
Ok(InstallSnapshotResponse { term: self.current_term })
}
/// Finalize the installation of a new snapshot.

View File

@ -190,8 +190,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.target_state = State::Leader;
}
// Else if there are other members, that can only mean that state was recovered. Become follower.
// Here we use a 30 second overhead on the initial next_election_timeout. This is because we need
// to ensure that restarted nodes don't disrupt a stable cluster by timing out and driving up their
// term before network communication is established.
else if !is_only_configured_member {
self.target_state = State::Follower;
let inst = Instant::now() + Duration::from_secs(30) + Duration::from_millis(self.config.new_rand_election_timeout());
self.next_election_timeout = Some(inst);
}
// Else, for any other condition, stay non-voter.
else {
@ -494,38 +499,22 @@ pub enum State {
impl State {
/// Check if currently in non-voter state.
pub fn is_non_voter(&self) -> bool {
if let Self::NonVoter = self {
true
} else {
false
}
matches!(self, Self::NonVoter)
}
/// Check if currently in follower state.
pub fn is_follower(&self) -> bool {
if let Self::Follower = self {
true
} else {
false
}
matches!(self, Self::Follower)
}
/// Check if currently in candidate state.
pub fn is_candidate(&self) -> bool {
if let Self::Candidate = self {
true
} else {
false
}
matches!(self, Self::Candidate)
}
/// Check if currently in leader state.
pub fn is_leader(&self) -> bool {
if let Self::Leader = self {
true
} else {
false
}
matches!(self, Self::Leader)
}
}
@ -767,6 +756,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
pub(self) async fn run(mut self) -> RaftResult<()> {
// Each iteration of the outer loop represents a new term.
loop {
if !self.core.target_state.is_candidate() {
return Ok(());
}
// Setup initial state per term.
self.votes_granted_old = 1; // We must vote for ourselves per the Raft spec.
self.votes_needed_old = ((self.core.membership.members.len() / 2) + 1) as u64; // Just need a majority.
@ -787,12 +780,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let mut pending_votes = self.spawn_parallel_vote_requests();
// Inner processing loop for this Raft state.
let mut timeout_fut = delay_until(self.core.get_next_election_timeout());
loop {
if !self.core.target_state.is_candidate() {
return Ok(());
}
let mut timeout_fut = delay_until(self.core.get_next_election_timeout());
tokio::select! {
_ = &mut timeout_fut => break, // This election has timed-out. Break to outer loop, which starts a new term.
Some((res, peer)) = pending_votes.recv() => self.handle_vote_response(res, peer).await?,

View File

@ -265,7 +265,7 @@ fn calculate_new_commit_index(mut entries: Vec<u64>, current_commit: u64) -> u64
};
// Calculate offset which will give the majority slice of high-end.
entries.sort();
entries.sort_unstable();
let offset = if (len % 2) == 0 { (len / 2) - 1 } else { len / 2 };
let new_val = entries.get(offset).unwrap_or(&current_commit);
if new_val < &current_commit {
@ -338,7 +338,7 @@ mod tests {
fn $name() {
let mut entries = $entries;
let output = calculate_new_commit_index(entries.clone(), $current);
entries.sort();
entries.sort_unstable();
assert_eq!(output, $expected, "Sorted values: {:?}", entries);
}
};

View File

@ -270,7 +270,7 @@ impl RaftRouter {
node.id, node.last_log_index, expected_last_log
);
let mut members = node.membership_config.members.iter().cloned().collect::<Vec<_>>();
members.sort();
members.sort_unstable();
assert_eq!(
members, all_nodes,
"node {} has membership {:?}, expected {:?}",

View File

@ -74,6 +74,7 @@ use anyhow::Result;
#[async_trait]
impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
type Snapshot = Cursor<Vec<u8>>;
type ShutdownError = ShutdownError;
async fn get_membership_config(&self) -> Result<MembershipConfig> {
// ... snip ...

View File

@ -1,6 +1,6 @@
[package]
name = "memstore"
version = "0.2.0-alpha.0"
version = "0.2.0-alpha.2"
edition = "2018"
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An in-memory implementation of the `async-raft::RaftStorage` trait."
@ -14,7 +14,7 @@ readme = "README.md"
[dependencies]
anyhow = "1.0.32"
async-raft = { version="0.6.0-alpha.0", path="../async-raft" }
async-raft = { version="0.6.0-alpha.2", path="../async-raft" }
serde = { version="1.0.114", features=["derive"] }
serde_json = "1.0.57"
thiserror = "1.0.20"

View File

@ -170,18 +170,18 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
None => (0, 0),
};
let last_applied_log = sm.last_applied_log;
return Ok(InitialState {
Ok(InitialState {
last_log_index,
last_log_term,
last_applied_log,
hard_state: inner.clone(),
membership,
});
})
}
None => {
let new = InitialState::new_initial(self.id);
*hs = Some(new.hard_state.clone());
return Ok(new);
Ok(new)
}
}
}

View File

@ -87,8 +87,10 @@ async fn test_get_initial_state_with_previous_state() -> Result<()> {
payload: EntryPayload::Blank,
},
);
let mut sm = MemStoreStateMachine::default();
sm.last_applied_log = 1; // Just stubbed in for testing.
let sm = MemStoreStateMachine {
last_applied_log: 1, // Just stubbed in for testing.
..Default::default()
};
let hs = HardState {
current_term: 1,
voted_for: Some(NODE_ID),