diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 99e20f10b4551..5cf3a36d5c680 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -163,7 +163,7 @@ create table s8_no_schema_field ( properties.bootstrap.server = 'message_queue:29092' ) FORMAT DEBEZIUM ENCODE JSON -statement ok +statement error without schema registry create table s9 with ( connector = 'kafka', topic = 'avro_bin', @@ -171,13 +171,21 @@ create table s9 with ( scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-simple-schema.avsc'); +statement ok +create table s9 with ( + connector = 'kafka', + topic = 'avro_bin', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-simple-schema.avsc', with_deprecated_file_header = true); + statement ok create table s10 with ( connector = 'kafka', topic = 'avro_c_bin', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest' -) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc'); +) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc', with_deprecated_file_header = true); statement ok create table s11 with ( @@ -282,7 +290,7 @@ create source s18 with ( topic = 'avro_c_bin', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest' -) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc'); +) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc', with_deprecated_file_header = true); # we cannot use confluent schema registry when connector is not kafka statement error diff --git a/e2e_test/source/basic/old_row_format_syntax/kafka.slt b/e2e_test/source/basic/old_row_format_syntax/kafka.slt index b76460cac6551..6c387aeb4523f 100644 --- a/e2e_test/source/basic/old_row_format_syntax/kafka.slt +++ b/e2e_test/source/basic/old_row_format_syntax/kafka.slt @@ -155,7 +155,7 @@ create table s8_no_schema_field ( properties.bootstrap.server = 'message_queue:29092' ) ROW FORMAT DEBEZIUM_JSON -statement ok +statement error without schema registry create table s9 with ( connector = 'kafka', topic = 'avro_bin', @@ -163,7 +163,7 @@ create table s9 with ( scan.startup.mode = 'earliest' ) row format avro row schema location 'file:///risingwave/avro-simple-schema.avsc' -statement ok +statement error without schema registry create table s10 with ( connector = 'kafka', topic = 'avro_c_bin', @@ -262,7 +262,7 @@ create source s17 with ( scan.startup.mode = 'earliest' ) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema' -statement ok +statement error without schema registry create source s18 with ( connector = 'kafka', topic = 'avro_c_bin', @@ -560,15 +560,15 @@ select id, first_name, last_name, email from s8_no_schema_field; 1004 Anne1 Kretchmar annek@noanswer.org 1005 add add2 add -query IITFFBTT -select id, sequence_id, name, score, avg_score, is_lasted, entrance_date, birthday, passed from s9; ----- -32 64 str_value 32 64 t 1970-01-01 1970-01-01 00:00:00+00:00 1 mon 1 day 00:00:01 - -query ITITT -select id, code, timestamp, xfas, contacts, sex from s10; ----- -100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE +# query IITFFBTT +# select id, sequence_id, name, score, avg_score, is_lasted, entrance_date, birthday, passed from s9; +# ---- +# 32 64 str_value 32 64 t 1970-01-01 1970-01-01 00:00:00+00:00 1 mon 1 day 00:00:01 +# +# query ITITT +# select id, code, timestamp, xfas, contacts, sex from s10; +# ---- +# 100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE query ITITT select id, code, timestamp, xfas, contacts, sex from s11; @@ -706,11 +706,11 @@ drop table s8 statement ok drop table s8_no_schema_field -statement ok -drop table s9 - -statement ok -drop table s10 +# statement ok +# drop table s9 +# +# statement ok +# drop table s10 statement ok drop table s11 @@ -733,8 +733,8 @@ drop table s16 statement ok drop source s17 -statement ok -drop source s18 +# statement ok +# drop source s18 statement ok drop table s20 diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index c8cfd938c23a2..4f518aad171ba 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -21,6 +21,7 @@ use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ is_column_ids_dedup, ColumnCatalog, ColumnDesc, Schema, TableId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, @@ -151,6 +152,15 @@ async fn extract_avro_table_schema( let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; conf.map_to_columns()? } else { + if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) = + &parser_config.encoding_config + && !avro_props.use_schema_registry + && !format_encode_options + .get("with_deprecated_file_header") + .is_some_and(|v| v == "true") + { + bail_not_implemented!(issue = 12871, "avro without schema registry"); + } let conf = AvroParserConfig::new(parser_config.encoding_config).await?; conf.map_to_columns()? };