diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 8494118f..98bf044f 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 865 +__build__ = 871 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/opteryx/models/physical_plan.py b/opteryx/models/physical_plan.py index 050b6af9..2638a540 100644 --- a/opteryx/models/physical_plan.py +++ b/opteryx/models/physical_plan.py @@ -10,38 +10,64 @@ # See the License for the specific language governing permissions and # limitations under the License. -""" -The Execution Tree is the Graph which defines a Query Plan. - -The execution tree contains functionality to: - -- build and define the plan -- execute the plan -- manipulate the plan - -""" - +from queue import Empty +from queue import Queue +from threading import Lock +from threading import Thread from typing import Any from typing import Generator from typing import Optional from typing import Tuple -import pyarrow - from opteryx import EOS -from opteryx import config from opteryx.constants import ResultType from opteryx.exceptions import InvalidInternalStateError from opteryx.third_party.travers import Graph +import pyarrow + +morsel_lock = Lock() +active_task_lock = Lock() +active_tasks: int = 0 + +def active_tasks_increment(value: int): + global active_tasks + with active_task_lock: + active_tasks += value + class PhysicalPlan(Graph): """ - The execution tree is defined separately to the planner to simplify the - complex code which is the planner from the tree that describes the plan. + The execution tree is defined separately from the planner to simplify the + complex code that is the planner from the tree that describes the plan. """ - def explainv2(self, analyze: bool) -> Generator[pyarrow.Table, None, None]: + def depth_first_search_flat( + self, node: Optional[str] = None, visited: Optional[set] = None + ) -> list: + """ + Returns a flat list representing the depth-first traversal of the graph with left/right ordering. + """ + if node is None: + node = self.get_exit_points()[0] + + if visited is None: + visited = set() + + visited.add(node) + traversal_list = [(node, self[node])] + + # Sort neighbors based on relationship to ensure left, right, then unlabelled order + neighbors = sorted(self.ingoing_edges(node), key=lambda x: (x[2] == "right", x[2] == "")) + + for neighbor, _, _ in neighbors: + if neighbor not in visited: + child_list = self.depth_first_search_flat(neighbor, visited) + traversal_list.extend(child_list) + + return traversal_list + + def explain(self, analyze: bool) -> Generator[pyarrow.Table, None, None]: from opteryx import operators def _inner_explain(node, depth): @@ -86,47 +112,11 @@ def _inner_explain(node, depth): plan = list(_inner_explain(head[0], 1)) table = pyarrow.Table.from_pylist(plan) + print(table) + return table - yield table - - def depth_first_search_flat( - self, node: Optional[str] = None, visited: Optional[set] = None - ) -> list: - """ - Returns a flat list representing the depth-first traversal of the graph with left/right ordering. - - We do this so we always evaluate the left side of a join before the right side. It technically - doesn't need the entire plan flattened DFS-wise, but this is what we are doing here to achieve - the outcome we're after. - """ - if node is None: - node = self.get_exit_points()[0] - - if visited is None: - visited = set() - visited.add(node) - - # Collect this node's information in a flat list format - traversal_list = [ - ( - node, - self[node], - ) - ] - - # Sort neighbors based on relationship to ensure left, right, then unlabelled order - neighbors = sorted(self.ingoing_edges(node), key=lambda x: (x[2] == "right", x[2] == "")) - - # Traverse each child, prioritizing left, then right, then unlabelled - for neighbor, _, _ in neighbors: - if neighbor not in visited: - child_list = self.depth_first_search_flat(neighbor, visited) - traversal_list.extend(child_list) - - return traversal_list - - def execute(self, head_node=None) -> Tuple[Generator[pyarrow.Table, Any, Any], ResultType]: + def execute(self, head_node=None) -> Generator[Tuple[Any, ResultType], Any, Any]: from opteryx.operators import ExplainNode from opteryx.operators import JoinNode from opteryx.operators import ReaderNode @@ -134,13 +124,57 @@ def execute(self, head_node=None) -> Tuple[Generator[pyarrow.Table, Any, Any], R from opteryx.operators import ShowCreateNode from opteryx.operators import ShowValueNode - # Validate query plan to ensure it's acyclic + morsel_accounting = {nid: 0 for nid in self.nodes()} # Total morsels received by each node + node_exhaustion = {nid: False for nid in self.nodes()} # Exhaustion state of each node + + def mark_node_exhausted(node_id): + """ + Mark a node as exhausted and propagate exhaustion downstream. + """ + if node_exhaustion[node_id]: + return # Node is already marked as exhausted + + node_exhaustion[node_id] = True + print("+", node_id, self[node_id].name) + + # Notify downstream nodes + for _, downstream_node, _ in self.outgoing_edges(node_id): + # Check if all parents of downstream_node are exhausted + if all( + node_exhaustion[parent] for parent, _, _ in self.ingoing_edges(downstream_node) + ): + work_queue.put((downstream_node, EOS)) # EOS signals exhaustion + active_tasks_increment(+1) + morsel_accounting[node_id] += 1 + + def update_morsel_accounting(node_id, morsel_count_change: int): + """ + Updates the morsel accounting for a node and checks for exhaustion. + + Parameters: + node_id (str): The ID of the node to update. + morsel_count_change (int): The change in morsel count (+1 for increment, -1 for decrement). + + Returns: + None + """ + with morsel_lock: + morsel_accounting[node_id] += morsel_count_change + # print(">", node_id, morsel_accounting[node_id], morsel_count_change, self[node_id].name) + + # Check if the node is exhausted + if morsel_accounting[node_id] <= 0: # No more pending morsels for this node + # Ensure all parent nodes are exhausted + all_parents_exhausted = all( + node_exhaustion[parent] for parent, _, _ in self.ingoing_edges(node_id) + ) + if all_parents_exhausted: + mark_node_exhausted(node_id) + if not self.is_acyclic(): raise InvalidInternalStateError("Query plan is cyclic, cannot execute.") - # Retrieve the tail of the query plan, which should ideally be a single head node head_nodes = list(set(self.get_exit_points())) - if len(head_nodes) != 1: raise InvalidInternalStateError( f"Query plan has {len(head_nodes)} heads, expected exactly 1." @@ -149,77 +183,118 @@ def execute(self, head_node=None) -> Tuple[Generator[pyarrow.Table, Any, Any], R if head_node is None: head_node = self[head_nodes[0]] - # add the left/right labels to the edges coming into the joins - joins = [(nid, node) for nid, node in self.nodes(True) if isinstance(node, JoinNode)] - for nid, join in joins: - for s, t, r in self.breadth_first_search(nid, reverse=True): - source_relations = self[s].parameters.get("all_relations", set()) - if set(join._left_relation).intersection(source_relations): - self.remove_edge(s, t, r) - self.add_edge(s, t, "left") - elif set(join._right_relation).intersection(source_relations): - self.remove_edge(s, t, r) - self.add_edge(s, t, "right") - # Special case handling for 'Explain' queries if isinstance(head_node, ExplainNode): - yield self.explainv2(head_node.analyze), ResultType.TABULAR - - # Special case handling for 'Set' queries - elif isinstance(head_node, SetVariableNode): - yield head_node(None), ResultType.NON_TABULAR + yield self.explain(head_node.analyze), ResultType.TABULAR - elif isinstance(head_node, (ShowValueNode, ShowCreateNode)): + elif isinstance(head_node, (SetVariableNode, ShowValueNode, ShowCreateNode)): yield head_node(None), ResultType.TABULAR else: + # Work queue for worker tasks + work_queue = Queue() + # Response queue for results sent back to the engine + response_queue = Queue() + num_workers = 1 + workers = [] + + def worker_process(): + """ + Worker thread: Processes tasks from the work queue and sends results to the response queue. + """ + while True: + task = work_queue.get() + if task is None: + break + + node_id, morsel = task + if morsel_accounting[node_id] is False: + print("RUNNING AN EXHAUSTED NODE") + operator = self[node_id] + results = operator(morsel) + + for result in results: + # Send results back to the response queue + response_queue.put((node_id, result)) + + update_morsel_accounting(node_id, -1) + + work_queue.task_done() + + # Launch worker threads + for _ in range(num_workers): + worker = Thread(target=worker_process) + worker.daemon = True + worker.start() + workers.append(worker) def inner_execute(plan): - # Get the pump nodes from the plan and execute them in order + # Identify pump nodes + global active_tasks + pump_nodes = [ (nid, node) for nid, node in self.depth_first_search_flat() if isinstance(node, ReaderNode) ] + + # Main engine loop processes pump nodes and coordinates work for pump_nid, pump_instance in pump_nodes: for morsel in pump_instance(None): - yield from plan.process_node(pump_nid, morsel) + # Initial morsels pushed to the work queue + # Determine downstream operators + next_nodes = [target for _, target, _ in self.outgoing_edges(pump_nid)] + for downstream_node in next_nodes: + # Queue tasks for downstream operators + work_queue.put((downstream_node, morsel)) + active_tasks_increment(+1) + update_morsel_accounting(downstream_node, +1) + + # Pump is exhausted after emitting all morsels + mark_node_exhausted(pump_nid) + + # Process results from the response queue + def should_stop(): + all_nodes_exhausted = all(node_exhaustion.values()) + queues_empty = work_queue.empty() and response_queue.empty() + all_nodes_inactive = active_tasks <= 0 + print(node_exhaustion.values(), all(node_exhaustion.values()), work_queue.empty(), response_queue.empty(), active_tasks) + return all_nodes_exhausted and queues_empty and all_nodes_inactive + + while not should_stop(): + # Wait for results from workers + try: + node_id, result = response_queue.get(timeout=0.1) + except Empty: + continue + + # Handle EOS + if result is None or result == EOS: + active_tasks_increment(-1) + continue + + # Determine downstream operators + downstream_nodes = [target for _, target, _ in self.outgoing_edges(node_id)] + if len(downstream_nodes) == 0: + # print("YIELD") + yield result + else: + for downstream_node in downstream_nodes: + # Queue tasks for downstream operators + active_tasks_increment(+1) + work_queue.put((downstream_node, result)) + update_morsel_accounting(downstream_node, +1) - yield inner_execute(self), ResultType.TABULAR + # decrement _after_ we've done the work relation to handling the task + active_tasks_increment(-1) - def process_node(self, nid, morsel): - from opteryx.operators import ReaderNode + # print("DONE!", node_exhaustion, work_queue.empty(), response_queue.empty()) - node = self[nid] + for worker in workers: + work_queue.put(None) - if isinstance(node, ReaderNode): - children = (t for s, t, r in self.outgoing_edges(nid)) - for child in children: - results = self.process_node(child, morsel) - results = list(results) - yield from results - else: - results = node(morsel) - if results is None: - return None - if not isinstance(results, list): - results = [results] - if morsel == EOS and not any(r == EOS for r in results): - results.append(EOS) - for result in results: - if result is not None: - children = [t for s, t, r in self.outgoing_edges(nid)] - for child in children: - yield from self.process_node(child, result) - if len(children) == 0 and result != EOS: - yield result + # Wait for all workers to complete + for worker in workers: + worker.join() - def sensors(self): - readings = {} - for nid in self.nodes(): - node = self[nid] - readings[node.identity] = node.sensors() - return readings - - def __del__(self): - pass + yield inner_execute(self), ResultType.TABULAR diff --git a/opteryx/models/query_statistics.py b/opteryx/models/query_statistics.py index 1d9ee5e6..24a771fb 100644 --- a/opteryx/models/query_statistics.py +++ b/opteryx/models/query_statistics.py @@ -36,6 +36,9 @@ def __setattr__(self, attr, value): else: self._stats[attr] = value + def increase(self, attr: str, amount: float): + self._stats[attr] += amount + def add_message(self, message: str): """collect warnings""" if "messages" not in self._stats: diff --git a/opteryx/operators/aggregate_and_group_node.py b/opteryx/operators/aggregate_and_group_node.py index 6cc40d28..c5463a52 100644 --- a/opteryx/operators/aggregate_and_group_node.py +++ b/opteryx/operators/aggregate_and_group_node.py @@ -104,7 +104,7 @@ def config(self): # pragma: no cover def name(self): # pragma: no cover return "Group" - def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: + def execute(self, morsel: pyarrow.Table): if morsel == EOS: # merge all the morsels together into one table, selecting only the columns # we're pretty sure we're going to use - this will fail for datasets @@ -138,7 +138,9 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: groups = groups.select(list(self.column_map.values()) + self.group_by_columns) groups = groups.rename_columns(list(self.column_map.keys()) + self.group_by_columns) - return [groups, EOS] + yield groups + yield EOS + return morsel = project(morsel, self.all_identifiers) # Add a "*" column, this is an int because when a bool it miscounts diff --git a/opteryx/operators/aggregate_node.py b/opteryx/operators/aggregate_node.py index 0d6d7514..81142d93 100644 --- a/opteryx/operators/aggregate_node.py +++ b/opteryx/operators/aggregate_node.py @@ -221,10 +221,11 @@ def name(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if morsel == EOS: if _is_count_star(self.aggregates): - return _count_star( + yield _count_star( morsel_promise=self.buffer, column_name=self.aggregates[0].schema_column.identity, ) + return # merge all the morsels together into one table, selecting only the columns # we're pretty sure we're going to use - this will fail for datasets @@ -248,6 +249,9 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: # name the aggregate fields and add them to the Columns data aggregates = aggregates.select(list(self.column_map.keys())) - return [aggregates, EOS] + yield aggregates + + return self.buffer.append(project(morsel, self.all_identifiers)) + yield None diff --git a/opteryx/operators/async_read_node.py b/opteryx/operators/async_read_node.py index 792936a0..7096f68d 100644 --- a/opteryx/operators/async_read_node.py +++ b/opteryx/operators/async_read_node.py @@ -209,5 +209,3 @@ def execute(self, morsel) -> Generator: yield pyarrow.Table.from_arrays( [pyarrow.array([]) for _ in arrow_schema], schema=arrow_schema ) - - yield EOS diff --git a/opteryx/operators/base_plan_node.py b/opteryx/operators/base_plan_node.py index 7e025efa..0bff180c 100644 --- a/opteryx/operators/base_plan_node.py +++ b/opteryx/operators/base_plan_node.py @@ -95,14 +95,29 @@ def __call__(self, morsel: pyarrow.Table) -> Optional[pyarrow.Table]: self.bytes_in += morsel.nbytes self.calls += 1 - start_time = time.monotonic_ns() - result = self.execute(morsel) - - self.execution_time += time.monotonic_ns() - start_time - if result is not None and result != EOS and hasattr(result, "num_rows"): - self.records_out += result.num_rows - self.bytes_out += result.nbytes - return result + generator = self.execute(morsel) # Initialize the generator + + while True: + try: + # Time the production of the next result + start_time = time.monotonic_ns() + result = next(generator) # Retrieve the next item from the generator + self.execution_time += time.monotonic_ns() - start_time + self.statistics.increase( + "time_" + self.name.lower(), time.monotonic_ns() - start_time + ) + + # Update metrics for valid results + if result is not None and result != EOS and hasattr(result, "num_rows"): + self.records_out += result.num_rows + self.bytes_out += result.nbytes + + # Yield the result to the consumer + yield result + + except StopIteration: + # Break the loop when the generator is exhausted + break def sensors(self): return { diff --git a/opteryx/operators/cross_join_node.py b/opteryx/operators/cross_join_node.py index 490762ba..5a0ed526 100644 --- a/opteryx/operators/cross_join_node.py +++ b/opteryx/operators/cross_join_node.py @@ -330,26 +330,25 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if self._unnest_column is not None: if morsel == EOS: self.continue_executing = False - return EOS + return if isinstance(self._unnest_column.value, tuple): - return list( - _cross_join_unnest_literal( - morsel=morsel, - source=self._unnest_column.value, - target_column=self._unnest_target, - ) - ) - return list( - _cross_join_unnest_column( + yield from _cross_join_unnest_literal( morsel=morsel, - source=self._unnest_column, + source=self._unnest_column.value, target_column=self._unnest_target, - conditions=self._filters, - hash_set=self.hash_set, - distinct=self._distinct, - single_column=self._single_column, ) + return + + yield from _cross_join_unnest_column( + morsel=morsel, + source=self._unnest_column, + target_column=self._unnest_target, + conditions=self._filters, + hash_set=self.hash_set, + distinct=self._distinct, + single_column=self._single_column, ) + return if self.stream == "left": if morsel == EOS: @@ -358,13 +357,13 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: self.left_buffer.clear() else: self.left_buffer.append(morsel) - return None + yield None if self.stream == "right": if morsel == EOS: right_table = pyarrow.concat_tables(self.right_buffer, promote_options="none") # type:ignore self.right_buffer = None - return list(_cross_join(self.left_relation, right_table)) + yield from _cross_join(self.left_relation, right_table) else: self.right_buffer.append(morsel) - return None + yield None diff --git a/opteryx/operators/distinct_node.py b/opteryx/operators/distinct_node.py index 60cf76c2..2a1478f9 100644 --- a/opteryx/operators/distinct_node.py +++ b/opteryx/operators/distinct_node.py @@ -59,7 +59,8 @@ def execute(self, morsel: Table) -> Table: # limit processing if morsel == EOS: - return EOS + yield EOS + return unique_indexes, self.hash_set = distinct( morsel, columns=self._distinct_on, seen_hashes=self.hash_set @@ -67,7 +68,7 @@ def execute(self, morsel: Table) -> Table: if len(unique_indexes) > 0: distinct_table = morsel.take(unique_indexes) - return distinct_table + yield distinct_table else: distinct_table = morsel.slice(0, 0) - return distinct_table + yield distinct_table diff --git a/opteryx/operators/exit_node.py b/opteryx/operators/exit_node.py index a428e955..c1288939 100644 --- a/opteryx/operators/exit_node.py +++ b/opteryx/operators/exit_node.py @@ -67,7 +67,8 @@ def name(self): # pragma: no cover def execute(self, morsel: Table) -> Table: # Exit doesn't return EOS if morsel == EOS: - return None + yield None + return final_columns = [] final_names = [] @@ -105,4 +106,4 @@ def execute(self, morsel: Table) -> Table: morsel = morsel.select(final_columns) morsel = morsel.rename_columns(final_names) - return morsel + yield morsel diff --git a/opteryx/operators/explain_node.py b/opteryx/operators/explain_node.py index 2b16067a..c52e7035 100644 --- a/opteryx/operators/explain_node.py +++ b/opteryx/operators/explain_node.py @@ -45,4 +45,4 @@ def from_json(cls, json_obj: str) -> "BasePlanNode": # pragma: no cover def execute(self, morsel: Table) -> Table: if self._query_plan: - return self._query_plan.explain(self.analyze) + yield self._query_plan.explain(self.analyze) diff --git a/opteryx/operators/filter_node.py b/opteryx/operators/filter_node.py index c4cff2e7..1346d4a6 100644 --- a/opteryx/operators/filter_node.py +++ b/opteryx/operators/filter_node.py @@ -57,10 +57,12 @@ def name(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if morsel == EOS: - return EOS + yield EOS + return if morsel.num_rows == 0: - return morsel + yield morsel + return if self.function_evaluations: morsel = evaluate_and_append(self.function_evaluations, morsel) @@ -77,5 +79,6 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: # if there's no matching rows, just drop the morsel if mask.size > 0 and not numpy.all(mask is None): - return morsel.take(pyarrow.array(mask)) - return morsel.slice(0, 0) + yield morsel.take(pyarrow.array(mask)) + else: + yield morsel.slice(0, 0) diff --git a/opteryx/operators/function_dataset_node.py b/opteryx/operators/function_dataset_node.py index 9ac8cf80..ce2912d9 100644 --- a/opteryx/operators/function_dataset_node.py +++ b/opteryx/operators/function_dataset_node.py @@ -148,4 +148,3 @@ def execute(self, morsel) -> Generator: self.statistics.columns_read += len(table.column_names) yield table - yield EOS diff --git a/opteryx/operators/heap_sort_node.py b/opteryx/operators/heap_sort_node.py index 782e8ab4..3a5a1d1f 100644 --- a/opteryx/operators/heap_sort_node.py +++ b/opteryx/operators/heap_sort_node.py @@ -84,7 +84,8 @@ def name(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if morsel == EOS: - return [self.table, EOS] + yield self.table + return if self.table: # Concatenate the accumulated table with the new morsel diff --git a/opteryx/operators/inner_join_node.py b/opteryx/operators/inner_join_node.py index 533a0060..5d5ef74a 100644 --- a/opteryx/operators/inner_join_node.py +++ b/opteryx/operators/inner_join_node.py @@ -118,10 +118,11 @@ def execute(self, morsel: Table) -> Table: self.left_hash = hash_join_map(self.left_relation, self._left_columns) else: self.left_buffer.append(morsel) - return None + yield None + return if morsel == EOS: - return EOS + return # do the join new_morsel = inner_join_with_preprocessed_left_side( @@ -131,4 +132,4 @@ def execute(self, morsel: Table) -> Table: hash_table=self.left_hash, ) - return new_morsel + yield new_morsel diff --git a/opteryx/operators/inner_join_node_single.py b/opteryx/operators/inner_join_node_single.py index f2f45692..2187e770 100644 --- a/opteryx/operators/inner_join_node_single.py +++ b/opteryx/operators/inner_join_node_single.py @@ -204,10 +204,11 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: self.left_hash = preprocess_left(self.left_relation, self._left_columns) else: self.left_buffer.append(morsel) - return None + yield None + return if morsel == EOS: - return EOS + return # do the join new_morsel = inner_join_with_preprocessed_left_side( @@ -217,4 +218,4 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: hash_table=self.left_hash, ) - return new_morsel + yield new_morsel diff --git a/opteryx/operators/limit_node.py b/opteryx/operators/limit_node.py index 20b20482..b4e4a40a 100644 --- a/opteryx/operators/limit_node.py +++ b/opteryx/operators/limit_node.py @@ -49,12 +49,13 @@ def config(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if morsel == EOS: - return EOS + return if self.rows_left_to_skip > 0: if self.rows_left_to_skip >= morsel.num_rows: self.rows_left_to_skip -= morsel.num_rows - return morsel.slice(offset=0, length=0) + yield morsel.slice(offset=0, length=0) + return else: morsel = morsel.slice( offset=self.rows_left_to_skip, length=morsel.num_rows - self.rows_left_to_skip @@ -62,12 +63,13 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: self.rows_left_to_skip = 0 if self.remaining_rows <= 0 or morsel.num_rows == 0: - return morsel.slice(offset=0, length=0) + yield morsel.slice(offset=0, length=0) + yield if morsel.num_rows < self.remaining_rows: self.remaining_rows -= morsel.num_rows - return morsel + yield morsel else: rows_to_slice = self.remaining_rows self.remaining_rows = 0 - return morsel.slice(offset=0, length=rows_to_slice) + yield morsel.slice(offset=0, length=rows_to_slice) diff --git a/opteryx/operators/noop_node.py b/opteryx/operators/noop_node.py index b0c4bce8..5b327d59 100644 --- a/opteryx/operators/noop_node.py +++ b/opteryx/operators/noop_node.py @@ -41,4 +41,4 @@ def config(self): # pragma: no cover def execute(self, morsel: Table) -> Table: print("NOOP was called") - return [morsel] + yield morsel diff --git a/opteryx/operators/outer_join_node.py b/opteryx/operators/outer_join_node.py index 191d43c2..479dc98e 100644 --- a/opteryx/operators/outer_join_node.py +++ b/opteryx/operators/outer_join_node.py @@ -292,7 +292,8 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: self.left_buffer.clear() else: self.left_buffer.append(morsel) - return None + yield None + return if self.stream == "right": if morsel == EOS: @@ -301,18 +302,16 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: join_provider = providers.get(self._join_type) - return list( - join_provider( - left_relation=self.left_relation, - right_relation=right_relation, - left_columns=self._left_columns, - right_columns=self._right_columns, - ) - ) + [EOS] + yield from join_provider( + left_relation=self.left_relation, + right_relation=right_relation, + left_columns=self._left_columns, + right_columns=self._right_columns, + ) else: self.right_buffer.append(morsel) - return None + yield None providers = { diff --git a/opteryx/operators/projection_node.py b/opteryx/operators/projection_node.py index 35b89059..8a858fe8 100644 --- a/opteryx/operators/projection_node.py +++ b/opteryx/operators/projection_node.py @@ -64,8 +64,8 @@ def name(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if morsel == EOS: - return EOS + return # If any of the columns need evaluating, we need to do that here morsel = evaluate_and_append(self.evaluations, morsel) - return morsel.select(self.projection) + yield morsel.select(self.projection) diff --git a/opteryx/operators/pyarrow_join_node.py b/opteryx/operators/pyarrow_join_node.py index 02b5ed13..6592c25a 100644 --- a/opteryx/operators/pyarrow_join_node.py +++ b/opteryx/operators/pyarrow_join_node.py @@ -78,7 +78,8 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: else: self.left_buffer.append(morsel) - return None + yield None + return if morsel == EOS: right_relation = pyarrow.concat_tables(self.right_buffer, promote_options="none") @@ -111,8 +112,8 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: "Unable to ANTI/SEMI JOIN with unsupported column types in table." ) from err - return [new_morsel, EOS] + yield new_morsel else: self.right_buffer.append(morsel) - return None + yield None diff --git a/opteryx/operators/read_node.py b/opteryx/operators/read_node.py index e81bcb7e..69d5b0d2 100644 --- a/opteryx/operators/read_node.py +++ b/opteryx/operators/read_node.py @@ -224,5 +224,3 @@ def execute(self, morsel) -> Generator: self.statistics.columns_read += morsel.num_columns else: self.statistics.columns_read += len(orso_schema.columns) - - yield EOS diff --git a/opteryx/operators/show_columns_node.py b/opteryx/operators/show_columns_node.py index 3d57a8c2..7388babc 100644 --- a/opteryx/operators/show_columns_node.py +++ b/opteryx/operators/show_columns_node.py @@ -75,13 +75,15 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: from orso import DataFrame if self.seen: - return None + yield None + return if not (self._full or self._extended): # if it's not full or extended, do just get the list of columns and their # types self.seen = True - return _simple_collector(self._schema) + yield _simple_collector(self._schema) + return if self._full or self._extended: # we're going to read the full table, so we can count stuff @@ -90,7 +92,8 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: dicts = self.collector.to_dicts() dicts = [self.rename_column(d, self._column_map) for d in dicts] self.seen = True - return pyarrow.Table.from_pylist(dicts) + yield pyarrow.Table.from_pylist(dicts) + return df = DataFrame.from_arrow(morsel) @@ -99,4 +102,4 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: else: self.collector += df.profile - return None + yield None diff --git a/opteryx/operators/sort_node.py b/opteryx/operators/sort_node.py index 12c39924..c6ede6fe 100644 --- a/opteryx/operators/sort_node.py +++ b/opteryx/operators/sort_node.py @@ -53,7 +53,8 @@ def name(self): # pragma: no cover def execute(self, morsel: Table) -> Table: if morsel != EOS: self.morsels.append(morsel) - return None + yield None + return table = concat_tables(self.morsels, promote_options="permissive") @@ -67,7 +68,9 @@ def execute(self, morsel: Table) -> Table: if column.value in ("RANDOM", "RAND"): new_order = numpy.argsort(numpy.random.uniform(size=table.num_rows)) table = table.take(new_order) - return table + yield table + yield EOS + return raise UnsupportedSyntaxError( "`ORDER BY` only supports `RAND()` as a functional sort order." @@ -97,4 +100,5 @@ def execute(self, morsel: Table) -> Table: f"`ORDER BY` must reference columns as they appear in the `SELECT` clause. {cnfe}" ) - return [table.sort_by(mapped_order), EOS] + yield table.sort_by(mapped_order) + yield EOS diff --git a/opteryx/operators/union_node.py b/opteryx/operators/union_node.py index a59a0753..c8623f69 100644 --- a/opteryx/operators/union_node.py +++ b/opteryx/operators/union_node.py @@ -50,15 +50,15 @@ def execute(self, morsel: Table) -> Table: coercible types are coerced. """ if morsel == EOS and self.seen_first_eos: - return [EOS] - if morsel == EOS: + return + elif morsel == EOS: self.seen_first_eos = True - return None + yield None - if self.schema is None: + elif self.schema is None: self.schema = morsel.schema else: morsel = morsel.rename_columns(self.schema.names) morsel = morsel.cast(self.schema) - return morsel.select(self.column_ids) + yield morsel.select(self.column_ids) diff --git a/opteryx/planner/cost_based_optimizer/strategies/constant_folding.py b/opteryx/planner/cost_based_optimizer/strategies/constant_folding.py index 8dc449a4..21ac51b7 100644 --- a/opteryx/planner/cost_based_optimizer/strategies/constant_folding.py +++ b/opteryx/planner/cost_based_optimizer/strategies/constant_folding.py @@ -137,6 +137,24 @@ def fold_constants(root: Node, statistics: QueryStatistics) -> Node: node.schema_column = root.schema_column return node + if root.node_type == NodeType.COMPARISON_OPERATOR: + if ( + root.value in ("Like", "Ilike") + and root.left.node_type == NodeType.IDENTIFIER + and root.right.node_type == NodeType.LITERAL + and root.right.value == "%" + ): + # column LIKE '%' is True + node = Node(node_type=NodeType.UNARY_OPERATOR) + node.type = OrsoTypes.BOOLEAN + node.value = "IsNotNull" + node.schema_column = root.schema_column + node.centre = root.left + node.query_column = root.query_column + statistics.optimization_constant_fold_reduce += 1 + return node + + if root.node_type in {NodeType.AND, NodeType.OR, NodeType.XOR}: # try to fold each side of logical operators root.left = fold_constants(root.left, statistics) diff --git a/tests/sql_battery/test_shapes_and_errors_battery.py b/tests/sql_battery/test_shapes_and_errors_battery.py index 7c26aeea..9a6d8169 100644 --- a/tests/sql_battery/test_shapes_and_errors_battery.py +++ b/tests/sql_battery/test_shapes_and_errors_battery.py @@ -1813,14 +1813,28 @@ ("SELECT name, missions FROM $astronauts WHERE missions ILIKE ANY ('%Apoll%')", 34, 2, None), ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('%Apoll%', 'mission')", 34, 2, None), ("SELECT name, missions FROM $astronauts WHERE missions ILIKE ANY ('%Apoll%', 'mission')", 34, 2, None), - ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY '%apoll%'", 0, 2, None), - ("SELECT name, missions FROM $astronauts WHERE missions NOT ILIKE ANY '%apoll%'", 34, 2, None), - ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('%apoll%')", 0, 2, None), - ("SELECT name, missions FROM $astronauts WHERE missions NOT ILIKE ANY ('%apoll%')", 34, 2, None), - ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('%Apoll%')", 34, 2, None), - ("SELECT name, missions FROM $astronauts WHERE missions NOT ILIKE ANY ('%Apoll%')", 34, 2, None), - ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('%Apoll%', 'mission')", 34, 2, None), - ("SELECT name, missions FROM $astronauts WHERE missions NOT ILIKE ANY ('%Apoll%', 'mission')", 34, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY '%apoll%'", 357, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT ILIKE ANY '%apoll%'", 323, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('%apoll%')", 357, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT ILIKE ANY ('%apoll%')", 323, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('%Apoll%')", 323, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT ILIKE ANY ('%Apoll%')", 323, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('%Apoll%', 'mission')", 323, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT ILIKE ANY ('%Apoll%', 'mission')", 323, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('Apoll%', 'Gemini%', 'Mercury%')", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('Apoll%', 'Gemini%', 'Mercury%')", 320, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ()", 0, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ()", 0, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('%Apoll%', null)", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('%Apoll%', null)", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('%aPoll%')", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions ILIKE ANY ('%aPoll%')", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('Apollo 11')", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions NOT LIKE ANY ('Apollo 11')", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('Apollo_%')", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('Apo__o%')", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('%Apoll%', 123)", 37, 2, None), + ("SELECT name, missions FROM $astronauts WHERE missions LIKE ANY ('%pattern1%', '%pattern2%', '%pattern3%', '%pattern4%', '%pattern5%', '%pattern6%', '%pattern7%', '%pattern8%', '%pattern9%', '%pattern10%', '%pattern11%', '%pattern12%', '%pattern13%', '%pattern14%', '%pattern15%', '%pattern16%', '%pattern17%', '%pattern18%', '%pattern19%', '%pattern20%', '%pattern21%', '%pattern22%', '%pattern23%', '%pattern24%', '%pattern25%', '%pattern26%', '%pattern27%', '%pattern28%', '%pattern29%', '%pattern30%', '%pattern31%', '%pattern32%', '%pattern33%', '%pattern34%', '%pattern35%', '%pattern36%', '%pattern37%', '%pattern38%', '%pattern39%', '%pattern40%', '%pattern41%', '%pattern42%', '%pattern43%', '%pattern44%', '%pattern45%', '%pattern46%', '%pattern47%', '%pattern48%', '%pattern49%', '%pattern50%');", 37, 2, None), # ****************************************************************************************