feat: manifest support delete #1610

Merged · 5 commits · Dec 18, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -67,6 +67,7 @@ jobs:
- name: Install check binaries
run: |
cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked
- uses: Swatinem/rust-cache@v2
- name: Run Style Check
run: |
make fmt sort clippy
@@ -80,15 +81,14 @@
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Release Disk Quota
run: |
sudo make ensure-disk-quota
- name: Setup Build Environment
run: |
sudo apt update
sudo apt install --yes protobuf-compiler
- uses: Swatinem/rust-cache@v2
- name: Run Unit Tests
run: |
make test
3 changes: 1 addition & 2 deletions Makefile
@@ -47,8 +47,7 @@ udeps:
cd $(DIR); cargo udeps --all-targets --all-features --workspace

clippy:
cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings -D clippy::dbg-macro \
-A dead_code -A unused_variables -A clippy::unreachable -A clippy::too_many_arguments # Remove these once we have a clean build
cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings -D clippy::dbg-macro -A clippy::too-many-arguments

ensure-disk-quota:
bash ./scripts/free-disk-space.sh
src/metric_engine/src/compaction/executor.rs (renamed from runner.rs)
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

use anyhow::Context;
use arrow::array::{RecordBatch, UInt64Array};
@@ -31,70 +34,137 @@ use tracing::error;

use crate::{
compaction::Task,
manifest::ManifestRef,
ensure,
manifest::{ManifestRef, ManifestUpdate},
read::ParquetReader,
sst::{allocate_id, FileMeta, SstPathGenerator},
types::{ObjectStoreRef, StorageSchema},
sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
types::{ObjectStoreRef, RuntimeRef, StorageSchema},
Result,
};

#[derive(Clone)]
pub struct Runner {
pub struct Executor {
inner: Arc<Inner>,
}

struct Inner {
runtime: RuntimeRef,
store: ObjectStoreRef,
schema: StorageSchema,
manifest: ManifestRef,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
inused_memory: AtomicU64,
mem_limit: u64,
}

impl Runner {
impl Executor {
pub fn new(
runtime: RuntimeRef,
store: ObjectStoreRef,
schema: StorageSchema,
manifest: ManifestRef,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
mem_limit: u64,
) -> Self {
Self {
let inner = Inner {
runtime,
store,
schema,
manifest,
sst_path_gen,
parquet_reader,
write_props,
mem_limit,
inused_memory: AtomicU64::new(0),
};
Self {
inner: Arc::new(inner),
}
}

// TODO: Merge input sst files into one new sst file
// and delete the expired sst files
pub async fn do_compaction(&self, task: Task) -> Result<()> {
fn pre_check(&self, task: &Task) -> Result<()> {
assert!(!task.inputs.is_empty());
for f in &task.inputs {
assert!(f.is_compaction());
}
for f in &task.expireds {
assert!(f.is_compaction());
}

let task_size = task.input_size();
let inused = self.inner.inused_memory.load(Ordering::Relaxed);
let mem_limit = self.inner.mem_limit;
ensure!(
inused + task_size <= mem_limit,
"Compaction memory usage too high, inused:{inused}, task_size:{task_size}, limit:{mem_limit}"
);

self.inner
.inused_memory
.fetch_add(task.input_size(), Ordering::Relaxed);
Ok(())
}

pub fn on_success(&self, task: &Task) {
let task_size = task.input_size();
self.inner
.inused_memory
.fetch_sub(task_size, Ordering::Relaxed);
}

pub fn on_failure(&self, task: &Task) {
let task_size = task.input_size();
self.inner
.inused_memory
.fetch_sub(task_size, Ordering::Relaxed);

// When task execution fails, unmark the SSTs so they can be
// rescheduled.
for sst in &task.inputs {
sst.unmark_compaction();
}
for sst in &task.expireds {
sst.unmark_compaction();
}
}

pub fn submit(&self, task: Task) {
let runnable = Runnable {
executor: self.clone(),
task,
};
runnable.run()
}

// TODO: Merge input sst files into one new sst file
// and delete the expired sst files
pub async fn do_compaction(&self, task: &Task) -> Result<()> {
self.pre_check(task)?;

let mut time_range = task.inputs[0].meta().time_range.clone();
for f in &task.inputs[1..] {
time_range.merge(&f.meta().time_range);
}
let plan = self
.parquet_reader
.build_df_plan(task.inputs.clone(), None, Vec::new())?;
let plan =
self.inner
.parquet_reader
.build_df_plan(task.inputs.clone(), None, Vec::new())?;
let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
.context("execute datafusion plan")?;

let file_id = allocate_id();
let file_path = self.sst_path_gen.generate(file_id);
let file_path = self.inner.sst_path_gen.generate(file_id);
let file_path = Path::from(file_path);
let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone());
let object_store_writer =
ParquetObjectWriter::new(self.inner.store.clone(), file_path.clone());
let mut writer = AsyncArrowWriter::try_new(
object_store_writer,
self.schema.arrow_schema.clone(),
Some(self.write_props.clone()),
self.inner.schema.arrow_schema.clone(),
Some(self.inner.write_props.clone()),
)
.context("create arrow writer")?;
let mut num_rows = 0;
@@ -107,14 +177,15 @@ impl Runner {
// Since file_id is in increasing order, we can use it as the sequence.
let seq_column = Arc::new(UInt64Array::from(vec![file_id; batch.num_rows()]));
new_cols.push(seq_column);
RecordBatch::try_new(self.schema.arrow_schema.clone(), new_cols)
RecordBatch::try_new(self.inner.schema.arrow_schema.clone(), new_cols)
.context("construct record batch with seq column")?
};

writer.write(&batch_with_seq).await.context("write batch")?;
}
writer.close().await.context("close writer")?;
let object_meta = self
.inner
.store
.head(&file_path)
.await
@@ -126,28 +197,37 @@ impl Runner {
time_range: time_range.clone(),
};
// First add new sst to manifest, then delete expired/old sst
self.manifest.add_file(file_id, file_meta).await?;
self.manifest
.add_tombstone_files(task.expireds.clone())
.await?;
self.manifest
.add_tombstone_files(task.inputs.clone())
let to_adds = vec![SstFile::new(file_id, file_meta)];
let to_deletes = task
.expireds
.iter()
.map(|f| f.id())
.chain(task.inputs.iter().map(|f| f.id()))
.collect();
self.inner
.manifest
.update(ManifestUpdate::new(to_adds, to_deletes))
.await?;

// From now on, no error should be returned!
// Because we have already updated manifest.

let (_, results) = TokioScope::scope_and_block(|scope| {
for file in task.expireds {
let path = Path::from(self.sst_path_gen.generate(file.id()));
for file in &task.expireds {
let path = Path::from(self.inner.sst_path_gen.generate(file.id()));
scope.spawn(async move {
self.store
self.inner
.store
.delete(&path)
.await
.with_context(|| format!("failed to delete file, path:{path}"))
});
}
for file in task.inputs {
let path = Path::from(self.sst_path_gen.generate(file.id()));
for file in &task.inputs {
let path = Path::from(self.inner.sst_path_gen.generate(file.id()));
scope.spawn(async move {
self.store
self.inner
.store
.delete(&path)
.await
.with_context(|| format!("failed to delete file, path:{path}"))
@@ -170,3 +250,22 @@ impl Runner {
Ok(())
}
}

pub struct Runnable {
executor: Executor,
task: Task,
}

impl Runnable {
fn run(self) {
let rt = self.executor.inner.runtime.clone();
rt.spawn(async move {
if let Err(e) = self.executor.do_compaction(&self.task).await {
error!("Do compaction failed, err:{e}");
self.executor.on_failure(&self.task);
} else {
self.executor.on_success(&self.task);
}
});
}
}
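
The heart of this PR is the manifest call site above: instead of calling add_file for the merged output and add_tombstone_files twice for the expired and input files, the executor now packs the addition and all deletions into a single ManifestUpdate, so the compaction result and the removal of its sources are applied as one manifest update. The real ManifestUpdate, SstFile, and FileId live in manifest.rs and sst.rs, which are not part of this diff; the sketch below only mirrors what the call site implies, and its field names and simplified types are assumptions for illustration.

// Hypothetical stand-ins; the real SstFile carries metadata (size, time range,
// compaction flag) and FileId may not be a plain u64.
type FileId = u64;

#[derive(Debug, Clone)]
struct SstFile {
    id: FileId,
}

// Sketch of the new manifest request: files to add and ids to delete travel
// together so the manifest can persist them in one update.
#[derive(Debug)]
struct ManifestUpdate {
    to_adds: Vec<SstFile>,
    to_deletes: Vec<FileId>,
}

impl ManifestUpdate {
    fn new(to_adds: Vec<SstFile>, to_deletes: Vec<FileId>) -> Self {
        Self { to_adds, to_deletes }
    }
}

fn main() {
    // Mirrors do_compaction: one merged output is added, while the expired
    // files and the compaction inputs are all scheduled for deletion.
    let merged = SstFile { id: 42 };
    let inputs = vec![SstFile { id: 1 }, SstFile { id: 2 }];
    let expireds = vec![SstFile { id: 3 }];

    let to_deletes: Vec<FileId> = expireds
        .iter()
        .map(|f| f.id)
        .chain(inputs.iter().map(|f| f.id))
        .collect();
    let update = ManifestUpdate::new(vec![merged], to_deletes);
    println!(
        "add {} file(s), delete ids {:?}",
        update.to_adds.len(),
        update.to_deletes
    );
}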
13 changes: 7 additions & 6 deletions src/metric_engine/src/compaction/mod.rs
@@ -15,21 +15,22 @@
// specific language governing permissions and limitations
// under the License.

mod executor;
mod picker;
mod runner;
mod scheduler;

pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};

use crate::sst::SstFile;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Input {
files: Vec<SstFile>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Task {
pub inputs: Vec<SstFile>,
pub expireds: Vec<SstFile>,
}

impl Task {
pub fn input_size(&self) -> u64 {
self.inputs.iter().map(|f| f.size() as u64).sum()
}
}
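
On the executor side, mem_limit admission works off Task::input_size() defined above: pre_check reserves that many bytes against an AtomicU64 counter, and on_success / on_failure release them. Below is a minimal, self-contained sketch of that reserve/release pattern; the CompactionBudget name and the byte counts are made up for illustration, and, as in the real code, the load-then-add check is a best-effort guard rather than one atomic operation.

use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical helper mirroring the Executor's accounting.
struct CompactionBudget {
    inused: AtomicU64,
    limit: u64,
}

impl CompactionBudget {
    fn new(limit: u64) -> Self {
        Self {
            inused: AtomicU64::new(0),
            limit,
        }
    }

    // Reserve `task_size` bytes; reject the task if it would exceed the limit.
    fn try_reserve(&self, task_size: u64) -> bool {
        let inused = self.inused.load(Ordering::Relaxed);
        if inused + task_size > self.limit {
            return false;
        }
        self.inused.fetch_add(task_size, Ordering::Relaxed);
        true
    }

    // Release the reservation on either the success or the failure path.
    fn release(&self, task_size: u64) {
        self.inused.fetch_sub(task_size, Ordering::Relaxed);
    }
}

fn main() {
    let budget = CompactionBudget::new(64 * 1024 * 1024); // 64 MiB, arbitrary
    assert!(budget.try_reserve(16 * 1024 * 1024));
    budget.release(16 * 1024 * 1024);
    assert!(!budget.try_reserve(128 * 1024 * 1024)); // too large, rejected
}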