From 903daed19c544a1f910b0a7789cc80b260419d34 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 23 Oct 2023 16:57:33 +0800 Subject: [PATCH] feat: support alter relation name in catalog controller (#12977) --- src/common/src/util/epoch.rs | 4 + src/meta/src/controller/catalog.rs | 161 ++++++- src/meta/src/controller/mod.rs | 182 +++++++- src/meta/src/controller/rename.rs | 430 ++++++++++++++++++ src/meta/src/controller/utils.rs | 25 +- src/meta/src/manager/catalog/utils.rs | 2 +- src/meta/src/model_v2/connection.rs | 12 +- src/meta/src/model_v2/index.rs | 7 +- .../migration/src/m20230908_072257_init.rs | 50 +- src/meta/src/model_v2/mod.rs | 79 +++- src/meta/src/model_v2/sink.rs | 28 +- src/meta/src/model_v2/source.rs | 20 +- src/meta/src/model_v2/table.rs | 57 ++- src/meta/src/model_v2/view.rs | 4 +- 14 files changed, 969 insertions(+), 92 deletions(-) create mode 100644 src/meta/src/controller/rename.rs diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index 86ed158c2e206..4d57c97b054b3 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -73,6 +73,10 @@ impl Epoch { Epoch(time << EPOCH_PHYSICAL_SHIFT_BITS) } + pub fn from_unix_millis(mi: u64) -> Self { + Epoch((mi - UNIX_RISINGWAVE_DATE_SEC * 1000) << EPOCH_PHYSICAL_SHIFT_BITS) + } + pub fn physical_now() -> u64 { UNIX_RISINGWAVE_DATE_EPOCH .elapsed() diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index c0cfcf3baba59..cb37307384aa2 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -15,11 +15,12 @@ use std::iter; use itertools::Itertools; +use risingwave_common::bail; use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; use risingwave_pb::catalog::{ PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, }; -use risingwave_pb::meta::relation::{PbRelationInfo, RelationInfo}; +use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, }; @@ -30,19 +31,21 @@ use sea_orm::{ }; use tokio::sync::RwLock; +use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs}; use crate::controller::utils::{ check_connection_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id, - ensure_object_not_refer, ensure_schema_empty, ensure_user_id, list_used_by, PartialObject, + ensure_object_not_refer, ensure_schema_empty, ensure_user_id, get_referring_objects, + get_referring_objects_cascade, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{MetaSrvEnv, NotificationVersion}; -use crate::model_v2::connection::PrivateLinkService; use crate::model_v2::object::ObjectType; use crate::model_v2::prelude::*; use crate::model_v2::{ - connection, database, function, index, object, object_dependency, schema, table, view, - ConnectionId, DatabaseId, FunctionId, ObjectId, SchemaId, SourceId, TableId, UserId, + connection, database, function, index, object, object_dependency, schema, sink, source, table, + view, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService, SchemaId, SourceId, + TableId, UserId, }; use crate::rpc::ddl_controller::DropMode; use crate::{MetaError, MetaResult}; @@ -503,7 +506,7 @@ impl CatalogController { assert_eq!(obj.obj_type, object_type); let mut to_drop_objects = match drop_mode { - DropMode::Cascade => list_used_by(object_id, 
&txn).await?, + DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?, DropMode::Restrict => { ensure_object_not_refer(object_type, object_id, &txn).await?; vec![] @@ -589,7 +592,7 @@ impl CatalogController { .into_iter() .map(|obj| match obj.obj_type { ObjectType::Table => PbRelation { - relation_info: Some(RelationInfo::Table(PbTable { + relation_info: Some(PbRelationInfo::Table(PbTable { id: obj.oid, schema_id: obj.schema_id.unwrap(), database_id: obj.database_id.unwrap(), @@ -597,7 +600,7 @@ impl CatalogController { })), }, ObjectType::Source => PbRelation { - relation_info: Some(RelationInfo::Source(PbSource { + relation_info: Some(PbRelationInfo::Source(PbSource { id: obj.oid, schema_id: obj.schema_id.unwrap(), database_id: obj.database_id.unwrap(), @@ -605,7 +608,7 @@ impl CatalogController { })), }, ObjectType::Sink => PbRelation { - relation_info: Some(RelationInfo::Sink(PbSink { + relation_info: Some(PbRelationInfo::Sink(PbSink { id: obj.oid, schema_id: obj.schema_id.unwrap(), database_id: obj.database_id.unwrap(), @@ -613,7 +616,7 @@ impl CatalogController { })), }, ObjectType::View => PbRelation { - relation_info: Some(RelationInfo::View(PbView { + relation_info: Some(PbRelationInfo::View(PbView { id: obj.oid, schema_id: obj.schema_id.unwrap(), database_id: obj.database_id.unwrap(), @@ -621,7 +624,7 @@ impl CatalogController { })), }, ObjectType::Index => PbRelation { - relation_info: Some(RelationInfo::Index(PbIndex { + relation_info: Some(PbRelationInfo::Index(PbIndex { id: obj.oid, schema_id: obj.schema_id.unwrap(), database_id: obj.database_id.unwrap(), @@ -647,6 +650,142 @@ impl CatalogController { version, )) } + + pub async fn alter_relation_name( + &self, + object_type: ObjectType, + object_id: ObjectId, + object_name: &str, + ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let obj: PartialObject = Object::find_by_id(object_id) + .into_partial_model() + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?; + assert_eq!(obj.obj_type, object_type); + check_relation_name_duplicate( + object_name, + obj.database_id.unwrap(), + obj.schema_id.unwrap(), + &txn, + ) + .await?; + + let mut to_update_relations = vec![]; + // rename relation. + macro_rules! rename_relation { + ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ + let (mut relation, obj) = $entity::find_by_id($object_id) + .find_also_related(Object) + .one(&txn) + .await? 
+ .unwrap(); + let old_name = relation.name.clone(); + relation.name = object_name.into(); + relation.definition = alter_relation_rename(&relation.definition, object_name); + let active_model = $table::ActiveModel { + $identity: ActiveValue::Set(relation.$identity), + name: ActiveValue::Set(object_name.into()), + definition: ActiveValue::Set(relation.definition.clone()), + ..Default::default() + }; + active_model.update(&txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::$entity( + ObjectModel(relation, obj.unwrap()).into(), + )), + }); + old_name + }}; + } + + let old_name = match object_type { + ObjectType::Table => rename_relation!(Table, table, table_id, object_id), + ObjectType::Source => rename_relation!(Source, source, source_id, object_id), + ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id), + ObjectType::View => rename_relation!(View, view, view_id, object_id), + ObjectType::Index => { + let (mut index, obj) = Index::find_by_id(object_id) + .find_also_related(Object) + .one(&txn) + .await? + .unwrap(); + index.name = object_name.into(); + let index_table_id = index.index_table_id; + + // the name of index and its associated table is the same. + let active_model = index::ActiveModel { + index_id: ActiveValue::Set(index.index_id), + name: ActiveValue::Set(object_name.into()), + ..Default::default() + }; + active_model.update(&txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Index( + ObjectModel(index, obj.unwrap()).into(), + )), + }); + rename_relation!(Table, table, table_id, index_table_id) + } + _ => unreachable!("only relation name can be altered."), + }; + + // rename referring relation name. + macro_rules! rename_relation_ref { + ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ + let (mut relation, obj) = $entity::find_by_id($object_id) + .find_also_related(Object) + .one(&txn) + .await? 
+ .unwrap(); + relation.definition = + alter_relation_rename_refs(&relation.definition, &old_name, object_name); + let active_model = $table::ActiveModel { + $identity: ActiveValue::Set(relation.$identity), + definition: ActiveValue::Set(relation.definition.clone()), + ..Default::default() + }; + active_model.update(&txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::$entity( + ObjectModel(relation, obj.unwrap()).into(), + )), + }); + }}; + } + let objs = get_referring_objects(object_id, &txn).await?; + for obj in objs { + match obj.obj_type { + ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid), + ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid), + ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid), + ObjectType::Index => { + let index_table_id: Option = Index::find_by_id(obj.oid) + .select_only() + .column(index::Column::IndexTableId) + .into_tuple() + .one(&txn) + .await?; + rename_relation_ref!(Table, table, table_id, index_table_id.unwrap()); + } + _ => bail!("only table, sink, view and index depend on other objects."), + } + } + txn.commit().await?; + + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::RelationGroup(PbRelationGroup { + relations: to_update_relations, + }), + ) + .await; + + Ok(version) + } } #[cfg(test)] diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 5b0ff4ab99bef..07793e30a17fe 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -13,16 +13,23 @@ // limitations under the License. use anyhow::anyhow; +use risingwave_common::util::epoch::Epoch; use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo; -use risingwave_pb::catalog::{PbConnection, PbDatabase, PbSchema}; +use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; +use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableType}; +use risingwave_pb::catalog::{ + PbConnection, PbCreateType, PbDatabase, PbHandleConflictBehavior, PbIndex, PbSchema, PbSink, + PbSinkType, PbSource, PbStreamJobStatus, PbTable, PbView, +}; use sea_orm::{ActiveValue, DatabaseConnection, ModelTrait}; -use crate::model_v2::{connection, database, object, schema}; +use crate::model_v2::{connection, database, index, object, schema, sink, source, table, view}; use crate::MetaError; #[allow(dead_code)] pub mod catalog; pub mod cluster; +pub mod rename; pub mod system_param; pub mod utils; @@ -61,9 +68,9 @@ pub struct ObjectModel(M, object::Model); impl From> for PbDatabase { fn from(value: ObjectModel) -> Self { Self { - id: value.0.database_id as _, + id: value.0.database_id, name: value.0.name, - owner: value.1.owner_id as _, + owner: value.1.owner_id, } } } @@ -71,7 +78,7 @@ impl From> for PbDatabase { impl From for database::ActiveModel { fn from(db: PbDatabase) -> Self { Self { - database_id: ActiveValue::Set(db.id as _), + database_id: ActiveValue::Set(db.id), name: ActiveValue::Set(db.name), } } @@ -80,7 +87,7 @@ impl From for database::ActiveModel { impl From for schema::ActiveModel { fn from(schema: PbSchema) -> Self { Self { - schema_id: ActiveValue::Set(schema.id as _), + schema_id: ActiveValue::Set(schema.id), name: ActiveValue::Set(schema.name), } } @@ -89,10 +96,159 @@ impl From for schema::ActiveModel { impl From> for PbSchema { fn from(value: ObjectModel) -> Self { Self { - id: value.0.schema_id as _, + id: value.0.schema_id, name: value.0.name, - database_id: 
value.1.database_id.unwrap() as _, - owner: value.1.owner_id as _, + database_id: value.1.database_id.unwrap(), + owner: value.1.owner_id, + } + } +} + +impl From> for PbTable { + fn from(value: ObjectModel) -> Self { + Self { + id: value.0.table_id, + schema_id: value.1.schema_id.unwrap(), + database_id: value.1.database_id.unwrap(), + name: value.0.name, + columns: value.0.columns.0, + pk: value.0.pk.0, + dependent_relations: vec![], // todo: deprecate it. + table_type: PbTableType::from(value.0.table_type) as _, + distribution_key: value.0.distribution_key.0, + stream_key: value.0.stream_key.0, + append_only: value.0.append_only, + owner: value.1.owner_id, + properties: value.0.properties.0, + fragment_id: value.0.fragment_id as u32, + vnode_col_index: value.0.vnode_col_index, + row_id_index: value.0.row_id_index, + value_indices: value.0.value_indices.0, + definition: value.0.definition, + handle_pk_conflict_behavior: PbHandleConflictBehavior::from( + value.0.handle_pk_conflict_behavior, + ) as _, + read_prefix_len_hint: value.0.read_prefix_len_hint, + watermark_indices: value.0.watermark_indices.0, + dist_key_in_pk: value.0.dist_key_in_pk.0, + dml_fragment_id: value.0.dml_fragment_id.map(|id| id as u32), + cardinality: value.0.cardinality.map(|cardinality| cardinality.0), + initialized_at_epoch: Some( + Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + ), + created_at_epoch: Some( + Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + ), + cleaned_by_watermark: value.0.cleaned_by_watermark, + stream_job_status: PbStreamJobStatus::from(value.0.job_status) as _, + create_type: PbCreateType::from(value.0.create_type) as _, + version: Some(value.0.version.0), + optional_associated_source_id: value + .0 + .optional_associated_source_id + .map(PbOptionalAssociatedSourceId::AssociatedSourceId), + } + } +} + +impl From> for PbSource { + fn from(value: ObjectModel) -> Self { + Self { + id: value.0.source_id, + schema_id: value.1.schema_id.unwrap(), + database_id: value.1.database_id.unwrap(), + name: value.0.name, + row_id_index: value.0.row_id_index, + columns: value.0.columns.0, + pk_column_ids: value.0.pk_column_ids.0, + properties: value.0.properties.0, + owner: value.1.owner_id, + info: value.0.source_info.map(|info| info.0), + watermark_descs: value.0.watermark_descs.0, + definition: value.0.definition, + connection_id: value.0.connection_id, + // todo: using the timestamp from the database directly. + initialized_at_epoch: Some( + Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + ), + created_at_epoch: Some( + Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + ), + version: value.0.version, + optional_associated_table_id: value + .0 + .optional_associated_table_id + .map(PbOptionalAssociatedTableId::AssociatedTableId), + } + } +} + +impl From> for PbSink { + fn from(value: ObjectModel) -> Self { + Self { + id: value.0.sink_id, + schema_id: value.1.schema_id.unwrap(), + database_id: value.1.database_id.unwrap(), + name: value.0.name, + columns: value.0.columns.0, + plan_pk: value.0.plan_pk.0, + dependent_relations: vec![], // todo: deprecate it. 
+ distribution_key: value.0.distribution_key.0, + downstream_pk: value.0.downstream_pk.0, + sink_type: PbSinkType::from(value.0.sink_type) as _, + owner: value.1.owner_id, + properties: value.0.properties.0, + definition: value.0.definition, + connection_id: value.0.connection_id, + initialized_at_epoch: Some( + Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + ), + created_at_epoch: Some( + Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + ), + db_name: value.0.db_name, + sink_from_name: value.0.sink_from_name, + stream_job_status: PbStreamJobStatus::from(value.0.job_status) as _, + format_desc: value.0.sink_format_desc.map(|desc| desc.0), + } + } +} + +impl From> for PbIndex { + fn from(value: ObjectModel) -> Self { + Self { + id: value.0.index_id, + schema_id: value.1.schema_id.unwrap(), + database_id: value.1.database_id.unwrap(), + name: value.0.name, + owner: value.1.owner_id, + index_table_id: value.0.index_table_id, + primary_table_id: value.0.primary_table_id, + index_item: value.0.index_items.0, + original_columns: value.0.original_columns.0, + initialized_at_epoch: Some( + Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0, + ), + created_at_epoch: Some( + Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, + ), + stream_job_status: PbStreamJobStatus::from(value.0.job_status) as _, + } + } +} + +impl From> for PbView { + fn from(value: ObjectModel) -> Self { + Self { + id: value.0.view_id, + schema_id: value.1.schema_id.unwrap(), + database_id: value.1.database_id.unwrap(), + name: value.0.name, + owner: value.1.owner_id, + properties: value.0.properties.0, + sql: value.0.definition, + dependent_relations: vec![], // todo: deprecate it. + columns: value.0.columns.0, } } } @@ -100,11 +256,11 @@ impl From> for PbSchema { impl From> for PbConnection { fn from(value: ObjectModel) -> Self { Self { - id: value.1.oid as _, - schema_id: value.1.schema_id.unwrap() as _, - database_id: value.1.database_id.unwrap() as _, + id: value.1.oid, + schema_id: value.1.schema_id.unwrap(), + database_id: value.1.database_id.unwrap(), name: value.0.name, - owner: value.1.owner_id as _, + owner: value.1.owner_id, info: Some(PbConnectionInfo::PrivateLinkService(value.0.info.0)), } } diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs new file mode 100644 index 0000000000000..254565efb391c --- /dev/null +++ b/src/meta/src/controller/rename.rs @@ -0,0 +1,430 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
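// A minimal sketch of how the two entry points defined below behave, using the inputs
// from the unit tests at the end of this file:
//
//     alter_relation_rename("CREATE TABLE foo (a int, b int)", "bar")
//     // => "CREATE TABLE bar (a INT, b INT)"
//
//     alter_relation_rename_refs(
//         "CREATE SINK sink_t FROM foo WITH (connector = 'kafka', format = 'append_only')",
//         "foo",
//         "bar",
//     )
//     // => "CREATE SINK sink_t FROM bar WITH (connector = 'kafka', format = 'append_only')"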
+ +use itertools::Itertools; +use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_pb::expr::expr_node::RexNode; +use risingwave_pb::expr::{ExprNode, FunctionCall, UserDefinedFunction}; +use risingwave_sqlparser::ast::{ + Array, CreateSink, CreateSinkStatement, CreateSourceStatement, Distinct, Expr, Function, + FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem, SetExpr, Statement, + TableAlias, TableFactor, TableWithJoins, +}; +use risingwave_sqlparser::parser::Parser; + +/// `alter_relation_rename` renames a relation to a new name in its `Create` statement, and returns +/// the updated definition raw sql. Note that the `definition` must be a `Create` statement and the +/// `new_name` must be a valid identifier, it should be validated before calling this function. To +/// update all relations that depend on the renamed one, use `alter_relation_rename_refs`. +pub fn alter_relation_rename(definition: &str, new_name: &str) -> String { + // This happens when we try to rename a table that's created by `CREATE TABLE AS`. Remove it + // when we support `SHOW CREATE TABLE` for `CREATE TABLE AS`. + if definition.is_empty() { + tracing::warn!("found empty definition when renaming relation, ignored."); + return definition.into(); + } + let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); + let mut stmt = ast + .into_iter() + .exactly_one() + .expect("should contains only one statement"); + + match &mut stmt { + Statement::CreateTable { name, .. } + | Statement::CreateView { name, .. } + | Statement::CreateIndex { name, .. } + | Statement::CreateSource { + stmt: CreateSourceStatement { + source_name: name, .. + }, + } + | Statement::CreateSink { + stmt: CreateSinkStatement { + sink_name: name, .. + }, + } => replace_table_name(name, new_name), + _ => unreachable!(), + }; + + stmt.to_string() +} + +/// `alter_relation_rename_refs` updates all references of renamed-relation in the definition of +/// target relation's `Create` statement. +pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> String { + let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); + let mut stmt = ast + .into_iter() + .exactly_one() + .expect("should contains only one statement"); + + match &mut stmt { + Statement::CreateTable { + query: Some(query), .. + } + | Statement::CreateView { query, .. } + | Statement::Query(query) // Used by view, actually we store a query as the definition of view. + | Statement::CreateSink { + stmt: + CreateSinkStatement { + sink_from: CreateSink::AsQuery(query), + .. + }, + } => { + QueryRewriter::rewrite_query(query, from, to); + } + Statement::CreateIndex { table_name, .. } + | Statement::CreateSink { + stmt: + CreateSinkStatement { + sink_from: CreateSink::From(table_name), + .. + }, + } => replace_table_name(table_name, to), + _ => unreachable!(), + }; + stmt.to_string() +} + +/// Replace the last ident in the `table_name` with the given name, the object name is ensured to be +/// non-empty. e.g. `schema.table` or `database.schema.table`. +fn replace_table_name(table_name: &mut ObjectName, to: &str) { + let idx = table_name.0.len() - 1; + table_name.0[idx] = Ident::new_unchecked(to); +} + +/// `QueryRewriter` is a visitor that updates all references of relation named `from` to `to` in the +/// given query, which is the part of create statement of `relation`. 
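// As a concrete illustration (adapted from the tests below), renaming `foo` to `bar`
// only rewrites the FROM clause and keeps the old name as an alias:
//
//     SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM foo          -- before
//     SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM bar AS foo   -- after
//
// so qualified references like `foo.v1` in the projection keep resolving unchanged.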
+struct QueryRewriter<'a> { + from: &'a str, + to: &'a str, +} + +impl QueryRewriter<'_> { + fn rewrite_query(query: &mut Query, from: &str, to: &str) { + let rewriter = QueryRewriter { from, to }; + rewriter.visit_query(query) + } + + /// Visit the query and update all references of relation named `from` to `to`. + fn visit_query(&self, query: &mut Query) { + if let Some(with) = &mut query.with { + for cte_table in &mut with.cte_tables { + self.visit_query(&mut cte_table.query); + } + } + self.visit_set_expr(&mut query.body); + for expr in &mut query.order_by { + self.visit_expr(&mut expr.expr); + } + } + + /// Visit table factor and update all references of relation named `from` to `to`. + /// Rewrite idents(i.e. `schema.table`, `table`) that contains the old name in the + /// following pattern: + /// 1. `FROM a` to `FROM new_a AS a` + /// 2. `FROM a AS b` to `FROM new_a AS b` + /// + /// So that we DON'T have to: + /// 1. rewrite the select and expr part like `schema.table.column`, `table.column`, + /// `alias.column` etc. + /// 2. handle the case that the old name is used as alias. + /// 3. handle the case that the new name is used as alias. + fn visit_table_factor(&self, table_factor: &mut TableFactor) { + match table_factor { + TableFactor::Table { name, alias, .. } => { + let idx = name.0.len() - 1; + if name.0[idx].real_value() == self.from { + if alias.is_none() { + *alias = Some(TableAlias { + name: Ident::new_unchecked(self.from), + columns: vec![], + }); + } + name.0[idx] = Ident::new_unchecked(self.to); + } + } + TableFactor::Derived { subquery, .. } => self.visit_query(subquery), + TableFactor::TableFunction { args, .. } => { + for arg in args { + self.visit_function_args(arg); + } + } + TableFactor::NestedJoin(table_with_joins) => { + self.visit_table_with_joins(table_with_joins); + } + } + } + + /// Visit table with joins and update all references of relation named `from` to `to`. + fn visit_table_with_joins(&self, table_with_joins: &mut TableWithJoins) { + self.visit_table_factor(&mut table_with_joins.relation); + for join in &mut table_with_joins.joins { + self.visit_table_factor(&mut join.relation); + } + } + + /// Visit query body expression and update all references. + fn visit_set_expr(&self, set_expr: &mut SetExpr) { + match set_expr { + SetExpr::Select(select) => { + if let Distinct::DistinctOn(exprs) = &mut select.distinct { + for expr in exprs { + self.visit_expr(expr); + } + } + for select_item in &mut select.projection { + self.visit_select_item(select_item); + } + for from_item in &mut select.from { + self.visit_table_with_joins(from_item); + } + if let Some(where_clause) = &mut select.selection { + self.visit_expr(where_clause); + } + for expr in &mut select.group_by { + self.visit_expr(expr); + } + if let Some(having) = &mut select.having { + self.visit_expr(having); + } + } + SetExpr::Query(query) => self.visit_query(query), + SetExpr::SetOperation { left, right, .. } => { + self.visit_set_expr(left); + self.visit_set_expr(right); + } + SetExpr::Values(_) => {} + } + } + + /// Visit function arguments and update all references. + fn visit_function_args(&self, function_args: &mut FunctionArg) { + match function_args { + FunctionArg::Unnamed(arg) | FunctionArg::Named { arg, .. 
} => match arg { + FunctionArgExpr::Expr(expr) | FunctionArgExpr::ExprQualifiedWildcard(expr, _) => { + self.visit_expr(expr) + } + FunctionArgExpr::QualifiedWildcard(_, None) | FunctionArgExpr::Wildcard(None) => {} + FunctionArgExpr::QualifiedWildcard(_, Some(exprs)) + | FunctionArgExpr::Wildcard(Some(exprs)) => { + for expr in exprs { + self.visit_expr(expr); + } + } + }, + } + } + + /// Visit function and update all references. + fn visit_function(&self, function: &mut Function) { + for arg in &mut function.args { + self.visit_function_args(arg); + } + } + + /// Visit expression and update all references. + fn visit_expr(&self, expr: &mut Expr) { + match expr { + Expr::FieldIdentifier(expr, ..) + | Expr::IsNull(expr) + | Expr::IsNotNull(expr) + | Expr::IsTrue(expr) + | Expr::IsNotTrue(expr) + | Expr::IsFalse(expr) + | Expr::IsNotFalse(expr) + | Expr::IsUnknown(expr) + | Expr::IsNotUnknown(expr) + | Expr::IsJson { expr, .. } + | Expr::InList { expr, .. } + | Expr::SomeOp(expr) + | Expr::AllOp(expr) + | Expr::UnaryOp { expr, .. } + | Expr::Cast { expr, .. } + | Expr::TryCast { expr, .. } + | Expr::AtTimeZone { + timestamp: expr, .. + } + | Expr::Extract { expr, .. } + | Expr::Substring { expr, .. } + | Expr::Overlay { expr, .. } + | Expr::Trim { expr, .. } + | Expr::Nested(expr) + | Expr::ArrayIndex { obj: expr, .. } + | Expr::ArrayRangeIndex { obj: expr, .. } => self.visit_expr(expr), + + Expr::Position { substring, string } => { + self.visit_expr(substring); + self.visit_expr(string); + } + + Expr::InSubquery { expr, subquery, .. } => { + self.visit_expr(expr); + self.visit_query(subquery); + } + Expr::Between { + expr, low, high, .. + } => { + self.visit_expr(expr); + self.visit_expr(low); + self.visit_expr(high); + } + + Expr::IsDistinctFrom(expr1, expr2) + | Expr::IsNotDistinctFrom(expr1, expr2) + | Expr::BinaryOp { + left: expr1, + right: expr2, + .. + } => { + self.visit_expr(expr1); + self.visit_expr(expr2); + } + Expr::Function(function) => self.visit_function(function), + Expr::Exists(query) | Expr::Subquery(query) | Expr::ArraySubquery(query) => { + self.visit_query(query) + } + + Expr::GroupingSets(exprs_vec) | Expr::Cube(exprs_vec) | Expr::Rollup(exprs_vec) => { + for exprs in exprs_vec { + for expr in exprs { + self.visit_expr(expr); + } + } + } + + Expr::Row(exprs) | Expr::Array(Array { elem: exprs, .. }) => { + for expr in exprs { + self.visit_expr(expr); + } + } + + Expr::LambdaFunction { body, args: _ } => self.visit_expr(body), + + // No need to visit. + Expr::Identifier(_) + | Expr::CompoundIdentifier(_) + | Expr::Collate { .. } + | Expr::Value(_) + | Expr::Parameter { .. } + | Expr::TypedString { .. } + | Expr::Case { .. } => {} + } + } + + /// Visit select item and update all references. + fn visit_select_item(&self, select_item: &mut SelectItem) { + match select_item { + SelectItem::UnnamedExpr(expr) + | SelectItem::ExprQualifiedWildcard(expr, _) + | SelectItem::ExprWithAlias { expr, .. 
} => self.visit_expr(expr), + SelectItem::QualifiedWildcard(_, None) | SelectItem::Wildcard(None) => {} + SelectItem::QualifiedWildcard(_, Some(exprs)) | SelectItem::Wildcard(Some(exprs)) => { + for expr in exprs { + self.visit_expr(expr); + } + } + } + } +} + +pub struct ReplaceTableExprRewriter { + pub table_col_index_mapping: ColIndexMapping, +} + +impl ReplaceTableExprRewriter { + pub fn rewrite_expr(&self, expr: &mut ExprNode) { + let rex_node = expr.rex_node.as_mut().unwrap(); + match rex_node { + RexNode::InputRef(input_col_idx) => { + *input_col_idx = self.table_col_index_mapping.map(*input_col_idx as usize) as u32 + } + RexNode::Constant(_) => {} + RexNode::Udf(udf) => self.rewrite_udf(udf), + RexNode::FuncCall(function_call) => self.rewrite_function_call(function_call), + RexNode::Now(_) => {} + } + } + + fn rewrite_udf(&self, udf: &mut UserDefinedFunction) { + udf.children + .iter_mut() + .for_each(|expr| self.rewrite_expr(expr)); + } + + fn rewrite_function_call(&self, function_call: &mut FunctionCall) { + function_call + .children + .iter_mut() + .for_each(|expr| self.rewrite_expr(expr)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_alter_table_rename() { + let definition = "CREATE TABLE foo (a int, b int)"; + let new_name = "bar"; + let expected = "CREATE TABLE bar (a INT, b INT)"; + let actual = alter_relation_rename(definition, new_name); + assert_eq!(expected, actual); + } + + #[test] + fn test_rename_index_refs() { + let definition = "CREATE INDEX idx1 ON foo(v1 DESC, v2)"; + let from = "foo"; + let to = "bar"; + let expected = "CREATE INDEX idx1 ON bar(v1 DESC, v2)"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + } + + #[test] + fn test_rename_sink_refs() { + let definition = + "CREATE SINK sink_t FROM foo WITH (connector = 'kafka', format = 'append_only')"; + let from = "foo"; + let to = "bar"; + let expected = + "CREATE SINK sink_t FROM bar WITH (connector = 'kafka', format = 'append_only')"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + } + + #[test] + fn test_rename_with_alias_refs() { + let definition = + "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM foo"; + let from = "foo"; + let to = "bar"; + let expected = + "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM bar AS foo"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + + let definition = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, (foo.v2).v3 AS m2v FROM foo WHERE foo.v1 = 1 AND (foo.v2).v3 IS TRUE"; + let expected = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, (foo.v2).v3 AS m2v FROM bar AS foo WHERE foo.v1 = 1 AND (foo.v2).v3 IS TRUE"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + + let definition = "CREATE MATERIALIZED VIEW mv1 AS SELECT bar.v1 AS m1v, (bar.v2).v3 AS m2v FROM foo AS bar WHERE bar.v1 = 1"; + let expected = "CREATE MATERIALIZED VIEW mv1 AS SELECT bar.v1 AS m1v, (bar.v2).v3 AS m2v FROM bar AS bar WHERE bar.v1 = 1"; + let actual = alter_relation_rename_refs(definition, from, to); + assert_eq!(expected, actual); + } +} diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 964ee24ae99b5..d36918db3820d 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -115,8 +115,11 @@ pub struct PartialObject { pub database_id: Option, } -/// 
List all objects that are using the given one. It runs a recursive CTE to find all the dependencies. -pub async fn list_used_by(obj_id: ObjectId, db: &C) -> MetaResult> +/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies. +pub async fn get_referring_objects_cascade( + obj_id: ObjectId, + db: &C, +) -> MetaResult> where C: ConnectionTrait, { @@ -318,6 +321,24 @@ where Ok(()) } +/// List all objects that are using the given one. +pub async fn get_referring_objects(object_id: ObjectId, db: &C) -> MetaResult> +where + C: ConnectionTrait, +{ + let objs = ObjectDependency::find() + .filter(object_dependency::Column::Oid.eq(object_id)) + .join( + JoinType::InnerJoin, + object_dependency::Relation::Object1.def(), + ) + .into_partial_model() + .all(db) + .await?; + + Ok(objs) +} + /// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`. pub async fn ensure_schema_empty(schema_id: SchemaId, db: &C) -> MetaResult<()> where diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs index 7e26e32ee62eb..ea579867fc320 100644 --- a/src/meta/src/manager/catalog/utils.rs +++ b/src/meta/src/manager/catalog/utils.rs @@ -401,7 +401,7 @@ impl ReplaceTableExprRewriter { #[cfg(test)] mod tests { - use crate::manager::catalog::utils::{alter_relation_rename, alter_relation_rename_refs}; + use super::*; #[test] fn test_alter_table_rename() { diff --git a/src/meta/src/model_v2/connection.rs b/src/meta/src/model_v2/connection.rs index f6638ed0b53a4..0096603c843a3 100644 --- a/src/meta/src/model_v2/connection.rs +++ b/src/meta/src/model_v2/connection.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use risingwave_pb::catalog::connection::{PbInfo, PbPrivateLinkService}; +use risingwave_pb::catalog::connection::PbInfo; use risingwave_pb::catalog::PbConnection; use sea_orm::entity::prelude::*; -use sea_orm::{ActiveValue, FromJsonQueryResult}; -use serde::{Deserialize, Serialize}; +use sea_orm::ActiveValue; -use crate::model_v2::ConnectionId; +use crate::model_v2::{ConnectionId, PrivateLinkService}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "connection")] @@ -65,11 +64,6 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} -#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)] -pub struct PrivateLinkService(pub PbPrivateLinkService); - -impl Eq for PrivateLinkService {} - impl From for ActiveModel { fn from(conn: PbConnection) -> Self { let Some(PbInfo::PrivateLinkService(private_link_srv)) = conn.info else { diff --git a/src/meta/src/model_v2/index.rs b/src/meta/src/model_v2/index.rs index 6a4b7d1b349ca..3b80632e2cfc3 100644 --- a/src/meta/src/model_v2/index.rs +++ b/src/meta/src/model_v2/index.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::{I32Array, IndexId, TableId}; +use crate::model_v2::{ExprNodeArray, I32Array, IndexId, JobStatus, TableId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "index")] @@ -24,8 +24,9 @@ pub struct Model { pub name: String, pub index_table_id: TableId, pub primary_table_id: TableId, - pub index_items: Option, - pub original_columns: Option, + pub index_items: ExprNodeArray, + pub original_columns: I32Array, + pub job_status: JobStatus, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs index 43a8e5d24d22f..c9559bd6feda2 100644 --- a/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs @@ -404,15 +404,16 @@ impl MigrationTrait for Migration { .table(Source::Table) .col(ColumnDef::new(Source::SourceId).integer().primary_key()) .col(ColumnDef::new(Source::Name).string().not_null()) - .col(ColumnDef::new(Source::RowIdIndex).string()) - .col(ColumnDef::new(Source::Columns).json()) - .col(ColumnDef::new(Source::PkColumnIds).json()) - .col(ColumnDef::new(Source::Properties).json()) - .col(ColumnDef::new(Source::Definition).string()) + .col(ColumnDef::new(Source::RowIdIndex).integer()) + .col(ColumnDef::new(Source::Columns).json().not_null()) + .col(ColumnDef::new(Source::PkColumnIds).json().not_null()) + .col(ColumnDef::new(Source::Properties).json().not_null()) + .col(ColumnDef::new(Source::Definition).string().not_null()) .col(ColumnDef::new(Source::SourceInfo).json()) - .col(ColumnDef::new(Source::WatermarkDescs).json()) + .col(ColumnDef::new(Source::WatermarkDescs).json().not_null()) .col(ColumnDef::new(Source::OptionalAssociatedTableId).integer()) .col(ColumnDef::new(Source::ConnectionId).integer()) + .col(ColumnDef::new(Source::Version).big_integer().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_source_object_id") @@ -442,15 +443,17 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::Columns).json().not_null()) .col(ColumnDef::new(Table::Pk).json().not_null()) .col(ColumnDef::new(Table::DistributionKey).json().not_null()) + .col(ColumnDef::new(Table::StreamKey).json().not_null()) .col(ColumnDef::new(Table::AppendOnly).boolean().not_null()) 
.col(ColumnDef::new(Table::Properties).json().not_null()) .col(ColumnDef::new(Table::FragmentId).integer().not_null()) .col(ColumnDef::new(Table::VnodeColIndex).integer()) + .col(ColumnDef::new(Table::RowIdIndex).integer()) .col(ColumnDef::new(Table::ValueIndices).json().not_null()) .col(ColumnDef::new(Table::Definition).string().not_null()) .col( ColumnDef::new(Table::HandlePkConflictBehavior) - .integer() + .string() .not_null(), ) .col( @@ -467,6 +470,8 @@ impl MigrationTrait for Migration { .boolean() .not_null(), ) + .col(ColumnDef::new(Table::JobStatus).string().not_null()) + .col(ColumnDef::new(Table::CreateType).string().not_null()) .col(ColumnDef::new(Table::Version).json().not_null()) .foreign_key( &mut ForeignKey::create() @@ -506,16 +511,18 @@ impl MigrationTrait for Migration { .table(Sink::Table) .col(ColumnDef::new(Sink::SinkId).integer().primary_key()) .col(ColumnDef::new(Sink::Name).string().not_null()) - .col(ColumnDef::new(Sink::Columns).json()) - .col(ColumnDef::new(Sink::PkColumnIds).json()) - .col(ColumnDef::new(Sink::DistributionKey).json()) - .col(ColumnDef::new(Sink::DownstreamPk).json()) + .col(ColumnDef::new(Sink::Columns).json().not_null()) + .col(ColumnDef::new(Sink::PlanPk).json().not_null()) + .col(ColumnDef::new(Sink::DistributionKey).json().not_null()) + .col(ColumnDef::new(Sink::DownstreamPk).json().not_null()) .col(ColumnDef::new(Sink::SinkType).string().not_null()) - .col(ColumnDef::new(Sink::Properties).json()) + .col(ColumnDef::new(Sink::Properties).json().not_null()) .col(ColumnDef::new(Sink::Definition).string().not_null()) .col(ColumnDef::new(Sink::ConnectionId).integer()) .col(ColumnDef::new(Sink::DbName).string().not_null()) .col(ColumnDef::new(Sink::SinkFromName).string().not_null()) + .col(ColumnDef::new(Sink::SinkFormatDesc).json()) + .col(ColumnDef::new(Sink::JobStatus).string().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_sink_object_id") @@ -541,7 +548,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(View::ViewId).integer().primary_key()) .col(ColumnDef::new(View::Name).string().not_null()) .col(ColumnDef::new(View::Properties).json().not_null()) - .col(ColumnDef::new(View::Sql).string().not_null()) + .col(ColumnDef::new(View::Definition).string().not_null()) .col(ColumnDef::new(View::Columns).json().not_null()) .foreign_key( &mut ForeignKey::create() @@ -562,8 +569,9 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Index::Name).string().not_null()) .col(ColumnDef::new(Index::IndexTableId).integer().not_null()) .col(ColumnDef::new(Index::PrimaryTableId).integer().not_null()) - .col(ColumnDef::new(Index::IndexItems).json()) - .col(ColumnDef::new(Index::OriginalColumns).json()) + .col(ColumnDef::new(Index::IndexItems).json().not_null()) + .col(ColumnDef::new(Index::OriginalColumns).json().not_null()) + .col(ColumnDef::new(Index::JobStatus).string().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_index_object_id") @@ -862,10 +870,12 @@ enum Table { Columns, Pk, DistributionKey, + StreamKey, AppendOnly, Properties, FragmentId, VnodeColIndex, + RowIdIndex, ValueIndices, Definition, HandlePkConflictBehavior, @@ -875,6 +885,8 @@ enum Table { DmlFragmentId, Cardinality, CleanedByWatermark, + JobStatus, + CreateType, Version, } @@ -892,6 +904,7 @@ enum Source { WatermarkDescs, OptionalAssociatedTableId, ConnectionId, + Version, } #[derive(DeriveIden)] @@ -900,7 +913,7 @@ enum Sink { SinkId, Name, Columns, - PkColumnIds, + PlanPk, DistributionKey, DownstreamPk, SinkType, @@ -909,6 +922,8 @@ enum 
Sink { ConnectionId, DbName, SinkFromName, + SinkFormatDesc, + JobStatus, } #[derive(DeriveIden)] @@ -925,7 +940,7 @@ enum View { ViewId, Name, Properties, - Sql, + Definition, Columns, } @@ -938,6 +953,7 @@ enum Index { PrimaryTableId, IndexItems, OriginalColumns, + JobStatus, } #[derive(DeriveIden)] diff --git a/src/meta/src/model_v2/mod.rs b/src/meta/src/model_v2/mod.rs index d799a608933ac..1c2f928063fff 100644 --- a/src/meta/src/model_v2/mod.rs +++ b/src/meta/src/model_v2/mod.rs @@ -14,7 +14,8 @@ use std::collections::HashMap; -use sea_orm::FromJsonQueryResult; +use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus}; +use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult}; use serde::{Deserialize, Serialize}; pub mod prelude; @@ -63,19 +64,73 @@ pub type FunctionId = ObjectId; pub type ConnectionId = ObjectId; pub type UserId = u32; -#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Eq, Serialize, Deserialize, Default)] -pub struct I32Array(pub Vec); +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum JobStatus { + #[sea_orm(string_value = "CREATING")] + Creating, + #[sea_orm(string_value = "CREATED")] + Created, +} -#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Eq, Serialize, Deserialize, Default)] -pub struct DataType(pub risingwave_pb::data::DataType); +impl From for PbStreamJobStatus { + fn from(job_status: JobStatus) -> Self { + match job_status { + JobStatus::Creating => Self::Creating, + JobStatus::Created => Self::Created, + } + } +} -#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Eq, Serialize, Deserialize, Default)] -pub struct DataTypeArray(pub Vec); +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum CreateType { + #[sea_orm(string_value = "BACKGROUND")] + Background, + #[sea_orm(string_value = "FOREGROUND")] + Foreground, +} -#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)] -pub struct FieldArray(pub Vec); +impl From for PbCreateType { + fn from(create_type: CreateType) -> Self { + match create_type { + CreateType::Background => Self::Background, + CreateType::Foreground => Self::Foreground, + } + } +} -impl Eq for FieldArray {} +/// Defines struct with a single pb field that derives `FromJsonQueryResult`, it will helps to map json value stored in database to Pb struct. +macro_rules! 
derive_from_json_struct { + ($struct_name:ident, $field_type:ty) => { + #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)] + pub struct $struct_name(pub $field_type); + impl Eq for $struct_name {} + }; +} -#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Eq, Serialize, Deserialize, Default)] -pub struct Property(pub HashMap); +derive_from_json_struct!(I32Array, Vec); +derive_from_json_struct!(DataType, risingwave_pb::data::DataType); +derive_from_json_struct!(DataTypeArray, Vec); +derive_from_json_struct!(FieldArray, Vec); +derive_from_json_struct!(Property, HashMap); +derive_from_json_struct!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog); +derive_from_json_struct!( + ColumnCatalogArray, + Vec +); +derive_from_json_struct!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo); +derive_from_json_struct!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc); +derive_from_json_struct!( + WatermarkDescArray, + Vec +); +derive_from_json_struct!(ExprNodeArray, Vec); +derive_from_json_struct!(ColumnOrderArray, Vec); +derive_from_json_struct!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc); +derive_from_json_struct!(Cardinality, risingwave_pb::plan_common::PbCardinality); +derive_from_json_struct!(TableVersion, risingwave_pb::catalog::table::PbTableVersion); +derive_from_json_struct!( + PrivateLinkService, + risingwave_pb::catalog::connection::PbPrivateLinkService +); diff --git a/src/meta/src/model_v2/sink.rs b/src/meta/src/model_v2/sink.rs index 8c22a04a8fd01..bef46f1d7195f 100644 --- a/src/meta/src/model_v2/sink.rs +++ b/src/meta/src/model_v2/sink.rs @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::catalog::PbSinkType; use sea_orm::entity::prelude::*; -use crate::model_v2::{ConnectionId, I32Array, SinkId}; +use crate::model_v2::{ + ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, JobStatus, Property, + SinkFormatDesc, SinkId, +}; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] @@ -27,22 +31,34 @@ pub enum SinkType { Upsert, } +impl From for PbSinkType { + fn from(sink_type: SinkType) -> Self { + match sink_type { + SinkType::AppendOnly => Self::AppendOnly, + SinkType::ForceAppendOnly => Self::ForceAppendOnly, + SinkType::Upsert => Self::Upsert, + } + } +} + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "sink")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub sink_id: SinkId, pub name: String, - pub columns: Option, - pub pk_column_ids: Option, - pub distribution_key: Option, - pub downstream_pk: Option, + pub columns: ColumnCatalogArray, + pub plan_pk: ColumnOrderArray, + pub distribution_key: I32Array, + pub downstream_pk: I32Array, pub sink_type: SinkType, - pub properties: Option, + pub properties: Property, pub definition: String, pub connection_id: Option, pub db_name: String, pub sink_from_name: String, + pub sink_format_desc: Option, + pub job_status: JobStatus, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/src/model_v2/source.rs b/src/meta/src/model_v2/source.rs index 9bb6acc9382aa..2ad1de7914d96 100644 --- a/src/meta/src/model_v2/source.rs +++ b/src/meta/src/model_v2/source.rs @@ -14,7 +14,10 @@ use sea_orm::entity::prelude::*; -use crate::model_v2::{ConnectionId, SourceId, TableId}; +use crate::model_v2::{ + ColumnCatalogArray, 
ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId, + WatermarkDescArray, +}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "source")] @@ -22,15 +25,16 @@ pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub source_id: SourceId, pub name: String, - pub row_id_index: Option, - pub columns: Option, - pub pk_column_ids: Option, - pub properties: Option, - pub definition: Option, - pub source_info: Option, - pub watermark_descs: Option, + pub row_id_index: Option, + pub columns: ColumnCatalogArray, + pub pk_column_ids: I32Array, + pub properties: Property, + pub definition: String, + pub source_info: Option, + pub watermark_descs: WatermarkDescArray, pub optional_associated_table_id: Option, pub connection_id: Option, + pub version: u64, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/src/model_v2/table.rs b/src/meta/src/model_v2/table.rs index b2815eed7c8a0..08caee7009f8f 100644 --- a/src/meta/src/model_v2/table.rs +++ b/src/meta/src/model_v2/table.rs @@ -12,9 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::catalog::table::PbTableType; +use risingwave_pb::catalog::PbHandleConflictBehavior; use sea_orm::entity::prelude::*; -use crate::model_v2::{I32Array, Property, SourceId, TableId}; +use crate::model_v2::{ + Cardinality, ColumnCatalogArray, ColumnOrderArray, CreateType, I32Array, JobStatus, Property, + SourceId, TableId, TableVersion, +}; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] @@ -29,6 +34,38 @@ pub enum TableType { Internal, } +impl From for PbTableType { + fn from(table_type: TableType) -> Self { + match table_type { + TableType::Table => Self::Table, + TableType::MaterializedView => Self::MaterializedView, + TableType::Index => Self::Index, + TableType::Internal => Self::Internal, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum HandleConflictBehavior { + #[sea_orm(string_value = "OVERWRITE")] + Overwrite, + #[sea_orm(string_value = "IGNORE")] + Ignore, + #[sea_orm(string_value = "NO_CHECK")] + NoCheck, +} + +impl From for PbHandleConflictBehavior { + fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self { + match handle_conflict_behavior { + HandleConflictBehavior::Overwrite => Self::Overwrite, + HandleConflictBehavior::Ignore => Self::Ignore, + HandleConflictBehavior::NoCheck => Self::NoCheck, + } + } +} + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "table")] pub struct Model { @@ -37,23 +74,27 @@ pub struct Model { pub name: String, pub optional_associated_source_id: Option, pub table_type: TableType, - pub columns: Json, - pub pk: Json, + pub columns: ColumnCatalogArray, + pub pk: ColumnOrderArray, pub distribution_key: I32Array, + pub stream_key: I32Array, pub append_only: bool, pub properties: Property, pub fragment_id: i32, - pub vnode_col_index: I32Array, + pub vnode_col_index: Option, + pub row_id_index: Option, pub value_indices: I32Array, pub definition: String, - pub handle_pk_conflict_behavior: i32, - pub read_prefix_len_hint: i32, + pub handle_pk_conflict_behavior: HandleConflictBehavior, + pub read_prefix_len_hint: u32, pub watermark_indices: I32Array, pub dist_key_in_pk: I32Array, pub dml_fragment_id: Option, - pub cardinality: Option, + pub cardinality: 
Option, pub cleaned_by_watermark: bool, - pub version: Json, + pub job_status: JobStatus, + pub create_type: CreateType, + pub version: TableVersion, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/src/model_v2/view.rs b/src/meta/src/model_v2/view.rs index 5bad8593e0b72..8f7d22408d3f2 100644 --- a/src/meta/src/model_v2/view.rs +++ b/src/meta/src/model_v2/view.rs @@ -25,7 +25,7 @@ pub struct Model { pub view_id: ViewId, pub name: String, pub properties: Property, - pub sql: String, + pub definition: String, pub columns: FieldArray, } @@ -55,7 +55,7 @@ impl From for ActiveModel { view_id: ActiveValue::Set(view.id as _), name: ActiveValue::Set(view.name), properties: ActiveValue::Set(Property(view.properties)), - sql: ActiveValue::Set(view.sql), + definition: ActiveValue::Set(view.sql), columns: ActiveValue::Set(FieldArray(view.columns)), } }
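
For reference, the `derive_from_json_struct!` macro added in `src/meta/src/model_v2/mod.rs` turns each invocation into a JSON-backed newtype wrapper; a rough sketch of what `derive_from_json_struct!(I32Array, Vec<i32>)` expands to:

#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct I32Array(pub Vec<i32>);

impl Eq for I32Array {}

This mirrors the hand-written `I32Array`, `DataType`, and `Property` wrappers that the macro replaces, so the JSON column mapping behavior stays the same.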