Skip to content

Commit

Permalink
legacy row format syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Feb 28, 2024
1 parent 87abf29 commit a83b334
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
11 changes: 11 additions & 0 deletions e2e_test/source/basic/old_row_format_syntax/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,20 @@ create table s8_no_schema_field (
properties.bootstrap.server = 'message_queue:29092'
) ROW FORMAT DEBEZIUM_JSON

statement error without schema registry
create table s9 with (
connector = 'kafka',
topic = 'avro_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) row format avro row schema location 'file:///risingwave/avro-simple-schema.avsc'

statement ok
create table s9 with (
connector = 'kafka',
topic = 'avro_bin',
properties.bootstrap.server = 'message_queue:29092',
with_deprecated_file_header = true,
scan.startup.mode = 'earliest'
) row format avro row schema location 'file:///risingwave/avro-simple-schema.avsc'

Expand All @@ -168,6 +177,7 @@ create table s10 with (
connector = 'kafka',
topic = 'avro_c_bin',
properties.bootstrap.server = 'message_queue:29092',
with_deprecated_file_header = true,
scan.startup.mode = 'earliest'
) row format avro row schema location 'file:///risingwave/avro-complex-schema.avsc'

Expand Down Expand Up @@ -267,6 +277,7 @@ create source s18 with (
connector = 'kafka',
topic = 'avro_c_bin',
properties.bootstrap.server = 'message_queue:29092',
with_deprecated_file_header = true,
scan.startup.mode = 'earliest'
) row format avro row schema location 'file:///risingwave/avro-complex-schema.avsc'

Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ async fn extract_avro_table_schema(
&& !format_encode_options
.get("with_deprecated_file_header")
.is_some_and(|v| v == "true")
&& !with_properties
.get("with_deprecated_file_header")
.is_some_and(|v| v == "true")
{
bail_not_implemented!(issue = 12871, "avro without schema registry");
}
Expand Down

0 comments on commit a83b334

Please sign in to comment.