Skip to content

Commit

Permalink
only emit heartbeat msg when there's no other msg
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 4, 2024
1 parent 33ed0e3 commit 4001bfe
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 15 deletions.
68 changes: 53 additions & 15 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
.map_ok(|_| ParseResult::Rows)
}

fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
fn append_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
_ = writer.do_insert(|_column| Ok(Datum::None));
}
}
Expand Down Expand Up @@ -719,23 +719,60 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
let batch = batch?;
let batch_len = batch.len();

let mut txn_started_in_last_batch = current_transaction.is_some();
if batch_len == 0 {
continue;
}

if batch.iter().all(|msg| msg.is_empty()) {
// There is no data message in the batch, just emit the latest heartbeat message.
// Note that all messages in `batch` should belong to the same split, so we don't
// have to do a split to heartbeats mapping here.

if let Some(Transaction { id, len }) = &mut current_transaction {
// if there's an ongoing transaction, something may be wrong
tracing::warn!(
id,
len,
"got a batch of empty messages during an ongoing transaction"
);
// for the sake of simplicity, let's force emit the partial transaction chunk
if *len > 0 {
*len = 0; // reset `len` while keeping `id`
yield chunk_builder.take(1); // next chunk will only contain the heartbeat
}
}

// According to the invariant we mentioned at the beginning of the `for batch` loop,
// there should be no data of previous batch in `chunk_builder`.
assert!(chunk_builder.is_empty());

let heartbeat_msg = batch.last().unwrap();
tracing::debug!(
offset = heartbeat_msg.offset,
"emitting a heartbeat message"
);
// TODO(rc): should be `chunk_builder.append_heartbeat` instead, which is simpler
parser.append_empty_row(chunk_builder.row_writer().invisible().with_meta(
MessageMeta {
meta: &heartbeat_msg.meta,
split_id: &heartbeat_msg.split_id,
offset: &heartbeat_msg.offset,
},
));
yield chunk_builder.take(batch_len);

continue;
}

// When we reach here, there is at least one data message in the batch. We should ignore all
// heartbeat messages.

let mut txn_started_in_last_batch = current_transaction.is_some();
let process_time_ms = chrono::Utc::now().timestamp_millis();

for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
tracing::debug!(
offset = msg.offset,
"got a empty message, could be a heartbeat"
);
// Emit an empty invisible row for the heartbeat message.
parser.emit_empty_row(chunk_builder.row_writer().invisible().with_meta(
MessageMeta {
meta: &msg.meta,
split_id: &msg.split_id,
offset: &msg.offset,
},
));
if msg.is_empty() {
// ignore heartbeat messages
continue;
}

Expand Down Expand Up @@ -861,6 +898,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
*len = 0; // reset `len` while keeping `id`
yield chunk_builder.take(batch_len); // use curr batch len as next capacity, just a hint
}
// TODO(rc): we will have better chunk size control later
} else if !chunk_builder.is_empty() {
// not in transaction, yield the chunk now
yield chunk_builder.take(batch_len); // use curr batch len as next capacity, just a hint
Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,11 @@ impl SourceMessage {
meta: SourceMeta::Empty,
}
}

/// Check whether the source message is an empty message.
pub fn is_empty(&self) -> bool {
self.key.is_none() && self.payload.is_none()
}
}

#[derive(Debug, Clone)]
Expand Down

0 comments on commit 4001bfe

Please sign in to comment.