feat(agg): introduce an internal agg kind to avoid minput state table after #12349 (#12384)

Signed-off-by: Richard Chien <[email protected]>
stdrc authored Oct 24, 2023
1 parent 02282b9 commit e64543f
Showing 12 changed files with 199 additions and 200 deletions.
1 change: 1 addition & 0 deletions proto/expr.proto
@@ -348,6 +348,7 @@ message AggCall {
     MODE = 24;
     LAST_VALUE = 25;
     GROUPING = 26;
+    INTERNAL_LAST_SEEN_VALUE = 27;
   }
   Type type = 1;
   repeated InputRef args = 2;
14 changes: 12 additions & 2 deletions src/expr/core/src/aggregate/def.rs
@@ -233,6 +233,9 @@ pub enum AggKind {
     PercentileDisc,
     Mode,
     Grouping,
+
+    /// Return the last seen one of the input values.
+    InternalLastSeenValue,
 }

 impl AggKind {
@@ -264,6 +267,7 @@ impl AggKind {
             PbType::PercentileDisc => Ok(AggKind::PercentileDisc),
             PbType::Mode => Ok(AggKind::Mode),
             PbType::Grouping => Ok(AggKind::Grouping),
+            PbType::InternalLastSeenValue => Ok(AggKind::InternalLastSeenValue),
             PbType::Unspecified => bail!("Unrecognized agg."),
         }
     }
@@ -294,8 +298,9 @@
             Self::VarSamp => PbType::VarSamp,
             Self::PercentileCont => PbType::PercentileCont,
             Self::PercentileDisc => PbType::PercentileDisc,
-            Self::Grouping => PbType::Grouping,
             Self::Mode => PbType::Mode,
+            Self::Grouping => PbType::Grouping,
+            Self::InternalLastSeenValue => PbType::InternalLastSeenValue,
         }
     }
 }
@@ -422,6 +427,7 @@ pub mod agg_kinds {
             | AggKind::BoolAnd
             | AggKind::BoolOr
             | AggKind::ApproxCountDistinct
+            | AggKind::InternalLastSeenValue
         };
     }
     pub use single_value_state;
@@ -450,7 +456,11 @@ impl AggKind {
     /// Get the total phase agg kind from the partial phase agg kind.
     pub fn partial_to_total(self) -> Option<Self> {
         match self {
-            AggKind::BitXor | AggKind::Min | AggKind::Max | AggKind::Sum => Some(self),
+            AggKind::BitXor
+            | AggKind::Min
+            | AggKind::Max
+            | AggKind::Sum
+            | AggKind::InternalLastSeenValue => Some(self),
             AggKind::Sum0 | AggKind::Count => Some(AggKind::Sum0),
             agg_kinds::simply_cannot_two_phase!() => None,
             agg_kinds::rewritten!() => None,
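The `partial_to_total` hunk above is what admits the new kind into two-phase aggregation: like `min`/`max`/`sum`, its total phase simply reapplies the same agg kind to the partial results. A minimal sketch of why that composition is sound (plain Rust; `last_seen` is a hypothetical stand-in, not RisingWave executor code):

```rust
// An agg kind may map to itself in `partial_to_total` only if applying it to
// per-shard partial results yields a valid overall result. For "last seen
// value" this holds: the last seen of the per-shard last-seen values is
// itself one of the values that was seen, which is all this kind promises.
fn last_seen<T>(values: impl IntoIterator<Item = T>) -> Option<T> {
    values.into_iter().last()
}

fn main() {
    // Rows of a single group, split across two shards by the exchange.
    let shard_a = vec![10, 10];
    let shard_b = vec![10];

    // Partial phase: one aggregate per shard.
    let partials = [last_seen(shard_a), last_seen(shard_b)];

    // Total phase: the same kind over the partial results, mirroring
    // `AggKind::InternalLastSeenValue => Some(self)` above.
    let total = last_seen(partials.into_iter().flatten());
    assert_eq!(total, Some(10));
}
```

The nested `internal_last_seen_value(internal_last_seen_value(t2.d))` calls in the planner-test diffs further down are exactly this partial/total pairing.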
9 changes: 9 additions & 0 deletions src/expr/impl/src/aggregate/general.rs
@@ -62,6 +62,15 @@ fn last_value<T>(_: T, input: T) -> T {
     input
 }

+#[aggregate("internal_last_seen_value(*) -> auto", state = "ref")]
+fn internal_last_seen_value<T>(state: T, input: T, retract: bool) -> T {
+    if retract {
+        state
+    } else {
+        input
+    }
+}
+
 /// Note the following corner cases:
 ///
 /// ```slt
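The `retract: bool` parameter is the notable part of this new function: on an upstream deletion it keeps the current state rather than adopting the retracted value, and the `state = "ref"` option appears to declare that the whole state is just one input value, which is what lets the kind join the `single_value_state` list in def.rs. A hand-rolled driver for the fold, as an illustration only (the real state plumbing is generated by the `#[aggregate]` macro):

```rust
// Simulate feeding a streaming changelog through the fold above.
enum Op {
    Insert,
    Delete,
}

fn internal_last_seen_value<T>(state: T, input: T, retract: bool) -> T {
    if retract { state } else { input }
}

fn main() {
    // A changelog for one group: two inserts, then a retraction of 1.
    let changelog = [(Op::Insert, 1), (Op::Insert, 2), (Op::Delete, 1)];

    let mut state: Option<i32> = None;
    for (op, value) in changelog {
        let retract = matches!(op, Op::Delete);
        state = Some(match state {
            // Assume the stream opens with an insert, as a group's input does.
            None => value,
            Some(current) => internal_last_seen_value(current, value, retract),
        });
    }

    // The retraction leaves the last seen value, 2, untouched.
    assert_eq!(state, Some(2));
}
```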
24 changes: 12 additions & 12 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -1395,20 +1395,20 @@
       sq_1.col_2;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchProject { exprs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))))] }
-    └─BatchHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))))] }
-    └─BatchExchange { order: [], dist: HashShard(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))) }
-    └─BatchHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))] }
-    └─BatchSortAgg { group_key: [lineitem.l_orderkey], aggs: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))] }
+    └─BatchProject { exprs: [max(max(internal_last_seen_value(lineitem.l_commitdate)))] }
+    └─BatchHashAgg { group_key: [internal_last_seen_value(lineitem.l_commitdate)], aggs: [max(max(internal_last_seen_value(lineitem.l_commitdate)))] }
+    └─BatchExchange { order: [], dist: HashShard(internal_last_seen_value(lineitem.l_commitdate)) }
+    └─BatchHashAgg { group_key: [internal_last_seen_value(lineitem.l_commitdate)], aggs: [max(internal_last_seen_value(lineitem.l_commitdate))] }
+    └─BatchSortAgg { group_key: [lineitem.l_orderkey], aggs: [internal_last_seen_value(lineitem.l_commitdate)] }
     └─BatchScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_commitdate], distribution: UpstreamHashShard(lineitem.l_orderkey) }
   stream_plan: |-
-    StreamMaterialize { columns: [col_0, first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))(hidden)], stream_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], pk_columns: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], pk_conflict: NoCheck }
-    └─StreamProject { exprs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))), first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))] }
-    └─StreamHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))), count] }
-    └─StreamExchange { dist: HashShard(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))) }
-    └─StreamHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), $expr1], aggs: [max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))), count] }
-    └─StreamProject { exprs: [lineitem.l_orderkey, first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), Vnode(lineitem.l_orderkey) as $expr1] }
-    └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), count] }
+    StreamMaterialize { columns: [col_0, internal_last_seen_value(lineitem.l_commitdate)(hidden)], stream_key: [internal_last_seen_value(lineitem.l_commitdate)], pk_columns: [internal_last_seen_value(lineitem.l_commitdate)], pk_conflict: NoCheck }
+    └─StreamProject { exprs: [max(max(internal_last_seen_value(lineitem.l_commitdate))), internal_last_seen_value(lineitem.l_commitdate)] }
+    └─StreamHashAgg { group_key: [internal_last_seen_value(lineitem.l_commitdate)], aggs: [max(max(internal_last_seen_value(lineitem.l_commitdate))), count] }
+    └─StreamExchange { dist: HashShard(internal_last_seen_value(lineitem.l_commitdate)) }
+    └─StreamHashAgg { group_key: [internal_last_seen_value(lineitem.l_commitdate), $expr1], aggs: [max(internal_last_seen_value(lineitem.l_commitdate)), count] }
+    └─StreamProject { exprs: [lineitem.l_orderkey, internal_last_seen_value(lineitem.l_commitdate), Vnode(lineitem.l_orderkey) as $expr1] }
+    └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [internal_last_seen_value(lineitem.l_commitdate), count] }
     └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_commitdate], pk: [lineitem.l_orderkey], dist: UpstreamHashShard(lineitem.l_orderkey) }
 - name: two phase agg on hop window input should use two phase agg
   sql: |
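What this plan diff reflects: `first_value(c order_by(c ASC))` had been used only to carry `l_commitdate` through the aggregation, and as an ordered aggregate it needs a materialized-input ("minput") state table in streaming mode. Because the first grouping is on `lineitem.l_orderkey`, the scan's primary key (see `pk: [lineitem.l_orderkey]` above), every row in a group carries the same `l_commitdate`, so the order-insensitive `internal_last_seen_value` returns the same answer with single-value state. A toy illustration of that equivalence, with the functional dependency stated as an assumption:

```rust
// Toy check (not planner code): when the group key functionally determines a
// column, all values in the group are equal, so "first value in sorted order"
// and "last seen value" necessarily coincide -- ordering state buys nothing.
fn first_value_ordered(group: &[i32]) -> Option<i32> {
    let mut sorted = group.to_vec();
    sorted.sort_unstable(); // first_value(c ORDER BY c) keeps inputs ordered
    sorted.first().copied()
}

fn last_seen(group: &[i32]) -> Option<i32> {
    group.last().copied() // only the most recent value is retained
}

fn main() {
    // All rows of one l_orderkey group share a single l_commitdate
    // (dates encoded as integers purely for the sketch).
    let group = vec![2023_10_24, 2023_10_24, 2023_10_24];
    assert_eq!(first_value_ordered(&group), last_seen(&group));
}
```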
@@ -60,9 +60,9 @@
   select t2.c, t2.d, count(distinct t.a) from t join t2 on t.a = t2.c group by t2.c, t2.d;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchHashAgg { group_key: [first_value(t2.c order_by(t2.c ASC))], aggs: [first_value(first_value(t2.d order_by(t2.d ASC)) order_by(first_value(t2.d order_by(t2.d ASC)) ASC)), count(t.a)] }
-    └─BatchExchange { order: [], dist: HashShard(first_value(t2.c order_by(t2.c ASC))) }
-    └─BatchHashAgg { group_key: [t.a], aggs: [first_value(t2.c order_by(t2.c ASC)), first_value(t2.d order_by(t2.d ASC))] }
+    └─BatchHashAgg { group_key: [internal_last_seen_value(t2.c)], aggs: [internal_last_seen_value(internal_last_seen_value(t2.d)), count(t.a)] }
+    └─BatchExchange { order: [], dist: HashShard(internal_last_seen_value(t2.c)) }
+    └─BatchHashAgg { group_key: [t.a], aggs: [internal_last_seen_value(t2.c), internal_last_seen_value(t2.d)] }
     └─BatchLookupJoin { type: Inner, predicate: t.a = t2.c, output: [t2.c, t2.d, t.a] }
     └─BatchExchange { order: [], dist: UpstreamHashShard(t.a) }
     └─BatchScan { table: t, columns: [t.a], distribution: SomeShard }
59 changes: 23 additions & 36 deletions src/frontend/planner_test/tests/testdata/output/except.yaml
@@ -108,20 +108,20 @@
   create table t2 (a int, b numeric, c bigint, primary key(a));
   select * from t1 except select * from t2;
   optimized_logical_plan_for_batch: |-
-    LogicalAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] }
+    LogicalAgg { group_key: [t1.a], aggs: [internal_last_seen_value(t1.b), internal_last_seen_value(t1.c)] }
     └─LogicalJoin { type: LeftAnti, on: IsNotDistinctFrom(t1.a, t2.a) AND IsNotDistinctFrom(t1.b, t2.b) AND IsNotDistinctFrom(t1.c, t2.c), output: all }
     ├─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.c] }
     └─LogicalScan { table: t2, columns: [t2.a, t2.b, t2.c] }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] }
+    └─BatchHashAgg { group_key: [t1.a], aggs: [internal_last_seen_value(t1.b), internal_last_seen_value(t1.c)] }
     └─BatchLookupJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
     └─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) }
     └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: UpstreamHashShard(t1.a) }
   stream_plan: |-
     StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck }
-    └─StreamProject { exprs: [t1.a, first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] }
-    └─StreamHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC)), count] }
+    └─StreamProject { exprs: [t1.a, internal_last_seen_value(t1.b), internal_last_seen_value(t1.c)] }
+    └─StreamHashAgg { group_key: [t1.a], aggs: [internal_last_seen_value(t1.b), internal_last_seen_value(t1.c), count] }
     └─StreamExchange { dist: HashShard(t1.a) }
     └─StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
     ├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) }
@@ -130,91 +130,78 @@
     └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) }
   stream_dist_plan: |+
     Fragment 0
-    StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } { materialized table: 4294967294 }
-    └── StreamProject { exprs: [t1.a, first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] }
-    └── StreamHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC)), count] }
-    ├── intermediate state table: 2
-    ├── state tables: [ 0, 1 ]
+    StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck }
+    ├── materialized table: 4294967294
+    └── StreamProject { exprs: [t1.a, internal_last_seen_value(t1.b), internal_last_seen_value(t1.c)] }
+    └── StreamHashAgg { group_key: [t1.a], aggs: [internal_last_seen_value(t1.b), internal_last_seen_value(t1.c), count] }
+    ├── intermediate state table: 0
+    ├── state tables: []
     ├── distinct tables: []
     └── StreamExchange Hash([0]) from 1

     Fragment 1
     StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
-    ├── left table: 3
-    ├── right table: 5
-    ├── left degree table: 4
-    ├── right degree table: 6
+    ├── left table: 1
+    ├── right table: 3
+    ├── left degree table: 2
+    ├── right degree table: 4
     ├── StreamExchange Hash([0, 1, 2]) from 2
     └── StreamExchange Hash([0, 1, 2]) from 3

     Fragment 2
-    Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 7 }
+    Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 5 }
     ├── Upstream
     └── BatchPlanNode

     Fragment 3
-    Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } { state table: 8 }
+    Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } { state table: 6 }
     ├── Upstream
     └── BatchPlanNode

     Table 0
-    ├── columns: [ t1_a, t1_b, t1_c ]
-    ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
-    ├── value indices: [ 0, 1, 2 ]
-    ├── distribution key: [ 0 ]
-    └── read pk prefix len hint: 1
-
-    Table 1
-    ├── columns: [ t1_a, t1_c, t1_b ]
-    ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
-    ├── value indices: [ 0, 1, 2 ]
-    ├── distribution key: [ 0 ]
-    └── read pk prefix len hint: 1
-
-    Table 2
-    ├── columns: [ t1_a, first_value(t1_b order_by(t1_b ASC)), first_value(t1_c order_by(t1_c ASC)), count ]
+    ├── columns: [ t1_a, internal_last_seen_value(t1_b), internal_last_seen_value(t1_c), count ]
     ├── primary key: [ $0 ASC ]
     ├── value indices: [ 1, 2, 3 ]
     ├── distribution key: [ 0 ]
     └── read pk prefix len hint: 1

-    Table 3
+    Table 1
     ├── columns: [ t1_a, t1_b, t1_c ]
     ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
     ├── value indices: [ 0, 1, 2 ]
     ├── distribution key: [ 0, 1, 2 ]
     └── read pk prefix len hint: 3

-    Table 4
+    Table 2
     ├── columns: [ t1_a, t1_b, t1_c, _degree ]
     ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
     ├── value indices: [ 3 ]
     ├── distribution key: [ 0, 1, 2 ]
     └── read pk prefix len hint: 3

-    Table 5
+    Table 3
     ├── columns: [ t2_a, t2_b, t2_c ]
     ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
     ├── value indices: [ 0, 1, 2 ]
     ├── distribution key: [ 0, 1, 2 ]
     └── read pk prefix len hint: 3

-    Table 6
+    Table 4
     ├── columns: [ t2_a, t2_b, t2_c, _degree ]
     ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
     ├── value indices: [ 3 ]
     ├── distribution key: [ 0, 1, 2 ]
     └── read pk prefix len hint: 3

-    Table 7
+    Table 5
     ├── columns: [ vnode, a, t1_backfill_finished, t1_row_count ]
     ├── primary key: [ $0 ASC ]
     ├── value indices: [ 1, 2, 3 ]
     ├── distribution key: [ 0 ]
     ├── read pk prefix len hint: 1
     └── vnode column idx: 0

-    Table 8
+    Table 6
     ├── columns: [ vnode, a, t2_backfill_finished, t2_row_count ]
     ├── primary key: [ $0 ASC ]
     ├── value indices: [ 1, 2, 3 ]
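The renumbering above is the payoff named in the commit title: the two ordered minput state tables that backed `first_value(t1.b)` and `first_value(t1.c)` (old Tables 0 and 1) disappear, `StreamHashAgg` now reports `state tables: []`, and every remaining table shifts down by two. A rough sketch of the per-group state shapes being traded (types invented for illustration; RisingWave actually persists these in its own state-table format):

```rust
use std::collections::BTreeMap;

// first_value(b ORDER BY b) must retain every input row in sort order so a
// retraction can promote the next candidate -- hence one dedicated "minput"
// state table per such agg call (the deleted Tables 0 and 1).
type OrderedMinput = BTreeMap<i64 /* sort key */, i64 /* value */>;

// internal_last_seen_value(b) keeps one value per group, inlined into the
// agg's intermediate state row (the new Table 0) -- no extra table at all.
type LastSeenState = Option<i64>;

fn main() {
    let mut minput: OrderedMinput = BTreeMap::new();
    minput.insert(3, 3);
    minput.insert(1, 1); // state grows with the group
    let last_seen: LastSeenState = Some(1); // state stays constant-size

    // first_value = the entry with the smallest sort key.
    assert_eq!(minput.keys().next(), Some(&1));
    assert_eq!(last_seen, Some(1));
}
```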