From 4793b84668efde25e6ca4a5ee153e71a3da2f0b0 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Dec 2024 16:20:04 +0800 Subject: [PATCH 1/8] refactor(frontend): extract arguments for create table handler Signed-off-by: Bugen Zhao --- src/frontend/src/handler/create_table.rs | 269 ++++++++---------- src/frontend/src/handler/create_table_as.rs | 16 +- src/frontend/src/optimizer/mod.rs | 35 +-- .../optimizer/plan_node/stream_materialize.rs | 8 +- src/sqlparser/src/ast/mod.rs | 2 +- 5 files changed, 155 insertions(+), 175 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 6668448c3d836..0d1695016542e 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -479,13 +479,10 @@ pub(crate) async fn gen_create_table_plan_with_source( format_encode: FormatEncodeOptions, source_watermarks: Vec, mut col_id_gen: ColumnIdGenerator, - append_only: bool, - on_conflict: Option, - with_version_column: Option, include_column_options: IncludeOption, - engine: Engine, + bbb: BBB, ) -> Result<(PlanRef, Option, PbTable)> { - if append_only + if bbb.append_only && format_encode.format != Format::Plain && format_encode.format != Format::Native { @@ -499,6 +496,9 @@ pub(crate) async fn gen_create_table_plan_with_source( let session = &handler_args.session; let with_properties = bind_connector_props(&handler_args, &format_encode, false)?; + let db_name: &str = session.database(); + let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?; + let (columns_from_resolve_source, source_info) = bind_columns_from_source( session, &format_encode, @@ -533,14 +533,12 @@ pub(crate) async fn gen_create_table_plan_with_source( let (plan, table) = gen_table_plan_with_source( context.into(), + schema_name, source_catalog, - append_only, - on_conflict, - with_version_column, - Some(col_id_gen.into_version()), - database_id, - schema_id, - engine, + BBB { + version: Some(col_id_gen.into_version()), + ..bbb + }, )?; Ok((plan, Some(pb_source), table)) @@ -556,11 +554,7 @@ pub(crate) fn gen_create_table_plan( constraints: Vec, mut col_id_gen: ColumnIdGenerator, source_watermarks: Vec, - append_only: bool, - on_conflict: Option, - with_version_column: Option, - webhook_info: Option, - engine: Engine, + bbb: BBB, ) -> Result<(PlanRef, PbTable)> { let definition = context.normalized_sql().to_owned(); let mut columns = bind_sql_columns(&column_defs)?; @@ -581,12 +575,10 @@ pub(crate) fn gen_create_table_plan( constraints, definition, source_watermarks, - append_only, - on_conflict, - with_version_column, - Some(col_id_gen.into_version()), - webhook_info, - engine, + BBB { + version: Some(col_id_gen.into_version()), + ..bbb + }, ) } @@ -599,13 +591,9 @@ pub(crate) fn gen_create_table_plan_without_source( constraints: Vec, definition: String, source_watermarks: Vec, - append_only: bool, - on_conflict: Option, - with_version_column: Option, - version: Option, - webhook_info: Option, - engine: Engine, + bbb: BBB, ) -> Result<(PlanRef, PbTable)> { + // XXX: Why not bind outside? let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?; let (mut columns, pk_column_ids, row_id_index) = bind_pk_and_row_id_on_relation(columns, pk_names, true)?; @@ -627,89 +615,94 @@ pub(crate) fn gen_create_table_plan_without_source( let session = context.session_ctx().clone(); let db_name = session.database(); - let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; - let (database_id, schema_id) = - session.get_database_and_schema_id_for_create(schema_name.clone())?; + let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; - gen_table_plan_inner( - context.into(), - name, + let aaa = AAA { columns, pk_column_ids, row_id_index, definition, watermark_descs, - append_only, - on_conflict, - with_version_column, - version, - None, - database_id, - schema_id, - webhook_info, - engine, - ) + source_catalog: None, + }; + + gen_table_plan_inner(context.into(), schema_name, table_name, aaa, bbb) } fn gen_table_plan_with_source( context: OptimizerContextRef, + schema_name: Option, source_catalog: SourceCatalog, - append_only: bool, - on_conflict: Option, - with_version_column: Option, - version: Option, /* TODO: this should always be `Some` if we support `ALTER - * TABLE` for `CREATE TABLE AS`. */ - database_id: DatabaseId, - schema_id: SchemaId, - engine: Engine, + bbb: BBB, ) -> Result<(PlanRef, PbTable)> { - let cloned_source_catalog = source_catalog.clone(); - gen_table_plan_inner( - context, - source_catalog.name, - source_catalog.columns, - source_catalog.pk_col_ids, - source_catalog.row_id_index, - source_catalog.definition, - source_catalog.watermark_descs, - append_only, - on_conflict, - with_version_column, - version, - Some(cloned_source_catalog), - database_id, - schema_id, - None, - engine, - ) + let table_name = source_catalog.name.clone(); + + let aaa = AAA { + columns: source_catalog.columns.clone(), + pk_column_ids: source_catalog.pk_col_ids.clone(), + row_id_index: source_catalog.row_id_index, + definition: source_catalog.definition.clone(), // TODO: ? + watermark_descs: source_catalog.watermark_descs.clone(), + source_catalog: Some(source_catalog), + }; + + gen_table_plan_inner(context, schema_name, table_name, aaa, bbb) +} + +pub(crate) struct BBB { + pub append_only: bool, + pub on_conflict: Option, + pub with_version_column: Option, + pub version: Option, /* TODO: this should always be `Some` if we support `ALTER + * TABLE` for `CREATE TABLE AS`. */ + pub webhook_info: Option, + pub engine: Engine, +} + +pub(crate) struct AAA { + pub columns: Vec, + pub pk_column_ids: Vec, + pub row_id_index: Option, + pub definition: String, + pub watermark_descs: Vec, + pub source_catalog: Option, } #[allow(clippy::too_many_arguments)] fn gen_table_plan_inner( context: OptimizerContextRef, + schema_name: Option, table_name: String, - columns: Vec, - pk_column_ids: Vec, - row_id_index: Option, - definition: String, - watermark_descs: Vec, - append_only: bool, - on_conflict: Option, - with_version_column: Option, - version: Option, /* TODO: this should always be `Some` if we support `ALTER - * TABLE` for `CREATE TABLE AS`. */ - source_catalog: Option, - database_id: DatabaseId, - schema_id: SchemaId, - webhook_info: Option, - engine: Engine, + aaa: AAA, + bbb: BBB, ) -> Result<(PlanRef, PbTable)> { + let AAA { + ref columns, + ref pk_column_ids, + row_id_index, + ref definition, + ref watermark_descs, + ref source_catalog, + } = aaa; + + let BBB { + append_only, + on_conflict, + ref with_version_column, + ref version, + ref webhook_info, + ref engine, + } = bbb; + + let (database_id, schema_id) = context + .session_ctx() + .get_database_and_schema_id_for_create(schema_name)?; + let session = context.session_ctx().clone(); let retention_seconds = context.with_options().retention_seconds(); - let is_external_source = source_catalog.is_some(); let source_node: PlanRef = LogicalSource::new( - source_catalog.map(|source| Rc::new(source.clone())), + source_catalog.clone().map(Rc::new), columns.clone(), row_id_index, SourceNodeKind::CreateTable, @@ -758,24 +751,8 @@ fn gen_table_plan_inner( .into()); } - let materialize = plan_root.gen_table_plan( - context, - table_name, - columns, - definition, - pk_column_ids, - row_id_index, - append_only, - on_conflict, - with_version_column, - watermark_descs, - version, - is_external_source, - retention_seconds, - None, - webhook_info, - engine, - )?; + let materialize = + plan_root.gen_table_plan(context, table_name, aaa, BBB { on_conflict, ..bbb })?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -896,24 +873,27 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( let materialize = plan_root.gen_table_plan( context, resolved_table_name, - columns, - definition, - pk_column_ids, - None, - false, - on_conflict, - with_version_column, - vec![], - Some(col_id_gen.into_version()), - true, - None, - Some(cdc_table_id), - None, - engine, + AAA { + columns, + pk_column_ids, + row_id_index: None, + definition, + watermark_descs: vec![], + source_catalog: Some((*source).clone()), + }, + BBB { + append_only: false, + on_conflict, + with_version_column, + version: Some(col_id_gen.into_version()), + webhook_info: None, + engine, + }, )?; let mut table = materialize.table().to_prost(schema_id, database_id); table.owner = session.user_id(); + table.cdc_table_id = Some(cdc_table_id); table.dependent_relations = vec![source.id]; Ok((materialize.into(), table)) @@ -1001,6 +981,18 @@ pub(super) async fn handle_create_table_plan( &include_column_options, &cdc_table_info, )?; + let webhook_info = webhook_info + .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info)) + .transpose()?; + + let bbb = BBB { + append_only, + on_conflict, + with_version_column: with_version_column.clone(), + version: None, // placeholder + webhook_info, + engine, + }; let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) { (Some(format_encode), None) => ( @@ -1014,20 +1006,13 @@ pub(super) async fn handle_create_table_plan( format_encode, source_watermarks, col_id_gen, - append_only, - on_conflict, - with_version_column, include_column_options, - engine, + bbb, ) .await?, TableJobType::General, ), (None, None) => { - let webhook_info = webhook_info - .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info)) - .transpose()?; - let context = OptimizerContext::new(handler_args, explain_options); let (plan, table) = gen_create_table_plan( context, @@ -1036,11 +1021,7 @@ pub(super) async fn handle_create_table_plan( constraints, col_id_gen, source_watermarks, - append_only, - on_conflict, - with_version_column, - webhook_info, - engine, + bbb, )?; ((plan, None, table), TableJobType::General) @@ -1761,6 +1742,15 @@ pub async fn generate_stream_graph_for_replace_table( ) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; + let bbb = BBB { + append_only, + on_conflict, + with_version_column: with_version_column.clone(), + version: None, // placeholder + webhook_info: original_catalog.webhook_info.clone(), + engine, + }; + let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) { (Some(format_encode), None) => ( gen_create_table_plan_with_source( @@ -1773,11 +1763,8 @@ pub async fn generate_stream_graph_for_replace_table( format_encode, source_watermarks, col_id_gen, - append_only, - on_conflict, - with_version_column, include_column_options, - engine, + bbb, ) .await?, TableJobType::General, @@ -1791,11 +1778,7 @@ pub async fn generate_stream_graph_for_replace_table( constraints, col_id_gen, source_watermarks, - append_only, - on_conflict, - with_version_column, - original_catalog.webhook_info.clone(), - engine, + bbb, )?; ((plan, None, table), TableJobType::General) } diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 2e24d4a516f27..94efcf0bccbba 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -21,7 +21,7 @@ use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statem use super::{HandlerArgs, RwPgResponse}; use crate::binder::BoundStatement; use crate::error::{ErrorCode, Result}; -use crate::handler::create_table::{gen_create_table_plan_without_source, ColumnIdGenerator}; +use crate::handler::create_table::{gen_create_table_plan_without_source, ColumnIdGenerator, BBB}; use crate::handler::query::handle_query; use crate::{build_graph, Binder, OptimizerContext}; pub async fn handle_create_as( @@ -110,12 +110,14 @@ pub async fn handle_create_as( vec![], "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` vec![], // No watermark should be defined in for `CREATE TABLE AS` - append_only, - on_conflict, - with_version_column, - Some(col_id_gen.into_version()), - None, - engine, + BBB { + append_only, + on_conflict, + with_version_column, + version: Some(col_id_gen.into_version()), + webhook_info: None, + engine, + }, )?; let graph = build_graph(plan)?; diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 14e45f02517f4..45b79119f9ffa 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -76,6 +76,7 @@ use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; use crate::error::{ErrorCode, Result}; use crate::expr::TimestamptzExprFinder; +use crate::handler::create_table::{AAA, BBB}; use crate::optimizer::plan_node::generic::{SourceNodeKind, Union}; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion, @@ -639,25 +640,26 @@ impl PlanRoot { } /// Optimize and generate a create table plan. - #[allow(clippy::too_many_arguments)] pub fn gen_table_plan( mut self, context: OptimizerContextRef, table_name: String, - columns: Vec, - definition: String, - pk_column_ids: Vec, - row_id_index: Option, - append_only: bool, - on_conflict: Option, - with_version_column: Option, - watermark_descs: Vec, - version: Option, - with_external_source: bool, - retention_seconds: Option, - cdc_table_id: Option, - webhook_info: Option, - engine: Engine, + AAA { + columns, + pk_column_ids, + row_id_index, + definition, + watermark_descs, + source_catalog, + }: AAA, + BBB { + append_only, + on_conflict, + with_version_column, + version, + webhook_info, + engine, + }: BBB, ) -> Result { assert_eq!(self.phase, PlanPhase::Logical); assert_eq!(self.plan.convention(), Convention::Logical); @@ -751,6 +753,7 @@ impl PlanRoot { None }; + let with_external_source = source_catalog.is_some(); let union_inputs = if with_external_source { let mut external_source_node = stream_plan; external_source_node = @@ -890,8 +893,6 @@ impl PlanRoot { pk_column_indices, row_id_index, version, - retention_seconds, - cdc_table_id, webhook_info, engine, ) diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 2793c84b1d555..5ffd7111388cd 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -110,7 +110,6 @@ impl StreamMaterialize { table_type, None, cardinality, - retention_seconds, create_type, None, Engine::Hummock, @@ -137,8 +136,6 @@ impl StreamMaterialize { pk_column_indices: Vec, row_id_index: Option, version: Option, - retention_seconds: Option, - cdc_table_id: Option, webhook_info: Option, engine: Engine, ) -> Result { @@ -157,14 +154,11 @@ impl StreamMaterialize { TableType::Table, version, Cardinality::unknown(), // unknown cardinality for tables - retention_seconds, CreateType::Foreground, webhook_info, engine, )?; - table.cdc_table_id = cdc_table_id; - Ok(Self::new(input, table)) } @@ -233,7 +227,6 @@ impl StreamMaterialize { table_type: TableType, version: Option, cardinality: Cardinality, - retention_seconds: Option, create_type: CreateType, webhook_info: Option, engine: Engine, @@ -244,6 +237,7 @@ impl StreamMaterialize { let distribution_key = input.distribution().dist_column_indices().to_vec(); let append_only = input.append_only(); let watermark_columns = input.watermark_columns().clone(); + let retention_seconds = input.ctx().with_options().retention_seconds(); let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices { let table_pk = pk_column_indices diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index c0fce4a2d4780..ec525ddcc5951 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2839,7 +2839,7 @@ impl fmt::Display for EmitMode { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum OnConflict { UpdateFull, From 218b95511f205bc370a168af579c782fa21d2ed5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Dec 2024 16:25:19 +0800 Subject: [PATCH 2/8] fix checks Signed-off-by: Bugen Zhao --- src/frontend/src/handler/create_table.rs | 12 ++++-------- src/frontend/src/optimizer/mod.rs | 7 ++----- .../src/optimizer/plan_node/stream_materialize.rs | 4 ++-- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 0d1695016542e..2af8b6731677c 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -649,7 +649,7 @@ fn gen_table_plan_with_source( gen_table_plan_inner(context, schema_name, table_name, aaa, bbb) } -pub(crate) struct BBB { +pub struct BBB { pub append_only: bool, pub on_conflict: Option, pub with_version_column: Option, @@ -659,7 +659,7 @@ pub(crate) struct BBB { pub engine: Engine, } -pub(crate) struct AAA { +pub struct AAA { pub columns: Vec, pub pk_column_ids: Vec, pub row_id_index: Option, @@ -678,20 +678,16 @@ fn gen_table_plan_inner( ) -> Result<(PlanRef, PbTable)> { let AAA { ref columns, - ref pk_column_ids, row_id_index, - ref definition, ref watermark_descs, ref source_catalog, + .. } = aaa; let BBB { append_only, on_conflict, - ref with_version_column, - ref version, - ref webhook_info, - ref engine, + .. } = bbb; let (database_id, schema_id) = context diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 45b79119f9ffa..a39ba27d29493 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -51,14 +51,11 @@ pub use optimizer_context::*; use plan_expr_rewriter::ConstEvalRewriter; use property::Order; use risingwave_common::bail; -use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Engine, Field, Schema, -}; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema}; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::sink::catalog::SinkFormatDesc; -use risingwave_pb::catalog::{PbWebhookSourceInfo, WatermarkDesc}; use risingwave_pb::stream_plan::StreamScanType; use self::heuristic_optimizer::ApplyOrder; @@ -73,7 +70,7 @@ use self::plan_visitor::InputRefValidator; use self::plan_visitor::{has_batch_exchange, CardinalityVisitor, StreamKeyChecker}; use self::property::{Cardinality, RequiredDist}; use self::rule::*; -use crate::catalog::table_catalog::{TableType, TableVersion}; +use crate::catalog::table_catalog::TableType; use crate::error::{ErrorCode, Result}; use crate::expr::TimestamptzExprFinder; use crate::handler::create_table::{AAA, BBB}; diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 5ffd7111388cd..e9560e9296761 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -82,7 +82,7 @@ impl StreamMaterialize { definition: String, table_type: TableType, cardinality: Cardinality, - retention_seconds: Option, + _retention_seconds: Option, // TODO ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, table_type)?; // the hidden column name might refer some expr id @@ -141,7 +141,7 @@ impl StreamMaterialize { ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?; - let mut table = Self::derive_table_catalog( + let table = Self::derive_table_catalog( input.clone(), name, user_order_by, From 2b3d06dbe4fdb7829c7837f69fa41c2057a5e5d1 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Dec 2024 16:29:31 +0800 Subject: [PATCH 3/8] move definition Signed-off-by: Bugen Zhao --- src/frontend/src/handler/create_table.rs | 11 ++++------- src/frontend/src/handler/create_table_as.rs | 4 ++-- src/frontend/src/optimizer/mod.rs | 2 +- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 2af8b6731677c..f8033b8f9af92 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -556,7 +556,6 @@ pub(crate) fn gen_create_table_plan( source_watermarks: Vec, bbb: BBB, ) -> Result<(PlanRef, PbTable)> { - let definition = context.normalized_sql().to_owned(); let mut columns = bind_sql_columns(&column_defs)?; for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) @@ -573,7 +572,6 @@ pub(crate) fn gen_create_table_plan( columns, column_defs, constraints, - definition, source_watermarks, BBB { version: Some(col_id_gen.into_version()), @@ -589,7 +587,6 @@ pub(crate) fn gen_create_table_plan_without_source( columns: Vec, column_defs: Vec, constraints: Vec, - definition: String, source_watermarks: Vec, bbb: BBB, ) -> Result<(PlanRef, PbTable)> { @@ -621,7 +618,6 @@ pub(crate) fn gen_create_table_plan_without_source( columns, pk_column_ids, row_id_index, - definition, watermark_descs, source_catalog: None, }; @@ -641,7 +637,6 @@ fn gen_table_plan_with_source( columns: source_catalog.columns.clone(), pk_column_ids: source_catalog.pk_col_ids.clone(), row_id_index: source_catalog.row_id_index, - definition: source_catalog.definition.clone(), // TODO: ? watermark_descs: source_catalog.watermark_descs.clone(), source_catalog: Some(source_catalog), }; @@ -650,6 +645,7 @@ fn gen_table_plan_with_source( } pub struct BBB { + pub definition: String, pub append_only: bool, pub on_conflict: Option, pub with_version_column: Option, @@ -663,7 +659,6 @@ pub struct AAA { pub columns: Vec, pub pk_column_ids: Vec, pub row_id_index: Option, - pub definition: String, pub watermark_descs: Vec, pub source_catalog: Option, } @@ -873,11 +868,11 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( columns, pk_column_ids, row_id_index: None, - definition, watermark_descs: vec![], source_catalog: Some((*source).clone()), }, BBB { + definition, append_only: false, on_conflict, with_version_column, @@ -982,6 +977,7 @@ pub(super) async fn handle_create_table_plan( .transpose()?; let bbb = BBB { + definition: handler_args.normalized_sql.clone(), append_only, on_conflict, with_version_column: with_version_column.clone(), @@ -1739,6 +1735,7 @@ pub async fn generate_stream_graph_for_replace_table( use risingwave_pb::catalog::table::OptionalAssociatedSourceId; let bbb = BBB { + definition: handler_args.normalized_sql.clone(), append_only, on_conflict, with_version_column: with_version_column.clone(), diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 94efcf0bccbba..332b8eba4ae22 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -108,9 +108,9 @@ pub async fn handle_create_as( columns, vec![], vec![], - "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` - vec![], // No watermark should be defined in for `CREATE TABLE AS` + vec![], // No watermark should be defined in for `CREATE TABLE AS` BBB { + definition: "".to_owned(), /* TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` */ append_only, on_conflict, with_version_column, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index a39ba27d29493..1d3c1f8dced73 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -645,11 +645,11 @@ impl PlanRoot { columns, pk_column_ids, row_id_index, - definition, watermark_descs, source_catalog, }: AAA, BBB { + definition, append_only, on_conflict, with_version_column, From 2a7b159b64f13479d5f2df5bfdadfa9a4be0d178 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Dec 2024 16:53:14 +0800 Subject: [PATCH 4/8] move version Signed-off-by: Bugen Zhao --- src/frontend/src/handler/create_table.rs | 24 ++++++++++----------- src/frontend/src/handler/create_table_as.rs | 2 +- src/frontend/src/optimizer/mod.rs | 2 +- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index f8033b8f9af92..9d52689513a72 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -535,10 +535,8 @@ pub(crate) async fn gen_create_table_plan_with_source( context.into(), schema_name, source_catalog, - BBB { - version: Some(col_id_gen.into_version()), - ..bbb - }, + col_id_gen.into_version(), + bbb, )?; Ok((plan, Some(pb_source), table)) @@ -573,10 +571,8 @@ pub(crate) fn gen_create_table_plan( column_defs, constraints, source_watermarks, - BBB { - version: Some(col_id_gen.into_version()), - ..bbb - }, + Some(col_id_gen.into_version()), + bbb, ) } @@ -588,6 +584,7 @@ pub(crate) fn gen_create_table_plan_without_source( column_defs: Vec, constraints: Vec, source_watermarks: Vec, + version: Option, bbb: BBB, ) -> Result<(PlanRef, PbTable)> { // XXX: Why not bind outside? @@ -620,6 +617,7 @@ pub(crate) fn gen_create_table_plan_without_source( row_id_index, watermark_descs, source_catalog: None, + version, }; gen_table_plan_inner(context.into(), schema_name, table_name, aaa, bbb) @@ -629,6 +627,7 @@ fn gen_table_plan_with_source( context: OptimizerContextRef, schema_name: Option, source_catalog: SourceCatalog, + version: TableVersion, bbb: BBB, ) -> Result<(PlanRef, PbTable)> { let table_name = source_catalog.name.clone(); @@ -639,6 +638,7 @@ fn gen_table_plan_with_source( row_id_index: source_catalog.row_id_index, watermark_descs: source_catalog.watermark_descs.clone(), source_catalog: Some(source_catalog), + version: Some(version), }; gen_table_plan_inner(context, schema_name, table_name, aaa, bbb) @@ -649,8 +649,6 @@ pub struct BBB { pub append_only: bool, pub on_conflict: Option, pub with_version_column: Option, - pub version: Option, /* TODO: this should always be `Some` if we support `ALTER - * TABLE` for `CREATE TABLE AS`. */ pub webhook_info: Option, pub engine: Engine, } @@ -661,6 +659,8 @@ pub struct AAA { pub row_id_index: Option, pub watermark_descs: Vec, pub source_catalog: Option, + pub version: Option, /* TODO: this should always be `Some` if we support `ALTER + * TABLE` for `CREATE TABLE AS`. */ } #[allow(clippy::too_many_arguments)] @@ -870,13 +870,13 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( row_id_index: None, watermark_descs: vec![], source_catalog: Some((*source).clone()), + version: Some(col_id_gen.into_version()), }, BBB { definition, append_only: false, on_conflict, with_version_column, - version: Some(col_id_gen.into_version()), webhook_info: None, engine, }, @@ -981,7 +981,6 @@ pub(super) async fn handle_create_table_plan( append_only, on_conflict, with_version_column: with_version_column.clone(), - version: None, // placeholder webhook_info, engine, }; @@ -1739,7 +1738,6 @@ pub async fn generate_stream_graph_for_replace_table( append_only, on_conflict, with_version_column: with_version_column.clone(), - version: None, // placeholder webhook_info: original_catalog.webhook_info.clone(), engine, }; diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 332b8eba4ae22..0e7a2dffbc569 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -109,12 +109,12 @@ pub async fn handle_create_as( vec![], vec![], vec![], // No watermark should be defined in for `CREATE TABLE AS` + None, BBB { definition: "".to_owned(), /* TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` */ append_only, on_conflict, with_version_column, - version: Some(col_id_gen.into_version()), webhook_info: None, engine, }, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 1d3c1f8dced73..2a22e825a899a 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -647,13 +647,13 @@ impl PlanRoot { row_id_index, watermark_descs, source_catalog, + version, }: AAA, BBB { definition, append_only, on_conflict, with_version_column, - version, webhook_info, engine, }: BBB, From 0c9ce7cd35fcc4ee8e0d53a66527a75bef542028 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Dec 2024 17:14:43 +0800 Subject: [PATCH 5/8] rename & add docs Signed-off-by: Bugen Zhao --- src/frontend/src/handler/create_table.rs | 91 ++++++++++++--------- src/frontend/src/handler/create_table_as.rs | 4 +- src/frontend/src/optimizer/mod.rs | 10 +-- 3 files changed, 59 insertions(+), 46 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 9d52689513a72..4c790ce126e1b 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -480,9 +480,9 @@ pub(crate) async fn gen_create_table_plan_with_source( source_watermarks: Vec, mut col_id_gen: ColumnIdGenerator, include_column_options: IncludeOption, - bbb: BBB, + props: CreateTableProps, ) -> Result<(PlanRef, Option, PbTable)> { - if bbb.append_only + if props.append_only && format_encode.format != Format::Plain && format_encode.format != Format::Native { @@ -536,7 +536,7 @@ pub(crate) async fn gen_create_table_plan_with_source( schema_name, source_catalog, col_id_gen.into_version(), - bbb, + props, )?; Ok((plan, Some(pb_source), table)) @@ -552,7 +552,7 @@ pub(crate) fn gen_create_table_plan( constraints: Vec, mut col_id_gen: ColumnIdGenerator, source_watermarks: Vec, - bbb: BBB, + props: CreateTableProps, ) -> Result<(PlanRef, PbTable)> { let mut columns = bind_sql_columns(&column_defs)?; for c in &mut columns { @@ -572,7 +572,7 @@ pub(crate) fn gen_create_table_plan( constraints, source_watermarks, Some(col_id_gen.into_version()), - bbb, + props, ) } @@ -585,7 +585,7 @@ pub(crate) fn gen_create_table_plan_without_source( constraints: Vec, source_watermarks: Vec, version: Option, - bbb: BBB, + props: CreateTableProps, ) -> Result<(PlanRef, PbTable)> { // XXX: Why not bind outside? let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?; @@ -611,7 +611,7 @@ pub(crate) fn gen_create_table_plan_without_source( let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; - let aaa = AAA { + let info = CreateTableInfo { columns, pk_column_ids, row_id_index, @@ -620,7 +620,7 @@ pub(crate) fn gen_create_table_plan_without_source( version, }; - gen_table_plan_inner(context.into(), schema_name, table_name, aaa, bbb) + gen_table_plan_inner(context.into(), schema_name, table_name, info, props) } fn gen_table_plan_with_source( @@ -628,11 +628,11 @@ fn gen_table_plan_with_source( schema_name: Option, source_catalog: SourceCatalog, version: TableVersion, - bbb: BBB, + props: CreateTableProps, ) -> Result<(PlanRef, PbTable)> { let table_name = source_catalog.name.clone(); - let aaa = AAA { + let info = CreateTableInfo { columns: source_catalog.columns.clone(), pk_column_ids: source_catalog.pk_col_ids.clone(), row_id_index: source_catalog.row_id_index, @@ -641,10 +641,27 @@ fn gen_table_plan_with_source( version: Some(version), }; - gen_table_plan_inner(context, schema_name, table_name, aaa, bbb) + gen_table_plan_inner(context, schema_name, table_name, info, props) } -pub struct BBB { +/// Arguments of the functions that generate a table plan, part 1. +/// +/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding +/// or resolving based on the user input. +pub struct CreateTableInfo { + pub columns: Vec, + pub pk_column_ids: Vec, + pub row_id_index: Option, + pub watermark_descs: Vec, + pub source_catalog: Option, + pub version: Option, // TODO: `CREATE TABLE AS` does not fill this field +} + +/// Arguments of the functions that generate a table plan, part 2. +/// +/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply +/// obtained from the input or the context. +pub struct CreateTableProps { pub definition: String, pub append_only: bool, pub on_conflict: Option, @@ -653,37 +670,26 @@ pub struct BBB { pub engine: Engine, } -pub struct AAA { - pub columns: Vec, - pub pk_column_ids: Vec, - pub row_id_index: Option, - pub watermark_descs: Vec, - pub source_catalog: Option, - pub version: Option, /* TODO: this should always be `Some` if we support `ALTER - * TABLE` for `CREATE TABLE AS`. */ -} - #[allow(clippy::too_many_arguments)] fn gen_table_plan_inner( context: OptimizerContextRef, schema_name: Option, table_name: String, - aaa: AAA, - bbb: BBB, + info: CreateTableInfo, + props: CreateTableProps, ) -> Result<(PlanRef, PbTable)> { - let AAA { + let CreateTableInfo { ref columns, row_id_index, ref watermark_descs, ref source_catalog, .. - } = aaa; - - let BBB { + } = info; + let CreateTableProps { append_only, on_conflict, .. - } = bbb; + } = props; let (database_id, schema_id) = context .session_ctx() @@ -742,8 +748,15 @@ fn gen_table_plan_inner( .into()); } - let materialize = - plan_root.gen_table_plan(context, table_name, aaa, BBB { on_conflict, ..bbb })?; + let materialize = plan_root.gen_table_plan( + context, + table_name, + info, + CreateTableProps { + on_conflict, + ..props + }, + )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -864,7 +877,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( let materialize = plan_root.gen_table_plan( context, resolved_table_name, - AAA { + CreateTableInfo { columns, pk_column_ids, row_id_index: None, @@ -872,7 +885,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( source_catalog: Some((*source).clone()), version: Some(col_id_gen.into_version()), }, - BBB { + CreateTableProps { definition, append_only: false, on_conflict, @@ -976,7 +989,7 @@ pub(super) async fn handle_create_table_plan( .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info)) .transpose()?; - let bbb = BBB { + let props = CreateTableProps { definition: handler_args.normalized_sql.clone(), append_only, on_conflict, @@ -998,7 +1011,7 @@ pub(super) async fn handle_create_table_plan( source_watermarks, col_id_gen, include_column_options, - bbb, + props, ) .await?, TableJobType::General, @@ -1012,7 +1025,7 @@ pub(super) async fn handle_create_table_plan( constraints, col_id_gen, source_watermarks, - bbb, + props, )?; ((plan, None, table), TableJobType::General) @@ -1733,7 +1746,7 @@ pub async fn generate_stream_graph_for_replace_table( ) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; - let bbb = BBB { + let props = CreateTableProps { definition: handler_args.normalized_sql.clone(), append_only, on_conflict, @@ -1755,7 +1768,7 @@ pub async fn generate_stream_graph_for_replace_table( source_watermarks, col_id_gen, include_column_options, - bbb, + props, ) .await?, TableJobType::General, @@ -1769,7 +1782,7 @@ pub async fn generate_stream_graph_for_replace_table( constraints, col_id_gen, source_watermarks, - bbb, + props, )?; ((plan, None, table), TableJobType::General) } diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 0e7a2dffbc569..a5e0ef9b75492 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -21,7 +21,7 @@ use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statem use super::{HandlerArgs, RwPgResponse}; use crate::binder::BoundStatement; use crate::error::{ErrorCode, Result}; -use crate::handler::create_table::{gen_create_table_plan_without_source, ColumnIdGenerator, BBB}; +use crate::handler::create_table::{gen_create_table_plan_without_source, ColumnIdGenerator, CreateTableProps}; use crate::handler::query::handle_query; use crate::{build_graph, Binder, OptimizerContext}; pub async fn handle_create_as( @@ -110,7 +110,7 @@ pub async fn handle_create_as( vec![], vec![], // No watermark should be defined in for `CREATE TABLE AS` None, - BBB { + CreateTableProps { definition: "".to_owned(), /* TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` */ append_only, on_conflict, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 2a22e825a899a..787028a55ffea 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -73,7 +73,7 @@ use self::rule::*; use crate::catalog::table_catalog::TableType; use crate::error::{ErrorCode, Result}; use crate::expr::TimestamptzExprFinder; -use crate::handler::create_table::{AAA, BBB}; +use crate::handler::create_table::{CreateTableInfo, CreateTableProps}; use crate::optimizer::plan_node::generic::{SourceNodeKind, Union}; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion, @@ -641,22 +641,22 @@ impl PlanRoot { mut self, context: OptimizerContextRef, table_name: String, - AAA { + CreateTableInfo { columns, pk_column_ids, row_id_index, watermark_descs, source_catalog, version, - }: AAA, - BBB { + }: CreateTableInfo, + CreateTableProps { definition, append_only, on_conflict, with_version_column, webhook_info, engine, - }: BBB, + }: CreateTableProps, ) -> Result { assert_eq!(self.phase, PlanPhase::Logical); assert_eq!(self.plan.convention(), Convention::Logical); From d0690b52bb6f9ee01402479a79535396fabfea58 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Dec 2024 17:22:00 +0800 Subject: [PATCH 6/8] fix retention seconds Signed-off-by: Bugen Zhao --- src/frontend/src/optimizer/mod.rs | 3 +++ .../src/optimizer/plan_node/stream_materialize.rs | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 787028a55ffea..230224557b63b 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -868,6 +868,8 @@ impl PlanRoot { ))? } + let retention_seconds = context.with_options().retention_seconds(); + let table_required_dist = { let mut bitset = FixedBitSet::with_capacity(columns.len()); for idx in &pk_column_indices { @@ -890,6 +892,7 @@ impl PlanRoot { pk_column_indices, row_id_index, version, + retention_seconds, webhook_info, engine, ) diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index e9560e9296761..174ff4656df37 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -82,7 +82,7 @@ impl StreamMaterialize { definition: String, table_type: TableType, cardinality: Cardinality, - _retention_seconds: Option, // TODO + retention_seconds: Option, ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, table_type)?; // the hidden column name might refer some expr id @@ -97,6 +97,7 @@ impl StreamMaterialize { } else { CreateType::Foreground }; + let table = Self::derive_table_catalog( input.clone(), name, @@ -110,6 +111,7 @@ impl StreamMaterialize { table_type, None, cardinality, + retention_seconds, create_type, None, Engine::Hummock, @@ -136,6 +138,7 @@ impl StreamMaterialize { pk_column_indices: Vec, row_id_index: Option, version: Option, + retention_seconds: Option, webhook_info: Option, engine: Engine, ) -> Result { @@ -154,6 +157,7 @@ impl StreamMaterialize { TableType::Table, version, Cardinality::unknown(), // unknown cardinality for tables + retention_seconds, CreateType::Foreground, webhook_info, engine, @@ -227,6 +231,7 @@ impl StreamMaterialize { table_type: TableType, version: Option, cardinality: Cardinality, + retention_seconds: Option, create_type: CreateType, webhook_info: Option, engine: Engine, @@ -237,7 +242,6 @@ impl StreamMaterialize { let distribution_key = input.distribution().dist_column_indices().to_vec(); let append_only = input.append_only(); let watermark_columns = input.watermark_columns().clone(); - let retention_seconds = input.ctx().with_options().retention_seconds(); let (table_pk, stream_key) = if let Some(pk_column_indices) = pk_column_indices { let table_pk = pk_column_indices From ba1781ff3692959618b56944bac0689c7ff9c9fc Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Dec 2024 17:22:39 +0800 Subject: [PATCH 7/8] fmt Signed-off-by: Bugen Zhao --- src/frontend/src/handler/create_table_as.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index a5e0ef9b75492..b24d0e0142fc5 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -21,7 +21,9 @@ use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statem use super::{HandlerArgs, RwPgResponse}; use crate::binder::BoundStatement; use crate::error::{ErrorCode, Result}; -use crate::handler::create_table::{gen_create_table_plan_without_source, ColumnIdGenerator, CreateTableProps}; +use crate::handler::create_table::{ + gen_create_table_plan_without_source, ColumnIdGenerator, CreateTableProps, +}; use crate::handler::query::handle_query; use crate::{build_graph, Binder, OptimizerContext}; pub async fn handle_create_as( From 85de6b4be444ad32264003bfa78edd264cbd0e11 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Dec 2024 17:51:08 +0800 Subject: [PATCH 8/8] version seems to be always present Signed-off-by: Bugen Zhao --- src/frontend/src/handler/create_table.rs | 10 +++++----- src/frontend/src/handler/create_table_as.rs | 4 ++-- .../src/optimizer/plan_node/stream_materialize.rs | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 4c790ce126e1b..4ef5fb26ed35b 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -571,7 +571,7 @@ pub(crate) fn gen_create_table_plan( column_defs, constraints, source_watermarks, - Some(col_id_gen.into_version()), + col_id_gen.into_version(), props, ) } @@ -584,7 +584,7 @@ pub(crate) fn gen_create_table_plan_without_source( column_defs: Vec, constraints: Vec, source_watermarks: Vec, - version: Option, + version: TableVersion, props: CreateTableProps, ) -> Result<(PlanRef, PbTable)> { // XXX: Why not bind outside? @@ -638,7 +638,7 @@ fn gen_table_plan_with_source( row_id_index: source_catalog.row_id_index, watermark_descs: source_catalog.watermark_descs.clone(), source_catalog: Some(source_catalog), - version: Some(version), + version, }; gen_table_plan_inner(context, schema_name, table_name, info, props) @@ -654,7 +654,7 @@ pub struct CreateTableInfo { pub row_id_index: Option, pub watermark_descs: Vec, pub source_catalog: Option, - pub version: Option, // TODO: `CREATE TABLE AS` does not fill this field + pub version: TableVersion, } /// Arguments of the functions that generate a table plan, part 2. @@ -883,7 +883,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( row_id_index: None, watermark_descs: vec![], source_catalog: Some((*source).clone()), - version: Some(col_id_gen.into_version()), + version: col_id_gen.into_version(), }, CreateTableProps { definition, diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index b24d0e0142fc5..f47334f0000fb 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -111,9 +111,9 @@ pub async fn handle_create_as( vec![], vec![], vec![], // No watermark should be defined in for `CREATE TABLE AS` - None, + col_id_gen.into_version(), CreateTableProps { - definition: "".to_owned(), /* TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` */ + definition: "".to_owned(), // TODO: empty definition means no schema change support append_only, on_conflict, with_version_column, diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 174ff4656df37..6e4339658966b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -137,7 +137,7 @@ impl StreamMaterialize { version_column_index: Option, pk_column_indices: Vec, row_id_index: Option, - version: Option, + version: TableVersion, retention_seconds: Option, webhook_info: Option, engine: Engine, @@ -155,7 +155,7 @@ impl StreamMaterialize { Some(pk_column_indices), row_id_index, TableType::Table, - version, + Some(version), Cardinality::unknown(), // unknown cardinality for tables retention_seconds, CreateType::Foreground,