Skip to content

Commit

Permalink
tiny update
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh committed Oct 23, 2023
1 parent de09078 commit 00a95ed
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,9 @@ fn recursive_parse_json(fields: &[Datum], full_name_vec: Option<Vec<String>>) ->
ret.insert(key, recursive_parse_json(v.fields(), None));
}
Some(ScalarImpl::Jsonb(v)) => {
if key.is_empty() {
key = "Jsonb".to_string();
}
ret.insert(key, v.take());
}
r#type => panic!("Not yet support ScalarImpl type: {:?}", r#type),
Expand Down Expand Up @@ -384,6 +387,13 @@ pub fn from_protobuf_value(
}
Value::Message(dyn_msg) => {
if dyn_msg.descriptor().full_name() == "google.protobuf.Any" {
// Sanity check
debug_assert!(
dyn_msg.has_field_by_name("type_url") &&
dyn_msg.has_field_by_name("value"),
"`type_url` & `value` must exist in fields of `dyn_msg`"
);

// The message is of type `Any`
let (type_url, payload) = extract_any_info(dyn_msg);

Expand All @@ -408,7 +418,7 @@ pub fn from_protobuf_value(
let f = msg_desc
.clone()
.fields()
.map(|f| f.name().to_string())
.map(|f| f.full_name().to_string())
.collect::<Vec<String>>();

// Decode the payload based on the `msg_desc`
Expand Down Expand Up @@ -495,6 +505,9 @@ fn protobuf_type_mapping(
.collect::<Result<Vec<_>>>()?;
let field_names = m.fields().map(|f| f.name().to_string()).collect_vec();

// Note that this part is useful for actual parsing
// Since RisingWave will parse message to `ScalarImpl::Jsonb`
// Please do NOT modify it
if field_names.len() == 2
&& field_names.contains(&"value".to_string())
&& field_names.contains(&"type_url".to_string())
Expand Down

0 comments on commit 00a95ed

Please sign in to comment.