From e1e881405ce9b180858d5ba278803b4795b63e74 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Tue, 26 Nov 2024 20:59:13 +1030 Subject: [PATCH] feat(parquet)!: coerce_types flag for date64 (#6313) * feat(parquet): coerce_types flag for date64 * fix: use ARROW:schema instead of LogicalType to embed Date64 type * chore: lint * chore: lint * chore: lint * chore: add physical_type to StatisticsConverter to account for coerce_types * chore: blank line changes * chore: revert minor test changes * chore: update to latest parquet-testing * chore: add physical_type fix for get_data_page_statistics macro * docs: add docs for coerce_types * chore: cargo fmt --all * docs: coerce_types lossless round trip Co-authored-by: Ed Seidl --------- Co-authored-by: Ed Seidl --- .../src/arrow/array_reader/primitive_array.rs | 60 ++++++++- parquet/src/arrow/arrow_reader/mod.rs | 115 +++++++++++++++++- parquet/src/arrow/arrow_reader/statistics.rs | 56 ++++++--- parquet/src/arrow/arrow_writer/mod.rs | 14 ++- parquet/src/arrow/schema/mod.rs | 50 +++++--- parquet/src/file/properties.rs | 26 ++++ 6 files changed, 272 insertions(+), 49 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 010e9c2eed3f..a952e00e12ef 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -208,10 +208,10 @@ where // As there is not always a 1:1 mapping between Arrow and Parquet, there // are datatypes which we must convert explicitly. // These are: - // - date64: we should cast int32 to date32, then date32 to date64. - // - decimal: cast in32 to decimal, int64 to decimal + // - date64: cast int32 to date32, then date32 to date64. + // - decimal: cast int32 to decimal, int64 to decimal let array = match target_type { - ArrowType::Date64 => { + ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => { // this is cheap as it internally reinterprets the data let a = arrow_cast::cast(&array, &ArrowType::Date32)?; arrow_cast::cast(&a, target_type)? 
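With the `coerce_types` option, the guarded arm above now fires only when the decoded column is physically `INT32` (the coerced, days-based layout); an `INT64`-backed Date64 column flows through the ordinary cast path instead. As a minimal, self-contained sketch of the same two-step conversion (the helper name and inputs are invented for illustration and are not part of this patch):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use arrow_cast::cast;
use arrow_schema::{ArrowError, DataType};

// Invented helper mirroring the guarded reader arm: i32 days -> Date32
// (a cheap reinterpretation) -> Date64 (scales by 86_400_000 ms per day).
fn date64_from_int32_days(days: ArrayRef) -> Result<ArrayRef, ArrowError> {
    let date32 = cast(&days, &DataType::Date32)?;
    cast(&date32, &DataType::Date64)
}

fn main() -> Result<(), ArrowError> {
    let days: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, -1]));
    let millis = date64_from_int32_days(days)?;
    assert_eq!(millis.data_type(), &DataType::Date64);
    Ok(())
}
```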
@@ -305,9 +305,9 @@ mod tests {
     use crate::util::test_common::rand_gen::make_pages;
     use crate::util::InMemoryPageIterator;
     use arrow::datatypes::ArrowPrimitiveType;
-    use arrow_array::{Array, PrimitiveArray};
+    use arrow_array::{Array, Date32Array, PrimitiveArray};
 
-    use arrow::datatypes::DataType::Decimal128;
+    use arrow::datatypes::DataType::{Date32, Decimal128};
     use rand::distributions::uniform::SampleUniform;
     use std::collections::VecDeque;
 
@@ -783,4 +783,54 @@ mod tests {
             assert_ne!(array, &data_decimal_array)
         }
     }
+
+    #[test]
+    fn test_primitive_array_reader_date32_type() {
+        // parquet `INT32` to date
+        let message_type = "
+            message test_schema {
+                REQUIRED INT32 date1 (DATE);
+            }
+        ";
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+
+        // create the array reader
+        {
+            let mut data = Vec::new();
+            let mut page_lists = Vec::new();
+            make_column_chunks::<Int32Type>(
+                column_desc.clone(),
+                Encoding::PLAIN,
+                100,
+                -99999999,
+                99999999,
+                &mut Vec::new(),
+                &mut Vec::new(),
+                &mut data,
+                &mut page_lists,
+                true,
+                2,
+            );
+            let page_iterator = InMemoryPageIterator::new(page_lists);
+
+            let mut array_reader =
+                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
+                    .unwrap();
+
+            // read data from the reader
+            // the data type is date
+            let array = array_reader.next_batch(50).unwrap();
+            assert_eq!(array.data_type(), &Date32);
+            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+            let data_date_array = data[0..50]
+                .iter()
+                .copied()
+                .map(Some)
+                .collect::<Date32Array>();
+            assert_eq!(array, &data_date_array);
+        }
+    }
 }
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index f351c25bd3ab..a3d011346cf4 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -932,8 +932,8 @@ mod tests {
     use arrow_array::builder::*;
     use arrow_array::cast::AsArray;
     use arrow_array::types::{
-        Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
-        Time32MillisecondType, Time64MicrosecondType,
+        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
+        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
     };
     use arrow_array::*;
     use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
@@ -1272,6 +1272,117 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_date32_roundtrip() -> Result<()> {
+        use arrow_array::Date32Array;
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "date32",
+            ArrowDataType::Date32,
+            false,
+        )]));
+
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
+
+        let original = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(Date32Array::from(vec![
+                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
+            ]))],
+        )?;
+
+        writer.write(&original)?;
+        writer.close()?;
+
+        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
+        let ret = reader.next().unwrap()?;
+        assert_eq!(ret, original);
+
+        // Ensure can be downcast to the correct type
+        ret.column(0).as_primitive::<Date32Type>();
+
+        Ok(())
+    }
+
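The `test_date64_roundtrip` test added below drives both writer configurations against the same batch. For orientation, a minimal sketch of the writer-facing API it exercises, assuming an in-memory `Vec<u8>` sink (`set_coerce_types` is the builder method this patch introduces; other names are invented):

```rust
use std::sync::Arc;

use arrow_array::{Date64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("d", DataType::Date64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Date64Array::from(vec![0, 86_400_000]))],
    )?;

    // Default (coerce_types = false): Date64 is written as INT64 milliseconds
    // and round-trips losslessly via the embedded ARROW:schema metadata.
    let mut default_buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Opt in to coercion: Date64 is written as INT32 DATE (whole days), so any
    // sub-day precision is lost, as the `invalid-date64` column below shows.
    let props = WriterProperties::builder().set_coerce_types(true).build();
    let mut coerce_buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut coerce_buf, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```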
+    #[test]
+    fn test_date64_roundtrip() -> Result<()> {
+        use arrow_array::Date64Array;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("small-date64", ArrowDataType::Date64, false),
+            Field::new("big-date64", ArrowDataType::Date64, false),
+            Field::new("invalid-date64", ArrowDataType::Date64, false),
+        ]));
+
+        let mut default_buf = Vec::with_capacity(1024);
+        let mut coerce_buf = Vec::with_capacity(1024);
+
+        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
+
+        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
+        let mut coerce_writer =
+            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
+
+        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
+
+        let original = RecordBatch::try_new(
+            schema,
+            vec![
+                // small-date64
+                Arc::new(Date64Array::from(vec![
+                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
+                    -1_000 * NUM_MILLISECONDS_IN_DAY,
+                    0,
+                    1_000 * NUM_MILLISECONDS_IN_DAY,
+                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
+                ])),
+                // big-date64
+                Arc::new(Date64Array::from(vec![
+                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+                    0,
+                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+                ])),
+                // invalid-date64
+                Arc::new(Date64Array::from(vec![
+                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
+                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
+                    1,
+                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
+                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
+                ])),
+            ],
+        )?;
+
+        default_writer.write(&original)?;
+        coerce_writer.write(&original)?;
+
+        default_writer.close()?;
+        coerce_writer.close()?;
+
+        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
+        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
+
+        let default_ret = default_reader.next().unwrap()?;
+        let coerce_ret = coerce_reader.next().unwrap()?;
+
+        // Roundtrip should be successful when default writer used
+        assert_eq!(default_ret, original);
+
+        // Only small-date64 should roundtrip successfully when coerce_types writer is used
+        assert_eq!(coerce_ret.column(0), original.column(0));
+        assert_ne!(coerce_ret.column(1), original.column(1));
+        assert_ne!(coerce_ret.column(2), original.column(2));
+
+        // Ensure both can be downcast to the correct type
+        default_ret.column(0).as_primitive::<Date64Type>();
+        coerce_ret.column(0).as_primitive::<Date64Type>();
+
+        Ok(())
+    }
 
     struct RandFixedLenGen {}
 
     impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs
index 8a7511be2afe..09f8ec7cc274 100644
--- a/parquet/src/arrow/arrow_reader/statistics.rs
+++ b/parquet/src/arrow/arrow_reader/statistics.rs
@@ -21,6 +21,7 @@
 /// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`.
 use crate::arrow::buffer::bit_util::sign_extend_be;
 use crate::arrow::parquet_column;
+use crate::basic::Type as PhysicalType;
 use crate::data_type::{ByteArray, FixedLenByteArray};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
@@ -318,7 +319,7 @@ make_decimal_stats_iterator!(
 /// data_type: The data type of the statistics (e.g. `DataType::Int32`)
 /// iterator: The iterator of [`ParquetStatistics`] to extract the statistics from.
 macro_rules! get_statistics {
-    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
+    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
         paste! {
             match $data_type {
                 DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
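The statistics hunks that follow thread a new `$physical_type` parameter through `get_statistics!` and `get_data_page_statistics!`. The reason: a Date64 column's statistics may now be backed by either `INT32` (days, when coerced) or `INT64` (milliseconds, the new default), so extraction has to dispatch on the Parquet physical type. A hedged sketch of that dispatch outside the macro machinery (the helper is invented for illustration):

```rust
use parquet::basic::Type as PhysicalType;

const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;

// Invented helper: normalize a raw Date64 statistics value to milliseconds,
// mirroring what the `$physical_type` match guards select inside the macros.
fn date64_stat_to_millis(physical: Option<PhysicalType>, raw: i64) -> Option<i64> {
    match physical {
        // coerce_types = true: stored as INT32 days since the epoch
        Some(PhysicalType::INT32) => Some(raw * MILLIS_PER_DAY),
        // coerce_types = false (default): already INT64 milliseconds
        Some(PhysicalType::INT64) => Some(raw),
        // no other physical type can back a Date64 column
        _ => None,
    }
}

fn main() {
    assert_eq!(date64_stat_to_millis(Some(PhysicalType::INT32), 1), Some(86_400_000));
    assert_eq!(date64_stat_to_millis(Some(PhysicalType::INT64), 42), Some(42));
}
```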
@@ -370,10 +371,11 @@ macro_rules! get_statistics {
                 DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                     [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                 ))),
-                DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
+                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
                     [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
-                        .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
-                ))),
+                        .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))),
+                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter(
+                    [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
                 DataType::Timestamp(unit, timezone) => {
                     let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
                     Ok(match unit {
@@ -487,7 +489,7 @@ macro_rules! get_statistics {
                     Ok(Arc::new(arr))
                 },
                 DataType::Dictionary(_, value_type) => {
-                    [<$stat_type_prefix:lower _ statistics>](value_type, $iterator)
+                    [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
                 },
                 DataType::Utf8View => {
                     let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
@@ -524,6 +526,7 @@ macro_rules! get_statistics {
                 DataType::Map(_,_) |
                 DataType::Duration(_) |
                 DataType::Interval(_) |
+                DataType::Date64 | // required to cover $physical_type match guard
                 DataType::Null |
                 DataType::List(_) |
                 DataType::ListView(_) |
@@ -790,7 +793,7 @@ get_decimal_page_stats_iterator!(
 );
 
 macro_rules! get_data_page_statistics {
-    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
+    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
         paste! {
             match $data_type {
                 DataType::Boolean => {
@@ -929,7 +932,7 @@ macro_rules! get_data_page_statistics {
                     Ok(Arc::new(builder.finish()))
                 },
                 DataType::Dictionary(_, value_type) => {
-                    [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator)
+                    [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type)
                 },
                 DataType::Timestamp(unit, timezone) => {
                     let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
                     Ok(match unit {
@@ -941,7 +944,7 @@ macro_rules! get_data_page_statistics {
                     })
                 },
                 DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
-                DataType::Date64 => Ok(
+                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(
                     Arc::new(
                         Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                             .map(|x| {
                                 x.into_iter().map(|x| {
                                     x.and_then(|x| i64::try_from(x).ok())
                                 })
                                 .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
                             })
                             .flatten()
                         )
                     )
                 ),
+                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                 DataType::Decimal128(precision, scale) => Ok(Arc::new(
                     Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                 DataType::Decimal256(precision, scale) => Ok(Arc::new(
@@ -1040,6 +1044,7 @@ macro_rules! get_data_page_statistics {
                     }
                     Ok(Arc::new(builder.finish()))
                 },
+                DataType::Date64 | // required to cover $physical_type match guard
                 DataType::Null |
                 DataType::Duration(_) |
                 DataType::Interval(_) |
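Both macros also gain a `DataType::Date64 |` entry in their unsupported-type lists, annotated "required to cover $physical_type match guard": Rust's exhaustiveness analysis ignores `if` guards, so the two guarded Date64 arms alone would leave `Date64` uncovered. A standalone illustration of that rule (the function and return strings are invented):

```rust
use arrow_schema::DataType;
use parquet::basic::Type as PhysicalType;

// Guarded arms do not count toward exhaustiveness, so `Date64` must also
// appear in an unguarded arm (here, alongside the fallthrough cases).
fn date64_backing(data_type: &DataType, physical: Option<PhysicalType>) -> &'static str {
    match data_type {
        DataType::Date64 if physical == Some(PhysicalType::INT32) => "INT32 days",
        DataType::Date64 if physical == Some(PhysicalType::INT64) => "INT64 millis",
        // Mirrors the `DataType::Date64 |` added to the unsupported lists.
        DataType::Date64 => "unsupported physical type",
        _ => "other data type",
    }
}

fn main() {
    assert_eq!(
        date64_backing(&DataType::Date64, Some(PhysicalType::INT32)),
        "INT32 days"
    );
}
```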
@@ -1067,8 +1072,9 @@ macro_rules! get_data_page_statistics {
 fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
     data_type: &DataType,
     iterator: I,
+    physical_type: Option<PhysicalType>,
 ) -> Result<ArrayRef> {
-    get_statistics!(Min, data_type, iterator)
+    get_statistics!(Min, data_type, iterator, physical_type)
 }
 
 /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
@@ -1077,26 +1083,35 @@ fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
 fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
     data_type: &DataType,
     iterator: I,
+    physical_type: Option<PhysicalType>,
 ) -> Result<ArrayRef> {
-    get_statistics!(Max, data_type, iterator)
+    get_statistics!(Max, data_type, iterator, physical_type)
 }
 
 /// Extracts the min statistics from an iterator
 /// of parquet page [`Index`]'es to an [`ArrayRef`]
-pub(crate) fn min_page_statistics<'a, I>(data_type: &DataType, iterator: I) -> Result<ArrayRef>
+pub(crate) fn min_page_statistics<'a, I>(
+    data_type: &DataType,
+    iterator: I,
+    physical_type: Option<PhysicalType>,
+) -> Result<ArrayRef>
 where
     I: Iterator<Item = (usize, &'a Index)>,
 {
-    get_data_page_statistics!(Min, data_type, iterator)
+    get_data_page_statistics!(Min, data_type, iterator, physical_type)
 }
 
 /// Extracts the max statistics from an iterator
 /// of parquet page [`Index`]'es to an [`ArrayRef`]
-pub(crate) fn max_page_statistics<'a, I>(data_type: &DataType, iterator: I) -> Result<ArrayRef>
+pub(crate) fn max_page_statistics<'a, I>(
+    data_type: &DataType,
+    iterator: I,
+    physical_type: Option<PhysicalType>,
+) -> Result<ArrayRef>
 where
     I: Iterator<Item = (usize, &'a Index)>,
 {
-    get_data_page_statistics!(Max, data_type, iterator)
+    get_data_page_statistics!(Max, data_type, iterator, physical_type)
 }
 
 /// Extracts the null count statistics from an iterator
@@ -1177,6 +1192,8 @@ pub struct StatisticsConverter<'a> {
     arrow_field: &'a Field,
     /// treat missing null_counts as 0 nulls
     missing_null_counts_as_zero: bool,
+    /// The physical type of the matched column in the Parquet schema
+    physical_type: Option<PhysicalType>,
 }
 
 impl<'a> StatisticsConverter<'a> {
@@ -1304,6 +1321,7 @@ impl<'a> StatisticsConverter<'a> {
             parquet_column_index: parquet_index,
             arrow_field,
             missing_null_counts_as_zero: true,
+            physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
         })
     }
 
@@ -1346,7 +1364,7 @@ impl<'a> StatisticsConverter<'a> {
     /// // get the minimum value for the column "foo" in the parquet file
     /// let min_values: ArrayRef = converter
     ///   .row_group_mins(metadata.row_groups().iter())
-    ///    .unwrap();
+    ///   .unwrap();
     /// // if "foo" is a Float64 value, the returned array will contain Float64 values
     /// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0)])) as _);
     /// ```
@@ -1363,7 +1381,7 @@ impl<'a> StatisticsConverter<'a> {
         let iter = metadatas
             .into_iter()
             .map(|x| x.column(parquet_index).statistics());
-        min_statistics(data_type, iter)
+        min_statistics(data_type, iter, self.physical_type)
     }
 
     /// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
@@ -1382,7 +1400,7 @@ impl<'a> StatisticsConverter<'a> {
         let iter = metadatas
             .into_iter()
             .map(|x| x.column(parquet_index).statistics());
-        max_statistics(data_type, iter)
+        max_statistics(data_type, iter, self.physical_type)
     }
 
     /// Extract the null counts from row group statistics in [`RowGroupMetaData`]
@@ -1490,7 +1508,7 @@ impl<'a> StatisticsConverter<'a> {
             (*num_data_pages, column_page_index_per_row_group_per_column)
         });
 
-        min_page_statistics(data_type, iter)
+        min_page_statistics(data_type, iter, self.physical_type)
     }
 
     /// Extract the maximum values from Data Page statistics.
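From a caller's perspective `StatisticsConverter` is unchanged: the physical type is captured once in `try_new` and forwarded to every extraction call internally. A usage sketch under that assumption (the wrapper function and column name `date_col` are invented; the call shape follows the doc example above):

```rust
use arrow_array::ArrayRef;
use arrow_schema::Schema;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;
use parquet::schema::types::SchemaDescriptor;

// Invented wrapper: per-row-group minimums for a Date64 column. The converter
// records the column's Parquet physical type, so both the INT32-coerced and
// the default INT64 layout decode to Date64 milliseconds correctly.
fn date64_row_group_mins(
    arrow_schema: &Schema,
    parquet_schema: &SchemaDescriptor,
    metadata: &ParquetMetaData,
) -> Result<ArrayRef> {
    let converter = StatisticsConverter::try_new("date_col", arrow_schema, parquet_schema)?;
    converter.row_group_mins(metadata.row_groups().iter())
}
```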
@@ -1521,7 +1539,7 @@ impl<'a> StatisticsConverter<'a> {
             (*num_data_pages, column_page_index_per_row_group_per_column)
         });
 
-        max_page_statistics(data_type, iter)
+        max_page_statistics(data_type, iter, self.physical_type)
     }
 
     /// Returns a [`UInt64Array`] with null counts for each data page.
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index c9f9114481d8..222d86131e0a 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -180,11 +180,11 @@ impl<W: Write + Send> ArrowWriter<W> {
         arrow_schema: SchemaRef,
         options: ArrowWriterOptions,
     ) -> Result<Self> {
+        let mut props = options.properties;
         let schema = match options.schema_root {
-            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
-            None => arrow_to_parquet_schema(&arrow_schema)?,
+            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
+            None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
         };
-        let mut props = options.properties;
         if !options.skip_arrow_metadata {
             // add serialized arrow schema
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -549,8 +549,8 @@ impl ArrowColumnChunk {
 /// ]));
 ///
 /// // Compute the parquet schema
-/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
 /// let props = Arc::new(WriterProperties::default());
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap();
 ///
 /// // Create writers for each of the leaf columns
 /// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
@@ -858,6 +858,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
             match column.data_type() {
+                ArrowDataType::Date64 => {
+                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
+
+                    let array = array.as_primitive::<Int64Type>();
+                    write_primitive(typed, array.values(), levels)
+                }
                 ArrowDataType::Int64 => {
                     let array = column.as_primitive::<Int64Type>();
                     write_primitive(typed, array.values(), levels)
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index bf1fb633227c..ec34840d858f 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -229,16 +229,20 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
 ///
 /// The name of the root schema element defaults to `"arrow_schema"`, this can be
 /// overridden with [`arrow_to_parquet_schema_with_root`]
-pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
-    arrow_to_parquet_schema_with_root(schema, "arrow_schema")
+pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result<SchemaDescriptor> {
+    arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
 }
 
 /// Convert arrow schema to parquet schema specifying the name of the root schema element
-pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str) -> Result<SchemaDescriptor> {
+pub fn arrow_to_parquet_schema_with_root(
+    schema: &Schema,
+    root: &str,
+    coerce_types: bool,
+) -> Result<SchemaDescriptor> {
     let fields = schema
         .fields()
         .iter()
-        .map(|field| arrow_to_parquet_type(field).map(Arc::new))
+        .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
         .collect::<Result<Vec<_>>>()?;
     let group = Type::group_type_builder(root).with_fields(fields).build()?;
     Ok(SchemaDescriptor::new(Arc::new(group)))
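With the flag threaded through schema conversion, a single Arrow schema now maps to two possible Parquet layouts. A small sketch of the observable difference, assuming the public `arrow_to_parquet_schema` re-export that the tests below use:

```rust
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_to_parquet_schema;
use parquet::basic::Type as PhysicalType;
use parquet::errors::Result;

fn main() -> Result<()> {
    let schema = Schema::new(vec![Field::new("d", DataType::Date64, false)]);

    // coerce_types = true: Date64 is narrowed to Parquet-native DATE (INT32 days).
    let coerced = arrow_to_parquet_schema(&schema, true)?;
    assert_eq!(coerced.column(0).physical_type(), PhysicalType::INT32);

    // coerce_types = false: Date64 stays INT64, preserving a lossless round trip.
    let preserved = arrow_to_parquet_schema(&schema, false)?;
    assert_eq!(preserved.column(0).physical_type(), PhysicalType::INT64);
    Ok(())
}
```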
@@ -298,7 +302,7 @@ pub fn decimal_length_from_precision(precision: u8) -> usize {
 }
 
 /// Convert an arrow field to a parquet `Type`
-fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
+fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
     let name = field.name().as_str();
     let repetition = if field.is_nullable() {
         Repetition::OPTIONAL
@@ -415,12 +419,20 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             .with_repetition(repetition)
             .with_id(id)
             .build(),
-        // date64 is cast to date32 (#1666)
-        DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
-            .with_logical_type(Some(LogicalType::Date))
-            .with_repetition(repetition)
-            .with_id(id)
-            .build(),
+        DataType::Date64 => {
+            if coerce_types {
+                Type::primitive_type_builder(name, PhysicalType::INT32)
+                    .with_logical_type(Some(LogicalType::Date))
+                    .with_repetition(repetition)
+                    .with_id(id)
+                    .build()
+            } else {
+                Type::primitive_type_builder(name, PhysicalType::INT64)
+                    .with_repetition(repetition)
+                    .with_id(id)
+                    .build()
+            }
+        }
         DataType::Time32(TimeUnit::Second) => {
             // Cannot represent seconds in LogicalType
             Type::primitive_type_builder(name, PhysicalType::INT32)
@@ -518,7 +530,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             Type::group_type_builder(name)
                 .with_fields(vec![Arc::new(
                     Type::group_type_builder("list")
-                        .with_fields(vec![Arc::new(arrow_to_parquet_type(f)?)])
+                        .with_fields(vec![Arc::new(arrow_to_parquet_type(f, coerce_types)?)])
                         .with_repetition(Repetition::REPEATED)
                         .build()?,
                 )])
@@ -537,7 +549,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             // recursively convert children to types/nodes
             let fields = fields
                 .iter()
-                .map(|f| arrow_to_parquet_type(f).map(Arc::new))
+                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                 .collect::<Result<Vec<_>>>()?;
             Type::group_type_builder(name)
                 .with_fields(fields)
@@ -551,8 +563,8 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 .with_fields(vec![Arc::new(
                     Type::group_type_builder(field.name())
                         .with_fields(vec![
-                            Arc::new(arrow_to_parquet_type(&struct_fields[0])?),
-                            Arc::new(arrow_to_parquet_type(&struct_fields[1])?),
+                            Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
+                            Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
                         ])
                         .with_repetition(Repetition::REPEATED)
                         .build()?,
@@ -571,7 +583,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         DataType::Dictionary(_, ref value) => {
             // Dictionary encoding not handled at the schema level
             let dict_field = field.clone().with_data_type(value.as_ref().clone());
-            arrow_to_parquet_type(&dict_field)
+            arrow_to_parquet_type(&dict_field, coerce_types)
         }
         DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
@@ -1557,7 +1569,7 @@ mod tests {
             Field::new("decimal256", DataType::Decimal256(39, 2), false),
         ];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema).unwrap();
+        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
 
         assert_eq!(
             parquet_schema.columns().len(),
@@ -1594,7 +1606,7 @@ mod tests {
             false,
         )];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema);
+        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true);
 
         assert!(converted_arrow_schema.is_err());
         converted_arrow_schema.unwrap();
@@ -1866,7 +1878,7 @@ mod tests {
         // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
         let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
 
-        let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+        let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
         let parq_fields = parq_schema_descr.root_schema().get_fields();
         assert_eq!(parq_fields.len(), 2);
         assert_eq!(parq_fields[0].get_basic_info().id(), 1);
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 49980f525f47..cb07c1f497a7 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -57,6 +57,8 @@ pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
 pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
 /// Default values for [`WriterProperties::statistics_truncate_length`]
 pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = None;
+/// Default values for [`WriterProperties::coerce_types`]
+pub const DEFAULT_COERCE_TYPES: bool = false;
 
 /// Parquet writer version.
 ///
@@ -163,6 +165,7 @@ pub struct WriterProperties {
     sorting_columns: Option<Vec<SortingColumn>>,
     column_index_truncate_length: Option<usize>,
     statistics_truncate_length: Option<usize>,
+    coerce_types: bool,
 }
 
 impl Default for WriterProperties {
@@ -265,6 +268,19 @@ impl WriterProperties {
         self.statistics_truncate_length
     }
 
+    /// Returns `coerce_types` boolean
+    ///
+    /// Some Arrow types do not have a corresponding Parquet logical type.
+    /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
+    /// Writers have the option to coerce these into native Parquet types. Type
+    /// coercion allows for meaningful representations that do not require
+    /// downstream readers to consider the embedded Arrow schema. However, type
+    /// coercion also prevents the data from being losslessly round-tripped. This method
+    /// returns `true` if type coercion is enabled.
+    pub fn coerce_types(&self) -> bool {
+        self.coerce_types
+    }
+
     /// Returns encoding for a data page, when dictionary encoding is enabled.
     /// This is not configurable.
     #[inline]
@@ -361,6 +377,7 @@ pub struct WriterPropertiesBuilder {
     sorting_columns: Option<Vec<SortingColumn>>,
     column_index_truncate_length: Option<usize>,
     statistics_truncate_length: Option<usize>,
+    coerce_types: bool,
 }
 
 impl WriterPropertiesBuilder {
@@ -381,6 +398,7 @@ impl WriterPropertiesBuilder {
             sorting_columns: None,
             column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
             statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+            coerce_types: DEFAULT_COERCE_TYPES,
         }
     }
 
@@ -401,6 +419,7 @@ impl WriterPropertiesBuilder {
             sorting_columns: self.sorting_columns,
             column_index_truncate_length: self.column_index_truncate_length,
             statistics_truncate_length: self.statistics_truncate_length,
+            coerce_types: self.coerce_types,
         }
     }
 
@@ -731,6 +750,13 @@ impl WriterPropertiesBuilder {
         self.statistics_truncate_length = max_length;
         self
     }
+
+    /// Sets flag to enable/disable type coercion.
+    /// Takes precedence over globally defined settings.
+    pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
+        self.coerce_types = coerce_types;
+        self
+    }
 }
 
 /// Controls the level of statistics to be computed by the writer and stored in
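Taken together, the builder flag added above is the single opt-in switch for the new behavior. A final usage sketch of the property and its getter (defaults follow `DEFAULT_COERCE_TYPES`):

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // The default keeps coercion off, preserving lossless round trips.
    let default_props = WriterProperties::builder().build();
    assert!(!default_props.coerce_types());

    // Opt in when compatibility with non-Arrow Parquet readers matters more
    // than byte-exact Arrow round-tripping.
    let coercing_props = WriterProperties::builder().set_coerce_types(true).build();
    assert!(coercing_props.coerce_types());
}
```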