Skip to content

Commit

Permalink
refactor: Move hive partitioning/multi-file handling outside of reade…
Browse files Browse the repository at this point in the history
…rs (#20203)
  • Loading branch information
coastalwhite authored Dec 20, 2024
1 parent 5d2d550 commit b862887
Show file tree
Hide file tree
Showing 31 changed files with 1,001 additions and 56 deletions.
9 changes: 9 additions & 0 deletions crates/polars-expr/src/expressions/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ impl PhysicalExpr for AggregationExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
}

fn is_scalar(&self) -> bool {
true
}
Expand Down Expand Up @@ -731,6 +735,11 @@ impl PhysicalExpr for AggQuantileExpr {
))
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.quantile.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-expr/src/expressions/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ impl PhysicalExpr for AliasExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.physical_expr.collect_live_columns(lv);
lv.insert(self.name.clone());
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
Ok(Field::new(
self.name.clone(),
Expand Down
45 changes: 45 additions & 0 deletions crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::expressions::{
AggState, AggregationContext, PartitionedAggregation, PhysicalExpr, UpdateGroups,
};

#[derive(Clone)]
pub struct ApplyExpr {
inputs: Vec<Arc<dyn PhysicalExpr>>,
function: SpecialEq<Arc<dyn ColumnsUdf>>,
Expand Down Expand Up @@ -426,6 +427,50 @@ impl PhysicalExpr for ApplyExpr {
}
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
for i in &self.inputs {
i.collect_live_columns(lv);
}
}
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
if self.collect_groups == ApplyOptions::ElementWise {
let mut new_inputs = Vec::new();
for i in 0..self.inputs.len() {
match self.inputs[i].replace_elementwise_const_columns(const_columns) {
None => continue,
Some(new) => {
new_inputs.reserve(self.inputs.len());
new_inputs.extend(self.inputs[..i].iter().cloned());
new_inputs.push(new);
break;
},
}
}

// Only copy inputs if it is actually needed
if new_inputs.is_empty() {
return None;
}

new_inputs.extend(self.inputs[new_inputs.len()..].iter().map(|i| {
match i.replace_elementwise_const_columns(const_columns) {
None => i.clone(),
Some(new) => new,
}
}));

let mut slf = self.clone();
slf.inputs = new_inputs;
return Some(Arc::new(slf));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.expr.to_field(input_schema, Context::Default)
}
Expand Down
26 changes: 26 additions & 0 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::expressions::{
AggState, AggregationContext, PartitionedAggregation, PhysicalExpr, UpdateGroups,
};

#[derive(Clone)]
pub struct BinaryExpr {
left: Arc<dyn PhysicalExpr>,
op: Operator,
Expand Down Expand Up @@ -265,6 +266,31 @@ impl PhysicalExpr for BinaryExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.left.collect_live_columns(lv);
self.right.collect_live_columns(lv);
}
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
let rcc_left = self.left.replace_elementwise_const_columns(const_columns);
let rcc_right = self.right.replace_elementwise_const_columns(const_columns);

if rcc_left.is_some() || rcc_right.is_some() {
let mut slf = self.clone();
if let Some(left) = rcc_left {
slf.left = left;
}
if let Some(right) = rcc_right {
slf.right = right;
}
return Some(Arc::new(slf));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.expr.to_field(input_schema, Context::Default)
}
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ impl PhysicalExpr for CastExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema).map(|mut fld| {
fld.coerce(self.dtype.clone());
Expand Down
17 changes: 17 additions & 0 deletions crates/polars-expr/src/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl PhysicalExpr for ColumnExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let out = match self.schema.get_full(&self.name) {
Some((idx, _, _)) => {
Expand Down Expand Up @@ -178,6 +179,22 @@ impl PhysicalExpr for ColumnExpr {
Some(self)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
lv.insert(self.name.clone());
}
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
if let Some(av) = const_columns.get(&self.name) {
let lv = LiteralValue::from(av.clone());
let le = LiteralExpr::new(lv, self.expr.clone());
return Some(Arc::new(le));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
input_schema.get_field(&self.name).ok_or_else(|| {
polars_err!(
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-expr/src/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl PhysicalExpr for CountExpr {
Ok(AggregationContext::new(c, Cow::Borrowed(groups), true))
}

fn collect_live_columns(&self, _lv: &mut PlIndexSet<PlSmallStr>) {}

fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
Ok(Field::new(PlSmallStr::from_static(LEN), IDX_DTYPE))
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-expr/src/expressions/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl PhysicalExpr for FilterExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let s_f = || self.input.evaluate(df, state);
let predicate_f = || self.by.evaluate(df, state);
Expand Down Expand Up @@ -145,6 +146,11 @@ impl PhysicalExpr for FilterExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.by.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-expr/src/expressions/gather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl PhysicalExpr for GatherExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let series = self.phys_expr.evaluate(df, state)?;
self.finish(df, state, series)
Expand Down Expand Up @@ -88,6 +89,11 @@ impl PhysicalExpr for GatherExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.phys_expr.collect_live_columns(lv);
self.idx.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.phys_expr.to_field(input_schema)
}
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl PhysicalExpr for LiteralExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.1)
}

fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> PolarsResult<Column> {
self.as_column()
}
Expand Down Expand Up @@ -148,6 +149,8 @@ impl PhysicalExpr for LiteralExpr {
Some(self)
}

fn collect_live_columns(&self, _lv: &mut PlIndexSet<PlSmallStr>) {}

fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
let dtype = self.0.get_datatype();
Ok(Field::new(PlSmallStr::from_static("literal"), dtype))
Expand Down
20 changes: 18 additions & 2 deletions crates/polars-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,22 @@ pub trait PhysicalExpr: Send + Sync {
None
}

/// Get the variables that are used in the expression i.e. live variables.
/// This can contain duplicates.
fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>);

/// Replace columns that are known to be a constant value with their const value.
///
/// This should not replace values that are calculated non-elementwise e.g. col.max(),
/// col.std(), etc.
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
_ = const_columns;
None
}

/// Can take &dyn Statistics and determine of a file should be
/// read -> `true`
/// or not -> `false`
Expand Down Expand Up @@ -630,8 +646,8 @@ impl PhysicalIoExpr for PhysicalIoHelper {
.map(|c| c.take_materialized_series())
}

fn live_variables(&self) -> Option<Vec<PlSmallStr>> {
Some(expr_to_leaf_column_names(self.expr.as_expression()?))
fn collect_live_columns(&self, live_columns: &mut PlIndexSet<PlSmallStr>) {
self.expr.collect_live_columns(live_columns);
}

#[cfg(feature = "parquet")]
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-expr/src/expressions/rolling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl PhysicalExpr for RollingExpr {
polars_bail!(InvalidOperation: "rolling expression not allowed in aggregation");
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.phys_function.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.function.to_field(input_schema, Context::Default)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-expr/src/expressions/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ impl PhysicalExpr for SliceExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.offset.collect_live_columns(lv);
self.length.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-expr/src/expressions/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl PhysicalExpr for SortExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let series = self.physical_expr.evaluate(df, state)?;
series.sort_with(self.options)
Expand Down Expand Up @@ -104,6 +105,10 @@ impl PhysicalExpr for SortExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.physical_expr.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.physical_expr.to_field(input_schema)
}
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-expr/src/expressions/sortby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl PhysicalExpr for SortByExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let series_f = || self.input.evaluate(df, state);
if self.by.is_empty() {
Expand Down Expand Up @@ -374,6 +375,13 @@ impl PhysicalExpr for SortByExpr {
Ok(ac_in)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
for i in &self.by {
i.collect_live_columns(lv);
}
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-expr/src/expressions/ternary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ impl PhysicalExpr for TernaryExpr {
Some(self)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.predicate.collect_live_columns(lv);
self.truthy.collect_live_columns(lv);
self.falsy.collect_live_columns(lv);
}

fn is_scalar(&self) -> bool {
self.returns_scalar
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-expr/src/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,16 @@ impl PhysicalExpr for WindowExpr {
false
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
for i in &self.group_by {
i.collect_live_columns(lv);
}
if let Some((i, _)) = &self.order_by {
i.collect_live_columns(lv);
}
self.phys_function.collect_live_columns(lv);
}

#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
&self,
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-io/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ or set 'streaming'",

pub use options::{ParallelStrategy, ParquetOptions};
use polars_error::{ErrString, PolarsError};
pub use polars_parquet::arrow::read::infer_schema;
pub use polars_parquet::read::FileMetadata;
pub use read_impl::{create_sorting_map, try_set_sorted_flag};
#[cfg(feature = "cloud")]
pub use reader::ParquetAsyncReader;
Expand Down
Loading

0 comments on commit b862887

Please sign in to comment.