Skip to content

Commit

Permalink
move wildcard expansions to the analyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Jul 27, 2024
1 parent 93d7b6f commit 5d70b3d
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 32 deletions.
18 changes: 17 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use arrow::compute::kernels::cast_utils::{
parse_interval_day_time, parse_interval_month_day_nano, parse_interval_year_month,
};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, Column, Result, ScalarValue};
use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference};
use sqlparser::ast::NullTreatment;
use std::any::Any;
use std::fmt::Debug;
Expand Down Expand Up @@ -122,6 +122,22 @@ pub fn wildcard() -> Expr {
Expr::Wildcard { qualifier: None }
}

/// Builds an [`Expr::Wildcard`] carrying a table qualifier, i.e. the `t.*`
/// form that selects all columns of the table named by `qualifier`.
///
/// # Example
///
/// ```rust
/// use datafusion_common::TableReference;
/// use datafusion_expr::qualified_wildcard;
/// let p = qualified_wildcard(TableReference::bare("t"));
/// assert_eq!(p.to_string(), "t.*")
/// ```
pub fn qualified_wildcard(qualifier: TableReference) -> Expr {
    let qualifier = Some(qualifier);
    Expr::Wildcard { qualifier }
}

/// Return a new expression `left <op> right`
pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr {
Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
Expand Down
18 changes: 3 additions & 15 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ use crate::logical_plan::{
};
use crate::type_coercion::binary::{comparison_coercion, values_coercion};
use crate::utils::{
can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
group_window_expr_by_sort_keys,
can_hash, columnize_expr, compare_sort_expr, expand_wildcard, expr_to_columns,
find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
};
use crate::{
and, binary_expr, logical_plan::tree_node::unwrap_arc, DmlStatement, Expr,
Expand Down Expand Up @@ -1440,22 +1439,11 @@ pub fn project(
plan: LogicalPlan,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<LogicalPlan> {
// TODO: move it into analyzer
let input_schema = plan.schema();
let mut projected_expr = vec![];
for e in expr {
let e = e.into();
match e {
Expr::Wildcard { qualifier: None } => {
projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
}
Expr::Wildcard {
qualifier: Some(qualifier),
} => projected_expr.extend(expand_qualified_wildcard(
&qualifier,
input_schema,
None,
)?),
Expr::Wildcard { .. } => projected_expr.push(e),
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
}
}
Expand Down
189 changes: 189 additions & 0 deletions datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_common::{plan_err, Result};
use datafusion_expr::utils::{expand_qualified_wildcard, expand_wildcard};
use datafusion_expr::{Expr, LogicalPlan, Projection};
use std::collections::HashMap;
use std::sync::Arc;

/// Analyzer rule that expands wildcard expressions in projections.
///
/// The type holds no state; all of the work happens in its `AnalyzerRule`
/// implementation.
#[derive(Default)]
pub struct ExpandWildcardRule {}

impl ExpandWildcardRule {
    /// Creates the rule. Equivalent to `ExpandWildcardRule::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl AnalyzerRule for ExpandWildcardRule {
    /// Runs wildcard expansion over `plan`. The configuration options are
    /// not consulted.
    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
        // Expanding a wildcard reads the schema of the projection's input
        // plan, which is why the `transform_up_with_subqueries` traversal is
        // the one used here.
        let rewritten = plan.transform_up_with_subqueries(analyzer_internal);
        rewritten.data()
    }

    /// Identifier of this rule.
    fn name(&self) -> &str {
        "expand_wildcard_rule"
    }
}

/// Rewrites a single plan node, expanding wildcards found in projections.
///
/// For a [`LogicalPlan::Projection`], each [`Expr::Wildcard`] in the
/// projection list is replaced by the expressions returned from
/// `expand_qualified_wildcard` (when the wildcard carries a qualifier) or
/// `expand_wildcard` (when it does not), both evaluated against the schema
/// of the projection's input. All other expressions are kept as they are.
/// The resulting list is checked for duplicate names and a new projection
/// is built over the same input.
///
/// A projection is always reported as transformed, because it is rebuilt
/// unconditionally. Every other plan node is returned untouched.
///
/// # Errors
///
/// Propagates errors from wildcard expansion, from `validate_unique_names`
/// and from `Projection::try_new`.
fn analyzer_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
    match plan {
        // The projection's existing `schema` is intentionally not bound:
        // the projection is rebuilt from the expanded expressions below.
        LogicalPlan::Projection(Projection { expr, input, .. }) => {
            let mut projected_expr = vec![];
            for e in expr {
                match e {
                    Expr::Wildcard {
                        qualifier: Some(qualifier),
                    } => projected_expr.extend(expand_qualified_wildcard(
                        &qualifier,
                        input.schema(),
                        None,
                    )?),
                    Expr::Wildcard { qualifier: None } => projected_expr
                        .extend(expand_wildcard(input.schema(), &input, None)?),
                    _ => projected_expr.push(e),
                }
            }
            validate_unique_names("Projections", projected_expr.iter())?;
            Ok(Transformed::yes(
                Projection::try_new(projected_expr, Arc::clone(&input))
                    .map(LogicalPlan::Projection)?,
            ))
        }
        _ => Ok(Transformed::no(plan)),
    }
}

/// Checks that no two expressions share the same display name.
///
/// Expressions are visited in iteration order and the first occurrence of
/// each name is remembered together with its position. `node_name` is used
/// only as the leading word of the error message.
///
/// # Errors
///
/// Returns a plan error naming both expressions and their positions as soon
/// as a repeated name is found. Any error from `Expr::display_name` is
/// propagated as well.
fn validate_unique_names<'a>(
    node_name: &str,
    expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<()> {
    // display name -> (position, expression) of its first occurrence
    let mut first_seen = HashMap::new();

    for (position, expr) in expressions.into_iter().enumerate() {
        let name = expr.display_name()?;
        if let Some((existing_position, existing_expr)) = first_seen.get(&name) {
            return plan_err!("{node_name} require unique expression names \
                but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
                at position {position} have the same name. Consider aliasing (\"AS\") one of them."
            );
        }
        first_seen.insert(name, (position, expr));
    }

    Ok(())
}

#[cfg(test)]
mod tests {
// Unit tests for `ExpandWildcardRule`. Each test builds a plan containing
// wildcard projections and compares the analyzed plan against an expected
// textual rendering.
// NOTE(review): every `\n` in the expected strings below is followed by a
// single space regardless of nesting depth; confirm this matches the
// indentation emitted by the indented plan display.
use super::*;
use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
use datafusion_common::TableReference;
use datafusion_expr::{
col, in_subquery, qualified_wildcard, wildcard, LogicalPlanBuilder,
};

/// Runs `ExpandWildcardRule` on `plan` and compares the result with
/// `expected` via `assert_analyzed_plan_eq_display_indent`.
fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_analyzed_plan_eq_display_indent(
Arc::new(ExpandWildcardRule::new()),
plan,
expected,
)
}

/// An unqualified `*` over the test table scan is expected to become
/// `test.a, test.b, test.c`.
#[test]
fn test_expand_wildcard() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![wildcard()])?
.build()?;
let expected =
"Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
assert_plan_eq(plan, expected)
}

/// A qualified `test.*` over the test table scan is expected to produce the
/// same projection as the unqualified wildcard.
#[test]
fn test_expand_qualified_wildcard() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![qualified_wildcard(TableReference::bare("test"))])?
.build()?;
let expected =
"Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
assert_plan_eq(plan, expected)
}

/// Stacks an unqualified `*` projection on top of a qualified `test.*`
/// projection; both are expected to be expanded.
/// NOTE(review): despite the name, the plan built here is a nested
/// projection; the expected output contains no `Subquery` node.
#[test]
fn test_expand_qualified_wildcard_in_subquery() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![qualified_wildcard(TableReference::bare("test"))])?
.build()?;
let plan = LogicalPlanBuilder::from(plan)
.project(vec![wildcard()])?
.build()?;
let expected =
"Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
assert_plan_eq(plan, expected)
}

/// Places a wildcard projection inside an `IN (<subquery>)` filter predicate
/// and another one on the outer plan; both are expected to be expanded.
#[test]
fn test_expand_wildcard_in_subquery() -> Result<()> {
// Inner plan: `*` over a projection of column `a` only.
let projection_a = LogicalPlanBuilder::from(test_table_scan()?)
.project(vec![col("a")])?
.build()?;
let subquery = LogicalPlanBuilder::from(projection_a)
.project(vec![wildcard()])?
.build()?;
// Outer plan: filter on `a IN (<subquery>)`, then project `*`.
let plan = LogicalPlanBuilder::from(test_table_scan()?)
.filter(in_subquery(col("a"), Arc::new(subquery)))?
.project(vec![wildcard()])?
.build()?;
let expected = "\
Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\
\n Filter: test.a IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
\n Subquery: [a:UInt32]\
\n Projection: test.a [a:UInt32]\
\n Projection: test.a [a:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
assert_plan_eq(plan, expected)
}
}
3 changes: 3 additions & 0 deletions datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{Expr, LogicalPlan};

use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::subquery::check_subquery_expr;
use crate::analyzer::type_coercion::TypeCoercion;
Expand All @@ -38,6 +39,7 @@ use crate::utils::log_plan;
use self::function_rewrite::ApplyFunctionRewrites;

pub mod count_wildcard_rule;
pub mod expand_wildcard_rule;
pub mod function_rewrite;
pub mod inline_table_scan;
pub mod subquery;
Expand Down Expand Up @@ -91,6 +93,7 @@ impl Analyzer {
Arc::new(InlineTableScan::new()),
Arc::new(TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
Arc::new(ExpandWildcardRule::new()),
];
Self::with_rules(rules)
}
Expand Down
22 changes: 6 additions & 16 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ use datafusion_expr::utils::{
find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
qualified_wildcard, wildcard, Aggregate, Expr, Filter, GroupingSet, LogicalPlan,
LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
Expand Down Expand Up @@ -531,8 +532,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

/// Returns the `Expr`s corresponding to a SQL query's SELECT expressions.
///
/// Wildcards are expanded into the concrete list of columns.
fn prepare_select_exprs(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -590,44 +589,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if empty_from {
return plan_err!("SELECT * with no tables specified is not valid");
}
// do not expand from outer schema
let expanded_exprs =
expand_wildcard(plan.schema().as_ref(), plan, Some(&options))?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
if let Some(replace) = options.opt_replace {
self.replace_columns(
plan,
empty_from,
planner_context,
expanded_exprs,
vec![wildcard()],
replace,
)
} else {
Ok(expanded_exprs)
Ok(vec![wildcard()])
}
}
SelectItem::QualifiedWildcard(object_name, options) => {
Self::check_wildcard_options(&options)?;
let qualifier = idents_to_table_reference(object_name.0, false)?;
// do not expand from outer schema
let expanded_exprs = expand_qualified_wildcard(
&qualifier,
plan.schema().as_ref(),
Some(&options),
)?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
if let Some(replace) = options.opt_replace {
self.replace_columns(
plan,
empty_from,
planner_context,
expanded_exprs,
vec![qualified_wildcard(qualifier)],
replace,
)
} else {
Ok(expanded_exprs)
Ok(vec![qualified_wildcard(qualifier)])
}
}
}
Expand Down

0 comments on commit 5d70b3d

Please sign in to comment.