refactor(frontend): generate replace plan without relying on definition #19866

Draft · wants to merge 2 commits into base: bz/extract-create-table-args-3
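
In effect, this changes how column-altering statements are planned: the replace plan is now derived from the table's column catalogs rather than from re-parsing the stored `CREATE TABLE` definition. Statements like the following take this code path (table and column names are illustrative):

    ALTER TABLE t ADD COLUMN v2 INT;
    ALTER TABLE t DROP COLUMN v1;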
4 changes: 4 additions & 0 deletions src/common/src/catalog/column.rs
@@ -413,6 +413,10 @@ impl ColumnCatalog {
         !self.is_generated() && !self.is_rw_timestamp_column()
     }

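+    /// Whether the column can be dropped by a user, i.e. it is neither a hidden
+    /// column nor an `_rw` system column.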
+    pub fn can_drop(&self) -> bool {
+        !(self.is_hidden() || self.is_rw_sys_column())
+    }
+
     /// If the column is a generated column
     pub fn generated_expr(&self) -> Option<&ExprNode> {
         if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
243 changes: 189 additions & 54 deletions src/frontend/src/handler/alter_table_column.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 use anyhow::{anyhow, Context};
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::catalog::{ColumnCatalog, Engine};
+use risingwave_common::catalog::{ColumnCatalog, Engine, ROW_ID_COLUMN_ID};
 use risingwave_common::hash::VnodeCount;
 use risingwave_common::types::DataType;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -29,13 +29,18 @@ use risingwave_pb::ddl_service::TableJobType;
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
 use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
 use risingwave_sqlparser::ast::{
-    AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Ident, ObjectName,
-    Statement, StructField, TableConstraint,
+    AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, ExplainOptions, Ident,
+    ObjectName, Statement, StructField, TableConstraint,
 };
 use risingwave_sqlparser::parser::Parser;
 
 use super::create_source::schema_has_schema_registry;
-use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
+use super::create_table::{
+    bind_pk_and_row_id_on_relation, bind_sql_columns,
+    bind_sql_columns_generated_and_default_constraints, gen_create_table_plan,
+    gen_create_table_plan_without_source, gen_table_plan_inner,
+    generate_stream_graph_for_replace_table, ColumnIdGenerator, CreateTableInfo, CreateTableProps,
+};
 use super::util::SourceSchemaCompatExt;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
@@ -45,7 +50,7 @@ use crate::expr::{Expr, ExprImpl, InputRef, Literal};
 use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
 use crate::handler::create_table::bind_table_constraints;
 use crate::session::SessionImpl;
-use crate::{Binder, TableCatalog};
+use crate::{build_graph, Binder, OptimizerContext, TableCatalog};
 
 /// Used in auto schema change process
 pub async fn get_new_table_definition_for_cdc_table(
@@ -328,42 +333,43 @@ pub async fn handle_alter_table_column(
     }

     // Retrieve the original table definition and parse it to AST.
-    let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition)
-        .context("unable to parse original table definition")?
-        .try_into()
-        .unwrap();
-    let Statement::CreateTable {
-        columns,
-        format_encode,
-        ..
-    } = &mut definition
-    else {
-        panic!("unexpected statement: {:?}", definition);
-    };
-
-    let format_encode = format_encode
-        .clone()
-        .map(|format_encode| format_encode.into_v2_with_warning());
-
-    let fail_if_has_schema_registry = || {
-        if let Some(format_encode) = &format_encode
-            && schema_has_schema_registry(format_encode)
-        {
-            Err(ErrorCode::NotSupported(
-                "alter table with schema registry".to_owned(),
-                "try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
-            ))
-        } else {
-            Ok(())
-        }
-    };
-
-    if columns.is_empty() {
-        Err(ErrorCode::NotSupported(
-            "alter a table with empty column definitions".to_owned(),
-            "Please recreate the table with column definitions.".to_owned(),
-        ))?
-    }
+    // let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition)
+    //     .context("unable to parse original table definition")?
+    //     .try_into()
+    //     .unwrap();
+
+    // let Statement::CreateTable {
+    //     columns: _,
+    //     format_encode,
+    //     ..
+    // } = &mut definition
+    // else {
+    //     panic!("unexpected statement: {:?}", definition);
+    // };
+
+    // let format_encode = format_encode
+    //     .clone()
+    //     .map(|format_encode| format_encode.into_v2_with_warning());
+
+    // let fail_if_has_schema_registry = || {
+    //     if let Some(format_encode) = &format_encode
+    //         && schema_has_schema_registry(format_encode)
+    //     {
+    //         Err(ErrorCode::NotSupported(
+    //             "alter table with schema registry".to_owned(),
+    //             "try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
+    //         ))
+    //     } else {
+    //         Ok(())
+    //     }
+    // };
+
+    // if columns.is_empty() {
+    //     Err(ErrorCode::NotSupported(
+    //         "alter a table with empty column definitions".to_owned(),
+    //         "Please recreate the table with column definitions.".to_owned(),
+    //     ))?
+    // }

     if !original_catalog.incoming_sinks.is_empty()
         && matches!(operation, AlterTableOperation::DropColumn { .. })
@@ -373,19 +379,19 @@
         ))?;
     }

+    let mut column_catalogs = original_catalog.columns().to_vec();
+    let pk_column_ids = original_catalog.pk_column_ids();

     match operation {
         AlterTableOperation::AddColumn {
             column_def: new_column,
         } => {
-            fail_if_has_schema_registry()?;
+            // fail_if_has_schema_registry()?;

             // Duplicated names can actually be checked by `StreamMaterialize`. We do here for
             // better error reporting.
             let new_column_name = new_column.name.real_value();
-            if columns
-                .iter()
-                .any(|c| c.name.real_value() == new_column_name)
-            {
+            if column_catalogs.iter().any(|c| c.name() == new_column_name) {
                 Err(ErrorCode::InvalidInputSyntax(format!(
                     "column \"{new_column_name}\" of table \"{table_name}\" already exists"
                 )))?
@@ -401,8 +407,18 @@
                 ))?
             }

-            // Add the new column to the table definition if it is not created by `create table (*)` syntax.
-            columns.push(new_column);
+            let new_column = vec![new_column];
+
+            let mut new_column_catalog = bind_sql_columns(&new_column)?;
+
+            bind_sql_columns_generated_and_default_constraints(
+                &session,
+                table_name.real_value(),
+                &mut new_column_catalog, // no ref to existing columns
+                new_column,
+            )?;
+
+            column_catalogs.extend(new_column_catalog);
         }

         AlterTableOperation::DropColumn {
@@ -417,7 +433,7 @@
             // Check if the column to drop is referenced by any generated columns.
             for column in original_catalog.columns() {
                 if column_name.real_value() == column.name() && !column.is_generated() {
-                    fail_if_has_schema_registry()?;
+                    // fail_if_has_schema_registry()?;
                 }

                 if let Some(expr) = column.generated_expr() {
@@ -438,13 +454,16 @@

             // Locate the column by name and remove it.
             let column_name = column_name.real_value();
-            let removed_column = columns
-                .extract_if(|c| c.name.real_value() == column_name)
+            let removed_column = column_catalogs
+                .extract_if(|c| c.name() == column_name)
                 .at_most_one()
                 .ok()
                 .unwrap();

-            if removed_column.is_some() {
+            if let Some(removed_column) = removed_column {
+                if !removed_column.can_drop() {
+                    bail!("cannot drop hidden or system column \"{}\"", column_name);
+                }
                 // PASS
             } else if if_exists {
                 return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
@@ -464,8 +483,18 @@
         _ => unreachable!(),
     };

-    let (source, table, graph, col_index_mapping, job_type) =
-        get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?;
+    column_catalogs.retain(|c| !c.is_rw_timestamp_column());
+
+    // TODO: Check pk not changed.
+
+    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan_2(
+        &session,
+        table_name,
+        column_catalogs,
+        &original_catalog,
+        None,
+    )
+    .await?;

     let catalog_writer = session.catalog_writer()?;

@@ -475,6 +504,112 @@
     Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
 }

+pub async fn get_replace_table_plan_2(
+    session: &Arc<SessionImpl>,
+    table_name: ObjectName,
+    mut new_columns: Vec<ColumnCatalog>,
+    old_catalog: &Arc<TableCatalog>,
+    new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
+) -> Result<(
+    Option<Source>,
+    Table,
+    StreamFragmentGraph,
+    ColIndexMapping,
+    TableJobType,
+)> {
+    // Create handler args as if we're creating a new table with the altered definition.
+    let handler_args = HandlerArgs::new(session.clone(), &Statement::Abort, Arc::from(""))?;
+    let mut col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
+
+    for new_column in &mut new_columns {
+        new_column.column_desc.column_id = col_id_gen.generate(&*new_column)?;
+    }
+
+    let context = OptimizerContext::new(handler_args, ExplainOptions::default());
+
+    let db_name = session.database();
+    let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
+    let (database_id, schema_id) =
+        session.get_database_and_schema_id_for_create(schema_name.clone())?;

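+    // Recover the row id index by its well-known column ID rather than from the
+    // definition; this assumes the hidden row id column keeps `ROW_ID_COLUMN_ID`.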
+    let row_id_index = new_columns
+        .iter()
+        .position(|c| c.column_id() == ROW_ID_COLUMN_ID);
+
+    if old_catalog.has_associated_source() || old_catalog.cdc_table_id.is_some() {
+        bail_not_implemented!("new replace table not supported for table with source");
+    }
+
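+    // All table properties are recovered from the old catalog instead of being
+    // re-parsed from the original `CREATE TABLE` definition.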
+    let props = CreateTableProps {
+        definition: "".to_owned(), // TODO: no more definition!
+        append_only: old_catalog.append_only,
+        on_conflict: old_catalog.conflict_behavior.into(),
+        with_version_column: old_catalog
+            .version_column_index
+            .map(|i| old_catalog.columns()[i].name().to_owned()),
+        webhook_info: old_catalog.webhook_info.clone(),
+        engine: old_catalog.engine,
+    };
+
+    let info = CreateTableInfo {
+        columns: new_columns,
+        pk_column_ids: old_catalog.pk_column_ids(),
+        row_id_index,
+        watermark_descs: vec![], // TODO: this is not persisted in the catalog!
+        source_catalog: None,
+        version: col_id_gen.into_version(),
+    };
+
+    let (plan, table) = gen_table_plan_inner(context.into(), schema_name, name, info, props)?;
+
+    let mut graph = build_graph(plan)?;
+
+    // Fill the original table ID.
+    let table = Table {
+        id: old_catalog.id().table_id(),
+        ..table
+    };
+
+    // Calculate the mapping from the original columns to the new columns.
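+    // For example, old columns [a, b, c] and new columns [a, c] yield the mapping
+    // [Some(0), None, Some(1)] with target length 2: column `b` was dropped and
+    // `c` shifted one position to the left.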
+    let col_index_mapping = ColIndexMapping::new(
+        old_catalog
+            .columns()
+            .iter()
+            .map(|old_c| {
+                table.columns.iter().position(|new_c| {
+                    new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id()
+                })
+            })
+            .collect(),
+        table.columns.len(),
+    );

+    let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();
+
+    let target_columns = table
+        .columns
+        .iter()
+        .map(|col| ColumnCatalog::from(col.clone()))
+        .filter(|col| !col.is_rw_timestamp_column())
+        .collect_vec();
+
+    for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? {
+        hijack_merger_for_target_table(
+            &mut graph,
+            &target_columns,
+            &sink,
+            Some(&sink.unique_identity()),
+        )?;
+    }
+
+    // Set some fields ourselves so that the meta service does not need to maintain them.
+    let mut table = table;
+    table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
+    table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();
+
+    Ok((None, table, graph, col_index_mapping, TableJobType::General))
+}

 pub fn fetch_table_catalog_for_alter(
     session: &SessionImpl,
     table_name: &ObjectName,