Update deps & remove lint buildup

Anthony Dodd 2023-02-12 11:20:54 -06:00
parent bcc246ccf8
commit db93071972
11 changed files with 109 additions and 172 deletions

View File

@@ -13,23 +13,23 @@ repository = "https://github.com/async-raft/async-raft"
readme = "../README.md"
[dependencies]
anyhow = "1.0.32"
anyhow = "1"
async-trait = "0.1.36"
bytes = "1.0"
bytes = "1"
derive_more = { version="0.99.9", default-features=false, features=["from"] }
futures = "0.3"
log = "0.4"
rand = "0.7"
serde = { version="1", features=["derive"] }
thiserror = "1.0.20"
tokio = { version="1.0", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
thiserror = "1"
tokio = { version="1", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = "0.1"
tracing-futures = "0.2.4"
[dev-dependencies]
maplit = "1.0.2"
maplit = "1"
memstore = { version="0.2.0", path="../memstore" }
tracing-subscriber = "0.2.10"
tracing-subscriber = "0.2"
[features]
docinclude = [] # Used only for activating `doc(include="...")` on nightly.
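
The version-requirement changes above are cosmetic: Cargo reads both "1" and "1.0.32" as caret requirements with the same <2.0.0 upper bound, so loosening to the bare major only drops a stale lower bound. A quick check with the semver crate (an assumption for illustration; the commit itself doesn't use it) makes the equivalence concrete:

    use semver::{Version, VersionReq};

    fn main() {
        // Both manifest entries are caret requirements over the 1.x range.
        let loose = VersionReq::parse("^1").unwrap();
        let with_minimum = VersionReq::parse("^1.0.32").unwrap();
        let current = Version::parse("1.0.69").unwrap();
        assert!(loose.matches(&current) && with_minimum.matches(&current));
        // Neither requirement admits the next major release.
        assert!(!loose.matches(&Version::parse("2.0.0").unwrap()));
    }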

View File

@@ -211,7 +211,7 @@ impl ConfigBuilder {
return Err(ConfigError::MaxPayloadEntriesTooSmall);
}
let replication_lag_threshold = self.replication_lag_threshold.unwrap_or(DEFAULT_REPLICATION_LAG_THRESHOLD);
let snapshot_policy = self.snapshot_policy.unwrap_or_else(SnapshotPolicy::default);
let snapshot_policy = self.snapshot_policy.unwrap_or_default();
let snapshot_max_chunk_size = self.snapshot_max_chunk_size.unwrap_or(DEFAULT_SNAPSHOT_CHUNKSIZE);
Ok(Config {
cluster_name: self.cluster_name,
@@ -237,9 +237,9 @@ mod tests {
fn test_config_defaults() {
let cfg = Config::build("cluster0".into()).validate().unwrap();
assert!(cfg.election_timeout_min >= DEFAULT_ELECTION_TIMEOUT_MIN as u64);
assert!(cfg.election_timeout_max <= DEFAULT_ELECTION_TIMEOUT_MAX as u64);
assert!(cfg.heartbeat_interval == DEFAULT_HEARTBEAT_INTERVAL as u64);
assert!(cfg.election_timeout_min >= DEFAULT_ELECTION_TIMEOUT_MIN);
assert!(cfg.election_timeout_max <= DEFAULT_ELECTION_TIMEOUT_MAX);
assert!(cfg.heartbeat_interval == DEFAULT_HEARTBEAT_INTERVAL);
assert!(cfg.max_payload_entries == DEFAULT_MAX_PAYLOAD_ENTRIES);
assert!(cfg.replication_lag_threshold == DEFAULT_REPLICATION_LAG_THRESHOLD);
assert!(cfg.snapshot_max_chunk_size == DEFAULT_SNAPSHOT_CHUNKSIZE);
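
Two small cleanups drive this hunk: clippy flags unwrap_or_else(SnapshotPolicy::default) as a long-hand spelling of unwrap_or_default(), and the "as u64" casts in the tests become no-ops once the DEFAULT_* constants are themselves u64. A minimal sketch of the first pattern, using a hypothetical stand-in type:

    // Hypothetical stand-in for the crate's SnapshotPolicy; any type that
    // implements Default makes the closure form redundant.
    #[derive(Debug, Default, PartialEq)]
    struct SnapshotPolicy(u64);

    fn main() {
        let configured: Option<SnapshotPolicy> = None;
        // Before: configured.unwrap_or_else(SnapshotPolicy::default)
        let policy = configured.unwrap_or_default();
        assert_eq!(policy, SnapshotPolicy(0));
    }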

View File

@@ -26,10 +26,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Build a new membership config from given init data & assign it as the new cluster
// membership config in memory only.
self.core.membership = MembershipConfig {
members,
members_after_consensus: None,
};
self.core.membership = MembershipConfig { members, members_after_consensus: None };
// Become a candidate and start campaigning for leadership. If this node is the only node
// in the cluster, then become leader without holding an election. If members len == 1, we
@@ -72,14 +69,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Spawn a replication stream for the new member. Track state as a non-voter so that it
// can be updated to be added to the cluster config once it has been brought up-to-date.
let state = self.spawn_replication_stream(target);
self.non_voters.insert(
target,
NonVoterReplicationState {
state,
is_ready_to_join: false,
tx: Some(tx),
},
);
self.non_voters
.insert(target, NonVoterReplicationState { state, is_ready_to_join: false, tx: Some(tx) });
}
#[tracing::instrument(level = "trace", skip(self, tx))]
@@ -107,7 +98,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// if we can proceed.
let mut awaiting = HashSet::new();
for new_node in members.difference(&self.core.membership.members) {
match self.non_voters.get(&new_node) {
match self.non_voters.get(new_node) {
// Node is ready to join.
Some(node) if node.is_ready_to_join => continue,
// Node has repl stream, but is not yet ready to join.
@@ -117,14 +108,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Spawn a replication stream for the new member. Track state as a non-voter so that it
// can be updated to be added to the cluster config once it has been brought up-to-date.
let state = self.spawn_replication_stream(*new_node);
self.non_voters.insert(
*new_node,
NonVoterReplicationState {
state,
is_ready_to_join: false,
tx: None,
},
);
self.non_voters
.insert(*new_node, NonVoterReplicationState { state, is_ready_to_join: false, tx: None });
}
}
awaiting.insert(*new_node);
@@ -160,7 +145,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Setup channels for eventual response to the 2-phase config change.
let (tx_cfg_change, rx_cfg_change) = oneshot::channel();
self.propose_config_change_cb = Some(tx_cfg_change); // Once the entire process is done, this is our response channel.
self.joint_consensus_cb.push(rx_join); // Receiver for when the joint consensus is committed.
self.joint_consensus_cb.push_back(rx_join); // Receiver for when the joint consensus is committed.
tokio::spawn(async move {
let res = rx_cfg_change
.map_err(|_| RaftError::ShuttingDown)
@@ -224,7 +209,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.report_metrics();
// Setup channel for eventual commitment of the uniform consensus config.
self.uniform_consensus_cb.push(rx_uniform); // Receiver for when the uniform consensus is committed.
self.uniform_consensus_cb.push_back(rx_uniform); // Receiver for when the uniform consensus is committed.
Ok(())
}
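
Three mechanical patterns repeat through this file. Struct literals collapsing onto one line are a formatting pass, not a behavior change. ".get(&new_node)" becoming ".get(new_node)" is clippy's needless_borrow: new_node already comes out of difference() as a reference. And push becoming push_back tracks the dependency bumps; FuturesOrdered::push was deprecated in favor of push_back around futures 0.3.25. A minimal sketch of the queue pattern, assuming the callback queues are FuturesOrdered of oneshot receivers (the field types aren't shown in this diff):

    use futures::channel::oneshot;
    use futures::stream::{FuturesOrdered, StreamExt};

    #[tokio::main]
    async fn main() {
        // Completion callbacks resolve in the order they were enqueued.
        let mut callbacks: FuturesOrdered<oneshot::Receiver<u64>> = FuturesOrdered::new();
        let (tx, rx) = oneshot::channel();
        callbacks.push_back(rx); // formerly callbacks.push(rx), now deprecated
        let _ = tx.send(42);
        while let Some(committed) = callbacks.next().await {
            assert_eq!(committed, Ok(42));
        }
    }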

View File

@@ -131,10 +131,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log_term) {
Some(entry) => Some(ConflictOpt {
term: entry.term,
index: entry.index,
}),
Some(entry) => Some(ConflictOpt { term: entry.term, index: entry.index }),
None => Some(ConflictOpt {
term: self.last_log_term,
index: self.last_log_index,
@@ -262,7 +259,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
storage.replicate_to_state_machine(&entries_refs).await?;
Ok(None)
});
self.replicate_to_sm_handle.push(handle);
self.replicate_to_sm_handle.push_back(handle);
}
/// Perform an initial replication of outstanding entries to the state machine.
@@ -300,6 +297,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
storage.replicate_to_state_machine(&data_entries).await?;
Ok(new_last_applied)
});
self.replicate_to_sm_handle.push(handle);
self.replicate_to_sm_handle.push_back(handle);
}
}

View File

@@ -27,10 +27,7 @@ pub(super) struct ClientRequestEntry<D: AppData, R: AppDataResponse> {
impl<D: AppData, R: AppDataResponse> ClientRequestEntry<D, R> {
/// Create a new instance from the raw components of a client request.
pub(crate) fn from_entry<T: Into<ClientOrInternalResponseTx<D, R>>>(entry: Entry<D>, tx: T) -> Self {
Self {
entry: Arc::new(entry),
tx: tx.into(),
}
Self { entry: Arc::new(entry), tx: tx.into() }
}
}
@@ -81,9 +78,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Setup any callbacks needed for responding to commitment of a pending config.
if let Some(is_in_joint_consensus) = pending_config {
if is_in_joint_consensus {
self.joint_consensus_cb.push(rx_payload_committed); // Receiver for when the joint consensus is committed.
self.joint_consensus_cb.push_back(rx_payload_committed); // Receiver for when the joint consensus is committed.
} else {
self.uniform_consensus_cb.push(rx_payload_committed); // Receiver for when the uniform consensus is committed.
self.uniform_consensus_cb.push_back(rx_payload_committed); // Receiver for when the uniform consensus is committed.
}
}
Ok(())
@@ -283,10 +280,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
ClientOrInternalResponseTx::Client(tx) => match &req.entry.payload {
EntryPayload::Normal(inner) => match self.apply_entry_to_state_machine(&req.entry.index, &inner.data).await {
Ok(data) => {
let _ = tx.send(Ok(ClientWriteResponse {
index: req.entry.index,
data,
}));
let _ = tx.send(Ok(ClientWriteResponse { index: req.entry.index, data }));
}
Err(err) => {
let _ = tx.send(Err(ClientWriteError::RaftError(err)));
@@ -356,17 +350,22 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
// Apply this entry to the state machine and return its data response.
let res = self.core.storage.apply_entry_to_state_machine(index, entry).await.map_err(|err| {
if err.downcast_ref::<S::ShutdownError>().is_some() {
// If this is an instance of the storage impl's shutdown error, then trigger shutdown.
self.core.map_fatal_storage_error(err)
} else {
// Else, we propagate normally.
RaftError::RaftStorage(err)
}
});
let res = self
.core
.storage
.apply_entry_to_state_machine(index, entry)
.await
.map_err(|err| {
if err.downcast_ref::<S::ShutdownError>().is_some() {
// If this is an instance of the storage impl's shutdown error, then trigger shutdown.
self.core.map_fatal_storage_error(err)
} else {
// Else, we propagate normally.
RaftError::RaftStorage(err)
}
});
self.core.last_applied = *index;
self.core.report_metrics();
Ok(res?)
res
}
}
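
The Ok(res?) to res change at the end of apply_entry_to_state_machine is clippy's needless_question_mark: when an expression is already a Result of the function's return type, unwrapping with "?" only to rewrap in Ok is a no-op. The same rewrite in a standalone example:

    use std::num::ParseIntError;

    fn parse_port(raw: &str) -> Result<u16, ParseIntError> {
        let res = raw.parse::<u16>();
        // Before: Ok(res?) -- unwrap with `?`, then immediately rewrap in Ok.
        res
    }

    fn main() {
        assert_eq!(parse_port("8080"), Ok(8080));
        assert!(parse_port("not-a-port").is_err());
    }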

View File

@@ -161,7 +161,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
#[tracing::instrument(level="trace", skip(self), fields(id=self.id, cluster=%self.config.cluster_name))]
async fn main(mut self) -> RaftResult<()> {
tracing::trace!("raft node is initializing");
let state = self.storage.get_initial_state().await.map_err(|err| self.map_fatal_storage_error(err))?;
let state = self
.storage
.get_initial_state()
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
self.last_log_index = state.last_log_index;
self.last_log_term = state.last_log_term;
self.current_term = state.hard_state.current_term;
@@ -246,7 +250,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
current_term: self.current_term,
voted_for: self.voted_for,
};
Ok(self.storage.save_hard_state(&hs).await.map_err(|err| self.map_fatal_storage_error(err))?)
self.storage
.save_hard_state(&hs)
.await
.map_err(|err| self.map_fatal_storage_error(err))
}
/// Update core's target state, ensuring all invariants are upheld.
@@ -381,10 +388,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let (handle, reg) = AbortHandle::new_pair();
let (chan_tx, _) = broadcast::channel(1);
let tx_compaction = self.tx_compaction.clone();
self.snapshot_state = Some(SnapshotState::Snapshotting {
handle,
sender: chan_tx.clone(),
});
self.snapshot_state = Some(SnapshotState::Snapshotting { handle, sender: chan_tx.clone() });
tokio::spawn(
async move {
let res = Abortable::new(storage.do_log_compaction(), reg).await;

View File

@@ -16,10 +16,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// If candidate's current term is less than this node's current term, reject.
if msg.term < self.current_term {
tracing::trace!({candidate=msg.candidate_id, self.current_term, rpc_term=msg.term}, "RequestVote RPC term is less than current term");
return Ok(VoteResponse {
term: self.current_term,
vote_granted: false,
});
return Ok(VoteResponse { term: self.current_term, vote_granted: false });
}
// Do not respond to the request if we've received a heartbeat within the election timeout minimum.
@@ -31,10 +28,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
{ candidate = msg.candidate_id },
"rejecting vote request received within election timeout minimum"
);
return Ok(VoteResponse {
term: self.current_term,
vote_granted: false,
});
return Ok(VoteResponse { term: self.current_term, vote_granted: false });
}
}
@@ -56,10 +50,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
{ candidate = msg.candidate_id },
"rejecting vote request as candidate's log is not up-to-date"
);
return Ok(VoteResponse {
term: self.current_term,
vote_granted: false,
});
return Ok(VoteResponse { term: self.current_term, vote_granted: false });
}
// TODO: add hook for PreVote optimization here. If the RPC is a PreVote, then at this
@@ -68,15 +59,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// Candidate's log is up-to-date so handle voting conditions.
match &self.voted_for {
// This node has already voted for the candidate.
Some(candidate_id) if candidate_id == &msg.candidate_id => Ok(VoteResponse {
term: self.current_term,
vote_granted: true,
}),
Some(candidate_id) if candidate_id == &msg.candidate_id => Ok(VoteResponse { term: self.current_term, vote_granted: true }),
// This node has already voted for a different candidate.
Some(_) => Ok(VoteResponse {
term: self.current_term,
vote_granted: false,
}),
Some(_) => Ok(VoteResponse { term: self.current_term, vote_granted: false }),
// This node has not yet voted for the current term, so vote for the candidate.
None => {
self.voted_for = Some(msg.candidate_id);
@@ -84,10 +69,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.update_next_election_timeout(false);
self.save_hard_state().await?;
tracing::trace!({candidate=msg.candidate_id, msg.term}, "voted for candidate");
Ok(VoteResponse {
term: self.current_term,
vote_granted: true,
})
Ok(VoteResponse { term: self.current_term, vote_granted: true })
}
}
}
@@ -144,7 +126,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
for member in all_members.into_iter().filter(|member| member != &self.core.id) {
let rpc = VoteRequest::new(self.core.current_term, self.core.id, self.core.last_log_index, self.core.last_log_term);
let (network, tx_inner) = (self.core.network.clone(), tx.clone());
let _ = tokio::spawn(
tokio::spawn(
async move {
match network.vote(member, rpc).await {
Ok(res) => {

View File

@@ -93,7 +93,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.tx_api
.send(RaftMsg::AppendEntries { rpc, tx })
.map_err(|_| RaftError::ShuttingDown)?;
Ok(rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res)?)
rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res)
}
/// Submit a VoteRequest (RequestVote in the spec) RPC to this Raft node.
@@ -106,7 +106,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.tx_api
.send(RaftMsg::RequestVote { rpc, tx })
.map_err(|_| RaftError::ShuttingDown)?;
Ok(rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res)?)
rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res)
}
/// Submit an InstallSnapshot RPC to this Raft node.
@@ -120,7 +120,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.tx_api
.send(RaftMsg::InstallSnapshot { rpc, tx })
.map_err(|_| RaftError::ShuttingDown)?;
Ok(rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res)?)
rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res)
}
/// Get the ID of the current leader from this Raft node.
@@ -144,10 +144,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.tx_api
.send(RaftMsg::ClientReadRequest { tx })
.map_err(|_| ClientReadError::RaftError(RaftError::ShuttingDown))?;
Ok(rx
.await
rx.await
.map_err(|_| ClientReadError::RaftError(RaftError::ShuttingDown))
.and_then(|res| res)?)
.and_then(|res| res)
}
/// Submit a mutating client request to Raft to update the state of the system (§5.1).
@@ -174,10 +173,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.tx_api
.send(RaftMsg::ClientWriteRequest { rpc, tx })
.map_err(|_| ClientWriteError::RaftError(RaftError::ShuttingDown))?;
Ok(rx
.await
rx.await
.map_err(|_| ClientWriteError::RaftError(RaftError::ShuttingDown))
.and_then(|res| res)?)
.and_then(|res| res)
}
/// Initialize a pristine Raft node with the given config.
@@ -215,10 +213,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.tx_api
.send(RaftMsg::Initialize { members, tx })
.map_err(|_| RaftError::ShuttingDown)?;
Ok(rx
.await
rx.await
.map_err(|_| InitializeError::RaftError(RaftError::ShuttingDown))
.and_then(|res| res)?)
.and_then(|res| res)
}
/// Synchronize a new Raft node, bringing it up-to-speed (§6).
@@ -240,10 +237,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.tx_api
.send(RaftMsg::AddNonVoter { id, tx })
.map_err(|_| RaftError::ShuttingDown)?;
Ok(rx
.await
rx.await
.map_err(|_| ChangeConfigError::RaftError(RaftError::ShuttingDown))
.and_then(|res| res)?)
.and_then(|res| res)
}
/// Propose a cluster configuration change (§6).
@@ -264,10 +260,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.tx_api
.send(RaftMsg::ChangeMembership { members, tx })
.map_err(|_| RaftError::ShuttingDown)?;
Ok(rx
.await
rx.await
.map_err(|_| ChangeConfigError::RaftError(RaftError::ShuttingDown))
.and_then(|res| res)?)
.and_then(|res| res)
}
/// Get a handle to the metrics channel.
@@ -505,10 +500,7 @@ impl MembershipConfig {
pub fn new_initial(id: NodeId) -> Self {
let mut members = HashSet::new();
members.insert(id);
Self {
members,
members_after_consensus: None,
}
Self { members, members_after_consensus: None }
}
}
@@ -531,12 +523,7 @@ pub struct VoteRequest {
impl VoteRequest {
/// Create a new instance.
pub fn new(term: u64, candidate_id: u64, last_log_index: u64, last_log_term: u64) -> Self {
Self {
term,
candidate_id,
last_log_index,
last_log_term,
}
Self { term, candidate_id, last_log_index, last_log_term }
}
}
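
Every RPC wrapper in this file has the same shape: send a message over tx_api, await the oneshot reply, flatten. Awaiting the receiver yields a nested Result (the outer layer is the channel's receive error), so map_err converts a dropped channel into ShuttingDown and and_then(|res| res) flattens the nesting, since stable Rust has no Result::flatten. With the outer Ok(...?) removed there is nothing left to rewrap. A minimal sketch with hypothetical names:

    use tokio::sync::oneshot;

    #[derive(Debug, PartialEq)]
    enum RaftError {
        ShuttingDown,
    }

    // rx.await is Result<Result<u64, RaftError>, RecvError>; flatten both layers.
    async fn forward_rpc(rx: oneshot::Receiver<Result<u64, RaftError>>) -> Result<u64, RaftError> {
        rx.await.map_err(|_| RaftError::ShuttingDown).and_then(|res| res)
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = oneshot::channel();
        tx.send(Ok(7)).unwrap();
        assert_eq!(forward_rpc(rx).await, Ok(7));
    }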

View File

@@ -18,6 +18,7 @@ use crate::{AppData, AppDataResponse, NodeId, RaftNetwork, RaftStorage};
/// The public handle to a spawned replication stream.
pub(crate) struct ReplicationStream<D: AppData> {
/// The spawn handle for the `ReplicationCore` task.
#[allow(dead_code)]
pub handle: JoinHandle<()>,
/// The channel used for communicating with the replication task.
pub repltx: mpsc::UnboundedSender<RaftEvent<D>>,
@@ -223,7 +224,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
return;
}
};
let last_index_and_term = self.outbound_buffer.last().map(|last| (last.as_ref().index, last.as_ref().term));
let last_index_and_term = self
.outbound_buffer
.last()
.map(|last| (last.as_ref().index, last.as_ref().term));
self.outbound_buffer.clear(); // Once we've successfully sent a payload of entries, don't send them again.
// Handle success conditions.
@@ -258,10 +262,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
// Replication was not successful, if a newer term has been returned, revert to follower.
if res.term > self.term {
tracing::trace!({ res.term }, "append entries failed, reverting to follower");
let _ = self.rafttx.send(ReplicaEvent::RevertToFollower {
target: self.target,
term: res.term,
});
let _ = self
.rafttx
.send(ReplicaEvent::RevertToFollower { target: self.target, term: res.term });
self.target_state = TargetReplState::Shutdown;
return;
}
@@ -335,7 +338,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
}
// Follower is behind, but not too far behind to receive an InstallSnapshot RPC.
self.target_state = TargetReplState::Lagging;
return;
}
}
}
@@ -515,10 +517,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "trace", skip(self), fields(state = "line-rate"))]
pub async fn run(mut self) {
let event = ReplicaEvent::RateUpdate {
target: self.core.target,
is_line_rate: true,
};
let event = ReplicaEvent::RateUpdate { target: self.core.target, is_line_rate: true };
let _ = self.core.rafttx.send(event);
loop {
if self.core.target_state != TargetReplState::LineRate {
@@ -576,7 +575,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
// Prepend.
self.core.outbound_buffer.reverse();
self.core.outbound_buffer.extend(entries.into_iter().rev().map(OutboundEntry::Raw));
self.core
.outbound_buffer
.extend(entries.into_iter().rev().map(OutboundEntry::Raw));
self.core.outbound_buffer.reverse();
}
}
@@ -598,10 +599,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "trace", skip(self), fields(state = "lagging"))]
pub async fn run(mut self) {
let event = ReplicaEvent::RateUpdate {
target: self.core.target,
is_line_rate: false,
};
let event = ReplicaEvent::RateUpdate { target: self.core.target, is_line_rate: false };
let _ = self.core.rafttx.send(event);
self.core.replication_buffer.clear();
self.core.outbound_buffer.clear();
@@ -674,7 +672,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
return;
}
}
self.core.outbound_buffer.extend(entries.into_iter().map(OutboundEntry::Raw));
self.core
.outbound_buffer
.extend(entries.into_iter().map(OutboundEntry::Raw));
}
}
}
@@ -693,19 +693,12 @@ struct SnapshottingState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>,
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> SnapshottingState<'a, D, R, N, S> {
/// Create a new instance.
pub fn new(core: &'a mut ReplicationCore<D, R, N, S>) -> Self {
Self {
core,
snapshot: None,
snapshot_fetch_rx: None,
}
Self { core, snapshot: None, snapshot_fetch_rx: None }
}
#[tracing::instrument(level = "trace", skip(self), fields(state = "snapshotting"))]
pub async fn run(mut self) {
let event = ReplicaEvent::RateUpdate {
target: self.core.target,
is_line_rate: false,
};
let event = ReplicaEvent::RateUpdate { target: self.core.target, is_line_rate: false };
let _ = self.core.rafttx.send(event);
self.core.replication_buffer.clear();
self.core.outbound_buffer.clear();
@@ -718,10 +711,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// If we don't have any of the components we need, fetch the current snapshot.
if self.snapshot.is_none() && self.snapshot_fetch_rx.is_none() {
let (tx, rx) = oneshot::channel();
let _ = self.core.rafttx.send(ReplicaEvent::NeedsSnapshot {
target: self.core.target,
tx,
});
let _ = self
.core
.rafttx
.send(ReplicaEvent::NeedsSnapshot { target: self.core.target, tx });
self.snapshot_fetch_rx = Some(rx);
}
@@ -812,10 +805,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Handle response conditions.
if res.term > self.core.term {
let _ = self.core.rafttx.send(ReplicaEvent::RevertToFollower {
target: self.core.target,
term: res.term,
});
let _ = self
.core
.rafttx
.send(ReplicaEvent::RevertToFollower { target: self.core.target, term: res.term });
self.core.target_state = TargetReplState::Shutdown;
return Ok(());
}
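
The one addition in this file, #[allow(dead_code)] on the handle field, silences rustc's "field is never read" warning: the JoinHandle is stored so the spawned replication task stays reachable from its owning struct, but no current code path reads it. Note that dropping a Tokio JoinHandle merely detaches the task rather than aborting it. A simplified sketch of the pattern:

    use tokio::task::JoinHandle;

    struct ReplicationStream {
        // Held for potential joining or abort during shutdown; never read
        // today, so the dead_code lint would otherwise fire.
        #[allow(dead_code)]
        handle: JoinHandle<()>,
    }

    #[tokio::main]
    async fn main() {
        let stream = ReplicationStream {
            handle: tokio::spawn(async { /* replication loop would run here */ }),
        };
        drop(stream); // detaches the task; it does not cancel it
    }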

View File

@@ -13,12 +13,12 @@ repository = "https://github.com/async-raft/async-raft"
readme = "README.md"
[dependencies]
anyhow = "1.0.32"
anyhow = "1"
async-raft = { version="0.6", path="../async-raft" }
serde = { version="1.0.114", features=["derive"] }
serde_json = "1.0.57"
thiserror = "1.0.20"
tokio = { version="1.0", default-features=false, features=["sync"] }
serde = { version="1", features=["derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version="1", default-features=false, features=["sync"] }
tracing = "0.1.17"
tracing-futures = "0.2.4"

View File

@@ -94,13 +94,7 @@ impl MemStore {
let sm = RwLock::new(MemStoreStateMachine::default());
let hs = RwLock::new(None);
let current_snapshot = RwLock::new(None);
Self {
id,
log,
sm,
hs,
current_snapshot,
}
Self { id, log, sm, hs, current_snapshot }
}
/// Create a new `MemStore` instance with some existing state (for testing).
@@ -113,13 +107,7 @@ impl MemStore {
let sm = RwLock::new(sm);
let hs = RwLock::new(hs);
let current_snapshot = RwLock::new(current_snapshot);
Self {
id,
log,
sm,
hs,
current_snapshot,
}
Self { id, log, sm, hs, current_snapshot }
}
/// Get a handle to the log for testing purposes.
@@ -249,7 +237,8 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}
}
let previous = sm.client_status.insert(data.client.clone(), data.status.clone());
sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone()));
sm.client_serial_responses
.insert(data.client.clone(), (data.serial, previous.clone()));
Ok(ClientResponse(previous))
}
@@ -264,7 +253,8 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}
}
let previous = sm.client_status.insert(data.client.clone(), data.status.clone());
sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone()));
sm.client_serial_responses
.insert(data.client.clone(), (data.serial, previous.clone()));
}
Ok(())
}