feat(storage): support online cache resize via risectl #19677

Merged (8 commits) on Dec 9, 2024
Changes from 7 commits
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -80,7 +80,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.13.0", features = ["tracing", "nightly", "prometheus"] }
foyer = { version = "0.13.1", features = ["tracing", "nightly", "prometheus"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
@@ -1736,7 +1736,7 @@ def section_streaming_actors(outer_panels: Panels):
"it's very likely to be the performance bottleneck",
[
panels.target(
# Here we use `min` but actually no much difference. Any of the sampled epoches makes sense.
# Here we use `min` but actually no much difference. Any of the sampled epochs makes sense.
f"min({metric('stream_actor_current_epoch')} != 0) by (fragment_id)",
"fragment {{fragment_id}}",
),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions proto/compute.proto
@@ -12,6 +12,14 @@ message ShowConfigResponse {
string stream_config = 2;
}

message ResizeCacheRequest {
uint64 meta_cache_capacity = 1;
uint64 data_cache_capacity = 2;
}

message ResizeCacheResponse {}

service ConfigService {
rpc ShowConfig(ShowConfigRequest) returns (ShowConfigResponse);
rpc ResizeCache(ResizeCacheRequest) returns (ResizeCacheResponse);
}
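For orientation, here is a minimal, hedged sketch of calling the new RPC through the tonic-generated client. The address and port are placeholders, the availability of tonic's standard `connect` constructor is an assumption of this sketch, and a capacity of 0 means "leave that cache unchanged", matching the handler further down in this PR.

```rust
// Sketch only: drive the new ConfigService::ResizeCache RPC with the
// tonic-generated client. The address/port below are placeholders, and
// a tokio runtime is assumed for the async entry point.
use risingwave_pb::compute::config_service_client::ConfigServiceClient;
use risingwave_pb::compute::ResizeCacheRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a compute node's gRPC endpoint (placeholder address).
    let mut client = ConfigServiceClient::connect("http://127.0.0.1:5688").await?;
    client
        .resize_cache(ResizeCacheRequest {
            meta_cache_capacity: 512 << 20, // resize the in-memory meta cache to 512 MiB
            data_cache_capacity: 0,         // 0 = keep the data (block) cache as-is
        })
        .await?;
    Ok(())
}
```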
52 changes: 50 additions & 2 deletions src/compute/src/rpc/service/config_service.rs
@@ -13,16 +13,24 @@
// limitations under the License.
use std::sync::Arc;

use foyer::HybridCache;
use risingwave_batch::task::BatchManager;
use risingwave_common::error::tonic::ToTonicStatus;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::compute::config_service_server::ConfigService;
use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse};
use risingwave_pb::compute::{
ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
};
use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};

pub struct ConfigServiceImpl {
batch_mgr: Arc<BatchManager>,
stream_mgr: LocalStreamManager,
meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
}

#[async_trait::async_trait]
@@ -42,13 +50,53 @@ impl ConfigService for ConfigServiceImpl {
};
Ok(Response::new(show_config_response))
}

async fn resize_cache(
[Review comment, Collaborator] get_meta_cache_memory_usage_ratio and get_block_cache_memory_usage_ratio will be inaccurate after this PR. We should change HummockMemoryCollector to use cache.capacity() instead.

&self,
request: Request<ResizeCacheRequest>,
) -> Result<Response<ResizeCacheResponse>, Status> {
let req = request.into_inner();

if let Some(meta_cache) = &self.meta_cache
&& req.meta_cache_capacity > 0
{
match meta_cache.memory().resize(req.meta_cache_capacity as _) {
[Review comment, Collaborator] IIUC, the resize operation is not persistent, which means that if the CN restarts, the cache capacity will revert to the configured values from the toml config. Will this be a problem? For example, suppose we have 2 CNs in the cluster and, after resize_cache, 1 CN restarts for some reason while the other one doesn't. I think it depends on when we are going to use resize_cache. If it is mainly for testing or perf tuning, and after the tuning we update the configs, this is fine. But if we rely on resize_cache in production, that would not be ideal.

[Review comment, Contributor] I think it depends on how we define resize: is this a permanent operation or a temporary one?

[Review comment, Contributor] In fact, I also think that cache-config inconsistencies across multiple CNs are a concern, and they are not easy to detect. If we allow resizing the cache online, we need to make sure that the operation succeeds on all machines and is persistent; otherwise this is a risk.

[Review comment, Contributor Author] > IIUC, the resize operation is not persistent, which means that if the CN restarts, the cache capacity will revert to the configured values from the toml config. Will this be a problem?
This feature just aims to resize the in-memory meta/data block cache without downtime. If there are restarts, it is fine to modify the persistent configuration directly. Modifying the per-node configuration can be (and may better be) achieved in the cloud control panel.

Ok(_) => tracing::info!(
"resize meta cache capacity to {:?}",
req.meta_cache_capacity
),
Err(e) => return Err(Status::internal(e.to_report_string())),
}
}

if let Some(block_cache) = &self.block_cache
&& req.data_cache_capacity > 0
{
match block_cache.memory().resize(req.data_cache_capacity as _) {
Ok(_) => tracing::info!(
"resize data cache capacity to {:?}",
req.data_cache_capacity
),
Err(e) => return Err(Status::internal(e.to_report_string())),
}
}

Ok(Response::new(ResizeCacheResponse {}))
}
}

impl ConfigServiceImpl {
pub fn new(batch_mgr: Arc<BatchManager>, stream_mgr: LocalStreamManager) -> Self {
pub fn new(
batch_mgr: Arc<BatchManager>,
stream_mgr: LocalStreamManager,
meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
) -> Self {
Self {
batch_mgr,
stream_mgr,
meta_cache,
block_cache,
}
}
}
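To make the handler's contract concrete (as read from the code above): a request such as `ResizeCacheRequest { meta_cache_capacity: 512 << 20, data_cache_capacity: 0 }` resizes only the in-memory meta cache (to 512 MiB here) and leaves the block cache untouched, because the handler skips any capacity equal to 0 and any cache handle that is `None`; if foyer rejects a resize, the RPC fails with `Status::internal`.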
6 changes: 3 additions & 3 deletions src/compute/src/server.rs
@@ -404,10 +404,10 @@ pub async fn compute_node_serve(
let monitor_srv = MonitorServiceImpl::new(
stream_mgr.clone(),
config.server.clone(),
meta_cache,
block_cache,
meta_cache.clone(),
block_cache.clone(),
);
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone());
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
let health_srv = HealthServiceImpl::new();

let telemetry_manager = TelemetryManager::new(
2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/hummock.rs
@@ -22,6 +22,7 @@ mod compaction_group;
mod list_version_deltas;
mod migrate_legacy_object;
mod pause_resume;
mod resize_cache;
mod tiered_cache_tracing;
mod trigger_full_gc;
mod trigger_manual_compaction;
@@ -31,6 +32,7 @@ pub use compaction_group::*;
pub use list_version_deltas::*;
pub use migrate_legacy_object::migrate_legacy_object;
pub use pause_resume::*;
pub use resize_cache::*;
pub use tiered_cache_tracing::*;
pub use trigger_full_gc::*;
pub use trigger_manual_compaction::*;
64 changes: 64 additions & 0 deletions src/ctl/src/cmd_impl/hummock/resize_cache.rs
@@ -0,0 +1,64 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::process::exit;

use futures::future::try_join_all;
use risingwave_pb::compute::ResizeCacheRequest;
use risingwave_pb::meta::GetClusterInfoResponse;
use risingwave_rpc_client::ComputeClient;
use thiserror_ext::AsReport;

use crate::common::CtlContext;

macro_rules! fail {
($($arg:tt)*) => {{
println!($($arg)*);
exit(1);
}};
}

pub async fn resize_cache(
context: &CtlContext,
meta_cache_capacity: Option<u64>,
data_cache_capacity: Option<u64>,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

let GetClusterInfoResponse { worker_nodes, .. } = match meta_client.get_cluster_info().await {
Ok(resp) => resp,
Err(e) => {
fail!("Failed to get cluster info: {}", e.as_report());
}
};

let futures = worker_nodes.iter().map(|worker| async {
[Review comment, Contributor] Just a question: I'm considering the partial-success case. Do we need to provide a retry capability, i.e. retry the RPC against the failed worker nodes, to avoid inconsistencies in the configs of multiple CNs as much as possible? (I believe this is an idempotent operation.)

[Review comment, Contributor Author] IMO, it is okay to make the user responsible for retrying it.

let addr = worker.get_host().expect("worker host must be set");
let client = ComputeClient::new(addr.into())
.await
.unwrap_or_else(|_| panic!("Cannot open client to compute node {addr:?}"));
client
.resize_cache(ResizeCacheRequest {
meta_cache_capacity: meta_cache_capacity.unwrap_or(0),
data_cache_capacity: data_cache_capacity.unwrap_or(0),
})
.await
});

if let Err(e) = try_join_all(futures).await {
fail!("Failed to resize cache: {}", e.as_report())
}

Ok(())
}
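A note on the failure semantics, as read from the code above: try_join_all resolves to an error as soon as any worker's RPC fails, so some compute nodes may already have been resized while others were not (the partial-success case raised in the thread above). Since the operation is idempotent, re-running the same risectl command after fixing the failing node should converge all CNs to the same capacities.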
18 changes: 18 additions & 0 deletions src/ctl/src/lib.rs
@@ -282,6 +282,12 @@ enum HummockCommands {
#[clap(long, default_value = "100")]
concurrency: u32,
},
ResizeCache {
#[clap(long)]
meta_cache_capacity_mb: Option<u64>,
#[clap(long)]
data_cache_capacity_mb: Option<u64>,
},
}

#[derive(Subcommand)]
@@ -732,6 +738,18 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
}) => {
migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
}
Commands::Hummock(HummockCommands::ResizeCache {
meta_cache_capacity_mb,
data_cache_capacity_mb,
}) => {
const MIB: u64 = 1024 * 1024;
cmd_impl::hummock::resize_cache(
context,
meta_cache_capacity_mb.map(|v| v * MIB),
data_cache_capacity_mb.map(|v| v * MIB),
)
.await?
}
Commands::Table(TableCommands::Scan {
mv_name,
data_dir,
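Assuming clap's default kebab-case naming for subcommands and flags (an assumption, since the generated CLI surface is not shown in this diff), the new subcommand would be invoked roughly as `risectl hummock resize-cache --meta-cache-capacity-mb 4096 --data-cache-capacity-mb 16384`. Each value is multiplied by `MIB` (1024 * 1024) before the RPC is sent, and an omitted flag is forwarded as 0, which the compute node treats as "leave that cache unchanged".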
14 changes: 13 additions & 1 deletion src/rpc_client/src/compute_client.rs
@@ -25,7 +25,9 @@ use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::batch_plan::{PlanFragment, TaskId, TaskOutputId};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::compute::config_service_client::ConfigServiceClient;
use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse};
use risingwave_pb::compute::{
ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{
AnalyzeHeapRequest, AnalyzeHeapResponse, GetBackPressureRequest, GetBackPressureResponse,
@@ -277,6 +279,16 @@ impl ComputeClient {
.map_err(RpcError::from_compute_status)?
.into_inner())
}

pub async fn resize_cache(&self, request: ResizeCacheRequest) -> Result<ResizeCacheResponse> {
Ok(self
.config_client
.to_owned()
.resize_cache(request)
.await
.map_err(RpcError::from_compute_status)?
.into_inner())
}
}

#[async_trait]
4 changes: 0 additions & 4 deletions src/storage/src/hummock/sstable_store.rs
@@ -611,10 +611,6 @@
);
}

pub fn get_meta_memory_usage(&self) -> u64 {
self.meta_cache.memory().usage() as _
}

pub fn get_prefetch_memory_usage(&self) -> usize {
self.prefetch_buffer_usage.load(Ordering::Acquire)
}
10 changes: 5 additions & 5 deletions src/storage/src/hummock/utils.rs
@@ -685,7 +685,7 @@ impl HummockMemoryCollector {

impl MemoryCollector for HummockMemoryCollector {
fn get_meta_memory_usage(&self) -> u64 {
self.sstable_store.get_meta_memory_usage()
self.sstable_store.meta_cache().memory().usage() as _
}

fn get_data_memory_usage(&self) -> u64 {
@@ -701,13 +701,13 @@ }
}

fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
self.sstable_store.get_meta_memory_usage() as f64
/ (self.storage_memory_config.meta_cache_capacity_mb * 1024 * 1024) as f64
self.sstable_store.meta_cache().memory().usage() as f64
/ self.sstable_store.meta_cache().memory().capacity() as f64
}

fn get_block_cache_memory_usage_ratio(&self) -> f64 {
self.get_data_memory_usage() as f64
/ (self.storage_memory_config.block_cache_capacity_mb * 1024 * 1024) as f64
self.sstable_store.block_cache().memory().usage() as f64
/ self.sstable_store.block_cache().memory().capacity() as f64
}

fn get_shared_buffer_usage_ratio(&self) -> f64 {
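A concrete example of what this hunk changes: if the block cache was configured at 4096 MiB and later resized to 2048 MiB via the new RPC, get_block_cache_memory_usage_ratio previously kept dividing usage by the static block_cache_capacity_mb from the config (4096 MiB), whereas it now divides by the live memory().capacity(), which should reflect the resized 2048 MiB, as requested in the review comment on the config service.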