Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Dec 24, 2024
1 parent b6c29b7 commit d0ef4be
Showing 1 changed file with 81 additions and 41 deletions.
122 changes: 81 additions & 41 deletions src/connector/src/sink/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

use std::collections::{BTreeMap, HashSet};

use anyhow::{anyhow, Context};
use anyhow::anyhow;
use async_trait::async_trait;
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::row::Row;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use simd_json::prelude::ArrayTrait;
Expand Down Expand Up @@ -232,28 +232,27 @@ impl Sink for PostgresSink {
}
}

type ParameterColumnLength = usize;
type ParameterRowLength = usize;

/// Accumulates query parameters into batches before they are sent to Postgres.
///
/// Filled batches are collected in `parameters`; the batch currently being
/// written lives in `current_parameter_buffer` until it is swapped out for a
/// fresh one.
struct ParameterBuffer<'a> {
/// A set of parameters to be inserted/deleted.
/// Each set is a flattened 2d-array, i.e. one `Vec` per batch.
/// NOTE(review): consumers appear to expect each batch length to be a
/// multiple of the column count — confirm against the code that fills it.
parameters: Vec<Vec<Option<ScalarAdapter>>>,
/// the column dimension (fixed).
column_length: usize,
/// schema types to serialize into ScalarAdapter
/// schema types to serialize into `ScalarAdapter`
schema_types: &'a [PgType],
/// estimated number of parameters that can be sent in a single query.
/// Used as the initial capacity when a fresh buffer is allocated.
estimated_parameter_size: usize,
/// current parameter buffer to be filled.
/// Once swapped out for a fresh buffer it is appended to `parameters`.
current_parameter_buffer: Vec<Option<ScalarAdapter>>,
}

impl ParameterBuffer {

impl<'a> ParameterBuffer<'a> {
/// The maximum number of parameters that can be sent in a single query.
/// See: <https://www.postgresql.org/docs/current/limits.html>
const MAX_PARAMETERS: usize = 65535;
/// and <https://github.com/sfackler/rust-postgres/issues/356>
const MAX_PARAMETERS: usize = 32768;

fn new(schema_types: &[PgType], chunk_size: usize) -> Self {
fn new(schema_types: &'a [PgType], chunk_size: usize) -> Self {
let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, chunk_size);
Self {
parameters: vec![],
Expand Down Expand Up @@ -284,7 +283,10 @@ impl ParameterBuffer {
}

fn new_buffer(&mut self) {
let filled_buffer = std::mem::replace(&mut self.current_parameter_buffer, Vec::with_capacity(self.estimated_parameter_size));
let filled_buffer = std::mem::replace(
&mut self.current_parameter_buffer,
Vec::with_capacity(self.estimated_parameter_size),
);
self.parameters.push(filled_buffer);
}

Expand Down Expand Up @@ -388,17 +390,29 @@ impl PostgresSinkWriter {
}
}
}
let (parameters, current_parameter_buffer) = parameter_buffer.into_parts();
self.execute_parameter(&mut transaction, parameters, Op::Insert, current_parameter_buffer).await?;
let (parameters, remaining) = parameter_buffer.into_parts();
Self::execute_parameter(
Op::Insert,
&mut transaction,
&self.schema,
&self.config.table,
&self.pk_indices,
parameters,
remaining,
)
.await?;
transaction.commit().await?;

Ok(())
}

async fn write_batch_non_append_only(&mut self, chunk: StreamChunk) -> Result<()> {
let mut transaction = self.client.transaction().await?;
// 1d flattened array of parameters to be inserted.
let mut insert_parameter_buffer = ParameterBuffer::new(&self.schema_types, chunk.cardinality());
let mut delete_parameter_buffer = ParameterBuffer::new(&self.schema_types, chunk.cardinality());
let mut insert_parameter_buffer =
ParameterBuffer::new(&self.schema_types, chunk.cardinality());
let mut delete_parameter_buffer =
ParameterBuffer::new(&self.schema_types, chunk.cardinality());
// `delete_parameter_buffer` (declared above) holds the flattened parameters to be deleted.
for (op, row) in chunk.rows() {
match op {
Expand All @@ -415,53 +429,76 @@ impl PostgresSinkWriter {
}

let (delete_parameters, delete_remaining_parameter) = delete_parameter_buffer.into_parts();
self.execute_parameter(&mut transaction, delete_parameters, Op::Delete, delete_remaining_parameter).await?;
Self::execute_parameter(
Op::Delete,
&mut transaction,
&self.schema,
&self.config.table,
&self.pk_indices,
delete_parameters,
delete_remaining_parameter,
)
.await?;
let (insert_parameters, insert_remaining_parameter) = insert_parameter_buffer.into_parts();
self.execute_parameter(&mut transaction, insert_parameters, Op::Insert, insert_remaining_parameter).await?;

Self::execute_parameter(
Op::Insert,
&mut transaction,
&self.schema,
&self.config.table,
&self.pk_indices,
insert_parameters,
insert_remaining_parameter,
)
.await?;
transaction.commit().await?;

Ok(())
}

async fn execute_parameter(
&self,
transaction: &mut tokio_postgres::Transaction,
parameters: Vec<Vec<Option<ScalarAdapter>>>,
op: Op,
transaction: &mut tokio_postgres::Transaction<'_>,
schema: &Schema,
table_name: &str,
pk_indices: &[usize],
parameters: Vec<Vec<Option<ScalarAdapter>>>,
remaining_parameter: Vec<Option<ScalarAdapter>>,
) -> Result<()> {
let column_length = self.schema.fields().len();
let column_length = schema.fields().len();
if !parameters.is_empty() {
let parameter_length = parameters[0].len();
let rows_length = parameter_length / column_length;
assert_eq!(parameter_length % column_length, 0, "flattened parameters are unaligned");
let statement = match op {
Op::Insert => {
create_insert_sql(&self.schema, &self.config.table, rows_length)
}
Op::Delete => {
create_delete_sql(&self.schema, &self.config.table, &self.pk_indices, rows_length)
}
assert_eq!(
parameter_length % column_length,
0,
"flattened parameters are unaligned"
);
let statement = match op {
Op::Insert => create_insert_sql(schema, table_name, rows_length),
Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length),
_ => unreachable!(),
};
let statement = transaction.prepare(&statement).await?;
for parameter in parameters {
transaction.query(&statement, &parameter).await.context(format!("error for sql: {}, number_of_parameters: {}", &statement, parameter.len()))?;
transaction.execute_raw(&statement, parameter).await?;
}
}
if !remaining_parameter.is_empty() {
let rows_length = remaining_parameter.len() / column_length;
assert_eq!(remaining_parameter.len() % column_length, 0, "flattened parameters are unaligned");
let statement = match op {
Op::Insert => {
create_insert_sql(&self.schema, &self.config.table, rows_length)
}
Op::Delete => {
create_delete_sql(&self.schema, &self.config.table, &self.pk_indices, rows_length)
}
assert_eq!(
remaining_parameter.len() % column_length,
0,
"flattened parameters are unaligned"
);
let statement = match op {
Op::Insert => create_insert_sql(schema, table_name, rows_length),
Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length),
_ => unreachable!(),
};
let statement = transaction.prepare(&statement).await?;
transaction.query(&statement, &remaining_parameter).await.context(format!("error for sql: {}, number_of_parameters: {}", &statement, remaining_parameter.len()))?;
transaction
.execute_raw(&statement, remaining_parameter)
.await?;
}
Ok(())
}
Expand Down Expand Up @@ -591,6 +628,9 @@ mod tests {
]);
let table_name = "test_table";
let sql = create_delete_sql(&schema, table_name, &[1], 3);
check(sql, expect!["DELETE FROM test_table WHERE (b = $2)OR(b = $3)OR(b = $4)"]);
check(
sql,
expect!["DELETE FROM test_table WHERE (b = $2)OR(b = $3)OR(b = $4)"],
);
}
}

0 comments on commit d0ef4be

Please sign in to comment.