From 492e2d7d03ccfb530692649503a3e74f8624844c Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 16 Dec 2024 16:47:13 +0800 Subject: [PATCH] fix: add mview_definition in scaled actors (#19784) Signed-off-by: Shanicky Chen --- src/meta/src/controller/scale.rs | 88 ++++++++++++++++++++++++++++++-- src/meta/src/stream/scale.rs | 8 +-- 2 files changed, 87 insertions(+), 9 deletions(-) diff --git a/src/meta/src/controller/scale.rs b/src/meta/src/controller/scale.rs index 65dd58ff1d34e..84c743e437d17 100644 --- a/src/meta/src/controller/scale.rs +++ b/src/meta/src/controller/scale.rs @@ -22,10 +22,12 @@ use risingwave_connector::source::{SplitImpl, SplitMetaData}; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::actor_dispatcher::DispatcherType; use risingwave_meta_model::fragment::DistributionType; -use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob}; +use risingwave_meta_model::prelude::{ + Actor, ActorDispatcher, Fragment, Sink, Source, StreamingJob, Table, +}; use risingwave_meta_model::{ - actor, actor_dispatcher, fragment, streaming_job, ActorId, ActorMapping, ActorUpstreamActors, - ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap, + actor, actor_dispatcher, fragment, sink, source, streaming_job, table, ActorId, ActorMapping, + ActorUpstreamActors, ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap, }; use risingwave_meta_model_migration::{ Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement, @@ -165,7 +167,7 @@ pub struct RescheduleWorkingSet { pub fragment_downstreams: HashMap>, pub fragment_upstreams: HashMap>, - pub related_jobs: HashMap, + pub related_jobs: HashMap, } async fn resolve_no_shuffle_query( @@ -192,6 +194,67 @@ where Ok(result) } +async fn resolve_streaming_job_definition( + txn: &C, + job_ids: &HashSet, +) -> MetaResult> +where + C: ConnectionTrait, +{ + let job_ids = job_ids.iter().cloned().collect_vec(); + + // including table, materialized view, index + let common_job_definitions: Vec<(ObjectId, String)> = Table::find() + .select_only() + .columns([ + table::Column::TableId, + #[cfg(not(debug_assertions))] + table::Column::Name, + #[cfg(debug_assertions)] + table::Column::Definition, + ]) + .filter(table::Column::TableId.is_in(job_ids.clone())) + .into_tuple() + .all(txn) + .await?; + + let sink_definitions: Vec<(ObjectId, String)> = Sink::find() + .select_only() + .columns([ + sink::Column::SinkId, + #[cfg(not(debug_assertions))] + sink::Column::Name, + #[cfg(debug_assertions)] + sink::Column::Definition, + ]) + .filter(sink::Column::SinkId.is_in(job_ids.clone())) + .into_tuple() + .all(txn) + .await?; + + let source_definitions: Vec<(ObjectId, String)> = Source::find() + .select_only() + .columns([ + source::Column::SourceId, + #[cfg(not(debug_assertions))] + source::Column::Name, + #[cfg(debug_assertions)] + source::Column::Definition, + ]) + .filter(source::Column::SourceId.is_in(job_ids.clone())) + .into_tuple() + .all(txn) + .await?; + + let definitions: HashMap = common_job_definitions + .into_iter() + .chain(sink_definitions.into_iter()) + .chain(source_definitions.into_iter()) + .collect(); + + Ok(definitions) +} + impl CatalogController { pub async fn resolve_working_set_for_reschedule_fragments( &self, @@ -339,6 +402,9 @@ impl CatalogController { let related_job_ids: HashSet<_> = fragments.values().map(|fragment| fragment.job_id).collect(); + let related_job_definitions = + resolve_streaming_job_definition(txn, &related_job_ids).await?; + let related_jobs = StreamingJob::find() .filter(streaming_job::Column::JobId.is_in(related_job_ids)) .all(txn) @@ -346,7 +412,19 @@ impl CatalogController { let related_jobs = related_jobs .into_iter() - .map(|job| (job.job_id, job)) + .map(|job| { + let job_id = job.job_id; + ( + job_id, + ( + job, + related_job_definitions + .get(&job_id) + .cloned() + .unwrap_or("".to_owned()), + ), + ) + }) .collect(); Ok(RescheduleWorkingSet { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 2da8c0fae3e1b..578ee101d0f27 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -590,6 +590,9 @@ impl ScaleController { vnode_bitmap, } = actors.first().unwrap().clone(); + let (related_job, job_definition) = + related_jobs.get(&job_id).expect("job not found"); + let fragment = CustomFragmentInfo { fragment_id: fragment_id as _, fragment_type_mask: fragment_type_mask as _, @@ -603,8 +606,7 @@ impl ScaleController { dispatcher, upstream_actor_id, vnode_bitmap: vnode_bitmap.map(|b| b.to_protobuf()), - // todo, we need to fill this part - mview_definition: "".to_owned(), + mview_definition: job_definition.to_owned(), expr_context: expr_contexts .get(&actor_id) .cloned() @@ -617,8 +619,6 @@ impl ScaleController { fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32)); - let related_job = related_jobs.get(&job_id).expect("job not found"); - fragment_state.insert( fragment_id, table_fragments::PbState::from(related_job.job_status),