From 61d4a7918e1578a86404ee87d02ebc217a114fd1 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Wed, 11 Dec 2024 15:31:50 +0800
Subject: [PATCH] feat(barrier): support database failure isolation (part 1,
 meta) (#19664)

---
 proto/stream_service.proto                   |  22 +
 src/error/src/tonic/extra.rs                 |  25 ++
 src/meta/src/barrier/checkpoint/control.rs   | 215 +++++++--
 src/meta/src/barrier/checkpoint/mod.rs       |   3 +-
 src/meta/src/barrier/checkpoint/recovery.rs  | 434 +++++++++++++++++++
 src/meta/src/barrier/complete_task.rs        |  23 +-
 src/meta/src/barrier/context/context_impl.rs |   8 +-
 src/meta/src/barrier/context/mod.rs          |   1 -
 src/meta/src/barrier/context/recovery.rs     |   1 -
 src/meta/src/barrier/mod.rs                  |   2 -
 src/meta/src/barrier/rpc.rs                  | 102 +++--
 src/meta/src/barrier/worker.rs               | 132 +++++-
 src/stream/src/task/barrier_manager.rs       |  39 +-
 13 files changed, 885 insertions(+), 122 deletions(-)
 create mode 100644 src/meta/src/barrier/checkpoint/recovery.rs

diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index 070f029cb1d31..9ddc4a6250e2c 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -89,22 +89,44 @@ message StreamingControlStreamRequest {
     uint32 database_id = 2;
   }
 
+  message ResetDatabaseRequest {
+    uint32 database_id = 1;
+    uint32 reset_request_id = 2;
+  }
+
   oneof request {
     InitRequest init = 1;
     InjectBarrierRequest inject_barrier = 2;
     RemovePartialGraphRequest remove_partial_graph = 3;
     CreatePartialGraphRequest create_partial_graph = 4;
+    ResetDatabaseRequest reset_database = 5;
   }
 }
 
+message ScoredError {
+  string err_msg = 1;
+  int32 score = 2;
+}
+
 message StreamingControlStreamResponse {
   message InitResponse {}
   message ShutdownResponse {}
+  message ReportDatabaseFailureResponse {
+    uint32 database_id = 1;
+  }
+
+  message ResetDatabaseResponse {
+    uint32 database_id = 1;
+    optional ScoredError root_err = 2;
+    uint32 reset_request_id = 3;
+  }
 
   oneof response {
     InitResponse init = 1;
     BarrierCompleteResponse complete_barrier = 2;
     ShutdownResponse shutdown = 3;
+    ReportDatabaseFailureResponse report_database_failure = 4;
+    ResetDatabaseResponse reset_database = 5;
   }
 }
diff --git a/src/error/src/tonic/extra.rs b/src/error/src/tonic/extra.rs
index dbf6b80e29124..a98db8afdbb11 100644
--- a/src/error/src/tonic/extra.rs
+++ b/src/error/src/tonic/extra.rs
@@ -21,6 +21,31 @@ use serde::{Deserialize, Serialize};
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
 pub struct Score(pub i32);
 
+/// An error with a score, used to find the root cause of multiple failures.
+#[derive(Debug, Clone)]
+pub struct ScoredError<E> {
+    pub error: E,
+    pub score: Score,
+}
+
+impl<E: std::fmt::Display> std::fmt::Display for ScoredError<E> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.error.fmt(f)
+    }
+}
+
+impl<E: std::error::Error> std::error::Error for ScoredError<E> {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        self.error.source()
+    }
+
+    fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) {
+        self.error.provide(request);
+        // HIGHLIGHT: Provide the score to make it retrievable from meta service.
+        request.provide_value(self.score);
+    }
+}
+
 /// Extra fields in errors that can be passed through the gRPC boundary.
 ///
 /// - Field being set to `None` means it is not available.
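For illustration only (not part of the diff): the new `ScoredError` relies on the nightly `error_generic_member_access` machinery (`std::error::Request` / `provide`), which the surrounding crates already enable, so that a score attached on the compute node can be asked for again on the receiving side across a `dyn Error` boundary without knowing the concrete error type. A minimal, self-contained sketch of that round trip, using local stand-in types rather than the actual `risingwave_common` definitions:

#![feature(error_generic_member_access)]

use std::error::request_value;
use std::fmt;

// Stand-ins for `Score` / `ScoredError` from src/error/src/tonic/extra.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Score(i32);

#[derive(Debug)]
struct Scored<E> {
    error: E,
    score: Score,
}

impl<E: fmt::Display> fmt::Display for Scored<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.error.fmt(f)
    }
}

impl<E: std::error::Error> std::error::Error for Scored<E> {
    fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) {
        // Forward whatever the inner error provides, then attach the score so
        // that any holder of a `dyn Error` can request it back.
        self.error.provide(request);
        request.provide_value(self.score);
    }
}

fn main() {
    let err = Scored {
        error: std::io::Error::other("actor failed"),
        score: Score(1000),
    };
    // The receiver recovers the score through the type-erased error, no downcasting needed.
    let dyn_err: &(dyn std::error::Error + 'static) = &err;
    assert_eq!(request_value::<Score>(dyn_err), Some(Score(1000)));
}

This is what the `HIGHLIGHT` comment above refers to: the compute node attaches a score to the reported error, and the meta service can retrieve it when deciding which of several failures to treat as the root cause.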
diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index 62375e0fac1af..28ee5aa0bfe32 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -14,7 +14,9 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; +use std::future::{poll_fn, Future}; use std::mem::take; +use std::task::Poll; use anyhow::anyhow; use fail::fail_point; @@ -24,10 +26,15 @@ use risingwave_meta_model::WorkerId; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::PausedReason; +use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse; use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::{debug, warn}; use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl}; +use crate::barrier::checkpoint::recovery::{ + DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning, + RecoveringStateAction, +}; use crate::barrier::checkpoint::state::BarrierWorkerState; use crate::barrier::command::CommandContext; use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask}; @@ -47,17 +54,25 @@ use crate::{MetaError, MetaResult}; #[derive(Default)] pub(crate) struct CheckpointControl { - databases: HashMap, - hummock_version_stats: HummockVersionStats, + pub(super) databases: HashMap, + pub(super) hummock_version_stats: HummockVersionStats, } impl CheckpointControl { pub(crate) fn new( - databases: HashMap, + databases: impl IntoIterator, hummock_version_stats: HummockVersionStats, ) -> Self { Self { - databases, + databases: databases + .into_iter() + .map(|(database_id, control)| { + ( + database_id, + DatabaseCheckpointControlStatus::Running(control), + ) + }) + .collect(), hummock_version_stats, } } @@ -68,6 +83,7 @@ impl CheckpointControl { self.databases .get_mut(&database_id) .expect("should exist") + .expect_running("should have wait for completing command before enter recovery") .ack_completed(command_prev_epoch, creating_job_epochs); } } @@ -78,6 +94,9 @@ impl CheckpointControl { ) -> Option { let mut task = None; for database in self.databases.values_mut() { + let Some(database) = database.running_state_mut() else { + continue; + }; let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c)); database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats); } @@ -90,22 +109,35 @@ impl CheckpointControl { control_stream_manager: &mut ControlStreamManager, ) -> MetaResult<()> { let database_id = DatabaseId::new(resp.database_id); - self.databases - .get_mut(&database_id) - .expect("should exist") - .barrier_collected(resp, control_stream_manager) + let database_status = self.databases.get_mut(&database_id).expect("should exist"); + match database_status { + DatabaseCheckpointControlStatus::Running(database) => { + database.barrier_collected(resp, control_stream_manager) + } + DatabaseCheckpointControlStatus::Recovering(state) => { + state.barrier_collected(resp); + Ok(()) + } + } } pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool { - self.databases - .values() - .all(|database| database.can_inject_barrier(in_flight_barrier_nums)) + self.databases.values().all(|database| { + database + .running_state() + .map(|database| database.can_inject_barrier(in_flight_barrier_nums)) + .unwrap_or(true) + }) } pub(crate) fn max_prev_epoch(&self) -> Option { 
self.databases .values() - .map(|database| database.state.in_flight_prev_epoch()) + .flat_map(|database| { + database + .running_state() + .map(|database| database.state.in_flight_prev_epoch()) + }) .max_by_key(|epoch| epoch.value()) .cloned() } @@ -126,7 +158,9 @@ impl CheckpointControl { let max_prev_epoch = self.max_prev_epoch(); let (database, max_prev_epoch) = match self.databases.entry(database_id) { Entry::Occupied(entry) => ( - entry.into_mut(), + entry + .into_mut() + .expect_running("should not have command when not running"), max_prev_epoch.expect("should exist when having some database"), ), Entry::Vacant(entry) => match &command { @@ -147,7 +181,12 @@ impl CheckpointControl { new_database.state.in_flight_prev_epoch().clone() }; control_stream_manager.add_partial_graph(database_id, None)?; - (entry.insert(new_database), max_prev_epoch) + ( + entry + .insert(DatabaseCheckpointControlStatus::Running(new_database)) + .expect_running("just initialized as running"), + max_prev_epoch, + ) } Command::Flush | Command::Pause(PausedReason::Manual) @@ -177,6 +216,9 @@ impl CheckpointControl { curr_epoch.clone(), )?; for database in self.databases.values_mut() { + let Some(database) = database.running_state_mut() else { + continue; + }; if database.database_id == database_id { continue; } @@ -192,11 +234,13 @@ impl CheckpointControl { } } else { let Some(max_prev_epoch) = self.max_prev_epoch() else { - assert!(self.databases.is_empty()); return Ok(()); }; let curr_epoch = max_prev_epoch.next(); for database in self.databases.values_mut() { + let Some(database) = database.running_state_mut() else { + continue; + }; database.handle_new_barrier( None, checkpoint, @@ -214,12 +258,16 @@ impl CheckpointControl { pub(crate) fn update_barrier_nums_metrics(&self) { self.databases .values() + .flat_map(|database| database.running_state()) .for_each(|database| database.update_barrier_nums_metrics()); } pub(crate) fn gen_ddl_progress(&self) -> HashMap { let mut progress = HashMap::new(); - for database_checkpoint_control in self.databases.values() { + for status in self.databases.values() { + let Some(database_checkpoint_control) = status.running_state() else { + continue; + }; // Progress of normal backfill progress.extend( database_checkpoint_control @@ -245,7 +293,16 @@ impl CheckpointControl { } pub(crate) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool { - for database_checkpoint_control in self.databases.values() { + for database_status in self.databases.values() { + let database_checkpoint_control = match database_status { + DatabaseCheckpointControlStatus::Running(control) => control, + DatabaseCheckpointControlStatus::Recovering(state) => { + if state.remaining_workers().contains(&worker_id) { + return true; + } + continue; + } + }; let failed_barrier = database_checkpoint_control.barrier_wait_collect_from_worker(worker_id as _); if failed_barrier.is_some() @@ -265,30 +322,134 @@ impl CheckpointControl { } pub(crate) fn clear_on_err(&mut self, err: &MetaError) { - for (_, node) in self - .databases - .values_mut() - .flat_map(|database| take(&mut database.command_ctx_queue)) - { + for (_, node) in self.databases.values_mut().flat_map(|status| { + status + .running_state_mut() + .map(|database| take(&mut database.command_ctx_queue)) + .into_iter() + .flatten() + }) { for notifier in node.notifiers { notifier.notify_failed(err.clone()); } node.enqueue_time.observe_duration(); } - self.databases - .values_mut() - .for_each(|database| database.create_mview_tracker.abort_all()); + 
self.databases.values_mut().for_each(|database| { + if let Some(database) = database.running_state_mut() { + database.create_mview_tracker.abort_all() + } + }); } pub(crate) fn subscriptions( &self, ) -> impl Iterator + '_ { - self.databases.iter().map(|(database_id, database)| { - (*database_id, &database.state.inflight_subscription_info) + self.databases.iter().flat_map(|(database_id, database)| { + database + .checkpoint_control() + .map(|database| (*database_id, &database.state.inflight_subscription_info)) + }) + } +} + +pub(crate) enum CheckpointControlEvent<'a> { + EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>), + EnteringRunning(DatabaseStatusAction<'a, EnterRunning>), +} + +impl CheckpointControl { + pub(crate) fn on_reset_database_resp( + &mut self, + worker_id: WorkerId, + resp: ResetDatabaseResponse, + ) { + let database_id = DatabaseId::new(resp.database_id); + match self.databases.get_mut(&database_id).expect("should exist") { + DatabaseCheckpointControlStatus::Running(_) => { + unreachable!("should not receive reset database resp when running") + } + DatabaseCheckpointControlStatus::Recovering(state) => { + state.on_reset_database_resp(worker_id, resp) + } + } + } + + pub(crate) fn next_event( + &mut self, + ) -> impl Future> + Send + '_ { + let mut this = Some(self); + poll_fn(move |cx| { + let Some(this_mut) = this.as_mut() else { + unreachable!("should not be polled after poll ready") + }; + for (&database_id, database_status) in &mut this_mut.databases { + match database_status { + DatabaseCheckpointControlStatus::Running(_) => {} + DatabaseCheckpointControlStatus::Recovering(state) => { + let poll_result = state.poll_next_event(cx); + if let Poll::Ready(action) = poll_result { + let this = this.take().expect("checked Some"); + return Poll::Ready(match action { + RecoveringStateAction::EnterInitializing(reset_workers) => { + CheckpointControlEvent::EnteringInitializing( + this.new_database_status_action( + database_id, + EnterInitializing(reset_workers), + ), + ) + } + RecoveringStateAction::EnterRunning => { + CheckpointControlEvent::EnteringRunning( + this.new_database_status_action(database_id, EnterRunning), + ) + } + }); + } + } + } + } + Poll::Pending }) } } +pub(crate) enum DatabaseCheckpointControlStatus { + Running(DatabaseCheckpointControl), + Recovering(DatabaseRecoveringState), +} + +impl DatabaseCheckpointControlStatus { + fn running_state(&self) -> Option<&DatabaseCheckpointControl> { + match self { + DatabaseCheckpointControlStatus::Running(state) => Some(state), + DatabaseCheckpointControlStatus::Recovering(_) => None, + } + } + + fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> { + match self { + DatabaseCheckpointControlStatus::Running(state) => Some(state), + DatabaseCheckpointControlStatus::Recovering(_) => None, + } + } + + fn checkpoint_control(&self) -> Option<&DatabaseCheckpointControl> { + match self { + DatabaseCheckpointControlStatus::Running(control) => Some(control), + DatabaseCheckpointControlStatus::Recovering(state) => state.checkpoint_control(), + } + } + + fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl { + match self { + DatabaseCheckpointControlStatus::Running(state) => state, + DatabaseCheckpointControlStatus::Recovering(_) => { + panic!("should be at running: {}", reason) + } + } + } +} + /// Controls the concurrent execution of commands. 
 pub(crate) struct DatabaseCheckpointControl {
     database_id: DatabaseId,
diff --git a/src/meta/src/barrier/checkpoint/mod.rs b/src/meta/src/barrier/checkpoint/mod.rs
index f9840f5eb3873..beafde4218eeb 100644
--- a/src/meta/src/barrier/checkpoint/mod.rs
+++ b/src/meta/src/barrier/checkpoint/mod.rs
@@ -14,7 +14,8 @@
 
 mod control;
 mod creating_job;
+mod recovery;
 mod state;
 
-pub(super) use control::{CheckpointControl, DatabaseCheckpointControl};
+pub(crate) use control::{CheckpointControl, CheckpointControlEvent, DatabaseCheckpointControl};
 pub(super) use state::BarrierWorkerState;
diff --git a/src/meta/src/barrier/checkpoint/recovery.rs b/src/meta/src/barrier/checkpoint/recovery.rs
new file mode 100644
index 0000000000000..1e7457154cafa
--- /dev/null
+++ b/src/meta/src/barrier/checkpoint/recovery.rs
@@ -0,0 +1,434 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::mem::{replace, take};
+use std::task::{Context, Poll};
+
+use futures::FutureExt;
+use risingwave_common::catalog::DatabaseId;
+use risingwave_meta_model::WorkerId;
+use risingwave_pb::stream_service::streaming_control_stream_response::{
+    ReportDatabaseFailureResponse, ResetDatabaseResponse,
+};
+use risingwave_pb::stream_service::BarrierCompleteResponse;
+use thiserror_ext::AsReport;
+use tracing::{info, warn};
+
+use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
+use crate::barrier::checkpoint::{CheckpointControl, DatabaseCheckpointControl};
+use crate::barrier::complete_task::BarrierCompleteOutput;
+use crate::barrier::rpc::ControlStreamManager;
+use crate::barrier::worker::{
+    get_retry_backoff_strategy, RetryBackoffFuture, RetryBackoffStrategy,
+};
+use crate::barrier::DatabaseRuntimeInfoSnapshot;
+use crate::MetaResult;
+
+/// We can treat each database as a state machine of 3 states: `Running`, `Resetting` and `Initializing`.
+/// The state transition can be triggered when receiving 3 variants of response: `ReportDatabaseFailure`, `BarrierComplete`, `DatabaseReset`.
+/// The logic of state transition can be summarized as follows:
+///
+/// `Running`
+/// - on `ReportDatabaseFailure`
+///     - wait for the inflight `BarrierCompletingTask` to finish if there is any, mark the database as blocked in the command queue
+///     - send `ResetDatabaseRequest` with `reset_request_id` as 0 to all CNs, and save `reset_request_id` and the set of nodes that need to collect response.
+///     - enter `Resetting` state.
+/// - on `BarrierComplete`: update the `DatabaseCheckpointControl`.
+/// - on `DatabaseReset`: unreachable
+/// `Resetting`
+/// - on `ReportDatabaseFailure` or `BarrierComplete`: ignore
+/// - on `DatabaseReset`:
+///     - if the `reset_request_id` in the response is less than the saved `reset_request_id`, ignore
+///     - otherwise, mark the CN as collected.
+///     - when all CNs have collected the response:
+///         - load the database runtime info from catalog manager and fragment manager
+///         - inject the initial barrier to CNs, save the set of nodes that need to collect response
+///         - enter `Initializing` state
+/// `Initializing`
+/// - on `BarrierComplete`:
+///     - mark the CN as collected
+///     - when all CNs have collected the response: enter Running
+/// - on `ReportDatabaseFailure`
+///     - increment the previously saved `reset_request_id`, and send `ResetDatabaseRequest` to all CNs
+///     - enter `Resetting`
+/// - on `DatabaseReset`: unreachable
+enum DatabaseRecoveringStage {
+    Resetting {
+        remaining_workers: HashSet<WorkerId>,
+        reset_workers: HashMap<WorkerId, ResetDatabaseResponse>,
+        reset_request_id: u32,
+        backoff_future: Option<RetryBackoffFuture>,
+    },
+    Initializing {
+        remaining_workers: HashSet<WorkerId>,
+        database: DatabaseCheckpointControl,
+        prev_epoch: u64,
+    },
+}
+
+pub(crate) struct DatabaseRecoveringState {
+    stage: DatabaseRecoveringStage,
+    next_reset_request_id: u32,
+    retry_backoff_strategy: RetryBackoffStrategy,
+}
+
+pub(super) enum RecoveringStateAction {
+    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
+    EnterRunning,
+}
+
+impl DatabaseRecoveringState {
+    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
+        let backoff_future = self
+            .retry_backoff_strategy
+            .next()
+            .expect("should not be empty");
+        let request_id = self.next_reset_request_id;
+        self.next_reset_request_id += 1;
+        (backoff_future, request_id)
+    }
+
+    pub(super) fn barrier_collected(&mut self, resp: BarrierCompleteResponse) {
+        match &mut self.stage {
+            DatabaseRecoveringStage::Resetting { .. } => {
+                // ignore the collected barrier on resetting or backoff
+            }
+            DatabaseRecoveringStage::Initializing {
+                remaining_workers,
+                prev_epoch,
+                ..
+            } => {
+                assert!(remaining_workers.remove(&(resp.worker_id as WorkerId)));
+                assert_eq!(resp.epoch, *prev_epoch);
+            }
+        }
+    }
+
+    pub(super) fn remaining_workers(&self) -> &HashSet<WorkerId> {
+        match &self.stage {
+            DatabaseRecoveringStage::Resetting {
+                remaining_workers, ..
+            }
+            | DatabaseRecoveringStage::Initializing {
+                remaining_workers, ..
+            } => remaining_workers,
+        }
+    }
+
+    pub(super) fn on_reset_database_resp(
+        &mut self,
+        worker_id: WorkerId,
+        resp: ResetDatabaseResponse,
+    ) {
+        match &mut self.stage {
+            DatabaseRecoveringStage::Resetting {
+                remaining_workers,
+                reset_workers,
+                reset_request_id,
+                ..
+            } => {
+                if resp.reset_request_id < *reset_request_id {
+                    info!(
+                        database_id = resp.database_id,
+                        worker_id,
+                        received_request_id = resp.reset_request_id,
+                        ongoing_request_id = reset_request_id,
+                        "ignore stale reset response"
+                    );
+                } else {
+                    assert_eq!(resp.reset_request_id, *reset_request_id);
+                    assert!(remaining_workers.remove(&worker_id));
+                    reset_workers
+                        .try_insert(worker_id, resp)
+                        .expect("non-duplicate");
+                }
+            }
+            DatabaseRecoveringStage::Initializing { .. } => {
+                unreachable!("all reset resp should have been received in Resetting")
+            }
+        }
+    }
+
+    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
+        match &mut self.stage {
+            DatabaseRecoveringStage::Resetting {
+                remaining_workers,
+                reset_workers,
+                backoff_future: backoff_future_option,
+                ..
+            } => {
+                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
+                    if backoff_future.poll_unpin(cx).is_ready() {
+                        *backoff_future_option = None;
+                        true
+                    } else {
+                        false
+                    }
+                } else {
+                    true
+                };
+                if pass_backoff && remaining_workers.is_empty() {
+                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
+                        reset_workers,
+                    )));
+                }
+            }
+            DatabaseRecoveringStage::Initializing {
+                remaining_workers, ..
+            } => {
+                if remaining_workers.is_empty() {
+                    return Poll::Ready(RecoveringStateAction::EnterRunning);
+                }
+            }
+        }
+        Poll::Pending
+    }
+
+    pub(super) fn checkpoint_control(&self) -> Option<&DatabaseCheckpointControl> {
+        match &self.stage {
+            DatabaseRecoveringStage::Resetting { .. } => None,
+            DatabaseRecoveringStage::Initializing { database, .. } => Some(database),
+        }
+    }
+}
+
+pub(crate) struct DatabaseStatusAction<'a, A> {
+    control: &'a mut CheckpointControl,
+    database_id: DatabaseId,
+    pub(crate) action: A,
+}
+
+impl<A> DatabaseStatusAction<'_, A> {
+    pub(crate) fn database_id(&self) -> DatabaseId {
+        self.database_id
+    }
+}
+
+impl CheckpointControl {
+    pub(super) fn new_database_status_action<A>(
+        &mut self,
+        database_id: DatabaseId,
+        action: A,
+    ) -> DatabaseStatusAction<'_, A> {
+        DatabaseStatusAction {
+            control: self,
+            database_id,
+            action,
+        }
+    }
+}
+
+pub(crate) struct EnterReset;
+
+impl DatabaseStatusAction<'_, EnterReset> {
+    pub(crate) fn enter(
+        self,
+        barrier_complete_output: Option<BarrierCompleteOutput>,
+        control_stream_manager: &mut ControlStreamManager,
+    ) {
+        if let Some(output) = barrier_complete_output {
+            self.control.ack_completed(output);
+        }
+        let database_status = self
+            .control
+            .databases
+            .get_mut(&self.database_id)
+            .expect("should exist");
+        match database_status {
+            DatabaseCheckpointControlStatus::Running(_) => {
+                let reset_request_id = 0;
+                let remaining_workers =
+                    control_stream_manager.reset_database(self.database_id, reset_request_id);
+                *database_status =
+                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
+                        stage: DatabaseRecoveringStage::Resetting {
+                            remaining_workers,
+                            reset_workers: Default::default(),
+                            reset_request_id,
+                            backoff_future: None,
+                        },
+                        next_reset_request_id: reset_request_id + 1,
+                        retry_backoff_strategy: get_retry_backoff_strategy(),
+                    });
+            }
+            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
+                DatabaseRecoveringStage::Resetting { .. } => {
+                    unreachable!("should not enter resetting again")
+                }
+                DatabaseRecoveringStage::Initializing { .. } => {
+                    let (backoff_future, reset_request_id) = state.next_retry();
+                    let remaining_workers =
+                        control_stream_manager.reset_database(self.database_id, reset_request_id);
+                    state.stage = DatabaseRecoveringStage::Resetting {
+                        remaining_workers,
+                        reset_workers: Default::default(),
+                        reset_request_id,
+                        backoff_future: Some(backoff_future),
+                    };
+                }
+            },
+        }
+    }
+}
+
+impl CheckpointControl {
+    pub(crate) fn on_report_failure(
+        &mut self,
+        resp: ReportDatabaseFailureResponse,
+        control_stream_manager: &mut ControlStreamManager,
+    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
+        let database_id = DatabaseId::new(resp.database_id);
+        let database_status = self.databases.get_mut(&database_id).expect("should exist");
+        match database_status {
+            DatabaseCheckpointControlStatus::Running(_) => {
+                Some(self.new_database_status_action(database_id, EnterReset))
+            }
+            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
+                DatabaseRecoveringStage::Resetting { .. } => {
+                    // ignore reported failure during resetting or backoff.
+                    None
+                }
+                DatabaseRecoveringStage::Initializing { .. } => {
+                    warn!(database_id = database_id.database_id, "failure reported during initializing, reset database again");
+                    let (backoff_future, reset_request_id) = state.next_retry();
+                    let remaining_workers =
+                        control_stream_manager.reset_database(database_id, reset_request_id);
+                    state.stage = DatabaseRecoveringStage::Resetting {
+                        remaining_workers,
+                        reset_workers: Default::default(),
+                        reset_request_id,
+                        backoff_future: Some(backoff_future),
+                    };
+                    None
+                }
+            },
+        }
+    }
+}
+
+pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);
+
+impl DatabaseStatusAction<'_, EnterInitializing> {
+    pub(crate) fn enter(
+        self,
+        runtime_info: DatabaseRuntimeInfoSnapshot,
+        control_stream_manager: &mut ControlStreamManager,
+    ) {
+        let database_status = self
+            .control
+            .databases
+            .get_mut(&self.database_id)
+            .expect("should exist");
+        let status = match database_status {
+            DatabaseCheckpointControlStatus::Running(_) => {
+                unreachable!("should not enter initializing when running")
+            }
+            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
+                DatabaseRecoveringStage::Initializing { .. } => {
+                    unreachable!("can only enter initializing when resetting")
+                }
+                DatabaseRecoveringStage::Resetting { .. } => state,
+            },
+        };
+        let DatabaseRuntimeInfoSnapshot {
+            database_fragment_info,
+            mut state_table_committed_epochs,
+            subscription_info,
+            mut stream_actors,
+            mut source_splits,
+            mut background_jobs,
+        } = runtime_info;
+        let result: MetaResult<_> = try {
+            control_stream_manager.add_partial_graph(self.database_id, None)?;
+            control_stream_manager.inject_database_initial_barrier(
+                self.database_id,
+                database_fragment_info,
+                &mut state_table_committed_epochs,
+                &mut stream_actors,
+                &mut source_splits,
+                &mut background_jobs,
+                subscription_info,
+                None,
+                &self.control.hummock_version_stats,
+            )?
+        };
+        match result {
+            Ok((remaining_workers, database, prev_epoch)) => {
+                status.stage = DatabaseRecoveringStage::Initializing {
+                    remaining_workers,
+                    database,
+                    prev_epoch,
+                };
+            }
+            Err(e) => {
+                warn!(database_id = self.database_id.database_id, e = ?e.as_report(), "failed to inject initial barrier");
+                let (backoff_future, reset_request_id) = status.next_retry();
+                let remaining_workers =
+                    control_stream_manager.reset_database(self.database_id, reset_request_id);
+                status.stage = DatabaseRecoveringStage::Resetting {
+                    remaining_workers,
+                    reset_workers: Default::default(),
+                    reset_request_id,
+                    backoff_future: Some(backoff_future),
+                };
+            }
+        }
+    }
+
+    pub(crate) fn remove(self) {
+        self.control
+            .databases
+            .remove(&self.database_id)
+            .expect("should exist");
+    }
+}
+
+pub(crate) struct EnterRunning;
+
+impl DatabaseStatusAction<'_, EnterRunning> {
+    pub(crate) fn enter(self) {
+        let database_status = self
+            .control
+            .databases
+            .get_mut(&self.database_id)
+            .expect("should exist");
+        match database_status {
+            DatabaseCheckpointControlStatus::Running(_) => {
+                unreachable!("should not enter running again")
+            }
+            DatabaseCheckpointControlStatus::Recovering(state) => {
+                let temp_place_holder = DatabaseRecoveringStage::Resetting {
+                    remaining_workers: Default::default(),
+                    reset_workers: Default::default(),
+                    reset_request_id: 0,
+                    backoff_future: None,
+                };
+                match replace(&mut state.stage, temp_place_holder) {
+                    DatabaseRecoveringStage::Resetting { .. } => {
+                        unreachable!("can only enter running during initializing")
+                    }
+                    DatabaseRecoveringStage::Initializing {
+                        remaining_workers,
+                        database,
+                        ..
+ } => { + assert!(remaining_workers.is_empty()); + *database_status = DatabaseCheckpointControlStatus::Running(database); + } + } + } + } + } +} diff --git a/src/meta/src/barrier/complete_task.rs b/src/meta/src/barrier/complete_task.rs index bc3ad725898d5..01d0bdef429e5 100644 --- a/src/meta/src/barrier/complete_task.rs +++ b/src/meta/src/barrier/complete_task.rs @@ -205,12 +205,31 @@ impl CompletingTask { } } - self.next_completed_barrier_inner() + async move { + if !matches!(self, CompletingTask::Completing { .. }) { + return pending().await; + }; + self.next_completed_barrier_inner().await + } + } + + pub(super) async fn wait_completing_task( + &mut self, + ) -> MetaResult> { + match self { + CompletingTask::None => Ok(None), + CompletingTask::Completing { .. } => { + self.next_completed_barrier_inner().await.map(Some) + } + CompletingTask::Err(_) => { + unreachable!("should not be called on previous err") + } + } } async fn next_completed_barrier_inner(&mut self) -> MetaResult { let CompletingTask::Completing { join_handle, .. } = self else { - return pending().await; + unreachable!() }; { diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs index 8fa96ba128ece..f1306a99d7823 100644 --- a/src/meta/src/barrier/context/context_impl.rs +++ b/src/meta/src/barrier/context/context_impl.rs @@ -49,7 +49,9 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { database_id: Option, recovery_reason: RecoveryReason, ) { - self.set_status(BarrierManagerStatus::Recovering(recovery_reason)); + if database_id.is_none() { + self.set_status(BarrierManagerStatus::Recovering(recovery_reason)); + } // Mark blocked and abort buffered schedules, they might be dirty already. self.scheduled_barriers @@ -58,7 +60,9 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { fn mark_ready(&self, database_id: Option) { self.scheduled_barriers.mark_ready(database_id); - self.set_status(BarrierManagerStatus::Running); + if database_id.is_none() { + self.set_status(BarrierManagerStatus::Running); + } } async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> { diff --git a/src/meta/src/barrier/context/mod.rs b/src/meta/src/barrier/context/mod.rs index 144153e6449ff..afc5b745eb627 100644 --- a/src/meta/src/barrier/context/mod.rs +++ b/src/meta/src/barrier/context/mod.rs @@ -71,7 +71,6 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static { async fn reload_runtime_info(&self) -> MetaResult; - #[expect(dead_code)] async fn reload_database_runtime_info( &self, database_id: DatabaseId, diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs index 37646fae7b0be..03575cff699e3 100644 --- a/src/meta/src/barrier/context/recovery.rs +++ b/src/meta/src/barrier/context/recovery.rs @@ -275,7 +275,6 @@ impl GlobalBarrierWorkerContextImpl { } } - #[expect(dead_code)] pub(super) async fn reload_database_runtime_info_impl( &self, database_id: DatabaseId, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 673a28e2b02a8..8a0d8136580ec 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -185,7 +185,6 @@ impl BarrierWorkerRuntimeInfoSnapshot { } } -#[expect(dead_code)] #[derive(Debug)] struct DatabaseRuntimeInfoSnapshot { database_fragment_info: InflightDatabaseInfo, @@ -197,7 +196,6 @@ struct DatabaseRuntimeInfoSnapshot { } impl DatabaseRuntimeInfoSnapshot { - #[expect(dead_code)] fn validate( &self, database_id: 
DatabaseId, diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index feb8bcd4b5914..2dbf0ed70fb1c 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet}; use std::error::Error; use std::future::poll_fn; -use std::task::Poll; +use std::task::{Context, Poll}; use std::time::Duration; use anyhow::anyhow; @@ -37,14 +37,13 @@ use risingwave_pb::stream_plan::{ }; use risingwave_pb::stream_service::streaming_control_stream_request::{ CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph, - RemovePartialGraphRequest, + RemovePartialGraphRequest, ResetDatabaseRequest, }; use risingwave_pb::stream_service::{ - streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse, - InjectBarrierRequest, StreamingControlStreamRequest, + streaming_control_stream_request, streaming_control_stream_response, InjectBarrierRequest, + StreamingControlStreamRequest, }; use risingwave_rpc_client::StreamingControlHandle; -use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; use tokio::time::{sleep, timeout}; use tokio_retry::strategy::ExponentialBackoff; @@ -176,20 +175,22 @@ impl ControlStreamManager { *self = Self::new(self.env.clone()); } - async fn next_response( + fn poll_next_response( &mut self, - ) -> Option<( + cx: &mut Context<'_>, + ) -> Poll<( WorkerId, MetaResult, )> { if self.nodes.is_empty() { - return None; + return Poll::Pending; } - let (worker_id, result) = poll_fn(|cx| { + let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending; + { for (worker_id, node) in &mut self.nodes { match node.handle.response_stream.poll_next_unpin(cx) { Poll::Ready(result) => { - return Poll::Ready(( + poll_result = Poll::Ready(( *worker_id, result .ok_or_else(|| anyhow!("end of stream").into()) @@ -210,47 +211,35 @@ impl ControlStreamManager { resp => Ok(resp), } }) - }), + }) )); + break; } Poll::Pending => { continue; } } } - Poll::Pending - }) - .await; + }; - if let Err(err) = &result { + if let Poll::Ready((worker_id, Err(err))) = &poll_result { let node = self .nodes - .remove(&worker_id) + .remove(worker_id) .expect("should exist when get shutdown resp"); warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); } - Some((worker_id, result)) + poll_result } - pub(super) async fn next_collect_barrier_response( + pub(super) async fn next_response( &mut self, - ) -> (WorkerId, MetaResult) { - use streaming_control_stream_response::Response; - - { - let (worker_id, result) = pending_on_none(self.next_response()).await; - - ( - worker_id, - result.map(|resp| match resp { - Response::CompleteBarrier(resp) => resp, - Response::Shutdown(_) | Response::Init(_) => { - unreachable!("should be treated as error") - } - }), - ) - } + ) -> ( + WorkerId, + MetaResult, + ) { + poll_fn(|cx| self.poll_next_response(cx)).await } pub(super) async fn collect_errors( @@ -261,14 +250,17 @@ impl ControlStreamManager { let mut errors = vec![(worker_id, first_err)]; #[cfg(not(madsim))] { - let _ = timeout(COLLECT_ERROR_TIMEOUT, async { - while let Some((worker_id, result)) = self.next_response().await { - if let Err(e) = result { - errors.push((worker_id, e)); + { + let _ = timeout(COLLECT_ERROR_TIMEOUT, async { + while !self.nodes.is_empty() { + let (worker_id, result) = self.next_response().await; + if let Err(e) = result { + errors.push((worker_id, e)); + } } - } - }) - .await; + }) + .await; + } } 
tracing::debug!(?errors, "collected stream errors"); errors @@ -613,6 +605,34 @@ impl ControlStreamManager { } }) } + + pub(super) fn reset_database( + &mut self, + database_id: DatabaseId, + reset_request_id: u32, + ) -> HashSet { + self.nodes.iter().filter_map(|(worker_id, node)| { + if node.handle + .request_sender + .send(StreamingControlStreamRequest { + request: Some( + streaming_control_stream_request::Request::ResetDatabase( + ResetDatabaseRequest { + database_id: database_id.database_id, + reset_request_id, + }, + ), + ), + }) + .is_err() + { + warn!(worker_id, node = ?node.worker.host,"failed to send reset database request"); + None + } else { + Some(*worker_id) + } + }).collect() + } } impl GlobalBarrierWorkerContextImpl { diff --git a/src/meta/src/barrier/worker.rs b/src/meta/src/barrier/worker.rs index 2b00262ea86c7..c8c4d0faa29e1 100644 --- a/src/meta/src/barrier/worker.rs +++ b/src/meta/src/barrier/worker.rs @@ -17,6 +17,7 @@ use std::mem::replace; use std::sync::{Arc, LazyLock}; use std::time::Duration; +use anyhow::anyhow; use arc_swap::ArcSwap; use itertools::Itertools; use risingwave_common::catalog::DatabaseId; @@ -24,15 +25,16 @@ use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{PausedReason, Recovery}; +use risingwave_pb::stream_service::streaming_control_stream_response::Response; use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::task::JoinHandle; use tokio::time::Instant; -use tokio_retry::strategy::{jitter, ExponentialBackoff}; +use tonic::Status; use tracing::{debug, error, info, warn, Instrument}; -use crate::barrier::checkpoint::CheckpointControl; +use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent}; use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask}; use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; @@ -311,19 +313,83 @@ impl GlobalBarrierWorker { } } }, - (worker_id, resp_result) = self.control_stream_manager.next_collect_barrier_response() => { - if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) { - { - + event = self.checkpoint_control.next_event() => { + let result: MetaResult<()> = try { + match event { + CheckpointControlEvent::EnteringInitializing(entering_initializing) => { + let database_id = entering_initializing.database_id(); + let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| { + resp.root_err.as_ref().map(|root_err| { + (*worker_id, ScoredError { + error: Status::internal(&root_err.err_msg), + score: Score(root_err.score) + }) + }) + })); + Self::report_collect_failure(&self.env, &error); + if let Some(runtime_info) = self.context.reload_database_runtime_info(database_id).await? 
{
+                                runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
+                                    warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
+                                })?;
+                                entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
+                            } else {
+                                info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
+                                entering_initializing.remove();
+                            }
+                        }
+                        CheckpointControlEvent::EnteringRunning(entering_running) => {
+                            self.context.mark_ready(Some(entering_running.database_id()));
+                            entering_running.enter();
+                        }
+                    }
+                };
+                if let Err(e) = result {
+                    self.failure_recovery(e).await;
+                }
+            }
+            (worker_id, resp_result) = self.control_stream_manager.next_response() => {
+                let resp = match resp_result {
+                    Err(e) => {
                         if self.checkpoint_control.is_failed_at_worker_err(worker_id) {
                             let errors = self.control_stream_manager.collect_errors(worker_id, e).await;
                             let err = merge_node_rpc_errors("get error from control stream", errors);
-                            self.report_collect_failure(&err);
+                            Self::report_collect_failure(&self.env, &err);
                             self.failure_recovery(err).await;
                         } else {
                             warn!(worker_id, "no barrier to collect from worker, ignore err");
                         }
+                        continue;
+                    }
+                    Ok(resp) => resp,
+                };
+                let result: MetaResult<()> = try {
+                    match resp {
+                        Response::CompleteBarrier(resp) => {
+                            self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)?;
+                        },
+                        Response::ReportDatabaseFailure(resp) => {
+                            if !self.enable_recovery {
+                                panic!("database failure reported but recovery not enabled: {:?}", resp)
+                            }
+                            if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(resp, &mut self.control_stream_manager) {
+                                let database_id = entering_recovery.database_id();
+                                warn!(database_id = database_id.database_id, "database entering recovery");
+                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
+                                // TODO: add log on blocking time
+                                let output = self.completing_task.wait_completing_task().await?;
+                                entering_recovery.enter(output, &mut self.control_stream_manager);
+                            }
+                        }
+                        Response::ResetDatabase(resp) => {
+                            self.checkpoint_control.on_reset_database_resp(worker_id, resp);
+                        }
+                        other @ Response::Init(_) | other @ Response::Shutdown(_) => {
+                            Err(anyhow!("got unexpected response: {:?}", other))?;
+                        }
+                    }
+                };
+                if let Err(e) = result {
+                    self.failure_recovery(e).await;
                 }
             }
             new_barrier = self.periodic_barriers.next_barrier(&*self.context),
@@ -465,33 +531,59 @@ impl GlobalBarrierWorker {
 
 impl GlobalBarrierWorker {
     /// Send barrier-complete-rpc and wait for responses from all CNs
-    pub(super) fn report_collect_failure(&self, error: &MetaError) {
+    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
         // Record failure in event log.
         use risingwave_pb::meta::event_log;
         let event = event_log::EventCollectBarrierFail {
             error: error.to_report_string(),
         };
-        self.env
-            .event_log_manager_ref()
+        env.event_log_manager_ref()
             .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
     }
 }
 
-impl GlobalBarrierWorker {
+mod retry_strategy {
+    use std::time::Duration;
+
+    use tokio_retry::strategy::{jitter, ExponentialBackoff};
+
     // Retry base interval in milliseconds.
     const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
     // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);
 
+    mod retry_backoff_future {
+        use std::future::Future;
+        use std::time::Duration;
+
+        use tokio::time::sleep;
+
+        pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
+        pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
+            Box::pin(sleep(duration))
+        }
+    }
+    pub(crate) use retry_backoff_future::*;
+
+    pub(crate) type RetryBackoffStrategy =
+        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;
+
     #[inline(always)]
     /// Initialize a retry strategy for operation in recovery.
-    fn get_retry_strategy() -> impl Iterator<Item = Duration> {
-        ExponentialBackoff::from_millis(Self::RECOVERY_RETRY_BASE_INTERVAL)
-            .max_delay(Self::RECOVERY_RETRY_MAX_INTERVAL)
+    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> {
+        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
+            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
             .map(jitter)
     }
+
+    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
+        get_retry_strategy().map(get_retry_backoff_future)
+    }
 }
 
+pub(crate) use retry_strategy::*;
+use risingwave_common::error::tonic::extra::{Score, ScoredError};
+
 impl GlobalBarrierWorker {
     /// Recovery the whole cluster from the latest epoch.
     ///
@@ -520,7 +612,7 @@ impl GlobalBarrierWorker {
         err: Option<MetaError>,
     ) {
         tracing::info!("recovery start!");
-        let retry_strategy = Self::get_retry_strategy();
+        let retry_strategy = get_retry_strategy();
 
         // We take retry into consideration because this is the latency user sees for a cluster to
         // get recovered.
@@ -589,8 +681,16 @@ impl GlobalBarrierWorker {
                     }
                     while !collecting_databases.is_empty() {
                         let (worker_id, result) =
-                            control_stream_manager.next_collect_barrier_response().await;
+                            control_stream_manager.next_response().await;
                         let resp = result?;
+                        let resp = match resp {
+                            Response::CompleteBarrier(resp) => {
+                                resp
+                            }
+                            other => {
+                                return Err(anyhow!("expect Response::CompleteBarrier but get {:?}", other).into());
+                            }
+                        };
                         let database_id = DatabaseId::new(resp.database_id);
                         let (node_to_collect, _, prev_epoch) =
                             collecting_databases.get_mut(&database_id).expect("should exist");
                         assert_eq!(resp.epoch, *prev_epoch);
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index a4c8eda6ffde6..d8e99269ae18a 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -26,7 +26,7 @@ use futures::future::BoxFuture;
 use futures::stream::{BoxStream, FuturesOrdered};
 use futures::{FutureExt, StreamExt, TryFutureExt};
 use itertools::Itertools;
-use risingwave_common::error::tonic::extra::Score;
+use risingwave_common::error::tonic::extra::{Score, ScoredError};
 use risingwave_pb::stream_plan::barrier::BarrierKind;
 use risingwave_pb::stream_service::barrier_complete_response::{
     PbCreateMviewProgress, PbLocalSstableInfo,
@@ -432,6 +432,9 @@ impl LocalBarrierWorker {
                 );
                 Ok(())
             }
+            Request::ResetDatabase(_) => {
+                unimplemented!("should not receive this request yet")
+            }
             Request::Init(_) => {
                 unreachable!()
             }
@@ -858,7 +861,7 @@ impl LocalBarrierWorker {
 
         once(first_err)
             .chain(later_errs.into_iter())
-            .map(|e| ScoredStreamError::new(e.clone()))
+            .map(|e| e.with_score())
             .max_by_key(|e| e.score)
             .expect("non-empty")
     }
@@ -979,33 +982,11 @@ impl LocalBarrierManager {
 }
 
 /// A [`StreamError`] with a score, used to find the root cause of actor failures.
-#[derive(Debug, Clone)]
-struct ScoredStreamError {
-    error: StreamError,
-    score: Score,
-}
-
-impl std::fmt::Display for ScoredStreamError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.error.fmt(f)
-    }
-}
-
-impl std::error::Error for ScoredStreamError {
-    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
-        self.error.source()
-    }
-
-    fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) {
-        self.error.provide(request);
-        // HIGHLIGHT: Provide the score to make it retrievable from meta service.
-        request.provide_value(self.score);
-    }
-}
+type ScoredStreamError = ScoredError<StreamError>;
 
-impl ScoredStreamError {
+impl StreamError {
     /// Score the given error based on hard-coded rules.
-    fn new(error: StreamError) -> Self {
+    fn with_score(self) -> ScoredStreamError {
         // Explicitly list all error kinds here to notice developers to update this function when
         // there are changes in error kinds.
 
@@ -1052,8 +1033,8 @@ impl ScoredStreamError {
             }
         }
 
-        let score = Score(stream_error_score(&error));
-        Self { error, score }
+        let score = Score(stream_error_score(&self));
+        ScoredStreamError { error: self, score }
     }
 }
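For illustration only (not part of the diff): the per-database failure isolation added on the meta side boils down to the three-state machine documented at the top of recovery.rs. Below is a compressed, self-contained sketch under simplified assumptions (stand-in types, no backoff futures, no control-stream plumbing) of how the three response kinds drive the transitions:

use std::collections::HashSet;

type WorkerId = u32;

#[derive(Debug)]
enum DbState {
    Running,
    Resetting { reset_request_id: u32, remaining: HashSet<WorkerId> },
    Initializing { remaining: HashSet<WorkerId> },
}

#[derive(Debug)]
struct Db {
    state: DbState,
    next_reset_request_id: u32,
}

impl Db {
    /// `ReportDatabaseFailure` from a compute node.
    fn on_report_failure(&mut self, workers_to_reset: HashSet<WorkerId>) {
        if matches!(self.state, DbState::Resetting { .. }) {
            return; // already resetting (or backing off): ignore further reports
        }
        // Running or Initializing: start a new reset round with a fresh request id.
        let reset_request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        self.state = DbState::Resetting { reset_request_id, remaining: workers_to_reset };
    }

    /// `ResetDatabaseResponse` from one compute node.
    fn on_reset_resp(&mut self, worker: WorkerId, resp_request_id: u32, all_workers: HashSet<WorkerId>) {
        let all_reset = match &mut self.state {
            DbState::Resetting { reset_request_id, remaining } => {
                if resp_request_id < *reset_request_id {
                    return; // stale response from an earlier reset round
                }
                remaining.remove(&worker);
                remaining.is_empty()
            }
            _ => return,
        };
        if all_reset {
            // All CNs acknowledged the reset: reload the database runtime info,
            // inject the initial barrier, then wait for it to be collected.
            self.state = DbState::Initializing { remaining: all_workers };
        }
    }

    /// `BarrierComplete` for the initial barrier from one compute node.
    fn on_barrier_collected(&mut self, worker: WorkerId) {
        let all_collected = match &mut self.state {
            DbState::Initializing { remaining } => {
                remaining.remove(&worker);
                remaining.is_empty()
            }
            _ => return,
        };
        if all_collected {
            self.state = DbState::Running;
        }
    }
}

fn main() {
    let workers: HashSet<WorkerId> = [1, 2].into_iter().collect();
    let mut db = Db { state: DbState::Running, next_reset_request_id: 0 };

    db.on_report_failure(workers.clone());   // Running -> Resetting (request id 0)
    db.on_reset_resp(1, 0, workers.clone());
    db.on_reset_resp(2, 0, workers.clone()); // -> Initializing
    db.on_barrier_collected(1);
    db.on_barrier_collected(2);              // -> Running
    assert!(matches!(db.state, DbState::Running));
}

The real DatabaseRecoveringState additionally carries the partially rebuilt DatabaseCheckpointControl, distinguishes reset rounds by reset_request_id, and waits on a jittered exponential backoff (RetryBackoffStrategy) before each retried reset.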