diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c88221af7a..73775bc6bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,6 +67,7 @@ jobs: - name: Install check binaries run: | cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked + - uses: Swatinem/rust-cache@v2 - name: Run Style Check run: | make fmt sort clippy @@ -80,8 +81,6 @@ jobs: timeout-minutes: 60 steps: - uses: actions/checkout@v4 - with: - submodules: true - name: Release Disk Quota run: | sudo make ensure-disk-quota @@ -89,6 +88,7 @@ jobs: run: | sudo apt update sudo apt install --yes protobuf-compiler + - uses: Swatinem/rust-cache@v2 - name: Run Unit Tests run: | make test diff --git a/Makefile b/Makefile index b39aeb18a8..6b4f287f91 100644 --- a/Makefile +++ b/Makefile @@ -47,8 +47,7 @@ udeps: cd $(DIR); cargo udeps --all-targets --all-features --workspace clippy: - cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings -D clippy::dbg-macro \ - -A dead_code -A unused_variables -A clippy::unreachable -A clippy::too_many_arguments # Remove these once we have a clean build + cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings -D clippy::dbg-macro -A clippy::too-many-arguments ensure-disk-quota: bash ./scripts/free-disk-space.sh diff --git a/src/metric_engine/src/compaction/runner.rs b/src/metric_engine/src/compaction/executor.rs similarity index 55% rename from src/metric_engine/src/compaction/runner.rs rename to src/metric_engine/src/compaction/executor.rs index ce6f170ed8..9f41ec8202 100644 --- a/src/metric_engine/src/compaction/runner.rs +++ b/src/metric_engine/src/compaction/executor.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-use std::sync::Arc; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; use anyhow::Context; use arrow::array::{RecordBatch, UInt64Array}; @@ -31,45 +34,59 @@ use tracing::error; use crate::{ compaction::Task, - manifest::ManifestRef, + ensure, + manifest::{ManifestRef, ManifestUpdate}, read::ParquetReader, - sst::{allocate_id, FileMeta, SstPathGenerator}, - types::{ObjectStoreRef, StorageSchema}, + sst::{allocate_id, FileMeta, SstFile, SstPathGenerator}, + types::{ObjectStoreRef, RuntimeRef, StorageSchema}, Result, }; #[derive(Clone)] -pub struct Runner { +pub struct Executor { + inner: Arc, +} + +struct Inner { + runtime: RuntimeRef, store: ObjectStoreRef, schema: StorageSchema, manifest: ManifestRef, sst_path_gen: Arc, parquet_reader: Arc, write_props: WriterProperties, + inused_memory: AtomicU64, + mem_limit: u64, } -impl Runner { +impl Executor { pub fn new( + runtime: RuntimeRef, store: ObjectStoreRef, schema: StorageSchema, manifest: ManifestRef, sst_path_gen: Arc, parquet_reader: Arc, write_props: WriterProperties, + mem_limit: u64, ) -> Self { - Self { + let inner = Inner { + runtime, store, schema, manifest, sst_path_gen, parquet_reader, write_props, + mem_limit, + inused_memory: AtomicU64::new(0), + }; + Self { + inner: Arc::new(inner), } } - // TODO: Merge input sst files into one new sst file - // and delete the expired sst files - pub async fn do_compaction(&self, task: Task) -> Result<()> { + fn pre_check(&self, task: &Task) -> Result<()> { assert!(!task.inputs.is_empty()); for f in &task.inputs { assert!(f.is_compaction()); @@ -77,24 +94,77 @@ impl Runner { for f in &task.expireds { assert!(f.is_compaction()); } + + // Reserve the memory first and check the limit afterwards: the atomic + // fetch_add closes the load-then-add race between concurrent tasks, and a + // rejected task still owns a reservation, so the fetch_sub in on_failure + // keeps the counter balanced instead of wrapping below zero. + let task_size = task.input_size(); + let inused = self.inner.inused_memory.fetch_add(task_size, Ordering::Relaxed); + let mem_limit = self.inner.mem_limit; + ensure!( + inused + task_size <= mem_limit, + "Compaction memory usage too high, inused:{inused}, task_size:{task_size}, limit:{mem_limit}" + 
); + Ok(()) + } + + pub fn on_success(&self, task: &Task) { + let task_size = task.input_size(); + self.inner + .inused_memory + .fetch_sub(task_size, Ordering::Relaxed); + } + + pub fn on_failure(&self, task: &Task) { + let task_size = task.input_size(); + self.inner + .inused_memory + .fetch_sub(task_size, Ordering::Relaxed); + + // When task execution fails, unmark sst so they can be + // rescheduled. + for sst in &task.inputs { + sst.unmark_compaction(); + } + for sst in &task.expireds { + sst.unmark_compaction(); + } + } + + pub fn submit(&self, task: Task) { + let runnable = Runnable { + executor: self.clone(), + task, + }; + runnable.run() + } + + // TODO: Merge input sst files into one new sst file + // and delete the expired sst files + pub async fn do_compaction(&self, task: &Task) -> Result<()> { + self.pre_check(task)?; + let mut time_range = task.inputs[0].meta().time_range.clone(); for f in &task.inputs[1..] { time_range.merge(&f.meta().time_range); } - let plan = self - .parquet_reader - .build_df_plan(task.inputs.clone(), None, Vec::new())?; + let plan = + self.inner + .parquet_reader + .build_df_plan(task.inputs.clone(), None, Vec::new())?; let mut stream = execute_stream(plan, Arc::new(TaskContext::default())) .context("execute datafusion plan")?; let file_id = allocate_id(); - let file_path = self.sst_path_gen.generate(file_id); + let file_path = self.inner.sst_path_gen.generate(file_id); let file_path = Path::from(file_path); - let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone()); + let object_store_writer = + ParquetObjectWriter::new(self.inner.store.clone(), file_path.clone()); let mut writer = AsyncArrowWriter::try_new( object_store_writer, - self.schema.arrow_schema.clone(), - Some(self.write_props.clone()), + self.inner.schema.arrow_schema.clone(), + Some(self.inner.write_props.clone()), ) .context("create arrow writer")?; let mut num_rows = 0; @@ -107,7 
+177,7 @@ impl Runner { // Since file_id in increasing order, we can use it as sequence. let seq_column = Arc::new(UInt64Array::from(vec![file_id; batch.num_rows()])); new_cols.push(seq_column); - RecordBatch::try_new(self.schema.arrow_schema.clone(), new_cols) + RecordBatch::try_new(self.inner.schema.arrow_schema.clone(), new_cols) .context("construct record batch with seq column")? }; @@ -115,6 +185,7 @@ impl Runner { } writer.close().await.context("close writer")?; let object_meta = self + .inner .store .head(&file_path) .await @@ -126,28 +197,37 @@ impl Runner { time_range: time_range.clone(), }; // First add new sst to manifest, then delete expired/old sst - self.manifest.add_file(file_id, file_meta).await?; - self.manifest - .add_tombstone_files(task.expireds.clone()) - .await?; - self.manifest - .add_tombstone_files(task.inputs.clone()) + let to_adds = vec![SstFile::new(file_id, file_meta)]; + let to_deletes = task + .expireds + .iter() + .map(|f| f.id()) + .chain(task.inputs.iter().map(|f| f.id())) + .collect(); + self.inner + .manifest + .update(ManifestUpdate::new(to_adds, to_deletes)) .await?; + // From now on, no error should be returned! + // Because we have already updated manifest. 
+ let (_, results) = TokioScope::scope_and_block(|scope| { - for file in task.expireds { - let path = Path::from(self.sst_path_gen.generate(file.id())); + for file in &task.expireds { + let path = Path::from(self.inner.sst_path_gen.generate(file.id())); scope.spawn(async move { - self.store + self.inner + .store .delete(&path) .await .with_context(|| format!("failed to delete file, path:{path}")) }); } - for file in task.inputs { - let path = Path::from(self.sst_path_gen.generate(file.id())); + for file in &task.inputs { + let path = Path::from(self.inner.sst_path_gen.generate(file.id())); scope.spawn(async move { - self.store + self.inner + .store .delete(&path) .await .with_context(|| format!("failed to delete file, path:{path}")) @@ -170,3 +250,22 @@ impl Runner { Ok(()) } } + +pub struct Runnable { + executor: Executor, + task: Task, +} + +impl Runnable { + fn run(self) { + let rt = self.executor.inner.runtime.clone(); + rt.spawn(async move { + if let Err(e) = self.executor.do_compaction(&self.task).await { + error!("Do compaction failed, err:{e}"); + self.executor.on_failure(&self.task); + } else { + self.executor.on_success(&self.task); + } + }); + } +} diff --git a/src/metric_engine/src/compaction/mod.rs b/src/metric_engine/src/compaction/mod.rs index 62beda0a40..38a37ebec4 100644 --- a/src/metric_engine/src/compaction/mod.rs +++ b/src/metric_engine/src/compaction/mod.rs @@ -15,21 +15,22 @@ // specific language governing permissions and limitations // under the License. 
+mod executor; mod picker; -mod runner; mod scheduler; pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig}; use crate::sst::SstFile; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Input { - files: Vec, -} - #[derive(Debug, Clone, PartialEq, Eq)] pub struct Task { pub inputs: Vec, pub expireds: Vec, } + +impl Task { + pub fn input_size(&self) -> u64 { + self.inputs.iter().map(|f| f.size() as u64).sum() + } +} diff --git a/src/metric_engine/src/compaction/picker.rs b/src/metric_engine/src/compaction/picker.rs index 913ad31736..9107e3ba60 100644 --- a/src/metric_engine/src/compaction/picker.rs +++ b/src/metric_engine/src/compaction/picker.rs @@ -19,19 +19,56 @@ use std::{collections::BTreeMap, time::Duration}; use tracing::debug; -use super::SchedulerConfig; -use crate::{compaction::Task, sst::SstFile, types::Timestamp}; +use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile, types::Timestamp, util::now}; + +pub struct Picker { + manifest: ManifestRef, + ttl: Option, + strategy: TimeWindowCompactionStrategy, +} + +impl Picker { + pub fn new( + manifest: ManifestRef, + ttl: Option, + segment_duration: Duration, + new_sst_max_size: u64, + input_sst_max_num: usize, + ) -> Self { + Self { + manifest, + ttl, + strategy: TimeWindowCompactionStrategy::new( + segment_duration, + new_sst_max_size, + input_sst_max_num, + ), + } + } + + pub async fn pick_candidate(&self) -> Option { + let ssts = self.manifest.all_ssts().await; + let expire_time = self.ttl.map(|ttl| (now() - ttl.as_micros() as i64).into()); + self.strategy.pick_candidate(ssts, expire_time) + } +} pub struct TimeWindowCompactionStrategy { segment_duration: Duration, - config: SchedulerConfig, + new_sst_max_size: u64, + input_sst_max_num: usize, } impl TimeWindowCompactionStrategy { - pub fn new(segment_duration: Duration, config: SchedulerConfig) -> Self { + pub fn new( + segment_duration: Duration, + new_sst_max_size: u64, + input_sst_max_num: usize, + ) -> Self { Self { 
segment_duration, - config, + new_sst_max_size, + input_sst_max_num, } } @@ -120,12 +157,12 @@ impl TimeWindowCompactionStrategy { files.sort_unstable_by_key(SstFile::size); let mut input_size = 0; - let memory_limit = self.config.memory_limit; - let compaction_files_limit = self.config.compaction_files_limit; + // Suppose the compaction will reduce the size of files by 10%. + let memory_limit = (self.new_sst_max_size as f64 * 1.1) as u64; let compaction_files = files .into_iter() - .take(compaction_files_limit) + .take(self.input_sst_max_num) .take_while(|f| { input_size += f.size() as u64; input_size <= memory_limit @@ -154,8 +191,7 @@ mod tests { #[test] fn test_pick_candidate() { let segment_duration = Duration::from_millis(20); - let config = SchedulerConfig::default(); - let strategy = TimeWindowCompactionStrategy::new(segment_duration, config); + let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10); let ssts = (0_i64..5_i64) .map(|i| { diff --git a/src/metric_engine/src/compaction/scheduler.rs b/src/metric_engine/src/compaction/scheduler.rs index 5e2c407a49..4832bf96dc 100644 --- a/src/metric_engine/src/compaction/scheduler.rs +++ b/src/metric_engine/src/compaction/scheduler.rs @@ -15,12 +15,8 @@ // specific language governing permissions and limitations // under the License. 
-use std::{ - sync::{atomic::AtomicU64, Arc}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; -use anyhow::Context; use parquet::file::properties::WriterProperties; use tokio::{ sync::mpsc::{self, Receiver, Sender}, @@ -29,21 +25,20 @@ use tokio::{ }; use tracing::warn; -use super::runner::Runner; +use super::{executor::Executor, picker::Picker}; use crate::{ - compaction::{picker::TimeWindowCompactionStrategy, Task}, + compaction::Task, manifest::ManifestRef, read::ParquetReader, sst::SstPathGenerator, types::{ObjectStoreRef, RuntimeRef, StorageSchema}, - Result, }; +#[allow(dead_code)] pub struct Scheduler { runtime: RuntimeRef, task_tx: Sender, - inused_memory: AtomicU64, task_handle: JoinHandle<()>, picker_handle: JoinHandle<()>, } @@ -61,29 +56,35 @@ impl Scheduler { ) -> Self { let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks); let task_handle = { - let rt = runtime.clone(); let store = store.clone(); let manifest = manifest.clone(); let write_props = config.write_props.clone(); + let executor = Executor::new( + runtime.clone(), + store, + schema, + manifest, + sst_path_gen, + parquet_reader, + write_props, + config.memory_limit, + ); + runtime.spawn(async move { - Self::recv_task_loop( - rt, - task_rx, - store, - schema, - manifest, - sst_path_gen, - parquet_reader, - config.memory_limit, - write_props, - ) - .await; + Self::recv_task_loop(task_rx, executor).await; }) }; let picker_handle = { let task_tx = task_tx.clone(); runtime.spawn(async move { - Self::generate_task_loop(manifest, task_tx, segment_duration, config).await; + let picker = Picker::new( + manifest, + config.ttl, + segment_duration, + config.new_sst_max_size, + config.input_sst_max_num, + ); + Self::generate_task_loop(task_tx, picker, config.schedule_interval).await; }) }; @@ -92,60 +93,22 @@ impl Scheduler { task_tx, task_handle, picker_handle, - inused_memory: AtomicU64::new(0), } } - pub fn try_send(&self, task: Task) -> Result<()> { - self.task_tx - 
.try_send(task) - .context("failed to send task to scheduler")?; - - Ok(()) - } - - async fn recv_task_loop( - rt: RuntimeRef, - mut task_rx: Receiver, - store: ObjectStoreRef, - schema: StorageSchema, - manifest: ManifestRef, - sst_path_gen: Arc, - parquet_reader: Arc, - _mem_limit: u64, - write_props: WriterProperties, - ) { - let runner = Runner::new( - store, - schema, - manifest, - sst_path_gen, - parquet_reader, - write_props, - ); + async fn recv_task_loop(mut task_rx: Receiver, executor: Executor) { while let Some(task) = task_rx.recv().await { - let runner = runner.clone(); - rt.spawn(async move { - if let Err(e) = runner.do_compaction(task).await { - warn!("Do compaction failed, err:{e}"); - } - }); + executor.submit(task); } } async fn generate_task_loop( - manifest: ManifestRef, task_tx: Sender, - segment_duration: Duration, - config: SchedulerConfig, + picker: Picker, + schedule_interval: Duration, ) { - let schedule_interval = config.schedule_interval; - let compactor = TimeWindowCompactionStrategy::new(segment_duration, config); - // TODO: obtain expire time - let expire_time = None; loop { - let ssts = manifest.all_ssts().await; - if let Some(task) = compactor.pick_candidate(ssts, expire_time) { + if let Some(task) = picker.pick_candidate().await { if let Err(e) = task_tx.try_send(task) { warn!("Send task failed, err:{e}"); } @@ -159,20 +122,26 @@ impl Scheduler { #[derive(Clone)] pub struct SchedulerConfig { pub schedule_interval: Duration, - pub memory_limit: u64, pub max_pending_compaction_tasks: usize, - pub compaction_files_limit: usize, + // Runner config + pub memory_limit: u64, pub write_props: WriterProperties, + // Picker config + pub ttl: Option, + pub new_sst_max_size: u64, + pub input_sst_max_num: usize, } impl Default for SchedulerConfig { fn default() -> Self { Self { schedule_interval: Duration::from_secs(30), - memory_limit: bytesize::gb(2_u64), max_pending_compaction_tasks: 10, - compaction_files_limit: 10, + memory_limit: 
bytesize::gb(3_u64), write_props: WriterProperties::default(), + ttl: None, + new_sst_max_size: bytesize::gb(1_u64), + input_sst_max_num: 10, } } } diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs index 26580cb340..c9e1235913 100644 --- a/src/metric_engine/src/lib.rs +++ b/src/metric_engine/src/lib.rs @@ -29,5 +29,6 @@ pub mod storage; #[cfg(test)] mod test_util; pub mod types; +pub(crate) mod util; pub use error::{AnyhowError, Error, Result}; diff --git a/src/metric_engine/src/manifest.rs b/src/metric_engine/src/manifest.rs index 1a50b90ecb..17b188d601 100644 --- a/src/metric_engine/src/manifest.rs +++ b/src/metric_engine/src/manifest.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. +mod encoding; use std::{ - collections::HashSet, io::{Cursor, Write}, sync::{ atomic::{AtomicUsize, Ordering}, @@ -29,16 +29,14 @@ use anyhow::Context; use async_scoped::TokioScope; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use bytes::Bytes; +pub use encoding::ManifestUpdate; use futures::{StreamExt, TryStreamExt}; use object_store::{path::Path, PutPayload}; use parquet::data_type::AsBytes; use prost::Message; -use tokio::{ - runtime::Runtime, - sync::{ - mpsc::{self, Receiver, Sender}, - RwLock, - }, +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + RwLock, }; use tracing::error; use uuid::Uuid; @@ -53,89 +51,31 @@ use crate::{ pub const PREFIX_PATH: &str = "manifest"; pub const SNAPSHOT_FILENAME: &str = "snapshot"; pub const DELTA_PREFIX: &str = "delta"; -pub const TOMBSTONE_PREFIX: &str = "tombstone"; pub type ManifestRef = Arc; pub struct Manifest { delta_dir: Path, - tombstone_dir: Path, store: ObjectStoreRef, merger: Arc, - payload: RwLock, -} - -#[derive(Default)] -pub struct Payload { - files: Vec, -} - -impl Payload { - // TODO: we could sort sst files by name asc, then the dedup will be more - // efficient - pub fn dedup_files(&mut self) { - let mut seen = 
HashSet::with_capacity(self.files.len()); - self.files.retain(|file| seen.insert(file.id())); - } -} - -impl TryFrom for Payload { - type Error = Error; - - fn try_from(value: Bytes) -> Result { - if value.is_empty() { - Ok(Self::default()) - } else { - let snapshot = Snapshot::try_from(value)?; - let files = snapshot.to_sstfiles()?; - Ok(Self { files }) - } - } -} - -impl TryFrom for Payload { - type Error = Error; - - fn try_from(value: pb_types::Manifest) -> Result { - let files = value - .files - .into_iter() - .map(SstFile::try_from) - .collect::>>()?; - - Ok(Self { files }) - } -} - -impl From for pb_types::Manifest { - fn from(value: Payload) -> Self { - pb_types::Manifest { - files: value - .files - .into_iter() - .map(pb_types::SstFile::from) - .collect(), - } - } + ssts: RwLock>, } impl Manifest { pub async fn try_new( root_dir: String, store: ObjectStoreRef, - runtime: Arc, + runtime: RuntimeRef, merge_options: ManifestMergeOptions, ) -> Result { let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}")); let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}")); - let tombstone_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{TOMBSTONE_PREFIX}")); let merger = ManifestMerger::try_new( snapshot_path.clone(), delta_dir.clone(), store.clone(), - runtime.clone(), merge_options, ) .await?; @@ -148,42 +88,46 @@ impl Manifest { }); } - let bytes = read_object(&store, &snapshot_path).await?; - // TODO: add upgrade logic - let payload = bytes.try_into()?; + let snapshot = read_snapshot(&store, &snapshot_path).await?; + let ssts = snapshot.into_ssts(); Ok(Self { delta_dir, - tombstone_dir, store, merger, - payload: RwLock::new(payload), + ssts: RwLock::new(ssts), }) } pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> { - self.merger.maybe_schedule_merge().await?; - - let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir)); - let new_sst = SstFile::new(id, meta); + let update = 
ManifestUpdate::new(vec![SstFile::new(id, meta)], Vec::new()); + self.update(update).await + } - let new_sst_payload = pb_types::SstFile::from(new_sst.clone()); - let mut buf: Vec = Vec::with_capacity(new_sst_payload.encoded_len()); - new_sst_payload + pub async fn update(&self, update: ManifestUpdate) -> Result<()> { + self.merger.maybe_schedule_merge().await?; + let path = Path::from(format!("{}/{}", self.delta_dir, Uuid::new_v4())); + let pb_update = pb_types::ManifestUpdate::from(update.clone()); + let mut buf: Vec = Vec::with_capacity(pb_update.encoded_len()); + pb_update .encode(&mut buf) - .context("failed to encode manifest file")?; - let put_payload = PutPayload::from_bytes(Bytes::from(buf)); + .context("failed to encode manifest update")?; // 1. Persist the delta manifest self.store - .put(&new_sst_path, put_payload) + .put(&path, PutPayload::from_bytes(Bytes::from(buf))) .await - .with_context(|| format!("Failed to write delta manifest, path:{}", new_sst_path))?; + .with_context(|| format!("Failed to write delta manifest, path:{}", path))?; // 2. Update cached payload { - let mut payload = self.payload.write().await; - payload.files.push(new_sst); + let mut ssts = self.ssts.write().await; + for file in update.to_adds { + ssts.push(file); + } + // TODO: sort files in payload, so we can delete files more + // efficiently. + ssts.retain(|file| !update.to_deletes.contains(&file.id())); } // 3. 
Update delta files num @@ -192,27 +136,16 @@ impl Manifest { Ok(()) } - // TODO: recovery tombstone files when startup - pub async fn add_tombstone_files(&self, files: I) -> Result<()> - where - I: IntoIterator, - { - let path = Path::from(format!("{}/{}", self.tombstone_dir, Uuid::new_v4())); - todo!() - } - // TODO: avoid clone pub async fn all_ssts(&self) -> Vec { - let payload = self.payload.read().await; - payload.files.clone() + let ssts = self.ssts.read().await; + ssts.clone() } pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec { - let payload = self.payload.read().await; + let ssts = self.ssts.read().await; - payload - .files - .iter() + ssts.iter() .filter(move |f| f.meta().time_range.overlaps(time_range)) .cloned() .collect() @@ -231,6 +164,7 @@ impl Manifest { /// - The length field (u64) represents the total length of the subsequent /// records and serves as a straightforward method for verifying their /// integrity. (length = record_length * record_count) +#[derive(Debug)] struct SnapshotHeader { pub magic: u32, pub version: u8, @@ -279,13 +213,10 @@ impl SnapshotHeader { } } - pub fn write_to(&self, writer: &mut &mut [u8]) -> Result<()> { - ensure!( - writer.len() >= SnapshotHeader::LENGTH, - "writer buf is too small for writing the header, length: {}", - writer.len() - ); - + pub fn write_to(&self, mut writer: W) -> Result<()> + where + W: Write, + { writer .write_u32::(self.magic) .context("write shall not fail.")?; @@ -319,13 +250,10 @@ impl SnapshotRecordV1 { const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num rows*/; pub const VERSION: u8 = 1; - pub fn write_to(&self, writer: &mut &mut [u8]) -> Result<()> { - ensure!( - writer.len() >= SnapshotRecordV1::LENGTH, - "writer buf is too small for writing the record, length: {}", - writer.len() - ); - + pub fn write_to(&self, mut writer: W) -> Result<()> + where + W: Write, + { writer .write_u64::(self.id) .context("write shall not fail.")?; @@ -399,7 +327,7 @@ impl From 
for SstFile { struct Snapshot { header: SnapshotHeader, - inner: Bytes, + records: Vec, } impl Default for Snapshot { @@ -408,7 +336,7 @@ impl Default for Snapshot { let header = SnapshotHeader::new(0); Self { header, - inner: Bytes::new(), + records: Vec::new(), } } } @@ -421,82 +349,57 @@ impl TryFrom for Snapshot { return Ok(Snapshot::default()); } let header = SnapshotHeader::try_from(bytes.as_bytes())?; - let header_length = header.length as usize; + let record_total_length = header.length as usize; ensure!( - header_length > 0 - && header_length % SnapshotRecordV1::LENGTH == 0 - && header_length + SnapshotHeader::LENGTH == bytes.len(), - "create snapshot from bytes failed, invalid bytes, header length = {}, total length: {}", - header_length, - bytes.len() + record_total_length > 0 + && record_total_length % SnapshotRecordV1::LENGTH == 0 + && record_total_length + SnapshotHeader::LENGTH == bytes.len(), + "create snapshot from bytes failed, header:{header:?}, bytes_length: {}", + bytes.len() ); + let mut index = SnapshotHeader::LENGTH; + let mut records = Vec::with_capacity(record_total_length / SnapshotRecordV1::LENGTH); + while index < bytes.len() { + let record = + SnapshotRecordV1::try_from(&bytes[index..index + SnapshotRecordV1::LENGTH])?; + records.push(record); + index += SnapshotRecordV1::LENGTH; + } - Ok(Self { - header, - inner: bytes, - }) + Ok(Self { header, records }) } } impl Snapshot { - pub fn to_sstfiles(&self) -> Result> { + pub fn into_ssts(self) -> Vec { if self.header.length == 0 { - Ok(Vec::new()) + Vec::new() } else { - let buf = self.inner.as_bytes(); - let mut result: Vec = - Vec::with_capacity(self.header.length as usize / SnapshotRecordV1::LENGTH); - let mut index = SnapshotHeader::LENGTH; - while index < buf.len() { - let record = - SnapshotRecordV1::try_from(&buf[index..index + SnapshotRecordV1::LENGTH])?; - index += SnapshotRecordV1::LENGTH; - result.push(record.into()); - } - - Ok(result) + self.records.into_iter().map(|r| 
r.into()).collect() } } - pub fn dedup_sstfiles(&self, sstfiles: &mut Vec) -> Result<()> { - let buf = self.inner.as_bytes(); - let mut ids = HashSet::new(); - let mut index = SnapshotHeader::LENGTH; - while index < buf.len() { - let record = SnapshotRecordV1::try_from(&buf[index..index + SnapshotRecordV1::LENGTH])?; - index += SnapshotRecordV1::LENGTH; - ids.insert(record.id()); - } - sstfiles.retain(|item| !ids.contains(&item.id())); + // TODO: Ensure no files duplicated + // https://github.com/apache/horaedb/issues/1608 + pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> { + self.records + .extend(update.to_adds.into_iter().map(SnapshotRecordV1::from)); + self.records + .retain(|record| !update.to_deletes.contains(&record.id)); + self.header.length = (self.records.len() * SnapshotRecordV1::LENGTH) as u64; Ok(()) } - pub fn merge_sstfiles(&mut self, sstfiles: Vec) { - // update header - self.header.length += (sstfiles.len() * SnapshotRecordV1::LENGTH) as u64; - // final snapshot - let mut snapshot = vec![0u8; SnapshotHeader::LENGTH + self.header.length as usize]; - let mut writer = snapshot.as_mut_slice(); - - // write new head - self.header.write_to(&mut writer).unwrap(); - // write old records - if !self.inner.is_empty() { - writer - .write_all(&self.inner.as_bytes()[SnapshotHeader::LENGTH..]) - .unwrap(); - } - // write new records - for sst in sstfiles { - let record: SnapshotRecordV1 = sst.into(); - record.write_to(&mut writer).unwrap(); - } - self.inner = Bytes::from(snapshot); - } + pub fn into_bytes(self) -> Result { + let buf = Vec::with_capacity(self.header.length as usize + SnapshotHeader::LENGTH); + let mut cursor = Cursor::new(buf); - pub fn into_bytes(self) -> Bytes { - self.inner + self.header.write_to(&mut cursor)?; + for record in self.records { + record.write_to(&mut cursor)?; + } + Ok(Bytes::from(cursor.into_inner())) } } @@ -509,7 +412,6 @@ struct ManifestMerger { snapshot_path: Path, delta_dir: Path, store: 
ObjectStoreRef, - runtime: RuntimeRef, sender: Sender, receiver: RwLock>, deltas_num: AtomicUsize, @@ -521,7 +423,6 @@ impl ManifestMerger { snapshot_path: Path, delta_dir: Path, store: ObjectStoreRef, - runtime: Arc, merge_options: ManifestMergeOptions, ) -> Result> { let (tx, rx) = mpsc::channel(merge_options.channel_size); @@ -529,7 +430,6 @@ impl ManifestMerger { snapshot_path, delta_dir, store, - runtime, sender: tx, receiver: RwLock::new(rx), deltas_num: AtomicUsize::new(0), @@ -602,17 +502,12 @@ impl ManifestMerger { } }); - let mut delta_files = Vec::with_capacity(results.len()); + let mut snapshot = read_snapshot(&self.store, &self.snapshot_path).await?; for res in results { - let sst_file = res.context("Failed to join read delta files task")??; - delta_files.push(sst_file); + let manifest_update = res.context("Failed to join read delta files task")??; + snapshot.merge_update(manifest_update)?; } - let snapshot_bytes = read_object(&self.store, &self.snapshot_path).await?; - let mut snapshot = Snapshot::try_from(snapshot_bytes)?; - // TODO: no need to dedup every time. - snapshot.dedup_sstfiles(&mut delta_files)?; - snapshot.merge_sstfiles(delta_files); - let snapshot_bytes = snapshot.into_bytes(); + let snapshot_bytes = snapshot.into_bytes()?; let put_payload = PutPayload::from_bytes(snapshot_bytes); // 1. 
Persist the snapshot self.store @@ -646,38 +541,18 @@ impl ManifestMerger { } } -async fn read_object(store: &ObjectStoreRef, path: &Path) -> Result { - match store.get(path).await { - Ok(v) => v - .bytes() - .await - .with_context(|| format!("Failed to read manifest snapshot, path:{path}")) - .map_err(|e| e.into()), - Err(err) => { - if err.to_string().contains("not found") { - Ok(Bytes::new()) - } else { - let context = format!("Failed to read file, path:{path}"); - Err(AnyhowError::new(err).context(context).into()) - } - } - } -} - -async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result { +async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result { match store.get(path).await { Ok(v) => { let bytes = v .bytes() .await .with_context(|| format!("Failed to read manifest snapshot, path:{path}"))?; - let pb_payload = pb_types::Manifest::decode(bytes) - .with_context(|| format!("Failed to decode manifest snapshot, path:{path}"))?; - Payload::try_from(pb_payload) + Snapshot::try_from(bytes) } Err(err) => { if err.to_string().contains("not found") { - Ok(Payload { files: vec![] }) + Ok(Snapshot::default()) } else { let context = format!("Failed to read manifest snapshot, path:{path}"); Err(AnyhowError::new(err).context(context).into()) @@ -686,7 +561,7 @@ async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result { } } -async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> Result { +async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> Result { let bytes = store .get(sst_path) .await @@ -695,12 +570,12 @@ async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> Result Result<()> { @@ -730,43 +605,30 @@ async fn list_delta_paths(store: &ObjectStoreRef, delta_dir: &Path) -> Result>(); + manifest.add_file(i as u64, meta).await.unwrap(); + } - expected_ssts.sort_by_key(|a| a.id()); - ssts.sort_by_key(|a| a.id()); - assert_eq!(expected_ssts, ssts); + let find_range = (10..15).into(); + let mut 
ssts = manifest.find_ssts(&find_range).await; + + let mut expected_ssts = (10..15) + .map(|i| { + let id = i as u64; + let time_range = (i..i + 1).into(); + let meta = FileMeta { + max_sequence: i as u64, + num_rows: i as u32, + size: i as u32, + time_range, + }; + SstFile::new(id, meta) + }) + .collect::>(); + + expected_ssts.sort_by_key(|a| a.id()); + ssts.sort_by_key(|a| a.id()); + assert_eq!(expected_ssts, ssts); + }); } - #[tokio::test] - async fn test_merge_manifest() { + #[test] + fn test_merge_manifest() { let root_dir = temp_dir::TempDir::new() .unwrap() .path() @@ -792,72 +671,53 @@ mod tests { .to_string(); let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}")); let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}")); - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(4) - .enable_all() - .build() + let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); + let rt = runtime.clone(); + + rt.block_on(async move { + let store: ObjectStoreRef = Arc::new(LocalFileSystem::new()); + let manifest = Manifest::try_new( + root_dir, + store.clone(), + runtime.clone(), + ManifestMergeOptions { + merge_interval_seconds: 1, + ..Default::default() + }, + ) + .await .unwrap(); - let store: ObjectStoreRef = Arc::new(LocalFileSystem::new()); - let manifest = Manifest::try_new( - root_dir, - store.clone(), - Arc::new(runtime), - ManifestMergeOptions { - merge_interval_seconds: 1, - ..Default::default() - }, - ) - .await - .unwrap(); - - // Add manifest files - for i in 0..20 { - let time_range = (i..i + 1).into(); - let meta = FileMeta { - max_sequence: i as u64, - num_rows: i as u32, - size: i as u32, - time_range, - }; - manifest.add_file(i as u64, meta).await.unwrap(); - } + // Add manifest files + for i in 0..20 { + let time_range = (i..i + 1).into(); + let meta = FileMeta { + max_sequence: i as u64, + num_rows: i as u32, + size: i as u32, + time_range, + }; + manifest.add_file(i 
as u64, meta).await.unwrap(); + } - // Wait for merge manifest to finish - sleep(Duration::from_secs(2)).await; - - let mut mem_ssts = manifest.payload.read().await.files.clone(); - let snapshot = read_object(&store, &snapshot_path).await.unwrap(); - let snapshot_len = snapshot.len(); - let payload: Payload = snapshot.try_into().unwrap(); - let mut ssts = payload.files; - - mem_ssts.sort_by_key(|a| a.id()); - ssts.sort_by_key(|a| a.id()); - assert_eq!(mem_ssts, ssts); - - let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap(); - assert!(delta_paths.is_empty()); - - // Add manifest files again to verify dedup - for i in 0..20 { - let time_range = (i..i + 1).into(); - let meta = FileMeta { - max_sequence: i as u64, - num_rows: i as u32, - size: i as u32, - time_range, - }; - manifest.add_file(i as u64, meta).await.unwrap(); - } + // Wait for merge manifest to finish + sleep(Duration::from_secs(2)).await; - // Wait for merge manifest to finish - sleep(Duration::from_secs(2)).await; + let mut mem_ssts = manifest.ssts.read().await.clone(); + let snapshot = read_snapshot(&store, &snapshot_path).await.unwrap(); + let mut ssts = snapshot + .records + .into_iter() + .map(SstFile::from) + .collect_vec(); + + mem_ssts.sort_by_key(|a| a.id()); + ssts.sort_by_key(|a| a.id()); + assert_eq!(mem_ssts, ssts); - let snapshot_again = read_object(&store, &snapshot_path).await.unwrap(); - assert!(snapshot_len == snapshot_again.len()); // dedup took effect. 
- let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap(); - assert!(delta_paths.is_empty()); + let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap(); + assert!(delta_paths.is_empty()); + }) } #[test] @@ -885,11 +745,6 @@ mod tests { 257, // length cursor.read_u64::().unwrap() ); - - let mut vec = [0u8; SnapshotHeader::LENGTH - 1]; - let mut writer = vec.as_mut_slice(); - let result = header.write_to(&mut writer); - assert!(result.is_err()); // buf not enough } #[test] @@ -931,9 +786,5 @@ mod tests { 100, // num rows cursor.read_u32::().unwrap() ); - let mut vec = vec![0u8; SnapshotRecordV1::LENGTH - 1]; - let mut writer = vec.as_mut_slice(); - let result = record.write_to(&mut writer); - assert!(result.is_err()); // buf not enough } } diff --git a/src/metric_engine/src/manifest/encoding.rs b/src/metric_engine/src/manifest/encoding.rs new file mode 100644 index 0000000000..01deb7a503 --- /dev/null +++ b/src/metric_engine/src/manifest/encoding.rs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::{ + sst::{FileId, SstFile}, + Error, Result, +}; + +#[derive(Clone, Debug)] +pub struct ManifestUpdate { + pub to_adds: Vec, + pub to_deletes: Vec, +} + +impl ManifestUpdate { + pub fn new(to_adds: Vec, to_deletes: Vec) -> Self { + Self { + to_adds, + to_deletes, + } + } +} + +impl TryFrom for ManifestUpdate { + type Error = Error; + + fn try_from(value: pb_types::ManifestUpdate) -> Result { + let to_adds = value + .to_adds + .into_iter() + .map(SstFile::try_from) + .collect::>>()?; + + Ok(Self { + to_adds, + to_deletes: value.to_deletes, + }) + } +} + +impl From for pb_types::ManifestUpdate { + fn from(value: ManifestUpdate) -> Self { + let to_adds = value + .to_adds + .into_iter() + .map(pb_types::SstFile::from) + .collect(); + + pb_types::ManifestUpdate { + to_adds, + to_deletes: value.to_deletes, + } + } +} diff --git a/src/metric_engine/src/sst.rs b/src/metric_engine/src/sst.rs index 1cbfedec0e..070a62ed27 100644 --- a/src/metric_engine/src/sst.rs +++ b/src/metric_engine/src/sst.rs @@ -74,6 +74,10 @@ impl SstFile { self.inner.in_compaction.store(true, Ordering::Relaxed); } + pub fn unmark_compaction(&self) { + self.inner.in_compaction.store(false, Ordering::Relaxed); + } + pub fn is_compaction(&self) -> bool { self.inner.in_compaction.load(Ordering::Relaxed) } diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs index 37cc904e0f..f15d67fff0 100644 --- a/src/metric_engine/src/storage.rs +++ b/src/metric_engine/src/storage.rs @@ -125,6 +125,7 @@ impl StorageRuntimes { /// /// Compaction will be done by merging segments within a segment, and segment /// will make it easy to support expiration. +#[allow(dead_code)] pub struct CloudObjectStorage { segment_duration: Duration, path: String, @@ -150,7 +151,7 @@ pub struct CloudObjectStorage { /// ``` /// `root_path` is composed of `path` and `segment_duration`. 
impl CloudObjectStorage { - pub async fn try_new( + pub fn try_new( path: String, segment_duration: Duration, store: ObjectStoreRef, @@ -159,15 +160,16 @@ impl CloudObjectStorage { storage_opts: StorageOptions, ) -> Result { let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?; - let manifest = Arc::new( + let manifest = runtimes.manifest_compact_runtime.block_on(async { Manifest::try_new( path.clone(), store.clone(), runtimes.manifest_compact_runtime.clone(), storage_opts.manifest_merge_opts, ) - .await?, - ); + .await + })?; + let manifest = Arc::new(manifest); let schema = { let value_idxes = (num_primary_keys..arrow_schema.fields.len()).collect::>(); ensure!(!value_idxes.is_empty(), "no value column found"); @@ -426,8 +428,8 @@ mod tests { use super::*; use crate::{arrow_schema, record_batch, test_util::check_stream, types::Timestamp}; - #[test(tokio::test)] - async fn test_storage_write_and_scan() { + #[test(test)] + fn test_storage_write_and_scan() { let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", Int64)); let root_dir = temp_dir::TempDir::new().unwrap(); let store = Arc::new(LocalFileSystem::new()); @@ -439,67 +441,68 @@ mod tests { 2, // num_primary_keys StorageOptions::default(), ) - .await - .unwrap(); - - let batch = record_batch!( - ("pk1", UInt8, vec![11, 11, 9, 10, 5]), - ("pk2", UInt8, vec![100, 100, 1, 2, 3]), - ("value", Int64, vec![2, 7, 4, 6, 1]) - ) .unwrap(); - storage - .write(WriteRequest { - batch, - time_range: (1..10).into(), - enable_check: true, - }) - .await - .unwrap(); - let batch = record_batch!( - ("pk1", UInt8, vec![11, 11, 9, 10]), - ("pk2", UInt8, vec![100, 99, 1, 2]), - ("value", Int64, vec![22, 77, 44, 66]) - ) - .unwrap(); - storage - .write(WriteRequest { - batch, - time_range: (10..20).into(), - enable_check: true, - }) - .await + storage.runtimes.sst_compact_runtime.block_on(async { + let batch = record_batch!( + ("pk1", UInt8, vec![11, 11, 9, 10, 5]), + ("pk2", UInt8, vec![100, 100, 1, 2, 
3]), + ("value", Int64, vec![2, 7, 4, 6, 1]) + ) .unwrap(); + storage + .write(WriteRequest { + batch, + time_range: (1..10).into(), + enable_check: true, + }) + .await + .unwrap(); - let result_stream = storage - .scan(ScanRequest { - range: TimeRange::new(Timestamp(0), Timestamp::MAX), - predicate: vec![], - projections: None, - }) - .await - .unwrap(); - let expected_batch = [ - record_batch!( - ("pk1", UInt8, vec![5, 9, 10, 11]), - ("pk2", UInt8, vec![3, 1, 2, 99]), - ("value", Int64, vec![1, 44, 66, 77]) - ) - .unwrap(), - record_batch!( - ("pk1", UInt8, vec![11]), - ("pk2", UInt8, vec![100]), - ("value", Int64, vec![22]) + let batch = record_batch!( + ("pk1", UInt8, vec![11, 11, 9, 10]), + ("pk2", UInt8, vec![100, 99, 1, 2]), + ("value", Int64, vec![22, 77, 44, 66]) ) - .unwrap(), - ]; + .unwrap(); + storage + .write(WriteRequest { + batch, + time_range: (10..20).into(), + enable_check: true, + }) + .await + .unwrap(); - check_stream(result_stream, expected_batch).await; + let result_stream = storage + .scan(ScanRequest { + range: TimeRange::new(Timestamp(0), Timestamp::MAX), + predicate: vec![], + projections: None, + }) + .await + .unwrap(); + let expected_batch = [ + record_batch!( + ("pk1", UInt8, vec![5, 9, 10, 11]), + ("pk2", UInt8, vec![3, 1, 2, 99]), + ("value", Int64, vec![1, 44, 66, 77]) + ) + .unwrap(), + record_batch!( + ("pk1", UInt8, vec![11]), + ("pk2", UInt8, vec![100]), + ("value", Int64, vec![22]) + ) + .unwrap(), + ]; + + check_stream(result_stream, expected_batch).await; + }); } - #[tokio::test] - async fn test_storage_sort_batch() { + #[test] + fn test_storage_sort_batch() { let schema = arrow_schema!(("a", UInt8), ("b", UInt8), ("c", UInt8), ("c", UInt8)); let root_dir = temp_dir::TempDir::new().unwrap(); let store = Arc::new(LocalFileSystem::new()); @@ -511,32 +514,32 @@ mod tests { 1, StorageOptions::default(), ) - .await - .unwrap(); - - let batch = record_batch!( - ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]), - ("b", UInt8, vec![1, 3, 
4, 8, 2, 6, 5, 7]), - ("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]), - ("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8]) - ) .unwrap(); + storage.runtimes.sst_compact_runtime.block_on(async { + let batch = record_batch!( + ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]), + ("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]), + ("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]), + ("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8]) + ) + .unwrap(); - let mut sorted_batches = storage.sort_batch(batch).await.unwrap(); - let expected_bacth = record_batch!( - ("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]), - ("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]), - ("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]), - ("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1]) - ) - .unwrap(); - let mut offset = 0; - while let Some(sorted_batch) = sorted_batches.next().await { - let sorted_batch = sorted_batch.unwrap(); - let length = sorted_batch.num_rows(); - let batch = expected_bacth.slice(offset, length); - assert_eq!(sorted_batch, batch); - offset += length; - } + let mut sorted_batches = storage.sort_batch(batch).await.unwrap(); + let expected_bacth = record_batch!( + ("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]), + ("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]), + ("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]), + ("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1]) + ) + .unwrap(); + let mut offset = 0; + while let Some(sorted_batch) = sorted_batches.next().await { + let sorted_batch = sorted_batch.unwrap(); + let length = sorted_batch.num_rows(); + let batch = expected_bacth.slice(offset, length); + assert_eq!(sorted_batch, batch); + offset += length; + } + }); } } diff --git a/src/metric_engine/src/util.rs b/src/metric_engine/src/util.rs new file mode 100644 index 0000000000..f48e88340f --- /dev/null +++ b/src/metric_engine/src/util.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Current time in milliseconds. +pub fn now() -> i64 { + let now = SystemTime::now(); + let duration = now.duration_since(UNIX_EPOCH).unwrap(); + duration.as_millis() as i64 +} diff --git a/src/pb_types/protos/sst.proto b/src/pb_types/protos/sst.proto index 5312ffa70c..3c5d904003 100644 --- a/src/pb_types/protos/sst.proto +++ b/src/pb_types/protos/sst.proto @@ -41,11 +41,7 @@ message SstFile { SstMeta meta = 2; } -message Manifest { - repeated SstFile files = 1; -} - -message MetaUpdate { +message ManifestUpdate { repeated SstFile to_adds = 1; - repeated uint64 to_removes = 2; + repeated uint64 to_deletes = 2; }