Skip to content

Commit

Permalink
Live to init conversions lowest level
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasBrady committed Nov 26, 2024
1 parent 7f54c88 commit 0e8f83b
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 15 deletions.
9 changes: 5 additions & 4 deletions src/bucket/BucketBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync)
asio::io_context& ctx, bool doFsync,
bool rewriteLiveToInitEntries)
{
BUCKET_TYPE_ASSERT(BucketT);

Expand Down Expand Up @@ -353,7 +354,7 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,

BucketOutputIterator<BucketT> out(bucketManager.getTmpDir(),
keepTombstoneEntries, meta, mc, ctx,
doFsync);
doFsync, rewriteLiveToInitEntries);

BucketEntryIdCmp<BucketT> cmp;
size_t iter = 0;
Expand Down Expand Up @@ -407,13 +408,13 @@ template std::shared_ptr<LiveBucket> BucketBase::merge<LiveBucket>(
std::shared_ptr<LiveBucket> const& newBucket,
std::vector<std::shared_ptr<LiveBucket>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);
bool doFsync, bool rewriteLiveToInitEntries);

template std::shared_ptr<HotArchiveBucket> BucketBase::merge<HotArchiveBucket>(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<HotArchiveBucket> const& oldBucket,
std::shared_ptr<HotArchiveBucket> const& newBucket,
std::vector<std::shared_ptr<HotArchiveBucket>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);
bool doFsync, bool rewriteLiveToInitEntries);
}
6 changes: 5 additions & 1 deletion src/bucket/BucketBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class BucketBase : public NonMovableOrCopyable
public:
static constexpr ProtocolVersion
FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION = ProtocolVersion::V_23;
static constexpr ProtocolVersion
FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY =
ProtocolVersion::V_11;

// Create an empty bucket. The empty bucket has hash '000000...' and its
// filename is the empty string.
Expand Down Expand Up @@ -114,7 +117,8 @@ class BucketBase : public NonMovableOrCopyable
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync);
asio::io_context& ctx, bool doFsync,
bool rewriteLiveToInitEntries = false);

static std::string randomBucketName(std::string const& tmpDir);
static std::string randomBucketIndexName(std::string const& tmpDir);
Expand Down
7 changes: 7 additions & 0 deletions src/bucket/BucketListBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,13 @@ BucketListBase<BucketT>::keepTombstoneEntries(uint32_t level)
return level < BucketListBase<BucketT>::kNumLevels - 1;
}

template <typename BucketT>
bool
BucketListBase<BucketT>::rewriteLiveToInitEntries(uint32_t level)
{
return level == BucketListBase<BucketT>::kNumLevels - 1;
}

template <typename BucketT>
BucketLevel<BucketT> const&
BucketListBase<BucketT>::getLevel(uint32_t i) const
Expand Down
4 changes: 4 additions & 0 deletions src/bucket/BucketListBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ template <class BucketT> class BucketListBase
// HotArchiveBucketList, HOT_ARCHIVE_LIVE.
static bool keepTombstoneEntries(uint32_t level);

// Returns true if at given `level` live entries should be rewritten to
// init entries.
static bool rewriteLiveToInitEntries(uint32_t level);

// Number of ledgers it takes a bucket to spill/receive an incoming spill
static uint32_t bucketUpdatePeriod(uint32_t level, bool isCurr);

Expand Down
35 changes: 28 additions & 7 deletions src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,18 @@ namespace stellar
* hashes them while writing to either destination. Produces a Bucket when done.
*/
template <typename BucketT>
BucketOutputIterator<BucketT>::BucketOutputIterator(std::string const& tmpDir,
bool keepTombstoneEntries,
BucketMetadata const& meta,
MergeCounters& mc,
asio::io_context& ctx,
bool doFsync)
BucketOutputIterator<BucketT>::BucketOutputIterator(
std::string const& tmpDir, bool keepTombstoneEntries,
BucketMetadata const& meta, MergeCounters& mc, asio::io_context& ctx,
bool doFsync, bool rewriteLiveToInitEntries)
: mFilename(BucketBase::randomBucketName(tmpDir))
, mOut(ctx, doFsync)
, mCtx(ctx)
, mBuf(nullptr)
, mKeepTombstoneEntries(keepTombstoneEntries)
, mMeta(meta)
, mMergeCounters(mc)
, mRewriteLiveToInitEntries(rewriteLiveToInitEntries)
{
ZoneScoped;
CLOG_TRACE(Bucket, "BucketOutputIterator opening file to write: {}",
Expand Down Expand Up @@ -157,9 +156,31 @@ BucketOutputIterator<BucketT>::put(typename BucketT::EntryT const& e)
mBuf = std::make_unique<typename BucketT::EntryT>();
}

if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
if (mRewriteLiveToInitEntries && e.type() == LIVEENTRY &&
protocolVersionStartsFrom(
mMeta.ledgerVersion,
BucketT::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
// If mRewriteLiveToInitEntries, we also want to convert the
// LIVEENTRY to an INITENTRY. This is because each level of the
// bucket list contains only one entry per key, and per CAP-0020,
// INIT ENTRY implies that either no entry with the same ledger key
// exists in an older bucket. Therefore, all entries of type
// LIVEENTRY in the lowest level are also of type INITENTRY.
++mMergeCounters.mOutputIteratorLiveToInitRewrites;
++mMergeCounters.mOutputIteratorBufferUpdates;
// Make a copy of e and set the type of the new entry to INITENTRY.
typename BucketT::EntryT eCopy = e;
eCopy.type(INITENTRY);
*mBuf = eCopy;
return;
}
}
// In any case, replace *mBuf with e.
++mMergeCounters.mOutputIteratorBufferUpdates;
*mBuf = e;
++mMergeCounters.mOutputIteratorBufferUpdates;
}

template <typename BucketT>
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/BucketOutputIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ template <typename BucketT> class BucketOutputIterator
BucketMetadata mMeta;
bool mPutMeta{false};
MergeCounters& mMergeCounters;
bool mRewriteLiveToInitEntries{false};

public:
// BucketOutputIterators must _always_ be constructed with BucketMetadata,
Expand All @@ -48,7 +49,8 @@ template <typename BucketT> class BucketOutputIterator
// (or forget to do), it's handled automatically.
BucketOutputIterator(std::string const& tmpDir, bool keepTombstoneEntries,
BucketMetadata const& meta, MergeCounters& mc,
asio::io_context& ctx, bool doFsync);
asio::io_context& ctx, bool doFsync,
bool rewriteLiveToInitEntries = false);

void put(typename BucketT::EntryT const& e);

Expand Down
1 change: 1 addition & 0 deletions src/bucket/BucketUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct MergeCounters
uint64_t mOutputIteratorTombstoneElisions{0};
uint64_t mOutputIteratorBufferUpdates{0};
uint64_t mOutputIteratorActualWrites{0};
uint64_t mOutputIteratorLiveToInitRewrites{0};
MergeCounters& operator+=(MergeCounters const& delta);
bool operator==(MergeCounters const& other) const;
};
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/FutureBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ FutureBucket<BucketT>::startMerge(Application& app, uint32_t maxProtocolVersion,
auto res = BucketBase::merge(
bm, maxProtocolVersion, curr, snap, shadows,
BucketListBase<BucketT>::keepTombstoneEntries(level),
countMergeEvents, ctx, doFsync);
countMergeEvents, ctx, doFsync,
BucketListBase<BucketT>::rewriteLiveToInitEntries(level));

if (res)
{
Expand Down
19 changes: 18 additions & 1 deletion src/bucket/test/BucketListTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ TEST_CASE_VERSIONS("hot archive bucket tombstones expire at bottom level",
});
}

TEST_CASE_VERSIONS("live bucket tombstones expire at bottom level",
TEST_CASE_VERSIONS("live bucket tombstones expire at bottom level and live "
"entries converted to init entries",
"[bucket][bucketlist][tombstones]")
{
VirtualClock clock;
Expand Down Expand Up @@ -521,6 +522,22 @@ TEST_CASE_VERSIONS("live bucket tombstones expire at bottom level",
REQUIRE(e0.nDead != 0);
REQUIRE(e1.nDead != 0);
REQUIRE(e2.nDead == 0);
if (protocolVersionStartsFrom(
cfg.LEDGER_PROTOCOL_VERSION,
LiveBucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
// Assert that init entries are converted to live entries
// at the lowest level.
REQUIRE(e2.nLive == 0);
REQUIRE(e2.nInitOrArchived != 0);
// But not the level above.
REQUIRE(e1.nInitOrArchived == 0);
}
else
{
REQUIRE(e2.nLive != 0);
REQUIRE(e2.nInitOrArchived == 0);
}
});
}

Expand Down
4 changes: 4 additions & 0 deletions src/bucket/test/BucketManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,8 @@ class StopAndRestartBucketMergesTest
mMergeCounters.mDeadEntryShadowElisions);
CLOG_INFO(Bucket, "OutputIteratorTombstoneElisions: {}",
mMergeCounters.mOutputIteratorTombstoneElisions);
CLOG_INFO(Bucket, "OutputIteratorLiveToInitConversions: {}",
mMergeCounters.mOutputIteratorLiveToInitRewrites);
CLOG_INFO(Bucket, "OutputIteratorBufferUpdates: {}",
mMergeCounters.mOutputIteratorBufferUpdates);
CLOG_INFO(Bucket, "OutputIteratorActualWrites: {}",
Expand Down Expand Up @@ -915,6 +917,8 @@ class StopAndRestartBucketMergesTest

CHECK(mMergeCounters.mOutputIteratorTombstoneElisions ==
other.mMergeCounters.mOutputIteratorTombstoneElisions);
CHECK(mMergeCounters.mOutputIteratorLiveToInitRewrites ==
other.mMergeCounters.mOutputIteratorLiveToInitRewrites);
CHECK(mMergeCounters.mOutputIteratorBufferUpdates ==
other.mMergeCounters.mOutputIteratorBufferUpdates);
CHECK(mMergeCounters.mOutputIteratorActualWrites ==
Expand Down

0 comments on commit 0e8f83b

Please sign in to comment.