Skip to content

Commit

Permalink
rename take to take_and_reserve
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 6, 2024
1 parent 3244845 commit 185defc
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/parser/chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl SourceStreamChunkBuilder {
/// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for
/// the builders of the next [`StreamChunk`].
#[must_use]
pub fn take(&mut self, next_cap: usize) -> StreamChunk {
pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk {
let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish`
let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
builder.finish()
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
// for the sake of simplicity, let's force emit the partial transaction chunk
if *len > 0 {
*len = 0; // reset `len` while keeping `id`
yield chunk_builder.take(1); // next chunk will only contain the heartbeat
yield chunk_builder.take_and_reserve(1); // next chunk will only contain the heartbeat
}
}

Expand All @@ -323,7 +323,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
offset: &heartbeat_msg.offset,
},
));
yield chunk_builder.take(batch_len);
yield chunk_builder.take_and_reserve(batch_len);

continue;
}
Expand Down Expand Up @@ -420,7 +420,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
current_transaction = None;

if txn_started_in_last_batch {
yield chunk_builder.take(batch_len - (i + 1));
yield chunk_builder.take_and_reserve(batch_len - (i + 1));
txn_started_in_last_batch = false;
}
}
Expand Down Expand Up @@ -460,12 +460,12 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
"transaction is larger than {MAX_TRANSACTION_SIZE} rows, force commit"
);
*len = 0; // reset `len` while keeping `id`
yield chunk_builder.take(batch_len); // use curr batch len as next capacity, just a hint
yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint
}
// TODO(rc): we will have better chunk size control later
} else if !chunk_builder.is_empty() {
// not in transaction, yield the chunk now
yield chunk_builder.take(batch_len); // use curr batch len as next capacity, just a hint
yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ mod tests {
_ => panic!("unexpected parse result: {:?}", res),
}

let output = builder.take(10);
let output = builder.take_and_reserve(10);
assert_eq!(0, output.cardinality());
}

Expand Down

0 comments on commit 185defc

Please sign in to comment.