Skip to content

Commit

Permalink
refactor(source): move schema registry to parent mod (#12986)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Oct 23, 2023
1 parent 7574a1b commit 67b5feb
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 39 deletions.
6 changes: 3 additions & 3 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::*;
use super::util::avro_schema_to_column_descs;
use crate::parser::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
};
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::util::{read_schema_from_http, read_schema_from_local, read_schema_from_s3};
use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
};

// Default avro access builder
#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use moka::future::Cache;
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};

use crate::parser::schema_registry::{Client, ConfluentSchema};
use crate::schema::schema_registry::{Client, ConfluentSchema};

#[derive(Debug)]
pub struct ConfluentSchemaResolver {
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ use risingwave_pb::plan_common::ColumnDesc;

use crate::parser::avro::schema_resolver::ConfluentSchemaResolver;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
};
use crate::parser::unified::avro::{
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
};
use crate::parser::unified::AccessImpl;
use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
};

const BEFORE: &str = "before";
const AFTER: &str = "after";
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ use risingwave_common::try_match_expand;
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::schema_resolver::ConfluentSchemaResolver;
use super::schema_registry::Client;
use super::util::{get_kafka_topic, read_schema_from_http, read_schema_from_local};
use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig};
use crate::only_parse_payload;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::schema_registry::handle_sr_list;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer;
use crate::parser::unified::AccessImpl;
use crate::parser::{
AccessBuilder, ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter,
};
use crate::schema::schema_registry::{handle_sr_list, Client};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down
21 changes: 1 addition & 20 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::catalog::{
SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo,
};
pub use schema_registry::name_strategy_from_str;
use tracing_futures::Instrument;

use self::avro::AvroAccessBuilder;
Expand All @@ -46,6 +45,7 @@ use self::upsert_parser::UpsertParser;
use self::util::get_kafka_topic;
use crate::aws_auth::AwsAuthProps;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::{
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
Expand All @@ -63,7 +63,6 @@ mod maxwell;
mod mysql;
mod plain_parser;
mod protobuf;
mod schema_registry;
mod unified;
mod upsert_parser;
mod util;
Expand Down Expand Up @@ -783,24 +782,6 @@ impl SpecificParserConfig {
};
}

#[derive(Debug, Clone, Default)]
pub struct SchemaRegistryAuth {
username: Option<String>,
password: Option<String>,
}

impl From<&HashMap<String, String>> for SchemaRegistryAuth {
fn from(props: &HashMap<String, String>) -> Self {
const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password";

SchemaRegistryAuth {
username: props.get(SCHEMA_REGISTRY_USERNAME).cloned(),
password: props.get(SCHEMA_REGISTRY_PASSWORD).cloned(),
}
}
}

#[derive(Debug, Default, Clone)]
pub struct AvroProperties {
pub use_schema_registry: bool,
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::*;
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::parser::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
};
use crate::parser::unified::protobuf::ProtobufAccess;
use crate::parser::unified::AccessImpl;
use crate::parser::{AccessBuilder, EncodingProperties};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
};

#[derive(Debug)]
pub struct ProtobufAccessBuilder {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/protobuf/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
use url::Url;

use crate::parser::schema_registry::Client;
use crate::parser::util::download_from_http;
use crate::schema::schema_registry::Client;

const PB_SCHEMA_LOCATION_S3_REGION: &str = "region";

Expand Down
1 change: 1 addition & 0 deletions src/connector/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod protobuf;
pub mod schema_registry;

const MESSAGE_NAME_KEY: &str = "message";
const SCHEMA_LOCATION_KEY: &str = "schema.location";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

Expand All @@ -23,8 +23,25 @@ use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use serde::de::DeserializeOwned;

use crate::parser::schema_registry::util::*;
use crate::parser::SchemaRegistryAuth;
use super::util::*;

#[derive(Debug, Clone, Default)]
pub struct SchemaRegistryAuth {
username: Option<String>,
password: Option<String>,
}

impl From<&HashMap<String, String>> for SchemaRegistryAuth {
fn from(props: &HashMap<String, String>) -> Self {
const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password";

SchemaRegistryAuth {
username: props.get(SCHEMA_REGISTRY_USERNAME).cloned(),
password: props.get(SCHEMA_REGISTRY_PASSWORD).cloned(),
}
}
}

/// An client for communication with schema registry
#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ enum ReqResp<T> {

#[cfg(test)]
mod test {
use crate::parser::schema_registry::handle_sr_list;
use super::super::handle_sr_list;

#[test]
fn test_handle_sr_list() {
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolErro
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_connector::parser::{
name_strategy_from_str, schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig,
ProtobufParserConfig, SpecificParserConfig,
schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig,
SpecificParserConfig,
};
use risingwave_connector::schema::schema_registry::name_strategy_from_str;
use risingwave_connector::source::cdc::{
CITUS_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
};
Expand Down

0 comments on commit 67b5feb

Please sign in to comment.