mirror of https://github.com/railgun-rs/actix-raft
parent
db7365a42c
commit
81ecede1d4
|
@ -123,6 +123,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
|
|||
Ok(rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res)?)
|
||||
}
|
||||
|
||||
/// Get the id of the current leader from this Raft node.
|
||||
///
|
||||
/// Noted that it is the responsibility of the application to verify the leader by calling
|
||||
/// [`client_read`] or [`client_write`].
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn client_current_leader(&self) -> Option<NodeId> {
|
||||
self.metrics().borrow().current_leader
|
||||
}
|
||||
|
||||
/// Check to ensure this node is still the cluster leader, in order to guard against stale reads (§8).
|
||||
///
|
||||
/// The actual read operation itself is up to the application, this method just ensures that
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
mod fixtures;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_raft::Config;
|
||||
use tokio::time::delay_for;
|
||||
|
||||
use fixtures::RaftRouter;
|
||||
|
||||
/// Client current leader tests.
|
||||
///
|
||||
/// What does this test do?
|
||||
///
|
||||
/// - create a stable 3-node cluster.
|
||||
/// - call the client_current_leader interface on the all nodes, and assert success.
|
||||
///
|
||||
/// RUST_LOG=async_raft,memstore,client_reads=trace cargo test -p async-raft --test client_current_leader
|
||||
#[tokio::test(core_threads = 4)]
|
||||
async fn client_current_leader() -> Result<()> {
|
||||
fixtures::init_tracing();
|
||||
|
||||
// Setup test dependencies.
|
||||
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
|
||||
let router = Arc::new(RaftRouter::new(config.clone()));
|
||||
router.new_raft_node(0).await;
|
||||
router.new_raft_node(1).await;
|
||||
router.new_raft_node(2).await;
|
||||
|
||||
// Assert all nodes are in non-voter state & have no entries.
|
||||
delay_for(Duration::from_secs(10)).await;
|
||||
router.assert_pristine_cluster().await;
|
||||
|
||||
// Initialize the cluster, then assert that a stable cluster was formed & held.
|
||||
tracing::info!("--- initializing cluster");
|
||||
router.initialize_from_single_node(0).await?;
|
||||
delay_for(Duration::from_secs(10)).await;
|
||||
router.assert_stable_cluster(Some(1), Some(1)).await;
|
||||
|
||||
// Get the ID of the leader, and assert that client_read succeeds.
|
||||
let leader = router.leader().await.expect("leader not found");
|
||||
assert_eq!(leader, 0, "expected leader to be node 0, got {}", leader);
|
||||
|
||||
for i in 0..3 {
|
||||
let leader = router.client_current_leader(i).await;
|
||||
assert_eq!(leader, Some(0), "expected leader to be node 0, got {:?}", leader);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -158,6 +158,13 @@ impl RaftRouter {
|
|||
}
|
||||
}
|
||||
|
||||
/// Request the current leader from the target node.
|
||||
pub async fn client_current_leader(&self, target: NodeId) -> Option<NodeId> {
|
||||
let rt = self.routing_table.read().await;
|
||||
let node = rt.get(&target).unwrap_or_else(|| panic!("node with ID {} does not exist", target));
|
||||
node.0.client_current_leader().await
|
||||
}
|
||||
|
||||
/// Send multiple client requests to the target node, causing test failure on error.
|
||||
pub async fn client_request_many(&self, target: NodeId, client_id: &str, count: usize) {
|
||||
for idx in 0..count {
|
||||
|
|
Loading…
Reference in New Issue