diff --git a/src/cypher/execution_plan/ops/op_var_len_expand.cpp b/src/cypher/execution_plan/ops/op_var_len_expand.cpp index 309a6b7458..6d207856b0 100644 --- a/src/cypher/execution_plan/ops/op_var_len_expand.cpp +++ b/src/cypher/execution_plan/ops/op_var_len_expand.cpp @@ -40,24 +40,31 @@ DfsState::DfsState(RTContext *ctx, lgraph::VertexId id, int level, cypher::Relat } if (!isMaxHop) { // if reach max hop, do not init eiter - (relp->ItsRef()[level]).Initialize(ctx->txn_->GetTxn().get(), iter_type, id, types); - currentEit = &(relp->ItsRef()[level]); + // level start from 1, mention it + (relp->ItsRef()[level - 1]).Initialize(ctx->txn_->GetTxn().get(), iter_type, id, types); + currentEit = &(relp->ItsRef()[level - 1]); } } -// Predicate Class -bool HeadPredicate::eval(std::vector &eits) { - auto ret = cypher::FieldData::Array(0); - // get first edge's timestamp, check whether it fits the condition - for (auto &eit : eits) { - if (eit.IsValid()) { - ret.array->emplace_back(lgraph::FieldData(eit.GetField("timestamp"))); - } +void DfsState::getTime() { + if (!currentEit->IsValid()) { + return; } - if (ret.array->empty()) { + timestamp = lgraph::FieldData(currentEit->GetField("timestamp")); +} + +// Predicate Class +bool ValidPredicate::eval(std::vector &stack) { + // every eiter in stack must be valid + return stack.back().currentEit->IsValid(); +} + +bool HeadPredicate::eval(std::vector &stack) { + if (stack.size() >= 2) { return true; } - FieldData head = FieldData(ret.array->front()); + // only check the first timestamp + FieldData head = stack.front().timestamp; switch (op) { case lgraph::CompareOp::LBR_GT: return head > operand; @@ -72,23 +79,13 @@ bool HeadPredicate::eval(std::vector &eits) { case lgraph::CompareOp::LBR_NEQ: return head != operand; default: - break; + return false; } - return false; } -bool LastPredicate::eval(std::vector &eits) { - auto ret = cypher::FieldData::Array(0); - // get last edge's timestamp, check whether it fits the condition - for (auto &eit : eits) { - if (eit.IsValid()) { - ret.array->emplace_back(lgraph::FieldData(eit.GetField("timestamp"))); - } - } - if (ret.array->empty()) { - return true; - } - FieldData last = FieldData(ret.array->back()); +bool LastPredicate::eval(std::vector &stack) { + // last timestamp, check every one + FieldData last = stack.back().timestamp; switch (op) { case lgraph::CompareOp::LBR_GT: return last > operand; @@ -103,104 +100,85 @@ bool LastPredicate::eval(std::vector &eits) { case lgraph::CompareOp::LBR_NEQ: return last != operand; default: - break; + return false; } - return false; } -bool IsAscPredicate::eval(std::vector &eits) { - auto ret = cypher::FieldData::Array(0); - for (auto &eit : eits) { - if (eit.IsValid()) { - ret.array->emplace_back(lgraph::FieldData(eit.GetField("timestamp"))); - } - } - if (ret.array->empty()) { +bool IsAscPredicate::eval(std::vector &stack) { + if (stack.size() == 1) { return true; } - for (size_t i = 1; i < ret.array->size(); i++) { - if ((*ret.array)[i - 1] >= (*ret.array)[i]) { - return false; - } + auto it = stack.end(); + if ((it - 1)->timestamp > (it - 2)->timestamp) { + // check the last two timestamp, now is asc + return true; + } else { + return false; } - return true; } -bool IsDescPredicate::eval(std::vector &eits) { - auto ret = cypher::FieldData::Array(0); - for (auto &eit : eits) { - if (eit.IsValid()) { - ret.array->emplace_back(lgraph::FieldData(eit.GetField("timestamp"))); - } - } - if (ret.array->empty()) { +bool IsDescPredicate::eval(std::vector &stack) { + if (stack.size() == 1) { return true; } - for (size_t i = 1; i < ret.array->size(); i++) { - if ((*ret.array)[i - 1] <= (*ret.array)[i]) { - return false; - } + auto it = stack.end(); + if ((it - 1)->timestamp < (it - 2)->timestamp) { + // is desc + return true; + } else { + return false; } - return true; } -bool MaxInListPredicate::eval(std::vector &eits) { - auto ret = cypher::FieldData::Array(0); - for (auto &eit : eits) { - if (eit.IsValid()) { - ret.array->emplace_back(lgraph::FieldData(eit.GetField("timestamp"))); - } - } - if (ret.array->empty()) { - return true; - } - // find max in path - size_t pos = 0; - for (size_t i = 0; i < ret.array->size(); i++) { - if ((*ret.array)[i] > (*ret.array)[pos]) { - pos = i; +bool MaxInListPredicate::eval(std::vector &stack) { + FieldData maxInList; + if (stack.size() == 1) { + stack.back().maxTimestamp = stack.back().timestamp; + maxInList = stack.back().maxTimestamp; + } else { + auto it = stack.end(); + if ((it - 1)->timestamp <= (it - 2)->maxTimestamp) { + // if the last timestamp is no larger than the previous maxTimestamp + (it - 1)->maxTimestamp = (it - 2)->maxTimestamp; + return true; + } else { + (it - 1)->maxTimestamp = (it - 1)->timestamp; + maxInList = (it - 1)->maxTimestamp; } } - - FieldData maxInList = cypher::FieldData((*ret.array)[pos]); switch (op) { - case lgraph::CompareOp::LBR_GT: - return maxInList > operand; - case lgraph::CompareOp::LBR_GE: - return maxInList >= operand; case lgraph::CompareOp::LBR_LT: return maxInList < operand; case lgraph::CompareOp::LBR_LE: return maxInList <= operand; + case lgraph::CompareOp::LBR_GT: + return maxInList > operand; + case lgraph::CompareOp::LBR_GE: + return maxInList >= operand; case lgraph::CompareOp::LBR_EQ: return maxInList == operand; case lgraph::CompareOp::LBR_NEQ: return maxInList != operand; default: - break; + return false; } - return false; } -bool MinInListPredicate::eval(std::vector &eits) { - auto ret = cypher::FieldData::Array(0); - for (auto &eit : eits) { - if (eit.IsValid()) { - ret.array->emplace_back(lgraph::FieldData(eit.GetField("timestamp"))); - } - } - if (ret.array->empty()) { - return true; - } - // find min in path - size_t pos = 0; - for (size_t i = 0; i < ret.array->size(); i++) { - if ((*ret.array)[i] < (*ret.array)[pos]) { - pos = i; +bool MinInListPredicate::eval(std::vector &stack) { + FieldData minInList; + if (stack.size() == 1) { + stack.back().minTimestamp = stack.back().timestamp; + minInList = stack.back().minTimestamp; + } else { + auto it = stack.end(); + if ((it - 1)->timestamp >= (it - 2)->minTimestamp) { + (it - 1)->minTimestamp = (it - 2)->minTimestamp; + return true; + } else { + (it - 1)->minTimestamp = (it - 1)->timestamp; + minInList = (it - 1)->minTimestamp; } } - - FieldData minInList = cypher::FieldData((*ret.array)[pos]); switch (op) { case lgraph::CompareOp::LBR_GT: return minInList > operand; @@ -215,69 +193,99 @@ bool MinInListPredicate::eval(std::vector &eits) { case lgraph::CompareOp::LBR_NEQ: return minInList != operand; default: - break; + return false; } - return false; } // VarLenExpand Class -bool VarLenExpand::PerNodeLimit(RTContext *ctx, size_t count) { - return !ctx->per_node_limit_.has_value() || count <= ctx->per_node_limit_.value(); -} - bool VarLenExpand::NextWithFilter(RTContext *ctx) { while (!stack.empty()) { - if (needPop) { - // it means that, in the last hoop, the path needs pop - relp_->path_.PopBack(); - needPop = false; - } auto ¤tState = stack.back(); - auto currentNodeId = currentState.currentNodeId; + // auto currentNodeId = currentState.currentNodeId; auto ¤tEit = currentState.currentEit; - auto currentLevel = currentState.level; + auto currentLevel = currentState.level; // actually, is the path length in stack - // the part of count, needs check + // the part of count auto ¤tCount = currentState.count; - if (!PerNodeLimit(ctx, currentCount)) { - stack.pop_back(); - if (relp_->path_.Length() != 0) { - needPop = true; - } - continue; - } - // if currentNodeId's needNext is true, currentEit.next(), then set needNext to false + // if needNext is true, the back currentEit next, then set needNext to false auto &needNext = currentState.needNext; if (needNext) { - currentEit->Next(); - currentCount++; - needNext = false; - } - - if (currentLevel == max_hop_) { - // When reach here, the top eiter must be invalid, and the path meets the condition. - // check path unique + // deal with the top eit, only Next in this block, don't pop stack + // CYPHER_THROW_ASSERT(currentEit->IsValid()); + // CYPHER_THROW_ASSERT(currentEit->GetUid() == + // relp_->path_.GetNthEdgeWithTid(relp_->path_.Length() - 1)); + // check unique, delete previous edge if (ctx->path_unique_ && relp_->path_.Length() != 0) { CYPHER_THROW_ASSERT(pattern_graph_->VisitedEdges().Erase( relp_->path_.GetNthEdgeWithTid(relp_->path_.Length() - 1))); } - stack.pop_back(); + relp_->path_.PopBack(); + + currentEit->Next(); + needNext = false; + currentState.getTime(); + currentCount++; + + bool isFinding = true; + while (isFinding) { + bool continueFind = false; + // check predicates here, path derived from eiters in stack + for (auto &p : predicates) { + if (!p->eval(stack)) { + // not fit predicate + continueFind = true; + if (stack.back().currentEit->IsValid()) { + // the back eiter is still valid + stack.back().currentEit->Next(); + stack.back().getTime(); + stack.back().count++; + } else { + // now the back eiter of stack is invalid + isFinding = false; + } + break; + } + } + if (continueFind) { + continue; + } + // when reach here, the eit, path's predicate are ok + // CYPHER_THROW_ASSERT(currentEit == stack.back().currentEit); + // CYPHER_THROW_ASSERT(currentEit->IsValid()); + isFinding = false; + + // add edge's euid to path + relp_->path_.Append(currentEit->GetUid()); + + if (ctx->path_unique_ && pattern_graph_->VisitedEdges().Contains(*currentEit)) { + // if this edge has been added, find next edge from the same eiter + isFinding = true; + // set next + stack.back().currentEit->Next(); + stack.back().getTime(); + stack.back().count++; + // pop path + relp_->path_.PopBack(); + } else if (ctx->path_unique_) { + // this is ok, add edge to path unique + pattern_graph_->VisitedEdges().Add(*currentEit); + } + } + } - neighbor_->PushVid(currentNodeId); + if ((int)relp_->path_.Length() == currentLevel && currentLevel == max_hop_) { + // CYPHER_THROW_ASSERT(currentEit->IsValid()); + // the top eit is valid + neighbor_->PushVid(currentEit->GetNbr(expand_direction_)); + needNext = true; // check label if (!neighbor_->Label().empty() && neighbor_->IsValidAfterMaterialize(ctx) && neighbor_->ItRef()->GetLabel() != neighbor_->Label()) { - if (relp_->path_.Length() != 0) { - relp_->path_.PopBack(); - } continue; } - if (relp_->path_.Length() != 0) { - needPop = true; - } return true; } @@ -285,59 +293,74 @@ bool VarLenExpand::NextWithFilter(RTContext *ctx) { // eit is valid, set currentNodeId's eiter's needNext to true needNext = true; - // check predicates here, path derived from eiters in stack - bool passPredicate = true; - for (auto &p : predicates) { - if (!p->eval(relp_->ItsRef())) { - passPredicate = false; - break; + auto neighbor = currentEit->GetNbr(expand_direction_); + stack.emplace_back(ctx, neighbor, currentLevel + 1, relp_, expand_direction_, false, + currentLevel + 1 > max_hop_); + stack.back().getTime(); + + bool isFinding = true; + while (isFinding) { + bool continueFind = false; + // check predicates here, path derived from eiters + for (auto &p : predicates) { + if (!p->eval(stack)) { + // not fit predicate + continueFind = true; + if (stack.back().currentEit->IsValid()) { + // the back eiter is still valid + stack.back().currentEit->Next(); + stack.back().getTime(); + stack.back().count++; + } else { + // now the back eiter of stack is invalid + isFinding = false; + } + break; + } } - } - - if (passPredicate) { - // check path unique - if (ctx->path_unique_ && pattern_graph_->VisitedEdges().Contains(*currentEit)) { - currentEit->Next(); - currentCount++; + if (continueFind) { continue; - } else if (ctx->path_unique_) { - pattern_graph_->VisitedEdges().Add(*currentEit); } + // when reach here, the eit, path's predicate are ok + isFinding = false; + // add edge's euid to path - relp_->path_.Append(currentEit->GetUid()); - auto neighbor = currentEit->GetNbr(expand_direction_); - stack.emplace_back(ctx, neighbor, currentLevel + 1, relp_, expand_direction_, false, - currentLevel + 1 == max_hop_); - } - } else { - // check unique - if (ctx->path_unique_ && relp_->path_.Length() != 0) { - CYPHER_THROW_ASSERT(pattern_graph_->VisitedEdges().Erase( - relp_->path_.GetNthEdgeWithTid(relp_->path_.Length() - 1))); + relp_->path_.Append(stack.back().currentEit->GetUid()); + + if (ctx->path_unique_ && + pattern_graph_->VisitedEdges().Contains(*stack.back().currentEit)) { + // if this edge has been added, find next edge + isFinding = true; + // set next + stack.back().currentEit->Next(); + stack.back().getTime(); + stack.back().count++; + // the edge occurs before, pop it + relp_->path_.PopBack(); + } else if (ctx->path_unique_) { + // this is ok, add edge + pattern_graph_->VisitedEdges().Add(*stack.back().currentEit); + } } + } else { + // now the top eit is invaild stack.pop_back(); - if (currentLevel >= min_hop_) { - neighbor_->PushVid(currentNodeId); + auto pathLen = relp_->path_.Length(); + if (pathLen == stack.size() && pathLen >= (size_t)min_hop_) { + // CYPHER_THROW_ASSERT(stack.back().currentEit->IsValid()); + neighbor_->PushVid(stack.back().currentEit->GetNbr(expand_direction_)); + + stack.back().needNext = true; // check label if (!neighbor_->Label().empty() && neighbor_->IsValidAfterMaterialize(ctx) && neighbor_->ItRef()->GetLabel() != neighbor_->Label()) { - if (relp_->path_.Length() != 0) { - relp_->path_.PopBack(); - } continue; } - if (relp_->path_.Length() != 0) { - needPop = true; - } - return true; } - if (relp_->path_.Length() != 0) { - relp_->path_.PopBack(); - } } } return false; @@ -436,7 +459,9 @@ OpBase::OpResult VarLenExpand::Initialize(RTContext *ctx) { record->values[relp_rec_idx_].type = Entry::VAR_LEN_RELP; record->values[relp_rec_idx_].relationship = relp_; relp_->ItsRef().resize(max_hop_); - needPop = false; + + auto p = std::make_unique(); + addPredicate(std::move(p)); return OP_OK; } @@ -456,14 +481,54 @@ OpBase::OpResult VarLenExpand::RealConsume(RTContext *ctx) { } CYPHER_THROW_ASSERT(stack.empty()); // push the first node and the related eiter into the stack - stack.emplace_back(ctx, startVid, 0, relp_, expand_direction_, false, !max_hop_); - - if (!PerNodeLimit(ctx, stack.front().count)) { - stack.pop_back(); + // the first node and the related edge is chosen, path length is 1 + stack.emplace_back(ctx, startVid, 1, relp_, expand_direction_, false, 1 > max_hop_); + stack.back().getTime(); + + // check the first node and edge + bool nextNode = false; + bool isFinding = true; + while (isFinding) { + // check predicates here, path derived from eiters in stack + bool continueCheck = false; + for (auto &p : predicates) { + if (!p->eval(stack)) { + // when the edge in the stack not fit + if (stack.back().currentEit->IsValid()) { + // still vaild, find next edge + stack.back().currentEit->Next(); + stack.back().getTime(); + stack.back().count++; + continueCheck = true; + } else { + // the top eiter is not valid, should find next node + stack.pop_back(); + nextNode = true; + } + break; + } + } + if (continueCheck) { + // find next edge of the same node + // continueCheck = false; + continue; + } else { + // if ok, it means this path is ok, find next hop + isFinding = false; + } + } + if (nextNode) { + // find next node continue; } + // when reach here, the first node and eiter are ok relp_->path_.SetStart(startVid); + relp_->path_.Append(stack.back().currentEit->GetUid()); + if (ctx->path_unique_) { + // add the first edge + pattern_graph_->VisitedEdges().Add(*stack.back().currentEit); + } } return OP_OK; } diff --git a/src/cypher/execution_plan/ops/op_var_len_expand.h b/src/cypher/execution_plan/ops/op_var_len_expand.h index 1641879b7a..ead3451148 100644 --- a/src/cypher/execution_plan/ops/op_var_len_expand.h +++ b/src/cypher/execution_plan/ops/op_var_len_expand.h @@ -35,13 +35,24 @@ struct DfsState { // whether the eiter need Next() bool needNext; + FieldData timestamp; + FieldData maxTimestamp; + FieldData minTimestamp; + DfsState(RTContext *ctx, lgraph::VertexId id, int level, cypher::Relationship *relp, ExpandTowards expand_direction, bool needNext, bool isMaxHop); + + void getTime(); }; class Predicate { public: - virtual bool eval(std::vector &eits) = 0; + virtual bool eval(std::vector &stack) = 0; +}; + +class ValidPredicate : public Predicate { + public: + bool eval(std::vector &stack) override; }; class HeadPredicate : public Predicate { @@ -53,7 +64,7 @@ class HeadPredicate : public Predicate { public: HeadPredicate(lgraph::CompareOp op, FieldData operand) : op(op), operand(operand) {} - bool eval(std::vector &eits) override; + bool eval(std::vector &stack) override; }; class LastPredicate : public Predicate { @@ -65,19 +76,19 @@ class LastPredicate : public Predicate { public: LastPredicate(lgraph::CompareOp op, FieldData operand) : op(op), operand(operand) {} - bool eval(std::vector &eits) override; + bool eval(std::vector &stack) override; }; class IsAscPredicate : public Predicate { public: IsAscPredicate() {} - bool eval(std::vector &eits) override; + bool eval(std::vector &stack) override; }; class IsDescPredicate : public Predicate { public: IsDescPredicate() {} - bool eval(std::vector &eits) override; + bool eval(std::vector &stack) override; }; class MaxInListPredicate : public Predicate { @@ -87,7 +98,7 @@ class MaxInListPredicate : public Predicate { public: MaxInListPredicate(lgraph::CompareOp op, FieldData operand) : op(op), operand(operand) {} - bool eval(std::vector &eits) override; + bool eval(std::vector &stack) override; }; class MinInListPredicate : public Predicate { @@ -97,7 +108,7 @@ class MinInListPredicate : public Predicate { public: MinInListPredicate(lgraph::CompareOp op, FieldData operand) : op(op), operand(operand) {} - bool eval(std::vector &eits) override; + bool eval(std::vector &stack) override; }; /* Variable Length Expand */ @@ -116,9 +127,6 @@ class VarLenExpand : public OpBase { // stack for DFS std::vector stack; - // this flag decides whether need to pop relp_->Path - bool needPop; - public: cypher::PatternGraph *pattern_graph_ = nullptr; cypher::Node *start_ = nullptr; // start node to expand