BucketList cache
SirTyson committed Dec 6, 2024
1 parent fdd833d commit 16fc3d3
Showing 5 changed files with 177 additions and 44 deletions.
12 changes: 11 additions & 1 deletion src/bucket/BucketIndex.h
@@ -80,7 +80,9 @@ class BucketIndex : public NonMovableOrCopyable
IndividualIndex::const_iterator>;

inline static const std::string DB_BACKEND_STATE = "bl";
inline static const uint32_t BUCKET_INDEX_VERSION = 4;
inline static const uint32_t BUCKET_INDEX_VERSION = 5;
inline static const uint32_t CACHE_SIZE = 1'000'000;
inline static const uint64_t INDIVIDUAL_CACHE_CUTOFF_SIZE = 100'000'000'000;

// Returns true if LedgerEntryType not supported by BucketListDB
static bool typeNotSupported(LedgerEntryType t);
@@ -130,13 +132,21 @@ class BucketIndex : public NonMovableOrCopyable
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const = 0;

// Returns true if cache hit occurred
virtual std::pair<std::shared_ptr<BucketEntry>, bool>
getFromCache(LedgerKey const& k) const = 0;

virtual void addToCache(std::shared_ptr<BucketEntry> be) const = 0;

// Returns page size for index. IndividualIndex returns 0 for page size
virtual std::streamoff getPageSize() const = 0;

virtual Iterator begin() const = 0;

virtual Iterator end() const = 0;

virtual bool isFullyCached() const = 0;

virtual void markBloomMiss() const = 0;
virtual void markBloomLookup() const = 0;
virtual BucketEntryCounters const& getBucketEntryCounters() const = 0;
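
Taken together, the new interface methods and constants describe a two-tier cache: per the BucketIndexImpl.cpp changes below, a live bucket whose file size is under INDIVIDUAL_CACHE_CUTOFF_SIZE keeps a complete in-memory map of its entries (so a cache miss is an authoritative "not in this bucket"), while larger buckets get a bounded cache of CACHE_SIZE entries with random eviction, filled lazily as pages are read from disk. The following standalone sketch illustrates that policy; all names and types are toy stand-ins rather than the stellar-core ones, and the eviction here is a plain random pick rather than the project's RandomEvictionCache.

// Toy two-tier cache (illustrative names only, not the stellar-core types).
// Small data sets are held completely, so a lookup miss is authoritative;
// large data sets use a bounded cache with random eviction, filled lazily.
#include <cstddef>
#include <iterator>
#include <memory>
#include <random>
#include <string>
#include <unordered_map>
#include <utility>

struct Entry
{
    std::string key;
    std::string value;
};

class TwoTierCache
{
    std::unordered_map<std::string, std::shared_ptr<Entry>> mFullMap;
    std::unordered_map<std::string, std::shared_ptr<Entry>> mBounded;
    std::size_t const mMaxBoundedEntries;
    std::mt19937 mRng{0};

  public:
    explicit TwoTierCache(std::size_t maxBoundedEntries)
        : mMaxBoundedEntries(maxBoundedEntries)
    {
    }

    // Build-time population, only done when the whole data set is small
    // enough to keep resident (the file-size cutoff in this commit).
    void
    insertFull(std::shared_ptr<Entry> e)
    {
        mFullMap[e->key] = e;
    }

    bool
    isFullyCached() const
    {
        return !mFullMap.empty();
    }

    // Returns {entry, hit}. A hit with a null entry means "definitely not
    // present", which is only possible in fully cached mode.
    std::pair<std::shared_ptr<Entry>, bool>
    getFromCache(std::string const& k) const
    {
        if (isFullyCached())
        {
            auto it = mFullMap.find(k);
            return {it == mFullMap.end() ? nullptr : it->second, true};
        }
        auto it = mBounded.find(k);
        if (it == mBounded.end())
        {
            return {nullptr, false};
        }
        return {it->second, true};
    }

    // Called after a disk read in bounded mode; evicts a random victim when
    // the cache is at capacity.
    void
    addToCache(std::shared_ptr<Entry> e)
    {
        if (isFullyCached())
        {
            return;
        }
        if (!mBounded.empty() && mBounded.size() >= mMaxBoundedEntries)
        {
            auto victim =
                std::next(mBounded.begin(), mRng() % mBounded.size());
            mBounded.erase(victim);
        }
        mBounded[e->key] = e;
    }
};

The useful property of the fully cached mode is the {nullptr, true} result: a caller can trust the negative answer and skip the index lookup and disk read entirely, which is how the BucketSnapshot changes below use it.
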
78 changes: 75 additions & 3 deletions src/bucket/BucketIndexImpl.cpp
@@ -18,6 +18,7 @@
#include "util/LogSlowExecution.h"
#include "util/Logging.h"
#include "util/XDRStream.h"
#include "xdr/Stellar-ledger-entries.h"

#include <Tracy.hpp>
#include <cereal/archives/binary.hpp>
@@ -28,6 +29,8 @@
#include <fmt/format.h>

#include <memory>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <type_traits>
#include <xdrpp/marshal.h>
@@ -91,12 +94,15 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
auto timer = LogSlowExecution("Indexing bucket");
mData.pageSize = pageSize;

auto fileSize = std::filesystem::file_size(filename);
bool inMemoryMap = fileSize < INDIVIDUAL_CACHE_CUTOFF_SIZE &&
std::is_same_v<BucketEntryT, BucketEntry>;

// We don't have a good way of estimating IndividualIndex size since
// keys are variable size, so only reserve range indexes since we know
// the page size ahead of time
if constexpr (std::is_same<IndexT, RangeIndex>::value)
if (std::is_same_v<IndexT, RangeIndex>)
{
auto fileSize = std::filesystem::file_size(filename);
auto estimatedIndexEntries = fileSize / mData.pageSize;
mData.keysToOffset.reserve(estimatedIndexEntries);
}
@@ -187,6 +193,15 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
}
}

if constexpr (std::is_same_v<BucketEntryT, LiveBucket::EntryT>)
{
if (inMemoryMap)
{
mData.inMemoryMap[key] =
std::make_shared<BucketEntry>(be);
}
}

if constexpr (std::is_same_v<IndexT, RangeIndex>)
{
auto keyBuf = xdr::xdr_to_opaque(key);
@@ -299,7 +314,8 @@ template <class IndexT>
template <class Archive>
BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager const& bm, Archive& ar,
std::streamoff pageSize)
: mBloomMissMeter(bm.getBloomMissMeter())
: mData()
, mBloomMissMeter(bm.getBloomMissMeter())
, mBloomLookupMeter(bm.getBloomLookupMeter())
{
mData.pageSize = pageSize;
@@ -572,6 +588,48 @@ BucketIndexImpl<IndexT>::getOfferRange() const
return getOffsetBounds(lowerBound, upperBound);
}

template <class IndexT>
std::pair<std::shared_ptr<BucketEntry>, bool>
BucketIndexImpl<IndexT>::getFromCache(LedgerKey const& k) const
{
if (mData.inMemoryMap.empty())
{
std::lock_guard<std::shared_mutex> lock(mData.cacheLock);
auto* ptr = mData.inMemoryCache.maybeGet(k);
if (ptr)
{
return {*ptr, true};
}
else
{
return {nullptr, false};
}
}
else
{
auto iter = mData.inMemoryMap.find(k);
if (iter == mData.inMemoryMap.end())
{
return {nullptr, true};
}
else
{
return {iter->second, true};
}
}
}

template <class IndexT>
void
BucketIndexImpl<IndexT>::addToCache(std::shared_ptr<BucketEntry> be) const
{
if (mData.inMemoryMap.empty())
{
std::unique_lock<std::shared_mutex> lock(mData.cacheLock);
mData.inMemoryCache.put(getBucketLedgerKey(*be), be);
}
}

#ifdef BUILD_TESTS
template <class IndexT>
bool
@@ -620,6 +678,20 @@ BucketIndexImpl<IndexT>::operator==(BucketIndex const& inRaw) const
return false;
}

if (mData.inMemoryMap.size() != in.mData.inMemoryMap.size())
{
return false;
}

for (auto const& [key, entry] : mData.inMemoryMap)
{
auto iter = in.mData.inMemoryMap.find(key);
if (iter == in.mData.inMemoryMap.end() || !(*entry == *iter->second))
{
return false;
}
}

return true;
}
#endif
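
The cache accessors above guard the bounded cache with the std::shared_mutex declared in the header, taking an exclusive lock on both the read and the write path (plausibly because a lookup in an evicting cache can mutate internal bookkeeping). For contrast, the sketch below shows the conventional shared-readers/exclusive-writer use of the same primitive; it is a standalone illustration with toy names, not a drop-in for the code above.

// Standalone illustration of std::shared_mutex: many concurrent readers
// under std::shared_lock, one writer at a time under std::unique_lock.
// All names here are illustrative.
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

class GuardedMap
{
    mutable std::shared_mutex mLock;
    std::unordered_map<std::string, int> mMap;

  public:
    int
    get(std::string const& k) const
    {
        std::shared_lock<std::shared_mutex> lock(mLock); // shared readers
        auto it = mMap.find(k);
        return it == mMap.end() ? -1 : it->second;
    }

    void
    put(std::string const& k, int v)
    {
        std::unique_lock<std::shared_mutex> lock(mLock); // exclusive writer
        mMap[k] = v;
    }
};

int
main()
{
    GuardedMap m;
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
    {
        threads.emplace_back([&m, i] {
            m.put("k" + std::to_string(i), i);
            m.get("k0");
        });
    }
    for (auto& t : threads)
    {
        t.join();
    }
    std::cout << m.get("k3") << "\n"; // prints 3
}
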
29 changes: 26 additions & 3 deletions src/bucket/BucketIndexImpl.h
@@ -6,14 +6,18 @@

#include "bucket/BucketIndex.h"
#include "bucket/LiveBucket.h"
#include "ledger/LedgerHashUtils.h"
#include "medida/meter.h"
#include "util/BinaryFuseFilter.h"
#include "util/RandomEvictionCache.h"
#include "xdr/Stellar-ledger-entries.h"
#include "xdr/Stellar-types.h"

#include "util/BufferedAsioCerealOutputArchive.h"
#include <cereal/types/map.hpp>
#include <map>
#include <memory>
#include <shared_mutex>

namespace stellar
{
@@ -34,15 +38,19 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
std::streamoff pageSize{};
std::unique_ptr<BinaryFuseFilter16> filter{};
std::map<Asset, std::vector<PoolID>> assetToPoolID{};
std::map<LedgerKey, std::shared_ptr<BucketEntry>> inMemoryMap;
mutable RandomEvictionCache<LedgerKey, std::shared_ptr<BucketEntry>>
inMemoryCache;
mutable std::shared_mutex cacheLock;
BucketEntryCounters counters{};

template <class Archive>
void
save(Archive& ar) const
{
auto version = BUCKET_INDEX_VERSION;
ar(version, pageSize, assetToPoolID, keysToOffset, filter,
counters);
ar(version, pageSize, assetToPoolID, keysToOffset, filter, counters,
inMemoryMap);
}

// Note: version and pageSize must be loaded before this function is
@@ -53,7 +61,11 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
void
load(Archive& ar)
{
ar(assetToPoolID, keysToOffset, filter, counters);
ar(assetToPoolID, keysToOffset, filter, counters, inMemoryMap);
}

SerializableBucketIndex() : inMemoryCache(CACHE_SIZE)
{
}
} mData;

@@ -100,6 +112,11 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const override;

virtual std::pair<std::shared_ptr<BucketEntry>, bool>
getFromCache(LedgerKey const& k) const override;

virtual void addToCache(std::shared_ptr<BucketEntry> be) const override;

virtual std::streamoff
getPageSize() const override
{
Expand All @@ -118,6 +135,12 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
return mData.keysToOffset.end();
}

virtual bool
isFullyCached() const override
{
return !mData.inMemoryMap.empty();
}

virtual void markBloomMiss() const override;
virtual void markBloomLookup() const override;
virtual BucketEntryCounters const& getBucketEntryCounters() const override;
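
The serialized form now includes inMemoryMap, which is why BUCKET_INDEX_VERSION moves from 4 to 5 at the top of the commit: cereal writes fields positionally, so appending one changes the on-disk layout, and the version constant is what lets a node detect a stale persisted index rather than misread it. Below is a minimal standalone toy of that save/load pattern; names are illustrative and, unlike the real code (which checks version and page size before calling load), the toy folds the version check into load() for brevity.

// Toy version of the split member save/load pattern above (illustrative
// types; not the stellar-core index).
#include <cereal/archives/binary.hpp>
#include <cereal/types/map.hpp>
#include <cereal/types/string.hpp>
#include <cstdint>
#include <fstream>
#include <map>
#include <stdexcept>
#include <string>

struct ToyIndex
{
    // Bumped whenever a field is added to the archive layout, as this
    // commit does for the in-memory map.
    static constexpr uint32_t FORMAT_VERSION = 5;

    std::map<std::string, uint64_t> keysToOffset;
    std::map<std::string, std::string> inMemoryMap; // new field in v5

    template <class Archive>
    void
    save(Archive& ar) const
    {
        uint32_t version = FORMAT_VERSION;
        // Fields are written in order, so appending one changes the
        // on-disk layout and requires the version bump.
        ar(version, keysToOffset, inMemoryMap);
    }

    template <class Archive>
    void
    load(Archive& ar)
    {
        uint32_t version = 0;
        ar(version);
        if (version != FORMAT_VERSION)
        {
            throw std::runtime_error("stale index format, rebuild required");
        }
        ar(keysToOffset, inMemoryMap);
    }
};

int
main()
{
    {
        ToyIndex idx;
        idx.keysToOffset["key-1"] = 42;
        idx.inMemoryMap["key-1"] = "entry-bytes";
        std::ofstream out("toy.idx", std::ios::binary);
        cereal::BinaryOutputArchive ar(out);
        ar(idx); // dispatches to save()
    }
    ToyIndex loaded;
    std::ifstream in("toy.idx", std::ios::binary);
    cereal::BinaryInputArchive ar(in);
    ar(loaded); // dispatches to load()
    return loaded.keysToOffset.at("key-1") == 42 ? 0 : 1;
}
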
100 changes: 64 additions & 36 deletions src/bucket/BucketSnapshot.cpp
@@ -63,7 +63,12 @@ BucketSnapshotBase<BucketT>::getEntryAtOffset(LedgerKey const& k,
}
else if (stream.readPage(be, k, pageSize))
{
return {std::make_shared<typename BucketT::EntryT>(be), false};
auto ret = std::make_shared<typename BucketT::EntryT>(be);
if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
mBucket->getIndex().addToCache(ret);
}
return {ret, false};
}

// Mark entry miss for metrics
@@ -81,6 +86,15 @@ BucketSnapshotBase<BucketT>::getBucketEntry(LedgerKey const& k) const
return {nullptr, false};
}

if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
auto [entryOp, hit] = mBucket->getIndex().getFromCache(k);
if (hit)
{
return {entryOp, false};
}
}

auto pos = mBucket->getIndex().lookup(k);
if (pos.has_value())
{
@@ -111,53 +125,67 @@ BucketSnapshotBase<BucketT>::loadKeys(
auto currKeyIt = keys.begin();
auto const& index = mBucket->getIndex();
auto indexIter = index.begin();
while (currKeyIt != keys.end() && indexIter != index.end())

while (currKeyIt != keys.end() &&
(indexIter != index.end() || index.isFullyCached()))
{
auto [offOp, newIndexIter] = index.scan(indexIter, *currKeyIt);
indexIter = newIndexIter;
if (offOp)
std::shared_ptr<typename BucketT::EntryT> entryOp{};
bool cacheHit = false;
if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
std::tie(entryOp, cacheHit) = index.getFromCache(*currKeyIt);
}

if (!cacheHit)
{
auto [entryOp, bloomMiss] = getEntryAtOffset(
std::optional<std::streamoff> offOp{};
auto bloomMiss = false;
std::tie(offOp, indexIter) = index.scan(indexIter, *currKeyIt);
if (!offOp)
{
++currKeyIt;
continue;
}

std::tie(entryOp, bloomMiss) = getEntryAtOffset(
*currKeyIt, *offOp, mBucket->getIndex().getPageSize());
}

if (entryOp)
if (entryOp)
{
// Don't return tombstone entries, as these do not exist wrt
// ledger state
if (!BucketT::isTombstoneEntry(*entryOp))
{
// Don't return tombstone entries, as these do not exist wrt
// ledger state
if (!BucketT::isTombstoneEntry(*entryOp))
// Only live bucket loads can be metered
if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
// Only live bucket loads can be metered
if constexpr (std::is_same_v<BucketT, LiveBucket>)
bool addEntry = true;
if (lkMeter)
{
bool addEntry = true;
if (lkMeter)
{
// Here, we are metering after the entry has been
// loaded. This is because we need to know the size
// of the entry to meter it. Future work will add
// metering at the xdr level.
auto entrySize =
xdr::xdr_size(entryOp->liveEntry());
addEntry = lkMeter->canLoad(*currKeyIt, entrySize);
lkMeter->updateReadQuotasForKey(*currKeyIt,
entrySize);
}
if (addEntry)
{
result.push_back(entryOp->liveEntry());
}
// Here, we are metering after the entry has been
// loaded. This is because we need to know the size
// of the entry to meter it. Future work will add
// metering at the xdr level.
auto entrySize = xdr::xdr_size(entryOp->liveEntry());
addEntry = lkMeter->canLoad(*currKeyIt, entrySize);
lkMeter->updateReadQuotasForKey(*currKeyIt, entrySize);
}
else
if (addEntry)
{
static_assert(std::is_same_v<BucketT, HotArchiveBucket>,
"unexpected bucket type");
result.push_back(*entryOp);
result.push_back(entryOp->liveEntry());
}
}

currKeyIt = keys.erase(currKeyIt);
continue;
else
{
static_assert(std::is_same_v<BucketT, HotArchiveBucket>,
"unexpected bucket type");
result.push_back(*entryOp);
}
}

currKeyIt = keys.erase(currKeyIt);
continue;
}

++currKeyIt;
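
Restructured, the loop above reads as: for each requested key, ask the cache first; only on a genuine miss scan the index and read the page from disk (which also populates the cache via the getEntryAtOffset change earlier in this file); push live entries into the result and erase resolved keys from the request set so older bucket levels only see what is still outstanding. The toy below walks that control flow with plain standard-library types; tombstones, metering, and bloom filtering are deliberately left out, and all names are illustrative.

// Self-contained toy of the cache-first bulk-load pattern above
// (illustrative types, not the stellar-core ones).
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <string>
#include <vector>

int
main()
{
    // Pretend on-disk index and a (possibly complete) in-memory cache.
    std::map<std::string, std::string> disk = {{"a", "A"}, {"b", "B"},
                                               {"c", "C"}};
    std::map<std::string, std::string> cache = {{"b", "B"}};
    bool fullyCached = false; // true would make cache misses authoritative

    std::set<std::string> keys = {"a", "b", "z"};
    std::vector<std::string> result;

    for (auto currKeyIt = keys.begin(); currKeyIt != keys.end();)
    {
        std::optional<std::string> entry;
        auto cacheIt = cache.find(*currKeyIt);
        bool cacheHit = cacheIt != cache.end() || fullyCached;
        if (cacheIt != cache.end())
        {
            entry = cacheIt->second;
        }
        else if (!cacheHit)
        {
            // Cache miss: fall back to the slow path and populate the cache.
            auto diskIt = disk.find(*currKeyIt);
            if (diskIt != disk.end())
            {
                entry = diskIt->second;
                cache[*currKeyIt] = *entry;
            }
        }

        if (entry)
        {
            result.push_back(*entry);
            currKeyIt = keys.erase(currKeyIt); // resolved: drop the key
        }
        else
        {
            ++currKeyIt; // unresolved here; an older bucket may have it
        }
    }

    for (auto const& v : result)
    {
        std::cout << v << "\n"; // prints A and B; "z" stays in keys
    }
}
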
2 changes: 1 addition & 1 deletion src/main/Config.cpp
@@ -162,7 +162,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
DEPRECATED_SQL_LEDGER_STATE = false;
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14; // 2^14 == 16 kb
BUCKETLIST_DB_INDEX_CUTOFF = 20; // 20 mb
BUCKETLIST_DB_PERSIST_INDEX = true;
BUCKETLIST_DB_PERSIST_INDEX = false;
BACKGROUND_EVICTION_SCAN = true;
PUBLISH_TO_ARCHIVE_DELAY = std::chrono::seconds{0};
// automatic maintenance settings:
