Skip to content

Commit

Permalink
support avro map
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Aug 9, 2024
1 parent 519cd3d commit a46e2cc
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 18 deletions.
6 changes: 3 additions & 3 deletions src/common/src/types/map_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ impl MapType {
&self.0 .1
}

pub fn into_struct(self) -> StructType {
pub fn into_struct(self) -> DataType {
let (key, value) = *self.0;
Self::struct_type_for_map(key, value)
DataType::Struct(Self::struct_type_for_map(key, value))
}

pub fn into_list(self) -> DataType {
DataType::List(Box::new(DataType::Struct(self.into_struct())))
DataType::List(Box::new(self.into_struct()))
}

/// String and integral types are allowed.
Expand Down
3 changes: 1 addition & 2 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,7 @@ impl DataType {
}
DataType::Map(datatype) => {
// Same as List<Struct<K,V>>
pb.field_type =
vec![DataType::Struct(datatype.clone().into_struct()).to_protobuf()];
pb.field_type = vec![datatype.clone().into_struct().to_protobuf()];
}
DataType::Boolean
| DataType::Int16
Expand Down
3 changes: 1 addition & 2 deletions src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,7 @@ fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
DataType::List(item_type) => deserialize_list(item_type, data)?,
DataType::Map(map_type) => {
// FIXME: clone type everytime here is inefficient
let list = deserialize_list(&DataType::Struct(map_type.clone().into_struct()), data)?
.into_list();
let list = deserialize_list(&map_type.clone().into_struct(), data)?.into_list();
ScalarImpl::Map(MapValue::from_list_entries(list))
}
})
Expand Down
27 changes: 25 additions & 2 deletions src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::bail;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{
DataType, Date, DatumCow, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
ToOwnedDatum,
DataType, Date, DatumCow, Interval, JsonbVal, MapValue, ScalarImpl, Time, Timestamp,
Timestamptz, ToOwnedDatum,
};
use risingwave_common::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -318,6 +318,29 @@ impl<'a> AvroParseOptions<'a> {
(DataType::Varchar, Value::Uuid(uuid)) => {
uuid.as_hyphenated().to_string().into_boxed_str().into()
}
(DataType::Map(map_type), Value::Map(map)) => {
let schema = self.extract_inner_schema(None);
let mut builder = map_type
.clone()
.into_struct()
.create_array_builder(map.len());
// Since the map is HashMap, we can ensure
// key is non-null and unique, keys and values have the same length.
for (k, v) in map {
let value_datum = Self {
schema,
relax_numeric: self.relax_numeric,
}
.convert_to_datum(v, map_type.value())?
.to_owned_datum();
builder.append(
StructValue::new(vec![Some(k.as_str().into()), value_datum])
.to_owned_datum(),
);
}
let list = ListValue::new(builder.finish());
MapValue::from_list_entries(list).into()
}

(_expected, _got) => Err(create_error())?,
};
Expand Down
16 changes: 7 additions & 9 deletions src/connector/codec/src/decoder/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use apache_avro::AvroResult;
use itertools::Itertools;
use risingwave_common::error::NotImplemented;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Decimal};
use risingwave_common::types::{DataType, Decimal, MapType};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};

Expand Down Expand Up @@ -57,8 +57,7 @@ impl ResolvedAvroSchema {
#[derive(Debug, Copy, Clone)]
pub enum MapHandling {
Jsonb,
// TODO: <https://github.com/risingwavelabs/risingwave/issues/13387>
// Map
Map,
}

impl MapHandling {
Expand All @@ -69,6 +68,7 @@ impl MapHandling {
) -> anyhow::Result<Option<Self>> {
let mode = match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
Some("jsonb") => Self::Jsonb,
Some("map") => Self::Map,
Some(v) => bail!("unrecognized {} value {}", Self::OPTION_KEY, v),
None => return Ok(None),
};
Expand Down Expand Up @@ -266,12 +266,10 @@ fn avro_type_mapping(
);
}
}
None => {
// We require it to be specified, because we don't want to have a bad default behavior.
// But perhaps changing the default behavior won't be a breaking change,
// because it affects only on creation time, what the result ColumnDesc will be, and the ColumnDesc will be persisted.
// This is unlike timestamp.handing.mode, which affects parser's behavior on the runtime.
bail!("`map.handling.mode` not specified in ENCODE AVRO (...). Currently supported modes: `jsonb`")
Some(MapHandling::Map) | None => {
let value = avro_type_mapping(value_schema.as_ref(), map_handling)
.context("failed to convert Avro map type")?;
DataType::Map(MapType::from_kv(DataType::Varchar, value))
}
}
}
Expand Down
114 changes: 114 additions & 0 deletions src/connector/codec/tests/integration_tests/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,3 +885,117 @@ fn test_union() {
])"#]],
);
}

#[test]
fn test_map() {
let schema = r#"
{
"type": "record",
"namespace": "com.redpanda.examples.avro",
"name": "ClickEvent",
"fields": [
{
"name": "map_str",
"type": {
"type": "map",
"values": "string"
},
"default": {}
},
{
"name": "map_map_int",
"type": {
"type": "map",
"values": {
"type": "map",
"values": "int"
}
}
}
]
}
"#;

let data = &[
// {"map_str": {"a":"1","b":"2"}, "map_map_int": {"m1": {"a":1,"b":2}, "m2": {"c":3,"d":4}}}
"0402610278026202790004046d310402610202620400046d32040263060264080000",
// {"map_map_int": {}}
"0000",
];

check(
schema,
data,
Config {
map_handling: None,
data_encoding: TestDataEncoding::HexBinary,
},
expect![[r#"
[
map_str(#1): Map(Varchar,Varchar),
map_map_int(#2): Map(Varchar,Map(Varchar,Int32)),
]"#]],
expect![[r#"
Owned([
StructValue(
Utf8("a"),
Utf8("x"),
),
StructValue(
Utf8("b"),
Utf8("y"),
),
])
Owned([
StructValue(
Utf8("m2"),
[
StructValue(
Utf8("d"),
Int32(4),
),
StructValue(
Utf8("c"),
Int32(3),
),
],
),
StructValue(
Utf8("m1"),
[
StructValue(
Utf8("b"),
Int32(2),
),
StructValue(
Utf8("a"),
Int32(1),
),
],
),
])
----
Owned([])
Owned([])"#]],
);

check(
schema,
data,
Config {
map_handling: Some(MapHandling::Jsonb),
data_encoding: TestDataEncoding::HexBinary,
},
expect![[r#"
[
map_str(#1): Jsonb,
map_map_int(#2): Jsonb,
]"#]],
expect![[r#"
Owned(Jsonb(JsonbRef({"a": "x", "b": "y"})))
Owned(Jsonb(JsonbRef({"m1": {"a": Number(1), "b": Number(2)}, "m2": {"c": Number(3), "d": Number(4)}})))
----
Owned(Jsonb(JsonbRef({})))
Owned(Jsonb(JsonbRef({})))"#]],
);
}
12 changes: 12 additions & 0 deletions src/connector/codec/tests/integration_tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ impl<'a> std::fmt::Debug for DataTypeTestDisplay<'a> {
.debug_tuple("List")
.field(&DataTypeTestDisplay(t))
.finish(),
DataType::Map(m) => {
write!(
f,
"Map({:?},{:?})",
&DataTypeTestDisplay(m.key()),
&DataTypeTestDisplay(m.value())
)
}
_ => {
// do not use alternative display for simple types
write!(f, "{:?}", self.0)
Expand Down Expand Up @@ -76,6 +84,10 @@ impl<'a> std::fmt::Debug for ScalarRefImplTestDisplay<'a> {
.debug_list()
.entries(l.iter().map(DatumRefTestDisplay))
.finish(),
ScalarRefImpl::Map(m) => f
.debug_list()
.entries(m.inner().iter().map(DatumRefTestDisplay))
.finish(),
_ => {
// do not use alternative display for simple types
write!(f, "{:?}", self.0)
Expand Down

0 comments on commit a46e2cc

Please sign in to comment.