diff --git a/Cargo.toml b/Cargo.toml index 6c8cbe58c..daf83bf04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ rust-version = "1.77.1" [workspace.dependencies] anyhow = "1.0.72" -apache-avro = "0.16" +apache-avro = { git = "https://github.com/apache/avro.git", rev = "fdab5db0816e28e3e10c87910c8b6f98c33072dc" } array-init = "2" arrow-arith = { version = "52" } arrow-array = { version = "52" } diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 636f1283c..524f42fca 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -24,16 +24,20 @@ use crate::spec::{ }; use crate::{ensure_data_valid, Error, ErrorKind, Result}; use apache_avro::schema::{ - DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField, RecordFieldOrder, - RecordSchema, UnionSchema, + ArraySchema, DecimalSchema, FixedSchema, MapSchema, Name, RecordField as AvroRecordField, + RecordFieldOrder, RecordSchema, UnionSchema, }; use apache_avro::Schema as AvroSchema; use itertools::{Either, Itertools}; use serde_json::{Number, Value}; +const ELEMENT_ID: &str = "element-id"; const FILED_ID_PROP: &str = "field-id"; +const KEY_ID: &str = "key-id"; +const VALUE_ID: &str = "value-id"; const UUID_BYTES: usize = 16; const UUID_LOGICAL_TYPE: &str = "uuid"; +const MAP_LOGICAL_TYPE: &str = "map"; // # TODO: https://github.com/apache/iceberg-rust/issues/86 // This const may better to maintain in avro-rs. const LOGICAL_TYPE: &str = "logicalType"; @@ -123,8 +127,13 @@ impl SchemaVisitor for SchemaToAvroSchema { field_schema = avro_optional(field_schema)?; } - // TODO: We need to add element id prop here, but rust's avro schema doesn't support property except record schema. 
- Ok(Either::Left(AvroSchema::Array(Box::new(field_schema)))) + Ok(Either::Left(AvroSchema::Array(ArraySchema { + items: Box::new(field_schema), + attributes: BTreeMap::from([( + ELEMENT_ID.to_string(), + Value::Number(Number::from(list.element_field.id)), + )]), + }))) } fn map( @@ -140,7 +149,19 @@ impl SchemaVisitor for SchemaToAvroSchema { } if matches!(key_field_schema, AvroSchema::String) { - Ok(Either::Left(AvroSchema::Map(Box::new(value_field_schema)))) + Ok(Either::Left(AvroSchema::Map(MapSchema { + types: Box::new(value_field_schema), + attributes: BTreeMap::from([ + ( + KEY_ID.to_string(), + Value::Number(Number::from(map.key_field.id)), + ), + ( + VALUE_ID.to_string(), + Value::Number(Number::from(map.value_field.id)), + ), + ]), + }))) } else { // Avro map requires that key must be string type. Here we convert it to array if key is // not string type. @@ -186,7 +207,13 @@ impl SchemaVisitor for SchemaToAvroSchema { fields, )?; - Ok(Either::Left(AvroSchema::Array(item_avro_schema.into()))) + Ok(Either::Left(AvroSchema::Array(ArraySchema { + items: Box::new(item_avro_schema), + attributes: BTreeMap::from([( + LOGICAL_TYPE.to_string(), + Value::String(MAP_LOGICAL_TYPE.to_string()), + )]), + }))) } } @@ -254,6 +281,7 @@ pub(crate) fn avro_fixed_schema(len: usize, logical_type: Option<&str>) -> Resul doc: None, size: len, attributes, + default: None, })) } @@ -287,8 +315,9 @@ pub(crate) trait AvroSchemaVisitor { fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) -> Result<Self::T>; - fn array(&mut self, array: &AvroSchema, item: Self::T) -> Result<Self::T>; - fn map(&mut self, map: &AvroSchema, value: Self::T) -> Result<Self::T>; + fn array(&mut self, array: &ArraySchema, item: Self::T) -> Result<Self::T>; + fn map(&mut self, map: &MapSchema, value: Self::T) -> Result<Self::T>; + fn map_array(&mut self, array: &RecordSchema, key: Self::T, value: Self::T) -> Result<Self::T>; fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>; } @@ -315,27 +344,43 @@ pub(crate) fn visit<V: AvroSchemaVisitor>(schema: &AvroSchema, visitor: &mut 
V) visitor.union(union, option_results) } AvroSchema::Array(item) => { - let item_result = visit(item, visitor)?; - visitor.array(schema, item_result) + if let Some(logical_type) = item + .attributes + .get(LOGICAL_TYPE) + .and_then(|v| Value::as_str(v)) + { + if logical_type == MAP_LOGICAL_TYPE { + if let AvroSchema::Record(record_schema) = &*item.items { + let key = visit(&record_schema.fields[0].schema, visitor)?; + let value = visit(&record_schema.fields[1].schema, visitor)?; + return visitor.map_array(record_schema, key, value); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, item is not a record.", + )); + } + } else { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Logical type {logical_type} is not support in iceberg array type.", + ), + )); + } + } + let item_result = visit(&item.items, visitor)?; + visitor.array(item, item_result) } AvroSchema::Map(inner) => { - let item_result = visit(inner, visitor)?; - visitor.map(schema, item_result) + let item_result = visit(&inner.types, visitor)?; + visitor.map(inner, item_result) } schema => visitor.primitive(schema), } } -struct AvroSchemaToSchema { - next_id: i32, -} - -impl AvroSchemaToSchema { - fn next_field_id(&mut self) -> i32 { - self.next_id += 1; - self.next_id - } -} +struct AvroSchemaToSchema; impl AvroSchemaVisitor for AvroSchemaToSchema { // Only `AvroSchema::Null` will return `None` @@ -403,46 +448,79 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { } } - fn array(&mut self, array: &AvroSchema, item: Option<Type>) -> Result<Self::T> { - if let AvroSchema::Array(item_schema) = array { - let element_field = NestedField::list_element( - self.next_field_id(), - item.unwrap(), - !is_avro_optional(item_schema), - ) - .into(); - Ok(Some(Type::List(ListType { element_field }))) - } else { - Err(Error::new( - ErrorKind::Unexpected, - "Expected avro array schema, but {array}", - )) - } + fn array(&mut self, array: &ArraySchema, item: Option<Type>) -> Result<Self::T> 
{ + let element_field_id = array + .attributes + .get(ELEMENT_ID) + .and_then(Value::as_i64) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro array schema, missing element id.", + ) + })? + .try_into() + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro array schema, element id is not a valid i32.", + ) + })?; + let element_field = NestedField::list_element( + element_field_id, + item.unwrap(), + !is_avro_optional(&array.items), + ) + .into(); + Ok(Some(Type::List(ListType { element_field }))) } - fn map(&mut self, map: &AvroSchema, value: Option<Type>) -> Result<Option<Type>> { - if let AvroSchema::Map(value_schema) = map { - // Due to avro rust implementation's limitation, we can't store attributes in map schema, - // we will fix it later when it has been resolved. - let key_field = NestedField::map_key_element( - self.next_field_id(), - Type::Primitive(PrimitiveType::String), - ); - let value_field = NestedField::map_value_element( - self.next_field_id(), - value.unwrap(), - !is_avro_optional(value_schema), - ); - Ok(Some(Type::Map(MapType { - key_field: key_field.into(), - value_field: value_field.into(), - }))) - } else { - Err(Error::new( - ErrorKind::Unexpected, - "Expected avro map schema, but {map}", - )) - } + fn map(&mut self, map: &MapSchema, value: Option<Type>) -> Result<Option<Type>> { + let key_field_id = map + .attributes + .get(KEY_ID) + .and_then(Value::as_i64) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, missing key id.", + ) + })? 
+ .try_into() + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, key id is not a valid i32.", + ) + })?; + let key_field = + NestedField::map_key_element(key_field_id, Type::Primitive(PrimitiveType::String)); + let value_field_id = map + .attributes + .get(VALUE_ID) + .and_then(Value::as_i64) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, missing value id.", + ) + })? + .try_into() + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, value id is not a valid i32.", + ) + })?; + let value_field = NestedField::map_value_element( + value_field_id, + value.unwrap(), + !is_avro_optional(&map.types), + ); + Ok(Some(Type::Map(MapType { + key_field: key_field.into(), + value_field: value_field.into(), + }))) } fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> { @@ -494,12 +572,71 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { Ok(Some(typ)) } + + fn map_array(&mut self, array: &RecordSchema, key: Self::T, value: Self::T) -> Result<Self::T> { + let key = key.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, missing key schema.", + ) + })?; + let value = value.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, missing value schema.", + ) + })?; + let key_id = array.fields[0] + .custom_attributes + .get(FILED_ID_PROP) + .and_then(Value::as_i64) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, missing key id.", + ) + })? + .try_into() + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, key id is not a valid i32.", + ) + })?; + let value_id = array.fields[1] + .custom_attributes + .get(FILED_ID_PROP) + .and_then(Value::as_i64) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, missing value id.", + ) + })? 
+ .try_into() + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, value id is not a valid i32.", + ) + })?; + let key_field = NestedField::required(key_id, array.fields[0].name.clone(), key); + let value_field = if is_avro_optional(&array.fields[1].schema) { + NestedField::optional(value_id, array.fields[1].name.clone(), value) + } else { + NestedField::required(value_id, array.fields[1].name.clone(), value) + }; + Ok(Some(Type::Map(MapType { + key_field: key_field.into(), + value_field: value_field.into(), + }))) + } } /// Converts avro schema to iceberg schema. pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result<Schema> { if let AvroSchema::Record(_) = avro_schema { - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none."); if let Type::Struct(s) = typ { Schema::builder() @@ -527,6 +664,7 @@ mod tests { use apache_avro::schema::{Namespace, UnionSchema}; use apache_avro::Schema as AvroSchema; use std::fs::read_to_string; + use std::sync::Arc; fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema { let input = read_to_string(format!( @@ -539,15 +677,9 @@ mod tests { AvroSchema::parse_str(input.as_str()).unwrap() } - fn check_schema_conversion( - avro_schema: AvroSchema, - expected_iceberg_schema: Schema, - check_avro_to_iceberg: bool, - ) { - if check_avro_to_iceberg { - let converted_iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap(); - assert_eq!(expected_iceberg_schema, converted_iceberg_schema); - } + fn check_schema_conversion(avro_schema: AvroSchema, expected_iceberg_schema: Schema) { + let converted_iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap(); + assert_eq!(expected_iceberg_schema, converted_iceberg_schema); let converted_avro_schema = schema_to_avro_schema( avro_schema.name().unwrap().fullname(Namespace::None), @@ -555,6 +687,9 @@ mod 
tests { ) .unwrap(); assert_eq!(avro_schema, converted_avro_schema); + + let converted_converted_iceberg_schema = avro_schema_to_schema(&converted_avro_schema).unwrap(); + assert_eq!(expected_iceberg_schema, converted_converted_iceberg_schema); } #[test] @@ -633,7 +768,6 @@ mod tests { check_schema_conversion( read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json"), iceberg_schema, - false, ); } @@ -682,7 +816,7 @@ mod tests { .unwrap() }; - check_schema_conversion(avro_schema, iceberg_schema, false); + check_schema_conversion(avro_schema, iceberg_schema); } #[test] @@ -731,7 +865,7 @@ mod tests { .unwrap() }; - check_schema_conversion(avro_schema, iceberg_schema, false); + check_schema_conversion(avro_schema, iceberg_schema); } #[test] @@ -808,7 +942,117 @@ mod tests { .unwrap() }; - check_schema_conversion(avro_schema, iceberg_schema, false); + check_schema_conversion(avro_schema, iceberg_schema); + } + + #[test] + fn test_schema_with_array_map() { + let avro_schema = { + AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "avro_schema", + "fields": [ + { + "name": "optional", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "k102_v103", + "fields": [ + { + "name": "key", + "type": "boolean", + "field-id": 102 + }, + { + "name": "value", + "type": ["null", "boolean"], + "field-id": 103 + } + ] + }, + "default": [], + "logicalType": "map" + }, + "field-id": 100 + },{ + "name": "required", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "k105_v106", + "fields": [ + { + "name": "key", + "type": "boolean", + "field-id": 105 + }, + { + "name": "value", + "type": "boolean", + "field-id": 106 + } + ] + }, + "default": [], + "logicalType": "map" + }, + "field-id": 104 + } + ] +} +"#, + ) + .unwrap() + }; + + let iceberg_schema = { + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 100, + "optional", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 102, + 
PrimitiveType::Boolean.into(), + ) + .into(), + value_field: NestedField::map_value_element( + 103, + PrimitiveType::Boolean.into(), + false, + ) + .into(), + }), + )), + Arc::new(NestedField::required( + 104, + "required", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 105, + PrimitiveType::Boolean.into(), + ) + .into(), + value_field: NestedField::map_value_element( + 106, + PrimitiveType::Boolean.into(), + true, + ) + .into(), + }), + )), + ]) + .build() + .unwrap() + }; + + check_schema_conversion(avro_schema, iceberg_schema); } #[test] @@ -820,7 +1064,7 @@ mod tests { ]) .unwrap(); - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; let options = avro_schema .variants() @@ -832,7 +1076,7 @@ mod tests { #[test] fn test_string_type() { - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; let avro_schema = AvroSchema::String; assert_eq!( @@ -857,10 +1101,14 @@ mod tests { .unwrap() }; - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let AvroSchema::Map(avro_schema) = avro_schema else { + unreachable!() + }; + + let mut converter = AvroSchemaToSchema; let iceberg_type = Type::Map(MapType { - key_field: NestedField::map_key_element(1, PrimitiveType::String.into()).into(), - value_field: NestedField::map_value_element(2, PrimitiveType::Long.into(), false) + key_field: NestedField::map_key_element(101, PrimitiveType::String.into()).into(), + value_field: NestedField::map_value_element(102, PrimitiveType::Long.into(), false) .into(), }); @@ -884,7 +1132,7 @@ mod tests { .unwrap() }; - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; let iceberg_type = Type::from(PrimitiveType::Fixed(22)); @@ -896,7 +1144,7 @@ mod tests { #[test] fn test_unknown_primitive() { - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; 
assert!(converter.primitive(&AvroSchema::Duration).is_err()); } @@ -935,7 +1183,7 @@ mod tests { .unwrap() }; - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; assert_eq!( Type::decimal(25, 19).unwrap(), @@ -945,7 +1193,7 @@ mod tests { #[test] fn test_date_type() { - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; assert_eq!( Type::from(PrimitiveType::Date),