From d0ef4be5ad7f976b42e2386a9c8c54e9558dc88f Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 6 Dec 2024 14:34:06 +0800 Subject: [PATCH] fmt --- src/connector/src/sink/postgres.rs | 122 +++++++++++++++++++---------- 1 file changed, 81 insertions(+), 41 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 2bd40edfd585e..56a286303cf38 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -14,13 +14,13 @@ use std::collections::{BTreeMap, HashSet}; -use anyhow::{anyhow, Context}; +use anyhow::anyhow; use async_trait::async_trait; use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bail; use risingwave_common::catalog::Schema; -use risingwave_common::row::{Row, RowExt}; +use risingwave_common::row::Row; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use simd_json::prelude::ArrayTrait; @@ -232,28 +232,27 @@ impl Sink for PostgresSink { } } -type ParameterColumnLength = usize; -type ParameterRowLength = usize; - struct ParameterBuffer<'a> { /// A set of parameters to be inserted/deleted. /// Each set is a flattened 2d-array. parameters: Vec>>, /// the column dimension (fixed). column_length: usize, - /// schema types to serialize into ScalarAdapter + /// schema types to serialize into `ScalarAdapter` schema_types: &'a [PgType], + /// estimated number of parameters that can be sent in a single query. estimated_parameter_size: usize, + /// current parameter buffer to be filled. current_parameter_buffer: Vec>, } -impl ParameterBuffer { - +impl<'a> ParameterBuffer<'a> { /// The maximum number of parameters that can be sent in a single query. /// See: - const MAX_PARAMETERS: usize = 65535; + /// and + const MAX_PARAMETERS: usize = 32768; - fn new(schema_types: &[PgType], chunk_size: usize) -> Self { + fn new(schema_types: &'a [PgType], chunk_size: usize) -> Self { let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, chunk_size); Self { parameters: vec![], @@ -284,7 +283,10 @@ impl ParameterBuffer { } fn new_buffer(&mut self) { - let filled_buffer = std::mem::replace(&mut self.current_parameter_buffer, Vec::with_capacity(self.estimated_parameter_size)); + let filled_buffer = std::mem::replace( + &mut self.current_parameter_buffer, + Vec::with_capacity(self.estimated_parameter_size), + ); self.parameters.push(filled_buffer); } @@ -388,17 +390,29 @@ impl PostgresSinkWriter { } } } - let (parameters, current_parameter_buffer) = parameter_buffer.into_parts(); - self.execute_parameter(&mut transaction, parameters, Op::Insert, current_parameter_buffer).await?; + let (parameters, remaining) = parameter_buffer.into_parts(); + Self::execute_parameter( + Op::Insert, + &mut transaction, + &self.schema, + &self.config.table, + &self.pk_indices, + parameters, + remaining, + ) + .await?; transaction.commit().await?; + Ok(()) } async fn write_batch_non_append_only(&mut self, chunk: StreamChunk) -> Result<()> { let mut transaction = self.client.transaction().await?; // 1d flattened array of parameters to be inserted. - let mut insert_parameter_buffer = ParameterBuffer::new(&self.schema_types, chunk.cardinality()); - let mut delete_parameter_buffer = ParameterBuffer::new(&self.schema_types, chunk.cardinality()); + let mut insert_parameter_buffer = + ParameterBuffer::new(&self.schema_types, chunk.cardinality()); + let mut delete_parameter_buffer = + ParameterBuffer::new(&self.schema_types, chunk.cardinality()); // 1d flattened array of parameters to be deleted. for (op, row) in chunk.rows() { match op { @@ -415,53 +429,76 @@ impl PostgresSinkWriter { } let (delete_parameters, delete_remaining_parameter) = delete_parameter_buffer.into_parts(); - self.execute_parameter(&mut transaction, delete_parameters, Op::Delete, delete_remaining_parameter).await?; + Self::execute_parameter( + Op::Delete, + &mut transaction, + &self.schema, + &self.config.table, + &self.pk_indices, + delete_parameters, + delete_remaining_parameter, + ) + .await?; let (insert_parameters, insert_remaining_parameter) = insert_parameter_buffer.into_parts(); - self.execute_parameter(&mut transaction, insert_parameters, Op::Insert, insert_remaining_parameter).await?; - + Self::execute_parameter( + Op::Insert, + &mut transaction, + &self.schema, + &self.config.table, + &self.pk_indices, + insert_parameters, + insert_remaining_parameter, + ) + .await?; transaction.commit().await?; + + Ok(()) } async fn execute_parameter( - &self, - transaction: &mut tokio_postgres::Transaction, - parameters: Vec>>, op: Op, + transaction: &mut tokio_postgres::Transaction<'_>, + schema: &Schema, + table_name: &str, + pk_indices: &[usize], + parameters: Vec>>, remaining_parameter: Vec>, ) -> Result<()> { - let column_length = self.schema.fields().len(); + let column_length = schema.fields().len(); if !parameters.is_empty() { let parameter_length = parameters[0].len(); let rows_length = parameter_length / column_length; - assert_eq!(parameter_length % column_length, 0, "flattened parameters are unaligned"); - let statement = match op { - Op::Insert => { - create_insert_sql(&self.schema, &self.config.table, rows_length) - } - Op::Delete => { - create_delete_sql(&self.schema, &self.config.table, &self.pk_indices, rows_length) - } + assert_eq!( + parameter_length % column_length, + 0, + "flattened parameters are unaligned" + ); + let statement = match op { + Op::Insert => create_insert_sql(schema, table_name, rows_length), + Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length), _ => unreachable!(), }; let statement = transaction.prepare(&statement).await?; for parameter in parameters { - transaction.query(&statement, ¶meter).await.context(format!("error for sql: {}, number_of_parameters: {}", &statement, parameter.len()))?; + transaction.execute_raw(&statement, parameter).await?; } } if !remaining_parameter.is_empty() { let rows_length = remaining_parameter.len() / column_length; - assert_eq!(remaining_parameter.len() % column_length, 0, "flattened parameters are unaligned"); - let statement = match op { - Op::Insert => { - create_insert_sql(&self.schema, &self.config.table, rows_length) - } - Op::Delete => { - create_delete_sql(&self.schema, &self.config.table, &self.pk_indices, rows_length) - } + assert_eq!( + remaining_parameter.len() % column_length, + 0, + "flattened parameters are unaligned" + ); + let statement = match op { + Op::Insert => create_insert_sql(schema, table_name, rows_length), + Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length), _ => unreachable!(), }; let statement = transaction.prepare(&statement).await?; - transaction.query(&statement, &remaining_parameter).await.context(format!("error for sql: {}, number_of_parameters: {}", &statement, remaining_parameter.len()))?; + transaction + .execute_raw(&statement, remaining_parameter) + .await?; } Ok(()) } @@ -591,6 +628,9 @@ mod tests { ]); let table_name = "test_table"; let sql = create_delete_sql(&schema, table_name, &[1], 3); - check(sql, expect!["DELETE FROM test_table WHERE (b = $2)OR(b = $3)OR(b = $4)"]); + check( + sql, + expect!["DELETE FROM test_table WHERE (b = $2)OR(b = $3)OR(b = $4)"], + ); } }