Round two on fixing #79

Added some tests to verify the fix and guard against regressions.

closes #79
Anthony Dodd 2020-11-04 22:58:44 -06:00
parent f308f8c255
commit 3f47347c71
GPG Key ID: 6E0613E0F653DBC0
9 changed files with 132 additions and 50 deletions
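In broad strokes: the 0.5.2 fix signalled shutdown through an `Arc<AtomicBool>` which the core checked only at the top of each loop iteration, so a node parked inside `tokio::select!` would not observe the flag until some unrelated event woke it. This commit replaces the flag with a `tokio::sync::oneshot` channel whose receiver is polled as an extra `select!` arm, making shutdown an event that wakes the core task directly. A minimal sketch of the new pattern (illustrative names only; written against a current tokio API, whereas the crate itself targets tokio 0.2):

```rust
use tokio::sync::{mpsc, oneshot};

// Simplified stand-in for the RaftCore event loop.
async fn core_loop(mut rx_api: mpsc::UnboundedReceiver<u64>, mut rx_shutdown: oneshot::Receiver<()>) {
    loop {
        tokio::select! {
            // Normal work: API requests, election timeouts, replication events.
            Some(req) = rx_api.recv() => println!("handling request {}", req),
            // Shutdown is now an awaited event, not a flag checked per iteration.
            Ok(_) = &mut rx_shutdown => {
                println!("shutdown requested");
                return;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx_api, rx_api) = mpsc::unbounded_channel();
    let (tx_shutdown, rx_shutdown) = oneshot::channel();
    let core = tokio::spawn(core_loop(rx_api, rx_shutdown));
    tx_api.send(1).expect("core loop alive");
    let _ = tx_shutdown.send(()); // wakes the select! immediately
    core.await.expect("core loop panicked");
}
```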


@@ -5,57 +5,74 @@ jobs:
name: build async-raft
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- name: Setup | Checkout
uses: actions/checkout@v2
- name: Setup | Toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: 1.45.2
default: true
components: clippy
# clippy
- uses: actions-rs/cargo@v1
- name: Clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: -p async-raft -- -D warnings
# unit tests
- uses: actions-rs/cargo@v1
- name: Unit Tests
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --lib
# integration tests
- uses: actions-rs/cargo@v1
- name: Integration Test | Initialization
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test initialization
- uses: actions-rs/cargo@v1
- name: Integration Test | Client Reads
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test client_reads
- uses: actions-rs/cargo@v1
- name: Integration Test | Client Writes
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test client_writes
- uses: actions-rs/cargo@v1
- name: Integration Test | Singlenode
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test singlenode
- uses: actions-rs/cargo@v1
- name: Integration Test | Dynamic Membership
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test dynamic_membership
- uses: actions-rs/cargo@v1
- name: Integration Test | Compaction
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test compaction
- uses: actions-rs/cargo@v1
- name: Integration Test | Stepdown
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test stepdown
- name: Integration Test | Shutdown
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test shutdown
# release build
- uses: actions-rs/cargo@v1
- name: Build | Release Mode
uses: actions-rs/cargo@v1
with:
command: build
args: -p async-raft --release
@@ -64,12 +81,15 @@ jobs:
name: build async-raft nightly
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- name: Setup | Checkout
uses: actions/checkout@v2
- name: Setup | Toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
- uses: actions-rs/cargo@v1
- name: Build | Release Mode
uses: actions-rs/cargo@v1
with:
command: build
args: -p async-raft --release --all-features
@@ -78,27 +98,32 @@ jobs:
name: build memstore
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- name: Setup | Checkout
uses: actions/checkout@v2
- name: Setup | Toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: 1.45.2
default: true
components: clippy
# clippy
- uses: actions-rs/cargo@v1
- name: Clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: -p memstore -- -D warnings
# unit tests
- uses: actions-rs/cargo@v1
- name: Unit Tests
uses: actions-rs/cargo@v1
with:
command: test
args: -p memstore
# release build
- uses: actions-rs/cargo@v1
- name: Build | Release Mode
uses: actions-rs/cargo@v1
with:
command: build
args: -p memstore --release


@@ -10,13 +10,15 @@ jobs:
deploy:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- name: mdbook
- name: Setup | Checkout
uses: actions/checkout@v2
- name: Setup | mdbook
uses: peaceiris/actions-mdbook@v1
with:
mdbook-version: '0.4.2'
- run: mdbook build
- name: deploy
- name: Build
run: mdbook build
- name: Deploy
uses: peaceiris/actions-gh-pages@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}


@@ -4,6 +4,10 @@ This changelog follows the patterns described here: https://keepachangelog.com/e
## [unreleased]
## 0.5.3
### fixed
- Fixed [#79](https://github.com/async-raft/async-raft/issues/79) ... for real this time! Added an integration test to prove it.
## 0.5.2
### fixed
- Fixed [#79](https://github.com/async-raft/async-raft/issues/79). The Raft core state machine was not being properly updated in response to shutdown requests. That has been addressed and shutdowns are now behaving as expected.


@@ -1,6 +1,6 @@
[package]
name = "async-raft"
version = "0.5.2"
version = "0.5.3"
edition = "2018"
authors = ["Anthony Dodd <Dodd.AnthonyJosiah@gmail.com>"]
categories = ["algorithms", "asynchronous", "data-structures"]


@@ -8,7 +8,6 @@ pub(crate) mod replication;
mod vote;
use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use futures::future::{AbortHandle, Abortable};
@@ -90,22 +89,18 @@ pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftSt
/// The duration until the next election timeout.
next_election_timeout: Option<Instant>,
/// An atomic bool indicating if this node needs to shutdown.
///
/// This is only used from the `Raft` handle.
needs_shutdown: Arc<AtomicBool>,
tx_compaction: mpsc::Sender<SnapshotUpdate>,
rx_compaction: mpsc::Receiver<SnapshotUpdate>,
rx_api: mpsc::UnboundedReceiver<RaftMsg<D, R>>,
tx_metrics: watch::Sender<RaftMetrics>,
rx_shutdown: oneshot::Receiver<()>,
}
impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
pub(crate) fn spawn(
id: NodeId, config: Arc<Config>, network: Arc<N>, storage: Arc<S>, rx_api: mpsc::UnboundedReceiver<RaftMsg<D, R>>,
tx_metrics: watch::Sender<RaftMetrics>, needs_shutdown: Arc<AtomicBool>,
tx_metrics: watch::Sender<RaftMetrics>, rx_shutdown: oneshot::Receiver<()>,
) -> JoinHandle<RaftResult<()>> {
let membership = MembershipConfig::new_initial(id); // This is updated from storage in the main loop.
let (tx_compaction, rx_compaction) = mpsc::channel(1);
@@ -131,7 +126,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
rx_compaction,
rx_api,
tx_metrics,
needs_shutdown,
rx_shutdown,
};
tokio::spawn(this.main())
}
@@ -183,15 +178,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// controllers and simply awaits the delegated loop to return, which will only take place
// if some error has been encountered, or if a state change is required.
loop {
if self.needs_shutdown.load(Ordering::SeqCst) {
self.set_target_state(State::Shutdown);
}
match &self.target_state {
State::Leader => LeaderState::new(&mut self).run().await?,
State::Candidate => CandidateState::new(&mut self).run().await?,
State::Follower => FollowerState::new(&mut self).run().await?,
State::NonVoter => NonVoterState::new(&mut self).run().await?,
State::Shutdown => return Ok(()),
State::Shutdown => {
tracing::info!("node has shut down");
return Ok(());
}
}
}
}
@@ -571,7 +566,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.commit_initial_leader_entry().await?;
loop {
if !self.core.target_state.is_leader() || self.core.needs_shutdown.load(Ordering::SeqCst) {
if !self.core.target_state.is_leader() {
for node in self.nodes.values() {
let _ = node.replstream.repltx.send(RaftEvent::Terminate);
}
@@ -630,6 +625,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
Some(event) = self.replicationrx.next() => self.handle_replica_event(event).await,
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
}
@@ -745,7 +741,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Inner processing loop for this Raft state.
loop {
if !self.core.target_state.is_candidate() || self.core.needs_shutdown.load(Ordering::SeqCst) {
if !self.core.target_state.is_candidate() {
return Ok(());
}
@@ -780,6 +776,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
}
@@ -804,7 +801,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
pub(self) async fn run(self) -> RaftResult<()> {
self.core.report_metrics();
loop {
if !self.core.target_state.is_follower() || self.core.needs_shutdown.load(Ordering::SeqCst) {
if !self.core.target_state.is_follower() {
return Ok(());
}
@@ -839,6 +836,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
}
@@ -862,7 +860,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
pub(self) async fn run(mut self) -> RaftResult<()> {
self.core.report_metrics();
loop {
if !self.core.target_state.is_non_voter() || self.core.needs_shutdown.load(Ordering::SeqCst) {
if !self.core.target_state.is_non_voter() {
return Ok(());
}
tokio::select! {
@@ -893,6 +891,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
},
Some(update) = self.core.rx_compaction.next() => self.core.update_snapshot_state(update),
Ok(_) = &mut self.core.rx_shutdown => self.core.set_target_state(State::Shutdown),
}
}
}
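One detail in the arms above: `oneshot::Receiver` is a future that is consumed by value, but `RaftCore` must keep owning it across states and iterations, so each `select!` polls it through `&mut self.core.rx_shutdown` (a `&mut` reference to an `Unpin` future is itself a future). Once the arm fires, `target_state` becomes `Shutdown` and every state loop returns before the completed receiver could be polled again. A tiny self-contained illustration of that borrow, with hypothetical names:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        let _ = tx.send(());
    });
    // Await through `&mut` so the owner keeps the receiver afterwards,
    // mirroring the `Ok(_) = &mut self.core.rx_shutdown` arms above.
    let res = (&mut rx).await;
    assert!(res.is_ok());
}
```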


@@ -1,7 +1,6 @@
//! Public Raft interface and data types.
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
@@ -35,7 +34,7 @@ pub struct Raft<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorag
tx_api: mpsc::UnboundedSender<RaftMsg<D, R>>,
rx_metrics: watch::Receiver<RaftMetrics>,
raft_handle: JoinHandle<RaftResult<()>>,
needs_shutdown: Arc<AtomicBool>,
tx_shutdown: oneshot::Sender<()>,
marker_n: std::marker::PhantomData<N>,
marker_s: std::marker::PhantomData<S>,
}
@@ -62,13 +61,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
pub fn new(id: NodeId, config: Arc<Config>, network: Arc<N>, storage: Arc<S>) -> Self {
let (tx_api, rx_api) = mpsc::unbounded_channel();
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
let needs_shutdown = Arc::new(AtomicBool::new(false));
let raft_handle = RaftCore::spawn(id, config, network, storage, rx_api, tx_metrics, needs_shutdown.clone());
let (tx_shutdown, rx_shutdown) = oneshot::channel();
let raft_handle = RaftCore::spawn(id, config, network, storage, rx_api, tx_metrics, rx_shutdown);
Self {
tx_api,
rx_metrics,
raft_handle,
needs_shutdown,
tx_shutdown,
marker_n: std::marker::PhantomData,
marker_s: std::marker::PhantomData,
}
@@ -247,7 +246,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Shutdown this Raft node, returning its join handle.
pub fn shutdown(self) -> tokio::task::JoinHandle<RaftResult<()>> {
self.needs_shutdown.store(true, Ordering::SeqCst);
let _ = self.tx_shutdown.send(());
self.raft_handle
}
}
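On the handle side, `shutdown` now consumes `self`, moving the `oneshot::Sender` out and firing it; the `let _ =` tolerates a core task that has already exited (in which case `send` returns `Err`). A hedged sketch of driving this to completion from application code, assuming the usual trait re-exports at the crate root:

```rust
use async_raft::{AppData, AppDataResponse, Raft, RaftNetwork, RaftStorage};

// Gracefully stop a node, surfacing both error layers.
async fn stop_node<D, R, N, S>(raft: Raft<D, R, N, S>) -> anyhow::Result<()>
where
    D: AppData,
    R: AppDataResponse,
    N: RaftNetwork<D>,
    S: RaftStorage<D, R>,
{
    // First `?` covers task join errors; the second covers the core's RaftResult.
    raft.shutdown().await??;
    Ok(())
}
```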


@@ -65,11 +65,13 @@ impl RaftRouter {
}
/// Remove the target node from the routing table & the set of isolated nodes.
pub async fn remove_node(&self, id: NodeId) {
pub async fn remove_node(&self, id: NodeId) -> Option<(MemRaft, Arc<MemStore>)> {
let mut rt = self.routing_table.write().await;
rt.remove(&id);
let opt_handles = rt.remove(&id);
let mut isolated = self.isolated_nodes.write().await;
isolated.remove(&id);
opt_handles
}
/// Initialize all nodes based on the config in the routing table.
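This new return value is what lets the shutdown test below take ownership of each removed node's `Raft` handle and assert that its shutdown routine succeeds; callers that don't need the handle, like the stepdown test at the end of this diff, simply discard it.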


@@ -0,0 +1,51 @@
mod fixtures;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Result};
use async_raft::Config;
use tokio::time::delay_for;
use fixtures::RaftRouter;
/// Cluster shutdown test.
///
/// What does this test do?
///
/// - this test builds upon the `initialization` test.
- after the cluster has been initialized, it performs a shutdown routine
/// on each node, asserting that the shutdown routine succeeded.
///
/// RUST_LOG=async_raft,memstore,shutdown=trace cargo test -p async-raft --test shutdown
#[tokio::test(core_threads = 4)]
async fn shutdown() -> Result<()> {
fixtures::init_tracing();
// Setup test dependencies.
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;
router.new_raft_node(1).await;
router.new_raft_node(2).await;
// Assert all nodes are in non-voter state & have no entries.
delay_for(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
delay_for(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
tracing::info!("--- performing node shutdowns");
let (node0, _) = router.remove_node(0).await.ok_or_else(|| anyhow!("failed to find node 0 in router"))?;
node0.shutdown().await??;
let (node1, _) = router.remove_node(1).await.ok_or_else(|| anyhow!("failed to find node 1 in router"))?;
node1.shutdown().await??;
let (node2, _) = router.remove_node(2).await.ok_or_else(|| anyhow!("failed to find node 2 in router"))?;
node2.shutdown().await??;
Ok(())
}


@@ -83,7 +83,7 @@ async fn stepdown() -> Result<()> {
}
// Assert that the current cluster is stable.
router.remove_node(0).await;
let _ = router.remove_node(0).await;
delay_for(Duration::from_secs(5)).await; // Give time for a new leader to be elected.
router.assert_stable_cluster(Some(2), Some(4)).await;
router.assert_storage_state(2, 4, None, 0, None).await;