Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(optimizer): reduce expr tree depth when merge logical operations #17342

Merged
merged 11 commits into from
Jun 21, 2024
120 changes: 120 additions & 0 deletions e2e_test/streaming/bug_fixes/stack_overflow_17342.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
statement ok
SET streaming_parallelism TO 1;

statement ok
CREATE TABLE t (v int);

# This query used to overflow the stack during optimization as it generated a left-deep tree
# of `OR xx IS NOT NULL` expression in the filter after each full outer join.
skipif madsim
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

link #17347

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean why it doesn't work in madsim?

Copy link
Member Author

@BugenZhao BugenZhao Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if cfg!(madsim) {
f() // madsim does not support stack growth
} else {
stacker::maybe_grow(RED_ZONE, STACK_SIZE, f)
}

thread '<unnamed>' panicked at /risingwave/.cargo/registry/src/index.crates.io-6f17d22bba15001f/stacker-0.1.15/src/lib.rs:415:13:
assertion `left == right` failed
  left: -1
 right: 0
stack backtrace:
   0: rust_begin_unwind
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/panicking.rs:72:14
   2: core::panicking::assert_failed_inner
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/panicking.rs:342:17
   3: core::panicking::assert_failed
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/panicking.rs:297:5
   4: stacker::guess_os_stack_limit
   5: stacker::STACK_LIMIT::__init
             at ./.cargo/registry/src/index.crates.io-6f17d22bba15001f/stacker-0.1.15/src/lib.rs:119:9
   6: stacker::STACK_LIMIT::__getit::{{closure}}
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/sys/common/thread_local/fast_local.rs:99:25
   7: std::sys::common::thread_local::lazy::LazyKeyInner<T>::initialize
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/sys/common/thread_local/mod.rs:54:25
   8: std::sys::common::thread_local::fast_local::Key<T>::try_initialize
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/sys/common/thread_local/fast_local.rs:190:27
   9: stacker::STACK_LIMIT::__getit
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/sys/common/thread_local/fast_local.rs:91:21
  10: std::thread::local::LocalKey<T>::try_with
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/thread/local.rs:269:32
  11: std::thread::local::LocalKey<T>::with
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/thread/local.rs:246:9
  12: stacker::get_stack_limit
             at ./.cargo/registry/src/index.crates.io-6f17d22bba15001f/stacker-0.1.15/src/lib.rs:125:17
  13: stacker::remaining_stack
             at ./.cargo/registry/src/index.crates.io-6f17d22bba15001f/stacker-0.1.15/src/lib.rs:92:5
  14: stacker::maybe_grow
             at ./.cargo/registry/src/index.crates.io-6f17d22bba15001f/stacker-0.1.15/src/lib.rs:50:30
  15: risingwave_common::util::recursive::Tracker::recurse
             at ./src/common/src/util/recursive.rs:95:9

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe fn guess_os_stack_limit() -> Option<usize> {
    let mut attr = std::mem::MaybeUninit::<libc::pthread_attr_t>::uninit();
    assert_eq!(libc::pthread_attr_init(attr.as_mut_ptr()), 0);
    assert_eq!(libc::pthread_getattr_np(libc::pthread_self(),
                                        attr.as_mut_ptr()), 0);
    let mut stackaddr = std::ptr::null_mut();
    let mut stacksize = 0;
    assert_eq!(libc::pthread_attr_getstack(
        attr.as_ptr(), &mut stackaddr, &mut stacksize
    ), 0);
    assert_eq!(libc::pthread_attr_destroy(attr.as_mut_ptr()), 0);
    Some(stackaddr as usize)
}

Some assertion failed. Possibly due to libc overridden. Haven't dived into it since it's not a big deal.

statement ok
CREATE MATERIALIZED VIEW mv AS
SELECT
count(*)
FROM
t
FULL OUTER JOIN t t1 USING (v)
FULL OUTER JOIN t t2 USING (v)
FULL OUTER JOIN t t3 USING (v)
FULL OUTER JOIN t t4 USING (v)
FULL OUTER JOIN t t5 USING (v)
FULL OUTER JOIN t t6 USING (v)
FULL OUTER JOIN t t7 USING (v)
FULL OUTER JOIN t t8 USING (v)
FULL OUTER JOIN t t9 USING (v)
FULL OUTER JOIN t t10 USING (v)
FULL OUTER JOIN t t11 USING (v)
FULL OUTER JOIN t t12 USING (v)
FULL OUTER JOIN t t13 USING (v)
FULL OUTER JOIN t t14 USING (v)
FULL OUTER JOIN t t15 USING (v)
FULL OUTER JOIN t t16 USING (v)
FULL OUTER JOIN t t17 USING (v)
FULL OUTER JOIN t t18 USING (v)
FULL OUTER JOIN t t19 USING (v)
FULL OUTER JOIN t t20 USING (v)
FULL OUTER JOIN t t21 USING (v)
FULL OUTER JOIN t t22 USING (v)
FULL OUTER JOIN t t23 USING (v)
FULL OUTER JOIN t t24 USING (v)
FULL OUTER JOIN t t25 USING (v)
FULL OUTER JOIN t t26 USING (v)
FULL OUTER JOIN t t27 USING (v)
FULL OUTER JOIN t t28 USING (v)
FULL OUTER JOIN t t29 USING (v)
FULL OUTER JOIN t t30 USING (v)
FULL OUTER JOIN t t31 USING (v)
FULL OUTER JOIN t t32 USING (v)
FULL OUTER JOIN t t33 USING (v)
FULL OUTER JOIN t t34 USING (v)
FULL OUTER JOIN t t35 USING (v)
FULL OUTER JOIN t t36 USING (v)
FULL OUTER JOIN t t37 USING (v)
FULL OUTER JOIN t t38 USING (v)
FULL OUTER JOIN t t39 USING (v)
FULL OUTER JOIN t t40 USING (v)
FULL OUTER JOIN t t41 USING (v)
FULL OUTER JOIN t t42 USING (v)
FULL OUTER JOIN t t43 USING (v)
FULL OUTER JOIN t t44 USING (v)
FULL OUTER JOIN t t45 USING (v)
FULL OUTER JOIN t t46 USING (v)
FULL OUTER JOIN t t47 USING (v)
FULL OUTER JOIN t t48 USING (v)
FULL OUTER JOIN t t49 USING (v)
FULL OUTER JOIN t t50 USING (v)
FULL OUTER JOIN t t51 USING (v)
FULL OUTER JOIN t t52 USING (v)
FULL OUTER JOIN t t53 USING (v)
FULL OUTER JOIN t t54 USING (v)
FULL OUTER JOIN t t55 USING (v)
FULL OUTER JOIN t t56 USING (v)
FULL OUTER JOIN t t57 USING (v)
FULL OUTER JOIN t t58 USING (v)
FULL OUTER JOIN t t59 USING (v)
FULL OUTER JOIN t t60 USING (v)
FULL OUTER JOIN t t61 USING (v)
FULL OUTER JOIN t t62 USING (v)
FULL OUTER JOIN t t63 USING (v)
FULL OUTER JOIN t t64 USING (v)
FULL OUTER JOIN t t65 USING (v)
FULL OUTER JOIN t t66 USING (v)
FULL OUTER JOIN t t67 USING (v)
FULL OUTER JOIN t t68 USING (v)
FULL OUTER JOIN t t69 USING (v)
FULL OUTER JOIN t t70 USING (v)
FULL OUTER JOIN t t71 USING (v)
FULL OUTER JOIN t t72 USING (v)
FULL OUTER JOIN t t73 USING (v)
FULL OUTER JOIN t t74 USING (v)
FULL OUTER JOIN t t75 USING (v)
FULL OUTER JOIN t t76 USING (v)
FULL OUTER JOIN t t77 USING (v)
FULL OUTER JOIN t t78 USING (v)
FULL OUTER JOIN t t79 USING (v)
FULL OUTER JOIN t t80 USING (v)
FULL OUTER JOIN t t81 USING (v)
FULL OUTER JOIN t t82 USING (v)
FULL OUTER JOIN t t83 USING (v)
FULL OUTER JOIN t t84 USING (v)
FULL OUTER JOIN t t85 USING (v)
FULL OUTER JOIN t t86 USING (v)
FULL OUTER JOIN t t87 USING (v)
FULL OUTER JOIN t t88 USING (v)
FULL OUTER JOIN t t89 USING (v)
FULL OUTER JOIN t t90 USING (v)
FULL OUTER JOIN t t91 USING (v)
FULL OUTER JOIN t t92 USING (v)
FULL OUTER JOIN t t93 USING (v)
FULL OUTER JOIN t t94 USING (v)
FULL OUTER JOIN t t95 USING (v)
FULL OUTER JOIN t t96 USING (v)
FULL OUTER JOIN t t97 USING (v)
FULL OUTER JOIN t t98 USING (v)
;

statement ok
DROP TABLE t CASCADE;

statement ok
SET streaming_parallelism TO DEFAULT;
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@
select * from t1 where a = 1 or b = 2 or c = 3 or p = 4 or a = 5
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 }
└─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR ((t1.c = 3:Int32) OR (t1.p = 4:Int32))) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 }
└─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) }
└─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) }
Expand All @@ -346,7 +346,7 @@
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: idx4, columns: [idx4.t1._row_id], scan_ranges: [idx4.p = Int32(4)], distribution: SomeShard }
batch_local_plan: |-
BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 }
BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR ((t1.c = 3:Int32) OR (t1.p = 4:Int32))) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 }
└─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] }
└─BatchUnion { all: true }
├─BatchExchange { order: [], dist: Single }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@
StreamMaterialize { columns: [x, i.t._row_id(hidden), i.t._row_id#1(hidden), i.x(hidden), i.t._row_id#2(hidden), i.t._row_id#3(hidden), i.x#1(hidden)], stream_key: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_columns: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x) }
└─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
└─StreamFilter { predicate: (((((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) }
└─StreamFilter { predicate: (((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR (IsNotNull(i.x) OR IsNotNull(i.t._row_id))) OR (IsNotNull(i.t._row_id) OR IsNotNull(i.x))) }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] }
├─StreamShare { id: 4 }
│ └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] }
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,20 @@
sql: SELECT auction, price FROM bid WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─BatchFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) }
└─BatchScan { table: bid, columns: [bid.auction, bid.price], distribution: SomeShard }
sink_plan: |-
StreamSink { type: append-only, columns: [auction, price, bid._row_id(hidden)] }
└─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
stream_plan: |-
StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: NoCheck }
└─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└── StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) }
└── StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
├── tables: [ StreamScan: 0 ]
├── Upstream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,21 @@
sql: SELECT auction, price FROM bid WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) }
└─BatchFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) }
└─BatchProject { exprs: [auction, price] }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
stream_plan: |-
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [auction, price, _row_id] }
└─StreamFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) }
└─StreamFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [auction, price, _row_id] }
└── StreamFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) }
└── StreamFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) }
└── StreamRowIdGen { row_id_index: 7 }
└── StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } { tables: [ Source: 0 ] }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [Field(bid, 0:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, _row_id] }
└─StreamDynamicFilter { predicate: ($expr1 > $expr2), output_watermarks: [$expr1], output: [event_type, person, auction, bid, $expr1, _row_id], cleaned_by_watermark: true }
├─StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
├─StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
│ └─StreamRowIdGen { row_id_index: 5 }
│ └─StreamProject { exprs: [event_type, person, auction, bid, Proctime as $expr1, _row_id], output_watermarks: [$expr1] }
│ └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] }
Expand All @@ -128,7 +128,7 @@
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamProject { exprs: [Field(bid, 0:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, _row_id] }
└── StreamDynamicFilter { predicate: ($expr1 > $expr2), output_watermarks: [$expr1], output: [event_type, person, auction, bid, $expr1, _row_id], cleaned_by_watermark: true } { tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ] }
├── StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
├── StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
│ └── StreamRowIdGen { row_id_index: 5 }
│ └── StreamProject { exprs: [event_type, person, auction, bid, Proctime as $expr1, _row_id], output_watermarks: [$expr1] }
│ └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } { tables: [ Source: 2 ] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3] }
└─BatchFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└─BatchFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
└─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] }
stream_plan: |-
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, _row_id] }
└─StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└─StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└─StreamRowIdGen { row_id_index: 5 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
└─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
Expand All @@ -92,7 +92,7 @@
Fragment 0
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, _row_id] }
└── StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└── StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) }
└── StreamRowIdGen { row_id_index: 5 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 0 ] }
└── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
Expand Down
Loading
Loading