Skip to content

Commit

Permalink
refactor(frontend): merge steps for resolving on-conflict behavior in…
Browse files Browse the repository at this point in the history
… create table handler (#19916)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Dec 26, 2024
1 parent ad95d10 commit 62818d3
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 48 deletions.
102 changes: 67 additions & 35 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{
CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, FieldLike, TableId, TableVersionId,
DEFAULT_SCHEMA_NAME, INITIAL_TABLE_VERSION_ID, RISINGWAVE_ICEBERG_ROW_ID, ROWID_PREFIX,
CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, Engine, FieldLike, TableId,
TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_TABLE_VERSION_ID, RISINGWAVE_ICEBERG_ROW_ID,
ROWID_PREFIX,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::license::Feature;
Expand Down Expand Up @@ -667,6 +668,64 @@ fn gen_table_plan_with_source(
gen_table_plan_inner(context, schema_name, table_name, info, props)
}

/// On-conflict behavior either from user input or existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
Ast(Option<OnConflict>),
Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
fn from(v: Option<OnConflict>) -> Self {
Self::Ast(v)
}
}

impl From<ConflictBehavior> for EitherOnConflict {
fn from(v: ConflictBehavior) -> Self {
Self::Resolved(v)
}
}

impl EitherOnConflict {
/// Resolves the conflict behavior based on the given information.
pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
let conflict_behavior = match self {
EitherOnConflict::Ast(on_conflict) => {
if append_only {
if row_id_as_pk {
// Primary key will be generated, no conflict check needed.
ConflictBehavior::NoCheck
} else {
// User defined PK on append-only table, enforce `DO NOTHING`.
if let Some(on_conflict) = on_conflict
&& on_conflict != OnConflict::Nothing
{
return Err(ErrorCode::InvalidInputSyntax(
"When PRIMARY KEY constraint applied to an APPEND ONLY table, \
the ON CONFLICT behavior must be DO NOTHING."
.to_owned(),
)
.into());
}
ConflictBehavior::IgnoreConflict
}
} else {
// Default to `UPDATE FULL` for non-append-only tables.
match on_conflict.unwrap_or(OnConflict::UpdateFull) {
OnConflict::UpdateFull => ConflictBehavior::Overwrite,
OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
}
}
}
EitherOnConflict::Resolved(b) => b,
};

Ok(conflict_behavior)
}
}

/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
Expand All @@ -687,7 +746,7 @@ pub struct CreateTableInfo {
pub struct CreateTableProps {
pub definition: String,
pub append_only: bool,
pub on_conflict: Option<OnConflict>,
pub on_conflict: EitherOnConflict,
pub with_version_column: Option<String>,
pub webhook_info: Option<PbWebhookSourceInfo>,
pub engine: Engine,
Expand All @@ -708,11 +767,7 @@ fn gen_table_plan_inner(
ref source_catalog,
..
} = info;
let CreateTableProps {
append_only,
on_conflict,
..
} = props;
let CreateTableProps { append_only, .. } = props;

let (database_id, schema_id) = context
.session_ctx()
Expand Down Expand Up @@ -740,21 +795,6 @@ fn gen_table_plan_inner(
vec![],
);

let pk_on_append_only = append_only && row_id_index.is_none();

let on_conflict = if pk_on_append_only {
let on_conflict = on_conflict.unwrap_or(OnConflict::Nothing);
if on_conflict != OnConflict::Nothing {
return Err(ErrorCode::InvalidInputSyntax(
"When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be DO NOTHING.".to_owned(),
)
.into());
}
Some(on_conflict)
} else {
on_conflict
};

if !append_only && !watermark_descs.is_empty() {
return Err(ErrorCode::NotSupported(
"Defining watermarks on table requires the table to be append only.".to_owned(),
Expand All @@ -771,15 +811,7 @@ fn gen_table_plan_inner(
.into());
}

let materialize = plan_root.gen_table_plan(
context,
table_name,
info,
CreateTableProps {
on_conflict,
..props
},
)?;
let materialize = plan_root.gen_table_plan(context, table_name, info, props)?;

let mut table = materialize.table().to_prost(schema_id, database_id);

Expand Down Expand Up @@ -911,7 +943,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
CreateTableProps {
definition,
append_only: false,
on_conflict,
on_conflict: on_conflict.into(),
with_version_column,
webhook_info: None,
engine,
Expand Down Expand Up @@ -1015,7 +1047,7 @@ pub(super) async fn handle_create_table_plan(
let props = CreateTableProps {
definition: handler_args.normalized_sql.clone(),
append_only,
on_conflict,
on_conflict: on_conflict.into(),
with_version_column: with_version_column.clone(),
webhook_info,
engine,
Expand Down Expand Up @@ -1772,7 +1804,7 @@ pub async fn generate_stream_graph_for_replace_table(
let props = CreateTableProps {
definition: handler_args.normalized_sql.clone(),
append_only,
on_conflict,
on_conflict: on_conflict.into(),
with_version_column: with_version_column.clone(),
webhook_info: original_catalog.webhook_info.clone(),
engine,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub async fn handle_create_as(
CreateTableProps {
definition: "".to_owned(), // TODO: empty definition means no schema change support
append_only,
on_conflict,
on_conflict: on_conflict.into(),
with_version_column,
webhook_info: None,
engine,
Expand Down
13 changes: 1 addition & 12 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub use plan_visitor::{
ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
SysTableVisitor,
};
use risingwave_sqlparser::ast::OnConflict;

mod logical_optimization;
mod optimizer_context;
Expand Down Expand Up @@ -848,17 +847,7 @@ impl PlanRoot {
}
}

let conflict_behavior = match on_conflict {
Some(on_conflict) => match on_conflict {
OnConflict::UpdateFull => ConflictBehavior::Overwrite,
OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
},
None => match append_only {
true => ConflictBehavior::NoCheck,
false => ConflictBehavior::Overwrite,
},
};
let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

if let ConflictBehavior::IgnoreConflict = conflict_behavior
&& version_column_index.is_some()
Expand Down

0 comments on commit 62818d3

Please sign in to comment.