Add new catchup mode to use transaction results to skip failed transaction and signature verification (#4536)
# Description

Resolves #2814 (comment)

Adds a new config option, `CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING`. When
this config option is enabled, transaction results are downloaded from
history archives for the catchup range. Failed transactions are not
applied, and signatures are not verified. This mode is only available in
test builds. The plan is to make this mode configurable when launching
supercluster runs from Jenkins: enabled by default, but disabled for
release validation, where we want a more comprehensive catchup. I'll raise
a separate PR for that.
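As a sketch, enabling the new mode in a test build's `stellar-core.cfg` is a single boolean flag (the option name is from this PR; the comment text is mine and the rest of the config file is unchanged):

```toml
# Test-build-only flag (requires a build compiled with BUILD_TESTS).
# When true, catchup downloads the results files from history archives,
# skips applying failed transactions, and skips signature verification.
CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING=true
```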

## Perf testing

### Locally running catchup on 1000 ledgers:

```
user / system / total time (seconds)

Baseline (no skipping):          429 / 115 / 138s
Skip failed:                     373 / 99  / 114s  (1.14x / 1.16x / 1.21x speedup over baseline)
Skip failed + skip verification: 334 / 88  / 95s   (1.28x / 1.30x / 1.45x speedup over baseline)
```
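The total-time speedups quoted above can be sanity-checked from the wall-clock column alone; a quick sketch (numbers copied from the table):

```python
# Wall-clock ("total") seconds from the 1000-ledger catchup runs above.
baseline = 138
skip_failed = 114
skip_failed_and_skip_verification = 95

# Speedup over baseline = baseline time / optimized time.
print(f"{baseline / skip_failed:.2f}x")                        # 1.21x
print(f"{baseline / skip_failed_and_skip_verification:.2f}x")  # 1.45x
```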

### Supercluster PubnetParallelCatchup

Completed in 14h 47min (vs ~24 hours with recent releases / master
HEAD).

https://buildmeister-v3.stellar-ops.com/job/Core/job/stellar-supercluster/1055/


# Checklist
- [ ] Reviewed the
[contributing](https://github.com/stellar/stellar-core/blob/master/CONTRIBUTING.md#submitting-changes)
document
- [ ] Rebased on top of master (no merge commits)
- [ ] Ran `clang-format` v8.0.0 (via `make format` or the Visual Studio
extension)
- [ ] Compiles
- [ ] Ran all tests
- [ ] If change impacts performance, include supporting evidence per the
[performance
document](https://github.com/stellar/stellar-core/blob/master/performance-eval/performance-eval.md)
dmkozh authored Nov 27, 2024
2 parents 7458426 + 47caaff commit b887bba
Showing 19 changed files with 476 additions and 78 deletions.
83 changes: 82 additions & 1 deletion src/catchup/ApplyCheckpointWork.cpp
@@ -93,6 +93,42 @@ ApplyCheckpointWork::openInputFiles()
mTxIn.open(ti.localPath_nogz());
mTxHistoryEntry = TransactionHistoryEntry();
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
mTxResultIn = std::make_optional<XDRInputFileStream>();
FileTransferInfo tri(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpoint);
if (!tri.localPath_nogz().empty() &&
std::filesystem::exists(tri.localPath_nogz()))
{
CLOG_DEBUG(History, "Replaying transaction results from {}",
tri.localPath_nogz());

try
{
mTxResultIn->open(tri.localPath_nogz());
}
catch (std::exception const& e)
{
CLOG_DEBUG(History,
"Failed to open transaction results file: {}. All "
"transactions will be applied.",
e.what());
}
mTxHistoryResultEntry =
std::make_optional<TransactionHistoryResultEntry>();
}
else
{
CLOG_DEBUG(History,
"Results file {} not found for checkpoint {}. All "
"transactions will be applied for this checkpoint.",
tri.localPath_nogz(), mCheckpoint);
mTxHistoryResultEntry = std::nullopt;
}
}
#endif
mFilesOpen = true;
}

@@ -138,6 +174,43 @@ ApplyCheckpointWork::getCurrentTxSet()
return TxSetXDRFrame::makeEmpty(lm.getLastClosedLedgerHeader());
}

#ifdef BUILD_TESTS
std::optional<TransactionResultSet>
ApplyCheckpointWork::getCurrentTxResultSet()
{
ZoneScoped;
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;
// Check the buffered result entry (mTxHistoryResultEntry) before loading
// the next one. This order is important because it accounts for ledger
// "gaps" in the history archives (which are caused by ledgers with empty
// tx sets, as those are not uploaded).
while (mTxResultIn && mTxResultIn->readOne(*mTxHistoryResultEntry))
{
if (mTxHistoryResultEntry)
{
if (mTxHistoryResultEntry->ledgerSeq < seq)
{
CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
mTxHistoryResultEntry->ledgerSeq);
}
else if (mTxHistoryResultEntry->ledgerSeq > seq)
{
break;
}
else
{
releaseAssert(mTxHistoryResultEntry->ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry->txResultSet);
}
}
}
CLOG_DEBUG(History, "No txresultset for ledger {}", seq);
return std::nullopt;
}
#endif // BUILD_TESTS

std::shared_ptr<LedgerCloseData>
ApplyCheckpointWork::getNextLedgerCloseData()
{
@@ -216,6 +289,14 @@ ApplyCheckpointWork::getNextLedgerCloseData()
CLOG_DEBUG(History, "Ledger {} has {} transactions", header.ledgerSeq,
txset->sizeTxTotal());

std::optional<TransactionResultSet> txres = std::nullopt;
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
txres = getCurrentTxResultSet();
}
#endif

// We've verified the ledgerHeader (in the "trusted part of history"
// sense) in CATCHUP_VERIFY phase; we now need to check that the
// txhash we're about to apply is the one denoted by that ledger
@@ -246,7 +327,7 @@ ApplyCheckpointWork::getNextLedgerCloseData()

return std::make_shared<LedgerCloseData>(
header.ledgerSeq, txset, header.scpValue,
std::make_optional<Hash>(mHeaderHistoryEntry.hash));
std::make_optional<Hash>(mHeaderHistoryEntry.hash), txres);
}

BasicWork::State
31 changes: 21 additions & 10 deletions src/catchup/ApplyCheckpointWork.h
@@ -21,20 +21,24 @@ class TmpDir;
struct LedgerHeaderHistoryEntry;

/**
* This class is responsible for applying transactions stored in files on
* temporary directory (downloadDir) to local ledger. It requires two sets of
* files - ledgers and transactions - int .xdr format. Transaction files are
* used to read transactions that will be used and ledger files are used to
* This class is responsible for applying transactions stored in files in the
* temporary directory (downloadDir) to the local ledger. It requires two sets
* of files - ledgers and transactions - in .xdr format. Transaction files are
* used to read transactions that will be applied and ledger files are used to
* check if ledger hashes are matching.
*
* It may also require a third set of files - transaction results - to use in
* accelerated replay, where failed transactions are not applied and successful
* transactions are applied without verifying their signatures.
*
* In each run it skips or applies transactions from one ledger. Skipping occurs
* when ledger to be applied is older than LCL from local ledger. At LCL
* boundary checks are made to confirm that ledgers from files knit up with
* LCL. If everything is OK, an apply ledger operation is performed. Then
* another check is made - if new local ledger matches corresponding ledger from
* file.
* when the ledger to be applied is older than the LCL of the local ledger. At
* the LCL boundary, checks are made to confirm that the ledgers from the files
* knit up with the LCL. If everything is OK, an apply ledger operation is
* performed. Then another check is made - whether the new local ledger matches
* the corresponding ledger from the file.
*
* Constructor of this class takes some important parameters:
* The constructor of this class takes some important parameters:
* * downloadDir - directory containing ledger and transaction files
* * range - LedgerRange to apply, must be checkpoint-aligned,
* and cover at most one checkpoint.
@@ -49,6 +53,10 @@ class ApplyCheckpointWork : public BasicWork
XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
TransactionHistoryEntry mTxHistoryEntry;
#ifdef BUILD_TESTS
std::optional<XDRInputFileStream> mTxResultIn;
std::optional<TransactionHistoryResultEntry> mTxHistoryResultEntry;
#endif // BUILD_TESTS
LedgerHeaderHistoryEntry mHeaderHistoryEntry;
OnFailureCallback mOnFailure;

Expand All @@ -57,6 +65,9 @@ class ApplyCheckpointWork : public BasicWork
std::shared_ptr<ConditionalWork> mConditionalWork;

TxSetXDRFrameConstPtr getCurrentTxSet();
#ifdef BUILD_TESTS
std::optional<TransactionResultSet> getCurrentTxResultSet();
#endif // BUILD_TESTS
void openInputFiles();

std::shared_ptr<LedgerCloseData> getNextLedgerCloseData();
15 changes: 8 additions & 7 deletions src/catchup/CatchupConfiguration.h
@@ -13,7 +13,7 @@
namespace stellar
{

// Each catchup can be configured by two parameters destination ledger
// Each catchup can be configured by two parameters: destination ledger
// (and its hash, if known) and count of ledgers to apply.
// Value of count can be adjusted in different ways during catchup. If applying
// count ledgers would mean going before the last closed ledger - it is
@@ -31,12 +31,13 @@ namespace stellar
// and catchup to that instead of destination ledger. This is useful when
// doing offline commandline catchups with stellar-core catchup command.
//
// Catchup can be done in two modes - ONLINE nad OFFLINE. In ONLINE mode node
// is connected to the network. If receives ledgers during catchup and applies
// them after history is applied. Also additional closing ledger is required
// to mark catchup as complete and node as synced. In OFFLINE mode node is not
// connected to network, so new ledgers are not being externalized. Only
// buckets and transactions from history archives are applied.
// Catchup can be done in two modes - ONLINE and OFFLINE. In ONLINE mode, the
// node is connected to the network. It receives ledgers during catchup and
// applies them after history is applied. Also, an additional closing ledger is
// required to mark catchup as complete and the node as synced. In OFFLINE mode,
// the node is not connected to the network, so new ledgers are not being
// externalized. Only buckets and transactions from history archives are
// applied.
class CatchupConfiguration
{
public:
14 changes: 7 additions & 7 deletions src/catchup/CatchupWork.h
@@ -24,22 +24,22 @@ using WorkSeqPtr = std::shared_ptr<WorkSequence>;

// CatchupWork does all the necessary work to perform any type of catchup.
// It accepts CatchupConfiguration structure to know from which ledger to which
// one do the catchup and if it involves only applying ledgers or ledgers and
// one to do the catchup and if it involves only applying ledgers or ledgers and
// buckets.
//
// First thing it does is to get a history state which allows to calculate
// proper destination ledger (in case CatchupConfiguration::CURRENT) was used
// and to get list of buckets that should be in database on that ledger.
// First, it gets a history state, which allows it to calculate the
// proper destination ledger (in case CatchupConfiguration::CURRENT was used)
// and to get a list of buckets that should be in the database on that ledger.
//
// Next step is downloading and verifying ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS it can also verify against ledgers currently
// Next, it downloads and verifies ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS, it can also verify against ledgers currently
// buffered in LedgerManager).
//
// Then, depending on configuration, it can download, verify and apply buckets
// (as in MINIMAL and RECENT catchups), and then download and apply
// transactions (as in COMPLETE and RECENT catchups).
//
// After that, catchup is done and node can replay buffered ledgers and take
// After that, catchup is done and the node can replay buffered ledgers and take
// part in consensus protocol.

class CatchupWork : public Work
90 changes: 71 additions & 19 deletions src/catchup/DownloadApplyTxsWork.cpp
@@ -43,7 +43,6 @@ DownloadApplyTxsWork::yieldMoreWork()
{
throw std::runtime_error("Work has no more children to iterate over!");
}

CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS),
@@ -80,6 +79,53 @@ DownloadApplyTxsWork::yieldMoreWork()
mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb);

std::vector<std::shared_ptr<BasicWork>> seq{getAndUnzip};
std::vector<FileTransferInfo> filesToTransfer{ft};
std::vector<std::shared_ptr<BasicWork>> optionalDownloads;
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_RESULTS),
mCheckpointToQueue);

FileTransferInfo resultsFile(mDownloadDir,
FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpointToQueue);
auto getResultsWork = std::make_shared<GetAndUnzipRemoteFileWork>(
mApp, resultsFile, mArchive, /*logErrorOnFailure=*/false);
std::weak_ptr<GetAndUnzipRemoteFileWork> getResultsWorkWeak =
getResultsWork;
seq.emplace_back(getResultsWork);
seq.emplace_back(std::make_shared<WorkWithCallback>(
mApp, "get-results-" + std::to_string(mCheckpointToQueue),
[apply, getResultsWorkWeak, checkpoint, &dir](Application& app) {
auto getResults = getResultsWorkWeak.lock();
if (getResults && getResults->getState() != State::WORK_SUCCESS)
{
auto archive = getResults->getArchive();
if (archive)
{
FileTransferInfo ti(dir,
FileType::HISTORY_FILE_TYPE_RESULTS,
checkpoint);
CLOG_WARNING(
History,
"Archive {} may contain a corrupt results file "
"{}. This is not fatal as long as the archive "
"contains valid transaction history. Catchup will "
"proceed, but the node will not be able to skip "
"known results.",
archive->getName(), ti.remoteName());
}
}
return true;
}));

filesToTransfer.push_back(resultsFile);
}
#endif // BUILD_TESTS

auto maybeWaitForMerges = [](Application& app) {
if (app.getConfig().CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING)
@@ -139,28 +185,34 @@ DownloadApplyTxsWork::yieldMoreWork()
mApp, "wait-merges" + apply->getName(), maybeWaitForMerges, apply));
}

seq.push_back(std::make_shared<WorkWithCallback>(
mApp, "delete-transactions-" + std::to_string(mCheckpointToQueue),
[ft](Application& app) {
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted transactions {}",
ft.localPath_nogz());
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete transactions {}: {}",
ft.localPath_nogz(), e.what());
return false;
}
return true;
}));
for (auto const& ft : filesToTransfer)
{
auto deleteWorkName = "delete-" + ft.getTypeString() + "-" +
std::to_string(mCheckpointToQueue);
seq.push_back(std::make_shared<WorkWithCallback>(
mApp, deleteWorkName, [ft](Application& app) {
CLOG_DEBUG(History, "Deleting {} {}", ft.getTypeString(),
ft.localPath_nogz());
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted {} {}", ft.getTypeString(),
ft.localPath_nogz());
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete {} {}: {}",
ft.getTypeString(), ft.localPath_nogz(),
e.what());
return false;
}
return true;
}));
}
auto nextWork = std::make_shared<WorkSequence>(
mApp, "download-apply-" + std::to_string(mCheckpointToQueue), seq,
BasicWork::RETRY_NEVER);
BasicWork::RETRY_NEVER, true /*stop at first failure*/);
mCheckpointToQueue += mApp.getHistoryManager().getCheckpointFrequency();
mLastYieldedWork = nextWork;
return nextWork;
15 changes: 15 additions & 0 deletions src/herder/LedgerCloseData.cpp
@@ -26,6 +26,21 @@ LedgerCloseData::LedgerCloseData(uint32_t ledgerSeq,
releaseAssert(txSet->getContentsHash() == mValue.txSetHash);
}

#ifdef BUILD_TESTS
LedgerCloseData::LedgerCloseData(
uint32_t ledgerSeq, TxSetXDRFrameConstPtr txSet, StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash,
std::optional<TransactionResultSet> const& expectedResults)
: mLedgerSeq(ledgerSeq)
, mTxSet(txSet)
, mValue(v)
, mExpectedLedgerHash(expectedLedgerHash)
, mExpectedResults(expectedResults)
{
releaseAssert(txSet->getContentsHash() == mValue.txSetHash);
}
#endif // BUILD_TESTS

std::string
stellarValueToString(Config const& c, StellarValue const& sv)
{