Compare commits


4 Commits

Author | SHA1 | Message | Date
Anthony Dodd | cf823a1dc3 | Prep for async-raft@0.6.0 & memstore@0.2.0 releases. | 2021-01-20 22:56:57 -06:00
Anthony Dodd | 0af8f72f97 | Merge pull request #94 from xu-cheng/tokio: Update Tokio to 1.0 | 2021-01-20 22:16:21 -06:00
Anthony Dodd | 3d0cac739c | Merge branch 'master' into tokio | 2021-01-20 21:48:24 -06:00
Cheng XU | 22caeebc8c | Update Tokio to 1.0 | 2021-01-20 06:26:13 -08:00
16 changed files with 99 additions and 103 deletions

View File

@ -4,7 +4,22 @@ This changelog follows the patterns described here: https://keepachangelog.com/e
## [unreleased]
## async-raft 0.6.0-alpha.2 && memstore 0.2.0-alpha.2
## async-raft 0.6.0
The big news for this release is that we are now based on Tokio 1.0! Big shoutout to @xu-cheng for doing all of the heavy lifting on that effort, along with many of the other changes in this release.
It is important to note that 0.6.0 does include two breaking changes from 0.5: the new `RaftStorage::ShutdownError` associated type, and Tokio 1.0. Both changes are purely code-based, and neither is expected to negatively impact running systems.
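As a rough sketch of what the new `ShutdownError` associated type enables, consider the simplified stand-in trait below. The trait, its method, and the error type here are hypothetical, not the actual `RaftStorage` signatures; the point is that fatal storage errors get their own type, so a caller can downcast to distinguish "shut the node down" from "report this to the client":

```rust
use anyhow::Result;
use async_trait::async_trait;
use thiserror::Error;

// Hypothetical, simplified stand-in for the storage trait.
#[async_trait]
trait StateMachineStorage {
    /// Fatal storage errors which should cause the Raft node to shut down.
    type ShutdownError: std::error::Error + Send + Sync + 'static;

    async fn apply_entry(&self, index: u64, data: &[u8]) -> Result<()>;
}

#[derive(Debug, Error)]
#[error("state machine storage is corrupt: {0}")]
struct StorageCorrupt(String);

struct MemStore;

#[async_trait]
impl StateMachineStorage for MemStore {
    type ShutdownError = StorageCorrupt;

    async fn apply_entry(&self, _index: u64, data: &[u8]) -> Result<()> {
        if data.is_empty() {
            // Fatal: surfaced as the ShutdownError type.
            return Err(StorageCorrupt("empty entry payload".into()).into());
        }
        // Application-specific errors would be returned as other error
        // types and propagated back to the client instead.
        Ok(())
    }
}

/// Returns true when the error is fatal and the node should shut down.
async fn apply_or_shutdown<S: StateMachineStorage>(store: &S, idx: u64, data: &[u8]) -> bool {
    match store.apply_entry(idx, data).await {
        Ok(()) => false,
        // Downcasting separates fatal storage failures...
        Err(err) if err.downcast_ref::<S::ShutdownError>().is_some() => true,
        // ...from errors that belong to the client.
        Err(_client_error) => false,
    }
}
```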
### changed
- Updated to Tokio 1.0!
- **BREAKING:** this introduces a `RaftStorage::ShutdownError` associated type. This allows the Raft system to differentiate between fatal storage errors, which should cause the system to shut down, and errors which should be propagated back to the client for application-specific error handling. These changes only apply to the `RaftStorage::apply_entry_to_state_machine` method.
- A small change to Raft startup semantics. When a node comes online and successfully recovers state (the node was already part of a cluster), the node will start with a 30-second election timeout, ensuring that it does not disrupt a running cluster.
- [#89](https://github.com/async-raft/async-raft/pull/89) removes the `Debug` bounds requirement on the `AppData` & `AppDataResponse` types.
- The `Raft` type can now be cloned. The clone is very cheap and facilitates async workflows that feed client requests and Raft RPCs into the Raft instance.
- The `Raft.shutdown` interface has been changed slightly. Instead of returning a `JoinHandle`, the method is now async and simply returns a result.
- The `ClientWriteError::ForwardToLeader` error variant has been modified slightly. It now directly exposes the data (generic type `D`) of the original client request, so that the data can actually be used for forwarding if that is what the parent app wants to do (see the sketch after this list).
- Implemented [#12](https://github.com/async-raft/async-raft/issues/12). This is a pretty old issue and a pretty solid optimization. The previous implementation of this algorithm would go to storage (typically disk) every time entries needed to be replicated to the state machine. Now, entries are cached as they come in from the leader, and the cache alone is used as the source of data. Only a few simple measures are needed to ensure this is correct, as the leader's entry replication protocol takes care of most of the work for us in this case.
- Updated / clarified the interface for log compaction. See the guide or the updated `do_log_compaction` method docs for more details.
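For the `ForwardToLeader` change above, here is a hedged sketch of the forwarding pattern it enables. Everything here (`ClientRequest`, `WriteError`, `forward_to`) is a stand-in for illustration, not the actual async-raft types:

```rust
// Stand-in types; not the exact async-raft signatures.
#[derive(Clone, Debug)]
struct ClientRequest {
    key: String,
    value: String,
}

enum WriteError {
    // The variant now carries the original request data plus a leader
    // hint, so callers can forward the request instead of rebuilding it.
    ForwardToLeader { data: ClientRequest, leader: Option<u64> },
    Other(String),
}

// Pretend the local node is a follower and bounces the write.
async fn local_write(req: ClientRequest) -> Result<(), WriteError> {
    Err(WriteError::ForwardToLeader { data: req, leader: Some(0) })
}

// Network call elided; a real app would send an RPC to the leader node.
async fn forward_to(_leader: u64, _data: ClientRequest) -> Result<(), String> {
    Ok(())
}

async fn write_with_forwarding(req: ClientRequest) -> Result<(), String> {
    match local_write(req).await {
        Ok(()) => Ok(()),
        Err(WriteError::ForwardToLeader { data, leader: Some(id) }) => {
            // The recovered `data` is forwarded verbatim to the leader.
            forward_to(id, data).await
        }
        Err(WriteError::ForwardToLeader { leader: None, .. }) => {
            Err("no known leader; retry later".into())
        }
        Err(WriteError::Other(e)) => Err(e),
    }
}
```

Because `Raft` handles are now cheaply cloneable, a forwarding task can hold its own clone of the local handle while other tasks independently feed Raft RPCs into the same instance.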
### added
- [#97](https://github.com/async-raft/async-raft/issues/97) adds the new `Raft.current_leader` method. This is a convenience method which builds upon the Raft metrics system to quickly and easily identify the current cluster leader.
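A quick usage sketch for the new method (the helper below is illustrative, and assumes `current_leader` resolves to an `Option<NodeId>` as the metrics-based description suggests):

```rust
use async_raft::{AppData, AppDataResponse, NodeId, Raft, RaftNetwork, RaftStorage};

// Illustrative helper: report the current leader via the new convenience
// method on an already-running `Raft` handle.
async fn log_leader<D, R, N, S>(raft: &Raft<D, R, N, S>) -> Option<NodeId>
where
    D: AppData,
    R: AppDataResponse,
    N: RaftNetwork<D>,
    S: RaftStorage<D, R>,
{
    match raft.current_leader().await {
        Some(id) => {
            tracing::info!("current leader is node {}", id);
            Some(id)
        }
        None => {
            tracing::info!("no leader currently known to this node");
            None
        }
    }
}
```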
@ -12,25 +27,9 @@ This changelog follows the patterns described here: https://keepachangelog.com/e
- Fixed [#98](https://github.com/async-raft/async-raft/issues/98) where heartbeats were being passed along into the log consistency check algorithm. This had the potential to cause a Raft node to go into shutdown under some circumstances.
- Fixed a bug where the timestamp of the last received heartbeat from a leader was not being stored, resulting in degraded cluster stability under some circumstances.
## memstore 0.2.0
### changed
- **BREAKING:** this introduces a `RaftStorage::ShutdownError` associated type. This allows the Raft system to differentiate between fatal storage errors, which should cause the system to shut down, and errors which should be propagated back to the client for application-specific error handling. These changes only apply to the `RaftStorage::apply_entry_to_state_machine` method.
- A small change to Raft startup semantics. When a node comes online and successfully recovers state (the node was already part of a cluster), the node will start with a 30-second election timeout, ensuring that it does not disrupt a running cluster.
- [#89](https://github.com/async-raft/async-raft/pull/89) removes the `Debug` bounds requirement on the `AppData` & `AppDataResponse` types.
## async-raft 0.6.0-alpha.1
### changed
- The `Raft` type can now be cloned. The clone is very cheap and helps to facilitate async workflows while feeding client requests and Raft RPCs into the Raft instance.
- The `Raft.shutdown` interface has been changed slightly. Instead of returning a `JoinHandle`, the method is now async and simply returns a result.
- The `ClientWriteError::ForwardToLeader` error variant has been modified slightly. It now exposes the data (generic type `D` of the type) of the original client request directly. This ensures that the data can actually be used for forwarding, if that is what the parent app wants to do.
## memstore 0.2.0-alpha.0
### changed
- Updated async-raft dependency to `0.6.0-alpha.0` & updated storage interface as needed.
## async-raft 0.6.0-alpha.0
### changed
- Implemented [#12](https://github.com/async-raft/async-raft/issues/12). This is a pretty old issue and a pretty solid optimization. The previous implementation of this algorithm would go to storage (typically disk) for every process of replicating entries to the state machine. Now, we are caching entries as they come in from the leader, and using only the cache as the source of data. There are a few simple measures needed to ensure this is correct, as the leader entry replication protocol takes care of most of the work for us in this case.
- Updated / clarified the interface for log compaction. See the guide or the updated `do_log_compaction` method docs for more details.
- Updated async-raft dependency to `0.6.0` & updated storage interface as needed.
### fixed
- Fixed [#76](https://github.com/async-raft/async-raft/issues/76) by moving the process of replicating log entries to the state machine off of the main task. This ensures that the process never blocks the main task. This also includes a few nice optimizations mentioned below.

View File

@ -1,6 +1,6 @@
[package]
name = "async-raft"
version = "0.6.0-alpha.2"
version = "0.6.0"
edition = "2018"
authors = ["Anthony Dodd <Dodd.AnthonyJosiah@gmail.com>"]
categories = ["algorithms", "asynchronous", "data-structures"]
@ -15,20 +15,20 @@ readme = "../README.md"
[dependencies]
anyhow = "1.0.32"
async-trait = "0.1.36"
bytes = "0.5"
bytes = "1.0"
derive_more = { version="0.99.9", default-features=false, features=["from"] }
futures = "0.3"
log = "0.4"
rand = "0.7"
serde = { version="1", features=["derive"] }
thiserror = "1.0.20"
tokio = { version="0.2", default-features=false, features=["fs", "io-util", "macros", "rt-core", "rt-threaded", "stream", "sync", "time"] }
tokio = { version="1.0", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = "0.1"
tracing-futures = "0.2.4"
[dev-dependencies]
maplit = "1.0.2"
memstore = { version="0.2.0-alpha.0", path="../memstore" }
memstore = { version="0.2.0-alpha.2", path="../memstore" }
tracing-subscriber = "0.2.10"
[features]

View File

@ -2,8 +2,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use futures::future::TryFutureExt;
use futures::stream::FuturesUnordered;
use tokio::stream::StreamExt;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::oneshot;
use tokio::time::{timeout, Duration};

View File

@ -11,12 +11,11 @@ use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use futures::future::{AbortHandle, Abortable};
use futures::stream::FuturesOrdered;
use futures::stream::{FuturesOrdered, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::stream::StreamExt;
use tokio::sync::{broadcast, mpsc, oneshot, watch};
use tokio::task::JoinHandle;
use tokio::time::{delay_until, Duration, Instant};
use tokio::time::{sleep_until, Duration, Instant};
use tracing_futures::Instrument;
use crate::config::{Config, SnapshotPolicy};
@ -226,7 +225,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "trace", skip(self))]
fn report_metrics(&mut self) {
let res = self.tx_metrics.broadcast(RaftMetrics {
let res = self.tx_metrics.send(RaftMetrics {
id: self.id,
state: self.target_state,
current_term: self.current_term,
@ -380,7 +379,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let storage = self.storage.clone();
let (handle, reg) = AbortHandle::new_pair();
let (chan_tx, _) = broadcast::channel(1);
let mut tx_compaction = self.tx_compaction.clone();
let tx_compaction = self.tx_compaction.clone();
self.snapshot_state = Some(SnapshotState::Snapshotting {
handle,
sender: chan_tx.clone(),
@ -621,7 +620,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return Ok(());
}
tokio::select! {
Some(msg) = self.core.rx_api.next() => match msg {
Some(msg) = self.core.rx_api.recv() => match msg {
RaftMsg::AppendEntries{rpc, tx} => {
let _ = tx.send(self.core.handle_append_entries_request(rpc).await);
}
@ -647,7 +646,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.change_membership(members, tx).await;
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),
Some(Ok(res)) = self.joint_consensus_cb.next() => {
match res {
Ok(_) => self.handle_joint_consensus_committed().await?,
@ -669,7 +668,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
}
Some(event) = self.replicationrx.next() => self.handle_replica_event(event).await,
Some(event) = self.replicationrx.recv() => self.handle_replica_event(event).await,
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
@ -793,16 +792,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let mut pending_votes = self.spawn_parallel_vote_requests();
// Inner processing loop for this Raft state.
let mut timeout_fut = delay_until(self.core.get_next_election_timeout());
loop {
if !self.core.target_state.is_candidate() {
return Ok(());
}
let timeout_fut = sleep_until(self.core.get_next_election_timeout());
tokio::select! {
_ = &mut timeout_fut => break, // This election has timed-out. Break to outer loop, which starts a new term.
_ = timeout_fut => break, // This election has timed-out. Break to outer loop, which starts a new term.
Some((res, peer)) = pending_votes.recv() => self.handle_vote_response(res, peer).await?,
Some(msg) = self.core.rx_api.next() => match msg {
Some(msg) = self.core.rx_api.recv() => match msg {
RaftMsg::AppendEntries{rpc, tx} => {
let _ = tx.send(self.core.handle_append_entries_request(rpc).await);
}
@ -828,7 +826,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.reject_config_change_not_leader(tx);
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
@ -862,11 +860,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return Ok(());
}
let mut election_timeout = delay_until(self.core.get_next_election_timeout()); // Value is updated as heartbeats are received.
let election_timeout = sleep_until(self.core.get_next_election_timeout()); // Value is updated as heartbeats are received.
tokio::select! {
// If an election timeout is hit, then we need to transition to candidate.
_ = &mut election_timeout => self.core.set_target_state(State::Candidate),
Some(msg) = self.core.rx_api.next() => match msg {
_ = election_timeout => self.core.set_target_state(State::Candidate),
Some(msg) = self.core.rx_api.recv() => match msg {
RaftMsg::AppendEntries{rpc, tx} => {
let _ = tx.send(self.core.handle_append_entries_request(rpc).await);
}
@ -892,7 +890,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.reject_config_change_not_leader(tx);
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);
@ -925,7 +923,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return Ok(());
}
tokio::select! {
Some(msg) = self.core.rx_api.next() => match msg {
Some(msg) = self.core.rx_api.recv() => match msg {
RaftMsg::AppendEntries{rpc, tx} => {
let _ = tx.send(self.core.handle_append_entries_request(rpc).await);
}
@ -951,7 +949,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.reject_config_change_not_leader(tx);
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Some(update) = self.core.rx_compaction.recv() => self.core.update_snapshot_state(update),
Some(Ok(repl_sm_result)) = self.core.replicate_to_sm_handle.next() => {
// Errors herein will trigger shutdown, so no need to process error.
let _ = self.core.handle_replicate_to_sm_result(repl_sm_result);

View File

@ -143,7 +143,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let (tx, rx) = mpsc::channel(all_members.len());
for member in all_members.into_iter().filter(|member| member != &self.core.id) {
let rpc = VoteRequest::new(self.core.current_term, self.core.id, self.core.last_log_index, self.core.last_log_term);
let (network, mut tx_inner) = (self.core.network.clone(), tx.clone());
let (network, tx_inner) = (self.core.network.clone(), tx.clone());
let _ = tokio::spawn(
async move {
match network.vote(member, rpc).await {

View File

@ -3,8 +3,8 @@
use std::io::SeekFrom;
use std::sync::Arc;
use futures::future::FutureExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use tokio::stream::StreamExt;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{interval, timeout, Duration, Interval};
@ -398,8 +398,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
}
}
// Attempt to unpack the next event for the next loop iteration.
if let Ok(event) = self.raftrx.try_recv() {
event_opt = Some(event);
if let Some(event) = self.raftrx.recv().now_or_never() {
event_opt = event;
}
iters += 1;
}
@ -551,8 +551,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
continue;
}
tokio::select! {
_ = self.core.heartbeat.next() => self.core.send_append_entries().await,
event = self.core.raftrx.next() => match event {
_ = self.core.heartbeat.tick() => self.core.send_append_entries().await,
event = self.core.raftrx.recv() => match event {
Some(event) => self.core.drain_raftrx(event),
None => self.core.target_state = TargetReplState::Shutdown,
}
@ -631,7 +631,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
// Check raft channel to ensure we are staying up-to-date, then loop.
if let Ok(event) = self.core.raftrx.try_recv() {
if let Some(Some(event)) = self.core.raftrx.recv().now_or_never() {
self.core.drain_raftrx(event);
}
}
@ -753,8 +753,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver<CurrentSnapshotData<S::Snapshot>>) {
loop {
tokio::select! {
_ = self.core.heartbeat.next() => self.core.send_append_entries().await,
event = self.core.raftrx.next() => match event {
_ = self.core.heartbeat.tick() => self.core.send_append_entries().await,
event = self.core.raftrx.recv() => match event {
Some(event) => self.core.drain_raftrx(event),
None => {
self.core.target_state = TargetReplState::Shutdown;
@ -833,7 +833,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
offset += nread as u64;
// Check raft channel to ensure we are staying up-to-date, then loop.
if let Ok(event) = self.core.raftrx.try_recv() {
if let Some(Some(event)) = self.core.raftrx.recv().now_or_never() {
self.core.drain_raftrx(event);
}
}

View File

@ -5,7 +5,7 @@ use std::time::Duration;
use anyhow::Result;
use async_raft::Config;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -18,7 +18,7 @@ use fixtures::RaftRouter;
/// - call the client_read interface on the followers, and assert failure.
///
/// RUST_LOG=async_raft,memstore,client_reads=trace cargo test -p async-raft --test client_reads
#[tokio::test(core_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn client_reads() -> Result<()> {
fixtures::init_tracing();
@ -30,13 +30,13 @@ async fn client_reads() -> Result<()> {
router.new_raft_node(2).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
// Get the ID of the leader, and assert that client_read succeeds.

View File

@ -8,7 +8,7 @@ use async_raft::raft::MembershipConfig;
use async_raft::Config;
use futures::prelude::*;
use maplit::hashset;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -21,7 +21,7 @@ use fixtures::RaftRouter;
/// - assert that the cluster stayed stable and has all of the expected data.
///
/// RUST_LOG=async_raft,memstore,client_writes=trace cargo test -p async-raft --test client_writes
#[tokio::test(core_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn client_writes() -> Result<()> {
fixtures::init_tracing();
@ -33,13 +33,13 @@ async fn client_writes() -> Result<()> {
router.new_raft_node(2).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
// Write a bunch of data and assert that the cluster stays stable.
@ -52,7 +52,7 @@ async fn client_writes() -> Result<()> {
clients.push(router.client_request_many(leader, "4", 1000));
clients.push(router.client_request_many(leader, "5", 1000));
while clients.next().await.is_some() {}
delay_for(Duration::from_secs(5)).await; // Ensure enough time is given for replication (this is WAY more than enough).
sleep(Duration::from_secs(5)).await; // Ensure enough time is given for replication (this is WAY more than enough).
router.assert_stable_cluster(Some(1), Some(6001)).await; // The extra 1 is from the leader's initial commit entry.
router
.assert_storage_state(

View File

@ -7,7 +7,7 @@ use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::{Config, SnapshotPolicy};
use maplit::hashset;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -20,7 +20,7 @@ use fixtures::RaftRouter;
/// - add new nodes and assert that they receive the snapshot.
///
/// RUST_LOG=async_raft,memstore,compaction=trace cargo test -p async-raft --test compaction
#[tokio::test(core_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn compaction() -> Result<()> {
fixtures::init_tracing();
@ -35,18 +35,18 @@ async fn compaction() -> Result<()> {
router.new_raft_node(0).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
// Send enough requests to the cluster that compaction on the node should be triggered.
router.client_request_many(0, "0", 499).await; // Puts us exactly at the configured snapshot policy threshold.
delay_for(Duration::from_secs(5)).await; // Wait to ensure there is enough time for a snapshot to be built (this is way more than enough).
sleep(Duration::from_secs(5)).await; // Wait to ensure there is enough time for a snapshot to be built (this is way more than enough).
router.assert_stable_cluster(Some(1), Some(500)).await;
router
.assert_storage_state(
@ -72,7 +72,7 @@ async fn compaction() -> Result<()> {
.change_membership(0, hashset![0, 1])
.await
.expect("failed to modify cluster membership");
delay_for(Duration::from_secs(5)).await; // Wait to ensure metrics are updated (this is way more than enough).
sleep(Duration::from_secs(5)).await; // Wait to ensure metrics are updated (this is way more than enough).
router.assert_stable_cluster(Some(1), Some(502)).await; // We expect index to be 500 + 2 (joint & uniform config change entries).
let expected_snap = Some((
500.into(),

View File

@ -5,7 +5,7 @@ use std::time::Duration;
use anyhow::Result;
use async_raft::Config;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -17,7 +17,7 @@ use fixtures::RaftRouter;
/// - call the current_leader interface on the all nodes, and assert success.
///
/// RUST_LOG=async_raft,memstore,client_reads=trace cargo test -p async-raft --test current_leader
#[tokio::test(core_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn current_leader() -> Result<()> {
fixtures::init_tracing();
@ -29,16 +29,16 @@ async fn current_leader() -> Result<()> {
router.new_raft_node(2).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
// Get the ID of the leader, and assert that client_read succeeds.
// Get the ID of the leader, and assert that current_leader succeeds.
let leader = router.leader().await.expect("leader not found");
assert_eq!(leader, 0, "expected leader to be node 0, got {}", leader);

View File

@ -5,9 +5,9 @@ use std::time::Duration;
use anyhow::Result;
use async_raft::Config;
use futures::stream::StreamExt;
use maplit::hashset;
use tokio::stream::StreamExt;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -22,7 +22,7 @@ use fixtures::RaftRouter;
/// - restore the isolated node and assert that it becomes a follower.
///
/// RUST_LOG=async_raft,memstore,dynamic_membership=trace cargo test -p async-raft --test dynamic_membership
#[tokio::test(core_threads = 6)]
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn dynamic_membership() -> Result<()> {
fixtures::init_tracing();
@ -32,13 +32,13 @@ async fn dynamic_membership() -> Result<()> {
router.new_raft_node(0).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(3)).await;
sleep(Duration::from_secs(3)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(3)).await;
sleep(Duration::from_secs(3)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
// Sync some new nodes.
@ -57,20 +57,20 @@ async fn dynamic_membership() -> Result<()> {
}
tracing::info!("--- changing cluster config");
router.change_membership(0, hashset![0, 1, 2, 3, 4]).await?;
delay_for(Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
router.assert_stable_cluster(Some(1), Some(3)).await; // Still in term 1, so leader is still node 0.
// Isolate old leader and assert that a new leader takes over.
tracing::info!("--- isolating master node 0");
router.isolate_node(0).await;
delay_for(Duration::from_secs(5)).await; // Wait for election and for everything to stabilize (this is way longer than needed).
sleep(Duration::from_secs(5)).await; // Wait for election and for everything to stabilize (this is way longer than needed).
router.assert_stable_cluster(Some(2), Some(4)).await;
let leader = router.leader().await.expect("expected new leader");
assert!(leader != 0, "expected new leader to be different from the old leader");
// Restore isolated node.
router.restore_node(0).await;
delay_for(Duration::from_secs(5)).await; // Wait for election and for everything to stabilize (this is way longer than needed).
sleep(Duration::from_secs(5)).await; // Wait for election and for everything to stabilize (this is way longer than needed).
router.assert_stable_cluster(Some(2), Some(4)).await; // We should still be in term 2, as leaders should
// not be deposed when they are not missing heartbeats.
let current_leader = router.leader().await.expect("expected to find current leader");

View File

@ -5,7 +5,7 @@ use std::time::Duration;
use anyhow::Result;
use async_raft::Config;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -21,7 +21,7 @@ use fixtures::RaftRouter;
/// followers have successfully replicated the payload.
///
/// RUST_LOG=async_raft,memstore,initialization=trace cargo test -p async-raft --test initialization
#[tokio::test(core_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn initialization() -> Result<()> {
fixtures::init_tracing();
@ -33,13 +33,13 @@ async fn initialization() -> Result<()> {
router.new_raft_node(2).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
Ok(())

View File

@ -5,7 +5,7 @@ use std::time::Duration;
use anyhow::{anyhow, Result};
use async_raft::Config;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -18,7 +18,7 @@ use fixtures::RaftRouter;
/// on each node, asserting that the shutdown routine succeeded.
///
/// RUST_LOG=async_raft,memstore,shutdown=trace cargo test -p async-raft --test shutdown
#[tokio::test(core_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn initialization() -> Result<()> {
fixtures::init_tracing();
@ -30,13 +30,13 @@ async fn initialization() -> Result<()> {
router.new_raft_node(2).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
tracing::info!("--- performing node shutdowns");

View File

@ -5,7 +5,7 @@ use std::time::Duration;
use anyhow::Result;
use async_raft::Config;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -20,7 +20,7 @@ use fixtures::RaftRouter;
/// - asserts that the leader was able to successfully commit its initial payload.
///
/// RUST_LOG=async_raft,memstore,singlenode=trace cargo test -p async-raft --test singlenode
#[tokio::test(core_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn singlenode() -> Result<()> {
fixtures::init_tracing();
@ -30,13 +30,13 @@ async fn singlenode() -> Result<()> {
router.new_raft_node(0).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(10)).await;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
// Write some data to the single node cluster.

View File

@ -6,7 +6,7 @@ use std::time::Duration;
use anyhow::Result;
use async_raft::{Config, State};
use maplit::hashset;
use tokio::time::delay_for;
use tokio::time::sleep;
use fixtures::RaftRouter;
@ -20,7 +20,7 @@ use fixtures::RaftRouter;
/// after the config change is committed.
///
/// RUST_LOG=async_raft,memstore,stepdown=trace cargo test -p async-raft --test stepdown
#[tokio::test(core_threads = 5)]
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn stepdown() -> Result<()> {
fixtures::init_tracing();
@ -31,13 +31,13 @@ async fn stepdown() -> Result<()> {
router.new_raft_node(1).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(3)).await;
sleep(Duration::from_secs(3)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(3)).await;
sleep(Duration::from_secs(3)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
// Submit a config change which adds two new nodes and removes the current leader.
@ -46,7 +46,7 @@ async fn stepdown() -> Result<()> {
router.new_raft_node(2).await;
router.new_raft_node(3).await;
router.change_membership(orig_leader, hashset![1, 2, 3]).await?;
delay_for(Duration::from_secs(5)).await; // Give time for step down metrics to flow through.
sleep(Duration::from_secs(5)).await; // Give time for step down metrics to flow through.
// Assert on the state of the old leader.
{
@ -84,7 +84,7 @@ async fn stepdown() -> Result<()> {
// Assert that the current cluster is stable.
let _ = router.remove_node(0).await;
delay_for(Duration::from_secs(5)).await; // Give time for a new leader to be elected.
sleep(Duration::from_secs(5)).await; // Give time for a new leader to be elected.
router.assert_stable_cluster(Some(2), Some(4)).await;
router.assert_storage_state(2, 4, None, 0, None).await;
// ----------------------------------- ^^^ this is `0` instead of `4` because blank payloads from new leaders

View File

@ -1,6 +1,6 @@
[package]
name = "memstore"
version = "0.2.0-alpha.2"
version = "0.2.0"
edition = "2018"
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An in-memory implementation of the `async-raft::RaftStorage` trait."
@ -14,11 +14,11 @@ readme = "README.md"
[dependencies]
anyhow = "1.0.32"
async-raft = { version="0.6.0-alpha.2", path="../async-raft" }
async-raft = { version="0.6", path="../async-raft" }
serde = { version="1.0.114", features=["derive"] }
serde_json = "1.0.57"
thiserror = "1.0.20"
tokio = { version="0.2.22", default-features=false, features=["sync"] }
tokio = { version="1.0", default-features=false, features=["sync"] }
tracing = "0.1.17"
tracing-futures = "0.2.4"