From 2f8ad1fb2282bd79935e32023c391156fc42b0f1 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 9 Aug 2024 02:51:23 +0800 Subject: [PATCH] support avro map --- src/common/src/types/map_type.rs | 6 +- src/common/src/types/mod.rs | 3 +- src/common/src/util/value_encoding/mod.rs | 3 +- src/connector/codec/src/decoder/avro/mod.rs | 27 ++++- .../codec/src/decoder/avro/schema.rs | 15 +-- .../codec/tests/integration_tests/avro.rs | 114 ++++++++++++++++++ .../codec/tests/integration_tests/utils.rs | 12 ++ 7 files changed, 162 insertions(+), 18 deletions(-) diff --git a/src/common/src/types/map_type.rs b/src/common/src/types/map_type.rs index 11600bb45b42d..4d9ec3dc5f143 100644 --- a/src/common/src/types/map_type.rs +++ b/src/common/src/types/map_type.rs @@ -73,13 +73,13 @@ impl MapType { &self.0 .1 } - pub fn into_struct(self) -> StructType { + pub fn into_struct(self) -> DataType { let (key, value) = *self.0; - Self::struct_type_for_map(key, value) + DataType::Struct(Self::struct_type_for_map(key, value)) } pub fn into_list(self) -> DataType { - DataType::List(Box::new(DataType::Struct(self.into_struct()))) + DataType::List(Box::new(self.into_struct())) } /// String and integral types are allowed. 
diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 32625e96a7110..6efa8536fc722 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -345,8 +345,7 @@ impl DataType { } DataType::Map(datatype) => { // Same as List> - pb.field_type = - vec![DataType::Struct(datatype.clone().into_struct()).to_protobuf()]; + pb.field_type = vec![datatype.clone().into_struct().to_protobuf()]; } DataType::Boolean | DataType::Int16 diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index d72dcdc7303f3..3b4167331cb7e 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -359,8 +359,7 @@ fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result { DataType::List(item_type) => deserialize_list(item_type, data)?, DataType::Map(map_type) => { // FIXME: clone type everytime here is inefficient - let list = deserialize_list(&DataType::Struct(map_type.clone().into_struct()), data)? 
- .into_list(); + let list = deserialize_list(&map_type.clone().into_struct(), data)?.into_list(); ScalarImpl::Map(MapValue::from_list_entries(list)) } }) diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs index 93d16a32508db..3fd9e8a8bf995 100644 --- a/src/connector/codec/src/decoder/avro/mod.rs +++ b/src/connector/codec/src/decoder/avro/mod.rs @@ -25,8 +25,8 @@ use risingwave_common::array::{ListValue, StructValue}; use risingwave_common::bail; use risingwave_common::log::LogSuppresser; use risingwave_common::types::{ - DataType, Date, DatumCow, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, - ToOwnedDatum, + DataType, Date, DatumCow, Interval, JsonbVal, MapValue, ScalarImpl, Time, Timestamp, + Timestamptz, ToOwnedDatum, }; use risingwave_common::util::iter_util::ZipEqFast; @@ -318,6 +318,29 @@ impl<'a> AvroParseOptions<'a> { (DataType::Varchar, Value::Uuid(uuid)) => { uuid.as_hyphenated().to_string().into_boxed_str().into() } + (DataType::Map(map_type), Value::Map(map)) => { + let schema = self.extract_inner_schema(None); + let mut builder = map_type + .clone() + .into_struct() + .create_array_builder(map.len()); + // `map` is a HashMap: keys are non-null and unique, but iteration order is + // arbitrary. Sort by key so the resulting MapValue order is deterministic. + for (k, v) in map.iter().collect::<std::collections::BTreeMap<_, _>>() { + let value_datum = Self { + schema, + relax_numeric: self.relax_numeric, + } + .convert_to_datum(v, map_type.value())?
+ .to_owned_datum(); + builder.append( + StructValue::new(vec![Some(k.as_str().into()), value_datum]) + .to_owned_datum(), + ); + } + let list = ListValue::new(builder.finish()); + MapValue::from_list_entries(list).into() + } (_expected, _got) => Err(create_error())?, }; diff --git a/src/connector/codec/src/decoder/avro/schema.rs b/src/connector/codec/src/decoder/avro/schema.rs index 324b7fd426a56..114b281b44aa8 100644 --- a/src/connector/codec/src/decoder/avro/schema.rs +++ b/src/connector/codec/src/decoder/avro/schema.rs @@ -20,7 +20,7 @@ use apache_avro::AvroResult; use itertools::Itertools; use risingwave_common::error::NotImplemented; use risingwave_common::log::LogSuppresser; -use risingwave_common::types::{DataType, Decimal}; +use risingwave_common::types::{DataType, Decimal, MapType}; use risingwave_common::{bail, bail_not_implemented}; use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion}; @@ -57,8 +57,7 @@ impl ResolvedAvroSchema { #[derive(Debug, Copy, Clone)] pub enum MapHandling { Jsonb, - // TODO: - // Map + Map, } impl MapHandling { @@ -266,12 +265,10 @@ fn avro_type_mapping( ); } } - None => { - // We require it to be specified, because we don't want to have a bad default behavior. - // But perhaps changing the default behavior won't be a breaking change, - // because it affects only on creation time, what the result ColumnDesc will be, and the ColumnDesc will be persisted. - // This is unlike timestamp.handing.mode, which affects parser's behavior on the runtime. - bail!("`map.handling.mode` not specified in ENCODE AVRO (...).
Currently supported modes: `jsonb`") + Some(MapHandling::Map) | None => { + let value = avro_type_mapping(value_schema.as_ref(), map_handling) + .context("failed to convert Avro map type")?; + DataType::Map(MapType::from_kv(DataType::Varchar, value)) } } } diff --git a/src/connector/codec/tests/integration_tests/avro.rs b/src/connector/codec/tests/integration_tests/avro.rs index 11421c151d7a5..f57785e5e4034 100644 --- a/src/connector/codec/tests/integration_tests/avro.rs +++ b/src/connector/codec/tests/integration_tests/avro.rs @@ -885,3 +885,117 @@ fn test_union() { ])"#]], ); } + +#[test] +fn test_map() { + let schema = r#" +{ + "type": "record", + "namespace": "com.redpanda.examples.avro", + "name": "ClickEvent", + "fields": [ + { + "name": "map_str", + "type": { + "type": "map", + "values": "string" + }, + "default": {} + }, + { + "name": "map_map_int", + "type": { + "type": "map", + "values": { + "type": "map", + "values": "int" + } + } + } + ] +} + "#; + + let data = &[ + // {"map_str": {"a":"x","b":"y"}, "map_map_int": {"m1": {"a":1,"b":2}, "m2": {"c":3,"d":4}}} + "0402610278026202790004046d310402610202620400046d32040263060264080000", + // {"map_map_int": {}} + "0000", + ]; + + check( + schema, + data, + Config { + map_handling: None, + data_encoding: TestDataEncoding::HexBinary, + }, + expect![[r#" + [ + map_str(#1): Map(Varchar,Varchar), + map_map_int(#2): Map(Varchar,Map(Varchar,Int32)), + ]"#]], + expect![[r#" + Owned([ + StructValue( + Utf8("a"), + Utf8("x"), + ), + StructValue( + Utf8("b"), + Utf8("y"), + ), + ]) + Owned([ + StructValue( + Utf8("m1"), + [ + StructValue( + Utf8("a"), + Int32(1), + ), + StructValue( + Utf8("b"), + Int32(2), + ), + ], + ), + StructValue( + Utf8("m2"), + [ + StructValue( + Utf8("c"), + Int32(3), + ), + StructValue( + Utf8("d"), + Int32(4), + ), + ], + ), + ]) + ---- + Owned([]) + Owned([])"#]], + ); + + check( + schema, + data, + Config { + map_handling: Some(MapHandling::Jsonb), + data_encoding:
TestDataEncoding::HexBinary, + }, + expect![[r#" + [ + map_str(#1): Jsonb, + map_map_int(#2): Jsonb, + ]"#]], + expect![[r#" + Owned(Jsonb(JsonbRef({"a": "x", "b": "y"}))) + Owned(Jsonb(JsonbRef({"m1": {"a": Number(1), "b": Number(2)}, "m2": {"c": Number(3), "d": Number(4)}}))) + ---- + Owned(Jsonb(JsonbRef({}))) + Owned(Jsonb(JsonbRef({})))"#]], + ); +} diff --git a/src/connector/codec/tests/integration_tests/utils.rs b/src/connector/codec/tests/integration_tests/utils.rs index cecb0796c455a..dd375656c51e3 100644 --- a/src/connector/codec/tests/integration_tests/utils.rs +++ b/src/connector/codec/tests/integration_tests/utils.rs @@ -44,6 +44,14 @@ impl<'a> std::fmt::Debug for DataTypeTestDisplay<'a> { .debug_tuple("List") .field(&DataTypeTestDisplay(t)) .finish(), + DataType::Map(m) => { + write!( + f, + "Map({:?},{:?})", + &DataTypeTestDisplay(m.key()), + &DataTypeTestDisplay(m.value()) + ) + } _ => { // do not use alternative display for simple types write!(f, "{:?}", self.0) @@ -76,6 +84,10 @@ impl<'a> std::fmt::Debug for ScalarRefImplTestDisplay<'a> { .debug_list() .entries(l.iter().map(DatumRefTestDisplay)) .finish(), + ScalarRefImpl::Map(m) => f + .debug_list() + .entries(m.inner().iter().map(DatumRefTestDisplay)) + .finish(), _ => { // do not use alternative display for simple types write!(f, "{:?}", self.0)