diff --git a/Makefile.toml b/Makefile.toml index 1203a847e1e94..86e9cea136bb5 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1317,7 +1317,6 @@ dependencies = ["k", "l", "check-logs", "wait-processes-exit"] description = "Kill cluster, dump logs and check logs" [tasks.wait-processes-exit] -private = true category = "Misc" description = "Wait for RisingWave processes to exit" script = """ diff --git a/ci/scripts/deterministic-recovery-test.sh b/ci/scripts/deterministic-recovery-test.sh index 0d3a7b3fabed4..6514fe1f7c0c3 100755 --- a/ci/scripts/deterministic-recovery-test.sh +++ b/ci/scripts/deterministic-recovery-test.sh @@ -9,11 +9,19 @@ echo "--- Download artifacts" download-and-decompress-artifact risingwave_simulation . chmod +x ./risingwave_simulation -export RUST_LOG="info,risingwave_meta::barrier::recovery=debug" +export RUST_LOG="info,\ +risingwave_meta::barrier::recovery=debug,\ +risingwave_meta::rpc::ddl_controller=debug,\ +risingwave_meta::barrier::mod=debug,\ +risingwave_simulation=debug" export LOGDIR=.risingwave/log mkdir -p $LOGDIR +# FIXME(kwannoel): Why is this failing? +# echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl" +# seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log' + echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, ddl" seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log' diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index 86dbcb376c255..d0d5eafb3c917 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -1,22 +1,31 @@ #!/usr/bin/env bash # Runs backfill tests. -# NOTE(kwannoel): -# The following scenario is adapted in madsim's integration tests as well. -# But this script reproduces it more reliably (I'm not sure why.) -# Hence keeping it in case we ever need to debug backfill again. # USAGE: # ```sh # cargo make ci-start ci-backfill # ./ci/scripts/run-backfill-tests.sh # ``` +# Example progress: +# dev=> select * from rw_catalog.rw_ddl_progress; +# ddl_id | ddl_statement | progress | initialized_at +#--------+------------------------------------------------+----------+------------------------------- +# 1002 | CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t | 56.12% | 2023-09-27 06:37:06.636+00:00 +#(1 row) set -euo pipefail PARENT_PATH=$(dirname "${BASH_SOURCE[0]}") +TEST_DIR=$PWD/e2e_test +BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl +COMMON_DIR=$BACKGROUND_DDL_DIR/common + +CLUSTER_PROFILE='ci-1cn-1fe-with-recovery' +export RUST_LOG="risingwave_meta=debug" + run_sql_file() { psql -h localhost -p 4566 -d dev -U root -f "$@" } @@ -29,40 +38,309 @@ flush() { run_sql "FLUSH;" } -basic() { - echo "--- e2e, test_backfill_basic" +cancel_stream_jobs() { + ID=$(run_sql "select ddl_id from rw_catalog.rw_ddl_progress;" | tail -3 | head -1 | grep -E -o "[0-9]*") + echo "CANCELLING STREAM_JOB: $ID" + run_sql "CANCEL JOBS $ID;" .risingwave/log/compute-node.log 2>&1 & +} + +# Test snapshot and upstream read. +test_snapshot_and_upstream_read() { + echo "--- e2e, ci-backfill, test_snapshot_and_upstream_read" cargo make ci-start ci-backfill + run_sql_file "$PARENT_PATH"/sql/backfill/create_base_table.sql # Provide snapshot run_sql_file "$PARENT_PATH"/sql/backfill/insert.sql + + # Provide updates ... run_sql_file "$PARENT_PATH"/sql/backfill/insert.sql & + + # ... and concurrently create mv. run_sql_file "$PARENT_PATH"/sql/backfill/create_mv.sql & wait run_sql_file "$PARENT_PATH"/sql/backfill/select.sql Self { + Self::Foreground + } +} + +impl CreateType { + fn from_prost(prost: PbCreateType) -> Self { + match prost { + PbCreateType::Background => Self::Background, + PbCreateType::Foreground => Self::Foreground, + PbCreateType::Unspecified => unreachable!(), + } + } + + pub(crate) fn to_prost(self) -> PbCreateType { + match self { + Self::Background => PbCreateType::Background, + Self::Foreground => PbCreateType::Foreground, + } + } } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -402,7 +437,7 @@ impl TableCatalog { created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0), cleaned_by_watermark: self.cleaned_by_watermark, stream_job_status: PbStreamJobStatus::Creating.into(), - create_type: PbCreateType::Foreground.into(), + create_type: self.create_type.to_prost().into(), } } @@ -455,6 +490,7 @@ impl From for TableCatalog { let id = tb.id; let tb_conflict_behavior = tb.handle_pk_conflict_behavior(); let table_type = tb.get_table_type().unwrap(); + let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground); let associated_source_id = tb.optional_associated_source_id.map(|id| match id { OptionalAssociatedSourceId::AssociatedSourceId(id) => id, }); @@ -514,6 +550,7 @@ impl From for TableCatalog { created_at_epoch: tb.created_at_epoch.map(Epoch::from), initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from), cleaned_by_watermark: matches!(tb.cleaned_by_watermark, true), + create_type: CreateType::from_prost(create_type), } } } @@ -660,6 +697,7 @@ mod tests { created_at_epoch: None, initialized_at_epoch: None, cleaned_by_watermark: false, + create_type: CreateType::Foreground, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 128f1eeb24ed9..fb17537bc90e6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -26,7 +26,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::derive::derive_columns; use super::utils::{childless_record, Distill}; use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion}; +use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion}; use crate::catalog::FragmentId; use crate::optimizer::plan_node::derive::derive_pk; use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta}; @@ -232,6 +232,7 @@ impl StreamMaterialize { created_at_epoch: None, initialized_at_epoch: None, cleaned_by_watermark: false, + create_type: CreateType::Foreground, // Will be updated in the handler itself. }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 475c5c0e32eb1..f167d73c53a46 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -23,7 +23,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use crate::catalog::table_catalog::TableType; +use crate::catalog::table_catalog::{CreateType, TableType}; use crate::catalog::{ColumnId, FragmentId, TableCatalog, TableId}; use crate::optimizer::property::Cardinality; use crate::utils::WithOptions; @@ -177,6 +177,9 @@ impl TableCatalogBuilder { created_at_epoch: None, initialized_at_epoch: None, cleaned_by_watermark: false, + // NOTE(kwannoel): This may not match the create type of the materialized table. + // It should be ignored for internal tables. + create_type: CreateType::Foreground, } } diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index 18bdc30c4165f..bd247c1e18980 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use risingwave_pb::backup_service::MetaBackupManifestId; -use risingwave_pb::catalog::{PbStreamJobStatus, Table}; +use risingwave_pb::catalog::Table; use risingwave_pb::common::worker_node::State::Running; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::hummock::WriteLimits; @@ -120,16 +120,7 @@ impl NotificationServiceImpl { async fn get_tables_and_creating_tables_snapshot(&self) -> (Vec, NotificationVersion) { let catalog_guard = self.catalog_manager.get_catalog_core_guard().await; - let mut tables = catalog_guard - .database - .list_tables() - .into_iter() - .filter(|t| { - t.stream_job_status == PbStreamJobStatus::Unspecified as i32 - || t.stream_job_status == PbStreamJobStatus::Created as i32 - }) - .collect_vec(); - tables.extend(catalog_guard.database.list_creating_tables()); + let tables = catalog_guard.database.list_tables(); let notification_version = self.env.notification_manager().current_version().await; (tables, notification_version) } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 8d8076e56a233..bbe60c010b94b 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -36,7 +36,8 @@ use uuid::Uuid; use super::info::BarrierActorInfo; use super::trace::TracedEpoch; use crate::barrier::CommandChanges; -use crate::manager::{FragmentManagerRef, WorkerId}; +use crate::hummock::HummockManagerRef; +use crate::manager::{CatalogManagerRef, FragmentManagerRef, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments}; use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment}; use crate::MetaResult; @@ -216,7 +217,9 @@ impl Command { /// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given /// [`Command`]. pub struct CommandContext { - fragment_manager: FragmentManagerRef, + pub fragment_manager: FragmentManagerRef, + catalog_manager: CatalogManagerRef, + hummock_manager: HummockManagerRef, client_pool: StreamClientPoolRef, @@ -247,6 +250,8 @@ impl CommandContext { #[allow(clippy::too_many_arguments)] pub(super) fn new( fragment_manager: FragmentManagerRef, + catalog_manager: CatalogManagerRef, + hummock_manager: HummockManagerRef, client_pool: StreamClientPoolRef, info: BarrierActorInfo, prev_epoch: TracedEpoch, @@ -259,6 +264,8 @@ impl CommandContext { ) -> Self { Self { fragment_manager, + catalog_manager, + hummock_manager, client_pool, info: Arc::new(info), prev_epoch, @@ -663,7 +670,51 @@ impl CommandContext { Command::CancelStreamingJob(table_fragments) => { let node_actors = table_fragments.worker_actor_ids(); self.clean_up(node_actors).await?; - // Drop fragment info in meta store. + + // NOTE(kwannoel): At this point, meta has already registered the table ids. + // We should unregister them. + // This is required for background ddl, for foreground ddl this is a no-op. + // Foreground ddl is handled entirely by stream manager, so it will unregister + // the table ids on failure. + // On the other hand background ddl could be handled by barrier manager. + // It won't clean the tables on failure, + // since the failure could be recoverable. + // As such it needs to be handled here. + let table_id = table_fragments.table_id().table_id; + let mut table_ids = table_fragments.internal_table_ids(); + table_ids.push(table_id); + if let Err(e) = self.hummock_manager.unregister_table_ids(&table_ids).await { + tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", &table_ids, e); + } + + // NOTE(kwannoel): At this point, catalog manager has persisted the tables already. + // We need to cleanup the table state. So we can do it here. + // The logic is the same as above, for hummock_manager.unregister_table_ids. + if let Err(e) = self + .catalog_manager + .cancel_create_table_procedure( + table_fragments.table_id().table_id, + table_fragments.internal_table_ids(), + ) + .await + { + let table_id = table_fragments.table_id().table_id; + tracing::warn!( + table_id, + reason=?e, + "cancel_create_table_procedure failed for CancelStreamingJob", + ); + // If failed, check that table is not in meta store. + // If any table is, just panic, let meta do bootstrap recovery. + // Otherwise our persisted state is dirty. + let mut table_ids = table_fragments.internal_table_ids(); + table_ids.push(table_id); + self.catalog_manager.assert_tables_deleted(table_ids).await; + } + + // We need to drop table fragments here, + // since this is not done in stream manager (foreground ddl) + // OR barrier manager (background ddl) self.fragment_manager .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once( table_fragments.table_id(), @@ -791,24 +842,4 @@ impl CommandContext { Ok(()) } - - /// Do some stuffs before the barrier is `finish`ed. Only used for `CreateStreamingJob`. - pub async fn pre_finish(&self) -> MetaResult<()> { - #[allow(clippy::single_match)] - match &self.command { - Command::CreateStreamingJob { - table_fragments, .. - } => { - // Update the state of the table fragments from `Creating` to `Created`, so that the - // fragments can be scaled. - self.fragment_manager - .mark_table_fragments_created(table_fragments.table_id()) - .await?; - } - - _ => {} - } - - Ok(()) - } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 8924992c2e18e..ed6ad289a5a68 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -50,7 +50,7 @@ use self::info::BarrierActorInfo; use self::notifier::Notifier; use self::progress::TrackingCommand; use crate::barrier::notifier::BarrierInfo; -use crate::barrier::progress::CreateMviewProgressTracker; +use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::HummockManagerRef; use crate::manager::sink_coordination::SinkCoordinatorManager; @@ -58,7 +58,7 @@ use crate::manager::{ CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv, WorkerId, }; -use crate::model::{ActorId, BarrierManagerState}; +use crate::model::{ActorId, BarrierManagerState, TableFragments}; use crate::rpc::metrics::MetaMetrics; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -75,6 +75,35 @@ pub use self::command::{Command, Reschedule}; pub use self::schedule::BarrierScheduler; pub use self::trace::TracedEpoch; +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub(crate) struct TableMap { + inner: HashMap, +} + +impl TableMap { + pub fn remove(&mut self, table_id: &TableId) -> Option { + self.inner.remove(table_id) + } +} + +impl From> for TableMap { + fn from(inner: HashMap) -> Self { + Self { inner } + } +} + +impl From> for HashMap { + fn from(table_map: TableMap) -> Self { + table_map.inner + } +} + +pub(crate) type TableActorMap = TableMap>; +pub(crate) type TableUpstreamMvCountMap = TableMap>; +pub(crate) type TableDefinitionMap = TableMap; +pub(crate) type TableNotifierMap = TableMap; +pub(crate) type TableFragmentMap = TableMap; + /// Status of barrier manager. enum BarrierManagerStatus { /// Barrier manager is starting. @@ -177,7 +206,7 @@ struct CheckpointControl { metrics: Arc, /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - finished_commands: Vec, + finished_commands: Vec, } impl CheckpointControl { @@ -194,8 +223,8 @@ impl CheckpointControl { } /// Stash a command to finish later. - fn stash_command_to_finish(&mut self, finished_command: TrackingCommand) { - self.finished_commands.push(finished_command); + fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { + self.finished_commands.push(finished_job); } /// Finish stashed commands. If the current barrier is not a `checkpoint`, we will not finish @@ -205,31 +234,32 @@ impl CheckpointControl { async fn finish_commands(&mut self, checkpoint: bool) -> MetaResult { for command in self .finished_commands - .extract_if(|c| checkpoint || c.context.kind.is_barrier()) + .extract_if(|c| checkpoint || c.is_barrier()) { // The command is ready to finish. We can now call `pre_finish`. - command.context.pre_finish().await?; - command - .notifiers - .into_iter() - .for_each(Notifier::notify_finished); + command.pre_finish().await?; + command.notify_finished(); } Ok(!self.finished_commands.is_empty()) } - fn cancel_command(&mut self, cancelled_command: TrackingCommand) { - if let Some(index) = self.command_ctx_queue.iter().position(|x| { - x.command_ctx.prev_epoch.value() == cancelled_command.context.prev_epoch.value() - }) { - self.command_ctx_queue.remove(index); - self.remove_changes(cancelled_command.context.command.changes()); + fn cancel_command(&mut self, cancelled_job: TrackingJob) { + if let TrackingJob::New(cancelled_command) = cancelled_job { + if let Some(index) = self.command_ctx_queue.iter().position(|x| { + x.command_ctx.prev_epoch.value() == cancelled_command.context.prev_epoch.value() + }) { + self.command_ctx_queue.remove(index); + self.remove_changes(cancelled_command.context.command.changes()); + } + } else { + // Recovered jobs do not need to be cancelled since only `RUNNING` actors will get recovered. } } fn cancel_stashed_command(&mut self, id: TableId) { self.finished_commands - .retain(|x| x.context.table_to_create() != Some(id)); + .retain(|x| x.table_to_create() != Some(id)); } /// Before resolving the actors to be sent or collected, we should first record the newly @@ -596,7 +626,7 @@ impl GlobalBarrierManager { let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.recovery(prev_epoch, paused_reason) + self.recovery(prev_epoch, paused_reason, true) .instrument(span) .await }; @@ -693,6 +723,8 @@ impl GlobalBarrierManager { let command_ctx = Arc::new(CommandContext::new( self.fragment_manager.clone(), + self.catalog_manager.clone(), + self.hummock_manager.clone(), self.env.stream_client_pool_ref(), info, prev_epoch.clone(), @@ -910,6 +942,7 @@ impl GlobalBarrierManager { let fail_nodes = complete_nodes .drain(index..) .chain(checkpoint_control.barrier_failed().into_iter()); + tracing::warn!("Failed to commit epoch {}: {:?}", prev_epoch, err); self.failure_recovery(err, fail_nodes, state, checkpoint_control) .await; } @@ -937,11 +970,7 @@ impl GlobalBarrierManager { } if self.enable_recovery { - // If failed, enter recovery mode. self.set_status(BarrierManagerStatus::Recovering).await; - let mut tracker = self.tracker.lock().await; - *tracker = CreateMviewProgressTracker::new(); - let latest_snapshot = self.hummock_manager.latest_snapshot(); let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch let span = tracing::info_span!( @@ -950,7 +979,12 @@ impl GlobalBarrierManager { prev_epoch = prev_epoch.value().0 ); - *state = self.recovery(prev_epoch, None).instrument(span).await; + // No need to clean dirty tables for barrier recovery, + // The foreground stream job should cleanup their own tables. + *state = self + .recovery(prev_epoch, None, false) + .instrument(span) + .await; self.set_status(BarrierManagerStatus::Running).await; } else { panic!("failed to execute barrier: {:?}", err); diff --git a/src/meta/src/barrier/notifier.rs b/src/meta/src/barrier/notifier.rs index 88acd9cd3dd7a..b28c5b01d53d9 100644 --- a/src/meta/src/barrier/notifier.rs +++ b/src/meta/src/barrier/notifier.rs @@ -30,7 +30,7 @@ pub struct BarrierInfo { /// Used for notifying the status of a scheduled command/barrier. #[derive(Debug, Default)] -pub(super) struct Notifier { +pub(crate) struct Notifier { /// Get notified when scheduled barrier is injected to compute nodes. pub injected: Option>, diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 6344667130b60..22cd6f8d9e200 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -25,14 +25,18 @@ use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgres use super::command::CommandContext; use super::notifier::Notifier; -use crate::barrier::Command; -use crate::model::ActorId; +use crate::barrier::{ + Command, TableActorMap, TableDefinitionMap, TableFragmentMap, TableNotifierMap, + TableUpstreamMvCountMap, +}; +use crate::manager::{FragmentManager, FragmentManagerRef}; +use crate::model::{ActorId, TableFragments}; +use crate::MetaResult; -type CreateMviewEpoch = Epoch; type ConsumedRows = u64; #[derive(Clone, Copy, Debug)] -enum ChainState { +pub enum ChainState { Init, ConsumingUpstream(Epoch, ConsumedRows), Done(ConsumedRows), @@ -45,10 +49,9 @@ struct Progress { done_count: usize, - /// Creating mv id. - creating_mv_id: TableId, - - /// Upstream mv count. Keep track of how many times each upstream MV appears. + /// Upstream mv count. + /// Keep track of how many times each upstream MV + /// appears in this stream job. upstream_mv_count: HashMap, /// Upstream mvs total key count. @@ -65,7 +68,6 @@ impl Progress { /// Create a [`Progress`] for some creating mview, with all `actors` containing the chain nodes. fn new( actors: impl IntoIterator, - creating_mv_id: TableId, upstream_mv_count: HashMap, upstream_total_key_count: u64, definition: String, @@ -79,7 +81,6 @@ impl Progress { Self { states, done_count: 0, - creating_mv_id, upstream_mv_count, upstream_total_key_count, consumed_rows: 0, @@ -139,6 +140,80 @@ impl Progress { } } +/// There are 2 kinds of `TrackingJobs`: +/// 1. `New`. This refers to the "New" type of tracking job. +/// It is instantiated and managed by the stream manager. +/// On recovery, the stream manager will stop managing the job. +/// 2. `Recovered`. This refers to the "Recovered" type of tracking job. +/// On recovery, the barrier manager will recover and start managing the job. +pub enum TrackingJob { + New(TrackingCommand), + Recovered(RecoveredTrackingJob), +} + +impl TrackingJob { + fn fragment_manager(&self) -> &FragmentManager { + match self { + TrackingJob::New(command) => command.context.fragment_manager.as_ref(), + TrackingJob::Recovered(recovered) => recovered.fragment_manager.as_ref(), + } + } + + pub(crate) fn is_barrier(&self) -> bool { + match self { + TrackingJob::Recovered(_) => true, + TrackingJob::New(command) => command.context.kind.is_barrier(), + } + } + + pub(crate) async fn pre_finish(&self) -> MetaResult<()> { + let table_fragments = match &self { + TrackingJob::New(command) => match &command.context.command { + Command::CreateStreamingJob { + table_fragments, .. + } => Some(table_fragments), + _ => None, + }, + TrackingJob::Recovered(recovered) => Some(&recovered.fragments), + }; + // Update the state of the table fragments from `Creating` to `Created`, so that the + // fragments can be scaled. + if let Some(table_fragments) = table_fragments { + self.fragment_manager() + .mark_table_fragments_created(table_fragments.table_id()) + .await?; + } + Ok(()) + } + + pub(crate) fn notify_finished(self) { + match self { + TrackingJob::New(command) => { + command + .notifiers + .into_iter() + .for_each(Notifier::notify_finished); + } + TrackingJob::Recovered(recovered) => { + recovered.finished.notify_finished(); + } + } + } + + pub(crate) fn table_to_create(&self) -> Option { + match self { + TrackingJob::New(command) => command.context.table_to_create(), + TrackingJob::Recovered(recovered) => Some(recovered.fragments.table_id()), + } + } +} + +pub struct RecoveredTrackingJob { + pub fragments: TableFragments, + pub finished: Notifier, + pub fragment_manager: FragmentManagerRef, +} + /// The command tracking by the [`CreateMviewProgressTracker`]. pub(super) struct TrackingCommand { /// The context of the command. @@ -150,15 +225,80 @@ pub(super) struct TrackingCommand { /// Track the progress of all creating mviews. When creation is done, `notify_finished` will be /// called on registered notifiers. +/// +/// Tracking is done as follows: +/// 1. We identify a `StreamJob` by its `TableId` of its `Materialized` table. +/// 2. For each stream job, there are several actors which run its tasks. +/// 3. With `progress_map` we can use the ID of the `StreamJob` to view its progress. +/// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`. pub(super) struct CreateMviewProgressTracker { - /// Progress of the create-mview DDL indicated by the epoch. - progress_map: HashMap, + /// Progress of the create-mview DDL indicated by the TableId. + progress_map: HashMap, /// Find the epoch of the create-mview DDL by the actor containing the chain node. - actor_map: HashMap, + actor_map: HashMap, } impl CreateMviewProgressTracker { + /// This step recovers state from the meta side: + /// 1. `Tables`. + /// 2. `TableFragments`. + /// + /// Other state are persisted by the `BackfillExecutor`, such as: + /// 1. `CreateMviewProgress`. + /// 2. `Backfill` position. + pub fn recover( + table_map: TableActorMap, + mut upstream_mv_counts: TableUpstreamMvCountMap, + mut definitions: TableDefinitionMap, + version_stats: HummockVersionStats, + mut finished_notifiers: TableNotifierMap, + mut table_fragment_map: TableFragmentMap, + fragment_manager: FragmentManagerRef, + ) -> Self { + let mut actor_map = HashMap::new(); + let mut progress_map = HashMap::new(); + let table_map: HashMap<_, Vec> = table_map.into(); + for (creating_table_id, actors) in table_map { + // 1. Recover `ChainState` in the tracker. + let mut states = HashMap::new(); + for actor in actors { + actor_map.insert(actor, creating_table_id); + states.insert(actor, ChainState::ConsumingUpstream(Epoch(0), 0)); + } + let upstream_mv_count = upstream_mv_counts.remove(&creating_table_id).unwrap(); + let upstream_total_key_count = upstream_mv_count + .iter() + .map(|(upstream_mv, count)| { + *count as u64 + * version_stats + .table_stats + .get(&upstream_mv.table_id) + .map_or(0, |stat| stat.total_key_count as u64) + }) + .sum(); + let definition = definitions.remove(&creating_table_id).unwrap(); + let progress = Progress { + states, + done_count: 0, // Fill only after first barrier pass + upstream_mv_count, + upstream_total_key_count, + consumed_rows: 0, // Fill only after first barrier pass + definition, + }; + let tracking_job = TrackingJob::Recovered(RecoveredTrackingJob { + fragments: table_fragment_map.remove(&creating_table_id).unwrap(), + finished: finished_notifiers.remove(&creating_table_id).unwrap(), + fragment_manager: fragment_manager.clone(), + }); + progress_map.insert(creating_table_id, (progress, tracking_job)); + } + Self { + progress_map, + actor_map, + } + } + pub fn new() -> Self { Self { progress_map: Default::default(), @@ -168,9 +308,9 @@ impl CreateMviewProgressTracker { pub fn gen_ddl_progress(&self) -> Vec { self.progress_map - .values() - .map(|(x, _)| DdlProgress { - id: x.creating_mv_id.table_id as u64, + .iter() + .map(|(table_id, (x, _))| DdlProgress { + id: table_id.table_id as u64, statement: x.definition.clone(), progress: format!("{:.2}%", x.calculate_progress() * 100.0), }) @@ -183,7 +323,7 @@ impl CreateMviewProgressTracker { pub fn find_cancelled_command( &mut self, actors_to_cancel: HashSet, - ) -> Option { + ) -> Option { let epochs = actors_to_cancel .into_iter() .map(|actor_id| self.actor_map.get(&actor_id)) @@ -205,16 +345,11 @@ impl CreateMviewProgressTracker { &mut self, command: TrackingCommand, version_stats: &HummockVersionStats, - ) -> Option { + ) -> Option { let actors = command.context.actors_to_track(); if actors.is_empty() { // The command can be finished immediately. - return Some(command); - } - - let ddl_epoch = command.context.curr_epoch.value(); - for &actor in &actors { - self.actor_map.insert(actor, ddl_epoch); + return Some(TrackingJob::New(command)); } let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition) = @@ -258,14 +393,19 @@ impl CreateMviewProgressTracker { unreachable!("Must be CreateStreamingJob."); }; + for &actor in &actors { + self.actor_map.insert(actor, creating_mv_id); + } + let progress = Progress::new( actors, - creating_mv_id, upstream_mv_count, upstream_total_key_count, definition, ); - let old = self.progress_map.insert(ddl_epoch, (progress, command)); + let old = self + .progress_map + .insert(creating_mv_id, (progress, TrackingJob::New(command))); assert!(old.is_none()); None } @@ -277,9 +417,9 @@ impl CreateMviewProgressTracker { &mut self, progress: &CreateMviewProgress, version_stats: &HummockVersionStats, - ) -> Option { + ) -> Option { let actor = progress.chain_actor_id; - let Some(epoch) = self.actor_map.get(&actor).copied() else { + let Some(table_id) = self.actor_map.get(&actor).copied() else { // On restart, backfill will ALWAYS notify CreateMviewProgressTracker, // even if backfill is finished on recovery. // This is because we don't know if only this actor is finished, @@ -299,7 +439,7 @@ impl CreateMviewProgressTracker { ChainState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows) }; - match self.progress_map.entry(epoch) { + match self.progress_map.entry(table_id) { Entry::Occupied(mut o) => { let progress = &mut o.get_mut().0; @@ -307,6 +447,7 @@ impl CreateMviewProgressTracker { .upstream_mv_count .iter() .map(|(upstream_mv, count)| { + assert_ne!(*count, 0); *count as u64 * version_stats .table_stats @@ -318,7 +459,10 @@ impl CreateMviewProgressTracker { progress.update(actor, new_state, upstream_total_key_count); if progress.is_done() { - tracing::debug!("all actors done for creating mview with epoch {}!", epoch); + tracing::debug!( + "all actors done for creating mview with table_id {}!", + table_id + ); // Clean-up the mapping from actors to DDL epoch. for actor in o.get().0.actors() { diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index b3b57f2be58f1..21197a8df98d4 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; +use anyhow::anyhow; use futures::future::try_join_all; use itertools::Itertools; +use risingwave_common::catalog::TableId; use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation}; @@ -25,6 +27,7 @@ use risingwave_pb::stream_plan::AddMutation; use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest, }; +use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, warn, Instrument}; use uuid::Uuid; @@ -32,6 +35,8 @@ use uuid::Uuid; use super::TracedEpoch; use crate::barrier::command::CommandContext; use crate::barrier::info::BarrierActorInfo; +use crate::barrier::notifier::Notifier; +use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager}; use crate::manager::WorkerId; use crate::model::{BarrierManagerState, MigrationPlan}; @@ -60,22 +65,30 @@ impl GlobalBarrierManager { .await } + /// Please look at `CatalogManager::clean_dirty_tables` for more details. + /// This should only be called for bootstrap recovery. + async fn clean_dirty_tables(&self) -> MetaResult<()> { + let fragment_manager = self.fragment_manager.clone(); + self.catalog_manager + .clean_dirty_tables(fragment_manager) + .await?; + Ok(()) + } + /// Clean up all dirty streaming jobs. async fn clean_dirty_fragments(&self) -> MetaResult<()> { let stream_job_ids = self.catalog_manager.list_stream_job_ids().await?; let to_drop_table_fragments = self .fragment_manager - .list_dirty_table_fragments(|tf| { - !stream_job_ids.contains(&tf.table_id().table_id) || !tf.is_created() - }) + .list_dirty_table_fragments(|tf| !stream_job_ids.contains(&tf.table_id().table_id)) .await; - let to_drop_streaming_ids = to_drop_table_fragments .iter() .map(|t| t.table_id()) .collect(); debug!("clean dirty table fragments: {:?}", to_drop_streaming_ids); + self.fragment_manager .drop_table_fragments_vec(&to_drop_streaming_ids) .await?; @@ -86,7 +99,7 @@ impl GlobalBarrierManager { &to_drop_table_fragments ) .await.inspect_err(|e| - tracing::warn!( + warn!( "Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", to_drop_table_fragments, e) @@ -100,6 +113,101 @@ impl GlobalBarrierManager { Ok(()) } + async fn recover_background_mv_progress(&self) -> MetaResult<()> { + let creating_tables = self.catalog_manager.list_creating_background_mvs().await; + let creating_table_ids = creating_tables + .iter() + .map(|t| TableId { table_id: t.id }) + .collect_vec(); + + let mut senders = HashMap::new(); + let mut receivers = Vec::new(); + for table_id in creating_table_ids.iter().copied() { + let (finished_tx, finished_rx) = oneshot::channel(); + senders.insert( + table_id, + Notifier { + finished: Some(finished_tx), + ..Default::default() + }, + ); + + let fragments = self + .fragment_manager + .select_table_fragments_by_table_id(&table_id) + .await?; + let internal_table_ids = fragments.internal_table_ids(); + let internal_tables = self.catalog_manager.get_tables(&internal_table_ids).await; + let table = self.catalog_manager.get_tables(&[table_id.table_id]).await; + assert_eq!(table.len(), 1, "should only have 1 materialized table"); + let table = table.into_iter().next().unwrap(); + receivers.push((table, internal_tables, finished_rx)); + } + + let table_map = self + .fragment_manager + .get_table_id_actor_mapping(&creating_table_ids) + .await; + let table_fragment_map = self + .fragment_manager + .get_table_id_table_fragment_map(&creating_table_ids) + .await?; + let upstream_mv_counts = self + .fragment_manager + .get_upstream_relation_counts(&creating_table_ids) + .await; + let definitions: HashMap<_, _> = creating_tables + .into_iter() + .map(|t| (TableId { table_id: t.id }, t.definition)) + .collect(); + let version_stats = self.hummock_manager.get_version_stats().await; + // If failed, enter recovery mode. + { + let mut tracker = self.tracker.lock().await; + *tracker = CreateMviewProgressTracker::recover( + table_map.into(), + upstream_mv_counts.into(), + definitions.into(), + version_stats, + senders.into(), + table_fragment_map.into(), + self.fragment_manager.clone(), + ); + } + for (table, internal_tables, finished) in receivers { + let catalog_manager = self.catalog_manager.clone(); + tokio::spawn(async move { + let res: MetaResult<()> = try { + tracing::debug!("recovering stream job {}", table.id); + finished + .await + .map_err(|e| anyhow!("failed to finish command: {}", e))?; + + tracing::debug!("finished stream job {}", table.id); + // Once notified that job is finished we need to notify frontend. + // and mark catalog as created and commit to meta. + // both of these are done by catalog manager. + catalog_manager + .finish_create_table_procedure(internal_tables, table.clone()) + .await?; + tracing::debug!("notified frontend for stream job {}", table.id); + }; + if let Err(e) = res.as_ref() { + tracing::error!( + "stream job {} interrupted, will retry after recovery: {e:?}", + table.id + ); + // NOTE(kwannoel): We should not cleanup stream jobs, + // we don't know if it's just due to CN killed, + // or the job has actually failed. + // Users have to manually cancel the stream jobs, + // if they want to clean it. + } + }); + } + Ok(()) + } + /// Recovery the whole cluster from the latest epoch. /// /// If `paused_reason` is `Some`, all data sources (including connectors and DMLs) will be @@ -111,6 +219,7 @@ impl GlobalBarrierManager { &self, prev_epoch: TracedEpoch, paused_reason: Option, + bootstrap_recovery: bool, ) -> BarrierManagerState { // Mark blocked and abort buffered schedules, they might be dirty already. self.scheduled_barriers @@ -118,12 +227,25 @@ impl GlobalBarrierManager { .await; tracing::info!("recovery start!"); + if bootstrap_recovery { + self.clean_dirty_tables() + .await + .expect("clean dirty tables should not fail"); + } self.clean_dirty_fragments() .await .expect("clean dirty fragments"); + self.sink_manager.reset().await; let retry_strategy = Self::get_retry_strategy(); + // Mview progress needs to be recovered. + tracing::info!("recovering mview progress"); + self.recover_background_mv_progress() + .await + .expect("recover mview progress should not fail"); + tracing::info!("recovered mview progress"); + // We take retry into consideration because this is the latency user sees for a cluster to // get recovered. let recovery_timer = self.metrics.recovery_latency.start_timer(); @@ -172,6 +294,8 @@ impl GlobalBarrierManager { // Inject the `Initial` barrier to initialize all executors. let command_ctx = Arc::new(CommandContext::new( self.fragment_manager.clone(), + self.catalog_manager.clone(), + self.hummock_manager.clone(), self.env.stream_client_pool_ref(), info, prev_epoch.clone(), @@ -204,7 +328,7 @@ impl GlobalBarrierManager { warn!(err = ?err, "post_collect failed"); Err(err) } else { - Ok((new_epoch, response)) + Ok((new_epoch.clone(), response)) } } Err(err) => { diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index b056fea0e35e7..f8332819a4610 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -56,6 +56,9 @@ enum MetaErrorInner { #[error("{0} id not found: {1}")] CatalogIdNotFound(&'static str, u32), + #[error("table_fragment not exist: id={0}")] + FragmentNotFound(u32), + #[error("{0} with name {1} exists")] Duplicated(&'static str, String), @@ -135,6 +138,14 @@ impl MetaError { MetaErrorInner::CatalogIdNotFound(relation, id.into()).into() } + pub fn fragment_not_found>(id: T) -> Self { + MetaErrorInner::FragmentNotFound(id.into()).into() + } + + pub fn is_fragment_not_found(&self) -> bool { + matches!(self.inner.as_ref(), &MetaErrorInner::FragmentNotFound(..)) + } + pub fn catalog_duplicated>(relation: &'static str, name: T) -> Self { MetaErrorInner::Duplicated(relation, name.into()).into() } diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index ad1928e0bdd50..62b5692ce82ba 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -16,10 +16,12 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use itertools::Itertools; +use risingwave_common::bail; use risingwave_common::catalog::TableOption; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::{ - Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View, + Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, + StreamJobStatus, Table, View, }; use super::{ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, ViewId}; @@ -194,12 +196,16 @@ impl DatabaseManager { } pub fn check_relation_name_duplicated(&self, relation_key: &RelationKey) -> MetaResult<()> { - if self.tables.values().any(|x| { + if let Some(t) = self.tables.values().find(|x| { x.database_id == relation_key.0 && x.schema_id == relation_key.1 && x.name.eq(&relation_key.2) }) { - Err(MetaError::catalog_duplicated("table", &relation_key.2)) + if t.stream_job_status == StreamJobStatus::Creating as i32 { + bail!("table is in creating procedure: {}", t.id); + } else { + Err(MetaError::catalog_duplicated("table", &relation_key.2)) + } } else if self.sources.values().any(|x| { x.database_id == relation_key.0 && x.schema_id == relation_key.1 @@ -258,9 +264,22 @@ impl DatabaseManager { self.databases.values().cloned().collect_vec() } - pub fn list_creating_tables(&self) -> Vec
{ - self.in_progress_creating_tables + pub fn list_creating_background_mvs(&self) -> Vec
{ + self.tables + .values() + .filter(|&t| { + t.stream_job_status == PbStreamJobStatus::Creating as i32 + && t.table_type == TableType::MaterializedView as i32 + && t.create_type == CreateType::Background as i32 + }) + .cloned() + .collect_vec() + } + + pub fn list_persisted_creating_tables(&self) -> Vec
{ + self.tables .values() + .filter(|&t| t.stream_job_status == PbStreamJobStatus::Creating as i32) .cloned() .collect_vec() } @@ -389,10 +408,12 @@ impl DatabaseManager { .contains(&relation.clone()) } + /// For all types of DDL pub fn mark_creating(&mut self, relation: &RelationKey) { self.in_progress_creation_tracker.insert(relation.clone()); } + /// Only for streaming DDL pub fn mark_creating_streaming_job(&mut self, table_id: TableId, key: RelationKey) { self.in_progress_creation_streaming_job .insert(table_id, key); @@ -417,6 +438,11 @@ impl DatabaseManager { self.in_progress_creation_streaming_job.keys().cloned() } + pub fn clear_creating_stream_jobs(&mut self) { + self.in_progress_creation_tracker.clear(); + self.in_progress_creation_streaming_job.clear(); + } + pub fn mark_creating_tables(&mut self, tables: &[Table]) { self.in_progress_creating_tables .extend(tables.iter().map(|t| (t.id, t.clone()))); diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 1a74608c848a1..8b26b8afa11d9 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -43,7 +43,7 @@ use crate::model::{ }; use crate::storage::Transaction; use crate::stream::{SplitAssignment, TableRevision}; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; pub struct FragmentManagerCore { table_fragments: BTreeMap, @@ -163,6 +163,56 @@ impl FragmentManager { map.values().cloned().collect() } + /// The `table_ids` here should correspond to stream jobs. + /// We get their corresponding table fragment, and from there, + /// we get the actors that are in the table fragment. + pub async fn get_table_id_actor_mapping( + &self, + table_ids: &[TableId], + ) -> HashMap> { + let map = &self.core.read().await.table_fragments; + let mut table_map = HashMap::new(); + for table_id in table_ids { + if let Some(table_fragment) = map.get(table_id) { + let mut actors = vec![]; + for fragment in table_fragment.fragments.values() { + for actor in &fragment.actors { + actors.push(actor.actor_id) + } + } + table_map.insert(*table_id, actors); + } + } + table_map + } + + /// Gets the counts for each upstream relation that each stream job + /// indicated by `table_ids` depends on. + /// For example in the following query: + /// ```sql + /// CREATE MATERIALIZED VIEW m1 AS + /// SELECT * FROM t1 JOIN t2 ON t1.a = t2.a JOIN t3 ON t2.b = t3.b + /// ``` + /// + /// We have t1 occurring once, and t2 occurring once. + pub async fn get_upstream_relation_counts( + &self, + table_ids: &[TableId], + ) -> HashMap> { + let map = &self.core.read().await.table_fragments; + let mut upstream_relation_counts = HashMap::new(); + for table_id in table_ids { + if let Some(table_fragments) = map.get(table_id) { + let dependent_ids = table_fragments.dependent_table_ids(); + let r = upstream_relation_counts.insert(*table_id, dependent_ids); + assert!(r.is_none(), "Each table_id should be unique!") + } else { + upstream_relation_counts.insert(*table_id, HashMap::new()); + } + } + upstream_relation_counts + } + pub fn get_mv_id_to_internal_table_ids_mapping(&self) -> Option)>> { match self.core.try_read() { Ok(core) => Some( @@ -231,10 +281,11 @@ impl FragmentManager { table_id: &TableId, ) -> MetaResult { let map = &self.core.read().await.table_fragments; - Ok(map - .get(table_id) - .cloned() - .with_context(|| format!("table_fragment not exist: id={}", table_id))?) + if let Some(table_fragment) = map.get(table_id) { + Ok(table_fragment.clone()) + } else { + Err(MetaError::fragment_not_found(table_id.table_id)) + } } pub async fn select_table_fragments_by_ids( @@ -244,15 +295,32 @@ impl FragmentManager { let map = &self.core.read().await.table_fragments; let mut table_fragments = Vec::with_capacity(table_ids.len()); for table_id in table_ids { - table_fragments.push( - map.get(table_id) - .cloned() - .with_context(|| format!("table_fragment not exist: id={}", table_id))?, - ); + table_fragments.push(if let Some(table_fragment) = map.get(table_id) { + table_fragment.clone() + } else { + return Err(MetaError::fragment_not_found(table_id.table_id)); + }); } Ok(table_fragments) } + pub async fn get_table_id_table_fragment_map( + &self, + table_ids: &[TableId], + ) -> MetaResult> { + let map = &self.core.read().await.table_fragments; + let mut id_to_fragment = HashMap::new(); + for table_id in table_ids { + let table_fragment = if let Some(table_fragment) = map.get(table_id) { + table_fragment.clone() + } else { + return Err(MetaError::fragment_not_found(table_id.table_id)); + }; + id_to_fragment.insert(*table_id, table_fragment); + } + Ok(id_to_fragment) + } + /// Start create a new `TableFragments` and insert it into meta store, currently the actors' /// state is `ActorState::Inactive` and the table fragments' state is `State::Initial`. pub async fn start_create_table_fragments( @@ -499,6 +567,8 @@ impl FragmentManager { /// Drop table fragments info and remove downstream actor infos in fragments from its dependent /// tables. + /// If table fragments already deleted, this should just be noop, + /// the delete function (`table_fragments.remove`) will not return an error. pub async fn drop_table_fragments_vec(&self, table_ids: &HashSet) -> MetaResult<()> { let mut guard = self.core.write().await; let current_revision = guard.table_revision; @@ -514,7 +584,7 @@ impl FragmentManager { table_fragments.remove(table_fragment.table_id()); let chain_actor_ids = table_fragment.chain_actor_ids(); let dependent_table_ids = table_fragment.dependent_table_ids(); - for dependent_table_id in dependent_table_ids { + for (dependent_table_id, _) in dependent_table_ids { if table_ids.contains(&dependent_table_id) { continue; } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index be3ed42b01b4d..bcac32922d180 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -17,7 +17,7 @@ mod fragment; mod user; mod utils; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::iter; use std::option::Option::Some; use std::sync::Arc; @@ -32,9 +32,10 @@ use risingwave_common::catalog::{ DEFAULT_SUPER_USER_FOR_PG_ID, DEFAULT_SUPER_USER_ID, SYSTEM_SCHEMAS, }; use risingwave_common::{bail, ensure}; -use risingwave_pb::catalog::table::OptionalAssociatedSourceId; +use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, TableType}; use risingwave_pb::catalog::{ - Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View, + Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, + StreamJobStatus, Table, View, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object}; @@ -44,7 +45,7 @@ use tokio::sync::{Mutex, MutexGuard}; use user::*; use crate::manager::{IdCategory, MetaSrvEnv, NotificationVersion, StreamingJob}; -use crate::model::{BTreeMapTransaction, MetadataModel, ValTransaction}; +use crate::model::{BTreeMapTransaction, MetadataModel, TableFragments, ValTransaction}; use crate::storage::Transaction; use crate::{MetaError, MetaResult}; @@ -115,6 +116,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::epoch::Epoch; use risingwave_pb::meta::cancel_creating_jobs_request::CreatingJobInfo; use risingwave_pb::meta::relation::RelationInfo; +use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::{Relation, RelationGroup}; pub(crate) use {commit_meta, commit_meta_with_trx}; @@ -629,9 +631,13 @@ impl CatalogManager { pub async fn start_create_stream_job_procedure( &self, stream_job: &StreamingJob, + internal_tables: Vec
, ) -> MetaResult<()> { match stream_job { - StreamingJob::MaterializedView(table) => self.start_create_table_procedure(table).await, + StreamingJob::MaterializedView(table) => { + self.start_create_table_procedure(table, internal_tables) + .await + } StreamingJob::Sink(sink) => self.start_create_sink_procedure(sink).await, StreamingJob::Index(index, index_table) => { self.start_create_index_procedure(index, index_table).await @@ -641,7 +647,7 @@ impl CatalogManager { self.start_create_table_procedure_with_source(source, table) .await } else { - self.start_create_table_procedure(table).await + self.start_create_table_procedure(table, vec![]).await } } } @@ -694,7 +700,11 @@ impl CatalogManager { } /// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`. - pub async fn start_create_table_procedure(&self, table: &Table) -> MetaResult<()> { + pub async fn start_create_table_procedure( + &self, + table: &Table, + internal_tables: Vec
, + ) -> MetaResult<()> { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let user_core = &mut core.user; @@ -707,19 +717,151 @@ impl CatalogManager { #[cfg(not(test))] user_core.ensure_user_id(table.owner)?; let key = (table.database_id, table.schema_id, table.name.clone()); + database_core.check_relation_name_duplicated(&key)?; - if database_core.has_in_progress_creation(&key) { - bail!("table is in creating procedure"); - } else { - database_core.mark_creating(&key); - database_core.mark_creating_streaming_job(table.id, key); - for &dependent_relation_id in &table.dependent_relations { - database_core.increase_ref_count(dependent_relation_id); + let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + assert!( + !tables.contains_key(&table.id), + "table must not already exist in meta" + ); + for table in internal_tables { + tables.insert(table.id, table); + } + tables.insert(table.id, table.clone()); + commit_meta!(self, tables)?; + + for &dependent_relation_id in &table.dependent_relations { + database_core.increase_ref_count(dependent_relation_id); + } + user_core.increase_ref(table.owner); + Ok(()) + } + + fn assert_table_creating(tables: &BTreeMap, table: &Table) { + if let Some(t) = tables.get(&table.id) + && let Ok(StreamJobStatus::Creating) = t.get_stream_job_status() + {} else { + panic!("Table must be in creating procedure: {table:#?}") + } + } + + pub async fn assert_tables_deleted(&self, table_ids: Vec) { + let core = self.core.lock().await; + let tables = &core.database.tables; + for id in table_ids { + assert_eq!(tables.get(&id), None,) + } + } + + /// We clean the following tables: + /// 1. Those which belonged to incomplete Foreground jobs. + /// 2. Those which did not persist their table fragments, we can't recover these. + /// 3. Those which were only initialized, but not actually running yet. + /// 4. From 2, since we don't have internal table ids from the fragments, + /// we can detect hanging table ids by just finding all internal ids + /// with: + /// 1. `stream_job_status` = CREATING + /// 2. Not belonging to a background stream job. + /// Clean up these hanging tables by the id. + pub async fn clean_dirty_tables(&self, fragment_manager: FragmentManagerRef) -> MetaResult<()> { + let creating_tables: Vec
= self.list_persisted_creating_tables().await; + tracing::debug!( + "creating_tables ids: {:#?}", + creating_tables.iter().map(|t| t.id).collect_vec() + ); + let mut reserved_internal_tables = HashSet::new(); + let mut tables_to_clean = vec![]; + let mut internal_tables_to_clean = vec![]; + for table in creating_tables { + tracing::trace!( + "checking table {} definition: {}, create_type: {:#?}, table_type: {:#?}", + table.id, + table.definition, + table.get_create_type().unwrap_or(CreateType::Foreground), + table.get_table_type().unwrap(), + ); + // 1. Incomplete Foreground jobs + if table.create_type == CreateType::Foreground as i32 + && table.table_type != TableType::Internal as i32 + // || table.create_type == CreateType::Unspecified as i32 + { + tracing::debug!("cleaning table_id for foreground: {:#?}", table.id); + tables_to_clean.push(table); + continue; } - user_core.increase_ref(table.owner); - Ok(()) + if table.table_type == TableType::Internal as i32 { + internal_tables_to_clean.push(table); + continue; + } + + // 2. No table fragments + assert_ne!(table.table_type, TableType::Internal as i32); + match fragment_manager + .select_table_fragments_by_table_id(&table.id.into()) + .await + { + Err(e) => { + if e.is_fragment_not_found() { + tracing::debug!("cleaning table_id for no fragments: {:#?}", table.id); + tables_to_clean.push(table); + continue; + } else { + return Err(e); + } + } + Ok(fragment) => { + let fragment: TableFragments = fragment; + // 3. For those in initial state (i.e. not running / created), + // we should purge them. + if fragment.state() == State::Initial { + tracing::debug!("cleaning table_id no initial state: {:#?}", table.id); + tables_to_clean.push(table); + continue; + } else { + assert_eq!(table.create_type, CreateType::Background as i32); + // 4. Get all the corresponding internal tables, the rest we can purge. + for id in fragment.internal_table_ids() { + reserved_internal_tables.insert(id); + } + continue; + } + } + } + } + for t in internal_tables_to_clean { + if !reserved_internal_tables.contains(&t.id) { + tracing::debug!( + "cleaning table_id for internal tables not reserved: {:#?}", + t.id + ); + tables_to_clean.push(t); + } + } + + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let tables = &mut database_core.tables; + let mut tables = BTreeMapTransaction::new(tables); + for table in &tables_to_clean { + tracing::debug!("cleaning table_id: {}", table.id); + let table = tables.remove(table.id); + assert!(table.is_some()) + } + commit_meta!(self, tables)?; + + database_core.clear_creating_stream_jobs(); + let user_core = &mut core.user; + for table in &tables_to_clean { + // Recovered when init database manager. + for relation_id in &table.dependent_relations { + database_core.decrease_ref_count(*relation_id); + } + // Recovered when init user manager. + user_core.decrease_ref(table.owner); } + + Ok(()) } /// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`. @@ -730,17 +872,11 @@ impl CatalogManager { ) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; - let mut tables = BTreeMapTransaction::new(&mut database_core.tables); - let key = (table.database_id, table.schema_id, table.name.clone()); - assert!( - !tables.contains_key(&table.id) - && database_core.in_progress_creation_tracker.contains(&key), - "table must be in creating procedure" - ); - database_core.in_progress_creation_tracker.remove(&key); - database_core - .in_progress_creation_streaming_job - .remove(&table.id); + let tables = &mut database_core.tables; + if cfg!(not(test)) { + Self::assert_table_creating(tables, &table); + } + let mut tables = BTreeMapTransaction::new(tables); table.stream_job_status = PbStreamJobStatus::Created.into(); tables.insert(table.id, table.clone()); @@ -769,23 +905,60 @@ impl CatalogManager { Ok(version) } - pub async fn cancel_create_table_procedure(&self, table: &Table) { - let core = &mut *self.core.lock().await; - let database_core = &mut core.database; - let user_core = &mut core.user; - let key = (table.database_id, table.schema_id, table.name.clone()); - assert!( - !database_core.tables.contains_key(&table.id) - && database_core.has_in_progress_creation(&key), - "table must be in creating procedure" - ); + /// Used to cleanup states in stream manager. + /// It is required because failure may not necessarily happen in barrier, + /// e.g. when cordon nodes. + /// and we still need some way to cleanup the state. + pub async fn cancel_create_table_procedure( + &self, + table_id: TableId, + internal_table_ids: Vec, + ) -> MetaResult<()> { + let table = { + let core = &mut self.core.lock().await; + let database_core = &mut core.database; + let tables = &mut database_core.tables; + let Some(table) = tables.get(&table_id).cloned() else { + bail!( + "table_id {} missing when attempting to cancel job", + table_id + ) + }; + table + }; - database_core.unmark_creating(&key); - database_core.unmark_creating_streaming_job(table.id); - for &dependent_relation_id in &table.dependent_relations { - database_core.decrease_ref_count(dependent_relation_id); + tracing::trace!("cleanup tables for {}", table.id); + { + let core = &mut self.core.lock().await; + let database_core = &mut core.database; + + let mut table_ids = vec![table.id]; + table_ids.extend(internal_table_ids); + + let tables = &mut database_core.tables; + let mut tables = BTreeMapTransaction::new(tables); + for table_id in table_ids { + tables.remove(table_id); + } + commit_meta!(self, tables)?; + } + + { + let core = &mut self.core.lock().await; + { + let user_core = &mut core.user; + user_core.decrease_ref(table.owner); + } + + { + let database_core = &mut core.database; + for &dependent_relation_id in &table.dependent_relations { + database_core.decrease_ref_count(dependent_relation_id); + } + } } - user_core.decrease_ref(table.owner); + + Ok(()) } /// return id of streaming jobs in the database which need to be dropped by stream manager. @@ -975,7 +1148,7 @@ impl CatalogManager { match drop_mode { DropMode::Restrict => { return Err(MetaError::permission_denied(format!( - "Fail to delete table `{}` because {} other relation(s) depend on it", + "Fail to delete index table `{}` because {} other relation(s) depend on it", index_table.name, ref_count ))); } @@ -2205,6 +2378,24 @@ impl CatalogManager { self.core.lock().await.database.list_tables() } + /// Lists table catalogs for mviews, without their internal tables. + pub async fn list_creating_background_mvs(&self) -> Vec
{ + self.core + .lock() + .await + .database + .list_creating_background_mvs() + } + + /// Lists table catalogs for all tables with `stream_job_status=CREATING`. + pub async fn list_persisted_creating_tables(&self) -> Vec
{ + self.core + .lock() + .await + .database + .list_persisted_creating_tables() + } + pub async fn get_all_table_options(&self) -> HashMap { self.core.lock().await.database.get_all_table_options() } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 611de4120a787..e02388eba4f3d 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use risingwave_common::catalog::TableVersionId; use risingwave_common::util::epoch::Epoch; -use risingwave_pb::catalog::{Index, Sink, Source, Table}; +use risingwave_pb::catalog::{CreateType, Index, Sink, Source, Table}; use crate::model::FragmentId; @@ -197,4 +197,13 @@ impl StreamingJob { None } } + + pub fn create_type(&self) -> CreateType { + match self { + Self::MaterializedView(table) => { + table.get_create_type().unwrap_or(CreateType::Foreground) + } + _ => CreateType::Foreground, + } + } } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 10fe5abe8aeaa..726bd7fcd8e73 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::ops::AddAssign; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -353,9 +354,12 @@ impl TableFragments { } /// Resolve dependent table - fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashSet) { + fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap) { if let Some(NodeBody::Chain(chain)) = stream_node.node_body.as_ref() { - table_ids.insert(TableId::new(chain.table_id)); + table_ids + .entry(TableId::new(chain.table_id)) + .or_default() + .add_assign(1); } for child in &stream_node.input { @@ -363,9 +367,10 @@ impl TableFragments { } } - /// Returns dependent table ids. - pub fn dependent_table_ids(&self) -> HashSet { - let mut table_ids = HashSet::new(); + /// Returns a mapping of dependent table ids of the `TableFragments` + /// to their corresponding count. + pub fn dependent_table_ids(&self) -> HashMap { + let mut table_ids = HashMap::new(); self.fragments.values().for_each(|fragment| { let actor = &fragment.actors[0]; Self::resolve_dependent_table(actor.nodes.as_ref().unwrap(), &mut table_ids); diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 2202e51ca6698..04b9729c5a5b8 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -414,6 +414,11 @@ impl DdlController { fragment_graph: StreamFragmentGraphProto, create_type: CreateType, ) -> MetaResult { + tracing::debug!( + id = stream_job.id(), + definition = stream_job.definition(), + "starting stream job", + ); let _permit = self .creating_streaming_job_permits .semaphore @@ -423,6 +428,8 @@ impl DdlController { let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); + + tracing::debug!(id = stream_job.id(), "preparing stream job"); let fragment_graph = self .prepare_stream_job(&mut stream_job, fragment_graph) .await?; @@ -432,6 +439,7 @@ impl DdlController { let mut internal_tables = vec![]; let result = try { + tracing::debug!(id = stream_job.id(), "building stream job"); let (ctx, table_fragments) = self .build_stream_job(env, &stream_job, fragment_graph) .await?; @@ -455,7 +463,7 @@ impl DdlController { let (ctx, table_fragments) = match result { Ok(r) => r, Err(e) => { - self.cancel_stream_job(&stream_job, internal_tables).await; + self.cancel_stream_job(&stream_job, internal_tables).await?; return Err(e); } }; @@ -467,7 +475,7 @@ impl DdlController { } CreateType::Background => { let ctrl = self.clone(); - let definition = stream_job.definition(); + let stream_job_id = stream_job.id(); let fut = async move { let result = ctrl .create_streaming_job_inner( @@ -478,9 +486,11 @@ impl DdlController { ) .await; match result { - Err(e) => tracing::error!(definition, error = ?e, "stream_job_error"), + Err(e) => { + tracing::error!(id=stream_job_id, error = ?e, "finish stream job failed") + } Ok(_) => { - tracing::info!(definition, "stream_job_ok") + tracing::info!(id = stream_job_id, "finish stream job succeeded") } } }; @@ -490,6 +500,7 @@ impl DdlController { } } + // We persist table fragments at this step. async fn create_streaming_job_inner( &self, stream_job: StreamingJob, @@ -497,15 +508,29 @@ impl DdlController { ctx: CreateStreamingJobContext, internal_tables: Vec
, ) -> MetaResult { + let job_id = stream_job.id(); + tracing::debug!(id = job_id, "creating stream job"); let result = self .stream_manager .create_streaming_job(table_fragments, ctx) .await; if let Err(e) = result { - self.cancel_stream_job(&stream_job, internal_tables).await; + match stream_job.create_type() { + // NOTE: This assumes that we will trigger recovery, + // and recover stream job progress. + CreateType::Background => { + tracing::error!(id = stream_job.id(), error = ?e, "finish stream job failed") + } + _ => { + self.cancel_stream_job(&stream_job, internal_tables).await?; + } + } return Err(e); }; - self.finish_stream_job(stream_job, internal_tables).await + tracing::debug!(id = job_id, "finishing stream job"); + let version = self.finish_stream_job(stream_job, internal_tables).await?; + tracing::debug!(id = job_id, "finished stream job"); + Ok(version) } async fn drop_streaming_job( @@ -570,6 +595,8 @@ impl DdlController { StreamFragmentGraph::new(fragment_graph, self.env.id_gen_manager_ref(), stream_job) .await?; + let internal_tables = fragment_graph.internal_tables().into_values().collect_vec(); + // 2. Set the graph-related fields and freeze the `stream_job`. stream_job.set_table_fragment_id(fragment_graph.table_fragment_id()); stream_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); @@ -577,7 +604,7 @@ impl DdlController { // 3. Mark current relation as "creating" and add reference count to dependent relations. self.catalog_manager - .start_create_stream_job_procedure(stream_job) + .start_create_stream_job_procedure(stream_job, internal_tables) .await?; Ok(fragment_graph) @@ -686,6 +713,7 @@ impl DdlController { table_properties: stream_job.properties(), definition: stream_job.definition(), mv_table_id: stream_job.mv_table(), + create_type: stream_job.create_type(), }; // 4. Mark creating tables, including internal tables and the table of the stream job. @@ -702,17 +730,27 @@ impl DdlController { Ok((ctx, table_fragments)) } - /// `cancel_stream_job` cancels a stream job and clean some states. - async fn cancel_stream_job(&self, stream_job: &StreamingJob, internal_tables: Vec
) { + /// This is NOT used by `CANCEL JOBS`. + /// It is used internally by `DdlController` to cancel and cleanup stream job. + async fn cancel_stream_job( + &self, + stream_job: &StreamingJob, + internal_tables: Vec
, + ) -> MetaResult<()> { let mut creating_internal_table_ids = internal_tables.into_iter().map(|t| t.id).collect_vec(); // 1. cancel create procedure. match stream_job { StreamingJob::MaterializedView(table) => { - creating_internal_table_ids.push(table.id); - self.catalog_manager - .cancel_create_table_procedure(table) + // barrier manager will do the cleanup. + let result = self + .catalog_manager + .cancel_create_table_procedure(table.id, creating_internal_table_ids.clone()) .await; + creating_internal_table_ids.push(table.id); + if let Err(e) = result { + tracing::warn!("Failed to cancel create table procedure, perhaps barrier manager has already cleaned it. Reason: {e:#?}"); + } } StreamingJob::Sink(sink) => { self.catalog_manager @@ -720,16 +758,23 @@ impl DdlController { .await; } StreamingJob::Table(source, table) => { - creating_internal_table_ids.push(table.id); if let Some(source) = source { self.catalog_manager .cancel_create_table_procedure_with_source(source, table) .await; } else { - self.catalog_manager - .cancel_create_table_procedure(table) + let result = self + .catalog_manager + .cancel_create_table_procedure( + table.id, + creating_internal_table_ids.clone(), + ) .await; + if let Err(e) = result { + tracing::warn!("Failed to cancel create table procedure, perhaps barrier manager has already cleaned it. Reason: {e:#?}"); + } } + creating_internal_table_ids.push(table.id); } StreamingJob::Index(index, table) => { creating_internal_table_ids.push(table.id); @@ -742,6 +787,7 @@ impl DdlController { self.catalog_manager .unmark_creating_tables(&creating_internal_table_ids, true) .await; + Ok(()) } /// `finish_stream_job` finishes a stream job and clean some states. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 44198f65635cb..77a784c64ac09 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use futures::future::{join_all, try_join_all, BoxFuture}; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_pb::catalog::Table; +use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; use risingwave_pb::stream_service::{ @@ -67,6 +67,8 @@ pub struct CreateStreamingJobContext { pub definition: String, pub mv_table_id: Option, + + pub create_type: CreateType, } impl CreateStreamingJobContext { @@ -112,22 +114,32 @@ impl CreatingStreamingJobInfo { jobs.remove(&job_id); } - async fn cancel_jobs(&self, job_ids: Vec) -> HashMap> { + async fn cancel_jobs( + &self, + job_ids: Vec, + ) -> (HashMap>, Vec) { let mut jobs = self.streaming_jobs.lock().await; let mut receivers = HashMap::new(); + let mut recovered_job_ids = vec![]; for job_id in job_ids { if let Some(job) = jobs.get_mut(&job_id) && let Some(shutdown_tx) = job.shutdown_tx.take() { let (tx, rx) = oneshot::channel(); - if shutdown_tx.send(CreatingState::Canceling{finish_tx: tx}).await.is_ok() { + if shutdown_tx.send(CreatingState::Canceling { finish_tx: tx }).await.is_ok() { receivers.insert(job_id, rx); } else { tracing::warn!("failed to send canceling state"); } + } else { + // If these job ids do not exist in streaming_jobs, + // we can infer they either: + // 1. are entirely non-existent, + // 2. OR they are recovered streaming jobs, and managed by BarrierManager. + recovered_job_ids.push(job_id); } } - receivers + (receivers, recovered_job_ids) } } @@ -407,7 +419,7 @@ impl GlobalStreamManager { definition, mv_table_id, internal_tables, - .. + create_type, }: CreateStreamingJobContext, ) -> MetaResult<()> { // Register to compaction group beforehand. @@ -424,8 +436,10 @@ impl GlobalStreamManager { table_fragments.internal_table_ids().len() + mv_table_id.map_or(0, |_| 1) ); revert_funcs.push(Box::pin(async move { - if let Err(e) = hummock_manager_ref.unregister_table_ids(®istered_table_ids).await { - tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e); + if create_type == CreateType::Foreground { + if let Err(e) = hummock_manager_ref.unregister_table_ids(®istered_table_ids).await { + tracing::warn!("Failed to unregister compaction group for {:#?}. They will be cleaned up on node restart. {:#?}", registered_table_ids, e); + } } })); @@ -452,9 +466,11 @@ impl GlobalStreamManager { }) .await { - self.fragment_manager - .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(table_id))) - .await?; + if create_type == CreateType::Foreground { + self.fragment_manager + .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(table_id))) + .await?; + } return Err(err); } @@ -553,13 +569,18 @@ impl GlobalStreamManager { } /// Cancel streaming jobs and return the canceled table ids. + /// 1. Send cancel message to stream jobs (via `cancel_jobs`). + /// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`). + /// + /// Cleanup of their state will be cleaned up after the `CancelStreamJob` command succeeds, + /// by the barrier manager for both of them. pub async fn cancel_streaming_jobs(&self, table_ids: Vec) -> Vec { if table_ids.is_empty() { return vec![]; } let _reschedule_job_lock = self.reschedule_lock.read().await; - let receivers = self.creating_job_info.cancel_jobs(table_ids).await; + let (receivers, recovered_job_ids) = self.creating_job_info.cancel_jobs(table_ids).await; let futures = receivers.into_iter().map(|(id, receiver)| async move { if receiver.await.is_ok() { @@ -570,7 +591,35 @@ impl GlobalStreamManager { None } }); - join_all(futures).await.into_iter().flatten().collect_vec() + let mut cancelled_ids = join_all(futures).await.into_iter().flatten().collect_vec(); + + // NOTE(kwannoel): For recovered stream jobs, we can directly cancel them by running the barrier command, + // since Barrier manager manages the recovered stream jobs. + let futures = recovered_job_ids.into_iter().map(|id| async move { + let result: MetaResult<()> = try { + let fragment = self + .fragment_manager + .select_table_fragments_by_table_id(&id) + .await?; + self.barrier_scheduler + .run_command(Command::CancelStreamingJob(fragment)) + .await?; + }; + match result { + Ok(_) => { + tracing::info!("cancelled recovered streaming job {id}"); + Some(id) + }, + Err(_) => { + tracing::error!("failed to cancel recovered streaming job {id}, does {id} correspond to any jobs in `SHOW JOBS`?"); + None + }, + } + }); + let cancelled_recovered_ids = join_all(futures).await.into_iter().flatten().collect_vec(); + + cancelled_ids.extend(cancelled_recovered_ids); + cancelled_ids } } @@ -896,7 +945,7 @@ mod tests { }; self.catalog_manager - .start_create_table_procedure(&table) + .start_create_table_procedure(&table, vec![]) .await?; self.global_stream_manager .create_streaming_job(table_fragments, ctx) diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs new file mode 100644 index 0000000000000..1fd5c90e59e4b --- /dev/null +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -0,0 +1,96 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use anyhow::Result; +use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts}; +use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; + +async fn kill_cn_and_wait_recover(cluster: &Cluster) { + // Kill it again + for _ in 0..5 { + cluster + .kill_node(&KillOpts { + kill_rate: 1.0, + kill_meta: false, + kill_frontend: false, + kill_compute: true, + kill_compactor: false, + restart_delay_secs: 1, + }) + .await; + sleep(Duration::from_secs(2)).await; + } + sleep(Duration::from_secs(10)).await; +} + +async fn kill_and_wait_recover(cluster: &Cluster) { + // Kill it again + for _ in 0..5 { + sleep(Duration::from_secs(2)).await; + cluster.kill_node(&KillOpts::ALL).await; + } + sleep(Duration::from_secs(20)).await; +} + +#[tokio::test] +async fn test_background_mv_barrier_recovery() -> Result<()> { + let mut cluster = Cluster::start(Configuration::for_backfill()).await?; + let mut session = cluster.start_session(); + + session.run("CREATE TABLE t1 (v1 int);").await?; + session + .run("INSERT INTO t1 select * from generate_series(1, 400000)") + .await?; + session.run("flush").await?; + session.run("SET BACKGROUND_DDL=true;").await?; + session + .run("create materialized view m1 as select * from t1;") + .await?; + + kill_cn_and_wait_recover(&cluster).await; + + // Send some upstream updates. + cluster + .run("INSERT INTO t1 select * from generate_series(1, 100000);") + .await?; + cluster.run("flush;").await?; + + kill_cn_and_wait_recover(&cluster).await; + + kill_and_wait_recover(&cluster).await; + + // Send some upstream updates. + cluster + .run("INSERT INTO t1 select * from generate_series(1, 100000);") + .await?; + cluster.run("flush;").await?; + + // Now just wait for it to complete. + + sleep(Duration::from_secs(10)).await; + + // Make sure after finished, we should have 5000_000 rows. + session + .run("SELECT COUNT(v1) FROM m1") + .await? + .assert_result_eq("600000"); + + session.run("DROP MATERIALIZED VIEW m1").await?; + session.run("DROP TABLE t1").await?; + + Ok(()) +} diff --git a/src/tests/simulation/tests/integration_tests/recovery/mod.rs b/src/tests/simulation/tests/integration_tests/recovery/mod.rs index 565487e8d7dbd..2430daad760a1 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/mod.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/mod.rs @@ -13,5 +13,6 @@ // limitations under the License. mod backfill; +mod background_ddl; mod nexmark_recovery; mod pause_on_bootstrap;