diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 1d9948081638a..cffbfabbef8fb 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -36,7 +36,7 @@ use crate::schema::schema_registry::{ #[derive(Debug)] pub struct AvroAccessBuilder { schema: Arc, - schema_resolver: Option>, + schema_resolver: AvroHeader, value: Option, } @@ -74,30 +74,43 @@ impl AvroAccessBuilder { ) -> ConnectorResult { // parse payload to avro value // if use confluent schema, get writer schema from confluent schema registry - if let Some(resolver) = &self.schema_resolver { - let (schema_id, mut raw_payload) = extract_schema_id(payload)?; - let writer_schema = resolver.get(schema_id).await?; - return Ok(from_avro_datum( - writer_schema.as_ref(), - &mut raw_payload, - Some(reader_schema), - )?); - } - let schema = reader_schema; - let mut reader = Reader::with_schema(schema, payload)?; - match reader.next() { - Some(Ok(v)) => Ok(v), - Some(Err(e)) => Err(e)?, - None => bail!("avro parse unexpected eof"), + match &self.schema_resolver { + AvroHeader::Confluent(resolver) => { + let (schema_id, mut raw_payload) = extract_schema_id(payload)?; + let writer_schema = resolver.get(schema_id).await?; + Ok(from_avro_datum( + writer_schema.as_ref(), + &mut raw_payload, + Some(reader_schema), + )?) + } + AvroHeader::File => { + let schema = reader_schema; + let mut reader = Reader::with_schema(schema, payload)?; + match reader.next() { + Some(Ok(v)) => Ok(v), + Some(Err(e)) => Err(e)?, + None => bail!("avro parse unexpected eof"), + } + } } } } #[derive(Debug, Clone)] pub struct AvroParserConfig { - pub schema: Arc, - pub key_schema: Option>, - pub schema_resolver: Option>, + schema: Arc, + key_schema: Option>, + schema_resolver: AvroHeader, +} + +#[derive(Debug, Clone)] +enum AvroHeader { + Confluent(Arc), + // Glue(...) + File, + // SingleObject, + // Fixed & None, } impl AvroParserConfig { @@ -138,7 +151,7 @@ impl AvroParserConfig { } else { None }, - schema_resolver: Some(Arc::new(resolver)), + schema_resolver: AvroHeader::Confluent(Arc::new(resolver)), }) } else { if enable_upsert { @@ -151,7 +164,7 @@ impl AvroParserConfig { Ok(Self { schema: Arc::new(schema), key_schema: None, - schema_resolver: None, + schema_resolver: AvroHeader::File, }) } }