Skip to content

Commit

Permalink
fix(query): union all panic in mysql client (#17095)
Browse files Browse the repository at this point in the history
fix union all panic
  • Loading branch information
zhyass authored Dec 23, 2024
1 parent 22f9ba2 commit 793e2ae
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 183 deletions.
98 changes: 57 additions & 41 deletions src/query/sql/src/executor/physical_plans/physical_union_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use databend_common_exception::Result;
use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::RemoteExpr;
Expand All @@ -24,6 +27,7 @@ use crate::executor::PhysicalPlanBuilder;
use crate::optimizer::SExpr;
use crate::ColumnSet;
use crate::IndexType;
use crate::ScalarExpr;
use crate::TypeCheck;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -52,32 +56,49 @@ impl PhysicalPlanBuilder {
&mut self,
s_expr: &SExpr,
union_all: &crate::plans::UnionAll,
required: ColumnSet,
mut required: ColumnSet,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
let left_required = union_all
let metadata = self.metadata.read().clone();
let lazy_columns = metadata.lazy_columns();
required.extend(lazy_columns.clone());
let indices: Vec<_> = union_all
.left_outputs
.iter()
.fold(required.clone(), |mut acc, v| {
acc.insert(v.0);
acc
});
let right_required = union_all.right_outputs.iter().fold(required, |mut acc, v| {
acc.insert(v.0);
acc
});
.enumerate()
.filter_map(|(index, v)| required.contains(&v.0).then_some(index))
.collect();
let (left_required, right_required) = if indices.is_empty() {
(
HashSet::from([union_all.left_outputs[0].0]),
HashSet::from([union_all.right_outputs[0].0]),
)
} else {
indices.iter().fold(
(
HashSet::with_capacity(indices.len()),
HashSet::with_capacity(indices.len()),
),
|(mut left, mut right), &index| {
left.insert(union_all.left_outputs[index].0);
right.insert(union_all.right_outputs[index].0);
(left, right)
},
)
};

// 2. Build physical plan.
let left_plan = self.build(s_expr.child(0)?, left_required).await?;
let right_plan = self.build(s_expr.child(1)?, right_required).await?;
let left_plan = self.build(s_expr.child(0)?, left_required.clone()).await?;
let right_plan = self.build(s_expr.child(1)?, right_required.clone()).await?;

let left_schema = left_plan.output_schema()?;
let right_schema = right_plan.output_schema()?;

let fields = union_all
.left_outputs
.iter()
.filter(|(index, _)| left_required.contains(index))
.map(|(index, expr)| {
if let Some(expr) = expr {
Ok(DataField::new(&index.to_string(), expr.data_type()?))
Expand All @@ -87,35 +108,9 @@ impl PhysicalPlanBuilder {
})
.collect::<Result<Vec<_>>>()?;

let left_outputs = union_all
.left_outputs
.iter()
.map(|(index, scalar_expr)| {
if let Some(scalar_expr) = scalar_expr {
let expr = scalar_expr
.type_check(left_schema.as_ref())?
.project_column_ref(|idx| left_schema.index_of(&idx.to_string()).unwrap());
Ok((*index, Some(expr.as_remote_expr())))
} else {
Ok((*index, None))
}
})
.collect::<Result<Vec<_>>>()?;

let right_outputs = union_all
.right_outputs
.iter()
.map(|(index, scalar_expr)| {
if let Some(scalar_expr) = scalar_expr {
let expr = scalar_expr
.type_check(right_schema.as_ref())?
.project_column_ref(|idx| right_schema.index_of(&idx.to_string()).unwrap());
Ok((*index, Some(expr.as_remote_expr())))
} else {
Ok((*index, None))
}
})
.collect::<Result<Vec<_>>>()?;
let left_outputs = process_outputs(&union_all.left_outputs, &left_required, &left_schema)?;
let right_outputs =
process_outputs(&union_all.right_outputs, &right_required, &right_schema)?;

Ok(PhysicalPlan::UnionAll(UnionAll {
plan_id: 0,
Expand All @@ -130,3 +125,24 @@ impl PhysicalPlanBuilder {
}))
}
}

/// Resolve the subset of union outputs that is actually `required` into
/// remote expressions against `schema`.
///
/// Outputs whose column index is not in `required` are dropped. For a kept
/// output, a `Some(scalar_expr)` is type-checked against `schema`, its column
/// references are rebound to positional schema indices, and it is converted
/// to a `RemoteExpr`; a `None` payload is passed through unchanged.
fn process_outputs(
    outputs: &[(IndexType, Option<ScalarExpr>)],
    required: &ColumnSet,
    schema: &DataSchema,
) -> Result<Vec<(IndexType, Option<RemoteExpr>)>> {
    let mut resolved = Vec::with_capacity(outputs.len());
    for (index, scalar_expr) in outputs {
        // Skip outputs pruned away by the required-columns analysis.
        if !required.contains(index) {
            continue;
        }
        let remote = match scalar_expr {
            Some(scalar_expr) => {
                let checked = scalar_expr.type_check(schema)?;
                // Field names in the schema are stringified column indices,
                // so the lookup below is expected to succeed for any column
                // the expression references.
                let rebound = checked
                    .project_column_ref(|idx| schema.index_of(&idx.to_string()).unwrap());
                Some(rebound.as_remote_expr())
            }
            None => None,
        };
        resolved.push((*index, remote));
    }
    Ok(resolved)
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ select a, b, change$action, change$is_update from s3 as _change_delete order by
4 4 INSERT 0
6 5 INSERT 0

# ISSUE 17085
query I
select a from s3 where a > 5
----
6

statement ok
create table t4(a int, b int)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,93 +61,81 @@ AggregateFinal
├── aggregate functions: []
├── estimated rows: 0.00
└── UnionAll
├── output columns: [t.id (#0), de (#6)]
├── output columns: [t.id (#0)]
├── estimated rows: 0.00
├── EvalScalar
│ ├── output columns: [t.id (#0), de (#6)]
│ ├── expressions: [if(CAST(is_not_null(sum(tb.de) (#5)) AS Boolean NULL), CAST(assume_not_null(sum(tb.de) (#5)) AS Int64 NULL), true, 0, NULL)]
├── AggregateFinal
│ ├── output columns: [t.id (#0)]
│ ├── group by: [id]
│ ├── aggregate functions: []
│ ├── estimated rows: 0.00
│ └── AggregateFinal
│ ├── output columns: [sum(tb.de) (#5), t.id (#0)]
│ └── AggregatePartial
│ ├── group by: [id]
│ ├── aggregate functions: [sum(sum(coalesce(t3.val, 0)))]
│ ├── aggregate functions: []
│ ├── estimated rows: 0.00
│ └── AggregatePartial
│ ├── group by: [id]
│ ├── aggregate functions: [sum(sum(coalesce(t3.val, 0)))]
│ └── HashJoin
│ ├── output columns: [t.id (#0)]
│ ├── join type: LEFT OUTER
│ ├── build keys: [tb.sid (#1)]
│ ├── probe keys: [t.id (#0)]
│ ├── filters: []
│ ├── estimated rows: 0.00
│ └── HashJoin
│ ├── output columns: [t.id (#0), sum(coalesce(t3.val, 0)) (#4)]
│ ├── join type: LEFT OUTER
│ ├── build keys: [tb.sid (#1)]
│ ├── probe keys: [t.id (#0)]
│ ├── filters: []
│ ├── AggregateFinal(Build)
│ │ ├── output columns: [t2.sid (#1)]
│ │ ├── group by: [sid]
│ │ ├── aggregate functions: []
│ │ ├── estimated rows: 0.00
│ │ └── AggregatePartial
│ │ ├── group by: [sid]
│ │ ├── aggregate functions: []
│ │ ├── estimated rows: 0.00
│ │ └── Filter
│ │ ├── output columns: [t2.sid (#1)]
│ │ ├── filters: [is_true(t3.sid (#1) = 1)]
│ │ ├── estimated rows: 0.00
│ │ └── TableScan
│ │ ├── table: default.default.t2
│ │ ├── output columns: [sid (#1)]
│ │ ├── read rows: 0
│ │ ├── read size: 0
│ │ ├── partitions total: 0
│ │ ├── partitions scanned: 0
│ │ ├── push downs: [filters: [is_true(t2.sid (#1) = 1)], limit: NONE]
│ │ └── estimated rows: 0.00
│ └── Filter(Probe)
│ ├── output columns: [t.id (#0)]
│ ├── filters: [is_true(t.id (#0) = 1)]
│ ├── estimated rows: 0.00
│ ├── AggregateFinal(Build)
│ │ ├── output columns: [sum(coalesce(t3.val, 0)) (#4), t2.sid (#1)]
│ │ ├── group by: [sid]
│ │ ├── aggregate functions: [sum(sum_arg_0)]
│ │ ├── estimated rows: 0.00
│ │ └── AggregatePartial
│ │ ├── group by: [sid]
│ │ ├── aggregate functions: [sum(sum_arg_0)]
│ │ ├── estimated rows: 0.00
│ │ └── EvalScalar
│ │ ├── output columns: [t2.sid (#1), sum_arg_0 (#3)]
│ │ ├── expressions: [if(CAST(is_not_null(t3.val (#2)) AS Boolean NULL), CAST(assume_not_null(t3.val (#2)) AS Int32 NULL), true, 0, NULL)]
│ │ ├── estimated rows: 0.00
│ │ └── Filter
│ │ ├── output columns: [t2.sid (#1), t2.val (#2)]
│ │ ├── filters: [is_true(t3.sid (#1) = 1)]
│ │ ├── estimated rows: 0.00
│ │ └── TableScan
│ │ ├── table: default.default.t2
│ │ ├── output columns: [sid (#1), val (#2)]
│ │ ├── read rows: 0
│ │ ├── read size: 0
│ │ ├── partitions total: 0
│ │ ├── partitions scanned: 0
│ │ ├── push downs: [filters: [is_true(t2.sid (#1) = 1)], limit: NONE]
│ │ └── estimated rows: 0.00
│ └── Filter(Probe)
│ ├── output columns: [t.id (#0)]
│ ├── filters: [is_true(t.id (#0) = 1)]
│ ├── estimated rows: 0.00
│ └── TableScan
│ ├── table: default.default.t1
│ ├── output columns: [id (#0)]
│ ├── read rows: 0
│ ├── read size: 0
│ ├── partitions total: 0
│ ├── partitions scanned: 0
│ ├── push downs: [filters: [is_true(t1.id (#0) = 1)], limit: NONE]
│ └── estimated rows: 0.00
└── EvalScalar
├── output columns: [t.id (#7), de (#8)]
├── expressions: [0]
│ └── TableScan
│ ├── table: default.default.t1
│ ├── output columns: [id (#0)]
│ ├── read rows: 0
│ ├── read size: 0
│ ├── partitions total: 0
│ ├── partitions scanned: 0
│ ├── push downs: [filters: [is_true(t1.id (#0) = 1)], limit: NONE]
│ └── estimated rows: 0.00
└── AggregateFinal
├── output columns: [t.id (#7)]
├── group by: [id]
├── aggregate functions: []
├── estimated rows: 0.00
└── AggregateFinal
├── output columns: [t.id (#7)]
└── AggregatePartial
├── group by: [id]
├── aggregate functions: []
├── estimated rows: 0.00
└── AggregatePartial
├── group by: [id]
├── aggregate functions: []
└── Filter
├── output columns: [t.id (#7)]
├── filters: [is_true(t.id (#7) = 1)]
├── estimated rows: 0.00
└── Filter
├── output columns: [t.id (#7)]
├── filters: [is_true(t.id (#7) = 1)]
├── estimated rows: 0.00
└── TableScan
├── table: default.default.t1
├── output columns: [id (#7)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(t1.id (#7) = 1)], limit: NONE]
└── estimated rows: 0.00
└── TableScan
├── table: default.default.t1
├── output columns: [id (#7)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(t1.id (#7) = 1)], limit: NONE]
└── estimated rows: 0.00

statement ok
drop table if exists t1;
Expand Down
36 changes: 36 additions & 0 deletions tests/sqllogictests/suites/mode/standalone/explain/union.test
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,42 @@ Limit
├── push downs: [filters: [], limit: 1]
└── estimated rows: 2.00

# ISSUE 17085
query T
explain select b from (select * from t1 where a>1 union all select * from t2 where b>2)
----
UnionAll
├── output columns: [t1.b (#1)]
├── estimated rows: 2.00
├── Filter
│ ├── output columns: [t1.b (#1)]
│ ├── filters: [is_true(t1.a (#0) > 1)]
│ ├── estimated rows: 1.00
│ └── TableScan
│ ├── table: default.default.t1
│ ├── output columns: [a (#0), b (#1)]
│ ├── read rows: 2
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
│ └── estimated rows: 2.00
└── Filter
├── output columns: [t2.b (#3)]
├── filters: [is_true(t2.b (#3) > 2)]
├── estimated rows: 1.00
└── TableScan
├── table: default.default.t2
├── output columns: [b (#3)]
├── read rows: 2
├── read size: < 1 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [is_true(t2.b (#3) > 2)], limit: NONE]
└── estimated rows: 2.00

statement ok
drop table t1

Expand Down
Loading

0 comments on commit 793e2ae

Please sign in to comment.