Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Nov 8, 2024
1 parent b9f3d47 commit 2dcd589
Show file tree
Hide file tree
Showing 27 changed files with 168 additions and 160 deletions.
20 changes: 5 additions & 15 deletions opteryx/models/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def push_executor(self) -> Tuple[Generator[pyarrow.Table, Any, Any], ResultType]
from opteryx.operatorsv2 import JoinNode
from opteryx.operatorsv2 import ReaderNode
from opteryx.operatorsv2 import SetVariableNode
from opteryx.operatorsv2 import ShowValueNode
from opteryx.operatorsv2 import ShowValueNode, ShowCreateNode

# Validate query plan to ensure it's acyclic
if not self.is_acyclic():
Expand All @@ -284,21 +284,11 @@ def push_executor(self) -> Tuple[Generator[pyarrow.Table, Any, Any], ResultType]
joins = [(nid, node) for nid, node in self.nodes(True) if isinstance(node, JoinNode)]
for nid, join in joins:
for s, t, r in self.breadth_first_search(nid, reverse=True):
source_parameters = self[s].parameters
if set(join._left_relation).intersection(
{
source_parameters.get("alias"),
source_parameters.get("relation"),
}
):
source_relations = self[s].parameters.get("all_relations", set())
if set(join._left_relation).intersection(source_relations):
self.remove_edge(s, t, r)
self.add_edge(s, t, "left")
elif set(join._right_relation).intersection(
{
source_parameters.get("alias"),
source_parameters.get("relation"),
}
):
elif set(join._right_relation).intersection(source_relations):
self.remove_edge(s, t, r)
self.add_edge(s, t, "right")

Expand All @@ -310,7 +300,7 @@ def push_executor(self) -> Tuple[Generator[pyarrow.Table, Any, Any], ResultType]
elif isinstance(head_node, SetVariableNode):
yield head_node(None), ResultType.NON_TABULAR

elif isinstance(head_node, ShowValueNode):
elif isinstance(head_node, (ShowValueNode, ShowCreateNode)):
yield head_node(None), ResultType.TABULAR

def inner_execute(plan):
Expand Down
10 changes: 5 additions & 5 deletions opteryx/operatorsv2/aggregate_and_group_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ class AggregateAndGroupDataObject(BasePlanDataObject):


class AggregateAndGroupNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self.groups = list(config["groups"])
self.aggregates = list(config["aggregates"])
projection = list(config["projection"])
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.groups = list(parameters["groups"])
self.aggregates = list(parameters["aggregates"])
projection = list(parameters["projection"])

# we're going to preload some of the evaluation

Expand Down
6 changes: 3 additions & 3 deletions opteryx/operatorsv2/aggregate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ class AggregateDataObject(BasePlanDataObject):


class AggregateNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)

self.aggregates = config.get("aggregates", [])
self.aggregates = parameters.get("aggregates", [])

# get all the columns anywhere in the aggregates
all_identifiers = [
Expand Down
7 changes: 4 additions & 3 deletions opteryx/operatorsv2/async_read_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from opteryx import EOS
from opteryx import config
from opteryx.exceptions import DataError
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject
from opteryx.shared import AsyncMemoryPool
from opteryx.shared import MemoryPool
Expand Down Expand Up @@ -73,12 +74,12 @@ class AsyncReaderDataObject(BasePlanDataObject):


class AsyncReaderNode(ReaderNode):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self, properties: QueryProperties, **parameters):
ReaderNode.__init__(self, properties=properties, **parameters)
self.pool = MemoryPool(MAX_READ_BUFFER_CAPACITY, f"ReadBuffer <{self.parameters['alias']}>")

self.do = AsyncReaderDataObject()
self.predicates = kwargs.get("predicates")
self.predicates = parameters.get("predicates")

@classmethod
def from_dict(cls, dic: dict) -> "AsyncReaderNode": # pragma: no cover
Expand Down
52 changes: 25 additions & 27 deletions opteryx/operatorsv2/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,29 +170,27 @@ def _cross_join_unnest_column(


def _cross_join_unnest_literal(
    morsel: pyarrow.Table, source: Tuple, target_column: FlatColumn
) -> Generator[pyarrow.Table, None, None]:
    """
    Cross join a morsel with a literal tuple by repeating each row of the
    morsel once per element of the literal, then appending the tiled literal
    values as a new column.

    Parameters:
        morsel: the left-side pyarrow Table to join against.
        source: the literal tuple of values being unnested.
        target_column: schema column describing the appended (unnested) column;
            its `identity` is used as the new column's name.

    Yields:
        pyarrow.Table chunks with the literal column appended.
    """
    joined_list_size = len(source)

    # Break the morsel into batches to avoid memory issues
    for left_block in morsel.to_batches(max_chunksize=INTERNAL_BATCH_SIZE):
        # rebuild a single-batch Table so we can use Table-level take/append
        left_block = pyarrow.Table.from_batches([left_block], schema=morsel.schema)
        block_size = left_block.num_rows

        # Repeat each row in the table n times (n = number of literal values)
        repeated_indices = numpy.repeat(numpy.arange(block_size), joined_list_size)
        appended_table = left_block.take(repeated_indices)

        # Tile the literal values to match the new number of rows
        tiled_array = numpy.tile(source, block_size)

        # Convert tiled_array to a PyArrow array and append it to the table
        array_column = pyarrow.array(tiled_array)
        appended_table = appended_table.append_column(target_column.identity, array_column)

        yield appended_table


def _cartesian_product(*arrays):
Expand Down Expand Up @@ -273,19 +271,19 @@ class CrossJoinNode(JoinNode):
Implements a SQL CROSS JOIN
"""

def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
def __init__(self, properties: QueryProperties, **parameters):
JoinNode.__init__(self, properties=properties, **parameters)

self.source = config.get("column")
self.source = parameters.get("column")

self._left_relation = config.get("left_relation_names")
self._right_relation = config.get("right_relation_names")
self._left_relation = parameters.get("left_relation_names")
self._right_relation = parameters.get("right_relation_names")

# do we have unnest details?
self._unnest_column = config.get("unnest_column")
self._unnest_target = config.get("unnest_target")
self._filters = config.get("filters")
self._distinct = config.get("distinct", False)
self._unnest_column = parameters.get("unnest_column")
self._unnest_target = parameters.get("unnest_target")
self._filters = parameters.get("filters")
self._distinct = parameters.get("distinct", False)

# handle variation in how the unnested column is represented
if self._unnest_column:
Expand All @@ -297,7 +295,7 @@ def __init__(self, properties: QueryProperties, **config):
):
self._unnest_column.value = tuple([self._unnest_column.value])

self._single_column = config.get("pre_update_columns", set()) == {
self._single_column = parameters.get("pre_update_columns", set()) == {
self._unnest_target.identity,
}

Expand Down Expand Up @@ -330,7 +328,7 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
if isinstance(self._unnest_column.value, tuple):
return list(
_cross_join_unnest_literal(
morsels=morsel,
morsel=morsel,
source=self._unnest_column.value,
target_column=self._unnest_target,
)
Expand Down
6 changes: 3 additions & 3 deletions opteryx/operatorsv2/distinct_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@


class DistinctNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
def __init__(self, properties: QueryProperties, **parameters):
from opteryx.compiled.structures import HashSet

super().__init__(properties=properties)
self._distinct_on = config.get("on")
BasePlanNode.__init__(self, properties=properties, **parameters)
self._distinct_on = parameters.get("on")
if self._distinct_on:
self._distinct_on = [col.schema_column.identity for col in self._distinct_on]
self.hash_set = HashSet()
Expand Down
6 changes: 3 additions & 3 deletions opteryx/operatorsv2/exit_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class ExitDataObject(BasePlanDataObject):


class ExitNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self.columns = config.get("columns", [])
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.columns = parameters.get("columns", [])

self.do = ExitDataObject(columns=self.columns)

Expand Down
2 changes: 1 addition & 1 deletion opteryx/operatorsv2/explain_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

class ExplainNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
super().__init__(properties=properties)
BasePlanNode.__init__(self, properties=properties, **parameters)
self._query_plan = parameters.get("query_plan")
self.analyze = parameters.get("analyze", False)

Expand Down
6 changes: 3 additions & 3 deletions opteryx/operatorsv2/filter_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@


class FilterNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self.filter = config.get("filter")
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.filter = parameters.get("filter")

self.function_evaluations = get_all_nodes_of_type(
self.filter,
Expand Down
14 changes: 7 additions & 7 deletions opteryx/operatorsv2/function_dataset_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,17 @@ def _http(**kwargs):


class FunctionDatasetNode(ReaderNode):
def __init__(self, properties: QueryProperties, **config):
def __init__(self, properties: QueryProperties, **parameters):
"""
The Blob Reader Node is responsible for reading the relevant blobs
and returning a Table/Relation.
"""
super().__init__(properties=properties)
self.alias = config.get("alias")
self.function = config["function"]
self.parameters = config
self.columns = config.get("columns", [])
self.args = config.get("args", [])
ReaderNode.__init__(self, properties=properties, **parameters)
self.alias = parameters.get("alias")
self.function = parameters["function"]
self.parameters = parameters
self.columns = parameters.get("columns", [])
self.args = parameters.get("args", [])

@classmethod
def from_json(cls, json_obj: str) -> "BasePlanNode": # pragma: no cover
Expand Down
8 changes: 4 additions & 4 deletions opteryx/operatorsv2/heap_sort_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class HeapSortDataObject(BasePlanDataObject):


class HeapSortNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self.order_by = config.get("order_by", [])
self.limit: int = config.get("limit", -1)
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.order_by = parameters.get("order_by", [])
self.limit: int = parameters.get("limit", -1)

self.do = HeapSortDataObject(order_by=self.order_by, limit=self.limit)
self.mapped_order = []
Expand Down
18 changes: 9 additions & 9 deletions opteryx/operatorsv2/inner_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ def inner_join_with_preprocessed_left_side(left_relation, right_relation, join_c


class InnerJoinNode(JoinNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self._join_type = config["type"]
self._on = config.get("on")
self._using = config.get("using")
def __init__(self, properties: QueryProperties, **parameters):
JoinNode.__init__(self, properties=properties, **parameters)
self._join_type = parameters["type"]
self._on = parameters.get("on")
self._using = parameters.get("using")

self._left_columns = config.get("left_columns")
self._left_relation = config.get("left_relation_names")
self._left_columns = parameters.get("left_columns")
self._left_relation = parameters.get("left_relation_names")

self._right_columns = config.get("right_columns")
self._right_relation = config.get("right_relation_names")
self._right_columns = parameters.get("right_columns")
self._right_relation = parameters.get("right_relation_names")

self.stream = "left"
self.left_buffer = []
Expand Down
18 changes: 9 additions & 9 deletions opteryx/operatorsv2/inner_join_node_single.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,17 @@ def inner_join_with_preprocessed_left_side(left_relation, right_relation, join_c


class InnerJoinSingleNode(JoinNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self._join_type = config["type"]
self._on = config.get("on")
self._using = config.get("using")
def __init__(self, properties: QueryProperties, **parameters):
JoinNode.__init__(self, properties=properties, **parameters)
self._join_type = parameters["type"]
self._on = parameters.get("on")
self._using = parameters.get("using")

self._left_columns = config.get("left_columns")
self._left_relation = config.get("left_relation_names")
self._left_columns = parameters.get("left_columns")
self._left_relation = parameters.get("left_relation_names")

self._right_columns = config.get("right_columns")
self._right_relation = config.get("right_relation_names")
self._right_columns = parameters.get("right_columns")
self._right_relation = parameters.get("right_relation_names")

self.stream = "left"
self.left_buffer = []
Expand Down
8 changes: 4 additions & 4 deletions opteryx/operatorsv2/limit_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@


class LimitNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self.limit = config.get("limit", float("inf"))
self.offset = config.get("offset", 0)
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.limit = parameters.get("limit", float("inf"))
self.offset = parameters.get("offset", 0)

self.remaining_rows = self.limit if self.limit is not None else float("inf")
self.rows_left_to_skip = max(0, self.offset)
Expand Down
4 changes: 2 additions & 2 deletions opteryx/operatorsv2/noop_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@


class NoOpNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
    """
    No-operation plan node: takes no configuration beyond the base node's.

    Parameters:
        properties: the query's properties object.
        parameters: node configuration, passed through to the base plan node.
    """
    BasePlanNode.__init__(self, properties=properties, **parameters)

@classmethod
def from_json(cls, json_obj: str) -> "BasePlanNode": # pragma: no cover
Expand Down
18 changes: 9 additions & 9 deletions opteryx/operatorsv2/outer_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,17 +249,17 @@ def left_semi_join(


class OuterJoinNode(JoinNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self._join_type = config["type"]
self._on = config.get("on")
self._using = config.get("using")
def __init__(self, properties: QueryProperties, **parameters):
JoinNode.__init__(self, properties=properties, **parameters)
self._join_type = parameters["type"]
self._on = parameters.get("on")
self._using = parameters.get("using")

self._left_columns = config.get("left_columns")
self._left_relation = config.get("left_relation_names")
self._left_columns = parameters.get("left_columns")
self._left_relation = parameters.get("left_relation_names")

self._right_columns = config.get("right_columns")
self._right_relation = config.get("right_relation_names")
self._right_columns = parameters.get("right_columns")
self._right_relation = parameters.get("right_relation_names")

self.stream = "left"
self.left_buffer = []
Expand Down
Loading

0 comments on commit 2dcd589

Please sign in to comment.