diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 1637b5df6c659..c063a8ca0cee7 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -209,7 +209,6 @@ impl MockDatagenSource { .unwrap(); let parser_config = ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 5df7fa28118d3..eeccf17be7a27 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -63,7 +63,6 @@ mod tests { async fn test_bytes_parser(get_payload: fn() -> Vec>) { let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())]; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }), protocol_config: ProtocolProperties::Plain, }; diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 817c2a788f2be..bc43e556cb4f6 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -19,7 +19,6 @@ use risingwave_common::bail; use super::simd_json_parser::DebeziumJsonAccessBuilder; use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig}; use crate::error::ConnectorResult; -use crate::extract_key_config; use crate::parser::unified::debezium::DebeziumChangeEvent; use crate::parser::unified::json::TimestamptzHandling; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; @@ -89,8 +88,8 @@ impl DebeziumParser { rw_columns: Vec, source_ctx: SourceContextRef, ) -> ConnectorResult { - let (key_config, key_type) = extract_key_config!(props); - let key_builder = build_accessor_builder(key_config, key_type).await?; + let key_builder = + build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?; let payload_builder = build_accessor_builder(props.encoding_config, EncodingType::Value).await?; let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config { @@ -114,7 +113,6 @@ impl DebeziumParser { use crate::parser::JsonProperties; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -225,7 +223,6 @@ mod tests { .collect::>(); let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -298,7 +295,6 @@ mod tests { .collect::>(); let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 85399ed772768..f9738eb9e357e 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -130,7 +130,6 @@ mod tests { async fn build_parser(rw_columns: Vec) -> DebeziumParser { let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index 5db6cdd52e90c..e5bc7291ccf07 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -35,7 +35,6 @@ mod tests { ]; let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 26cf746b535dc..cab3b0dbd7df1 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1048,7 +1048,6 @@ pub struct CommonParserConfig { #[derive(Debug, Clone, Default)] pub struct SpecificParserConfig { - pub key_encoding_config: Option, pub encoding_config: EncodingProperties, pub protocol_config: ProtocolProperties, } @@ -1056,7 +1055,6 @@ pub struct SpecificParserConfig { impl SpecificParserConfig { // for test only pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None, @@ -1277,7 +1275,6 @@ impl SpecificParserConfig { } }; Ok(Self { - key_encoding_config: None, encoding_config, protocol_config, }) diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 30cb1fbf7d62e..590ba927854d3 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -92,22 +92,6 @@ macro_rules! only_parse_payload { }; } -// Extract encoding config and encoding type from ParserProperties -// for message key. -// -// Suppose (A, B) is the combination of key/payload combination: -// For (None, B), key should be the the key setting from B -// For (A, B), key should be the value setting from A -#[macro_export] -macro_rules! extract_key_config { - ($props:ident) => { - match $props.key_encoding_config { - Some(config) => (config, EncodingType::Value), - None => ($props.encoding_config.clone(), EncodingType::Key), - } - }; -} - /// Load raw bytes from: /// * local file, for on-premise or testing. /// * http/https, for common usage. diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index 5716631dff620..600efda2f6255 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -278,7 +278,6 @@ mod tests { use_schema_registry: false, timestamptz_handling: None, }), - key_encoding_config: None, }, data_types, rows_per_second, diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index d3115f504f32e..992343058e9ef 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -399,7 +399,6 @@ mod tests { state, ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, @@ -457,7 +456,6 @@ mod tests { }; let parser_config = ParserConfig { specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Native, protocol_config: ProtocolProperties::Native, }, diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index c3c800d6a5317..0340e584d0b8f 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -289,7 +289,6 @@ mod tests { let config = ParserConfig { common: CommonParserConfig { rw_columns: descs }, specific: SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Csv(csv_config), protocol_config: ProtocolProperties::Plain, }, diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 168e619cc956a..4e564130accbb 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -641,7 +641,6 @@ impl CdcBackfillExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) { let props = SpecificParserConfig { - key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { use_schema_registry: false, timestamptz_handling: None,