Skip to content

Commit

Permalink
fix: session variable should not affect shared source (#19807)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Dec 16, 2024
1 parent 594ef89 commit 0b1c0e7
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
19 changes: 5 additions & 14 deletions e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,21 @@ CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;
statement ok
CREATE MATERIALIZED VIEW mv_user_2 AS SELECT * FROM src_user;

statement ok
CREATE TABLE t_user WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'pb_alter_source_shared_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# age is new field
statement error
SELECT age FROM mv_user;

statement error
SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 5 user_with_more_fields

sleep 5s


# set session variable shouldn't affect existing source.
statement ok
set streaming_use_shared_source to false;

# Refresh source schema
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub async fn refresh_sr_and_get_columns_diff(
session,
format_encode,
Either::Right(&with_properties),
CreateSourceType::from_with_properties(session, &with_properties),
CreateSourceType::for_replace(original_source),
)
.await?
else {
Expand Down
15 changes: 13 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ pub enum CreateSourceType {
}

impl CreateSourceType {
pub fn from_with_properties(
/// Note: shouldn't be used for `ALTER SOURCE`, since session variables should not affect existing source. We should respect the original type instead.
pub fn for_newly_created(
session: &SessionImpl,
with_properties: &impl WithPropertiesExt,
) -> Self {
Expand All @@ -172,6 +173,16 @@ impl CreateSourceType {
}
}

pub fn for_replace(catalog: &SourceCatalog) -> Self {
if !catalog.info.is_shared() {
CreateSourceType::NonShared
} else if catalog.with_properties.is_shareable_cdc_connector() {
CreateSourceType::SharedCdc
} else {
CreateSourceType::SharedNonCdc
}
}

pub fn is_shared(&self) -> bool {
matches!(
self,
Expand Down Expand Up @@ -818,7 +829,7 @@ pub async fn handle_create_source(
let format_encode = stmt.format_encode.into_v2_with_warning();
let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;

let create_source_type = CreateSourceType::from_with_properties(&session, &*with_properties);
let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
let (columns_from_resolve_source, source_info) = bind_columns_from_source(
&session,
&format_encode,
Expand Down

0 comments on commit 0b1c0e7

Please sign in to comment.