Skip to content

Commit

Permalink
replace bool/Option indicating schema registry with enum
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Apr 26, 2024
1 parent 2d29508 commit a14d06d
Showing 1 changed file with 34 additions and 21 deletions.
55 changes: 34 additions & 21 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::schema::schema_registry::{
#[derive(Debug)]
pub struct AvroAccessBuilder {
schema: Arc<Schema>,
schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
schema_resolver: AvroHeader,
value: Option<Value>,
}

Expand Down Expand Up @@ -74,30 +74,43 @@ impl AvroAccessBuilder {
) -> ConnectorResult<Value> {
// parse payload to avro value
// if use confluent schema, get writer schema from confluent schema registry
if let Some(resolver) = &self.schema_resolver {
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
let writer_schema = resolver.get(schema_id).await?;
return Ok(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(reader_schema),
)?);
}
let schema = reader_schema;
let mut reader = Reader::with_schema(schema, payload)?;
match reader.next() {
Some(Ok(v)) => Ok(v),
Some(Err(e)) => Err(e)?,
None => bail!("avro parse unexpected eof"),
match &self.schema_resolver {
AvroHeader::Confluent(resolver) => {
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
let writer_schema = resolver.get(schema_id).await?;
Ok(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(reader_schema),
)?)
}
AvroHeader::File => {
let schema = reader_schema;
let mut reader = Reader::with_schema(schema, payload)?;
match reader.next() {
Some(Ok(v)) => Ok(v),
Some(Err(e)) => Err(e)?,
None => bail!("avro parse unexpected eof"),
}
}
}
}
}

#[derive(Debug, Clone)]
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
schema: Arc<Schema>,
key_schema: Option<Arc<Schema>>,
schema_resolver: AvroHeader,
}

#[derive(Debug, Clone)]
enum AvroHeader {
Confluent(Arc<ConfluentSchemaResolver>),
// Glue(...)
File,
// SingleObject,
// Fixed & None,
}

impl AvroParserConfig {
Expand Down Expand Up @@ -138,7 +151,7 @@ impl AvroParserConfig {
} else {
None
},
schema_resolver: Some(Arc::new(resolver)),
schema_resolver: AvroHeader::Confluent(Arc::new(resolver)),
})
} else {
if enable_upsert {
Expand All @@ -151,7 +164,7 @@ impl AvroParserConfig {
Ok(Self {
schema: Arc::new(schema),
key_schema: None,
schema_resolver: None,
schema_resolver: AvroHeader::File,
})
}
}
Expand Down

0 comments on commit a14d06d

Please sign in to comment.