feat(secret): secret management (part 1) add secret ref protos and referent count in meta catalog #17474

Merged: 9 commits, Jul 2, 2024

Changes from 2 commits
2 changes: 2 additions & 0 deletions proto/batch_plan.proto
@@ -7,6 +7,7 @@ import "common.proto";
import "data.proto";
import "expr.proto";
import "plan_common.proto";
import "secret.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;
@@ -61,6 +62,7 @@ message SourceNode
map<string, string> with_properties = 3;
repeated bytes split = 4;
catalog.StreamSourceInfo info = 5;
map<string, secret.SecretRef> secret_refs = 6;
}

message ProjectNode {
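secret.proto itself is not part of this diff, so the exact shape of `secret.SecretRef` is not shown here. As a rough sketch, assuming it carries little more than a numeric `secret_id`, the new `secret_refs` map on `SourceNode` could be merged back into the plaintext `with_properties` at runtime roughly as below; both the `SecretRef` fields and `resolve_secret` are illustrative assumptions, not the actual API.

```rust
use std::collections::BTreeMap;

/// Assumed shape of `secret.SecretRef`; the real message lives in the
/// (not shown) secret.proto and may carry more than a numeric id.
#[derive(Clone, Copy)]
struct SecretRef {
    secret_id: u32,
}

/// Hypothetical lookup against whatever store holds the decrypted values.
fn resolve_secret(secret_ref: SecretRef) -> Option<String> {
    Some(format!("secret-value-{}", secret_ref.secret_id))
}

/// Merge plaintext `with_properties` with resolved `secret_refs`, the two
/// maps a SourceNode now carries side by side.
fn fill_secrets(
    with_properties: &BTreeMap<String, String>,
    secret_refs: &BTreeMap<String, SecretRef>,
) -> BTreeMap<String, String> {
    let mut merged = with_properties.clone();
    for (prop, secret_ref) in secret_refs {
        if let Some(value) = resolve_secret(*secret_ref) {
            merged.insert(prop.clone(), value);
        }
    }
    merged
}

fn main() {
    let mut props = BTreeMap::new();
    props.insert("topic".to_string(), "orders".to_string());
    let mut refs = BTreeMap::new();
    refs.insert("sasl.password".to_string(), SecretRef { secret_id: 42 });
    // The connector sees one flat property map; the catalog only ever
    // stores the reference, never the secret value itself.
    println!("{:?}", fill_secrets(&props, &refs));
}
```

The same pattern would apply to the other plan-node messages below that gain a `secret_refs` field.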
8 changes: 6 additions & 2 deletions proto/catalog.proto
@@ -85,7 +85,8 @@
map<string, string> format_encode_options = 14;

// Handles any secret that the source relies on. The key is the property name and the value is the secret id and type.
map<string, secret.SecretRef> secret_ref = 16;
// For format and encode options.
map<string, secret.SecretRef> format_encode_secret_refs = 16;

Check failure on line 89 in proto/catalog.proto (GitHub Actions / Check breaking changes in Protobuf files):
  Field "16" with name "format_encode_secret_refs" on message "StreamSourceInfo" changed option "json_name" from "secretRef" to "formatEncodeSecretRefs".
  Field "16" with name "format_encode_secret_refs" on message "StreamSourceInfo" changed type from "catalog.StreamSourceInfo.SecretRefEntry" to "catalog.StreamSourceInfo.FormatEncodeSecretRefsEntry".
  Field "16" on message "StreamSourceInfo" changed name from "secret_ref" to "format_encode_secret_refs".
yezizp2012 marked this conversation as resolved.
}

message Source {
@@ -123,6 +124,8 @@
// Cluster version (tracked by git commit) when initialized/created
optional string initialized_at_cluster_version = 17;
optional string created_at_cluster_version = 18;
// Handles any secret that the source relies on. The key is the property name and the value is the secret id.
map<string, secret.SecretRef> secret_refs = 19;
yuhao-su marked this conversation as resolved.

// Per-source catalog version, used by schema change.
uint64 version = 100;
@@ -141,6 +144,7 @@
plan_common.EncodeType encode = 2;
map<string, string> options = 3;
optional plan_common.EncodeType key_encode = 4;
map<string, secret.SecretRef> secret_refs = 5;
Member: Isn't it a duplicate of the pre-defined one in the sink proto?

Contributor: Ditto; I'm not sure we need to introduce format-encode ones for Sink. cc @xiangjinwu

Contributor (Author): Sink also uses schema.registry, so I guess it needs secrets.

}

// the catalog of the sink. There are two kind of schema here. The full schema is all columns
@@ -182,7 +186,7 @@
CreateType create_type = 24;

// Handles any secret that the sink relies on. The key is the property name and the value is the secret id and type.
map<string, secret.SecretRef> secret_ref = 25;
map<string, secret.SecretRef> secret_refs = 25;

Check failure on line 189 in proto/catalog.proto (GitHub Actions / Check breaking changes in Protobuf files):
  Field "25" with name "secret_refs" on message "Sink" changed option "json_name" from "secretRef" to "secretRefs".
  Field "25" with name "secret_refs" on message "Sink" changed type from "catalog.Sink.SecretRefEntry" to "catalog.Sink.SecretRefsEntry".
  Field "25" on message "Sink" changed name from "secret_ref" to "secret_refs".
}

message Subscription {
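On the catalog side a source or sink now carries two secret maps: `secret_refs` for connector properties and `format_encode_secret_refs` (inside `StreamSourceInfo` / `SinkFormatDesc`) for format and encode options such as the schema-registry credentials mentioned in the discussion above. A minimal sketch of how the frontend might partition user-supplied options between the plaintext map and the secret map follows; `OptionValue` and `lookup_secret_id` are invented stand-ins, not the actual parser or catalog types.

```rust
use std::collections::BTreeMap;

/// Illustrative stand-in for a parsed option value: either a literal string
/// or a reference to a named secret.
enum OptionValue {
    Plain(String),
    SecretName(String),
}

#[derive(Debug, Clone, Copy)]
struct SecretRef {
    secret_id: u32,
}

/// Hypothetical catalog lookup from secret name to secret id.
fn lookup_secret_id(name: &str) -> Option<u32> {
    if name == "registry_password" { Some(7) } else { None }
}

/// Split user options into the plaintext map (`format_encode_options`) and
/// the secret map (`format_encode_secret_refs`) that the catalog now stores.
fn split_options(
    options: BTreeMap<String, OptionValue>,
) -> (BTreeMap<String, String>, BTreeMap<String, SecretRef>) {
    let mut plain = BTreeMap::new();
    let mut refs = BTreeMap::new();
    for (key, value) in options {
        match value {
            OptionValue::Plain(v) => {
                plain.insert(key, v);
            }
            OptionValue::SecretName(name) => {
                if let Some(secret_id) = lookup_secret_id(&name) {
                    refs.insert(key, SecretRef { secret_id });
                }
            }
        }
    }
    (plain, refs)
}

fn main() {
    let mut options = BTreeMap::new();
    options.insert(
        "schema.registry".to_string(),
        OptionValue::Plain("http://registry:8081".to_string()),
    );
    options.insert(
        "schema.registry.password".to_string(),
        OptionValue::SecretName("registry_password".to_string()),
    );
    let (plain, refs) = split_options(options);
    println!("plaintext: {plain:?}, secret refs: {refs:?}");
}
```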
2 changes: 2 additions & 0 deletions proto/plan_common.proto
@@ -5,6 +5,7 @@ package plan_common;
import "common.proto";
import "data.proto";
import "expr.proto";
import "secret.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;
@@ -108,6 +109,7 @@ message ExternalTableDesc {
map<string, string> connect_properties = 6;
// upstream cdc source job id
uint32 source_id = 7;
map<string, secret.SecretRef> secret_refs = 8;
}

enum JoinType {
5 changes: 5 additions & 0 deletions proto/stream_plan.proto
@@ -7,6 +7,7 @@ import "common.proto";
import "data.proto";
import "expr.proto";
import "plan_common.proto";
import "secret.proto";
import "source.proto";

option java_package = "com.risingwave.proto";
@@ -188,6 +189,7 @@ message StreamSource {
string source_name = 8;
// Streaming rate limit
optional uint32 rate_limit = 9;
map<string, secret.SecretRef> secret_refs = 10;
}

// copy contents from StreamSource to prevent compatibility issues in the future
@@ -203,6 +205,7 @@ message StreamFsFetch {
string source_name = 8;
// Streaming rate limit
optional uint32 rate_limit = 9;
map<string, secret.SecretRef> secret_refs = 10;
}

// The executor only for receiving barrier from the meta service. It always resides in the leaves
@@ -233,6 +236,7 @@ message SourceBackfillNode {

// `| partition_id | backfill_progress |`
catalog.Table state_table = 8;
map<string, secret.SecretRef> secret_refs = 9;
}

message SinkDesc {
@@ -254,6 +258,7 @@ message SinkDesc {
catalog.SinkFormatDesc format_desc = 13;
optional uint32 target_table = 14;
optional uint64 extra_partition_col_idx = 15;
map<string, secret.SecretRef> secret_refs = 16;
}

enum SinkLogStoreType {
1 change: 1 addition & 0 deletions src/common/src/catalog/external_table.rs
@@ -65,6 +65,7 @@ impl CdcTableDesc {
table_name: self.external_table_name.clone(),
stream_key: self.stream_key.iter().map(|k| *k as _).collect(),
connect_properties: self.connect_properties.clone(),
secret_refs: Default::default(),
}
}

5 changes: 3 additions & 2 deletions src/connector/src/sink/catalog/desc.rs
@@ -98,7 +98,8 @@ impl SinkDesc {
distribution_key: self.distribution_key,
owner,
dependent_relations,
properties: self.properties.into_iter().collect(),
properties: self.properties,
secret_refs: secret_ref,
sink_type: self.sink_type,
format_desc: self.format_desc,
connection_id,
@@ -110,7 +111,6 @@
created_at_cluster_version: None,
initialized_at_cluster_version: None,
create_type: self.create_type,
secret_ref,
}
}

@@ -134,6 +134,7 @@
sink_from_name: self.sink_from_name.clone(),
target_table: self.target_table.map(|table_id| table_id.table_id()),
extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64),
secret_refs: Default::default(),
}
}
}
7 changes: 4 additions & 3 deletions src/connector/src/sink/catalog/mod.rs
@@ -203,6 +203,7 @@ impl SinkFormatDesc {
encode: encode.into(),
options,
key_encode,
secret_refs: Default::default(),
}
}
}
@@ -340,7 +341,7 @@ pub struct SinkCatalog {
pub create_type: CreateType,

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_ref: BTreeMap<String, PbSecretRef>,
pub secret_refs: BTreeMap<String, PbSecretRef>,
}

impl SinkCatalog {
@@ -382,7 +383,7 @@ impl SinkCatalog {
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
create_type: self.create_type.to_proto() as i32,
secret_ref: self.secret_ref.clone(),
secret_refs: self.secret_refs.clone(),
}
}

@@ -476,7 +477,7 @@ impl From<PbSink> for SinkCatalog {
initialized_at_cluster_version: pb.initialized_at_cluster_version,
created_at_cluster_version: pb.created_at_cluster_version,
create_type: CreateType::from_proto(create_type),
secret_ref: pb.secret_ref,
secret_refs: pb.secret_refs,
}
}
}
38 changes: 32 additions & 6 deletions src/ctl/src/cmd_impl/meta/migration.rs
@@ -489,15 +489,35 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
}
println!("table fragments migrated");

let mut object_dependencies = vec![];

// catalogs.
// source
if !sources.is_empty() {
let source_models: Vec<source::ActiveModel> = sources
.into_iter()
.map(|mut src| {
let mut dependent_secret_refs = vec![];
if let Some(id) = src.connection_id.as_mut() {
*id = *connection_rewrite.get(id).unwrap();
}
for secret_ref in src.secret_refs.values_mut() {
secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap();
dependent_secret_refs.push(secret_ref.secret_id);
}
if let Some(info) = &mut src.info {
for secret_ref in info.format_encode_secret_refs.values_mut() {
secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap();
dependent_secret_refs.push(secret_ref.secret_id);
}
}
object_dependencies.extend(dependent_secret_refs.into_iter().map(|secret_id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(secret_id as _),
used_by: Set(src.id as _),
}
}));
src.into()
})
.collect();
@@ -507,8 +527,6 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
}
println!("sources migrated");

let mut object_dependencies = vec![];

// table
for table in tables {
let job_id = if table.table_type() == PbTableType::Internal {
@@ -554,13 +572,21 @@
if let Some(id) = s.connection_id.as_mut() {
*id = *connection_rewrite.get(id).unwrap();
}
for secret_id in s.secret_ref.values_mut() {
secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap();
let mut dependent_secret_refs = vec![];
for secret_ref in s.secret_refs.values_mut() {
secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap();
dependent_secret_refs.push(secret_ref.secret_id);
}
if let Some(desc) = &mut s.format_desc {
for secret_ref in desc.secret_refs.values_mut() {
secret_ref.secret_id = *secret_rewrite.get(&secret_ref.secret_id).unwrap();
dependent_secret_refs.push(secret_ref.secret_id);
}
}
object_dependencies.extend(s.secret_ref.values().map(|id| {
object_dependencies.extend(dependent_secret_refs.into_iter().map(|secret_id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(id.secret_id as _),
oid: Set(secret_id as _),
used_by: Set(s.id as _),
}
}));
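The `object_dependency` rows written during migration (and at job creation below) are what give the meta catalog its referent count for secrets: each row records that a source or sink uses a given secret, so counting rows whose `oid` equals the secret id tells how many relations still depend on it. A toy illustration of that counting, assuming dependencies are just `(oid, used_by)` pairs; the row type and the drop check are simplified assumptions, not the real meta-store schema.

```rust
/// Toy model of an object_dependency row: `oid` is the referenced object
/// (here a secret), `used_by` the relation that references it.
#[derive(Clone, Copy)]
struct ObjectDependency {
    oid: u32,
    used_by: u32,
}

/// Referent count for a secret: how many dependency rows point at it.
fn secret_referent_count(deps: &[ObjectDependency], secret_id: u32) -> usize {
    deps.iter().filter(|d| d.oid == secret_id).count()
}

fn main() {
    let deps = vec![
        ObjectDependency { oid: 42, used_by: 1001 }, // source 1001 uses secret 42
        ObjectDependency { oid: 42, used_by: 1002 }, // sink 1002 uses secret 42
        ObjectDependency { oid: 7, used_by: 1001 },
    ];
    // Dropping secret 42 would presumably be rejected while this count is non-zero.
    assert_eq!(secret_referent_count(&deps, 42), 2);
}
```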
1 change: 1 addition & 0 deletions src/frontend/src/catalog/source_catalog.rs
@@ -77,6 +77,7 @@ impl SourceCatalog {
version: self.version,
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
secret_refs: Default::default(),
}
}

1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
@@ -109,6 +109,7 @@ impl ToBatchPb for BatchIcebergScan {
.collect(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
split: vec![],
secret_refs: Default::default(),
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_kafka_scan.rs
@@ -131,6 +131,7 @@ impl ToBatchPb for BatchKafkaScan {
.collect(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
split: vec![],
secret_refs: Default::default(),
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -113,6 +113,7 @@ impl ToBatchPb for BatchSource {
.collect(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
split: vec![],
secret_refs: Default::default(),
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -115,6 +115,7 @@ impl StreamNode for StreamFsFetch {
.collect_vec(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
secret_refs: Default::default(),
});
NodeBody::StreamFsFetch(StreamFsFetchNode {
node_inner: source_inner,
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -109,6 +109,7 @@ impl StreamNode for StreamSource {
.collect_vec(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
secret_refs: Default::default(),
});
PbNodeBody::Source(SourceNode { source_inner })
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_source_scan.rs
@@ -157,6 +157,7 @@ impl StreamSourceScan {
.collect_vec(),
with_properties: source_catalog.with_properties.clone().into_iter().collect(),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
secret_refs: Default::default(),
};

let fields = self.schema().to_prost();
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/sink.rs
@@ -129,7 +129,7 @@ impl From<PbSink> for ActiveModel {
sink_from_name: Set(pb_sink.sink_from_name),
sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
target_table: Set(pb_sink.target_table.map(|x| x as _)),
secret_ref: Set(Some(SecretRef::from(pb_sink.secret_ref))),
secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))),
}
}
}
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/source.rs
@@ -103,7 +103,7 @@ impl From<PbSource> for ActiveModel {
optional_associated_table_id: Set(optional_associated_table_id),
connection_id: Set(source.connection_id.map(|id| id as _)),
version: Set(source.version as _),
secret_ref: Set(None),
secret_ref: Set(Some(SecretRef::from(source.secret_refs))),
}
}
}
7 changes: 6 additions & 1 deletion src/meta/src/controller/mod.rs
@@ -167,6 +167,10 @@ impl From<ObjectModel<table::Model>> for PbTable {

impl From<ObjectModel<source::Model>> for PbSource {
fn from(value: ObjectModel<source::Model>) -> Self {
let mut secret_ref_map = BTreeMap::new();
if let Some(secret_ref) = value.0.secret_ref {
secret_ref_map = secret_ref.to_protobuf();
}
Self {
id: value.0.source_id as _,
schema_id: value.1.schema_id.unwrap() as _,
@@ -195,6 +199,7 @@
.map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id as _)),
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
secret_refs: secret_ref_map,
}
}
}
@@ -234,7 +239,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
create_type: PbCreateType::Foreground as _,
secret_ref: secret_ref_map,
secret_refs: secret_ref_map,
}
}
}
13 changes: 10 additions & 3 deletions src/meta/src/controller/streaming_job.rs
@@ -288,11 +288,18 @@ impl CatalogController {
}
}

// get dependent secret ref.
// XXX: A relation can ref a secret more than 1 time.
yuhao-su marked this conversation as resolved.
let dependent_secret_refs = streaming_job.dependent_secret_refs();

let dependent_objs = dependent_relations
.iter()
.chain(dependent_secret_refs.iter());
// record object dependency.
if !dependent_relations.is_empty() {
ObjectDependency::insert_many(dependent_relations.into_iter().map(|id| {
if !dependent_secret_refs.is_empty() || !dependent_relations.is_empty() {
ObjectDependency::insert_many(dependent_objs.map(|id| {
object_dependency::ActiveModel {
oid: Set(id as _),
oid: Set(*id as _),
used_by: Set(streaming_job.id() as _),
..Default::default()
}
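The `XXX` note above flags that a relation can reference the same secret under several property names, which would yield duplicate `(secret_id, job_id)` dependency rows if the refs are inserted as-is. A minimal sketch of collapsing the refs before insertion, using a `BTreeSet`; this is only one possible way to handle it, not necessarily what the PR does.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Clone, Copy)]
struct SecretRef {
    secret_id: u32,
}

/// Collect the distinct secret ids referenced by a job, so each one yields a
/// single object_dependency row even if several properties point at it.
fn distinct_secret_ids(secret_refs: &BTreeMap<String, SecretRef>) -> BTreeSet<u32> {
    secret_refs.values().map(|r| r.secret_id).collect()
}

fn main() {
    let mut refs = BTreeMap::new();
    refs.insert("sasl.password".to_string(), SecretRef { secret_id: 42 });
    refs.insert("registry.password".to_string(), SecretRef { secret_id: 42 });
    // Both properties reference secret 42, but only one dependency is recorded.
    assert_eq!(distinct_secret_ids(&refs).len(), 1);
}
```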