Skip to content

Commit

Permalink
Merge pull request #2087 from mabel-dev/#2061/3
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Nov 8, 2024
2 parents db499ab + db22cc5 commit c88addf
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 10 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 846
__build__ = 847

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
3 changes: 2 additions & 1 deletion opteryx/models/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ def push_executor(self) -> Tuple[Generator[pyarrow.Table, Any, Any], ResultType]
from opteryx.operatorsv2 import JoinNode
from opteryx.operatorsv2 import ReaderNode
from opteryx.operatorsv2 import SetVariableNode
from opteryx.operatorsv2 import ShowValueNode, ShowCreateNode
from opteryx.operatorsv2 import ShowCreateNode
from opteryx.operatorsv2 import ShowValueNode

# Validate query plan to ensure it's acyclic
if not self.is_acyclic():
Expand Down
6 changes: 6 additions & 0 deletions opteryx/operatorsv2/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ def __init__(self, properties: QueryProperties, **parameters):
self.right_relation = None
self.hash_set = HashSet()

self.continue_executing = True

@classmethod
def from_json(cls, json_obj: str) -> "BasePlanNode": # pragma: no cover
raise NotImplementedError()
Expand All @@ -322,8 +324,12 @@ def config(self): # pragma: no cover
return f"CROSS JOIN {filters}"

def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
if not self.continue_executing:
return None

if self._unnest_column is not None:
if morsel == EOS:
self.continue_executing = False
return EOS
if isinstance(self._unnest_column.value, tuple):
return list(
Expand Down
3 changes: 1 addition & 2 deletions opteryx/operatorsv2/show_create_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
This is a SQL Query Execution Plan Node.
"""

from typing import Generator

import pyarrow

from opteryx.exceptions import DatasetNotFoundError
from opteryx.exceptions import UnsupportedSyntaxError
from opteryx.models import QueryProperties

from . import BasePlanNode


Expand Down
7 changes: 1 addition & 6 deletions opteryx/planner/physical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,7 @@ def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan:
elif node_type == LogicalPlanStepType.Filter:
node = operators.FilterNode(query_properties, filter=node_config["condition"], **{k:v for k,v in node_config.items() if k in ("all_relations",)})
elif node_type == LogicalPlanStepType.FunctionDataset:
if False and node_config.get("function") == "UNNEST":
node = operators.NoOpNode(query_properties, **node_config)
elif node_config.get("function") != "UNNEST" or len(node_config.get("args", [])) > 0 and not isinstance(node_config["args"][0], LogicalColumn):
node = operators.FunctionDatasetNode(query_properties, **node_config)
else:
node = operators.NoOpNode(query_properties, **node_config)
node = operators.FunctionDatasetNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.HeapSort:
node = operators.HeapSortNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Join:
Expand Down

0 comments on commit c88addf

Please sign in to comment.