
feat: support check equality of schema and arrow schema #10903

Merged · 1 commit merged into main from zj/schema on Jul 17, 2023
Conversation

@ZENOTME (Contributor) commented Jul 12, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

To support the iceberg sink (#10875), we need to check schema equality before creating the writer.
Icelake supports converting its schema to an Arrow schema, so this PR implements a function to check equality between our schema and an Arrow schema.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

Click here for Documentation

Types of user-facing changes

Please keep the types that apply to your changes, and remove the others.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

    .iter()
    .zip_eq_fast(arrow_schema.fields())
    .all(|(field, arrow_field)| {
        field
@ZENOTME (Contributor, Author):
We only check that the types in the schema match, and ignore the other field attributes (such as the field name). Do we need to check those? 🤔 cc @liurenjie1024

Contributor:
You can provide a parameter in this function to let users choose whether to check names or not.

Contributor:
For the current use case, I think we should also check field names. Fields in a table schema are identified by column name, and there is no enforced order of fields. We can add a version without comparing field names when necessary.

@codecov bot commented Jul 12, 2023

Codecov Report

Merging #10903 (24b0d08) into main (4cdf329) will increase coverage by 0.00%.
The diff coverage is 96.15%.

@@           Coverage Diff           @@
##             main   #10903   +/-   ##
=======================================
  Coverage   69.91%   69.91%           
=======================================
  Files        1309     1309           
  Lines      223864   223916   +52     
=======================================
+ Hits       156512   156556   +44     
- Misses      67352    67360    +8     
Flag Coverage Δ
rust 69.91% <96.15%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
src/common/src/catalog/schema.rs 83.20% <96.15%> (+3.40%) ⬆️

... and 4 files with indirect coverage changes


@xiangjinwu (Contributor) commented Jul 12, 2023

(This discusses arrow types in general, not necessarily iceberg.)

FYI there is an existing mapping between our DataType and arrow as part of UDF:
https://github.com/risingwavelabs/risingwave/blob/main/src/common/src/array/arrow.rs

To sink an internal DataType into an arrow one, there are some additional questions to answer:

  • Is it okay to sink a Time (always microsecond as i64) into Time64(nano)?
    • Maybe ok, by * 1000 without overflow
  • Is it okay to sink a Time (always microsecond as i64) into Time64(milli or sec)?
    • Maybe ok, by rounding
  • Is it okay to sink a Time (always microsecond as i64) into Time32(milli or sec)?
    • Maybe ok, by rounding without overflow
  • Is it okay to sink a Time (always microsecond as i64) into Time32(micro or nano)?
    • Can overflow

Note that following the same rationale above, ingesting from a Time32(micro or nano) source is totally ok. That is, the mapping here should not be described with symmetric words like "equality" or "similar", but with directional words like from/source and into/sink.
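
To make the directionality concrete, here is a minimal standalone sketch (not code from this PR; the function names are made up) of the microsecond-to-other-unit conversions discussed above:

```rust
/// Sink a time-of-day stored as microseconds (i64) into Time64(nano):
/// multiply by 1000, reporting overflow instead of wrapping.
fn time_us_to_ns(us: i64) -> Option<i64> {
    us.checked_mul(1_000)
}

/// Sink microseconds into a millisecond column: divide with rounding.
/// (Time-of-day values are non-negative, so this simple form suffices.)
fn time_us_to_ms(us: i64) -> i64 {
    (us + 500) / 1_000
}

fn main() {
    assert_eq!(time_us_to_ns(1_500), Some(1_500_000));
    assert_eq!(time_us_to_ms(1_499), 1); // 1.499 ms rounds down
    assert_eq!(time_us_to_ms(1_500), 2); // 1.5 ms rounds up
}
```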

Some other types to note:

  • Timestamp is equivalent to Timestamp(micro, None)
  • Timestamptz is equivalent to Timestamp(micro, Some("UTC")), or we can also support timezone conversion if non-UTC is given
  • Interval is closest to Interval(MonthDayNano) rather than Duration
  • Bytea to FixedSizeBinary may lead to truncation (also FixedSizeList)
  • Varchar can be mapped to Utf8
  • the current dynamic-scale Decimal needs a static-scale Decimal256(56, 28) to avoid loss of precision. The UDF implementation is just experimental
  • UDF uses a trick to handle jsonb and rw_int256 (sketched below):
    • varchar is Utf8 and jsonb is LargeUtf8
    • decimal is Decimal128 and rw_int256 is Decimal256
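
As a rough illustration of that trick, with local stand-in enums (the real mapping lives in src/common/src/array/arrow.rs; the precision/scale values below are placeholders, not the actual ones):

```rust
// Stand-ins for the risingwave DataType and arrow_schema::DataType enums.
enum RwType { Varchar, Jsonb, Decimal, Int256 }

#[derive(Debug, PartialEq)]
enum ArrowType { Utf8, LargeUtf8, Decimal128(u8, i8), Decimal256(u8, i8) }

fn to_arrow(t: &RwType) -> ArrowType {
    match t {
        RwType::Varchar => ArrowType::Utf8,
        // jsonb piggybacks on LargeUtf8 so it stays distinguishable from varchar.
        RwType::Jsonb => ArrowType::LargeUtf8,
        // Placeholder precision/scale for the sketch.
        RwType::Decimal => ArrowType::Decimal128(38, 28),
        RwType::Int256 => ArrowType::Decimal256(76, 0),
    }
}

fn main() {
    assert_eq!(to_arrow(&RwType::Jsonb), ArrowType::LargeUtf8);
}
```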

@ZENOTME (Contributor, Author) commented Jul 12, 2023

FYI there is an existing mapping between our DataType and arrow as part of UDF:
https://github.com/risingwavelabs/risingwave/blob/main/src/common/src/array/arrow.rs

Thanks! We can directly reuse it.

To sink an internal DataType into an arrow one, there are some additional questions to answer:
Is it okay to sink a Time (always microsecond as i64) into Time64(nano)?
Maybe ok, by * 1000 without overflow
Is it okay to sink a Time (always microsecond as i64) into Time64(milli or sec)?
Maybe ok, by rounding
Is it okay to sink a Time (always microsecond as i64) into Time32(milli or sec)?
Maybe ok, by rounding without overflow
Is it okay to sink a Time (always microsecond as i64) into Time32(micro or nano)?
Can overflow

Good question. As you say, when we convert internal data to arrow data, the result may not be exactly the same (e.g. due to rounding or overflow). I think we can let the user choose to omit or insert the inconsistent data in this case.
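
A tiny sketch of what such a user-facing choice could look like (purely hypothetical; nothing like this is in the PR):

```rust
/// Hypothetical per-sink policy for values that cannot round-trip exactly
/// (e.g. a time that would overflow, or a decimal that must be rounded).
#[derive(Clone, Copy, Debug)]
enum OnLossyValue {
    /// Omit the offending row.
    Skip,
    /// Insert the rounded/converted value anyway.
    InsertConverted,
}
```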

@liurenjie1024 (Contributor) left a comment:
Generally LGTM

/// Check if the schema can convert to the iceberg table schema.
///
/// If `is_check_name` is enabled, the name of the field will be checked.
pub fn check_to_arrow_schema(&self, arrow_schema: &ArrowSchema, is_check_name: bool) -> bool {
Contributor:
Suggested change
- pub fn check_to_arrow_schema(&self, arrow_schema: &ArrowSchema, is_check_name: bool) -> bool {
+ pub fn same_as_arrow_schema(&self, arrow_schema: &ArrowSchema, is_check_name: bool) -> bool {

Contributor:
I think this check should not compare fields in order, but should compare field names and data types.

Contributor:
I agree matching fields by name is more reasonable here. Just for context, the cast between two structs is by position rather than by name (#9694).
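
For illustration, an order-insensitive comparison might look like this sketch (simplified stand-in types, not the PR's implementation):

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum DataType { Int32, Varchar }

struct Field { name: String, data_type: DataType }

/// Compare field sets by name + type, ignoring declaration order.
/// Assumes column names are unique within a schema.
fn same_fields_by_name(ours: &[Field], theirs: &[Field]) -> bool {
    if ours.len() != theirs.len() {
        return false;
    }
    // Index our fields by name, then look each of their fields up,
    // so field order no longer matters.
    let by_name: HashMap<&str, &DataType> = ours
        .iter()
        .map(|f| (f.name.as_str(), &f.data_type))
        .collect();
    theirs
        .iter()
        .all(|f| by_name.get(f.name.as_str()) == Some(&&f.data_type))
}

fn main() {
    let ours = vec![
        Field { name: "id".into(), data_type: DataType::Int32 },
        Field { name: "name".into(), data_type: DataType::Varchar },
    ];
    let theirs = vec![
        Field { name: "name".into(), data_type: DataType::Varchar },
        Field { name: "id".into(), data_type: DataType::Int32 },
    ];
    assert!(same_fields_by_name(&ours, &theirs)); // order differs, still ok
}
```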

@xiangjinwu (Contributor) commented Jul 12, 2023

Linking to the types supported by iceberg here:
https://iceberg.apache.org/docs/latest/schemas/
https://iceberg.apache.org/spec/#schemas-and-data-types
icelake-arrow-mapping
official-arrow-mapping

It seems the following types correspond well: boolean, int32, int64, float32, float64, date, time, timestamp, timestamptz, string, binary, list, struct

Then some types only exist on our side:

  • int16 may be stored in int32 (or even int64? do we intend to support or reject sinking from int32 into int64?)
  • interval seems not representable. Options: (a) report error (b) as string (c) as fixed(16) or binary
  • jsonb maybe as string

Some types only exist in iceberg:

  • fixed(L): (a) report error (b) bytea with truncation
  • map<K, V>: report error because hstore is not supported

Similar but different type: decimal.
Maybe allow sinking into any decimal(p, s), rounding when there is not enough scale, and raising a runtime overflow error / skipping when there is not enough precision.
Example: we may have 1e28 and 1e-28 in the same column, which is impossible in iceberg (made concrete in the sketch after this list):

  • When the iceberg column is decimal(38, <=9), it can hold 1e28 but needs to round 1e-28 to 0
  • When the iceberg column is decimal(38, >=28), it can hold 1e-28 losslessly, but 1e28 would be an overflow error
  • For scale 10..=27, it would overflow for 1e28 and round 1e-28 to 0
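
A back-of-the-envelope sketch (not RisingWave code) of fitting a dynamic-scale value into decimal(p, s) with p <= 38, reproducing the cases above:

```rust
/// A value is `mantissa * 10^(-scale)`. Returns the rescaled mantissa if it
/// fits in `p` digits at scale `s`, otherwise an overflow error.
fn fit_decimal(mantissa: i128, scale: u32, p: u32, s: u32) -> Result<i128, &'static str> {
    let rescaled = if s >= scale {
        // Widening the scale can itself overflow (e.g. 1e28 into (38, 28)).
        mantissa.checked_mul(10i128.pow(s - scale)).ok_or("overflow")?
    } else {
        // Narrowing the scale rounds half-up (simplified: assumes mantissa >= 0).
        let div = 10i128.pow(scale - s);
        (mantissa + div / 2) / div
    };
    if rescaled.unsigned_abs() < 10u128.pow(p) {
        Ok(rescaled)
    } else {
        Err("overflow")
    }
}

fn main() {
    let e28 = 10i128.pow(28); // 1e28 as (mantissa = 10^28, scale = 0)
    assert_eq!(fit_decimal(e28, 0, 38, 9), Ok(10i128.pow(37))); // holds 1e28
    assert_eq!(fit_decimal(1, 28, 38, 9), Ok(0));               // 1e-28 rounds to 0
    assert!(fit_decimal(e28, 0, 38, 28).is_err());              // 1e28 overflows
    assert_eq!(fit_decimal(1, 28, 38, 28), Ok(1));              // 1e-28 lossless
}
```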

@liurenjie1024 (Contributor):
@xiangjinwu Good point. Data type matching is quite important for compatibility, and we do need careful testing and documentation for it. My suggestion is that in the first version we only support the well-matched types listed above and reject the others. Complex types can be resolved one by one later.

schema_fields
    .get(arrow_field.name())
    .and_then(|data_type| {
        if *data_type == &arrow_field.data_type().into() {
@xiangjinwu (Contributor) commented Jul 12, 2023:
This into currently panics on unsupported types. We need to update it to a TryFrom and handle those cases.

Also note that, among the well-matched types listed above:

  • RW and official Java iceberg use Binary, but icelake uses LargeBinary
  • RW and official Java iceberg use Time64(Micro), but icelake uses Time32(Micro), likely a bug
  • RW allows Timestamp(Micro, Some(_)), official Java iceberg uses Timestamp(Micro, Some("UTC")), but icelake uses Timestamp(Micro, None)

After updating `into` to `try_into`, this PR should be good to go. The 3 inconsistencies above may be resolved on the icelake side.
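
A sketch of the suggested fallible direction, with local stand-in enums (the real conversion would sit beside the existing mapping in src/common/src/array/arrow.rs):

```rust
// Stand-ins for the real risingwave and arrow_schema type enums.
#[derive(Debug, PartialEq)]
enum DataType { Int64, Varchar }

#[derive(Debug)]
enum ArrowDataType { Int64, Utf8, Duration }

impl TryFrom<&ArrowDataType> for DataType {
    type Error = String;

    /// Fallible mapping: unsupported arrow types become an error
    /// instead of a panic in the infallible `From`/`into` path.
    fn try_from(t: &ArrowDataType) -> Result<Self, Self::Error> {
        match t {
            ArrowDataType::Int64 => Ok(DataType::Int64),
            ArrowDataType::Utf8 => Ok(DataType::Varchar),
            other => Err(format!("unsupported arrow type: {other:?}")),
        }
    }
}

fn main() {
    // The schema check can then treat "unsupported" as "not matching".
    let ok = DataType::try_from(&ArrowDataType::Utf8)
        .map_or(false, |dt| dt == DataType::Varchar);
    assert!(ok);
    assert!(DataType::try_from(&ArrowDataType::Duration).is_err());
}
```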

@ZENOTME (Contributor, Author):
I find that here it should be the data type converted to the arrow type, so that we can convert the internal data to arrow data and sink it into icelake:
*data_type.into() == &arrow_field.data_type()

For converting data_type into arrow_type, only DataType::Serial is unsupported; can we directly convert DataType::Serial to an int64? 🤔

Contributor:
I agree with @xiangjinwu that it's safer to use try_into here so that we will not panic when we have some data types not completely compatible with arrow/iceberg.

Member:
Just some nits:

  • What about putting this function in arrow.rs so that we have all the code for the arrow compatibility layer in one place?
  • Why not something like `impl PartialEq<arrow_schema::Schema> for Schema`?

@ZENOTME (Contributor, Author):
Why not something like `impl PartialEq<arrow_schema::Schema> for Schema`?

Because I'm not sure whether the semantics of this function is PartialEq. I think the semantics is that the schema can be converted to the arrow schema, but that doesn't mean they are equal. E.g.:

Some types only exist in iceberg:
fixed(L): (a) report error (b) bytea with truncation

For this type, maybe we can let bytea convert to it, but bytea and fixed are not equal.
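
A small sketch of that distinction (hypothetical stand-in types): the relation is a one-way "can sink into", not a symmetric equality, which is why PartialEq vocabulary fits poorly:

```rust
#[derive(Debug)]
enum RwType { Bytea, Int16, Int32 }

#[derive(Debug)]
enum IcebergType { Fixed(usize), Int32 }

/// Directional check: "can a column of `source` be written into `target`?"
/// Bytea -> fixed(L) may be acceptable (with truncation), but claiming the
/// two types are *equal* would be wrong.
fn can_sink_into(source: &RwType, target: &IcebergType) -> bool {
    matches!(
        (source, target),
        (RwType::Bytea, IcebergType::Fixed(_))
            | (RwType::Int16, IcebergType::Int32)
            | (RwType::Int32, IcebergType::Int32)
    )
}

fn main() {
    assert!(can_sink_into(&RwType::Int16, &IcebergType::Int32));
    assert!(!can_sink_into(&RwType::Int32, &IcebergType::Fixed(4)));
}
```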

@@ -197,6 +198,32 @@ impl Schema {
true
}
}

/// Check if the schema is same as the iceberg table schema.
Member:
Suggested change
- /// Check if the schema is same as the iceberg table schema.
+ /// Check if the schema is same as an Arrow schema.

@ZENOTME requested a review from liurenjie1024 on July 17, 2023 04:45
@github-actions bot added the breaking-change and user-facing-changes (Contains changes that are visible to users) labels on Jul 17, 2023
@ZENOTME removed the user-facing-changes and breaking-change labels on Jul 17, 2023
@liurenjie1024 (Contributor) left a comment:
LGTM

@liurenjie1024 added this pull request to the merge queue on Jul 17, 2023
Merged via the queue into main with commit 9587646 on Jul 17, 2023
@liurenjie1024 deleted the zj/schema branch on July 17, 2023 06:57