From 709b296c6e74c07341de57ed4594e1debe0a5e21 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 18 Dec 2024 18:05:16 +0800 Subject: [PATCH 1/2] feat: add demo server for benchmark --- Cargo.lock | 427 +++++++++++++++++- src/metric_engine/src/compaction/scheduler.rs | 49 +- src/metric_engine/src/storage.rs | 11 +- src/server/Cargo.toml | 5 +- src/server/src/main.rs | 58 ++- 5 files changed, 525 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index add6e1533a..ac89a9fb72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,189 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "actix-codec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" +dependencies = [ + "bitflags 2.6.0", + "bytes", + "futures-core", + "futures-sink", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "actix-http" +version = "3.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48f96fc3003717aeb9856ca3d02a8c7de502667ad76eeacd830b48d2e91fac4" +dependencies = [ + "actix-codec", + "actix-rt", + "actix-service", + "actix-utils", + "ahash", + "base64", + "bitflags 2.6.0", + "brotli 6.0.0", + "bytes", + "bytestring", + "derive_more", + "encoding_rs", + "flate2", + "futures-core", + "h2", + "http", + "httparse", + "httpdate", + "itoa", + "language-tags", + "local-channel", + "mime", + "percent-encoding", + "pin-project-lite", + "rand", + "sha1", + "smallvec", + "tokio", + "tokio-util", + "tracing", + "zstd", +] + +[[package]] +name = "actix-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" +dependencies = [ + "quote", + "syn", +] + +[[package]] +name = "actix-router" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8" +dependencies = [ + "bytestring", + "cfg-if", + "http", + "regex", + "regex-lite", + "serde", + "tracing", +] + +[[package]] +name = "actix-rt" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208" +dependencies = [ + "futures-core", + "tokio", +] + +[[package]] +name = "actix-server" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ca2549781d8dd6d75c40cf6b6051260a2cc2f3c62343d761a969a0640646894" +dependencies = [ + "actix-rt", + "actix-service", + "actix-utils", + "futures-core", + "futures-util", + "mio", + "socket2", + "tokio", + "tracing", +] + +[[package]] +name = "actix-service" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a" +dependencies = [ + "futures-core", + "paste", + "pin-project-lite", +] + +[[package]] +name = "actix-utils" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8" +dependencies = [ + "local-waker", + "pin-project-lite", +] + +[[package]] +name = "actix-web" +version = "4.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9180d76e5cc7ccbc4d60a506f2c727730b154010262df5b910eb17dbe4b8cb38" +dependencies = [ + "actix-codec", + "actix-http", + "actix-macros", + "actix-router", + "actix-rt", + "actix-server", + "actix-service", + "actix-utils", + "actix-web-codegen", + "ahash", + "bytes", + "bytestring", + "cfg-if", + "cookie", + "derive_more", + "encoding_rs", + "futures-core", + "futures-util", + "impl-more", + "itoa", + "language-tags", + "log", + "mime", + "once_cell", + "pin-project-lite", + "regex", + "regex-lite", + "serde", + "serde_json", + "serde_urlencoded", + "smallvec", + "socket2", + "time", + "url", +] + +[[package]] +name = "actix-web-codegen" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591380e2e68490b5dfaf1dd1aa0ebe78d84ba7067078512b4ea6e4492d622b8" +dependencies = [ + "actix-router", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "addr2line" version = "0.24.1" @@ -500,6 +683,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + [[package]] name = "brotli" version = "7.0.0" @@ -545,6 +739,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" +[[package]] +name = "bytestring" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e465647ae23b2823b0753f50decb2d5a86d2bb2cac04788fafd1f80e45378e5f" +dependencies = [ + "bytes", +] + [[package]] name = "bzip2" version = "0.4.4" @@ -717,6 +920,23 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + +[[package]] +name = "cookie" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" +dependencies = [ + "percent-encoding", + "time", + "version_check", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1258,6 +1478,28 @@ dependencies = [ "strum", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + +[[package]] +name = "derive_more" +version = "0.99.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + [[package]] name = "digest" version = "0.10.7" @@ -1275,6 +1517,15 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_filter" version = "0.1.2" @@ -1344,6 +1595,12 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1475,6 +1732,25 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.1" @@ -1521,16 +1797,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] -name = "horaedb-server" -version = "2.2.0-alpha" +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" dependencies = [ - "futures", - "metric_engine", - "tokio", - "tracing", - "tracing-subscriber", + "bytes", + "fnv", + "itoa", ] +[[package]] +name = "httparse" +version = "1.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "humantime" version = "2.1.0" @@ -1570,6 +1858,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "impl-more" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae21c3177a27788957044151cc2800043d127acaa460a47ebb9b84dfa2c6aa0" + [[package]] name = "indexmap" version = "2.5.0" @@ -1663,6 +1957,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "language-tags" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" + [[package]] name = "lazy_static" version = "1.5.0" @@ -1751,6 +2051,23 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "local-channel" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6cbc85e69b8df4b8bb8b89ec634e7189099cea8927a276b7384ce5488e53ec8" +dependencies = [ + "futures-core", + "futures-sink", + "local-waker", +] + +[[package]] +name = "local-waker" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487" + [[package]] name = "lock_api" version = "0.4.12" @@ -1840,6 +2157,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -1857,6 +2180,7 @@ checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ "hermit-abi 0.3.9", "libc", + "log", "wasi", "windows-sys 0.52.0", ] @@ -1910,6 +2234,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -2056,7 +2386,7 @@ dependencies = [ "arrow-schema", "arrow-select", "base64", - "brotli", + "brotli 7.0.0", "bytes", "chrono", "flate2", @@ -2220,6 +2550,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -2401,6 +2737,12 @@ dependencies = [ "regex-syntax 0.8.4", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -2521,6 +2863,43 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "server" +version = "2.2.0-alpha" +dependencies = [ + "actix-web", + "arrow", + "futures", + "metric_engine", + "object_store", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -2762,6 +3141,37 @@ dependencies = [ "ordered-float", ] +[[package]] +name = "time" +version = "0.3.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -2878,6 +3288,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/src/metric_engine/src/compaction/scheduler.rs b/src/metric_engine/src/compaction/scheduler.rs index 3aff52c972..a3d286a2e9 100644 --- a/src/metric_engine/src/compaction/scheduler.rs +++ b/src/metric_engine/src/compaction/scheduler.rs @@ -17,13 +17,14 @@ use std::{sync::Arc, time::Duration}; +use anyhow::Context; use parquet::file::properties::WriterProperties; use tokio::{ sync::mpsc::{self, Receiver, Sender}, task::JoinHandle, time::sleep, }; -use tracing::warn; +use tracing::{info, warn}; use super::{executor::Executor, picker::Picker}; use crate::{ @@ -32,13 +33,14 @@ use crate::{ read::ParquetReader, sst::SstPathGenerator, types::{ObjectStoreRef, RuntimeRef, StorageSchema}, + Result, }; #[allow(dead_code)] pub struct Scheduler { runtime: RuntimeRef, - task_tx: Sender, + trigger_tx: Sender<()>, task_handle: JoinHandle<()>, picker_handle: JoinHandle<()>, } @@ -56,6 +58,7 @@ impl Scheduler { config: SchedulerConfig, ) -> Self { let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks); + let (trigger_tx, trigger_rx) = mpsc::channel::<()>(1); let task_handle = { let store = store.clone(); let manifest = manifest.clone(); @@ -76,7 +79,6 @@ impl Scheduler { }) }; let picker_handle = { - let task_tx = task_tx.clone(); runtime.spawn(async move { let picker = Picker::new( manifest, @@ -85,18 +87,27 @@ impl Scheduler { config.new_sst_max_size, config.input_sst_max_num, ); - Self::generate_task_loop(task_tx, picker, config.schedule_interval).await; + Self::generate_task_loop(task_tx, trigger_rx, picker, config.schedule_interval) + .await; }) }; Self { runtime, - task_tx, + trigger_tx, task_handle, picker_handle, } } + pub fn trigger_compaction(&self) -> Result<()> { + self.trigger_tx + .try_send(()) + .context("send trigger signal failed")?; + + Ok(()) + } + async fn recv_task_loop(mut task_rx: Receiver, executor: Executor) { while let Some(task) = task_rx.recv().await { executor.submit(task); @@ -105,17 +116,35 @@ impl Scheduler { async fn generate_task_loop( task_tx: Sender, + mut trigger_rx: Receiver<()>, picker: Picker, schedule_interval: Duration, ) { + info!( + schedule_interval = ?schedule_interval, + "Scheduler generate task started" + ); loop { - if let Some(task) = picker.pick_candidate().await { - if let Err(e) = task_tx.try_send(task) { - warn!("Send task failed, err:{e}"); + tokio::select! { + _ = sleep(schedule_interval) => { + if let Some(task) = picker.pick_candidate().await { + if let Err(e) = task_tx.try_send(task) { + warn!("Send task failed, err:{e}"); + } + } + } + signal = trigger_rx.recv() => { + if signal.is_none() { + info!("Scheduler generate task stopped"); + break; + } + if let Some(task) = picker.pick_candidate().await { + if let Err(e) = task_tx.try_send(task) { + warn!("Send task failed, err:{e}"); + } + } } } - - sleep(schedule_interval).await; } } } diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs index f15d67fff0..e5b6acf0fa 100644 --- a/src/metric_engine/src/storage.rs +++ b/src/metric_engine/src/storage.rs @@ -69,12 +69,13 @@ pub struct WriteRequest { } pub struct ScanRequest { - range: TimeRange, - predicate: Vec, + pub range: TimeRange, + pub predicate: Vec, /// `None` means all columns. - projections: Option>, + pub projections: Option>, } +#[derive(Default)] pub struct CompactRequest {} /// Time-aware merge storage interface. @@ -91,6 +92,8 @@ pub trait TimeMergeStorage { async fn compact(&self, req: CompactRequest) -> Result<()>; } +pub type TimeMergeStorageRef = Arc<(dyn TimeMergeStorage + Send + Sync)>; + #[derive(Clone)] struct StorageRuntimes { manifest_compact_runtime: Arc, @@ -416,7 +419,7 @@ impl TimeMergeStorage for CloudObjectStorage { } async fn compact(&self, _req: CompactRequest) -> Result<()> { - todo!() + self.compact_scheduler.trigger_compaction() } } diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml index 65dbc0bc18..454c168894 100644 --- a/src/server/Cargo.toml +++ b/src/server/Cargo.toml @@ -16,7 +16,7 @@ # under the License. [package] -name = "horaedb-server" +name = "server" [package.license] workspace = true @@ -31,8 +31,11 @@ workspace = true workspace = true [dependencies] +actix-web = "4" futures = { workspace = true } metric_engine = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +object_store = { workspace = true } +arrow = { workspace = true } diff --git a/src/server/src/main.rs b/src/server/src/main.rs index 69260e1a8e..99c109a6c6 100644 --- a/src/server/src/main.rs +++ b/src/server/src/main.rs @@ -15,12 +15,66 @@ // specific language governing permissions and limitations // under the License. +#![feature(duration_constructors)] +use std::{sync::Arc, time::Duration}; + +use actix_web::{ + get, + web::{self, Data}, + App, HttpResponse, HttpServer, Responder, +}; +use metric_engine::{ + storage::{CloudObjectStorage, CompactRequest, TimeMergeStorageRef}, + types::StorageOptions, +}; +use object_store::local::LocalFileSystem; use tracing::info; -fn main() { +#[get("/")] +async fn hello() -> impl Responder { + HttpResponse::Ok().body("Hello world!") +} + +#[get("/compact")] +async fn compact(data: web::Data) -> impl Responder { + if let Err(e) = data.storage.compact(CompactRequest::default()).await { + println!("compact failed, err:{e}"); + } + HttpResponse::Ok().body("Task submit!") +} + +struct AppState { + storage: TimeMergeStorageRef, +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { // install global collector configured based on RUST_LOG env var. tracing_subscriber::fmt::init(); let port = 5000; - info!(port, "Start horaedb server..."); + let schema = todo!(); + let store = Arc::new(LocalFileSystem::new()); + let storage = Arc::new( + CloudObjectStorage::try_new( + "/tmp/test".to_string(), + Duration::from_mins(10), + store, + schema, + 2, + StorageOptions::default(), + ) + .unwrap(), + ); + let app_state = Data::new(AppState { storage }); + info!(port, "Start HoraeDB http server..."); + HttpServer::new(move || { + App::new() + .app_data(app_state.clone()) + .service(hello) + .service(compact) + }) + .bind(("127.0.0.1", port))? + .run() + .await } From 1c9ba66949062edcbe33abf1fcc8aac879973b67 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 18 Dec 2024 18:20:07 +0800 Subject: [PATCH 2/2] fix ci --- src/server/Cargo.toml | 4 ++-- src/server/src/main.rs | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml index 454c168894..03ad792971 100644 --- a/src/server/Cargo.toml +++ b/src/server/Cargo.toml @@ -32,10 +32,10 @@ workspace = true [dependencies] actix-web = "4" +arrow = { workspace = true } futures = { workspace = true } metric_engine = { workspace = true } +object_store = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } -object_store = { workspace = true } -arrow = { workspace = true } diff --git a/src/server/src/main.rs b/src/server/src/main.rs index 99c109a6c6..c02bc93900 100644 --- a/src/server/src/main.rs +++ b/src/server/src/main.rs @@ -23,6 +23,7 @@ use actix_web::{ web::{self, Data}, App, HttpResponse, HttpServer, Responder, }; +use arrow::datatypes::{DataType, Field, Schema}; use metric_engine::{ storage::{CloudObjectStorage, CompactRequest, TimeMergeStorageRef}, types::StorageOptions, @@ -53,7 +54,11 @@ async fn main() -> std::io::Result<()> { tracing_subscriber::fmt::init(); let port = 5000; - let schema = todo!(); + let schema = Arc::new(Schema::new(vec![ + Field::new("pk1", DataType::Int64, true), + Field::new("pk2", DataType::Int64, true), + Field::new("value", DataType::Int64, true), + ])); let store = Arc::new(LocalFileSystem::new()); let storage = Arc::new( CloudObjectStorage::try_new(