diff --git a/Cargo.lock b/Cargo.lock index ab2f45a519bae..2cd740e2f31b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,6 +77,22 @@ dependencies = [ "aes", ] +[[package]] +name = "aes-siv" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e08d0cdb774acd1e4dac11478b1a0c0d203134b2aab0ba25eb430de9b18f8b9" +dependencies = [ + "aead", + "aes", + "cipher", + "cmac", + "ctr", + "dbl", + "digest", + "zeroize", +] + [[package]] name = "ahash" version = "0.7.8" @@ -2526,6 +2542,17 @@ dependencies = [ "cc", ] +[[package]] +name = "cmac" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8543454e3c3f5126effff9cd44d562af4e31fb8ce1cc0d3dcd8f084515dbc1aa" +dependencies = [ + "cipher", + "dbl", + "digest", +] + [[package]] name = "cmake" version = "0.1.50" @@ -3604,6 +3631,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "dbl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd2735a791158376708f9347fe8faba9667589d82427ef3aed6794a8981de3d9" +dependencies = [ + "generic-array", +] + [[package]] name = "debugid" version = "0.8.0" @@ -9374,7 +9410,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -11049,6 +11085,7 @@ dependencies = [ name = "risingwave_meta" version = "1.9.0-alpha" dependencies = [ + "aes-siv", "anyhow", "arc-swap", "assert_matches", @@ -11056,6 +11093,7 @@ dependencies = [ "aws-config", "axum 0.7.4", "base64-url", + "bincode 1.3.3", "bytes", "chrono", "clap", diff --git a/e2e_test/ddl/secret.slt b/e2e_test/ddl/secret.slt new file mode 100644 index 0000000000000..7c11e2e6245a3 --- /dev/null +++ b/e2e_test/ddl/secret.slt @@ -0,0 +1,23 @@ +statement error secret backend "fake-backend" is not supported +create secret secret_1 with ( + backend = 'fake-backend' +) as 'demo_secret'; + +statement ok +create secret secret_1 with ( + backend = 'meta' +) as 'demo_secret'; + +# wait for support for hashicorp_vault backend +# statement ok +# create secret secret_2 with ( +# backend = 'hashicorp_vault' +# ); + +query T +show secrets; +---- +secret_1 + +statement ok +drop secret secret_1; diff --git a/proto/catalog.proto b/proto/catalog.proto index 42b672a1b6c51..7dfefa003217d 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -62,10 +62,12 @@ message StreamSourceInfo { SchemaRegistryNameStrategy name_strategy = 10; optional string key_message_name = 11; plan_common.ExternalTableDesc external_table = 12; - // **This field should now be called `is_shared`.** Not renamed for backwards compatibility. + // **This field should now be called `is_shared`.** Not renamed for backwards + // compatibility. // // Whether the stream source is a shared source (it has a streaming job). - // This is related with [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72). + // This is related with [RFC: Reusable Source + // Executor](https://github.com/risingwavelabs/rfcs/pull/72). // // Currently, the following sources can be shared: // @@ -80,6 +82,9 @@ message StreamSourceInfo { bool is_distributed = 15; // Options specified by user in the FORMAT ENCODE clause. map format_encode_options = 14; + + // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id. + map secret_ref = 16; } message Source { @@ -174,6 +179,9 @@ message Sink { // Whether it should use background ddl or block until backfill finishes. CreateType create_type = 24; + + // Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id. + map secret_ref = 25; } message Subscription { @@ -239,7 +247,8 @@ message Index { optional uint64 created_at_epoch = 11; StreamJobStatus stream_job_status = 12; - // Use to record the prefix len of the index_item to reconstruct index columns provided by users. + // Use to record the prefix len of the index_item to reconstruct index columns + // provided by users. uint32 index_columns_len = 13; // Cluster version (tracked by git commit) when initialized/created optional string initialized_at_cluster_version = 14; @@ -319,8 +328,8 @@ message Table { // an optional column index which is the vnode of each row computed by the // table's consistent hash distribution optional uint32 vnode_col_index = 18; - // An optional column index of row id. If the primary key is specified by users, - // this will be `None`. + // An optional column index of row id. If the primary key is specified by + // users, this will be `None`. optional uint32 row_id_index = 19; // The column indices which are stored in the state store's value with // row-encoding. Currently is not supported yet and expected to be @@ -329,23 +338,26 @@ message Table { string definition = 21; // Used to control whether handling pk conflict for incoming data. HandleConflictBehavior handle_pk_conflict_behavior = 22; - // Anticipated read prefix pattern (number of fields) for the table, which can be utilized - // for implementing the table's bloom filter or other storage optimization techniques. + // Anticipated read prefix pattern (number of fields) for the table, which can + // be utilized for implementing the table's bloom filter or other storage + // optimization techniques. uint32 read_prefix_len_hint = 23; repeated int32 watermark_indices = 24; repeated int32 dist_key_in_pk = 25; - // A dml fragment id corresponds to the table, used to decide where the dml statement is executed. + // A dml fragment id corresponds to the table, used to decide where the dml + // statement is executed. optional uint32 dml_fragment_id = 26; // The range of row count of the table. - // This field is not always present due to backward compatibility. Use `Cardinality::unknown` in this case. + // This field is not always present due to backward compatibility. Use + // `Cardinality::unknown` in this case. plan_common.Cardinality cardinality = 27; optional uint64 initialized_at_epoch = 28; optional uint64 created_at_epoch = 29; - // This field is introduced in v1.2.0. It is used to indicate whether the table should use - // watermark_cache to avoid state cleaning as a performance optimization. - // In older versions we can just initialize without it. + // This field is introduced in v1.2.0. It is used to indicate whether the + // table should use watermark_cache to avoid state cleaning as a performance + // optimization. In older versions we can just initialize without it. bool cleaned_by_watermark = 30; // Used to filter created / creating tables in meta. @@ -363,14 +375,18 @@ message Table { optional string initialized_at_cluster_version = 35; optional string created_at_cluster_version = 36; - // TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables. + // TTL of the record in the table, to ensure the consistency with other tables + // in the streaming plan, it only applies to append-only tables. optional uint32 retention_seconds = 37; - // This field specifies the index of the column set in the "with version column" within all the columns. It is used for filtering during "on conflict" operations. + // This field specifies the index of the column set in the "with version + // column" within all the columns. It is used for filtering during "on + // conflict" operations. optional uint32 version_column_index = 38; - // Per-table catalog version, used by schema change. `None` for internal tables and tests. - // Not to be confused with the global catalog version for notification service. + // Per-table catalog version, used by schema change. `None` for internal + // tables and tests. Not to be confused with the global catalog version for + // notification service. TableVersion version = 100; } @@ -415,3 +431,13 @@ message Comment { optional uint32 column_index = 4; optional string description = 5; } + +message Secret { + uint32 id = 1; + string name = 2; + uint32 database_id = 3; + // The secret here is encrypted to bytes. + bytes value = 4; + uint32 owner = 5; + uint32 schema_id = 6; +} diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 58fb645e056eb..46c2a5c22ff6d 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -357,6 +357,26 @@ message GetDdlProgressResponse { repeated DdlProgress ddl_progress = 1; } +message CreateSecretRequest { + string name = 1; + bytes value = 2; + uint32 database_id = 3; + uint32 schema_id = 4; + uint32 owner_id = 5; +} + +message CreateSecretResponse { + uint64 version = 1; +} + +message DropSecretRequest { + uint32 secret_id = 1; +} + +message DropSecretResponse { + uint64 version = 1; +} + message CreateConnectionRequest { message PrivateLink { catalog.Connection.PrivateLinkService.PrivateLinkProvider provider = 1; @@ -427,6 +447,8 @@ service DdlService { rpc CreateMaterializedView(CreateMaterializedViewRequest) returns (CreateMaterializedViewResponse); rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse); rpc CreateTable(CreateTableRequest) returns (CreateTableResponse); + rpc CreateSecret(CreateSecretRequest) returns (CreateSecretResponse); + rpc DropSecret(DropSecretRequest) returns (DropSecretResponse); rpc AlterName(AlterNameRequest) returns (AlterNameResponse); rpc AlterSource(AlterSourceRequest) returns (AlterSourceResponse); rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse); diff --git a/proto/meta.proto b/proto/meta.proto index df90d99f6e9e9..9ad18cb3df7d5 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -407,6 +407,7 @@ message MetaSnapshot { reserved 9; reserved "parallel_unit_mappings"; GetSessionParamsResponse session_params = 20; + repeated catalog.Secret secrets = 23; repeated common.WorkerNode nodes = 10; hummock.HummockSnapshot hummock_snapshot = 11; hummock.HummockVersion hummock_version = 12; @@ -469,6 +470,7 @@ message SubscribeResponse { Recovery recovery = 25; FragmentWorkerSlotMapping streaming_worker_slot_mapping = 27; FragmentWorkerSlotMappings serving_worker_slot_mappings = 28; + catalog.Secret secret = 29; } reserved 12; reserved "parallel_unit_mapping"; diff --git a/proto/secret.proto b/proto/secret.proto new file mode 100644 index 0000000000000..f5065009519fd --- /dev/null +++ b/proto/secret.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package secret; + +message SecretMetaBackend { + bytes value = 1; +} + +message SecretHashicropValutBackend { + string host = 1; + string vault_token = 2; +} + +message Secret { + // the message is stored in meta as encrypted bytes and is interpreted as bytes by catalog + oneof secret_backend { + SecretMetaBackend meta = 1; + SecretHashicropValutBackend hashicorp_vault = 2; + } +} diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index a611c40aebc68..e760a0e16866c 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -27,6 +27,7 @@ pub trait SubscribeTypeEnum { } pub struct SubscribeFrontend {} + impl SubscribeTypeEnum for SubscribeFrontend { fn subscribe_type() -> SubscribeType { SubscribeType::Frontend @@ -34,6 +35,7 @@ impl SubscribeTypeEnum for SubscribeFrontend { } pub struct SubscribeHummock {} + impl SubscribeTypeEnum for SubscribeHummock { fn subscribe_type() -> SubscribeType { SubscribeType::Hummock @@ -41,6 +43,7 @@ impl SubscribeTypeEnum for SubscribeHummock { } pub struct SubscribeCompactor {} + impl SubscribeTypeEnum for SubscribeCompactor { fn subscribe_type() -> SubscribeType { SubscribeType::Compactor @@ -48,6 +51,7 @@ impl SubscribeTypeEnum for SubscribeCompactor { } pub struct SubscribeCompute {} + impl SubscribeTypeEnum for SubscribeCompute { fn subscribe_type() -> SubscribeType { SubscribeType::Compute @@ -142,6 +146,7 @@ where | Info::RelationGroup(_) | Info::User(_) | Info::Connection(_) + | Info::Secret(_) | Info::Function(_) => { notification.version > info.version.as_ref().unwrap().catalog_version } @@ -231,6 +236,7 @@ where } } } + const RE_SUBSCRIBE_RETRY_INTERVAL: Duration = Duration::from_millis(100); #[async_trait::async_trait] diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index cef5b021b9dbd..86c6e8895c066 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -451,6 +451,41 @@ impl From for u32 { } } +#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)] +pub struct SecretId(pub u32); + +impl SecretId { + pub const fn new(id: u32) -> Self { + SecretId(id) + } + + pub const fn placeholder() -> Self { + SecretId(OBJECT_ID_PLACEHOLDER) + } + + pub fn secret_id(&self) -> u32 { + self.0 + } +} + +impl From for SecretId { + fn from(id: u32) -> Self { + Self::new(id) + } +} + +impl From<&u32> for SecretId { + fn from(id: &u32) -> Self { + Self::new(*id) + } +} + +impl From for u32 { + fn from(id: SecretId) -> Self { + id.0 + } +} + #[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ConflictBehavior { #[default] diff --git a/src/common/src/config.rs b/src/common/src/config.rs index cd276e8a966f4..8ae14702d3261 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -366,6 +366,9 @@ pub struct MetaConfig { /// Whether compactor should rewrite row to remove dropped column. #[serde(default = "default::meta::enable_dropped_column_reclaim")] pub enable_dropped_column_reclaim: bool, + + #[serde(default = "default::meta::secret_store_private_key")] + pub secret_store_private_key: Vec, } #[derive(Copy, Clone, Debug, Default)] @@ -1322,9 +1325,14 @@ pub mod default { pub fn parallelism_control_trigger_first_delay_sec() -> u64 { 30 } + pub fn enable_dropped_column_reclaim() -> bool { false } + + pub fn secret_store_private_key() -> Vec { + "demo-secret-private-key".as_bytes().to_vec() + } } pub mod server { @@ -1492,6 +1500,7 @@ pub mod default { pub fn max_preload_io_retry_times() -> usize { 3 } + pub fn mem_table_spill_threshold() -> usize { 4 << 20 } @@ -1532,7 +1541,6 @@ pub mod default { } pub mod file_cache { - pub fn dir() -> String { "".to_string() } @@ -1786,7 +1794,8 @@ pub mod default { // decrease this configure when the generation of checkpoint barrier is not frequent. const DEFAULT_TIER_COMPACT_TRIGGER_NUMBER: u64 = 12; - const DEFAULT_TARGET_FILE_SIZE_BASE: u64 = 32 * 1024 * 1024; // 32MB + const DEFAULT_TARGET_FILE_SIZE_BASE: u64 = 32 * 1024 * 1024; + // 32MB const DEFAULT_MAX_SUB_COMPACTION: u32 = 4; const DEFAULT_LEVEL_MULTIPLIER: u64 = 5; const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * 1024 * 1024; // 512MB; @@ -1802,42 +1811,55 @@ pub mod default { pub fn max_bytes_for_level_base() -> u64 { DEFAULT_MAX_BYTES_FOR_LEVEL_BASE } + pub fn max_bytes_for_level_multiplier() -> u64 { DEFAULT_LEVEL_MULTIPLIER } + pub fn max_compaction_bytes() -> u64 { DEFAULT_MAX_COMPACTION_BYTES } + pub fn sub_level_max_compaction_bytes() -> u64 { DEFAULT_MIN_COMPACTION_BYTES } + pub fn level0_tier_compact_file_number() -> u64 { DEFAULT_TIER_COMPACT_TRIGGER_NUMBER } + pub fn target_file_size_base() -> u64 { DEFAULT_TARGET_FILE_SIZE_BASE } + pub fn compaction_filter_mask() -> u32 { (CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL).into() } + pub fn max_sub_compaction() -> u32 { DEFAULT_MAX_SUB_COMPACTION } + pub fn level0_stop_write_threshold_sub_level_number() -> u64 { DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER } + pub fn level0_sub_level_compact_level_count() -> u32 { DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT } + pub fn level0_overlapping_sub_level_compact_level_count() -> u32 { DEFAULT_MIN_OVERLAPPING_SUB_LEVEL_COMPACT_LEVEL_COUNT } + pub fn max_space_reclaim_bytes() -> u64 { DEFAULT_MAX_SPACE_RECLAIM_BYTES } + pub fn level0_max_compact_file_number() -> u64 { DEFAULT_MAX_COMPACTION_FILE_COUNT } + pub fn tombstone_reclaim_ratio() -> u32 { DEFAULT_TOMBSTONE_RATIO_PERCENT } @@ -1962,6 +1984,7 @@ pub mod default { pub mod developer { use crate::util::env_var::env_var_is_true_or; + const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3"; pub fn object_store_retry_unknown_service_error() -> bool { diff --git a/src/config/docs.md b/src/config/docs.md index aea210f5235af..2f8c4ce2812b1 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -56,6 +56,7 @@ This page is automatically generated by `./risedev generate-example-config` | periodic_split_compact_group_interval_sec | | 10 | | periodic_tombstone_reclaim_compaction_interval_sec | | 600 | | periodic_ttl_reclaim_compaction_interval_sec | Schedule `ttl_reclaim` compaction for all compaction groups with this interval. | 1800 | +| secret_store_private_key | | [100, 101, 109, 111, 45, 115, 101, 99, 114, 101, 116, 45, 112, 114, 105, 118, 97, 116, 101, 45, 107, 101, 121] | | split_group_size_limit | | 68719476736 | | table_write_throughput_threshold | The threshold of write throughput to trigger a group split. Increase this configuration value to avoid split too many groups with few data write. | 16777216 | | unrecognized | | | diff --git a/src/config/example.toml b/src/config/example.toml index a50b4b8c10d65..fb2243535d6a4 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -54,6 +54,7 @@ compact_task_table_size_partition_threshold_high = 536870912 event_log_enabled = true event_log_channel_max_size = 10 enable_dropped_column_reclaim = false +secret_store_private_key = [100, 101, 109, 111, 45, 115, 101, 99, 114, 101, 116, 45, 112, 114, 105, 118, 97, 116, 101, 45, 107, 101, 121] [meta.compaction_config] max_bytes_for_level_base = 536870912 diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 0fb466e5a1742..e5415c268d569 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use itertools::Itertools; use risingwave_common::catalog::{ @@ -83,6 +83,7 @@ impl SinkDesc { owner: UserId, connection_id: Option, dependent_relations: Vec, + secret_ref: HashMap, ) -> SinkCatalog { SinkCatalog { id: self.id, @@ -108,6 +109,7 @@ impl SinkDesc { created_at_cluster_version: None, initialized_at_cluster_version: None, create_type: self.create_type, + secret_ref, } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 7eaa977e16044..bf5dd89dd7894 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -337,6 +337,9 @@ pub struct SinkCatalog { pub created_at_cluster_version: Option, pub initialized_at_cluster_version: Option, pub create_type: CreateType, + + /// The secret reference for the sink, mapping from property name to secret id. + pub secret_ref: HashMap, } impl SinkCatalog { @@ -378,6 +381,7 @@ impl SinkCatalog { created_at_cluster_version: self.created_at_cluster_version.clone(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), create_type: self.create_type.to_proto() as i32, + secret_ref: self.secret_ref.clone(), } } @@ -471,6 +475,7 @@ impl From for SinkCatalog { initialized_at_cluster_version: pb.initialized_at_cluster_version, created_at_cluster_version: pb.created_at_cluster_version, create_type: CreateType::from_proto(create_type), + secret_ref: pb.secret_ref, } } } diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index b97c551f2ef78..f2bcdd2b62e12 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -34,7 +34,7 @@ use risingwave_rpc_client::MetaClient; use tokio::sync::watch::Receiver; use super::root_catalog::Catalog; -use super::{DatabaseId, TableId}; +use super::{DatabaseId, SecretId, TableId}; use crate::error::Result; use crate::user::UserId; @@ -43,6 +43,7 @@ pub type CatalogReadGuard = ArcRwLockReadGuard; /// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it. #[derive(Clone)] pub struct CatalogReader(Arc>); + impl CatalogReader { pub fn new(inner: Arc>) -> Self { CatalogReader(inner) @@ -130,6 +131,15 @@ pub trait CatalogWriter: Send + Sync { connection: create_connection_request::Payload, ) -> Result<()>; + async fn create_secret( + &self, + secret_name: String, + database_id: u32, + schema_id: u32, + owner_id: u32, + payload: Vec, + ) -> Result<()>; + async fn comment_on(&self, comment: PbComment) -> Result<()>; async fn drop_table( @@ -164,6 +174,8 @@ pub trait CatalogWriter: Send + Sync { async fn drop_connection(&self, connection_id: u32) -> Result<()>; + async fn drop_secret(&self, secret_id: SecretId) -> Result<()>; + async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()>; async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()>; @@ -373,6 +385,21 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } + async fn create_secret( + &self, + secret_name: String, + database_id: u32, + schema_id: u32, + owner_id: u32, + payload: Vec, + ) -> Result<()> { + let version = self + .meta_client + .create_secret(secret_name, database_id, schema_id, owner_id, payload) + .await?; + self.wait_version(version).await + } + async fn comment_on(&self, comment: PbComment) -> Result<()> { let version = self.meta_client.comment_on(comment).await?; self.wait_version(version).await @@ -455,6 +482,11 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } + async fn drop_secret(&self, secret_id: SecretId) -> Result<()> { + let version = self.meta_client.drop_secret(secret_id).await?; + self.wait_version(version).await + } + async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> { let version = self .meta_client diff --git a/src/frontend/src/catalog/mod.rs b/src/frontend/src/catalog/mod.rs index 687fa5ac350e7..64c3f525dd8f6 100644 --- a/src/frontend/src/catalog/mod.rs +++ b/src/frontend/src/catalog/mod.rs @@ -39,6 +39,8 @@ pub(crate) mod system_catalog; pub(crate) mod table_catalog; pub(crate) mod view_catalog; +pub(crate) mod secret_catalog; + pub(crate) use catalog_service::CatalogReader; pub use index_catalog::IndexCatalog; pub use table_catalog::TableCatalog; @@ -55,6 +57,7 @@ pub(crate) type SchemaId = u32; pub(crate) type TableId = risingwave_common::catalog::TableId; pub(crate) type ColumnId = risingwave_common::catalog::ColumnId; pub(crate) type FragmentId = u32; +pub(crate) type SecretId = risingwave_common::catalog::SecretId; /// Check if the column name does not conflict with the internally reserved column name. pub fn check_valid_column_name(column_name: &str) -> Result<()> { diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index 7a99c199446dd..5e6431065ce79 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -21,8 +21,8 @@ use risingwave_common::session_config::{SearchPath, USER_NAME_WILD_CARD}; use risingwave_common::types::DataType; use risingwave_connector::sink::catalog::SinkCatalog; use risingwave_pb::catalog::{ - PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbSubscription, - PbTable, PbView, + PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, + PbSubscription, PbTable, PbView, }; use risingwave_pb::hummock::HummockVersionStats; @@ -30,10 +30,13 @@ use super::function_catalog::FunctionCatalog; use super::source_catalog::SourceCatalog; use super::subscription_catalog::SubscriptionCatalog; use super::view_catalog::ViewCatalog; -use super::{CatalogError, CatalogResult, ConnectionId, SinkId, SourceId, SubscriptionId, ViewId}; +use super::{ + CatalogError, CatalogResult, ConnectionId, SecretId, SinkId, SourceId, SubscriptionId, ViewId, +}; use crate::catalog::connection_catalog::ConnectionCatalog; use crate::catalog::database_catalog::DatabaseCatalog; use crate::catalog::schema_catalog::SchemaCatalog; +use crate::catalog::secret_catalog::SecretCatalog; use crate::catalog::system_catalog::{ get_sys_tables_in_schema, get_sys_views_in_schema, SystemTableCatalog, }; @@ -201,6 +204,14 @@ impl Catalog { .create_subscription(proto); } + pub fn create_secret(&mut self, proto: &PbSecret) { + self.get_database_mut(proto.database_id) + .unwrap() + .get_schema_mut(proto.schema_id) + .unwrap() + .create_secret(proto); + } + pub fn create_view(&mut self, proto: &PbView) { self.get_database_mut(proto.database_id) .unwrap() @@ -257,6 +268,25 @@ impl Catalog { } } + pub fn update_secret(&mut self, proto: &PbSecret) { + let database = self.get_database_mut(proto.database_id).unwrap(); + let schema = database.get_schema_mut(proto.schema_id).unwrap(); + let secret_id = SecretId::new(proto.id); + if schema.get_secret_by_id(&secret_id).is_some() { + schema.update_secret(proto); + } else { + // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement. + schema.create_secret(proto); + database + .iter_schemas_mut() + .find(|schema| { + schema.id() != proto.schema_id && schema.get_secret_by_id(&secret_id).is_some() + }) + .unwrap() + .drop_secret(secret_id); + } + } + pub fn drop_database(&mut self, db_id: DatabaseId) { let name = self.db_name_by_id.remove(&db_id).unwrap(); let database = self.database_by_name.remove(&name).unwrap(); @@ -377,6 +407,14 @@ impl Catalog { .drop_sink(sink_id); } + pub fn drop_secret(&mut self, db_id: DatabaseId, schema_id: SchemaId, secret_id: SecretId) { + self.get_database_mut(db_id) + .unwrap() + .get_schema_mut(schema_id) + .unwrap() + .drop_secret(secret_id); + } + pub fn update_sink(&mut self, proto: &PbSink) { let database = self.get_database_mut(proto.database_id).unwrap(); let schema = database.get_schema_mut(proto.schema_id).unwrap(); @@ -793,6 +831,21 @@ impl Catalog { Err(CatalogError::NotFound("view", view_id.to_string())) } + pub fn get_secret_by_name<'a>( + &self, + db_name: &str, + schema_path: SchemaPath<'a>, + secret_name: &str, + ) -> CatalogResult<(&Arc, &'a str)> { + schema_path + .try_find(|schema_name| { + Ok(self + .get_schema_by_name(db_name, schema_name)? + .get_secret_by_name(secret_name)) + })? + .ok_or_else(|| CatalogError::NotFound("secret", secret_name.to_string())) + } + pub fn get_connection_by_name<'a>( &self, db_name: &str, @@ -958,6 +1011,21 @@ impl Catalog { } } + pub fn check_secret_name_duplicated( + &self, + db_name: &str, + schema_name: &str, + secret_name: &str, + ) -> CatalogResult<()> { + let schema = self.get_schema_by_name(db_name, schema_name)?; + + if schema.get_secret_by_name(secret_name).is_some() { + Err(CatalogError::Duplicated("secret", secret_name.to_string())) + } else { + Ok(()) + } + } + /// Get the catalog cache's catalog version. pub fn version(&self) -> u64 { self.version diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 20a99ad820af8..fffb171c4c8bc 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -22,7 +22,8 @@ use risingwave_common::types::DataType; use risingwave_connector::sink::catalog::SinkCatalog; pub use risingwave_expr::sig::*; use risingwave_pb::catalog::{ - PbConnection, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbSubscription, PbTable, PbView, + PbConnection, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription, + PbTable, PbView, }; use risingwave_pb::user::grant_privilege::Object; @@ -31,11 +32,12 @@ use super::{OwnedByUserCatalog, SubscriptionId}; use crate::catalog::connection_catalog::ConnectionCatalog; use crate::catalog::function_catalog::FunctionCatalog; use crate::catalog::index_catalog::IndexCatalog; +use crate::catalog::secret_catalog::SecretCatalog; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::system_catalog::SystemTableCatalog; use crate::catalog::table_catalog::TableCatalog; use crate::catalog::view_catalog::ViewCatalog; -use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SinkId, SourceId, ViewId}; +use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId}; use crate::expr::{infer_type_name, infer_type_with_sigmap, Expr, ExprImpl}; use crate::user::UserId; @@ -62,6 +64,11 @@ pub struct SchemaCatalog { function_by_id: HashMap>, connection_by_name: HashMap>, connection_by_id: HashMap>, + secret_by_name: HashMap>, + secret_by_id: HashMap>, + + _secret_source_ref: HashMap>, + _secret_sink_ref: HashMap>, // This field is currently used only for `show connections` connection_source_ref: HashMap>, @@ -484,6 +491,46 @@ impl SchemaCatalog { .expect("connection not found by name"); } + pub fn create_secret(&mut self, prost: &PbSecret) { + let name = prost.name.clone(); + let id = SecretId::new(prost.id); + let secret = SecretCatalog::from(prost); + let secret_ref = Arc::new(secret); + + self.secret_by_id + .try_insert(id, secret_ref.clone()) + .unwrap(); + self.secret_by_name + .try_insert(name, secret_ref.clone()) + .unwrap(); + } + + pub fn update_secret(&mut self, prost: &PbSecret) { + let name = prost.name.clone(); + let id = SecretId::new(prost.id); + let secret = SecretCatalog::from(prost); + let secret_ref = Arc::new(secret); + + let old_secret = self.secret_by_id.get(&id).unwrap(); + // check if secret name get updated. + if old_secret.name != name { + self.secret_by_name.remove(&old_secret.name); + } + + self.secret_by_name.insert(name, secret_ref.clone()); + self.secret_by_id.insert(id, secret_ref); + } + + pub fn drop_secret(&mut self, secret_id: SecretId) { + let secret_ref = self + .secret_by_id + .remove(&secret_id) + .expect("secret not found by id"); + self.secret_by_name + .remove(&secret_ref.name) + .expect("secret not found by name"); + } + pub fn iter_all(&self) -> impl Iterator> { self.table_by_name.values() } @@ -546,6 +593,10 @@ impl SchemaCatalog { self.connection_by_name.values() } + pub fn iter_secret(&self) -> impl Iterator> { + self.secret_by_name.values() + } + pub fn iter_system_tables(&self) -> impl Iterator> { self.system_table_by_name.values() } @@ -687,6 +738,14 @@ impl SchemaCatalog { self.connection_by_name.get(connection_name) } + pub fn get_secret_by_name(&self, secret_name: &str) -> Option<&Arc> { + self.secret_by_name.get(secret_name) + } + + pub fn get_secret_by_id(&self, secret_id: &SecretId) -> Option<&Arc> { + self.secret_by_id.get(secret_id) + } + /// get all sources referencing the connection pub fn get_source_ids_by_connection( &self, @@ -764,6 +823,10 @@ impl From<&PbSchema> for SchemaCatalog { function_by_id: HashMap::new(), connection_by_name: HashMap::new(), connection_by_id: HashMap::new(), + secret_by_name: HashMap::new(), + secret_by_id: HashMap::new(), + _secret_source_ref: HashMap::new(), + _secret_sink_ref: HashMap::new(), connection_source_ref: HashMap::new(), connection_sink_ref: HashMap::new(), subscription_by_name: HashMap::new(), diff --git a/src/frontend/src/catalog/secret_catalog.rs b/src/frontend/src/catalog/secret_catalog.rs new file mode 100644 index 0000000000000..5e9aaae7dec99 --- /dev/null +++ b/src/frontend/src/catalog/secret_catalog.rs @@ -0,0 +1,45 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::catalog::PbSecret; + +use crate::catalog::{DatabaseId, OwnedByUserCatalog, SecretId}; +use crate::user::UserId; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct SecretCatalog { + pub secret_id: SecretId, + pub name: String, + pub database_id: DatabaseId, + pub value: Vec, + pub owner: UserId, +} + +impl From<&PbSecret> for SecretCatalog { + fn from(value: &PbSecret) -> Self { + Self { + secret_id: SecretId::new(value.id), + database_id: value.database_id, + owner: value.owner, + name: value.name.clone(), + value: value.value.clone(), + } + } +} + +impl OwnedByUserCatalog for SecretCatalog { + fn owner(&self) -> UserId { + self.owner + } +} diff --git a/src/frontend/src/handler/create_secret.rs b/src/frontend/src/handler/create_secret.rs new file mode 100644 index 0000000000000..2e99f26e97cb8 --- /dev/null +++ b/src/frontend/src/handler/create_secret.rs @@ -0,0 +1,102 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_response::{PgResponse, StatementType}; +use prost::Message; +use risingwave_common::bail_not_implemented; +use risingwave_sqlparser::ast::{CreateSecretStatement, SqlOption, Value}; + +use crate::error::{ErrorCode, Result}; +use crate::handler::{HandlerArgs, RwPgResponse}; +use crate::{Binder, WithOptions}; + +const SECRET_BACKEND_KEY: &str = "backend"; + +const SECRET_BACKEND_META: &str = "meta"; +const SECRET_BACKEND_HASHICORP_VAULT: &str = "hashicorp_vault"; + +pub async fn handle_create_secret( + handler_args: HandlerArgs, + stmt: CreateSecretStatement, +) -> Result { + let session = handler_args.session.clone(); + let db_name = session.database(); + let (schema_name, connection_name) = + Binder::resolve_schema_qualified_name(db_name, stmt.secret_name.clone())?; + + if let Err(e) = session.check_secret_name_duplicated(stmt.secret_name.clone()) { + return if stmt.if_not_exists { + Ok(PgResponse::builder(StatementType::CREATE_SECRET) + .notice(format!("secret \"{}\" exists, skipping", connection_name)) + .into()) + } else { + Err(e) + }; + } + + // check if the secret backend is supported + let with_props = WithOptions::try_from(stmt.with_properties.0.as_ref() as &[SqlOption])?; + let secret_payload: Vec = { + if let Some(backend) = with_props.inner().get(SECRET_BACKEND_KEY) { + match backend.to_lowercase().as_ref() { + SECRET_BACKEND_META => { + let backend = risingwave_pb::secret::Secret { + secret_backend: Some(risingwave_pb::secret::secret::SecretBackend::Meta( + risingwave_pb::secret::SecretMetaBackend { value: vec![] }, + )), + }; + backend.encode_to_vec() + } + SECRET_BACKEND_HASHICORP_VAULT => { + if stmt.credential != Value::Null { + return Err(ErrorCode::InvalidParameterValue( + "credential must be null for hashicorp_vault backend".to_string(), + ) + .into()); + } + bail_not_implemented!("hashicorp_vault backend is not implemented yet") + } + _ => { + return Err(ErrorCode::InvalidParameterValue(format!( + "secret backend \"{}\" is not supported. Supported backends are: {}", + backend, + [SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",") + )) + .into()); + } + } + } else { + return Err(ErrorCode::InvalidParameterValue(format!( + "secret backend is not specified in with clause. Supported backends are: {}", + [SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",") + )) + .into()); + } + }; + + let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; + + let catalog_writer = session.catalog_writer()?; + catalog_writer + .create_secret( + stmt.secret_name.real_value(), + database_id, + schema_id, + session.user_id(), + secret_payload, + ) + .await?; + + Ok(PgResponse::empty_result(StatementType::CREATE_SECRET)) +} diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index b81a40f6d5759..6f82425fa06e8 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -63,7 +63,7 @@ use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationC use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; -use crate::utils::resolve_privatelink_in_with_option; +use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_in_with_options}; use crate::{Explain, Planner, TableCatalog, WithOptions}; // used to store result of `gen_sink_plan` @@ -145,6 +145,7 @@ pub fn gen_sink_plan( resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?; conn_id.map(ConnectionId) }; + let secret_ref = resolve_secret_in_with_options(&mut with_options, session)?; let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose); if emit_on_window_close { @@ -254,6 +255,7 @@ pub fn gen_sink_plan( UserId::new(session.user_id()), connection_id, dependent_relations.into_iter().collect_vec(), + secret_ref, ); if let Some(table_catalog) = &target_table_catalog { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index bd77ea68ce8a8..75e188c086947 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -81,7 +81,7 @@ use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; use crate::session::SessionImpl; -use crate::utils::resolve_privatelink_in_with_option; +use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_in_with_options}; use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions}; pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; @@ -1423,6 +1423,7 @@ pub async fn bind_create_source( let mut with_properties = WithOptions::new(with_properties); let connection_id = resolve_privatelink_in_with_option(&mut with_properties, &schema_name, session)?; + let _secret_ref = resolve_secret_in_with_options(&mut with_properties, session)?; let definition: String = handler_args.normalized_sql.clone(); diff --git a/src/frontend/src/handler/drop_secret.rs b/src/frontend/src/handler/drop_secret.rs new file mode 100644 index 0000000000000..37fbd2cedd408 --- /dev/null +++ b/src/frontend/src/handler/drop_secret.rs @@ -0,0 +1,64 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_response::StatementType; +use risingwave_sqlparser::ast::ObjectName; + +use crate::catalog::root_catalog::SchemaPath; +use crate::error::Result; +use crate::handler::{HandlerArgs, RwPgResponse}; +use crate::Binder; + +pub async fn handle_drop_secret( + handler_args: HandlerArgs, + secret_name: ObjectName, + if_exists: bool, +) -> Result { + let session = handler_args.session; + let db_name = session.database(); + let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, secret_name)?; + let search_path = session.config().search_path(); + let user_name = &session.auth_context().user_name; + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let secret_id = { + let reader = session.env().catalog_reader().read_guard(); + let (secret, schema_name) = + match reader.get_secret_by_name(db_name, schema_path, secret_name.as_str()) { + Ok((c, s)) => (c, s), + Err(e) => { + return if if_exists { + Ok(RwPgResponse::builder(StatementType::DROP_SECRET) + .notice(format!( + "secret \"{}\" does not exist, skipping", + secret_name + )) + .into()) + } else { + Err(e.into()) + }; + } + }; + session.check_privilege_for_drop_alter(schema_name, &**secret)?; + + secret.secret_id + }; + + let catalog_writer = session.catalog_writer()?; + catalog_writer.drop_secret(secret_id).await?; + + Ok(RwPgResponse::builder(StatementType::DROP_SECRET) + .notice(format!("dropped secret \"{}\"", secret_name)) + .into()) +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 11c64c5bb27dd..f8beeedb19438 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -60,6 +60,7 @@ pub mod create_function; pub mod create_index; pub mod create_mv; pub mod create_schema; +pub mod create_secret; pub mod create_sink; pub mod create_source; pub mod create_sql_function; @@ -77,6 +78,7 @@ pub mod drop_function; mod drop_index; pub mod drop_mv; mod drop_schema; +pub mod drop_secret; pub mod drop_sink; pub mod drop_source; pub mod drop_subscription; @@ -257,6 +259,9 @@ pub async fn handle( Statement::CreateConnection { stmt } => { create_connection::handle_create_connection(handler_args, stmt).await } + Statement::CreateSecret { stmt } => { + create_secret::handle_create_secret(handler_args, stmt).await + } Statement::CreateFunction { or_replace, temporary, @@ -441,7 +446,8 @@ pub async fn handle( ObjectType::Schema | ObjectType::Database | ObjectType::User - | ObjectType::Connection => { + | ObjectType::Connection + | ObjectType::Secret => { bail_not_implemented!("DROP CASCADE"); } }; @@ -508,6 +514,9 @@ pub async fn handle( drop_connection::handle_drop_connection(handler_args, object_name, if_exists) .await } + ObjectType::Secret => { + drop_secret::handle_drop_secret(handler_args, object_name, if_exists).await + } } } // XXX: should we reuse Statement::Drop for DROP FUNCTION? diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 359d556524240..f2d5186b67962 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -333,6 +333,12 @@ pub async fn handle_show_object( .iter_subscription() .map(|t| t.name.clone()) .collect(), + ShowObject::Secret { schema } => catalog_reader + .read_guard() + .get_schema_by_name(session.database(), &schema_or_default(&schema))? + .iter_secret() + .map(|t| t.name.clone()) + .collect(), ShowObject::Columns { table } => { let Ok(columns) = get_columns_from_table(&session, table.clone()) .or(get_columns_from_sink(&session, table.clone())) diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index f864f9608bdba..e5313b9601eb4 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -32,7 +32,7 @@ use risingwave_rpc_client::ComputeClientPoolRef; use tokio::sync::watch::Sender; use crate::catalog::root_catalog::Catalog; -use crate::catalog::FragmentId; +use crate::catalog::{FragmentId, SecretId}; use crate::scheduler::HummockSnapshotManagerRef; use crate::user::user_manager::UserInfoManager; use crate::user::UserInfoVersion; @@ -63,6 +63,7 @@ impl ObserverState for FrontendObserverNode { | Info::Schema(_) | Info::RelationGroup(_) | Info::Function(_) + | Info::Secret(_) | Info::Connection(_) => { self.handle_catalog_notification(resp); } @@ -142,6 +143,7 @@ impl ObserverState for FrontendObserverNode { serving_worker_slot_mappings, session_params, version, + secrets, } = snapshot; for db in databases { @@ -174,6 +176,9 @@ impl ObserverState for FrontendObserverNode { for connection in connections { catalog_guard.create_connection(&connection) } + for secret in secrets { + catalog_guard.create_secret(&secret) + } for user in users { user_guard.create_user(user) } @@ -346,6 +351,16 @@ impl FrontendObserverNode { Operation::Update => catalog_guard.update_connection(connection), _ => panic!("receive an unsupported notify {:?}", resp), }, + Info::Secret(secret) => match resp.operation() { + Operation::Add => catalog_guard.create_secret(secret), + Operation::Delete => catalog_guard.drop_secret( + secret.database_id, + secret.schema_id, + SecretId::new(secret.id), + ), + Operation::Update => catalog_guard.update_secret(secret), + _ => panic!("receive an unsupported notify {:?}", resp), + }, _ => unreachable!(), } assert!( diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 2fa728194a17d..04e6d7fb4e294 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -807,6 +807,26 @@ impl SessionImpl { } } + pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> { + let db_name = self.database(); + let catalog_reader = self.env().catalog_reader().read_guard(); + let (schema_name, secret_name) = { + let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, name)?; + let search_path = self.config().search_path(); + let user_name = &self.auth_context().user_name; + let schema_name = match schema_name { + Some(schema_name) => schema_name, + None => catalog_reader + .first_valid_schema(db_name, &search_path, user_name)? + .name(), + }; + (schema_name, secret_name) + }; + catalog_reader + .check_secret_name_duplicated(db_name, &schema_name, &secret_name) + .map_err(RwError::from) + } + pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> { let db_name = self.database(); let catalog_reader = self.env().catalog_reader().read_guard(); diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index b3bf70f9c523f..97ab9f4eecdc9 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -64,7 +64,7 @@ use tempfile::{Builder, NamedTempFile}; use crate::catalog::catalog_service::CatalogWriter; use crate::catalog::root_catalog::Catalog; -use crate::catalog::{ConnectionId, DatabaseId, SchemaId}; +use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId}; use crate::error::{ErrorCode, Result}; use crate::handler::RwPgResponse; use crate::meta_client::FrontendMetaClient; @@ -373,6 +373,17 @@ impl CatalogWriter for MockCatalogWriter { unreachable!() } + async fn create_secret( + &self, + _secret_name: String, + _database_id: u32, + _schema_id: u32, + _owner_id: u32, + _payload: Vec, + ) -> Result<()> { + unreachable!() + } + async fn comment_on(&self, _comment: PbComment) -> Result<()> { unreachable!() } @@ -530,6 +541,10 @@ impl CatalogWriter for MockCatalogWriter { unreachable!() } + async fn drop_secret(&self, _secret_id: SecretId) -> Result<()> { + unreachable!() + } + async fn drop_database(&self, database_id: u32) -> Result<()> { self.catalog.write().drop_database(database_id); Ok(()) diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index b5848d124c9d3..3ee50276e5d10 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -120,6 +120,15 @@ impl WithOptions { } } +pub(crate) fn resolve_secret_in_with_options( + _with_options: &mut WithOptions, + _session: &SessionImpl, +) -> RwResult> { + // todo: implement the function and take `resolve_privatelink_in_with_option` as reference + + Ok(HashMap::new()) +} + pub(crate) fn resolve_privatelink_in_with_option( with_options: &mut WithOptions, schema_name: &Option, diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 24dfecd99524e..6252d845788af 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +aes-siv = "0.7" anyhow = "1" arc-swap = "1" assert_matches = "1" @@ -21,6 +22,7 @@ async-trait = "0.1" aws-config = { workspace = true } aws-sdk-ec2 = { workspace = true } base64-url = { version = "3.0.0" } +bincode = "1.3" bytes = { version = "1", features = ["serde"] } chrono = "0.4" clap = { workspace = true } diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index 74968d2e3a11c..66f136b6159d1 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -10,6 +10,7 @@ mod m20240410_154406_session_params; mod m20240417_062305_subscription_internal_table_name; mod m20240418_142249_function_runtime; mod m20240506_112555_subscription_partial_ckpt; +mod m20240525_090457_secret; pub struct Migrator; @@ -25,6 +26,7 @@ impl MigratorTrait for Migrator { Box::new(m20240417_062305_subscription_internal_table_name::Migration), Box::new(m20240418_142249_function_runtime::Migration), Box::new(m20240506_112555_subscription_partial_ckpt::Migration), + Box::new(m20240525_090457_secret::Migration), ] } } diff --git a/src/meta/model_v2/migration/src/m20240525_090457_secret.rs b/src/meta/model_v2/migration/src/m20240525_090457_secret.rs new file mode 100644 index 0000000000000..f16bfca5ec035 --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240525_090457_secret.rs @@ -0,0 +1,79 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; + +use crate::{assert_not_has_tables, drop_tables}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + assert_not_has_tables!(manager, Secret); + manager + .create_table( + MigrationTable::create() + .table(Secret::Table) + .if_not_exists() + .col( + ColumnDef::new(Secret::SecretId) + .integer() + .not_null() + .primary_key(), + ) + .col(ColumnDef::new(Secret::Name).string().not_null()) + .col(ColumnDef::new(Secret::Value).binary().not_null()) + .foreign_key( + &mut ForeignKey::create() + .name("FK_secret_object_id") + .from(Secret::Table, Secret::SecretId) + .to( + crate::m20230908_072257_init::Object::Table, + crate::m20230908_072257_init::Object::Oid, + ) + .on_delete(ForeignKeyAction::Cascade) + .to_owned(), + ) + .to_owned(), + ) + .await?; + + // Add a new column to the table + manager + .alter_table( + MigrationTable::alter() + .table(Sink::Table) + .add_column(ColumnDef::new(Sink::SecretRef).json_binary()) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + drop_tables!(manager, Secret); + manager + .alter_table( + MigrationTable::alter() + .table(Sink::Table) + .drop_column(Sink::SecretRef) + .to_owned(), + ) + .await?; + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Secret { + Table, + SecretId, + Name, + Value, +} + +#[derive(DeriveIden)] +enum Sink { + Table, + SecretRef, +} diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 87d7e3e3597f6..864c647957ddf 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -43,6 +43,7 @@ pub mod index; pub mod object; pub mod object_dependency; pub mod schema; +pub mod secret; pub mod serde_seaql_migration; pub mod session_parameter; pub mod sink; @@ -72,6 +73,7 @@ pub type IndexId = ObjectId; pub type ViewId = ObjectId; pub type FunctionId = ObjectId; pub type ConnectionId = ObjectId; +pub type SecretId = ObjectId; pub type UserId = i32; pub type PrivilegeId = i32; @@ -284,6 +286,8 @@ impl From>> for ActorUpstreamActors { } } +derive_from_json_struct!(SecretRef, HashMap); + derive_from_blob!(StreamNode, PbStreamNode); derive_from_blob!(DataType, risingwave_pb::data::PbDataType); derive_array_from_blob!( diff --git a/src/meta/model_v2/src/object.rs b/src/meta/model_v2/src/object.rs index 2b9c291f1e4fe..663f436fcbcb6 100644 --- a/src/meta/model_v2/src/object.rs +++ b/src/meta/model_v2/src/object.rs @@ -40,6 +40,8 @@ pub enum ObjectType { Connection, #[sea_orm(string_value = "SUBSCRIPTION")] Subscription, + #[sea_orm(string_value = "SECRET")] + Secret, } impl ObjectType { @@ -55,6 +57,7 @@ impl ObjectType { ObjectType::Function => "function", ObjectType::Connection => "connection", ObjectType::Subscription => "subscription", + ObjectType::Secret => "secret", } } } diff --git a/src/meta/model_v2/src/prelude.rs b/src/meta/model_v2/src/prelude.rs index 7d3c8cde7acb1..b17eae112aef0 100644 --- a/src/meta/model_v2/src/prelude.rs +++ b/src/meta/model_v2/src/prelude.rs @@ -32,6 +32,7 @@ pub use super::index::Entity as Index; pub use super::object::Entity as Object; pub use super::object_dependency::Entity as ObjectDependency; pub use super::schema::Entity as Schema; +pub use super::secret::Entity as Secret; pub use super::session_parameter::Entity as SessionParameter; pub use super::sink::Entity as Sink; pub use super::source::Entity as Source; diff --git a/src/meta/model_v2/src/secret.rs b/src/meta/model_v2/src/secret.rs new file mode 100644 index 0000000000000..af3590dd0de58 --- /dev/null +++ b/src/meta/model_v2/src/secret.rs @@ -0,0 +1,57 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::catalog::PbSecret; +use sea_orm::entity::prelude::*; +use sea_orm::ActiveValue::Set; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "secret")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub secret_id: i32, + pub name: String, + #[sea_orm(column_type = "Binary(BlobSize::Blob(None))")] + pub value: Vec, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::object::Entity", + from = "Column::SecretId", + to = "super::object::Column::Oid", + on_update = "NoAction", + on_delete = "Cascade" + )] + Object, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Object.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} + +impl From for ActiveModel { + fn from(secret: PbSecret) -> Self { + Self { + secret_id: Set(secret.id as _), + name: Set(secret.name), + value: Set(secret.value), + } + } +} diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index eafa1beee92f2..78d0806f98a5e 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -18,8 +18,8 @@ use sea_orm::ActiveValue::Set; use serde::{Deserialize, Serialize}; use crate::{ - ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SinkFormatDesc, SinkId, - TableId, + ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SecretRef, + SinkFormatDesc, SinkId, TableId, }; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] @@ -72,6 +72,8 @@ pub struct Model { pub sink_from_name: String, pub sink_format_desc: Option, pub target_table: Option, + // `secret_ref` stores a json string, mapping from property name to secret id. + pub secret_ref: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -127,6 +129,7 @@ impl From for ActiveModel { sink_from_name: Set(pb_sink.sink_from_name), sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())), target_table: Set(pb_sink.target_table.map(|x| x as _)), + secret_ref: Set(Some(SecretRef::from(pb_sink.secret_ref))), } } } diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 21cc0c67860b0..0e49f0805bf1b 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -379,6 +379,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { .developer .max_trivial_move_task_count_per_loop, max_get_task_probe_times: config.meta.developer.max_get_task_probe_times, + secret_store_private_key: config.meta.secret_store_private_key, }, config.system.into_init_system_params(), Default::default(), diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 7b7d4f5a7d092..2e4ba23e02d8f 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -25,7 +25,7 @@ use risingwave_pb::catalog::connection::private_link_service::{ }; use risingwave_pb::catalog::connection::PbPrivateLinkService; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Comment, Connection, CreateType}; +use risingwave_pb::catalog::{connection, Comment, Connection, CreateType, Secret}; use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::PbSourceId; use risingwave_pb::ddl_service::*; @@ -148,6 +148,41 @@ impl DdlService for DdlServiceImpl { })) } + async fn create_secret( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let pb_secret = Secret { + id: 0, + name: req.get_name().clone(), + database_id: req.get_database_id(), + value: req.get_value().clone(), + owner: req.get_owner_id(), + schema_id: req.get_schema_id(), + }; + let version = self + .ddl_controller + .run_command(DdlCommand::CreateSecret(pb_secret)) + .await?; + + Ok(Response::new(CreateSecretResponse { version })) + } + + async fn drop_secret( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let secret_id = req.get_secret_id(); + let version = self + .ddl_controller + .run_command(DdlCommand::DropSecret(secret_id)) + .await?; + + Ok(Response::new(DropSecretResponse { version })) + } + async fn create_schema( &self, request: Request, diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index f3fec987c2bf1..e4a8d298e0788 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -80,6 +80,7 @@ impl NotificationServiceImpl { views, functions, connections, + secrets, ) = catalog_guard.database.get_catalog(); let users = catalog_guard.user.list_users(); let notification_version = self.env.notification_manager().current_version().await; @@ -95,6 +96,7 @@ impl NotificationServiceImpl { views, functions, connections, + secrets, ), users, notification_version, @@ -114,6 +116,7 @@ impl NotificationServiceImpl { views, functions, connections, + secrets, ), users, ) = catalog_guard.snapshot().await?; @@ -130,6 +133,7 @@ impl NotificationServiceImpl { views, functions, connections, + secrets, ), users, notification_version, @@ -237,6 +241,7 @@ impl NotificationServiceImpl { views, functions, connections, + secrets, ), users, catalog_version, @@ -271,6 +276,7 @@ impl NotificationServiceImpl { subscriptions, functions, connections, + secrets, users, nodes, hummock_snapshot, diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index cb3e386c34fc5..200736725cd07 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -28,16 +28,16 @@ use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::table::TableType; use risingwave_meta_model_v2::{ actor, connection, database, fragment, function, index, object, object_dependency, schema, - sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, + secret, sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId, - SinkId, SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, - UserId, ViewId, + SecretId, SinkId, SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, + TableId, UserId, ViewId, }; use risingwave_pb::catalog::subscription::SubscriptionState; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ - PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, + PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource, PbSubscription, PbTable, PbView, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo; @@ -66,10 +66,10 @@ use crate::controller::rename::{alter_relation_rename, alter_relation_rename_ref use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, - ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, - get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map, get_referring_objects, - get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, - resolve_source_register_info_for_jobs, PartialObject, + check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, + ensure_user_id, get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map, + get_referring_objects, get_referring_objects_cascade, get_user_privilege, + list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -1100,6 +1100,88 @@ impl CatalogController { Ok(version) } + pub async fn create_secret(&self, mut pb_secret: PbSecret) -> MetaResult { + let inner = self.inner.write().await; + let owner_id = pb_secret.owner as _; + let txn = inner.db.begin().await?; + ensure_user_id(owner_id, &txn).await?; + ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?; + ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?; + check_secret_name_duplicate(&pb_secret, &txn).await?; + + let secret_obj = Self::create_object( + &txn, + ObjectType::Secret, + owner_id, + Some(pb_secret.database_id as _), + Some(pb_secret.schema_id as _), + ) + .await?; + pb_secret.id = secret_obj.oid as _; + let secret: secret::ActiveModel = pb_secret.clone().into(); + Secret::insert(secret).exec(&txn).await?; + + txn.commit().await?; + + let version = self + .notify_frontend( + NotificationOperation::Add, + NotificationInfo::Secret(pb_secret), + ) + .await; + Ok(version) + } + + pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult { + let inner = self.inner.read().await; + let (secret, obj) = Secret::find_by_id(secret_id) + .find_also_related(Object) + .one(&inner.db) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?; + Ok(ObjectModel(secret, obj.unwrap()).into()) + } + + pub async fn drop_secret(&self, secret_id: SecretId) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let (secret, secret_obj) = Secret::find_by_id(secret_id) + .find_also_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?; + ensure_object_not_refer(ObjectType::Secret, secret_id, &txn).await?; + + // Find affect users with privileges on the connection. + let to_update_user_ids: Vec = UserPrivilege::find() + .select_only() + .distinct() + .column(user_privilege::Column::UserId) + .filter(user_privilege::Column::Oid.eq(secret_id)) + .into_tuple() + .all(&txn) + .await?; + + let res = Object::delete_by_id(secret_id).exec(&txn).await?; + if res.rows_affected == 0 { + return Err(MetaError::catalog_id_not_found("secret", secret_id)); + } + let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; + + txn.commit().await?; + + let pb_secret: PbSecret = ObjectModel(secret, secret_obj.unwrap()).into(); + + self.notify_users_update(user_infos).await; + let version = self + .notify_frontend( + NotificationOperation::Delete, + NotificationInfo::Secret(pb_secret), + ) + .await; + Ok(version) + } + pub async fn create_connection( &self, mut pb_connection: PbConnection, @@ -2804,6 +2886,7 @@ impl CatalogControllerInner { let views = self.list_views().await?; let functions = self.list_functions().await?; let connections = self.list_connections().await?; + let secrets = self.list_secrets().await?; let users = self.list_users().await?; @@ -2819,6 +2902,7 @@ impl CatalogControllerInner { views, functions, connections, + secrets, ), users, )) @@ -3037,6 +3121,17 @@ impl CatalogControllerInner { .collect()) } + async fn list_secrets(&self) -> MetaResult> { + let secret_objs = Secret::find() + .find_also_related(Object) + .all(&self.db) + .await?; + Ok(secret_objs + .into_iter() + .map(|(secret, obj)| ObjectModel(secret, obj.unwrap()).into()) + .collect()) + } + async fn list_functions(&self) -> MetaResult> { let func_objs = Function::find() .find_also_related(Object) diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 7e9f20f7557d6..43d7efed1ba58 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use anyhow::anyhow; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model_v2::{ - connection, database, function, index, object, schema, sink, source, subscription, table, view, + connection, database, function, index, object, schema, secret, sink, source, subscription, + table, view, }; use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; @@ -23,7 +26,8 @@ use risingwave_pb::catalog::subscription::PbSubscriptionState; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableType}; use risingwave_pb::catalog::{ PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex, - PbSchema, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable, PbView, + PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable, + PbView, }; use sea_orm::{DatabaseConnection, ModelTrait}; @@ -82,6 +86,19 @@ impl From> for PbDatabase { } } +impl From> for PbSecret { + fn from(value: ObjectModel) -> Self { + Self { + id: value.0.secret_id as _, + name: value.0.name, + database_id: value.1.database_id.unwrap() as _, + value: value.0.value, + owner: value.1.owner_id as _, + schema_id: value.1.schema_id.unwrap() as _, + } + } +} + impl From> for PbSchema { fn from(value: ObjectModel) -> Self { Self { @@ -184,6 +201,10 @@ impl From> for PbSource { impl From> for PbSink { fn from(value: ObjectModel) -> Self { + let mut secret_ref_hashmap: HashMap = HashMap::new(); + if let Some(secret_ref) = value.0.secret_ref { + secret_ref_hashmap = secret_ref.into_inner(); + } Self { id: value.0.sink_id as _, schema_id: value.1.schema_id.unwrap() as _, @@ -213,6 +234,7 @@ impl From> for PbSink { initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, create_type: PbCreateType::Foreground as _, + secret_ref: secret_ref_hashmap, } } } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index b654050708546..b98788248a115 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -24,11 +24,11 @@ use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, - object_dependency, schema, sink, source, subscription, table, user, user_privilege, view, - worker_property, ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, + object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege, + view, worker_property, ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId, SchemaId, SourceId, StreamNode, UserId, WorkerId, }; -use risingwave_pb::catalog::{PbConnection, PbFunction, PbSubscription}; +use risingwave_pb::catalog::{PbConnection, PbFunction, PbSecret, PbSubscription}; use risingwave_pb::meta::{PbFragmentParallelUnitMapping, PbFragmentWorkerSlotMapping}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource}; @@ -411,6 +411,27 @@ where Ok(()) } +pub async fn check_secret_name_duplicate(pb_secret: &PbSecret, db: &C) -> MetaResult<()> +where + C: ConnectionTrait, +{ + let count = Secret::find() + .inner_join(Object) + .filter( + object::Column::DatabaseId + .eq(pb_secret.database_id as DatabaseId) + .and(object::Column::SchemaId.eq(pb_secret.schema_id as SchemaId)) + .and(secret::Column::Name.eq(&pb_secret.name)), + ) + .count(db) + .await?; + if count > 0 { + assert_eq!(count, 1); + return Err(MetaError::catalog_duplicated("secret", &pb_secret.name)); + } + Ok(()) +} + pub async fn check_subscription_name_duplicate( pb_subscription: &PbSubscription, db: &C, @@ -762,6 +783,7 @@ where ObjectType::Function => PbObject::FunctionId(oid), ObjectType::Connection => unreachable!("connection is not supported yet"), ObjectType::Subscription => PbObject::SubscriptionId(oid), + ObjectType::Secret => unreachable!("secret is not supported yet"), }; PbGrantPrivilege { action_with_opts: vec![PbActionWithGrantOption { diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 5b1b24e82de21..1cea1e0c393d8 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -21,15 +21,15 @@ use risingwave_common::catalog::TableOption; use risingwave_pb::catalog::subscription::PbSubscriptionState; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::{ - Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, - StreamJobStatus, Subscription, Table, View, + Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Secret, Sink, + Source, StreamJobStatus, Subscription, Table, View, }; use risingwave_pb::data::DataType; use risingwave_pb::user::grant_privilege::PbObject; use super::{ - ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, SubscriptionId, - ViewId, + ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SecretId, SinkId, SourceId, + SubscriptionId, ViewId, }; use crate::manager::{IndexId, MetaSrvEnv, TableId, UserId}; use crate::model::MetadataModel; @@ -46,6 +46,7 @@ pub type Catalog = ( Vec, Vec, Vec, + Vec, ); type DatabaseKey = String; @@ -76,10 +77,15 @@ pub struct DatabaseManager { pub(super) functions: BTreeMap, /// Cached connection information. pub(super) connections: BTreeMap, + /// Cached secret information. + pub(super) secrets: BTreeMap, /// Relation reference count mapping. // TODO(zehua): avoid key conflicts after distinguishing table's and source's id generator. pub(super) relation_ref_count: HashMap, + + /// Secret reference count mapping + pub(super) secret_ref_count: HashMap, // In-progress creation tracker. pub(super) in_progress_creation_tracker: HashSet, // In-progress creating streaming job tracker: this is a temporary workaround to avoid clean up @@ -101,8 +107,10 @@ impl DatabaseManager { let functions = Function::list(env.meta_store().as_kv()).await?; let connections = Connection::list(env.meta_store().as_kv()).await?; let subscriptions = Subscription::list(env.meta_store().as_kv()).await?; + let secrets = Secret::list(env.meta_store().as_kv()).await?; let mut relation_ref_count = HashMap::new(); + let mut _secret_ref_count = HashMap::new(); let databases = BTreeMap::from_iter( databases @@ -129,6 +137,7 @@ impl DatabaseManager { .or_default() += 1; (subscription.id, subscription) })); + let secrets = BTreeMap::from_iter(secrets.into_iter().map(|secret| (secret.id, secret))); let indexes = BTreeMap::from_iter(indexes.into_iter().map(|index| (index.id, index))); let tables = BTreeMap::from_iter(tables.into_iter().map(|table| { for depend_relation_id in &table.dependent_relations { @@ -145,6 +154,8 @@ impl DatabaseManager { let functions = BTreeMap::from_iter(functions.into_iter().map(|f| (f.id, f))); let connections = BTreeMap::from_iter(connections.into_iter().map(|c| (c.id, c))); + // todo: scan over stream source info and sink to update secret ref count `_secret_ref_count` + Ok(Self { databases, schemas, @@ -157,6 +168,8 @@ impl DatabaseManager { functions, connections, relation_ref_count, + secrets, + secret_ref_count: _secret_ref_count, in_progress_creation_tracker: HashSet::default(), in_progress_creation_streaming_job: HashMap::default(), in_progress_creating_tables: HashMap::default(), @@ -200,6 +213,7 @@ impl DatabaseManager { self.views.values().cloned().collect_vec(), self.functions.values().cloned().collect_vec(), self.connections.values().cloned().collect_vec(), + self.secrets.values().cloned().collect_vec(), ) } @@ -292,6 +306,16 @@ impl DatabaseManager { } } + pub fn check_secret_name_duplicated(&self, secret_key: &RelationKey) -> MetaResult<()> { + if self.secrets.values().any(|x| { + x.database_id == secret_key.0 && x.schema_id == secret_key.1 && x.name.eq(&secret_key.2) + }) { + Err(MetaError::catalog_duplicated("secret", &secret_key.2)) + } else { + Ok(()) + } + } + pub fn list_databases(&self) -> Vec { self.databases.values().cloned().collect_vec() } @@ -467,6 +491,22 @@ impl DatabaseManager { } } + pub fn increase_secret_ref_count(&mut self, secret_id: SecretId) { + *self.secret_ref_count.entry(secret_id).or_insert(0) += 1; + } + + pub fn decrease_secret_ref_count(&mut self, secret_id: SecretId) { + match self.secret_ref_count.entry(secret_id) { + Entry::Occupied(mut o) => { + *o.get_mut() -= 1; + if *o.get() == 0 { + o.remove_entry(); + } + } + Entry::Vacant(_) => unreachable!(), + } + } + pub fn has_creation_in_database(&self, database_id: DatabaseId) -> bool { self.in_progress_creation_tracker .iter() diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 9259a32eb55cf..824d656e506c0 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -36,7 +36,7 @@ use risingwave_pb::catalog::subscription::PbSubscriptionState; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, TableType}; use risingwave_pb::catalog::{ Comment, Connection, CreateType, Database, Function, Index, PbSource, PbStreamJobStatus, - Schema, Sink, Source, StreamJobStatus, Subscription, Table, View, + Schema, Secret, Sink, Source, StreamJobStatus, Subscription, Table, View, }; use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -63,6 +63,7 @@ pub type RelationId = u32; pub type IndexId = u32; pub type ViewId = u32; pub type FunctionId = u32; +pub type SecretId = u32; pub type UserId = u32; pub type ConnectionId = u32; @@ -328,6 +329,7 @@ impl CatalogManager { let mut users = BTreeMapTransaction::new(&mut user_core.user_info); let mut functions = BTreeMapTransaction::new(&mut database_core.functions); let mut connections = BTreeMapTransaction::new(&mut database_core.connections); + let mut secrets = BTreeMapTransaction::new(&mut database_core.secrets); /// `drop_by_database_id` provides a wrapper for dropping relations by database id, it will /// return the relation ids that dropped. @@ -360,6 +362,7 @@ impl CatalogManager { let views_to_drop = drop_by_database_id!(views, database_id); let functions_to_drop = drop_by_database_id!(functions, database_id); let connections_to_drop = drop_by_database_id!(connections, database_id); + let secrets_to_drop = drop_by_database_id!(secrets, database_id); connections_dropped = connections_to_drop.clone(); let objects = std::iter::once(Object::DatabaseId(database_id)) @@ -421,6 +424,7 @@ impl CatalogManager { .iter() .map(|connection| connection.owner), ) + .chain(secrets_to_drop.iter().map(|secret| secret.owner)) .for_each(|owner_id| user_core.decrease_ref(owner_id)); // Update relation ref count. @@ -478,6 +482,57 @@ impl CatalogManager { } } + pub async fn create_secret(&self, secret: Secret) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let user_core = &mut core.user; + database_core.ensure_database_id(secret.database_id)?; + database_core.ensure_schema_id(secret.schema_id)?; + #[cfg(not(test))] + user_core.ensure_user_id(secret.owner)?; + let key = ( + secret.database_id as DatabaseId, + secret.schema_id as SchemaId, + secret.name.clone(), + ); + database_core.check_secret_name_duplicated(&key)?; + + let secret_id = secret.id; + let mut secret_entry = BTreeMapTransaction::new(&mut database_core.secrets); + secret_entry.insert(secret_id, secret.to_owned()); + commit_meta!(self, secret_entry)?; + + user_core.increase_ref(secret.owner); + + let version = self + .notify_frontend(Operation::Add, Info::Secret(secret)) + .await; + Ok(version) + } + + pub async fn drop_secret(&self, secret_id: SecretId) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + let user_core = &mut core.user; + let mut secrets = BTreeMapTransaction::new(&mut database_core.secrets); + + // todo: impl a ref count check for secret + // if secret is used by other relations, not found in the catalog or do not have the privilege to drop, return error + // else: commit the change and notify frontend + + let secret = secrets + .remove(secret_id) + .ok_or_else(|| anyhow!("secret not found"))?; + + commit_meta!(self, secrets)?; + user_core.decrease_ref(secret.owner); + + let version = self + .notify_frontend(Operation::Delete, Info::Secret(secret)) + .await; + Ok(version) + } + pub async fn create_connection( &self, connection: Connection, diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index a8e2909a11bf7..af7218219afc4 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -280,6 +280,9 @@ pub struct MetaOpts { pub compact_task_table_size_partition_threshold_low: u64, pub compact_task_table_size_partition_threshold_high: u64, + + // The private key for the secret store, used when the secret is stored in the meta. + pub secret_store_private_key: Vec, } impl MetaOpts { @@ -340,6 +343,7 @@ impl MetaOpts { object_store_config: ObjectStoreConfig::default(), max_trivial_move_task_count_per_loop: 256, max_get_task_probe_times: 5, + secret_store_private_key: "demo-secret-private-key".as_bytes().to_vec(), } } } diff --git a/src/meta/src/manager/id.rs b/src/meta/src/manager/id.rs index 7fbde6d655e83..023483116fdc8 100644 --- a/src/meta/src/manager/id.rs +++ b/src/meta/src/manager/id.rs @@ -137,6 +137,8 @@ pub mod IdCategory { pub const CompactionGroup: IdCategoryType = 15; pub const Function: IdCategoryType = 16; pub const Connection: IdCategoryType = 17; + + pub const Secret: IdCategoryType = 18; } pub type IdGeneratorManagerRef = Arc; @@ -160,6 +162,7 @@ pub struct IdGeneratorManager { parallel_unit: Arc, compaction_group: Arc, connection: Arc, + secret: Arc, } impl IdGeneratorManager { @@ -209,6 +212,7 @@ impl IdGeneratorManager { connection: Arc::new( StoredIdGenerator::new(meta_store.clone(), "connection", None).await, ), + secret: Arc::new(StoredIdGenerator::new(meta_store.clone(), "secret", None).await), } } @@ -230,6 +234,7 @@ impl IdGeneratorManager { IdCategory::HummockCompactionTask => &self.hummock_compaction_task, IdCategory::CompactionGroup => &self.compaction_group, IdCategory::Connection => &self.connection, + IdCategory::Secret => &self.secret, _ => unreachable!(), } } diff --git a/src/meta/src/model/catalog.rs b/src/meta/src/model/catalog.rs index 8f762255d60b7..c11be01d1a599 100644 --- a/src/meta/src/model/catalog.rs +++ b/src/meta/src/model/catalog.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_pb::catalog::{ - Connection, Database, Function, Index, Schema, Sink, Source, Subscription, Table, View, + Connection, Database, Function, Index, Schema, Secret, Sink, Source, Subscription, Table, View, }; use crate::model::{MetadataModel, MetadataModelResult}; @@ -38,6 +38,8 @@ const CATALOG_SCHEMA_CF_NAME: &str = "cf/catalog_schema"; const CATALOG_DATABASE_CF_NAME: &str = "cf/catalog_database"; /// Column family name for database catalog. const CATALOG_SUBSCRIPTION_CF_NAME: &str = "cf/catalog_subscription"; +/// Column family name for secret catalog. +const CATALOG_SECRET_CF_NAME: &str = "cf/catalog_secret"; macro_rules! impl_model_for_catalog { ($name:ident, $cf:ident, $key_ty:ty, $key_fn:ident) => { @@ -74,6 +76,7 @@ impl_model_for_catalog!(Table, CATALOG_TABLE_CF_NAME, u32, get_id); impl_model_for_catalog!(Schema, CATALOG_SCHEMA_CF_NAME, u32, get_id); impl_model_for_catalog!(Database, CATALOG_DATABASE_CF_NAME, u32, get_id); impl_model_for_catalog!(Subscription, CATALOG_SUBSCRIPTION_CF_NAME, u32, get_id); +impl_model_for_catalog!(Secret, CATALOG_SECRET_CF_NAME, u32, get_id); #[cfg(test)] mod tests { diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index 0f20fb482a654..e87251ee6d413 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -177,6 +177,7 @@ macro_rules! for_all_metadata_models { { risingwave_pb::user::UserInfo }, { risingwave_pb::catalog::Function }, { risingwave_pb::catalog::Connection }, + { risingwave_pb::catalog::Secret }, // These items need not be included in a meta snapshot. { crate::model::cluster::Worker }, { risingwave_pb::hummock::CompactTaskAssignment }, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index cd448dc51c880..f83e1420eb9c5 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -18,9 +18,12 @@ use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; +use aes_siv::aead::generic_array::GenericArray; +use aes_siv::aead::Aead; +use aes_siv::{Aes128SivAead, KeyInit}; use anyhow::Context; use itertools::Itertools; -use rand::Rng; +use rand::{Rng, RngCore}; use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::{ParallelUnitMapping, VirtualNode}; use risingwave_common::system_param::reader::SystemParamsRead; @@ -45,7 +48,7 @@ use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ connection, Comment, Connection, CreateType, Database, Function, PbSource, PbTable, Schema, - Sink, Source, Subscription, Table, View, + Secret, Sink, Source, Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -58,6 +61,7 @@ use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, PbStreamFragmentGraph, StreamFragmentGraph as StreamFragmentGraphProto, }; +use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; use tokio::sync::Semaphore; use tokio::time::sleep; @@ -65,12 +69,13 @@ use tracing::log::warn; use tracing::Instrument; use crate::barrier::BarrierManagerRef; +use crate::error::MetaErrorInner; use crate::manager::{ CatalogManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId, IdCategory, IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, MetadataManagerV1, - NotificationVersion, RelationIdEnum, SchemaId, SinkId, SourceId, StreamingClusterInfo, - StreamingJob, StreamingJobDiscriminants, SubscriptionId, TableId, UserId, ViewId, - IGNORED_NOTIFICATION_VERSION, + NotificationVersion, RelationIdEnum, SchemaId, SecretId, SinkId, SourceId, + StreamingClusterInfo, StreamingJob, StreamingJobDiscriminants, SubscriptionId, TableId, UserId, + ViewId, IGNORED_NOTIFICATION_VERSION, }; use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; use crate::rpc::cloud_provider::AwsEc2Client; @@ -148,11 +153,19 @@ pub enum DdlCommand { AlterSetSchema(alter_set_schema_request::Object, SchemaId), CreateConnection(Connection), DropConnection(ConnectionId), + CreateSecret(Secret), + DropSecret(SecretId), CommentOn(Comment), CreateSubscription(Subscription), DropSubscription(SubscriptionId, DropMode), } +#[derive(Deserialize, Serialize)] +struct SecretEncryption { + nonce: [u8; 16], + ciphertext: Vec, +} + impl DdlCommand { fn allow_in_recovery(&self) -> bool { match self { @@ -162,7 +175,9 @@ impl DdlCommand { | DdlCommand::DropFunction(_) | DdlCommand::DropView(_, _) | DdlCommand::DropStreamingJob(_, _, _) - | DdlCommand::DropConnection(_) => true, + | DdlCommand::DropConnection(_) + | DdlCommand::DropSecret(_) => true, + // Simply ban all other commands in recovery. _ => false, } @@ -331,6 +346,8 @@ impl DdlController { DdlCommand::DropConnection(connection_id) => { ctrl.drop_connection(connection_id).await } + DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await, + DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await, DdlCommand::AlterSourceColumn(source) => ctrl.alter_source_column(source).await, DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await, DdlCommand::CreateSubscription(subscription) => { @@ -609,6 +626,57 @@ impl DdlController { } } + async fn create_secret(&self, mut secret: Secret) -> MetaResult { + // The 'secret' part of the request we receive from the frontend is in plaintext; + // here, we need to encrypt it before storing it in the catalog. + + let encrypted_payload = { + let data = secret.get_value().as_slice(); + let key = self.env.opts.secret_store_private_key.as_slice(); + let encrypt_key = { + let mut k = key[..(std::cmp::min(key.len(), 32))].to_vec(); + k.resize_with(32, || 0); + k + }; + + let mut rng = rand::thread_rng(); + let mut nonce: [u8; 16] = [0; 16]; + rng.fill_bytes(&mut nonce); + let nonce_array = GenericArray::from_slice(&nonce); + let cipher = Aes128SivAead::new(encrypt_key.as_slice().into()); + + let ciphertext = cipher.encrypt(nonce_array, data).map_err(|e| { + MetaError::from(MetaErrorInner::InvalidParameter(format!( + "failed to encrypt secret {}: {:?}", + secret.name, e + ))) + })?; + bincode::serialize(&SecretEncryption { nonce, ciphertext }).map_err(|e| { + MetaError::from(MetaErrorInner::InvalidParameter(format!( + "failed to serialize secret {}: {:?}", + secret.name, + e.as_report() + ))) + })? + }; + secret.value = encrypted_payload; + + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + secret.id = self.gen_unique_id::<{ IdCategory::Secret }>().await?; + mgr.catalog_manager.create_secret(secret).await + } + MetadataManager::V2(mgr) => mgr.catalog_controller.create_secret(secret).await, + } + } + + async fn drop_secret(&self, secret_id: SecretId) -> MetaResult { + match &self.metadata_manager { + MetadataManager::V1(mgr) => mgr.catalog_manager.drop_secret(secret_id).await, + MetadataManager::V2(mgr) => mgr.catalog_controller.drop_secret(secret_id as _).await, + } + } + pub(crate) async fn delete_vpc_endpoint(&self, connection: &Connection) -> MetaResult<()> { // delete AWS vpc endpoint if let Some(connection::Info::PrivateLinkService(svc)) = &connection.info @@ -908,7 +976,7 @@ impl DdlController { ctx, internal_tables, ) - .await + .await } (CreateType::Background, &StreamingJob::MaterializedView(_)) => { let ctrl = self.clone(); @@ -1931,20 +1999,20 @@ impl DdlController { // Map the column indices in the dispatchers with the given mapping. let downstream_fragments = self.metadata_manager.get_downstream_chain_fragments(id).await? - .into_iter() - .map(|(d, f)| - if let Some(mapping) = &table_col_index_mapping { - Some((mapping.rewrite_dispatch_strategy(&d)?, f)) - } else { - Some((d, f)) - }) - .collect::>() - .ok_or_else(|| { - // The `rewrite` only fails if some column is dropped. - MetaError::invalid_parameter( - "unable to drop the column due to being referenced by downstream materialized views or sinks", - ) - })?; + .into_iter() + .map(|(d, f)| + if let Some(mapping) = &table_col_index_mapping { + Some((mapping.rewrite_dispatch_strategy(&d)?, f)) + } else { + Some((d, f)) + }) + .collect::>() + .ok_or_else(|| { + // The `rewrite` only fails if some column is dropped. + MetaError::invalid_parameter( + "unable to drop the column due to being referenced by downstream materialized views or sinks", + ) + })?; let complete_graph = CompleteStreamFragmentGraph::with_downstreams( fragment_graph, diff --git a/src/prost/build.rs b/src/prost/build.rs index e031e5cfb01ae..67284d844cc3e 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -53,6 +53,7 @@ fn main() -> Result<(), Box> { "task_service", "telemetry", "user", + "secret", ]; let protos: Vec = proto_files .iter() diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index d4f0359fadab2..27d0523b84115 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -56,7 +56,7 @@ pub mod batch_plan; #[cfg_attr(madsim, path = "sim/task_service.rs")] pub mod task_service; #[rustfmt::skip] -#[cfg_attr(madsim, path="sim/connector_service.rs")] +#[cfg_attr(madsim, path = "sim/connector_service.rs")] pub mod connector_service; #[rustfmt::skip] #[cfg_attr(madsim, path = "sim/stream_plan.rs")] @@ -91,6 +91,10 @@ pub mod health; #[rustfmt::skip] #[path = "sim/telemetry.rs"] pub mod telemetry; + +#[rustfmt::skip] +#[path = "sim/secret.rs"] +pub mod secret; #[rustfmt::skip] #[path = "connector_service.serde.rs"] pub mod connector_service_serde; @@ -158,6 +162,10 @@ pub mod java_binding_serde; #[path = "telemetry.serde.rs"] pub mod telemetry_serde; +#[rustfmt::skip] +#[path = "secret.serde.rs"] +pub mod secret_serde; + #[derive(Clone, PartialEq, Eq, Debug, Error)] #[error("field `{0}` not found")] pub struct PbFieldNotFound(pub &'static str); diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index e4d6b53003788..296f8de4d888f 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -23,7 +23,7 @@ use async_trait::async_trait; use either::Either; use futures::stream::BoxStream; use lru::LruCache; -use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; +use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, SecretId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::system_param::reader::SystemParamsReader; @@ -172,6 +172,25 @@ impl MetaClient { Ok(resp.version) } + pub async fn create_secret( + &self, + secret_name: String, + database_id: u32, + schema_id: u32, + owner_id: u32, + value: Vec, + ) -> Result { + let request = CreateSecretRequest { + name: secret_name, + database_id, + schema_id, + owner_id, + value, + }; + let resp = self.inner.create_secret(request).await?; + Ok(resp.version) + } + pub async fn list_connections(&self, _name: Option<&str>) -> Result> { let request = ListConnectionsRequest {}; let resp = self.inner.list_connections(request).await?; @@ -184,6 +203,14 @@ impl MetaClient { Ok(resp.version) } + pub async fn drop_secret(&self, secret_id: SecretId) -> Result { + let request = DropSecretRequest { + secret_id: secret_id.into(), + }; + let resp = self.inner.drop_secret(request).await?; + Ok(resp.version) + } + /// Register the current node to the cluster and set the corresponding worker id. pub async fn register_new( addr_strategy: MetaAddressStrategy, @@ -1931,12 +1958,14 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse } ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse } ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse } + ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse } ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse } ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse } ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse } ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse } ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse } ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse } + , {ddl_client, drop_secret, DropSecretRequest, DropSecretResponse} ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse } ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse } ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 2c3aa67cfaf36..4b96565a0d683 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -985,6 +985,7 @@ pub enum ShowObject { Subscription { schema: Option }, Columns { table: ObjectName }, Connection { schema: Option }, + Secret { schema: Option }, Function { schema: Option }, Indexes { table: ObjectName }, Cluster, @@ -1033,6 +1034,7 @@ impl fmt::Display for ShowObject { ShowObject::Jobs => write!(f, "JOBS"), ShowObject::ProcessList => write!(f, "PROCESSLIST"), ShowObject::Subscription { schema } => write!(f, "SUBSCRIPTIONS{}", fmt_schema(schema)), + ShowObject::Secret { schema } => write!(f, "SECRETS{}", fmt_schema(schema)), } } } @@ -1109,6 +1111,7 @@ pub struct ExplainOptions { // explain's plan type pub explain_type: ExplainType, } + impl Default for ExplainOptions { fn default() -> Self { Self { @@ -1118,6 +1121,7 @@ impl Default for ExplainOptions { } } } + impl fmt::Display for ExplainOptions { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let default = Self::default(); @@ -1271,6 +1275,9 @@ pub enum Statement { CreateConnection { stmt: CreateConnectionStatement, }, + CreateSecret { + stmt: CreateSecretStatement, + }, /// CREATE FUNCTION /// /// Postgres: @@ -1576,14 +1583,14 @@ impl fmt::Display for Statement { write!(f, "DESCRIBE {}", name)?; Ok(()) } - Statement::ShowObjects{ object: show_object, filter} => { + Statement::ShowObjects { object: show_object, filter } => { write!(f, "SHOW {}", show_object)?; if let Some(filter) = filter { write!(f, " {}", filter)?; } Ok(()) } - Statement::ShowCreateObject{ create_type: show_type, name } => { + Statement::ShowCreateObject { create_type: show_type, name } => { write!(f, "SHOW CREATE {} {}", show_type, name)?; Ok(()) } @@ -1597,7 +1604,7 @@ impl fmt::Display for Statement { source, returning, } => { - write!(f, "INSERT INTO {table_name} ", table_name = table_name,)?; + write!(f, "INSERT INTO {table_name} ", table_name = table_name, )?; if !columns.is_empty() { write!(f, "({}) ", display_comma_separated(columns))?; } @@ -1805,18 +1812,18 @@ impl fmt::Display for Statement { write!(f, "{}", display_comma_separated( include_column_options.iter().map(|option_item: &IncludeOptionItem| { format!("INCLUDE {}{}{}", - option_item.column_type, + option_item.column_type, if let Some(inner_field) = &option_item.inner_field { format!(" {}", inner_field) } else { "".into() } , if let Some(alias) = &option_item.column_alias { - format!(" AS {}", alias) - } else { - "".into() - } - ) + format!(" AS {}", alias) + } else { + "".into() + } + ) }).collect_vec().as_slice() ))?; } @@ -1875,6 +1882,7 @@ impl fmt::Display for Statement { Statement::DeclareCursor { stmt } => write!(f, "DECLARE {}", stmt,), Statement::FetchCursor { stmt } => write!(f, "FETCH {}", stmt), Statement::CloseCursor { stmt } => write!(f, "CLOSE {}", stmt), + Statement::CreateSecret { stmt } => write!(f, "CREATE SECRET {}", stmt), Statement::AlterDatabase { name, operation } => { write!(f, "ALTER DATABASE {} {}", name, operation) } @@ -2003,10 +2011,10 @@ impl fmt::Display for Statement { Ok(()) } Statement::Commit { chain } => { - write!(f, "COMMIT{}", if *chain { " AND CHAIN" } else { "" },) + write!(f, "COMMIT{}", if *chain { " AND CHAIN" } else { "" }, ) } Statement::Rollback { chain } => { - write!(f, "ROLLBACK{}", if *chain { " AND CHAIN" } else { "" },) + write!(f, "ROLLBACK{}", if *chain { " AND CHAIN" } else { "" }, ) } Statement::CreateSchema { schema_name, @@ -2110,7 +2118,7 @@ impl fmt::Display for Statement { Statement::AlterUser(statement) => { write!(f, "ALTER USER {}", statement) } - Statement::AlterSystem{param, value} => { + Statement::AlterSystem { param, value } => { f.write_str("ALTER SYSTEM SET ")?; write!( f, @@ -2528,6 +2536,7 @@ pub enum ObjectType { Database, User, Connection, + Secret, Subscription, } @@ -2543,6 +2552,7 @@ impl fmt::Display for ObjectType { ObjectType::Sink => "SINK", ObjectType::Database => "DATABASE", ObjectType::User => "USER", + ObjectType::Secret => "SECRET", ObjectType::Connection => "CONNECTION", ObjectType::Subscription => "SUBSCRIPTION", }) @@ -2571,11 +2581,13 @@ impl ParseTo for ObjectType { ObjectType::User } else if parser.parse_keyword(Keyword::CONNECTION) { ObjectType::Connection + } else if parser.parse_keyword(Keyword::SECRET) { + ObjectType::Secret } else if parser.parse_keyword(Keyword::SUBSCRIPTION) { ObjectType::Subscription } else { return parser.expected( - "TABLE, VIEW, INDEX, MATERIALIZED VIEW, SOURCE, SINK, SUBSCRIPTION, SCHEMA, DATABASE, USER or CONNECTION after DROP", + "TABLE, VIEW, INDEX, MATERIALIZED VIEW, SOURCE, SINK, SUBSCRIPTION, SCHEMA, DATABASE, USER, SECRET or CONNECTION after DROP", parser.peek_token(), ); }; @@ -2978,6 +2990,7 @@ impl fmt::Display for CreateFunctionBody { Ok(()) } } + #[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct CreateFunctionWithOptions { diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index aef8ec417f605..2e5b281d1938f 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -13,6 +13,7 @@ // limitations under the License. use core::fmt; +use core::fmt::Formatter; use std::fmt::Write; use itertools::Itertools; @@ -21,7 +22,7 @@ use serde::{Deserialize, Serialize}; use super::ddl::SourceWatermark; use super::legacy_source::{parse_source_schema, CompatibleSourceSchema}; -use super::{EmitMode, Ident, ObjectType, Query}; +use super::{EmitMode, Ident, ObjectType, Query, Value}; use crate::ast::{ display_comma_separated, display_separated, ColumnDef, ObjectName, SqlOption, TableConstraint, }; @@ -850,6 +851,47 @@ impl fmt::Display for CreateConnectionStatement { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct CreateSecretStatement { + pub if_not_exists: bool, + pub secret_name: ObjectName, + pub credential: Value, + pub with_properties: WithProperties, +} + +impl ParseTo for CreateSecretStatement { + fn parse_to(parser: &mut Parser) -> Result { + impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser); + impl_parse_to!(secret_name: ObjectName, parser); + impl_parse_to!(with_properties: WithProperties, parser); + let mut credential = Value::Null; + if parser.parse_keyword(Keyword::AS) { + credential = parser.parse_value()?; + } + Ok(Self { + if_not_exists, + secret_name, + credential, + with_properties, + }) + } +} + +impl fmt::Display for CreateSecretStatement { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let mut v: Vec = vec![]; + impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self); + impl_fmt_display!(secret_name, v, self); + impl_fmt_display!(with_properties, v, self); + if self.credential != Value::Null { + v.push("AS".to_string()); + impl_fmt_display!(credential, v, self); + } + v.iter().join(" ").fmt(f) + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct AstVec(pub Vec); diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index e3f968b0caac6..0ed69adab3ad0 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -448,6 +448,8 @@ define_keywords!( SCROLL, SEARCH, SECOND, + SECRET, + SECRETS, SELECT, SENSITIVE, SEQUENCE, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index a152e9e8d90da..243d9e695ff3a 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2178,6 +2178,8 @@ impl Parser { self.parse_create_database() } else if self.parse_keyword(Keyword::USER) { self.parse_create_user() + } else if self.parse_keyword(Keyword::SECRET) { + self.parse_create_secret() } else { self.expected("an object type after CREATE", self.peek_token()) } @@ -2529,6 +2531,12 @@ impl Parser { Ok(Statement::CreateUser(CreateUserStatement::parse_to(self)?)) } + fn parse_create_secret(&mut self) -> Result { + Ok(Statement::CreateSecret { + stmt: CreateSecretStatement::parse_to(self)?, + }) + } + pub fn parse_with_properties(&mut self) -> Result, ParserError> { Ok(self .parse_options_with_preceding_keyword(Keyword::WITH)? @@ -3721,7 +3729,7 @@ impl Parser { } /// Parse a literal value (numbers, strings, date/time, booleans) - fn parse_value(&mut self) -> Result { + pub fn parse_value(&mut self) -> Result { let token = self.next_token(); match token.token { Token::Word(w) => match w.keyword { @@ -4596,6 +4604,14 @@ impl Parser { return self.expected("from after columns", self.peek_token()); } } + Keyword::SECRETS => { + return Ok(Statement::ShowObjects { + object: ShowObject::Secret { + schema: self.parse_from_and_identifier()?, + }, + filter: self.parse_show_statement_filter()?, + }); + } Keyword::CONNECTIONS => { return Ok(Statement::ShowObjects { object: ShowObject::Connection { diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index 670e325ea08e8..831886b9bdb36 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -116,3 +116,7 @@ error_msg: |- sql parser error: Expected literal string, found: null at line:1, column:45 Near " tmp with encrypted password null" +- input: CREATE SECRET secret1 WITH (backend = 'meta') AS 'demo-secret' + formatted_sql: CREATE SECRET secret1 WITH (backend = 'meta') AS 'demo-secret' +- input: CREATE SECRET IF NOT EXISTS secret2 WITH (backend = 'meta') AS 'demo-secret + error_msg: 'sql parser error: Unterminated string literal at Line: 1, Column 62' diff --git a/src/sqlparser/tests/testdata/drop.yaml b/src/sqlparser/tests/testdata/drop.yaml index 3fd366d3ea71d..6b8a70d0bf9be 100644 --- a/src/sqlparser/tests/testdata/drop.yaml +++ b/src/sqlparser/tests/testdata/drop.yaml @@ -16,3 +16,7 @@ formatted_sql: DROP USER user - input: DROP USER IF EXISTS user formatted_sql: DROP USER IF EXISTS user +- input: DROP SECRET secret + formatted_sql: DROP SECRET secret +- input: DROP SECRET IF EXISTS secret + formatted_sql: DROP SECRET IF EXISTS secret diff --git a/src/sqlparser/tests/testdata/show.yaml b/src/sqlparser/tests/testdata/show.yaml index 8f15d2667c589..a3df33bff2cbc 100644 --- a/src/sqlparser/tests/testdata/show.yaml +++ b/src/sqlparser/tests/testdata/show.yaml @@ -53,3 +53,9 @@ - input: SHOW INDEXES FROM t formatted_sql: SHOW INDEXES FROM t formatted_ast: 'ShowObjects { object: Indexes { table: ObjectName([Ident { value: "t", quote_style: None }]) }, filter: None }' +- input: SHOW SECRETS + formatted_sql: SHOW SECRETS + formatted_ast: 'ShowObjects { object: Secret { schema: None }, filter: None }' +- input: SHOW SECRETS FROM t + formatted_sql: SHOW SECRETS FROM t + formatted_ast: 'ShowObjects { object: Secret { schema: Some(Ident { value: "t", quote_style: None }) }, filter: None }' diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 8764c2bc91456..4f55c524942bc 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -26,6 +26,7 @@ use crate::types::Row; pub type RowSet = Vec; pub type RowSetResult = Result; + pub trait ValuesStream = Stream + Unpin + Send; #[derive(Copy, Clone, Debug, Eq, PartialEq)] @@ -56,6 +57,7 @@ pub enum StatementType { CREATE_AGGREGATE, CREATE_FUNCTION, CREATE_CONNECTION, + CREATE_SECRET, COMMENT, DECLARE_CURSOR, DESCRIBE, @@ -74,6 +76,7 @@ pub enum StatementType { DROP_DATABASE, DROP_USER, DROP_CONNECTION, + DROP_SECRET, ALTER_DATABASE, ALTER_SCHEMA, ALTER_INDEX, @@ -118,6 +121,7 @@ impl std::fmt::Display for StatementType { } pub trait Callback = Future> + Send; + pub type BoxedCallback = Pin>; pub struct PgResponse { @@ -293,6 +297,7 @@ impl StatementType { risingwave_sqlparser::ast::ObjectType::Connection => { Ok(StatementType::DROP_CONNECTION) } + risingwave_sqlparser::ast::ObjectType::Secret => Ok(StatementType::DROP_SECRET), risingwave_sqlparser::ast::ObjectType::Subscription => { Ok(StatementType::DROP_SUBSCRIPTION) }