Skip to content

Commit

Permalink
feat: add compaction runner (#1609)
Browse files Browse the repository at this point in the history
## Rationale
The compaction runner is responsible for compacting old SSTs and deleting
expired SSTs.

## Detailed Changes


## Test Plan
CI
  • Loading branch information
jiacai2050 authored Dec 17, 2024
1 parent 4650d36 commit 1331e0a
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 188 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tracing = "0.1"
tracing-subscriber = "0.3"
async-scoped = { version = "0.9.0", features = ["use-tokio"] }
test-log = "0.2"
uuid = { version = "1" }

# This profile optimizes for good runtime performance.
[profile.release]
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ udeps:

clippy:
cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings -D clippy::dbg-macro \
-A dead_code -A unused_variables -A clippy::unreachable # Remove these once we have a clean build
-A dead_code -A unused_variables -A clippy::unreachable -A clippy::too_many_arguments # Remove these once we have a clean build

ensure-disk-quota:
bash ./scripts/free-disk-space.sh
Expand Down
1 change: 1 addition & 0 deletions src/metric_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ prost = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] }

[dev-dependencies]
temp-dir = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions src/metric_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

mod picker;
mod runner;
mod scheduler;

pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
Expand Down
172 changes: 172 additions & 0 deletions src/metric_engine/src/compaction/runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use anyhow::Context;
use arrow::array::{RecordBatch, UInt64Array};
use async_scoped::TokioScope;
use datafusion::{execution::TaskContext, physical_plan::execute_stream};
use futures::StreamExt;
use object_store::path::Path;
use parquet::{
arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
file::properties::WriterProperties,
};
use tracing::error;

use crate::{
compaction::Task,
manifest::ManifestRef,
read::ParquetReader,
sst::{allocate_id, FileMeta, SstPathGenerator},
types::{ObjectStoreRef, StorageSchema},
Result,
};

/// Executes compaction tasks: merges the input SSTs of a task into one new
/// SST, registers it in the manifest, and removes the replaced/expired SSTs.
#[derive(Clone)]
pub struct Runner {
/// Object store the new SST is written to and old SSTs are deleted from.
store: ObjectStoreRef,
/// Storage schema; its `arrow_schema` is the schema of the written SST.
schema: StorageSchema,
/// Manifest that receives the new file and the tombstones for old files.
manifest: ManifestRef,
/// Maps an SST file id to its path in the object store.
sst_path_gen: Arc<SstPathGenerator>,
/// Builds the DataFusion plan that reads the input SSTs.
parquet_reader: Arc<ParquetReader>,
/// Parquet writer properties applied to the new SST.
write_props: WriterProperties,
}

impl Runner {
pub fn new(
store: ObjectStoreRef,
schema: StorageSchema,
manifest: ManifestRef,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
) -> Self {
Self {
store,
schema,
manifest,
sst_path_gen,
parquet_reader,
write_props,
}
}

// TODO: Merge input sst files into one new sst file
// and delete the expired sst files
pub async fn do_compaction(&self, task: Task) -> Result<()> {
assert!(!task.inputs.is_empty());
for f in &task.inputs {
assert!(f.is_compaction());
}
for f in &task.expireds {
assert!(f.is_compaction());
}
let mut time_range = task.inputs[0].meta().time_range.clone();
for f in &task.inputs[1..] {
time_range.merge(&f.meta().time_range);
}
let plan = self
.parquet_reader
.build_df_plan(task.inputs.clone(), None, Vec::new())?;
let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
.context("execute datafusion plan")?;

let file_id = allocate_id();
let file_path = self.sst_path_gen.generate(file_id);
let file_path = Path::from(file_path);
let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone());
let mut writer = AsyncArrowWriter::try_new(
object_store_writer,
self.schema.arrow_schema.clone(),
Some(self.write_props.clone()),
)
.context("create arrow writer")?;
let mut num_rows = 0;
// TODO: support multi-part write
while let Some(batch) = stream.next().await {
let batch = batch.context("execute plan")?;
num_rows += batch.num_rows();
let batch_with_seq = {
let mut new_cols = batch.columns().to_vec();
// Since file_id in increasing order, we can use it as sequence.
let seq_column = Arc::new(UInt64Array::from(vec![file_id; batch.num_rows()]));
new_cols.push(seq_column);
RecordBatch::try_new(self.schema.arrow_schema.clone(), new_cols)
.context("construct record batch with seq column")?
};

writer.write(&batch_with_seq).await.context("write batch")?;
}
writer.close().await.context("close writer")?;
let object_meta = self
.store
.head(&file_path)
.await
.context("get object meta")?;
let file_meta = FileMeta {
max_sequence: file_id,
num_rows: num_rows as u32,
size: object_meta.size as u32,
time_range: time_range.clone(),
};
// First add new sst to manifest, then delete expired/old sst
self.manifest.add_file(file_id, file_meta).await?;
self.manifest
.add_tombstone_files(task.expireds.clone())
.await?;
self.manifest
.add_tombstone_files(task.inputs.clone())
.await?;

let (_, results) = TokioScope::scope_and_block(|scope| {
for file in task.expireds {
let path = Path::from(self.sst_path_gen.generate(file.id()));
scope.spawn(async move {
self.store
.delete(&path)
.await
.with_context(|| format!("failed to delete file, path:{path}"))
});
}
for file in task.inputs {
let path = Path::from(self.sst_path_gen.generate(file.id()));
scope.spawn(async move {
self.store
.delete(&path)
.await
.with_context(|| format!("failed to delete file, path:{path}"))
});
}
});
for res in results {
match res {
Err(e) => {
error!("Failed to join delete task, err:{e}")
}
Ok(v) => {
if let Err(e) = v {
error!("Failed to delete sst, err:{e}")
}
}
}
}

Ok(())
}
}
47 changes: 27 additions & 20 deletions src/metric_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ use std::{
};

use anyhow::Context;
use parquet::file::properties::WriterProperties;
use tokio::{
sync::mpsc::{self, Receiver, Sender},
task::JoinHandle,
time::sleep,
};
use tracing::warn;

use super::runner::Runner;
use crate::{
compaction::{picker::TimeWindowCompactionStrategy, Task},
manifest::ManifestRef,
read::ParquetReader,
sst::SstPathGenerator,
types::{ObjectStoreRef, RuntimeRef},
types::{ObjectStoreRef, RuntimeRef, StorageSchema},
Result,
};

Expand All @@ -50,23 +53,29 @@ impl Scheduler {
runtime: RuntimeRef,
manifest: ManifestRef,
store: ObjectStoreRef,
schema: StorageSchema,
segment_duration: Duration,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
config: SchedulerConfig,
) -> Self {
let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks);
let task_handle = {
let rt = runtime.clone();
let store = store.clone();
let manifest = manifest.clone();
let write_props = config.write_props.clone();
runtime.spawn(async move {
Self::recv_task_loop(
rt,
task_rx,
store,
schema,
manifest,
sst_path_gen,
parquet_reader,
config.memory_limit,
write_props,
)
.await;
})
Expand Down Expand Up @@ -99,15 +108,24 @@ impl Scheduler {
rt: RuntimeRef,
mut task_rx: Receiver<Task>,
store: ObjectStoreRef,
schema: StorageSchema,
manifest: ManifestRef,
_sst_path_gen: Arc<SstPathGenerator>,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
_mem_limit: u64,
write_props: WriterProperties,
) {
let runner = Runner::new(
store,
schema,
manifest,
sst_path_gen,
parquet_reader,
write_props,
);
while let Some(task) = task_rx.recv().await {
let store = store.clone();
let manifest = manifest.clone();
let runner = runner.clone();
rt.spawn(async move {
let runner = Runner { store, manifest };
if let Err(e) = runner.do_compaction(task).await {
warn!("Do compaction failed, err:{e}");
}
Expand All @@ -121,8 +139,8 @@ impl Scheduler {
segment_duration: Duration,
config: SchedulerConfig,
) {
let compactor = TimeWindowCompactionStrategy::new(segment_duration, config);
let schedule_interval = config.schedule_interval;
let compactor = TimeWindowCompactionStrategy::new(segment_duration, config);
// TODO: obtain expire time
let expire_time = None;
loop {
Expand All @@ -138,12 +156,13 @@ impl Scheduler {
}
}

#[derive(Clone, Copy)]
#[derive(Clone)]
pub struct SchedulerConfig {
pub schedule_interval: Duration,
pub memory_limit: u64,
pub max_pending_compaction_tasks: usize,
pub compaction_files_limit: usize,
pub write_props: WriterProperties,
}

impl Default for SchedulerConfig {
Expand All @@ -153,19 +172,7 @@ impl Default for SchedulerConfig {
memory_limit: bytesize::gb(2_u64),
max_pending_compaction_tasks: 10,
compaction_files_limit: 10,
write_props: WriterProperties::default(),
}
}
}

pub struct Runner {
store: ObjectStoreRef,
manifest: ManifestRef,
}

impl Runner {
// TODO: Merge input sst files into one new sst file
// and delete the expired sst files
async fn do_compaction(&self, _task: Task) -> Result<()> {
todo!()
}
}
Loading

0 comments on commit 1331e0a

Please sign in to comment.