Skip to content

Commit

Permalink
fix decimal parse for parquet statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 24, 2024
1 parent 3b56acc commit bd06df2
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
41 changes: 39 additions & 2 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use arrow_array::{
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
use bitvec::macros::internal::funty::Fundamental;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::data_type::ByteArray;
use parquet::file::statistics::Statistics;
use rust_decimal::prelude::ToPrimitive;
use uuid::Uuid;
Expand Down Expand Up @@ -676,6 +677,31 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
}
}

/// Sign-extends a big-endian two's-complement byte array (at most 16 bytes)
/// to a full 16-byte buffer suitable for `i128::from_be_bytes`.
///
/// Parquet encodes decimal statistics as a minimal-length big-endian
/// two's-complement byte array; this pads it back out to the 16 bytes an
/// `i128` requires while preserving the sign. An empty input is treated as
/// non-negative and yields zero.
///
/// # Errors
///
/// Returns `ErrorKind::DataInvalid` when the input is longer than 16 bytes,
/// since it can then not be represented as an `i128`.
fn extend_to_i128_big_endian(array: ByteArray) -> Result<[u8; 16]> {
    if array.len() > 16 {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            "fail to extend array with len > 16 to array with 16",
        ));
    }

    // Two's complement: the MSB of the first (most significant) byte is the
    // sign bit. `first()` instead of `get(0)` per clippy; empty => positive.
    let is_negative = array.data().first().map_or(false, |&b| b & 0x80 != 0);

    // Sign-extend by pre-filling: 0xFF for negative values, 0x00 otherwise.
    let mut extended = if is_negative { [0xFF; 16] } else { [0x00; 16] };

    // Copy the significant bytes into the least-significant (rightmost)
    // positions of the big-endian buffer. `data()` already yields `&[u8]`,
    // so no extra borrow is needed.
    let start = 16 - array.len();
    extended[start..].copy_from_slice(array.data());

    Ok(extended)
}

macro_rules! get_parquet_stat_as_datum {
($limit_type:tt) => {
paste::paste! {
Expand Down Expand Up @@ -723,10 +749,21 @@ macro_rules! get_parquet_stat_as_datum {
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};

Some(Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from_le_bytes(bytes.try_into()?)),
PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
))
}
(PrimitiveType::Decimal {
precision: _,
scale: _,
}, Statistics::FixedLenByteArray(stats)) => {
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};
Some(Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from_be_bytes(extend_to_i128_big_endian(bytes.into())?)),
))
}
(
Expand Down
38 changes: 37 additions & 1 deletion crates/iceberg/src/writer/file_writer/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,17 @@ mod tests {
NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
.into(),
// Parquet Statistics will use different representation for Decimal with precision 38 and scale 5,
// so we need to add a new field for it.
NestedField::optional(
16,
"decimal_38",
Type::Primitive(PrimitiveType::Decimal {
precision: 38,
scale: 5,
}),
)
.into(),
])
.build()
.unwrap()
Expand Down Expand Up @@ -1030,9 +1041,14 @@ mod tests {
)
.unwrap(),
) as ArrayRef;
let col16 = Arc::new(
arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
.with_precision_and_scale(38, 5)
.unwrap(),
) as ArrayRef;
let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
col14, col15,
col14, col15, col16,
])
.unwrap();

Expand Down Expand Up @@ -1093,6 +1109,16 @@ mod tests {
),
(14, Datum::uuid(Uuid::from_u128(0))),
(15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
(
16,
Datum::new(
PrimitiveType::Decimal {
precision: 38,
scale: 5
},
PrimitiveLiteral::Int128(1)
)
),
])
);
assert_eq!(
Expand Down Expand Up @@ -1126,6 +1152,16 @@ mod tests {
15,
Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
),
(
16,
Datum::new(
PrimitiveType::Decimal {
precision: 38,
scale: 5
},
PrimitiveLiteral::Int128(100)
)
),
])
);

Expand Down

0 comments on commit bd06df2

Please sign in to comment.