-
Notifications
You must be signed in to change notification settings - Fork 593
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(frontend): plan nested loop temporal join (#19201)
- Loading branch information
Showing
12 changed files
with
647 additions
and
174 deletions.
There are no files selected for viewing
96 changes: 96 additions & 0 deletions
96
e2e_test/streaming/temporal_join/append_only/nested_loop.slt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
statement ok | ||
SET RW_IMPLICIT_FLUSH TO true; | ||
|
||
statement ok | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
|
||
statement ok | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
|
||
statement ok | ||
create materialized view v1 as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2; | ||
|
||
statement ok | ||
create materialized view v2 as select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME(); | ||
|
||
statement ok | ||
insert into stream values(1, 14, 111); | ||
|
||
statement ok | ||
insert into version values(1, 12, 122); | ||
|
||
statement ok | ||
insert into stream values(2, 13, 133); | ||
|
||
statement ok | ||
delete from version; | ||
|
||
query IIII rowsort | ||
select * from v1; | ||
---- | ||
2 13 1 12 | ||
|
||
query IIII rowsort | ||
select * from v2; | ||
---- | ||
2 13 1 12 | ||
|
||
statement ok | ||
insert into version values(2, 10, 102); | ||
|
||
statement ok | ||
insert into stream values(3, 9, 222); | ||
|
||
query IIII rowsort | ||
select * from v1; | ||
---- | ||
2 13 1 12 | ||
|
||
query IIII rowsort | ||
select * from v2; | ||
---- | ||
2 13 1 12 | ||
3 9 2 10 | ||
|
||
statement ok | ||
drop materialized view v1; | ||
|
||
statement ok | ||
drop materialized view v2; | ||
|
||
statement ok | ||
delete from version where id2 = 2; | ||
|
||
statement ok | ||
insert into version values(4, 10, 104); | ||
|
||
statement ok | ||
create materialized view v1 as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 | ||
|
||
statement ok | ||
create materialized view v2 as select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME(); | ||
|
||
query IIII rowsort | ||
select * from v1; | ||
---- | ||
1 14 4 10 | ||
2 13 4 10 | ||
|
||
query IIII rowsort | ||
select * from v2; | ||
---- | ||
1 14 4 10 | ||
2 13 4 10 | ||
3 9 4 10 | ||
|
||
statement ok | ||
drop materialized view v1; | ||
|
||
statement ok | ||
drop materialized view v2; | ||
|
||
statement ok | ||
drop table stream; | ||
|
||
statement ok | ||
drop table version; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
68 changes: 68 additions & 0 deletions
68
src/frontend/planner_test/tests/testdata/input/nested_loop_temporal_join.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
- name: Left join type for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int); | ||
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2 and b1 > b2 | ||
expected_outputs: | ||
- batch_error | ||
- stream_error | ||
- name: Inner join type for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int); | ||
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10; | ||
expected_outputs: | ||
- stream_plan | ||
- name: Cross join for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int); | ||
select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME(); | ||
expected_outputs: | ||
- stream_plan | ||
- name: implicit join with temporal tables | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF PROCTIME() where a1 > a2 and a2 < 10; | ||
expected_outputs: | ||
- stream_plan | ||
- name: Temporal join with Aggregation | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10; | ||
expected_outputs: | ||
- stream_plan | ||
- name: Temporal join type test | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int); | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2; | ||
expected_outputs: | ||
- stream_error | ||
- name: Temporal join append only test | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int); | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2; | ||
expected_outputs: | ||
- stream_error | ||
- name: multi-way temporal join | ||
sql: | | ||
create table stream(k int, a1 int, b1 int) APPEND ONLY; | ||
create table version1(k int, x1 int, y2 int, primary key (k)); | ||
create table version2(k int, x2 int, y2 int, primary key (k)); | ||
select stream.k, x1, x2, a1, b1 | ||
from stream | ||
join version1 FOR SYSTEM_TIME AS OF PROCTIME() on stream.a1 < version1.x1 | ||
join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.b1 > version2.y2 where a1 < 10; | ||
expected_outputs: | ||
- stream_plan | ||
- name: use CTE as temporal join right table. https://github.com/risingwavelabs/risingwave/issues/18703 | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); | ||
with version as (select * from version) select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 > id2; | ||
expected_outputs: | ||
- binder_error |
112 changes: 112 additions & 0 deletions
112
src/frontend/planner_test/tests/testdata/output/nested_loop_temporal_join.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. | ||
- name: Left join type for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int); | ||
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2 and b1 > b2 | ||
batch_error: |- | ||
Not supported: do not support temporal join for batch queries | ||
HINT: please use temporal join in streaming queries | ||
stream_error: |- | ||
Not supported: Temporal join requires an inner join | ||
HINT: Please use an inner join | ||
- name: Inner join type for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int); | ||
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10; | ||
stream_plan: |- | ||
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), version._row_id(hidden)], stream_key: [stream._row_id, version._row_id], pk_columns: [stream._row_id, version._row_id], pk_conflict: NoCheck } | ||
└─StreamExchange { dist: HashShard(stream._row_id, version._row_id) } | ||
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, version._row_id] } | ||
├─StreamExchange { dist: Broadcast } | ||
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version._row_id) } | ||
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version._row_id], stream_scan_type: UpstreamOnly, stream_key: [version._row_id], pk: [_row_id], dist: UpstreamHashShard(version._row_id) } | ||
- name: Cross join for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int); | ||
select id1, a1, id2, a2 from stream cross join version FOR SYSTEM_TIME AS OF PROCTIME(); | ||
stream_plan: |- | ||
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), version._row_id(hidden)], stream_key: [stream._row_id, version._row_id], pk_columns: [stream._row_id, version._row_id], pk_conflict: NoCheck } | ||
└─StreamExchange { dist: HashShard(stream._row_id, version._row_id) } | ||
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: , nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, version._row_id] } | ||
├─StreamExchange { dist: Broadcast } | ||
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version._row_id) } | ||
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version._row_id], stream_scan_type: UpstreamOnly, stream_key: [version._row_id], pk: [_row_id], dist: UpstreamHashShard(version._row_id) } | ||
- name: implicit join with temporal tables | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF PROCTIME() where a1 > a2 and a2 < 10; | ||
stream_plan: |- | ||
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id2], pk_columns: [stream._row_id, id2], pk_conflict: NoCheck } | ||
└─StreamExchange { dist: HashShard(version.id2, stream._row_id) } | ||
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } | ||
├─StreamExchange { dist: Broadcast } | ||
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } | ||
└─StreamTableScan { table: version, columns: [version.id2, version.a2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) } | ||
- name: Temporal join with Aggregation | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 > a2 where a2 < 10; | ||
stream_plan: |- | ||
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } | ||
└─StreamProject { exprs: [sum0(count)] } | ||
└─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } | ||
└─StreamExchange { dist: Single } | ||
└─StreamStatelessSimpleAgg { aggs: [count] } | ||
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 > version.a2) AND (version.a2 < 10:Int32), nested_loop: true, output: [stream._row_id, version.id2] } | ||
├─StreamExchange { dist: Broadcast } | ||
│ └─StreamTableScan { table: stream, columns: [stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } | ||
└─StreamTableScan { table: version, columns: [version.a2, version.id2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) } | ||
- name: Temporal join type test | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int); | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2; | ||
stream_error: |- | ||
Not supported: streaming nested-loop join | ||
HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. | ||
See also: https://docs.risingwave.com/docs/current/sql-pattern-dynamic-filters/ | ||
- name: Temporal join append only test | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int); | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 < a2; | ||
stream_error: |- | ||
Not supported: Nested-loop Temporal join requires the left hash side to be append only | ||
HINT: Please ensure the left hash side is append only | ||
- name: multi-way temporal join | ||
sql: | | ||
create table stream(k int, a1 int, b1 int) APPEND ONLY; | ||
create table version1(k int, x1 int, y2 int, primary key (k)); | ||
create table version2(k int, x2 int, y2 int, primary key (k)); | ||
select stream.k, x1, x2, a1, b1 | ||
from stream | ||
join version1 FOR SYSTEM_TIME AS OF PROCTIME() on stream.a1 < version1.x1 | ||
join version2 FOR SYSTEM_TIME AS OF PROCTIME() on stream.b1 > version2.y2 where a1 < 10; | ||
stream_plan: |- | ||
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], stream_key: [stream._row_id, version1.k, version2.k], pk_columns: [stream._row_id, version1.k, version2.k], pk_conflict: NoCheck } | ||
└─StreamExchange { dist: HashShard(stream._row_id, version1.k, version2.k) } | ||
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.b1 > version2.y2), nested_loop: true, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] } | ||
├─StreamExchange { dist: Broadcast } | ||
│ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: AND (stream.a1 < version1.x1), nested_loop: true, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } | ||
│ ├─StreamExchange { dist: Broadcast } | ||
│ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } | ||
│ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } | ||
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) } | ||
│ └─StreamTableScan { table: version1, columns: [version1.x1, version1.k], stream_scan_type: UpstreamOnly, stream_key: [version1.k], pk: [k], dist: UpstreamHashShard(version1.k) } | ||
└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version2.k) } | ||
└─StreamTableScan { table: version2, columns: [version2.x2, version2.y2, version2.k], stream_scan_type: UpstreamOnly, stream_key: [version2.k], pk: [k], dist: UpstreamHashShard(version2.k) } | ||
- name: use CTE as temporal join right table. https://github.com/risingwavelabs/risingwave/issues/18703 | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); | ||
with version as (select * from version) select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 > id2; | ||
binder_error: 'Bind error: Right table of a temporal join should not be a CTE. It should be a table, index, or materialized view' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.