Skip to content

Commit

Permalink
build: Upgrade DF to 36.0.0 and arrow-rs 50.0.0 (apache#66)
Browse files Browse the repository at this point in the history
* Upgrade DF and arrow-rs

* fix benches

* fix merge

* fix merge

* Update core/src/execution/datafusion/expressions/scalar_funcs.rs

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* Update core/src/execution/datafusion/expressions/scalar_funcs.rs

Co-authored-by: Liang-Chi Hsieh <[email protected]>

---------

Co-authored-by: o_voievodin <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
  • Loading branch information
3 people authored Feb 28, 2024
1 parent 27f167b commit ee977c3
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 61 deletions.
93 changes: 65 additions & 28 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
datafusion-common = { version = "35.0.0" }
datafusion = { default-features = false, version = "35.0.0", features = ["unicode_expressions"] }
datafusion-physical-expr = { version = "35.0.0", default-features = false , features = ["unicode_expressions"] }
datafusion-common = { version = "36.0.0" }
datafusion = { default-features = false, version = "36.0.0", features = ["unicode_expressions"] }
datafusion-physical-expr = { version = "36.0.0", default-features = false , features = ["unicode_expressions"] }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
Expand Down
2 changes: 2 additions & 0 deletions core/benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub fn create_int64_array(size: usize, null_density: f32, min: i64, max: i64) ->
.collect()
}

#[allow(dead_code)]
pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
where
T: ArrowPrimitiveType,
Expand All @@ -64,6 +65,7 @@ where

/// Creates a dictionary with random keys and values, with value type `T`.
/// Note here the keys are the dictionary indices.
#[allow(dead_code)]
pub fn create_dictionary_array<T>(
size: usize,
value_size: usize,
Expand Down
12 changes: 6 additions & 6 deletions core/src/execution/datafusion/expressions/avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use arrow_array::{
Array, ArrayRef, ArrowNumericType, Int64Array, PrimitiveArray,
};
use arrow_schema::{DataType, Field};
use datafusion::logical_expr::{type_coercion::aggregates::avg_return_type, Accumulator};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{
expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
use datafusion::logical_expr::{
type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, GroupsAccumulator,
};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
use std::{any::Any, sync::Arc};

use arrow_array::ArrowNativeTypeOp;
Expand Down Expand Up @@ -146,7 +146,7 @@ pub struct AvgAccumulator {
}

impl Accumulator for AvgAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::Float64(self.sum),
ScalarValue::from(self.count),
Expand Down Expand Up @@ -175,7 +175,7 @@ impl Accumulator for AvgAccumulator {
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Float64(
self.sum.map(|f| f / self.count as f64),
))
Expand Down
10 changes: 4 additions & 6 deletions core/src/execution/datafusion/expressions/avg_decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ use arrow_array::{
Array, ArrayRef, Decimal128Array, Int64Array, PrimitiveArray,
};
use arrow_schema::{DataType, Field};
use datafusion::logical_expr::Accumulator;
use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{
expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
};
use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
use std::{any::Any, sync::Arc};

use arrow_array::ArrowNativeTypeOp;
Expand Down Expand Up @@ -214,7 +212,7 @@ impl AvgDecimalAccumulator {
}

impl Accumulator for AvgDecimalAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::Decimal128(self.sum, self.sum_precision, self.sum_scale),
ScalarValue::from(self.count),
Expand Down Expand Up @@ -266,7 +264,7 @@ impl Accumulator for AvgDecimalAccumulator {
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
fn evaluate(&mut self) -> Result<ScalarValue> {
fn make_decimal128(value: Option<i128>, precision: u8, scale: i8) -> ScalarValue {
ScalarValue::Decimal128(value, precision, scale)
}
Expand Down
20 changes: 13 additions & 7 deletions core/src/execution/datafusion/expressions/scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ use datafusion::{
physical_plan::ColumnarValue,
};
use datafusion_common::{
cast::as_generic_string_array, internal_err, DataFusionError, Result as DataFusionResult,
ScalarValue,
cast::as_generic_string_array, exec_err, internal_err, DataFusionError,
Result as DataFusionResult, ScalarValue,
};
use datafusion_physical_expr::{
execution_props::ExecutionProps,
functions::{create_physical_fun, make_scalar_function},
math_expressions,
execution_props::ExecutionProps, functions::create_physical_fun, math_expressions,
};
use num::{BigInt, Signed, ToPrimitive};
use unicode_segmentation::UnicodeSegmentation;
Expand Down Expand Up @@ -366,7 +364,12 @@ fn spark_round(
let (precision, scale) = get_precision_scale(data_type);
make_decimal_array(array, precision, scale, &f)
}
_ => make_scalar_function(math_expressions::round)(args),
DataType::Float32 | DataType::Float64 => {
Ok(ColumnarValue::Array(math_expressions::round(&[
array.clone()
])?))
}
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
ColumnarValue::Scalar(a) => match a {
ScalarValue::Int64(a) if *point < 0 => {
Expand All @@ -386,7 +389,10 @@ fn spark_round(
let (precision, scale) = get_precision_scale(data_type);
make_decimal_scalar(a, precision, scale, &f)
}
_ => make_scalar_function(math_expressions::round)(args),
ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
)),
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
}
}
Expand Down
Loading

0 comments on commit ee977c3

Please sign in to comment.