feat(optimizer): optimize join stream key (#12831)
Co-authored-by: Eric Fu <[email protected]>
chenzl25 and fuyufjh authored Oct 14, 2023
1 parent 103826b commit 387d251
Showing 18 changed files with 631 additions and 519 deletions.
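
Every snapshot change below comes from the same planner rule: when a join's equi-condition equates two stream-key columns, one of them is functionally determined by the other, so the optimizer can drop it and keep a single representative per equivalence class. The plans therefore carry shorter stream keys, pk_columns, and distribution keys. A minimal, self-contained sketch of that pruning idea (hypothetical names, not RisingWave's actual optimizer API):

```rust
use std::collections::HashSet;

/// A pair of output-column indices equated by the join predicate,
/// e.g. `a.a1 = b.b1`.
struct EquiPair {
    left: usize,
    right: usize,
}

/// Drop every stream-key column that is (transitively) equated with a
/// column we have already decided to keep.
fn prune_stream_key(stream_key: &[usize], equi_pairs: &[EquiPair]) -> Vec<usize> {
    let mut kept = Vec::new();
    let mut covered: HashSet<usize> = HashSet::new();
    for &col in stream_key {
        if covered.contains(&col) {
            continue; // functionally determined by a column we kept
        }
        kept.push(col);
        covered.insert(col);
        // Saturate: anything equated with a covered column is covered too.
        let mut changed = true;
        while changed {
            changed = false;
            for p in equi_pairs {
                if covered.contains(&p.left) && covered.insert(p.right) {
                    changed = true;
                }
                if covered.contains(&p.right) && covered.insert(p.left) {
                    changed = true;
                }
            }
        }
    }
    kept
}

fn main() {
    // `select * from a join b on a.a1 = b.b1`: output [a1, a2, b1, b2],
    // naive stream key [a1, b1] = output columns 0 and 2.
    let pruned = prune_stream_key(&[0, 2], &[EquiPair { left: 0, right: 2 }]);
    assert_eq!(pruned, vec![0]); // b1 is implied by a1 -> stream_key: [a1]
    println!("pruned stream key: {:?}", pruned);
}
```

Applied to the delta-join test below (a join b on a.a1 = b.b1), this is why the naive key [a1, b1] collapses to [a1] and the exchange can hash on a.a1 alone.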
@@ -12,7 +12,7 @@
└─StreamSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [count] }
- └─StreamHashJoin { type: Inner, predicate: t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id, output: [t._row_id, t._row_id, t.id, t._row_id, t._row_id, t.id, t.id, t.id, t._row_id, t._row_id, t.id, t._row_id, t._row_id, t.id, t.id] }
+ └─StreamHashJoin { type: Inner, predicate: t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id, output: [t._row_id, t._row_id, t.id, t._row_id, t._row_id, t.id, t.id, t.id, t._row_id, t._row_id, t.id, t._row_id, t._row_id, t.id] }
├─StreamExchange { dist: HashShard(t.id) }
│ └─StreamHashJoin { type: Inner, predicate: t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id, output: [t.id, t.id, t.id, t.id, t._row_id, t._row_id, t._row_id, t._row_id] }
│ ├─StreamExchange { dist: HashShard(t.id) }
410 changes: 203 additions & 207 deletions src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml

Large diffs are not rendered by default.

@@ -33,8 +33,8 @@
/* should generate delta join plan, and stream index scan */
select * from a join b on a.a1 = b.b1 ;
stream_plan: |-
- StreamMaterialize { columns: [a1, a2, b1, b2], stream_key: [a1, b1], pk_columns: [a1, b1], pk_conflict: NoCheck }
- └─StreamExchange { dist: HashShard(a.a1, b.b1) }
+ StreamMaterialize { columns: [a1, a2, b1, b2], stream_key: [a1], pk_columns: [a1], pk_conflict: NoCheck }
+ └─StreamExchange { dist: HashShard(a.a1) }
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = b.b1, output: all }
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
└─StreamTableScan { table: b, columns: [b.b1, b.b2], pk: [b.b1], dist: UpstreamHashShard(b.b1) }
@@ -962,7 +962,7 @@
└─BatchExchange { order: [], dist: HashShard(a.k1) }
└─BatchScan { table: a, columns: [a.k1], distribution: SomeShard }
stream_plan: |-
- StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, a.k1, ak1.k1], pk_columns: [ak1.a._row_id, a.k1, ak1.k1], pk_conflict: NoCheck }
+ StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, ak1.k1], pk_columns: [ak1.a._row_id, ak1.k1], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] }
├─StreamExchange { dist: HashShard(ak1.k1) }
│ └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
@@ -971,9 +971,13 @@
└─StreamTableScan { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
stream_dist_plan: |+
Fragment 0
- StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, a.k1, ak1.k1], pk_columns: [ak1.a._row_id, a.k1, ak1.k1], pk_conflict: NoCheck }
+ StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, ak1.k1], pk_columns: [ak1.a._row_id, ak1.k1], pk_conflict: NoCheck }
├── materialized table: 4294967294
- └── StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] } { left table: 0, right table: 2, left degree table: 1, right degree table: 3 }
+ └── StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] }
+     ├── left table: 0
+     ├── right table: 2
+     ├── left degree table: 1
+     ├── right degree table: 3
├── StreamExchange Hash([0]) from 1
└── StreamHashAgg { group_key: [a.k1], aggs: [count] } { intermediate state table: 5, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 2
@@ -1014,7 +1018,12 @@
├── read pk prefix len hint: 1
└── vnode column idx: 0
- Table 4294967294 { columns: [ v, bv, ak1.a._row_id, ak1.k1, a.k1 ], primary key: [ $2 ASC, $4 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3, 4 ], distribution key: [ 3 ], read pk prefix len hint: 3 }
+ Table 4294967294
+ ├── columns: [ v, bv, ak1.a._row_id, ak1.k1, a.k1 ]
+ ├── primary key: [ $2 ASC, $3 ASC ]
+ ├── value indices: [ 0, 1, 2, 3, 4 ]
+ ├── distribution key: [ 3 ]
+ └── read pk prefix len hint: 2
- id: aggk1_join_Ak1_onk1
before:
@@ -1146,7 +1155,7 @@
└─BatchExchange { order: [], dist: HashShard(b.k1) }
└─BatchScan { table: b, columns: [b.k1], distribution: SomeShard }
stream_plan: |-
- StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1, b.k1], pk_columns: [a.k1, b.k1], pk_conflict: NoCheck }
+ StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1], pk_columns: [a.k1], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: a.k1 = b.k1, output: [count, count, a.k1, b.k1] }
├─StreamHashAgg { group_key: [a.k1], aggs: [count] }
│ └─StreamExchange { dist: HashShard(a.k1) }
@@ -1156,7 +1165,7 @@
└─StreamTableScan { table: b, columns: [b.k1, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
stream_dist_plan: |+
Fragment 0
- StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1, b.k1], pk_columns: [a.k1, b.k1], pk_conflict: NoCheck }
+ StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1], pk_columns: [a.k1], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamHashJoin { type: Inner, predicate: a.k1 = b.k1, output: [count, count, a.k1, b.k1] }
├── left table: 0
@@ -1178,15 +1187,40 @@
├── Upstream
└── BatchPlanNode
- Table 0 { columns: [ a_k1, count ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
+ Table 0
+ ├── columns: [ a_k1, count ]
+ ├── primary key: [ $0 ASC ]
+ ├── value indices: [ 0, 1 ]
+ ├── distribution key: [ 0 ]
+ └── read pk prefix len hint: 1
- Table 1 { columns: [ a_k1, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
+ Table 1
+ ├── columns: [ a_k1, _degree ]
+ ├── primary key: [ $0 ASC ]
+ ├── value indices: [ 1 ]
+ ├── distribution key: [ 0 ]
+ └── read pk prefix len hint: 1
- Table 2 { columns: [ b_k1, count ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
+ Table 2
+ ├── columns: [ b_k1, count ]
+ ├── primary key: [ $0 ASC ]
+ ├── value indices: [ 0, 1 ]
+ ├── distribution key: [ 0 ]
+ └── read pk prefix len hint: 1
- Table 3 { columns: [ b_k1, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
+ Table 3
+ ├── columns: [ b_k1, _degree ]
+ ├── primary key: [ $0 ASC ]
+ ├── value indices: [ 1 ]
+ ├── distribution key: [ 0 ]
+ └── read pk prefix len hint: 1
- Table 4 { columns: [ a_k1, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
+ Table 4
+ ├── columns: [ a_k1, count ]
+ ├── primary key: [ $0 ASC ]
+ ├── value indices: [ 1 ]
+ ├── distribution key: [ 0 ]
+ └── read pk prefix len hint: 1
Table 5
├── columns: [ vnode, _row_id, a_backfill_finished, a_row_count ]
@@ -1196,7 +1230,12 @@
├── read pk prefix len hint: 1
└── vnode column idx: 0
- Table 6 { columns: [ b_k1, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
+ Table 6
+ ├── columns: [ b_k1, count ]
+ ├── primary key: [ $0 ASC ]
+ ├── value indices: [ 1 ]
+ ├── distribution key: [ 0 ]
+ └── read pk prefix len hint: 1
Table 7
├── columns: [ vnode, _row_id, b_backfill_finished, b_row_count ]
@@ -1208,10 +1247,10 @@
Table 4294967294
├── columns: [ num, bv, a.k1, b.k1 ]
- ├── primary key: [ $2 ASC, $3 ASC ]
+ ├── primary key: [ $2 ASC ]
├── value indices: [ 0, 1, 2, 3 ]
├── distribution key: [ 2 ]
- └── read pk prefix len hint: 2
+ └── read pk prefix len hint: 1
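
The materialized table blocks shrink for the same reason. Under the invariant these dumps suggest (an assumption here, not confirmed source): the table's primary key mirrors the operator's stream key, and read pk prefix len hint is simply that key's length, so pruning [a.k1, b.k1] to [a.k1] turns primary key [ $2 ASC, $3 ASC ] into [ $2 ASC ] and the hint from 2 to 1. A tiny sketch under that assumption (hypothetical types, not RisingWave's catalog structs):

```rust
#[derive(Debug)]
struct TableMeta {
    primary_key: Vec<usize>,        // output-column indices, e.g. [2] = a.k1
    read_pk_prefix_len_hint: usize, // key columns a point lookup must bind
}

fn derive_meta(stream_key: &[usize]) -> TableMeta {
    TableMeta {
        primary_key: stream_key.to_vec(),
        read_pk_prefix_len_hint: stream_key.len(),
    }
}

fn main() {
    let before = derive_meta(&[2, 3]); // stream key [a.k1, b.k1] -> hint 2
    let after = derive_meta(&[2]);     // pruned key [a.k1]       -> hint 1
    assert_eq!(before.read_pk_prefix_len_hint, 2);
    assert_eq!(after.read_pk_prefix_len_hint, 1);
    println!("{:?} -> {:?}", before, after);
}
```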
- sql: |
create table t1 (row_id int, uid int, v int, created_at timestamp);
@@ -41,7 +41,7 @@
└─BatchFilter { predicate: IsNotNull(all_sales.salesperson_id) }
└─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.amount], distribution: SomeShard }
stream_plan: |-
- StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), all_sales._row_id(hidden), salesperson.id(hidden), all_sales.amount(hidden), salesperson.id#1(hidden)], stream_key: [salesperson._row_id, all_sales._row_id, salesperson.id, salesperson.id#1, all_sales.amount], pk_columns: [salesperson._row_id, all_sales._row_id, salesperson.id, salesperson.id#1, all_sales.amount], pk_conflict: NoCheck }
+ StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), all_sales._row_id(hidden), salesperson.id(hidden), all_sales.amount(hidden), salesperson.id#1(hidden)], stream_key: [salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount], pk_columns: [salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: salesperson.id IS NOT DISTINCT FROM salesperson.id AND all_sales.amount = max(all_sales.amount), output: [salesperson.name, max(all_sales.amount), all_sales.customer_name, salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount, salesperson.id] }
├─StreamHashJoin { type: Inner, predicate: salesperson.id = all_sales.salesperson_id, output: [salesperson.id, salesperson.name, all_sales.customer_name, all_sales.amount, salesperson._row_id, all_sales._row_id] }
│ ├─StreamExchange { dist: HashShard(salesperson.id) }
@@ -86,7 +86,7 @@
└─BatchFilter { predicate: IsNotNull(all_sales.salesperson_id) }
└─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.customer_name, all_sales.amount], distribution: SomeShard }
stream_plan: |-
- StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales.salesperson_id(hidden)], stream_key: [salesperson._row_id, all_sales.salesperson_id, salesperson.id], pk_columns: [salesperson._row_id, all_sales.salesperson_id, salesperson.id], pk_conflict: NoCheck }
+ StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales.salesperson_id(hidden)], stream_key: [salesperson._row_id, salesperson.id], pk_columns: [salesperson._row_id, salesperson.id], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: salesperson.id IS NOT DISTINCT FROM all_sales.salesperson_id, output: [salesperson.name, all_sales.amount, all_sales.customer_name, salesperson._row_id, salesperson.id, all_sales.salesperson_id] }
├─StreamExchange { dist: HashShard(salesperson.id) }
│ └─StreamTableScan { table: salesperson, columns: [salesperson.id, salesperson.name, salesperson._row_id], pk: [salesperson._row_id], dist: UpstreamHashShard(salesperson._row_id) }
@@ -123,7 +123,7 @@
└─BatchFilter { predicate: IsNotNull(all_sales.salesperson_id) }
└─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.customer_name, all_sales.amount], distribution: SomeShard }
stream_plan: |-
- StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales.salesperson_id(hidden)], stream_key: [salesperson._row_id, all_sales.salesperson_id, salesperson.id], pk_columns: [salesperson._row_id, all_sales.salesperson_id, salesperson.id], pk_conflict: NoCheck }
+ StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales.salesperson_id(hidden)], stream_key: [salesperson._row_id, salesperson.id], pk_columns: [salesperson._row_id, salesperson.id], pk_conflict: NoCheck }
└─StreamHashJoin { type: LeftOuter, predicate: salesperson.id IS NOT DISTINCT FROM all_sales.salesperson_id, output: [salesperson.name, all_sales.amount, all_sales.customer_name, salesperson._row_id, salesperson.id, all_sales.salesperson_id] }
├─StreamExchange { dist: HashShard(salesperson.id) }
│ └─StreamTableScan { table: salesperson, columns: [salesperson.id, salesperson.name, salesperson._row_id], pk: [salesperson._row_id], dist: UpstreamHashShard(salesperson._row_id) }
@@ -164,7 +164,7 @@
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
- StreamMaterialize { columns: [x, arr, unnest, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
+ StreamMaterialize { columns: [x, arr, unnest, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, projected_row_id, arr], pk_columns: [t._row_id, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), t._row_id, t.arr, projected_row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
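
Several of the joins above use IS NOT DISTINCT FROM, SQL's null-safe equality: two NULLs compare equal instead of yielding NULL. As these plans show, it equates its two sides for key-pruning purposes just like plain =, which is why the duplicated key column drops out of the stream key here as well. An illustrative sketch of the null-safe comparison, using Rust's Option equality (not RisingWave code):

```rust
fn is_not_distinct_from(x: Option<i32>, y: Option<i32>) -> bool {
    // Rust's Option equality already has the null-safe behaviour:
    // None == None is true, None == Some(_) is false.
    x == y
}

fn main() {
    assert!(is_not_distinct_from(None, None));       // NULL vs NULL -> true
    assert!(!is_not_distinct_from(None, Some(1)));   // NULL vs 1    -> false
    assert!(is_not_distinct_from(Some(7), Some(7))); // 7 vs 7       -> true
    println!("null-safe equality behaves like Option equality");
}
```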