diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs
index 3f5b532ccfda0..0b8f302c278b5 100644
--- a/src/common/src/catalog/column.rs
+++ b/src/common/src/catalog/column.rs
@@ -413,6 +413,10 @@ impl ColumnCatalog {
         !self.is_generated() && !self.is_rw_timestamp_column()
     }
 
+    pub fn can_drop(&self) -> bool {
+        !(self.is_hidden() || self.is_rw_sys_column())
+    }
+
     /// If the column is a generated column
     pub fn generated_expr(&self) -> Option<&ExprNode> {
         if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index 04cc9c8f8defa..a611da6e2f5d5 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 use anyhow::{anyhow, Context};
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::catalog::{ColumnCatalog, Engine};
+use risingwave_common::catalog::{ColumnCatalog, Engine, ROW_ID_COLUMN_ID};
 use risingwave_common::hash::VnodeCount;
 use risingwave_common::types::DataType;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -29,23 +29,27 @@ use risingwave_pb::ddl_service::TableJobType;
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
 use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
 use risingwave_sqlparser::ast::{
-    AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Ident, ObjectName,
-    Statement, StructField, TableConstraint,
+    AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, ExplainOptions, Ident,
+    ObjectName, Statement, StructField, TableConstraint,
 };
 use risingwave_sqlparser::parser::Parser;
-use super::create_source::schema_has_schema_registry;
-use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
+use super::create_table::{
+    bind_sql_columns, bind_sql_columns_generated_and_default_constraints, bind_sql_pk_names,
+    gen_table_plan_inner, generate_stream_graph_for_replace_table, ColumnIdGenerator,
+    CreateTableInfo, CreateTableProps,
+};
 use super::util::SourceSchemaCompatExt;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
+use crate::catalog::source_catalog::SourceCatalog;
 use crate::catalog::table_catalog::TableType;
 use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{Expr, ExprImpl, InputRef, Literal};
 use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
 use crate::handler::create_table::bind_table_constraints;
 use crate::session::SessionImpl;
-use crate::{Binder, TableCatalog};
+use crate::{build_graph, Binder, OptimizerContext, TableCatalog};
 
 /// Used in auto schema change process
 pub async fn get_new_table_definition_for_cdc_table(
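Annotation: the new `can_drop` predicate is what later gates `ALTER TABLE .. DROP COLUMN`. A minimal self-contained sketch of the rule it encodes, using stand-in flags rather than the real `ColumnCatalog` (the `hidden`/`rw_sys` fields here are illustrative):

```rust
// Stand-in for `ColumnCatalog`, reduced to the two flags `can_drop` consults.
struct Col {
    hidden: bool, // hidden columns (e.g. an implicit row id) are not droppable
    rw_sys: bool, // system columns (e.g. `_rw_timestamp`) are not droppable
}

impl Col {
    // Mirrors `ColumnCatalog::can_drop` from the hunk above.
    fn can_drop(&self) -> bool {
        !(self.hidden || self.rw_sys)
    }
}

fn main() {
    assert!(Col { hidden: false, rw_sys: false }.can_drop());
    assert!(!Col { hidden: true, rw_sys: false }.can_drop());
    assert!(!Col { hidden: false, rw_sys: true }.can_drop());
}
```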
@@ -318,54 +322,17 @@ pub async fn handle_alter_table_column(
     operation: AlterTableOperation,
 ) -> Result<RwPgResponse> {
     let session = handler_args.session;
-    let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
+    let (original_table, _associated_source) =
+        fetch_table_source_for_alter(session.as_ref(), &table_name)?;
 
-    if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() {
+    if !original_table.incoming_sinks.is_empty() && original_table.has_generated_column() {
         return Err(RwError::from(ErrorCode::BindError(
             "Alter a table with incoming sink and generated column has not been implemented."
                 .to_owned(),
         )));
     }
 
-    // Retrieve the original table definition and parse it to AST.
-    let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition)
-        .context("unable to parse original table definition")?
-        .try_into()
-        .unwrap();
-    let Statement::CreateTable {
-        columns,
-        format_encode,
-        ..
-    } = &mut definition
-    else {
-        panic!("unexpected statement: {:?}", definition);
-    };
-
-    let format_encode = format_encode
-        .clone()
-        .map(|format_encode| format_encode.into_v2_with_warning());
-
-    let fail_if_has_schema_registry = || {
-        if let Some(format_encode) = &format_encode
-            && schema_has_schema_registry(format_encode)
-        {
-            Err(ErrorCode::NotSupported(
-                "alter table with schema registry".to_owned(),
-                "try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
-            ))
-        } else {
-            Ok(())
-        }
-    };
-
-    if columns.is_empty() {
-        Err(ErrorCode::NotSupported(
-            "alter a table with empty column definitions".to_owned(),
-            "Please recreate the table with column definitions.".to_owned(),
-        ))?
-    }
-
-    if !original_catalog.incoming_sinks.is_empty()
+    if !original_table.incoming_sinks.is_empty()
         && matches!(operation, AlterTableOperation::DropColumn { .. })
     {
         return Err(ErrorCode::InvalidInputSyntax(
@@ -373,27 +340,34 @@ pub async fn handle_alter_table_column(
         ))?;
     }
 
+    let fail_if_schema_is_inferred = || {
+        // TODO: check if the schema is inferred from `associated_source`.
+        Result::Ok(())
+    };
+
+    let mut column_catalogs = original_table.columns().to_vec();
+    let pk_column_ids = original_table.pk_column_ids();
+
     match operation {
         AlterTableOperation::AddColumn {
-            column_def: new_column,
+            column_def: added_column,
         } => {
-            fail_if_has_schema_registry()?;
+            // If the schema is inferred, do not allow adding columns.
+            fail_if_schema_is_inferred()?;
 
             // Duplicated names can actually be checked by `StreamMaterialize`. We do here for
             // better error reporting.
-            let new_column_name = new_column.name.real_value();
-            if columns
+            let added_column_name = added_column.name.real_value();
+            if column_catalogs
                 .iter()
-                .any(|c| c.name.real_value() == new_column_name)
+                .any(|c| c.name() == added_column_name)
             {
                 Err(ErrorCode::InvalidInputSyntax(format!(
-                    "column \"{new_column_name}\" of table \"{table_name}\" already exists"
+                    "column \"{added_column_name}\" of table \"{table_name}\" already exists"
                 )))?
             }
 
-            if new_column
-                .options
-                .iter()
+            if (added_column.options.iter())
                 .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
             {
                 Err(ErrorCode::InvalidInputSyntax(
@@ -401,8 +375,25 @@ pub async fn handle_alter_table_column(
                 ))?
             }
 
-            // Add the new column to the table definition if it is not created by `create table (*)` syntax.
-            columns.push(new_column);
+            let added_column = vec![added_column];
+
+            let added_pk = !bind_sql_pk_names(&added_column, Vec::new())?.is_empty();
+            if added_pk {
+                Err(ErrorCode::InvalidInputSyntax(
+                    "cannot add a primary key column".to_owned(),
+                ))?
+            }
+
+            let mut added_column_catalog = bind_sql_columns(&added_column)?;
+
+            bind_sql_columns_generated_and_default_constraints(
+                &session,
+                table_name.real_value(),
+                &mut added_column_catalog,
+                added_column,
+            )?;
+
+            column_catalogs.extend(added_column_catalog);
        }
 
         AlterTableOperation::DropColumn {
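Annotation: the rewritten `AddColumn` arm binds the new `ColumnDef` directly into `ColumnCatalog`s instead of splicing it back into the original `CREATE TABLE` AST, and rejects duplicates, generated columns, and primary-key additions up front. A minimal sketch of those three guards over a simplified column model (all types here are stand-ins, not the real AST or catalog types):

```rust
// Simplified model of the ADD COLUMN guards from the hunks above.
struct Col {
    name: String,
    is_generated: bool,
    is_pk: bool,
}

fn check_add(existing: &[Col], added: &Col) -> Result<(), String> {
    // Duplicate names are rejected early for better error reporting.
    if existing.iter().any(|c| c.name == added.name) {
        return Err(format!("column \"{}\" already exists", added.name));
    }
    // Generated columns cannot be added after table creation.
    if added.is_generated {
        return Err("adding a generated column is not supported".to_owned());
    }
    // The primary key is fixed at table creation time.
    if added.is_pk {
        return Err("cannot add a primary key column".to_owned());
    }
    Ok(())
}

fn main() {
    let existing = vec![Col { name: "v".into(), is_generated: false, is_pk: true }];
    let added = Col { name: "w".into(), is_generated: false, is_pk: false };
    assert!(check_add(&existing, &added).is_ok());
}
```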
@@ -415,16 +406,12 @@ pub async fn handle_alter_table_column(
             }
 
             // Check if the column to drop is referenced by any generated columns.
-            for column in original_catalog.columns() {
-                if column_name.real_value() == column.name() && !column.is_generated() {
-                    fail_if_has_schema_registry()?;
-                }
-
+            for column in original_table.columns() {
                 if let Some(expr) = column.generated_expr() {
                     let expr = ExprImpl::from_expr_proto(expr)?;
-                    let refs = expr.collect_input_refs(original_catalog.columns().len());
+                    let refs = expr.collect_input_refs(original_table.columns().len());
                     for idx in refs.ones() {
-                        let refed_column = &original_catalog.columns()[idx];
+                        let refed_column = &original_table.columns()[idx];
                         if refed_column.name() == column_name.real_value() {
                             bail!(format!(
                                 "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
@@ -438,34 +425,51 @@ pub async fn handle_alter_table_column(
 
             // Locate the column by name and remove it.
             let column_name = column_name.real_value();
-            let removed_column = columns
-                .extract_if(|c| c.name.real_value() == column_name)
+            let removed_column = column_catalogs
+                .extract_if(|c| c.name() == column_name)
                 .at_most_one()
                 .ok()
-                .unwrap();
-
-            if removed_column.is_some() {
-                // PASS
-            } else if if_exists {
-                return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
-                    .notice(format!(
-                        "column \"{}\" does not exist, skipping",
-                        column_name
-                    ))
-                    .into());
-            } else {
-                Err(ErrorCode::InvalidInputSyntax(format!(
-                    "column \"{}\" of table \"{}\" does not exist",
-                    column_name, table_name
-                )))?
+                .expect("should not be multiple columns with the same name");
+
+            let Some(removed_column) = removed_column else {
+                if if_exists {
+                    return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
+                        .notice(format!(
+                            "column \"{}\" does not exist, skipping",
+                            column_name
+                        ))
+                        .into());
+                } else {
+                    Err(ErrorCode::InvalidInputSyntax(format!(
+                        "column \"{}\" of table \"{}\" does not exist",
+                        column_name, table_name
+                    )))?
+                }
+            };
+
+            if !removed_column.can_drop() {
+                bail!("cannot drop a hidden or system column");
+            }
+
+            // If the schema is inferred, only allow dropping generated columns.
+            if !removed_column.is_generated() {
+                fail_if_schema_is_inferred()?;
+            }
+
+            if pk_column_ids.contains(&removed_column.column_id()) {
+                Err(ErrorCode::InvalidInputSyntax(
+                    "cannot drop a primary key column".to_owned(),
+                ))?
             }
         }
 
         _ => unreachable!(),
     };
 
+    column_catalogs.retain(|c| !c.is_rw_timestamp_column());
+
     let (source, table, graph, col_index_mapping, job_type) =
-        get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?;
+        get_replace_table_plan_2(&session, table_name, column_catalogs, &original_table).await?;
 
     let catalog_writer = session.catalog_writer()?;
@@ -475,10 +479,121 @@ pub async fn handle_alter_table_column(
 
     Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
 }
 
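Annotation: the `DropColumn` arm keeps the generated-column reference check: a column cannot be dropped while some generated column's expression reads it. A self-contained sketch of that check, with input refs modeled as plain index lists instead of `ExprImpl` bitsets:

```rust
// Each column is (name, input refs of its generated expression, if any).
fn check_drop(columns: &[(&str, Option<Vec<usize>>)], drop_name: &str) -> Result<(), String> {
    for (name, refs) in columns {
        for &idx in refs.iter().flatten() {
            // `idx` points at a column the generated expression reads.
            if columns[idx].0 == drop_name {
                return Err(format!(
                    "failed to drop column \"{drop_name}\" because it's referenced by a generated column \"{name}\""
                ));
            }
        }
    }
    Ok(())
}

fn main() {
    // `gen` is generated from column 0 (`v`), so dropping `v` must fail,
    // while dropping `gen` itself is fine.
    let columns = [("v", None), ("gen", Some(vec![0]))];
    assert!(check_drop(&columns, "v").is_err());
    assert!(check_drop(&columns, "gen").is_ok());
}
```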
+pub async fn get_replace_table_plan_2(
+    session: &Arc<SessionImpl>,
+    table_name: ObjectName,
+    mut new_columns: Vec<ColumnCatalog>,
+    old_catalog: &Arc<TableCatalog>,
+) -> Result<(
+    Option<Source>,
+    Table,
+    StreamFragmentGraph,
+    ColIndexMapping,
+    TableJobType,
+)> {
+    // Create handler args as if we're creating a new table with the altered definition.
+    let handler_args = HandlerArgs::new(session.clone(), &Statement::Abort, Arc::from(""))?;
+    let mut col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
+
+    for new_column in &mut new_columns {
+        new_column.column_desc.column_id = col_id_gen.generate(&*new_column)?;
+    }
+
+    let context = OptimizerContext::new(handler_args, ExplainOptions::default());
+
+    let db_name = session.database();
+    let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
+
+    let row_id_index = new_columns
+        .iter()
+        .position(|c| c.column_id() == ROW_ID_COLUMN_ID);
+
+    if old_catalog.has_associated_source() || old_catalog.cdc_table_id.is_some() {
+        bail_not_implemented!("new replace table not supported for table with source");
+    }
+
+    let props = CreateTableProps {
+        definition: "".to_owned(), // TODO: no more definition!
+        append_only: old_catalog.append_only,
+        on_conflict: old_catalog.conflict_behavior.into(),
+        with_version_column: old_catalog
+            .version_column_index
+            .map(|i| old_catalog.columns()[i].name().to_owned()),
+        webhook_info: old_catalog.webhook_info.clone(),
+        engine: old_catalog.engine,
+    };
+
+    let info = CreateTableInfo {
+        columns: new_columns,
+        pk_column_ids: old_catalog.pk_column_ids(),
+        row_id_index,
+        watermark_descs: vec![], // TODO: this is not persisted in the catalog!
+        source_catalog: None,
+        version: col_id_gen.into_version(),
+    };
+
+    let (plan, table) = gen_table_plan_inner(context.into(), schema_name, name, info, props)?;
+
+    let mut graph = build_graph(plan)?;
+
+    // Fill the original table ID.
+    let table = Table {
+        id: old_catalog.id().table_id(),
+        ..table
+    };
+
+    // Calculate the mapping from the original columns to the new columns.
+    let col_index_mapping = ColIndexMapping::new(
+        old_catalog
+            .columns()
+            .iter()
+            .map(|old_c| {
+                table.columns.iter().position(|new_c| {
+                    new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id()
+                })
+            })
+            .collect(),
+        table.columns.len(),
+    );
+
+    let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();
+
+    let target_columns = table
+        .columns
+        .iter()
+        .map(|col| ColumnCatalog::from(col.clone()))
+        .filter(|col| !col.is_rw_timestamp_column())
+        .collect_vec();
+
+    for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? {
+        hijack_merger_for_target_table(
+            &mut graph,
+            &target_columns,
+            &sink,
+            Some(&sink.unique_identity()),
+        )?;
+    }
+
+    // Set some fields ourselves so that the meta service does not need to maintain them.
+    let mut table = table;
+    table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
+    table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();
+
+    Ok((None, table, graph, col_index_mapping, TableJobType::General))
+}
+
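Annotation: `get_replace_table_plan_2` derives the old-to-new `ColIndexMapping` purely by column id, so dropped columns map to `None` and surviving columns map to their new positions. A runnable sketch of that computation with bare `i32` ids in place of the real catalog types:

```rust
// For each old column id, find the position of the matching id among the
// new columns; dropped columns map to `None`.
fn col_index_mapping(old_ids: &[i32], new_ids: &[i32]) -> Vec<Option<usize>> {
    old_ids
        .iter()
        .map(|old| new_ids.iter().position(|new| new == old))
        .collect()
}

fn main() {
    // Dropping the middle column: [1, 2, 3] -> [1, 3].
    assert_eq!(col_index_mapping(&[1, 2, 3], &[1, 3]), [Some(0), None, Some(1)]);
    // Adding a column with a freshly generated id: [1, 2] -> [1, 2, 4].
    assert_eq!(col_index_mapping(&[1, 2], &[1, 2, 4]), [Some(0), Some(1)]);
}
```

Because the mapping is keyed by column id rather than position, it stays correct regardless of where `ColumnIdGenerator` places the added or retained columns.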
 pub fn fetch_table_catalog_for_alter(
     session: &SessionImpl,
     table_name: &ObjectName,
 ) -> Result<Arc<TableCatalog>> {
+    let (table, _source) = fetch_table_source_for_alter(session, table_name)?;
+    Ok(table)
+}
+
+pub fn fetch_table_source_for_alter(
+    session: &SessionImpl,
+    table_name: &ObjectName,
+) -> Result<(Arc<TableCatalog>, Option<Arc<SourceCatalog>>)> {
     let db_name = session.database();
     let (schema_name, real_table_name) =
         Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
@@ -487,8 +602,9 @@ pub fn fetch_table_catalog_for_alter(
 
     let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
 
-    let original_catalog = {
-        let reader = session.env().catalog_reader().read_guard();
+    let reader = session.env().catalog_reader().read_guard();
+
+    let table = {
         let (table, schema_name) =
             reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
 
@@ -505,7 +621,24 @@ pub fn fetch_table_catalog_for_alter(
         table.clone()
     };
 
-    Ok(original_catalog)
+    let source = if let Some(associated_source_id) = table.associated_source_id() {
+        let (source, schema_name) =
+            reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
+
+        assert_eq!(
+            source.id,
+            associated_source_id.table_id(),
+            "associated source id mismatches"
+        );
+
+        session.check_privilege_for_drop_alter(schema_name, &**source)?;
+
+        Some(source.clone())
+    } else {
+        None
+    };
+
+    Ok((table, source))
 }
 
 #[cfg(test)]
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index a9a0d3964a3b5..a9aa0ba9720f5 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -728,7 +728,7 @@ pub struct CreateTableProps {
 }
 
 #[allow(clippy::too_many_arguments)]
-fn gen_table_plan_inner(
+pub fn gen_table_plan_inner(
     context: OptimizerContextRef,
     schema_name: Option<String>,
     table_name: String,
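Annotation: the fetch refactor keeps `fetch_table_catalog_for_alter` as a thin wrapper over the new table-plus-source lookup, so existing call sites are untouched while the alter path gains access to the associated source. A sketch of that delegation shape with stand-in types (the name resolution and privilege checks are elided):

```rust
// Stand-in catalog types; the real lookup runs under a catalog read guard.
#[derive(Clone)]
struct TableCat {
    associated_source_id: Option<u32>,
}
#[derive(Clone)]
struct SourceCat {
    id: u32,
}

// New entry point: fetch the table and, if it has one, its associated source.
fn fetch_table_source_for_alter(_name: &str) -> Result<(TableCat, Option<SourceCat>), String> {
    let table = TableCat { associated_source_id: Some(42) };
    // The associated source, when present, must carry the id recorded on the table.
    let source = table.associated_source_id.map(|id| SourceCat { id });
    Ok((table, source))
}

// Old entry point, kept as a thin wrapper so existing call sites still compile.
fn fetch_table_catalog_for_alter(name: &str) -> Result<TableCat, String> {
    let (table, _source) = fetch_table_source_for_alter(name)?;
    Ok(table)
}

fn main() {
    let table = fetch_table_catalog_for_alter("t").unwrap();
    assert_eq!(table.associated_source_id, Some(42));
}
```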