diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt index 5301eda7679b1..658d4fa95c6a0 100644 --- a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt +++ b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt @@ -26,30 +26,21 @@ CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; statement ok CREATE MATERIALIZED VIEW mv_user_2 AS SELECT * FROM src_user; -statement ok -CREATE TABLE t_user WITH ( - ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, - topic = 'pb_alter_source_shared_test', - scan.startup.mode = 'earliest' -) -FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', - message = 'test.User' -); - # age is new field statement error SELECT age FROM mv_user; -statement error -SELECT age FROM t_user; - # Push more events with extended fields system ok python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 5 user_with_more_fields sleep 5s + +# set session variable shouldn't affect existing source. +statement ok +set streaming_use_shared_source to false; + # Refresh source schema statement ok ALTER SOURCE src_user REFRESH SCHEMA; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 05548351492b9..196d4a7eaf39e 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -168,7 +168,7 @@ pub async fn refresh_sr_and_get_columns_diff( session, format_encode, Either::Right(&with_properties), - CreateSourceType::from_with_properties(session, &with_properties), + CreateSourceType::for_replace(original_source), ) .await? else { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 2981a96423edf..91abf9acf6c99 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -152,7 +152,8 @@ pub enum CreateSourceType { } impl CreateSourceType { - pub fn from_with_properties( + /// Note: shouldn't be used for `ALTER SOURCE`, since session variables should not affect existing source. We should respect the original type instead. + pub fn for_newly_created( session: &SessionImpl, with_properties: &impl WithPropertiesExt, ) -> Self { @@ -172,6 +173,16 @@ impl CreateSourceType { } } + pub fn for_replace(catalog: &SourceCatalog) -> Self { + if !catalog.info.is_shared() { + CreateSourceType::NonShared + } else if catalog.with_properties.is_shareable_cdc_connector() { + CreateSourceType::SharedCdc + } else { + CreateSourceType::SharedNonCdc + } + } + pub fn is_shared(&self) -> bool { matches!( self, @@ -818,7 +829,7 @@ pub async fn handle_create_source( let format_encode = stmt.format_encode.into_v2_with_warning(); let with_properties = bind_connector_props(&handler_args, &format_encode, true)?; - let create_source_type = CreateSourceType::from_with_properties(&session, &*with_properties); + let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties); let (columns_from_resolve_source, source_info) = bind_columns_from_source( &session, &format_encode,