diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index b591603e9c31..f215e84d0183 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -355,8 +355,8 @@ class Connector { VELOX_NYI("connectorConfig is not supported yet"); } - // Returns true if this connector would accept a filter dynamically generated - // during query execution. + /// Returns true if this connector would accept a filter dynamically generated + /// during query execution. virtual bool canAddDynamicFilter() const { return false; } diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index f45a2962c79b..68285ab00bf2 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -1588,7 +1588,7 @@ class HashJoinNode : public AbstractJoinNode { if (nullAware) { VELOX_USER_CHECK( isNullAwareSupported(joinType), - "Null-aware flag is supported only for semi and anti joins"); + "Null-aware flag is supported only for semi project and anti joins"); VELOX_USER_CHECK_EQ( 1, leftKeys_.size(), "Null-aware joins allow only one join key"); diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 10e2cd81aff5..8d8fe4d47530 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -308,7 +308,8 @@ void Driver::pushdownFilters(int operatorIndex) { } const auto& identityProjections = prevOp->identityProjections(); - auto inputChannel = getIdentityProjection(identityProjections, channel); + const auto inputChannel = + getIdentityProjection(identityProjections, channel); if (!inputChannel.has_value()) { // Filter channel is not an identity projection. VELOX_CHECK( @@ -897,7 +898,7 @@ std::unordered_set Driver::canPushdownFilters( for (auto i = 0; i < channels.size(); ++i) { auto channel = channels[i]; for (auto j = filterSourceIndex - 1; j >= 0; --j) { - auto prevOp = operators_[j].get(); + auto* prevOp = operators_[j].get(); if (j == 0) { // Source operator. 
@@ -908,7 +909,8 @@ std::unordered_set Driver::canPushdownFilters( } const auto& identityProjections = prevOp->identityProjections(); - auto inputChannel = getIdentityProjection(identityProjections, channel); + const auto inputChannel = + getIdentityProjection(identityProjections, channel); if (!inputChannel.has_value()) { // Filter channel is not an identity projection. if (prevOp->canAddDynamicFilter()) { diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index b12fc010691e..99f364bf4847 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -331,29 +331,30 @@ void HashProbe::asyncWaitForHashTable() { isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)) && table_->hashMode() != BaseHashTable::HashMode::kHash && !isSpillInput() && !hasMoreSpillData()) { - // Find out whether there are any upstream operators that can accept - // dynamic filters on all or a subset of the join keys. Create dynamic - // filters to push down. + // Find out whether there are any upstream operators that can accept dynamic + // filters on all or a subset of the join keys. Create dynamic filters to + // push down. // // NOTE: this optimization is not applied in the following cases: (1) if the // probe input is read from spilled data and there is no upstream operators // involved; (2) if there is spill data to restore, then we can't filter // probe inputs solely based on the current table's join keys. const auto& buildHashers = table_->hashers(); - auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters( + const auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters( this, keyChannels_); // Null aware Right Semi Project join needs to know whether there are any // nulls on the probe side. Hence, cannot filter these out. 
const auto nullAllowed = isRightSemiProjectJoin(joinType_) && nullAware_; - for (auto i = 0; i < keyChannels_.size(); i++) { + for (auto i = 0; i < keyChannels_.size(); ++i) { if (channels.find(keyChannels_[i]) != channels.end()) { if (auto filter = buildHashers[i]->getFilter(nullAllowed)) { dynamicFilters_.emplace(keyChannels_[i], std::move(filter)); } } } + hasGeneratedDynamicFilters_ = !dynamicFilters_.empty(); } } @@ -529,8 +530,8 @@ BlockingReason HashProbe::isBlocked(ContinueFuture* future) { } void HashProbe::clearDynamicFilters() { - // The join can be completely replaced with a pushed down - // filter when the following conditions are met: + // The join can be completely replaced with a pushed down filter when the + // following conditions are met: // * hash table has a single key with unique values, // * build side has no dependent columns. if (keyChannels_.size() == 1 && !table_->hasDuplicateKeys() && @@ -833,7 +834,7 @@ bool HashProbe::skipProbeOnEmptyBuild() const { } bool HashProbe::spillEnabled() const { - return canReclaim(); + return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup(); } bool HashProbe::hasMoreSpillData() const { @@ -1010,7 +1011,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) { numOut = evalFilter(numOut); - if (!numOut) { + if (numOut == 0) { continue; } @@ -1547,7 +1548,9 @@ void HashProbe::ensureOutputFits() { } bool HashProbe::canReclaim() const { - return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup(); + // NOTE: we can't spill from a hash probe operator if it has generated dynamic + // filters. + return spillEnabled() && !hasGeneratedDynamicFilters_; } void HashProbe::reclaim( @@ -1692,13 +1695,26 @@ void HashProbe::spillOutput() { &spillStats_); outputSpiller->setPartitionsSpilled({0}); - RowVectorPtr output; - while ((output = getOutputInternal(/*toSpillOutput=*/true))) { - // Ensure vector are lazy loaded before spilling. 
- for (int32_t i = 0; i < output->childrenSize(); ++i) { - output->childAt(i)->loadedVector(); + RowVectorPtr output{nullptr}; + for (;;) { + output = getOutputInternal(/*toSpillOutput=*/true); + if (output != nullptr) { + // Ensure vectors are lazy loaded before spilling. + for (int32_t i = 0; i < output->childrenSize(); ++i) { + output->childAt(i)->loadedVector(); + } + outputSpiller->spill(0, output); + continue; } - outputSpiller->spill(0, output); + // NOTE: for right semi join types, we need to check if 'input_' has been + // cleared or not instead of checking on output. The right semi joins only + // produce the output after processing all the probe inputs. + if (input_ == nullptr) { + break; + } + VELOX_CHECK( + isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)); + VELOX_CHECK((output == nullptr) && (input_ != nullptr)); } VELOX_CHECK_LE(outputSpiller->spilledPartitionSet().size(), 1); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 90007abb1424..e4eba2589f3c 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -44,9 +44,9 @@ class HashProbe : public Operator { } // NOTE: if we can't apply dynamic filtering, then we can start early to // read input even before the hash table has been built. - const auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters( - this, keyChannels_); - return channels.empty(); + return operatorCtx_->driverCtx() + ->driver->canPushdownFilters(this, keyChannels_) + .empty(); } void addInput(RowVectorPtr input) override; @@ -335,6 +335,12 @@ class HashProbe : public Operator { // Channel of probe keys in 'input_'. std::vector keyChannels_; + // True if we have generated dynamic filters from the hash build join keys. + // + // NOTE: 'dynamicFilters_' might have been cleared once they have been pushed + // down to the upstream operators. + tsan_atomic hasGeneratedDynamicFilters_{false}; + // True if the join can become a no-op starting with the next batch of input. 
bool canReplaceWithDynamicFilter_{false}; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 61ec37992e5e..c48862c60b63 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -257,24 +257,24 @@ class OperatorCtx { mutable std::unique_ptr execCtx_; }; -// Query operator +/// Query operator class Operator : public BaseRuntimeStatWriter { public: - // Factory class for mapping a user-registered PlanNode into the corresponding - // Operator. + /// Factory class for mapping a user-registered PlanNode into the + /// corresponding Operator. class PlanNodeTranslator { public: virtual ~PlanNodeTranslator() = default; - // Translates plan node to operator. Returns nullptr if the plan node cannot - // be handled by this factory. + /// Translates plan node to operator. Returns nullptr if the plan node + /// cannot be handled by this factory. virtual std::unique_ptr toOperator(DriverCtx* ctx, int32_t id, const core::PlanNodePtr& node) { return nullptr; } - // An overloaded method that should be called when the operator needs an - // ExchangeClient. + /// An overloaded method that should be called when the operator needs an + /// ExchangeClient. virtual std::unique_ptr toOperator( DriverCtx* ctx, int32_t id, @@ -283,22 +283,22 @@ class Operator : public BaseRuntimeStatWriter { return nullptr; } - // Translates plan node to join bridge. Returns nullptr if the plan node - // cannot be handled by this factory. + /// Translates plan node to join bridge. Returns nullptr if the plan node + /// cannot be handled by this factory. virtual std::unique_ptr toJoinBridge( const core::PlanNodePtr& /* node */) { return nullptr; } - // Translates plan node to operator supplier. Returns nullptr if the plan - // node cannot be handled by this factory. + /// Translates plan node to operator supplier. Returns nullptr if the plan + /// node cannot be handled by this factory. 
virtual OperatorSupplier toOperatorSupplier( const core::PlanNodePtr& /* node */) { return nullptr; } - // Returns max driver count for the plan node. Returns std::nullopt if the - // plan node cannot be handled by this factory. + /// Returns max driver count for the plan node. Returns std::nullopt if the + /// plan node cannot be handled by this factory. virtual std::optional maxDrivers( const core::PlanNodePtr& /* node */) { return std::nullopt; diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 2ce435b09a34..77a51e2e7d4a 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -7415,4 +7415,178 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillUnderNonReclaimableSection) { }) .run(); } + +// This test covers the case where the hash probe triggers a spill for right +// semi join types and the pending input needs to be processed in multiple +// steps. +DEBUG_ONLY_TEST_F(HashJoinTest, spillOutputWithRightSemiJoins) { + for (const auto joinType : + {core::JoinType::kRightSemiFilter, core::JoinType::kRightSemiProject}) { + std::atomic_bool injectOnce{true}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::getOutput", + std::function([&](Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "HashProbe") { + return; + } + if (!op->testingHasInput()) { + return; + } + if (!injectOnce.exchange(false)) { + return; + } + testingRunArbitration(op->pool()); + })); + + std::string duckDbSqlReference; + std::vector joinOutputLayout; + bool nullAware{false}; + if (joinType == core::JoinType::kRightSemiProject) { + duckDbSqlReference = "SELECT u_k2, u_k1 IN (SELECT t_k1 FROM t) FROM u"; + joinOutputLayout = {"u_k2", "match"}; + // Null aware is only supported for semi projection join type. 
+ nullAware = true; + } else { + duckDbSqlReference = + "SELECT u_k2 FROM u WHERE u_k1 IN (SELECT t_k1 FROM t)"; + joinOutputLayout = {"u_k2"}; + } + + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .numDrivers(1) + .spillDirectory(spillDirectory->path) + .probeType(probeType_) + .probeVectors(128, 3) + .probeKeys({"t_k1"}) + .buildType(buildType_) + .buildVectors(128, 4) + .buildKeys({"u_k1"}) + .joinType(joinType) + // Set a small number of output rows to process the input in multiple + // steps. + .config( + core::QueryConfig::kPreferredOutputBatchRows, std::to_string(10)) + .injectSpill(false) + .joinOutputLayout(std::move(joinOutputLayout)) + .nullAware(nullAware) + .referenceQuery(duckDbSqlReference) + .run(); + } +} + +DEBUG_ONLY_TEST_F(HashJoinTest, spillCheckOnLeftSemiFilterWithDynamicFilters) { + const int32_t numSplits = 10; + const int32_t numRowsProbe = 333; + const int32_t numRowsBuild = 100; + + std::vector probeVectors; + probeVectors.reserve(numSplits); + + std::vector> tempFiles; + for (int32_t i = 0; i < numSplits; ++i) { + auto rowVector = makeRowVector({ + makeFlatVector( + numRowsProbe, [&](auto row) { return row - i * 10; }), + makeFlatVector(numRowsProbe, [](auto row) { return row; }), + }); + probeVectors.push_back(rowVector); + tempFiles.push_back(TempFilePath::create()); + writeToFile(tempFiles.back()->path, rowVector); + } + auto makeInputSplits = [&](const core::PlanNodeId& nodeId) { + return [&] { + std::vector probeSplits; + for (auto& file : tempFiles) { + probeSplits.push_back(exec::Split(makeHiveConnectorSplit(file->path))); + } + SplitInput splits; + splits.emplace(nodeId, probeSplits); + return splits; + }; + }; + + // 100 key values in [35, 233] range. 
+ std::vector buildVectors; + for (int i = 0; i < 5; ++i) { + buildVectors.push_back(makeRowVector({ + makeFlatVector( + numRowsBuild / 5, + [i](auto row) { return 35 + 2 * (row + i * numRowsBuild / 5); }), + makeFlatVector(numRowsBuild / 5, [](auto row) { return row; }), + })); + } + std::vector keyOnlyBuildVectors; + for (int i = 0; i < 5; ++i) { + keyOnlyBuildVectors.push_back( + makeRowVector({makeFlatVector(numRowsBuild / 5, [i](auto row) { + return 35 + 2 * (row + i * numRowsBuild / 5); + })})); + } + + createDuckDbTable("t", probeVectors); + createDuckDbTable("u", buildVectors); + + auto probeType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); + + auto planNodeIdGenerator = std::make_shared(); + + auto buildSide = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values(buildVectors) + .project({"c0 AS u_c0", "c1 AS u_c1"}) + .planNode(); + auto keyOnlyBuildSide = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values(keyOnlyBuildVectors) + .project({"c0 AS u_c0"}) + .planNode(); + + // Left semi join. 
+ core::PlanNodeId probeScanId; + core::PlanNodeId joinNodeId; + const auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) + .tableScan(probeType) + .capturePlanNodeId(probeScanId) + .hashJoin( + {"c0"}, + {"u_c0"}, + buildSide, + "", + {"c0", "c1"}, + core::JoinType::kLeftSemiFilter) + .capturePlanNodeId(joinNodeId) + .project({"c0", "c1 + 1"}) + .planNode(); + + std::atomic_bool injectOnce{true}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::getOutput", + std::function([&](Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "HashProbe") { + return; + } + if (!op->testingHasInput()) { + return; + } + if (!injectOnce.exchange(false)) { + return; + } + testingRunArbitration(op->pool()); + })); + + auto spillDirectory = exec::test::TempDirectoryPath::create(); + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .planNode(std::move(op)) + .makeInputSplits(makeInputSplits(probeScanId)) + .spillDirectory(spillDirectory->path) + .injectSpill(false) + .referenceQuery( + "SELECT t.c0, t.c1 + 1 FROM t WHERE t.c0 IN (SELECT c0 FROM u)") + .verifier([&](const std::shared_ptr& task, bool /*unused*/) { + // Verify spill hasn't triggered. + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(joinNodeId); + ASSERT_EQ(planStats.spilledBytes, 0); + }) + .run(); +} } // namespace diff --git a/velox/exec/tests/JoinFuzzer.cpp b/velox/exec/tests/JoinFuzzer.cpp index 8f9e6638e9cf..45b4faf77510 100644 --- a/velox/exec/tests/JoinFuzzer.cpp +++ b/velox/exec/tests/JoinFuzzer.cpp @@ -405,15 +405,14 @@ std::vector flatten(const std::vector& vectors) { } RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { - LOG(ERROR) << "Executing query plan with " - << executionStrategyToString(plan.executionStrategy) - << " strategy[" - << (plan.executionStrategy == core::ExecutionStrategy::kGrouped - ? plan.numGroups - : 0) - << " groups]" << (injectSpill ? 
" and spilling injection" : "") - << ": " << std::endl - << plan.plan->toString(true, true); + LOG(INFO) << "Executing query plan with " + << executionStrategyToString(plan.executionStrategy) << " strategy[" + << (plan.executionStrategy == core::ExecutionStrategy::kGrouped + ? plan.numGroups + : 0) + << " groups]" << (injectSpill ? " and spilling injection" : "") + << ": " << std::endl + << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); for (const auto& [planNodeId, nodeSplits] : plan.splits) { @@ -1017,9 +1016,16 @@ void JoinFuzzer::verify(core::JoinType joinType) { LOG(INFO) << "Testing plan #" << i << " with spilling"; actual = execute(altPlans[i], /*=injectSpill=*/true); if (actual != nullptr && expected != nullptr) { - VELOX_CHECK( - assertEqualResults({expected}, {actual}), - "Logically equivalent plans produced different results"); + try { + VELOX_CHECK( + assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } catch (const VeloxException& e) { + LOG(ERROR) << "Expected\n" + << expected->toString(0, expected->size()) << "\nActual\n" + << actual->toString(0, actual->size()); + throw; + } } else { VELOX_CHECK( FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); @@ -1206,9 +1212,10 @@ std::vector> JoinFuzzer::splitInputByGroup( } void JoinFuzzer::go() { - VELOX_CHECK( + VELOX_USER_CHECK( FLAGS_steps > 0 || FLAGS_duration_sec > 0, "Either --steps or --duration_sec needs to be greater than zero.") + VELOX_USER_CHECK_GE(FLAGS_batch_size, 10, "Batch size must be at least 10."); const auto startTime = std::chrono::system_clock::now(); size_t iteration = 0;