diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a315fd88fc9bc..80e526b01c15f 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -53,8 +53,8 @@ use risingwave_pb::catalog::{ use risingwave_pb::plan_common::{AdditionalColumnType, EncodeType, FormatType}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ - get_delimiter, AstString, AvroSchema, ColumnDef, ConnectorSchema, CreateSourceStatement, - DebeziumAvroSchema, Encode, Format, Ident, ProtobufSchema, SourceWatermark, + get_delimiter, AstString, ColumnDef, ConnectorSchema, CreateSourceStatement, Encode, Format, + Ident, ProtobufSchema, SourceWatermark, }; use super::RwPgResponse; @@ -130,14 +130,20 @@ async fn extract_avro_table_schema( info: &StreamSourceInfo, with_properties: &HashMap, format_encode_options: &mut BTreeMap, + is_debezium: bool, ) -> Result> { let parser_config = SpecificParserConfig::new(info, with_properties)?; try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); consume_aws_config_from_options(format_encode_options); - let conf = AvroParserConfig::new(parser_config.encoding_config).await?; - let vec_column_desc = conf.map_to_columns()?; + let vec_column_desc = if is_debezium { + let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; + conf.map_to_columns()? + } else { + let conf = AvroParserConfig::new(parser_config.encoding_config).await?; + conf.map_to_columns()? + }; Ok(vec_column_desc .into_iter() .map(|col| ColumnCatalog { @@ -156,29 +162,6 @@ async fn extract_debezium_avro_table_pk_columns( Ok(conf.extract_pks()?.drain(..).map(|c| c.name).collect()) } -// Map an Avro schema to a relational schema and return the pk_column_ids. -async fn extract_debezium_avro_table_schema( - info: &StreamSourceInfo, - with_properties: &HashMap, - format_encode_options: &mut BTreeMap, -) -> Result> { - let parser_config = SpecificParserConfig::new(info, with_properties)?; - try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); - try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); - consume_aws_config_from_options(format_encode_options); - - let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; - let vec_column_desc = conf.map_to_columns()?; - let column_catalog = vec_column_desc - .into_iter() - .map(|col| ColumnCatalog { - column_desc: col.into(), - is_hidden: false, - }) - .collect_vec(); - Ok(column_catalog) -} - /// Map a protobuf schema to a relational schema. async fn extract_protobuf_table_schema( schema: &ProtobufSchema, @@ -288,7 +271,6 @@ pub(crate) async fn bind_columns_from_source( session: &SessionImpl, source_schema: &ConnectorSchema, with_properties: &HashMap, - create_cdc_source_job: bool, ) -> Result<(Option>, StreamSourceInfo)> { const MESSAGE_NAME_KEY: &str = "message"; const KEY_MESSAGE_NAME_KEY: &str = "key.message"; @@ -298,14 +280,15 @@ pub(crate) async fn bind_columns_from_source( let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner(); let mut format_encode_options_to_consume = format_encode_options.clone(); - let get_key_message_name = |options: &mut BTreeMap| -> Option { + fn get_key_message_name(options: &mut BTreeMap) -> Option { consume_string_from_options(options, KEY_MESSAGE_NAME_KEY) .map(|ele| Some(ele.0)) .unwrap_or(None) - }; - let get_sr_name_strategy_check = |options: &mut BTreeMap, - use_sr: bool| - -> Result> { + } + fn get_sr_name_strategy_check( + options: &mut BTreeMap, + use_sr: bool, + ) -> Result> { let name_strategy = get_name_strategy_or_default(try_consume_string_from_options( options, NAME_STRATEGY_KEY, @@ -316,18 +299,19 @@ pub(crate) async fn bind_columns_from_source( ))); } Ok(name_strategy) + } + + let mut stream_source_info = StreamSourceInfo { + format: format_to_prost(&source_schema.format) as i32, + row_encode: row_encode_to_prost(&source_schema.row_encode) as i32, + format_encode_options, + ..Default::default() }; - let res = match (&source_schema.format, &source_schema.row_encode) { - (Format::Native, Encode::Native) => ( - None, - StreamSourceInfo { - format: FormatType::Native as i32, - row_encode: EncodeType::Native as i32, - format_encode_options, - ..Default::default() - }, - ), + let columns = match (&source_schema.format, &source_schema.row_encode) { + (Format::Native, Encode::Native) + | (Format::Plain, Encode::Bytes) + | (Format::DebeziumMongo, Encode::Json) => None, (Format::Plain, Encode::Protobuf) => { let (row_schema_location, use_schema_registry) = get_schema_location(&mut format_encode_options_to_consume)?; @@ -344,93 +328,58 @@ pub(crate) async fn bind_columns_from_source( protobuf_schema.use_schema_registry, )?; - ( - Some( - extract_protobuf_table_schema( - &protobuf_schema, - with_properties, - &mut format_encode_options_to_consume, - ) - .await?, - ), - StreamSourceInfo { - format: FormatType::Plain as i32, - row_encode: EncodeType::Protobuf as i32, - row_schema_location: protobuf_schema.row_schema_location.0.clone(), - use_schema_registry: protobuf_schema.use_schema_registry, - proto_message_name: protobuf_schema.message_name.0.clone(), - key_message_name: get_key_message_name(&mut format_encode_options_to_consume), - name_strategy: name_strategy - .unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32), - format_encode_options, - ..Default::default() - }, - ) - } - (Format::Plain, Encode::Json) => { - let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; - let columns = if create_cdc_source_job { - Some(debezium_cdc_source_schema()) - } else { - extract_json_table_schema( - &schema_config, + stream_source_info.use_schema_registry = protobuf_schema.use_schema_registry; + stream_source_info.row_schema_location = protobuf_schema.row_schema_location.0.clone(); + stream_source_info.proto_message_name = protobuf_schema.message_name.0.clone(); + stream_source_info.key_message_name = + get_key_message_name(&mut format_encode_options_to_consume); + stream_source_info.name_strategy = + name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32); + + Some( + extract_protobuf_table_schema( + &protobuf_schema, with_properties, &mut format_encode_options_to_consume, ) - .await? - }; - - ( - columns, - StreamSourceInfo { - format: FormatType::Plain as i32, - row_encode: EncodeType::Json as i32, - use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), - cdc_source_job: create_cdc_source_job, - format_encode_options, - ..Default::default() - }, + .await?, ) } - (Format::Plain, Encode::Avro) => { + (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => { let (row_schema_location, use_schema_registry) = get_schema_location(&mut format_encode_options_to_consume)?; - let avro_schema = AvroSchema { - row_schema_location, - use_schema_registry, - }; - let key_message_name = get_key_message_name(&mut format_encode_options_to_consume); + if matches!(format, Format::Debezium) && !use_schema_registry { + return Err(RwError::from(ProtocolError( + "schema location for DEBEZIUM_AVRO row format is not supported".to_string(), + ))); + } + let message_name = try_consume_string_from_options( &mut format_encode_options_to_consume, MESSAGE_NAME_KEY, ); let name_strategy = get_sr_name_strategy_check( &mut format_encode_options_to_consume, - avro_schema.use_schema_registry, + use_schema_registry, )?; - let stream_source_info = StreamSourceInfo { - format: FormatType::Plain as i32, - row_encode: EncodeType::Avro as i32, - row_schema_location: avro_schema.row_schema_location.0.clone(), - use_schema_registry: avro_schema.use_schema_registry, - proto_message_name: message_name.unwrap_or(AstString("".into())).0, - key_message_name, - name_strategy: name_strategy - .unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32), - format_encode_options, - ..Default::default() - }; - ( - Some( - extract_avro_table_schema( - &stream_source_info, - with_properties, - &mut format_encode_options_to_consume, - ) - .await?, - ), - stream_source_info, + + stream_source_info.use_schema_registry = use_schema_registry; + stream_source_info.row_schema_location = row_schema_location.0.clone(); + stream_source_info.proto_message_name = message_name.unwrap_or(AstString("".into())).0; + stream_source_info.key_message_name = + get_key_message_name(&mut format_encode_options_to_consume); + stream_source_info.name_strategy = + name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32); + + Some( + extract_avro_table_schema( + &stream_source_info, + with_properties, + &mut format_encode_options_to_consume, + matches!(format, Format::Debezium), + ) + .await?, ) } (Format::Plain, Encode::Csv) => { @@ -451,204 +400,81 @@ pub(crate) async fn bind_columns_from_source( .to_owned(), ))); } - ( - None, - StreamSourceInfo { - format: FormatType::Plain as i32, - row_encode: EncodeType::Csv as i32, - csv_delimiter: delimiter as i32, - csv_has_header: has_header, - format_encode_options, - ..Default::default() - }, - ) - } - (Format::Plain, Encode::Bytes) => ( - None, - StreamSourceInfo { - format: FormatType::Plain as i32, - row_encode: EncodeType::Bytes as i32, - format_encode_options, - ..Default::default() - }, - ), - (Format::Upsert, Encode::Json) => { - let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; - let columns = extract_json_table_schema( - &schema_config, - with_properties, - &mut format_encode_options_to_consume, - ) - .await?; - ( - columns, - StreamSourceInfo { - format: FormatType::Upsert as i32, - row_encode: EncodeType::Json as i32, - use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), - format_encode_options, - ..Default::default() - }, - ) - } - (Format::Upsert, Encode::Avro) => { - let (row_schema_location, use_schema_registry) = - get_schema_location(&mut format_encode_options_to_consume)?; - let avro_schema = AvroSchema { - row_schema_location, - use_schema_registry, - }; - - let name_strategy = get_sr_name_strategy_check( - &mut format_encode_options_to_consume, - avro_schema.use_schema_registry, - )? - .unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32); - let key_message_name = get_key_message_name(&mut format_encode_options_to_consume); - let message_name = try_consume_string_from_options( - &mut format_encode_options_to_consume, - MESSAGE_NAME_KEY, - ); + stream_source_info.csv_delimiter = delimiter as i32; + stream_source_info.csv_has_header = has_header; - let stream_source_info = StreamSourceInfo { - key_message_name, - format: FormatType::Upsert as i32, - row_encode: EncodeType::Avro as i32, - row_schema_location: avro_schema.row_schema_location.0.clone(), - use_schema_registry: avro_schema.use_schema_registry, - proto_message_name: message_name.unwrap_or(AstString("".into())).0, - name_strategy, - format_encode_options, - ..Default::default() - }; - let columns = extract_avro_table_schema( - &stream_source_info, - with_properties, - &mut format_encode_options_to_consume, - ) - .await?; - - (Some(columns), stream_source_info) + None } - - (Format::Debezium, Encode::Json) => { + ( + Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium, + Encode::Json, + ) => { let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; - ( - extract_json_table_schema( - &schema_config, - with_properties, - &mut format_encode_options_to_consume, - ) - .await?, - StreamSourceInfo { - format: FormatType::Debezium as i32, - row_encode: EncodeType::Json as i32, - use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), - format_encode_options, - ..Default::default() - }, - ) - } - (Format::Debezium, Encode::Avro) => { - let (row_schema_location, use_schema_registry) = - get_schema_location(&mut format_encode_options_to_consume)?; - if !use_schema_registry { - return Err(RwError::from(ProtocolError( - "schema location for DEBEZIUM_AVRO row format is not supported".to_string(), - ))); - } - let avro_schema = DebeziumAvroSchema { - row_schema_location, - }; - - // no need to check whether works schema registry because debezium avro always work with - // schema registry - let name_strategy = - get_sr_name_strategy_check(&mut format_encode_options_to_consume, true)?; - let message_name = try_consume_string_from_options( - &mut format_encode_options_to_consume, - MESSAGE_NAME_KEY, - ); - let key_message_name = get_key_message_name(&mut format_encode_options_to_consume); - - let stream_source_info = StreamSourceInfo { - use_schema_registry, - proto_message_name: message_name.unwrap_or(AstString("".into())).0, - name_strategy: name_strategy - .unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32), - format: FormatType::Debezium as i32, - row_encode: EncodeType::Avro as i32, - row_schema_location: avro_schema.row_schema_location.0.clone(), - key_message_name, - format_encode_options, - ..Default::default() - }; + stream_source_info.use_schema_registry = + json_schema_infer_use_schema_registry(&schema_config); - let full_columns = extract_debezium_avro_table_schema( - &stream_source_info, + extract_json_table_schema( + &schema_config, with_properties, &mut format_encode_options_to_consume, ) - .await?; - - (Some(full_columns), stream_source_info) + .await? } - (Format::DebeziumMongo, Encode::Json) => ( - None, - StreamSourceInfo { - format: FormatType::DebeziumMongo as i32, - row_encode: EncodeType::Json as i32, - format_encode_options, - ..Default::default() - }, - ), - - (Format::Maxwell, Encode::Json) => { - let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; - ( - extract_json_table_schema( - &schema_config, - with_properties, - &mut format_encode_options_to_consume, - ) - .await?, - StreamSourceInfo { - format: FormatType::Maxwell as i32, - row_encode: EncodeType::Json as i32, - use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), - format_encode_options, - ..Default::default() - }, - ) + (format, encoding) => { + return Err(RwError::from(ProtocolError(format!( + "Unknown combination {:?} {:?}", + format, encoding + )))); } + }; - (Format::Canal, Encode::Json) => { - let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; - ( - extract_json_table_schema( - &schema_config, - with_properties, - &mut format_encode_options_to_consume, - ) - .await?, - StreamSourceInfo { - format: FormatType::Canal as i32, - row_encode: EncodeType::Json as i32, - use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), - format_encode_options, - ..Default::default() - }, - ) - } + if !format_encode_options_to_consume.is_empty() { + let err_string = format!( + "Get unknown format_encode_options for {:?} {:?}: {}", + source_schema.format, + source_schema.row_encode, + format_encode_options_to_consume + .keys() + .map(|k| k.to_string()) + .collect::>() + .join(","), + ); + session.notice_to_user(err_string); + } + Ok((columns, stream_source_info)) +} + +fn bind_columns_from_source_for_cdc( + session: &SessionImpl, + source_schema: &ConnectorSchema, + _with_properties: &HashMap, +) -> Result<(Option>, StreamSourceInfo)> { + let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner(); + let mut format_encode_options_to_consume = format_encode_options.clone(); + + match (&source_schema.format, &source_schema.row_encode) { + (Format::Plain, Encode::Json) => (), (format, encoding) => { + // Note: parser will also check this. Just be extra safe here return Err(RwError::from(ProtocolError(format!( - "Unknown combination {:?} {:?}", + "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}", format, encoding )))); } }; + let columns = debezium_cdc_source_schema(); + let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?; + + let stream_source_info = StreamSourceInfo { + format: format_to_prost(&source_schema.format) as i32, + row_encode: row_encode_to_prost(&source_schema.row_encode) as i32, + format_encode_options, + use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), + cdc_source_job: true, + ..Default::default() + }; if !format_encode_options_to_consume.is_empty() { let err_string = format!( "Get unknown format_encode_options for {:?} {:?}: {}", @@ -662,7 +488,7 @@ pub(crate) async fn bind_columns_from_source( ); session.notice_to_user(err_string); } - Ok(res) + Ok((Some(columns), stream_source_info)) } /// add connector-spec columns to the end of column catalog @@ -1298,13 +1124,11 @@ pub async fn handle_create_source( false }; - let (columns_from_resolve_source, source_info) = bind_columns_from_source( - &session, - &source_schema, - &with_properties, - create_cdc_source_job, - ) - .await?; + let (columns_from_resolve_source, source_info) = if create_cdc_source_job { + bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)? + } else { + bind_columns_from_source(&session, &source_schema, &with_properties).await? + }; let columns_from_sql = bind_sql_columns(&stmt.columns)?; let mut columns = bind_all_columns( @@ -1434,6 +1258,29 @@ pub async fn handle_create_source( Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE)) } +fn format_to_prost(format: &Format) -> FormatType { + match format { + Format::Native => FormatType::Native, + Format::Plain => FormatType::Plain, + Format::Upsert => FormatType::Upsert, + Format::Debezium => FormatType::Debezium, + Format::DebeziumMongo => FormatType::DebeziumMongo, + Format::Maxwell => FormatType::Maxwell, + Format::Canal => FormatType::Canal, + } +} +fn row_encode_to_prost(row_encode: &Encode) -> EncodeType { + match row_encode { + Encode::Native => EncodeType::Native, + Encode::Json => EncodeType::Json, + Encode::Avro => EncodeType::Avro, + Encode::Protobuf => EncodeType::Protobuf, + Encode::Csv => EncodeType::Csv, + Encode::Bytes => EncodeType::Bytes, + Encode::Template => EncodeType::Template, + } +} + #[cfg(test)] pub mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index d9df554d20320..02f0321932b79 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -478,13 +478,8 @@ pub(crate) async fn gen_create_table_plan_with_source( let sql_pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (columns_from_resolve_source, source_info) = bind_columns_from_source( - context.session_ctx(), - &source_schema, - &with_properties, - false, - ) - .await?; + let (columns_from_resolve_source, source_info) = + bind_columns_from_source(context.session_ctx(), &source_schema, &with_properties).await?; let columns_from_sql = bind_sql_columns(&column_defs)?; let mut columns = bind_all_columns(