Skip to content

Commit

Permalink
feat: add rw_internal_table_info to identity which streaming job the …
Browse files Browse the repository at this point in the history
…internal table belongs (#19642)
  • Loading branch information
yezizp2012 authored Dec 2, 2024
1 parent 3960994 commit 845ed68
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 2 deletions.
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ message Table {
// The information used by webhook source to validate the incoming data.
optional WebhookSourceInfo webhook_info = 41;

// This field stores the job ID for internal tables.
optional uint32 job_id = 42;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ mod rw_worker_nodes;
mod rw_actor_id_to_ddl;
mod rw_actor_splits;
mod rw_fragment_id_to_ddl;
mod rw_internal_table_info;
mod rw_worker_actor_count;
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

/// Provides a mapping from `actor_id` to its ddl info.
/// Provides a mapping from `fragment_id` to its ddl info.
#[system_catalog(
view,
"rw_catalog.rw_fragment_id_to_ddl",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

#[system_catalog(
view,
"rw_catalog.rw_internal_table_info",
"WITH all_streaming_jobs AS (
SELECT id, name, 'table' as job_type, schema_id, owner FROM rw_tables
UNION ALL
SELECT id, name, 'materialized view' as job_type, schema_id, owner FROM rw_materialized_views
UNION ALL
SELECT id, name, 'sink' as job_type, schema_id, owner FROM rw_sinks
UNION ALL
SELECT id, name, 'index' as job_type, schema_id, owner FROM rw_indexes
UNION ALL
SELECT id, name, 'source' as job_type, schema_id, owner FROM rw_sources WHERE is_shared = true
)
SELECT i.id,
i.name,
j.id as job_id,
j.name as job_name,
j.job_type,
s.name as schema_name,
u.name as owner
FROM rw_catalog.rw_internal_tables i
JOIN all_streaming_jobs j ON i.job_id = j.id
JOIN rw_catalog.rw_schemas s ON j.schema_id = s.id
JOIN rw_catalog.rw_users u ON j.owner = u.id"
)]
#[derive(Fields)]
struct RwInternalTableInfo {
#[primary_key]
id: i32,
name: String,
job_id: i32,
job_name: String,
job_type: String,
schema_name: String,
owner: String,
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct RwInternalTable {
id: i32,
name: String,
schema_id: i32,
job_id: i32,
owner: i32,
definition: String,
acl: Vec<String>,
Expand All @@ -48,6 +49,7 @@ fn read_rw_internal_tables(reader: &SysCatalogReaderImpl) -> Result<Vec<RwIntern
id: table.id.table_id as i32,
name: table.name().into(),
schema_id: schema.id() as i32,
job_id: table.job_id.unwrap().table_id as i32,
owner: table.owner as i32,
definition: table.create_sql(),
acl: get_acl_items(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use risingwave_frontend_macro::system_catalog;
SELECT id, name, 'sink' as relation_type FROM rw_sinks
UNION ALL
SELECT id, name, 'index' as relation_type FROM rw_indexes
UNION ALL
SELECT id, name, 'source' as relation_type FROM rw_sources WHERE is_shared = true
)
SELECT
job.id,
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ pub struct TableCatalog {
pub vnode_count: VnodeCount,

pub webhook_info: Option<PbWebhookSourceInfo>,

pub job_id: Option<TableId>,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -467,6 +469,7 @@ impl TableCatalog {
cdc_table_id: self.cdc_table_id.clone(),
maybe_vnode_count: self.vnode_count.to_protobuf(),
webhook_info: self.webhook_info.clone(),
job_id: self.job_id.map(|id| id.table_id),
}
}

Expand Down Expand Up @@ -660,6 +663,7 @@ impl From<PbTable> for TableCatalog {
cdc_table_id: tb.cdc_table_id,
vnode_count,
webhook_info: tb.webhook_info,
job_id: tb.job_id.map(TableId::from),
}
}
}
Expand Down Expand Up @@ -752,6 +756,7 @@ mod tests {
cdc_table_id: None,
maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
webhook_info: None,
job_id: None,
}
.into();

Expand Down Expand Up @@ -820,6 +825,7 @@ mod tests {
cdc_table_id: None,
vnode_count: VnodeCount::set(233),
webhook_info: None,
job_id: None,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl StreamMaterialize {
cdc_table_id: None,
vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
webhook_info,
job_id: None,
})
}

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl TableCatalogBuilder {
cdc_table_id: None,
vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
webhook_info: None,
job_id: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ pub(crate) mod tests {
cdc_table_id: None,
vnode_count: VnodeCount::set(vnode_count),
webhook_info: None,
job_id: None,
};
let batch_plan_node: PlanRef = LogicalScan::create(
"".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/meta/model/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl From<PbTable> for ActiveModel {
name: Set(pb_table.name),
optional_associated_source_id,
table_type: Set(table_type.into()),
belongs_to_job_id: Set(None),
belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)),
columns: Set(pb_table.columns.into()),
pk: Set(pb_table.pk.into()),
distribution_key: Set(pb_table.distribution_key.into()),
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ impl From<ObjectModel<table::Model>> for PbTable {
cdc_table_id: value.0.cdc_table_id,
maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
job_id: value.0.belongs_to_job_id.map(|id| id as _),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ impl CatalogController {
.oid;
table_id_map.insert(table.id, table_id as u32);
table.id = table_id as _;
table.job_id = Some(job_id as _);

let table_model = table::ActiveModel {
table_id: Set(table_id as _),
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/compaction_catalog_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ mod tests {
cdc_table_id: None,
maybe_vnode_count: None,
webhook_info: None,
job_id: None,
}
}

Expand Down

0 comments on commit 845ed68

Please sign in to comment.