diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6ddb60afa2b0..a79e576f7a98 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -26,8 +26,10 @@ add_third_party( lz4 URL https://github.com/lz4/lz4/archive/refs/tags/v1.9.4.tar.gz - SOURCE_SUBDIR build/cmake - CMAKE_PASS_FLAGS "-DBUILD_SHARED_LIBS=OFF" + BUILD_IN_SOURCE 1 + CONFIGURE_COMMAND echo skip + BUILD_COMMAND make lib-release + INSTALL_COMMAND make install BUILD_SHARED=no PREFIX=${THIRD_PARTY_LIB_DIR}/lz4 ) add_library(TRDP::jsoncons INTERFACE IMPORTED) diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9887cf5d1be2..f4851fae3ad5 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -21,7 +21,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc zset_family.cc version.cc bitops_family.cc container_utils.cc) cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib - absl::random_random TRDP::jsoncons zstd) + absl::random_random TRDP::jsoncons zstd TRDP::lz4) add_library(dfly_test_lib test_utils.cc) cxx_link(dfly_test_lib dragonfly_lib epoll_fiber_lib facade_test gtest_main_ext) diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 2905cb6d64b5..d412c831d10c 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "core/string_set.h" @@ -166,20 +167,20 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) { return 0; /* avoid warning */ } -class ZstdCompressImpl { +class ZstdCompressor { public: - ZstdCompressImpl() { + ZstdCompressor() { cctx_ = ZSTD_createCCtx(); compression_level_ = absl::GetFlag(FLAGS_zstd_compression_level); } - ~ZstdCompressImpl() { + ~ZstdCompressor() { ZSTD_freeCCtx(cctx_); VLOG(1) << "zstd compressed size: " << compressed_size_total_; VLOG(1) << "zstd uncompressed size: " << uncompressed_size_total_; } - io::Bytes Compress(io::Bytes str); + io::Result Compress(io::Bytes data); private: ZSTD_CCtx* cctx_; @@ -189,16 +190,75 @@ class ZstdCompressImpl { base::PODArray compr_buf_; }; -io::Bytes ZstdCompressImpl::Compress(io::Bytes str) { - size_t buf_size = ZSTD_compressBound(str.size()); +io::Result ZstdCompressor::Compress(io::Bytes data) { + size_t buf_size = ZSTD_compressBound(data.size()); if (compr_buf_.capacity() < buf_size) { compr_buf_.reserve(buf_size); } size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), - str.data(), str.size(), compression_level_); + data.data(), data.size(), compression_level_); + if (ZSTD_isError(compressed_size)) { + return make_unexpected(error_code{int(compressed_size), generic_category()}); + } compressed_size_total_ += compressed_size; - uncompressed_size_total_ += str.size(); + uncompressed_size_total_ += data.size(); + return io::Bytes(compr_buf_.data(), compressed_size); +} + +class Lz4Compressor { + public: + // create a compression context + Lz4Compressor() { + LZ4F_errorCode_t result = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(result)) { + // TODO this can fail on memory allocation. What should we do in this case? + } + } + + // destroy the compression context + ~Lz4Compressor() { + LZ4F_freeCompressionContext(ctx_); + VLOG(1) << "lz4 compressed size: " << compressed_size_total_; + VLOG(1) << "lz4 uncompressed size: " << uncompressed_size_total_; + } + + // compress a string of data + io::Result Compress(io::Bytes data); + + private: + LZ4F_cctx* ctx_; + base::PODArray compr_buf_; + size_t compressed_size_total_ = 0; + size_t uncompressed_size_total_ = 0; +}; + +io::Result Lz4Compressor::Compress(io::Bytes data) { + size_t buf_size = LZ4F_compressFrameBound(data.size(), NULL); + if (compr_buf_.capacity() < buf_size) { + compr_buf_.reserve(buf_size); + } + // initialize the compression context + size_t compressed_size = LZ4F_compressBegin(ctx_, compr_buf_.data(), compr_buf_.capacity(), NULL); + if (LZ4F_isError(compressed_size)) { + return make_unexpected(error_code{int(compressed_size), generic_category()}); + } + + // compress the data + compressed_size = LZ4F_compressUpdate(ctx_, compr_buf_.data(), compr_buf_.capacity(), data.data(), + data.size(), NULL); + if (LZ4F_isError(compressed_size)) { + return make_unexpected(error_code{int(compressed_size), generic_category()}); + } + + // finish the compression process + compressed_size = LZ4F_compressEnd(ctx_, compr_buf_.data(), compr_buf_.capacity(), NULL); + if (LZ4F_isError(compressed_size)) { + return make_unexpected(error_code{int(compressed_size), generic_category()}); + } + + compressed_size_total_ += compressed_size; + uncompressed_size_total_ += data.size(); return io::Bytes(compr_buf_.data(), compressed_size); } @@ -1105,10 +1165,15 @@ void RdbSerializer::CompressBlob() { // Compress the data if (!compressor_impl_) { - compressor_impl_.reset(new ZstdCompressImpl()); + compressor_impl_.reset(new ZstdCompressor()); } - Bytes compressed_blob = compressor_impl_->Compress(blob_to_compress); + auto ec = compressor_impl_->Compress(blob_to_compress); + if (!ec) { + ++compression_stats_->compression_failed; + return; + } + Bytes compressed_blob = *ec; if (compressed_blob.length() > blob_size * kMinCompressionReductionPrecentage) { ++compression_stats_->compression_no_effective; return; diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 7ac8e6dfe736..535a3a1d9196 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -112,7 +112,7 @@ class RdbSaver { CompressionMode compression_mode_; }; -class ZstdCompressImpl; +class ZstdCompressor; class RdbSerializer { public: @@ -171,13 +171,14 @@ class RdbSerializer { std::string tmp_str_; CompressionMode compression_mode_; // TODO : This compressor impl should support different compression algorithms zstd/lz4 etc. - std::unique_ptr compressor_impl_; + std::unique_ptr compressor_impl_; static constexpr size_t kMinStrSizeToCompress = 256; static constexpr double kMinCompressionReductionPrecentage = 0.95; struct CompressionStats { uint32_t compression_no_effective = 0; uint32_t small_str_count = 0; + uint32_t compression_failed = 0; }; std::optional compression_stats_; };