Skip to content

Commit

Permalink
AVRO-4031: [Rust] builder header (#3096)
Browse files Browse the repository at this point in the history
* AVRO-4031: [Rust] allow setting has_header in builder

* AVRO-4031: [Rust] add test for appending with multiple writers

* AVRO-4031: [Rust] Prefix the new IT test name with `avro_4031_`

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

---------

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
  • Loading branch information
MarcoLugo and martin-g authored Aug 16, 2024
1 parent 36a53f4 commit 14361c0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 16 deletions.
6 changes: 3 additions & 3 deletions lang/rust/avro/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct Writer<'a, W> {
num_values: usize,
#[builder(default = generate_sync_marker())]
marker: [u8; 16],
#[builder(default = false, setter(skip))]
#[builder(default = false)]
has_header: bool,
#[builder(default)]
user_metadata: HashMap<String, Value>,
Expand Down Expand Up @@ -114,8 +114,8 @@ impl<'a, W: Write> Writer<'a, W> {
.writer(writer)
.codec(codec)
.marker(marker)
.has_header(true)
.build();
w.has_header = true;
w.resolved_schema = ResolvedSchema::try_from(schema).ok();
w
}
Expand All @@ -134,8 +134,8 @@ impl<'a, W: Write> Writer<'a, W> {
.writer(writer)
.codec(codec)
.marker(marker)
.has_header(true)
.build();
w.has_header = true;
w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
w
}
Expand Down
48 changes: 35 additions & 13 deletions lang/rust/avro/tests/append_to_existing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,17 @@ use apache_avro::{
};
use apache_avro_test_helper::TestResult;

const SCHEMA: &str = r#"{
"type": "record",
"name": "append_to_existing_file",
"fields": [
{"name": "a", "type": "int"}
]
}"#;

#[test]
fn avro_3630_append_to_an_existing_file() -> TestResult {
let schema_str = r#"
{
"type": "record",
"name": "append_to_existing_file",
"fields": [
{"name": "a", "type": "int"}
]
}
"#;

let schema = Schema::parse_str(schema_str).expect("Cannot parse the schema");
let schema = Schema::parse_str(SCHEMA).expect("Cannot parse the schema");

let bytes = get_avro_bytes(&schema);

Expand All @@ -51,13 +49,37 @@ fn avro_3630_append_to_an_existing_file() -> TestResult {
let reader = Reader::new(&*new_bytes).expect("Cannot read the new bytes");
let mut i = 1;
for value in reader {
check(value, i);
check(&value, i);
i += 1
}

Ok(())
}

#[test]
fn avro_4031_append_to_file_using_multiple_writers() -> TestResult {
let schema = Schema::parse_str(SCHEMA).expect("Cannot parse the schema");

let mut first_writer = Writer::builder().schema(&schema).writer(Vec::new()).build();
first_writer.append(create_datum(&schema, -42))?;
let mut resulting_bytes = first_writer.into_inner()?;
let first_marker = read_marker(&resulting_bytes);

let mut second_writer = Writer::builder()
.schema(&schema)
.has_header(true)
.marker(first_marker)
.writer(Vec::new())
.build();
second_writer.append(create_datum(&schema, 42))?;
resulting_bytes.append(&mut second_writer.into_inner()?);

let values: Vec<_> = Reader::new(&resulting_bytes[..])?.collect();
check(&values[0], -42);
check(&values[1], 42);
Ok(())
}

/// Simulates reading from a pre-existing .avro file and returns its bytes
fn get_avro_bytes(schema: &Schema) -> Vec<u8> {
let mut writer = Writer::new(schema, Vec::new());
Expand All @@ -75,7 +97,7 @@ fn create_datum(schema: &Schema, value: i32) -> Record {
}

/// Checks the read values
fn check(value: AvroResult<Value>, expected: i32) {
fn check(value: &AvroResult<Value>, expected: i32) {
match value {
Ok(value) => match value {
Value::Record(fields) => match &fields[0] {
Expand Down

0 comments on commit 14361c0

Please sign in to comment.