Skip to content

Commit

Permalink
feat(rdb save): add Lz4Compressor (#536)
Browse files Browse the repository at this point in the history
Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Dec 6, 2022
1 parent 5aabc96 commit e21a212
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 15 deletions.
6 changes: 4 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ add_third_party(
lz4
URL https://github.com/lz4/lz4/archive/refs/tags/v1.9.4.tar.gz

SOURCE_SUBDIR build/cmake
CMAKE_PASS_FLAGS "-DBUILD_SHARED_LIBS=OFF"
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND echo skip
BUILD_COMMAND make lib-release
INSTALL_COMMAND make install BUILD_SHARED=no PREFIX=${THIRD_PARTY_LIB_DIR}/lz4
)

add_library(TRDP::jsoncons INTERFACE IMPORTED)
Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc)

cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib
absl::random_random TRDP::jsoncons zstd)
absl::random_random TRDP::jsoncons zstd TRDP::lz4)

add_library(dfly_test_lib test_utils.cc)
cxx_link(dfly_test_lib dragonfly_lib epoll_fiber_lib facade_test gtest_main_ext)
Expand Down
85 changes: 75 additions & 10 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h>
#include <lz4frame.h>
#include <zstd.h>

#include "core/string_set.h"
Expand Down Expand Up @@ -166,20 +167,20 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) {
return 0; /* avoid warning */
}

class ZstdCompressImpl {
class ZstdCompressor {
public:
ZstdCompressImpl() {
ZstdCompressor() {
cctx_ = ZSTD_createCCtx();
compression_level_ = absl::GetFlag(FLAGS_zstd_compression_level);
}
~ZstdCompressImpl() {
~ZstdCompressor() {
ZSTD_freeCCtx(cctx_);

VLOG(1) << "zstd compressed size: " << compressed_size_total_;
VLOG(1) << "zstd uncompressed size: " << uncompressed_size_total_;
}

io::Bytes Compress(io::Bytes str);
io::Result<io::Bytes> Compress(io::Bytes data);

private:
ZSTD_CCtx* cctx_;
Expand All @@ -189,16 +190,75 @@ class ZstdCompressImpl {
base::PODArray<uint8_t> compr_buf_;
};

io::Bytes ZstdCompressImpl::Compress(io::Bytes str) {
size_t buf_size = ZSTD_compressBound(str.size());
io::Result<io::Bytes> ZstdCompressor::Compress(io::Bytes data) {
size_t buf_size = ZSTD_compressBound(data.size());
if (compr_buf_.capacity() < buf_size) {
compr_buf_.reserve(buf_size);
}
size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(),
str.data(), str.size(), compression_level_);
data.data(), data.size(), compression_level_);

if (ZSTD_isError(compressed_size)) {
return make_unexpected(error_code{int(compressed_size), generic_category()});
}
compressed_size_total_ += compressed_size;
uncompressed_size_total_ += str.size();
uncompressed_size_total_ += data.size();
return io::Bytes(compr_buf_.data(), compressed_size);
}

class Lz4Compressor {
public:
// create a compression context
Lz4Compressor() {
LZ4F_errorCode_t result = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
if (LZ4F_isError(result)) {
// TODO this can fail on memory allocation. What should we do in this case?
}
}

// destroy the compression context
~Lz4Compressor() {
LZ4F_freeCompressionContext(ctx_);
VLOG(1) << "lz4 compressed size: " << compressed_size_total_;
VLOG(1) << "lz4 uncompressed size: " << uncompressed_size_total_;
}

// compress a string of data
io::Result<io::Bytes> Compress(io::Bytes data);

private:
LZ4F_cctx* ctx_;
base::PODArray<uint8_t> compr_buf_;
size_t compressed_size_total_ = 0;
size_t uncompressed_size_total_ = 0;
};

io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
size_t buf_size = LZ4F_compressFrameBound(data.size(), NULL);
if (compr_buf_.capacity() < buf_size) {
compr_buf_.reserve(buf_size);
}
// initialize the compression context
size_t compressed_size = LZ4F_compressBegin(ctx_, compr_buf_.data(), compr_buf_.capacity(), NULL);
if (LZ4F_isError(compressed_size)) {
return make_unexpected(error_code{int(compressed_size), generic_category()});
}

// compress the data
compressed_size = LZ4F_compressUpdate(ctx_, compr_buf_.data(), compr_buf_.capacity(), data.data(),
data.size(), NULL);
if (LZ4F_isError(compressed_size)) {
return make_unexpected(error_code{int(compressed_size), generic_category()});
}

// finish the compression process
compressed_size = LZ4F_compressEnd(ctx_, compr_buf_.data(), compr_buf_.capacity(), NULL);
if (LZ4F_isError(compressed_size)) {
return make_unexpected(error_code{int(compressed_size), generic_category()});
}

compressed_size_total_ += compressed_size;
uncompressed_size_total_ += data.size();
return io::Bytes(compr_buf_.data(), compressed_size);
}

Expand Down Expand Up @@ -1105,10 +1165,15 @@ void RdbSerializer::CompressBlob() {

// Compress the data
if (!compressor_impl_) {
compressor_impl_.reset(new ZstdCompressImpl());
compressor_impl_.reset(new ZstdCompressor());
}

Bytes compressed_blob = compressor_impl_->Compress(blob_to_compress);
auto ec = compressor_impl_->Compress(blob_to_compress);
if (!ec) {
++compression_stats_->compression_failed;
return;
}
Bytes compressed_blob = *ec;
if (compressed_blob.length() > blob_size * kMinCompressionReductionPrecentage) {
++compression_stats_->compression_no_effective;
return;
Expand Down
5 changes: 3 additions & 2 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class RdbSaver {
CompressionMode compression_mode_;
};

class ZstdCompressImpl;
class ZstdCompressor;

class RdbSerializer {
public:
Expand Down Expand Up @@ -171,13 +171,14 @@ class RdbSerializer {
std::string tmp_str_;
CompressionMode compression_mode_;
// TODO : This compressor impl should support different compression algorithms zstd/lz4 etc.
std::unique_ptr<ZstdCompressImpl> compressor_impl_;
std::unique_ptr<ZstdCompressor> compressor_impl_;

static constexpr size_t kMinStrSizeToCompress = 256;
static constexpr double kMinCompressionReductionPrecentage = 0.95;
struct CompressionStats {
uint32_t compression_no_effective = 0;
uint32_t small_str_count = 0;
uint32_t compression_failed = 0;
};
std::optional<CompressionStats> compression_stats_;
};
Expand Down

0 comments on commit e21a212

Please sign in to comment.