Skip to content

Commit

Permalink
add compaction task min input size
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Dec 22, 2024
1 parent 9358266 commit 0e3e4d1
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 7 deletions.
7 changes: 7 additions & 0 deletions src/metric_engine/src/compaction/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use parquet::{
arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
file::properties::WriterProperties,
};
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, trace};

use crate::{
Expand Down Expand Up @@ -57,6 +58,7 @@ struct Inner {
write_props: WriterProperties,
inused_memory: AtomicU64,
mem_limit: u64,
trigger_tx: Sender<()>,
}

impl Executor {
Expand All @@ -70,6 +72,7 @@ impl Executor {
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
mem_limit: u64,
trigger_tx: Sender<()>,
) -> Self {
let inner = Inner {
runtime,
Expand All @@ -81,6 +84,7 @@ impl Executor {
write_props,
mem_limit,
inused_memory: AtomicU64::new(0),
trigger_tx,
};
Self {
inner: Arc::new(inner),
Expand Down Expand Up @@ -115,6 +119,9 @@ impl Executor {
self.inner
.inused_memory
.fetch_sub(task_size, Ordering::Relaxed);
if let Err(e) = self.inner.trigger_tx.try_send(()) {
debug!("send pick task trigger signal failed, err{e:?}");
}
}

pub fn on_failure(&self, task: &Task) {
Expand Down
11 changes: 8 additions & 3 deletions src/metric_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl Picker {
segment_duration: Duration,
new_sst_max_size: u64,
input_sst_max_num: usize,
input_sst_min_num: usize,
) -> Self {
Self {
manifest,
Expand All @@ -43,6 +44,7 @@ impl Picker {
segment_duration,
new_sst_max_size,
input_sst_max_num,
input_sst_min_num,
),
}
}
Expand All @@ -58,18 +60,21 @@ pub struct TimeWindowCompactionStrategy {
segment_duration: Duration,
new_sst_max_size: u64,
input_sst_max_num: usize,
input_sst_min_num: usize,
}

impl TimeWindowCompactionStrategy {
/// Builds a `TimeWindowCompactionStrategy` from its tuning parameters.
///
/// Every argument is stored unchanged in the field of the same name.
///
/// `input_sst_min_num` is the threshold the picking loop compares file
/// counts against: a segment holding fewer files than this is skipped, and
/// a set of compaction files is only returned when it contains at least
/// this many entries. Callers that want the previous hard-coded behaviour
/// pass `2`.
pub fn new(
segment_duration: Duration,
new_sst_max_size: u64,
input_sst_max_num: usize,
input_sst_min_num: usize,
) -> Self {
Self {
segment_duration,
new_sst_max_size,
input_sst_max_num,
input_sst_min_num,
}
}

Expand Down Expand Up @@ -150,7 +155,7 @@ impl TimeWindowCompactionStrategy {
) -> Option<Vec<SstFile>> {
for (segment, mut files) in files_by_segment.into_iter().rev() {
trace!(segment = ?segment, files = ?files, "Loop segment for pick files");
if files.len() < 2 {
if files.len() < self.input_sst_min_num {
continue;
}

Expand All @@ -170,7 +175,7 @@ impl TimeWindowCompactionStrategy {
})
.collect::<Vec<_>>();

if compaction_files.len() >= 2 {
if compaction_files.len() >= self.input_sst_min_num {
return Some(compaction_files);
}
}
Expand All @@ -192,7 +197,7 @@ mod tests {
#[test]
fn test_pick_candidate() {
let segment_duration = Duration::from_millis(20);
let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10);
let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10, 2);

let ssts = (0_i64..5_i64)
.map(|i| {
Expand Down
3 changes: 3 additions & 0 deletions src/metric_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl Scheduler {
let store = store.clone();
let manifest = manifest.clone();
let write_props = config.write_props.clone();
let trigger_tx = trigger_tx.clone();
let executor = Executor::new(
runtime.clone(),
store,
Expand All @@ -72,6 +73,7 @@ impl Scheduler {
parquet_reader,
write_props,
config.memory_limit,
trigger_tx.clone(),
);

runtime.spawn(async move {
Expand All @@ -86,6 +88,7 @@ impl Scheduler {
segment_duration,
config.new_sst_max_size,
config.input_sst_max_num,
config.input_sst_min_num,
);
Self::generate_task_loop(task_tx, trigger_rx, picker, config.schedule_interval)
.await;
Expand Down
2 changes: 2 additions & 0 deletions src/metric_engine/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct SchedulerConfig {
pub ttl: Option<Duration>,
pub new_sst_max_size: u64,
pub input_sst_max_num: usize,
pub input_sst_min_num: usize,
}

impl Default for SchedulerConfig {
Expand All @@ -47,6 +48,7 @@ impl Default for SchedulerConfig {
ttl: None,
new_sst_max_size: bytesize::gb(1_u64),
input_sst_max_num: 30,
input_sst_min_num: 5,
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/metric_engine/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ impl ParquetReader {
})
.collect::<Vec<_>>();
let scan_config = FileScanConfig::new(dummy_url, self.schema.arrow_schema.clone())
.with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
.with_output_ordering(vec![sort_exprs.clone()])
.with_file_groups(file_groups)
.with_projection(projections);

Expand All @@ -443,8 +443,6 @@ impl ParquetReader {
// when convert between arrow and parquet.
let parquet_exec = builder.build();
let sort_exec = SortPreservingMergeExec::new(sort_exprs, Arc::new(parquet_exec))
// TODO: make fetch size configurable.
.with_fetch(Some(1024))
.with_round_robin_repartition(true);

let merge_exec = MergeExec::new(
Expand Down
2 changes: 1 addition & 1 deletion src/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ serde = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["local-time"] }
tracing-subscriber = { workspace = true, features = ["local-time", "env-filter"] }
2 changes: 2 additions & 0 deletions src/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use metric_engine::{
};
use object_store::local::LocalFileSystem;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;

#[derive(Parser, Debug)]
#[command(version, about, long_about)]
Expand Down Expand Up @@ -70,6 +71,7 @@ pub fn main() {
.with_file(true)
.with_line_number(true)
.with_target(false)
.with_env_filter(EnvFilter::from_default_env())
.with_timer(tracing_subscriber::fmt::time::LocalTime::rfc_3339())
.init();

Expand Down

0 comments on commit 0e3e4d1

Please sign in to comment.