Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): cleanup unused ParserProperties::key_encoding_config #17416

Merged
merged 3 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ impl MockDatagenSource {
.unwrap();
let parser_config = ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Native,
protocol_config: ProtocolProperties::Native,
},
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ mod tests {
async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
protocol_config: ProtocolProperties::Plain,
};
Expand Down
8 changes: 2 additions & 6 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use risingwave_common::bail;
use super::simd_json_parser::DebeziumJsonAccessBuilder;
use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
use crate::error::ConnectorResult;
use crate::extract_key_config;
use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::json::TimestamptzHandling;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
Expand Down Expand Up @@ -89,8 +88,8 @@ impl DebeziumParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
let (key_config, key_type) = extract_key_config!(props);
let key_builder = build_accessor_builder(key_config, key_type).await?;
let key_builder =
build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
let payload_builder =
build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
Expand All @@ -114,7 +113,6 @@ impl DebeziumParser {
use crate::parser::JsonProperties;

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -225,7 +223,6 @@ mod tests {
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -298,7 +295,6 @@ mod tests {
.collect::<Vec<_>>();

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ mod tests {

async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/maxwell/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ mod tests {
];

let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
3 changes: 0 additions & 3 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,15 +1048,13 @@ pub struct CommonParserConfig {

#[derive(Debug, Clone, Default)]
pub struct SpecificParserConfig {
pub key_encoding_config: Option<EncodingProperties>,
pub encoding_config: EncodingProperties,
pub protocol_config: ProtocolProperties,
}

impl SpecificParserConfig {
// for test only
pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down Expand Up @@ -1277,7 +1275,6 @@ impl SpecificParserConfig {
}
};
Ok(Self {
key_encoding_config: None,
encoding_config,
protocol_config,
})
Expand Down
16 changes: 0 additions & 16 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,6 @@ macro_rules! only_parse_payload {
};
}

// Extract encoding config and encoding type from ParserProperties
// for message key.
//
// Suppose (A, B) is the combination of key/payload combination:
// For (None, B), key should be the the key setting from B
// For (A, B), key should be the value setting from A
#[macro_export]
macro_rules! extract_key_config {
($props:ident) => {
match $props.key_encoding_config {
Some(config) => (config, EncodingType::Value),
None => ($props.encoding_config.clone(), EncodingType::Key),
}
};
}

/// Load raw bytes from:
/// * local file, for on-premise or testing.
/// * http/https, for common usage.
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ mod tests {
use_schema_registry: false,
timestamptz_handling: None,
}),
key_encoding_config: None,
},
data_types,
rows_per_second,
Expand Down
2 changes: 0 additions & 2 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ mod tests {
state,
ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Native,
protocol_config: ProtocolProperties::Native,
},
Expand Down Expand Up @@ -457,7 +456,6 @@ mod tests {
};
let parser_config = ParserConfig {
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Native,
protocol_config: ProtocolProperties::Native,
},
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/filesystem/s3/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ mod tests {
let config = ParserConfig {
common: CommonParserConfig { rw_columns: descs },
specific: SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Csv(csv_config),
protocol_config: ProtocolProperties::Plain,
},
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
let props = SpecificParserConfig {
key_encoding_config: None,
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
Expand Down
Loading