Skip to content

Commit

Permalink
Change: change Vote<NID:NodeId> to Vote<C:RaftTypeConfig>
Browse files Browse the repository at this point in the history
This refactoring moves Vote from a per-NodeId type to a per-TypeConfig type,
to make it consistent with `RaftTypeConfig` usage across the codebase.

- Part of: databendlabs#1278

Upgrade tip:

Vote is now parameterized by `RaftTypeConfig` instead of `NodeId`

- Change `Vote<NodeId>` to `Vote<C> where C: RaftTypeConfig`, for
  example, change `Vote<u64>` to `Vote<YourTypeConfig>`.
  • Loading branch information
drmingdrmer committed Dec 24, 2024
1 parent 409ef4b commit b6d2c7e
Show file tree
Hide file tree
Showing 47 changed files with 146 additions and 141 deletions.
6 changes: 3 additions & 3 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct StateMachine {
}

pub struct LogStore {
vote: RwLock<Option<Vote<NodeId>>>,
vote: RwLock<Option<Vote<TypeConfig>>>,
log: RwLock<BTreeMap<u64, Entry<TypeConfig>>>,
last_purged_log_id: RwLock<Option<LogId<NodeId>>>,
}
Expand Down Expand Up @@ -116,7 +116,7 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {
Ok(entries)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<TypeConfig>> {
async fn read_vote(&mut self) -> Result<Option<Vote<TypeConfig>>, StorageError<TypeConfig>> {
Ok(self.vote.read().await.clone())
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn save_vote(&mut self, vote: &Vote<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let mut v = self.vote.write().await;
*v = Some(*vote);
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct LogStoreInner<C: RaftTypeConfig> {
committed: Option<LogId<C::NodeId>>,

/// The current granted vote.
vote: Option<Vote<C::NodeId>>,
vote: Option<Vote<C>>,
}

impl<C: RaftTypeConfig> Default for LogStoreInner<C> {
Expand Down Expand Up @@ -84,12 +84,12 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(self.committed.clone())
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C>> {
async fn save_vote(&mut self, vote: &Vote<C>) -> Result<(), StorageError<C>> {
self.vote = Some(vote.clone());
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
async fn read_vote(&mut self) -> Result<Option<Vote<C>>, StorageError<C>> {
Ok(self.vote.clone())
}

Expand Down Expand Up @@ -157,7 +157,7 @@ mod impl_log_store {
inner.try_get_log_entries(range).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
async fn read_vote(&mut self) -> Result<Option<Vote<C>>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}
Expand All @@ -183,7 +183,7 @@ mod impl_log_store {
inner.read_committed().await
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C>> {
async fn save_vote(&mut self, vote: &Vote<C>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.save_vote(vote).await
}
Expand Down
3 changes: 1 addition & 2 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ pub mod protobuf {

pub mod typ {

use crate::NodeId;
use crate::TypeConfig;

pub type Vote = openraft::Vote<NodeId>;
pub type Vote = openraft::Vote<TypeConfig>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-grpc/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {

async fn full_snapshot(
&mut self,
vote: openraft::Vote<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
vote: openraft::Vote<TypeConfig>,
snapshot: openraft::Snapshot<TypeConfig>,
_cancel: impl std::future::Future<Output = openraft::error::ReplicationClosed> + openraft::OptionalSend + 'static,
_option: RPCOption,
Expand Down
4 changes: 1 addition & 3 deletions examples/raft-kv-memstore-network-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ pub type LogStore = store::LogStore;
pub type StateMachineStore = store::StateMachineStore;

pub mod typ {

use crate::NodeId;
use crate::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type Vote = openraft::Vote<TypeConfig>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-network-v2/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl RaftNetworkV2<TypeConfig> for Connection {
/// A real application should replace this method with customized implementation.
async fn full_snapshot(
&mut self,
vote: Vote<NodeId>,
vote: Vote<TypeConfig>,
snapshot: Snapshot<TypeConfig>,
_cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
_option: RPCOption,
Expand Down
3 changes: 1 addition & 2 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ pub type StateMachineStore = store::StateMachineStore;

pub mod typ {

use crate::NodeId;
use crate::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type Vote = openraft::Vote<TypeConfig>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl RaftNetworkV2<TypeConfig> for Connection {
/// A real application should replace this method with customized implementation.
async fn full_snapshot(
&mut self,
vote: Vote<NodeId>,
vote: Vote<TypeConfig>,
snapshot: Snapshot<TypeConfig>,
_cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
_option: RPCOption,
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub struct LogStore {
committed: RefCell<Option<LogId<NodeId>>>,

/// The current granted vote.
vote: RefCell<Option<Vote<NodeId>>>,
vote: RefCell<Option<Vote<TypeConfig>>>,
}

impl RaftLogReader<TypeConfig> for Rc<LogStore> {
Expand All @@ -129,7 +129,7 @@ impl RaftLogReader<TypeConfig> for Rc<LogStore> {
Ok(response)
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<TypeConfig>> {
async fn read_vote(&mut self) -> Result<Option<Vote<TypeConfig>>, StorageError<TypeConfig>> {
Ok(*self.vote.borrow())
}
}
Expand Down Expand Up @@ -312,7 +312,7 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn save_vote(&mut self, vote: &Vote<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let mut v = self.vote.borrow_mut();
*v = Some(*vote);
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl LogStore {
.and_then(|v| serde_json::from_slice(&v).ok()))
}

fn set_vote_(&self, vote: &Vote<NodeId>) -> StorageResult<()> {
fn set_vote_(&self, vote: &Vote<TypeConfig>) -> StorageResult<()> {
self.db
.put_cf(self.store(), b"vote", serde_json::to_vec(vote).unwrap())
.map_err(|e| StorageError::write_vote(&e))?;
Expand All @@ -346,7 +346,7 @@ impl LogStore {
Ok(())
}

fn get_vote_(&self) -> StorageResult<Option<Vote<NodeId>>> {
fn get_vote_(&self) -> StorageResult<Option<Vote<TypeConfig>>> {
Ok(self
.db
.get_cf(self.store(), b"vote")
Expand Down Expand Up @@ -381,7 +381,7 @@ impl RaftLogReader<TypeConfig> for LogStore {
.collect()
}

async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<TypeConfig>> {
async fn read_vote(&mut self) -> Result<Option<Vote<TypeConfig>>, StorageError<TypeConfig>> {
self.get_vote_()
}
}
Expand Down Expand Up @@ -418,7 +418,7 @@ impl RaftLogStorage<TypeConfig> for LogStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn save_vote(&mut self, vote: &Vote<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
self.set_vote_(vote)
}

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where C: RaftTypeConfig
target: C::NodeId,

/// The higher vote observed.
higher: Vote<C::NodeId>,
higher: Vote<C>,

/// The Leader that sent replication request.
leader_vote: CommittedVote<C>,
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ where C: RaftTypeConfig
},

InstallFullSnapshot {
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
tx: ResultSender<C, SnapshotResponse<C>>,
},
Expand Down Expand Up @@ -101,7 +101,7 @@ where C: RaftTypeConfig
/// Otherwise, just reset Leader lease so that the node `to` can become Leader.
HandleTransferLeader {
/// The vote of the Leader that is transferring the leadership.
from: Vote<C::NodeId>,
from: Vote<C>,
/// The assigned node to be the next Leader.
to: C::NodeId,
},
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ where C: RaftTypeConfig
},

/// Save vote to storage
SaveVote { vote: Vote<C::NodeId> },
SaveVote { vote: Vote<C> },

/// Send vote to all other members
SendVote { vote_req: VoteRequest<C> },
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ where C: RaftTypeConfig
///
/// The candidate `last_log_id` is initialized with the attributes of Acceptor part:
/// [`RaftState`]
pub(crate) fn new_candidate(&mut self, vote: Vote<C::NodeId>) -> &mut Candidate<C, LeaderQuorumSet<C>> {
pub(crate) fn new_candidate(&mut self, vote: Vote<C>) -> &mut Candidate<C, LeaderQuorumSet<C>> {
let now = C::now();
let last_log_id = self.state.last_log_id().cloned();

Expand Down Expand Up @@ -378,7 +378,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_append_entries(
&mut self,
vote: &Vote<C::NodeId>,
vote: &Vote<C>,
prev_log_id: Option<LogId<C::NodeId>>,
entries: Vec<C::Entry>,
tx: Option<AppendEntriesTx<C>>,
Expand Down Expand Up @@ -417,7 +417,7 @@ where C: RaftTypeConfig

pub(crate) fn append_entries(
&mut self,
vote: &Vote<C::NodeId>,
vote: &Vote<C>,
prev_log_id: Option<LogId<C::NodeId>>,
entries: Vec<C::Entry>,
) -> Result<(), RejectAppendEntries<C>> {
Expand Down Expand Up @@ -451,7 +451,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_install_full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
tx: ResultSender<C, SnapshotResponse<C>>,
) {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn accept_vote<T, E, F>(
&mut self,
vote: &Vote<C::NodeId>,
vote: &Vote<C>,
tx: ResultSender<C, T, E>,
f: F,
) -> Option<ResultSender<C, T, E>>
Expand Down Expand Up @@ -98,7 +98,7 @@ where C: RaftTypeConfig
/// Note: This method does not check last-log-id. handle-vote-request has to deal with
/// last-log-id itself.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), RejectVoteRequest<C>> {
pub(crate) fn update_vote(&mut self, vote: &Vote<C>) -> Result<(), RejectVoteRequest<C>> {
// Partial ord compare:
// Vote does not have to be total ord.
// `!(a >= b)` does not imply `a < b`.
Expand Down
10 changes: 5 additions & 5 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ where
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("seen a higher vote: {higher} GT mine: {sender_vote}")]
pub(crate) struct HigherVote<C: RaftTypeConfig> {
pub(crate) higher: Vote<C::NodeId>,
pub(crate) sender_vote: Vote<C::NodeId>,
pub(crate) higher: Vote<C>,
pub(crate) sender_vote: Vote<C>,
}

/// Error that indicates a **temporary** network error and when it is returned, Openraft will retry
Expand Down Expand Up @@ -603,7 +603,7 @@ pub struct LearnerNotFound<C: RaftTypeConfig> {
#[error("not allowed to initialize due to current raft state: last_log_id: {last_log_id:?} vote: {vote}")]
pub struct NotAllowed<C: RaftTypeConfig> {
pub last_log_id: Option<LogId<C::NodeId>>,
pub vote: Vote<C::NodeId>,
pub vote: Vote<C>,
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
Expand Down Expand Up @@ -636,7 +636,7 @@ pub enum NoForward {}
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub(crate) enum RejectVoteRequest<C: RaftTypeConfig> {
#[error("reject vote request by a greater vote: {0}")]
ByVote(Vote<C::NodeId>),
ByVote(Vote<C>),

#[allow(dead_code)]
#[error("reject vote request by a greater last-log-id: {0:?}")]
Expand All @@ -659,7 +659,7 @@ where C: RaftTypeConfig
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub(crate) enum RejectAppendEntries<C: RaftTypeConfig> {
#[error("reject AppendEntries by a greater vote: {0}")]
ByVote(Vote<C::NodeId>),
ByVote(Vote<C>),

#[error("reject AppendEntries because of conflicting log-id: {local:?}; expect to be: {expect:?}")]
ByConflictingLogId {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum Metric<C>
where C: RaftTypeConfig
{
Term(u64),
Vote(Vote<C::NodeId>),
Vote(Vote<C>),
LastLogIndex(Option<u64>),
Applied(Option<LogId<C::NodeId>>),
AppliedIndex(Option<u64>),
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
pub current_term: u64,

/// The last flushed vote.
pub vote: Vote<C::NodeId>,
pub vote: Vote<C>,

/// The last log index has been appended to this Raft node's log.
pub last_log_index: Option<u64>,
Expand Down Expand Up @@ -280,7 +280,7 @@ where C: RaftTypeConfig
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftServerMetrics<C: RaftTypeConfig> {
pub id: C::NodeId,
pub vote: Vote<C::NodeId>,
pub vote: Vote<C>,
pub state: ServerState,
pub current_leader: Option<C::NodeId>,

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where C: RaftTypeConfig

/// Wait for `vote` to become `want` or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn vote(&self, want: Vote<C::NodeId>, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
pub async fn vote(&self, want: Vote<C>, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
self.eq(Metric::Vote(want), msg).await
}

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/network/snapshot_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ mod tokio_rt {
{
async fn send_snapshot<Net>(
net: &mut Net,
vote: Vote<C::NodeId>,
vote: Vote<C>,
mut snapshot: Snapshot<C>,
mut cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down Expand Up @@ -299,7 +299,7 @@ pub trait SnapshotTransport<C: RaftTypeConfig> {
// TODO: consider removing dependency on RaftNetwork
async fn send_snapshot<Net>(
net: &mut Net,
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/network/v2/adapt_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where

async fn full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/network/v2/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where C: RaftTypeConfig
/// [`Raft::install_full_snapshot()`]: crate::raft::Raft::install_full_snapshot
async fn full_snapshot(
&mut self,
vote: Vote<C::NodeId>,
vote: Vote<C>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down
Loading

0 comments on commit b6d2c7e

Please sign in to comment.