feat: support horaedb submit compaction task to remote (#1563)
## Rationale
A subtask of the compaction offloading work; see #1545.

## Detailed Changes
**Compaction node: support the remote compaction service**

- Define `CompactionServiceImpl` to implement the compaction RPC service.
- Introduce `NodeType` to distinguish compaction nodes from horaedb nodes,
enabling the deployment of dedicated compaction nodes (a minimal sketch
follows this list).
- Implement `compaction_client` so a horaedb node can access a remote
compaction node.
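
For orientation, a minimal sketch of the node-role switch (only the
`NodeType` name is taken from this change; the variant names and derives
are assumptions):

```rust
/// Sketch: the role of a process in the cluster, used to decide whether
/// to start the compaction RPC service. Variant names are illustrative.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeType {
    /// A regular horaedb node serving normal table workloads.
    HoraeDb,
    /// A dedicated node that only executes offloaded compaction tasks.
    CompactionServer,
}
```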

**Horaedb node: support compaction offload**

- Introduce `compaction_mode` in the analytic engine's `Config` to determine
whether compaction is executed locally or offloaded.
- Define the `CompactionNodePicker` trait for retrieving remote compaction
node info.
- Implement `RemoteCompactionRunner`, which picks a remote node and passes
the compaction task to it (see the sketch after this list).
- Add docs (e.g. `example-cluster-n.toml`) explaining how to deploy a
cluster that supports compaction offload.
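
To make the offload path concrete, here is a hedged sketch of how the
pieces named above could fit together. The trait and struct names come
from this change; every signature, the error type, and the
`CompactionTask` placeholder are assumptions for illustration:

```rust
use std::sync::Arc;

use async_trait::async_trait;

/// Placeholder for the task payload shipped to a remote node.
pub struct CompactionTask;

/// Sketch of `CompactionNodePicker`: resolves a remote compaction node.
#[async_trait]
pub trait CompactionNodePicker: Send + Sync {
    /// Return the endpoint of a usable remote compaction node.
    async fn pick(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>>;
}

/// Sketch of `RemoteCompactionRunner`: pick a node, then forward the task.
pub struct RemoteCompactionRunner {
    node_picker: Arc<dyn CompactionNodePicker>,
}

impl RemoteCompactionRunner {
    pub async fn run(
        &self,
        task: CompactionTask,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let endpoint = self.node_picker.pick().await?;
        // A real implementation would build a compaction_client for
        // `endpoint` and submit `task` over gRPC; elided in this sketch.
        let _ = (endpoint, task);
        Ok(())
    }
}
```

On the horaedb side, `compaction_mode` in the engine `Config` would then
select between the existing local runner and a remote runner like this.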

## Test Plan

---------

Co-authored-by: kamille <[email protected]>
LeslieKid and Rachelint authored Oct 30, 2024
1 parent 60b5217 commit e47d9ae
Showing 38 changed files with 1,505 additions and 79 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -103,7 +103,7 @@ thiserror = "1"
bytes_ext = { path = "src/components/bytes_ext" }
catalog = { path = "src/catalog" }
catalog_impls = { path = "src/catalog_impls" }
horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "a5874d9fedee32ab1292252c4eb6defc4f6e245a" }
horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "fac8564e6e3d50e51daa2af6eb905e747f3191b0" }
codec = { path = "src/components/codec" }
chrono = "0.4"
clap = { version = "4.5.1", features = ["derive"] }
6 changes: 6 additions & 0 deletions src/analytic_engine/Cargo.toml
@@ -49,6 +49,7 @@ async-trait = { workspace = true }
atomic_enum = { workspace = true }
base64 = { workspace = true }
bytes_ext = { workspace = true }
cluster = { workspace = true }
codec = { workspace = true }
common_types = { workspace = true }
datafusion = { workspace = true }
@@ -66,17 +67,20 @@ logger = { workspace = true }
lru = { workspace = true }
macros = { workspace = true }
message_queue = { workspace = true }
meta_client = { workspace = true }
metric_ext = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true }
parquet_ext = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
remote_engine_client = { workspace = true }
reqwest = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
sampling_cache = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
size_ext = { workspace = true }
skiplist = { path = "../components/skiplist" }
smallvec = { workspace = true }
@@ -87,7 +91,9 @@ tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
trace_metric = { workspace = true }
url = "2.2"
wal = { workspace = true }
xorfilter-rs = { workspace = true }

91 changes: 84 additions & 7 deletions src/analytic_engine/src/compaction/mod.rs
@@ -20,15 +20,17 @@
use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};

use common_types::COMPACTION_STRATEGY;
use generic_error::{BoxError, GenericError};
use macros::define_result;
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu};
use snafu::{ensure, Backtrace, GenerateBacktrace, OptionExt, ResultExt, Snafu};
use time_ext::TimeUnit;
use tokio::sync::oneshot;

use crate::{
compaction::picker::{CommonCompactionPicker, CompactionPickerRef},
sst::file::{FileHandle, Level},
sst::file::{FileHandle, FileMeta, FilePurgeQueue, Level},
table::data::TableDataRef,
};

@@ -72,8 +74,22 @@ pub enum Error {
},
#[snafu(display("Invalid compaction option value, err: {}", error))]
InvalidOption { error: String, backtrace: Backtrace },

#[snafu(display("Empty file meta.\nBacktrace:\n{}", backtrace))]
EmptyFileMeta { backtrace: Backtrace },

#[snafu(display("Failed to convert file meta, err:{}", source))]
ConvertFileMeta { source: GenericError },

#[snafu(display("Empty purge queue.\nBacktrace:\n{}", backtrace))]
EmptyPurgeQueue { backtrace: Backtrace },

#[snafu(display("Failed to convert level, err:{}", source))]
ConvertLevel { source: GenericError },
}

define_result!(Error);

#[derive(Debug, Clone, Copy, Deserialize, Default, PartialEq, Serialize)]
pub enum CompactionStrategy {
#[default]
@@ -145,7 +161,7 @@ impl CompactionStrategy {
pub(crate) fn parse_from(
value: &str,
options: &HashMap<String, String>,
) -> Result<CompactionStrategy, Error> {
) -> Result<CompactionStrategy> {
match value.trim().to_lowercase().as_str() {
DEFAULT_STRATEGY => Ok(CompactionStrategy::Default),
STC_STRATEGY => Ok(CompactionStrategy::SizeTiered(
@@ -182,7 +198,7 @@ impl CompactionStrategy {
}

impl SizeTieredCompactionOptions {
pub(crate) fn validate(&self) -> Result<(), Error> {
pub(crate) fn validate(&self) -> Result<()> {
ensure!(
self.bucket_high > self.bucket_low,
InvalidOption {
@@ -215,7 +231,7 @@ impl SizeTieredCompactionOptions {

pub(crate) fn parse_from(
options: &HashMap<String, String>,
) -> Result<SizeTieredCompactionOptions, Error> {
) -> Result<SizeTieredCompactionOptions> {
let mut opts = SizeTieredCompactionOptions::default();
if let Some(v) = options.get(BUCKET_LOW_KEY) {
opts.bucket_low = v.parse().context(ParseFloat {
@@ -278,7 +294,7 @@ impl TimeWindowCompactionOptions {
);
}

pub(crate) fn validate(&self) -> Result<(), Error> {
pub(crate) fn validate(&self) -> Result<()> {
if !Self::valid_timestamp_unit(self.timestamp_resolution) {
return InvalidOption {
error: format!(
@@ -294,7 +310,7 @@

pub(crate) fn parse_from(
options: &HashMap<String, String>,
) -> Result<TimeWindowCompactionOptions, Error> {
) -> Result<TimeWindowCompactionOptions> {
let mut opts = TimeWindowCompactionOptions {
size_tiered: SizeTieredCompactionOptions::parse_from(options)?,
..Default::default()
@@ -326,6 +342,67 @@ pub struct CompactionInputFiles {
pub output_level: Level,
}

impl TryFrom<horaedbproto::compaction_service::CompactionInputFiles> for CompactionInputFiles {
type Error = Error;

fn try_from(value: horaedbproto::compaction_service::CompactionInputFiles) -> Result<Self> {
let level: Level = value.level.try_into().box_err().context(ConvertLevel)?;
let output_level: Level = value
.output_level
.try_into()
.box_err()
.context(ConvertLevel)?;

let mut files: Vec<FileHandle> = Vec::with_capacity(value.files.len());
for file in value.files {
let meta: FileMeta = file
.meta
.context(EmptyFileMeta)?
.try_into()
.box_err()
.context(ConvertFileMeta)?;

let purge_queue: FilePurgeQueue = file.purge_queue.context(EmptyPurgeQueue)?.into();

files.push({
let handle = FileHandle::new(meta, purge_queue);
handle.set_being_compacted(file.being_compacted);
handle
});
}

Ok(CompactionInputFiles {
level,
files,
output_level,
})
}
}

impl From<CompactionInputFiles> for horaedbproto::compaction_service::CompactionInputFiles {
fn from(value: CompactionInputFiles) -> Self {
let mut files = Vec::with_capacity(value.files.len());
for file in value.files {
let handle = horaedbproto::compaction_service::FileHandle {
meta: Some(file.meta().into()),
purge_queue: Some(horaedbproto::compaction_service::FilePurgeQueue {
space_id: file.space_id(),
table_id: file.table_id().into(),
}),
being_compacted: file.being_compacted(),
metrics: Some(horaedbproto::compaction_service::SstMetrics {}),
};
files.push(handle);
}

Self {
level: value.level.as_u32(),
files,
output_level: value.output_level.as_u32(),
}
}
}

#[derive(Debug, Default, Clone)]
pub struct ExpiredFiles {
/// Level of the expired files.
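The two conversions above are symmetric, so a task's input files can
round-trip through the wire format. A hedged usage sketch (the
`round_trip` helper is hypothetical; the types and the `Result` alias are
from the code above):

```rust
fn round_trip(input: CompactionInputFiles) -> Result<CompactionInputFiles> {
    // Sending side: domain type -> protobuf message.
    let pb: horaedbproto::compaction_service::CompactionInputFiles = input.into();
    // Receiving side (compaction node): protobuf -> domain type, which can
    // fail on a missing file meta or purge queue.
    CompactionInputFiles::try_from(pb)
}
```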
1 change: 1 addition & 0 deletions src/analytic_engine/src/compaction/runner/local_runner.rs
@@ -45,6 +45,7 @@ use crate::{
const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;

/// Executor carrying out the actual compaction work
#[derive(Clone)]
pub struct LocalCompactionRunner {
runtime: Arc<Runtime>,
scan_options: ScanOptions,
(The remaining file diffs are not rendered here.)