refactor(cdc source): merge cdc heartbeat chunk builder & data chunk builder #19671

Merged 6 commits on Dec 6, 2024

@stdrc stdrc commented Dec 4, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

For historical reasons, we build data chunks and heartbeat chunks for the CDC source with different SourceStreamChunkBuilders. However, it seems that:

  1. If a split is not active, then the only possible msg to produce is a heartbeat msg.
  2. If a split is active, heartbeat msgs should be ignored.

Merging the heartbeat chunk builder with the data chunk builder seems to do no harm.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@stdrc stdrc changed the title merge cdc heartbeat chunk & data chunk refactor(cdc source): merge cdc heartbeat chunk & data chunk Dec 4, 2024
@stdrc stdrc marked this pull request as ready for review December 4, 2024 07:12

StrikeW commented Dec 4, 2024

It seems that:
If the table is not active, then only possible msg to produce is heartbeat msg.
If the table is active, heartbeat msg is few comparing to data rows.
Merging these two types of chunks seems do no harm.

The context is a little vague. May I know more about the context and background?

continue;
}

if batch.iter().all(|msg| msg.is_empty()) {
Member Author

batch.iter().all will short-circuit after seeing the first false, so in normal cases, this check won't cause an additional O(n) cost.
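
The short-circuit behavior described in this comment can be checked with a small sketch. Here `Vec<u8>` is only a stand-in for the real message type, and `all_empty_with_count` is a hypothetical helper, not part of the PR:

```rust
/// Returns the result of `all` together with the number of elements
/// the predicate actually inspected.
fn all_empty_with_count(batch: &[Vec<u8>]) -> (bool, usize) {
    let mut inspected = 0usize;
    let all_empty = batch.iter().all(|msg| {
        inspected += 1;
        msg.is_empty()
    });
    (all_empty, inspected)
}

fn main() {
    // A batch whose first message carries data: `all` stops at the first `false`.
    let data_first: Vec<Vec<u8>> = vec![vec![1], Vec::new(), Vec::new()];
    println!("{:?}", all_empty_with_count(&data_first)); // (false, 1)

    // A heartbeat-only batch: every element has to be inspected.
    let heartbeats_only: Vec<Vec<u8>> = vec![Vec::new(), Vec::new(), Vec::new()];
    println!("{:?}", all_empty_with_count(&heartbeats_only)); // (true, 3)
}
```

So the full O(n) scan only happens for batches that contain no data at all.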

@stdrc stdrc changed the title refactor(cdc source): merge cdc heartbeat chunk & data chunk refactor(cdc source): merge cdc heartbeat chunk builder & data chunk builder Dec 4, 2024

stdrc commented Dec 4, 2024

It seems that:
If the table is not active, then only possible msg to produce is heartbeat msg.
If the table is active, heartbeat msg is few comparing to data rows.
Merging these two types of chunks seems do no harm.

The context is a little vague. May I know more about the context and background?

I just renamed the PR title and changed the description to make it more understandable.

@stdrc stdrc force-pushed the rc/merge-hb-and-data-chunk-for-cdc branch from 4001bfe to 4e29427 Compare December 6, 2024 06:03
@stdrc stdrc changed the base branch from main to rc/rearrange-connector-parser December 6, 2024 06:03
@xiangjinwu xiangjinwu left a comment

The updated implementation does what the description says:

  • If the input batch contains only heartbeat, emit a chunk containing a single row of the last heartbeat msg.
  • If the input batch contains data, emit a chunk skipping heartbeat msgs in it.

Also discussed offline about the rationale:

It is okay for heartbeat and data msgs to be interleaved, except that a heartbeat msg cannot be the last row of a chunk. The old logic of separating them into 2 chunks (and emitting the heartbeat chunk first) was due to a limitation of the old builder API.

With the enhanced builder API, it is unnecessary to shift all heartbeats to the beginning. And to avoid a heartbeat being the last row, heartbeats can simply be ignored when data is present.
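
The two rules in this review can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in the PR: `Msg`, `Row`, and `build_chunk` are hypothetical stand-ins for the real source message, chunk row, and chunk builder.

```rust
/// Simplified stand-in for a source message (hypothetical type).
#[derive(Debug)]
struct Msg {
    key: Option<Vec<u8>>,
    payload: Option<Vec<u8>>,
    offset: String,
}

impl Msg {
    /// Same predicate as the `is_cdc_heartbeat` shown in the review thread.
    fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

/// Simplified stand-in for a chunk row (hypothetical type).
/// A heartbeat row is invisible and only carries the offset.
#[derive(Debug, PartialEq)]
struct Row {
    visible: bool,
    offset: String,
}

fn build_chunk(batch: &[Msg]) -> Vec<Row> {
    if batch.iter().all(|m| m.is_cdc_heartbeat()) {
        // Heartbeat-only batch: emit a single invisible row for the last heartbeat.
        return batch
            .last()
            .map(|m| Row { visible: false, offset: m.offset.clone() })
            .into_iter()
            .collect();
    }
    // Batch with data: skip heartbeats so one can never be the last row.
    batch
        .iter()
        .filter(|m| !m.is_cdc_heartbeat())
        .map(|m| Row { visible: true, offset: m.offset.clone() })
        .collect()
}

fn main() {
    let hb = |o: &str| Msg { key: None, payload: None, offset: o.to_string() };
    let data = |o: &str| Msg { key: Some(vec![1]), payload: Some(vec![2]), offset: o.to_string() };

    println!("{:?}", build_chunk(&[hb("1"), hb("2")]));
    println!("{:?}", build_chunk(&[hb("1"), data("2"), hb("3")]));
}
```

In the first call only the heartbeat with offset "2" survives, as an invisible row; in the second call only the data row is emitted.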


/// Check whether the source message is a CDC heartbeat message.
pub fn is_cdc_heartbeat(&self) -> bool {
    self.key.is_none() && self.payload.is_none()
Contributor

The payload of a postgres heartbeat message is not None after #19385. If you rely on the assumption that payload.is_none(), you should revisit your code logic.

Contributor

The original logic on line 302 also relies on this, and it is the only place that appends to heartbeat_builder instead of builder. Do you mean that #19385 may cause the original logic to misplace heartbeat msgs into builder as well?

@StrikeW StrikeW Dec 6, 2024

For a postgres source, the heartbeat message would not go into the heartbeat_builder, since every heartbeat message now contains the payload now()::varchar, as shown in #19385, and so it can be seen as a normal data chunk.

Member Author

What if this now()::varchar row is at the end of a data chunk? It seems that in this case the snapshot_done flag won't be updated correctly, just like what we saw in the CI failure.

Member Author

And, who will handle the now()::varchar row?

@StrikeW StrikeW Dec 6, 2024

And, who will handle the now()::varchar row?

For a postgres source, it is just the same as any other data chunk; the schema of a cdc source is payload jsonb. I think you don't need to be aware of these details.

@stdrc stdrc Dec 6, 2024

After some diving, now I understand this process. Let me try to summarize.

Why do we need the Debezium heartbeat?

The PostgreSQL instance contains multiple databases and one of them is a high-traffic database. Debezium captures changes in another database that is low-traffic in comparison to the other database. Debezium then cannot confirm the LSN as replication slots work per-database and Debezium is not invoked. As WAL is shared by all databases, the amount used tends to grow until an event is emitted by the database for which Debezium is capturing changes. To overcome this, it is necessary to:

  • Enable periodic heartbeat record generation with the heartbeat.interval.ms connector configuration property.
  • Regularly emit change events from the database for which Debezium is capturing changes.

As the Debezium documentation (quoted above) explains, when Debezium is capturing a low-traffic database A while another high-traffic database B exists in the same PG instance, Debezium cannot "confirm the LSN", because database A may have no traffic at all and hence never triggers Debezium. To resolve this problem, we have to tell Debezium to "make some changes in database A periodically" so that the "LSN confirmation" can be triggered. This "periodic" behavior is controlled by heartbeat.interval.ms.

What does a heartbeat message look like?

After we set heartbeat.interval.ms, RisingWave will periodically receive heartbeat messages from Debezium, which are SourceMessages like this:

SourceMessage {
  key: None,
  payload: None,
  offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn\":120699456,\"txId\":1035,\"ts_usec\":1733494134595487},\"isHeartbeat\":true}",
  split_id: "6",
  meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "", source_ts_ms: 0, msg_type: Heartbeat })
}

The key and payload are both None, no matter what heartbeat.action.query is (we will come to this in the next section). When seeing a heartbeat message, connector::parser::into_chunk_stream_inner should yield a chunk containing an invisible row with the offset in the message, so that SourceExecutor can then do update_in_place and hence update_offset for the DebeziumCdcSplit metadata.

However, as the document says, the LSN (inside the sourceOffset field) will not change, because the "LSN confirmation" process cannot be triggered.

How does Debezium "make some changes"?

So now we need Debezium to "make some changes".

A separate process would then periodically update the table by either inserting a new row or repeatedly updating the same row. PostgreSQL then invokes Debezium, which confirms the latest LSN and allows the database to reclaim the WAL space. This task can be automated by means of the heartbeat.action.query connector configuration property.

As the same document says, the operation used to generate changes is configured via heartbeat.action.query. An example for this config given in the document is INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat'), which just inserts a meaningless row into a pre-created, meaningless "heartbeat table".

In our case, we don't make changes by inserting into some "heartbeat table"; instead, we use pg_logical_emit_message, which "emits a logical decoding message" to "pass generic messages to logical decoding plugins through WAL". This PG function is also mentioned in the Debezium documentation. When this function is called inside database A, a "message event" will be captured by Debezium, which can then do the "LSN confirmation".

With this config set to SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar), we receive the following two SourceMessages periodically:

SourceMessage {
  key: None,
  payload: None,
  offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn\":125080800,\"txId\":1050,\"ts_usec\":1733496265175398},\"isHeartbeat\":true}",
  split_id: "6",
  meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "", source_ts_ms: 0, msg_type: Heartbeat })
}

SourceMessage {
  key: Some([123, 34, 115, 99, 104, 101, 109, 97, 34, 58, 110, 117, 108, 108, 44, 34, 112, 97, 121, 108, 111, 97, 100, 34, 58, 123, 34, 112, 114, 101, 102, 105, 120, 34, 58, 34, 104, 101, 97, 114, 116, 98, 101, 97, 116, 34, 125, 125]),
  payload: Some([123, 34, 115, 99, 104, 101, 109, 97, 34, 58, 110, 117, 108, 108, 44, 34, 112, 97, 121, 108, 111, 97, 100, 34, 58, 123, 34, 111, 112, 34, 58, 34, 109, 34, 44, 34, 116, 115, 95, 109, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 55, 54, 48, 50, 56, 44, 34, 115, 111, 117, 114, 99, 101, 34, 58, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 34, 50, 46, 54, 46, 50, 46, 70, 105, 110, 97, 108, 34, 44, 34, 99, 111, 110, 110, 101, 99, 116, 111, 114, 34, 58, 34, 112, 111, 115, 116, 103, 114, 101, 115, 113, 108, 34, 44, 34, 110, 97, 109, 101, 34, 58, 34, 82, 87, 95, 67, 68, 67, 95, 54, 34, 44, 34, 116, 115, 95, 109, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 44, 34, 115, 110, 97, 112, 115, 104, 111, 116, 34, 58, 34, 102, 97, 108, 115, 101, 34, 44, 34, 100, 98, 34, 58, 34, 112, 111, 115, 116, 103, 114, 101, 115, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 34, 58, 34, 91, 92, 34, 49, 50, 53, 48, 56, 49, 48, 52, 56, 92, 34, 44, 92, 34, 49, 50, 53, 48, 56, 49, 48, 52, 56, 92, 34, 93, 34, 44, 34, 116, 115, 95, 117, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 51, 57, 56, 44, 34, 116, 115, 95, 110, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 51, 57, 56, 48, 48, 48, 44, 34, 115, 99, 104, 101, 109, 97, 34, 58, 34, 34, 44, 34, 116, 97, 98, 108, 101, 34, 58, 34, 34, 44, 34, 116, 120, 73, 100, 34, 58, 110, 117, 108, 108, 44, 34, 108, 115, 110, 34, 58, 49, 50, 53, 48, 56, 49, 48, 52, 56, 44, 34, 120, 109, 105, 110, 34, 58, 110, 117, 108, 108, 125, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 123, 34, 112, 114, 101, 102, 105, 120, 34, 58, 34, 104, 101, 97, 114, 116, 98, 101, 97, 116, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 77, 106, 65, 121, 78, 67, 48, 120, 77, 105, 48, 119, 78, 105, 65, 121, 77, 106, 111, 48, 78, 68, 111, 122, 78, 83, 52, 49, 77, 84, 85, 52, 79, 84, 89, 114, 77, 68, 103, 61, 34, 125, 125, 125]),
  offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn_proc\":125081048,\"messageType\":\"MESSAGE\",\"lsn_commit\":125081048,\"lsn\":125081048,\"ts_usec\":1733496265175398},\"isHeartbeat\":false}",
  split_id: "6",
  meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "message", source_ts_ms: 1733496265175, msg_type: Data })
}
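
The byte arrays in the second SourceMessage are UTF-8 encoded JSON. As a small sketch, the `key` bytes copied from the dump above can be turned back into text with the standard library alone (`bytes_to_text` and `KEY` are illustrative names, not RisingWave identifiers):

```rust
/// The `key` bytes of the second SourceMessage, copied verbatim from the dump.
const KEY: &[u8] = &[
    123, 34, 115, 99, 104, 101, 109, 97, 34, 58, 110, 117, 108, 108, 44, 34,
    112, 97, 121, 108, 111, 97, 100, 34, 58, 123, 34, 112, 114, 101, 102, 105,
    120, 34, 58, 34, 104, 101, 97, 114, 116, 98, 101, 97, 116, 34, 125, 125,
];

/// Decode raw message bytes as UTF-8 text; returns `None` if they are not valid UTF-8.
fn bytes_to_text(bytes: &[u8]) -> Option<String> {
    std::str::from_utf8(bytes).ok().map(str::to_owned)
}

fn main() {
    // Prints: {"schema":null,"payload":{"prefix":"heartbeat"}}
    println!("{}", bytes_to_text(KEY).unwrap());
}
```

The `payload` bytes can be decoded the same way and yield the Debezium event JSON with "op": "m".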

Note that we will receive both the empty-payloaded heartbeat message (with isHeartbeat: true) and the "message event" generated by pg_logical_emit_message, which belongs to the message table. And since Debezium captured a new event (the "message event"), it "confirms the LSN", so the LSN gets updated.

In our current implementation, the first message will be handled as a heartbeat message and emitted as an invisible empty row with "offset"; the second will be parsed as a regular row, like:

| + | {"message": {"content": "MjAyNC0xMi0wNiAyMzo0MjowMC44MDAwMDErMDg=", "prefix": "heartbeat"}, "op": "m", "source": {"connector": "postgresql", "db": "postgres", "lsn": 125280216, "name": "RW_CDC_8", "schema": "", "sequence": "[\"125280216\",\"125280216\"]", "snapshot": "false", "table": "", "ts_ms": 1733499710495, "ts_ns": 1733499710495961000, "ts_us": 1733499710495961, "txId": null, "version": "2.6.2.Final", "xmin": null}, "ts_ms": 1733499721323} | {"sourcePartition":{"server":"RW_CDC_8"},"sourceOffset":{"lsn_proc":125280216,"messageType":"MESSAGE","lsn_commit":125280216,"lsn":125280216,"ts_usec":1733499710495961},"isHeartbeat":false} | message |   | 8 | {"sourcePartition":{"server":"RW_CDC_8"},"sourceOffset":{"lsn_proc":125280216,"messageType":"MESSAGE","lsn_commit":125280216,"lsn":125280216,"ts_usec":1733499710495961},"isHeartbeat":false} |

The content MjAyNC0xMi0wNiAyMzo0MjowMC44MDAwMDErMDg= is the base64-encoded now()::varchar, and the prefix is heartbeat, which we set when calling pg_logical_emit_message. And because the full_table_name is message, which does not match any table we want to replicate, this row will be dismissed later (I didn't dive into this part, so correct me if I'm wrong).
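
To check what the content field of the row above holds, here is a minimal standard-library-only base64 decoder. It is a sketch for illustration: `base64_decode` is a hypothetical helper that simply drops `=` padding wherever it appears, and real code would more likely use an existing base64 crate.

```rust
/// Decode standard-alphabet base64 using only the standard library.
/// `=` padding is skipped; any other character outside the alphabet yields `None`.
fn base64_decode(input: &str) -> Option<Vec<u8>> {
    const ALPHABET: &[u8] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = Vec::new();
    let mut acc: u32 = 0; // bit accumulator
    let mut bits: u32 = 0; // number of valid bits currently held in `acc`
    for b in input.bytes().filter(|&b| b != b'=') {
        let value = ALPHABET.iter().position(|&c| c == b)? as u32;
        acc = (acc << 6) | value;
        bits += 6;
        if bits >= 8 {
            // A full byte is available: emit it and keep the remaining low bits.
            bits -= 8;
            out.push((acc >> bits) as u8);
            acc &= (1u32 << bits) - 1;
        }
    }
    Some(out)
}

fn main() {
    let content = "MjAyNC0xMi0wNiAyMzo0MjowMC44MDAwMDErMDg=";
    let text = String::from_utf8(base64_decode(content).unwrap()).unwrap();
    // Prints: 2024-12-06 23:42:00.800001+08
    println!("{text}");
}
```

The decoded value is a timestamp with a time zone offset, which is what now()::varchar produces.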

Both the offset of heartbeat message and of the second message will take effect when it comes to DebeziumCdcSplit::update_offset.

Now we can conclude that the value of now()::varchar simply does not matter at all, and we can actually change it to an empty string or any other string to avoid confusion. Also, PR #19385, which changes the first argument transactional from false to true, will not significantly change the SourceMessages mentioned above; instead, it just generates additional transaction BEGIN and COMMIT messages before and after the second SourceMessage.

On PR #19385

For the issue that #19385 wanted to resolve (#16697), I guess a better fix would be to set the flush argument to true when calling pg_logical_emit_message, instead of setting transactional to true.

The PG document says:

The flush parameter (default set to false) controls if the message is immediately flushed to WAL or not. flush has no effect with transactional, as the message's WAL record is flushed along with its transaction.

From my understanding, what we actually want is to immediately flush the emitted message to WAL so that Debezium can capture it.

Base automatically changed from rc/rearrange-connector-parser to main December 6, 2024 08:17
@stdrc stdrc force-pushed the rc/merge-hb-and-data-chunk-for-cdc branch from a8d2c05 to 185defc Compare December 6, 2024 08:28
@stdrc stdrc enabled auto-merge December 6, 2024 17:09
@stdrc stdrc added this pull request to the merge queue Dec 6, 2024
Merged via the queue into main with commit fcac311 Dec 6, 2024
29 of 30 checks passed
@stdrc stdrc deleted the rc/merge-hb-and-data-chunk-for-cdc branch December 6, 2024 17:35