Skip to content

Commit

Permalink
fix(optimizer): reduce expr tree depth when merge logical operations (#…
Browse files Browse the repository at this point in the history
…17342)

Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: Dylan <[email protected]>
Co-authored-by: xxchan <[email protected]>
  • Loading branch information
3 people authored Jun 21, 2024
1 parent 4a25f01 commit 8a4bd3f
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 98 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

120 changes: 120 additions & 0 deletions e2e_test/streaming/bug_fixes/stack_overflow_17342.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
statement ok
SET streaming_parallelism TO 1;

statement ok
CREATE TABLE t (v int);

# This query used to overflow the stack during optimization as it generated a left-deep tree
# of `OR xx IS NOT NULL` expression in the filter after each full outer join.
skipif madsim
statement ok
CREATE MATERIALIZED VIEW mv AS
SELECT
count(*)
FROM
t
FULL OUTER JOIN t t1 USING (v)
FULL OUTER JOIN t t2 USING (v)
FULL OUTER JOIN t t3 USING (v)
FULL OUTER JOIN t t4 USING (v)
FULL OUTER JOIN t t5 USING (v)
FULL OUTER JOIN t t6 USING (v)
FULL OUTER JOIN t t7 USING (v)
FULL OUTER JOIN t t8 USING (v)
FULL OUTER JOIN t t9 USING (v)
FULL OUTER JOIN t t10 USING (v)
FULL OUTER JOIN t t11 USING (v)
FULL OUTER JOIN t t12 USING (v)
FULL OUTER JOIN t t13 USING (v)
FULL OUTER JOIN t t14 USING (v)
FULL OUTER JOIN t t15 USING (v)
FULL OUTER JOIN t t16 USING (v)
FULL OUTER JOIN t t17 USING (v)
FULL OUTER JOIN t t18 USING (v)
FULL OUTER JOIN t t19 USING (v)
FULL OUTER JOIN t t20 USING (v)
FULL OUTER JOIN t t21 USING (v)
FULL OUTER JOIN t t22 USING (v)
FULL OUTER JOIN t t23 USING (v)
FULL OUTER JOIN t t24 USING (v)
FULL OUTER JOIN t t25 USING (v)
FULL OUTER JOIN t t26 USING (v)
FULL OUTER JOIN t t27 USING (v)
FULL OUTER JOIN t t28 USING (v)
FULL OUTER JOIN t t29 USING (v)
FULL OUTER JOIN t t30 USING (v)
FULL OUTER JOIN t t31 USING (v)
FULL OUTER JOIN t t32 USING (v)
FULL OUTER JOIN t t33 USING (v)
FULL OUTER JOIN t t34 USING (v)
FULL OUTER JOIN t t35 USING (v)
FULL OUTER JOIN t t36 USING (v)
FULL OUTER JOIN t t37 USING (v)
FULL OUTER JOIN t t38 USING (v)
FULL OUTER JOIN t t39 USING (v)
FULL OUTER JOIN t t40 USING (v)
FULL OUTER JOIN t t41 USING (v)
FULL OUTER JOIN t t42 USING (v)
FULL OUTER JOIN t t43 USING (v)
FULL OUTER JOIN t t44 USING (v)
FULL OUTER JOIN t t45 USING (v)
FULL OUTER JOIN t t46 USING (v)
FULL OUTER JOIN t t47 USING (v)
FULL OUTER JOIN t t48 USING (v)
FULL OUTER JOIN t t49 USING (v)
FULL OUTER JOIN t t50 USING (v)
FULL OUTER JOIN t t51 USING (v)
FULL OUTER JOIN t t52 USING (v)
FULL OUTER JOIN t t53 USING (v)
FULL OUTER JOIN t t54 USING (v)
FULL OUTER JOIN t t55 USING (v)
FULL OUTER JOIN t t56 USING (v)
FULL OUTER JOIN t t57 USING (v)
FULL OUTER JOIN t t58 USING (v)
FULL OUTER JOIN t t59 USING (v)
FULL OUTER JOIN t t60 USING (v)
FULL OUTER JOIN t t61 USING (v)
FULL OUTER JOIN t t62 USING (v)
FULL OUTER JOIN t t63 USING (v)
FULL OUTER JOIN t t64 USING (v)
FULL OUTER JOIN t t65 USING (v)
FULL OUTER JOIN t t66 USING (v)
FULL OUTER JOIN t t67 USING (v)
FULL OUTER JOIN t t68 USING (v)
FULL OUTER JOIN t t69 USING (v)
FULL OUTER JOIN t t70 USING (v)
FULL OUTER JOIN t t71 USING (v)
FULL OUTER JOIN t t72 USING (v)
FULL OUTER JOIN t t73 USING (v)
FULL OUTER JOIN t t74 USING (v)
FULL OUTER JOIN t t75 USING (v)
FULL OUTER JOIN t t76 USING (v)
FULL OUTER JOIN t t77 USING (v)
FULL OUTER JOIN t t78 USING (v)
FULL OUTER JOIN t t79 USING (v)
FULL OUTER JOIN t t80 USING (v)
FULL OUTER JOIN t t81 USING (v)
FULL OUTER JOIN t t82 USING (v)
FULL OUTER JOIN t t83 USING (v)
FULL OUTER JOIN t t84 USING (v)
FULL OUTER JOIN t t85 USING (v)
FULL OUTER JOIN t t86 USING (v)
FULL OUTER JOIN t t87 USING (v)
FULL OUTER JOIN t t88 USING (v)
FULL OUTER JOIN t t89 USING (v)
FULL OUTER JOIN t t90 USING (v)
FULL OUTER JOIN t t91 USING (v)
FULL OUTER JOIN t t92 USING (v)
FULL OUTER JOIN t t93 USING (v)
FULL OUTER JOIN t t94 USING (v)
FULL OUTER JOIN t t95 USING (v)
FULL OUTER JOIN t t96 USING (v)
FULL OUTER JOIN t t97 USING (v)
FULL OUTER JOIN t t98 USING (v)
;

statement ok
DROP TABLE t CASCADE;

statement ok
SET streaming_parallelism TO DEFAULT;
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@
select * from t1 where a = 1 or b = 2 or c = 3 or p = 4 or a = 5
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 }
└─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR ((t1.c = 3:Int32) OR (t1.p = 4:Int32))) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 }
└─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) }
└─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) }
Expand All @@ -346,7 +346,7 @@
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: idx4, columns: [idx4.t1._row_id], scan_ranges: [idx4.p = Int32(4)], distribution: SomeShard }
batch_local_plan: |-
BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 }
BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR ((t1.c = 3:Int32) OR (t1.p = 4:Int32))) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 }
└─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] }
└─BatchUnion { all: true }
├─BatchExchange { order: [], dist: Single }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@
StreamMaterialize { columns: [x, i.t._row_id(hidden), i.t._row_id#1(hidden), i.x(hidden), i.t._row_id#2(hidden), i.t._row_id#3(hidden), i.x#1(hidden)], stream_key: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_columns: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x) }
└─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
└─StreamFilter { predicate: (((((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) }
└─StreamFilter { predicate: (((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR (IsNotNull(i.x) OR IsNotNull(i.t._row_id))) OR (IsNotNull(i.t._row_id) OR IsNotNull(i.x))) }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] }
├─StreamShare { id: 4 }
│ └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,20 @@
sql: SELECT auction, price FROM bid WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─BatchFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) }
└─BatchScan { table: bid, columns: [bid.auction, bid.price], distribution: SomeShard }
sink_plan: |-
StreamSink { type: append-only, columns: [auction, price, bid._row_id(hidden)] }
└─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
stream_plan: |-
StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: NoCheck }
└─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└── StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) }
└── StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
├── tables: [ StreamScan: 0 ]
├── Upstream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,21 @@
sql: SELECT auction, price FROM bid WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) }
└─BatchFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) }
└─BatchProject { exprs: [auction, price] }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
stream_plan: |-
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [auction, price, _row_id] }
└─StreamFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) }
└─StreamFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [auction, price, _row_id] }
└── StreamFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) }
└── StreamFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) }
└── StreamRowIdGen { row_id_index: 7 }
└── StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } { tables: [ Source: 0 ] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [Field(bid, 0:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, _row_id] }
└─StreamDynamicFilter { predicate: ($expr1 > $expr2), output_watermarks: [$expr1], output: [event_type, person, auction, bid, $expr1, _row_id], cleaned_by_watermark: true }
├─StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
├─StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
│ └─StreamRowIdGen { row_id_index: 5 }
│ └─StreamProject { exprs: [event_type, person, auction, bid, Proctime as $expr1, _row_id], output_watermarks: [$expr1] }
│ └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] }
Expand All @@ -128,7 +128,7 @@
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamProject { exprs: [Field(bid, 0:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, _row_id] }
└── StreamDynamicFilter { predicate: ($expr1 > $expr2), output_watermarks: [$expr1], output: [event_type, person, auction, bid, $expr1, _row_id], cleaned_by_watermark: true } { tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ] }
├── StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
├── StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
│ └── StreamRowIdGen { row_id_index: 5 }
│ └── StreamProject { exprs: [event_type, person, auction, bid, Proctime as $expr1, _row_id], output_watermarks: [$expr1] }
│ └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } { tables: [ Source: 2 ] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3] }
└─BatchFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└─BatchFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
└─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] }
stream_plan: |-
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, _row_id] }
└─StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└─StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└─StreamRowIdGen { row_id_index: 5 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
└─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
Expand All @@ -92,7 +92,7 @@
Fragment 0
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, _row_id] }
└── StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└── StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└── StreamRowIdGen { row_id_index: 5 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 0 ] }
└── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
Expand Down
Loading

0 comments on commit 8a4bd3f

Please sign in to comment.