feat: support iceberg sink partition write (#12664)
Co-authored-by: ZENOTME <[email protected]>
ZENOTME authored Oct 8, 2023
1 parent 437d36e commit e69f513
Showing 6 changed files with 105 additions and 16 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -108,7 +108,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.0" }
prost = { version = "0.12" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "72a65aed6ed7b3d529b311031c2c8d99650990e2" }
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "16dab0e36ab337e58ee8002d828def2d212fa116" }
arrow-array = "47"
arrow-cast = "47"
arrow-schema = "47"
2 changes: 2 additions & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -40,6 +40,8 @@ bash ./start_spark_connect_server.sh
"$HOME"/.local/bin/poetry update --quiet
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_append_only.toml
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_upsert.toml
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/partition_append_only.toml
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/partition_upsert.toml


echo "--- Kill cluster"
40 changes: 40 additions & 0 deletions e2e_test/iceberg/test_case/partition_append_only.toml
@@ -0,0 +1,40 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.demo_table',
'''
CREATE TABLE demo_db.demo_table (
id long,
v_int int,
v_long long,
v_float float,
v_double double,
v_varchar string,
v_bool boolean,
v_date date,
v_timestamp timestamp,
v_ts_ntz timestamp_ntz
)
PARTITIONED BY (v_int,v_long,v_float,v_double,v_varchar,v_bool,v_date,v_timestamp,v_ts_ntz)
TBLPROPERTIES ('format-version'='2');
'''
]

slt = 'test_case/iceberg_sink_append_only.slt'

verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz']

verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC'


verify_data = """
1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00
2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00
3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00
4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00
"""

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.demo_table',
'DROP SCHEMA IF EXISTS demo_db'
]
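
For orientation: the test harness (main.py) reads these TOML test cases, runs init_sqls through Spark, drives RisingWave via the slt file, then checks the verify_sql output against verify_data. A hedged Rust mirror of the case schema follows — the real harness is Python, so every name below is illustrative only:

use serde::Deserialize;

// Illustrative mirror of the TOML test-case schema above. The actual harness
// is `main.py` (Python); struct and field names here are assumptions.
// Requires the `serde` (derive feature) and `toml` crates.
#[derive(Debug, Deserialize)]
struct IcebergTestCase {
    init_sqls: Vec<String>,     // Spark SQL to set up schema and table
    slt: String,                // sqllogictest file that drives RisingWave
    verify_schema: Vec<String>, // expected column types, in order
    verify_sql: String,         // query used to read back the sink table
    verify_data: String,        // expected rows, one comma-separated line each
    drop_sqls: Vec<String>,     // cleanup statements
}

fn parse_case(text: &str) -> Result<IcebergTestCase, toml::de::Error> {
    toml::from_str(text)
}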
35 changes: 35 additions & 0 deletions e2e_test/iceberg/test_case/partition_upsert.toml
@@ -0,0 +1,35 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.demo_table',
'''
CREATE TABLE demo_db.demo_table (
id int,
v1 int,
v2 long,
v3 string
) USING iceberg
PARTITIONED BY (v1,v2)
TBLPROPERTIES ('format-version'='2');
'''
]

slt = 'test_case/iceberg_sink_upsert.slt'

verify_schema = ['int','int','long','string']

verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id, v1 ASC'

verify_data = """
1,1,50,1-50
1,2,2,2-2
1,3,2,3-2
1,5,2,5-2
1,8,2,8-2
1,13,2,13-2
1,21,2,21-2
"""

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.demo_table',
'DROP SCHEMA IF EXISTS demo_db'
]
39 changes: 25 additions & 14 deletions src/connector/src/sink/iceberg.rs
@@ -23,8 +23,9 @@ use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef};
use async_trait::async_trait;
use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
use icelake::io::file_writer::DeltaWriterResult;
+use icelake::io::EmptyLayer;
use icelake::transaction::Transaction;
-use icelake::types::{data_file_from_json, data_file_to_json, DataFile};
+use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
use icelake::{Table, TableIdentifier};
use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
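
EmptyLayer shows up below as a new type parameter on TaskWriter and EqualityDeltaWriter. From this diff alone it reads as the no-op case of a composable writer-layer (middleware) parameter in icelake; a self-contained sketch of that pattern, with stand-in names rather than icelake's actual API:

// Stand-in sketch of a writer parameterized over a middleware "layer".
// NoopLayer plays the role EmptyLayer appears to play in icelake.
trait Layer {
    fn before_write(&self, n_rows: usize);
}

struct NoopLayer;
impl Layer for NoopLayer {
    fn before_write(&self, _n_rows: usize) {} // no-op hook
}

struct Writer<L: Layer> {
    layer: L,
    rows_written: usize,
}

impl<L: Layer> Writer<L> {
    fn write(&mut self, n_rows: usize) {
        self.layer.before_write(n_rows); // hook point for metrics, buffering, ...
        self.rows_written += n_rows;
    }
}

fn main() {
    let mut w = Writer { layer: NoopLayer, rows_written: 0 };
    w.write(3);
    assert_eq!(w.rows_written, 3);
}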
@@ -367,8 +368,12 @@ impl Sink for IcebergSink {
_connector_client: Option<ConnectorClient>,
) -> Result<Self::Coordinator> {
let table = self.create_table().await?;
+let partition_type = table.current_partition_type()?;

-Ok(IcebergSinkCommitter { table })
+Ok(IcebergSinkCommitter {
+    table,
+    partition_type,
+})
}
}

@@ -437,7 +442,7 @@ impl SinkWriter for IcebergWriter {

struct AppendOnlyWriter {
table: Table,
-writer: icelake::io::task_writer::TaskWriter,
+writer: icelake::io::task_writer::TaskWriter<EmptyLayer>,
schema: SchemaRef,
}

@@ -455,7 +460,9 @@ impl AppendOnlyWriter {

Ok(Self {
writer: table
-.task_writer()
+.writer_builder()
+.await?
+.build_task_writer()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
table,
@@ -484,7 +491,9 @@ impl AppendOnlyWriter {
let old_writer = std::mem::replace(
&mut self.writer,
self.table
-.task_writer()
+.writer_builder()
+.await?
+.build_task_writer()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
);
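
Note the flush idiom in this hunk: std::mem::replace swaps a freshly built writer into place and returns the old one, which is then closed by value to yield its data files. A minimal sketch of the idiom with stand-in types:

// Stand-in writer: closing consumes it, so it cannot be reused after flush.
struct TaskWriterLike {
    buffered: Vec<String>,
}

impl TaskWriterLike {
    fn new() -> Self {
        Self { buffered: Vec::new() }
    }

    fn close(self) -> Vec<String> {
        self.buffered
    }
}

struct SinkLike {
    writer: TaskWriterLike,
}

impl SinkLike {
    // Swap in a fresh writer, then close the old one by value.
    fn flush(&mut self) -> Vec<String> {
        let old = std::mem::replace(&mut self.writer, TaskWriterLike::new());
        old.close()
    }
}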
@@ -604,7 +613,7 @@ impl UpsertWriter {

struct UnpartitionDeltaWriter {
table: Table,
-writer: icelake::io::file_writer::EqualityDeltaWriter,
+writer: icelake::io::file_writer::EqualityDeltaWriter<EmptyLayer>,
unique_column_ids: Vec<usize>,
}

@@ -646,7 +655,10 @@ impl UnpartitionDeltaWriter {

struct PartitionDeltaWriter {
table: Table,
-writers: HashMap<icelake::types::PartitionKey, icelake::io::file_writer::EqualityDeltaWriter>,
+writers: HashMap<
+    icelake::types::PartitionKey,
+    icelake::io::file_writer::EqualityDeltaWriter<EmptyLayer>,
+>,
partition_splitter: icelake::types::PartitionSplitter,
unique_column_ids: Vec<usize>,
}
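
PartitionDeltaWriter holds one EqualityDeltaWriter per PartitionKey, with the PartitionSplitter deciding which partition each row belongs to. The routing shape, sketched with plain std types (icelake's real splitter and writer APIs are not reproduced here):

use std::collections::HashMap;

// Stand-ins for icelake's PartitionKey and per-partition delta writer.
type PartitionKey = String;

#[derive(Default)]
struct DeltaWriterLike {
    rows: Vec<String>,
}

impl DeltaWriterLike {
    fn write(&mut self, row: String) {
        self.rows.push(row);
    }
}

// Route each (key, row) pair to its partition's writer, creating writers
// lazily on first use -- the same shape as the HashMap field above.
fn route(rows: Vec<(PartitionKey, String)>) -> HashMap<PartitionKey, DeltaWriterLike> {
    let mut writers: HashMap<PartitionKey, DeltaWriterLike> = HashMap::new();
    for (key, row) in rows {
        writers.entry(key).or_default().write(row);
    }
    writers
}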
@@ -730,10 +742,8 @@ struct WriteResult {
delete_files: Vec<DataFile>,
}

-impl<'a> TryFrom<&'a SinkMetadata> for WriteResult {
-    type Error = SinkError;
-
-    fn try_from(value: &'a SinkMetadata) -> std::result::Result<Self, Self::Error> {
+impl WriteResult {
+    fn try_from(value: &SinkMetadata, partition_type: &Any) -> Result<Self> {
if let Some(Serialized(v)) = &value.metadata {
let mut values = if let serde_json::Value::Object(v) =
serde_json::from_slice::<serde_json::Value>(&v.metadata).map_err(
@@ -752,7 +762,7 @@ impl<'a> TryFrom<&'a SinkMetadata> for WriteResult {
{
data_files = values
.into_iter()
-.map(data_file_from_json)
+.map(|value| data_file_from_json(value, partition_type.clone()))
.collect::<std::result::Result<Vec<DataFile>, icelake::Error>>()
.unwrap();
} else {
@@ -764,7 +774,7 @@ impl<'a> TryFrom<&'a SinkMetadata> for WriteResult {
{
delete_files = values
.into_iter()
-.map(data_file_from_json)
+.map(|value| data_file_from_json(value, partition_type.clone()))
.collect::<std::result::Result<Vec<DataFile>, icelake::Error>>()
.map_err(|e| anyhow!("Failed to parse data file from json: {}", e))?;
} else {
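
The move from a TryFrom impl to an inherent try_from is forced by the signature: the std trait admits exactly one input, while deserializing a DataFile now needs the table's partition type so partition values decode with the right types. A stand-in sketch of the pattern:

// Stand-in types; `partition_type` is a placeholder for icelake's `Any`.
struct SinkMetadataLike {
    payload: String,
}

struct WriteResultLike {
    files: Vec<String>,
}

impl WriteResultLike {
    // Inherent method, not the TryFrom trait: the extra context argument
    // would not fit the trait's one-argument signature.
    fn try_from(meta: &SinkMetadataLike, partition_type: &str) -> Result<Self, String> {
        if meta.payload.is_empty() {
            return Err(format!("no metadata to decode (partition type {partition_type})"));
        }
        // The real code parses JSON and rebuilds each DataFile via
        // data_file_from_json(value, partition_type.clone()).
        Ok(Self {
            files: meta.payload.split(',').map(str::to_owned).collect(),
        })
    }
}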
@@ -822,6 +832,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata {

pub struct IcebergSinkCommitter {
table: Table,
+partition_type: Any,
}

#[async_trait::async_trait]
@@ -836,7 +847,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {

let write_results = metadata
.iter()
-.map(WriteResult::try_from)
+.map(|meta| WriteResult::try_from(meta, &self.partition_type))
.collect::<Result<Vec<WriteResult>>>()?;

let mut txn = Transaction::new(&mut self.table);
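
End to end: each parallel writer serializes its data and delete files into SinkMetadata; at checkpoint the coordinator decodes them with the stored partition type and folds everything into one Iceberg transaction. A rough sketch of that fold with illustrative types (not icelake's Transaction/DataFile):

// Illustrative commit fold: gather all writers' results, commit once.
struct DataFileLike(String);

struct EpochResult {
    data_files: Vec<DataFileLike>,
    delete_files: Vec<DataFileLike>,
}

#[derive(Default)]
struct TxnLike {
    appends: Vec<DataFileLike>,
    deletes: Vec<DataFileLike>,
}

impl TxnLike {
    // Stand-in for committing one atomic Iceberg snapshot per epoch.
    fn commit(self) -> (usize, usize) {
        (self.appends.len(), self.deletes.len())
    }
}

fn commit_epoch(results: Vec<EpochResult>) -> (usize, usize) {
    let mut txn = TxnLike::default();
    for r in results {
        txn.appends.extend(r.data_files);
        txn.deletes.extend(r.delete_files);
    }
    txn.commit()
}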
