remove old methods of SourceStreamChunkBuilder
Signed-off-by: Richard Chien <[email protected]>
stdrc committed Dec 17, 2024
1 parent: d669d94 · commit: e10c786
Showing 14 changed files with 162 additions and 157 deletions.
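Every file below follows the same migration: `SourceStreamChunkBuilder::with_capacity(descs, cap)` plus a final `builder.finish()` becomes `SourceStreamChunkBuilder::new(descs, SourceCtrlOpts { .. })` (or `SourceCtrlOpts::for_test()` in tests), with output obtained through `finish_current_chunk()` and `consume_ready_chunks()`. For orientation before the per-file diffs, a minimal sketch of the new calling pattern, pieced together from the changes below — `descs` and `payloads` are placeholder names, the parser is assumed to take the Debezium-style `parse_inner(key, payload, writer)` arguments, and error handling is elided:

    let mut builder = SourceStreamChunkBuilder::new(
        descs,
        SourceCtrlOpts {
            chunk_size: 256,  // rows per emitted chunk; 256 was hardcoded in the removed with_capacity()
            split_txn: false, // the value used at every call site in this commit
        },
    );
    for payload in payloads {
        // row_writer() hands the parser a writer targeting the current chunk
        parser.parse_inner(None, payload, builder.row_writer()).await.unwrap();
    }
    builder.finish_current_chunk(); // seal whatever rows are still buffered
    for chunk in builder.consume_ready_chunks() {
        // each item is a completed StreamChunk
    }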
6 changes: 5 additions & 1 deletion src/connector/benches/debezium_json_parser.rs
@@ -22,6 +22,7 @@ use json_common::*;
 use paste::paste;
 use rand::Rng;
 use risingwave_connector::parser::{DebeziumParser, SourceStreamChunkBuilder};
+use risingwave_connector::source::SourceCtrlOpts;

 fn generate_debezium_json_row(rng: &mut impl Rng, change_event: &str) -> String {
     let source = r#"{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null}"#;
@@ -57,7 +58,10 @@ macro_rules! create_debezium_bench_helpers {
                 || (block_on(DebeziumParser::new_for_test(get_descs())).unwrap(), records.clone()),
                 |(mut parser, records)| async move {
                     let mut builder =
-                        SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
+                        SourceStreamChunkBuilder::new(get_descs(), SourceCtrlOpts {
+                            chunk_size: NUM_RECORDS,
+                            split_txn: false,
+                        });
                     for record in records {
                         let writer = builder.row_writer();
                         parser.parse_inner(None, record, writer).await.unwrap();
18 changes: 15 additions & 3 deletions src/connector/benches/json_vs_plain_parser.rs
@@ -23,7 +23,7 @@ use json_common::*;
 use old_json_parser::JsonParser;
 use risingwave_connector::parser::plain_parser::PlainParser;
 use risingwave_connector::parser::{SourceStreamChunkBuilder, SpecificParserConfig};
-use risingwave_connector::source::SourceContext;
+use risingwave_connector::source::{SourceContext, SourceCtrlOpts};

 // The original implementation used to parse JSON prior to #13707.
 mod old_json_parser {
@@ -130,7 +130,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) {
                     (parser, records.clone())
                 },
                 |(mut parser, records)| async move {
-                    let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
+                    let mut builder = SourceStreamChunkBuilder::new(
+                        get_descs(),
+                        SourceCtrlOpts {
+                            chunk_size: NUM_RECORDS,
+                            split_txn: false,
+                        },
+                    );
                     for record in records {
                         let writer = builder.row_writer();
                         parser
@@ -155,7 +161,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) {
                     (parser, records.clone())
                 },
                 |(parser, records)| async move {
-                    let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
+                    let mut builder = SourceStreamChunkBuilder::new(
+                        get_descs(),
+                        SourceCtrlOpts {
+                            chunk_size: NUM_RECORDS,
+                            split_txn: false,
+                        },
+                    );
                     for record in records {
                         let writer = builder.row_writer();
                         parser.parse_inner(record, writer).await.unwrap();
8 changes: 5 additions & 3 deletions src/connector/src/parser/bytes_parser.rs
@@ -45,6 +45,7 @@ impl BytesAccessBuilder {

 #[cfg(test)]
 mod tests {
+    use itertools::Itertools;
     use risingwave_common::array::Op;
     use risingwave_common::row::Row;
     use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};
@@ -54,7 +55,7 @@ mod tests {
         BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
         SourceStreamChunkBuilder, SpecificParserConfig,
     };
-    use crate::source::SourceContext;
+    use crate::source::{SourceContext, SourceCtrlOpts};

     fn get_payload() -> Vec<Vec<u8>> {
         vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
@@ -70,7 +71,7 @@ mod tests {
             .await
             .unwrap();

-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

         for payload in get_payload() {
             let writer = builder.row_writer();
@@ -80,7 +81,8 @@ mod tests {
                 .unwrap();
         }

-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
         let mut rows = chunk.rows();
         {
             let (op, row) = rows.next().unwrap();
34 changes: 22 additions & 12 deletions src/connector/src/parser/canal/simd_json_parser.rs
@@ -141,6 +141,7 @@ mod tests {

     use super::*;
     use crate::parser::SourceStreamChunkBuilder;
+    use crate::source::SourceCtrlOpts;

     #[tokio::test]
     async fn test_data_types() {
@@ -162,12 +163,15 @@ mod tests {
         )
         .unwrap();

-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

-        let writer = builder.row_writer();
-        parser.parse_inner(payload.to_vec(), writer).await.unwrap();
+        parser
+            .parse_inner(payload.to_vec(), builder.row_writer())
+            .await
+            .unwrap();

-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
         let (op, row) = chunk.rows().next().unwrap();
         assert_eq!(op, Op::Insert);
         assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
@@ -233,12 +237,15 @@ mod tests {
         )
         .unwrap();

-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

-        let writer = builder.row_writer();
-        parser.parse_inner(payload.to_vec(), writer).await.unwrap();
+        parser
+            .parse_inner(payload.to_vec(), builder.row_writer())
+            .await
+            .unwrap();

-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();

         let mut rows = chunk.rows();

@@ -287,12 +294,15 @@ mod tests {
         )
         .unwrap();

-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

-        let writer = builder.row_writer();
-        parser.parse_inner(payload.to_vec(), writer).await.unwrap();
+        parser
+            .parse_inner(payload.to_vec(), builder.row_writer())
+            .await
+            .unwrap();

-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();

         let mut rows = chunk.rows();

56 changes: 1 addition & 55 deletions src/connector/src/parser/chunk_builder.rs
@@ -54,27 +54,6 @@ pub struct SourceStreamChunkBuilder {
 }

 impl SourceStreamChunkBuilder {
-    // TODO(): remove
-    pub fn with_capacity(column_descs: Vec<SourceColumnDesc>, cap: usize) -> Self {
-        let builders = column_descs
-            .iter()
-            .map(|desc| desc.data_type.create_array_builder(cap))
-            .collect();
-
-        Self {
-            column_descs,
-            source_ctrl_ops: SourceCtrlOpts {
-                chunk_size: 256,
-                split_txn: false,
-            },
-            builders,
-            op_builder: Vec::with_capacity(cap),
-            vis_builder: BitmapBuilder::with_capacity(cap),
-            ongoing_txn: None,
-            ready_chunks: SmallVec::new(),
-        }
-    }
-
     pub fn new(column_descs: Vec<SourceColumnDesc>, source_ctrl_ops: SourceCtrlOpts) -> Self {
         let (builders, op_builder, vis_builder) =
             Self::create_builders(&column_descs, source_ctrl_ops.chunk_size);
@@ -202,47 +181,14 @@ impl SourceStreamChunkBuilder {
     }

     /// Consumes and returns the ready [`StreamChunk`]s.
-    pub fn consume_ready_chunks(&mut self) -> impl Iterator<Item = StreamChunk> + '_ {
+    pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ {
         self.ready_chunks.drain(..)
     }

-    // TODO(): remove
-    /// Consumes the builder and returns a [`StreamChunk`].
-    pub fn finish(self) -> StreamChunk {
-        StreamChunk::with_visibility(
-            self.op_builder,
-            self.builders
-                .into_iter()
-                .map(|builder| builder.finish().into())
-                .collect(),
-            self.vis_builder.finish(),
-        )
-    }
-
-    // TODO(): remove
-    /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for
-    /// the builders of the next [`StreamChunk`].
-    #[must_use]
-    pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk {
-        let descs = std::mem::take(&mut self.column_descs); // we don't use `descs` in `finish`
-        let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
-        builder.finish()
-    }
-
-    // TODO(): remove
-    pub fn len(&self) -> usize {
-        self.op_builder.len()
-    }
+    fn current_chunk_len(&self) -> usize {
+        self.op_builder.len()
+    }

-    // TODO(): remove
-    pub fn is_empty(&self) -> bool {
-        self.op_builder.is_empty()
-    }
-
     /// Commit a newly-written record by appending `op` and `vis` to the corresponding builders.
     /// This is supposed to be called via the `row_writer` only.
     fn commit_record(&mut self, op: Op, vis: bool) {
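Beyond deleting the deprecated methods, this file tightens the return type of `consume_ready_chunks()` from `impl Iterator` to `impl ExactSizeIterator`, which `ready_chunks.drain(..)` already satisfies; callers can now read off the number of pending chunks without collecting. A hypothetical sketch of what this enables (`builder` stands for any `SourceStreamChunkBuilder` with sealed chunks):

    builder.finish_current_chunk();
    let ready = builder.consume_ready_chunks();
    println!("{} chunk(s) ready", ready.len()); // exact count, courtesy of ExactSizeIterator
    for chunk in ready {
        // chunks are drained from the builder's internal buffer as they are yielded
    }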
13 changes: 9 additions & 4 deletions src/connector/src/parser/csv_parser.rs
@@ -175,12 +175,15 @@ impl ByteStreamSourceParser for CsvParser {

 #[cfg(test)]
 mod tests {
+    use itertools::Itertools;
     use risingwave_common::array::Op;
     use risingwave_common::row::Row;
     use risingwave_common::types::{DataType, ToOwnedDatum};

     use super::*;
     use crate::parser::SourceStreamChunkBuilder;
+    use crate::source::SourceCtrlOpts;

     #[tokio::test]
     async fn test_csv_without_headers() {
         let data = vec![
@@ -204,14 +207,15 @@ mod tests {
             SourceContext::dummy().into(),
         )
         .unwrap();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
         for item in data {
             parser
                 .parse_inner(item.as_bytes().to_vec(), builder.row_writer())
                 .await
                 .unwrap();
         }
-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
         let mut rows = chunk.rows();
         {
             let (op, row) = rows.next().unwrap();
@@ -311,13 +315,14 @@ mod tests {
             SourceContext::dummy().into(),
         )
         .unwrap();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
         for item in data {
             let _ = parser
                 .parse_inner(item.as_bytes().to_vec(), builder.row_writer())
                 .await;
         }
-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
         let mut rows = chunk.rows();
         {
             let (op, row) = rows.next().unwrap();
18 changes: 8 additions & 10 deletions src/connector/src/parser/debezium/avro_parser.rs
@@ -207,7 +207,7 @@ mod tests {

     use super::*;
     use crate::parser::{DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig};
-    use crate::source::{SourceColumnDesc, SourceContext};
+    use crate::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
     use crate::WithOptionsSecResolved;

     const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00";
@@ -225,15 +225,13 @@ mod tests {
         columns: Vec<SourceColumnDesc>,
         payload: Vec<u8>,
     ) -> Vec<(Op, OwnedRow)> {
-        let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2);
-        {
-            let writer = builder.row_writer();
-            parser
-                .parse_inner(None, Some(payload), writer)
-                .await
-                .unwrap();
-        }
-        let chunk = builder.finish();
+        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
+        parser
+            .parse_inner(None, Some(payload), builder.row_writer())
+            .await
+            .unwrap();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
         chunk
             .rows()
             .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
14 changes: 8 additions & 6 deletions src/connector/src/parser/debezium/debezium_parser.rs
@@ -208,6 +208,7 @@ mod tests {
     use std::ops::Deref;
     use std::sync::Arc;

+    use itertools::Itertools;
     use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, CDC_SOURCE_COLUMN_NUM};
     use risingwave_common::row::Row;
     use risingwave_common::types::Timestamptz;
@@ -217,7 +218,7 @@ mod tests {

     use super::*;
     use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl};
-    use crate::source::{ConnectorProperties, DataType};
+    use crate::source::{ConnectorProperties, DataType, SourceCtrlOpts};

     #[tokio::test]
     async fn test_parse_transaction_metadata() {
@@ -249,7 +250,7 @@ mod tests {
         let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
             .await
             .unwrap();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
+        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

         // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
         let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
@@ -258,7 +259,7 @@ mod tests {
             .parse_one_with_txn(
                 None,
                 Some(begin_msg.as_bytes().to_vec()),
-                builder.row_writer(),
+                dummy_builder.row_writer(),
             )
             .await;
         match res {
@@ -271,7 +272,7 @@ mod tests {
             .parse_one_with_txn(
                 None,
                 Some(commit_msg.as_bytes().to_vec()),
-                builder.row_writer(),
+                dummy_builder.row_writer(),
             )
             .await;
         match res {
@@ -321,7 +322,7 @@ mod tests {
         let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
             .await
             .unwrap();
-        let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
+        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

         let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#;

@@ -334,7 +335,8 @@ mod tests {
             .await;
         match res {
             Ok(ParseResult::Rows) => {
-                let chunk = builder.finish();
+                builder.finish_current_chunk();
+                let chunk = builder.consume_ready_chunks().next().unwrap();
                 for (_, row) in chunk.rows() {
                     let commit_ts = row.datum_at(5).unwrap().into_timestamptz();
                     assert_eq!(commit_ts, Timestamptz::from_millis(1695277757000).unwrap());
(Diffs for the remaining 6 changed files are not shown.)
