chore: Upgrade to DataFusion 44.0.0 #1154

Draft · wants to merge 41 commits into base: main
Changes from all commits (41 commits):
dfd68aa
move aggregate expressions to spark-expr crate
andygrove Dec 7, 2024
f638123
move more expressions
andygrove Dec 7, 2024
3257543
move benchmark
andygrove Dec 7, 2024
42ecfbb
normalize_nan
andygrove Dec 7, 2024
03bff06
bitwise not
andygrove Dec 7, 2024
7bc7ab8
comet scalar funcs
andygrove Dec 7, 2024
83dc657
update bench imports
andygrove Dec 7, 2024
601fed2
save
andygrove Dec 9, 2024
3b194db
save
andygrove Dec 9, 2024
bc19b74
save
andygrove Dec 9, 2024
4121532
upmerge
andygrove Dec 9, 2024
2731c7e
remove unused imports
andygrove Dec 9, 2024
911a0b3
clippy
andygrove Dec 9, 2024
a45c329
implement more hashers
andygrove Dec 9, 2024
227061d
implement Hash and PartialEq
andygrove Dec 10, 2024
bf0072d
implement Hash and PartialEq
andygrove Dec 10, 2024
97bbedc
implement Hash and PartialEq
andygrove Dec 10, 2024
97e2ac3
benches
andygrove Dec 10, 2024
6a73f62
fix ScalarUDFImpl.return_type failure
andygrove Dec 10, 2024
606403e
exclude test from miri
andygrove Dec 10, 2024
3ae7866
ignore correct test
andygrove Dec 10, 2024
fd5279a
ignore another test
andygrove Dec 10, 2024
4638fe3
remove miri checks
andygrove Dec 11, 2024
febc1f1
use return_type_from_exprs
andygrove Dec 11, 2024
93187d0
Revert "use return_type_from_exprs"
andygrove Dec 11, 2024
1ed7f3a
use DF main branch
andygrove Dec 11, 2024
b7dcbd3
hacky workaround for regression in ScalarUDFImpl.return_type
andygrove Dec 11, 2024
7bcfd18
fix repo url
andygrove Dec 11, 2024
e14a07a
pin to revision
andygrove Dec 11, 2024
9754d52
Merge remote-tracking branch 'apache/main' into upgrade-df-dec2024
andygrove Dec 13, 2024
9653541
bump to latest rev
andygrove Dec 13, 2024
df5d183
bump to latest DF rev
andygrove Dec 18, 2024
9a568ec
upmerge
andygrove Dec 19, 2024
d1f646a
bump DF to rev 9f530dd
andygrove Dec 19, 2024
2d08de5
add Cargo.lock
andygrove Dec 19, 2024
2cbf3ff
bump DF version
andygrove Dec 22, 2024
199f0d0
no default features
andygrove Dec 22, 2024
2ed53c8
Revert "remove miri checks"
andygrove Dec 22, 2024
a75226c
Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930
alamb Dec 24, 2024
9d45dfa
update pin
alamb Dec 24, 2024
fa4c56c
Merge pull request #1 from alamb/alamb/update-to-upgrade-df-44
andygrove Dec 24, 2024
540 changes: 263 additions & 277 deletions native/Cargo.lock

Large diffs are not rendered by default.

29 changes: 15 additions & 14 deletions native/Cargo.toml
@@ -33,20 +33,21 @@ edition = "2021"
rust-version = "1.79"

[workspace.dependencies]
arrow = { version = "53.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "53.2.0" }
arrow-buffer = { version = "53.2.0" }
arrow-data = { version = "53.2.0" }
arrow-schema = { version = "53.2.0" }
parquet = { version = "53.2.0", default-features = false, features = ["experimental"] }
datafusion = { version = "43.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
datafusion-common = { version = "43.0.0" }
datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] }
datafusion-functions-nested = { version = "43.0.0", default-features = false }
datafusion-expr = { version = "43.0.0", default-features = false }
datafusion-execution = { version = "43.0.0", default-features = false }
datafusion-physical-plan = { version = "43.0.0", default-features = false }
datafusion-physical-expr = { version = "43.0.0", default-features = false }
arrow = { version = "53.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "53.3.0" }
arrow-buffer = { version = "53.3.0" }
arrow-data = { version = "53.3.0" }
arrow-schema = { version = "53.3.0" }
parquet = { version = "53.3.0", default-features = false, features = ["experimental"] }
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false, features = ["crypto_expressions"] }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false }
datafusion-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false }
datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "3864b113c3e3fe85e18462d6374f8244c4f77b27", default-features = false }
datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
datafusion-comet-proto = { path = "proto", version = "0.5.0" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
46 changes: 19 additions & 27 deletions native/core/src/execution/expressions/bloom_filter_might_contain.rs
@@ -19,26 +19,37 @@ use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data
use arrow::record_batch::RecordBatch;
use arrow_array::cast::as_primitive_array;
use arrow_schema::{DataType, Schema};
use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
use datafusion::physical_plan::ColumnarValue;
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
use std::{
any::Any,
fmt::Display,
hash::{Hash, Hasher},
sync::Arc,
};
use std::hash::Hash;
use std::{any::Any, fmt::Display, sync::Arc};

/// A physical expression that checks if a value might be in a bloom filter. It corresponds to the
/// Spark's `BloomFilterMightContain` expression.
#[derive(Debug, Hash)]
#[derive(Debug, Eq)]
pub struct BloomFilterMightContain {
pub bloom_filter_expr: Arc<dyn PhysicalExpr>,
pub value_expr: Arc<dyn PhysicalExpr>,
bloom_filter: Option<SparkBloomFilter>,
}

impl Hash for BloomFilterMightContain {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.bloom_filter_expr.hash(state);
self.value_expr.hash(state);
self.bloom_filter.hash(state);
}
}

impl PartialEq for BloomFilterMightContain {
fn eq(&self, other: &Self) -> bool {
self.bloom_filter_expr.eq(&other.bloom_filter_expr)
&& self.value_expr.eq(&other.value_expr)
&& self.bloom_filter.eq(&other.bloom_filter)
}
}

impl Display for BloomFilterMightContain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
@@ -49,18 +49,6 @@ impl Display for BloomFilterMightContain {
}
}

impl PartialEq<dyn Any> for BloomFilterMightContain {
fn eq(&self, _other: &dyn Any) -> bool {
down_cast_any_ref(_other)
.downcast_ref::<Self>()
.map(|other| {
self.bloom_filter_expr.eq(&other.bloom_filter_expr)
&& self.value_expr.eq(&other.value_expr)
})
.unwrap_or(false)
}
}

fn evaluate_bloom_filter(
bloom_filter_expr: &Arc<dyn PhysicalExpr>,
) -> Result<Option<SparkBloomFilter>> {
@@ -141,11 +140,4 @@ impl PhysicalExpr for BloomFilterMightContain {
Arc::clone(&children[1]),
)?))
}

fn dyn_hash(&self, state: &mut dyn Hasher) {
let mut s = state;
self.bloom_filter_expr.hash(&mut s);
self.value_expr.hash(&mut s);
self.hash(&mut s);
}
}
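
The pattern in this file recurs throughout the PR: the pinned DataFusion revision no longer provides `down_cast_any_ref`, and `PhysicalExpr` implementations are expected to implement `Hash`, `PartialEq`, and `Eq` directly instead of `PartialEq<dyn Any>` plus `dyn_hash`. A minimal sketch of the manual form, using a hypothetical `ExampleExpr` that is not part of Comet:

```rust
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical expression with one child, shown only to illustrate the
/// equality/hashing pattern required by the upgraded DataFusion.
#[derive(Debug, Eq)]
pub struct ExampleExpr {
    child: Arc<dyn PhysicalExpr>,
    negated: bool,
}

impl Hash for ExampleExpr {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // `dyn PhysicalExpr` itself implements `Hash`, so children can be
        // hashed directly; the old `dyn_hash` indirection is gone.
        self.child.hash(state);
        self.negated.hash(state);
    }
}

impl PartialEq for ExampleExpr {
    fn eq(&self, other: &Self) -> bool {
        // Compare against `Self` rather than downcasting from `dyn Any`.
        self.child.eq(&other.child) && self.negated == other.negated
    }
}
```

Where every field already implements these traits, plain derives suffice, as the `Subquery` change below shows.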
23 changes: 2 additions & 21 deletions native/core/src/execution/expressions/subquery.rs
@@ -22,7 +22,6 @@ use crate::{
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Schema, TimeUnit};
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
use jni::{
@@ -32,11 +31,11 @@ use jni::{
use std::{
any::Any,
fmt::{Display, Formatter},
hash::{Hash, Hasher},
hash::Hash,
sync::Arc,
};

#[derive(Debug, Hash)]
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct Subquery {
/// The ID of the execution context that owns this subquery. We use this ID to retrieve the
/// subquery result.
@@ -63,19 +62,6 @@ impl Display for Subquery {
}
}

impl PartialEq<dyn Any> for Subquery {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
.downcast_ref::<Self>()
.map(|x| {
self.id.eq(&x.id)
&& self.data_type.eq(&x.data_type)
&& self.exec_context_id.eq(&x.exec_context_id)
})
.unwrap_or(false)
}
}

impl PhysicalExpr for Subquery {
fn as_any(&self) -> &dyn Any {
self
@@ -209,9 +195,4 @@
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}

fn dyn_hash(&self, state: &mut dyn Hasher) {
let mut s = state;
self.hash(&mut s)
}
}
9 changes: 4 additions & 5 deletions native/core/src/execution/jni_api.rs
@@ -20,10 +20,7 @@
use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::RecordBatch;
use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
runtime_env::{RuntimeConfig, RuntimeEnv},
},
execution::{disk_manager::DiskManagerConfig, runtime_env::RuntimeEnv},
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
prelude::{SessionConfig, SessionContext},
};
@@ -52,6 +49,7 @@ use crate::{
};
use datafusion_comet_proto::spark_operator::Operator;
use datafusion_common::ScalarValue;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use futures::stream::StreamExt;
use jni::{
objects::GlobalRef,
@@ -188,7 +186,7 @@ fn prepare_datafusion_session_context(
memory_fraction: f64,
comet_task_memory_manager: Arc<GlobalRef>,
) -> CometResult<SessionContext> {
let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);

// Check if we are using unified memory manager integrated with Spark.
if use_unified_memory_manager {
@@ -216,6 +214,7 @@
&ScalarValue::Float64(Some(1.1)),
);

#[allow(deprecated)]
let runtime = RuntimeEnv::try_new(rt_config)?;

let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
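Note that the session setup above still funnels the builder into the deprecated `RuntimeEnv::try_new` under `#[allow(deprecated)]`. A sketch of the fully builder-based alternative is below; it assumes `RuntimeEnvBuilder::build_arc` and `with_memory_limit` behave as in released DataFusion 43/44, and the pool size, fraction, and batch size are placeholder values:

```rust
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::Result;

fn session_with_builder() -> Result<SessionContext> {
    // Build the RuntimeEnv directly from the builder instead of passing a
    // config into the deprecated RuntimeEnv::try_new constructor.
    let runtime = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::NewOs)
        .with_memory_limit(512 * 1024 * 1024, 0.8) // placeholder: 512 MiB pool, 80% usable
        .build_arc()?;
    let config = SessionConfig::new().with_batch_size(8192);
    Ok(SessionContext::new_with_config_rt(config, runtime))
}
```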
4 changes: 3 additions & 1 deletion native/core/src/execution/operators/copy.rs
@@ -30,6 +30,7 @@ use arrow_array::{
use arrow_data::transform::MutableArrayData;
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaRef};

use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
@@ -78,7 +79,8 @@ impl CopyExec {
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Final,
Boundedness::Bounded,
);

Self {
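The `PlanProperties` change here, and the matching ones in `expand.rs`, `filter.rs`, and `scan.rs` below, reflect DataFusion 44 replacing the single `ExecutionMode` argument with an `EmissionType` plus `Boundedness` pair; this PR picks `Final` and `Bounded` for Comet's operators. A minimal sketch of the new constructor call, with a hypothetical helper name:

```rust
use arrow_schema::SchemaRef;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{Partitioning, PlanProperties};

// Hypothetical helper: PlanProperties::new now takes an EmissionType
// (when results are emitted) and a Boundedness (whether input is finite)
// instead of the old ExecutionMode enum.
fn bounded_plan_properties(schema: SchemaRef) -> PlanProperties {
    PlanProperties::new(
        EquivalenceProperties::new(schema),
        Partitioning::UnknownPartitioning(1),
        EmissionType::Final,
        Boundedness::Bounded,
    )
}
```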
6 changes: 4 additions & 2 deletions native/core/src/execution/operators/expand.rs
@@ -17,10 +17,11 @@

use arrow_array::{RecordBatch, RecordBatchOptions};
use arrow_schema::SchemaRef;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::{
execution::TaskContext,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream,
},
};
@@ -54,7 +55,8 @@ impl ExpandExec {
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Final,
Boundedness::Bounded,
);

Self {
3 changes: 2 additions & 1 deletion native/core/src/execution/operators/filter.rs
@@ -210,7 +210,8 @@ impl FilterExec {
Ok(PlanProperties::new(
eq_properties,
input.output_partitioning().clone(), // Output Partitioning
input.execution_mode(), // Execution Mode
input.pipeline_behavior(),
input.boundedness(),
))
}
}
4 changes: 3 additions & 1 deletion native/core/src/execution/operators/scan.rs
@@ -28,6 +28,7 @@ use arrow_data::ffi::FFI_ArrowArray;
use arrow_data::ArrayData;
use arrow_schema::ffi::FFI_ArrowSchema;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
@@ -122,7 +123,8 @@ impl ScanExec {
// The partitioning is not important because we are not using DataFusion's
// query planner or optimizer
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Final,
Boundedness::Bounded,
);

Ok(Self {