diff --git a/.gitignore b/.gitignore index dc739ac..d2397fb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ # Directory Ignores ########################################################## +guide/book target +vendor # File Ignores ############################################################### **/*.rs.bk diff --git a/CHANGELOG.md b/CHANGELOG.md index 64538ec..144ab76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ changelog ========= ### 0.3 +#### 0.3.1 +Overhauled the election timeout mechanism. This uses an interval job instead of juggling a rescheduling process. This seems to offer quite a lot more stability. Along with the interval job, we are using `std::time::Instant` values for performing the comparisons against the last received heartbeat. + #### 0.3.0 Another backwards incompatible change to the `RaftStorage` trait. It is now using associated types to better express the needed trait constraints. These changes were the final bit of work needed to get the entire actix-raft system to work with a Synchronous `RaftStorage` impl. Async impls continue to work as they have, the `RaftStorage` impl block will need to be updated to use the associated types though. The recommend pattern is as follows: diff --git a/src/raft/append_entries.rs b/src/raft/append_entries.rs index f64c44c..db4e89f 100644 --- a/src/raft/append_entries.rs +++ b/src/raft/append_entries.rs @@ -70,7 +70,7 @@ impl, S: RaftStorage> Handler, S: RaftStorage> Handler, S: RaftStorage /// A handle to the election timeout callback. election_timeout: Option, - /// The currently scheduled timeout timestamp in millis. - election_timeout_stamp: Option, + /// The currently scheduled election timeout. + election_timeout_stamp: Option, } impl, S: RaftStorage> Raft { @@ -516,12 +516,13 @@ impl, S: RaftStorage> Raft) { // Don't update if the cluster has this node configured as a non-voter. 
if !self.membership.contains(&self.id) || self.membership.non_voters.contains(&self.id) { @@ -531,29 +532,24 @@ impl, S: RaftStorage> Raft { - if !act.state.is_non_voter() { - act.become_candidate(ctx) - } + self.election_timeout_stamp = Some(Instant::now() + timeout.clone()); + self.election_timeout = Some(ctx.run_interval(timeout, |act, ctx| { + if let Some(stamp) = &act.election_timeout_stamp { + if &Instant::now() >= stamp { + act.become_candidate(ctx) } - Some(stamp) => { - debug!("{} invoked election timeout prematurely.", act.id); - // If the scheduled timeout is still in the future, put it back. - act.election_timeout_stamp = Some(stamp); - } - None => return, } })); } + /// Update the election timeout stamp, typically due to receiving a heartbeat from the Raft leader. + fn update_election_timeout_stamp(&mut self) { + self.election_timeout_stamp = Some(Instant::now() + Duration::from_millis(self.config.election_timeout_millis)); + } + /// Update the node's current membership config. /// /// NOTE WELL: if a leader is stepping down, it should not call this method, as it will cause diff --git a/src/raft/vote.rs b/src/raft/vote.rs index 1988cfd..f2c5643 100644 --- a/src/raft/vote.rs +++ b/src/raft/vote.rs @@ -68,7 +68,7 @@ impl, S: RaftStorage> Raft { self.voted_for = Some(msg.candidate_id); self.save_hard_state(ctx); - self.update_election_timeout(ctx); + self.update_election_timeout_stamp(); self.become_follower(ctx); Ok(VoteResponse{term: self.current_term, vote_granted: true, is_candidate_unknown: false}) }, diff --git a/tests/client_writes.rs b/tests/client_writes.rs index 75bab3a..1e24d4e 100644 --- a/tests/client_writes.rs +++ b/tests/client_writes.rs @@ -47,7 +47,7 @@ fn client_writes() { // Setup test controller and actions. 
let mut ctl = RaftTestController::new(network); ctl.register(0, node0.addr.clone()).register(1, node1.addr.clone()).register(2, node2.addr.clone()); - ctl.start_with_test(4, Box::new(|act, ctx| { + ctl.start_with_test(10, Box::new(|act, ctx| { // Get the current leader. ctx.spawn(fut::wrap_future(act.network.send(GetCurrentLeader)) .map_err(|_, _: &mut RaftTestController, _| panic!("Failed to get current leader.")) diff --git a/tests/clustering.rs b/tests/clustering.rs index c37bccd..be0eaca 100644 --- a/tests/clustering.rs +++ b/tests/clustering.rs @@ -47,7 +47,7 @@ fn clustering() { // Setup test controller and actions. let mut ctl = RaftTestController::new(network); ctl.register(0, node0.addr.clone()).register(1, node1.addr.clone()).register(2, node2.addr.clone()); - ctl.start_with_test(5, Box::new(|act, ctx| { + ctl.start_with_test(10, Box::new(|act, ctx| { let (tx0, rx0) = oneshot::channel(); act.network.do_send(ExecuteInRaftRouter(Box::new(move |act, _| { let node0: &RaftMetrics = act.metrics.get(&0).unwrap(); diff --git a/tests/dynamic_membership.rs b/tests/dynamic_membership.rs index aae40b2..80d4292 100644 --- a/tests/dynamic_membership.rs +++ b/tests/dynamic_membership.rs @@ -51,7 +51,7 @@ fn dynamic_membership() { // Setup test controller and actions. let mut ctl = RaftTestController::new(network); ctl.register(0, node0.addr.clone()).register(1, node1.addr.clone()).register(2, node2.addr.clone()); - ctl.start_with_test(5, Box::new(|act, ctx| { + ctl.start_with_test(10, Box::new(|act, ctx| { let task = act.write_data(ctx) // Data has been writtent to new cluster, get the ID of the current leader. .and_then(|_, act, _| { @@ -78,7 +78,7 @@ fn dynamic_membership() { .map(move |_, _, _| leader_id) }) // Delay for a bit to ensure we can target a new leader in the test. 
- .and_then(|leader_id, _, _| fut::wrap_future(Delay::new(Instant::now() + Duration::from_secs(3))) + .and_then(|leader_id, _, _| fut::wrap_future(Delay::new(Instant::now() + Duration::from_secs(6))) .map_err(|_, _, _| ()) .map(move |_, _, _| leader_id)) // Remove old node. @@ -91,7 +91,7 @@ fn dynamic_membership() { }) // Write some additional data to the new leader. .and_then(|old_leader_id, act, ctx| act.write_data(ctx).map(move |_, _, _| old_leader_id)) - .and_then(|old_leader_id, _, _| fut::wrap_future(Delay::new(Instant::now() + Duration::from_secs(3)) + .and_then(|old_leader_id, _, _| fut::wrap_future(Delay::new(Instant::now() + Duration::from_secs(6)) .map_err(|_| ())) .map(move |_, _, _| old_leader_id)) diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index d3d4be8..f0c2c82 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -174,7 +174,7 @@ impl NodeBuilder { let temp_dir = tempdir_in("/tmp").expect("Tempdir to be created without error."); let snapshot_dir = temp_dir.path().to_string_lossy().to_string(); let config = Config::build(snapshot_dir.clone()) - .election_timeout_min(800).election_timeout_max(1000).heartbeat_interval(300) + .election_timeout_min(1500).election_timeout_max(2000).heartbeat_interval(150) .metrics_rate(Duration::from_secs(metrics_rate)) .snapshot_policy(snapshot_policy).snapshot_max_chunk_size(10000) .validate().expect("Raft config to be created without error."); @@ -206,7 +206,7 @@ pub fn new_raft_node(id: NodeId, network: Addr, members: Vec let temp_dir = tempdir_in("/tmp").expect("Tempdir to be created without error."); let snapshot_dir = temp_dir.path().to_string_lossy().to_string(); let config = Config::build(snapshot_dir.clone()) - .election_timeout_min(800).election_timeout_max(1000).heartbeat_interval(300) + .election_timeout_min(1500).election_timeout_max(2000).heartbeat_interval(150) .metrics_rate(Duration::from_secs(metrics_rate)) 
.snapshot_policy(SnapshotPolicy::Disabled).snapshot_max_chunk_size(10000) .validate().expect("Raft config to be created without error."); diff --git a/tests/initialization.rs b/tests/initialization.rs index fd91642..9b98b77 100644 --- a/tests/initialization.rs +++ b/tests/initialization.rs @@ -47,7 +47,7 @@ fn initialization() { // Setup test controller and actions. let mut ctl = RaftTestController::new(network); ctl.register(0, node0.addr.clone()).register(1, node1.addr.clone()).register(2, node2.addr.clone()); - ctl.start_with_test(5, Box::new(|act, ctx| { + ctl.start_with_test(10, Box::new(|act, ctx| { // Assert that all nodes are in NonVoter state with index 0. let task = fut::wrap_future(act.network.send(ExecuteInRaftRouter(Box::new(move |act, _| { for node in act.metrics.values() { diff --git a/tests/singlenode.rs b/tests/singlenode.rs index 8a0c26b..83d984b 100644 --- a/tests/singlenode.rs +++ b/tests/singlenode.rs @@ -42,7 +42,7 @@ fn singlenode() { // Setup test controller and actions. let mut ctl = RaftTestController::new(network); ctl.register(0, node0.addr.clone()); - ctl.start_with_test(5, Box::new(|act, ctx| { + ctl.start_with_test(10, Box::new(|act, ctx| { // Assert that all nodes are in NonVoter state with index 0. let task = fut::wrap_future(act.network.send(ExecuteInRaftRouter(Box::new(move |act, _| { for node in act.metrics.values() { diff --git a/tests/snapshotting.rs b/tests/snapshotting.rs index 093dac4..5efa143 100644 --- a/tests/snapshotting.rs +++ b/tests/snapshotting.rs @@ -47,7 +47,7 @@ fn snapshotting() { // Setup test controller and actions. let mut ctl = RaftTestController::new(network); ctl.register(0, node0.addr.clone()).register(1, node1.addr.clone()).register(2, node2.addr.clone()); - ctl.start_with_test(5, Box::new(|act, ctx| { + ctl.start_with_test(10, Box::new(|act, ctx| { // Isolate the current leader. let task = act.isolate_leader(ctx) // Wait for new leader to be elected.