Skip to content

Commit

Permalink
refactor: move encoding struct into independent file
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Dec 18, 2024
1 parent 03b1df9 commit f59918d
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 337 deletions.
10 changes: 5 additions & 5 deletions src/metric_engine/src/compaction/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Executor {
let inused = self.inner.inused_memory.load(Ordering::Relaxed);
let mem_limit = self.inner.mem_limit;
ensure!(
inused + task_size > mem_limit,
inused + task_size <= mem_limit,
"Compaction memory usage too high, inused:{inused}, task_size:{task_size}, limit:{mem_limit}"
);

Expand All @@ -113,7 +113,7 @@ impl Executor {
let task_size = task.input_size();
self.inner
.inused_memory
.fetch_add(task_size, Ordering::Relaxed);
.fetch_sub(task_size, Ordering::Relaxed);
}

pub fn on_failure(&self, task: &Task) {
Expand All @@ -132,12 +132,12 @@ impl Executor {
}
}

pub fn submit(&self, task: Task) {
pub fn spawn(&self, task: Task) {
let runnable = Runnable {
executor: self.clone(),
task,
};
runnable.run()
runnable.spawn()
}

// TODO: Merge input sst files into one new sst file
Expand Down Expand Up @@ -257,7 +257,7 @@ pub struct Runnable {
}

impl Runnable {
fn run(self) {
fn spawn(self) {
let rt = self.executor.inner.runtime.clone();
rt.spawn(async move {
if let Err(e) = self.executor.do_compaction(&self.task).await {
Expand Down
2 changes: 1 addition & 1 deletion src/metric_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Scheduler {

async fn recv_task_loop(mut task_rx: Receiver<Task>, executor: Executor) {
while let Some(task) = task_rx.recv().await {
executor.submit(task);
executor.spawn(task);
}
}

Expand Down
334 changes: 333 additions & 1 deletion src/metric_engine/src/manifest/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use std::io::{Cursor, Write};

use anyhow::Context;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::Bytes;
use parquet::data_type::AsBytes;

use crate::{
sst::{FileId, SstFile},
ensure,
sst::{FileId, FileMeta, SstFile},
types::TimeRange,
Error, Result,
};

Expand Down Expand Up @@ -66,3 +75,326 @@ impl From<ManifestUpdate> for pb_types::ManifestUpdate {
}
}
}

/// The layout for the header.
/// ```plaintext
/// +-------------+--------------+------------+--------------+
/// | magic(u32) | version(u8) | flag(u8) | length(u32) |
/// +-------------+--------------+------------+--------------+
/// ```
/// - The Magic field (u32) is used to ensure the validity of the data source.
/// - The Flags field (u8) is reserved for future extensibility, such as
/// enabling compression or supporting additional features.
/// - The length field (u64) represents the total length of the subsequent
/// records and serves as a straightforward method for verifying their
/// integrity. (length = record_length * record_count)
#[derive(Debug)]
pub struct SnapshotHeader {
pub magic: u32,
pub version: u8,
pub flag: u8,
pub length: u64,
}

impl TryFrom<&[u8]> for SnapshotHeader {
type Error = Error;

fn try_from(bytes: &[u8]) -> Result<Self> {
ensure!(
bytes.len() >= Self::LENGTH,
"invalid bytes, length: {}",
bytes.len()
);

let mut cursor = Cursor::new(bytes);
let magic = cursor.read_u32::<LittleEndian>().unwrap();
ensure!(
magic == SnapshotHeader::MAGIC,
"invalid bytes to convert to header."
);
let version = cursor.read_u8().unwrap();
let flag = cursor.read_u8().unwrap();
let length = cursor.read_u64::<LittleEndian>().unwrap();
Ok(Self {
magic,
version,
flag,
length,
})
}
}

impl SnapshotHeader {
pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/;
pub const MAGIC: u32 = 0xCAFE_6666;

pub fn new(length: u64) -> Self {
Self {
magic: SnapshotHeader::MAGIC,
version: SnapshotRecord::VERSION,
flag: 0,
length,
}
}

pub fn write_to<W>(&self, mut writer: W) -> Result<()>
where
W: Write,
{
writer
.write_u32::<LittleEndian>(self.magic)
.context("write shall not fail.")?;
writer
.write_u8(self.version)
.context("write shall not fail.")?;
writer
.write_u8(self.flag)
.context("write shall not fail.")?;
writer
.write_u64::<LittleEndian>(self.length)
.context("write shall not fail.")?;
Ok(())
}
}

/// The layout for manifest Record:
/// ```plaintext
/// +---------+-------------------+------------+-----------------+
/// | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) |
/// +---------+-------------------+------------+-----------------+
/// ```
pub struct SnapshotRecord {
id: u64,
time_range: TimeRange,
size: u32,
num_rows: u32,
}

impl SnapshotRecord {
const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num rows*/;
pub const VERSION: u8 = 1;

pub fn write_to<W>(&self, mut writer: W) -> Result<()>
where
W: Write,
{
writer
.write_u64::<LittleEndian>(self.id)
.context("write shall not fail.")?;
writer
.write_i64::<LittleEndian>(*self.time_range.start)
.context("write shall not fail.")?;
writer
.write_i64::<LittleEndian>(*self.time_range.end)
.context("write shall not fail.")?;
writer
.write_u32::<LittleEndian>(self.size)
.context("write shall not fail.")?;
writer
.write_u32::<LittleEndian>(self.num_rows)
.context("write shall not fail.")?;
Ok(())
}

pub fn id(&self) -> u64 {
self.id
}
}

impl From<SstFile> for SnapshotRecord {
fn from(value: SstFile) -> Self {
SnapshotRecord {
id: value.id(),
time_range: value.meta().time_range.clone(),
size: value.meta().size,
num_rows: value.meta().num_rows,
}
}
}

impl TryFrom<&[u8]> for SnapshotRecord {
type Error = Error;

fn try_from(value: &[u8]) -> Result<Self> {
ensure!(
value.len() >= SnapshotRecord::LENGTH,
"invalid value len: {}",
value.len()
);

let mut cursor = Cursor::new(value);
let id = cursor.read_u64::<LittleEndian>().unwrap();
let start = cursor.read_i64::<LittleEndian>().unwrap();
let end = cursor.read_i64::<LittleEndian>().unwrap();
let size = cursor.read_u32::<LittleEndian>().unwrap();
let num_rows = cursor.read_u32::<LittleEndian>().unwrap();
Ok(SnapshotRecord {
id,
time_range: (start..end).into(),
size,
num_rows,
})
}
}

impl From<SnapshotRecord> for SstFile {
fn from(record: SnapshotRecord) -> Self {
let file_meta = FileMeta {
max_sequence: record.id(),
num_rows: record.num_rows,
size: record.size,
time_range: record.time_range.clone(),
};
SstFile::new(record.id(), file_meta)
}
}

pub struct Snapshot {
header: SnapshotHeader,
records: Vec<SnapshotRecord>,
}

impl Default for Snapshot {
// create an empty Snapshot
fn default() -> Self {
let header = SnapshotHeader::new(0);
Self {
header,
records: Vec::new(),
}
}
}

impl TryFrom<Bytes> for Snapshot {
type Error = Error;

fn try_from(bytes: Bytes) -> Result<Self> {
if bytes.is_empty() {
return Ok(Snapshot::default());
}
let header = SnapshotHeader::try_from(bytes.as_bytes())?;
let record_total_length = header.length as usize;
ensure!(
record_total_length > 0
&& record_total_length % SnapshotRecord::LENGTH == 0
&& record_total_length + SnapshotHeader::LENGTH == bytes.len(),
"create snapshot from bytes failed, header:{header:?}, bytes_length: {}",
bytes.len()
);
let mut index = SnapshotHeader::LENGTH;
let mut records = Vec::with_capacity(record_total_length / SnapshotRecord::LENGTH);
while index < bytes.len() {
let record = SnapshotRecord::try_from(&bytes[index..index + SnapshotRecord::LENGTH])?;
records.push(record);
index += SnapshotRecord::LENGTH;
}

Ok(Self { header, records })
}
}

impl Snapshot {
pub fn into_ssts(self) -> Vec<SstFile> {
if self.header.length == 0 {
Vec::new()
} else {
self.records.into_iter().map(|r| r.into()).collect()
}
}

// TODO: Ensure no files duplicated
// https://github.com/apache/horaedb/issues/1608
pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> {
self.records
.extend(update.to_adds.into_iter().map(SnapshotRecord::from));
self.records
.retain(|record| !update.to_deletes.contains(&record.id));

self.header.length = (self.records.len() * SnapshotRecord::LENGTH) as u64;
Ok(())
}

pub fn into_bytes(self) -> Result<Bytes> {
let buf = Vec::with_capacity(self.header.length as usize + SnapshotHeader::LENGTH);
let mut cursor = Cursor::new(buf);

self.header.write_to(&mut cursor)?;
for record in self.records {
record.write_to(&mut cursor)?;
}
Ok(Bytes::from(cursor.into_inner()))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_snapshot_header() {
let header = SnapshotHeader::new(257);
let mut vec = vec![0u8; SnapshotHeader::LENGTH];
let mut writer = vec.as_mut_slice();
header.write_to(&mut writer).unwrap();
assert!(writer.is_empty());
let mut cursor = Cursor::new(vec);

assert_eq!(
SnapshotHeader::MAGIC,
cursor.read_u32::<LittleEndian>().unwrap()
);
assert_eq!(
1, // version
cursor.read_u8().unwrap()
);
assert_eq!(
0, // flag
cursor.read_u8().unwrap()
);
assert_eq!(
257, // length
cursor.read_u64::<LittleEndian>().unwrap()
);
}

#[test]
fn test_snapshot_record() {
let sstfile = SstFile::new(
99,
FileMeta {
max_sequence: 99,
num_rows: 100,
size: 938,
time_range: (100..200).into(),
},
);
let record: SnapshotRecord = sstfile.into();
let mut vec: Vec<u8> = vec![0u8; SnapshotRecord::LENGTH];
let mut writer = vec.as_mut_slice();
record.write_to(&mut writer).unwrap();

assert!(writer.is_empty());
let mut cursor = Cursor::new(vec);

assert_eq!(
99, // id
cursor.read_u64::<LittleEndian>().unwrap()
);
assert_eq!(
100, // start range
cursor.read_i64::<LittleEndian>().unwrap()
);
assert_eq!(
200, // end range
cursor.read_i64::<LittleEndian>().unwrap()
);
assert_eq!(
938, // size
cursor.read_u32::<LittleEndian>().unwrap()
);
assert_eq!(
100, // num rows
cursor.read_u32::<LittleEndian>().unwrap()
);
}
}
Loading

0 comments on commit f59918d

Please sign in to comment.