Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(frontend): disallow creating new avro sources without schema registry #15256

Merged
merged 3 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,29 @@ create table s8_no_schema_field (
properties.bootstrap.server = 'message_queue:29092'
) FORMAT DEBEZIUM ENCODE JSON

statement ok
statement error without schema registry
create table s9 with (
connector = 'kafka',
topic = 'avro_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-simple-schema.avsc');

statement ok
create table s9 with (
connector = 'kafka',
topic = 'avro_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-simple-schema.avsc', with_deprecated_file_header = true);

statement ok
create table s10 with (
connector = 'kafka',
topic = 'avro_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc');
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc', with_deprecated_file_header = true);

statement ok
create table s11 with (
Expand Down Expand Up @@ -282,7 +290,7 @@ create source s18 with (
topic = 'avro_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc');
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc', with_deprecated_file_header = true);

# we cannot use confluent schema registry when connector is not kafka
statement error
Expand Down
38 changes: 19 additions & 19 deletions e2e_test/source/basic/old_row_format_syntax/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,15 @@ create table s8_no_schema_field (
properties.bootstrap.server = 'message_queue:29092'
) ROW FORMAT DEBEZIUM_JSON

statement ok
statement error without schema registry
create table s9 with (
connector = 'kafka',
topic = 'avro_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) row format avro row schema location 'file:///risingwave/avro-simple-schema.avsc'

statement ok
statement error without schema registry
create table s10 with (
connector = 'kafka',
topic = 'avro_c_bin',
Expand Down Expand Up @@ -262,7 +262,7 @@ create source s17 with (
scan.startup.mode = 'earliest'
) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema'

statement ok
statement error without schema registry
create source s18 with (
connector = 'kafka',
topic = 'avro_c_bin',
Expand Down Expand Up @@ -560,15 +560,15 @@ select id, first_name, last_name, email from s8_no_schema_field;
1004 Anne1 Kretchmar annek@noanswer.org
1005 add add2 add

query IITFFBTT
select id, sequence_id, name, score, avg_score, is_lasted, entrance_date, birthday, passed from s9;
----
32 64 str_value 32 64 t 1970-01-01 1970-01-01 00:00:00+00:00 1 mon 1 day 00:00:01

query ITITT
select id, code, timestamp, xfas, contacts, sex from s10;
----
100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE
# Disabled: s9 and s10 are no longer created, because avro without schema registry is rejected.
# query IITFFBTT
# select id, sequence_id, name, score, avg_score, is_lasted, entrance_date, birthday, passed from s9;
# ----
# 32 64 str_value 32 64 t 1970-01-01 1970-01-01 00:00:00+00:00 1 mon 1 day 00:00:01
#
# query ITITT
# select id, code, timestamp, xfas, contacts, sex from s10;
# ----
# 100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, sex from s11;
Expand Down Expand Up @@ -706,11 +706,11 @@ drop table s8
statement ok
drop table s8_no_schema_field

statement ok
drop table s9

statement ok
drop table s10
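# Disabled: s9 and s10 are never created (avro without schema registry is rejected), so there is nothing to drop.
# statement ok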
# drop table s9
#
# statement ok
# drop table s10

statement ok
drop table s11
Expand All @@ -733,8 +733,8 @@ drop table s16
statement ok
drop source s17

statement ok
drop source s18
# statement ok
# drop source s18

statement ok
drop table s20
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
is_column_ids_dedup, ColumnCatalog, ColumnDesc, Schema, TableId, INITIAL_SOURCE_VERSION_ID,
KAFKA_TIMESTAMP_COLUMN_NAME,
Expand Down Expand Up @@ -151,6 +152,15 @@ async fn extract_avro_table_schema(
let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
conf.map_to_columns()?
} else {
if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) =
&parser_config.encoding_config
&& !avro_props.use_schema_registry
&& !format_encode_options
.get("with_deprecated_file_header")
.is_some_and(|v| v == "true")
{
bail_not_implemented!(issue = 12871, "avro without schema registry");
}
let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
conf.map_to_columns()?
};
Expand Down
Loading