Skip to content

Commit

Permalink
refactor(source): cleanup unused `ParserProperties::key_encoding_conf…
Browse files Browse the repository at this point in the history
…ig` (#17416)
  • Loading branch information
xiangjinwu authored and shanicky committed Jun 26, 2024
1 parent e9d26b9 commit 08bb317
Show file tree
Hide file tree
Showing 11 changed files with 2 additions and 34 deletions.
1 change: 0 additions & 1 deletion src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ impl MockDatagenSource {
.unwrap();
let parser_config = ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Native,
protocol_config: ProtocolProperties::Native,
},
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ mod tests {
async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
protocol_config: ProtocolProperties::Plain,
};
Expand Down
8 changes: 2 additions & 6 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use risingwave_common::bail;
use super::simd_json_parser::DebeziumJsonAccessBuilder;
use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
use crate::error::ConnectorResult;
use crate::extract_key_config;
use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::json::TimestamptzHandling;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
Expand Down Expand Up @@ -89,8 +88,8 @@ impl DebeziumParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
let (key_config, key_type) = extract_key_config!(props);
let key_builder = build_accessor_builder(key_config, key_type).await?;
let key_builder =
build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
let payload_builder =
build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
Expand All @@ -114,7 +113,6 @@ impl DebeziumParser {
use crate::parser::JsonProperties;

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -225,7 +223,6 @@ mod tests {
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -298,7 +295,6 @@ mod tests {
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ mod tests {

async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/maxwell/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ mod tests {
];

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
3 changes: 0 additions & 3 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,15 +1049,13 @@ pub struct CommonParserConfig {

#[derive(Debug, Clone, Default)]
pub struct SpecificParserConfig {
pub key_encoding_config: Option<EncodingProperties>,
pub encoding_config: EncodingProperties,
pub protocol_config: ProtocolProperties,
}

impl SpecificParserConfig {
// for test only
pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -1278,7 +1276,6 @@ impl SpecificParserConfig {
}
};
Ok(Self {
key_encoding_config: None,
encoding_config,
protocol_config,
})
Expand Down
16 changes: 0 additions & 16 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,6 @@ macro_rules! only_parse_payload {
};
}

// Extract encoding config and encoding type from ParserProperties
// for message key.
//
// Suppose (A, B) is the combination of key/payload combination:
// For (None, B), key should be the the key setting from B
// For (A, B), key should be the value setting from A
#[macro_export]
macro_rules! extract_key_config {
($props:ident) => {
match $props.key_encoding_config {
Some(config) => (config, EncodingType::Value),
None => ($props.encoding_config.clone(), EncodingType::Key),
}
};
}

/// Load raw bytes from:
/// * local file, for on-premise or testing.
/// * http/https, for common usage.
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ mod tests {
use_schema_registry: false,
timestamptz_handling: None,
}),
key_encoding_config: None,
},
data_types,
rows_per_second,
Expand Down
2 changes: 0 additions & 2 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ mod tests {
state,
ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Native,
protocol_config: ProtocolProperties::Native,
},
Expand Down Expand Up @@ -457,7 +456,6 @@ mod tests {
};
let parser_config = ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Native,
protocol_config: ProtocolProperties::Native,
},
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/filesystem/s3/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ mod tests {
let config = ParserConfig {
common: CommonParserConfig { rw_columns: descs },
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Csv(csv_config),
protocol_config: ProtocolProperties::Plain,
},
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down

0 comments on commit 08bb317

Please sign in to comment.