From 8d5d7618f494e46f7e7e13a17bba261e0bd2cb1c Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 15 Sep 2023 14:25:38 +0800 Subject: [PATCH 1/5] fix: disk cache deduped get_ranges --- components/object_store/src/disk_cache.rs | 5 +- components/object_store/src/lib.rs | 2 + components/object_store/src/test_util.rs | 172 ++++++++++++++++++++++ 3 files changed, 177 insertions(+), 2 deletions(-) create mode 100644 components/object_store/src/test_util.rs diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index d868823df7..b41a4b38db 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -940,6 +940,7 @@ mod test { use upstream::local::LocalFileSystem; use super::*; + use crate::test_util::MemoryStore; struct StoreWithCacheDir { inner: DiskCacheStore, @@ -951,8 +952,7 @@ mod test { cap: usize, partition_bits: usize, ) -> StoreWithCacheDir { - let local_path = tempdir().unwrap(); - let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + let local_store = Arc::new(MemoryStore::default()); let cache_dir = tempdir().unwrap(); let store = DiskCacheStore::try_new( @@ -1103,6 +1103,7 @@ mod test { } let actual = futures::future::join_all(tasks).await; + println!("get_counts, {}", store.inner.underlying_store); for (actual, (_, expected)) in actual.into_iter().zip(testcases.into_iter()) { assert_eq!(actual.unwrap(), Bytes::from(expected)) } diff --git a/components/object_store/src/lib.rs b/components/object_store/src/lib.rs index 46bf5e50bf..d3114846a5 100644 --- a/components/object_store/src/lib.rs +++ b/components/object_store/src/lib.rs @@ -30,5 +30,7 @@ pub mod multipart; pub mod obkv; pub mod prefix; pub mod s3; +#[cfg(test)] +pub mod test_util; pub type ObjectStoreRef = Arc; diff --git a/components/object_store/src/test_util.rs b/components/object_store/src/test_util.rs new file mode 100644 index 0000000000..1f0eed3dbb --- /dev/null +++ b/components/object_store/src/test_util.rs @@ -0,0 +1,172 @@ +// Copyright 2023 The CeresDB Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::HashMap, fmt::Display, ops::Range, sync::RwLock}; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream}; +use tokio::io::AsyncWrite; +use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result}; + +#[derive(Debug)] +struct StoreError { + path: Path, + msg: String, +} + +impl Display for StoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StoreError") + .field("path", &self.path) + .field("msg", &self.msg) + .finish() + } +} + +impl std::error::Error for StoreError {} + +/// A memory based object store implementation, mainly used for testing. +#[derive(Debug, Default)] +pub struct MemoryStore { + files: RwLock>, + get_range_counts: RwLock>, +} + +impl Display for MemoryStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MemoryStore") + .field("counts", &self.get_counts()) + .finish() + } +} + +impl MemoryStore { + pub fn get_counts(&self) -> HashMap { + let counts = self.get_range_counts.read().unwrap(); + counts.clone().into_iter().collect() + } +} + +#[async_trait] +impl ObjectStore for MemoryStore { + async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + let mut files = self.files.write().unwrap(); + files.insert(location.clone(), bytes); + Ok(()) + } + + async fn get(&self, location: &Path) -> Result { + let files = self.files.read().unwrap(); + if let Some(bs) = files.get(location) { + let bs = bs.clone(); + Ok(GetResult::Stream(Box::pin(stream::once( + async move { Ok(bs) }, + )))) + } else { + let source = Box::new(StoreError { + msg: "not found".to_string(), + path: location.clone(), + }); + Err(upstream::Error::Generic { + store: "get", + source, + }) + } + } + + async fn get_range(&self, location: &Path, range: Range) -> Result { + { + let mut counts = self.get_range_counts.write().unwrap(); + counts + .entry(location.clone()) + .and_modify(|c| *c += 1) + .or_insert(1); + } + + let files = self.files.read().unwrap(); + if let Some(bs) = files.get(location) { + Ok(bs.slice(range)) + } else { + let source = Box::new(StoreError { + msg: "not found".to_string(), + path: location.clone(), + }); + Err(upstream::Error::Generic { + store: "get_range", + source, + }) + } + } + + async fn head(&self, location: &Path) -> Result { + let files = self.files.read().unwrap(); + + if let Some(bs) = files.get(location) { + Ok(ObjectMeta { + location: location.clone(), + size: bs.len(), + last_modified: Default::default(), + }) + } else { + let source = Box::new(StoreError { + msg: "not found".to_string(), + path: location.clone(), + }); + Err(upstream::Error::Generic { + store: "head", + source, + }) + } + } + + async fn put_multipart( + &self, + _location: &Path, + ) -> Result<(MultipartId, Box)> { + unimplemented!() + } + + async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { + unimplemented!() + } + + async fn delete(&self, _location: &Path) -> Result<()> { + unimplemented!() + } + + async fn list(&self, _prefix: Option<&Path>) -> Result>> { + unimplemented!() + } + + async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result { + unimplemented!() + } + + async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { + unimplemented!() + } + + async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> { + unimplemented!() + } + + async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { + unimplemented!() + } + + async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { + unimplemented!() + } +} From e55974984662b3faf12e4de721fa04a5aa6b15a8 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 15 Sep 2023 17:37:48 +0800 Subject: [PATCH 2/5] first write to tmp file when persist bytes --- components/object_store/src/disk_cache.rs | 31 ++++++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index b41a4b38db..d1deb623ea 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -357,18 +357,36 @@ impl DiskCache { } async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> { - let file_path = std::path::Path::new(&self.root_dir) - .join(filename) + // When write payload to file, the cache lock is released, so when one thread is + // reading, another thread may update it, so we write to tmp file first, + // then rename to expected filename to avoid other threads see partial + // content. + let tmp_filename = format!("{filename}.tmp"); + let tmp_filepath = std::path::Path::new(&self.root_dir) + .join(&tmp_filename) .into_os_string() .into_string() .unwrap(); - let mut file = File::create(&file_path) - .await - .context(Io { file: &file_path })?; + let mut file = File::create(&tmp_filepath).await.context(Io { + file: &tmp_filepath, + })?; let encoding = PageFileEncoder { payload }; - encoding.encode_and_persist(&mut file, filename).await + encoding + .encode_and_persist(&mut file, &tmp_filename) + .await?; + + let dest_filepath = std::path::Path::new(&self.root_dir) + .join(filename) + .into_os_string() + .into_string() + .unwrap(); + tokio::fs::rename(tmp_filepath, dest_filepath) + .await + .context(Io { file: filename })?; + + Ok(()) } /// Read the bytes from the cached file. @@ -1103,7 +1121,6 @@ mod test { } let actual = futures::future::join_all(tasks).await; - println!("get_counts, {}", store.inner.underlying_store); for (actual, (_, expected)) in actual.into_iter().zip(testcases.into_iter()) { assert_eq!(actual.unwrap(), Bytes::from(expected)) } From 525db9f315cc51ec361edefda765c44954df7ea6 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 18 Sep 2023 16:28:19 +0800 Subject: [PATCH 3/5] delete tmp file when io error happens --- components/object_store/src/disk_cache.rs | 81 ++++++++++++----------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index d1deb623ea..8b021f1256 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -167,36 +167,63 @@ impl Manifest { } } -/// The encoder of the page file in the disk cache. +/// The writer of the page file in the disk cache. /// /// Following the payload, a footer [`PageFileEncoder::MAGIC_FOOTER`] is /// appended. -struct PageFileEncoder { - payload: Bytes, +struct PageFileWriter { + output: String, } -impl PageFileEncoder { +impl PageFileWriter { const MAGIC_FOOTER: [u8; 8] = [0, 0, 0, 0, b'c', b'e', b'r', b'e']; - async fn encode_and_persist(&self, writer: &mut W, name: &str) -> Result<()> - where - W: AsyncWrite + std::marker::Unpin, - { + fn new(output: String) -> Self { + Self { output } + } + + fn tmp_file(input: &str) -> String { + format!("{}.tmp", input) + } + + async fn write_inner(&self, tmp_file: &str, bytes: Bytes) -> Result<()> { + let mut writer = File::create(&tmp_file) + .await + .context(Io { file: tmp_file })?; writer - .write_all(&self.payload[..]) + .write_all(&bytes) .await - .context(Io { file: name })?; + .context(Io { file: tmp_file })?; writer .write_all(&Self::MAGIC_FOOTER) .await - .context(Io { file: name })?; + .context(Io { file: tmp_file })?; - writer.flush().await.context(Io { file: name })?; + writer.flush().await.context(Io { file: tmp_file })?; + + tokio::fs::rename(tmp_file, &self.output) + .await + .context(Io { file: &self.output })?; Ok(()) } + // When write bytes to file, the cache lock is released, so when one thread is + // reading, another thread may update it, so we write to tmp file first, + // then rename to expected filename to avoid other threads see partial + // content. + async fn write_and_flush(self, bytes: Bytes) -> Result<()> { + let tmp_file = Self::tmp_file(&self.output); + let write_result = self.write_inner(&tmp_file, bytes).await; + if write_result.is_err() { + // we don't care this result. + _ = tokio::fs::remove_file(&tmp_file).await; + } + + write_result + } + #[inline] fn encoded_size(payload_len: usize) -> usize { payload_len + Self::MAGIC_FOOTER.len() @@ -262,7 +289,7 @@ impl DiskCache { async fn insert_data(&self, filename: String, value: Bytes) { let page_meta = { - let file_size = PageFileEncoder::encoded_size(value.len()); + let file_size = PageFileWriter::encoded_size(value.len()); PageMeta { file_size } }; let evicted_file = self.insert_page_meta(filename.clone(), page_meta); @@ -357,34 +384,14 @@ impl DiskCache { } async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> { - // When write payload to file, the cache lock is released, so when one thread is - // reading, another thread may update it, so we write to tmp file first, - // then rename to expected filename to avoid other threads see partial - // content. - let tmp_filename = format!("{filename}.tmp"); - let tmp_filepath = std::path::Path::new(&self.root_dir) - .join(&tmp_filename) - .into_os_string() - .into_string() - .unwrap(); - - let mut file = File::create(&tmp_filepath).await.context(Io { - file: &tmp_filepath, - })?; - - let encoding = PageFileEncoder { payload }; - encoding - .encode_and_persist(&mut file, &tmp_filename) - .await?; - let dest_filepath = std::path::Path::new(&self.root_dir) .join(filename) .into_os_string() .into_string() .unwrap(); - tokio::fs::rename(tmp_filepath, dest_filepath) - .await - .context(Io { file: filename })?; + + let writer = PageFileWriter::new(dest_filepath); + writer.write_and_flush(payload).await?; Ok(()) } @@ -399,7 +406,7 @@ impl DiskCache { range: &Range, expect_file_size: usize, ) -> std::io::Result { - if PageFileEncoder::encoded_size(range.len()) > expect_file_size { + if PageFileWriter::encoded_size(range.len()) > expect_file_size { return Ok(ReadBytesResult::OutOfRange); } From ab55ecd063a5cbc3dba19d94e9a25fff63cb0b54 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 19 Sep 2023 11:09:44 +0800 Subject: [PATCH 4/5] do not log send error --- components/object_store/src/disk_cache.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index 8b021f1256..c52567a181 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -27,7 +27,7 @@ use chrono::{DateTime, Utc}; use crc::{Crc, CRC_32_ISCSI}; use futures::stream::BoxStream; use hash_ext::SeaHasherBuilder; -use log::{debug, error, info, warn}; +use log::{debug, info, warn}; use lru::LruCache; use notifier::notifier::{ExecutionGuard, RequestNotifiers}; use partitioned_lock::PartitionedMutex; @@ -706,7 +706,7 @@ impl DiskCacheStore { } .fail(), ) { - error!("Failed to send notifier error result, err:{e:?}."); + warn!("Failed to send notifier error result, err:{e:?}."); } } } @@ -723,8 +723,10 @@ impl DiskCacheStore { { self.cache.insert_data(cache_key, bytes.clone()).await; for notifier in notifiers { - if let Err(e) = notifier.send(Ok(bytes.clone())) { - error!("Failed to send notifier success result, err:{e:?}."); + if notifier.send(Ok(bytes.clone())).is_err() { + // The error contains sent bytes, which maybe very large, + // so we don't log error. + warn!("Failed to send notifier success result"); } } } From 8bb8ffc946f0f5f70f6abd04d30b32f72cc5ce2f Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 21 Sep 2023 10:24:45 +0800 Subject: [PATCH 5/5] remove tmpfile in drop --- components/object_store/src/disk_cache.rs | 38 +++++++++++++++++------ 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index c52567a181..89723f6b47 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -173,21 +173,43 @@ impl Manifest { /// appended. struct PageFileWriter { output: String, + tmp_file: String, + need_clean_tmpfile: bool, +} + +impl Drop for PageFileWriter { + fn drop(&mut self) { + if self.need_clean_tmpfile { + if let Err(e) = std::fs::remove_file(&self.tmp_file) { + warn!( + "Disk cache remove page tmp file failed, file:{}, err:{e}", + &self.tmp_file + ); + } + } + } } impl PageFileWriter { const MAGIC_FOOTER: [u8; 8] = [0, 0, 0, 0, b'c', b'e', b'r', b'e']; fn new(output: String) -> Self { - Self { output } + let tmp_file = Self::tmp_file(&output); + + Self { + output, + tmp_file, + need_clean_tmpfile: true, + } } fn tmp_file(input: &str) -> String { format!("{}.tmp", input) } - async fn write_inner(&self, tmp_file: &str, bytes: Bytes) -> Result<()> { - let mut writer = File::create(&tmp_file) + async fn write_inner(&self, bytes: Bytes) -> Result<()> { + let tmp_file = &self.tmp_file; + let mut writer = File::create(tmp_file) .await .context(Io { file: tmp_file })?; writer @@ -213,12 +235,10 @@ impl PageFileWriter { // reading, another thread may update it, so we write to tmp file first, // then rename to expected filename to avoid other threads see partial // content. - async fn write_and_flush(self, bytes: Bytes) -> Result<()> { - let tmp_file = Self::tmp_file(&self.output); - let write_result = self.write_inner(&tmp_file, bytes).await; - if write_result.is_err() { - // we don't care this result. - _ = tokio::fs::remove_file(&tmp_file).await; + async fn write_and_flush(mut self, bytes: Bytes) -> Result<()> { + let write_result = self.write_inner(bytes).await; + if write_result.is_ok() { + self.need_clean_tmpfile = false; } write_result