Skip to content

Commit

Permalink
support validating schema with arrow schema
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Jul 12, 2023
1 parent b34064f commit 08eac1d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
17 changes: 16 additions & 1 deletion src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

use std::ops::Index;

use arrow_schema::Schema as ArrowSchema;
use itertools::Itertools;
use risingwave_pb::plan_common::{PbColumnDesc, PbField};

use super::ColumnDesc;
use crate::array::ArrayBuilderImpl;
use crate::types::{DataType, StructType};
use crate::util::iter_util::ZipEqFast;

/// The field in the schema of the executor's return data
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Field {
Expand Down Expand Up @@ -197,6 +197,21 @@ impl Schema {
true
}
}

/// Returns whether this schema is similar to the given arrow schema (e.g. the schema of an
/// iceberg table).
///
/// The two schemas are considered similar when they hold the same number of fields and, compared
/// position by position, every field's data type satisfies `similar_to_arrow_type` against the
/// data type of the corresponding arrow field. Field names are not compared.
pub fn has_simaler_schema(&self, arrow_schema: &ArrowSchema) -> bool {
    let arrow_fields = arrow_schema.fields();
    // The length comparison short-circuits, so the pairwise zip below is only reached when both
    // field lists are equally long.
    self.fields.len() == arrow_fields.len()
        && self
            .fields
            .iter()
            .zip_eq_fast(arrow_fields)
            .all(|(rw_field, arrow_field)| {
                rw_field
                    .data_type()
                    .similar_to_arrow_type(arrow_field.data_type())
            })
}
}

impl Field {
Expand Down
90 changes: 89 additions & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt::Debug;
use std::hash::Hash;
use std::str::{FromStr, Utf8Error};

use arrow_schema::DataType as ArrowDataType;
use bytes::{Buf, BufMut, Bytes};
use chrono::{Datelike, Timelike};
use itertools::Itertools;
Expand All @@ -39,7 +40,7 @@ use crate::array::{
pub use crate::array::{ListRef, ListValue, StructRef, StructValue};
use crate::error::{BoxedError, ErrorCode, Result as RwResult};
use crate::estimate_size::EstimateSize;
use crate::util::iter_util::ZipEqDebug;
use crate::util::iter_util::{ZipEqDebug, ZipEqFast};

mod datetime;
mod decimal;
Expand Down Expand Up @@ -467,6 +468,93 @@ impl DataType {
}
d
}

/// Checks whether this type can potentially be cast to the given arrow data type.
///
/// A return value of `true` means a cast from this type to `other` is considered possible; it
/// does not guarantee that an actual cast will succeed.
///
/// A return value of `false` means this type cannot be cast to `other`, or that the mapping is
/// not supported yet.
///
/// Nested types are checked recursively: a list matches any arrow list flavor whose element
/// type is similar, and a struct matches an arrow struct with the same number of fields whose
/// types are similar position by position (field names are not compared).
pub fn similar_to_arrow_type(&self, other: &ArrowDataType) -> bool {
    match other {
        // Every type can be a null
        ArrowDataType::Null => true,
        ArrowDataType::Boolean => matches!(self, DataType::Boolean),
        ArrowDataType::Int16 => matches!(self, DataType::Int16),
        ArrowDataType::Int32 => matches!(self, DataType::Int32),
        ArrowDataType::Int64 => matches!(self, DataType::Int64),
        ArrowDataType::Float32 => matches!(self, DataType::Float32),
        ArrowDataType::Float64 => matches!(self, DataType::Float64),
        // The time unit and the optional timezone of the arrow type are not inspected.
        ArrowDataType::Timestamp(_, _) => {
            matches!(self, DataType::Timestamp | DataType::Timestamptz)
        }
        ArrowDataType::Date32 => matches!(self, DataType::Date),
        ArrowDataType::Time32(_) => matches!(self, DataType::Time),
        ArrowDataType::Duration(_) => matches!(self, DataType::Interval),
        // All arrow binary flavors correspond to `Bytea`.
        ArrowDataType::Binary
        | ArrowDataType::FixedSizeBinary(_)
        | ArrowDataType::LargeBinary => matches!(self, DataType::Bytea),
        // Arrow string types correspond to `Varchar`. They were previously matched against
        // `Bytea`, which made it impossible for a string column to ever match.
        ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => matches!(self, DataType::Varchar),
        ArrowDataType::Decimal128(_, _) => matches!(self, DataType::Decimal),
        // Every arrow list flavor is compared by its element type only; the fixed size of a
        // `FixedSizeList` is ignored.
        ArrowDataType::List(field)
        | ArrowDataType::FixedSizeList(field, _)
        | ArrowDataType::LargeList(field) => {
            matches!(self, DataType::List(inner) if inner.similar_to_arrow_type(field.data_type()))
        }
        ArrowDataType::Struct(other_fields) => match self {
            DataType::Struct(inner) => {
                // Compare the lengths first so the pairwise zip is only reached for equally
                // long field lists.
                other_fields.len() == inner.len()
                    && other_fields
                        .iter()
                        .map(|field| field.data_type())
                        .zip_eq_fast(inner.types())
                        .all(|(arrow_data_type, data_type)| {
                            data_type.similar_to_arrow_type(arrow_data_type)
                        })
            }
            _ => false,
        },
        // TODO: we can use other types to represent these types. e.g.
        // `ArrowDataType::Int8 => matches!(self,
        // DataType::Int16|DataType::Int32|DataType::Int64)`.
        ArrowDataType::Int8
        | ArrowDataType::UInt8
        | ArrowDataType::UInt16
        | ArrowDataType::UInt32
        | ArrowDataType::UInt64
        | ArrowDataType::Float16
        | ArrowDataType::Date64
        | ArrowDataType::Time64(_)
        | ArrowDataType::Decimal256(_, _) => false,
        // NOTE: Currently we don't support these types.
        ArrowDataType::Union(_, _, _)
        | ArrowDataType::Dictionary(_, _)
        | ArrowDataType::Map(_, _)
        | ArrowDataType::RunEndEncoded(_, _)
        | ArrowDataType::Interval(_) => false,
    }
}
}

impl From<DataType> for PbDataType {
Expand Down

0 comments on commit 08eac1d

Please sign in to comment.