feat(connector): structural logging for parsing error (#12659)
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Oct 9, 2023
1 parent fbd54d8 commit b4f357c
Showing 15 changed files with 173 additions and 199 deletions.
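Most of the parser diffs below follow one pattern: apply each row to the chunk writer independently, collect per-row errors instead of bailing on the first one, and report them together at the end (or log them with structured fields). A minimal, self-contained sketch of that pattern follows; the names are made up and a plain `String` stands in for the connector's `RwError`, so treat it as an illustration rather than the real API.

```rust
// A stand-in for applying one row to the stream chunk writer.
fn apply_row(row: &str) -> Result<(), String> {
    row.parse::<i32>()
        .map(drop)
        .map_err(|e| format!("{row:?}: {e}"))
}

fn parse_message(rows: &[&str]) -> Result<(), String> {
    let mut errors = Vec::new();
    for row in rows {
        // Collect instead of bailing out, so one bad row does not discard
        // the rows that did parse.
        if let Err(err) = apply_row(row) {
            errors.push(err);
        }
    }
    if errors.is_empty() {
        Ok(())
    } else {
        // Structured fields (assumes the `tracing` crate, which the connector
        // already depends on) give log processing something to key on.
        tracing::warn!(error_count = errors.len(), ?errors, "failed to parse some rows");
        Err(format!(
            "failed to parse {} row(s) in a single message: {}",
            errors.len(),
            errors.join(", ")
        ))
    }
}

fn main() {
    assert!(parse_message(&["1", "2"]).is_ok());
    assert!(parse_message(&["1", "oops"]).is_err());
}
```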
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
@@ -129,6 +129,7 @@ tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = { workspace = true }
tonic_0_9 = { package = "tonic", version = "0.9" }
tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }
url = "2"
urlencoding = "2"

@@ -140,6 +141,7 @@ criterion = { workspace = true, features = ["async_tokio", "async"] }
prost-types = "0.12"
rand = "0.8"
tempfile = "3"
tracing-test = "0.2"

[build-dependencies]
prost-build = "0.12"
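`tracing-test` is the new dev-dependency exercised by the traced parser test further down in this commit. A minimal sketch of its typical shape, assuming `tracing` and `tracing-test` are available; the test name and log message here are made up:

```rust
#[cfg(test)]
mod tests {
    // `#[traced_test]` installs a log-capturing subscriber for the test and
    // brings a `logs_contain` helper into scope for asserting on output.
    #[test]
    #[tracing_test::traced_test]
    fn parse_errors_are_logged() {
        tracing::warn!("failed to parse row 3");
        assert!(logs_contain("failed to parse"));
    }
}
```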
18 changes: 9 additions & 9 deletions src/connector/src/parser/canal/simd_json_parser.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::error::ErrorCode::{self, ProtocolError};
use risingwave_common::error::{Result, RwError};
use simd_json::{BorrowedValue, Mutable, ValueAccess};
@@ -89,24 +90,23 @@ impl CanalJsonParser {
"'data' is missing for creating event".to_string(),
))
})?;

let mut errors = Vec::new();
let mut guard = None;
for event in events.drain(..) {
let accessor = JsonAccess::new_with_options(event, &JsonParseOptions::CANAL);
match apply_row_operation_on_stream_chunk_writer((op, accessor), &mut writer) {
Ok(this_guard) => guard = Some(this_guard),
Ok(_) => {}
Err(err) => errors.push(err),
}
}
if let Some(guard) = guard {
if !errors.is_empty() {
tracing::error!(?errors, "failed to parse some columns");
}
Ok(guard)

if errors.is_empty() {
Ok(())
} else {
Err(RwError::from(ErrorCode::InternalError(format!(
"failed to parse all columns: {:?}",
errors
"failed to parse {} row(s) in a single canal json message: {}",
errors.len(),
errors.iter().join(", ")
))))
}
}
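The new message is assembled with `Itertools::join`, which formats each collected error via `Display` and concatenates with the separator, so the collected errors never need to be converted to strings first. A standalone illustration with a made-up error type:

```rust
use std::fmt;

use itertools::Itertools;

// `Itertools::join` only needs `Display` on the items, which is why the
// collected errors can be joined directly in the diff above.
struct RowError {
    row: usize,
    reason: &'static str,
}

impl fmt::Display for RowError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "row {}: {}", self.row, self.reason)
    }
}

fn main() {
    let errors = vec![
        RowError { row: 1, reason: "invalid integer" },
        RowError { row: 3, reason: "missing column" },
    ];
    let joined = errors.iter().join(", ");
    assert_eq!(joined, "row 1: invalid integer, row 3: missing column");
}
```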
62 changes: 31 additions & 31 deletions src/connector/src/parser/csv_parser.rs
@@ -12,22 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;

use anyhow::anyhow;
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::{Date, Datum, Decimal, ScalarImpl, Time, Timestamp, Timestamptz};
use risingwave_common::types::{Date, Decimal, Time, Timestamp, Timestamptz};

use super::unified::{AccessError, AccessResult};
use super::{ByteStreamSourceParser, CsvProperties};
use crate::only_parse_payload;
use crate::parser::SourceStreamChunkRowWriter;
use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};

macro_rules! to_rust_type {
macro_rules! parse {
($v:ident, $t:ty) => {
$v.parse::<$t>()
.map_err(|_| anyhow!("failed parse {} from {}", stringify!($t), $v))?
$v.parse::<$t>().map_err(|_| AccessError::TypeError {
expected: stringify!($t).to_owned(),
got: "string".to_owned(),
value: $v.to_string(),
})
};
}

@@ -74,29 +75,26 @@ impl CsvParser {
}

#[inline]
fn parse_string(dtype: &DataType, v: String) -> Result<Datum> {
fn parse_string(dtype: &DataType, v: String) -> AccessResult {
let v = match dtype {
// mysql use tinyint to represent boolean
DataType::Boolean => ScalarImpl::Bool(to_rust_type!(v, i16) != 0),
DataType::Int16 => ScalarImpl::Int16(to_rust_type!(v, i16)),
DataType::Int32 => ScalarImpl::Int32(to_rust_type!(v, i32)),
DataType::Int64 => ScalarImpl::Int64(to_rust_type!(v, i64)),
DataType::Float32 => ScalarImpl::Float32(to_rust_type!(v, f32).into()),
DataType::Float64 => ScalarImpl::Float64(to_rust_type!(v, f64).into()),
DataType::Boolean => (parse!(v, i16)? != 0).into(),
DataType::Int16 => parse!(v, i16)?.into(),
DataType::Int32 => parse!(v, i32)?.into(),
DataType::Int64 => parse!(v, i64)?.into(),
DataType::Float32 => parse!(v, f32)?.into(),
DataType::Float64 => parse!(v, f64)?.into(),
// FIXME: decimal should have more precision than f64
DataType::Decimal => Decimal::from_str(v.as_str())
.map_err(|_| anyhow!("parse decimal from string err {}", v))?
.into(),
DataType::Decimal => parse!(v, Decimal)?.into(),
DataType::Varchar => v.into(),
DataType::Date => ScalarImpl::Date(to_rust_type!(v, Date)),
DataType::Time => ScalarImpl::Time(to_rust_type!(v, Time)),
DataType::Timestamp => ScalarImpl::Timestamp(to_rust_type!(v, Timestamp)),
DataType::Timestamptz => ScalarImpl::Timestamptz(to_rust_type!(v, Timestamptz)),
DataType::Date => parse!(v, Date)?.into(),
DataType::Time => parse!(v, Time)?.into(),
DataType::Timestamp => parse!(v, Timestamp)?.into(),
DataType::Timestamptz => parse!(v, Timestamptz)?.into(),
_ => {
return Err(RwError::from(InternalError(format!(
"CSV data source not support type {}",
dtype
))))
return Err(AccessError::UnsupportedType {
ty: dtype.to_string(),
})
}
};
Ok(Some(v))
@@ -109,12 +107,12 @@ impl CsvParser {
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<()> {
let mut fields = self.read_row(&payload)?;

if let Some(headers) = &mut self.headers {
if headers.is_empty() {
*headers = fields;
// Here we want a row, but got nothing. So it's an error for the `parse_inner` but
// has no bad impact on the system.
return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string())));
// The header row does not output a row, so we return early.
return Ok(());
}
writer.insert(|desc| {
if let Some(i) = headers.iter().position(|name| name == &desc.name) {
@@ -126,7 +124,7 @@ impl CsvParser {
} else {
Ok(None)
}
})
})?;
} else {
fields.reverse();
writer.insert(|desc| {
@@ -138,8 +136,10 @@ impl CsvParser {
} else {
Ok(None)
}
})
})?;
}

Ok(())
}
}
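The header handling above now makes the first record fill the header state and produce no row, reported as success instead of a `ProtocolError`. A simplified sketch of that behavior with hypothetical types (not the connector's real `CsvParser`):

```rust
struct HeaderAwareCsv {
    // `Some(vec![])` means "expect a header row before any data rows".
    headers: Option<Vec<String>>,
}

impl HeaderAwareCsv {
    /// Returns the fields of a data row, or `None` for the header row.
    fn feed(&mut self, record: Vec<String>) -> Option<Vec<String>> {
        match &mut self.headers {
            Some(headers) if headers.is_empty() => {
                *headers = record; // consume the record as the header
                None // not an error: there is simply no row to emit
            }
            _ => Some(record),
        }
    }
}

fn main() {
    let mut p = HeaderAwareCsv { headers: Some(vec![]) };
    assert_eq!(p.feed(vec!["id".into(), "name".into()]), None);
    assert_eq!(
        p.feed(vec!["1".into(), "alice".into()]),
        Some(vec!["1".to_string(), "alice".to_string()])
    );
}
```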

2 changes: 1 addition & 1 deletion src/connector/src/parser/debezium/debezium_parser.rs
@@ -111,7 +111,7 @@ impl DebeziumParser {
if let Ok(transaction_control) = row_op.transaction_control() {
Ok(ParseResult::TransactionControl(transaction_control))
} else {
Err(err)
Err(err)?
}
}
}
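The one-character change from `Err(err)` to `Err(err)?` relies on the `?` operator's implicit `From` conversion, presumably because the enclosing function's error type no longer matches `err`'s. A minimal illustration using standard-library error types:

```rust
use std::error::Error;

fn inner(s: &str) -> Result<i32, std::num::ParseIntError> {
    s.parse()
}

// `Err(err)?` re-raises `err` through `From`, converting it into the caller's
// error type (here ParseIntError -> Box<dyn Error>); a bare `Err(err)` would
// be a type mismatch.
fn outer(s: &str) -> Result<i32, Box<dyn Error>> {
    match inner(s) {
        Ok(v) => Ok(v),
        Err(err) => Err(err)?,
    }
}

fn main() {
    assert_eq!(outer("42").unwrap(), 42);
    assert!(outer("not a number").is_err());
}
```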
2 changes: 1 addition & 1 deletion src/connector/src/parser/debezium/mongo_json_parser.rs
@@ -99,7 +99,7 @@ impl DebeziumMongoJsonParser {

let row_op = DebeziumChangeEvent::with_value(MongoProjection::new(accessor));

apply_row_operation_on_stream_chunk_writer(row_op, &mut writer)
apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
}
}

22 changes: 10 additions & 12 deletions src/connector/src/parser/debezium/simd_json_parser.rs
@@ -447,7 +447,9 @@ mod tests {
assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
}

#[cfg(not(madsim))] // Traced test does not work with madsim
#[tokio::test]
#[tracing_test::traced_test]
async fn test2_debezium_json_parser_overflow() {
let columns = vec![
SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
@@ -478,23 +480,19 @@
r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
values[0], values[1], values[2], values[3], values[4], values[5]
).as_bytes().to_vec();
let e = parser

let res = parser
.parse_inner(None, Some(data), builder.row_writer())
.await
.unwrap_err();
println!("{}", e);
.await;
if i < 5 {
// For other overflow, the parsing succeeds but the type conversion fails
assert!(
e.to_string().contains("AccessError: TypeError"),
"i={i}, actual error: {e}"
);
// The errors are ignored and logged.
res.unwrap();
assert!(logs_contain("Expected type"), "{i}");
} else {
// For f64 overflow, the parsing fails
assert!(
e.to_string().contains("InvalidNumber"),
"i={i}, actual error: {e}"
);
let e = res.unwrap_err();
assert!(e.to_string().contains("InvalidNumber"), "{i}: {e}");
}
}
}
21 changes: 10 additions & 11 deletions src/connector/src/parser/json_parser.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;

use apache_avro::Schema;
use itertools::{Either, Itertools};
use jst::{convert_avro, Context};
use risingwave_common::error::ErrorCode::{self, InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
@@ -111,29 +112,27 @@ impl JsonParser {
let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
let values = if let simd_json::BorrowedValue::Array(arr) = value {
arr
Either::Left(arr.into_iter())
} else {
vec![value]
Either::Right(std::iter::once(value))
};

let mut errors = Vec::new();
let mut guard = None;
for value in values {
let accessor = JsonAccess::new(value);
match apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer) {
Ok(this_guard) => guard = Some(this_guard),
Ok(_) => {}
Err(err) => errors.push(err),
}
}

if let Some(guard) = guard {
if !errors.is_empty() {
tracing::error!(?errors, "failed to parse some columns");
}
Ok(guard)
if errors.is_empty() {
Ok(())
} else {
Err(RwError::from(ErrorCode::InternalError(format!(
"failed to parse all columns: {:?}",
errors
"failed to parse {} row(s) in a single json message: {}",
errors.len(),
errors.iter().join(", ")
))))
}
}
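Returning `Either` of two iterator types lets the non-array case avoid allocating a one-element `Vec`. A standalone sketch of the trick, with plain integers standing in for JSON values:

```rust
use itertools::Either;

// `Either<L, R>` implements `Iterator` when both arms do, so the two branches
// can return different concrete iterator types from the same function.
fn rows(input: Result<Vec<i32>, i32>) -> impl Iterator<Item = i32> {
    match input {
        Ok(arr) => Either::Left(arr.into_iter()),
        Err(single) => Either::Right(std::iter::once(single)),
    }
}

fn main() {
    assert!(rows(Ok(vec![1, 2, 3])).eq([1, 2, 3]));
    assert!(rows(Err(7)).eq([7]));
}
```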
2 changes: 1 addition & 1 deletion src/connector/src/parser/maxwell/maxwell_parser.rs
@@ -61,7 +61,7 @@ impl MaxwellParser {
let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
let row_op = MaxwellChangeEvent::new(payload_accessor);

apply_row_operation_on_stream_chunk_writer(row_op, &mut writer)
apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
}
}
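`map_err(Into::into)` performs the same `From`-based error conversion as `?`, but keeps the whole expression in tail position instead of returning early. A tiny illustration with standard-library types:

```rust
use std::error::Error;

// Convert the error type of a Result via the same From/Into impl that `?`
// uses, while keeping the Result as the expression's value.
fn parse(s: &str) -> Result<i32, Box<dyn Error>> {
    s.parse::<i32>().map_err(Into::into)
}

fn main() {
    assert_eq!(parse("7").unwrap(), 7);
    assert!(parse("seven").is_err());
}
```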

(Remaining file diffs not loaded in this view.)
