Skip to content

Commit

Permalink
fix: add mview_definition in scaled actors (#19784)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Dec 16, 2024
1 parent 0b1c0e7 commit 492e2d7
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 9 deletions.
88 changes: 83 additions & 5 deletions src/meta/src/controller/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::actor_dispatcher::DispatcherType;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob};
use risingwave_meta_model::prelude::{
Actor, ActorDispatcher, Fragment, Sink, Source, StreamingJob, Table,
};
use risingwave_meta_model::{
actor, actor_dispatcher, fragment, streaming_job, ActorId, ActorMapping, ActorUpstreamActors,
ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap,
actor, actor_dispatcher, fragment, sink, source, streaming_job, table, ActorId, ActorMapping,
ActorUpstreamActors, ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap,
};
use risingwave_meta_model_migration::{
Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement,
Expand Down Expand Up @@ -165,7 +167,7 @@ pub struct RescheduleWorkingSet {
pub fragment_downstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,
pub fragment_upstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,

pub related_jobs: HashMap<ObjectId, streaming_job::Model>,
pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}

async fn resolve_no_shuffle_query<C>(
Expand All @@ -192,6 +194,67 @@ where
Ok(result)
}

async fn resolve_streaming_job_definition<C>(
txn: &C,
job_ids: &HashSet<ObjectId>,
) -> MetaResult<HashMap<ObjectId, String>>
where
C: ConnectionTrait,
{
let job_ids = job_ids.iter().cloned().collect_vec();

// including table, materialized view, index
let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
.select_only()
.columns([
table::Column::TableId,
#[cfg(not(debug_assertions))]
table::Column::Name,
#[cfg(debug_assertions)]
table::Column::Definition,
])
.filter(table::Column::TableId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
.select_only()
.columns([
sink::Column::SinkId,
#[cfg(not(debug_assertions))]
sink::Column::Name,
#[cfg(debug_assertions)]
sink::Column::Definition,
])
.filter(sink::Column::SinkId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let source_definitions: Vec<(ObjectId, String)> = Source::find()
.select_only()
.columns([
source::Column::SourceId,
#[cfg(not(debug_assertions))]
source::Column::Name,
#[cfg(debug_assertions)]
source::Column::Definition,
])
.filter(source::Column::SourceId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let definitions: HashMap<ObjectId, String> = common_job_definitions
.into_iter()
.chain(sink_definitions.into_iter())
.chain(source_definitions.into_iter())
.collect();

Ok(definitions)
}

impl CatalogController {
pub async fn resolve_working_set_for_reschedule_fragments(
&self,
Expand Down Expand Up @@ -339,14 +402,29 @@ impl CatalogController {
let related_job_ids: HashSet<_> =
fragments.values().map(|fragment| fragment.job_id).collect();

let related_job_definitions =
resolve_streaming_job_definition(txn, &related_job_ids).await?;

let related_jobs = StreamingJob::find()
.filter(streaming_job::Column::JobId.is_in(related_job_ids))
.all(txn)
.await?;

let related_jobs = related_jobs
.into_iter()
.map(|job| (job.job_id, job))
.map(|job| {
let job_id = job.job_id;
(
job_id,
(
job,
related_job_definitions
.get(&job_id)
.cloned()
.unwrap_or("".to_owned()),
),
)
})
.collect();

Ok(RescheduleWorkingSet {
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ impl ScaleController {
vnode_bitmap,
} = actors.first().unwrap().clone();

let (related_job, job_definition) =
related_jobs.get(&job_id).expect("job not found");

let fragment = CustomFragmentInfo {
fragment_id: fragment_id as _,
fragment_type_mask: fragment_type_mask as _,
Expand All @@ -603,8 +606,7 @@ impl ScaleController {
dispatcher,
upstream_actor_id,
vnode_bitmap: vnode_bitmap.map(|b| b.to_protobuf()),
// todo, we need to fill this part
mview_definition: "".to_owned(),
mview_definition: job_definition.to_owned(),
expr_context: expr_contexts
.get(&actor_id)
.cloned()
Expand All @@ -617,8 +619,6 @@ impl ScaleController {

fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

let related_job = related_jobs.get(&job_id).expect("job not found");

fragment_state.insert(
fragment_id,
table_fragments::PbState::from(related_job.job_status),
Expand Down

0 comments on commit 492e2d7

Please sign in to comment.