diff --git a/src/meta/src/controller/scale.rs b/src/meta/src/controller/scale.rs
index 13fa188759cba..a14ad3bc67377 100644
--- a/src/meta/src/controller/scale.rs
+++ b/src/meta/src/controller/scale.rs
@@ -166,7 +166,7 @@ pub struct RescheduleWorkingSet {
     pub fragment_downstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,
     pub fragment_upstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,
 
-    pub related_jobs: HashMap<ObjectId, streaming_job::Model>,
+    pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
 }
 
 async fn resolve_no_shuffle_query<C>(
@@ -193,6 +193,67 @@ where
     Ok(result)
 }
 
+async fn resolve_streaming_job_definition<C>(
+    txn: &C,
+    job_ids: &HashSet<ObjectId>,
+) -> MetaResult<HashMap<ObjectId, String>>
+where
+    C: ConnectionTrait,
+{
+    let job_ids = job_ids.iter().cloned().collect_vec();
+
+    // including table, materialized view, index
+    let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
+        .select_only()
+        .columns([
+            table::Column::TableId,
+            #[cfg(not(debug_assertions))]
+            table::Column::Name,
+            #[cfg(debug_assertions)]
+            table::Column::Definition,
+        ])
+        .filter(table::Column::TableId.is_in(job_ids.clone()))
+        .into_tuple()
+        .all(txn)
+        .await?;
+
+    let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
+        .select_only()
+        .columns([
+            sink::Column::SinkId,
+            #[cfg(not(debug_assertions))]
+            sink::Column::Name,
+            #[cfg(debug_assertions)]
+            sink::Column::Definition,
+        ])
+        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
+        .into_tuple()
+        .all(txn)
+        .await?;
+
+    let source_definitions: Vec<(ObjectId, String)> = Source::find()
+        .select_only()
+        .columns([
+            source::Column::SourceId,
+            #[cfg(not(debug_assertions))]
+            source::Column::Name,
+            #[cfg(debug_assertions)]
+            source::Column::Definition,
+        ])
+        .filter(source::Column::SourceId.is_in(job_ids.clone()))
+        .into_tuple()
+        .all(txn)
+        .await?;
+
+    let definitions: HashMap<ObjectId, String> = common_job_definitions
+        .into_iter()
+        .chain(sink_definitions.into_iter())
+        .chain(source_definitions.into_iter())
+        .collect();
+
+    Ok(definitions)
+}
+
 impl CatalogController {
     pub async fn resolve_working_set_for_reschedule_fragments(
         &self,
@@ -340,6 +401,9 @@ impl CatalogController {
         let related_job_ids: HashSet<_> =
            fragments.values().map(|fragment| fragment.job_id).collect();
 
+        let related_job_definitions =
+            resolve_streaming_job_definition(txn, &related_job_ids).await?;
+
         let related_jobs = StreamingJob::find()
             .filter(streaming_job::Column::JobId.is_in(related_job_ids))
             .all(txn)
@@ -347,7 +411,19 @@
 
         let related_jobs = related_jobs
             .into_iter()
-            .map(|job| (job.job_id, job))
+            .map(|job| {
+                let job_id = job.job_id;
+                (
+                    job_id,
+                    (
+                        job,
+                        related_job_definitions
+                            .get(&job_id)
+                            .cloned()
+                            .unwrap_or("".to_owned()),
+                    ),
+                )
+            })
             .collect();
 
         Ok(RescheduleWorkingSet {
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 9982a2f5a4781..9e5caa45cd8c0 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -649,6 +649,9 @@ impl ScaleController {
                 vnode_bitmap,
             } = actors.first().unwrap().clone();
 
+            let (related_job, job_definition) =
+                related_jobs.get(&job_id).expect("job not found");
+
             let fragment = CustomFragmentInfo {
                 fragment_id: fragment_id as _,
                 fragment_type_mask: fragment_type_mask as _,
@@ -662,8 +665,7 @@
                     dispatcher,
                     upstream_actor_id,
                     vnode_bitmap,
-                    // todo, we need to fill this part
-                    mview_definition: "".to_string(),
+                    mview_definition: job_definition.to_owned(),
                     expr_context: expr_contexts
                         .get(&actor_id)
                         .cloned()
@@ -676,8 +678,6 @@
 
             fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));
 
-            let related_job = related_jobs.get(&job_id).expect("job not found");
-
             fragment_state.insert(
                 fragment_id,
                 table_fragments::PbState::from(related_job.job_status),
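
For reviewers, a minimal self-contained sketch of the pattern this diff introduces: job rows and job definitions are fetched separately, zipped into a `HashMap<job_id, (job, definition)>`, and the definition falls back to an empty string when no definition row matched. The `Job`, `JobId`, and `build_related_jobs` names below are illustrative stand-ins, not RisingWave's actual SeaORM entities or meta-service APIs.

```rust
use std::collections::HashMap;

type JobId = i32;

// Illustrative stand-in for `streaming_job::Model`; the real entity has more fields.
#[derive(Debug, Clone)]
struct Job {
    job_id: JobId,
    job_status: &'static str,
}

/// Combine job rows with separately-resolved definitions, defaulting to "" when a job
/// has no definition row (mirrors `unwrap_or("".to_owned())` in the diff).
fn build_related_jobs(
    jobs: Vec<Job>,
    definitions: &HashMap<JobId, String>,
) -> HashMap<JobId, (Job, String)> {
    jobs.into_iter()
        .map(|job| {
            let job_id = job.job_id;
            let definition = definitions.get(&job_id).cloned().unwrap_or_default();
            (job_id, (job, definition))
        })
        .collect()
}

fn main() {
    let jobs = vec![
        Job { job_id: 1, job_status: "CREATED" },
        Job { job_id: 2, job_status: "CREATING" },
    ];
    let mut definitions = HashMap::new();
    definitions.insert(1, "CREATE MATERIALIZED VIEW mv AS SELECT 1".to_owned());

    let related_jobs = build_related_jobs(jobs, &definitions);

    // Consumer side, analogous to `stream/scale.rs`: destructure the tuple and use the
    // definition where an empty placeholder string was previously hard-coded.
    let (related_job, job_definition) = related_jobs.get(&1).expect("job not found");
    println!("{} -> {:?}", job_definition, related_job.job_status);

    // Job 2 had no definition row, so it falls back to an empty string.
    assert_eq!(related_jobs.get(&2).unwrap().1, "");
}
```

One behavioral note from the diff as written: the `Definition` column is only selected under `debug_assertions`; release builds select the object's `Name` column instead, so in release mode the string carried per job (and therefore `mview_definition`) is the object name rather than the full SQL definition.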