feat: support checking equality of schema and arrow schema #10903
Conversation
src/common/src/catalog/schema.rs
Outdated
```rust
.iter()
.zip_eq_fast(arrow_schema.fields())
.all(|(field, arrow_field)| {
    field
```
We only check that the types in the schema are similar and ignore the other fields (such as the field name). Do we need to check those? 🤔 cc @liurenjie1024
You can provide a parameter in this function to let users choose whether to check names or not.
For the current use case, I think we should also check field names. Fields in a table schema are identified by column name, and there is no enforced field order. We can add a version without comparing field names when necessary.
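Just to illustrate the point, here is a minimal sketch of order-insensitive, name-based matching. The `Field` and `DataType` types below are simplified hypothetical stand-ins, not the real RisingWave definitions:

```rust
use std::collections::HashMap;

// Simplified stand-ins for illustration only; the real types live in src/common.
#[derive(Debug, PartialEq)]
enum DataType {
    Int64,
    Varchar,
}

struct Field {
    name: String,
    data_type: DataType,
}

/// Match fields by name rather than by position, so field order is irrelevant.
fn same_fields_by_name(ours: &[Field], theirs: &[Field]) -> bool {
    if ours.len() != theirs.len() {
        return false;
    }
    let by_name: HashMap<&str, &DataType> = ours
        .iter()
        .map(|f| (f.name.as_str(), &f.data_type))
        .collect();
    theirs
        .iter()
        .all(|f| by_name.get(f.name.as_str()) == Some(&&f.data_type))
}
```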
Codecov Report
```
@@           Coverage Diff           @@
##             main   #10903   +/-  ##
=======================================
  Coverage   69.91%   69.91%
=======================================
  Files        1309     1309
  Lines      223864   223916      +52
=======================================
+ Hits       156512   156556      +44
- Misses      67352    67360       +8
```
... and 4 files with indirect coverage changes
(This discusses arrow types in general, not necessarily iceberg.) FYI, there is an existing mapping between our `DataType` and arrow as part of UDF. To sink an internal `DataType` into an arrow one, there are some additional questions to answer:
Note that following the same rationale above, ingesting from a `Time32` (micro or nano) source is totally fine. That is, the mapping here should not be described with symmetric words like "equality" or "similar", but with directional words like from/source and into/sink. Some other types to note:
Thanks! We can directly reuse it.
Good question. As you say, when we convert the internal data to the arrow one, the data may end up not being the same (e.g. due to rounding or overflow). I think we can let the user choose to omit or insert the inconsistent data in this case.
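To make the directional framing above concrete, here is a minimal sketch. The internal `RwType` enum is a hypothetical stand-in; only the `arrow_schema` types are real. The sink direction maps each internal type to exactly one arrow type, while the ingest direction may accept several:

```rust
use arrow_schema::{DataType as ArrowType, TimeUnit};

// Hypothetical stand-in for an internal type, for illustration only.
#[derive(Debug)]
enum RwType {
    Time,
}

/// Sink direction: each internal type maps to exactly one arrow type.
fn sink_type(ours: &RwType) -> ArrowType {
    match ours {
        RwType::Time => ArrowType::Time64(TimeUnit::Microsecond),
    }
}

/// Ingest direction: several arrow encodings may be accepted as one
/// internal type, e.g. various time encodings can all become our Time.
fn can_ingest(source: &ArrowType, target: &RwType) -> bool {
    matches!(
        (source, target),
        (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
    )
}
```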
Generally LGTM
src/common/src/catalog/schema.rs
Outdated
```rust
/// Check if the schema can convert to the iceberg table schema.
///
/// If `is_check_name` is enabled, the name of the field will be checked.
pub fn check_to_arrow_schema(&self, arrow_schema: &ArrowSchema, is_check_name: bool) -> bool {
```
```diff
-pub fn check_to_arrow_schema(&self, arrow_schema: &ArrowSchema, is_check_name: bool) -> bool {
+pub fn same_as_arrow_schema(&self, arrow_schema: &ArrowSchema, is_check_name: bool) -> bool {
```
I think this check should not compare fields in order, but should compare field names and data types.
I agree matching fields by name is more reasonable here. Just for context, the cast between two structs is by position rather than by name (#9694).
Linking to the types supported by iceberg here. It seems the following types correspond well. Then some types only exist on our side:
Some types only exist in iceberg:
Similar but different types:
@xiangjinwu Good point. Data type matching is quite important for compatibility, and we do need careful testing and documentation for it. My suggestion is that in the first version, we only support the well-matched types as listed above and reject the others. For complex types, we should resolve them one by one later.
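As a rough illustration of that first-version policy, a conversion could accept only the well-matched types and return an error for everything else. The `RwType` enum below is a simplified hypothetical stand-in for the internal type; only the `arrow_schema` types are real:

```rust
use arrow_schema::DataType as ArrowType;

// Hypothetical simplified stand-in for the internal type, for illustration.
#[derive(Debug)]
enum RwType {
    Int32,
    Int64,
    Float64,
    Varchar,
    Bytea,
    Interval,
}

/// First-version policy: map only well-matched types, reject the rest.
fn to_arrow(ours: &RwType) -> Result<ArrowType, String> {
    match ours {
        RwType::Int32 => Ok(ArrowType::Int32),
        RwType::Int64 => Ok(ArrowType::Int64),
        RwType::Float64 => Ok(ArrowType::Float64),
        RwType::Varchar => Ok(ArrowType::Utf8),
        RwType::Bytea => Ok(ArrowType::Binary),
        // Not well matched yet: reject instead of guessing.
        other => Err(format!("unsupported type for iceberg sink: {other:?}")),
    }
}
```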
src/common/src/catalog/schema.rs
Outdated
```rust
schema_fields
    .get(arrow_field.name())
    .and_then(|data_type| {
        if *data_type == &arrow_field.data_type().into() {
```
This `into` currently panics on unsupported types. We need to update it to a `TryFrom` and handle those cases.
Also note that, among the well-matched types listed above:
- RW and official Java iceberg use `Binary`, but icelake uses `LargeBinary`.
- RW and official Java iceberg use `Time64(Micro)`, but icelake uses `Time32(Micro)`, likely a bug.
- RW allows `Timestamp(Micro, Some(_))`, official Java iceberg uses `Timestamp(Micro, Some("UTC"))`, but icelake uses `Timestamp(Micro, None)`.

After updating `into` to `try_into`, this PR should be good to go. The 3 inconsistencies above may be resolved on the icelake side.
I find that this should be the data type converting to the arrow type, so that we can convert the internal data to arrow data and sink it into icelake:

`*data_type.into() == &arrow_field.data_type()`

For converting `data_type` into `arrow_type`, only `DataType::Serial` is unsupported. Can we directly convert `DataType::Serial` to an int64? 🤔
I agree with @xiangjinwu that it's safer to use `try_into` here so that we will not panic when we have some data types that are not completely compatible with arrow/iceberg.
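A minimal sketch of the fallible conversion being discussed, including the proposed widening of `Serial` to a 64-bit integer. The `RwType` enum is a hypothetical stand-in; the real conversion lives in RisingWave's arrow compatibility layer:

```rust
use arrow_schema::DataType as ArrowType;

// Hypothetical stand-in for the internal type, for illustration only.
#[derive(Debug)]
enum RwType {
    Int64,
    Serial,
    Interval,
}

impl TryFrom<&RwType> for ArrowType {
    type Error = String;

    /// Fallible conversion instead of a panicking `From`.
    fn try_from(value: &RwType) -> Result<Self, Self::Error> {
        match value {
            RwType::Int64 => Ok(ArrowType::Int64),
            // One possible policy: Serial is an i64 internally, so widen it.
            RwType::Serial => Ok(ArrowType::Int64),
            // Types without a clear arrow counterpart become errors, not panics.
            other => Err(format!("no arrow type for {other:?}")),
        }
    }
}
```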
Just some nits:
- What about putting this function in `arrow.rs` so that we have all the code for the arrow compatibility layer in one place?
- Why not something like `impl PartialEq<arrow_schema::Schema> for Schema {`?
> Why not sth like `impl PartialEq<arrow_schema::Schema> for Schema {`?

Because I'm not sure whether the semantics of this function is `PartialEq`. I think the semantics of this function is that the schema can be converted to the arrow schema, but that doesn't mean they are equal. E.g.:

> Some types only exist in iceberg:
> fixed(L): (a) report error (b) bytea with truncation

For this type, maybe we can let bytea convert to it, but bytea and fixed are not equal.
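For context, the alternative being discussed would look roughly like the sketch below (a simplified hypothetical `Schema`; only the `arrow_schema` types are real). The thread settles against it because the relation is directional convertibility, not symmetric equality:

```rust
use arrow_schema::{DataType as ArrowType, Schema as ArrowSchema};

// Simplified hypothetical internal schema, for illustration only.
struct Field {
    name: String,
    data_type: ArrowType,
}

struct Schema {
    fields: Vec<Field>,
}

// The shape suggested in the nit: symmetric equality with an arrow schema.
// The reply argues the real relation is "can convert to", which is weaker
// (e.g. bytea could convert to fixed(L) without the two being equal).
impl PartialEq<ArrowSchema> for Schema {
    fn eq(&self, other: &ArrowSchema) -> bool {
        self.fields.len() == other.fields().len()
            && self
                .fields
                .iter()
                .zip(other.fields().iter())
                .all(|(f, af)| f.name == *af.name() && f.data_type == *af.data_type())
    }
}
```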
src/common/src/catalog/schema.rs
Outdated
```diff
@@ -197,6 +198,32 @@ impl Schema {
         true
     }
 }
+
+/// Check if the schema is same as the iceberg table schema.
```
```diff
-/// Check if the schema is same as the iceberg table schema.
+/// Check if the schema is same as an Arrow schema.
```
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
To support the iceberg sink (#10875), we need to check the equality of schemas before creating the writer.
Icelake supports converting its schema to an arrow schema, so this PR implements a function to check the equality between our schema and the arrow schema.
Checklist
- `./risedev check` (or alias, `./risedev c`)

Documentation
Types of user-facing changes
Release note