diff --git a/opteryx/functions/other_functions.py b/opteryx/functions/other_functions.py index dc39e2ad..eda4da41 100644 --- a/opteryx/functions/other_functions.py +++ b/opteryx/functions/other_functions.py @@ -251,12 +251,12 @@ def jsonb_object_keys(arr: numpy.ndarray): if isinstance(arr[0], dict): # Process dictionaries for i, row in enumerate(arr): - result[i] = [str(key) for key in row.keys()] + result[i] = [str(key) for key in row.keys()] # noqa: SIM118 - row is not a dict; .keys() is required elif isinstance(arr[0], (str, bytes)): # SIMD-JSON parser instance for JSON string/bytes parser = simdjson.Parser() for i, row in enumerate(arr): - result[i] = [str(key) for key in parser.parse(row).keys()] + result[i] = [str(key) for key in parser.parse(row).keys()] # noqa: SIM118 - row is not a dict; .keys() is required else: raise ValueError("Unsupported dtype for array elements. Expected dict, str, or bytes.") diff --git a/opteryx/models/physical_plan.py b/opteryx/models/physical_plan.py index aceaff75..a7be78aa 100644 --- a/opteryx/models/physical_plan.py +++ b/opteryx/models/physical_plan.py @@ -114,7 +114,6 @@ def _inner_explain(node, depth): plan = list(_inner_explain(head[0], 1)) table = pyarrow.Table.from_pylist(plan) - print(table) return table def execute(self, head_node=None) -> Generator[Tuple[Any, ResultType], Any, Any]: @@ -136,10 +135,16 @@ def mark_node_exhausted(node_id): return # Node is already marked as exhausted node_exhaustion[node_id] = True - print("EXHAUST", node_id, self[node_id].name) + + if isinstance(self[node_id], ReaderNode): + return # Notify downstream nodes - for _, downstream_node, _ in self.outgoing_edges(node_id): + downstream_nodes = self.outgoing_edges(node_id) + if len(downstream_nodes) > 1: + raise InvalidInternalStateError("Cannot FORK execution") + elif len(downstream_nodes) == 1: + _, downstream_node, _ = downstream_nodes[0] # Check if all parents of downstream_node are exhausted if all( node_exhaustion[parent] for parent, _, _ in self.ingoing_edges(downstream_node) @@ -161,13 +166,13 @@ def update_morsel_accounting(node_id, morsel_count_change: int): """ with morsel_lock: morsel_accounting[node_id] += morsel_count_change - print( - "ACCOUNT", - node_id, - morsel_accounting[node_id], - morsel_count_change, - self[node_id].name, - ) + # print( + # "ACCOUNT", + # node_id, + # morsel_accounting[node_id], + # morsel_count_change, + # self[node_id].name, + # ) if morsel_accounting[node_id] < 0: raise InvalidInternalStateError("Node input and output count in invalid state.") @@ -251,13 +256,10 @@ def inner_execute(plan): # Main engine loop processes pump nodes and coordinates work for pump_nid, pump_instance in pump_nodes: for morsel in pump_instance(None): - print("MORSEL") # Initial morsels pushed to the work queue determine downstream operators next_nodes = [target for _, target, _ in self.outgoing_edges(pump_nid)] for downstream_node in next_nodes: - print( - f"following {self[pump_nid].name} triggering {self[downstream_node].name}" - ) + # DEBUG: log (f"following initial {self[pump_nid].name} triggering {self[downstream_node].name}") # Queue tasks for downstream operators work_queue.put((downstream_node, morsel)) active_tasks_increment(+1) @@ -271,13 +273,6 @@ def should_stop(): all_nodes_exhausted = all(node_exhaustion.values()) queues_empty = work_queue.empty() and response_queue.empty() all_nodes_inactive = active_tasks <= 0 - print( - list(node_exhaustion.values()), - all(node_exhaustion.values()), - work_queue.empty(), - response_queue.empty(), - active_tasks, - ) return all_nodes_exhausted and queues_empty and all_nodes_inactive while not should_stop(): @@ -290,7 +285,7 @@ def should_stop(): # if a thread threw a error, we get them in the main # thread here, we just reraise the error here if isinstance(result, Exception): - raise Exception(f"{node_id} - {self[node_id]}") from result + raise result # Handle Empty responses if result is None: @@ -308,14 +303,13 @@ def should_stop(): for downstream_node in downstream_nodes: # Queue tasks for downstream operators active_tasks_increment(+1) + # DEBUG: log (f"following {self[node_id].name} triggering {self[downstream_node].name}") work_queue.put((downstream_node, result)) update_morsel_accounting(downstream_node, +1) # decrement _after_ we've done the work relation to handling the task active_tasks_increment(-1) - print("DONE!", node_exhaustion, work_queue.empty(), response_queue.empty()) - for worker in workers: work_queue.put(None) diff --git a/opteryx/planner/sql_rewriter.py b/opteryx/planner/sql_rewriter.py index e9f4c2b0..7d2b2f65 100644 --- a/opteryx/planner/sql_rewriter.py +++ b/opteryx/planner/sql_rewriter.py @@ -288,12 +288,9 @@ def _temporal_extration_state_machine( open_count -= 1 if in_special_function and open_count == special_function_brackets: in_special_function = False - if relation == "": - state = WAITING - else: - # function relations, like FAKE(234,234) need the items between the - # brackets be be consumed - state = FUNCTION_RELATION + # function relations, like FAKE(234,234) need the items between the + # brackets be be consumed + state = WAITING if relation == "" else FUNCTION_RELATION if not in_special_function: if comparable_part in STOP_COLLECTING: diff --git a/tests/sql_battery/test_shapes_and_errors_battery.py b/tests/sql_battery/test_shapes_and_errors_battery.py index 9a6d8169..6757ac02 100644 --- a/tests/sql_battery/test_shapes_and_errors_battery.py +++ b/tests/sql_battery/test_shapes_and_errors_battery.py @@ -83,7 +83,7 @@ ("SELECT * FROM sqlite.planets", 9, 20, None), ("SELECT * FROM $variables", 42, 4, None), ("SELECT * FROM $missions", 4630, 8, None), - ("SELECT * FROM $statistics", 20, 2, None), + ("SELECT * FROM $statistics", 17, 2, None), ("SELECT * FROM $stop_words", 305, 1, None), (b"SELECT * FROM $satellites", 177, 8, None), ("SELECT * FROM testdata.missions", 4630, 8, None),