Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Nov 15, 2024
1 parent 42a964e commit b14b0c6
Show file tree
Hide file tree
Showing 61 changed files with 891 additions and 4,870 deletions.
3 changes: 0 additions & 3 deletions opteryx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,6 @@ def get(key: str, default: Optional[typing.Any] = None) -> Optional[typing.Any]:
DATA_CATALOG_CONFIGURATION: Optional[str] = get("DATA_CATALOG_CONFIGURATION")
"""Data Catalog configuration, different catalogs have different config formats."""

EXPERIMENTAL_EXECUTION_ENGINE: bool = bool(get("EXPERIMENTAL_EXECUTION_ENGINE", False))
"""Use the experimental/incomplete generation 2 execution engine."""

# GCP project ID - for Google Cloud Data
GCP_PROJECT_ID: str = get("GCP_PROJECT_ID")
# don't try to raise the priority of the server process
Expand Down
163 changes: 10 additions & 153 deletions opteryx/models/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
"""

import gc
from typing import Any
from typing import Generator
from typing import Optional
from typing import Tuple
from typing import Union

import pyarrow

Expand All @@ -43,114 +41,7 @@ class PhysicalPlan(Graph):
complex code which is the planner from the tree that describes the plan.
"""

def execute(self) -> Generator[Tuple[Union[pyarrow.Table, Any], ResultType], None, None]:
    """
    Run the plan with the engine selected by configuration.

    When `config.EXPERIMENTAL_EXECUTION_ENGINE` is truthy the call is delegated to
    `push_executor`, otherwise to `legacy_executor`. Whatever the chosen executor
    returns is handed back unchanged.
    """
    # Pick the bound method first, then invoke it once.
    run_plan = (
        self.push_executor if config.EXPERIMENTAL_EXECUTION_ENGINE else self.legacy_executor
    )
    return run_plan()

def legacy_executor(
    self,
) -> Generator[Tuple[Union[pyarrow.Table, Any], ResultType], None, None]:
    """
    Implements a 'pull' model execution engine, pulling records starting from
    the last stage (head) of the query plan, and working backwards towards the first stage.

    Yields:
        tuple: The first element is the result (either tabular data or a
            NonTabularResult object). The second element is a ResultType enum,
            indicating the type of the result.

    Raises:
        InvalidInternalStateError: If the plan is cyclic, or if it does not have
            exactly one head (exit point).
    """
    from opteryx.models import NonTabularResult
    from opteryx.operators import ExplainNode

    def map_operators_to_producers(nodes: list) -> None:
        """
        Walks through the query plan, linking each operator node with its data producers.

        Parameters:
            nodes: list
                List of operator nodes in the query plan.
        """
        for node in nodes:
            producers = self.ingoing_edges(node)
            operator = self[node]

            if len(producers) == 1:
                # If there is only one producer, set it directly
                operator.set_producers([self[producers[0][0]]])
            elif len(producers) == 2 and hasattr(operator, "_left_relation"):
                left_producer = None
                right_producer = None

                left_relation = operator._left_relation
                right_relation = operator._right_relation
                for source, target, relation in producers:
                    # Inspect every edge upstream of this producer, plus the edge
                    # into the operator itself, to decide which side it feeds.
                    upstream_edges = self.breadth_first_search(source, reverse=True) + [
                        (source, target, relation)
                    ]
                    for upstream, _, _ in upstream_edges:
                        parameters = self[upstream].parameters
                        names = {parameters.get("alias"), parameters.get("relation")}
                        if set(left_relation).intersection(names):
                            left_producer = self[source]
                        elif set(right_relation).intersection(names):
                            right_producer = self[source]

                if left_producer and right_producer:
                    operator.set_producers([left_producer, right_producer])
                else:
                    # Left and right could not both be determined: fall back to
                    # linking the producers in edge order.
                    operator.set_producers([self[src_node[0]] for src_node in producers])
            else:
                # Every other case (no producers, more than two, or two producers on
                # an operator without `_left_relation`): link producers in edge order.
                operator.set_producers([self[src_node[0]] for src_node in producers])

            # Recursively process the producers
            map_operators_to_producers([src_node[0] for src_node in producers])

    # Validate query plan to ensure it's acyclic
    if not self.is_acyclic():
        raise InvalidInternalStateError("Query plan is cyclic, cannot execute.")

    # Retrieve the head of the query plan (its exit point); there must be exactly one
    head_nodes = list(set(self.get_exit_points()))

    if len(head_nodes) != 1:
        raise InvalidInternalStateError(
            f"Query plan has {len(head_nodes)} heads, expected exactly 1."
        )

    head_node = head_nodes[0]

    # Special case handling for 'Explain' queries
    if isinstance(self[head_node], ExplainNode):
        yield self.explain(), ResultType.TABULAR
        return

    # Link operators with their producers
    map_operators_to_producers([head_node])

    # Execute the head node's operation with the garbage collector paused.
    # The try/finally guarantees collection is switched back on even when the
    # operator raises; otherwise a failed query would leave GC disabled.
    operator = self[head_node]
    gc.disable()
    try:
        results = operator.execute()
    finally:
        gc.enable()

    # If the results are non-tabular, handle them accordingly
    if isinstance(results, NonTabularResult):
        yield results, ResultType.NON_TABULAR
    else:
        yield results, ResultType.TABULAR

def explain(self) -> Generator[pyarrow.Table, None, None]:
def explainv2(self, analyze: bool) -> Generator[pyarrow.Table, None, None]:
from opteryx import operators

def _inner_explain(node, depth):
Expand All @@ -163,35 +54,6 @@ def _inner_explain(node, depth):
yield from _inner_explain(operator_name[0], depth)
continue
elif isinstance(operator, operators.BasePlanNode):
yield {
"operator": operator.name,
"config": operator.config,
"depth": depth,
}
yield from _inner_explain(operator_name[0], depth + 1)

head = list(dict.fromkeys(self.get_exit_points()))
if len(head) != 1: # pragma: no cover
raise InvalidInternalStateError(f"Problem with the plan - it has {len(head)} heads.")
plan = list(_inner_explain(head[0], 1))

table = pyarrow.Table.from_pylist(plan)

yield table

def explainv2(self, analyze: bool) -> Generator[pyarrow.Table, None, None]:
from opteryx import operatorsv2

def _inner_explain(node, depth):
incoming_operators = self.ingoing_edges(node)
for operator_name in incoming_operators:
operator = self[operator_name[0]]
if isinstance(
operator, (operatorsv2.ExitNode, operatorsv2.ExplainNode)
): # Skip ExitNode
yield from _inner_explain(operator_name[0], depth)
continue
elif isinstance(operator, operatorsv2.BasePlanNode):
record = {
"tree": depth,
"operator": operator.name,
Expand All @@ -214,7 +76,7 @@ def _inner_explain(node, depth):
temp = None
head_node = self.get_exit_points()[0]
query_head, _, _ = self.ingoing_edges(head_node)[0]
results = self.push_executor(query_head)
results = self.execute(query_head)
if results is not None:
results_generator, _ = next(results, ([], None))
for temp in results_generator:
Expand Down Expand Up @@ -264,15 +126,13 @@ def depth_first_search_flat(

return traversal_list

def push_executor(
self, head_node=None
) -> Tuple[Generator[pyarrow.Table, Any, Any], ResultType]:
from opteryx.operatorsv2 import ExplainNode
from opteryx.operatorsv2 import JoinNode
from opteryx.operatorsv2 import ReaderNode
from opteryx.operatorsv2 import SetVariableNode
from opteryx.operatorsv2 import ShowCreateNode
from opteryx.operatorsv2 import ShowValueNode
def execute(self, head_node=None) -> Tuple[Generator[pyarrow.Table, Any, Any], ResultType]:
from opteryx.operators import ExplainNode
from opteryx.operators import JoinNode
from opteryx.operators import ReaderNode
from opteryx.operators import SetVariableNode
from opteryx.operators import ShowCreateNode
from opteryx.operators import ShowValueNode

# Validate query plan to ensure it's acyclic
if not self.is_acyclic():
Expand Down Expand Up @@ -328,7 +188,7 @@ def inner_execute(plan):
yield inner_execute(self), ResultType.TABULAR

def process_node(self, nid, morsel):
from opteryx.operatorsv2 import ReaderNode
from opteryx.operators import ReaderNode

node = self[nid]

Expand Down Expand Up @@ -363,6 +223,3 @@ def sensors(self):

def __del__(self):
    # No-op finalizer: nothing is released or reported when the plan is collected.
    pass


# print(self.sensors())
8 changes: 5 additions & 3 deletions opteryx/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# isort: skip

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -10,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from .base_plan_node import BasePlanDataObject # isort: skip
from .base_plan_node import BasePlanNode # isort: skip
from .base_plan_node import OperatorType # isort: skip
from .base_plan_node import BasePlanNode, JoinNode # isort: skip

from .aggregate_and_group_node import AggregateAndGroupNode # Group is always followed by aggregate
from .aggregate_node import AGGREGATORS
Expand All @@ -31,8 +33,8 @@
# from .information_schema_node import InformationSchemaNode # information_schema
from .inner_join_node import InnerJoinNode
from .inner_join_node_single import InnerJoinSingleNode
from .join_node import JoinNode
from .limit_node import LimitNode # select the first N records
from .pyarrow_join_node import PyArrowJoinNode

# from .metadata_writer_node import MetadataWriterNode
# from .morsel_defragment_node import MorselDefragmentNode # consolidate small morsels
Expand Down
Loading

0 comments on commit b14b0c6

Please sign in to comment.