diff --git a/src/metric_engine/src/compaction/executor.rs b/src/metric_engine/src/compaction/executor.rs index cc2f14149b..77adf22b39 100644 --- a/src/metric_engine/src/compaction/executor.rs +++ b/src/metric_engine/src/compaction/executor.rs @@ -30,6 +30,7 @@ use parquet::{ arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter}, file::properties::WriterProperties, }; +use tokio::sync::mpsc::Sender; use tracing::{debug, error, trace}; use crate::{ @@ -57,6 +58,7 @@ struct Inner { write_props: WriterProperties, inused_memory: AtomicU64, mem_limit: u64, + trigger_tx: Sender<()>, } impl Executor { @@ -70,6 +72,7 @@ impl Executor { parquet_reader: Arc, write_props: WriterProperties, mem_limit: u64, + trigger_tx: Sender<()>, ) -> Self { let inner = Inner { runtime, @@ -81,6 +84,7 @@ impl Executor { write_props, mem_limit, inused_memory: AtomicU64::new(0), + trigger_tx, }; Self { inner: Arc::new(inner), @@ -115,6 +119,9 @@ impl Executor { self.inner .inused_memory .fetch_sub(task_size, Ordering::Relaxed); + if let Err(e) = self.inner.trigger_tx.try_send(()) { + debug!("send pick task trigger signal failed, err{e:?}"); + } } pub fn on_failure(&self, task: &Task) { diff --git a/src/metric_engine/src/compaction/picker.rs b/src/metric_engine/src/compaction/picker.rs index 5dbe16443f..ac104ac764 100644 --- a/src/metric_engine/src/compaction/picker.rs +++ b/src/metric_engine/src/compaction/picker.rs @@ -35,6 +35,7 @@ impl Picker { segment_duration: Duration, new_sst_max_size: u64, input_sst_max_num: usize, + input_sst_min_num: usize, ) -> Self { Self { manifest, @@ -43,6 +44,7 @@ impl Picker { segment_duration, new_sst_max_size, input_sst_max_num, + input_sst_min_num, ), } } @@ -58,6 +60,7 @@ pub struct TimeWindowCompactionStrategy { segment_duration: Duration, new_sst_max_size: u64, input_sst_max_num: usize, + input_sst_min_num: usize, } impl TimeWindowCompactionStrategy { @@ -65,11 +68,13 @@ impl TimeWindowCompactionStrategy { segment_duration: Duration, new_sst_max_size: u64, input_sst_max_num: usize, + input_sst_min_num: usize, ) -> Self { Self { segment_duration, new_sst_max_size, input_sst_max_num, + input_sst_min_num, } } @@ -150,7 +155,7 @@ impl TimeWindowCompactionStrategy { ) -> Option> { for (segment, mut files) in files_by_segment.into_iter().rev() { trace!(segment = ?segment, files = ?files, "Loop segment for pick files"); - if files.len() < 2 { + if files.len() < self.input_sst_min_num { continue; } @@ -170,7 +175,7 @@ impl TimeWindowCompactionStrategy { }) .collect::>(); - if compaction_files.len() >= 2 { + if compaction_files.len() >= self.input_sst_min_num { return Some(compaction_files); } } @@ -192,7 +197,7 @@ mod tests { #[test] fn test_pick_candidate() { let segment_duration = Duration::from_millis(20); - let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10); + let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10, 2); let ssts = (0_i64..5_i64) .map(|i| { diff --git a/src/metric_engine/src/compaction/scheduler.rs b/src/metric_engine/src/compaction/scheduler.rs index 74a6870908..22a5bca81d 100644 --- a/src/metric_engine/src/compaction/scheduler.rs +++ b/src/metric_engine/src/compaction/scheduler.rs @@ -63,6 +63,7 @@ impl Scheduler { let store = store.clone(); let manifest = manifest.clone(); let write_props = config.write_props.clone(); + let trigger_tx = trigger_tx.clone(); let executor = Executor::new( runtime.clone(), store, @@ -72,6 +73,7 @@ impl Scheduler { parquet_reader, write_props, config.memory_limit, + trigger_tx.clone(), ); runtime.spawn(async move { @@ -86,6 +88,7 @@ impl Scheduler { segment_duration, config.new_sst_max_size, config.input_sst_max_num, + config.input_sst_min_num, ); Self::generate_task_loop(task_tx, trigger_rx, picker, config.schedule_interval) .await; diff --git a/src/metric_engine/src/config.rs b/src/metric_engine/src/config.rs index 49cb8afdc3..5b2d7497f7 100644 --- a/src/metric_engine/src/config.rs +++ b/src/metric_engine/src/config.rs @@ -35,6 +35,7 @@ pub struct SchedulerConfig { pub ttl: Option, pub new_sst_max_size: u64, pub input_sst_max_num: usize, + pub input_sst_min_num: usize, } impl Default for SchedulerConfig { @@ -47,6 +48,7 @@ impl Default for SchedulerConfig { ttl: None, new_sst_max_size: bytesize::gb(1_u64), input_sst_max_num: 30, + input_sst_min_num: 5, } } } diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs index 2acd0eed77..2a28836f18 100644 --- a/src/metric_engine/src/read.rs +++ b/src/metric_engine/src/read.rs @@ -426,7 +426,7 @@ impl ParquetReader { }) .collect::>(); let scan_config = FileScanConfig::new(dummy_url, self.schema.arrow_schema.clone()) - .with_output_ordering(vec![sort_exprs.clone(); file_groups.len()]) + .with_output_ordering(vec![sort_exprs.clone()]) .with_file_groups(file_groups) .with_projection(projections); @@ -443,8 +443,6 @@ impl ParquetReader { // when convert between arrow and parquet. let parquet_exec = builder.build(); let sort_exec = SortPreservingMergeExec::new(sort_exprs, Arc::new(parquet_exec)) - // TODO: make fetch size configurable. - .with_fetch(Some(1024)) .with_round_robin_repartition(true); let merge_exec = MergeExec::new( diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml index f38d3f185d..94617fa98c 100644 --- a/src/server/Cargo.toml +++ b/src/server/Cargo.toml @@ -43,4 +43,4 @@ serde = { workspace = true } tokio = { workspace = true } toml = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true, features = ["local-time"] } +tracing-subscriber = { workspace = true, features = ["local-time", "env-filter"] } diff --git a/src/server/src/main.rs b/src/server/src/main.rs index ab5a93611e..cdfd38ba02 100644 --- a/src/server/src/main.rs +++ b/src/server/src/main.rs @@ -39,6 +39,7 @@ use metric_engine::{ }; use object_store::local::LocalFileSystem; use tracing::{error, info}; +use tracing_subscriber::EnvFilter; #[derive(Parser, Debug)] #[command(version, about, long_about)] @@ -70,6 +71,7 @@ pub fn main() { .with_file(true) .with_line_number(true) .with_target(false) + .with_env_filter(EnvFilter::from_default_env()) .with_timer(tracing_subscriber::fmt::time::LocalTime::rfc_3339()) .init();