Skip to content

Commit

Permalink
add config for storage
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Dec 21, 2024
1 parent 6954816 commit cbe305d
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 106 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/metric_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod executor;
mod picker;
mod scheduler;

pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
pub use scheduler::Scheduler as CompactionScheduler;

use crate::sst::SstFile;

Expand Down
29 changes: 1 addition & 28 deletions src/metric_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use std::{sync::Arc, time::Duration};

use anyhow::Context;
use parquet::file::properties::WriterProperties;
use tokio::{
sync::mpsc::{self, Receiver, Sender},
task::JoinHandle,
Expand All @@ -29,6 +28,7 @@ use tracing::{info, warn};
use super::{executor::Executor, picker::Picker};
use crate::{
compaction::Task,
config::SchedulerConfig,
manifest::ManifestRef,
read::ParquetReader,
sst::SstPathGenerator,
Expand Down Expand Up @@ -149,30 +149,3 @@ impl Scheduler {
}
}
}

#[derive(Clone)]
pub struct SchedulerConfig {
pub schedule_interval: Duration,
pub max_pending_compaction_tasks: usize,
// Runner config
pub memory_limit: u64,
pub write_props: WriterProperties,
// Picker config
pub ttl: Option<Duration>,
pub new_sst_max_size: u64,
pub input_sst_max_num: usize,
}

impl Default for SchedulerConfig {
fn default() -> Self {
Self {
schedule_interval: Duration::from_secs(5),
max_pending_compaction_tasks: 10,
memory_limit: bytesize::gb(30_u64),
write_props: WriterProperties::default(),
ttl: None,
new_sst_max_size: bytesize::gb(1_u64),
input_sst_max_num: 30,
}
}
}
117 changes: 117 additions & 0 deletions src/metric_engine/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{collections::HashMap, time::Duration};

use parquet::{
basic::{Compression, Encoding, ZstdLevel},
file::properties::WriterProperties,
};

use crate::types::UpdateMode;

#[derive(Clone)]
pub struct SchedulerConfig {
pub schedule_interval: Duration,
pub max_pending_compaction_tasks: usize,
// Runner config
pub memory_limit: u64,
pub write_props: WriterProperties,
// Picker config
pub ttl: Option<Duration>,
pub new_sst_max_size: u64,
pub input_sst_max_num: usize,
}

impl Default for SchedulerConfig {
fn default() -> Self {
Self {
schedule_interval: Duration::from_secs(10),
max_pending_compaction_tasks: 10,
memory_limit: bytesize::gb(3_u64),
write_props: WriterProperties::default(),
ttl: None,
new_sst_max_size: bytesize::gb(1_u64),
input_sst_max_num: 30,
}
}
}

#[derive(Debug)]
pub struct ColumnOptions {
pub enable_dict: Option<bool>,
pub enable_bloom_filter: Option<bool>,
pub encoding: Option<Encoding>,
pub compression: Option<Compression>,
}

#[derive(Debug)]
pub struct WriteOptions {
pub max_row_group_size: usize,
pub write_bacth_size: usize,
pub enable_sorting_columns: bool,
// use to set column props with default value
pub enable_dict: bool,
pub enable_bloom_filter: bool,
pub encoding: Encoding,
pub compression: Compression,
// use to set column props with column name
pub column_options: Option<HashMap<String, ColumnOptions>>,
}

impl Default for WriteOptions {
fn default() -> Self {
Self {
max_row_group_size: 8192,
write_bacth_size: 1024,
enable_sorting_columns: true,
enable_dict: false,
enable_bloom_filter: false,
encoding: Encoding::PLAIN,
compression: Compression::ZSTD(ZstdLevel::default()),
column_options: None,
}
}
}

#[derive(Debug)]
pub struct ManifestMergeOptions {
pub channel_size: usize,
pub merge_interval_seconds: usize,
pub min_merge_threshold: usize,
pub hard_merge_threshold: usize,
pub soft_merge_threshold: usize,
}

impl Default for ManifestMergeOptions {
fn default() -> Self {
Self {
channel_size: 3,
merge_interval_seconds: 5,
min_merge_threshold: 10,
soft_merge_threshold: 50,
hard_merge_threshold: 90,
}
}
}

#[derive(Debug, Default)]
pub struct StorageOptions {
pub write_opts: WriteOptions,
pub manifest_merge_opts: ManifestMergeOptions,
pub update_mode: UpdateMode,
}
1 change: 1 addition & 0 deletions src/metric_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#![feature(duration_constructors)]
mod compaction;
pub mod config;
pub mod error;
mod macros;
pub mod manifest;
Expand Down
3 changes: 2 additions & 1 deletion src/metric_engine/src/manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ use tokio::sync::{
use tracing::{debug, error, info, trace};

use crate::{
config::ManifestMergeOptions,
sst::{FileId, FileMeta, SstFile},
types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange},
types::{ObjectStoreRef, RuntimeRef, TimeRange},
AnyhowError, Result,
};

Expand Down
8 changes: 3 additions & 5 deletions src/metric_engine/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ use parquet::{
use tokio::runtime::Runtime;

use crate::{
compaction::{CompactionScheduler, SchedulerConfig},
compaction::CompactionScheduler,
config::{SchedulerConfig, StorageOptions, WriteOptions},
ensure,
manifest::{Manifest, ManifestRef},
read::ParquetReader,
sst::{FileMeta, SstFile, SstPathGenerator},
types::{
ObjectStoreRef, StorageOptions, StorageSchema, TimeRange, WriteOptions, WriteResult,
SEQ_COLUMN_NAME,
},
types::{ObjectStoreRef, StorageSchema, TimeRange, WriteResult, SEQ_COLUMN_NAME},
Result,
};

Expand Down
67 changes: 0 additions & 67 deletions src/metric_engine/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
// under the License.

use std::{
collections::HashMap,
ops::{Add, Deref, Range},
sync::Arc,
time::Duration,
};

use arrow_schema::SchemaRef;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use tokio::runtime::Runtime;

use crate::sst::FileId;
Expand Down Expand Up @@ -127,78 +125,13 @@ pub struct WriteResult {
pub size: usize,
}

#[derive(Debug)]
pub struct ColumnOptions {
pub enable_dict: Option<bool>,
pub enable_bloom_filter: Option<bool>,
pub encoding: Option<Encoding>,
pub compression: Option<Compression>,
}

#[derive(Debug)]
pub struct WriteOptions {
pub max_row_group_size: usize,
pub write_bacth_size: usize,
pub enable_sorting_columns: bool,
// use to set column props with default value
pub enable_dict: bool,
pub enable_bloom_filter: bool,
pub encoding: Encoding,
pub compression: Compression,
// use to set column props with column name
pub column_options: Option<HashMap<String, ColumnOptions>>,
}

impl Default for WriteOptions {
fn default() -> Self {
Self {
max_row_group_size: 8192,
write_bacth_size: 1024,
enable_sorting_columns: true,
enable_dict: false,
enable_bloom_filter: false,
encoding: Encoding::PLAIN,
compression: Compression::ZSTD(ZstdLevel::default()),
column_options: None,
}
}
}

#[derive(Debug)]
pub struct ManifestMergeOptions {
pub channel_size: usize,
pub merge_interval_seconds: usize,
pub min_merge_threshold: usize,
pub hard_merge_threshold: usize,
pub soft_merge_threshold: usize,
}

impl Default for ManifestMergeOptions {
fn default() -> Self {
Self {
channel_size: 3,
merge_interval_seconds: 5,
min_merge_threshold: 10,
soft_merge_threshold: 50,
hard_merge_threshold: 90,
}
}
}

#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum UpdateMode {
#[default]
Overwrite,
Append,
}

#[derive(Debug, Default)]
pub struct StorageOptions {
pub write_opts: WriteOptions,
pub manifest_merge_opts: ManifestMergeOptions,
pub update_mode: UpdateMode,
}

#[derive(Debug, Clone)]
pub struct StorageSchema {
pub arrow_schema: SchemaRef,
Expand Down
2 changes: 1 addition & 1 deletion src/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ serde = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-subscriber = { workspace = true , features = ["local-time"]}
11 changes: 8 additions & 3 deletions src/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ use arrow::{
use clap::Parser;
use config::{Config, StorageConfig};
use metric_engine::{
config::StorageOptions,
storage::{
CloudObjectStorage, CompactRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest,
},
types::{RuntimeRef, StorageOptions},
types::RuntimeRef,
};
use object_store::local::LocalFileSystem;
use tracing::{error, info};
Expand Down Expand Up @@ -65,8 +66,12 @@ struct AppState {
}

pub fn main() {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
tracing_subscriber::fmt()
.with_file(true)
.with_line_number(true)
.with_target(false)
.with_timer(tracing_subscriber::fmt::time::LocalTime::rfc_3339())
.init();

let args = Args::parse();
let config_body = fs::read_to_string(args.config).expect("read config file failed");
Expand Down

0 comments on commit cbe305d

Please sign in to comment.