diff --git a/Cargo.lock b/Cargo.lock index 33ed54bcd5..1830a3fb05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,6 +89,7 @@ dependencies = [ "atomic_enum", "base64 0.13.1", "bytes_ext", + "cluster", "codec", "common_types", "datafusion", @@ -107,6 +108,7 @@ dependencies = [ "lru 0.7.8", "macros", "message_queue", + "meta_client", "metric_ext", "object_store 2.1.0", "parquet", @@ -116,10 +118,12 @@ dependencies = [ "prost 0.11.8", "rand 0.8.5", "remote_engine_client", + "reqwest 0.12.4", "router", "runtime", "sampling_cache", "serde", + "serde_json", "size_ext", "skiplist", "smallvec", @@ -131,7 +135,9 @@ dependencies = [ "thiserror", "time_ext", "tokio", + "tonic 0.8.3", "trace_metric", + "url", "wal", "xorfilter-rs", ] @@ -3150,6 +3156,7 @@ dependencies = [ "catalog_impls", "clap", "cluster", + "common_types", "datafusion", "df_operator", "etcd-client", @@ -3223,7 +3230,7 @@ dependencies = [ [[package]] name = "horaedbproto" version = "2.0.0" -source = "git+https://github.com/apache/incubator-horaedb-proto.git?rev=a5874d9fedee32ab1292252c4eb6defc4f6e245a#a5874d9fedee32ab1292252c4eb6defc4f6e245a" +source = "git+https://github.com/apache/incubator-horaedb-proto.git?rev=fac8564e6e3d50e51daa2af6eb905e747f3191b0#fac8564e6e3d50e51daa2af6eb905e747f3191b0" dependencies = [ "prost 0.11.8", "protoc-bin-vendored", diff --git a/Cargo.toml b/Cargo.toml index d2d73fd0ab..b6ca6273dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,7 +103,7 @@ thiserror = "1" bytes_ext = { path = "src/components/bytes_ext" } catalog = { path = "src/catalog" } catalog_impls = { path = "src/catalog_impls" } -horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "a5874d9fedee32ab1292252c4eb6defc4f6e245a" } +horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "fac8564e6e3d50e51daa2af6eb905e747f3191b0" } codec = { path = "src/components/codec" } chrono = "0.4" clap = { version = "4.5.1", features = ["derive"] } diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml index 09ff47af21..d6c642eb75 100644 --- a/src/analytic_engine/Cargo.toml +++ b/src/analytic_engine/Cargo.toml @@ -49,6 +49,7 @@ async-trait = { workspace = true } atomic_enum = { workspace = true } base64 = { workspace = true } bytes_ext = { workspace = true } +cluster = { workspace = true } codec = { workspace = true } common_types = { workspace = true } datafusion = { workspace = true } @@ -66,6 +67,7 @@ logger = { workspace = true } lru = { workspace = true } macros = { workspace = true } message_queue = { workspace = true } +meta_client = { workspace = true } metric_ext = { workspace = true } object_store = { workspace = true } parquet = { workspace = true } @@ -73,10 +75,12 @@ parquet_ext = { workspace = true } prometheus = { workspace = true } prost = { workspace = true } remote_engine_client = { workspace = true } +reqwest = { workspace = true } router = { workspace = true } runtime = { workspace = true } sampling_cache = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } size_ext = { workspace = true } skiplist = { path = "../components/skiplist" } smallvec = { workspace = true } @@ -87,7 +91,9 @@ tempfile = { workspace = true, optional = true } thiserror = { workspace = true } time_ext = { workspace = true } tokio = { workspace = true } +tonic = { workspace = true } trace_metric = { workspace = true } +url = "2.2" wal = { workspace = true } xorfilter-rs = { workspace = true } diff --git a/src/analytic_engine/src/compaction/mod.rs b/src/analytic_engine/src/compaction/mod.rs index 34048d6b35..8f63c93ece 100644 --- a/src/analytic_engine/src/compaction/mod.rs +++ b/src/analytic_engine/src/compaction/mod.rs @@ -20,15 +20,17 @@ use std::{collections::HashMap, fmt, str::FromStr, sync::Arc}; use common_types::COMPACTION_STRATEGY; +use generic_error::{BoxError, GenericError}; +use macros::define_result; use serde::{Deserialize, Serialize}; use size_ext::ReadableSize; -use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu}; +use snafu::{ensure, Backtrace, GenerateBacktrace, OptionExt, ResultExt, Snafu}; use time_ext::TimeUnit; use tokio::sync::oneshot; use crate::{ compaction::picker::{CommonCompactionPicker, CompactionPickerRef}, - sst::file::{FileHandle, Level}, + sst::file::{FileHandle, FileMeta, FilePurgeQueue, Level}, table::data::TableDataRef, }; @@ -72,8 +74,22 @@ pub enum Error { }, #[snafu(display("Invalid compaction option value, err: {}", error))] InvalidOption { error: String, backtrace: Backtrace }, + + #[snafu(display("Empty file meta.\nBacktrace:\n{}", backtrace))] + EmptyFileMeta { backtrace: Backtrace }, + + #[snafu(display("Failed to convert file meta, err:{}", source))] + ConvertFileMeta { source: GenericError }, + + #[snafu(display("Empty purge queue.\nBacktrace:\n{}", backtrace))] + EmptyPurgeQueue { backtrace: Backtrace }, + + #[snafu(display("Failed to convert level, err:{}", source))] + ConvertLevel { source: GenericError }, } +define_result!(Error); + #[derive(Debug, Clone, Copy, Deserialize, Default, PartialEq, Serialize)] pub enum CompactionStrategy { #[default] @@ -145,7 +161,7 @@ impl CompactionStrategy { pub(crate) fn parse_from( value: &str, options: &HashMap, - ) -> Result { + ) -> Result { match value.trim().to_lowercase().as_str() { DEFAULT_STRATEGY => Ok(CompactionStrategy::Default), STC_STRATEGY => Ok(CompactionStrategy::SizeTiered( @@ -182,7 +198,7 @@ impl CompactionStrategy { } impl SizeTieredCompactionOptions { - pub(crate) fn validate(&self) -> Result<(), Error> { + pub(crate) fn validate(&self) -> Result<()> { ensure!( self.bucket_high > self.bucket_low, InvalidOption { @@ -215,7 +231,7 @@ impl SizeTieredCompactionOptions { pub(crate) fn parse_from( options: &HashMap, - ) -> Result { + ) -> Result { let mut opts = SizeTieredCompactionOptions::default(); if let Some(v) = options.get(BUCKET_LOW_KEY) { opts.bucket_low = v.parse().context(ParseFloat { @@ -278,7 +294,7 @@ impl TimeWindowCompactionOptions { ); } - pub(crate) fn validate(&self) -> Result<(), Error> { + pub(crate) fn validate(&self) -> Result<()> { if !Self::valid_timestamp_unit(self.timestamp_resolution) { return InvalidOption { error: format!( @@ -294,7 +310,7 @@ impl TimeWindowCompactionOptions { pub(crate) fn parse_from( options: &HashMap, - ) -> Result { + ) -> Result { let mut opts = TimeWindowCompactionOptions { size_tiered: SizeTieredCompactionOptions::parse_from(options)?, ..Default::default() @@ -326,6 +342,67 @@ pub struct CompactionInputFiles { pub output_level: Level, } +impl TryFrom for CompactionInputFiles { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::CompactionInputFiles) -> Result { + let level: Level = value.level.try_into().box_err().context(ConvertLevel)?; + let output_level: Level = value + .output_level + .try_into() + .box_err() + .context(ConvertLevel)?; + + let mut files: Vec = Vec::with_capacity(value.files.len()); + for file in value.files { + let meta: FileMeta = file + .meta + .context(EmptyFileMeta)? + .try_into() + .box_err() + .context(ConvertFileMeta)?; + + let purge_queue: FilePurgeQueue = file.purge_queue.context(EmptyPurgeQueue)?.into(); + + files.push({ + let handle = FileHandle::new(meta, purge_queue); + handle.set_being_compacted(file.being_compacted); + handle + }); + } + + Ok(CompactionInputFiles { + level, + files, + output_level, + }) + } +} + +impl From for horaedbproto::compaction_service::CompactionInputFiles { + fn from(value: CompactionInputFiles) -> Self { + let mut files = Vec::with_capacity(value.files.len()); + for file in value.files { + let handle = horaedbproto::compaction_service::FileHandle { + meta: Some(file.meta().into()), + purge_queue: Some(horaedbproto::compaction_service::FilePurgeQueue { + space_id: file.space_id(), + table_id: file.table_id().into(), + }), + being_compacted: file.being_compacted(), + metrics: Some(horaedbproto::compaction_service::SstMetrics {}), + }; + files.push(handle); + } + + Self { + level: value.level.as_u32(), + files, + output_level: value.output_level.as_u32(), + } + } +} + #[derive(Debug, Default, Clone)] pub struct ExpiredFiles { /// Level of the expired files. diff --git a/src/analytic_engine/src/compaction/runner/local_runner.rs b/src/analytic_engine/src/compaction/runner/local_runner.rs index fc34b2bfa6..e379d78544 100644 --- a/src/analytic_engine/src/compaction/runner/local_runner.rs +++ b/src/analytic_engine/src/compaction/runner/local_runner.rs @@ -45,6 +45,7 @@ use crate::{ const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64; /// Executor carrying for actual compaction work +#[derive(Clone)] pub struct LocalCompactionRunner { runtime: Arc, scan_options: ScanOptions, diff --git a/src/analytic_engine/src/compaction/runner/mod.rs b/src/analytic_engine/src/compaction/runner/mod.rs index 12f333eac3..c8e34484cc 100644 --- a/src/analytic_engine/src/compaction/runner/mod.rs +++ b/src/analytic_engine/src/compaction/runner/mod.rs @@ -16,17 +16,23 @@ // under the License. pub mod local_runner; +pub mod node_picker; +mod remote_client; +pub mod remote_runner; use std::sync::Arc; use async_trait::async_trait; use common_types::{request_id::RequestId, schema::Schema, SequenceNumber}; +use generic_error::{BoxError, GenericError}; +use macros::define_result; use object_store::Path; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; use crate::{ compaction::CompactionInputFiles, - instance::flush_compaction::Result, + instance::flush_compaction, row_iter::IterOptions, space::SpaceId, sst::{ @@ -39,12 +45,87 @@ use crate::{ /// Compaction runner #[async_trait] pub trait CompactionRunner: Send + Sync + 'static { - async fn run(&self, task: CompactionRunnerTask) -> Result; + async fn run( + &self, + task: CompactionRunnerTask, + ) -> flush_compaction::Result; } pub type CompactionRunnerPtr = Box; pub type CompactionRunnerRef = Arc; +#[derive(Debug, Snafu)] +#[snafu(visibility = "pub")] +pub enum Error { + #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))] + EmptyTableSchema { backtrace: Backtrace }, + + #[snafu(display("Empty input context.\nBacktrace:\n{}", backtrace))] + EmptyInputContext { backtrace: Backtrace }, + + #[snafu(display("Empty ouput context.\nBacktrace:\n{}", backtrace))] + EmptyOuputContext { backtrace: Backtrace }, + + #[snafu(display("Empty compaction input files.\nBacktrace:\n{}", backtrace))] + EmptyCompactionInputFiles { backtrace: Backtrace }, + + #[snafu(display("Empty write options.\nBacktrace:\n{}", backtrace))] + EmptySstWriteOptions { backtrace: Backtrace }, + + #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))] + EmptySstMeta { backtrace: Backtrace }, + + #[snafu(display("Empty sst info.\nBacktrace:\n{}", backtrace))] + EmptySstInfo { backtrace: Backtrace }, + + #[snafu(display("Empty compaction task exec result.\nBacktrace:\n{}", backtrace))] + EmptyExecResult { backtrace: Backtrace }, + + #[snafu(display("Failed to convert table schema, err:{}", source))] + ConvertTableSchema { source: GenericError }, + + #[snafu(display("Failed to convert input context, err:{}", source))] + ConvertInputContext { source: GenericError }, + + #[snafu(display("Failed to convert ouput context, err:{}", source))] + ConvertOuputContext { source: GenericError }, + + #[snafu(display("Failed to convert compaction input files, err:{}", source))] + ConvertCompactionInputFiles { source: GenericError }, + + #[snafu(display("Failed to convert write options, err:{}", source))] + ConvertSstWriteOptions { source: GenericError }, + + #[snafu(display("Failed to convert sst info, err:{}", source))] + ConvertSstInfo { source: GenericError }, + + #[snafu(display("Failed to convert sst meta, err:{}", source))] + ConvertSstMeta { source: GenericError }, + + #[snafu(display("Failed to connect the service endpoint:{}, err:{}", addr, source,))] + FailConnect { addr: String, source: GenericError }, + + #[snafu(display("Failed to execute compaction task, err:{}", source))] + FailExecuteCompactionTask { source: GenericError }, + + #[snafu(display("Missing header in rpc response.\nBacktrace:\n{}", backtrace))] + MissingHeader { backtrace: Backtrace }, + + #[snafu(display( + "Bad response, resp code:{}, msg:{}.\nBacktrace:\n{}", + code, + msg, + backtrace + ))] + BadResponse { + code: u32, + msg: String, + backtrace: Backtrace, + }, +} + +define_result!(Error); + /// Compaction runner task #[derive(Debug, Clone)] pub struct CompactionRunnerTask { @@ -113,12 +194,106 @@ impl CompactionRunnerTask { } } +impl TryFrom + for CompactionRunnerTask +{ + type Error = Error; + + fn try_from( + request: horaedbproto::compaction_service::ExecuteCompactionTaskRequest, + ) -> Result { + let task_key = request.task_key; + let request_id: RequestId = request.request_id.into(); + + let schema: Schema = request + .schema + .context(EmptyTableSchema)? + .try_into() + .box_err() + .context(ConvertTableSchema)?; + + let space_id: SpaceId = request.space_id; + let table_id: TableId = request.table_id.into(); + let sequence: SequenceNumber = request.sequence; + + let input_ctx: InputContext = request + .input_ctx + .context(EmptyInputContext)? + .try_into() + .box_err() + .context(ConvertInputContext)?; + + let output_ctx: OutputContext = request + .output_ctx + .context(EmptyOuputContext)? + .try_into() + .box_err() + .context(ConvertOuputContext)?; + + Ok(Self { + task_key, + request_id, + schema, + space_id, + table_id, + sequence, + input_ctx, + output_ctx, + }) + } +} + +impl From for horaedbproto::compaction_service::ExecuteCompactionTaskRequest { + fn from(task: CompactionRunnerTask) -> Self { + Self { + task_key: task.task_key, + request_id: task.request_id.into(), + schema: Some((&(task.schema)).into()), + space_id: task.space_id, + table_id: task.table_id.into(), + sequence: task.sequence, + input_ctx: Some(task.input_ctx.into()), + output_ctx: Some(task.output_ctx.into()), + } + } +} + pub struct CompactionRunnerResult { pub output_file_path: Path, pub sst_info: SstInfo, pub sst_meta: MetaData, } +impl TryFrom + for CompactionRunnerResult +{ + type Error = Error; + + fn try_from( + resp: horaedbproto::compaction_service::ExecuteCompactionTaskResponse, + ) -> Result { + let res = resp.result.context(EmptyExecResult)?; + let sst_info = res + .sst_info + .context(EmptySstInfo)? + .try_into() + .box_err() + .context(ConvertSstInfo)?; + let sst_meta = res + .sst_meta + .context(EmptySstMeta)? + .try_into() + .box_err() + .context(ConvertSstMeta)?; + + Ok(Self { + output_file_path: res.output_file_path.into(), + sst_info, + sst_meta, + }) + } +} + #[derive(Debug, Clone)] pub struct InputContext { /// Input sst files in this compaction @@ -128,6 +303,43 @@ pub struct InputContext { pub need_dedup: bool, } +impl TryFrom for InputContext { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::InputContext) -> Result { + let num_rows_per_row_group: usize = value.num_rows_per_row_group as usize; + let merge_iter_options = IterOptions { + batch_size: value.merge_iter_options as usize, + }; + let need_dedup = value.need_dedup; + + let files: CompactionInputFiles = value + .files + .context(EmptyCompactionInputFiles)? + .try_into() + .box_err() + .context(ConvertCompactionInputFiles)?; + + Ok(InputContext { + files, + num_rows_per_row_group, + merge_iter_options, + need_dedup, + }) + } +} + +impl From for horaedbproto::compaction_service::InputContext { + fn from(value: InputContext) -> Self { + Self { + files: Some(value.files.into()), + num_rows_per_row_group: value.num_rows_per_row_group as u64, + merge_iter_options: value.merge_iter_options.batch_size as u64, + need_dedup: value.need_dedup, + } + } +} + #[derive(Debug, Clone)] pub struct OutputContext { /// Output sst file path @@ -135,3 +347,31 @@ pub struct OutputContext { /// Output sst write context pub write_options: SstWriteOptions, } + +impl TryFrom for OutputContext { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::OutputContext) -> Result { + let file_path: Path = value.file_path.into(); + let write_options: SstWriteOptions = value + .write_options + .context(EmptySstWriteOptions)? + .try_into() + .box_err() + .context(ConvertSstWriteOptions)?; + + Ok(OutputContext { + file_path, + write_options, + }) + } +} + +impl From for horaedbproto::compaction_service::OutputContext { + fn from(value: OutputContext) -> Self { + Self { + file_path: value.file_path.into(), + write_options: Some(value.write_options.into()), + } + } +} diff --git a/src/analytic_engine/src/compaction/runner/node_picker.rs b/src/analytic_engine/src/compaction/runner/node_picker.rs new file mode 100644 index 0000000000..bf21787c71 --- /dev/null +++ b/src/analytic_engine/src/compaction/runner/node_picker.rs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Remote compaction node picker. + +use std::sync::Arc; + +use async_trait::async_trait; +use macros::define_result; +use meta_client::{types::FetchCompactionNodeRequest, MetaClientRef}; +use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, Snafu}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(tag = "node_picker", content = "endpoint")] +pub enum NodePicker { + // Local node picker that specifies the local endpoint. + // The endpoint in the form `addr:port`. + Local(String), + Remote, +} + +#[async_trait] +pub trait CompactionNodePicker: Send + Sync { + /// Get the addr of the remote compaction node. + async fn get_compaction_node(&self) -> Result; +} + +pub type RemoteCompactionNodePickerRef = Arc; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Meta client fetch compaciton node failed, err:{source}."))] + FetchCompactionNodeFailure { source: meta_client::Error }, +} + +define_result!(Error); + +/// RemoteCompactionNodePickerImpl is an implementation of +/// [`CompactionNodePicker`] based [`MetaClient`]. +pub struct RemoteCompactionNodePickerImpl { + pub meta_client: MetaClientRef, +} + +#[async_trait] +impl CompactionNodePicker for RemoteCompactionNodePickerImpl { + /// Get proper remote compaction node info for compaction offload with meta + /// client. + async fn get_compaction_node(&self) -> Result { + let req = FetchCompactionNodeRequest::default(); + let resp = self + .meta_client + .fetch_compaction_node(req) + .await + .context(FetchCompactionNodeFailure)?; + + let compaction_node_addr = resp.endpoint; + Ok(compaction_node_addr) + } +} + +/// LocalCompactionNodePickerImpl is an implementation of +/// [`CompactionNodePicker`] mainly used for testing. +pub struct LocalCompactionNodePickerImpl { + pub endpoint: String, +} + +#[async_trait] +impl CompactionNodePicker for LocalCompactionNodePickerImpl { + /// Return the local addr and port of grpc service. + async fn get_compaction_node(&self) -> Result { + Ok(self.endpoint.clone()) + } +} diff --git a/src/analytic_engine/src/compaction/runner/remote_client.rs b/src/analytic_engine/src/compaction/runner/remote_client.rs new file mode 100644 index 0000000000..cf1f69be4a --- /dev/null +++ b/src/analytic_engine/src/compaction/runner/remote_client.rs @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use generic_error::BoxError; +use horaedbproto::{ + common::ResponseHeader, compaction_service::compaction_service_client::CompactionServiceClient, +}; +use logger::info; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use time_ext::ReadableDuration; + +use crate::compaction::runner::{ + BadResponse, FailConnect, FailExecuteCompactionTask, MissingHeader, Result, +}; + +type CompactionServiceGrpcClient = CompactionServiceClient; + +#[derive(Debug, Deserialize, Clone, Serialize)] +#[serde(default)] +pub struct CompactionClientConfig { + pub compaction_server_addr: String, + pub timeout: ReadableDuration, +} + +impl Default for CompactionClientConfig { + fn default() -> Self { + Self { + compaction_server_addr: "127.0.0.1:7878".to_string(), + timeout: ReadableDuration::secs(5), + } + } +} + +/// CompactionClient is the abstraction of client used for HoraeDB to +/// communicate with CompactionServer cluster. +#[async_trait] +pub trait CompactionClient: Send + Sync { + async fn execute_compaction_task( + &self, + req: horaedbproto::compaction_service::ExecuteCompactionTaskRequest, + ) -> Result; +} + +pub type CompactionClientRef = Arc; + +/// Default compaction client impl, will interact with the remote compaction +/// node. +pub struct CompactionClientImpl { + client: CompactionServiceGrpcClient, +} + +impl CompactionClientImpl { + pub async fn connect(config: CompactionClientConfig) -> Result { + let client = { + let endpoint = + tonic::transport::Endpoint::from_shared(config.compaction_server_addr.to_string()) + .box_err() + .context(FailConnect { + addr: &config.compaction_server_addr, + })? + .timeout(config.timeout.0); + CompactionServiceGrpcClient::connect(endpoint) + .await + .box_err() + .context(FailConnect { + addr: &config.compaction_server_addr, + })? + }; + + Ok(Self { client }) + } + + #[inline] + fn client(&self) -> CompactionServiceGrpcClient { + self.client.clone() + } +} + +#[async_trait] +impl CompactionClient for CompactionClientImpl { + async fn execute_compaction_task( + &self, + pb_req: horaedbproto::compaction_service::ExecuteCompactionTaskRequest, + ) -> Result { + // TODO(leslie): Add request header for ExecuteCompactionTaskRequest. + + info!( + "Compaction client try to execute compaction task in remote compaction node, req:{:?}", + pb_req + ); + + let pb_resp = self + .client() + .execute_compaction_task(pb_req) + .await + .box_err() + .context(FailExecuteCompactionTask)? + .into_inner(); + + info!( + "Compaction client finish executing compaction task in remote compaction node, req:{:?}", + pb_resp + ); + + check_response_header(&pb_resp.header)?; + Ok(pb_resp) + } +} + +// TODO(leslie): Consider to refactor and reuse the similar function in +// meta_client. +fn check_response_header(header: &Option) -> Result<()> { + let header = header.as_ref().context(MissingHeader)?; + if header.code == 0 { + Ok(()) + } else { + BadResponse { + code: header.code, + msg: header.error.clone(), + } + .fail() + } +} + +pub async fn build_compaction_client( + config: CompactionClientConfig, +) -> Result { + let compaction_client = CompactionClientImpl::connect(config).await?; + Ok(Arc::new(compaction_client)) +} diff --git a/src/analytic_engine/src/compaction/runner/remote_runner.rs b/src/analytic_engine/src/compaction/runner/remote_runner.rs new file mode 100644 index 0000000000..59a70c2fc2 --- /dev/null +++ b/src/analytic_engine/src/compaction/runner/remote_runner.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use generic_error::BoxError; +use logger::info; +use snafu::ResultExt; + +use super::{local_runner::LocalCompactionRunner, node_picker::RemoteCompactionNodePickerRef}; +use crate::{ + compaction::runner::{ + remote_client::{build_compaction_client, CompactionClientConfig, CompactionClientRef}, + CompactionRunner, CompactionRunnerResult, CompactionRunnerTask, + }, + instance::flush_compaction::{ + self, BuildCompactionClientFailed, ConvertCompactionTaskResponse, + GetCompactionClientFailed, PickCompactionNodeFailed, Result, + }, +}; + +pub struct RemoteCompactionRunner { + pub node_picker: RemoteCompactionNodePickerRef, + + pub fallback_local_when_failed: bool, + /// Responsible for executing compaction task locally if fail to remote + /// compact when `fallback_local_when_failed` is true, used for better fault + /// tolerance. + pub local_compaction_runner: LocalCompactionRunner, +} + +impl RemoteCompactionRunner { + async fn get_compaction_client(&self) -> Result { + let mut config = CompactionClientConfig::default(); + let endpoint = self + .node_picker + .get_compaction_node() + .await + .context(PickCompactionNodeFailed)?; + config.compaction_server_addr = make_formatted_endpoint(&endpoint); + + let client = build_compaction_client(config) + .await + .context(BuildCompactionClientFailed)?; + Ok(client) + } + + async fn local_compact(&self, task: CompactionRunnerTask) -> Result { + self.local_compaction_runner.run(task).await + } +} + +#[async_trait] +impl CompactionRunner for RemoteCompactionRunner { + /// Run the compaction task either on a remote node or fall back to local + /// compaction. + async fn run(&self, task: CompactionRunnerTask) -> Result { + let client = self + .get_compaction_client() + .await + .box_err() + .context(GetCompactionClientFailed); + + let pb_resp = match client { + Ok(client) => match client.execute_compaction_task(task.clone().into()).await { + Ok(resp) => resp, + Err(e) => { + if !self.fallback_local_when_failed { + return Err(flush_compaction::Error::RemoteCompactFailed { source: e }); + } + + info!( + "The compaction task falls back to local because of error:{}", + e + ); + return self.local_compact(task).await; + } + }, + Err(e) => { + if !self.fallback_local_when_failed { + return Err(e); + } + + info!( + "The compaction task falls back to local because of error:{}", + e + ); + return self.local_compact(task).await; + } + }; + + let resp = pb_resp + .try_into() + .box_err() + .context(ConvertCompactionTaskResponse)?; + + Ok(resp) + } +} + +fn make_formatted_endpoint(endpoint: &str) -> String { + format!("http://{endpoint}") +} diff --git a/src/analytic_engine/src/instance/engine.rs b/src/analytic_engine/src/instance/engine.rs index 8c29ab1c2d..537b83314f 100644 --- a/src/analytic_engine/src/instance/engine.rs +++ b/src/analytic_engine/src/instance/engine.rs @@ -259,6 +259,12 @@ pub enum Error { sequence: SequenceNumber, source: wal::manager::Error, }, + + #[snafu(display( + "Failed to find meta client to construct remote compaction runner.\nBacktrace:\n{}", + backtrace + ))] + MetaClientNotExist { backtrace: Backtrace }, } define_result!(Error); @@ -293,6 +299,7 @@ impl From for table_engine::engine::Error { | Error::DoManifestSnapshot { .. } | Error::OpenManifest { .. } | Error::TableNotExist { .. } + | Error::MetaClientNotExist { .. } | Error::OpenTablesOfShard { .. } | Error::ReplayWalNoCause { .. } | Error::PurgeWal { .. } diff --git a/src/analytic_engine/src/instance/flush_compaction.rs b/src/analytic_engine/src/instance/flush_compaction.rs index da1647eb70..9deceff563 100644 --- a/src/analytic_engine/src/instance/flush_compaction.rs +++ b/src/analytic_engine/src/instance/flush_compaction.rs @@ -41,6 +41,7 @@ use tokio::{sync::oneshot, time::Instant}; use wal::manager::WalLocation; use crate::{ + compaction::runner::node_picker, instance::{ self, reorder_memtable::Reorder, serial_executor::TableFlushScheduler, SpaceStoreRef, }, @@ -158,6 +159,25 @@ pub enum Error { #[snafu(display("Failed to alloc file id, err:{}", source))] AllocFileId { source: data::Error }, + + #[snafu(display("Failed to convert compaction task response, err:{}", source))] + ConvertCompactionTaskResponse { source: GenericError }, + + #[snafu(display("Failed to pick remote compaction node, err:{}", source))] + PickCompactionNodeFailed { source: node_picker::Error }, + + #[snafu(display("Failed to build compaction client, err:{}", source))] + BuildCompactionClientFailed { + source: crate::compaction::runner::Error, + }, + + #[snafu(display("Failed to get compaction client, err:{}", source))] + GetCompactionClientFailed { source: GenericError }, + + #[snafu(display("Failed to execute compaction task remotely, err:{}", source))] + RemoteCompactFailed { + source: crate::compaction::runner::Error, + }, } define_result!(Error); diff --git a/src/analytic_engine/src/instance/open.rs b/src/analytic_engine/src/instance/open.rs index 220fa84c3a..97717c5ab0 100644 --- a/src/analytic_engine/src/instance/open.rs +++ b/src/analytic_engine/src/instance/open.rs @@ -24,20 +24,28 @@ use std::{ use common_types::table::ShardId; use logger::{error, info}; +use meta_client::MetaClientRef; use object_store::ObjectStoreRef; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::{engine::TableDef, table::TableId}; use wal::manager::WalManagerRef; use crate::{ compaction::{ - runner::{local_runner::LocalCompactionRunner, CompactionRunnerPtr, CompactionRunnerRef}, + runner::{ + local_runner::LocalCompactionRunner, + node_picker::{ + LocalCompactionNodePickerImpl, NodePicker, RemoteCompactionNodePickerImpl, + }, + remote_runner::RemoteCompactionRunner, + CompactionRunnerPtr, CompactionRunnerRef, + }, scheduler::SchedulerImpl, }, context::OpenContext, engine, instance::{ - engine::{OpenManifest, OpenTablesOfShard, ReadMetaUpdate, Result}, + engine::{MetaClientNotExist, OpenManifest, OpenTablesOfShard, ReadMetaUpdate, Result}, flush_compaction::Flusher, mem_collector::MemUsageCollector, wal_replayer::{ReplayMode, WalReplayer}, @@ -52,7 +60,7 @@ use crate::{ }, table::data::{TableCatalogInfo, TableDataRef}, table_meta_set_impl::TableMetaSetImpl, - RecoverMode, + CompactionMode, RecoverMode, }; pub(crate) struct InstanceContext { @@ -68,14 +76,48 @@ impl InstanceContext { wal_manager: WalManagerRef, store_picker: ObjectStorePickerRef, sst_factory: SstFactoryRef, + meta_client: Option, ) -> Result { - let compaction_runner = Box::new(LocalCompactionRunner::new( + info!( + "Construct compaction runner with compaction_mode:{:?}", + ctx.config.compaction_mode + ); + + let local_compaction_runner = LocalCompactionRunner::new( ctx.runtimes.compact_runtime.clone(), &ctx.config, sst_factory.clone(), store_picker.clone(), ctx.meta_cache.clone(), - )); + ); + + let compaction_runner: CompactionRunnerPtr = match &ctx.config.compaction_mode { + CompactionMode::Offload(NodePicker::Local(endpoint)) => { + Box::new(RemoteCompactionRunner { + node_picker: Arc::new(LocalCompactionNodePickerImpl { + endpoint: endpoint.clone(), + }), + // This field is set to false here for testing. + fallback_local_when_failed: false, + local_compaction_runner: local_compaction_runner.clone(), + }) + } + CompactionMode::Offload(NodePicker::Remote) => Box::new(RemoteCompactionRunner { + node_picker: Arc::new(RemoteCompactionNodePickerImpl { + meta_client: meta_client.context(MetaClientNotExist)?, + }), + fallback_local_when_failed: true, + local_compaction_runner: local_compaction_runner.clone(), + }), + + CompactionMode::Local => Box::new(LocalCompactionRunner::new( + ctx.runtimes.compact_runtime.clone(), + &ctx.config, + sst_factory.clone(), + store_picker.clone(), + ctx.meta_cache.clone(), + )), + }; let instance = Instance::open( ctx, @@ -89,7 +131,7 @@ impl InstanceContext { Ok(Self { instance, - local_compaction_runner: None, + local_compaction_runner: Some(Arc::new(local_compaction_runner)), }) } } diff --git a/src/analytic_engine/src/lib.rs b/src/analytic_engine/src/lib.rs index 4b80741f6c..687bcf637a 100644 --- a/src/analytic_engine/src/lib.rs +++ b/src/analytic_engine/src/lib.rs @@ -19,7 +19,7 @@ #![feature(option_get_or_insert_default)] -mod compaction; +pub mod compaction; mod context; mod engine; pub mod error; @@ -40,6 +40,7 @@ pub mod table_meta_set_impl; #[cfg(any(test, feature = "test"))] pub mod tests; +use compaction::runner::node_picker::NodePicker; use error::ErrorKind; use manifest::details::Options as ManifestOptions; use object_store::config::StorageOptions; @@ -54,6 +55,20 @@ pub use crate::{ table_options::TableOptions, }; +/// The compaction mode decides compaction offload or not. +/// +/// [CompactionMode::Offload] means offload the compaction task +/// to a local or remote node. +/// +/// [CompactionMode::Local] means local compaction, no offloading. +#[derive(Clone, Default, Debug, Deserialize, Serialize)] +#[serde(tag = "compaction_mode")] +pub enum CompactionMode { + #[default] + Local, + Offload(NodePicker), +} + /// Config of analytic engine #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(default)] @@ -77,6 +92,9 @@ pub struct Config { pub compaction: SchedulerConfig, + /// Offload the compaction task or not. + pub compaction_mode: CompactionMode, + /// sst meta cache capacity pub sst_meta_cache_cap: Option, /// sst data cache capacity @@ -187,6 +205,7 @@ impl Default for Config { table_opts: TableOptions::default(), try_compat_old_layered_memtable_opts: false, compaction: SchedulerConfig::default(), + compaction_mode: CompactionMode::Local, sst_meta_cache_cap: Some(1000), sst_data_cache_cap: Some(1000), manifest: ManifestOptions::default(), diff --git a/src/analytic_engine/src/setup.rs b/src/analytic_engine/src/setup.rs index ee16772985..4075e250db 100644 --- a/src/analytic_engine/src/setup.rs +++ b/src/analytic_engine/src/setup.rs @@ -21,6 +21,7 @@ use std::{num::NonZeroUsize, path::Path, pin::Pin, sync::Arc}; use futures::Future; use macros::define_result; +use meta_client::MetaClientRef; use object_store::{ aliyun, config::{ObjectStoreOptions, StorageOptions}, @@ -96,6 +97,8 @@ pub struct EngineBuilder<'a> { pub config: &'a Config, pub engine_runtimes: Arc, pub opened_wals: OpenedWals, + // Meta client is needed when compaction offload with remote node picker. + pub meta_client: Option, } impl<'a> EngineBuilder<'a> { @@ -116,6 +119,7 @@ impl<'a> EngineBuilder<'a> { self.opened_wals.data_wal, manifest_storages, Arc::new(opened_storages), + self.meta_client, ) .await?; @@ -134,6 +138,7 @@ async fn build_instance_context( wal_manager: WalManagerRef, manifest_storages: ManifestStorages, store_picker: ObjectStorePickerRef, + meta_client: Option, ) -> Result { let meta_cache: Option = config .sst_meta_cache_cap @@ -151,6 +156,7 @@ async fn build_instance_context( wal_manager, store_picker, Arc::new(FactoryImpl), + meta_client.clone(), ) .await .context(OpenInstance)?; diff --git a/src/analytic_engine/src/sst/factory.rs b/src/analytic_engine/src/sst/factory.rs index 2ddeb24668..1f17b8df1d 100644 --- a/src/analytic_engine/src/sst/factory.rs +++ b/src/analytic_engine/src/sst/factory.rs @@ -21,10 +21,11 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use async_trait::async_trait; use common_types::projected_schema::RowProjectorBuilder; +use generic_error::{BoxError, GenericError}; use macros::define_result; use object_store::{ObjectStoreRef, Path}; use runtime::Runtime; -use snafu::{ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::predicate::PredicateRef; use trace_metric::MetricsCollector; @@ -50,6 +51,15 @@ use crate::{ pub enum Error { #[snafu(display("Failed to parse sst header, err:{}", source,))] ParseHeader { source: header::Error }, + + #[snafu(display("Empty storage format hint.\nBacktrace:\n{}", backtrace))] + EmptyStorageFormatHint { backtrace: Backtrace }, + + #[snafu(display("Failed to convert storage format hint, err:{}", source))] + ConvertStorageFormatHint { source: GenericError }, + + #[snafu(display("Failed to convert compression, err:{}", source))] + ConvertCompression { source: GenericError }, } define_result!(Error); @@ -164,6 +174,59 @@ pub struct SstWriteOptions { pub column_stats: HashMap, } +impl TryFrom for SstWriteOptions { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::SstWriteOptions) -> Result { + let storage_format_hint: StorageFormatHint = value + .storage_format_hint + .context(EmptyStorageFormatHint)? + .try_into() + .box_err() + .context(ConvertStorageFormatHint)?; + + let num_rows_per_row_group = value.num_rows_per_row_group as usize; + let compression: Compression = value + .compression + .try_into() + .box_err() + .context(ConvertCompression)?; + let max_buffer_size = value.max_buffer_size as usize; + + let column_stats: HashMap = value + .column_stats + .into_iter() + .map(|(k, v)| (k, ColumnStats { low_cardinality: v })) + .collect(); + + Ok(SstWriteOptions { + storage_format_hint, + num_rows_per_row_group, + compression, + max_buffer_size, + column_stats, + }) + } +} + +impl From for horaedbproto::compaction_service::SstWriteOptions { + fn from(value: SstWriteOptions) -> Self { + let column_stats = value + .column_stats + .into_iter() + .map(|(k, v)| (k, v.low_cardinality)) + .collect(); + + Self { + storage_format_hint: Some(value.storage_format_hint.into()), + num_rows_per_row_group: value.num_rows_per_row_group as u64, + compression: value.compression.into(), + max_buffer_size: value.max_buffer_size as u64, + column_stats, + } + } +} + impl From<&ColumnStats> for ColumnEncoding { fn from(value: &ColumnStats) -> Self { ColumnEncoding { diff --git a/src/analytic_engine/src/sst/file.rs b/src/analytic_engine/src/sst/file.rs index 39cdc7c7d1..a6cc336a31 100644 --- a/src/analytic_engine/src/sst/file.rs +++ b/src/analytic_engine/src/sst/file.rs @@ -35,12 +35,13 @@ use common_types::{ SequenceNumber, }; use future_ext::{retry_async, BackoffConfig, RetryConfig}; +use generic_error::{BoxError, GenericError}; use logger::{error, info, trace, warn}; use macros::define_result; use metric_ext::Meter; use object_store::{ObjectStoreRef, Path}; use runtime::{JoinHandle, Runtime}; -use snafu::{ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; use tokio::sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -54,6 +55,18 @@ use crate::{space::SpaceId, sst::manager::FileId, table::sst_util, table_options pub enum Error { #[snafu(display("Failed to join purger, err:{}", source))] StopPurger { source: runtime::Error }, + + #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))] + EmptyTimeRange { backtrace: Backtrace }, + + #[snafu(display("Failed to convert time range, err:{}", source))] + ConvertTimeRange { source: GenericError }, + + #[snafu(display("Failed to convert storage format, err:{}", source))] + ConvertStorageFormat { source: GenericError }, + + #[snafu(display("Converted overflow, err:{}", source))] + ConvertOverflow { source: GenericError }, } define_result!(Error); @@ -95,6 +108,15 @@ impl From for Level { } } +impl TryFrom for Level { + type Error = Error; + + fn try_from(value: u32) -> Result { + let value: u16 = value.try_into().box_err().context(ConvertOverflow)?; + Ok(value.into()) + } +} + impl fmt::Display for Level { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) @@ -197,6 +219,16 @@ impl FileHandle { } } + #[inline] + pub fn space_id(&self) -> SpaceId { + self.inner.purge_queue.space_id() + } + + #[inline] + pub fn table_id(&self) -> TableId { + self.inner.purge_queue.table_id() + } + #[inline] pub fn read_meter(&self) -> Arc { self.inner.metrics.read_meter.clone() @@ -460,6 +492,53 @@ impl FileMeta { } } +impl TryFrom for FileMeta { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::FileMeta) -> Result { + let time_range: TimeRange = value + .time_range + .context(EmptyTimeRange)? + .try_into() + .box_err() + .context(ConvertTimeRange)?; + + let storage_format: StorageFormat = value + .storage_format + .try_into() + .box_err() + .context(ConvertStorageFormat)?; + let mut associated_files: Vec = Vec::with_capacity(value.associated_files.len()); + for file in value.associated_files { + associated_files.push(file); + } + + Ok(FileMeta { + id: value.file_id, + size: value.size, + row_num: value.row_num, + time_range, + max_seq: value.max_seq, + storage_format, + associated_files, + }) + } +} + +impl From for horaedbproto::compaction_service::FileMeta { + fn from(value: FileMeta) -> Self { + Self { + file_id: value.id, + max_seq: value.max_seq, + time_range: Some(value.time_range.into()), + size: value.size, + row_num: value.row_num, + storage_format: value.storage_format.into(), + associated_files: value.associated_files, + } + } +} + // Queue to store files to be deleted for a table. #[derive(Clone)] pub struct FilePurgeQueue { @@ -508,6 +587,23 @@ impl FilePurgeQueue { ); } } + + #[inline] + pub fn space_id(&self) -> SpaceId { + self.inner.space_id + } + + #[inline] + pub fn table_id(&self) -> TableId { + self.inner.table_id + } +} + +impl From for FilePurgeQueue { + fn from(value: horaedbproto::compaction_service::FilePurgeQueue) -> Self { + let (tx, _rx) = mpsc::unbounded_channel(); + FilePurgeQueue::new(value.space_id, value.table_id.into(), tx) + } } struct FilePurgeQueueInner { diff --git a/src/analytic_engine/src/sst/writer.rs b/src/analytic_engine/src/sst/writer.rs index e424e8af48..577f499332 100644 --- a/src/analytic_engine/src/sst/writer.rs +++ b/src/analytic_engine/src/sst/writer.rs @@ -26,7 +26,8 @@ use common_types::{ SequenceNumber, }; use futures::Stream; -use generic_error::GenericError; +use generic_error::{BoxError, GenericError}; +use snafu::{OptionExt, ResultExt}; use crate::table_options::StorageFormat; @@ -96,6 +97,21 @@ pub mod error { #[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, backtrace))] OtherNoCause { msg: String, backtrace: Backtrace }, + + #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))] + EmptyTimeRange { backtrace: Backtrace }, + + #[snafu(display("Empty schema.\nBacktrace:\n{}", backtrace))] + EmptySchema { backtrace: Backtrace }, + + #[snafu(display("Failed to convert time range, err:{}", source))] + ConvertTimeRange { source: GenericError }, + + #[snafu(display("Failed to convert sst info, err:{}", source))] + ConvertSstInfo { source: GenericError }, + + #[snafu(display("Failed to convert schema, err:{}", source))] + ConvertSchema { source: GenericError }, } define_result!(Error); @@ -117,6 +133,44 @@ pub struct SstInfo { pub time_range: TimeRange, } +impl TryFrom for SstInfo { + type Error = Error; + + fn try_from(value: horaedbproto::compaction_service::SstInfo) -> Result { + let storage_format = value + .storage_format + .try_into() + .box_err() + .context(ConvertSstInfo)?; + let time_range = value + .time_range + .context(EmptyTimeRange)? + .try_into() + .box_err() + .context(ConvertTimeRange)?; + + Ok(Self { + file_size: value.file_size as usize, + row_num: value.row_num as usize, + storage_format, + meta_path: value.meta_path, + time_range, + }) + } +} + +impl From for horaedbproto::compaction_service::SstInfo { + fn from(value: SstInfo) -> Self { + Self { + file_size: value.file_size as u64, + row_num: value.row_num as u64, + storage_format: value.storage_format.into(), + meta_path: value.meta_path, + time_range: Some(value.time_range.into()), + } + } +} + #[derive(Debug, Clone)] pub struct MetaData { /// Min key of the sst. @@ -131,6 +185,45 @@ pub struct MetaData { pub schema: Schema, } +impl TryFrom for MetaData { + type Error = Error; + + fn try_from(meta: horaedbproto::compaction_service::MetaData) -> Result { + let time_range = meta + .time_range + .context(EmptyTimeRange)? + .try_into() + .box_err() + .context(ConvertTimeRange)?; + let schema = meta + .schema + .context(EmptySchema)? + .try_into() + .box_err() + .context(ConvertSchema)?; + + Ok(Self { + min_key: Bytes::from(meta.min_key), + max_key: Bytes::from(meta.max_key), + time_range, + max_sequence: meta.max_sequence, + schema, + }) + } +} + +impl From for horaedbproto::compaction_service::MetaData { + fn from(meta: MetaData) -> Self { + Self { + min_key: meta.min_key.to_vec(), + max_key: meta.max_key.to_vec(), + max_sequence: meta.max_sequence, + time_range: Some(meta.time_range.into()), + schema: Some((&meta.schema).into()), + } + } +} + /// The writer for sst. /// /// The caller provides a stream of [RecordBatch] and the writer takes diff --git a/src/analytic_engine/src/table_options.rs b/src/analytic_engine/src/table_options.rs index 4c1823eed2..0ecabb9512 100644 --- a/src/analytic_engine/src/table_options.rs +++ b/src/analytic_engine/src/table_options.rs @@ -130,6 +130,13 @@ pub enum Error { ))] UnknownStorageFormatHint { value: String, backtrace: Backtrace }, + #[snafu(display( + "Unknown compression type. value:{:?}.\nBacktrace:\n{}", + value, + backtrace + ))] + UnknownCompressionType { value: i32, backtrace: Backtrace }, + #[snafu(display("Storage format hint is missing.\nBacktrace:\n{}", backtrace))] MissingStorageFormatHint { backtrace: Backtrace }, @@ -234,6 +241,33 @@ impl From for Compression { } } +impl TryFrom for Compression { + type Error = Error; + + fn try_from(compression: i32) -> Result { + let compression = match compression { + 0 => Compression::Uncompressed, + 1 => Compression::Lz4, + 2 => Compression::Snappy, + 3 => Compression::Zstd, + _ => return UnknownCompressionType { value: compression }.fail(), + }; + + Ok(compression) + } +} + +impl From for i32 { + fn from(value: Compression) -> Self { + match value { + Compression::Uncompressed => 0, + Compression::Lz4 => 1, + Compression::Snappy => 2, + Compression::Zstd => 3, + } + } +} + impl From for ParquetCompression { fn from(compression: Compression) -> Self { match compression { @@ -340,6 +374,14 @@ impl From for manifest_pb::StorageFormat { } } +impl From for i32 { + fn from(value: StorageFormat) -> Self { + match value { + StorageFormat::Columnar => 0, + } + } +} + impl TryFrom for StorageFormat { type Error = Error; @@ -363,6 +405,18 @@ impl TryFrom<&str> for StorageFormat { } } +impl TryFrom for StorageFormat { + type Error = Error; + + fn try_from(value: i32) -> Result { + let format = match value { + 0 => Self::Columnar, + _ => return UnknownStorageFormatType { value }.fail(), + }; + Ok(format) + } +} + impl ToString for StorageFormat { fn to_string(&self) -> String { match self { diff --git a/src/analytic_engine/src/tests/util.rs b/src/analytic_engine/src/tests/util.rs index 8fe0710624..04bc09f75f 100644 --- a/src/analytic_engine/src/tests/util.rs +++ b/src/analytic_engine/src/tests/util.rs @@ -141,6 +141,7 @@ impl TestContext { config: &self.config, engine_runtimes: self.runtimes.clone(), opened_wals: opened_wals.clone(), + meta_client: None, }; self.opened_wals = Some(opened_wals); diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs index a7f86f0866..97c8457be8 100644 --- a/src/benchmarks/src/util.rs +++ b/src/benchmarks/src/util.rs @@ -522,6 +522,7 @@ impl TestContext { config: &self.config, engine_runtimes: self.runtimes.clone(), opened_wals: opened_wals.clone(), + meta_client: None, }; self.opened_wals = Some(opened_wals); diff --git a/src/cluster/src/cluster_impl.rs b/src/cluster/src/cluster_impl.rs index aee54e42b3..d79eda0485 100644 --- a/src/cluster/src/cluster_impl.rs +++ b/src/cluster/src/cluster_impl.rs @@ -46,8 +46,8 @@ use crate::{ shard_set::{Shard, ShardRef, ShardSet}, topology::ClusterTopology, Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause, - InitEtcdClientConfig, InvalidArguments, MetaClientFailure, OpenShard, OpenShardWithCause, - Result, ShardNotFound, TableStatus, + InitEtcdClientConfig, InvalidArguments, MetaClientFailure, NodeType, OpenShard, + OpenShardWithCause, Result, ShardNotFound, TableStatus, }; /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`]. @@ -376,6 +376,10 @@ impl Cluster for ClusterImpl { Ok(()) } + fn node_type(&self) -> NodeType { + self.config.node_type.clone() + } + async fn open_shard(&self, shard_info: &ShardInfo) -> Result { self.inner.open_shard(shard_info).await } diff --git a/src/cluster/src/config.rs b/src/cluster/src/config.rs index 29e0da9719..d0b1c694b9 100644 --- a/src/cluster/src/config.rs +++ b/src/cluster/src/config.rs @@ -23,6 +23,8 @@ use serde::{Deserialize, Serialize}; use table_engine::ANALYTIC_ENGINE_TYPE; use time_ext::ReadableDuration; +use crate::NodeType; + #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(default)] // TODO: move this to table_engine crates @@ -133,6 +135,7 @@ impl Default for TlsConfig { #[serde(default)] pub struct ClusterConfig { pub cmd_channel_buffer_size: usize, + pub node_type: NodeType, pub meta_client: MetaClientConfig, pub etcd_client: EtcdClientConfig, } diff --git a/src/cluster/src/lib.rs b/src/cluster/src/lib.rs index a97c945a0b..ddda6c4689 100644 --- a/src/cluster/src/lib.rs +++ b/src/cluster/src/lib.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use async_trait::async_trait; -use common_types::schema::SchemaName; +use common_types::{cluster::NodeType, schema::SchemaName}; use generic_error::GenericError; use macros::define_result; use meta_client::types::{ @@ -190,12 +190,14 @@ pub struct ClusterNodesResp { pub cluster_nodes: ClusterNodesRef, } -/// Cluster manages tables and shard infos in cluster mode. #[async_trait] pub trait Cluster { async fn start(&self) -> Result<()>; async fn stop(&self) -> Result<()>; + /// Get cluster type. + fn node_type(&self) -> NodeType; + /// Fetch related information and open shard. async fn open_shard(&self, shard_info: &ShardInfo) -> Result; diff --git a/src/common_types/src/cluster.rs b/src/common_types/src/cluster.rs new file mode 100644 index 0000000000..ad302023e9 --- /dev/null +++ b/src/common_types/src/cluster.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Deserialize, Serialize}; + +/// Type to distinguish different node type in cluster mode. +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum NodeType { + #[default] + HoraeDB, + CompactionServer, +} diff --git a/src/common_types/src/lib.rs b/src/common_types/src/lib.rs index 0b6cda17c8..334bd42f91 100644 --- a/src/common_types/src/lib.rs +++ b/src/common_types/src/lib.rs @@ -18,6 +18,7 @@ //! Contains common types pub mod bitset; +pub mod cluster; pub mod column; pub mod column_block; pub mod column_schema; diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml index ce505105f2..5a6144d3cc 100644 --- a/src/horaedb/Cargo.toml +++ b/src/horaedb/Cargo.toml @@ -38,32 +38,33 @@ wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"] wal-local-storage = ["wal/wal-local-storage", "analytic_engine/wal-local-storage"] [dependencies] -analytic_engine = { workspace = true } -catalog = { workspace = true } -catalog_impls = { workspace = true } -clap = { workspace = true } -cluster = { workspace = true } -datafusion = { workspace = true } -df_operator = { workspace = true } -etcd-client = { workspace = true } -interpreters = { workspace = true } -logger = { workspace = true } -meta_client = { workspace = true } -moka = { version = "0.10", features = ["future"] } -panic_ext = { workspace = true } -proxy = { workspace = true } -query_engine = { workspace = true } -router = { workspace = true } -runtime = { workspace = true } -serde = { workspace = true } -server = { workspace = true } -signal-hook = "0.3" -size_ext = { workspace = true } -table_engine = { workspace = true } -toml = { workspace = true } -toml_ext = { workspace = true } -tracing_util = { workspace = true } -wal = { workspace = true } +analytic_engine = { workspace = true } +catalog = { workspace = true } +catalog_impls = { workspace = true } +clap = { workspace = true } +cluster = { workspace = true } +common_types = { workspace = true } +datafusion = { workspace = true } +df_operator = { workspace = true } +etcd-client = { workspace = true } +interpreters = { workspace = true } +logger = { workspace = true } +meta_client = { workspace = true } +moka = { version = "0.10", features = ["future"] } +panic_ext = { workspace = true } +proxy = { workspace = true } +query_engine = { workspace = true } +router = { workspace = true } +runtime = { workspace = true } +serde = { workspace = true } +server = { workspace = true } +signal-hook = "0.3" +size_ext = { workspace = true } +table_engine = { workspace = true } +toml = { workspace = true } +toml_ext = { workspace = true } +tracing_util = { workspace = true } +wal = { workspace = true } [build-dependencies] vergen = { version = "8", default-features = false, features = [ diff --git a/src/horaedb/src/config.rs b/src/horaedb/src/config.rs index b9f8932f19..e7f19233f0 100644 --- a/src/horaedb/src/config.rs +++ b/src/horaedb/src/config.rs @@ -26,8 +26,8 @@ use size_ext::ReadableSize; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(default)] pub struct NodeInfo { - /// The address of the horaedb node. It can be a domain name or an IP - /// address without port followed. + /// The address of the horaedb (or compaction server) node. It can be a + /// domain name or an IP address without port followed. pub addr: String, pub zone: String, pub idc: String, diff --git a/src/horaedb/src/setup.rs b/src/horaedb/src/setup.rs index 9bdb46daf9..33632b5524 100644 --- a/src/horaedb/src/setup.rs +++ b/src/horaedb/src/setup.rs @@ -313,6 +313,7 @@ async fn build_with_meta( zone: config.node.zone.clone(), idc: config.node.idc.clone(), binary_version: config.node.binary_version.clone(), + node_type: cluster_config.node_type.clone(), }; info!("Build horaedb with node meta info:{node_meta_info:?}"); @@ -349,8 +350,12 @@ async fn build_with_meta( config: &config.analytic, engine_runtimes: runtimes.clone(), opened_wals: opened_wals.clone(), + meta_client: Some(meta_client.clone()), }; - let TableEngineContext { table_engine, .. } = engine_builder + let TableEngineContext { + table_engine, + local_compaction_runner, + } = engine_builder .build() .await .expect("Failed to setup analytic engine"); @@ -368,14 +373,18 @@ async fn build_with_meta( let table_manipulator = Arc::new(meta_based::TableManipulatorImpl::new(meta_client)); let schema_config_provider = Arc::new(ClusterBasedProvider::new(cluster.clone())); - builder + + let mut builder = builder .table_engine(engine_proxy) .catalog_manager(catalog_manager) .table_manipulator(table_manipulator) .cluster(cluster) .opened_wals(opened_wals) .router(router) - .schema_config_provider(schema_config_provider) + .schema_config_provider(schema_config_provider); + builder = builder.compaction_runner(local_compaction_runner.expect("Empty compaction runner.")); + + builder } async fn build_without_meta( @@ -394,6 +403,7 @@ async fn build_without_meta( config: &config.analytic, engine_runtimes: runtimes.clone(), opened_wals: opened_wals.clone(), + meta_client: None, }; let TableEngineContext { table_engine, .. } = engine_builder .build() diff --git a/src/meta_client/src/lib.rs b/src/meta_client/src/lib.rs index a6cb8df6b9..ba93313537 100644 --- a/src/meta_client/src/lib.rs +++ b/src/meta_client/src/lib.rs @@ -23,9 +23,9 @@ use macros::define_result; use snafu::{Backtrace, Snafu}; use types::{ AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, CreateTableResponse, - DropTableRequest, DropTableResponse, GetNodesRequest, GetNodesResponse, - GetTablesOfShardsRequest, GetTablesOfShardsResponse, RouteTablesRequest, RouteTablesResponse, - ShardInfo, + DropTableRequest, DropTableResponse, FetchCompactionNodeRequest, FetchCompactionNodeResponse, + GetNodesRequest, GetNodesResponse, GetTablesOfShardsRequest, GetTablesOfShardsResponse, + RouteTablesRequest, RouteTablesResponse, ShardInfo, }; pub mod meta_impl; @@ -76,6 +76,9 @@ pub enum Error { #[snafu(display("Failed to get tables, err:{}", source))] FailGetTables { source: GenericError }, + #[snafu(display("Failed to fetch compaction node, err:{}", source))] + FailFetchCompactionNode { source: GenericError }, + #[snafu(display("Failed to route tables, err:{}", source))] FailRouteTables { source: GenericError }, @@ -113,6 +116,11 @@ pub trait MetaClient: Send + Sync { async fn get_nodes(&self, req: GetNodesRequest) -> Result; + async fn fetch_compaction_node( + &self, + req: FetchCompactionNodeRequest, + ) -> Result; + async fn send_heartbeat(&self, req: Vec) -> Result<()>; } diff --git a/src/meta_client/src/meta_impl.rs b/src/meta_client/src/meta_impl.rs index 5ba98de5fc..ffe32faeb8 100644 --- a/src/meta_client/src/meta_impl.rs +++ b/src/meta_client/src/meta_impl.rs @@ -31,9 +31,10 @@ use time_ext::ReadableDuration; use crate::{ types::{ AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, CreateTableResponse, - DropTableRequest, DropTableResponse, GetNodesRequest, GetNodesResponse, - GetTablesOfShardsRequest, GetTablesOfShardsResponse, NodeInfo, NodeMetaInfo, RequestHeader, - RouteTablesRequest, RouteTablesResponse, ShardInfo, + DropTableRequest, DropTableResponse, FetchCompactionNodeRequest, + FetchCompactionNodeResponse, GetNodesRequest, GetNodesResponse, GetTablesOfShardsRequest, + GetTablesOfShardsResponse, NodeInfo, NodeMetaInfo, RequestHeader, RouteTablesRequest, + RouteTablesResponse, ShardInfo, }, BadResponse, FailAllocSchemaId, FailConnect, FailCreateTable, FailDropTable, FailGetTables, FailRouteTables, FailSendHeartbeat, MetaClient, MetaClientRef, MissingHeader, Result, @@ -236,6 +237,13 @@ impl MetaClient for MetaClientImpl { GetNodesResponse::try_from(pb_resp) } + async fn fetch_compaction_node( + &self, + _req: FetchCompactionNodeRequest, + ) -> Result { + todo!() + } + async fn send_heartbeat(&self, shard_infos: Vec) -> Result<()> { let node_info = NodeInfo { node_meta_info: self.node_meta_info.clone(), diff --git a/src/meta_client/src/types.rs b/src/meta_client/src/types.rs index 6a6aba6918..524843620b 100644 --- a/src/meta_client/src/types.rs +++ b/src/meta_client/src/types.rs @@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, sync::Arc}; pub use common_types::table::{ShardId, ShardVersion}; use common_types::{ + cluster::NodeType, schema::{SchemaId, SchemaName}, table::{TableId, TableName}, }; @@ -163,6 +164,7 @@ pub struct NodeMetaInfo { pub zone: String, pub idc: String, pub binary_version: String, + pub node_type: NodeType, } impl NodeMetaInfo { @@ -589,3 +591,10 @@ impl TryFrom for GetNodesResponse { }) } } + +#[derive(Debug, Clone, Default)] +pub struct FetchCompactionNodeRequest {} + +pub struct FetchCompactionNodeResponse { + pub endpoint: String, +} diff --git a/src/router/src/cluster_based.rs b/src/router/src/cluster_based.rs index d929104407..bd5efd8b5b 100644 --- a/src/router/src/cluster_based.rs +++ b/src/router/src/cluster_based.rs @@ -205,7 +205,7 @@ mod tests { shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, ClusterNodesResp, TableStatus, }; - use common_types::table::ShardId; + use common_types::{cluster::NodeType, table::ShardId}; use horaedbproto::storage::{RequestContext, RouteRequest as RouteRequestPb}; use meta_client::types::{ NodeShard, RouteEntry, RouteTablesResponse, ShardInfo, ShardRole::Leader, TableInfo, @@ -226,6 +226,10 @@ mod tests { unimplemented!(); } + fn node_type(&self) -> NodeType { + unimplemented!() + } + async fn open_shard(&self, _: &ShardInfo) -> cluster::Result { unimplemented!(); } diff --git a/src/server/src/grpc/compaction_service/error.rs b/src/server/src/grpc/compaction_service/error.rs new file mode 100644 index 0000000000..eadb3f2418 --- /dev/null +++ b/src/server/src/grpc/compaction_service/error.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Error definitions for compaction service. + +use generic_error::GenericError; +use horaedbproto::common::ResponseHeader; +use macros::define_result; +use snafu::Snafu; + +use crate::error_util; + +define_result!(Error); + +#[derive(Snafu, Debug)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Server error, code:{:?}, message:{}", code, msg))] + ErrNoCause { code: StatusCode, msg: String }, + + #[snafu(display("Server error, code:{:?}, message:{}, cause:{}", code, msg, source))] + ErrWithCause { + code: StatusCode, + msg: String, + source: GenericError, + }, +} + +impl Error { + pub fn code(&self) -> StatusCode { + match *self { + Error::ErrNoCause { code, .. } => code, + Error::ErrWithCause { code, .. } => code, + } + } + + /// Get the error message returned to the user. + pub fn error_message(&self) -> String { + match self { + Error::ErrNoCause { msg, .. } => msg.clone(), + + Error::ErrWithCause { msg, source, .. } => { + let err_string = source.to_string(); + let first_line = error_util::remove_backtrace_from_err(&err_string); + format!("{msg}. Caused by: {first_line}") + } + } + } +} + +/// A set of codes for compaction service. +/// +/// Note that such a set of codes is different with the codes (alias to http +/// status code) used by storage service. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum StatusCode { + #[default] + Ok = 0, + BadRequest = 401, + Internal = 500, +} + +impl StatusCode { + #[inline] + pub fn as_u32(self) -> u32 { + self as u32 + } +} + +pub fn build_err_header(err: Error) -> ResponseHeader { + ResponseHeader { + code: err.code().as_u32(), + error: err.error_message(), + } +} + +pub fn build_ok_header() -> ResponseHeader { + ResponseHeader { + code: StatusCode::Ok.as_u32(), + ..Default::default() + } +} diff --git a/src/server/src/grpc/compaction_service/mod.rs b/src/server/src/grpc/compaction_service/mod.rs new file mode 100644 index 0000000000..3954b78a44 --- /dev/null +++ b/src/server/src/grpc/compaction_service/mod.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Compaction rpc service implementation. + +use std::sync::Arc; + +use analytic_engine::compaction::runner::{CompactionRunnerRef, CompactionRunnerTask}; +use async_trait::async_trait; +use error::{build_err_header, build_ok_header, ErrWithCause, StatusCode}; +use generic_error::BoxError; +use horaedbproto::compaction_service::{ + compaction_service_server::CompactionService, ExecResult, ExecuteCompactionTaskRequest, + ExecuteCompactionTaskResponse, +}; +use runtime::Runtime; +use snafu::ResultExt; +use tonic::{Request, Response, Status}; + +mod error; + +/// Builder for [CompactionServiceImpl] +pub struct Builder { + pub runtime: Arc, + pub compaction_runner: CompactionRunnerRef, +} + +impl Builder { + pub fn build(self) -> CompactionServiceImpl { + let Self { + runtime, + compaction_runner, + } = self; + + CompactionServiceImpl { + runtime, + compaction_runner, + } + } +} + +#[derive(Clone)] +pub struct CompactionServiceImpl { + pub runtime: Arc, + pub compaction_runner: CompactionRunnerRef, +} + +#[async_trait] +impl CompactionService for CompactionServiceImpl { + async fn execute_compaction_task( + &self, + request: Request, + ) -> Result, Status> { + let request: Result = request + .into_inner() + .try_into() + .box_err() + .context(ErrWithCause { + code: StatusCode::BadRequest, + msg: "fail to convert the execute compaction task request", + }); + + let mut resp: ExecuteCompactionTaskResponse = ExecuteCompactionTaskResponse::default(); + match request { + Ok(task) => { + let request_id = task.request_id.clone(); + let res = self + .compaction_runner + .run(task) + .await + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to compact task, request:{request_id}"), + }); + + match res { + Ok(res) => { + resp.header = Some(build_ok_header()); + resp.result = Some(ExecResult { + output_file_path: res.output_file_path.into(), + sst_info: Some(res.sst_info.into()), + sst_meta: Some(res.sst_meta.into()), + }); + // TODO(leslie): Add status. + } + Err(e) => { + resp.header = Some(build_err_header(e)); + } + } + } + Err(e) => { + resp.header = Some(build_err_header(e)); + } + } + + Ok(Response::new(resp)) + } +} diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs index 7b02a3a2a2..24a181682b 100644 --- a/src/server/src/grpc/mod.rs +++ b/src/server/src/grpc/mod.rs @@ -24,11 +24,14 @@ use std::{ time::Duration, }; +use analytic_engine::compaction::runner::CompactionRunnerRef; use cluster::ClusterRef; use common_types::column_schema; +use compaction_service::CompactionServiceImpl; use futures::FutureExt; use generic_error::GenericError; use horaedbproto::{ + compaction_service::compaction_service_server::CompactionServiceServer, meta_event::meta_event_service_server::MetaEventServiceServer, remote_engine::remote_engine_service_server::RemoteEngineServiceServer, storage::storage_service_server::StorageServiceServer, @@ -60,6 +63,7 @@ use crate::{ }, }; +mod compaction_service; mod meta_event_service; mod metrics; mod remote_engine_service; @@ -105,6 +109,9 @@ pub enum Error { #[snafu(display("Missing wals.\nBacktrace:\n{}", backtrace))] MissingWals { backtrace: Backtrace }, + #[snafu(display("Missing compaction runner.\nBacktrace:\n{}", backtrace))] + MissingCompactionRunner { backtrace: Backtrace }, + #[snafu(display("Missing timeout.\nBacktrace:\n{}", backtrace))] MissingTimeout { backtrace: Backtrace }, @@ -163,6 +170,7 @@ define_result!(Error); pub struct RpcServices { serve_addr: SocketAddr, rpc_server: InterceptedService, AuthWithFile>, + compaction_rpc_server: Option>, meta_rpc_server: Option>, remote_engine_server: RemoteEngineServiceServer, runtime: Arc, @@ -173,6 +181,7 @@ pub struct RpcServices { impl RpcServices { pub async fn start(&mut self) -> Result<()> { let rpc_server = self.rpc_server.clone(); + let compaction_rpc_server = self.compaction_rpc_server.clone(); let meta_rpc_server = self.meta_rpc_server.clone(); let remote_engine_server = self.remote_engine_server.clone(); let serve_addr = self.serve_addr; @@ -182,6 +191,11 @@ impl RpcServices { let mut router = Server::builder().add_service(rpc_server); + if let Some(s) = compaction_rpc_server { + info!("Grpc server serves compaction service"); + router = router.add_service(s); + }; + if let Some(s) = meta_rpc_server { info!("Grpc server serves meta rpc service"); router = router.add_service(s); @@ -226,6 +240,7 @@ pub struct Builder { proxy: Option>, query_dedup_config: Option, hotspot_recorder: Option>, + compaction_runner: Option, } impl Builder { @@ -241,6 +256,7 @@ impl Builder { proxy: None, query_dedup_config: None, hotspot_recorder: None, + compaction_runner: None, } } @@ -294,6 +310,12 @@ impl Builder { self.query_dedup_config = Some(config); self } + + // Compaction runner is an optional field for building [RpcServices]. + pub fn compaction_runner(mut self, runner: Option) -> Self { + self.compaction_runner = runner; + self + } } impl Builder { @@ -301,19 +323,39 @@ impl Builder { let auth = self.auth.context(MissingAuth)?; let runtimes = self.runtimes.context(MissingRuntimes)?; let instance = self.instance.context(MissingInstance)?; - let opened_wals = self.opened_wals.context(MissingWals)?; let proxy = self.proxy.context(MissingProxy)?; let hotspot_recorder = self.hotspot_recorder.context(MissingHotspotRecorder)?; - - let meta_rpc_server = self.cluster.map(|v| { - let builder = meta_event_service::Builder { - cluster: v, - instance: instance.clone(), - runtime: runtimes.meta_runtime.clone(), - opened_wals, - }; - MetaEventServiceServer::new(builder.build()) - }); + let mut meta_rpc_server: Option> = None; + let mut compaction_rpc_server: Option> = + None; + + self.cluster + .map(|v| { + let result: Result<()> = (|| { + // Support meta rpc service. + let opened_wals = self.opened_wals.context(MissingWals)?; + let builder = meta_event_service::Builder { + cluster: v.clone(), + instance: instance.clone(), + runtime: runtimes.meta_runtime.clone(), + opened_wals, + }; + meta_rpc_server = Some(MetaEventServiceServer::new(builder.build())); + + // Support remote compaction rpc service. + let compaction_runner = + self.compaction_runner.context(MissingCompactionRunner)?; + let builder = compaction_service::Builder { + runtime: runtimes.compact_runtime.clone(), + compaction_runner, + }; + compaction_rpc_server = Some(CompactionServiceServer::new(builder.build())); + + Ok(()) + })(); + result + }) + .transpose()?; let remote_engine_server = { let query_dedup = self @@ -349,6 +391,7 @@ impl Builder { Ok(RpcServices { serve_addr, rpc_server, + compaction_rpc_server, meta_rpc_server, remote_engine_server, runtime, diff --git a/src/server/src/server.rs b/src/server/src/server.rs index f7cd72ec7b..bca6c8d151 100644 --- a/src/server/src/server.rs +++ b/src/server/src/server.rs @@ -19,6 +19,7 @@ use std::sync::Arc; +use analytic_engine::compaction::runner::CompactionRunnerRef; use catalog::manager::ManagerRef; use cluster::ClusterRef; use datafusion::execution::{runtime_env::RuntimeConfig, FunctionRegistry}; @@ -251,6 +252,7 @@ pub struct Builder { opened_wals: Option, remote_engine: Option, datatfusion_context: Option, + compaction_runner: Option, } impl Builder { @@ -274,6 +276,7 @@ impl Builder { opened_wals: None, remote_engine: None, datatfusion_context: None, + compaction_runner: None, } } @@ -368,6 +371,11 @@ impl Builder { self } + pub fn compaction_runner(mut self, runner: CompactionRunnerRef) -> Self { + self.compaction_runner = Some(runner); + self + } + /// Build and run the server pub fn build(self) -> Result { // Build instance @@ -527,6 +535,7 @@ impl Builder { .proxy(proxy) .hotspot_recorder(hotspot_recorder) .query_dedup(self.server_config.query_dedup) + .compaction_runner(self.compaction_runner.clone()) .build() .context(BuildGrpcService)?; diff --git a/src/table_engine/src/predicate.rs b/src/table_engine/src/predicate.rs index b316b99e24..3a3294fcd9 100644 --- a/src/table_engine/src/predicate.rs +++ b/src/table_engine/src/predicate.rs @@ -112,7 +112,7 @@ impl Predicate { impl TryFrom<&Predicate> for horaedbproto::remote_engine::Predicate { type Error = Error; - fn try_from(predicate: &Predicate) -> std::result::Result { + fn try_from(predicate: &Predicate) -> Result { let time_range = predicate.time_range; let mut exprs = Vec::with_capacity(predicate.exprs.len()); for expr in &predicate.exprs { @@ -135,9 +135,7 @@ impl TryFrom<&Predicate> for horaedbproto::remote_engine::Predicate { impl TryFrom for Predicate { type Error = Error; - fn try_from( - pb: horaedbproto::remote_engine::Predicate, - ) -> std::result::Result { + fn try_from(pb: horaedbproto::remote_engine::Predicate) -> Result { let time_range = pb.time_range.context(EmptyTimeRange)?; let mut exprs = Vec::with_capacity(pb.exprs.len()); for pb_expr in pb.exprs { diff --git a/src/table_engine/src/table.rs b/src/table_engine/src/table.rs index 3c611b4395..6526579ab5 100644 --- a/src/table_engine/src/table.rs +++ b/src/table_engine/src/table.rs @@ -307,6 +307,12 @@ impl From for TableId { } } +impl From for u64 { + fn from(id: TableId) -> Self { + id.0 + } +} + impl fmt::Display for TableId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0)