From f0b0aa558608ac3aab713675580018b819d30f93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 14 Sep 2024 11:07:04 +0800 Subject: [PATCH] Refactor: Chunk read log entry and check range on startup - Implement chunk-based reading of committed log entries when re-applying to state machine upon startup. - Add validation for log entry indexes, to avoid applying wrong entries to state machine. --- memstore/Cargo.toml | 1 + openraft/src/storage/helper.rs | 61 ++++++++++++++++++++++++++++++--- openraft/src/storage_error.rs | 4 +++ sledstore/Cargo.toml | 3 ++ stores/rocksstore-v2/Cargo.toml | 3 ++ 5 files changed, 67 insertions(+), 5 deletions(-) diff --git a/memstore/Cargo.toml b/memstore/Cargo.toml index b0fac485a..e8f3b4a65 100644 --- a/memstore/Cargo.toml +++ b/memstore/Cargo.toml @@ -24,6 +24,7 @@ tracing = { workspace = true } [dev-dependencies] [features] +bt = ["openraft/bt"] [package.metadata.docs.rs] all-features = true diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 2ecc950dc..1d082f2b5 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -1,13 +1,14 @@ use std::marker::PhantomData; use std::sync::Arc; +use anyerror::AnyError; + use crate::display_ext::DisplayOptionExt; use crate::engine::LogIdList; use crate::entry::RaftPayload; use crate::log_id::RaftLogId; use crate::raft_state::io_state::log_io_id::LogIOId; use crate::raft_state::IOState; -use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::type_config::TypeConfigExt; @@ -16,10 +17,12 @@ use crate::AsyncRuntime; use crate::EffectiveMembership; use crate::LogIdOptionExt; use crate::MembershipState; +use crate::RaftLogReader; use crate::RaftSnapshotBuilder; use crate::RaftState; use crate::RaftTypeConfig; use crate::StorageError; +use crate::StorageIOError; use crate::StoredMembership; /// StorageHelper provides additional methods to access a [`RaftLogStorage`] and @@ -94,10 +97,7 @@ where let start = last_applied.next_index(); let end = committed.next_index(); - tracing::info!("re-apply log {}..{} to state machine", start, end); - - let entries = self.log_store.get_log_entries(start..end).await?; - self.state_machine.apply(entries).await?; + self.reapply_committed(start, end).await?; last_applied = committed; } @@ -174,6 +174,57 @@ where }) } + /// Read log entries from [`RaftLogReader`] in chunks, and apply them to the state machine. + pub(crate) async fn reapply_committed(&mut self, mut start: u64, end: u64) -> Result<(), StorageError> { + let chunk_size = 64; + + tracing::info!( + "re-apply log [{}..{}) in {} item chunks to state machine", + chunk_size, + start, + end + ); + + let mut log_reader = self.log_store.get_log_reader().await; + + while start < end { + let chunk_end = std::cmp::min(end, start + chunk_size); + let entries = log_reader.try_get_log_entries(start..chunk_end).await?; + + let first = entries.first().map(|x| x.get_log_id().index); + let last = entries.last().map(|x| x.get_log_id().index); + + let make_err = || { + let err = AnyError::error(format!( + "Failed to get log entries, expected index: [{}, {}), got [{:?}, {:?})", + start, chunk_end, first, last + )); + + tracing::error!("{}", err); + err + }; + + if first != Some(start) { + return Err(StorageIOError::read_log_at_index(start, make_err()).into()); + } + if last != Some(chunk_end - 1) { + return Err(StorageIOError::read_log_at_index(chunk_end - 1, make_err()).into()); + } + + tracing::info!( + "re-apply {} log entries: [{}, {}),", + chunk_end - start, + start, + chunk_end + ); + self.state_machine.apply(entries).await?; + + start = chunk_end; + } + + Ok(()) + } + /// Returns the last 2 membership config found in log or state machine. /// /// A raft node needs to store at most 2 membership config log: diff --git a/openraft/src/storage_error.rs b/openraft/src/storage_error.rs index df65b7065..3ad10a704 100644 --- a/openraft/src/storage_error.rs +++ b/openraft/src/storage_error.rs @@ -263,6 +263,10 @@ where NID: NodeId Self::new(ErrorSubject::Log(log_id), ErrorVerb::Write, source) } + pub fn read_log_at_index(log_index: u64, source: impl Into) -> Self { + Self::new(ErrorSubject::LogIndex(log_index), ErrorVerb::Read, source) + } + pub fn read_log_entry(log_id: LogId, source: impl Into) -> Self { Self::new(ErrorSubject::Log(log_id), ErrorVerb::Read, source) } diff --git a/sledstore/Cargo.toml b/sledstore/Cargo.toml index f4cf7a10f..b33e6738d 100644 --- a/sledstore/Cargo.toml +++ b/sledstore/Cargo.toml @@ -26,5 +26,8 @@ tracing = "0.1.29" [dev-dependencies] tempfile = { version = "3.4.0" } +[features] +bt = ["openraft/bt"] + [package.metadata.docs.rs] all-features = true diff --git a/stores/rocksstore-v2/Cargo.toml b/stores/rocksstore-v2/Cargo.toml index 716e47632..e31e03eb9 100644 --- a/stores/rocksstore-v2/Cargo.toml +++ b/stores/rocksstore-v2/Cargo.toml @@ -30,5 +30,8 @@ tracing = "0.1.29" [dev-dependencies] tempfile = { version = "3.4.0" } +[features] +bt = ["openraft/bt"] + [package.metadata.docs.rs] all-features = true