Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change: change Vote<NID:NodeId> to Vote<C:RaftTypeConfig> #1279

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct StateMachine {
}

pub struct LogStore {
vote: RwLock<Option<Vote<NodeId>>>,
vote: RwLock<Option<Vote<TypeConfig>>>,
log: RwLock<BTreeMap<u64, Entry<TypeConfig>>>,
last_purged_log_id: RwLock<Option<LogId<NodeId>>>,
}
Expand Down Expand Up @@ -116,7 +116,7 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {
Ok(entries)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<TypeConfig>> {
async fn read_vote(&mut self) -> Result<Option<Vote<TypeConfig>>, StorageError<TypeConfig>> {
Ok(self.vote.read().await.clone())
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn save_vote(&mut self, vote: &Vote<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let mut v = self.vote.write().await;
*v = Some(*vote);
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct LogStoreInner<C: RaftTypeConfig> {
committed: Option<LogId<C::NodeId>>,

/// The current granted vote.
vote: Option<Vote<C::NodeId>>,
vote: Option<Vote<C>>,
}

impl<C: RaftTypeConfig> Default for LogStoreInner<C> {
Expand Down Expand Up @@ -84,12 +84,12 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(self.committed.clone())
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C>> {
async fn save_vote(&mut self, vote: &Vote<C>) -> Result<(), StorageError<C>> {
self.vote = Some(vote.clone());
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
async fn read_vote(&mut self) -> Result<Option<Vote<C>>, StorageError<C>> {
Ok(self.vote.clone())
}

Expand Down Expand Up @@ -157,7 +157,7 @@ mod impl_log_store {
inner.try_get_log_entries(range).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
async fn read_vote(&mut self) -> Result<Option<Vote<C>>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}
Expand All @@ -183,7 +183,7 @@ mod impl_log_store {
inner.read_committed().await
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C>> {
async fn save_vote(&mut self, vote: &Vote<C>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.save_vote(vote).await
}
Expand Down
3 changes: 1 addition & 2 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ pub mod protobuf {

pub mod typ {

use crate::NodeId;
use crate::TypeConfig;

pub type Vote = openraft::Vote<NodeId>;
pub type Vote = openraft::Vote<TypeConfig>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-grpc/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {

async fn full_snapshot(
&mut self,
vote: openraft::Vote<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
vote: openraft::Vote<TypeConfig>,
snapshot: openraft::Snapshot<TypeConfig>,
_cancel: impl std::future::Future<Output = openraft::error::ReplicationClosed> + openraft::OptionalSend + 'static,
_option: RPCOption,
Expand Down
4 changes: 1 addition & 3 deletions examples/raft-kv-memstore-network-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ pub type LogStore = store::LogStore;
pub type StateMachineStore = store::StateMachineStore;

pub mod typ {

use crate::NodeId;
use crate::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type Vote = openraft::Vote<TypeConfig>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-network-v2/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl RaftNetworkV2<TypeConfig> for Connection {
/// A real application should replace this method with customized implementation.
async fn full_snapshot(
&mut self,
vote: Vote<NodeId>,
vote: Vote<TypeConfig>,
snapshot: Snapshot<TypeConfig>,
_cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
_option: RPCOption,
Expand Down
3 changes: 1 addition & 2 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ pub type StateMachineStore = store::StateMachineStore;

pub mod typ {

use crate::NodeId;
use crate::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type Vote = openraft::Vote<TypeConfig>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl RaftNetworkV2<TypeConfig> for Connection {
/// A real application should replace this method with customized implementation.
async fn full_snapshot(
&mut self,
vote: Vote<NodeId>,
vote: Vote<TypeConfig>,
snapshot: Snapshot<TypeConfig>,
_cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
_option: RPCOption,
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub struct LogStore {
committed: RefCell<Option<LogId<NodeId>>>,

/// The current granted vote.
vote: RefCell<Option<Vote<NodeId>>>,
vote: RefCell<Option<Vote<TypeConfig>>>,
}

impl RaftLogReader<TypeConfig> for Rc<LogStore> {
Expand All @@ -129,7 +129,7 @@ impl RaftLogReader<TypeConfig> for Rc<LogStore> {
Ok(response)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<TypeConfig>> {
async fn read_vote(&mut self) -> Result<Option<Vote<TypeConfig>>, StorageError<TypeConfig>> {
Ok(*self.vote.borrow())
}
}
Expand Down Expand Up @@ -312,7 +312,7 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn save_vote(&mut self, vote: &Vote<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let mut v = self.vote.borrow_mut();
*v = Some(*vote);
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl LogStore {
.and_then(|v| serde_json::from_slice(&v).ok()))
}

fn set_vote_(&self, vote: &Vote<NodeId>) -> StorageResult<()> {
fn set_vote_(&self, vote: &Vote<TypeConfig>) -> StorageResult<()> {
self.db
.put_cf(self.store(), b"vote", serde_json::to_vec(vote).unwrap())
.map_err(|e| StorageError::write_vote(&e))?;
Expand All @@ -346,7 +346,7 @@ impl LogStore {
Ok(())
}

fn get_vote_(&self) -> StorageResult<Option<Vote<NodeId>>> {
fn get_vote_(&self) -> StorageResult<Option<Vote<TypeConfig>>> {
Ok(self
.db
.get_cf(self.store(), b"vote")
Expand Down Expand Up @@ -381,7 +381,7 @@ impl RaftLogReader<TypeConfig> for LogStore {
.collect()
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<TypeConfig>> {
async fn read_vote(&mut self) -> Result<Option<Vote<TypeConfig>>, StorageError<TypeConfig>> {
self.get_vote_()
}
}
Expand Down Expand Up @@ -418,7 +418,7 @@ impl RaftLogStorage<TypeConfig> for LogStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn save_vote(&mut self, vote: &Vote<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
self.set_vote_(vote)
}

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where C: RaftTypeConfig
target: C::NodeId,

/// The higher vote observed.
higher: Vote<C::NodeId>,
higher: Vote<C>,

/// The Leader that sent replication request.
leader_vote: CommittedVote<C>,
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ where C: RaftTypeConfig
},

InstallFullSnapshot {
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
tx: ResultSender<C, SnapshotResponse<C>>,
},
Expand Down Expand Up @@ -101,7 +101,7 @@ where C: RaftTypeConfig
/// Otherwise, just reset Leader lease so that the node `to` can become Leader.
HandleTransferLeader {
/// The vote of the Leader that is transferring the leadership.
from: Vote<C::NodeId>,
from: Vote<C>,
/// The assigned node to be the next Leader.
to: C::NodeId,
},
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ where C: RaftTypeConfig
},

/// Save vote to storage
SaveVote { vote: Vote<C::NodeId> },
SaveVote { vote: Vote<C> },

/// Send vote to all other members
SendVote { vote_req: VoteRequest<C> },
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ where C: RaftTypeConfig
///
/// The candidate `last_log_id` is initialized with the attributes of Acceptor part:
/// [`RaftState`]
pub(crate) fn new_candidate(&mut self, vote: Vote<C::NodeId>) -> &mut Candidate<C, LeaderQuorumSet<C>> {
pub(crate) fn new_candidate(&mut self, vote: Vote<C>) -> &mut Candidate<C, LeaderQuorumSet<C>> {
let now = C::now();
let last_log_id = self.state.last_log_id().cloned();

Expand Down Expand Up @@ -378,7 +378,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_append_entries(
&mut self,
vote: &Vote<C::NodeId>,
vote: &Vote<C>,
prev_log_id: Option<LogId<C::NodeId>>,
entries: Vec<C::Entry>,
tx: Option<AppendEntriesTx<C>>,
Expand Down Expand Up @@ -417,7 +417,7 @@ where C: RaftTypeConfig

pub(crate) fn append_entries(
&mut self,
vote: &Vote<C::NodeId>,
vote: &Vote<C>,
prev_log_id: Option<LogId<C::NodeId>>,
entries: Vec<C::Entry>,
) -> Result<(), RejectAppendEntries<C>> {
Expand Down Expand Up @@ -451,7 +451,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_install_full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
tx: ResultSender<C, SnapshotResponse<C>>,
) {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn accept_vote<T, E, F>(
&mut self,
vote: &Vote<C::NodeId>,
vote: &Vote<C>,
tx: ResultSender<C, T, E>,
f: F,
) -> Option<ResultSender<C, T, E>>
Expand Down Expand Up @@ -98,7 +98,7 @@ where C: RaftTypeConfig
/// Note: This method does not check last-log-id. handle-vote-request has to deal with
/// last-log-id itself.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), RejectVoteRequest<C>> {
pub(crate) fn update_vote(&mut self, vote: &Vote<C>) -> Result<(), RejectVoteRequest<C>> {
// Partial ord compare:
// Vote does not have to be total ord.
// `!(a >= b)` does not imply `a < b`.
Expand Down
10 changes: 5 additions & 5 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ where
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("seen a higher vote: {higher} GT mine: {sender_vote}")]
pub(crate) struct HigherVote<C: RaftTypeConfig> {
pub(crate) higher: Vote<C::NodeId>,
pub(crate) sender_vote: Vote<C::NodeId>,
pub(crate) higher: Vote<C>,
pub(crate) sender_vote: Vote<C>,
}

/// Error that indicates a **temporary** network error and when it is returned, Openraft will retry
Expand Down Expand Up @@ -603,7 +603,7 @@ pub struct LearnerNotFound<C: RaftTypeConfig> {
#[error("not allowed to initialize due to current raft state: last_log_id: {last_log_id:?} vote: {vote}")]
pub struct NotAllowed<C: RaftTypeConfig> {
pub last_log_id: Option<LogId<C::NodeId>>,
pub vote: Vote<C::NodeId>,
pub vote: Vote<C>,
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
Expand Down Expand Up @@ -636,7 +636,7 @@ pub enum NoForward {}
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub(crate) enum RejectVoteRequest<C: RaftTypeConfig> {
#[error("reject vote request by a greater vote: {0}")]
ByVote(Vote<C::NodeId>),
ByVote(Vote<C>),

#[allow(dead_code)]
#[error("reject vote request by a greater last-log-id: {0:?}")]
Expand All @@ -659,7 +659,7 @@ where C: RaftTypeConfig
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub(crate) enum RejectAppendEntries<C: RaftTypeConfig> {
#[error("reject AppendEntries by a greater vote: {0}")]
ByVote(Vote<C::NodeId>),
ByVote(Vote<C>),

#[error("reject AppendEntries because of conflicting log-id: {local:?}; expect to be: {expect:?}")]
ByConflictingLogId {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum Metric<C>
where C: RaftTypeConfig
{
Term(u64),
Vote(Vote<C::NodeId>),
Vote(Vote<C>),
LastLogIndex(Option<u64>),
Applied(Option<LogId<C::NodeId>>),
AppliedIndex(Option<u64>),
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
pub current_term: u64,

/// The last flushed vote.
pub vote: Vote<C::NodeId>,
pub vote: Vote<C>,

/// The last log index has been appended to this Raft node's log.
pub last_log_index: Option<u64>,
Expand Down Expand Up @@ -280,7 +280,7 @@ where C: RaftTypeConfig
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftServerMetrics<C: RaftTypeConfig> {
pub id: C::NodeId,
pub vote: Vote<C::NodeId>,
pub vote: Vote<C>,
pub state: ServerState,
pub current_leader: Option<C::NodeId>,

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where C: RaftTypeConfig

/// Wait for `vote` to become `want` or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn vote(&self, want: Vote<C::NodeId>, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
pub async fn vote(&self, want: Vote<C>, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
self.eq(Metric::Vote(want), msg).await
}

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/network/snapshot_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ mod tokio_rt {
{
async fn send_snapshot<Net>(
net: &mut Net,
vote: Vote<C::NodeId>,
vote: Vote<C>,
mut snapshot: Snapshot<C>,
mut cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down Expand Up @@ -299,7 +299,7 @@ pub trait SnapshotTransport<C: RaftTypeConfig> {
// TODO: consider removing dependency on RaftNetwork
async fn send_snapshot<Net>(
net: &mut Net,
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/network/v2/adapt_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where

async fn full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/network/v2/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where C: RaftTypeConfig
/// [`Raft::install_full_snapshot()`]: crate::raft::Raft::install_full_snapshot
async fn full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down
Loading
Loading