diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index c64dd5de1..5824a366c 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -30,6 +30,7 @@ use arrow_array::{
 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
 use bitvec::macros::internal::funty::Fundamental;
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use parquet::data_type::ByteArray;
 use parquet::file::statistics::Statistics;
 use rust_decimal::prelude::ToPrimitive;
 use uuid::Uuid;
@@ -676,6 +677,31 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result
+fn extend_to_i128_big_endian(array: ByteArray) -> Result<[u8; 16]> {
+    if array.len() > 16 {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "cannot extend a byte array longer than 16 bytes to an i128",
+        ));
+    }
+
+    // Check the sign bit: if the MSB of the first byte is 1, the value is negative.
+    let is_negative = array.data().get(0).map_or(false, |&b| b & 0x80 != 0);
+
+    // Create a 16-byte buffer pre-filled with the sign-extension value.
+    let mut extended = if is_negative {
+        [0xFF; 16] // Fill with 0xFF for negative numbers
+    } else {
+        [0x00; 16] // Fill with 0x00 for non-negative numbers
+    };
+
+    // Copy the bytes into the rightmost (least significant) part of the buffer.
+    let start = 16 - array.len();
+    extended[start..].copy_from_slice(array.data());
+
+    Ok(extended)
+}
+
 macro_rules! get_parquet_stat_as_datum {
     ($limit_type:tt) => {
         paste::paste! {
@@ -723,10 +749,21 @@
                     let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
                         return Ok(None);
                     };
-
                     Some(Datum::new(
                         primitive_type.clone(),
-                        PrimitiveLiteral::Int128(i128::from_le_bytes(bytes.try_into()?)),
+                        PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
+                    ))
+                }
+                (PrimitiveType::Decimal {
+                    precision: _,
+                    scale: _,
+                }, Statistics::FixedLenByteArray(stats)) => {
+                    let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
+                        return Ok(None);
+                    };
+                    Some(Datum::new(
+                        primitive_type.clone(),
+                        PrimitiveLiteral::Int128(i128::from_be_bytes(extend_to_i128_big_endian(bytes.into())?)),
+                    ))
+                }
+                (
                     ))
                 }
                 (
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 1aa0c600c..e2b7ce5f8 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -542,6 +542,17 @@ mod tests {
             NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
             NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
                 .into(),
+            // Parquet statistics use a different representation for Decimal with precision 38 and scale 5,
+            // so we need to add a new field for it.
+            NestedField::optional(
+                16,
+                "decimal_38",
+                Type::Primitive(PrimitiveType::Decimal {
+                    precision: 38,
+                    scale: 5,
+                }),
+            )
+            .into(),
         ])
         .build()
         .unwrap()
@@ -1030,9 +1041,14 @@ mod tests {
             )
             .unwrap(),
         ) as ArrayRef;
+        let col16 = Arc::new(
+            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
+                .with_precision_and_scale(38, 5)
+                .unwrap(),
+        ) as ArrayRef;
         let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
             col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
-            col14, col15,
+            col14, col15, col16,
         ])
         .unwrap();
@@ -1093,6 +1109,16 @@ mod tests {
             ),
             (14, Datum::uuid(Uuid::from_u128(0))),
             (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
+            (
+                16,
+                Datum::new(
+                    PrimitiveType::Decimal {
+                        precision: 38,
+                        scale: 5
+                    },
+                    PrimitiveLiteral::Int128(1)
+                )
+            ),
         ])
     );
     assert_eq!(
@@ -1126,6 +1152,16 @@ mod tests {
                 15,
                 Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
             ),
+            (
+                16,
+                Datum::new(
+                    PrimitiveType::Decimal {
+                        precision: 38,
+                        scale: 5
+                    },
+                    PrimitiveLiteral::Int128(100)
+                )
+            ),
         ])
     );
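Note on the conversion in the schema.rs hunks: Parquet encodes DECIMAL values backed by BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY as a big-endian two's-complement unscaled integer, which is why the statistics bytes are now decoded with i128::from_be_bytes and why a value shorter than 16 bytes must be sign-extended before the conversion. The sketch below is illustration only, not part of the patch: be_bytes_to_i128 is a hypothetical standalone helper that mirrors the sign-extension logic of extend_to_i128_big_endian on a plain &[u8] slice, so it can be compiled and checked without parquet's ByteArray type.

// Illustration only (not part of the patch): a minimal sketch of big-endian
// sign extension to i128, assuming a plain byte slice as input.
fn be_bytes_to_i128(bytes: &[u8]) -> Option<i128> {
    if bytes.len() > 16 {
        return None; // an unscaled value wider than 16 bytes cannot fit in an i128
    }
    // In two's complement the value is negative iff the MSB of the most significant byte is set.
    let is_negative = bytes.first().map_or(false, |&b| b & 0x80 != 0);
    // Pre-fill with the sign byte, then copy the payload into the low-order (rightmost) end.
    let mut buf = if is_negative { [0xFF; 16] } else { [0x00; 16] };
    buf[16 - bytes.len()..].copy_from_slice(bytes);
    Some(i128::from_be_bytes(buf))
}

fn main() {
    // 1.00000 at scale 5 has the unscaled value 100_000 = 0x01_86_A0.
    assert_eq!(be_bytes_to_i128(&[0x01, 0x86, 0xA0]), Some(100_000));
    // -1.00000 at scale 5: two's complement of 0x01_86_A0 over three bytes is 0xFE_79_60.
    assert_eq!(be_bytes_to_i128(&[0xFE, 0x79, 0x60]), Some(-100_000));
    // Zero-filling instead of sign-filling would decode the second case as the
    // large positive value 0xFE7960, which is exactly the bug class the fill avoids.
}

The choice between the 0xFF and 0x00 fill is what performs the sign extension: it replicates the sign bit of the stored bytes across the unused high-order bytes of the 16-byte buffer before the big-endian decode.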