
perf(stream): concurrently fetch row from storage and refill cache #19629

Draft · wants to merge 28 commits into base: 12-13-add_bench_with_join_type_and_cache_workload
Conversation

@kwannoel (Contributor) commented Dec 1, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Currently we read all rows matching a key, encode them, and store them in an in-memory data structure, which we add to the cache. Then we iterate over each row in that data structure and decode it. This means we incur the IO latency of reading all rows up front, and we also incur the memory cost of buffering every matching record.

Instead, we can iterate over the rows and concurrently refill the cache with them. Then we don't have to wait for IO to finish for ALL rows. We also avoid OOM: while refilling the cache, if we notice the number of rows exceeds some threshold, we can stop refilling.
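To make the change concrete, here's a minimal sketch of the two strategies, assuming a hypothetical `Row` type, a `process_match` stand-in for join handling, and a `Vec` standing in for the per-key cache entry (not the actual executor code):

```rust
use futures::{Stream, StreamExt};

const CACHE_REFILL_THRESHOLD: usize = 30_000; // hypothetical per-key row limit

#[derive(Clone)]
struct Row(Vec<u8>);

fn process_match(_row: Row) {
    // join the probe-side row against this matched build-side row
}

// Old strategy: buffer ALL matching rows, then add them to the cache.
// Full IO latency and full memory cost are paid before any match is handled.
async fn refill_then_process(rows: impl Stream<Item = Row>, cache: &mut Vec<Row>) {
    let all: Vec<Row> = rows.collect().await;
    cache.extend(all.iter().cloned());
    for row in all {
        process_match(row);
    }
}

// New strategy: handle each row as it arrives from storage, refilling the
// cache on the side and abandoning the refill once the threshold is exceeded.
async fn process_while_refilling(
    mut rows: impl Stream<Item = Row> + Unpin,
    cache: &mut Vec<Row>,
) {
    let mut seen = 0usize;
    while let Some(row) = rows.next().await {
        seen += 1;
        if seen <= CACHE_REFILL_THRESHOLD {
            cache.push(row.clone());
        } else {
            cache.clear(); // too many rows: leave this key uncached to bound memory
        }
        process_match(row); // no waiting for the remaining rows' IO
    }
}
```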

Further, if we tolerate inconsistency, we revert to the old strategy of reading all records into the cache first, so that logic is preserved.

This is done for all join types in the hash join executor.

Handling degree table matches and updates

Before this PR we:

  1. Read from the match side degree table when doing cache refill (holding an immutable reference).
  2. Updated the match side degree table when doing cache refill (requiring a mutable reference).

However, in this PR we refill the cache and handle matches concurrently, so 1 and 2 happen concurrently: we would hold an immutable and a mutable reference to the underlying match side degree table at the same time. The borrow checker rejects this.

To solve this, we first read from the match side degree table, keeping all the degrees in memory. Only after that is complete do we start concurrently refilling the cache for the match side and updating the match side degrees. This way, the lifetimes of the mutable and immutable references to the match side degree table never overlap, as sketched below.
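A minimal sketch of the borrow problem and the workaround, using assumed types rather than the real `StateTable` API:

```rust
// Stand-in for the match side degree table; not the real StateTable API.
struct DegreeTable {
    degrees: Vec<u64>,
}

impl DegreeTable {
    fn iter_degrees(&self) -> impl Iterator<Item = u64> + '_ {
        self.degrees.iter().copied()
    }
    fn update_degree(&mut self, idx: usize, delta: u64) {
        self.degrees[idx] += delta;
    }
}

fn handle_matches(table: &mut DegreeTable) {
    // Rejected by the borrow checker: the iterator holds `&table` for the
    // whole loop while `update_degree` needs `&mut table`:
    //
    // for (idx, _degree) in table.iter_degrees().enumerate() {
    //     table.update_degree(idx, 1); // error[E0502]
    // }

    // Step 1: buffer the degrees; the immutable borrow ends at this statement.
    let degrees: Vec<u64> = table.iter_degrees().collect();

    // Step 2: only the mutable borrow is live now, so degrees can be updated
    // while (in the real executor) the cache refill runs concurrently.
    for (idx, _degree) in degrees.iter().enumerate() {
        table.update_degree(idx, 1);
    }
}
```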

Benchmark results

With a 30,000 record limit for each key, here's the comparison (using in-memory statestore, inner join):

| amplification | optimized peak memory (bytes) | unoptimized peak memory (bytes) | optimized runtime (ms) | unoptimized runtime (ms) |
|---------------|-------------------------------|---------------------------------|------------------------|--------------------------|
| 10K           | 1,687,682                     | 1,696,824                       | 7.9825 (-13.791%)      | 10.188                   |
| 20K           | 3,260,364                     | 3,279,096                       | 16.432 (-4.3679%)      | 17.146                   |
| 30K           | 4,832,972                     | 4,861,368                       | 25.263 (-4.0490%)      | 26.360                   |
| 40K           | 4,863,221                     | 6,444,552                       | 29.717 (-16.242%)      | 35.388                   |
| 100K          | 4,863,221                     | 15,942,456                      | 56.554 (-38.688%)      | 92.481                   |
| 200K          | 4,863,221                     | 31,769,735                      | 99.493 (-47.094%)      | 188.06                   |
| 400K          | 4,863,221                     | 63,427,368                      | 187.22 (-51.652%)      | 387.23                   |

You can observe that once amplification exceeds the 30,000-row threshold, we stop refilling the cache, so memory usage is capped and we avoid OOM.

Do also note that because this uses an in-memory statestore, we see significant runtime improvements. I expect similar improvements with the block cache enabled, though perhaps not as pronounced.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see Sqlsmith: SQL feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@kwannoel (Contributor, Author) commented Dec 1, 2024

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite.

@kwannoel kwannoel changed the title refactor perf(stream): concurrently fetch row from storage and refill cache Dec 1, 2024
@kwannoel kwannoel marked this pull request as ready for review December 1, 2024 07:58
@kwannoel kwannoel marked this pull request as draft December 1, 2024 07:59
@kwannoel kwannoel marked this pull request as ready for review December 2, 2024 01:29
@kwannoel kwannoel marked this pull request as draft December 2, 2024 01:30
@fuyufjh (Member) commented Dec 4, 2024

The motivation is similar to #10979 / #12615, but this is much better. Love it!

IIRC, Nexmark has some queries with large join state; we may use it as a benchmark.

@kwannoel (Contributor, Author) commented Dec 6, 2024

Benchmarking this in RWC does not show a decrease in memory utilization. There's probably something wrong with the implementation. Writing a memory benchmark first.

@kwannoel (Contributor, Author) commented Dec 9, 2024

Added benchmark: #19712. Continuing investigation.

@kwannoel kwannoel changed the base branch from main to kwannoel/join-bench December 9, 2024 05:54
@kwannoel (Contributor, Author) commented Dec 9, 2024

Works well, see the PR description. Memory utilization plateaus after the set threshold.

@kwannoel kwannoel force-pushed the kwannoel/join-bench branch from efda035 to 5d72e75 Compare December 9, 2024 07:22
@kwannoel kwannoel force-pushed the 11-29-refactor branch 2 times, most recently from 8766e99 to 3f8c1a6 Compare December 9, 2024 11:24
@kwannoel kwannoel changed the base branch from kwannoel/join-bench to graphite-base/19629 December 10, 2024 01:25
@kwannoel kwannoel force-pushed the graphite-base/19629 branch from f5188c8 to bd82fe3 Compare December 10, 2024 06:42
@kwannoel kwannoel changed the base branch from graphite-base/19629 to main December 10, 2024 06:42
@kwannoel (Contributor, Author) commented:

We don't have any issues supporting this for INNER JOIN; however, joins which require a degree table need more consideration. Carving out this optimization just for INNER JOIN is possible, but it would add a new code branch and more complexity.


Currently we:

  1. Read all the degrees and matched rows into memory.
  2. Iterate through them, updating the degree table for the build side.

So the upfront IO cost is max(read_degree_latency, read_matched_rows_latency). Typically read_matched_rows_latency should be higher, since the degree table holds less data (only key + degree).

Then the total latency for a cache miss will be max(read_degree_latency, read_matched_rows_latency) + the CPU latency of processing the matched rows.

After this PR:
We concurrently read matched rows and handle them. However, we can't do the same for degrees, because we need to iterate over the degrees and update them at the same time, which means holding an immutable and a mutable reference to the StateTable simultaneously.

Currently I work around this by reading all the degrees into memory first. The size shouldn't be too bad, since we only need to store the degrees themselves. This means the upfront IO cost is read_degree_latency, and we then pay the IO cost of read_matched_rows_latency in an amortized way, since we concurrently stream and handle each matched row.

Then the total latency for a cache miss will be read_degree_latency + the combined CPU and IO latency of processing the matched rows. So cache refill might take longer.
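To make the trade-off concrete with made-up numbers: suppose read_degree_latency = 2 ms, read_matched_rows_latency = 10 ms, and CPU processing of the matches takes 5 ms. Before this PR, a cache miss costs max(2, 10) + 5 = 15 ms. After this PR, it costs 2 ms up front plus the 10 ms of row IO interleaved with the 5 ms of CPU, i.e. anywhere from ~12 ms (perfect overlap) to ~17 ms (no overlap).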

However, given that cache misses are expected to be rare, I think this is a reasonable trade-off to make.

Will benchmark this approach against a nexmark query with LEFT OUTER JOIN.

Another approach is to concurrently do a point get and write to the degree state table, since the point get's reference to the degree table is short-lived. However, point get does not have a prefetch interface yet.
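For reference, a hedged sketch of what the point-get approach could look like; `get_degree`/`put_degree` are hypothetical names, not the real state table API:

```rust
use std::collections::HashMap;

// Stand-in for the degree state table.
struct DegreeStateTable {
    degrees: HashMap<u64, u64>,
}

impl DegreeStateTable {
    async fn get_degree(&self, key: u64) -> u64 {
        *self.degrees.get(&key).unwrap_or(&0)
    }
    fn put_degree(&mut self, key: u64, degree: u64) {
        self.degrees.insert(key, degree);
    }
}

async fn bump_degrees(table: &mut DegreeStateTable, matched_keys: &[u64]) {
    for &key in matched_keys {
        let degree = table.get_degree(key).await; // short-lived immutable borrow
        table.put_degree(key, degree + 1); // mutable borrow begins after it ends
    }
}
```

The per-key point get would add round trips compared to a range scan, which is why a prefetch interface would be needed to make it competitive.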


The second issue is tolerating inconsistency. Previously we buffered all matched rows into memory; if there was a mismatch between the number of rows in the degree table and the number in the matched table, we would compare their pks.

I think it's possible to support this with the point get approach. Alternatively, if tolerating inconsistency is enabled, we can always fall back to the old approach of refilling the cache first.

@kwannoel kwannoel requested review from yuhao-su and st1page December 11, 2024 04:28
@kwannoel (Contributor, Author) commented Dec 11, 2024

Will add a micro-bench for left join with a cache-hit workload before proceeding.

@kwannoel kwannoel force-pushed the 12-13-add_bench_with_join_type_and_cache_workload branch from 63656aa to f21aa13 Compare December 27, 2024 06:35