Skip to content

Commit

Permalink
use oncelock
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Jul 3, 2024
1 parent 6be6cea commit 1d4549a
Show file tree
Hide file tree
Showing 20 changed files with 86 additions and 49 deletions.
39 changes: 24 additions & 15 deletions src/common/secret/src/secret_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{anyhow, Context};
use parking_lot::RwLock;
Expand All @@ -29,28 +28,38 @@ use thiserror_ext::AsReport;
use super::error::{SecretError, SecretResult};
use super::SecretId;

pub static SECRET_MANAGER: std::sync::LazyLock<LocalSecretManager> =
std::sync::LazyLock::new(LocalSecretManager::new);
static INSTANCE: std::sync::OnceLock<LocalSecretManager> = std::sync::OnceLock::new();

#[derive(Debug)]
pub struct LocalSecretManager {
secrets: RwLock<HashMap<SecretId, Vec<u8>>>,
secret_file_dir: PathBuf,
}

pub type LocalSecretManagerRef = Arc<LocalSecretManager>;

impl LocalSecretManager {
fn new() -> Self {
let path_string =
std::env::var("RW_SECRET_TEMP_FILE_PATH").unwrap_or_else(|_e| "./secrets".to_string());
let secret_file_dir = PathBuf::from(path_string);
std::fs::remove_dir_all(&secret_file_dir).ok();
std::fs::create_dir_all(&secret_file_dir).unwrap();
Self {
secrets: RwLock::new(HashMap::new()),
secret_file_dir,
}
/// Initialize the secret manager with the given temp file path, cluster id, and encryption key.
/// # Panics
/// Panics if fail to create the secret file directory.
pub fn init(temp_file_dir: Option<String>) {
// use `get_or_init` to handle concurrent initialization in single node mode.
INSTANCE.get_or_init(|| {
let temp_file_dir = temp_file_dir.unwrap_or_else(|| "./secrets".to_string());
let secret_file_dir = PathBuf::from(temp_file_dir);
std::fs::remove_dir_all(&secret_file_dir).ok();
std::fs::create_dir_all(&secret_file_dir).unwrap();

Self {
secrets: RwLock::new(HashMap::new()),
secret_file_dir,
}
});
}

/// Get the global secret manager instance.
/// # Panics
/// Panics if the secret manager is not initialized.
pub fn global() -> &'static LocalSecretManager {
INSTANCE.get().unwrap()
}

pub fn add_secret(&self, secret_id: SecretId, secret: Vec<u8>) {
Expand Down
4 changes: 4 additions & 0 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ pub struct ComputeNodeOpts {
#[deprecated = "connector node has been deprecated."]
#[clap(long, hide = true, env = "RW_CONNECTOR_RPC_ENDPOINT")]
pub connector_rpc_endpoint: Option<String>,

/// The path of the temp secret file directory.
#[clap(long, hide = true, env = "RW_TEMP_SECRET_FILE_DIR")]
pub temp_secret_file_dir: Option<String>,
}

impl risingwave_common::opts::Opts for ComputeNodeOpts {
Expand Down
8 changes: 4 additions & 4 deletions src/compute/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::secret::SECRET_MANAGER;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::observer_manager::{ObserverState, SubscribeCompute};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
Expand All @@ -31,10 +31,10 @@ impl ObserverState for ComputeObserverNode {
Info::SystemParams(p) => self.system_params_manager.try_set_params(p),
Info::Secret(s) => match resp.operation() {
Operation::Add => {
SECRET_MANAGER.add_secret(s.id, s.value);
LocalSecretManager::global().add_secret(s.id, s.value);
}
Operation::Delete => {
SECRET_MANAGER.remove_secret(s.id);
LocalSecretManager::global().remove_secret(s.id);
}
_ => {
panic!("error type notification");
Expand All @@ -51,7 +51,7 @@ impl ObserverState for ComputeObserverNode {
let Some(Info::Snapshot(snapshot)) = resp.info else {
unreachable!();
};
SECRET_MANAGER.init_secrets(snapshot.secrets);
LocalSecretManager::global().init_secrets(snapshot.secrets);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_common::config::{
};
use risingwave_common::lru::init_global_sequencer_args;
use risingwave_common::monitor::connection::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
Expand Down Expand Up @@ -215,6 +216,8 @@ pub async fn compute_node_serve(
.await
.unwrap();

LocalSecretManager::init(opts.temp_secret_file_dir);

// Initialize observer manager.
let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use risingwave_common::bail;
use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME};
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::secret::SECRET_MANAGER;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{Datum, DatumCow, DatumRef, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tracing::InstrumentStream;
Expand Down Expand Up @@ -1146,13 +1146,13 @@ impl SpecificParserConfig {
with_properties: &WithOptionsSecResolved,
) -> ConnectorResult<Self> {
let source_struct = extract_source_struct(info)?;
let format_encode_options_with_secret = SECRET_MANAGER.fill_secrets(
let format_encode_options_with_secret = LocalSecretManager::global().fill_secrets(
info.format_encode_options.clone(),
info.format_encode_secret_refs.clone(),
)?;
let (options, secret_refs) = with_properties.clone().into_parts();
let options_with_secret =
SECRET_MANAGER.fill_secrets(options.clone(), secret_refs.clone())?;
LocalSecretManager::global().fill_secrets(options.clone(), secret_refs.clone())?;
let format = source_struct.format;
let encode = source_struct.encode;
// this transformation is needed since there may be config for the protocol
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::metrics::{
LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge,
};
use risingwave_common::secret::{SecretError, SECRET_MANAGER};
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
Expand Down Expand Up @@ -240,7 +240,7 @@ impl SinkParam {
) -> Result<Option<SinkFormatDesc>> {
match format_desc {
Some(mut format_desc) => {
format_desc.options = SECRET_MANAGER
format_desc.options = LocalSecretManager::global()
.fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
Ok(Some(format_desc))
}
Expand All @@ -254,8 +254,8 @@ impl SinkParam {
.visible_columns()
.map(|col| col.column_desc.clone())
.collect();
let properties_with_secret =
SECRET_MANAGER.fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
let properties_with_secret = LocalSecretManager::global()
.fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
Ok(Self {
sink_id: sink_catalog.id,
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::secret::SECRET_MANAGER;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
Expand Down Expand Up @@ -388,7 +388,8 @@ impl ConnectorProperties {
deny_unknown_fields: bool,
) -> Result<Self> {
let (options, secret_refs) = with_properties.into_parts();
let mut options_with_secret = SECRET_MANAGER.fill_secrets(options, secret_refs)?;
let mut options_with_secret =
LocalSecretManager::global().fill_secrets(options, secret_refs)?;
let connector = options_with_secret
.remove(UPSTREAM_SOURCE_KEY)
.ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures_async_stream::try_stream;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, Schema};
use risingwave_common::row::OwnedRow;
use risingwave_common::secret::SECRET_MANAGER;
use risingwave_common::secret::LocalSecretManager;
use risingwave_pb::secret::PbSecretRef;
use serde_derive::{Deserialize, Serialize};

Expand Down Expand Up @@ -219,7 +219,8 @@ impl ExternalTableConfig {
connect_properties: BTreeMap<String, String>,
secret_refs: BTreeMap<String, PbSecretRef>,
) -> ConnectorResult<Self> {
let options_with_secret = SECRET_MANAGER.fill_secrets(connect_properties, secret_refs)?;
let options_with_secret =
LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
let json_value = serde_json::to_value(options_with_secret)?;
let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
Ok(config)
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::{ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId};
use risingwave_common::secret::SECRET_MANAGER;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::{bail, catalog};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
Expand Down Expand Up @@ -325,7 +325,7 @@ pub async fn get_partition_compute_info(
let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
return Ok(None);
};
let properties = SECRET_MANAGER.fill_secrets(options, secret_refs)?;
let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
match connector.as_str() {
ICEBERG_SINK => {
let iceberg_config = IcebergConfig::from_btreemap(properties)?;
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::catalog::{
debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId,
INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
};
use risingwave_common::secret::SECRET_MANAGER;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_connector::parser::additional_columns::{
build_additional_column_desc, get_supported_additional_columns,
Expand Down Expand Up @@ -316,7 +316,7 @@ pub(crate) async fn bind_columns_from_source(
)?
.into_parts();
// Need real secret to access the schema registry
let mut format_encode_options_to_consume = SECRET_MANAGER.fill_secrets(
let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
format_encode_options.clone(),
format_encode_secret_refs.clone(),
)?;
Expand Down Expand Up @@ -534,7 +534,7 @@ fn bind_columns_from_source_for_cdc(
.into_parts();

// Need real secret to access the schema registry
let mut format_encode_options_to_consume = SECRET_MANAGER.fill_secrets(
let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
format_encode_options.clone(),
format_encode_secret_refs.clone(),
)?;
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ pub struct FrontendOpts {
#[clap(long, hide = true, env = "RW_ENABLE_BARRIER_READ")]
#[override_opts(path = batch.enable_barrier_read)]
pub enable_barrier_read: Option<bool>,

/// The path of the temp secret file directory.
#[clap(long, hide = true, env = "RW_TEMP_SECRET_FILE_DIR")]
pub temp_secret_file_dir: Option<String>,
}

impl risingwave_common::opts::Opts for FrontendOpts {
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use parking_lot::RwLock;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::secret::SECRET_MANAGER;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend};
Expand Down Expand Up @@ -206,7 +206,7 @@ impl ObserverState for FrontendObserverNode {
.unwrap();
*self.session_params.write() =
serde_json::from_str(&session_params.unwrap().params).unwrap();
SECRET_MANAGER.init_secrets(secrets);
LocalSecretManager::global().init_secrets(secrets);
}
}

Expand Down Expand Up @@ -486,10 +486,10 @@ impl FrontendObserverNode {
};
match resp_op {
Operation::Add => {
SECRET_MANAGER.add_secret(secret.id, secret.value);
LocalSecretManager::global().add_secret(secret.id, secret.value);
}
Operation::Delete => {
SECRET_MANAGER.remove_secret(secret.id);
LocalSecretManager::global().remove_secret(secret.id);
}
_ => {
panic!("error type notification");
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use risingwave_common::config::{
load_config, BatchConfig, MetaConfig, MetricLevel, StreamingConfig,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
use risingwave_common::system_param::local_manager::{
LocalSystemParamsManager, LocalSystemParamsManagerRef,
Expand Down Expand Up @@ -319,6 +320,8 @@ impl FrontendEnv {
let system_params_manager =
Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));

LocalSecretManager::init(opts.temp_secret_file_dir);

// This `session_params` should be initialized during the initial notification in `observer_manager`
let session_params = Arc::new(RwLock::new(SessionConfig::default()));
let frontend_observer_node = FrontendObserverNode::new(
Expand Down
5 changes: 5 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ pub struct MetaNodeOpts {
#[educe(Debug(ignore))]
#[clap(long, hide = true, env = "RW_SECRET_STORE_PRIVATE_KEY_HEX")]
pub secret_store_private_key_hex: Option<String>,

/// The path of the temp secret file directory.
#[clap(long, hide = true, env = "RW_TEMP_SECRET_FILE_DIR")]
pub temp_secret_file_dir: Option<String>,
}

impl risingwave_common::opts::Opts for MetaNodeOpts {
Expand Down Expand Up @@ -390,6 +394,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.max_trivial_move_task_count_per_loop,
max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
secret_store_private_key,
temp_secret_file_dir: opts.temp_secret_file_dir,
table_info_statistic_history_times: config
.storage
.table_info_statistic_history_times,
Expand Down
3 changes: 3 additions & 0 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::future::join_all;
use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::monitor::connection::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
Expand Down Expand Up @@ -536,6 +537,8 @@ pub async fn start_service_as_election_leader(
)
.await?;

LocalSecretManager::init(opts.temp_secret_file_dir);

let notification_srv = NotificationServiceImpl::new(
env.clone(),
metadata_manager.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::secret::{SecretEncryption, SECRET_MANAGER};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_meta::manager::{MetadataManager, SessionParamsManagerImpl};
use risingwave_meta::MetaResult;
use risingwave_pb::backup_service::MetaBackupManifestId;
Expand Down Expand Up @@ -63,7 +63,7 @@ impl NotificationServiceImpl {
serving_vnode_mapping,
};
let (secrets, _catalog_version) = service.get_decrypted_secret_snapshot().await?;
SECRET_MANAGER.init_secrets(secrets);
LocalSecretManager::global().init_secrets(secrets);
Ok(service)
}

Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_common::secret::SECRET_MANAGER;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
Expand Down Expand Up @@ -1122,7 +1122,7 @@ impl CatalogController {
let mut secret_plain = pb_secret;
secret_plain.value.clone_from(&secret_plain_payload);

SECRET_MANAGER.add_secret(secret_plain.id, secret_plain_payload);
LocalSecretManager::global().add_secret(secret_plain.id, secret_plain_payload);
self.env
.notification_manager()
.notify_compute_without_version(Operation::Add, Info::Secret(secret_plain.clone()));
Expand Down Expand Up @@ -1179,7 +1179,7 @@ impl CatalogController {

self.notify_users_update(user_infos).await;

SECRET_MANAGER.remove_secret(pb_secret.id);
LocalSecretManager::global().remove_secret(pb_secret.id);
self.env
.notification_manager()
.notify_compute_without_version(Operation::Delete, Info::Secret(pb_secret.clone()));
Expand Down
Loading

0 comments on commit 1d4549a

Please sign in to comment.