From 3b378cc45be6b7dd39cd64418f315cee35108779 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Mon, 14 Oct 2024 16:06:58 +0800 Subject: [PATCH] Improve: remove `Copy` bound from `NodeId` The `NodeId` type is currently defined as: ```rust type NodeId: .. + Copy + .. + 'static; ``` This commit removes the `Copy` bound from `NodeId`. This modification will allow the use of non-`Copy` types as `NodeId`, providing greater flexibility for applications that prefer variable-length strings or other non-`Copy` types for node identification. This change maintains compatibility by replacing derived `Copy` implementations with manual implementations: ```rust // Before #[derive(Copy...)] pub struct LogId {} // After impl Copy for LogId {} ``` --- examples/memstore/src/log_store.rs | 16 +- openraft/src/core/raft_core.rs | 233 ++++++++++-------- openraft/src/core/sm/command.rs | 4 +- openraft/src/core/sm/worker.rs | 4 +- openraft/src/engine/engine_impl.rs | 38 +-- .../engine/handler/establish_handler/mod.rs | 6 +- .../engine/handler/following_handler/mod.rs | 21 +- .../src/engine/handler/leader_handler/mod.rs | 8 +- .../src/engine/handler/log_handler/mod.rs | 6 +- .../engine/handler/replication_handler/mod.rs | 39 +-- .../handler/server_state_handler/mod.rs | 6 +- .../engine/handler/snapshot_handler/mod.rs | 2 +- .../src/engine/handler/vote_handler/mod.rs | 12 +- openraft/src/engine/log_id_list.rs | 36 +-- openraft/src/entry/mod.rs | 4 +- openraft/src/log_id/log_id_option_ext.rs | 2 +- openraft/src/log_id/mod.rs | 2 +- openraft/src/log_id_range.rs | 2 +- .../src/membership/effective_membership.rs | 8 +- openraft/src/membership/membership.rs | 14 +- openraft/src/metrics/wait.rs | 2 +- openraft/src/network/snapshot_transport.rs | 2 +- openraft/src/node.rs | 1 - openraft/src/progress/entry/mod.rs | 14 +- openraft/src/progress/inflight/mod.rs | 2 +- openraft/src/progress/mod.rs | 32 +-- openraft/src/proposer/candidate.rs | 6 +- openraft/src/proposer/leader.rs 
| 20 +- openraft/src/quorum/quorum_set_impl.rs | 8 +- openraft/src/raft/impl_raft_blocking_write.rs | 10 +- openraft/src/raft/message/vote.rs | 8 +- openraft/src/raft/mod.rs | 18 +- .../membership_state/change_handler.rs | 4 +- openraft/src/raft_state/mod.rs | 8 +- openraft/src/replication/mod.rs | 55 +++-- openraft/src/storage/callback.rs | 2 +- openraft/src/storage/helper.rs | 20 +- openraft/src/storage/log_store_ext.rs | 2 +- openraft/src/storage/mod.rs | 4 +- openraft/src/testing/suite.rs | 10 +- openraft/src/vote/leader_id/leader_id_adv.rs | 4 +- openraft/src/vote/leader_id/leader_id_std.rs | 5 +- tests/tests/fixtures/mod.rs | 2 +- 43 files changed, 366 insertions(+), 336 deletions(-) diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs index 0a7f740ff..aa73b9bba 100644 --- a/examples/memstore/src/log_store.rs +++ b/examples/memstore/src/log_store.rs @@ -60,12 +60,12 @@ impl LogStoreInner { } async fn get_log_state(&mut self) -> Result, StorageError> { - let last = self.log.iter().next_back().map(|(_, ent)| *ent.get_log_id()); + let last = self.log.iter().next_back().map(|(_, ent)| ent.get_log_id().clone()); - let last_purged = self.last_purged_log_id; + let last_purged = self.last_purged_log_id.clone(); let last = match last { - None => last_purged, + None => last_purged.clone(), Some(x) => Some(x), }; @@ -81,16 +81,16 @@ impl LogStoreInner { } async fn read_committed(&mut self) -> Result>, StorageError> { - Ok(self.committed) + Ok(self.committed.clone()) } async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { - self.vote = Some(*vote); + self.vote = Some(vote.clone()); Ok(()) } async fn read_vote(&mut self) -> Result>, StorageError> { - Ok(self.vote) + Ok(self.vote.clone()) } async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> @@ -116,8 +116,8 @@ impl LogStoreInner { async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { { let ld = &mut 
self.last_purged_log_id; - assert!(*ld <= Some(log_id)); - *ld = Some(log_id); + assert!(ld.as_ref() <= Some(&log_id)); + *ld = Some(log_id.clone()); } { diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index b03a101aa..c750028b3 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -252,7 +252,7 @@ where Err(err) } - #[tracing::instrument(level="trace", skip_all, fields(id=display(self.id), cluster=%self.config.cluster_name))] + #[tracing::instrument(level="trace", skip_all, fields(id=display(&self.id), cluster=%self.config.cluster_name))] async fn do_main( &mut self, rx_shutdown: ::OneshotReceiver<()>, @@ -298,18 +298,18 @@ where // TODO: this applied is a little stale when being returned to client. // Fix this when the following heartbeats are replaced with calling RaftNetwork. - let applied = self.engine.state.io_applied().copied(); + let applied = self.engine.state.io_applied().cloned(); (read_log_id, applied) }; - let my_id = self.id; - let my_vote = *self.engine.state.vote_ref(); + let my_id = self.id.clone(); + let my_vote = self.engine.state.vote_ref().clone(); let ttl = Duration::from_millis(self.config.heartbeat_interval); let eff_mem = self.engine.state.membership_state.effective().clone(); let core_tx = self.tx_notify.clone(); - let mut granted = btreeset! {my_id}; + let mut granted = btreeset! 
{my_id.clone()}; if eff_mem.is_quorum(granted.iter()) { let _ = tx.send(Ok(resp)); @@ -325,41 +325,45 @@ where }; for (target, progress) in voter_progresses { - let target = *target; + let target = target.clone(); if target == my_id { continue; } let rpc = AppendEntriesRequest { - vote: my_vote, - prev_log_id: progress.matching, + vote: my_vote.clone(), + prev_log_id: progress.matching.clone(), entries: vec![], - leader_commit: self.engine.state.committed().copied(), + leader_commit: self.engine.state.committed().cloned(), }; // Safe unwrap(): target is in membership let target_node = eff_mem.get_node(&target).unwrap().clone(); - let mut client = self.network.new_client(target, &target_node).await; + let mut client = self.network.new_client(target.clone(), &target_node).await; let option = RPCOption::new(ttl); - let fu = async move { - let outer_res = C::AsyncRuntime::timeout(ttl, client.append_entries(rpc, option)).await; - match outer_res { - Ok(append_res) => match append_res { - Ok(x) => Ok((target, x)), - Err(err) => Err((target, err)), - }, - Err(_timeout) => { - let timeout_err = Timeout { - action: RPCTypes::AppendEntries, - id: my_id, - target, - timeout: ttl, - }; + let fu = { + let my_id = my_id.clone(); + let target = target.clone(); + async move { + let outer_res = C::AsyncRuntime::timeout(ttl, client.append_entries(rpc, option)).await; + match outer_res { + Ok(append_res) => match append_res { + Ok(x) => Ok((target.clone(), x)), + Err(err) => Err((target.clone(), err)), + }, + Err(_timeout) => { + let timeout_err = Timeout { + action: RPCTypes::AppendEntries, + id: my_id, + target: target.clone(), + timeout: ttl, + }; - Err((target, RPCError::Timeout(timeout_err))) + Err((target, RPCError::Timeout(timeout_err))) + } } } }; @@ -376,11 +380,11 @@ where let (target, append_res) = match res { Ok(Ok(res)) => res, Ok(Err((target, err))) => { - tracing::error!(target=display(target), error=%err, "timeout while confirming leadership for read request"); + 
tracing::error!(target=display(&target), error=%err, "timeout while confirming leadership for read request"); continue; } Err((target, err)) => { - tracing::error!(target = display(target), "fail to join task: {}", err); + tracing::error!(target = display(&target), "fail to join task: {}", err); continue; } }; @@ -480,7 +484,7 @@ where /// /// The result of applying it to state machine is sent to `resp_tx`, if it is not `None`. /// The calling side may not receive a result from `resp_tx`, if raft is shut down. - #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] + #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))] pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option>) -> bool { tracing::debug!(payload = display(&entry), "write_entry"); @@ -507,7 +511,7 @@ where /// Send a heartbeat message to every followers/learners. /// /// Currently heartbeat is a blank log - #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] + #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))] pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool { tracing::debug!(now = debug(C::now()), "send_heartbeat"); @@ -528,7 +532,19 @@ where pub fn flush_metrics(&mut self) { let leader_metrics = if let Some(leader) = self.engine.leader.as_ref() { let prog = &leader.progress; - Some(prog.iter().map(|(id, p)| (*id, *p.borrow())).collect()) + Some( + prog.iter() + .map(|(id, p)| { + ( + id.clone(), + ::NodeId> as Borrow>>>::borrow( + p, + ) + .clone(), + ) + }) + .collect(), + ) } else { None }; @@ -548,19 +564,19 @@ where let m = RaftMetrics { running_state: Ok(()), - id: self.id, + id: self.id.clone(), // --- data --- current_term: st.vote_ref().leader_id().get_term(), - vote: *st.io_state().vote(), + vote: st.io_state().vote().clone(), last_log_index: st.last_log_id().index(), - last_applied: st.io_applied().copied(), - snapshot: 
st.io_snapshot_last_log_id().copied(), - purged: st.io_purged().copied(), + last_applied: st.io_applied().cloned(), + snapshot: st.io_snapshot_last_log_id().cloned(), + purged: st.io_purged().cloned(), // --- cluster --- state: st.server_state, - current_leader, + current_leader: current_leader.clone(), millis_since_quorum_ack, membership_config: membership_config.clone(), @@ -569,17 +585,17 @@ where }; let data_metrics = RaftDataMetrics { - last_log: st.last_log_id().copied(), - last_applied: st.io_applied().copied(), - snapshot: st.io_snapshot_last_log_id().copied(), - purged: st.io_purged().copied(), + last_log: st.last_log_id().cloned(), + last_applied: st.io_applied().cloned(), + snapshot: st.io_snapshot_last_log_id().cloned(), + purged: st.io_purged().cloned(), millis_since_quorum_ack, replication, }; let server_metrics = RaftServerMetrics { - id: self.id, - vote: *st.io_state().vote(), + id: self.id.clone(), + vote: st.io_state().vote().clone(), state: st.server_state, current_leader, membership_config, @@ -610,7 +626,7 @@ where let res = self.tx_metrics.send(m); if let Err(err) = res { - tracing::error!(error=%err, id=display(self.id), "error reporting metrics"); + tracing::error!(error=%err, id=display(&self.id), "error reporting metrics"); } } @@ -650,7 +666,7 @@ where pub(crate) fn reject_with_forward_to_leader(&self, tx: ResultSender) where E: From> + OptionalSend { let mut leader_id = self.current_leader(); - let leader_node = self.get_leader_node(leader_id); + let leader_node = self.get_leader_node(leader_id.clone()); // Leader is no longer a node in the membership config. 
if leader_node.is_none() { @@ -665,7 +681,7 @@ where #[tracing::instrument(level = "debug", skip(self))] pub(crate) fn current_leader(&self) -> Option { tracing::debug!( - self_id = display(self.id), + self_id = display(&self.id), vote = display(self.engine.state.vote_ref().summary()), "get current_leader" ); @@ -762,7 +778,7 @@ where "about to apply" ); - let last_applied = *entries[entries.len() - 1].get_log_id(); + let last_applied = entries[entries.len() - 1].get_log_id().clone(); let cmd = sm::Command::apply(entries).with_seq(seq); self.sm_handle.send(cmd).map_err(|e| StorageIOError::apply(last_applied, AnyError::error(e)))?; @@ -821,25 +837,25 @@ where let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap(); let membership_log_id = self.engine.state.membership_state.effective().log_id(); - let network = self.network.new_client(target, target_node).await; - let snapshot_network = self.network.new_client(target, target_node).await; + let network = self.network.new_client(target.clone(), target_node).await; + let snapshot_network = self.network.new_client(target.clone(), target_node).await; let leader = self.engine.leader.as_ref().unwrap(); - let session_id = ReplicationSessionId::new(leader.vote, *membership_log_id); + let session_id = ReplicationSessionId::new(leader.vote.clone(), membership_log_id.clone()); ReplicationCore::::spawn( - target, + target.clone(), session_id, self.config.clone(), - self.engine.state.committed().copied(), + self.engine.state.committed().cloned(), progress_entry.matching, network, snapshot_network, self.log_store.get_log_reader().await, self.sm_handle.new_snapshot_reader(), self.tx_notify.clone(), - tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(self.id), target=display(target)), + tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(&self.id), target=display(&target)), ) } @@ -851,7 +867,7 @@ where let nodes = std::mem::take(&mut 
self.replications); tracing::debug!( - targets = debug(nodes.iter().map(|x| *x.0).collect::>()), + targets = debug(nodes.iter().map(|x| x.0.clone()).collect::>()), "remove all targets from replication_metrics" ); @@ -906,7 +922,7 @@ where /// Run an event handling loop /// /// It always returns a [`Fatal`] error upon returning. - #[tracing::instrument(level="debug", skip_all, fields(id=display(self.id)))] + #[tracing::instrument(level="debug", skip_all, fields(id=display(&self.id)))] async fn runtime_loop( &mut self, mut rx_shutdown: ::OneshotReceiver<()>, @@ -1052,7 +1068,7 @@ where async fn spawn_parallel_vote_requests(&mut self, vote_req: &VoteRequest) { let members = self.engine.state.membership_state.effective().voter_ids(); - let vote = vote_req.vote; + let vote = vote_req.vote.clone(); for target in members { if target == self.id { @@ -1063,49 +1079,54 @@ where // Safe unwrap(): target must be in membership let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone(); - let mut client = self.network.new_client(target, &target_node).await; + let mut client = self.network.new_client(target.clone(), &target_node).await; let tx = self.tx_notify.clone(); let ttl = Duration::from_millis(self.config.election_timeout_min); - let id = self.id; + let id = self.id.clone(); let option = RPCOption::new(ttl); // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932 #[allow(clippy::let_underscore_future)] let _ = C::AsyncRuntime::spawn( - async move { - let tm_res = C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await; - let res = match tm_res { - Ok(res) => res, - - Err(_timeout) => { - let timeout_err = Timeout { - action: RPCTypes::Vote, - id, - target, - timeout: ttl, - }; - tracing::error!({error = %timeout_err, target = display(target)}, "timeout"); - return; - } - }; + { + let target = target.clone(); + let vote = vote.clone(); + + async move { + let tm_res = 
C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await; + let res = match tm_res { + Ok(res) => res, + + Err(_timeout) => { + let timeout_err = Timeout { + action: RPCTypes::Vote, + id, + target: target.clone(), + timeout: ttl, + }; + tracing::error!({error = %timeout_err, target = display(&target)}, "timeout"); + return; + } + }; - match res { - Ok(resp) => { - let _ = tx.send(Notify::VoteResponse { - target, - resp, - sender_vote: vote, - }); + match res { + Ok(resp) => { + let _ = tx.send(Notify::VoteResponse { + target, + resp, + sender_vote: vote, + }); + } + Err(err) => tracing::error!({error=%err, target=display(&target)}, "while requesting vote"), } - Err(err) => tracing::error!({error=%err, target=display(target)}, "while requesting vote"), } } .instrument(tracing::debug_span!( parent: &Span::current(), "send_vote_req", - target = display(target) + target = display(&target) )), ); } @@ -1134,7 +1155,7 @@ where } // TODO: Make this method non-async. It does not need to run any async command in it. - #[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(self.id)))] + #[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(&self.id)))] pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg) { tracing::debug!("recv from rx_api: {}", msg.summary()); @@ -1220,7 +1241,7 @@ where } // TODO: Make this method non-async. It does not need to run any async command in it. 
- #[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(self.id)))] + #[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(&self.id)))] pub(crate) fn handle_notify(&mut self, notify: Notify) -> Result<(), Fatal> { tracing::debug!("recv from rx_notify: {}", notify.summary()); @@ -1250,7 +1271,7 @@ where sender_vote, } => { tracing::info!( - target = display(target), + target = display(&target), higher_vote = display(&higher), sending_vote = display(&sender_vote), "received Notify::HigherVote: {}", @@ -1344,7 +1365,7 @@ where sender_vote, } => { tracing::info!( - target = display(target), + target = display(&target), higher_vote = display(&higher), sender_vote = display(&sender_vote), "received Notify::HigherVote: {}", @@ -1391,7 +1412,7 @@ where // Update in-memory state first, then the io state. // In-memory state should always be ahead or equal to the io state. - let last_log_id = meta.last_log_id; + let last_log_id = meta.last_log_id.clone(); self.engine.finish_building_snapshot(meta); let st = self.engine.state.io_state_mut(); @@ -1406,12 +1427,12 @@ where if let Some(meta) = meta { let st = self.engine.state.io_state_mut(); - st.update_applied(meta.last_log_id); + st.update_applied(meta.last_log_id.clone()); st.update_snapshot(meta.last_log_id); } } sm::Response::Apply(res) => { - self.engine.state.io_state_mut().update_applied(Some(res.last_applied)); + self.engine.state.io_state_mut().update_applied(Some(res.last_applied.clone())); self.handle_apply_result(res); } @@ -1495,7 +1516,7 @@ where result: Result, String>, ) { tracing::debug!( - target = display(target), + target = display(&target), request_id = display(request_id), result = debug(&result), "handle_replication_progress" @@ -1523,14 +1544,14 @@ where // - Otherwise, it is sent by a Candidate, we check against the current in progress voting state. 
let my_vote = if sender_vote.is_committed() { let l = self.engine.leader.as_ref(); - l.map(|x| x.vote) + l.map(|x| x.vote.clone()) } else { // If it finished voting, Candidate's vote is None. let candidate = self.engine.candidate_ref(); - candidate.map(|x| *x.vote_ref()) + candidate.map(|x| x.vote_ref().clone()) }; - if Some(*sender_vote) != my_vote { + if Some(sender_vote) != my_vote.as_ref() { tracing::warn!( "A message will be ignored because vote changed: msg sent by vote: {}; current my vote: {}; when ({})", sender_vote, @@ -1615,10 +1636,10 @@ where self.leader_data = None; } Command::AppendInputEntries { vote, entries } => { - let last_log_id = *entries.last().unwrap().get_log_id(); + let last_log_id = entries.last().unwrap().get_log_id().clone(); tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); - self.append_to_log(entries, vote, last_log_id).await?; + self.append_to_log(entries, vote, last_log_id.clone()).await?; // The leader may have changed. // But reporting to a different leader is not a problem. @@ -1628,34 +1649,34 @@ where } Command::SaveVote { vote } => { self.log_store.save_vote(&vote).await?; - self.engine.state.io_state_mut().update_vote(vote); + self.engine.state.io_state_mut().update_vote(vote.clone()); let _ = self.tx_notify.send(Notify::VoteResponse { - target: self.id, + target: self.id.clone(), // last_log_id is not used when sending VoteRequest to local node - resp: VoteResponse::new(vote, None, true), + resp: VoteResponse::new(vote.clone(), None, true), sender_vote: vote, }); } Command::PurgeLog { upto } => { - self.log_store.purge(upto).await?; + self.log_store.purge(upto.clone()).await?; self.engine.state.io_state_mut().update_purged(Some(upto)); } Command::DeleteConflictLog { since } => { - self.log_store.truncate(since).await?; + self.log_store.truncate(since.clone()).await?; // Inform clients waiting for logs to be applied. 
let removed = self.client_resp_channels.split_off(&since.index); if !removed.is_empty() { let leader_id = self.current_leader(); - let leader_node = self.get_leader_node(leader_id); + let leader_node = self.get_leader_node(leader_id.clone()); // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932 #[allow(clippy::let_underscore_future)] let _ = C::spawn(async move { for (log_index, tx) in removed.into_iter() { tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader { - leader_id, + leader_id: leader_id.clone(), leader_node: leader_node.clone(), }))); @@ -1669,7 +1690,7 @@ where } Command::ReplicateCommitted { committed } => { for node in self.replications.values() { - let _ = node.tx_repl.send(Replicate::Committed(committed)); + let _ = node.tx_repl.send(Replicate::Committed(committed.clone())); } } Command::Commit { @@ -1677,7 +1698,7 @@ where ref already_committed, ref upto, } => { - self.log_store.save_committed(Some(*upto)).await?; + self.log_store.save_committed(Some(upto.clone())).await?; self.apply_to_state_machine(seq, already_committed.next_index(), upto.index).await?; } Command::Replicate { req, target } => { @@ -1702,8 +1723,8 @@ where self.remove_all_replication().await; for (target, matching) in targets.iter() { - let handle = self.spawn_replication_stream(*target, *matching).await; - self.replications.insert(*target, handle); + let handle = self.spawn_replication_stream(target.clone(), matching.clone()).await; + self.replications.insert(target.clone(), handle); } } Command::StateMachine { command } => { diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index 0cc6061cf..9fbcfcfe0 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -142,8 +142,8 @@ where C: RaftTypeConfig (CommandPayload::Apply { entries: entries1 }, CommandPayload::Apply { entries: entries2 }) => { // Entry may not be `Eq`, we just compare log id. 
// This would be enough for testing. - entries1.iter().map(|e| *e.get_log_id()).collect::>() - == entries2.iter().map(|e| *e.get_log_id()).collect::>() + entries1.iter().map(|e| e.get_log_id().clone()).collect::>() + == entries2.iter().map(|e| e.get_log_id().clone()).collect::>() } _ => false, } diff --git a/openraft/src/core/sm/worker.rs b/openraft/src/core/sm/worker.rs index 9393d2941..317e68f64 100644 --- a/openraft/src/core/sm/worker.rs +++ b/openraft/src/core/sm/worker.rs @@ -133,13 +133,13 @@ where let since = entries.first().map(|x| x.get_log_id().index).unwrap(); let end = entries.last().map(|x| x.get_log_id().index + 1).unwrap(); - let last_applied = entries.last().map(|x| *x.get_log_id()).unwrap(); + let last_applied = entries.last().map(|x| x.get_log_id().clone()).unwrap(); // Fake complain: avoid using `collect()` when not needed #[allow(clippy::needless_collect)] let applying_entries = entries .iter() - .map(|e| ApplyingEntry::new(*e.get_log_id(), e.get_membership().cloned())) + .map(|e| ApplyingEntry::new(e.get_log_id().clone(), e.get_membership().cloned())) .collect::>(); let n_entries = applying_entries.len(); diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index ae928161d..f1914aa0a 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -117,7 +117,7 @@ where C: RaftTypeConfig /// [`RaftState`] pub(crate) fn new_candidate(&mut self, vote: Vote) -> &mut Candidate> { let now = C::now(); - let last_log_id = self.state.last_log_id().copied(); + let last_log_id = self.state.last_log_id().cloned(); let membership = self.state.membership_state.effective().membership(); @@ -161,7 +161,7 @@ where C: RaftTypeConfig let mut rh = self.replication_handler(); // Restore the progress about the local log - rh.update_local_progress(rh.state.last_log_id().copied()); + rh.update_local_progress(rh.state.last_log_id().cloned()); rh.initiate_replication(SendNone::False); @@ -217,13 +217,13 @@ 
where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip(self))] pub(crate) fn elect(&mut self) { let new_term = self.state.vote.leader_id().term + 1; - let new_vote = Vote::new(new_term, self.config.id); + let new_vote = Vote::new(new_term, self.config.id.clone()); - let candidate = self.new_candidate(new_vote); + let candidate = self.new_candidate(new_vote.clone()); tracing::info!("{}, new candidate: {}", func_name!(), candidate); - let last_log_id = candidate.last_log_id().copied(); + let last_log_id = candidate.last_log_id().cloned(); // Simulate sending RequestVote RPC to local node. // Safe unwrap(): it won't reject itself ˙–˙ @@ -304,7 +304,7 @@ where C: RaftTypeConfig vote_utime + lease - now ); - return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), false); + return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), false); } } @@ -323,7 +323,7 @@ where C: RaftTypeConfig // Return the updated vote, this way the candidate knows which vote is granted, in case // the candidate's vote is changed after sending the vote request. - return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), false); + return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), false); } // Then check vote just as it does for every incoming event. @@ -338,14 +338,14 @@ where C: RaftTypeConfig // Return the updated vote, this way the candidate knows which vote is granted, in case // the candidate's vote is changed after sending the vote request. 
- VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), res.is_ok()) + VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), res.is_ok()) } #[tracing::instrument(level = "debug", skip(self, resp))] pub(crate) fn handle_vote_resp(&mut self, target: C::NodeId, resp: VoteResponse) { tracing::info!( resp = display(resp.summary()), - target = display(target), + target = display(&target), my_vote = display(self.state.vote_ref()), my_last_log_id = display(self.state.last_log_id().summary()), "{}", @@ -435,7 +435,7 @@ where C: RaftTypeConfig // Vote is legal. let mut fh = self.following_handler(); - fh.ensure_log_consecutive(prev_log_id)?; + fh.ensure_log_consecutive(prev_log_id.clone())?; fh.append_entries(prev_log_id, entries); Ok(()) @@ -464,10 +464,10 @@ where C: RaftTypeConfig snapshot: Snapshot, tx: ResultSender>, ) { - tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!()); + tracing::info!(vote = display(&vote), snapshot = display(&snapshot), "{}", func_name!()); let vote_res = self.vote_handler().accept_vote(&vote, tx, |state, _rejected| { - Ok(SnapshotResponse::new(*state.vote_ref())) + Ok(SnapshotResponse::new(state.vote_ref().clone())) }); let Some(tx) = vote_res else { @@ -480,7 +480,7 @@ where C: RaftTypeConfig // In this case, the response can only be sent when the snapshot is installed. let cond = fh.install_full_snapshot(snapshot); let res = Ok(SnapshotResponse { - vote: *self.state.vote_ref(), + vote: self.state.vote_ref().clone(), }); self.output.push_command(Command::Respond { @@ -579,7 +579,7 @@ where C: RaftTypeConfig let snapshot_last_log_id = self.state.snapshot_last_log_id(); let snapshot_last_log_id = if let Some(x) = snapshot_last_log_id { - *x + x.clone() } else { tracing::info!("no snapshot, can not purge"); return; @@ -608,7 +608,7 @@ where C: RaftTypeConfig // Safe unwrap: `index` is ensured to be present in the above code. 
let log_id = self.state.get_log_id(index).unwrap(); - tracing::info!(purge_upto = display(log_id), "{}", func_name!()); + tracing::info!(purge_upto = display(&log_id), "{}", func_name!()); self.log_handler().update_purge_upto(log_id); self.try_purge_log(); @@ -630,7 +630,7 @@ where C: RaftTypeConfig // There may already be a Leader with higher vote let Some(leader) = leader else { return }; - let vote = *leader.vote_ref(); + let vote = leader.vote_ref().clone(); self.replication_handler().rebuild_replication_streams(); @@ -661,8 +661,8 @@ where C: RaftTypeConfig ); Err(NotAllowed { - last_log_id: self.state.last_log_id().copied(), - vote: *self.state.vote_ref(), + last_log_id: self.state.last_log_id().cloned(), + vote: self.state.vote_ref().clone(), }) } @@ -674,7 +674,7 @@ where C: RaftTypeConfig ) -> Result<(), NotInMembers> { if !m.is_voter(&self.config.id) { let e = NotInMembers { - node_id: self.config.id, + node_id: self.config.id.clone(), membership: m.clone(), }; Err(e) diff --git a/openraft/src/engine/handler/establish_handler/mod.rs b/openraft/src/engine/handler/establish_handler/mod.rs index 452c8257d..4ee1dce61 100644 --- a/openraft/src/engine/handler/establish_handler/mod.rs +++ b/openraft/src/engine/handler/establish_handler/mod.rs @@ -21,11 +21,11 @@ where C: RaftTypeConfig self, candidate: Candidate>, ) -> Option<&'x mut Leader>> { - let vote = *candidate.vote_ref(); + let vote = candidate.vote_ref().clone(); debug_assert_eq!( - vote.leader_id().voted_for(), - Some(self.config.id), + vote.leader_id().voted_for().as_ref(), + Some(&self.config.id), "it can only commit its own vote" ); diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 95cacc3ad..787172cce 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -77,7 +77,7 @@ where C: RaftTypeConfig "prev_log_id matches, skip matching entries", ); - let 
last_log_id = entries.last().map(|x| *x.get_log_id()); + let last_log_id = entries.last().map(|x| x.get_log_id().clone()); self.state.update_accepted(std::cmp::max(prev_log_id, last_log_id)); @@ -107,7 +107,10 @@ where C: RaftTypeConfig tracing::debug!(local = display(DisplayOption(&local)), "prev_log_id does not match"); self.truncate_logs(prev.index); - return Err(RejectAppendEntries::ByConflictingLogId { local, expect: *prev }); + return Err(RejectAppendEntries::ByConflictingLogId { + local, + expect: prev.clone(), + }); } } @@ -145,7 +148,7 @@ where C: RaftTypeConfig self.output.push_command(Command::AppendInputEntries { // A follower should always use the node's vote. - vote: *self.state.vote_ref(), + vote: self.state.vote_ref().clone(), entries, }); } @@ -153,8 +156,8 @@ where C: RaftTypeConfig /// Commit entries that are already committed by the leader. #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn commit_entries(&mut self, leader_committed: Option>) { - let accepted = self.state.accepted().copied(); - let committed = std::cmp::min(accepted, leader_committed); + let accepted = self.state.accepted().cloned(); + let committed = std::cmp::min(accepted.clone(), leader_committed.clone()); tracing::debug!( leader_committed = display(DisplayOption(&leader_committed)), @@ -272,7 +275,7 @@ where C: RaftTypeConfig let meta = &snapshot.meta; tracing::info!("install_full_snapshot: meta:{:?}", meta); - let snap_last_log_id = meta.last_log_id; + let snap_last_log_id = meta.last_log_id.clone(); if snap_last_log_id.as_ref() <= self.state.committed() { tracing::info!( @@ -304,8 +307,8 @@ where C: RaftTypeConfig } } - self.state.update_accepted(Some(snap_last_log_id)); - self.state.committed = Some(snap_last_log_id); + self.state.update_accepted(Some(snap_last_log_id.clone())); + self.state.committed = Some(snap_last_log_id.clone()); self.update_committed_membership(EffectiveMembership::new_from_stored_membership( meta.last_membership.clone(), )); @@ -334,7 
+337,7 @@ where C: RaftTypeConfig // Find the last 2 membership config entries: the committed and the effective. for ent in entries.rev() { if let Some(m) = ent.get_membership() { - memberships.insert(0, StoredMembership::new(Some(*ent.get_log_id()), m.clone())); + memberships.insert(0, StoredMembership::new(Some(ent.get_log_id().clone()), m.clone())); if memberships.len() == 2 { break; } diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index e3ab871df..562f57399 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -62,7 +62,7 @@ where C: RaftTypeConfig membership_entry.is_none(), "only one membership entry is allowed in a batch" ); - membership_entry = Some((*entry.get_log_id(), m.clone())); + membership_entry = Some((entry.get_log_id().clone(), m.clone())); } } @@ -88,7 +88,7 @@ where C: RaftTypeConfig self.output.push_command(Command::AppendInputEntries { // A leader should always use the leader's vote. // It is allowed to be different from local vote. - vote: self.leader.vote, + vote: self.leader.vote.clone(), entries, }); @@ -114,9 +114,9 @@ where C: RaftTypeConfig /// /// See: [Read Operation](crate::docs::protocol::read) pub(crate) fn get_read_log_id(&self) -> Option> { - let committed = self.state.committed().copied(); + let committed = self.state.committed().cloned(); // noop log id is the first log this leader proposed. 
- std::cmp::max(self.leader.noop_log_id, committed) + std::cmp::max(self.leader.noop_log_id.clone(), committed) } pub(crate) fn replication_handler(&mut self) -> ReplicationHandler { diff --git a/openraft/src/engine/handler/log_handler/mod.rs b/openraft/src/engine/handler/log_handler/mod.rs index 998538d27..b58e39656 100644 --- a/openraft/src/engine/handler/log_handler/mod.rs +++ b/openraft/src/engine/handler/log_handler/mod.rs @@ -42,7 +42,7 @@ where C: RaftTypeConfig return; } - let upto = *purge_upto.unwrap(); + let upto = purge_upto.unwrap().clone(); st.purge_log(&upto); self.output.push_command(Command::PurgeLog { upto }); @@ -83,7 +83,7 @@ where C: RaftTypeConfig let purge_end = self.state.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep); tracing::debug!( - snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id), + snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id.clone()), max_keep, "try purge: (-oo, {})", purge_end @@ -91,7 +91,7 @@ where C: RaftTypeConfig if st.last_purged_log_id().next_index() + batch_size > purge_end { tracing::debug!( - snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id), + snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id.clone()), max_keep, last_purged_log_id = display(st.last_purged_log_id().summary()), batch_size, diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 111b3edc8..70d3faaf9 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -76,7 +76,7 @@ where C: RaftTypeConfig "Only leader is allowed to call update_effective_membership()" ); - self.state.membership_state.append(EffectiveMembership::new_arc(Some(*log_id), m.clone())); + self.state.membership_state.append(EffectiveMembership::new_arc(Some(log_id.clone()), m.clone())); // TODO(9): currently only a leader has replication setup. 
// It's better to setup replication for both leader and candidate. @@ -128,7 +128,7 @@ where C: RaftTypeConfig result: ReplicationResult, ) { // No matter what the result is, the validity of the leader is granted by a follower. - self.update_leader_clock(target, result.sending_time); + self.update_leader_clock(target.clone(), result.sending_time); let id = request_id.request_id(); let Some(id) = id else { @@ -138,7 +138,7 @@ where C: RaftTypeConfig match result.result { Ok(matching) => { - self.update_matching(target, id, matching); + self.update_matching(target.clone(), id, matching); } Err(conflict) => { self.update_conflicting(target, id, conflict); @@ -150,7 +150,7 @@ where C: RaftTypeConfig /// accepted. #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_leader_clock(&mut self, node_id: C::NodeId, t: InstantOf) { - tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!()); + tracing::debug!(target = display(&node_id), t = debug(t), "{}", func_name!()); let granted = *self .leader @@ -181,7 +181,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_matching(&mut self, node_id: C::NodeId, inflight_id: u64, log_id: Option>) { tracing::debug!( - node_id = display(node_id), + node_id = display(&node_id), inflight_id = display(inflight_id), log_id = display(log_id.display()), "{}", @@ -192,7 +192,7 @@ where C: RaftTypeConfig // The value granted by a quorum may not yet be a committed. // A committed is **granted** and also is in current term. 
- let quorum_accepted = *self + let quorum_accepted = self .leader .progress .update_with(&node_id, |prog_entry| { @@ -202,7 +202,8 @@ where C: RaftTypeConfig panic!("update_matching error: {}", e); } }) - .expect("it should always update existing progress"); + .expect("it should always update existing progress") + .clone(); tracing::debug!( quorum_accepted = display(quorum_accepted.display()), @@ -218,7 +219,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option>) { // Only when the log id is proposed by current leader, it is committed. - if let Some(c) = granted { + if let Some(c) = granted.clone() { if !self.state.vote_ref().is_same_leader(c.committed_leader_id()) { return; } @@ -226,14 +227,14 @@ where C: RaftTypeConfig if let Some(prev_committed) = self.state.update_committed(&granted) { self.output.push_command(Command::ReplicateCommitted { - committed: self.state.committed().copied(), + committed: self.state.committed().cloned(), }); let seq = self.output.next_sm_seq(); self.output.push_command(Command::Commit { seq, already_committed: prev_committed, - upto: self.state.committed().copied().unwrap(), + upto: self.state.committed().cloned().unwrap(), }); if self.config.snapshot_policy.should_snapshot(&self.state) { @@ -270,7 +271,7 @@ where C: RaftTypeConfig repl_res: Result, String>, ) { tracing::debug!( - target = display(target), + target = display(&target), request_id = display(request_id), result = debug(&repl_res), progress = display(&self.leader.progress), @@ -280,7 +281,7 @@ where C: RaftTypeConfig match repl_res { Ok(p) => { - self.update_success_progress(target, request_id, p); + self.update_success_progress(target.clone(), request_id, p); } Err(err_str) => { tracing::warn!( @@ -338,7 +339,7 @@ where C: RaftTypeConfig // Reset and resend(by self.send_to_all()) replication requests. 
prog_entry.inflight = Inflight::None; - targets.push((*target, *prog_entry)); + targets.push((target.clone(), prog_entry.clone())); } } self.output.push_command(Command::RebuildReplicationStreams { targets }); @@ -359,7 +360,7 @@ where C: RaftTypeConfig } let t = prog_entry.next_send(self.state, self.config.max_payload_entries); - tracing::debug!(target = display(*id), send = debug(&t), "next send"); + tracing::debug!(target = display(&*id), send = debug(&t), "next send"); match t { Ok(inflight) => { @@ -387,8 +388,8 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn send_to_target(output: &mut EngineOutput, target: &C::NodeId, inflight: &Inflight) { output.push_command(Command::Replicate { - target: *target, - req: *inflight, + target: target.clone(), + req: inflight.clone(), }); } @@ -413,7 +414,7 @@ where C: RaftTypeConfig } // Safe unwrap(): it greater than an Option thus it must be a Some() - let purge_upto = *self.state.purge_upto().unwrap(); + let purge_upto = self.state.purge_upto().unwrap().clone(); // Check if any replication task is going to use the log that are going to purge. let mut in_use = false; @@ -446,7 +447,7 @@ where C: RaftTypeConfig return; } - let id = self.config.id; + let id = self.config.id.clone(); // The leader may not be in membership anymore if let Some(prog_entry) = self.leader.progress.get_mut(&id) { @@ -459,7 +460,7 @@ where C: RaftTypeConfig return; } // TODO: It should be self.state.last_log_id() but None is ok. 
- prog_entry.inflight = Inflight::logs(None, upto); + prog_entry.inflight = Inflight::logs(None, upto.clone()); let inflight_id = prog_entry.inflight.get_id().unwrap(); self.update_matching(id, inflight_id, upto); diff --git a/openraft/src/engine/handler/server_state_handler/mod.rs b/openraft/src/engine/handler/server_state_handler/mod.rs index d228a895a..59271af3f 100644 --- a/openraft/src/engine/handler/server_state_handler/mod.rs +++ b/openraft/src/engine/handler/server_state_handler/mod.rs @@ -27,7 +27,7 @@ where C: RaftTypeConfig let server_state = self.state.calc_server_state(&self.config.id); tracing::debug!( - id = display(self.config.id), + id = display(&self.config.id), prev_server_state = debug(self.state.server_state), server_state = debug(server_state), "update_server_state_if_changed" @@ -41,10 +41,10 @@ where C: RaftTypeConfig let is_leader = server_state == ServerState::Leader; if !was_leader && is_leader { - tracing::info!(id = display(self.config.id), "become leader"); + tracing::info!(id = display(&self.config.id), "become leader"); self.output.push_command(Command::BecomeLeader); } else if was_leader && !is_leader { - tracing::info!(id = display(self.config.id), "quit leader"); + tracing::info!(id = display(&self.config.id), "quit leader"); self.output.push_command(Command::QuitLeader); } else { // nothing to do diff --git a/openraft/src/engine/handler/snapshot_handler/mod.rs b/openraft/src/engine/handler/snapshot_handler/mod.rs index 004ad2adf..4873e7223 100644 --- a/openraft/src/engine/handler/snapshot_handler/mod.rs +++ b/openraft/src/engine/handler/snapshot_handler/mod.rs @@ -52,7 +52,7 @@ where C: RaftTypeConfig pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta) -> bool { tracing::info!("update_snapshot: {:?}", meta); - if meta.last_log_id <= self.state.snapshot_last_log_id().copied() { + if meta.last_log_id <= self.state.snapshot_last_log_id().cloned() { tracing::info!( "No need to install a smaller snapshot: current snapshot 
last_log_id({}), new snapshot last_log_id({})", self.state.snapshot_last_log_id().summary(), diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 122633e5f..4e1b6d090 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -99,7 +99,7 @@ where C: RaftTypeConfig // Ok } else { tracing::info!("vote {} is rejected by local vote: {}", vote, self.state.vote_ref()); - return Err(RejectVoteRequest::ByVote(*self.state.vote_ref())); + return Err(RejectVoteRequest::ByVote(self.state.vote_ref().clone())); } tracing::debug!(%vote, "vote is changing to" ); @@ -108,8 +108,8 @@ where C: RaftTypeConfig if vote > self.state.vote_ref() { tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote); - self.state.vote.update(C::now(), *vote); - self.output.push_command(Command::SaveVote { vote: *vote }); + self.state.vote.update(C::now(), vote.clone()); + self.output.push_command(Command::SaveVote { vote: vote.clone() }); } else { self.state.vote.touch(C::now()); } @@ -147,7 +147,7 @@ where C: RaftTypeConfig "become leader: node-{}, my vote: {}, last-log-id: {}", self.config.id, self.state.vote_ref(), - self.state.last_log_id().copied().unwrap_or_default() + self.state.last_log_id().cloned().unwrap_or_default() ); if let Some(l) = self.leader.as_mut() { @@ -162,7 +162,7 @@ where C: RaftTypeConfig // TODO: this is not gonna happen, // because `self.leader`(previous `internal_server_state`) // does not include Candidate any more. - l.vote = *self.state.vote_ref(); + l.vote = self.state.vote_ref().clone(); self.server_state_handler().update_server_state_if_changed(); return; } @@ -195,7 +195,7 @@ where C: RaftTypeConfig // timeout. 
debug_assert!( - self.state.vote_ref().leader_id().voted_for() != Some(self.config.id) + self.state.vote_ref().leader_id().voted_for() != Some(self.config.id.clone()) || !self.state.membership_state.effective().membership().is_voter(&self.config.id), "It must hold: vote is not mine, or I am not a voter(leader just left the cluster)" ); diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 1fef9fd2c..fefd250b1 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -63,7 +63,7 @@ where NID: NodeId }; // Recursion stack - let mut stack = vec![(first, last)]; + let mut stack = vec![(first, last.clone())]; loop { let (first, last) = match stack.pop() { @@ -75,7 +75,7 @@ where NID: NodeId // Case AA if first.leader_id == last.leader_id { - if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + if res.last().map(|x| &x.leader_id) < Some(&first.leader_id) { res.push(first); } continue; @@ -83,7 +83,7 @@ where NID: NodeId // Two adjacent logs with different leader_id, no need to binary search if first.index + 1 == last.index { - if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + if res.last().map(|x| &x.leader_id) < Some(&first.leader_id) { res.push(first); } res.push(last); @@ -94,7 +94,7 @@ where NID: NodeId if first.leader_id == mid.leader_id { // Case AAC - if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + if res.last().map(|x| &x.leader_id) < Some(&first.leader_id) { res.push(first); } stack.push((mid, last)); @@ -105,7 +105,7 @@ where NID: NodeId // Case ABC // first.leader_id < mid_log_id.leader_id < last.leader_id // Deal with (first, mid) then (mid, last) - stack.push((mid, last)); + stack.push((mid.clone(), last.clone())); stack.push((first, mid)); } } @@ -131,14 +131,14 @@ impl LogIdList { pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { if let Some(first) = new_ids.first() { let first_id = first.get_log_id(); - 
self.append(*first_id); + self.append(first_id.clone()); if let Some(last) = new_ids.last() { let last_id = last.get_log_id(); assert_eq!(last_id.leader_id, first_id.leader_id); if last_id != first_id { - self.append(*last_id); + self.append(last_id.clone()); } } } @@ -147,15 +147,15 @@ impl LogIdList { /// Extends a list of `log_id`. #[allow(dead_code)] pub(crate) fn extend<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { - let mut prev = self.last().map(|x| x.leader_id); + let mut prev = self.last().map(|x| x.leader_id.clone()); for x in new_ids.iter() { let log_id = x.get_log_id(); - if prev != Some(log_id.leader_id) { - self.append(*log_id); + if prev.as_ref() != Some(&log_id.leader_id) { + self.append(log_id.clone()); - prev = Some(log_id.leader_id); + prev = Some(log_id.leader_id.clone()); } } @@ -163,7 +163,7 @@ impl LogIdList { let log_id = last.get_log_id(); if self.last() != Some(log_id) { - self.append(*log_id); + self.append(log_id.clone()); } } } @@ -199,9 +199,9 @@ impl LogIdList { // l >= 2 - let last = self.key_log_ids[l - 1]; + let last = self.key_log_ids[l - 1].clone(); - if self.key_log_ids.get(l - 2).map(|x| x.leader_id) == Some(last.leader_id) { + if self.key_log_ids.get(l - 2).map(|x| &x.leader_id) == Some(&last.leader_id) { // Replace the **last log id**. self.key_log_ids[l - 1] = new_log_id; return; @@ -233,7 +233,7 @@ impl LogIdList { // Add key log id if there is a gap between last.index and at - 1. let last = self.key_log_ids.last(); if let Some(last) = last { - let (last_leader_id, last_index) = (last.leader_id, last.index); + let (last_leader_id, last_index) = (last.leader_id.clone(), last.index); if last_index < at - 1 { self.append(LogId::new(last_leader_id, at - 1)); } @@ -248,7 +248,7 @@ impl LogIdList { // When installing snapshot it may need to purge across the `last_log_id`. 
if upto.index >= last.next_index() { debug_assert!(Some(upto) > self.last()); - self.key_log_ids = vec![*upto]; + self.key_log_ids = vec![upto.clone()]; return; } @@ -278,12 +278,12 @@ impl LogIdList { let res = self.key_log_ids.binary_search_by(|log_id| log_id.index.cmp(&index)); match res { - Ok(i) => Some(LogId::new(self.key_log_ids[i].leader_id, index)), + Ok(i) => Some(LogId::new(self.key_log_ids[i].leader_id.clone(), index)), Err(i) => { if i == 0 || i == self.key_log_ids.len() { None } else { - Some(LogId::new(self.key_log_ids[i - 1].leader_id, index)) + Some(LogId::new(self.key_log_ids[i - 1].leader_id.clone(), index)) } } } diff --git a/openraft/src/entry/mod.rs b/openraft/src/entry/mod.rs index f4cbecabf..612e54cd3 100644 --- a/openraft/src/entry/mod.rs +++ b/openraft/src/entry/mod.rs @@ -35,7 +35,7 @@ where { fn clone(&self) -> Self { Self { - log_id: self.log_id, + log_id: self.log_id.clone(), payload: self.payload.clone(), } } @@ -114,7 +114,7 @@ where C: RaftTypeConfig } fn set_log_id(&mut self, log_id: &LogId) { - self.log_id = *log_id; + self.log_id = log_id.clone(); } } diff --git a/openraft/src/log_id/log_id_option_ext.rs b/openraft/src/log_id/log_id_option_ext.rs index 9a9bd829f..39a2fc35b 100644 --- a/openraft/src/log_id/log_id_option_ext.rs +++ b/openraft/src/log_id/log_id_option_ext.rs @@ -14,7 +14,7 @@ pub trait LogIdOptionExt { impl LogIdOptionExt for Option> { fn index(&self) -> Option { - self.map(|x| x.index) + self.as_ref().map(|x| x.index) } fn next_index(&self) -> u64 { diff --git a/openraft/src/log_id/mod.rs b/openraft/src/log_id/mod.rs index 650e96a98..f167577e7 100644 --- a/openraft/src/log_id/mod.rs +++ b/openraft/src/log_id/mod.rs @@ -37,7 +37,7 @@ impl RaftLogId for LogId { } fn set_log_id(&mut self, log_id: &LogId) { - *self = *log_id + *self = log_id.clone() } } diff --git a/openraft/src/log_id_range.rs b/openraft/src/log_id_range.rs index 8885326f9..c2a5d1ce1 100644 --- a/openraft/src/log_id_range.rs +++ 
b/openraft/src/log_id_range.rs @@ -32,7 +32,7 @@ impl Display for LogIdRange { impl Validate for LogIdRange { fn validate(&self) -> Result<(), Box> { - validit::less_equal!(self.prev, self.last); + validit::less_equal!(&self.prev, &self.last); Ok(()) } } diff --git a/openraft/src/membership/effective_membership.rs b/openraft/src/membership/effective_membership.rs index 8b223ef50..bff87184d 100644 --- a/openraft/src/membership/effective_membership.rs +++ b/openraft/src/membership/effective_membership.rs @@ -65,7 +65,7 @@ where LID: RaftLogId, { fn from(v: (&LID, Membership)) -> Self { - EffectiveMembership::new(Some(*v.0.get_log_id()), v.1) + EffectiveMembership::new(Some(v.0.get_log_id().clone()), v.1) } } @@ -84,7 +84,7 @@ where let configs = membership.get_joint_config(); let mut joint = vec![]; for c in configs { - joint.push(c.iter().copied().collect::>()); + joint.push(c.iter().cloned().collect::>()); } let quorum_set = Joint::from(joint); @@ -97,7 +97,7 @@ where } pub(crate) fn new_from_stored_membership(stored: StoredMembership) -> Self { - Self::new(*stored.log_id(), stored.membership().clone()) + Self::new(stored.log_id().clone(), stored.membership().clone()) } pub(crate) fn stored_membership(&self) -> &Arc> { @@ -126,7 +126,7 @@ where /// Returns an Iterator of all voter node ids. Learners are not included. pub fn voter_ids(&self) -> impl Iterator + '_ { - self.voter_ids.iter().copied() + self.voter_ids.iter().cloned() } /// Returns an Iterator of all learner node ids. Voters are not included. diff --git a/openraft/src/membership/membership.rs b/openraft/src/membership/membership.rs index 2d62fdd79..f08dc5ec8 100644 --- a/openraft/src/membership/membership.rs +++ b/openraft/src/membership/membership.rs @@ -170,7 +170,7 @@ where /// Returns an Iterator of all learner node ids. Voters are not included. 
pub fn learner_ids(&self) -> impl Iterator + '_ { - self.nodes.keys().filter(|x| !self.is_voter(x)).copied() + self.nodes.keys().filter(|x| !self.is_voter(x)).cloned() } } @@ -213,7 +213,7 @@ where if res.contains_key(k) { continue; } - res.insert(*k, v.clone()); + res.insert(k.clone(), v.clone()); } res @@ -310,19 +310,19 @@ where let new_membership = match change { ChangeMembers::AddVoterIds(add_voter_ids) => { - let new_voter_ids = last.union(&add_voter_ids).copied().collect::>(); + let new_voter_ids = last.union(&add_voter_ids).cloned().collect::>(); self.next_coherent(new_voter_ids, retain) } ChangeMembers::AddVoters(add_voters) => { // Add nodes without overriding existent self.nodes = Self::extend_nodes(self.nodes, &add_voters); - let add_voter_ids = add_voters.keys().copied().collect::>(); - let new_voter_ids = last.union(&add_voter_ids).copied().collect::>(); + let add_voter_ids = add_voters.keys().cloned().collect::>(); + let new_voter_ids = last.union(&add_voter_ids).cloned().collect::>(); self.next_coherent(new_voter_ids, retain) } ChangeMembers::RemoveVoters(remove_voter_ids) => { - let new_voter_ids = last.difference(&remove_voter_ids).copied().collect::>(); + let new_voter_ids = last.difference(&remove_voter_ids).cloned().collect::>(); self.next_coherent(new_voter_ids, retain) } ChangeMembers::ReplaceAllVoters(all_voter_ids) => self.next_coherent(all_voter_ids, retain), @@ -362,7 +362,7 @@ where pub(crate) fn to_quorum_set(&self) -> Joint, Vec>> { let mut qs = vec![]; for c in self.get_joint_config().iter() { - qs.push(c.iter().copied().collect::>()); + qs.push(c.iter().cloned().collect::>()); } Joint::new(qs) } diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 80b8a63ff..14d28cda4 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -121,7 +121,7 @@ where #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn current_leader(&self, leader_id: NID, 
msg: impl ToString) -> Result, WaitError> { self.metrics( - |m| m.current_leader == Some(leader_id), + |m| m.current_leader.as_ref() == Some(&leader_id), &format!("{} .current_leader == {}", msg.to_string(), leader_id), ) .await diff --git a/openraft/src/network/snapshot_transport.rs b/openraft/src/network/snapshot_transport.rs index dcb5a13a3..c8c20e176 100644 --- a/openraft/src/network/snapshot_transport.rs +++ b/openraft/src/network/snapshot_transport.rs @@ -142,7 +142,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io: let done = (offset + n_read as u64) == end; let req = InstallSnapshotRequest { - vote, + vote: vote.clone(), meta: snapshot.meta.clone(), offset, data: buf, diff --git a/openraft/src/node.rs b/openraft/src/node.rs index c280df842..e512a623d 100644 --- a/openraft/src/node.rs +++ b/openraft/src/node.rs @@ -19,7 +19,6 @@ pub trait NodeIdEssential: + Debug + Display + Hash - + Copy + Clone + Default + 'static diff --git a/openraft/src/progress/entry/mod.rs b/openraft/src/progress/entry/mod.rs index 7fe2f7bb6..2b90627fa 100644 --- a/openraft/src/progress/entry/mod.rs +++ b/openraft/src/progress/entry/mod.rs @@ -37,7 +37,7 @@ impl ProgressEntry { #[allow(dead_code)] pub(crate) fn new(matching: Option>) -> Self { Self { - matching, + matching: matching.clone(), curr_inflight_id: 0, inflight: Inflight::None, searching_end: matching.next_index(), @@ -79,7 +79,7 @@ impl ProgressEntry { match &self.inflight { Inflight::None => false, Inflight::Logs { log_id_range, .. } => { - let lid = Some(*upto); + let lid = Some(upto.clone()); lid > log_id_range.prev } Inflight::Snapshot { last_log_id: _, .. 
} => false, @@ -98,7 +98,7 @@ impl ProgressEntry { "update_matching" ); - self.inflight.ack(request_id, matching)?; + self.inflight.ack(request_id, matching.clone())?; debug_assert!(matching >= self.matching); self.matching = matching; @@ -196,7 +196,7 @@ impl ProgressEntry { if self.searching_end < purge_upto_next { self.curr_inflight_id += 1; let snapshot_last = log_state.snapshot_last_log_id(); - self.inflight = Inflight::snapshot(snapshot_last.copied()).with_id(self.curr_inflight_id); + self.inflight = Inflight::snapshot(snapshot_last.cloned()).with_id(self.curr_inflight_id); return Ok(&self.inflight); } @@ -264,17 +264,17 @@ impl Validate for ProgressEntry { self.inflight.validate()?; - match self.inflight { + match &self.inflight { Inflight::None => {} Inflight::Logs { log_id_range, .. } => { // matching <= prev_log_id <= last_log_id // prev_log_id.next_index() <= searching_end - validit::less_equal!(self.matching, log_id_range.prev); + validit::less_equal!(&self.matching, &log_id_range.prev); validit::less_equal!(log_id_range.prev.next_index(), self.searching_end); } Inflight::Snapshot { last_log_id, .. } => { // There is no need to send a snapshot smaller than last matching. 
- validit::less!(self.matching, last_log_id); + validit::less!(&self.matching, &last_log_id); } } Ok(()) diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index 92df8fd3c..9adff98cb 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -177,7 +177,7 @@ impl Inflight { *self = { debug_assert!(upto >= log_id_range.prev); debug_assert!(upto <= log_id_range.last); - Inflight::logs(upto, log_id_range.last).with_id(*id) + Inflight::logs(upto, log_id_range.last.clone()).with_id(*id) } } Inflight::Snapshot { id: _, last_log_id } => { diff --git a/openraft/src/progress/mod.rs b/openraft/src/progress/mod.rs index 5f3335148..e39fed57c 100644 --- a/openraft/src/progress/mod.rs +++ b/openraft/src/progress/mod.rs @@ -138,10 +138,10 @@ where impl Display for VecProgress where - ID: PartialEq + Debug + Copy + 'static, - V: Copy + 'static, + ID: PartialEq + Debug + Clone + 'static, + V: Clone + 'static, V: Borrow

, - P: PartialOrd + Ord + Copy + 'static, + P: PartialOrd + Ord + Clone + 'static, QS: QuorumSet + 'static, ID: Display, V: Display, @@ -170,22 +170,22 @@ pub(crate) struct Stat { impl VecProgress where - ID: PartialEq + Copy + Debug + 'static, - V: Copy + 'static, + ID: PartialEq + Clone + Debug + 'static, + V: Clone + 'static, V: Borrow

, - P: PartialOrd + Ord + Copy + 'static, + P: PartialOrd + Ord + Clone + 'static, QS: QuorumSet, { pub(crate) fn new(quorum_set: QS, learner_ids: impl IntoIterator, default_v: V) -> Self { - let mut vector = quorum_set.ids().map(|id| (id, default_v)).collect::>(); + let mut vector = quorum_set.ids().map(|id| (id, default_v.clone())).collect::>(); let voter_count = vector.len(); - vector.extend(learner_ids.into_iter().map(|id| (id, default_v))); + vector.extend(learner_ids.into_iter().map(|id| (id, default_v.clone()))); Self { quorum_set, - granted: *default_v.borrow(), + granted: default_v.borrow().clone(), voter_count, vector, stat: Default::default(), @@ -231,10 +231,10 @@ where impl Progress for VecProgress where - ID: PartialEq + Debug + Copy + 'static, - V: Copy + 'static, + ID: PartialEq + Debug + Clone + 'static, + V: Clone + 'static, V: Borrow

, - P: PartialOrd + Ord + Copy + 'static, + P: PartialOrd + Ord + Clone + 'static, QS: QuorumSet + 'static, { /// Update one of the scalar value and re-calculate the committed value. @@ -283,7 +283,7 @@ where let elt = &mut self.vector[index]; - let prev_progress = *elt.1.borrow(); + let prev_progress = elt.1.borrow().clone(); f(&mut elt.1); @@ -324,7 +324,7 @@ where self.stat.is_quorum_count += 1; if self.quorum_set.is_quorum(it) { - self.granted = *prog; + self.granted = prog.clone(); break; } } @@ -365,12 +365,12 @@ where } fn upgrade_quorum_set(self, quorum_set: QS, leaner_ids: &[ID], default_v: V) -> Self { - let mut new_prog = Self::new(quorum_set, leaner_ids.iter().copied(), default_v); + let mut new_prog = Self::new(quorum_set, leaner_ids.iter().cloned(), default_v); new_prog.stat = self.stat.clone(); for (id, v) in self.iter() { - let _ = new_prog.update(id, *v); + let _ = new_prog.update(id, v.clone()); } new_prog } diff --git a/openraft/src/proposer/candidate.rs b/openraft/src/proposer/candidate.rs index 42af2f90f..e7ad11b8b 100644 --- a/openraft/src/proposer/candidate.rs +++ b/openraft/src/proposer/candidate.rs @@ -98,19 +98,19 @@ where /// Return the node ids that has granted this vote. #[allow(dead_code)] pub(crate) fn granters(&self) -> impl Iterator + '_ { - self.progress().iter().filter(|(_, granted)| *granted).map(|(target, _)| *target) + self.progress().iter().filter(|(_, granted)| *granted).map(|(target, _)| target.clone()) } pub(crate) fn into_leader(self) -> Leader { // Mark the vote as committed, i.e., being granted and saved by a quorum. 
let vote = { - let mut vote = *self.vote_ref(); + let mut vote = self.vote_ref().clone(); debug_assert!(!vote.is_committed()); vote.commit(); vote }; - let last_leader_log_ids = self.last_log_id().copied().into_iter().collect::>(); + let last_leader_log_ids = self.last_log_id().cloned().into_iter().collect::>(); Leader::new(vote, self.quorum_set.clone(), self.learner_ids, &last_leader_log_ids) } diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 0b2ddb4c6..45636dcc9 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -73,13 +73,15 @@ where ) -> Self { debug_assert!(vote.is_committed()); debug_assert!( - Some(vote.committed_leader_id().unwrap()) >= last_leader_log_id.last().map(|x| *x.committed_leader_id()), + Some(vote.committed_leader_id().unwrap()) + >= last_leader_log_id.last().map(|x| x.committed_leader_id().clone()), "vote {} must GE last_leader_log_id.last() {}", vote, last_leader_log_id.display() ); debug_assert!( - Some(vote.committed_leader_id().unwrap()) >= last_leader_log_id.first().map(|x| *x.committed_leader_id()), + Some(vote.committed_leader_id().unwrap()) + >= last_leader_log_id.first().map(|x| x.committed_leader_id().clone()), "vote {} must GE last_leader_log_id.first() {}", vote, last_leader_log_id.display() @@ -90,12 +92,12 @@ where let vote_leader_id = vote.committed_leader_id().unwrap(); let first = last_leader_log_id.first(); - let noop_log_id = if first.map(|x| *x.committed_leader_id()) == Some(vote_leader_id) { + let noop_log_id = if first.map(|x| x.committed_leader_id().clone()) == Some(vote_leader_id) { // There is already log id proposed by the this leader. // E.g. the Leader is restarted without losing leadership. // // Set to the first log id proposed by this Leader. - first.copied() + first.cloned() } else { // Set to a log id that will be proposed. 
Some(LogId::new( @@ -104,15 +106,15 @@ where )) }; - let last_log_id = last_leader_log_id.last().copied(); + let last_log_id = last_leader_log_id.last().cloned(); Self { vote, - last_log_id, + last_log_id: last_log_id.clone(), noop_log_id, progress: VecProgress::new( quorum_set.clone(), - learner_ids.iter().copied(), + learner_ids.iter().cloned(), ProgressEntry::empty(last_log_id.next_index()), ), clock_progress: VecProgress::new(quorum_set, learner_ids, None), @@ -150,7 +152,7 @@ where let committed_leader_id = self.vote.committed_leader_id().unwrap(); let first = LogId::new(committed_leader_id, self.last_log_id().next_index()); - let mut last = first; + let mut last = first.clone(); for entry in entries { entry.set_log_id(&last); @@ -185,7 +187,7 @@ where let now = Instant::now(); tracing::debug!( - leader_id = display(node_id), + leader_id = display(&node_id), now = debug(now), "{}: update with leader's local time, before retrieving quorum acked clock", func_name!() diff --git a/openraft/src/quorum/quorum_set_impl.rs b/openraft/src/quorum/quorum_set_impl.rs index 0a5d008b6..2cfac26a4 100644 --- a/openraft/src/quorum/quorum_set_impl.rs +++ b/openraft/src/quorum/quorum_set_impl.rs @@ -4,7 +4,7 @@ use crate::quorum::quorum_set::QuorumSet; /// Impl a simple majority quorum set impl QuorumSet for BTreeSet -where ID: PartialOrd + Ord + Copy + 'static +where ID: PartialOrd + Ord + Clone + 'static { type Iter = std::collections::btree_set::IntoIter; @@ -29,7 +29,7 @@ where ID: PartialOrd + Ord + Copy + 'static /// Impl a simple majority quorum set impl QuorumSet for Vec -where ID: PartialOrd + Ord + Copy + 'static +where ID: PartialOrd + Ord + Clone + 'static { type Iter = std::collections::btree_set::IntoIter; @@ -54,7 +54,7 @@ where ID: PartialOrd + Ord + Copy + 'static /// Impl a simple majority quorum set impl QuorumSet for &[ID] -where ID: PartialOrd + Ord + Copy + 'static +where ID: PartialOrd + Ord + Clone + 'static { type Iter = 
std::collections::btree_set::IntoIter; @@ -73,6 +73,6 @@ where ID: PartialOrd + Ord + Copy + 'static } fn ids(&self) -> Self::Iter { - BTreeSet::from_iter(self.iter().copied()).into_iter() + BTreeSet::from_iter(self.iter().cloned()).into_iter() } } diff --git a/openraft/src/raft/impl_raft_blocking_write.rs b/openraft/src/raft/impl_raft_blocking_write.rs index 29809cd59..eb92f0a28 100644 --- a/openraft/src/raft/impl_raft_blocking_write.rs +++ b/openraft/src/raft/impl_raft_blocking_write.rs @@ -82,7 +82,7 @@ where C: RaftTypeConfig> tracing::debug!("res of first step: {}", res.summary()); - let (log_id, joint) = (res.log_id, res.membership.clone().unwrap()); + let (log_id, joint) = (res.log_id.clone(), res.membership.clone().unwrap()); if joint.get_joint_config().len() == 1 { return Ok(res); @@ -121,7 +121,7 @@ where C: RaftTypeConfig> /// /// A `node` is able to store the network address of a node. Thus an application does not /// need another store for mapping node-id to ip-addr when implementing the RaftNetwork. - #[tracing::instrument(level = "debug", skip(self, id), fields(target=display(id)))] + #[tracing::instrument(level = "debug", skip(self, id), fields(target=display(&id)))] pub async fn add_learner( &self, id: C::NodeId, @@ -131,7 +131,7 @@ where C: RaftTypeConfig> let (tx, rx) = oneshot_channel::(); let msg = RaftMsg::ChangeMembership { - changes: ChangeMembers::AddNodes(btreemap! {id=>node}), + changes: ChangeMembers::AddNodes(btreemap! {id.clone()=>node}), retain: true, tx, }; @@ -149,12 +149,12 @@ where C: RaftTypeConfig> // Otherwise, blocks until the replication to the new learner becomes up to date. // The log id of the membership that contains the added learner. 
- let membership_log_id = resp.log_id; + let membership_log_id = resp.log_id.clone(); let wait_res = self .wait(None) .metrics( - |metrics| match self.check_replication_upto_date(metrics, id, Some(membership_log_id)) { + |metrics| match self.check_replication_upto_date(metrics, id.clone(), Some(membership_log_id.clone())) { Ok(_matching) => true, // keep waiting Err(_) => false, diff --git a/openraft/src/raft/message/vote.rs b/openraft/src/raft/message/vote.rs index 1f85d5a44..c2964641c 100644 --- a/openraft/src/raft/message/vote.rs +++ b/openraft/src/raft/message/vote.rs @@ -56,7 +56,7 @@ impl MessageSummary> for VoteResponse { format!( "{{{}, last_log:{:?}}}", self.vote, - self.last_log_id.map(|x| x.to_string()) + self.last_log_id.as_ref().map(|x| x.to_string()) ) } } @@ -66,9 +66,9 @@ where NID: NodeId { pub fn new(vote: impl Borrow>, last_log_id: Option>, granted: bool) -> Self { Self { - vote: *vote.borrow(), + vote: vote.borrow().clone(), vote_granted: granted, - last_log_id: last_log_id.map(|x| *x.borrow()), + last_log_id: last_log_id.map(|x| x.borrow().clone()), } } @@ -87,7 +87,7 @@ where NID: NodeId f, "{{{}, last_log:{:?}}}", self.vote, - self.last_log_id.map(|x| x.to_string()) + self.last_log_id.as_ref().map(|x| x.to_string()) ) } } diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 7a01d5783..db3e339ee 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -242,7 +242,7 @@ where C: RaftTypeConfig { let (tx_api, rx_api) = mpsc::unbounded_channel(); let (tx_notify, rx_notify) = mpsc::unbounded_channel(); - let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id)); + let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id.clone())); let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default()); let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default()); let (tx_shutdown, rx_shutdown) = C::AsyncRuntime::oneshot(); @@ -259,11 +259,11 
@@ where C: RaftTypeConfig parent: tracing::Span::current(), Level::DEBUG, "RaftCore", - id = display(id), + id = display(&id), cluster = display(&config.cluster_name) ); - let eng_config = EngineConfig::new::(id, config.as_ref()); + let eng_config = EngineConfig::new::(id.clone(), config.as_ref()); let state = { let mut helper = StorageHelper::new(&mut log_store, &mut state_machine); @@ -275,7 +275,7 @@ where C: RaftTypeConfig let sm_handle = worker::Worker::spawn(state_machine, tx_notify.clone()); let core: RaftCore = RaftCore { - id, + id: id.clone(), config: config.clone(), runtime_config: runtime_config.clone(), network, @@ -492,9 +492,9 @@ where C: RaftTypeConfig { tracing::debug!(req = display(&req), "Raft::install_snapshot()"); - let req_vote = req.vote; - let my_vote = self.with_raft_state(|state| *state.vote_ref()).await?; - let resp = InstallSnapshotResponse { vote: my_vote }; + let req_vote = req.vote.clone(); + let my_vote = self.with_raft_state(|state| state.vote_ref().clone()).await?; + let resp = InstallSnapshotResponse { vote: my_vote.clone() }; // Check vote. // It is not mandatory because it is just a read operation @@ -530,7 +530,7 @@ where C: RaftTypeConfig /// reads. This method is perfect for making decisions on where to route client requests. 
#[tracing::instrument(level = "debug", skip(self))] pub async fn current_leader(&self) -> Option { - self.metrics().borrow().current_leader + self.metrics().borrow().current_leader.clone() } /// Check to ensure this node is still the cluster leader, in order to guard against stale reads @@ -766,7 +766,7 @@ where C: RaftTypeConfig Some(x) => x, }; - let matched = *target_metrics; + let matched = target_metrics.clone(); let distance = replication_lag(&matched.index(), &metrics.last_log_index); diff --git a/openraft/src/raft_state/membership_state/change_handler.rs b/openraft/src/raft_state/membership_state/change_handler.rs index aa3094beb..a818250cd 100644 --- a/openraft/src/raft_state/membership_state/change_handler.rs +++ b/openraft/src/raft_state/membership_state/change_handler.rs @@ -57,8 +57,8 @@ where Ok(()) } else { Err(InProgress { - committed: *committed.log_id(), - membership_log_id: *effective.log_id(), + committed: committed.log_id().clone(), + membership_log_id: effective.log_id().clone(), }) } } diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 6d5cfcc1a..b6eed0aa0 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -253,7 +253,7 @@ where ); if accepted.as_ref() > self.accepted.last_accepted_log_id(self.vote_ref().leader_id()) { - self.accepted = Accepted::new(*self.vote_ref().leader_id(), accepted); + self.accepted = Accepted::new(self.vote_ref().leader_id().clone(), accepted); } } @@ -273,9 +273,9 @@ where #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_committed(&mut self, committed: &Option>) -> Option>> { if committed.as_ref() > self.committed() { - let prev = self.committed().copied(); + let prev = self.committed().cloned(); - self.committed = *committed; + self.committed = committed.clone(); self.membership_state.commit(committed); Some(prev) @@ -403,7 +403,7 @@ where let last_leader_log_ids = self.log_ids.by_last_leader(); Leader::new( - *self.vote_ref(), + 
self.vote_ref().clone(), em.to_quorum_set(), em.learner_ids(), last_leader_log_ids, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 55de50ea8..61262075f 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -154,7 +154,7 @@ where LS: RaftLogStorage, { /// Spawn a new replication task for the target node. - #[tracing::instrument(level = "trace", skip_all,fields(target=display(target), session_id=display(session_id)))] + #[tracing::instrument(level = "trace", skip_all,fields(target=display(&target), session_id=display(&session_id)))] #[allow(clippy::type_complexity)] #[allow(clippy::too_many_arguments)] pub(crate) fn spawn( @@ -208,7 +208,7 @@ where } } - #[tracing::instrument(level="debug", skip(self), fields(session=%self.session_id, target=display(self.target), cluster=%self.config.cluster_name))] + #[tracing::instrument(level="debug", skip(self), fields(session=%self.session_id, target=display(&self.target), cluster=%self.config.cluster_name))] async fn main(mut self) -> Result<(), ReplicationClosed> { loop { let action = self.next_action.take(); @@ -229,7 +229,7 @@ where Data::Heartbeat => { let m = &self.matching; // request_id==None will be ignored by RaftCore. 
- let d = DataWithId::new(RequestId::new_heartbeat(), LogIdRange::new(*m, *m)); + let d = DataWithId::new(RequestId::new_heartbeat(), LogIdRange::new(m.clone(), m.clone())); log_data = Some(d.clone()); self.send_log_entries(d).await @@ -266,7 +266,7 @@ where response: Response::HigherVote { target: self.target, higher: h.higher, - sender_vote: *self.session_id.vote_ref(), + sender_vote: self.session_id.vote_ref().clone(), }, }); return Ok(()); @@ -390,14 +390,14 @@ where if start == end { // Heartbeat RPC, no logs to send, last log id is the same as prev_log_id - let r = LogIdRange::new(rng.prev, rng.prev); + let r = LogIdRange::new(rng.prev.clone(), rng.prev.clone()); (vec![], r) } else { // limited_get_log_entries will return logs smaller than the range [start, end). let logs = self.log_reader.limited_get_log_entries(start, end).await?; - let first = *logs.first().map(|x| x.get_log_id()).unwrap(); - let last = *logs.last().map(|x| x.get_log_id()).unwrap(); + let first = logs.first().map(|x| x.get_log_id().clone()).unwrap(); + let last = logs.last().map(|x| x.get_log_id().clone()).unwrap(); debug_assert!( !logs.is_empty() && logs.len() <= (end - start) as usize, @@ -409,7 +409,7 @@ where last ); - let r = LogIdRange::new(rng.prev, Some(last)); + let r = LogIdRange::new(rng.prev.clone(), Some(last)); (logs, r) } }; @@ -418,9 +418,9 @@ where // Build the heartbeat frame to be sent to the follower. 
let payload = AppendEntriesRequest { - vote: *self.session_id.vote_ref(), - prev_log_id: sending_range.prev, - leader_commit: self.committed, + vote: self.session_id.vote_ref().clone(), + prev_log_id: sending_range.prev.clone(), + leader_commit: self.committed.clone(), entries: logs, }; @@ -442,7 +442,7 @@ where let to = Timeout { action: RPCTypes::AppendEntries, id: self.session_id.vote_ref().leader_id().voted_for().unwrap(), - target: self.target, + target: self.target.clone(), timeout: the_timeout, }; RPCError::Timeout(to) @@ -478,11 +478,11 @@ where Err(ReplicationError::HigherVote(HigherVote { higher: vote, - sender_vote: *self.session_id.vote_ref(), + sender_vote: self.session_id.vote_ref().clone(), })) } AppendEntriesResponse::Conflict => { - let conflict = sending_range.prev; + let conflict = sending_range.prev.clone(); debug_assert!(conflict.is_some(), "prev_log_id=None never conflict"); let conflict = conflict.unwrap(); @@ -498,10 +498,10 @@ where fn send_progress_error(&mut self, request_id: RequestId, err: RPCError) { let _ = self.tx_raft_core.send(Notify::Network { response: Response::Progress { - target: self.target, + target: self.target.clone(), request_id, result: Err(err.to_string()), - session_id: self.session_id, + session_id: self.session_id.clone(), }, }); } @@ -510,17 +510,17 @@ where fn send_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult) { tracing::debug!( request_id = display(request_id), - target = display(self.target), + target = display(&self.target), curr_matching = display(self.matching.display()), result = display(&replication_result), "{}", func_name!() ); - match replication_result.result { + match &replication_result.result { Ok(matching) => { - self.validate_matching(matching); - self.matching = matching; + self.validate_matching(matching.clone()); + self.matching = matching.clone(); } Err(_conflict) => { // Conflict is not allowed to be less than the current matching. 
@@ -530,9 +530,9 @@ where let _ = self.tx_raft_core.send({ Notify::Network { response: Response::Progress { - session_id: self.session_id, + session_id: self.session_id.clone(), request_id, - target: self.target, + target: self.target.clone(), result: Ok(replication_result), }, } @@ -738,7 +738,7 @@ where let jh = C::spawn(Self::send_snapshot( request_id, self.snapshot_network.clone(), - *self.session_id.vote_ref(), + self.session_id.vote_ref().clone(), snapshot, option, rx_cancel, @@ -814,7 +814,7 @@ where let resp = result?; // Handle response conditions. - let sender_vote = *self.session_id.vote_ref(); + let sender_vote = self.session_id.vote_ref().clone(); if resp.vote > sender_vote { return Err(ReplicationError::HigherVote(HigherVote { higher: resp.vote, @@ -839,12 +839,15 @@ where leader_time: InstantOf, log_ids: DataWithId>, ) -> Option> { - self.send_progress(log_ids.request_id(), ReplicationResult::new(leader_time, Ok(matching))); + self.send_progress( + log_ids.request_id(), + ReplicationResult::new(leader_time, Ok(matching.clone())), + ); if matching < log_ids.data().last { Some(Data::new_logs( log_ids.request_id(), - LogIdRange::new(matching, log_ids.data().last), + LogIdRange::new(matching, log_ids.data().last.clone()), )) } else { None diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index 5ba2a9cc1..7330f2841 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -73,7 +73,7 @@ where C: RaftTypeConfig let res = match result { Ok(x) => { tracing::debug!("LogApplied upto {}", self.last_log_id); - let resp = (self.last_log_id, x); + let resp = (self.last_log_id.clone(), x); self.tx.send(Ok(resp)) } Err(e) => { diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 1d082f2b5..cb253fa66 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -89,7 +89,7 @@ where // TODO: It is possible `committed < last_applied` because when installing 
snapshot, // new committed should be saved, but not yet. if committed < last_applied { - committed = last_applied; + committed = last_applied.clone(); } // Re-apply log entries to recover SM to latest state. @@ -113,9 +113,9 @@ where last_applied.display(), ); - self.log_store.purge(last_applied.unwrap()).await?; - last_log_id = last_applied; - last_purged_log_id = last_applied; + self.log_store.purge(last_applied.clone().unwrap()).await?; + last_log_id = last_applied.clone(); + last_purged_log_id = last_applied.clone(); } tracing::info!( @@ -123,7 +123,7 @@ where last_purged_log_id.display(), last_log_id.display() ); - let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?; + let log_ids = LogIdList::load_log_ids(last_purged_log_id.clone(), last_log_id, self.log_store).await?; let snapshot = self.state_machine.get_current_snapshot().await?; @@ -145,11 +145,11 @@ where // TODO: `flushed` is not set. let io_state = IOState::new( - vote, + vote.clone(), LogIOId::default(), - last_applied, - snapshot_meta.last_log_id, - last_purged_log_id, + last_applied.clone(), + snapshot_meta.last_log_id.clone(), + last_purged_log_id.clone(), ); let now = C::now(); @@ -296,7 +296,7 @@ where for ent in entries.iter().rev() { if let Some(mem) = ent.get_membership() { - let em = StoredMembership::new(Some(*ent.get_log_id()), mem.clone()); + let em = StoredMembership::new(Some(ent.get_log_id().clone()), mem.clone()); res.insert(0, em); if res.len() == 2 { return Ok(res); diff --git a/openraft/src/storage/log_store_ext.rs b/openraft/src/storage/log_store_ext.rs index 49663fe1b..b0d060411 100644 --- a/openraft/src/storage/log_store_ext.rs +++ b/openraft/src/storage/log_store_ext.rs @@ -43,7 +43,7 @@ where C: RaftTypeConfig async fn get_log_id(&mut self, log_index: u64) -> Result, StorageError> { let entries = self.get_log_entries(log_index..=log_index).await?; - Ok(*entries[0].get_log_id()) + Ok(entries[0].get_log_id().clone()) } } diff --git 
a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index 5a6cfc273..24f03d12d 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -94,8 +94,8 @@ where { pub fn signature(&self) -> SnapshotSignature { SnapshotSignature { - last_log_id: self.last_log_id, - last_membership_log_id: *self.last_membership.log_id(), + last_log_id: self.last_log_id.clone(), + last_membership_log_id: self.last_membership.log_id().clone(), snapshot_id: self.snapshot_id.clone(), } } diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 745861bc8..bd749ca2d 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -525,7 +525,7 @@ where "state machine has higher log" ); assert_eq!( - initial.last_purged_log_id().copied(), + initial.last_purged_log_id().cloned(), Some(log_id_0(3, 1)), "state machine has higher log" ); @@ -753,13 +753,13 @@ where tokio::time::sleep(Duration::from_millis(1_000)).await; let ent = store.try_get_log_entry(3).await?; - assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| *x.get_log_id())); + assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| x.get_log_id().clone())); let ent = store.try_get_log_entry(0).await?; - assert_eq!(None, ent.map(|x| *x.get_log_id())); + assert_eq!(None, ent.map(|x| x.get_log_id().clone())); let ent = store.try_get_log_entry(11).await?; - assert_eq!(None, ent.map(|x| *x.get_log_id())); + assert_eq!(None, ent.map(|x| x.get_log_id().clone())); Ok(()) } @@ -1178,7 +1178,7 @@ where let snapshot_last_log_id = Some(log_id_0(3, 3)); let snapshot_last_membership = StoredMembership::new(Some(log_id_0(1, 2)), Membership::new(vec![btreeset![1, 2, 3]], None)); - let snapshot_applied_state = (snapshot_last_log_id, snapshot_last_membership.clone()); + let snapshot_applied_state = (snapshot_last_log_id.clone(), snapshot_last_membership.clone()); tracing::info!("--- build and get snapshot on leader state machine"); let ss1 = sm_l.get_snapshot_builder().await.build_snapshot().await?; diff 
--git a/openraft/src/vote/leader_id/leader_id_adv.rs b/openraft/src/vote/leader_id/leader_id_adv.rs index b31328c8d..929f1c8ec 100644 --- a/openraft/src/vote/leader_id/leader_id_adv.rs +++ b/openraft/src/vote/leader_id/leader_id_adv.rs @@ -31,12 +31,12 @@ impl LeaderId { } pub fn voted_for(&self) -> Option { - Some(self.node_id) + Some(self.node_id.clone()) } #[allow(clippy::wrong_self_convention)] pub(crate) fn to_committed(&self) -> CommittedLeaderId { - *self + self.clone() } /// Return if it is the same leader as the committed leader id. diff --git a/openraft/src/vote/leader_id/leader_id_std.rs b/openraft/src/vote/leader_id/leader_id_std.rs index acfbcb729..832924fe7 100644 --- a/openraft/src/vote/leader_id/leader_id_std.rs +++ b/openraft/src/vote/leader_id/leader_id_std.rs @@ -52,7 +52,7 @@ impl LeaderId { } pub fn voted_for(&self) -> Option { - self.voted_for + self.voted_for.clone() } #[allow(clippy::wrong_self_convention)] @@ -99,12 +99,13 @@ impl CommittedLeaderId { #[cfg(test)] #[allow(clippy::nonminimal_bool)] mod tests { - use crate::CommittedLeaderId; use crate::LeaderId; #[cfg(feature = "serde")] #[test] fn test_committed_leader_id_serde() -> anyhow::Result<()> { + use crate::CommittedLeaderId; + let c = CommittedLeaderId::::new(5, 10); let s = serde_json::to_string(&c)?; assert_eq!(r#"5"#, s); diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 257bab5c5..cc8a850d6 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -486,7 +486,7 @@ impl TypedRaftRouter { }; tracing::info!( - node_id = display(node_id), + node_id = display(&node_id), members = debug(&members), "initializing cluster" );