Skip to content

Commit

Permalink
fix(iceberg): fix commit_checkpoint_interval for iceberg table with c…
Browse files Browse the repository at this point in the history
…onnector (#19788)
  • Loading branch information
chenzl25 authored Dec 23, 2024
1 parent 5423dee commit 3cd7241
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
22 changes: 22 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_engine.slt
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,26 @@ statement ok
DROP TABLE full_type_t;


# test connector with commit_checkpoint_interval
statement ok
create table nexmark_t (
id BIGINT,
item_name VARCHAR,
description VARCHAR,
initial_bid BIGINT,
reserve BIGINT,
date_time TIMESTAMP,
expires TIMESTAMP,
seller BIGINT,
category BIGINT,
extra VARCHAR)
with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
nexmark.split.num = '2',
nexmark.min.event.gap.in.ns = '500000',
commit_checkpoint_interval = 1
) engine = iceberg;

statement ok
DROP TABLE nexmark_t
12 changes: 9 additions & 3 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::jvm_runtime::JVM;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
Expand Down Expand Up @@ -1381,7 +1382,7 @@ pub async fn handle_create_table(
pub async fn create_iceberg_engine_table(
session: Arc<SessionImpl>,
handler_args: HandlerArgs,
source: Option<PbSource>,
mut source: Option<PbSource>,
table: PbTable,
graph: StreamFragmentGraph,
job_type: TableJobType,
Expand Down Expand Up @@ -1621,7 +1622,7 @@ pub async fn create_iceberg_engine_table(
with.insert("table.name".to_owned(), iceberg_table_name.clone());
let commit_checkpoint_interval = handler_args
.with_options
.get("commit_checkpoint_interval")
.get(COMMIT_CHECKPOINT_INTERVAL)
.map(|v| v.to_owned())
.unwrap_or_else(|| "60".to_owned());
let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
Expand All @@ -1635,13 +1636,18 @@ pub async fn create_iceberg_engine_table(
bail!("commit_checkpoint_interval must be a positive integer: 0");
}

// remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
source
.as_mut()
.map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));

let sink_decouple = session.config().sink_decouple();
if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
bail!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled")
}

with.insert(
"commit_checkpoint_interval".to_owned(),
COMMIT_CHECKPOINT_INTERVAL.to_owned(),
commit_checkpoint_interval.to_string(),
);
with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
Expand Down

0 comments on commit 3cd7241

Please sign in to comment.