diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs index 80a7c38b84450..d8f7037a73143 100644 --- a/src/connector/src/parser/chunk_builder.rs +++ b/src/connector/src/parser/chunk_builder.rs @@ -79,7 +79,7 @@ impl SourceStreamChunkBuilder { /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for /// the builders of the next [`StreamChunk`]. #[must_use] - pub fn take(&mut self, next_cap: usize) -> StreamChunk { + pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk { let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish` let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap)); builder.finish() diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index ede6d426a78f3..e6d4d5c5588b1 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -302,7 +302,7 @@ async fn into_chunk_stream_inner( // for the sake of simplicity, let's force emit the partial transaction chunk if *len > 0 { *len = 0; // reset `len` while keeping `id` - yield chunk_builder.take(1); // next chunk will only contain the heartbeat + yield chunk_builder.take_and_reserve(1); // next chunk will only contain the heartbeat } } @@ -323,7 +323,7 @@ async fn into_chunk_stream_inner( offset: &heartbeat_msg.offset, }, )); - yield chunk_builder.take(batch_len); + yield chunk_builder.take_and_reserve(batch_len); continue; } @@ -420,7 +420,7 @@ async fn into_chunk_stream_inner( current_transaction = None; if txn_started_in_last_batch { - yield chunk_builder.take(batch_len - (i + 1)); + yield chunk_builder.take_and_reserve(batch_len - (i + 1)); txn_started_in_last_batch = false; } } @@ -460,12 +460,12 @@ async fn into_chunk_stream_inner( "transaction is larger than {MAX_TRANSACTION_SIZE} rows, force commit" ); *len = 0; // reset `len` while keeping `id` - yield chunk_builder.take(batch_len); // use curr batch len as next capacity, just a hint + yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint } // TODO(rc): we will have better chunk size control later } else if !chunk_builder.is_empty() { // not in transaction, yield the chunk now - yield chunk_builder.take(batch_len); // use curr batch len as next capacity, just a hint + yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint } } } diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 86a1d3d831b6c..1144fdf00fe4b 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -451,7 +451,7 @@ mod tests { _ => panic!("unexpected parse result: {:?}", res), } - let output = builder.take(10); + let output = builder.take_and_reserve(10); assert_eq!(0, output.cardinality()); }