From 6ae695c65006520f57d20e9e38548d6e6e250a07 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Mon, 20 Nov 2023 18:13:13 +0800 Subject: [PATCH 1/3] chore(deps): Bump apache-avro to 0.16.0 --- Cargo.lock | 142 ++++++++++++------ src/connector/Cargo.toml | 2 +- src/connector/src/parser/avro/parser.rs | 17 ++- src/connector/src/parser/avro/util.rs | 12 +- .../src/parser/debezium/avro_parser.rs | 1 + src/connector/src/parser/unified/avro.rs | 7 +- src/connector/src/sink/encoder/avro.rs | 6 +- .../src/test_data/simple-schema.avsc | 11 +- 8 files changed, 124 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5fe9a23f3806..b876d870a1800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,7 +171,7 @@ dependencies = [ "byteorder", "digest", "lazy_static", - "libflate", + "libflate 1.4.0", "log", "num-bigint", "quad-rand", @@ -179,40 +179,38 @@ dependencies = [ "regex", "serde", "serde_json", - "strum 0.25.0", - "strum_macros 0.25.2", + "strum", + "strum_macros", "thiserror", - "typed-builder", + "typed-builder 0.14.0", "uuid", "zerocopy", ] [[package]] name = "apache-avro" -version = "0.15.0" -source = "git+https://github.com/risingwavelabs/avro?rev=3c257c1f65e63a80ac44f1cffaf083cdaeab431a" +version = "0.16.0" +source = "git+https://github.com/risingwavelabs/avro?rev=b251943586e7fe7dd52319fc84f8be47921434aa#b251943586e7fe7dd52319fc84f8be47921434aa" dependencies = [ - "byteorder", "bzip2", "crc32fast", "digest", "lazy_static", - "libflate", + "libflate 2.0.0", "log", "num-bigint", "quad-rand", "rand", - "regex", + "regex-lite", "serde", "serde_json", "snap", - "strum 0.24.1", - "strum_macros 0.24.3", + "strum", + "strum_macros", "thiserror", - "typed-builder", + "typed-builder 0.16.2", "uuid", "xz2", - "zerocopy", "zstd 0.12.4", ] @@ -1837,8 +1835,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686" dependencies = [ "crossterm 0.27.0", - "strum 0.25.0", - "strum_macros 0.25.2", + "strum", + "strum_macros", "unicode-width", ] @@ -1960,6 +1958,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpp_demangle" version = "0.4.3" @@ -2430,6 +2437,12 @@ name = "darwin-libproc-sys" version = "0.2.0" source = "git+https://github.com/risingwavelabs/darwin-libproc.git?rev=a502be24bd0971463f5bcbfe035a248d8ba503b7#a502be24bd0971463f5bcbfe035a248d8ba503b7" +[[package]] +name = "dary_heap" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" + [[package]] name = "dashmap" version = "5.5.3" @@ -4048,7 +4061,7 @@ version = "0.0.10" source = "git+https://github.com/icelake-io/icelake?rev=5cdcdffd24f4624a0a43f92c5f368988169a799b#5cdcdffd24f4624a0a43f92c5f368988169a799b" dependencies = [ "anyhow", - "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", + "apache-avro 0.15.0", "arrow-arith", "arrow-array", "arrow-buffer", @@ -4479,7 +4492,20 @@ checksum = "5ff4ae71b685bbad2f2f391fe74f6b7659a34871c08b210fdc039e43bee07d18" dependencies = [ "adler32", "crc32fast", - "libflate_lz77", + "libflate_lz77 1.2.0", +] + +[[package]] +name = "libflate" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7d5654ae1795afc7ff76f4365c2c8791b0feb18e8996a96adad8ffd7c3b2bf" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77 2.0.0", ] [[package]] @@ -4491,6 +4517,17 @@ dependencies = [ "rle-decode-fast", ] +[[package]] +name = "libflate_lz77" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be5f52fb8c451576ec6b79d3f4deb327398bc05bbdbd99021a6e77a4c855d524" +dependencies = [ + "core2", + "hashbrown 0.13.2", + "rle-decode-fast", +] + [[package]] name = "libloading" version = "0.7.4" @@ -6999,6 +7036,12 @@ dependencies = [ "regex-syntax 0.8.0", ] +[[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -7344,8 +7387,8 @@ dependencies = [ "risingwave_meta_node", "risingwave_rt", "shell-words", - "strum 0.25.0", - "strum_macros 0.25.2", + "strum", + "strum_macros", "task_stats_alloc", "tempfile", "tikv-jemallocator", @@ -7435,8 +7478,8 @@ dependencies = [ "smallbitset", "speedate", "static_assertions", - "strum 0.25.0", - "strum_macros 0.25.2", + "strum", + "strum_macros", "sysinfo", "tempfile", "thiserror", @@ -7592,7 +7635,7 @@ name = "risingwave_connector" version = "1.3.0-alpha" dependencies = [ "anyhow", - "apache-avro 0.15.0 (git+https://github.com/risingwavelabs/avro?rev=3c257c1f65e63a80ac44f1cffaf083cdaeab431a)", + "apache-avro 0.16.0", "arrow-array", "arrow-schema", "async-nats", @@ -7666,8 +7709,8 @@ dependencies = [ "serde_with", "serde_yaml", "simd-json", - "strum 0.25.0", - "strum_macros 0.25.2", + "strum", + "strum_macros", "syn 1.0.109", "tempfile", "thiserror", @@ -8194,7 +8237,7 @@ dependencies = [ "prost 0.12.1", "prost-helpers", "serde", - "strum 0.25.0", + "strum", "thiserror", "walkdir", "workspace-hack", @@ -8923,7 +8966,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "strum 0.25.0", + "strum", "thiserror", "time", "tracing", @@ -9611,8 +9654,8 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "242f76c50fd18cbf098607090ade73a08d39cfd84ea835f3796a2c855223b19b" dependencies = [ - "strum 0.25.0", - "strum_macros 0.25.2", + "strum", + "strum_macros", ] [[package]] @@ -9958,32 +10001,13 @@ dependencies = [ "syn 2.0.37", ] -[[package]] -name = "strum" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" - [[package]] name = "strum" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros 0.25.2", -] - -[[package]] -name = "strum_macros" -version = "0.24.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" -dependencies = [ - "heck 0.4.1", - "proc-macro2", - "quote", - "rustversion", - "syn 1.0.109", + "strum_macros", ] [[package]] @@ -10837,6 +10861,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "typed-builder" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "typenum" version = "1.16.0" @@ -11494,7 +11538,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "strum 0.25.0", + "strum", "subtle", "syn 1.0.109", "syn 2.0.37", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 7970dbe881324..a930a2f256417 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -15,7 +15,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "3c257c1f65e63a80ac44f1cffaf083cdaeab431a", features = [ +apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "b251943586e7fe7dd52319fc84f8be47921434aa", features = [ "snappy", "zstandard", "bzip", diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 273b4dcd57fcf..12bf833cd98ff 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -203,6 +203,7 @@ mod test { use std::ops::Sub; use std::path::PathBuf; + use apache_avro::schema::RecordSchema; use apache_avro::types::{Record, Value}; use apache_avro::{Codec, Days, Duration, Millis, Months, Reader, Schema, Writer}; use itertools::Itertools; @@ -471,12 +472,17 @@ mod test { match build_field(inner_schema) { None => { - let index_of_union = - union_schema.find_schema(&Value::Null).unwrap().0 as u32; + let index_of_union = union_schema + .find_schema_with_known_schemata::<&Schema>(&Value::Null, None, &None) + .unwrap() + .0 as u32; Some(Value::Union(index_of_union, Box::new(Value::Null))) } Some(value) => { - let index_of_union = union_schema.find_schema(&value).unwrap().0 as u32; + let index_of_union = union_schema + .find_schema_with_known_schemata::<&Schema>(&value, None, &None) + .unwrap() + .0 as u32; Some(Value::Union(index_of_union, Box::new(value))) } } @@ -487,9 +493,9 @@ mod test { fn build_avro_data(schema: &Schema) -> Record<'_> { let mut record = Record::new(schema).unwrap(); - if let Schema::Record { + if let Schema::Record(RecordSchema { name: _, fields, .. - } = schema.clone() + }) = schema.clone() { for field in &fields { let value = build_field(&field.schema) @@ -513,7 +519,6 @@ mod test { #[tokio::test] async fn test_new_avro_parser() { let avro_parser_rs = new_avro_parser_from_local("simple-schema.avsc").await; - assert!(avro_parser_rs.is_ok()); let avro_parser = avro_parser_rs.unwrap(); println!("avro_parser = {:?}", avro_parser); } diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index e1b63962bf23c..7f7ded02c4396 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use apache_avro::Schema; +use apache_avro::schema::{DecimalSchema, RecordSchema, Schema}; use itertools::Itertools; use risingwave_common::types::{DataType, Decimal}; use risingwave_pb::plan_common::ColumnDesc; pub fn avro_schema_to_column_descs(schema: &Schema) -> anyhow::Result> { - if let Schema::Record { fields, .. } = schema { + if let Schema::Record(RecordSchema { fields, .. }) = schema { let mut index = 0; let fields = fields .iter() @@ -40,11 +40,11 @@ fn avro_field_to_column_desc( ) -> anyhow::Result { let data_type = avro_type_mapping(schema)?; match schema { - Schema::Record { + Schema::Record(RecordSchema { name: schema_name, fields, .. - } => { + }) => { let vec_column = fields .iter() .map(|f| avro_field_to_column_desc(&f.name, &f.schema, index)) @@ -80,7 +80,7 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result { Schema::Boolean => DataType::Boolean, Schema::Float => DataType::Float32, Schema::Double => DataType::Float64, - Schema::Decimal { precision, .. } => { + Schema::Decimal(DecimalSchema { precision, .. }) => { if *precision > Decimal::MAX_PRECISION.into() { tracing::warn!( "RisingWave supports decimal precision up to {}, but got {}. Will truncate.", @@ -98,7 +98,7 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result { Schema::Enum { .. } => DataType::Varchar, Schema::TimeMillis => DataType::Time, Schema::TimeMicros => DataType::Time, - Schema::Record { fields, name, .. } => { + Schema::Record(RecordSchema { fields, name, .. }) => { if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME && name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into()) { diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index e18560f6c747b..7498f2e39b1b2 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -204,6 +204,7 @@ mod tests { let inner_shema_str = r#"{ "type": "record", "name": "Value", + "namespace": "dbserver1.inventory.customers", "fields": [ { "name": "id", diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index 3b7bcd14e7b6c..9712eb3c6abbe 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -15,6 +15,7 @@ use std::str::FromStr; use anyhow::anyhow; +use apache_avro::schema::{DecimalSchema, RecordSchema}; use apache_avro::types::Value; use apache_avro::{Decimal as AvroDecimal, Schema}; use chrono::Datelike; @@ -110,9 +111,9 @@ impl<'a> AvroParseOptions<'a> { // ---- Decimal ----- (Some(DataType::Decimal) | None, Value::Decimal(avro_decimal)) => { let (precision, scale) = match self.schema { - Some(Schema::Decimal { + Some(Schema::Decimal(DecimalSchema { precision, scale, .. - }) => (*precision, *scale), + })) => (*precision, *scale), _ => Err(create_error())?, }; let decimal = avro_decimal_to_rust_decimal(avro_decimal.clone(), precision, scale) @@ -392,7 +393,7 @@ pub fn avro_extract_field_schema<'a>( name: Option<&'a str>, ) -> anyhow::Result<&'a Schema> { match schema { - Schema::Record { fields, lookup, .. } => { + Schema::Record(RecordSchema { fields, lookup, .. }) => { let name = name.ok_or_else(|| anyhow::format_err!("no name provided for a field in record"))?; let index = lookup.get(name).ok_or_else(|| { diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index 7aff29dbb43b7..12b2b6ba6919b 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use apache_avro::schema::Schema as AvroSchema; +use apache_avro::schema::{RecordSchema, Schema as AvroSchema}; use apache_avro::types::{Record, Value}; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; @@ -250,7 +250,7 @@ fn validate_fields<'rw>( rw_fields: impl Iterator, avro: &AvroSchema, ) -> Result<()> { - let AvroSchema::Record { fields, lookup, .. } = avro else { + let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = avro else { return Err(FieldEncodeError::new(format!( "expect avro record but got {}", avro.canonical_form(), @@ -283,7 +283,7 @@ fn encode_fields<'avro, 'rw>( schema: &'avro AvroSchema, ) -> Result> { let mut record = Record::new(schema).unwrap(); - let AvroSchema::Record { fields, lookup, .. } = schema else { + let AvroSchema::Record(RecordSchema { fields, lookup, .. }) = schema else { unreachable!() }; let mut present = vec![false; fields.len()]; diff --git a/src/connector/src/test_data/simple-schema.avsc b/src/connector/src/test_data/simple-schema.avsc index 91e806c396a77..f4223cf91291f 100644 --- a/src/connector/src/test_data/simple-schema.avsc +++ b/src/connector/src/test_data/simple-schema.avsc @@ -34,20 +34,19 @@ { "name": "entrance_date", "type": "int", - "logicalType": "date", - "default": null + "logicalType": "date" }, { "name": "birthday", "type": "long", "logicalType": "timestamp-millis", - "default": null + "default": 0 }, { "name": "anniversary", "type": "long", "logicalType": "timestamp-micros", - "default": null + "default": 0 }, { "name": "passed", @@ -61,7 +60,7 @@ { "name": "bytes", "type": "bytes", - "default": null + "default": "" } ] -} \ No newline at end of file +} From 23d4b96c29842a2d981778f75b88d631e4674974 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Mon, 20 Nov 2023 18:17:45 +0800 Subject: [PATCH 2/3] cleanup workaround --- src/connector/src/sink/encoder/avro.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index 12b2b6ba6919b..f9c7f8f15dbe3 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -570,9 +570,7 @@ mod tests { Some(ScalarImpl::Interval(Interval::from_month_day_usec( 13, 2, 1000000, ))), - // https://github.com/apache/avro/pull/2283 - // r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#, - r#"{"type": {"type": "fixed", "name": "Duration", "size": 12}, "logicalType": "duration"}"#, + r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#, Value::Duration(apache_avro::Duration::new( apache_avro::Months::new(13), apache_avro::Days::new(2), @@ -648,8 +646,7 @@ mod tests { -1, i64::MAX, ))), - // https://github.com/apache/avro/pull/2283 - r#"{"type": {"type": "fixed", "name": "Duration", "size": 12}, "logicalType": "duration"}"#, + r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#, "encode error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration", ); From 42cc97ffe1cf2e3754bbf82c56b4ab01e3bddb62 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Mon, 20 Nov 2023 22:22:15 +0800 Subject: [PATCH 3/3] update avro_bin.1 following simple-schema.avsc fix --- scripts/source/test_data/avro_bin.1 | Bin 867 -> 844 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/scripts/source/test_data/avro_bin.1 b/scripts/source/test_data/avro_bin.1 index 7ceba48e6e187054183892fcb6792add655d8cd8..8bf7eb866c8454230c0284d3b1f8c4571d52d7db 100644 GIT binary patch delta 105 zcmaFNc7{#JKPiimMJ%zbC||EQIU_YUam_}h9>&Rg8N(;8ZpO}w>HsE3hXM=2#WEwMDG zM9B)IVDdc1a27D*8)FoN5y&)I0whu!Tg$Mu^Xi*?PVqZVwR?EFox-+G=3+JyKo