Skip to content

Commit

Permalink
refactor(memory): use 0.7 * system memory as default `total_memory_bytes` (#12832)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Oct 13, 2023
1 parent 7761c4f commit ac4ae56
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ pub struct StorageConfig {
pub compactor_max_task_multiplier: f32,

/// The percentage of memory available when compactor is deployed separately.
/// total_memory_available_bytes = total_memory_available_bytes *
/// total_memory_available_bytes = system_memory_available_bytes *
/// compactor_memory_available_proportion
#[serde(default = "default::storage::compactor_memory_available_proportion")]
pub compactor_memory_available_proportion: f64,
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use sysinfo::{System, SystemExt};

use crate::util::env_var::env_var_is_true_or;
use crate::util::resource_util::cpu::total_cpu_available;
use crate::util::resource_util::memory::{total_memory_available_bytes, total_memory_used_bytes};
use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes};

/// Url of telemetry backend
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v1/report";
Expand Down Expand Up @@ -98,7 +98,7 @@ impl SystemData {
let mut sys = System::new();

let memory = {
let available = total_memory_available_bytes();
let available = system_memory_available_bytes();
let used = total_memory_used_bytes();
Memory {
available,
Expand Down
64 changes: 32 additions & 32 deletions src/common/src/util/resource_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ pub enum CgroupVersion {
V2,
}

// Current controllers available in implementation.
/// Current controllers available in implementation.
pub enum Controller {
Cpu,
Memory,
}

// Default constant Cgroup paths and hierarchy.
/// Default constant Cgroup paths and hierarchy.
const DEFAULT_CGROUP_ROOT_HIERARCYHY: &str = "/sys/fs/cgroup";
const DEFAULT_CGROUP_V2_CONTROLLER_LIST_PATH: &str = "/sys/fs/cgroup/cgroup.controllers";
const DEFAULT_CGROUP_MAX_INDICATOR: &str = "max";
Expand All @@ -44,33 +44,33 @@ mod runtime {
env::consts::OS.eq(DEFAULT_LINUX_IDENTIFIER)
}

// checks if is running in a docker container by checking for docker env file, or if it is
// running in a kubernetes pod.
/// Reports whether the process appears to be running inside a container.
///
/// Three probes are evaluated in order and evaluation stops at the first one that succeeds:
/// the in-container environment variable, the docker env file, and the kubernetes secrets path.
fn is_running_in_container() -> bool {
    /// Returns `true` when the variable named by `DEFAULT_IN_CONTAINER_ENV_VARIABLE` can be
    /// read from the environment.
    fn env_var_check_if_running_in_container() -> bool {
        env::var(DEFAULT_IN_CONTAINER_ENV_VARIABLE).is_ok()
    }

    /// Returns `true` when something exists at `DEFAULT_DOCKER_ENV_PATH`.
    fn docker_env_exists() -> bool {
        let docker_env = Path::new(DEFAULT_DOCKER_ENV_PATH);
        docker_env.exists()
    }

    /// Returns `true` when something exists at `DEFAULT_KUBERNETES_SECRETS_PATH`.
    fn is_running_in_kubernetes_pod() -> bool {
        let secrets_dir = Path::new(DEFAULT_KUBERNETES_SECRETS_PATH);
        secrets_dir.exists()
    }

    // Keep the probe order: `||` short-circuits, so the cheap environment lookup runs first.
    env_var_check_if_running_in_container()
        || docker_env_exists()
        || is_running_in_kubernetes_pod()
}

// Given a certain controller, checks if it is enabled.
// For cgroup_v1, existence of directory with controller name is checked in cgroup default root
// hierarchy. e.g. if directory "/sys/fs/cgroup/cpu" exists then CPU controller is enabled.
// For cgroup_v2, check the controller list path for the controller name.
/// Given a certain controller, checks if it is enabled.
/// For cgroup v1, existence of directory with controller name is checked in cgroup default root
/// hierarchy. e.g. if directory "/sys/fs/cgroup/cpu" exists then CPU controller is enabled.
/// For cgroup v2, check the controller list path for the controller name.
pub fn is_controller_activated(
controller_type: super::Controller,
cgroup_version: CgroupVersion,
Expand All @@ -90,7 +90,7 @@ mod runtime {
}
}

// If cgroup exists or is enabled in kernel, returns true, else false.
/// Returns `true` when the default cgroup root hierarchy path is an existing directory,
/// which is taken as the sign that cgroup is available; returns `false` otherwise.
fn cgroup_exists() -> bool {
    let cgroup_root = Path::new(super::DEFAULT_CGROUP_ROOT_HIERARCYHY);
    cgroup_root.is_dir()
}
Expand Down Expand Up @@ -134,20 +134,20 @@ pub mod memory {

use super::runtime::get_resource;

// Default paths for memory limitations and usage for cgroup_v1 and cgroup_v2.
/// Default paths for memory limitations and usage for cgroup v1 and cgroup v2.
const V1_MEMORY_LIMIT_PATH: &str = "/sys/fs/cgroup/memory/memory.limit_in_bytes";
const V1_MEMORY_CURRENT_PATH: &str = "/sys/fs/cgroup/memory/memory.usage_in_bytes";
const V2_MEMORY_LIMIT_PATH: &str = "/sys/fs/cgroup/memory.max";
const V2_MEMORY_CURRENT_PATH: &str = "/sys/fs/cgroup/memory.current";

// Returns the system memory.
/// Returns the total memory of the system as reported by `sysinfo`.
///
/// A fresh `System` handle is created and only its memory information is refreshed before
/// the total is read. The unit is whatever `System::total_memory` reports (presumably
/// bytes — confirm against the `sysinfo` version in use).
pub fn get_system_memory() -> usize {
    let mut system_info = System::new();
    system_info.refresh_memory();
    let total = system_info.total_memory();
    total as usize
}

// Returns the used memory of the system.
/// Returns the used memory of the system.
pub fn get_system_memory_used() -> usize {
let mut sys = System::new();
sys.refresh_memory();
Expand Down Expand Up @@ -187,9 +187,9 @@ pub mod memory {
///
/// Basic usage:
/// ``` ignore
/// let mem_available = memory::total_memory_available_bytes();
/// let mem_available = memory::system_memory_available_bytes();
/// ```
pub fn total_memory_available_bytes() -> usize {
pub fn system_memory_available_bytes() -> usize {
get_resource(
"memory available",
super::Controller::Memory,
Expand All @@ -198,10 +198,10 @@ pub mod memory {
)
}

// Returns the memory limit of a container if running in a container else returns the system
// memory available.
// When the limit is set to max, total_memory_available_bytes() will return default system
// memory.
/// Returns the memory limit of a container if running in a container else returns the system
/// memory available.
/// When the limit is set to max, [`system_memory_available_bytes()`] will return default system
/// memory.
fn get_container_memory_limit(
cgroup_version: super::CgroupVersion,
) -> Result<usize, std::io::Error> {
Expand All @@ -214,8 +214,8 @@ pub mod memory {
Ok(std::cmp::min(value, system))
}

// Returns the memory used in a container if running in a container else returns the system
// memory used.
/// Returns the memory used in a container if running in a container else returns the system
/// memory used.
fn get_container_memory_used(
cgroup_version: super::CgroupVersion,
) -> Result<usize, std::io::Error> {
Expand All @@ -235,7 +235,7 @@ pub mod cpu {
use super::runtime::get_resource;
use super::util::parse_error;

// Default constant Cgroup paths and hierarchy.
/// Default constant Cgroup paths and hierarchy.
const V1_CPU_QUOTA_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
const V1_CPU_PERIOD_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";
const V2_CPU_LIMIT_PATH: &str = "/sys/fs/cgroup/cpu.max";
Expand Down Expand Up @@ -264,7 +264,7 @@ pub mod cpu {
)
}

// Returns the CPU limit of the container.
/// Returns the CPU limit of the container.
fn get_container_cpu_limit(
cgroup_version: super::CgroupVersion,
) -> Result<f32, std::io::Error> {
Expand All @@ -277,15 +277,15 @@ pub mod cpu {
}
}

// Returns the total system cpu.
/// Returns the parallelism available to this process as a floating-point CPU count.
///
/// The value comes from [`thread::available_parallelism`].
///
/// # Panics
///
/// Panics if the available parallelism cannot be determined.
pub fn get_system_cpu() -> f32 {
    thread::available_parallelism()
        .map(|cores| cores.get() as f32)
        .unwrap_or_else(|err| panic!("Failed to get available parallelism, error: {}", err))
}

// Returns the CPU limit when cgroup_V1 is utilised.
/// Returns the CPU limit when cgroup v1 is utilised.
pub fn get_cpu_limit_v1(
quota_path: &str,
period_path: &str,
Expand All @@ -304,7 +304,7 @@ pub mod cpu {
Ok((cpu_quota as f32) / (cpu_period as f32))
}

// Returns the CPU limit when cgroup_V2 is utilised.
/// Returns the CPU limit when cgroup v2 is utilised.
pub fn get_cpu_limit_v2(limit_path: &str, max_value: f32) -> Result<f32, std::io::Error> {
let cpu_limit_string = fs_err::read_to_string(limit_path)?;

Expand Down Expand Up @@ -333,7 +333,7 @@ pub mod cpu {
}

mod util {
// Parses the filepath and checks for the existence of controller_name in the file.
/// Parses the filepath and checks for the existence of `controller_name` in the file.
pub fn parse_controller_enable_file_for_cgroup_v2(
file_path: &str,
controller_name: &str,
Expand Down Expand Up @@ -362,7 +362,7 @@ mod util {
)
}

// Reads an integer value from a file path.
/// Reads an integer value from a file path.
pub fn read_usize(file_path: &str) -> Result<usize, std::io::Error> {
let content = fs_err::read_to_string(file_path)?;
let limit_val = content
Expand All @@ -372,8 +372,8 @@ mod util {
Ok(limit_val)
}

// Helper function that helps to retrieve value in file, if value is "max", max_value will be
// returned instead.
/// Helper function that helps to retrieve value in file, if value is "max", `max_value` will be
/// returned instead.
pub fn read_usize_or_max(file_path: &str, max_value: usize) -> Result<usize, std::io::Error> {
let content = fs_err::read_to_string(file_path)?;
if content.trim() == super::DEFAULT_CGROUP_MAX_INDICATOR {
Expand Down
14 changes: 9 additions & 5 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ use std::pin::Pin;
use clap::{Parser, ValueEnum};
use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::total_memory_available_bytes;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use serde::{Deserialize, Serialize};

/// If `total_memory_bytes` is not specified, the default memory limit will be set to
/// the system memory limit multiplied by this proportion
const DEFAULT_MEMORY_PROPORTION: f64 = 0.7;

/// Command-line arguments for compute-node.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
Expand Down Expand Up @@ -168,9 +172,9 @@ impl Role {
}

fn validate_opts(opts: &ComputeNodeOpts) {
let total_memory_available_bytes = total_memory_available_bytes();
if opts.total_memory_bytes > total_memory_available_bytes {
let error_msg = format!("total_memory_bytes {} is larger than the total memory available bytes {} that can be acquired.", opts.total_memory_bytes, total_memory_available_bytes);
let system_memory_available_bytes = system_memory_available_bytes();
if opts.total_memory_bytes > system_memory_available_bytes {
let error_msg = format!("total_memory_bytes {} is larger than the total memory available bytes {} that can be acquired.", opts.total_memory_bytes, system_memory_available_bytes);
tracing::error!(error_msg);
panic!("{}", error_msg);
}
Expand Down Expand Up @@ -224,7 +228,7 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
}

fn default_total_memory_bytes() -> usize {
total_memory_available_bytes()
(system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
}

fn default_parallelism() -> usize {
Expand Down
5 changes: 3 additions & 2 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::LazyLock;
use jni::strings::JNIString;
use jni::{InitArgsBuilder, JNIVersion, JavaVM, NativeMethod};
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::util::resource_util::memory::total_memory_available_bytes;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;

pub static JVM: LazyLock<Result<JavaVM, RwError>> = LazyLock::new(|| {
let libs_path = if let Ok(libs_path) = std::env::var("CONNECTOR_LIBS_PATH") {
Expand Down Expand Up @@ -65,7 +65,8 @@ pub static JVM: LazyLock<Result<JavaVM, RwError>> = LazyLock::new(|| {
heap_size
} else {
// Use 10% of total memory by default
format!("{}", total_memory_available_bytes() / 10)
// TODO: should use compute-node's total_memory_bytes
format!("{}", system_memory_available_bytes() / 10)
};

// Build the VM properties
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

validate_config(&config);

let total_memory_bytes = resource_util::memory::total_memory_available_bytes();
let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
let heap_profiler =
HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
// Run a background heap profiler
Expand Down
2 changes: 1 addition & 1 deletion src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub async fn prepare_start_parameters(
&storage_memory_config,
)));
let total_memory_available_bytes =
(resource_util::memory::total_memory_available_bytes() as f64
(resource_util::memory::system_memory_available_bytes() as f64
* config.storage.compactor_memory_available_proportion) as usize;
let meta_cache_capacity_bytes = storage_opts.meta_cache_capacity_mb * (1 << 20);
let compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb {
Expand Down

0 comments on commit ac4ae56

Please sign in to comment.