Fix hash probe spill issues (facebookincubator#9400)
Summary:
1. Fix hash probe spill hang for right semi join types
For right semi joins (project and filter), the operator produces output from the probed rows in the
hash table after processing all the probe inputs. A probe input vector (set in Operator::input_) might be
processed in multiple output calls until all the input rows have been processed. The probe side
spill needs to spill the output from the current probe input vector before clearing the hash table, and
the current logic waits until the output is null. This is not correct for right semi joins, as they always
return null; instead, we should check whether input_ has been reset.
Add a unit test to verify the fix, plus a join fuzzer run.
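
The difference between the two termination conditions can be illustrated with a toy model. This is a minimal sketch and not Velox code: `ToyProbe`, `pendingRows`, `kRowsPerCall`, `drainUntilNullOutput`, and `drainUntilInputCleared` are hypothetical names, and `pendingRows == 0` stands in for `input_` having been reset.

```cpp
#include <algorithm>
#include <cassert>
#include <optional>

// Hypothetical toy model; not the Velox HashProbe operator.
constexpr int kRowsPerCall = 4;

struct ToyProbe {
  bool rightSemi;   // Right semi joins emit no output while probing.
  int pendingRows;  // Unprocessed rows of the current input; 0 models a reset input_.

  // Processes up to kRowsPerCall rows. Returns the number of output rows, or
  // std::nullopt if this call produced no output.
  std::optional<int> getOutput() {
    if (pendingRows == 0) {
      return std::nullopt;
    }
    const int processed = std::min(kRowsPerCall, pendingRows);
    pendingRows -= processed;
    if (rightSemi) {
      return std::nullopt;  // Output is deferred until all probe input is processed.
    }
    return processed;
  }
};

// Old condition: stop at the first null output. Returns the rows spilled.
int drainUntilNullOutput(ToyProbe& probe) {
  int spilled = 0;
  while (auto output = probe.getOutput()) {
    spilled += *output;
  }
  return spilled;
}

// New condition: stop only once the input has been fully consumed.
int drainUntilInputCleared(ToyProbe& probe) {
  int spilled = 0;
  for (;;) {
    if (auto output = probe.getOutput()) {
      spilled += *output;
      continue;
    }
    if (probe.pendingRows == 0) {
      break;
    }
    // A null output with pending input is only expected for right semi joins.
    assert(probe.rightSemi);
  }
  return spilled;
}
```

In this model, a right semi `ToyProbe` holding 10 rows is left with 6 unprocessed rows by `drainUntilNullOutput`, whereas `drainUntilInputCleared` keeps calling until no rows remain.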

2. Disable hash probe spill if dynamic filters have been generated
Hash probe might generate dynamic filters based on the hash join key ranges from the build side and
push them down to upstream operators. If this has been triggered, then we can't spill from the hash probe
side, as doing so might produce incorrect results, as detected by the join fuzzer test. The fix is to disable
hash probe spill if dynamic filters have been generated.
Add a unit test to verify the fix, plus a join fuzzer run.
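
The gating can be sketched with a toy state object. This is an illustration under stated assumptions, not the Velox implementation: `ToyProbeState`, `generateFilter`, and `pushDownFilters` are hypothetical names, and the filter is modeled as a plain string. The sketch records the fact that filters were generated in a separate flag because, per the comment in HashProbe.h, the filter map itself might be cleared once the filters have been pushed down.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical toy model; not the Velox HashProbe operator.
struct ToyProbeState {
  bool canSpill{true};
  bool mixedExecutionGroup{false};
  std::map<int, std::string> dynamicFilters;  // channel -> filter description
  bool hasGeneratedDynamicFilters{false};     // sticky: never reset by pushdown

  void generateFilter(int channel, const std::string& filter) {
    dynamicFilters.emplace(channel, filter);
    hasGeneratedDynamicFilters = !dynamicFilters.empty();
  }

  // Models handing the filters to upstream operators, which empties the map.
  void pushDownFilters() {
    dynamicFilters.clear();
  }

  bool spillEnabled() const {
    return canSpill && !mixedExecutionGroup;
  }

  bool canReclaim() const {
    // Invariant of this toy: a non-empty map implies the flag is set.
    assert(dynamicFilters.empty() || hasGeneratedDynamicFilters);
    return spillEnabled() && !hasGeneratedDynamicFilters;
  }
};
```

Checking `dynamicFilters.empty()` alone would report the operator as reclaimable again after `pushDownFilters()`, which is the case the separate flag guards against.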

Pull Request resolved: facebookincubator#9400

Reviewed By: kagamiori, oerling

Differential Revision: D55828946

Pulled By: xiaoxmeng

fbshipit-source-id: ea4cd62abf0f9d169ad61cd5718bae4c44429e68
xiaoxmeng authored and facebook-github-bot committed Apr 9, 2024
1 parent 9c83ef9 commit 650eb6a
Showing 8 changed files with 256 additions and 51 deletions.
4 changes: 2 additions & 2 deletions velox/connectors/Connector.h
@@ -355,8 +355,8 @@ class Connector {
VELOX_NYI("connectorConfig is not supported yet");
}

-// Returns true if this connector would accept a filter dynamically generated
-// during query execution.
+/// Returns true if this connector would accept a filter dynamically generated
+/// during query execution.
virtual bool canAddDynamicFilter() const {
return false;
}
2 changes: 1 addition & 1 deletion velox/core/PlanNode.h
@@ -1588,7 +1588,7 @@ class HashJoinNode : public AbstractJoinNode {
if (nullAware) {
VELOX_USER_CHECK(
isNullAwareSupported(joinType),
-"Null-aware flag is supported only for semi and anti joins");
+"Null-aware flag is supported only for semi project and anti joins");
VELOX_USER_CHECK_EQ(
1, leftKeys_.size(), "Null-aware joins allow only one join key");

8 changes: 5 additions & 3 deletions velox/exec/Driver.cpp
@@ -308,7 +308,8 @@ void Driver::pushdownFilters(int operatorIndex) {
}

const auto& identityProjections = prevOp->identityProjections();
-auto inputChannel = getIdentityProjection(identityProjections, channel);
+const auto inputChannel =
+getIdentityProjection(identityProjections, channel);
if (!inputChannel.has_value()) {
// Filter channel is not an identity projection.
VELOX_CHECK(
@@ -897,7 +898,7 @@ std::unordered_set<column_index_t> Driver::canPushdownFilters(
for (auto i = 0; i < channels.size(); ++i) {
auto channel = channels[i];
for (auto j = filterSourceIndex - 1; j >= 0; --j) {
-auto prevOp = operators_[j].get();
+auto* prevOp = operators_[j].get();

if (j == 0) {
// Source operator.
@@ -908,7 +909,8 @@ std::unordered_set<column_index_t> Driver::canPushdownFilters(
}

const auto& identityProjections = prevOp->identityProjections();
-auto inputChannel = getIdentityProjection(identityProjections, channel);
+const auto inputChannel =
+getIdentityProjection(identityProjections, channel);
if (!inputChannel.has_value()) {
// Filter channel is not an identity projection.
if (prevOp->canAddDynamicFilter()) {
48 changes: 32 additions & 16 deletions velox/exec/HashProbe.cpp
@@ -331,29 +331,30 @@ void HashProbe::asyncWaitForHashTable() {
isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)) &&
table_->hashMode() != BaseHashTable::HashMode::kHash && !isSpillInput() &&
!hasMoreSpillData()) {
-// Find out whether there are any upstream operators that can accept
-// dynamic filters on all or a subset of the join keys. Create dynamic
-// filters to push down.
+// Find out whether there are any upstream operators that can accept dynamic
+// filters on all or a subset of the join keys. Create dynamic filters to
+// push down.
//
// NOTE: this optimization is not applied in the following cases: (1) if the
// probe input is read from spilled data and there is no upstream operators
// involved; (2) if there is spill data to restore, then we can't filter
// probe inputs solely based on the current table's join keys.
const auto& buildHashers = table_->hashers();
-auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters(
+const auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters(
this, keyChannels_);

// Null aware Right Semi Project join needs to know whether there are any
// nulls on the probe side. Hence, cannot filter these out.
const auto nullAllowed = isRightSemiProjectJoin(joinType_) && nullAware_;

-for (auto i = 0; i < keyChannels_.size(); i++) {
+for (auto i = 0; i < keyChannels_.size(); ++i) {
if (channels.find(keyChannels_[i]) != channels.end()) {
if (auto filter = buildHashers[i]->getFilter(nullAllowed)) {
dynamicFilters_.emplace(keyChannels_[i], std::move(filter));
}
}
}
+hasGeneratedDynamicFilters_ = !dynamicFilters_.empty();
}
}

@@ -529,8 +530,8 @@ BlockingReason HashProbe::isBlocked(ContinueFuture* future) {
}

void HashProbe::clearDynamicFilters() {
-// The join can be completely replaced with a pushed down
-// filter when the following conditions are met:
+// The join can be completely replaced with a pushed down filter when the
+// following conditions are met:
// * hash table has a single key with unique values,
// * build side has no dependent columns.
if (keyChannels_.size() == 1 && !table_->hasDuplicateKeys() &&
@@ -833,7 +834,7 @@ bool HashProbe::skipProbeOnEmptyBuild() const {
}

bool HashProbe::spillEnabled() const {
-return canReclaim();
+return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup();
}

bool HashProbe::hasMoreSpillData() const {
@@ -1010,7 +1011,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {

numOut = evalFilter(numOut);

-if (!numOut) {
+if (numOut == 0) {
continue;
}

@@ -1547,7 +1548,9 @@ void HashProbe::ensureOutputFits() {
}

bool HashProbe::canReclaim() const {
-return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup();
+// NOTE: we can't spill from a hash probe operator if it has generated dynamic
+// filters.
+return spillEnabled() && !hasGeneratedDynamicFilters_;
}

void HashProbe::reclaim(
@@ -1692,13 +1695,26 @@ void HashProbe::spillOutput() {
&spillStats_);
outputSpiller->setPartitionsSpilled({0});

-RowVectorPtr output;
-while ((output = getOutputInternal(/*toSpillOutput=*/true))) {
-// Ensure vector are lazy loaded before spilling.
-for (int32_t i = 0; i < output->childrenSize(); ++i) {
-output->childAt(i)->loadedVector();
+RowVectorPtr output{nullptr};
+for (;;) {
+output = getOutputInternal(/*toSpillOutput=*/true);
+if (output != nullptr) {
+// Ensure vector are lazy loaded before spilling.
+for (int32_t i = 0; i < output->childrenSize(); ++i) {
+output->childAt(i)->loadedVector();
+}
+outputSpiller->spill(0, output);
+continue;
}
-outputSpiller->spill(0, output);
+// NOTE: for right semi join types, we need to check if 'input_' has been
+// cleared or not instead of checking on output. The right semi joins only
+// producing the output after processing all the probe inputs.
+if (input_ == nullptr) {
+break;
+}
+VELOX_CHECK(
+isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_));
+VELOX_CHECK((output == nullptr) && (input_ != nullptr));
}
VELOX_CHECK_LE(outputSpiller->spilledPartitionSet().size(), 1);

12 changes: 9 additions & 3 deletions velox/exec/HashProbe.h
@@ -44,9 +44,9 @@ class HashProbe : public Operator {
}
// NOTE: if we can't apply dynamic filtering, then we can start early to
// read input even before the hash table has been built.
-const auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters(
-this, keyChannels_);
-return channels.empty();
+return operatorCtx_->driverCtx()
+->driver->canPushdownFilters(this, keyChannels_)
+.empty();
}

void addInput(RowVectorPtr input) override;
@@ -335,6 +335,12 @@ class HashProbe : public Operator {
// Channel of probe keys in 'input_'.
std::vector<column_index_t> keyChannels_;

+// True if we have generated dynamic filters from the hash build join keys.
+//
+// NOTE: 'dynamicFilters_' might have been cleared once they have been pushed
+// down to the upstream operators.
+tsan_atomic<bool> hasGeneratedDynamicFilters_{false};
+
// True if the join can become a no-op starting with the next batch of input.
bool canReplaceWithDynamicFilter_{false};

26 changes: 13 additions & 13 deletions velox/exec/Operator.h
@@ -257,24 +257,24 @@ class OperatorCtx {
mutable std::unique_ptr<core::ExecCtx> execCtx_;
};

-// Query operator
+/// Query operator
class Operator : public BaseRuntimeStatWriter {
public:
-// Factory class for mapping a user-registered PlanNode into the corresponding
-// Operator.
+/// Factory class for mapping a user-registered PlanNode into the
+/// corresponding Operator.
class PlanNodeTranslator {
public:
virtual ~PlanNodeTranslator() = default;

-// Translates plan node to operator. Returns nullptr if the plan node cannot
-// be handled by this factory.
+/// Translates plan node to operator. Returns nullptr if the plan node
+/// cannot be handled by this factory.
virtual std::unique_ptr<Operator>
toOperator(DriverCtx* ctx, int32_t id, const core::PlanNodePtr& node) {
return nullptr;
}

-// An overloaded method that should be called when the operator needs an
-// ExchangeClient.
+/// An overloaded method that should be called when the operator needs an
+/// ExchangeClient.
virtual std::unique_ptr<Operator> toOperator(
DriverCtx* ctx,
int32_t id,
@@ -283,22 +283,22 @@ class Operator : public BaseRuntimeStatWriter {
return nullptr;
}

-// Translates plan node to join bridge. Returns nullptr if the plan node
-// cannot be handled by this factory.
+/// Translates plan node to join bridge. Returns nullptr if the plan node
+/// cannot be handled by this factory.
virtual std::unique_ptr<JoinBridge> toJoinBridge(
const core::PlanNodePtr& /* node */) {
return nullptr;
}

-// Translates plan node to operator supplier. Returns nullptr if the plan
-// node cannot be handled by this factory.
+/// Translates plan node to operator supplier. Returns nullptr if the plan
+/// node cannot be handled by this factory.
virtual OperatorSupplier toOperatorSupplier(
const core::PlanNodePtr& /* node */) {
return nullptr;
}

-// Returns max driver count for the plan node. Returns std::nullopt if the
-// plan node cannot be handled by this factory.
+/// Returns max driver count for the plan node. Returns std::nullopt if the
+/// plan node cannot be handled by this factory.
virtual std::optional<uint32_t> maxDrivers(
const core::PlanNodePtr& /* node */) {
return std::nullopt;