feat: support append data file and add e2e test (#9)
* support append data file and add e2e test

* fix typos

* refine append action

* fix cargo sort

* add consistency check for partition value

* generate unique snapshot id

* avoid setting snapshot id for v2

* refine test

* fix unit test

* export ports

* fix None case for parent_snapshot_id

* fix parquet schema check

* refactor append action of transaction

* refine

* refine e2e test

* refine commit uuid

* make file format field uppercase in manifest

---------

Co-authored-by: ZENOTME <[email protected]>
ZENOTME authored Nov 20, 2024
1 parent a717ef9 commit 85076c7
Show file tree
Hide file tree
Showing 16 changed files with 1,136 additions and 46 deletions.
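
The heart of the change is a fast-append transaction action: data files are first written out (here via a parquet data file writer), then attached to a new snapshot and committed through the catalog. A condensed sketch of that flow, drawn from the e2e test below (imports and error handling elided; names are illustrative):

// Sketch: append already-written data files to a table and commit a new
// snapshot. Assumes iceberg::transaction::Transaction and friends are in scope.
async fn append_files(
    table: &Table,              // iceberg::table::Table
    catalog: &impl Catalog,     // any Catalog implementation, e.g. RestCatalog
    data_files: Vec<DataFile>,  // output of DataFileWriter::close
) -> Table {
    let tx = Transaction::new(table);
    let mut append_action = tx.fast_append(None, vec![]).unwrap();
    append_action.add_data_files(data_files).unwrap();
    let tx = append_action.apply().await.unwrap();
    tx.commit(catalog).await.unwrap()
}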
11 changes: 6 additions & 5 deletions Cargo.toml
@@ -18,11 +18,12 @@
[workspace]
resolver = "2"
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
"crates/catalog/*",
"crates/e2e_test",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
]
exclude = ["bindings/python"]

2 changes: 1 addition & 1 deletion crates/catalog/rest/src/catalog.rs
@@ -1376,7 +1376,7 @@ mod tests {
        .with_schema_id(0)
        .with_summary(Summary {
            operation: Operation::Append,
-           other: HashMap::from_iter([
+           additional_properties: HashMap::from_iter([
                ("spark.app.id", "local-1646787004168"),
                ("added-data-files", "1"),
                ("added-records", "1"),
37 changes: 37 additions & 0 deletions crates/e2e_test/Cargo.toml
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iceberg-e2e_test"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
rust-version = { workspace = true }

[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
log = { workspace = true }
parquet = { workspace = true }
port_scanner = { workspace = true }
tokio = { workspace = true }
74 changes: 74 additions & 0 deletions crates/e2e_test/src/lib.rs
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};
use port_scanner::scan_port_addr;
use tokio::time::sleep;

const REST_CATALOG_PORT: u16 = 8181;

pub struct TestFixture {
    pub _docker_compose: DockerCompose,
    pub rest_catalog: RestCatalog,
}

pub async fn set_test_fixture(func: &str) -> TestFixture {
    set_up();
    let docker_compose = DockerCompose::new(
        normalize_test_name(format!("{}_{func}", module_path!())),
        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
    );

    // Start docker compose
    docker_compose.run();

    let rest_catalog_ip = docker_compose.get_container_ip("rest");

    // Poll until the REST catalog port is open.
    let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
    loop {
        if !scan_port_addr(&read_port) {
            log::info!("Waiting 1s for rest catalog to be ready...");
            sleep(std::time::Duration::from_millis(1000)).await;
        } else {
            break;
        }
    }

    let container_ip = docker_compose.get_container_ip("minio");
    let read_port = format!("{}:{}", container_ip, 9000);

    let config = RestCatalogConfig::builder()
        .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
        .props(HashMap::from([
            (S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
            (S3_REGION.to_string(), "us-east-1".to_string()),
        ]))
        .build();
    let rest_catalog = RestCatalog::new(config);

    TestFixture {
        _docker_compose: docker_compose,
        rest_catalog,
    }
}
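
For reference, a test consumes this fixture roughly like so (a hypothetical minimal example, not part of the commit; it assumes `use iceberg::Catalog;` is in scope and a fresh, empty warehouse):

// Hypothetical smoke test using the fixture above. Holding `fixture`
// keeps `_docker_compose` alive for the duration of the test.
#[tokio::test]
async fn rest_catalog_smoke_test() {
    let fixture = set_test_fixture("rest_catalog_smoke_test").await;
    // Listing root namespaces proves the REST catalog answers requests.
    let namespaces = fixture.rest_catalog.list_namespaces(None).await.unwrap();
    assert!(namespaces.is_empty());
}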
62 changes: 62 additions & 0 deletions crates/e2e_test/testdata/docker-compose.yaml
@@ -0,0 +1,62 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version: '3.8'

services:
  rest:
    image: tabulario/iceberg-rest:0.10.0
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
      - CATALOG_WAREHOUSE=s3://icebergdata/demo
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
    depends_on:
      - minio
    links:
      - minio:icebergdata.minio
    ports:
      - 8181:8181
    expose:
      - 8181

  minio:
    image: minio/minio:latest
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    ports:
      - 9001:9001
    expose:
      - 9001
      - 9000
    command: [ "server", "/data", "--console-address", ":9001" ]

  mc:
    depends_on:
      - minio
    image: minio/mc:latest
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/icebergdata;
      /usr/bin/mc mb minio/icebergdata;
      /usr/bin/mc policy set public minio/icebergdata;
      tail -f /dev/null
      "
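
The mc service is bootstrap-only: it waits until MinIO accepts connections, recreates the icebergdata bucket that CATALOG_WAREHOUSE points at, makes it publicly readable, and then idles on tail -f /dev/null so compose keeps the container alive.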
178 changes: 178 additions & 0 deletions crates/e2e_test/tests/append_data_file_test.rs
@@ -0,0 +1,178 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Integration tests for the REST catalog.
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
use iceberg_e2e_test::set_test_fixture;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::file::properties::WriterProperties;

#[tokio::test]
async fn test_append_data_file() {
    let fixture = set_test_fixture("test_create_table").await;

    let ns = Namespace::with_properties(
        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
        HashMap::from([
            ("owner".to_string(), "ray".to_string()),
            ("community".to_string(), "apache".to_string()),
        ]),
    );

    fixture
        .rest_catalog
        .create_namespace(ns.name(), ns.properties().clone())
        .await
        .unwrap();

    let schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    let table_creation = TableCreation::builder()
        .name("t1".to_string())
        .schema(schema.clone())
        .build();

    let table = fixture
        .rest_catalog
        .create_table(ns.name(), table_creation)
        .await
        .unwrap();

    // Create the writer and write the data
    let schema: Arc<arrow_schema::Schema> = Arc::new(
        table
            .metadata()
            .current_schema()
            .as_ref()
            .try_into()
            .unwrap(),
    );
    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
    let file_name_generator = DefaultFileNameGenerator::new(
        "test".to_string(),
        None,
        iceberg::spec::DataFileFormat::Parquet,
    );
    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
        table.file_io().clone(),
        location_generator.clone(),
        file_name_generator.clone(),
    );
    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
    let mut data_file_writer = data_file_writer_builder
        .build(DataFileWriterConfig::new(None))
        .await
        .unwrap();
    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![
        Arc::new(col1) as ArrayRef,
        Arc::new(col2) as ArrayRef,
        Arc::new(col3) as ArrayRef,
    ])
    .unwrap();
    data_file_writer.write(batch.clone()).await.unwrap();
    let data_file = data_file_writer.close().await.unwrap();

    // Check that the parquet writer embedded the Iceberg field ids (1, 2, 3)
    // in the written file's schema metadata.
    let content = table
        .file_io()
        .new_input(data_file[0].file_path())
        .unwrap()
        .read()
        .await
        .unwrap();
    let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load(
        &content,
        ArrowReaderOptions::default(),
    )
    .unwrap();
    let field_ids: Vec<i32> = parquet_reader
        .parquet_schema()
        .columns()
        .iter()
        .map(|col| col.self_type().get_basic_info().id())
        .collect();
    assert_eq!(field_ids, vec![1, 2, 3]);

    // commit result
    let tx = Transaction::new(&table);
    let mut append_action = tx.fast_append(None, vec![]).unwrap();
    append_action.add_data_files(data_file.clone()).unwrap();
    let tx = append_action.apply().await.unwrap();
    let table = tx.commit(&fixture.rest_catalog).await.unwrap();

    // check result
    let batch_stream = table
        .scan()
        .select_all()
        .build()
        .unwrap()
        .to_arrow()
        .await
        .unwrap();
    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0], batch);

    // commit the same data file again
    let tx = Transaction::new(&table);
    let mut append_action = tx.fast_append(None, vec![]).unwrap();
    append_action.add_data_files(data_file.clone()).unwrap();
    let tx = append_action.apply().await.unwrap();
    let table = tx.commit(&fixture.rest_catalog).await.unwrap();

    // check result again
    let batch_stream = table
        .scan()
        .select_all()
        .build()
        .unwrap()
        .to_arrow()
        .await
        .unwrap();
    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[0], batch);
    assert_eq!(batches[1], batch);
}
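
Because the test drives a real docker-compose stack, it needs a local Docker daemon; with one available, something like `cargo test -p iceberg-e2e_test` should run it end to end.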