diff --git a/.github/workflows/auto-update-helm-and-operator-version-by-release.yml b/.github/workflows/auto-update-helm-and-operator-version-by-release.yml index 1cdc4250a94f6..25014791742db 100644 --- a/.github/workflows/auto-update-helm-and-operator-version-by-release.yml +++ b/.github/workflows/auto-update-helm-and-operator-version-by-release.yml @@ -37,7 +37,7 @@ jobs: echo "NEW_CHART_VERSION=$NEW_VERSION" >> $GITHUB_ENV - name: Create Pull Request - uses: peter-evans/create-pull-request@v6 + uses: peter-evans/create-pull-request@v7 with: token: ${{ secrets.PR_TOKEN }} commit-message: 'chore: bump risingwave to ${{ env.NEW_APP_VERSION }}, release chart ${{ env.NEW_CHART_VERSION }}' @@ -66,7 +66,7 @@ jobs: grep -rl "risingwavelabs/risingwave:$PREV_VERSION" . | xargs sed -i "s|risingwavelabs/risingwave:$PREV_VERSION|risingwavelabs/risingwave:${{ env.NEW_APP_VERSION }}|g" - name: Create Pull Request for risingwave-operator - uses: peter-evans/create-pull-request@v6 + uses: peter-evans/create-pull-request@v7 with: token: ${{ secrets.PR_TOKEN }} commit-message: 'chore: bump risingwave image tags to ${{ env.NEW_APP_VERSION }}' diff --git a/.github/workflows/doc.yml b/.github/workflows/doc.yml index 321df692c4f0a..9453cd34d035f 100644 --- a/.github/workflows/doc.yml +++ b/.github/workflows/doc.yml @@ -31,7 +31,7 @@ jobs: - name: Install dependencies for compiling RisingWave run: sudo apt-get update && sudo apt-get install -y make build-essential cmake protobuf-compiler curl openssl libssl-dev libsasl2-dev libcurl4-openssl-dev pkg-config postgresql-client tmux lld - name: Run sccache-cache - uses: mozilla-actions/sccache-action@v0.0.6 + uses: mozilla-actions/sccache-action@v0.0.7 with: version: "v0.5.2" - name: build rustdocs diff --git a/.github/workflows/license_check.yml b/.github/workflows/license_check.yml index 04b045cb17b54..2efcedd981e71 100644 --- a/.github/workflows/license_check.yml +++ b/.github/workflows/license_check.yml @@ -18,4 +18,4 @@ jobs: steps: - uses: actions/checkout@v4 - name: Check License Header - uses: apache/skywalking-eyes@775fe1ffda59b7e100aa144d0ef8d7beae17f97d + uses: apache/skywalking-eyes@3ea9df11bb3a5a85665377d1fd10c02edecf2c40 diff --git a/.github/workflows/typo.yml b/.github/workflows/typo.yml index d0bacf8c08ce5..35b12c14f91ae 100644 --- a/.github/workflows/typo.yml +++ b/.github/workflows/typo.yml @@ -10,4 +10,4 @@ jobs: uses: actions/checkout@v4 - name: Check spelling of the entire repository - uses: crate-ci/typos@v1.28.2 + uses: crate-ci/typos@v1.28.3 diff --git a/Cargo.lock b/Cargo.lock index 9e2ddd20dc3f3..bdb5531e5481d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3938,7 +3938,7 @@ dependencies = [ [[package]] name = "delta_btree_map" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "educe", "enum-as-inner 0.6.0", @@ -7216,7 +7216,7 @@ checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "local_stats_alloc" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "workspace-hack", ] @@ -9008,7 +9008,7 @@ dependencies = [ [[package]] name = "pgwire" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "auto_enums", @@ -10452,7 +10452,7 @@ dependencies = [ [[package]] name = "risedev" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "chrono", @@ -10485,7 +10485,7 @@ dependencies = [ [[package]] name = "risedev-config" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -10498,7 +10498,7 @@ 
dependencies = [ [[package]] name = "risingwave-fields-derive" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "expect-test", "indoc", @@ -10510,7 +10510,7 @@ dependencies = [ [[package]] name = "risingwave_backup" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -10527,12 +10527,12 @@ dependencies = [ "serde", "serde_json", "thiserror 1.0.63", - "twox-hash 1.6.3", + "twox-hash 2.0.1", ] [[package]] name = "risingwave_batch" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "async-recursion", @@ -10571,13 +10571,13 @@ dependencies = [ "tokio-postgres", "tokio-stream 0.1.15", "tracing", - "twox-hash 1.6.3", + "twox-hash 2.0.1", "workspace-hack", ] [[package]] name = "risingwave_batch_executors" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "assert_matches", @@ -10623,7 +10623,7 @@ dependencies = [ [[package]] name = "risingwave_bench" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -10665,7 +10665,7 @@ dependencies = [ [[package]] name = "risingwave_cmd" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "clap", "madsim-tokio", @@ -10686,7 +10686,7 @@ dependencies = [ [[package]] name = "risingwave_cmd_all" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -10720,7 +10720,7 @@ dependencies = [ [[package]] name = "risingwave_common" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "ahash 0.8.11", "anyhow", @@ -10830,7 +10830,7 @@ dependencies = [ "tracing-futures", "tracing-opentelemetry", "tracing-subscriber", - "twox-hash 1.6.3", + "twox-hash 2.0.1", "url", "uuid", "workspace-hack", @@ -10838,7 +10838,7 @@ dependencies = [ [[package]] name = "risingwave_common_estimate_size" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "bytes", "educe", @@ -10853,7 +10853,7 @@ dependencies = [ [[package]] name = "risingwave_common_heap_profiling" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "chrono", @@ -10868,7 +10868,7 @@ dependencies = [ [[package]] name = "risingwave_common_metrics" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "auto_impl", "bytes", @@ -10905,7 +10905,7 @@ dependencies = [ [[package]] name = "risingwave_common_proc_macro" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "bae", "itertools 0.13.0", @@ -10917,7 +10917,7 @@ dependencies = [ [[package]] name = "risingwave_common_secret" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "aes-gcm", "anyhow", @@ -10934,7 +10934,7 @@ dependencies = [ [[package]] name = "risingwave_common_service" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "async-trait", "axum", @@ -10958,7 +10958,7 @@ dependencies = [ [[package]] name = "risingwave_compaction_test" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -10986,7 +10986,7 @@ dependencies = [ [[package]] name = "risingwave_compactor" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "async-trait", "await-tree", @@ -11010,7 +11010,7 @@ dependencies = [ [[package]] name = "risingwave_compute" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -11058,7 +11058,7 @@ dependencies = [ [[package]] name = "risingwave_connector" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "apache-avro 0.16.0", @@ -11192,7 
+11192,7 @@ dependencies = [ [[package]] name = "risingwave_connector_codec" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "apache-avro 0.16.0", @@ -11224,7 +11224,7 @@ dependencies = [ [[package]] name = "risingwave_ctl" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "bytes", @@ -11266,7 +11266,7 @@ dependencies = [ [[package]] name = "risingwave_dml" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "assert_matches", "criterion", @@ -11290,7 +11290,7 @@ dependencies = [ [[package]] name = "risingwave_e2e_extended_mode_test" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "chrono", @@ -11305,7 +11305,7 @@ dependencies = [ [[package]] name = "risingwave_error" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "bincode 1.3.3", @@ -11321,7 +11321,7 @@ dependencies = [ [[package]] name = "risingwave_expr" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -11360,7 +11360,7 @@ dependencies = [ [[package]] name = "risingwave_expr_impl" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "aho-corasick", "anyhow", @@ -11427,7 +11427,7 @@ dependencies = [ [[package]] name = "risingwave_frontend" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "arc-swap", @@ -11518,7 +11518,7 @@ dependencies = [ [[package]] name = "risingwave_frontend_macro" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "proc-macro2", "quote", @@ -11527,7 +11527,7 @@ dependencies = [ [[package]] name = "risingwave_hummock_sdk" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "bytes", "easy-ext", @@ -11546,7 +11546,7 @@ dependencies = [ [[package]] name = "risingwave_hummock_test" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "async-trait", "bytes", @@ -11579,7 +11579,7 @@ dependencies = [ [[package]] name = "risingwave_hummock_trace" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "async-trait", "bincode 2.0.0-rc.3", @@ -11656,7 +11656,7 @@ dependencies = [ [[package]] name = "risingwave_license" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "expect-test", "jsonbb", @@ -11671,7 +11671,7 @@ dependencies = [ [[package]] name = "risingwave_mem_table_spill_test" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "async-trait", "bytes", @@ -11687,7 +11687,7 @@ dependencies = [ [[package]] name = "risingwave_meta" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "arc-swap", @@ -11768,7 +11768,7 @@ dependencies = [ [[package]] name = "risingwave_meta_dashboard" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "axum", @@ -11789,7 +11789,7 @@ dependencies = [ [[package]] name = "risingwave_meta_model" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "prost 0.13.1", "risingwave_common", @@ -11802,7 +11802,7 @@ dependencies = [ [[package]] name = "risingwave_meta_model_migration" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "async-std", "easy-ext", @@ -11815,7 +11815,7 @@ dependencies = [ [[package]] name = "risingwave_meta_node" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -11848,7 +11848,7 @@ dependencies = [ [[package]] name = "risingwave_meta_service" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -11877,7 +11877,7 @@ 
dependencies = [ [[package]] name = "risingwave_object_store" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "async-trait", "await-tree", @@ -11913,7 +11913,7 @@ dependencies = [ [[package]] name = "risingwave_pb" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "enum-as-inner 0.6.0", "fs-err", @@ -11934,7 +11934,7 @@ dependencies = [ [[package]] name = "risingwave_planner_test" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "expect-test", @@ -11956,7 +11956,7 @@ dependencies = [ [[package]] name = "risingwave_regress_test" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -11970,7 +11970,7 @@ dependencies = [ [[package]] name = "risingwave_rpc_client" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", @@ -12005,7 +12005,7 @@ dependencies = [ [[package]] name = "risingwave_rt" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "await-tree", "console", @@ -12092,7 +12092,7 @@ dependencies = [ [[package]] name = "risingwave_sqlparser" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "console", @@ -12113,7 +12113,7 @@ dependencies = [ [[package]] name = "risingwave_sqlsmith" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "chrono", @@ -12141,7 +12141,7 @@ dependencies = [ [[package]] name = "risingwave_state_cleaning_test" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "clap", @@ -12161,7 +12161,7 @@ dependencies = [ [[package]] name = "risingwave_storage" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "ahash 0.8.11", "anyhow", @@ -12232,7 +12232,7 @@ dependencies = [ [[package]] name = "risingwave_stream" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "anyhow", "arc-swap", @@ -12304,7 +12304,7 @@ dependencies = [ [[package]] name = "risingwave_telemetry_event" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "jsonbb", "madsim-tokio", @@ -12319,7 +12319,7 @@ dependencies = [ [[package]] name = "risingwave_test_runner" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "fail", "sync-point", @@ -12328,7 +12328,7 @@ dependencies = [ [[package]] name = "risingwave_variables" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "chrono", "workspace-hack", @@ -16574,7 +16574,7 @@ dependencies = [ [[package]] name = "with_options" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "proc-macro2", "quote", @@ -16595,7 +16595,7 @@ dependencies = [ [[package]] name = "workspace-config" -version = "2.2.0-alpha" +version = "2.3.0-alpha" dependencies = [ "libz-sys", "log", @@ -16608,7 +16608,7 @@ dependencies = [ [[package]] name = "workspace-hack" -version = "2.2.0-alpha" +version = "2.3.0-alpha" [[package]] name = "wyz" diff --git a/Cargo.toml b/Cargo.toml index 0644def23d5c9..4a86290bbc128 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ exclude = ["e2e_test/udf/wasm", "lints"] resolver = "2" [workspace.package] -version = "2.2.0-alpha" +version = "2.3.0-alpha" edition = "2021" homepage = "https://github.com/risingwavelabs/risingwave" keywords = ["sql", "database", "streaming"] diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index dda6de44382d1..a3e54e06eeaec 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -112,7 +112,7 @@ services: # Standard environment for CI, including MySQL and Postgres for metadata. 
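
Aside on the `twox-hash` bump in the Cargo manifests above (1.6.3 → 2.0.1, `twox-hash = "2"`): the matching call-site migration appears further down in the array hash tests, where 1.x's `RandomXxHashBuilder64` is replaced by `twox_hash::xxhash64::RandomState`. A minimal, standalone sketch of that 2.x API with made-up inputs (not the project's test code):

```rust
use std::hash::{BuildHasher, Hasher};

// twox-hash 2.x: the randomly-seeded xxHash64 BuildHasher lives at
// `twox_hash::xxhash64::RandomState` (1.x exposed it as `RandomXxHashBuilder64`).
use twox_hash::xxhash64::RandomState;

fn main() {
    let build = RandomState::default();

    // Hash two byte strings with hashers produced by the same builder,
    // mirroring how the array tests build one hasher per row.
    let mut h1 = build.build_hasher();
    h1.write(b"hello");
    let mut h2 = build.build_hasher();
    h2.write(b"world");

    println!("{} {}", h1.finish(), h2.finish());
}
```
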
ci-standard-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20241213 depends_on: - mysql - db diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index a52e7be793911..94ceaaaa853c1 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -32,7 +32,7 @@ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" -python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema nats-py requests psycopg2-binary +python3 -m pip install --break-system-packages -r ./e2e_test/requirements.txt apt-get -y install jq echo "--- e2e, inline test" diff --git a/e2e_test/error_ui/simple/main.slt b/e2e_test/error_ui/simple/main.slt index c8df11e7496e1..4086553d88f5e 100644 --- a/e2e_test/error_ui/simple/main.slt +++ b/e2e_test/error_ui/simple/main.slt @@ -29,7 +29,7 @@ alter system set not_exist_key to value; db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error + 1: gRPC request to meta service (call `/meta.SystemParamsService/SetSystemParam`) failed: Internal error 2: SystemParams error: unrecognized system parameter "not_exist_key" diff --git a/e2e_test/requirements.txt b/e2e_test/requirements.txt new file mode 100644 index 0000000000000..59fef9c06fd42 --- /dev/null +++ b/e2e_test/requirements.txt @@ -0,0 +1,15 @@ +-r source_legacy/requirements.txt +-r source_inline/requirements.txt +requests +confluent_kafka==2.6.2 +# confluent_kafka doesn't list dependencies, so we need to install them manually +# https://github.com/confluentinc/confluent-kafka-python/issues/1712 +protobuf==5.28.0 +fastavro==1.9.4 +httpx==0.27.2 +attrs +cachetools +jsonschema +google-auth +google-api-core +google-cloud-kms diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index 82d1a0693f24f..c13c8f04f7405 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -124,7 +124,7 @@ format upsert encode protobuf ( db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error + 1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error 2: failed to validate sink 3: config error 4: sink format/encode/key_encode unsupported: Upsert Protobuf None diff --git a/e2e_test/sink/license.slt b/e2e_test/sink/license.slt index 1ba7d6ee38114..278823da3dac9 100644 --- a/e2e_test/sink/license.slt +++ b/e2e_test/sink/license.slt @@ -24,7 +24,7 @@ WITH db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error + 1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error 2: failed to validate sink 3: Internal error 4: feature DynamoDbSink is only available for tier Paid and above, while the current tier is Free @@ -49,7 +49,7 @@ WITH ( db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error + 1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error 2: failed to validate sink 3: Internal error 4: feature SnowflakeSink is only available for tier Paid and above, while the current tier is Free @@ -70,7 +70,7 @@ WITH ( db 
error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error + 1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error 2: failed to validate sink 3: feature OpenSearchSink is only available for tier Paid and above, while the current tier is Free @@ -95,7 +95,7 @@ WITH db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error + 1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error 2: failed to validate sink 3: Internal error 4: feature BigQuerySink is only available for tier Paid and above, while the current tier is Free @@ -174,7 +174,7 @@ WITH ( db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error + 1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error 2: failed to validate sink 3: sink cannot pass validation: INTERNAL: Connection is closed @@ -197,7 +197,7 @@ WITH db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error + 1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error 2: failed to validate sink 3: BigQuery error 4: No such file or directory (os error 2) @@ -207,4 +207,4 @@ statement ok DROP SINK snowflake_sink; statement ok -DROP TABLE t; \ No newline at end of file +DROP TABLE t; diff --git a/e2e_test/sink/remote/jdbc.load.slt b/e2e_test/sink/remote/jdbc.load.slt index 9a4ede4e032ed..32b9dcf875eef 100644 --- a/e2e_test/sink/remote/jdbc.load.slt +++ b/e2e_test/sink/remote/jdbc.load.slt @@ -66,7 +66,9 @@ create materialized view mv_remote_1 as select * from t_remote_1; statement ok CREATE SINK s_postgres_0 FROM mv_remote_0 WITH ( connector='jdbc', - jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + jdbc.url='jdbc:postgresql://db:5432/test', + user='test', + password='connector', table.name='t_remote_0', primary_key='id', type='upsert' @@ -75,7 +77,9 @@ CREATE SINK s_postgres_0 FROM mv_remote_0 WITH ( statement ok CREATE SINK s_postgres_1 FROM mv_remote_1 WITH ( connector='jdbc', - jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + jdbc.url='jdbc:postgresql://db:5432/test', + user='test', + password='connector', table.name='t_remote_1', primary_key='id', type='upsert' @@ -84,7 +88,9 @@ CREATE SINK s_postgres_1 FROM mv_remote_1 WITH ( statement ok CREATE SINK s_mysql_0 FROM mv_remote_0 WITH ( connector='jdbc', - jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw&connectionTimeZone=UTC', + jdbc.url='jdbc:mysql://mysql:3306/test?connectionTimeZone=UTC', + user='mysqluser', + password='mysqlpw', table.name='t_remote_0', primary_key='id', type='upsert' @@ -93,7 +99,9 @@ CREATE SINK s_mysql_0 FROM mv_remote_0 WITH ( statement ok CREATE SINK s_mysql_1 FROM mv_remote_1 WITH ( connector='jdbc', - jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw&connectionTimeZone=UTC', + jdbc.url='jdbc:mysql://mysql:3306/test?connectionTimeZone=UTC', + user='mysqluser', + password='mysqlpw', table.name='t_remote_1', primary_key='id', type='upsert' @@ -102,7 +110,9 @@ CREATE SINK s_mysql_1 FROM mv_remote_1 WITH ( statement ok CREATE SINK s2_postgres FROM rw_typed_data WITH ( 
connector='jdbc', - jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + jdbc.url='jdbc:postgresql://db:5432/test', + user='test', + password='connector', table.name='t_types', primary_key='id', schema.name='biz', @@ -112,7 +122,9 @@ CREATE SINK s2_postgres FROM rw_typed_data WITH ( statement ok CREATE SINK s2_mysql FROM rw_typed_data WITH ( connector='jdbc', - jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw&connectionTimeZone=UTC', + jdbc.url='jdbc:mysql://mysql:3306/test?connectionTimeZone=UTC', + user='mysqluser', + password='mysqlpw', table.name='t_types', primary_key='id', type='upsert' @@ -127,7 +139,9 @@ CREATE TABLE tt1 ( statement ok CREATE SINK s_pg_0 FROM tt1 WITH ( connector='jdbc', - jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + jdbc.url='jdbc:postgresql://db:5432/test', + user='test', + password='connector', table.name='t_append_only', type='append-only', force_append_only=true @@ -141,7 +155,9 @@ CREATE TABLE tt2 ( statement ok CREATE SINK s_pg_t2 FROM tt2 WITH ( connector='jdbc', - jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + jdbc.url='jdbc:postgresql://db:5432/test', + user='test', + password='connector', table.name='t2', schema.name='biz', type='append-only', @@ -155,7 +171,9 @@ create table t1_uuid (v1 int primary key, v2 varchar, v3 varchar); statement ok CREATE SINK s1_uuid FROM t1_uuid WITH ( connector='jdbc', - jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + jdbc.url='jdbc:postgresql://db:5432/test', + user='test', + password='connector', table.name='t1_uuid', primary_key='v1', type='upsert' @@ -174,7 +192,9 @@ INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8', statement ok CREATE SINK sk_t1_uuid FROM t1_test_uuid_delete WITH ( connector='jdbc', - jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + jdbc.url='jdbc:postgresql://db:5432/test', + user='test', + password='connector', table.name='sk_t1_uuid', primary_key='id, v2', type='upsert' diff --git a/e2e_test/source_inline/kafka/issue_19563.slt.serial b/e2e_test/source_inline/kafka/issue_19563.slt.serial index cb3986f934df4..e080e831d9025 100644 --- a/e2e_test/source_inline/kafka/issue_19563.slt.serial +++ b/e2e_test/source_inline/kafka/issue_19563.slt.serial @@ -46,10 +46,6 @@ select array_length(upstream_fragment_ids) from rw_fragments where array_contain ---- 3 -# XXX: wait until source reader is ready. then produce data. -# This is a temporary workaround for a data loss bug https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002 -sleep 2s - system ok cat < properties) + throws SQLException { + boolean pubAutoCreate = + properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); + var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME); + if (!pubAutoCreate) { + LOG.info( + "Postgres publication auto creation is disabled, skip creation for publication {}.", + pubName); + return; + } + createPostgresPublicationInner(properties, pubName); + } + /** * This method is used to create a publication for the cdc source job or cdc table if it doesn't * exist. 
*/ - public static void createPostgresPublicationIfNeeded( + public static void createPostgresPublicationInSourceExecutor( Map properties, long sourceId) throws SQLException { boolean pubAutoCreate = properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); + var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME); if (!pubAutoCreate) { LOG.info( - "Postgres publication auto creation is disabled, skip creation for source {}.", + "Postgres publication auto creation is disabled, skip creation for publication {}, sourceId = {}.", + pubName, sourceId); return; } + createPostgresPublicationInner(properties, pubName); + } - var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME); + private static void createPostgresPublicationInner( + Map properties, String pubName) throws SQLException { var dbHost = properties.get(DbzConnectorConfig.HOST); var dbPort = properties.get(DbzConnectorConfig.PORT); var dbName = properties.get(DbzConnectorConfig.DB_NAME); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 2199dd6168d62..1f9a1e8d86246 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -354,22 +354,22 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } } - if (!isPublicationExists) { + if (!isPublicationExists && !pubAutoCreate) { // We require a publication on upstream to publish table cdc events - if (!pubAutoCreate) { - throw ValidatorUtils.invalidArgument( - "Publication '" + pubName + "' doesn't exist and auto create is disabled"); - } else { - // createPublicationIfNeeded(Optional.empty()); - LOG.info( - "Publication '{}' doesn't exist, will be created in the process of streaming job.", - this.pubName); - } + throw ValidatorUtils.invalidArgument( + "Publication '" + pubName + "' doesn't exist and auto create is disabled"); } // If the source properties is created by share source, skip the following // check of publication if (isCdcSourceJob) { + if (!isPublicationExists) { + LOG.info( + "creating cdc source job: publication '{}' doesn't exist, creating...", + pubName); + DbzSourceUtils.createPostgresPublicationInValidate(userProps); + LOG.info("creating cdc source job: publication '{}' created successfully", pubName); + } return; } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java index dd03d0adaca8a..ac159a202e8ba 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java @@ -60,7 +60,7 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long boolean isCdcSourceJob = request.getIsSourceJob(); if (request.getSourceType() == POSTGRES) { - DbzSourceUtils.createPostgresPublicationIfNeeded( + DbzSourceUtils.createPostgresPublicationInSourceExecutor( 
request.getPropertiesMap(), request.getSourceId()); } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 0a97447bb56f5..8ec969ed6fa72 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -53,7 +53,9 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { var factory = JdbcUtils.getDialectFactory(jdbcUrl); this.config = config; try { - conn = JdbcUtils.getConnection(config.getJdbcUrl()); + conn = + JdbcUtils.getConnection( + config.getJdbcUrl(), config.getUser(), config.getPassword()); // Table schema has been validated before, so we get the PK from it directly this.pkColumnNames = tableSchema.getPrimaryKeys(); // column name -> java.sql.Types @@ -179,7 +181,11 @@ public boolean write(Iterable rows) { conn.close(); // create a new connection if the current connection is invalid - conn = JdbcUtils.getConnection(config.getJdbcUrl()); + conn = + JdbcUtils.getConnection( + config.getJdbcUrl(), + config.getUser(), + config.getPassword()); // reset the flag since we will retry to prepare the batch again updateFlag = false; jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout()); diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java index 94eb5cdc7e0ff..985baa775343c 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java @@ -23,6 +23,10 @@ public class JDBCSinkConfig extends CommonSinkConfig { private String jdbcUrl; + @JsonProperty private String user; + + @JsonProperty private String password; + private String tableName; private String sinkType; @@ -54,6 +58,14 @@ public String getJdbcUrl() { return jdbcUrl; } + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } + public String getTableName() { return tableName; } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java index 875bda89681de..cc8ceacfe7c1d 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java @@ -56,7 +56,9 @@ public void validate( Set jdbcPks = new HashSet<>(); Set jdbcTableNames = new HashSet<>(); - try (Connection conn = DriverManager.getConnection(jdbcUrl); + try (Connection conn = + DriverManager.getConnection( + jdbcUrl, config.getUser(), config.getPassword()); ResultSet tableNamesResultSet = conn.getMetaData().getTables(null, schemaName, "%", null); ResultSet columnResultSet = diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java index c18107c1328f5..bce0bcca16e71 100644 --- 
a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java @@ -39,7 +39,8 @@ public static Optional getDialectFactory(String jdbcUrl) { } /** The connection returned by this method is *not* autoCommit */ - public static Connection getConnection(String jdbcUrl) throws SQLException { + public static Connection getConnection(String jdbcUrl, String user, String password) + throws SQLException { var props = new Properties(); // enable TCP keep alive to avoid connection closed by server // both MySQL and PG support this property @@ -55,6 +56,12 @@ public static Connection getConnection(String jdbcUrl) throws SQLException { int socketTimeout = isPg ? SOCKET_TIMEOUT : SOCKET_TIMEOUT * 1000; props.setProperty("connectTimeout", String.valueOf(connectTimeout)); props.setProperty("socketTimeout", String.valueOf(socketTimeout)); + if (user != null) { + props.put("user", user); + } + if (password != null) { + props.put("password", password); + } var conn = DriverManager.getConnection(jdbcUrl, props); // disable auto commit can improve performance diff --git a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java index e0aee7633c518..63be7fe4439e6 100644 --- a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java +++ b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java @@ -36,17 +36,23 @@ private void logIfEnabled(int level, String msg) { private void logIfEnabled(int level, String format, Object arg) { if (TracingSlf4jImpl.isEnabled(level)) { - TracingSlf4jImpl.event( - name, level, new ParameterizedMessage(format, arg).getFormattedMessage()); + var pm = new ParameterizedMessage(format, arg); + if (null != pm.getThrowable()) { + logIfEnabled(level, pm.getFormattedMessage(), pm.getThrowable()); + } else { + TracingSlf4jImpl.event(name, level, pm.getFormattedMessage()); + } } } private void logIfEnabled(int level, String format, Object arg1, Object arg2) { if (TracingSlf4jImpl.isEnabled(level)) { - TracingSlf4jImpl.event( - name, - level, - new ParameterizedMessage(format, arg1, arg2).getFormattedMessage()); + var pm = new ParameterizedMessage(format, arg1, arg2); + if (null != pm.getThrowable()) { + logIfEnabled(level, pm.getFormattedMessage(), pm.getThrowable()); + } else { + TracingSlf4jImpl.event(name, level, pm.getFormattedMessage()); + } } } diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 169eaf5c3605d..c501f2df4962e 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -56,7 +56,7 @@ tokio-postgres = "0.7" tokio-stream = { workspace = true } tonic = { workspace = true } tracing = "0.1" -twox-hash = "1" +twox-hash = "2" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/batch/executors/src/executor/s3_file_scan.rs b/src/batch/executors/src/executor/s3_file_scan.rs index 82a80a17dc742..8140011dfcfce 100644 --- a/src/batch/executors/src/executor/s3_file_scan.rs +++ b/src/batch/executors/src/executor/s3_file_scan.rs @@ -16,7 +16,9 @@ use futures_async_stream::try_stream; use futures_util::stream::StreamExt; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; -use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file}; +use 
risingwave_connector::source::iceberg::{ + extract_bucket_and_file_name, new_s3_operator, read_parquet_file, +}; use risingwave_pb::batch_plan::file_scan_node; use risingwave_pb::batch_plan::file_scan_node::StorageType; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -82,13 +84,15 @@ impl S3FileScanExecutor { async fn do_execute(self: Box) { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { + let (bucket, file_name) = extract_bucket_and_file_name(&file)?; let op = new_s3_operator( self.s3_region.clone(), self.s3_access_key.clone(), self.s3_secret_key.clone(), - file.clone(), + bucket.clone(), )?; - let chunk_stream = read_parquet_file(op, file, None, None, self.batch_size, 0).await?; + let chunk_stream = + read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?; #[for_await] for stream_chunk in chunk_stream { let stream_chunk = stream_chunk?; diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 5c6f526d7b506..2f8fe80ec81f9 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -122,7 +122,7 @@ tracing = "0.1" tracing-futures = { version = "0.2", features = ["futures-03"] } tracing-opentelemetry = { workspace = true } tracing-subscriber = "0.3.17" -twox-hash = "1" +twox-hash = "2" url = "2" uuid = { version = "1", features = ["v4"] } diff --git a/src/common/src/array/bool_array.rs b/src/common/src/array/bool_array.rs index 7ed0e4b703a22..3fe80e127beaf 100644 --- a/src/common/src/array/bool_array.rs +++ b/src/common/src/array/bool_array.rs @@ -257,8 +257,6 @@ mod tests { fn test_bool_array_hash() { use std::hash::BuildHasher; - use twox_hash::RandomXxHashBuilder64; - use super::super::test_util::{hash_finish, test_hash}; const ARR_NUM: usize = 2; @@ -286,7 +284,7 @@ mod tests { .map(|v| helper_test_builder(v.clone())) .collect_vec(); - let hasher_builder = RandomXxHashBuilder64::default(); + let hasher_builder = twox_hash::xxhash64::RandomState::default(); let mut states = vec![hasher_builder.build_hasher(); ARR_LEN]; vecs.iter().for_each(|v| { v.iter() diff --git a/src/common/src/array/decimal_array.rs b/src/common/src/array/decimal_array.rs index e1614cd28bac6..57aa2c5d995d3 100644 --- a/src/common/src/array/decimal_array.rs +++ b/src/common/src/array/decimal_array.rs @@ -70,8 +70,6 @@ mod tests { fn test_decimal_array_hash() { use std::hash::BuildHasher; - use twox_hash::RandomXxHashBuilder64; - use super::super::test_util::{hash_finish, test_hash}; const ARR_NUM: usize = 3; @@ -118,7 +116,7 @@ mod tests { }) .collect_vec(); - let hasher_builder = RandomXxHashBuilder64::default(); + let hasher_builder = twox_hash::xxhash64::RandomState::default(); let mut states = vec![hasher_builder.build_hasher(); ARR_LEN]; vecs.iter().for_each(|v| { v.iter() diff --git a/src/common/src/array/utf8_array.rs b/src/common/src/array/utf8_array.rs index 749a9efe3da13..bf39fba72df9b 100644 --- a/src/common/src/array/utf8_array.rs +++ b/src/common/src/array/utf8_array.rs @@ -332,8 +332,6 @@ mod tests { fn test_utf8_array_hash() { use std::hash::BuildHasher; - use twox_hash::RandomXxHashBuilder64; - use super::super::test_util::{hash_finish, test_hash}; const ARR_NUM: usize = 3; @@ -368,7 +366,7 @@ mod tests { let arrs = vecs.iter().map(Utf8Array::from_iter).collect_vec(); - let hasher_builder = RandomXxHashBuilder64::default(); + let hasher_builder = twox_hash::xxhash64::RandomState::default(); let mut states = vec![hasher_builder.build_hasher(); ARR_LEN]; vecs.iter().for_each(|v| { v.iter() diff --git 
a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 4d3a994631534..7ade9b71799ea 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -382,6 +382,7 @@ macro_rules! impl_system_params_for_test { ret.backup_storage_url = Some("memory".into()); ret.backup_storage_directory = Some("backup".into()); ret.use_new_object_prefix_strategy = Some(false); + ret.time_travel_retention_ms = Some(0); ret } }; diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index 71c82b7ee579b..49b6d9a425276 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -108,10 +108,9 @@ pub fn new_s3_operator( s3_region: String, s3_access_key: String, s3_secret_key: String, - location: String, + bucket: String, ) -> ConnectorResult { // Create s3 builder. - let bucket = extract_bucket(&location); let mut builder = S3::default().bucket(&bucket).region(&s3_region); builder = builder.secret_access_key(&s3_access_key); builder = builder.secret_access_key(&s3_secret_key); @@ -120,8 +119,6 @@ pub fn new_s3_operator( bucket, s3_region )); - builder = builder.disable_config_load(); - let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) @@ -130,13 +127,20 @@ pub fn new_s3_operator( Ok(op) } -fn extract_bucket(location: &str) -> String { - let prefix = "s3://"; - let start = prefix.len(); - let end = location[start..] - .find('/') - .unwrap_or(location.len() - start); - location[start..start + end].to_string() +pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> { + let url = Url::parse(location)?; + let bucket = url + .host_str() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {}, missing bucket", location), + ) + })? + .to_owned(); + let prefix = format!("s3://{}/", bucket); + let file_name = location[prefix.len()..].to_string(); + Ok((bucket, file_name)) } pub async fn list_s3_directory( @@ -145,14 +149,7 @@ pub async fn list_s3_directory( s3_secret_key: String, dir: String, ) -> Result, anyhow::Error> { - let url = Url::parse(&dir)?; - let bucket = url.host_str().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid s3 url: {}, missing bucket", dir), - ) - })?; - + let (bucket, file_name) = extract_bucket_and_file_name(&dir)?; let prefix = format!("s3://{}/", bucket); if dir.starts_with(&prefix) { let mut builder = S3::default(); @@ -160,12 +157,16 @@ pub async fn list_s3_directory( .region(&s3_region) .access_key_id(&s3_access_key) .secret_access_key(&s3_secret_key) - .bucket(bucket); + .bucket(&bucket); + builder = builder.endpoint(&format!( + "https://{}.s3.{}.amazonaws.com", + bucket, s3_region + )); let op = Operator::new(builder)? .layer(RetryLayer::default()) .finish(); - op.list(&dir[prefix.len()..]) + op.list(&file_name) .await .map_err(|e| anyhow!(e)) .map(|list| { @@ -197,44 +198,39 @@ pub async fn list_s3_directory( /// Parquet file schema that match the requested schema. If an error occurs during processing, /// it returns an appropriate error. 
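
The new `extract_bucket_and_file_name` helper above splits an `s3://bucket/key` location into the bucket (taken from the URL host) and the object key, replacing the old prefix-slicing `extract_bucket`. A small standalone sketch of the same idea using the `url` crate; the example location is made up and the error handling is simplified:

```rust
use url::Url;

/// Split an `s3://bucket/key` location into (bucket, key).
/// Sketch only: real code should report a proper error for a missing host.
fn split_s3_location(location: &str) -> Result<(String, String), url::ParseError> {
    let url = Url::parse(location)?;
    let bucket = url.host_str().unwrap_or_default().to_owned();
    // `path()` keeps the leading '/', so trim it to get the object key.
    let key = url.path().trim_start_matches('/').to_owned();
    Ok((bucket, key))
}

fn main() {
    let (bucket, key) =
        split_s3_location("s3://example-bucket/dir/data.parquet").unwrap();
    assert_eq!(bucket, "example-bucket");
    assert_eq!(key, "dir/data.parquet");
}
```
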
pub fn extract_valid_column_indices( - columns: Option>, + rw_columns: Vec, metadata: &FileMetaData, ) -> ConnectorResult> { - match columns { - Some(rw_columns) => { - let parquet_column_names = metadata - .schema_descr() - .columns() - .iter() - .map(|c| c.name()) - .collect_vec(); + let parquet_column_names = metadata + .schema_descr() + .columns() + .iter() + .map(|c| c.name()) + .collect_vec(); - let converted_arrow_schema = - parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) - .map_err(anyhow::Error::from)?; + let converted_arrow_schema = + parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) + .map_err(anyhow::Error::from)?; - let valid_column_indices: Vec = rw_columns - .iter() - .filter_map(|column| { - parquet_column_names - .iter() - .position(|&name| name == column.name) - .and_then(|pos| { - let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_udf::DataType = converted_arrow_schema.field(pos).data_type(); - let rw_data_type: &risingwave_common::types::DataType = &column.data_type; + let valid_column_indices: Vec = rw_columns + .iter() + .filter_map(|column| { + parquet_column_names + .iter() + .position(|&name| name == column.name) + .and_then(|pos| { + let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_udf::DataType = converted_arrow_schema.field(pos).data_type(); + let rw_data_type: &risingwave_common::types::DataType = &column.data_type; - if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) { - Some(pos) - } else { - None - } - }) - }) - .collect(); - Ok(valid_column_indices) - } - None => Ok(vec![]), - } + if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) { + Some(pos) + } else { + None + } + }) + }) + .collect(); + Ok(valid_column_indices) } /// Reads a specified Parquet file and converts its content into a stream of chunks. @@ -258,8 +254,14 @@ pub async fn read_parquet_file( let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; let file_metadata = parquet_metadata.file_metadata(); - let column_indices = extract_valid_column_indices(rw_columns, file_metadata)?; - let projection_mask = ProjectionMask::leaves(file_metadata.schema_descr(), column_indices); + let projection_mask = match rw_columns { + Some(columns) => { + let column_indices = extract_valid_column_indices(columns, file_metadata)?; + ProjectionMask::leaves(file_metadata.schema_descr(), column_indices) + } + None => ProjectionMask::all(), + }; + // For the Parquet format, we directly convert from a record batch to a stream chunk. // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file. let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader) @@ -289,7 +291,6 @@ pub async fn read_parquet_file( }) .collect(), }; - let parquet_parser = ParquetParser::new(columns, file_name, offset)?; let msg_stream: Pin< Box> + Send>, diff --git a/src/error/src/tonic.rs b/src/error/src/tonic.rs index 6e200bc054c9f..0c8afc61a276f 100644 --- a/src/error/src/tonic.rs +++ b/src/error/src/tonic.rs @@ -25,6 +25,9 @@ use tonic::metadata::{MetadataMap, MetadataValue}; /// The key of the metadata field that contains the serialized error. const ERROR_KEY: &str = "risingwave-error-bin"; +/// The key of the metadata field that contains the call name. +pub const CALL_KEY: &str = "risingwave-grpc-call"; + /// The service name that the error is from. Used to provide better error message. 
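
In the `read_parquet_file` hunk above, column pruning moves to the caller: a requested column list is resolved to leaf indices and turned into a `ProjectionMask`, while `None` now falls back to reading every column instead of an empty projection. A rough sketch of that decision with the `parquet` crate; the schema and requested indices are made up:

```rust
use std::sync::Arc;

use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical Parquet schema with three leaf columns.
    let message = "
        message demo {
            required int64 id;
            optional binary name;
            optional double score;
        }";
    let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));

    // Requested columns already resolved to leaf indices (here: `id` and `score`).
    let requested: Option<Vec<usize>> = Some(vec![0, 2]);

    // Mirror the new logic: project the requested leaves, or read everything.
    let _mask = match requested {
        Some(indices) => ProjectionMask::leaves(&schema, indices),
        None => ProjectionMask::all(),
    };
    Ok(())
}
```
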
// TODO: also make it a field of `Extra`? type ServiceName = Cow<'static, str>; @@ -128,6 +131,9 @@ where pub struct TonicStatusWrapper { inner: tonic::Status, + /// The call name (path) of the gRPC request. + call: Option, + /// Optional service name from the client side. /// /// # Explanation @@ -160,8 +166,15 @@ impl TonicStatusWrapper { } } + let call = status + .metadata() + .get(CALL_KEY) + .and_then(|value| value.to_str().ok()) + .map(str::to_owned); + Self { inner: status, + call, client_side_service_name: None, } } @@ -196,6 +209,9 @@ impl std::fmt::Display for TonicStatusWrapper { { write!(f, " to {} service", service_name)?; } + if let Some(call) = &self.call { + write!(f, " (call `{}`)", call)?; + } write!(f, " failed: {}: ", self.inner.code())?; #[expect(rw::format_error)] // intentionally format the source itself diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index b3cf4e2860967..d49b4332b117f 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -21,7 +21,7 @@ use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{ - get_parquet_fields, list_s3_directory, new_s3_operator, + extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_s3_operator, }; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; @@ -177,23 +177,19 @@ impl TableFunction { let schema = tokio::task::block_in_place(|| { FRONTEND_RUNTIME.block_on(async { + let location = match files.as_ref() { + Some(files) => files[0].clone(), + None => eval_args[5].clone(), + }; + let (bucket, file_name) = extract_bucket_and_file_name(&location)?; let op = new_s3_operator( eval_args[2].clone(), eval_args[3].clone(), eval_args[4].clone(), - match files.as_ref() { - Some(files) => files[0].clone(), - None => eval_args[5].clone(), - }, + bucket.clone(), )?; - let fields = get_parquet_fields( - op, - match files.as_ref() { - Some(files) => files[0].clone(), - None => eval_args[5].clone(), - }, - ) - .await?; + + let fields = get_parquet_fields(op, file_name).await?; let mut rw_types = vec![]; for field in &fields { diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 7c2171e733587..b1ef82f936098 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2799,11 +2799,6 @@ impl CatalogController { inner.list_all_state_tables().await } - pub async fn list_all_state_table_ids(&self) -> MetaResult> { - let inner = self.inner.read().await; - inner.list_all_state_table_ids().await - } - pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult> { let inner = self.inner.read().await; let table_ids: Vec = Table::find() @@ -3228,6 +3223,10 @@ impl CatalogController { .collect(); Ok(res) } + + pub async fn list_time_travel_table_ids(&self) -> MetaResult> { + self.inner.read().await.list_time_travel_table_ids().await + } } /// `CatalogStats` is a struct to store the statistics of all catalogs. @@ -3360,17 +3359,6 @@ impl CatalogControllerInner { .collect()) } - /// `list_all_tables` return all ids of state tables. 
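
The `tonic.rs` changes above attach the gRPC call path to responses under the `risingwave-grpc-call` metadata key, and `TonicStatusWrapper` includes it when rendering the error, which is why the expected messages earlier in this diff now read "gRPC request to meta service (call `…`) failed". A minimal sketch of reading such a key back from a `tonic::Status`, with a hand-built status standing in for a real RPC failure:

```rust
use tonic::metadata::MetadataValue;
use tonic::{Code, Status};

fn main() {
    // Hand-built status standing in for a failed RPC; the metadata key matches
    // the CALL_KEY constant introduced in the diff.
    let mut status = Status::new(Code::Internal, "Internal error");
    status.metadata_mut().insert(
        "risingwave-grpc-call",
        MetadataValue::from_static("/meta.SystemParamsService/SetSystemParam"),
    );

    // Client-side error reporting can then name the call in its message.
    let call = status
        .metadata()
        .get("risingwave-grpc-call")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("<unknown>");
    println!("gRPC request (call `{}`) failed: {}", call, status.message());
}
```
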
- pub async fn list_all_state_table_ids(&self) -> MetaResult> { - let table_ids: Vec = Table::find() - .select_only() - .column(table::Column::TableId) - .into_tuple() - .all(&self.db) - .await?; - Ok(table_ids) - } - /// `list_tables` return all `CREATED` tables, `CREATING` materialized views and internal tables that belong to them. async fn list_tables(&self) -> MetaResult> { let table_objs = Table::find() @@ -3589,6 +3577,21 @@ impl CatalogControllerInner { let _ = tx.send(Err(err.clone())); } } + + pub async fn list_time_travel_table_ids(&self) -> MetaResult> { + let table_ids: Vec = Table::find() + .select_only() + .filter(table::Column::TableType.is_in(vec![ + TableType::Table, + TableType::MaterializedView, + TableType::Index, + ])) + .column(table::Column::TableId) + .into_tuple() + .all(&self.db) + .await?; + Ok(table_ids) + } } async fn update_internal_tables( diff --git a/src/meta/src/controller/scale.rs b/src/meta/src/controller/scale.rs index 65dd58ff1d34e..84c743e437d17 100644 --- a/src/meta/src/controller/scale.rs +++ b/src/meta/src/controller/scale.rs @@ -22,10 +22,12 @@ use risingwave_connector::source::{SplitImpl, SplitMetaData}; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::actor_dispatcher::DispatcherType; use risingwave_meta_model::fragment::DistributionType; -use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob}; +use risingwave_meta_model::prelude::{ + Actor, ActorDispatcher, Fragment, Sink, Source, StreamingJob, Table, +}; use risingwave_meta_model::{ - actor, actor_dispatcher, fragment, streaming_job, ActorId, ActorMapping, ActorUpstreamActors, - ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap, + actor, actor_dispatcher, fragment, sink, source, streaming_job, table, ActorId, ActorMapping, + ActorUpstreamActors, ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap, }; use risingwave_meta_model_migration::{ Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement, @@ -165,7 +167,7 @@ pub struct RescheduleWorkingSet { pub fragment_downstreams: HashMap>, pub fragment_upstreams: HashMap>, - pub related_jobs: HashMap, + pub related_jobs: HashMap, } async fn resolve_no_shuffle_query( @@ -192,6 +194,67 @@ where Ok(result) } +async fn resolve_streaming_job_definition( + txn: &C, + job_ids: &HashSet, +) -> MetaResult> +where + C: ConnectionTrait, +{ + let job_ids = job_ids.iter().cloned().collect_vec(); + + // including table, materialized view, index + let common_job_definitions: Vec<(ObjectId, String)> = Table::find() + .select_only() + .columns([ + table::Column::TableId, + #[cfg(not(debug_assertions))] + table::Column::Name, + #[cfg(debug_assertions)] + table::Column::Definition, + ]) + .filter(table::Column::TableId.is_in(job_ids.clone())) + .into_tuple() + .all(txn) + .await?; + + let sink_definitions: Vec<(ObjectId, String)> = Sink::find() + .select_only() + .columns([ + sink::Column::SinkId, + #[cfg(not(debug_assertions))] + sink::Column::Name, + #[cfg(debug_assertions)] + sink::Column::Definition, + ]) + .filter(sink::Column::SinkId.is_in(job_ids.clone())) + .into_tuple() + .all(txn) + .await?; + + let source_definitions: Vec<(ObjectId, String)> = Source::find() + .select_only() + .columns([ + source::Column::SourceId, + #[cfg(not(debug_assertions))] + source::Column::Name, + #[cfg(debug_assertions)] + source::Column::Definition, + ]) + .filter(source::Column::SourceId.is_in(job_ids.clone())) + .into_tuple() + .all(txn) + .await?; + + let definitions: 
HashMap = common_job_definitions + .into_iter() + .chain(sink_definitions.into_iter()) + .chain(source_definitions.into_iter()) + .collect(); + + Ok(definitions) +} + impl CatalogController { pub async fn resolve_working_set_for_reschedule_fragments( &self, @@ -339,6 +402,9 @@ impl CatalogController { let related_job_ids: HashSet<_> = fragments.values().map(|fragment| fragment.job_id).collect(); + let related_job_definitions = + resolve_streaming_job_definition(txn, &related_job_ids).await?; + let related_jobs = StreamingJob::find() .filter(streaming_job::Column::JobId.is_in(related_job_ids)) .all(txn) @@ -346,7 +412,19 @@ impl CatalogController { let related_jobs = related_jobs .into_iter() - .map(|job| (job.job_id, job)) + .map(|job| { + let job_id = job.job_id; + ( + job_id, + ( + job, + related_job_definitions + .get(&job_id) + .cloned() + .unwrap_or("".to_owned()), + ), + ) + }) .collect(); Ok(RescheduleWorkingSet { diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index 5fef056ba0ba2..57fde5cc61ffc 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -19,7 +19,7 @@ use anyhow::anyhow; use risingwave_common::system_param::common::CommonHandler; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::system_param::{ - check_missing_params, default, derive_missing_fields, set_system_param, + check_missing_params, derive_missing_fields, set_system_param, }; use risingwave_common::{for_all_params, key_of}; use risingwave_meta_model::prelude::SystemParameter; @@ -132,18 +132,6 @@ for_all_params!(impl_system_params_from_db); for_all_params!(impl_merge_params); for_all_params!(impl_system_params_to_models); -fn apply_hard_code_override(params: &mut PbSystemParams) { - if params - .time_travel_retention_ms - .map(|v| v == 0) - .unwrap_or(true) - { - let default_v = default::time_travel_retention_ms(); - tracing::info!("time_travel_retention_ms has been overridden to {default_v}"); - params.time_travel_retention_ms = Some(default_v); - } -} - impl SystemParamsController { pub async fn new( sql_meta_store: SqlMetaStore, @@ -152,8 +140,7 @@ impl SystemParamsController { ) -> MetaResult { let db = sql_meta_store.conn; let params = SystemParameter::find().all(&db).await?; - let mut params = merge_params(system_params_from_db(params)?, init_params); - apply_hard_code_override(&mut params); + let params = merge_params(system_params_from_db(params)?, init_params); tracing::info!(initial_params = ?SystemParamsReader::new(¶ms), "initialize system parameters"); check_missing_params(¶ms).map_err(|e| anyhow!(e))?; let ctl = Self { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index e56a79307b042..0407682d0a1ad 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -247,12 +247,6 @@ impl HummockManager { .time_travel_snapshot_interval_counter .saturating_add(1); } - let group_parents = version - .latest_version() - .levels - .values() - .map(|g| (g.group_id, g.parent_group_id)) - .collect(); let time_travel_tables_to_commit = table_compaction_group_mapping .iter() @@ -261,13 +255,22 @@ impl HummockManager { .get(table_id) .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) }); + let time_travel_table_ids: HashSet<_> = self + .metadata_manager + .catalog_controller + .list_time_travel_table_ids() + .await + .map_err(|e| Error::Internal(e.into()))? 
+ .into_iter() + .map(|id| id.try_into().unwrap()) + .collect(); let mut txn = self.env.meta_store_ref().conn.begin().await?; let version_snapshot_sst_ids = self .write_time_travel_metadata( &txn, time_travel_version, time_travel_delta, - &group_parents, + time_travel_table_ids, &versioning.last_time_travel_snapshot_sst_ids, time_travel_tables_to_commit, ) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index e33d9a3e148a8..3e333ecafb94c 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -16,8 +16,9 @@ use std::collections::{HashMap, HashSet, VecDeque}; use anyhow::anyhow; use risingwave_common::catalog::TableId; +use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; -use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; +use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::time_travel::{ refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta, @@ -380,20 +381,19 @@ impl HummockManager { txn: &DatabaseTransaction, version: Option<&HummockVersion>, delta: HummockVersionDelta, - group_parents: &HashMap, + time_travel_table_ids: HashSet, skip_sst_ids: &HashSet, tables_to_commit: impl Iterator, ) -> Result>> { - let select_groups = group_parents - .iter() - .filter_map(|(cg_id, _)| { - if should_ignore_group(find_root_group(*cg_id, group_parents)) { - None - } else { - Some(*cg_id) - } - }) - .collect::>(); + if self + .env + .system_params_reader() + .await + .time_travel_retention_ms() + == 0 + { + return Ok(None); + } async fn write_sstable_infos( mut sst_infos: impl Iterator, txn: &DatabaseTransaction, @@ -428,10 +428,7 @@ impl HummockManager { Ok(count) } - for (table_id, cg_id, committed_epoch) in tables_to_commit { - if !select_groups.contains(cg_id) { - continue; - } + for (table_id, _cg_id, committed_epoch) in tables_to_commit { let version_id: u64 = delta.id.to_u64(); let m = hummock_epoch_to_version::ActiveModel { epoch: Set(committed_epoch.try_into().unwrap()), @@ -446,16 +443,28 @@ impl HummockManager { let mut version_sst_ids = None; if let Some(version) = version { + // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`. 
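
With the compaction-group lineage check gone, the hunks around here keep an SST for time travel iff its `table_ids` intersects the set returned by `list_time_travel_table_ids` (tables, materialized views, indexes), and skip writing time-travel metadata entirely when `time_travel_retention_ms` is 0. A tiny standalone sketch of that predicate over a stand-in SST type (names are illustrative, not the real `SstableInfo`):

```rust
use std::collections::HashSet;

// Stand-in for the relevant part of an SST's metadata.
struct SstStandIn {
    sst_id: u64,
    table_ids: Vec<u32>,
}

/// Keep only SSTs that hold data of at least one time-travel table.
fn filter_time_travel_ssts(
    ssts: &[SstStandIn],
    time_travel_table_ids: &HashSet<u32>,
) -> Vec<u64> {
    ssts.iter()
        .filter(|s| s.table_ids.iter().any(|tid| time_travel_table_ids.contains(tid)))
        .map(|s| s.sst_id)
        .collect()
}

fn main() {
    let ssts = vec![
        SstStandIn { sst_id: 1, table_ids: vec![10, 11] },
        SstStandIn { sst_id: 2, table_ids: vec![42] }, // e.g. only an untracked state table
    ];
    let tracked: HashSet<u32> = [10, 11].into_iter().collect();
    assert_eq!(filter_time_travel_ssts(&ssts, &tracked), vec![1]);
}
```
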
version_sst_ids = Some( version - .get_sst_infos_from_groups(&select_groups) - .map(|s| s.sst_id) + .get_sst_infos() + .filter_map(|s| { + if s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + { + return Some(s.sst_id); + } + None + }) .collect(), ); write_sstable_infos( - version - .get_sst_infos_from_groups(&select_groups) - .filter(|s| !skip_sst_ids.contains(&s.sst_id)), + version.get_sst_infos().filter(|s| { + !skip_sst_ids.contains(&s.sst_id) + && s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }), txn, self.env.opts.hummock_time_travel_sst_info_insert_batch_size, ) @@ -465,9 +474,11 @@ impl HummockManager { version.id.to_u64(), ) .unwrap()), - version: Set((&IncompleteHummockVersion::from((version, &select_groups)) - .to_protobuf()) - .into()), + version: Set( + (&IncompleteHummockVersion::from((version, &time_travel_table_ids)) + .to_protobuf()) + .into(), + ), }; hummock_time_travel_version::Entity::insert(m) .on_conflict_do_nothing() @@ -475,9 +486,12 @@ impl HummockManager { .await?; } let written = write_sstable_infos( - delta - .newly_added_sst_infos(Some(&select_groups)) - .filter(|s| !skip_sst_ids.contains(&s.sst_id)), + delta.newly_added_sst_infos().filter(|s| { + !skip_sst_ids.contains(&s.sst_id) + && s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }), txn, self.env.opts.hummock_time_travel_sst_info_insert_batch_size, ) @@ -491,7 +505,7 @@ impl HummockManager { .unwrap()), version_delta: Set((&IncompleteHummockVersionDelta::from(( &delta, - &select_groups, + &time_travel_table_ids, )) .to_protobuf()) .into()), @@ -531,26 +545,6 @@ fn replay_archive( last_version } -fn find_root_group( - group_id: CompactionGroupId, - parents: &HashMap, -) -> CompactionGroupId { - let mut root = group_id; - while let Some(parent) = parents.get(&root) - && *parent != 0 - { - root = *parent; - } - root -} - -fn should_ignore_group(root_group_id: CompactionGroupId) -> bool { - // It is possible some intermediate groups has been dropped, - // so it's impossible to tell whether the root group is MaterializedView or not. - // Just treat them as MaterializedView for correctness. 
- root_group_id == StaticCompactionGroupId::StateDefault as CompactionGroupId -} - pub fn require_sql_meta_store_err() -> Error { Error::TimeTravel(anyhow!("require SQL meta store")) } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 2da8c0fae3e1b..578ee101d0f27 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -590,6 +590,9 @@ impl ScaleController { vnode_bitmap, } = actors.first().unwrap().clone(); + let (related_job, job_definition) = + related_jobs.get(&job_id).expect("job not found"); + let fragment = CustomFragmentInfo { fragment_id: fragment_id as _, fragment_type_mask: fragment_type_mask as _, @@ -603,8 +606,7 @@ impl ScaleController { dispatcher, upstream_actor_id, vnode_bitmap: vnode_bitmap.map(|b| b.to_protobuf()), - // todo, we need to fill this part - mview_definition: "".to_owned(), + mview_definition: job_definition.to_owned(), expr_context: expr_contexts .get(&actor_id) .cloned() @@ -617,8 +619,6 @@ impl ScaleController { fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32)); - let related_job = related_jobs.get(&job_id).expect("job not found"); - fragment_state.insert( fragment_id, table_fragments::PbState::from(related_job.job_status), diff --git a/src/rpc_client/src/tracing.rs b/src/rpc_client/src/channel.rs similarity index 59% rename from src/rpc_client/src/tracing.rs rename to src/rpc_client/src/channel.rs index aab07d43225d4..375b487d9c31f 100644 --- a/src/rpc_client/src/tracing.rs +++ b/src/rpc_client/src/channel.rs @@ -15,21 +15,26 @@ use std::task::{Context, Poll}; use futures::Future; +use http::HeaderValue; use risingwave_common::util::tracing::TracingContext; use tonic::body::BoxBody; use tower::Service; -/// A service wrapper that injects the [`TracingContext`] obtained from the current tracing span -/// into the HTTP headers of the request. +/// A service wrapper that hacks the gRPC request and response for observability. /// -/// See also `TracingExtract` in the `common_service` crate. +/// - Inject the [`TracingContext`] obtained from the current tracing span into the HTTP headers of the request. +/// The server can then extract the [`TracingContext`] from the HTTP headers with the `TracingExtract` middleware. +/// See also `TracingExtract` in the `common_service` crate. +/// +/// - Add the path of the request (indicating the gRPC call) to the response headers. The error reporting can then +/// include the gRPC call name in the message. #[derive(Clone, Debug)] -pub struct TracingInjectChannel { +pub struct WrappedChannel { inner: tonic::transport::Channel, } #[cfg(not(madsim))] -impl Service> for TracingInjectChannel { +impl Service> for WrappedChannel { type Error = tonic::transport::Error; type Response = http::Response; @@ -47,31 +52,38 @@ impl Service> for TracingInjectChannel { let mut inner = std::mem::replace(&mut self.inner, clone); async move { + let path = req.uri().path().to_owned(); + let headers = TracingContext::from_current_span().to_http_headers(); req.headers_mut().extend(headers); - inner.call(req).await + + let mut response = inner.call(req).await; + + if let Ok(response) = &mut response { + if let Ok(path) = HeaderValue::from_str(&path) { + response + .headers_mut() + .insert(risingwave_error::tonic::CALL_KEY, path); + } + } + + response } } } -/// A wrapper around tonic's `Channel` that injects the [`TracingContext`] obtained from the current -/// tracing span when making gRPC requests. 
#[cfg(not(madsim))] -pub type Channel = TracingInjectChannel; +pub type Channel = WrappedChannel; #[cfg(madsim)] pub type Channel = tonic::transport::Channel; -/// An extension trait for tonic's `Channel` that wraps it into a [`TracingInjectChannel`]. -#[easy_ext::ext(TracingInjectedChannelExt)] +/// An extension trait for tonic's `Channel` that wraps it into a [`WrappedChannel`]. +#[easy_ext::ext(WrappedChannelExt)] impl tonic::transport::Channel { - /// Wraps the channel into a [`TracingInjectChannel`], so that the [`TracingContext`] obtained - /// from the current tracing span is injected into the HTTP headers of the request. - /// - /// The server can then extract the [`TracingContext`] from the HTTP headers with the - /// `TracingExtract` middleware. - pub fn tracing_injected(self) -> Channel { + /// Wraps the channel into a [`WrappedChannel`] for observability. + pub fn wrapped(self) -> Channel { #[cfg(not(madsim))] - return TracingInjectChannel { inner: self }; + return WrappedChannel { inner: self }; #[cfg(madsim)] return self; } diff --git a/src/rpc_client/src/frontend_client.rs b/src/rpc_client/src/frontend_client.rs index 4d72bbb76d4c4..166edfeee36b4 100644 --- a/src/rpc_client/src/frontend_client.rs +++ b/src/rpc_client/src/frontend_client.rs @@ -25,8 +25,8 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tonic::transport::Endpoint; use tonic::Response; +use crate::channel::{Channel, WrappedChannelExt}; use crate::error::Result; -use crate::tracing::{Channel, TracingInjectedChannelExt}; use crate::{RpcClient, RpcClientPool}; const DEFAULT_RETRY_INTERVAL: u64 = 50; @@ -49,7 +49,7 @@ impl FrontendClient { }, ) .await? - .tracing_injected(); + .wrapped(); Ok(Self( FrontendServiceClient::new(channel).max_decoding_message_size(usize::MAX), diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 12250ff3d5aa2..07ef650050406 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -52,6 +52,7 @@ pub mod error; use error::Result; +mod channel; mod compactor_client; mod compute_client; mod connector_client; @@ -60,7 +61,6 @@ mod hummock_meta_client; mod meta_client; mod sink_coordinate_client; mod stream_client; -mod tracing; pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient}; pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 3b94b3946de70..a1e534de54944 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -105,10 +105,10 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Endpoint; use tonic::{Code, Request, Streaming}; +use crate::channel::{Channel, WrappedChannelExt}; use crate::error::{Result, RpcError}; use crate::hummock_meta_client::{CompactionEventItem, HummockMetaClient}; use crate::meta_rpc_client_method_impl; -use crate::tracing::{Channel, TracingInjectedChannelExt}; type ConnectionId = u32; type DatabaseId = u32; @@ -2033,7 +2033,7 @@ impl GrpcMetaClient { .connect_timeout(Duration::from_secs(5)) .monitored_connect("grpc-meta-client", Default::default()) .await? 
- .tracing_injected(); + .wrapped(); Ok(channel) } diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 9b83ab82d7fef..f37cdf4b93681 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -28,8 +28,8 @@ use risingwave_pb::stream_service::*; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Endpoint; +use crate::channel::{Channel, WrappedChannelExt}; use crate::error::{Result, RpcError}; -use crate::tracing::{Channel, TracingInjectedChannelExt}; use crate::{stream_rpc_client_method_impl, RpcClient, RpcClientPool, UnboundedBidiStreamHandle}; #[derive(Clone)] @@ -55,7 +55,7 @@ impl StreamClient { }, ) .await? - .tracing_injected(); + .wrapped(); Ok(Self( StreamServiceClient::new(channel).max_decoding_message_size(usize::MAX), diff --git a/src/storage/backup/Cargo.toml b/src/storage/backup/Cargo.toml index 3fd7d7ecde5ce..23a2c99567c48 100644 --- a/src/storage/backup/Cargo.toml +++ b/src/storage/backup/Cargo.toml @@ -29,7 +29,7 @@ risingwave_pb = { workspace = true } serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "1" -twox-hash = "1" +twox-hash = "2" [dev-dependencies] risingwave_hummock_sdk = { workspace = true } diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 847efba5f26be..93f18516ef934 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -31,7 +31,6 @@ pub mod meta_snapshot_v2; pub mod storage; use std::collections::{HashMap, HashSet}; -use std::hash::Hasher; use itertools::Itertools; use risingwave_common::RW_VERSION; @@ -92,9 +91,7 @@ pub struct MetaSnapshotManifest { } pub fn xxhash64_checksum(data: &[u8]) -> u64 { - let mut hasher = twox_hash::XxHash64::with_seed(0); - hasher.write(data); - hasher.finish() + twox_hash::XxHash64::oneshot(0, data) } pub fn xxhash64_verify(data: &[u8], checksum: u64) -> BackupResult<()> { diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index fc8514eac8119..591b4f956b008 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -84,35 +84,6 @@ impl HummockVersion { .map(|s| s.sst_id) } - /// `get_sst_infos_from_groups` doesn't guarantee that all returned sst info belongs to `select_group`. - /// i.e. `select_group` is just a hint. - /// We separate `get_sst_infos_from_groups` and `get_sst_infos` because `get_sst_infos_from_groups` may be further customized in the future. 
- pub fn get_sst_infos_from_groups<'a>( - &'a self, - select_group: &'a HashSet, - ) -> impl Iterator + 'a { - self.levels - .iter() - .filter_map(|(cg_id, level)| { - if select_group.contains(cg_id) { - Some(level) - } else { - None - } - }) - .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) - .flat_map(|level| level.table_infos.iter()) - .chain(self.table_change_log.values().flat_map(|change_log| { - // TODO: optimization: strip table change log - change_log.0.iter().flat_map(|epoch_change_log| { - epoch_change_log - .old_value - .iter() - .chain(epoch_change_log.new_value.iter()) - }) - })) - } - pub fn level_iter bool>( &self, compaction_group_id: CompactionGroupId, diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 4f2508a5772b7..d088d3e2ae843 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -16,9 +16,10 @@ use std::collections::{HashMap, HashSet}; use risingwave_pb::hummock::hummock_version::PbLevels; use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas}; -use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo}; +use risingwave_pb::hummock::{group_delta, PbEpochNewChangeLog, PbLevel, PbSstableInfo}; use crate::change_log::{TableChangeLog, TableChangeLogCommon}; +use crate::compaction_group::StateTableId; use crate::level::Level; use crate::sstable_info::SstableInfo; use crate::version::{ @@ -89,39 +90,42 @@ fn refill_sstable_info( } /// `SStableInfo` will be stripped. -impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { - fn from(p: (&HummockVersion, &HashSet)) -> Self { - let (version, select_group) = p; +impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { + fn from(p: (&HummockVersion, &HashSet)) -> Self { + let (version, time_travel_table_ids) = p; #[expect(deprecated)] Self { id: version.id, levels: version .levels .iter() - .filter_map(|(group_id, levels)| { - if select_group.contains(group_id) { - Some(( - *group_id as CompactionGroupId, - PbLevels::from(levels).into(), - )) - } else { - None - } + .map(|(group_id, levels)| { + let pblevels = rewrite_levels(PbLevels::from(levels), time_travel_table_ids); + (*group_id as CompactionGroupId, pblevels.into()) }) .collect(), max_committed_epoch: version.max_committed_epoch, table_watermarks: version.table_watermarks.clone(), - // TODO: optimization: strip table change log based on select_group table_change_log: version .table_change_log .iter() - .map(|(table_id, change_log)| { + .filter_map(|(table_id, change_log)| { + if !time_travel_table_ids.contains(&table_id.table_id()) { + return None; + } + debug_assert!(change_log.0.iter().all(|d| { + d.new_value.iter().chain(d.old_value.iter()).all(|s| { + s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }) + })); let incomplete_table_change_log = change_log .0 .iter() .map(|e| PbEpochNewChangeLog::from(e).into()) .collect(); - (*table_id, TableChangeLogCommon(incomplete_table_change_log)) + Some((*table_id, TableChangeLogCommon(incomplete_table_change_log))) }) .collect(), state_table_info: version.state_table_info.clone(), @@ -129,15 +133,37 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV } } +/// Removes SST refs that don't contain any of `time_travel_table_ids`. 
+fn rewrite_levels(mut levels: PbLevels, time_travel_table_ids: &HashSet) -> PbLevels { + fn rewrite_level(level: &mut PbLevel, time_travel_table_ids: &HashSet) { + // The stats like `total_file_size` are not updated accordingly since they won't be used in time travel query. + level.table_infos.retain(|sst| { + sst.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }); + } + for level in &mut levels.levels { + rewrite_level(level, time_travel_table_ids); + } + if let Some(l0) = levels.l0.as_mut() { + for sub_level in &mut l0.sub_levels { + rewrite_level(sub_level, time_travel_table_ids); + } + l0.sub_levels.retain(|s| !s.table_infos.is_empty()); + } + levels +} + /// [`IncompleteHummockVersionDelta`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: /// - `PbGroupDeltas` /// - `ChangeLogDelta` pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon; /// `SStableInfo` will be stripped. -impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { - fn from(p: (&HummockVersionDelta, &HashSet)) -> Self { - let (delta, select_group) = p; +impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { + fn from(p: (&HummockVersionDelta, &HashSet)) -> Self { + let (delta, time_travel_table_ids) = p; #[expect(deprecated)] Self { id: delta.id, @@ -145,29 +171,63 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHum group_deltas: delta .group_deltas .iter() - .filter_map(|(cg_id, deltas)| { - if select_group.contains(cg_id) { - Some((*cg_id, PbGroupDeltas::from(deltas).into())) - } else { - None - } + .map(|(cg_id, deltas)| { + let pb_group_deltas = + rewrite_group_deltas(PbGroupDeltas::from(deltas), time_travel_table_ids); + (*cg_id, pb_group_deltas.into()) }) .collect(), max_committed_epoch: delta.max_committed_epoch, trivial_move: delta.trivial_move, new_table_watermarks: delta.new_table_watermarks.clone(), removed_table_ids: delta.removed_table_ids.clone(), - // TODO: optimization: strip table change log based on select_group change_log_delta: delta .change_log_delta .iter() - .map(|(table_id, log_delta)| (*table_id, PbChangeLogDelta::from(log_delta).into())) + .filter_map(|(table_id, log_delta)| { + if !time_travel_table_ids.contains(&table_id.table_id()) { + return None; + } + debug_assert!(log_delta + .new_log + .as_ref() + .map(|d| { + d.new_value.iter().chain(d.old_value.iter()).all(|s| { + s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }) + }) + .unwrap_or(true)); + Some((*table_id, PbChangeLogDelta::from(log_delta).into())) + }) .collect(), state_table_info_delta: delta.state_table_info_delta.clone(), } } } +/// Removes SST refs that don't contain any of `time_travel_table_ids`. 
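Both `rewrite_levels` and the `rewrite_group_deltas` helper that follows reduce to the same retain predicate: an SST reference is kept only if at least one entry in its `table_ids` is tracked for time travel. The following is a minimal, self-contained sketch of that predicate using simplified stand-in types (not the real `PbSstableInfo`/`PbLevel`), included only to illustrate the filtering rule:

```rust
use std::collections::HashSet;

// Simplified stand-ins for the protobuf types; only the fields relevant to the
// predicate are modeled here.
struct SstableInfo {
    sst_id: u64,
    table_ids: Vec<u32>,
}

/// Keep only SSTs that reference at least one table tracked for time travel.
fn retain_time_travel_ssts(ssts: &mut Vec<SstableInfo>, time_travel_table_ids: &HashSet<u32>) {
    ssts.retain(|sst| {
        sst.table_ids
            .iter()
            .any(|tid| time_travel_table_ids.contains(tid))
    });
}

fn main() {
    let mut ssts = vec![
        SstableInfo { sst_id: 1, table_ids: vec![1, 2] },
        SstableInfo { sst_id: 2, table_ids: vec![3] },
    ];
    // Only table 2 is tracked for time travel, so SST 2 (tables {3}) is dropped.
    retain_time_travel_ssts(&mut ssts, &HashSet::from([2]));
    assert_eq!(ssts.iter().map(|s| s.sst_id).collect::<Vec<_>>(), vec![1]);
}
```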
+fn rewrite_group_deltas( + mut group_deltas: PbGroupDeltas, + time_travel_table_ids: &HashSet, +) -> PbGroupDeltas { + for group_delta in &mut group_deltas.group_deltas { + let Some(group_delta::DeltaType::NewL0SubLevel(new_sub_level)) = + &mut group_delta.delta_type + else { + tracing::error!(?group_delta, "unexpected delta type"); + continue; + }; + new_sub_level.inserted_table_infos.retain(|sst| { + sst.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }); + } + group_deltas +} + pub struct SstableIdInVersion { sst_id: HummockSstableId, object_id: HummockSstableObjectId, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 6266ee84474b3..7f4b4dafb1bf7 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -524,32 +524,20 @@ where /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`, /// but it is possible that the object is moved or split from other compaction groups or levels. pub fn newly_added_object_ids(&self) -> HashSet { - self.newly_added_sst_infos(None) + self.newly_added_sst_infos() .map(|sst| sst.object_id()) .collect() } pub fn newly_added_sst_ids(&self) -> HashSet { - self.newly_added_sst_infos(None) + self.newly_added_sst_infos() .map(|sst| sst.sst_id()) .collect() } - pub fn newly_added_sst_infos<'a>( - &'a self, - select_group: Option<&'a HashSet>, - ) -> impl Iterator + 'a { + pub fn newly_added_sst_infos(&self) -> impl Iterator { self.group_deltas - .iter() - .filter_map(move |(cg_id, group_deltas)| { - if let Some(select_group) = select_group - && !select_group.contains(cg_id) - { - None - } else { - Some(group_deltas) - } - }) + .values() .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { let sst_slice = match &group_delta { diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index fe73710ba76de..96dd4bd3f5818 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -13,10 +13,13 @@ // limitations under the License. 
use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; use std::time::Duration; use anyhow::anyhow; use either::Either; +use futures::stream::BoxStream; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::array::ArrayRef; @@ -38,6 +41,7 @@ use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, oneshot}; use tokio::time::Instant; +use tracing::Instrument; use super::executor_core::StreamSourceCore; use super::{ @@ -46,6 +50,7 @@ use super::{ }; use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; +use crate::executor::source::get_infinite_backoff_strategy; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::UpdateMutation; @@ -474,6 +479,8 @@ impl SourceExecutor { }; core.split_state_store.init_epoch(first_epoch).await?; + // initial_dispatch_num is 0 means the source executor doesn't have downstream jobs + // and is newly created let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; for ele in &mut boot_state { if let Some(recover_state) = core @@ -500,20 +507,134 @@ impl SourceExecutor { let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::debug!(state = ?recover_state, "start with state"); - let (source_chunk_reader, latest_splits) = self - .build_stream_source_reader( - &source_desc, - recover_state, - // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. - // It's highly probable that the work of scanning historical data cannot be shared, - // so don't waste work on it. - // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 - // Note that shared CDC source is special. It already starts from latest. - self.is_shared_non_cdc && is_uninitialized, + + let mut received_resume_during_build = false; + let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + + // Build the source stream reader. + let (source_chunk_reader, latest_splits) = if is_uninitialized { + tracing::info!("source uninitialized, build source stream reader w/o retry."); + let (source_chunk_reader, latest_splits) = self + .build_stream_source_reader( + &source_desc, + recover_state, + // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. + // It's highly probable that the work of scanning historical data cannot be shared, + // so don't waste work on it. + // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 + // Note that shared CDC source is special. It already starts from latest. + self.is_shared_non_cdc, + ) + .instrument_await("source_build_reader") + .await?; + ( + source_chunk_reader.map_err(StreamExecutorError::connector_error), + latest_splits, ) - .instrument_await("source_build_reader") - .await?; - let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error); + } else { + tracing::info!("source initialized, build source stream reader with retry."); + // Build the source stream reader with retry during recovery. 
+ // We only build source stream reader with retry during recovery, + // because we can rely on the persisted source states to recover the source stream + // and can avoid the potential race with "seek to latest" + // https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002 + let mut reader_and_splits: Option<(BoxChunkSourceStream, Option>)> = + None; + let source_reader = source_desc.source.clone(); + let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc); + let source_ctx = Arc::new(source_ctx); + let mut build_source_stream_fut = Box::pin(async move { + let backoff = get_infinite_backoff_strategy(); + tokio_retry::Retry::spawn(backoff, || async { + match source_reader + .build_stream( + recover_state.clone(), + column_ids.clone(), + source_ctx.clone(), + false, // not need to seek to latest since source state is initialized + ) + .await { + Ok((stream, latest_splits)) => Ok((stream, latest_splits)), + Err(e) => { + tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying..."); + Err(e) + } + } + }) + .instrument(tracing::info_span!("build_source_stream_with_retry")) + .await + .expect("Retry build source stream until success.") + }); + + // loop to create source stream until success + loop { + if let Some(barrier) = build_source_stream_and_poll_barrier( + &mut barrier_stream, + &mut reader_and_splits, + &mut build_source_stream_fut, + ) + .await? + { + if let Message::Barrier(barrier) = barrier { + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Throttle(actor_to_apply) => { + if let Some(new_rate_limit) = + actor_to_apply.get(&self.actor_ctx.id) + && *new_rate_limit != self.rate_limit_rps + { + tracing::info!( + "updating rate limit from {:?} to {:?}", + self.rate_limit_rps, + *new_rate_limit + ); + + // update the rate limit option, we will apply the rate limit + // when we finish building the source stream. + self.rate_limit_rps = *new_rate_limit; + } + } + Mutation::Resume => { + // We record the Resume mutation here and postpone the resume of the source stream + // after we have successfully built the source stream. + received_resume_during_build = true; + } + _ => { + // ignore other mutations and output a warn log + tracing::warn!( + "Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before + finish building source stream.", + mutation + ); + } + } + } + + // bump state store epoch + let _ = self.persist_state_and_clear_cache(barrier.epoch).await?; + yield Message::Barrier(barrier); + } else { + unreachable!( + "Only barrier message is expected when building source stream." + ); + } + } else { + assert!(reader_and_splits.is_some()); + tracing::info!("source stream created successfully"); + break; + } + } + let (source_chunk_reader, latest_splits) = + reader_and_splits.expect("source chunk reader and splits must be created"); + + ( + apply_rate_limit(source_chunk_reader, self.rate_limit_rps) + .boxed() + .map_err(StreamExecutorError::connector_error), + latest_splits, + ) + }; + if let Some(latest_splits) = latest_splits { // make sure it is written to state table later. // Then even it receives no messages, we can observe it in state table. @@ -525,13 +646,12 @@ impl SourceExecutor { } // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. 
- let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); let mut command_paused = false; // - If the first barrier requires us to pause on startup, pause the stream. - if is_pause_on_startup { + if is_pause_on_startup && !received_resume_during_build { tracing::info!("source paused on startup"); stream.pause_stream(); command_paused = true; @@ -746,6 +866,29 @@ impl SourceExecutor { } } +async fn build_source_stream_and_poll_barrier( + barrier_stream: &mut BoxStream<'static, StreamExecutorResult>, + reader_and_splits: &mut Option<(BoxChunkSourceStream, Option>)>, + build_future: &mut Pin< + Box>)>>, + >, +) -> StreamExecutorResult> { + if reader_and_splits.is_some() { + return Ok(None); + } + + tokio::select! { + biased; + build_ret = &mut *build_future => { + *reader_and_splits = Some(build_ret); + Ok(None) + } + msg = barrier_stream.next() => { + msg.transpose() + } + } +} + impl Execute for SourceExecutor { fn execute(self: Box) -> BoxedMessageStream { if self.stream_source_core.is_some() {
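`build_source_stream_and_poll_barrier` races the (possibly indefinitely retrying) reader build against the barrier stream; `biased;` polls the build future first, so a finished build always wins and no further barriers are consumed once the reader exists. Below is a minimal, standalone sketch of that select pattern with simplified stand-ins (`Barrier`, a dummy build future) rather than the actual executor types; it assumes `tokio` (rt, macros, time) and `futures` as dependencies:

```rust
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use futures::stream::{self, BoxStream, StreamExt};

#[derive(Debug)]
struct Barrier(u64);

/// Poll the barrier stream only while the build future is still pending.
/// Returns `None` once the reader has been built.
async fn poll_build_or_barrier(
    barriers: &mut BoxStream<'static, Barrier>,
    built: &mut Option<String>,
    build: &mut Pin<Box<dyn Future<Output = String> + Send>>,
) -> Option<Barrier> {
    if built.is_some() {
        return None;
    }
    tokio::select! {
        // `biased` polls the build future first, so a finished build always wins.
        biased;
        reader = &mut *build => {
            *built = Some(reader);
            None
        }
        barrier = barriers.next() => barrier,
    }
}

#[tokio::main]
async fn main() {
    // Three barriers arrive while the "build" takes 10 ms; afterwards the
    // barrier stream stays pending, mimicking an idle upstream.
    let mut barriers: BoxStream<'static, Barrier> = stream::iter((0..3u64).map(Barrier))
        .chain(stream::pending())
        .boxed();
    let mut built: Option<String> = None;
    let mut build: Pin<Box<dyn Future<Output = String> + Send>> = Box::pin(async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        "reader".to_owned()
    });

    loop {
        match poll_build_or_barrier(&mut barriers, &mut built, &mut build).await {
            Some(barrier) => println!("forward {barrier:?} while the reader is still building"),
            None => break, // reader built
        }
    }
    println!("built: {}", built.unwrap());
}
```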