feat(parquet)!: coerce_types flag for date64 (#6313)
* feat(parquet): coerce_types flag for date64

* fix: use ARROW:schema instead of LogicalType to embed Date64 type

* chore: lint

* chore: lint

* chore: lint

* chore: add physical_type to StatisticsConverter to account for coerce_types

* chore: blank line changes

* chore: revert minor test changes

* chore: update to latest parquet-testing

* chore: add physical_type fix for get_data_page_statistics macro

* docs: add docs for coerce_types

* chore: cargo fmt --all

* docs: coerce_types lossless round trip

Co-authored-by: Ed Seidl <[email protected]>

---------

Co-authored-by: Ed Seidl <[email protected]>
dsgibbons and etseidl authored Nov 26, 2024
1 parent 741cbe8 commit e1e8814
Showing 6 changed files with 272 additions and 49 deletions.
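For orientation, here is a minimal sketch of how a writer opts into the new behavior, based on the set_coerce_types builder method and the ArrowWriter API exercised in the diffs below; the column name and buffer handling are illustrative, not part of this commit:

use std::sync::Arc;

use arrow_array::{Date64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "date64",
        DataType::Date64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        // 0 and exactly one day, in milliseconds since the epoch
        vec![Arc::new(Date64Array::from(vec![0, 86_400_000]))],
    )?;

    // Opt in to coercing Date64 down to the Parquet DATE (INT32) physical
    // type. Without this flag the writer keeps INT64 and embeds the Date64
    // type via ARROW:schema metadata, so the round trip stays lossless.
    let props = WriterProperties::builder().set_coerce_types(true).build();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}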
60 changes: 55 additions & 5 deletions parquet/src/arrow/array_reader/primitive_array.rs
@@ -208,10 +208,10 @@ where
// As there is not always a 1:1 mapping between Arrow and Parquet, there
// are datatypes which we must convert explicitly.
// These are:
- // - date64: we should cast int32 to date32, then date32 to date64.
- // - decimal: cast in32 to decimal, int64 to decimal
+ // - date64: cast int32 to date32, then date32 to date64.
+ // - decimal: cast int32 to decimal, int64 to decimal
let array = match target_type {
- ArrowType::Date64 => {
+ ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
// this is cheap as it internally reinterprets the data
let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
arrow_cast::cast(&a, target_type)?
@@ -305,9 +305,9 @@ mod tests {
use crate::util::test_common::rand_gen::make_pages;
use crate::util::InMemoryPageIterator;
use arrow::datatypes::ArrowPrimitiveType;
- use arrow_array::{Array, PrimitiveArray};
+ use arrow_array::{Array, Date32Array, PrimitiveArray};

- use arrow::datatypes::DataType::Decimal128;
+ use arrow::datatypes::DataType::{Date32, Decimal128};
use rand::distributions::uniform::SampleUniform;
use std::collections::VecDeque;

@@ -783,4 +783,54 @@ mod tests {
assert_ne!(array, &data_decimal_array)
}
}

#[test]
fn test_primitive_array_reader_date32_type() {
// parquet `INT32` to date
let message_type = "
message test_schema {
REQUIRED INT32 date1 (DATE);
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);

// create the array reader
{
let mut data = Vec::new();
let mut page_lists = Vec::new();
make_column_chunks::<Int32Type>(
column_desc.clone(),
Encoding::PLAIN,
100,
-99999999,
99999999,
&mut Vec::new(),
&mut Vec::new(),
&mut data,
&mut page_lists,
true,
2,
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();

// read data from the reader
// the data type is date
let array = array_reader.next_batch(50).unwrap();
assert_eq!(array.data_type(), &Date32);
let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
let data_date_array = data[0..50]
.iter()
.copied()
.map(Some)
.collect::<Date32Array>();
assert_eq!(array, &data_date_array);
}
}
}
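The match-arm change above narrows the conversion to the INT32-backed case and keeps the two-step cast. A standalone sketch of that path using the public arrow_cast API (sample values are illustrative):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use arrow_cast::cast;
use arrow_schema::DataType;

fn main() {
    // Day counts since the UNIX epoch, as decoded from a Parquet INT32 column.
    let raw: ArrayRef = Arc::new(Int32Array::from(vec![0, 1_000, 10_000]));

    // Reinterpret INT32 as Date32 (cheap; same 32-bit representation), then
    // widen Date32 to Date64 milliseconds, mirroring the reader logic above.
    let as_date32 = cast(&raw, &DataType::Date32).unwrap();
    let as_date64 = cast(&as_date32, &DataType::Date64).unwrap();

    assert_eq!(as_date64.data_type(), &DataType::Date64);
    println!("{as_date64:?}");
}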
115 changes: 113 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -932,8 +932,8 @@ mod tests {
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::{
- Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
- Time32MillisecondType, Time64MicrosecondType,
+ Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
+ Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
};
use arrow_array::*;
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
@@ -1272,6 +1272,117 @@ mod tests {
Ok(())
}

#[test]
fn test_date32_roundtrip() -> Result<()> {
use arrow_array::Date32Array;

let schema = Arc::new(Schema::new(vec![Field::new(
"date32",
ArrowDataType::Date32,
false,
)]));

let mut buf = Vec::with_capacity(1024);

let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

let original = RecordBatch::try_new(
schema,
vec![Arc::new(Date32Array::from(vec![
-1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
]))],
)?;

writer.write(&original)?;
writer.close()?;

let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
let ret = reader.next().unwrap()?;
assert_eq!(ret, original);

// Ensure can be downcast to the correct type
ret.column(0).as_primitive::<Date32Type>();

Ok(())
}

#[test]
fn test_date64_roundtrip() -> Result<()> {
use arrow_array::Date64Array;

let schema = Arc::new(Schema::new(vec![
Field::new("small-date64", ArrowDataType::Date64, false),
Field::new("big-date64", ArrowDataType::Date64, false),
Field::new("invalid-date64", ArrowDataType::Date64, false),
]));

let mut default_buf = Vec::with_capacity(1024);
let mut coerce_buf = Vec::with_capacity(1024);

let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
let mut coerce_writer =
ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

let original = RecordBatch::try_new(
schema,
vec![
// small-date64
Arc::new(Date64Array::from(vec![
-1_000_000 * NUM_MILLISECONDS_IN_DAY,
-1_000 * NUM_MILLISECONDS_IN_DAY,
0,
1_000 * NUM_MILLISECONDS_IN_DAY,
1_000_000 * NUM_MILLISECONDS_IN_DAY,
])),
// big-date64
Arc::new(Date64Array::from(vec![
-10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
-1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
0,
1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
])),
// invalid-date64
Arc::new(Date64Array::from(vec![
-1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
-1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1,
1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
])),
],
)?;

default_writer.write(&original)?;
coerce_writer.write(&original)?;

default_writer.close()?;
coerce_writer.close()?;

let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

let default_ret = default_reader.next().unwrap()?;
let coerce_ret = coerce_reader.next().unwrap()?;

// Roundtrip should be successful when default writer used
assert_eq!(default_ret, original);

// Only small-date64 should roundtrip successfully when coerce_types writer is used
assert_eq!(coerce_ret.column(0), original.column(0));
assert_ne!(coerce_ret.column(1), original.column(1));
assert_ne!(coerce_ret.column(2), original.column(2));

// Ensure both can be downcast to the correct type
default_ret.column(0).as_primitive::<Date64Type>();
coerce_ret.column(0).as_primitive::<Date64Type>();

Ok(())
}
struct RandFixedLenGen {}

impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
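As the invalid-date64 column in the test above shows, coercion is lossy for values that are not whole days. A simplified model of that truncation, offered as an illustration rather than code from this PR, and ignoring the separate INT32 overflow exercised by big-date64:

// Milliseconds per day, matching NUM_MILLISECONDS_IN_DAY in the test above.
const MS_PER_DAY: i64 = 1000 * 60 * 60 * 24;

// Model of a coerce_types round trip: Date64 milliseconds are reduced to
// whole days for the Parquet DATE (INT32) column, then scaled back on read.
fn coerce_round_trip(ms: i64) -> i64 {
    (ms / MS_PER_DAY) * MS_PER_DAY // any sub-day remainder is dropped
}

fn main() {
    // Day-aligned values survive the round trip unchanged...
    assert_eq!(coerce_round_trip(1_000 * MS_PER_DAY), 1_000 * MS_PER_DAY);
    // ...but a value with a sub-day component does not.
    assert_ne!(coerce_round_trip(1_000 * MS_PER_DAY + 1), 1_000 * MS_PER_DAY + 1);
}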
(4 more changed files not shown)
