Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(secret): introduce secret management #17456

Merged
merged 22 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 20 additions & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ env_scripts = [

set_env ENABLE_TELEMETRY "false"
set_env RW_TELEMETRY_TYPE "test"
set_env RW_SECRET_STORE_PRIVATE_KEY_HEX "0123456789abcdef"
set_env RW_TEMP_SECRET_FILE_DIR "${PREFIX_SECRET}"

is_sanitizer_enabled = get_env ENABLE_SANITIZER
is_hdfs_backend = get_env ENABLE_HDFS
Expand Down Expand Up @@ -144,6 +146,7 @@ rm -rf "${PREFIX_DATA}"
rm -rf "${PREFIX_LOG}"
rm -rf "${PREFIX_CONFIG}"
rm -rf "${PREFIX_PROFILING}"
rm -rf "${PREFIX_SECRET}"
'''

[tasks.reset-rw]
Expand Down
2 changes: 2 additions & 0 deletions ci/scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export MCLI_DOWNLOAD_BIN=https://rw-ci-deps-dist.s3.amazonaws.com/mc
export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud-cli-475.0.0-linux-x86_64.tar.gz
export NEXTEST_HIDE_PROGRESS_BAR=true
export RW_TELEMETRY_TYPE=test
export RW_SECRET_STORE_PRIVATE_KEY_HEX="0123456789abcdef"

unset LANG
if [ -n "${BUILDKITE_COMMIT:-}" ]; then
export GIT_SHA=$BUILDKITE_COMMIT
Expand Down
10 changes: 9 additions & 1 deletion e2e_test/sink/deltalake_rust_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float,
statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SECRET deltalake_s3_secret_key WITH (
backend = 'meta'
) as 'hummockadmin';

statement ok
create sink s6 as select * from mv6
with (
Expand All @@ -12,7 +17,7 @@ with (
force_append_only = 'true',
location = 's3a://deltalake/deltalake-test',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.secret.key = secret deltalake_s3_secret_key,
s3.endpoint = 'http://127.0.0.1:9301'
);

Expand All @@ -25,6 +30,9 @@ FLUSH;
statement ok
DROP SINK s6;

statement ok
DROP SECRET deltalake_s3_secret_key;

statement ok
DROP MATERIALIZED VIEW mv6;

Expand Down
18 changes: 14 additions & 4 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@ CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar);
statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SECRET iceberg_s3_access_key WITH (
backend = 'meta'
) as 'hummockadmin';

statement ok
CREATE SECRET iceberg_s3_secret_key WITH (
backend = 'meta'
) as 'hummockadmin';

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3a://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.access.key = secret iceberg_s3_access_key,
s3.secret.key = secret iceberg_s3_secret_key,
s3.region = 'us-east-1',
catalog.name = 'demo',
catalog.type = 'storage',
Expand All @@ -25,8 +35,8 @@ CREATE SOURCE iceberg_demo_source WITH (
connector = 'iceberg',
warehouse.path = 's3a://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.access.key = secret iceberg_s3_access_key,
s3.secret.key = secret iceberg_s3_secret_key,
s3.region = 'us-east-1',
catalog.name = 'demo',
catalog.type = 'storage',
Expand Down
24 changes: 21 additions & 3 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_create.sql
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql

statement ok
create secret mysql_pwd with (
backend = 'meta'
) as '${MYSQL_PWD:}';

# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON`
statement ok
create source mysql_mytest with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'rwcdc',
password = '${MYSQL_PWD:}',
password = secret mysql_pwd,
database.name = 'mytest',
server.id = '5601'
);
Expand Down Expand Up @@ -48,6 +53,9 @@ from mysql_mytest table 'mytest.products';
# sleep to ensure (default,'Milk','Milk is a white liquid food') is consumed from Debezium message instead of backfill.
sleep 10s

statement error Permission denied
drop secret mysql_pwd;

system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES (default,'Milk','Milk is a white liquid food');
INSERT INTO orders VALUES (default, '2023-11-28 15:08:22', 'Bob', 10.52, 100, false);"
Expand Down Expand Up @@ -190,13 +198,23 @@ SELECT c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_flo
-128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00
NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL

statement ok
create secret pg_pwd with (
backend = 'meta'
) as '${PGPASSWORD:}';

statement ok
create secret pg_username with (
backend = 'meta'
) as '${PGUSER:$USER}';

statement ok
create source pg_source with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
username = secret pg_username,
password = secret pg_pwd,
database.name = '${PGDATABASE:postgres}',
slot.name = 'pg_slot'
);
Expand Down
5 changes: 5 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,8 @@ message Secret {
uint32 owner = 5;
uint32 schema_id = 6;
}

message OptionsWithSecret {
map<string, string> options = 1;
map<string, secret.SecretRef> secret_refs = 2;
}
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ message AddWorkerNodeResponse {
reserved "system_params";
common.Status status = 1;
optional uint32 node_id = 2;
string cluster_id = 4;
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
}

message ActivateWorkerNodeRequest {
Expand Down
12 changes: 8 additions & 4 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
ConnectorProperties, SourceColumnDesc, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
Expand Down Expand Up @@ -64,12 +65,15 @@ impl BoxedExecutorBuilder for SourceExecutor {
)?;

// prepare connector source
let source_props = source_node.with_properties.clone();
let config =
ConnectorProperties::extract(source_props, false).map_err(BatchError::connector)?;
let options_with_secret = WithOptionsSecResolved::new(
source_node.with_properties.clone(),
source_node.secret_refs.clone(),
);
let config = ConnectorProperties::extract(options_with_secret.clone(), false)
.map_err(BatchError::connector)?;

let info = source_node.get_info().unwrap();
let parser_config = SpecificParserConfig::new(info, &source_node.with_properties)?;
let parser_config = SpecificParserConfig::new(info, &options_with_secret)?;

let columns: Vec<_> = source_node
.columns
Expand Down
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ fn mock_from_legacy_type(
format,
encode: SinkEncode::Json,
options: Default::default(),
secret_refs: Default::default(),
key_encode: None,
}))
} else {
Expand Down
3 changes: 3 additions & 0 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ mod test {
heap_profiling_dir: None,
dangerous_max_idle_secs: None,
connector_rpc_endpoint: None,
temp_secret_file_dir: None,
},
),
compute_opts: Some(
Expand All @@ -375,6 +376,7 @@ mod test {
async_stack_trace: None,
heap_profiling_dir: None,
connector_rpc_endpoint: None,
temp_secret_file_dir: None,
},
),
frontend_opts: Some(
Expand All @@ -391,6 +393,7 @@ mod test {
config_path: "src/config/test.toml",
metrics_level: None,
enable_barrier_read: None,
temp_secret_file_dir: None,
},
),
compactor_opts: None,
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ risingwave-fields-derive = { path = "./fields-derive" }
risingwave_common_estimate_size = { workspace = true }
risingwave_common_metrics = { path = "./metrics" }
risingwave_common_proc_macro = { workspace = true }
risingwave_common_secret = { path = "./secret" }
risingwave_error = { workspace = true }
risingwave_license = { workspace = true }
risingwave_pb = { workspace = true }
Expand Down
29 changes: 29 additions & 0 deletions src/common/secret/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "risingwave_common_secret"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["workspace-hack"]

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]

[dependencies]
aes-gcm = "0.10"
anyhow = "1"
bincode = "1"
parking_lot = { workspace = true }
prost = { workspace = true }
risingwave_pb = { workspace = true }
serde = { version = "1" }
thiserror = "1"
thiserror-ext = { workspace = true }
tracing = "0.1"

[lints]
workspace = true
Loading
Loading