From ea6821d9bbe096c9171b8c3350d42f3c87d374bc Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 20 Nov 2024 11:18:39 -0600 Subject: [PATCH 01/11] init --- e2e_test/ddl/secret.slt | 17 ++++ e2e_test/sink/iceberg_sink.slt | 5 ++ .../source_legacy/cdc/cdc.share_stream.slt | 26 +++++- proto/ddl_service.proto | 14 +++ src/common/secret/src/secret_manager.rs | 18 +++- src/compute/src/observer/observer_manager.rs | 3 + src/frontend/src/catalog/catalog_service.rs | 33 +++++++ src/frontend/src/catalog/secret_catalog.rs | 4 +- src/frontend/src/handler/alter_secret.rs | 68 +++++++++++++++ src/frontend/src/handler/create_secret.rs | 85 +++++++++---------- src/frontend/src/handler/drop_secret.rs | 74 ++++++++++------ src/frontend/src/handler/mod.rs | 6 ++ src/frontend/src/observer/observer_manager.rs | 3 + src/frontend/src/test_utils.rs | 12 +++ src/meta/service/src/ddl_service.rs | 21 +++++ src/meta/src/controller/catalog.rs | 38 ++++++++- src/meta/src/rpc/ddl_controller.rs | 45 +++++++--- src/rpc_client/src/meta_client.rs | 24 ++++++ src/sqlparser/src/ast/ddl.rs | 17 +++- src/sqlparser/src/ast/mod.rs | 18 +++- src/sqlparser/src/parser.rs | 17 +++- src/utils/pgwire/src/pg_response.rs | 1 + 22 files changed, 458 insertions(+), 91 deletions(-) create mode 100644 src/frontend/src/handler/alter_secret.rs diff --git a/e2e_test/ddl/secret.slt b/e2e_test/ddl/secret.slt index 6522565986a18..6cedc89a8e2a9 100644 --- a/e2e_test/ddl/secret.slt +++ b/e2e_test/ddl/secret.slt @@ -40,6 +40,23 @@ create secret secret_1 with ( backend = 'meta' ) as 'demo_secret'; +statement ok +alter secret secret_1 with ( + backend = 'meta' +) as 'demo_secret_altered'; + +statement error +alter secret secret_2 with ( + backend = 'meta' +) as 'demo_secret_altered'; +--- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: Catalog error + 2: secret not found: secret_2 + + # wait for support for hashicorp_vault backend # statement ok # create secret secret_2 with ( diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index b08abd8a4918c..ced999e546d14 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -10,6 +10,11 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok CREATE SECRET iceberg_s3_access_key WITH ( backend = 'meta' +) as 'hummockadmin_wrong'; + +statement ok +ALTER SECRET iceberg_s3_access_key WITH ( + backend = 'meta' ) as 'hummockadmin'; statement ok diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt index cf1000957b6fb..82e1a35cd74e0 100644 --- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt +++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt @@ -22,6 +22,31 @@ mysql --protocol=tcp -u root mytest < e2e_test/source_legacy/cdc/mysql_init_data statement ok create secret mysql_pwd with ( backend = 'meta' +) as 'incorrect_password'; + +# create a cdc source job, with incorrct password +statement error +create source mysql_mytest with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'rwcdc', + password = secret mysql_pwd, + database.name = 'mytest', + server.id = '5601' +); +--- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: gRPC request to meta service failed: Internal error + 2: failed to create source worker + 3: failed to create SplitEnumerator + 4: source cannot pass validation + +statement ok +alter secret mysql_pwd with ( + backend = 'meta' ) as '${MYSQL_PWD:}'; # create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON` @@ -36,7 +61,6 @@ create source mysql_mytest with ( server.id = '5601' ); - statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source create materialized view mv as select * from mysql_mytest; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 6467bd6e1d7e7..c3d0b2985197d 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -414,6 +414,19 @@ message DropSecretResponse { WaitVersion version = 1; } +message AlterSecretRequest { + uint32 secret_id = 1; + string name = 2; + bytes value = 3; + uint32 database_id = 4; + uint32 schema_id = 5; + uint32 owner_id = 6; +} + +message AlterSecretResponse { + WaitVersion version = 1; +} + message CreateConnectionRequest { message PrivateLink { catalog.Connection.PrivateLinkService.PrivateLinkProvider provider = 1; @@ -510,6 +523,7 @@ service DdlService { rpc CreateTable(CreateTableRequest) returns (CreateTableResponse); rpc CreateSecret(CreateSecretRequest) returns (CreateSecretResponse); rpc DropSecret(DropSecretRequest) returns (DropSecretResponse); + rpc AlterSecret(AlterSecretRequest) returns (AlterSecretResponse); rpc AlterName(AlterNameRequest) returns (AlterNameResponse); rpc AlterSource(AlterSourceRequest) returns (AlterSourceResponse); rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse); diff --git a/src/common/secret/src/secret_manager.rs b/src/common/secret/src/secret_manager.rs index b6a71a4c3ebe8..b431565e5b438 100644 --- a/src/common/secret/src/secret_manager.rs +++ b/src/common/secret/src/secret_manager.rs @@ -74,7 +74,23 @@ impl LocalSecretManager { pub fn add_secret(&self, secret_id: SecretId, secret: Vec) { let mut secret_guard = self.secrets.write(); - secret_guard.insert(secret_id, secret); + if secret_guard.insert(secret_id, secret).is_some() { + tracing::error!( + secret_id = secret_id, + "adding a secret but it already exists, overwriting it" + ); + }; + } + + pub fn update_secret(&self, secret_id: SecretId, secret: Vec) { + let mut secret_guard = self.secrets.write(); + if secret_guard.insert(secret_id, secret).is_none() { + tracing::error!( + secret_id = secret_id, + "updating a secret but it does not exist, adding it" + ); + } + self.remove_secret_file_if_exist(&secret_id); } pub fn init_secrets(&self, secrets: Vec) { diff --git a/src/compute/src/observer/observer_manager.rs b/src/compute/src/observer/observer_manager.rs index c028c1e851613..3f3a57c363930 100644 --- a/src/compute/src/observer/observer_manager.rs +++ b/src/compute/src/observer/observer_manager.rs @@ -38,6 +38,9 @@ impl ObserverState for ComputeObserverNode { Operation::Delete => { LocalSecretManager::global().remove_secret(s.id); } + Operation::Update => { + LocalSecretManager::global().update_secret(s.id, s.value); + } _ => { panic!("error type notification"); } diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 271d395181df8..d4720caba7cf1 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -175,6 +175,16 @@ pub trait CatalogWriter: Send + Sync { async fn drop_secret(&self, secret_id: SecretId) -> Result<()>; + async fn alter_secret( + &self, + secret_id: u32, + secret_name: String, + database_id: u32, + schema_id: u32, + owner_id: u32, + payload: Vec, + ) -> Result<()>; + async fn alter_name( &self, object_id: alter_name_request::Object, @@ -506,6 +516,29 @@ impl CatalogWriter for CatalogWriterImpl { let version = self.meta_client.alter_swap_rename(object).await?; self.wait_version(version).await } + + async fn alter_secret( + &self, + secret_id: u32, + secret_name: String, + database_id: u32, + schema_id: u32, + owner_id: u32, + payload: Vec, + ) -> Result<()> { + let version = self + .meta_client + .alter_secret( + secret_id, + secret_name, + database_id, + schema_id, + owner_id, + payload, + ) + .await?; + self.wait_version(version).await + } } impl CatalogWriterImpl { diff --git a/src/frontend/src/catalog/secret_catalog.rs b/src/frontend/src/catalog/secret_catalog.rs index d1f9048baf0e7..db50fbc201bdb 100644 --- a/src/frontend/src/catalog/secret_catalog.rs +++ b/src/frontend/src/catalog/secret_catalog.rs @@ -14,7 +14,7 @@ use risingwave_pb::catalog::PbSecret; -use crate::catalog::{DatabaseId, OwnedByUserCatalog, SecretId}; +use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, SecretId}; use crate::user::UserId; #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -22,6 +22,7 @@ pub struct SecretCatalog { pub id: SecretId, pub name: String, pub database_id: DatabaseId, + pub schema_id: SchemaId, pub value: Vec, pub owner: UserId, } @@ -34,6 +35,7 @@ impl From<&PbSecret> for SecretCatalog { owner: value.owner, name: value.name.clone(), value: value.value.clone(), + schema_id: value.schema_id, } } } diff --git a/src/frontend/src/handler/alter_secret.rs b/src/frontend/src/handler/alter_secret.rs new file mode 100644 index 0000000000000..5ad25e84b0e69 --- /dev/null +++ b/src/frontend/src/handler/alter_secret.rs @@ -0,0 +1,68 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_response::StatementType; +use risingwave_common::license::Feature; +use risingwave_sqlparser::ast::{AlterSecretOperation, ObjectName, SqlOption}; + +use super::create_secret::get_secret_payload; +use super::drop_secret::fetch_secret_catalog_with_db_schema_id; +use crate::error::Result; +use crate::handler::{HandlerArgs, RwPgResponse}; +use crate::WithOptions; + +pub async fn handle_alter_secret( + handler_args: HandlerArgs, + secret_name: ObjectName, + sql_options: Vec, + operation: AlterSecretOperation, +) -> Result { + Feature::SecretManagement + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; + + let session = handler_args.session; + + if let Some((secret_catalog, _, _)) = + fetch_secret_catalog_with_db_schema_id(&session, &secret_name, false)? + { + let AlterSecretOperation::ChangeCredential { new_credential } = operation; + + let with_options = WithOptions::try_from(sql_options.as_ref() as &[SqlOption])?; + + let secret_payload = get_secret_payload(new_credential, with_options)?; + + let catalog_writer = session.catalog_writer()?; + + catalog_writer + .alter_secret( + secret_catalog.id.secret_id(), + secret_catalog.name.clone(), + secret_catalog.database_id, + secret_catalog.schema_id, + secret_catalog.owner, + secret_payload, + ) + .await?; + + Ok(RwPgResponse::empty_result(StatementType::ALTER_SECRET)) + } else { + Ok(RwPgResponse::builder(StatementType::ALTER_SECRET) + .notice(format!( + "secret \"{}\" does not exist, skipping", + secret_name + )) + .into()) + } +} diff --git a/src/frontend/src/handler/create_secret.rs b/src/frontend/src/handler/create_secret.rs index 9810751361be3..4a347a0fb86f6 100644 --- a/src/frontend/src/handler/create_secret.rs +++ b/src/frontend/src/handler/create_secret.rs @@ -37,60 +37,21 @@ pub async fn handle_create_secret( let session = handler_args.session.clone(); let db_name = session.database(); - let (schema_name, connection_name) = + let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, stmt.secret_name.clone())?; if let Err(e) = session.check_secret_name_duplicated(stmt.secret_name.clone()) { return if stmt.if_not_exists { Ok(PgResponse::builder(StatementType::CREATE_SECRET) - .notice(format!("secret \"{}\" exists, skipping", connection_name)) + .notice(format!("secret \"{}\" exists, skipping", secret_name)) .into()) } else { Err(e) }; } + let with_options = WithOptions::try_from(stmt.with_properties.0.as_ref() as &[SqlOption])?; - let secret = secret_to_str(&stmt.credential)?.as_bytes().to_vec(); - - // check if the secret backend is supported - let with_props = WithOptions::try_from(stmt.with_properties.0.as_ref() as &[SqlOption])?; - let secret_payload: Vec = { - if let Some(backend) = with_props.get(SECRET_BACKEND_KEY) { - match backend.to_lowercase().as_ref() { - SECRET_BACKEND_META => { - let backend = risingwave_pb::secret::Secret { - secret_backend: Some(risingwave_pb::secret::secret::SecretBackend::Meta( - risingwave_pb::secret::SecretMetaBackend { value: secret }, - )), - }; - backend.encode_to_vec() - } - SECRET_BACKEND_HASHICORP_VAULT => { - if stmt.credential != Value::Null { - return Err(ErrorCode::InvalidParameterValue( - "credential must be null for hashicorp_vault backend".to_string(), - ) - .into()); - } - bail_not_implemented!("hashicorp_vault backend is not implemented yet") - } - _ => { - return Err(ErrorCode::InvalidParameterValue(format!( - "secret backend \"{}\" is not supported. Supported backends are: {}", - backend, - [SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",") - )) - .into()); - } - } - } else { - return Err(ErrorCode::InvalidParameterValue(format!( - "secret backend is not specified in with clause. Supported backends are: {}", - [SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",") - )) - .into()); - } - }; + let secret_payload = get_secret_payload(stmt.credential, with_options)?; let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; @@ -117,3 +78,41 @@ fn secret_to_str(value: &Value) -> Result { .into()), } } + +pub(crate) fn get_secret_payload(credential: Value, with_options: WithOptions) -> Result> { + let secret = secret_to_str(&credential)?.as_bytes().to_vec(); + + if let Some(backend) = with_options.get(SECRET_BACKEND_KEY) { + match backend.to_lowercase().as_ref() { + SECRET_BACKEND_META => { + let backend = risingwave_pb::secret::Secret { + secret_backend: Some(risingwave_pb::secret::secret::SecretBackend::Meta( + risingwave_pb::secret::SecretMetaBackend { value: secret }, + )), + }; + Ok(backend.encode_to_vec()) + } + SECRET_BACKEND_HASHICORP_VAULT => { + if credential != Value::Null { + return Err(ErrorCode::InvalidParameterValue( + "credential must be null for hashicorp_vault backend".to_string(), + ) + .into()); + } + bail_not_implemented!("hashicorp_vault backend is not implemented yet") + } + _ => Err(ErrorCode::InvalidParameterValue(format!( + "secret backend \"{}\" is not supported. Supported backends are: {}", + backend, + [SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",") + )) + .into()), + } + } else { + Err(ErrorCode::InvalidParameterValue(format!( + "secret backend is not specified in with clause. Supported backends are: {}", + [SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",") + )) + .into()) + } +} diff --git a/src/frontend/src/handler/drop_secret.rs b/src/frontend/src/handler/drop_secret.rs index eff4b35224b8b..4720d73bfa7e6 100644 --- a/src/frontend/src/handler/drop_secret.rs +++ b/src/frontend/src/handler/drop_secret.rs @@ -12,13 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use pgwire::pg_response::StatementType; use risingwave_common::license::Feature; use risingwave_sqlparser::ast::ObjectName; use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::secret_catalog::SecretCatalog; +use crate::catalog::{DatabaseId, SchemaId}; use crate::error::Result; use crate::handler::{HandlerArgs, RwPgResponse}; +use crate::session::SessionImpl; use crate::Binder; pub async fn handle_drop_secret( @@ -31,37 +36,54 @@ pub async fn handle_drop_secret( .map_err(|e| anyhow::anyhow!(e))?; let session = handler_args.session; + + if let Some((secret_catalog, _, _)) = + fetch_secret_catalog_with_db_schema_id(&session, &secret_name, if_exists)? + { + let catalog_writer = session.catalog_writer()?; + catalog_writer.drop_secret(secret_catalog.id).await?; + + Ok(RwPgResponse::empty_result(StatementType::DROP_SECRET)) + } else { + Ok(RwPgResponse::builder(StatementType::DROP_SECRET) + .notice(format!( + "secret \"{}\" does not exist, skipping", + secret_name + )) + .into()) + } +} + +/// Fetch the secret catalog and the `database/schema_id` of the source. +pub fn fetch_secret_catalog_with_db_schema_id( + session: &SessionImpl, + secret_name: &ObjectName, + if_exists: bool, +) -> Result, DatabaseId, SchemaId)>> { let db_name = session.database(); - let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, secret_name)?; + let (schema_name, secret_name) = + Binder::resolve_schema_qualified_name(db_name, secret_name.clone())?; let search_path = session.config().search_path(); let user_name = &session.auth_context().user_name; - let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); - let secret_id = { - let reader = session.env().catalog_reader().read_guard(); - let (secret, schema_name) = - match reader.get_secret_by_name(db_name, schema_path, secret_name.as_str()) { - Ok((c, s)) => (c, s), - Err(e) => { - return if if_exists { - Ok(RwPgResponse::builder(StatementType::DROP_SECRET) - .notice(format!( - "secret \"{}\" does not exist, skipping", - secret_name - )) - .into()) - } else { - Err(e.into()) - }; - } - }; - session.check_privilege_for_drop_alter(schema_name, &**secret)?; + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); - secret.id - }; + let reader = session.env().catalog_reader().read_guard(); + match reader.get_secret_by_name(db_name, schema_path, &secret_name) { + Ok((catalog, schema_name)) => { + session.check_privilege_for_drop_alter(schema_name, &**catalog)?; - let catalog_writer = session.catalog_writer()?; - catalog_writer.drop_secret(secret_id).await?; + let db = reader.get_database_by_name(db_name)?; + let schema = db.get_schema_by_name(schema_name).unwrap(); - Ok(RwPgResponse::empty_result(StatementType::DROP_SECRET)) + Ok(Some((Arc::clone(catalog), db.id(), schema.id()))) + } + Err(e) => { + if if_exists { + Ok(None) + } else { + Err(e.into()) + } + } + } } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 9cf94a37c65b0..cfa6dc10277e8 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -42,6 +42,7 @@ use crate::utils::WithOptions; mod alter_owner; mod alter_parallelism; mod alter_rename; +mod alter_secret; mod alter_set_schema; mod alter_source_column; mod alter_source_with_sr; @@ -1076,6 +1077,11 @@ pub async fn handle( Statement::AlterSystem { param, value } => { alter_system::handle_alter_system(handler_args, param, value).await } + Statement::AlterSecret { + name, + with_options, + operation, + } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await, Statement::StartTransaction { modes } => { transaction::handle_begin(handler_args, START_TRANSACTION, modes).await } diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 92408c2b03885..270c467b04754 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -481,6 +481,9 @@ impl FrontendObserverNode { Operation::Delete => { LocalSecretManager::global().remove_secret(secret.id); } + Operation::Update => { + LocalSecretManager::global().update_secret(secret.id, secret.value); + } _ => { panic!("error type notification"); } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index d94b1dd2652d6..bb12ba14d9b90 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -649,6 +649,18 @@ impl CatalogWriter for MockCatalogWriter { async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> { todo!() } + + async fn alter_secret( + &self, + _secret_id: u32, + _secret_name: String, + _database_id: u32, + _schema_id: u32, + _owner_id: u32, + _payload: Vec, + ) -> Result<()> { + unreachable!() + } } impl MockCatalogWriter { diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index e59c4a4100141..5cc34de4ce6ae 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -180,6 +180,27 @@ impl DdlService for DdlServiceImpl { Ok(Response::new(DropSecretResponse { version })) } + async fn alter_secret( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let pb_secret = Secret { + id: req.get_secret_id(), + name: req.get_name().clone(), + database_id: req.get_database_id(), + value: req.get_value().clone(), + owner: req.get_owner_id(), + schema_id: req.get_schema_id(), + }; + let version = self + .ddl_controller + .run_command(DdlCommand::AlterSecret(pb_secret)) + .await?; + + Ok(Response::new(AlterSecretResponse { version })) + } + async fn create_schema( &self, request: Request, diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 6af4353090645..d1bf0027e6bff 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1397,6 +1397,42 @@ impl CatalogController { Ok(version) } + pub async fn alter_secret( + &self, + pb_secret: PbSecret, + secret_plain_payload: Vec, + ) -> MetaResult { + let inner = self.inner.write().await; + let owner_id = pb_secret.owner as _; + let txn = inner.db.begin().await?; + ensure_user_id(owner_id, &txn).await?; + ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?; + ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?; + + let secret: secret::ActiveModel = pb_secret.clone().into(); + Secret::update(secret).exec(&txn).await?; + + txn.commit().await?; + + // Notify the compute and frontend node plain secret + let mut secret_plain = pb_secret; + secret_plain.value.clone_from(&secret_plain_payload); + + LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload); + self.env + .notification_manager() + .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone())); + + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::Secret(secret_plain), + ) + .await; + + Ok(version) + } + pub async fn get_secret_by_id(&self, secret_id: SecretId) -> MetaResult { let inner = self.inner.read().await; let (secret, obj) = Secret::find_by_id(secret_id) @@ -1417,7 +1453,7 @@ impl CatalogController { .ok_or_else(|| MetaError::catalog_id_not_found("secret", secret_id))?; ensure_object_not_refer(ObjectType::Secret, secret_id, &txn).await?; - // Find affect users with privileges on the connection. + // Find affect users with privileges on the secret. let to_update_user_ids: Vec = UserPrivilege::find() .select_only() .distinct() diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 8ef2ec8dc4c96..43d12d0c478db 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -148,6 +148,7 @@ pub enum DdlCommand { CreateConnection(Connection), DropConnection(ConnectionId), CreateSecret(Secret), + AlterSecret(Secret), DropSecret(SecretId), CommentOn(Comment), CreateSubscription(Subscription), @@ -176,6 +177,7 @@ impl DdlCommand { | DdlCommand::CreateConnection(_) | DdlCommand::CommentOn(_) | DdlCommand::CreateSecret(_) + | DdlCommand::AlterSecret(_) | DdlCommand::AlterSwapRename(_) => true, DdlCommand::CreateStreamingJob(_, _, _, _) | DdlCommand::CreateSourceWithoutStreamingJob(_) @@ -345,6 +347,7 @@ impl DdlController { } DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await, DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await, + DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await, DdlCommand::AlterSourceColumn(source) => ctrl.alter_source(source).await, DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await, DdlCommand::CreateSubscription(subscription) => { @@ -515,10 +518,9 @@ impl DdlController { .await } - async fn create_secret(&self, mut secret: Secret) -> MetaResult { - // The 'secret' part of the request we receive from the frontend is in plaintext; - // here, we need to encrypt it before storing it in the catalog. - let secret_plain_payload = secret.value.clone(); + // The 'secret' part of the request we receive from the frontend is in plaintext; + // here, we need to encrypt it before storing it in the catalog. + fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult> { let secret_store_private_key = self .env .opts @@ -526,16 +528,21 @@ impl DdlController { .clone() .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?; - let encrypted_payload = { - let encrypted_secret = SecretEncryption::encrypt( - secret_store_private_key.as_slice(), - secret.get_value().as_slice(), - ) - .context(format!("failed to encrypt secret {}", secret.name))?; - encrypted_secret - .serialize() - .context(format!("failed to serialize secret {}", secret.name))? - }; + let encrypted_payload = SecretEncryption::encrypt( + secret_store_private_key.as_slice(), + secret.get_value().as_slice(), + ) + .context(format!("failed to encrypt secret {}", secret.name))?; + Ok(encrypted_payload + .serialize() + .context(format!("failed to serialize secret {}", secret.name))?) + } + + async fn create_secret(&self, mut secret: Secret) -> MetaResult { + // The 'secret' part of the request we receive from the frontend is in plaintext; + // here, we need to encrypt it before storing it in the catalog. + let secret_plain_payload = secret.value.clone(); + let encrypted_payload = self.get_encrypted_payload(&secret)?; secret.value = encrypted_payload; self.metadata_manager @@ -551,6 +558,16 @@ impl DdlController { .await } + async fn alter_secret(&self, mut secret: Secret) -> MetaResult { + let secret_plain_payload = secret.value.clone(); + let encrypted_payload = self.get_encrypted_payload(&secret)?; + secret.value = encrypted_payload; + self.metadata_manager + .catalog_controller + .alter_secret(secret, secret_plain_payload) + .await + } + async fn create_subscription( &self, mut subscription: Subscription, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index be733e8d4ec1d..bae4527710750 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -585,6 +585,29 @@ impl MetaClient { .ok_or_else(|| anyhow!("wait version not set"))?) } + pub async fn alter_secret( + &self, + secret_id: u32, + secret_name: String, + database_id: u32, + schema_id: u32, + owner_id: u32, + value: Vec, + ) -> Result { + let request = AlterSecretRequest { + secret_id, + name: secret_name, + database_id, + schema_id, + owner_id, + value, + }; + let resp = self.inner.alter_secret(request).await?; + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) + } + pub async fn replace_table( &self, source: Option, @@ -2082,6 +2105,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, wait, WaitRequest, WaitResponse } ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse } ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse } + ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse } ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse } ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse } ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index d94cf80cb9f2d..6b708290d0905 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -20,7 +20,7 @@ use core::fmt; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use super::FormatEncodeOptions; +use super::{FormatEncodeOptions, Value}; use crate::ast::{ display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SetVariableValue, }; @@ -214,6 +214,11 @@ pub enum AlterConnectionOperation { SetSchema { new_schema_name: ObjectName }, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum AlterSecretOperation { + ChangeCredential { new_credential: Value }, +} + impl fmt::Display for AlterDatabaseOperation { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -481,6 +486,16 @@ impl fmt::Display for AlterConnectionOperation { } } +impl fmt::Display for AlterSecretOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlterSecretOperation::ChangeCredential { new_credential } => { + write!(f, "AS {new_credential}") + } + } + } +} + /// An `ALTER COLUMN` (`Statement::AlterTable`) operation #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 563dc66be4780..1dd6c83258ca4 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -38,8 +38,8 @@ use winnow::PResult; pub use self::data_type::{DataType, StructField}; pub use self::ddl::{ AlterColumnOperation, AlterConnectionOperation, AlterDatabaseOperation, AlterFunctionOperation, - AlterSchemaOperation, AlterTableOperation, ColumnDef, ColumnOption, ColumnOptionDef, - ReferentialAction, SourceWatermark, TableConstraint, + AlterSchemaOperation, AlterSecretOperation, AlterTableOperation, ColumnDef, ColumnOption, + ColumnOptionDef, ReferentialAction, SourceWatermark, TableConstraint, }; pub use self::legacy_source::{ get_delimiter, AvroSchema, CompatibleFormatEncode, DebeziumAvroSchema, ProtobufSchema, @@ -1432,6 +1432,13 @@ pub enum Statement { name: ObjectName, operation: AlterConnectionOperation, }, + /// ALTER SECRET + AlterSecret { + /// Secret name + name: ObjectName, + with_options: Vec, + operation: AlterSecretOperation, + }, /// DESCRIBE TABLE OR SOURCE Describe { /// Table or Source name @@ -1961,6 +1968,13 @@ impl fmt::Display for Statement { Statement::AlterConnection { name, operation } => { write!(f, "ALTER CONNECTION {} {}", name, operation) } + Statement::AlterSecret { name, with_options, operation } => { + write!(f, "ALTER SECRET {}", name)?; + if !with_options.is_empty() { + write!(f, " WITH ({})", display_comma_separated(with_options))?; + } + write!(f, " {}", operation) + } Statement::Discard(t) => write!(f, "DISCARD {}", t), Statement::Drop(stmt) => write!(f, "DROP {}", stmt), Statement::DropFunction { diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 7d01a2b35cf67..22b1113dfd9ef 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3054,9 +3054,11 @@ impl Parser<'_> { self.parse_alter_system() } else if self.parse_keyword(Keyword::SUBSCRIPTION) { self.parse_alter_subscription() + } else if self.parse_keyword(Keyword::SECRET) { + self.parse_alter_secret() } else { self.expected( - "DATABASE, SCHEMA, TABLE, INDEX, MATERIALIZED, VIEW, SINK, SUBSCRIPTION, SOURCE, FUNCTION, USER or SYSTEM after ALTER" + "DATABASE, SCHEMA, TABLE, INDEX, MATERIALIZED, VIEW, SINK, SUBSCRIPTION, SOURCE, FUNCTION, USER, SECRET or SYSTEM after ALTER" ) } } @@ -3569,6 +3571,19 @@ impl Parser<'_> { Ok(Statement::AlterSystem { param, value }) } + pub fn parse_alter_secret(&mut self) -> PResult { + let secret_name = self.parse_object_name()?; + let with_options = self.parse_with_properties()?; + self.expect_keyword(Keyword::AS)?; + let new_credential = self.parse_value()?; + let operation = AlterSecretOperation::ChangeCredential { new_credential }; + Ok(Statement::AlterSecret { + name: secret_name, + with_options, + operation, + }) + } + /// Parse a copy statement pub fn parse_copy(&mut self) -> PResult { let table_name = self.parse_object_name()?; diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 5718533f67097..33e1ec9c5c22b 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -89,6 +89,7 @@ pub enum StatementType { ALTER_FUNCTION, ALTER_CONNECTION, ALTER_SYSTEM, + ALTER_SECRET, REVOKE_PRIVILEGE, // Introduce ORDER_BY statement type cuz Calcite unvalidated AST has SqlKind.ORDER_BY. Note // that Statement Type is not designed to be one to one mapping with SqlKind. From b784e27917cc9366af1b9a430915373009e02427 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 20 Nov 2024 14:11:38 -0600 Subject: [PATCH 02/11] refine --- e2e_test/source_legacy/cdc/cdc.share_stream.slt | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt index 82e1a35cd74e0..5d0d6b8ac0755 100644 --- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt +++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt @@ -36,13 +36,15 @@ create source mysql_mytest with ( server.id = '5601' ); --- -db error: ERROR: Failed to run the query +# The detailed error message is commented out because the user IP in error message may vary in different environments. +# db error: ERROR: Failed to run the query -Caused by these errors (recent errors listed first): - 1: gRPC request to meta service failed: Internal error - 2: failed to create source worker - 3: failed to create SplitEnumerator - 4: source cannot pass validation +# Caused by these errors (recent errors listed first): +# 1: gRPC request to meta service failed: Internal error +# 2: failed to create source worker +# 3: failed to create SplitEnumerator +# 4: source cannot pass validation +# 5: Internal error: Access denied for user 'rwcdc'@'172.17.0.1' (using password: YES) statement ok alter secret mysql_pwd with ( From fd5188006fddc9163ef1a5b13980c22cf2b07869 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 20 Nov 2024 14:24:21 -0600 Subject: [PATCH 03/11] fix --- e2e_test/ddl/secret.slt | 2 +- e2e_test/source_legacy/cdc/cdc.share_stream.slt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e_test/ddl/secret.slt b/e2e_test/ddl/secret.slt index 6cedc89a8e2a9..44f2b71e89b70 100644 --- a/e2e_test/ddl/secret.slt +++ b/e2e_test/ddl/secret.slt @@ -51,7 +51,7 @@ alter secret secret_2 with ( ) as 'demo_secret_altered'; --- db error: ERROR: Failed to run the query - + Caused by these errors (recent errors listed first): 1: Catalog error 2: secret not found: secret_2 diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt index 5d0d6b8ac0755..af108d62d067b 100644 --- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt +++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt @@ -38,7 +38,7 @@ create source mysql_mytest with ( --- # The detailed error message is commented out because the user IP in error message may vary in different environments. # db error: ERROR: Failed to run the query - + # Caused by these errors (recent errors listed first): # 1: gRPC request to meta service failed: Internal error # 2: failed to create source worker From 61b57f7a0ace57b3d6f48373110872251c86ffb2 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 20 Nov 2024 14:32:59 -0600 Subject: [PATCH 04/11] fix --- src/sqlparser/src/ast/ddl.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 6b708290d0905..35eac31cefa72 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -215,6 +215,7 @@ pub enum AlterConnectionOperation { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum AlterSecretOperation { ChangeCredential { new_credential: Value }, } From dc6d3d83c463395ef7e0fea0ab41b910cb8e8ebd Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 20 Nov 2024 20:05:34 -0600 Subject: [PATCH 05/11] fix --- e2e_test/ddl/secret.slt | 2 +- e2e_test/source_legacy/cdc/cdc.share_stream.slt | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/e2e_test/ddl/secret.slt b/e2e_test/ddl/secret.slt index 44f2b71e89b70..889f6fad14e4e 100644 --- a/e2e_test/ddl/secret.slt +++ b/e2e_test/ddl/secret.slt @@ -49,7 +49,7 @@ statement error alter secret secret_2 with ( backend = 'meta' ) as 'demo_secret_altered'; ---- +---- db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt index af108d62d067b..39ba81896c8aa 100644 --- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt +++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt @@ -35,10 +35,9 @@ create source mysql_mytest with ( database.name = 'mytest', server.id = '5601' ); ---- # The detailed error message is commented out because the user IP in error message may vary in different environments. +# ---- # db error: ERROR: Failed to run the query - # Caused by these errors (recent errors listed first): # 1: gRPC request to meta service failed: Internal error # 2: failed to create source worker From 90377f1f4effff2abd71b81291b01ecc4dc71933 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Mon, 25 Nov 2024 14:53:47 -0600 Subject: [PATCH 06/11] add recover in test --- .../source_legacy/cdc/cdc.share_stream.slt | 32 +++++++++++++++++++ src/common/secret/src/secret_manager.rs | 13 ++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt index 39ba81896c8aa..64eb7069a4369 100644 --- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt +++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt @@ -87,6 +87,22 @@ from mysql_mytest table 'mytest.products'; # sleep to ensure (default,'Milk','Milk is a white liquid food') is consumed from Debezium message instead of backfill. sleep 10s +### BEGIN test change secret in MySQL and ALTER SECRET in Risingwave +onlyif can-use-recover +statement ok +alter secret mysql_pwd with ( + backend = 'meta' +) as 'new_password'; + +onlyif can-use-recover +system ok +mysql --protocol=tcp -u root -e "ALTER USER 'rwcdc'@'%' IDENTIFIED BY 'new_password';" + +onlyif can-use-recover +statement ok +recover; +### END + statement error Permission denied drop secret mysql_pwd; @@ -600,3 +616,19 @@ query II select * from upper_orders_shared order by id; ---- 1 happy + +### BEGIN reset the password to the original one +onlyif can-use-recover +statement ok +alter secret mysql_pwd with ( + backend = 'meta' +) as '${MYSQL_PWD:}'; + +onlyif can-use-recover +system ok +mysql --protocol=tcp -u root -e "ALTER USER 'rwcdc'@'%' IDENTIFIED BY '${MYSQL_PWD:}';" + +onlyif can-use-recover +statement ok +recover; +### END \ No newline at end of file diff --git a/src/common/secret/src/secret_manager.rs b/src/common/secret/src/secret_manager.rs index b431565e5b438..5f12433dbd93f 100644 --- a/src/common/secret/src/secret_manager.rs +++ b/src/common/secret/src/secret_manager.rs @@ -190,9 +190,7 @@ impl LocalSecretManager { } fn get_secret_value(pb_secret_bytes: &[u8]) -> SecretResult> { - let pb_secret = risingwave_pb::secret::Secret::decode(pb_secret_bytes) - .context("failed to decode secret")?; - let secret_value = match pb_secret.get_secret_backend().unwrap() { + let secret_value = match Self::get_pb_secret_backend(pb_secret_bytes)? { risingwave_pb::secret::secret::SecretBackend::Meta(backend) => backend.value.clone(), risingwave_pb::secret::secret::SecretBackend::HashicorpVault(_) => { return Err(anyhow!("hashicorp_vault backend is not implemented yet").into()) @@ -200,4 +198,13 @@ impl LocalSecretManager { }; Ok(secret_value) } + + /// Get the secret backend from the given decrypted secret bytes. + pub fn get_pb_secret_backend( + pb_secret_bytes: &[u8] + ) -> SecretResult { + let pb_secret = risingwave_pb::secret::Secret::decode(pb_secret_bytes) + .context("failed to decode secret")?; + Ok(pb_secret.get_secret_backend().unwrap().clone()) + } } From c7b2c11aa0111182034534227efd4b6af0f79a25 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Mon, 25 Nov 2024 15:54:36 -0600 Subject: [PATCH 07/11] allow no with --- e2e_test/ddl/secret.slt | 3 ++ .../source_legacy/cdc/cdc.share_stream.slt | 8 +--- src/common/secret/src/secret_manager.rs | 2 +- src/frontend/src/handler/alter_secret.rs | 43 ++++++++++++++++--- src/frontend/src/handler/create_secret.rs | 2 +- 5 files changed, 45 insertions(+), 13 deletions(-) diff --git a/e2e_test/ddl/secret.slt b/e2e_test/ddl/secret.slt index 889f6fad14e4e..cf9d16214da38 100644 --- a/e2e_test/ddl/secret.slt +++ b/e2e_test/ddl/secret.slt @@ -45,6 +45,9 @@ alter secret secret_1 with ( backend = 'meta' ) as 'demo_secret_altered'; +statement ok +alter secret secret_1 as 'demo_secret_altered_again'; + statement error alter secret secret_2 with ( backend = 'meta' diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt index 64eb7069a4369..37840417fa7cc 100644 --- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt +++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt @@ -46,9 +46,7 @@ create source mysql_mytest with ( # 5: Internal error: Access denied for user 'rwcdc'@'172.17.0.1' (using password: YES) statement ok -alter secret mysql_pwd with ( - backend = 'meta' -) as '${MYSQL_PWD:}'; +alter secret mysql_pwd as '${MYSQL_PWD:}'; # create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON` statement ok @@ -620,9 +618,7 @@ select * from upper_orders_shared order by id; ### BEGIN reset the password to the original one onlyif can-use-recover statement ok -alter secret mysql_pwd with ( - backend = 'meta' -) as '${MYSQL_PWD:}'; +alter secret mysql_pwd as '${MYSQL_PWD:}'; onlyif can-use-recover system ok diff --git a/src/common/secret/src/secret_manager.rs b/src/common/secret/src/secret_manager.rs index 5f12433dbd93f..723c0f5791d4d 100644 --- a/src/common/secret/src/secret_manager.rs +++ b/src/common/secret/src/secret_manager.rs @@ -201,7 +201,7 @@ impl LocalSecretManager { /// Get the secret backend from the given decrypted secret bytes. pub fn get_pb_secret_backend( - pb_secret_bytes: &[u8] + pb_secret_bytes: &[u8], ) -> SecretResult { let pb_secret = risingwave_pb::secret::Secret::decode(pb_secret_bytes) .context("failed to decode secret")?; diff --git a/src/frontend/src/handler/alter_secret.rs b/src/frontend/src/handler/alter_secret.rs index 5ad25e84b0e69..3ab5ece2ee518 100644 --- a/src/frontend/src/handler/alter_secret.rs +++ b/src/frontend/src/handler/alter_secret.rs @@ -12,11 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::anyhow; use pgwire::pg_response::StatementType; +use prost::Message; +use risingwave_common::bail_not_implemented; use risingwave_common::license::Feature; +use risingwave_common::secret::LocalSecretManager; +use risingwave_pb::secret::secret; use risingwave_sqlparser::ast::{AlterSecretOperation, ObjectName, SqlOption}; -use super::create_secret::get_secret_payload; +use super::create_secret::{get_secret_payload, secret_to_str}; use super::drop_secret::fetch_secret_catalog_with_db_schema_id; use crate::error::Result; use crate::handler::{HandlerArgs, RwPgResponse}; @@ -39,15 +44,43 @@ pub async fn handle_alter_secret( { let AlterSecretOperation::ChangeCredential { new_credential } = operation; - let with_options = WithOptions::try_from(sql_options.as_ref() as &[SqlOption])?; - - let secret_payload = get_secret_payload(new_credential, with_options)?; + let secret_id = secret_catalog.id.secret_id(); + let secret_payload = if sql_options.is_empty() { + let original_pb_secret_bytes = LocalSecretManager::global() + .get_secret(secret_id) + .ok_or(anyhow!( + "Failed to get secret in secret manager, secret_id: {}", + secret_id + ))?; + let original_secret_backend = + LocalSecretManager::get_pb_secret_backend(&original_pb_secret_bytes)?; + match original_secret_backend { + secret::SecretBackend::Meta(_) => { + let new_secret_value_bytes = + secret_to_str(&new_credential)?.as_bytes().to_vec(); + let secret_payload = risingwave_pb::secret::Secret { + secret_backend: Some(risingwave_pb::secret::secret::SecretBackend::Meta( + risingwave_pb::secret::SecretMetaBackend { + value: new_secret_value_bytes, + }, + )), + }; + secret_payload.encode_to_vec() + } + secret::SecretBackend::HashicorpVault(_) => { + bail_not_implemented!("hashicorp_vault backend is not implemented yet") + } + } + } else { + let with_options = WithOptions::try_from(sql_options.as_ref() as &[SqlOption])?; + get_secret_payload(new_credential, with_options)? + }; let catalog_writer = session.catalog_writer()?; catalog_writer .alter_secret( - secret_catalog.id.secret_id(), + secret_id, secret_catalog.name.clone(), secret_catalog.database_id, secret_catalog.schema_id, diff --git a/src/frontend/src/handler/create_secret.rs b/src/frontend/src/handler/create_secret.rs index 4a347a0fb86f6..6d5a6283e3288 100644 --- a/src/frontend/src/handler/create_secret.rs +++ b/src/frontend/src/handler/create_secret.rs @@ -69,7 +69,7 @@ pub async fn handle_create_secret( Ok(PgResponse::empty_result(StatementType::CREATE_SECRET)) } -fn secret_to_str(value: &Value) -> Result { +pub fn secret_to_str(value: &Value) -> Result { match value { Value::DoubleQuotedString(s) | Value::SingleQuotedString(s) => Ok(s.to_string()), _ => Err(ErrorCode::InvalidInputSyntax( From b7624e7c315abe776397c611e241526a383a4cf6 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Mon, 25 Nov 2024 19:21:04 -0600 Subject: [PATCH 08/11] inc mysql backend e2e timeout --- ci/workflows/pull-request.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index e10ffb2d0091f..355729bbf8ac0 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -819,7 +819,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 30 + timeout_in_minutes: 32 retry: *auto-retry # FIXME(kwannoel): Let the github PR labeller label it, if sqlsmith source files has changes. From bde33356ce33e8e667ea36a95d45b2f59c672488 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Tue, 26 Nov 2024 08:59:34 -0600 Subject: [PATCH 09/11] check secret exist on meta --- src/meta/src/controller/catalog.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index d1bf0027e6bff..45052fc4d1855 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1409,6 +1409,11 @@ impl CatalogController { ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?; ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?; + // Check if the secret exists. + if Secret::find_by_id(pb_secret.get_id() as i32).one(&txn).await?.is_none() { + return Err(MetaError::catalog_id_not_found("secret", pb_secret.get_id())); + } + let secret: secret::ActiveModel = pb_secret.clone().into(); Secret::update(secret).exec(&txn).await?; From a8e2a5a215305a9f3ee44ce6a6d286480a893dcd Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Tue, 26 Nov 2024 09:16:55 -0600 Subject: [PATCH 10/11] fmt --- src/meta/src/controller/catalog.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 1d8d84b491f29..52bdb59a2f350 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1410,8 +1410,15 @@ impl CatalogController { ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?; // Check if the secret exists. - if Secret::find_by_id(pb_secret.get_id() as i32).one(&txn).await?.is_none() { - return Err(MetaError::catalog_id_not_found("secret", pb_secret.get_id())); + if Secret::find_by_id(pb_secret.get_id() as i32) + .one(&txn) + .await? + .is_none() + { + return Err(MetaError::catalog_id_not_found( + "secret", + pb_secret.get_id(), + )); } let secret: secret::ActiveModel = pb_secret.clone().into(); From d28fc1ed310a68672f51c1853cd9deb771e4dfc9 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Tue, 26 Nov 2024 09:45:24 -0600 Subject: [PATCH 11/11] refine --- src/meta/src/controller/catalog.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 52bdb59a2f350..5950f0357eee5 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1409,18 +1409,7 @@ impl CatalogController { ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?; ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?; - // Check if the secret exists. - if Secret::find_by_id(pb_secret.get_id() as i32) - .one(&txn) - .await? - .is_none() - { - return Err(MetaError::catalog_id_not_found( - "secret", - pb_secret.get_id(), - )); - } - + ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?; let secret: secret::ActiveModel = pb_secret.clone().into(); Secret::update(secret).exec(&txn).await?;