From 650eb6a24a9fa68cdae77d72d86d79fa05cfd29e Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Mon, 8 Apr 2024 22:09:25 -0700 Subject: [PATCH] Fix hash probe spill issues (#9400) Summary: 1. Fix hash probe spill hang for right semi join types For right semi joins (project and filter), the hash probe produces output from the probed rows in the hash table after processing all the probe inputs. A probe input vector (set in Operator::input_) might be processed in multiple output calls until all the input rows have been processed. The probe side spill needs to spill the output from the current probe input vector before clearing the hash table, and the current logic waits until the output is null. This is not correct for right semi joins as they always return null, and instead we should check whether input_ has been reset. Add a unit test to verify the fix plus a join fuzzer run. 2. Disable hash probe spill if dynamic filters have been generated Hash probe might generate dynamic filters based on the hash join key ranges from the build side and push them down to upstream operators. If this has been triggered, then we can't spill from the hash probe side as it might produce incorrect results, as detected by the join fuzzer test. The fix is to disable hash probe spill if dynamic filters have been generated. Add a unit test to verify the fix plus a join fuzzer run. 
Pull Request resolved: https://github.com/facebookincubator/velox/pull/9400 Reviewed By: kagamiori, oerling Differential Revision: D55828946 Pulled By: xiaoxmeng fbshipit-source-id: ea4cd62abf0f9d169ad61cd5718bae4c44429e68 --- velox/connectors/Connector.h | 4 +- velox/core/PlanNode.h | 2 +- velox/exec/Driver.cpp | 8 +- velox/exec/HashProbe.cpp | 48 ++++++--- velox/exec/HashProbe.h | 12 ++- velox/exec/Operator.h | 26 ++--- velox/exec/tests/HashJoinTest.cpp | 174 ++++++++++++++++++++++++++++++ velox/exec/tests/JoinFuzzer.cpp | 33 +++--- 8 files changed, 256 insertions(+), 51 deletions(-) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index b591603e9c31..f215e84d0183 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -355,8 +355,8 @@ class Connector { VELOX_NYI("connectorConfig is not supported yet"); } - // Returns true if this connector would accept a filter dynamically generated - // during query execution. + /// Returns true if this connector would accept a filter dynamically generated + /// during query execution. 
virtual bool canAddDynamicFilter() const { return false; } diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index f45a2962c79b..68285ab00bf2 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -1588,7 +1588,7 @@ class HashJoinNode : public AbstractJoinNode { if (nullAware) { VELOX_USER_CHECK( isNullAwareSupported(joinType), - "Null-aware flag is supported only for semi and anti joins"); + "Null-aware flag is supported only for semi project and anti joins"); VELOX_USER_CHECK_EQ( 1, leftKeys_.size(), "Null-aware joins allow only one join key"); diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 10e2cd81aff5..8d8fe4d47530 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -308,7 +308,8 @@ void Driver::pushdownFilters(int operatorIndex) { } const auto& identityProjections = prevOp->identityProjections(); - auto inputChannel = getIdentityProjection(identityProjections, channel); + const auto inputChannel = + getIdentityProjection(identityProjections, channel); if (!inputChannel.has_value()) { // Filter channel is not an identity projection. VELOX_CHECK( @@ -897,7 +898,7 @@ std::unordered_set Driver::canPushdownFilters( for (auto i = 0; i < channels.size(); ++i) { auto channel = channels[i]; for (auto j = filterSourceIndex - 1; j >= 0; --j) { - auto prevOp = operators_[j].get(); + auto* prevOp = operators_[j].get(); if (j == 0) { // Source operator. @@ -908,7 +909,8 @@ std::unordered_set Driver::canPushdownFilters( } const auto& identityProjections = prevOp->identityProjections(); - auto inputChannel = getIdentityProjection(identityProjections, channel); + const auto inputChannel = + getIdentityProjection(identityProjections, channel); if (!inputChannel.has_value()) { // Filter channel is not an identity projection. 
if (prevOp->canAddDynamicFilter()) { diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index b12fc010691e..99f364bf4847 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -331,29 +331,30 @@ void HashProbe::asyncWaitForHashTable() { isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)) && table_->hashMode() != BaseHashTable::HashMode::kHash && !isSpillInput() && !hasMoreSpillData()) { - // Find out whether there are any upstream operators that can accept - // dynamic filters on all or a subset of the join keys. Create dynamic - // filters to push down. + // Find out whether there are any upstream operators that can accept dynamic + // filters on all or a subset of the join keys. Create dynamic filters to + // push down. // // NOTE: this optimization is not applied in the following cases: (1) if the // probe input is read from spilled data and there is no upstream operators // involved; (2) if there is spill data to restore, then we can't filter // probe inputs solely based on the current table's join keys. const auto& buildHashers = table_->hashers(); - auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters( + const auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters( this, keyChannels_); // Null aware Right Semi Project join needs to know whether there are any // nulls on the probe side. Hence, cannot filter these out. 
const auto nullAllowed = isRightSemiProjectJoin(joinType_) && nullAware_; - for (auto i = 0; i < keyChannels_.size(); i++) { + for (auto i = 0; i < keyChannels_.size(); ++i) { if (channels.find(keyChannels_[i]) != channels.end()) { if (auto filter = buildHashers[i]->getFilter(nullAllowed)) { dynamicFilters_.emplace(keyChannels_[i], std::move(filter)); } } } + hasGeneratedDynamicFilters_ = !dynamicFilters_.empty(); } } @@ -529,8 +530,8 @@ BlockingReason HashProbe::isBlocked(ContinueFuture* future) { } void HashProbe::clearDynamicFilters() { - // The join can be completely replaced with a pushed down - // filter when the following conditions are met: + // The join can be completely replaced with a pushed down filter when the + // following conditions are met: // * hash table has a single key with unique values, // * build side has no dependent columns. if (keyChannels_.size() == 1 && !table_->hasDuplicateKeys() && @@ -833,7 +834,7 @@ bool HashProbe::skipProbeOnEmptyBuild() const { } bool HashProbe::spillEnabled() const { - return canReclaim(); + return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup(); } bool HashProbe::hasMoreSpillData() const { @@ -1010,7 +1011,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) { numOut = evalFilter(numOut); - if (!numOut) { + if (numOut == 0) { continue; } @@ -1547,7 +1548,9 @@ void HashProbe::ensureOutputFits() { } bool HashProbe::canReclaim() const { - return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup(); + // NOTE: we can't spill from a hash probe operator if it has generated dynamic + // filters. + return spillEnabled() && !hasGeneratedDynamicFilters_; } void HashProbe::reclaim( @@ -1692,13 +1695,26 @@ void HashProbe::spillOutput() { &spillStats_); outputSpiller->setPartitionsSpilled({0}); - RowVectorPtr output; - while ((output = getOutputInternal(/*toSpillOutput=*/true))) { - // Ensure vector are lazy loaded before spilling. 
- for (int32_t i = 0; i < output->childrenSize(); ++i) { - output->childAt(i)->loadedVector(); + RowVectorPtr output{nullptr}; + for (;;) { + output = getOutputInternal(/*toSpillOutput=*/true); + if (output != nullptr) { + // Ensure vector are lazy loaded before spilling. + for (int32_t i = 0; i < output->childrenSize(); ++i) { + output->childAt(i)->loadedVector(); + } + outputSpiller->spill(0, output); + continue; } - outputSpiller->spill(0, output); + // NOTE: for right semi join types, we need to check if 'input_' has been + // cleared or not instead of checking on output. The right semi joins only + // producing the output after processing all the probe inputs. + if (input_ == nullptr) { + break; + } + VELOX_CHECK( + isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)); + VELOX_CHECK((output == nullptr) && (input_ != nullptr)); } VELOX_CHECK_LE(outputSpiller->spilledPartitionSet().size(), 1); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 90007abb1424..e4eba2589f3c 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -44,9 +44,9 @@ class HashProbe : public Operator { } // NOTE: if we can't apply dynamic filtering, then we can start early to // read input even before the hash table has been built. - const auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters( - this, keyChannels_); - return channels.empty(); + return operatorCtx_->driverCtx() + ->driver->canPushdownFilters(this, keyChannels_) + .empty(); } void addInput(RowVectorPtr input) override; @@ -335,6 +335,12 @@ class HashProbe : public Operator { // Channel of probe keys in 'input_'. std::vector keyChannels_; + // True if we have generated dynamic filters from the hash build join keys. + // + // NOTE: 'dynamicFilters_' might have been cleared once they have been pushed + // down to the upstream operators. + tsan_atomic hasGeneratedDynamicFilters_{false}; + // True if the join can become a no-op starting with the next batch of input. 
bool canReplaceWithDynamicFilter_{false}; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 61ec37992e5e..c48862c60b63 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -257,24 +257,24 @@ class OperatorCtx { mutable std::unique_ptr execCtx_; }; -// Query operator +/// Query operator class Operator : public BaseRuntimeStatWriter { public: - // Factory class for mapping a user-registered PlanNode into the corresponding - // Operator. + /// Factory class for mapping a user-registered PlanNode into the + /// corresponding Operator. class PlanNodeTranslator { public: virtual ~PlanNodeTranslator() = default; - // Translates plan node to operator. Returns nullptr if the plan node cannot - // be handled by this factory. + /// Translates plan node to operator. Returns nullptr if the plan node + /// cannot be handled by this factory. virtual std::unique_ptr toOperator(DriverCtx* ctx, int32_t id, const core::PlanNodePtr& node) { return nullptr; } - // An overloaded method that should be called when the operator needs an - // ExchangeClient. + /// An overloaded method that should be called when the operator needs an + /// ExchangeClient. virtual std::unique_ptr toOperator( DriverCtx* ctx, int32_t id, @@ -283,22 +283,22 @@ class Operator : public BaseRuntimeStatWriter { return nullptr; } - // Translates plan node to join bridge. Returns nullptr if the plan node - // cannot be handled by this factory. + /// Translates plan node to join bridge. Returns nullptr if the plan node + /// cannot be handled by this factory. virtual std::unique_ptr toJoinBridge( const core::PlanNodePtr& /* node */) { return nullptr; } - // Translates plan node to operator supplier. Returns nullptr if the plan - // node cannot be handled by this factory. + /// Translates plan node to operator supplier. Returns nullptr if the plan + /// node cannot be handled by this factory. 
virtual OperatorSupplier toOperatorSupplier( const core::PlanNodePtr& /* node */) { return nullptr; } - // Returns max driver count for the plan node. Returns std::nullopt if the - // plan node cannot be handled by this factory. + /// Returns max driver count for the plan node. Returns std::nullopt if the + /// plan node cannot be handled by this factory. virtual std::optional maxDrivers( const core::PlanNodePtr& /* node */) { return std::nullopt; diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 2ce435b09a34..77a51e2e7d4a 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -7415,4 +7415,178 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillUnderNonReclaimableSection) { }) .run(); } + +// This test case is to cover the case that hash probe trigger spill for right +// semi join types and the pending input needs to be processed in multiple +// steps. +DEBUG_ONLY_TEST_F(HashJoinTest, spillOutputWithRightSemiJoins) { + for (const auto joinType : + {core::JoinType::kRightSemiFilter, core::JoinType::kRightSemiProject}) { + std::atomic_bool injectOnce{true}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::getOutput", + std::function([&](Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "HashProbe") { + return; + } + if (!op->testingHasInput()) { + return; + } + if (!injectOnce.exchange(false)) { + return; + } + testingRunArbitration(op->pool()); + })); + + std::string duckDbSqlReference; + std::vector joinOutputLayout; + bool nullAware{false}; + if (joinType == core::JoinType::kRightSemiProject) { + duckDbSqlReference = "SELECT u_k2, u_k1 IN (SELECT t_k1 FROM t) FROM u"; + joinOutputLayout = {"u_k2", "match"}; + // Null aware is only supported for semi projection join type. 
+ nullAware = true; + } else { + duckDbSqlReference = + "SELECT u_k2 FROM u WHERE u_k1 IN (SELECT t_k1 FROM t)"; + joinOutputLayout = {"u_k2"}; + } + + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .numDrivers(1) + .spillDirectory(spillDirectory->path) + .probeType(probeType_) + .probeVectors(128, 3) + .probeKeys({"t_k1"}) + .buildType(buildType_) + .buildVectors(128, 4) + .buildKeys({"u_k1"}) + .joinType(joinType) + // Set a small number of output rows to process the input in multiple + // steps. + .config( + core::QueryConfig::kPreferredOutputBatchRows, std::to_string(10)) + .injectSpill(false) + .joinOutputLayout(std::move(joinOutputLayout)) + .nullAware(nullAware) + .referenceQuery(duckDbSqlReference) + .run(); + } +} + +DEBUG_ONLY_TEST_F(HashJoinTest, spillCheckOnLeftSemiFilterWithDynamicFilters) { + const int32_t numSplits = 10; + const int32_t numRowsProbe = 333; + const int32_t numRowsBuild = 100; + + std::vector probeVectors; + probeVectors.reserve(numSplits); + + std::vector> tempFiles; + for (int32_t i = 0; i < numSplits; ++i) { + auto rowVector = makeRowVector({ + makeFlatVector( + numRowsProbe, [&](auto row) { return row - i * 10; }), + makeFlatVector(numRowsProbe, [](auto row) { return row; }), + }); + probeVectors.push_back(rowVector); + tempFiles.push_back(TempFilePath::create()); + writeToFile(tempFiles.back()->path, rowVector); + } + auto makeInputSplits = [&](const core::PlanNodeId& nodeId) { + return [&] { + std::vector probeSplits; + for (auto& file : tempFiles) { + probeSplits.push_back(exec::Split(makeHiveConnectorSplit(file->path))); + } + SplitInput splits; + splits.emplace(nodeId, probeSplits); + return splits; + }; + }; + + // 100 key values in [35, 233] range. 
+ std::vector buildVectors; + for (int i = 0; i < 5; ++i) { + buildVectors.push_back(makeRowVector({ + makeFlatVector( + numRowsBuild / 5, + [i](auto row) { return 35 + 2 * (row + i * numRowsBuild / 5); }), + makeFlatVector(numRowsBuild / 5, [](auto row) { return row; }), + })); + } + std::vector keyOnlyBuildVectors; + for (int i = 0; i < 5; ++i) { + keyOnlyBuildVectors.push_back( + makeRowVector({makeFlatVector(numRowsBuild / 5, [i](auto row) { + return 35 + 2 * (row + i * numRowsBuild / 5); + })})); + } + + createDuckDbTable("t", probeVectors); + createDuckDbTable("u", buildVectors); + + auto probeType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); + + auto planNodeIdGenerator = std::make_shared(); + + auto buildSide = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values(buildVectors) + .project({"c0 AS u_c0", "c1 AS u_c1"}) + .planNode(); + auto keyOnlyBuildSide = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values(keyOnlyBuildVectors) + .project({"c0 AS u_c0"}) + .planNode(); + + // Left semi join. 
+ core::PlanNodeId probeScanId; + core::PlanNodeId joinNodeId; + const auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) + .tableScan(probeType) + .capturePlanNodeId(probeScanId) + .hashJoin( + {"c0"}, + {"u_c0"}, + buildSide, + "", + {"c0", "c1"}, + core::JoinType::kLeftSemiFilter) + .capturePlanNodeId(joinNodeId) + .project({"c0", "c1 + 1"}) + .planNode(); + + std::atomic_bool injectOnce{true}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::getOutput", + std::function([&](Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "HashProbe") { + return; + } + if (!op->testingHasInput()) { + return; + } + if (!injectOnce.exchange(false)) { + return; + } + testingRunArbitration(op->pool()); + })); + + auto spillDirectory = exec::test::TempDirectoryPath::create(); + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .planNode(std::move(op)) + .makeInputSplits(makeInputSplits(probeScanId)) + .spillDirectory(spillDirectory->path) + .injectSpill(false) + .referenceQuery( + "SELECT t.c0, t.c1 + 1 FROM t WHERE t.c0 IN (SELECT c0 FROM u)") + .verifier([&](const std::shared_ptr& task, bool /*unused*/) { + // Verify spill hasn't triggered. + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(joinNodeId); + ASSERT_EQ(planStats.spilledBytes, 0); + }) + .run(); +} } // namespace diff --git a/velox/exec/tests/JoinFuzzer.cpp b/velox/exec/tests/JoinFuzzer.cpp index 8f9e6638e9cf..45b4faf77510 100644 --- a/velox/exec/tests/JoinFuzzer.cpp +++ b/velox/exec/tests/JoinFuzzer.cpp @@ -405,15 +405,14 @@ std::vector flatten(const std::vector& vectors) { } RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { - LOG(ERROR) << "Executing query plan with " - << executionStrategyToString(plan.executionStrategy) - << " strategy[" - << (plan.executionStrategy == core::ExecutionStrategy::kGrouped - ? plan.numGroups - : 0) - << " groups]" << (injectSpill ? 
" and spilling injection" : "") - << ": " << std::endl - << plan.plan->toString(true, true); + LOG(INFO) << "Executing query plan with " + << executionStrategyToString(plan.executionStrategy) << " strategy[" + << (plan.executionStrategy == core::ExecutionStrategy::kGrouped + ? plan.numGroups + : 0) + << " groups]" << (injectSpill ? " and spilling injection" : "") + << ": " << std::endl + << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); for (const auto& [planNodeId, nodeSplits] : plan.splits) { @@ -1017,9 +1016,16 @@ void JoinFuzzer::verify(core::JoinType joinType) { LOG(INFO) << "Testing plan #" << i << " with spilling"; actual = execute(altPlans[i], /*=injectSpill=*/true); if (actual != nullptr && expected != nullptr) { - VELOX_CHECK( - assertEqualResults({expected}, {actual}), - "Logically equivalent plans produced different results"); + try { + VELOX_CHECK( + assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } catch (const VeloxException& e) { + LOG(ERROR) << "Expected\n" + << expected->toString(0, expected->size()) << "\nActual\n" + << actual->toString(0, actual->size()); + throw; + } } else { VELOX_CHECK( FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); @@ -1206,9 +1212,10 @@ std::vector> JoinFuzzer::splitInputByGroup( } void JoinFuzzer::go() { - VELOX_CHECK( + VELOX_USER_CHECK( FLAGS_steps > 0 || FLAGS_duration_sec > 0, "Either --steps or --duration_sec needs to be greater than zero.") + VELOX_USER_CHECK_GE(FLAGS_batch_size, 10, "Batch size must be at least 10."); const auto startTime = std::chrono::system_clock::now(); size_t iteration = 0;