feat(iceberg): support iceberg engine table (in local env) #19577

Merged on Dec 5, 2024 (60 commits)
Commits
41f3b90
support create table with primary key
chenzl25 Sep 4, 2024
73f914a
feat(iceberg): support create table without pk for nimtable (#18406)
chenzl25 Sep 4, 2024
5c83b2f
feat(nimtable): nimtable hide iceberg row (#18410)
chenzl25 Sep 5, 2024
11734b3
fix engine migration
Li0k Sep 9, 2024
94c045e
feat(iceberg): support drop table for nimtable (#18404)
chenzl25 Sep 4, 2024
f53edc2
feat(iceberg): support dql for nimtable (#18408)
chenzl25 Sep 4, 2024
559d5ac
feat(nimtable): support ban ddl for iceberg engine table (#18409)
Li0k Sep 5, 2024
25c7087
feat(nimtable): nimtable make drop table more robust (#18422)
chenzl25 Sep 7, 2024
6edbf1c
feat(nimtable): enable create index on iceberg table (#18526)
Li0k Sep 13, 2024
959f267
feat(nimtable): reuse existing env and add risedev nimtable (#18531)
chenzl25 Sep 19, 2024
343d9b7
feat(nimtable): fix drop table with schema (#18904)
chenzl25 Oct 15, 2024
9643140
resolve conflicts with iceberg properties refactor
chenzl25 Sep 20, 2024
b9acc71
fix(nimtable): fix database path (#19182)
chenzl25 Nov 4, 2024
0b3de6a
fix compile
chenzl25 Nov 21, 2024
07887b1
change nimtable to iceberg
chenzl25 Nov 21, 2024
9060bd3
fix grafana
chenzl25 Nov 21, 2024
3204184
make iceberg ak&sk optional
chenzl25 Nov 21, 2024
91ecc30
resolve conflicts
chenzl25 Nov 25, 2024
c9d44a6
refactor enable_config_load
chenzl25 Nov 25, 2024
0353ac2
refactor env
chenzl25 Nov 26, 2024
8f90716
fix
chenzl25 Nov 26, 2024
7a10d06
fix
chenzl25 Nov 26, 2024
41726fb
resolve conflicts
chenzl25 Nov 26, 2024
34ab630
remove unnecessary changes
chenzl25 Nov 26, 2024
cfa03c9
fmt
chenzl25 Nov 26, 2024
1385e91
resolve conflicts
chenzl25 Nov 26, 2024
38221a4
fmt
chenzl25 Nov 26, 2024
3f7e1f7
fix test
chenzl25 Nov 26, 2024
9f28568
add iceberg-engine e2e-test
chenzl25 Nov 26, 2024
fbe3473
chmod iceberg engine script
chenzl25 Nov 26, 2024
2e032c3
add ci-iceberg-engine
chenzl25 Nov 26, 2024
8de1de0
fix ci
chenzl25 Nov 26, 2024
1487540
fix ci
chenzl25 Nov 26, 2024
c262e31
Merge branch 'main' into dylan/support_create_iceberg_engine_table
chenzl25 Nov 27, 2024
0062652
fix
chenzl25 Nov 27, 2024
9a4040c
Merge remote-tracking branch 'origin/main' into dylan/support_create_…
chenzl25 Nov 27, 2024
3c257d8
Merge remote-tracking branch 'origin/main' into dylan/support_create_…
chenzl25 Nov 27, 2024
af58731
fetch iceberg engine opts from the meta node
chenzl25 Nov 27, 2024
2e10ea7
fmt
chenzl25 Nov 27, 2024
80108f9
refactor iceberg engine ci
chenzl25 Nov 27, 2024
3b02778
refactor
chenzl25 Nov 27, 2024
b50277c
refactor
chenzl25 Nov 27, 2024
d4e163f
change engine label
chenzl25 Nov 28, 2024
e2bc603
resolve conflicts
chenzl25 Nov 28, 2024
55855aa
fmt
chenzl25 Nov 28, 2024
496b7d1
remove iceberg-engine from risedev
chenzl25 Nov 28, 2024
a5fc9cd
fmt
chenzl25 Nov 28, 2024
7231f60
resolve conflicts
chenzl25 Nov 29, 2024
dabe701
resolve conflicts
chenzl25 Dec 2, 2024
06838ba
fmt
chenzl25 Dec 2, 2024
c5c1e86
fix backward compatibility by making engine optional
chenzl25 Dec 3, 2024
c691fd4
fix
chenzl25 Dec 3, 2024
9795721
resolve conflicts
chenzl25 Dec 3, 2024
f03fd70
fix
chenzl25 Dec 3, 2024
35c9cee
resolve conflicts
chenzl25 Dec 3, 2024
1858251
refine
chenzl25 Dec 3, 2024
ba7a6d1
remove clap
chenzl25 Dec 4, 2024
3c782c2
Merge branch 'main' into dylan/support_create_iceberg_engine_table
chenzl25 Dec 4, 2024
932471f
Merge branch 'main' into dylan/support_create_iceberg_engine_table
chenzl25 Dec 5, 2024
1c681b8
Merge branch 'main' into dylan/support_create_iceberg_engine_table
chenzl25 Dec 5, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions ci/docker-compose.yml
@@ -118,6 +118,13 @@ services:
    volumes:
      - ..:/risingwave

  iceberg-engine-env:
    image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241030
    depends_on:
      - db
    volumes:
      - ..:/risingwave

  ci-flamegraph-env:
    image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241030
    # NOTE(kwannoel): This is used in order to permit
48 changes: 48 additions & 0 deletions ci/scripts/e2e-iceberg-engine-test.sh
@@ -0,0 +1,48 @@
#!/usr/bin/env bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.sh

# prepare environment
export CONNECTOR_LIBS_PATH="./connector-node/libs"

# The leading ':' enables silent mode, so OPTARG is set in the error branches below.
while getopts ':p:' opt; do
    case ${opt} in
        p )
            profile=$OPTARG
            ;;
        \? )
            echo "Invalid Option: -$OPTARG" 1>&2
            exit 1
            ;;
        : )
            echo "Invalid option: -$OPTARG requires an argument" 1>&2
            exit 1
            ;;
    esac
done
shift $((OPTIND -1))

download_and_prepare_rw "$profile" source

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

PGPASSWORD=postgres psql -h db -p 5432 -U postgres -c "DROP DATABASE IF EXISTS metadata;" -c "CREATE DATABASE metadata;"

echo "--- starting risingwave cluster"
mkdir -p .risingwave/log
risedev ci-start ci-iceberg-engine
sleep 1


echo "--- testing iceberg engine"
sqllogictest -p 4566 -d dev './e2e_test/iceberg/test_case/iceberg_engine.slt'
sleep 1


echo "--- Kill cluster"
risedev ci-kill
15 changes: 15 additions & 0 deletions ci/workflows/pull-request.yml
@@ -315,6 +315,21 @@ steps:
    timeout_in_minutes: 15
    retry: *auto-retry

  - label: "end-to-end iceberg engine test"
    if: build.pull_request.labels includes "ci/run-e2e-iceberg-engine-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-iceberg-engine-tests?(,|$$)/
    command: "ci/scripts/e2e-iceberg-engine-test.sh -p ci-dev"
    depends_on:
      - "build"
      - "build-other"
    plugins:
      - docker-compose#v5.1.0:
          run: iceberg-engine-env
          config: ci/docker-compose.yml
          mount-buildkite-agent: true
      - ./ci/plugins/upload-failure-logs
    timeout_in_minutes: 15
    retry: *auto-retry

  - label: "end-to-end pulsar sink test"
    if: build.pull_request.labels includes "ci/run-e2e-pulsar-sink-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-pulsar-sink-tests?(,|$$)/
    command: "ci/scripts/e2e-pulsar-sink-test.sh -p ci-dev"
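The `if` condition on the new step runs it either when the pull request carries the `ci/run-e2e-iceberg-engine-tests` label or when `CI_STEPS` lists `e2e-iceberg-engine-test` (singular or plural) as one comma-separated entry. As a rough illustration of which `CI_STEPS` values the regex accepts, here is a std-only Rust sketch. `step_selected` is a hypothetical helper, not part of the CI tooling, and it assumes that `$$` in the pipeline file is the escaped form of a literal `$` anchor.

```rust
/// Returns true when `ci_steps` (a comma-separated list) contains the
/// iceberg engine step as a whole entry, in singular or plural form.
/// Entries are compared exactly, without trimming whitespace.
fn step_selected(ci_steps: &str) -> bool {
    ci_steps
        .split(',')
        .any(|entry| entry == "e2e-iceberg-engine-test" || entry == "e2e-iceberg-engine-tests")
}

fn main() {
    let samples = [
        "e2e-iceberg-engine-tests",
        "build,e2e-iceberg-engine-test,other",
        "e2e-iceberg-engine-testing",
    ];
    for value in samples {
        println!("{value:?} -> {}", step_selected(value));
    }
}
```

A partial match such as `e2e-iceberg-engine-testing` is rejected because the regex requires a comma or the end of the string right after the optional `s`.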
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_engine.slt
@@ -0,0 +1,21 @@
statement ok
set sink_decouple = false;

statement ok
create table t(id int primary key, xxname varchar) engine = iceberg;

statement ok
insert into t values(1, 'xxx');

statement ok
FLUSH;

sleep 5s

query ??
select * from t;
----
1 xxx

statement ok
DROP TABLE t;
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
@@ -4203,7 +4203,7 @@ def section_iceberg_metrics(outer_panels):
                         "read @ {{table_name}}",
                     ),
                     panels.target(
-                        f"sum({metric('nimtable_read_bytes')})",
+                        f"sum({metric('iceberg_read_bytes')})",
                         "total read",
                     ),
                 ],
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

@@ -96,6 +96,17 @@ public boolean tableExists(String tableIdentifier) {
        return catalog.tableExists(id);
    }

    /**
     * Drop a table from the catalog.
     *
     * @param tableIdentifier The identifier of the table to drop.
     * @return true if the table was dropped, false otherwise.
     */
    public boolean dropTable(String tableIdentifier) {
        TableIdentifier id = TableIdentifier.parse(tableIdentifier);
        return catalog.dropTable(id);
    }

    /**
     * Create JniCatalogWrapper instance.
     *
9 changes: 9 additions & 0 deletions proto/catalog.proto
@@ -369,6 +369,12 @@ message Table {
    int32 next_column_id = 2;
  }

  enum Engine {
    ENGINE_UNSPECIFIED = 0;
    HUMMOCK = 1;
    ICEBERG = 2;
  }

  uint32 id = 1;
  uint32 schema_id = 2;
  uint32 database_id = 3;
@@ -473,6 +479,9 @@ message Table {
  // This field stores the job ID for internal tables.
  optional uint32 job_id = 42;

  // Table Engine, currently only support hummock and iceberg
  optional Engine engine = 43;

  // Per-table catalog version, used by schema change. `None` for internal
  // tables and tests. Not to be confused with the global catalog version for
  // notification service.
17 changes: 17 additions & 0 deletions risedev.yml
@@ -307,6 +307,23 @@ profile:
      - use: compute-node
      - use: frontend

  ci-iceberg-engine:
    steps:
      - use: minio
      - use: postgres
        port: 5432
        address: db
        database: metadata
        user: postgres
        password: postgres
        user-managed: true
        application: metastore
      - use: meta-node
        meta-backend: postgres
      - use: compute-node
      - use: frontend
      - use: compactor

  meta-1cn-1fe-sqlite-with-recovery:
    config-path: src/config/ci-recovery.toml
    steps:
8 changes: 8 additions & 0 deletions src/common/src/array/arrow/arrow_iceberg.rs
@@ -187,6 +187,14 @@ impl ToArrow for IcebergCreateTableArrowConvert {
        arrow_field
    }

    fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
        let data_type = arrow_schema::DataType::Utf8;

        let mut arrow_field = arrow_schema::Field::new(name, data_type, true);
        self.add_field_id(&mut arrow_field);
        arrow_field
    }

    /// Convert RisingWave data type to Arrow data type.
    ///
    /// This function returns a `Field` instead of `DataType` because some may be converted to
31 changes: 31 additions & 0 deletions src/common/src/catalog/mod.rs
@@ -27,6 +27,7 @@ use futures::stream::BoxStream;
pub use internal_table::*;
use parse_display::Display;
pub use physical_table::*;
use risingwave_pb::catalog::table::PbEngine;
use risingwave_pb::catalog::{
    CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
    StreamJobStatus as PbStreamJobStatus,
@@ -550,6 +551,36 @@ impl ConflictBehavior {
    }
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Engine {
    #[default]
    Hummock,
    Iceberg,
}

impl Engine {
    pub fn from_protobuf(engine: &PbEngine) -> Self {
        match engine {
            PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
            PbEngine::Iceberg => Engine::Iceberg,
        }
    }

    pub fn to_protobuf(self) -> PbEngine {
        match self {
            Engine::Hummock => PbEngine::Hummock,
            Engine::Iceberg => PbEngine::Iceberg,
        }
    }

    pub fn debug_to_string(self) -> String {
        match self {
            Engine::Hummock => "Hummock".to_string(),
            Engine::Iceberg => "Iceberg".to_string(),
        }
    }
}

#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub enum StreamJobStatus {
    #[default]
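Since `engine` is an `optional` field in `proto/catalog.proto`, tables written before this change carry no engine at all, and `from_protobuf` also folds `Unspecified` into `Hummock`. The sketch below illustrates the resulting behaviour with a local stand-in for the generated `PbEngine` type. `engine_or_default` is a hypothetical helper added only for illustration, so treat this as a minimal sketch rather than the code in this PR.

```rust
/// Local stand-in for the generated `risingwave_pb::catalog::table::PbEngine`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PbEngine {
    Unspecified,
    Hummock,
    Iceberg,
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Engine {
    #[default]
    Hummock,
    Iceberg,
}

impl Engine {
    fn from_protobuf(engine: &PbEngine) -> Self {
        match engine {
            PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
            PbEngine::Iceberg => Engine::Iceberg,
        }
    }

    fn to_protobuf(self) -> PbEngine {
        match self {
            Engine::Hummock => PbEngine::Hummock,
            Engine::Iceberg => PbEngine::Iceberg,
        }
    }
}

/// Hypothetical helper: resolve an optional protobuf field, falling back to
/// the default engine (Hummock) when the field is absent.
fn engine_or_default(pb: Option<PbEngine>) -> Engine {
    pb.as_ref().map(Engine::from_protobuf).unwrap_or_default()
}

fn main() {
    // Engine -> PbEngine -> Engine is the identity for both variants.
    for engine in [Engine::Hummock, Engine::Iceberg] {
        assert_eq!(Engine::from_protobuf(&engine.to_protobuf()), engine);
    }
    // The reverse direction is lossy: Unspecified comes back as Hummock.
    let round_tripped = Engine::from_protobuf(&PbEngine::Unspecified).to_protobuf();
    println!("Unspecified round-trips to {round_tripped:?}");
    println!("absent field resolves to {:?}", engine_or_default(None));
}
```

The conversion is lossless from `Engine` to `PbEngine` and back, but not the other way around, which is worth keeping in mind if a caller ever needs to distinguish "unspecified" from an explicit Hummock table.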
27 changes: 24 additions & 3 deletions src/connector/src/connector_common/iceberg/jni_catalog.rs
@@ -285,7 +285,7 @@ impl CatalogV2 for JniCatalog {
             .map_err(|e| {
                 iceberg::Error::new(
                     iceberg::ErrorKind::Unexpected,
-                    "Failed to crete iceberg table.",
+                    "Failed to create iceberg table.",
                 )
                 .with_source(e)
             })?
Expand Down Expand Up @@ -342,8 +342,29 @@ impl CatalogV2 for JniCatalog {
}

/// Drop a table from the catalog.
async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
todo!()
async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
execute_with_jni_env(self.jvm, |env| {
let table_name_str = format!(
"{}.{}",
table.namespace().clone().inner().into_iter().join("."),
table.name()
);

let table_name_jstr = env.new_string(&table_name_str).unwrap();

call_method!(env, self.java_catalog.as_obj(), {boolean dropTable(String)},
&table_name_jstr)
.with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;

Ok(())
})
.map_err(|e| {
iceberg::Error::new(
iceberg::ErrorKind::Unexpected,
"Failed to load iceberg table.",
)
.with_source(e)
})
}

/// Check if a table exists in the catalog.
Expand Down
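`drop_table` flattens the table's namespace parts and name into one dotted string before crossing the JNI boundary, and the Java `dropTable` wrapper then passes that string to `TableIdentifier.parse`. The std-only sketch below mirrors just the string-building step. `qualified_table_name` and the sample names are hypothetical and exist only to show the shape of the identifier.

```rust
/// Builds the dotted identifier handed to the Java side, e.g. "db.schema.t".
/// Mirrors the `format!("{}.{}", ...)` call in `drop_table`.
fn qualified_table_name(namespace: &[&str], table: &str) -> String {
    format!("{}.{}", namespace.join("."), table)
}

fn main() {
    // Multi-level namespace: parts are joined with '.' before the table name.
    println!("{}", qualified_table_name(&["demo_db", "public"], "t"));
    // Empty namespace: the result keeps a leading '.', which callers may
    // want to guard against.
    println!("{}", qualified_table_name(&[], "t"));
}
```

Because the separator is a plain `.`, a namespace part or table name that itself contains a dot would not be distinguishable from nested levels in the joined string.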