Skip to content

Commit

Permalink
Add ArrowToParquetSchemaConverter, deprecate `arrow_to_parquet_sche…
Browse files Browse the repository at this point in the history
…ma` (#6840)

* Add ArrowToParquetSchemaConverter, deprecate `arrow_to_parquet_schema` et al

* Fmt

* update test

* Update parquet/src/arrow/schema/mod.rs

Co-authored-by: Ed Seidl <[email protected]>

* Apply suggestions from code review

Co-authored-by: Ed Seidl <[email protected]>

* Improve comments

* Add more detail to WriterPropertiesBuilder docs

* Update parquet/src/file/properties.rs

Co-authored-by: Ed Seidl <[email protected]>

* Fix some more capitalization and add a link to Parquet date spec

* Update parquet/src/arrow/schema/mod.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

* Revert "Update parquet/src/arrow/schema/mod.rs"

This reverts commit bd4e2d5.

* rename to ArrowSchemaConverter

* change from build --> convert

* update doc

* fix fmt

---------

Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
3 people authored Dec 16, 2024
1 parent 9ffa065 commit 2808625
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 63 deletions.
26 changes: 14 additions & 12 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
arrow_to_parquet_schema_with_root, decimal_length_from_precision,
};
use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
Expand Down Expand Up @@ -181,10 +179,11 @@ impl<W: Write + Send> ArrowWriter<W> {
options: ArrowWriterOptions,
) -> Result<Self> {
let mut props = options.properties;
let schema = match options.schema_root {
Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
};
let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
if let Some(schema_root) = &options.schema_root {
converter = converter.schema_root(schema_root);
}
let schema = converter.convert(&arrow_schema)?;
if !options.skip_arrow_metadata {
// add serialized arrow schema
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
Expand Down Expand Up @@ -390,9 +389,9 @@ impl ArrowWriterOptions {
}

/// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
pub fn with_schema_root(self, name: String) -> Self {
pub fn with_schema_root(self, schema_root: String) -> Self {
Self {
schema_root: Some(name),
schema_root: Some(schema_root),
..self
}
}
Expand Down Expand Up @@ -538,7 +537,7 @@ impl ArrowColumnChunk {
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::arrow_to_parquet_schema;
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::SerializedFileWriter;
Expand All @@ -550,7 +549,10 @@ impl ArrowColumnChunk {
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap();
/// let parquet_schema = ArrowSchemaConverter::new()
/// .with_coerce_types(props.coerce_types())
/// .convert(&schema)
/// .unwrap();
///
/// // Create writers for each of the leaf columns
/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
Expand Down
8 changes: 6 additions & 2 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,13 @@ pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;
use arrow_schema::{FieldRef, Schema};

// continue to export deprecated methods until they are removed
#[allow(deprecated)]
pub use self::schema::arrow_to_parquet_schema;

pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
parquet_to_arrow_schema_by_columns, FieldLevels,
parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
ArrowSchemaConverter, FieldLevels,
};

/// Schema metadata key used to store serialized Arrow IPC schema
Expand Down
172 changes: 141 additions & 31 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Provides API for converting parquet schema to arrow schema and vice versa.
//!
//! The main interfaces for converting parquet schema to arrow schema are
//! `parquet_to_arrow_schema`, `parquet_to_arrow_schema_by_columns` and
//! `parquet_to_arrow_field`.
//!
//! The interfaces for converting arrow schema to parquet schema is coming.
//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [`parquet_to_arrow_schema`]
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -226,27 +220,134 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
}
}

/// Convert arrow schema to parquet schema
/// Converter for Arrow schema to Parquet schema
///
/// The name of the root schema element defaults to `"arrow_schema"`, this can be
/// overridden with [`arrow_to_parquet_schema_with_root`]
pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result<SchemaDescriptor> {
arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
/// Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
/// Field::new("a", DataType::Int64, true),
/// Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
/// .convert(&arrow_schema)
/// .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
/// Arc::new(
/// Type::group_type_builder("arrow_schema")
/// .with_fields(vec![
/// Arc::new(
/// Type::primitive_type_builder("a", basic::Type::INT64)
/// .build().unwrap()
/// ),
/// Arc::new(
/// Type::primitive_type_builder("b", basic::Type::INT32)
/// .with_converted_type(basic::ConvertedType::DATE)
/// .with_logical_type(Some(basic::LogicalType::Date))
/// .build().unwrap()
/// ),
/// ])
/// .build().unwrap()
/// )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root schema element in the resulting Parquet schema
    /// (defaults to `"arrow_schema"`; override via [`Self::schema_root`])
    schema_root: &'a str,
    /// Should we coerce Arrow types to compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool,
}

/// Convert arrow schema to parquet schema specifying the name of the root schema element
pub fn arrow_to_parquet_schema_with_root(
schema: &Schema,
root: &str,
coerce_types: bool,
) -> Result<SchemaDescriptor> {
let fields = schema
.fields()
.iter()
.map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
.collect::<Result<_>>()?;
let group = Type::group_type_builder(root).with_fields(fields).build()?;
Ok(SchemaDescriptor::new(Arc::new(group)))
impl Default for ArrowSchemaConverter<'_> {
fn default() -> Self {
Self::new()
}
}

impl<'a> ArrowSchemaConverter<'a> {
    /// Create a converter with default settings: root schema element named
    /// `"arrow_schema"` and type coercion disabled.
    pub fn new() -> Self {
        Self {
            schema_root: "arrow_schema",
            coerce_types: false,
        }
    }

    /// Should Arrow types be coerced into Parquet native types (default `false`).
    ///
    /// When enabled, the resulting Parquet files can be read by a wider range
    /// of readers, but some Arrow types — such as [`DataType::Date64`], which
    /// has no direct [corresponding Parquet type] — may lose precision.
    ///
    /// With the default (`false`), no coercion is performed. Enabling coercion
    /// produces representations that are meaningful without consulting the
    /// embedded Arrow schema and improves interoperability with other Parquet
    /// implementations, at the cost of lossless round-tripping.
    ///
    /// # Discussion
    ///
    /// Arrow types such as `Date64`, `Timestamp` and `Interval` have no exact
    /// Parquet logical type equivalent, so storing them using the closest
    /// Parquet logical type cannot be lossless. For example, some `Date64`
    /// values may be truncated when written as Parquet's native 32 bit date
    /// type.
    ///
    /// For [`List`] and [`Map`] types, some Parquet readers expect specific
    /// names for certain schema elements (earlier versions of the Parquet spec
    /// were ambiguous here). With coercion enabled, the names prescribed by
    /// the Parquet specification are used, which may discard naming metadata
    /// carried in the Arrow schema.
    ///
    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
    ///
    pub fn with_coerce_types(self, coerce_types: bool) -> Self {
        Self {
            coerce_types,
            ..self
        }
    }

    /// Set the root schema element name (defaults to `"arrow_schema"`).
    pub fn schema_root(self, schema_root: &'a str) -> Self {
        Self {
            schema_root,
            ..self
        }
    }

    /// Convert the given Arrow [`Schema`] to a Parquet [`SchemaDescriptor`]
    ///
    /// See the example on [`ArrowSchemaConverter`]
    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
        // Convert each Arrow field to a Parquet type, failing fast on the
        // first field that cannot be represented.
        let mut parquet_fields = Vec::with_capacity(schema.fields().len());
        for field in schema.fields() {
            parquet_fields.push(Arc::new(arrow_to_parquet_type(field, self.coerce_types)?));
        }
        let root = Type::group_type_builder(self.schema_root)
            .with_fields(parquet_fields)
            .build()?;
        Ok(SchemaDescriptor::new(Arc::new(root)))
    }
}

/// Convert arrow schema to parquet schema
///
/// The name of the root schema element defaults to `"arrow_schema"`, this can be
/// overridden with [`ArrowSchemaConverter`]
///
/// Deprecated: use [`ArrowSchemaConverter`] instead, which also allows setting
/// the root schema element name and type coercion.
// Note: the deprecation message must name `ArrowSchemaConverter` -- the type
// was renamed from `ArrowToParquetSchemaConverter` before merge, and pointing
// users at the old name would send them to a type that does not exist.
#[deprecated(since = "54.0.0", note = "Use `ArrowSchemaConverter` instead")]
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
    ArrowSchemaConverter::new().convert(schema)
}

fn parse_key_value_metadata(
Expand Down Expand Up @@ -1488,7 +1589,10 @@ mod tests {
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap();
let converted_arrow_schema = ArrowSchemaConverter::new()
.with_coerce_types(true)
.convert(&arrow_schema)
.unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
Expand All @@ -1512,7 +1616,10 @@ mod tests {
";
let parquet_group_type = parse_message_type(message_type).unwrap();
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
let converted_arrow_schema = ArrowSchemaConverter::new()
.with_coerce_types(false)
.convert(&arrow_schema)
.unwrap();
assert_eq!(
parquet_schema.columns().len(),
converted_arrow_schema.columns().len()
Expand Down Expand Up @@ -1668,7 +1775,7 @@ mod tests {
Field::new("decimal256", DataType::Decimal256(39, 2), false),
];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

assert_eq!(
parquet_schema.columns().len(),
Expand Down Expand Up @@ -1705,9 +1812,10 @@ mod tests {
false,
)];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true);
let converted_arrow_schema = ArrowSchemaConverter::new()
.with_coerce_types(true)
.convert(&arrow_schema);

assert!(converted_arrow_schema.is_err());
converted_arrow_schema.unwrap();
}

Expand Down Expand Up @@ -1978,7 +2086,9 @@ mod tests {
// don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
let parq_schema_descr = ArrowSchemaConverter::new()
.with_coerce_types(true)
.convert(&arrow_schema)?;
let parq_fields = parq_schema_descr.root_schema().get_fields();
assert_eq!(parq_fields.len(), 2);
assert_eq!(parq_fields[0].get_basic_info().id(), 1);
Expand Down
37 changes: 19 additions & 18 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
// under the License.

//! Configuration via [`WriterProperties`] and [`ReaderProperties`]
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

use crate::basic::{Compression, Encoding};
use crate::compression::{CodecOptions, CodecOptionsBuilder};
use crate::file::metadata::KeyValue;
use crate::format::SortingColumn;
use crate::schema::types::ColumnPath;
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

/// Default value for [`WriterProperties::data_page_size_limit`]
pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
Expand Down Expand Up @@ -780,22 +779,24 @@ impl WriterPropertiesBuilder {
self
}

/// Sets flag to control if type coercion is enabled (defaults to `false`).
/// Should the writer coerce types to parquet native types (defaults to `false`).
///
/// # Notes
/// Some Arrow types do not have a corresponding Parquet logical type.
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
/// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
/// to have specific names to be considered fully compliant.
/// Writers have the option to coerce these types and names to match those required
/// by the Parquet specification.
/// This type coercion allows for meaningful representations that do not require
/// downstream readers to consider the embedded Arrow schema, and can allow for greater
/// compatibility with other Parquet implementations. However, type
/// coercion also prevents the data from being losslessly round-tripped.
///
/// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
/// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
/// Leaving this option the default `false` will ensure the exact same data
/// written to parquet using this library will be read.
///
/// Setting this option to `true` will result in parquet files that can be
/// read by more readers, but potentially lose information in the process.
///
/// * Types such as [`DataType::Date64`], which have no direct corresponding
/// Parquet type, may be stored with lower precision.
///
/// * The internal field names of `List` and `Map` types will be renamed if
/// necessary to match what is required by the newest Parquet specification.
///
/// See [`ArrowSchemaConverter::with_coerce_types`] for more details
///
/// [`DataType::Date64`]: arrow_schema::DataType::Date64
/// [`ArrowSchemaConverter::with_coerce_types`]: crate::arrow::ArrowSchemaConverter::with_coerce_types
pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
self.coerce_types = coerce_types;
self
Expand Down

0 comments on commit 2808625

Please sign in to comment.