Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Dec 17, 2024
1 parent d161905 commit b65c968
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 170 deletions.
171 changes: 85 additions & 86 deletions src/metric_engine/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,17 @@ use futures::{StreamExt, TryStreamExt};
use object_store::{path::Path, PutPayload};
use parquet::data_type::AsBytes;
use prost::Message;
use tokio::{
runtime::Runtime,
sync::{
mpsc::{self, Receiver, Sender},
RwLock,
},
use tokio::sync::{
mpsc::{self, Receiver, Sender},
RwLock,
};
use tracing::error;
use uuid::Uuid;

use crate::{
ensure,
sst::{FileId, FileMeta, SstFile},
types::{ManifestMergeOptions, ObjectStoreRef, TimeRange},
types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange},
AnyhowError, Error, Result,
};

Expand All @@ -69,7 +66,7 @@ impl Manifest {
pub async fn try_new(
root_dir: String,
store: ObjectStoreRef,
runtime: Arc<Runtime>,
runtime: RuntimeRef,
merge_options: ManifestMergeOptions,
) -> Result<Self> {
let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
Expand Down Expand Up @@ -614,111 +611,113 @@ mod tests {

use super::*;

#[tokio::test]
async fn test_find_manifest() {
#[test]
fn test_find_manifest() {
let root_dir = temp_dir::TempDir::new().unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
let rt = runtime.clone();
let store = Arc::new(LocalFileSystem::new());

let manifest = Manifest::try_new(
root_dir.path().to_string_lossy().to_string(),
store,
Arc::new(runtime),
ManifestMergeOptions::default(),
)
.await
.unwrap();

for i in 0..20 {
let time_range = (i..i + 1).into();
let meta = FileMeta {
max_sequence: i as u64,
num_rows: i as u32,
size: i as u32,
time_range,
};
manifest.add_file(i as u64, meta).await.unwrap();
}

let find_range = (10..15).into();
let mut ssts = manifest.find_ssts(&find_range).await;
rt.block_on(async move {
let manifest = Manifest::try_new(
root_dir.path().to_string_lossy().to_string(),
store,
runtime.clone(),
ManifestMergeOptions::default(),
)
.await
.unwrap();

let mut expected_ssts = (10..15)
.map(|i| {
let id = i as u64;
for i in 0..20 {
let time_range = (i..i + 1).into();
let meta = FileMeta {
max_sequence: i as u64,
num_rows: i as u32,
size: i as u32,
time_range,
};
SstFile::new(id, meta)
})
.collect::<Vec<_>>();
manifest.add_file(i as u64, meta).await.unwrap();
}

expected_ssts.sort_by_key(|a| a.id());
ssts.sort_by_key(|a| a.id());
assert_eq!(expected_ssts, ssts);
let find_range = (10..15).into();
let mut ssts = manifest.find_ssts(&find_range).await;

let mut expected_ssts = (10..15)
.map(|i| {
let id = i as u64;
let time_range = (i..i + 1).into();
let meta = FileMeta {
max_sequence: i as u64,
num_rows: i as u32,
size: i as u32,
time_range,
};
SstFile::new(id, meta)
})
.collect::<Vec<_>>();

expected_ssts.sort_by_key(|a| a.id());
ssts.sort_by_key(|a| a.id());
assert_eq!(expected_ssts, ssts);
});
}

#[tokio::test]
async fn test_merge_manifest() {
#[test]
fn test_merge_manifest() {
let root_dir = temp_dir::TempDir::new()
.unwrap()
.path()
.to_string_lossy()
.to_string();
let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
let rt = runtime.clone();

rt.block_on(async move {
let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
let manifest = Manifest::try_new(
root_dir,
store.clone(),
runtime.clone(),
ManifestMergeOptions {
merge_interval_seconds: 1,
..Default::default()
},
)
.await
.unwrap();
let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());

let manifest = Manifest::try_new(
root_dir,
store.clone(),
Arc::new(runtime),
ManifestMergeOptions {
merge_interval_seconds: 1,
..Default::default()
},
)
.await
.unwrap();

// Add manifest files
for i in 0..20 {
let time_range = (i..i + 1).into();
let meta = FileMeta {
max_sequence: i as u64,
num_rows: i as u32,
size: i as u32,
time_range,
};
manifest.add_file(i as u64, meta).await.unwrap();
}
// Add manifest files
for i in 0..20 {
let time_range = (i..i + 1).into();
let meta = FileMeta {
max_sequence: i as u64,
num_rows: i as u32,
size: i as u32,
time_range,
};
manifest.add_file(i as u64, meta).await.unwrap();
}

// Wait for merge manifest to finish
sleep(Duration::from_secs(2)).await;
// Wait for merge manifest to finish
sleep(Duration::from_secs(2)).await;

let mut mem_ssts = manifest.ssts.read().await.clone();
let snapshot = read_snapshot(&store, &snapshot_path).await.unwrap();
let mut ssts = snapshot
.records
.into_iter()
.map(SstFile::from)
.collect_vec();
let mut mem_ssts = manifest.ssts.read().await.clone();
let snapshot = read_snapshot(&store, &snapshot_path).await.unwrap();
let mut ssts = snapshot
.records
.into_iter()
.map(SstFile::from)
.collect_vec();

mem_ssts.sort_by_key(|a| a.id());
ssts.sort_by_key(|a| a.id());
assert_eq!(mem_ssts, ssts);
mem_ssts.sort_by_key(|a| a.id());
ssts.sort_by_key(|a| a.id());
assert_eq!(mem_ssts, ssts);

let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
assert!(delta_paths.is_empty());
let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
assert!(delta_paths.is_empty());
})
}

#[test]
Expand Down
Loading

0 comments on commit b65c968

Please sign in to comment.