joocer committed Dec 1, 2024
1 parent c4e4b54 commit 81fc95a
Showing 4 changed files with 24 additions and 33 deletions.
4 changes: 2 additions & 2 deletions opteryx/functions/other_functions.py
@@ -251,12 +251,12 @@ def jsonb_object_keys(arr: numpy.ndarray):
if isinstance(arr[0], dict):
# Process dictionaries
for i, row in enumerate(arr):
result[i] = [str(key) for key in row.keys()]
result[i] = [str(key) for key in row.keys()] # noqa: SIM118 - row is not a dict; .keys() is required
elif isinstance(arr[0], (str, bytes)):
# SIMD-JSON parser instance for JSON string/bytes
parser = simdjson.Parser()
for i, row in enumerate(arr):
result[i] = [str(key) for key in parser.parse(row).keys()]
result[i] = [str(key) for key in parser.parse(row).keys()] # noqa: SIM118 - row is not a dict; .keys() is required
else:
raise ValueError("Unsupported dtype for array elements. Expected dict, str, or bytes.")

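Note: the noqa: SIM118 suppressions added above keep the explicit .keys() calls that the linter would otherwise rewrite. For reference, a minimal sketch of the SIMD-JSON branch with an illustrative row value (this snippet is not part of the commit):

    import simdjson  # pysimdjson

    parser = simdjson.Parser()
    row = b'{"name": "Earth", "id": 3}'  # illustrative input

    # parser.parse() returns a simdjson document object rather than a plain dict,
    # so the keys are read through an explicit .keys() call, as in the code above.
    keys = [str(key) for key in parser.parse(row).keys()]
    print(keys)  # e.g. ['name', 'id']
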
42 changes: 18 additions & 24 deletions opteryx/models/physical_plan.py
@@ -114,7 +114,6 @@ def _inner_explain(node, depth):
plan = list(_inner_explain(head[0], 1))

table = pyarrow.Table.from_pylist(plan)
print(table)
return table

def execute(self, head_node=None) -> Generator[Tuple[Any, ResultType], Any, Any]:
@@ -136,10 +135,16 @@ def mark_node_exhausted(node_id):
return # Node is already marked as exhausted

node_exhaustion[node_id] = True
print("EXHAUST", node_id, self[node_id].name)

if isinstance(self[node_id], ReaderNode):
return

# Notify downstream nodes
for _, downstream_node, _ in self.outgoing_edges(node_id):
downstream_nodes = self.outgoing_edges(node_id)
if len(downstream_nodes) > 1:
raise InvalidInternalStateError("Cannot FORK execution")
elif len(downstream_nodes) == 1:
_, downstream_node, _ = downstream_nodes[0]
# Check if all parents of downstream_node are exhausted
if all(
node_exhaustion[parent] for parent, _, _ in self.ingoing_edges(downstream_node)
@@ -161,13 +166,13 @@ def update_morsel_accounting(node_id, morsel_count_change: int):
"""
with morsel_lock:
morsel_accounting[node_id] += morsel_count_change
print(
"ACCOUNT",
node_id,
morsel_accounting[node_id],
morsel_count_change,
self[node_id].name,
)
# print(
# "ACCOUNT",
# node_id,
# morsel_accounting[node_id],
# morsel_count_change,
# self[node_id].name,
# )

if morsel_accounting[node_id] < 0:
raise InvalidInternalStateError("Node input and output count in invalid state.")
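
For reference, the accounting updated above is a per-node counter guarded by a lock, incremented when a morsel is queued for a node and decremented once it has been processed. A standalone sketch of that pattern with placeholder node names (assumptions, not the commit's code):

    import threading
    from collections import defaultdict

    morsel_lock = threading.Lock()
    morsel_accounting = defaultdict(int)  # morsels queued but not yet processed, per node

    def update_morsel_accounting(node_id, morsel_count_change):
        with morsel_lock:
            morsel_accounting[node_id] += morsel_count_change
            if morsel_accounting[node_id] < 0:
                raise RuntimeError("Node input and output count in invalid state.")

    update_morsel_accounting("join", +1)  # a morsel was queued for the node
    update_morsel_accounting("join", -1)  # the node finished processing it
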
@@ -251,13 +256,10 @@ def inner_execute(plan):
# Main engine loop processes pump nodes and coordinates work
for pump_nid, pump_instance in pump_nodes:
for morsel in pump_instance(None):
print("MORSEL")
# Initial morsels pushed to the work queue determine downstream operators
next_nodes = [target for _, target, _ in self.outgoing_edges(pump_nid)]
for downstream_node in next_nodes:
print(
f"following {self[pump_nid].name} triggering {self[downstream_node].name}"
)
# DEBUG: log (f"following initial {self[pump_nid].name} triggering {self[downstream_node].name}")
# Queue tasks for downstream operators
work_queue.put((downstream_node, morsel))
active_tasks_increment(+1)
@@ -271,13 +273,6 @@ def should_stop():
all_nodes_exhausted = all(node_exhaustion.values())
queues_empty = work_queue.empty() and response_queue.empty()
all_nodes_inactive = active_tasks <= 0
print(
list(node_exhaustion.values()),
all(node_exhaustion.values()),
work_queue.empty(),
response_queue.empty(),
active_tasks,
)
return all_nodes_exhausted and queues_empty and all_nodes_inactive

while not should_stop():
@@ -290,7 +285,7 @@ def should_stop():
# if a thread threw an error, we get it in the main
# thread here, where we just re-raise it
if isinstance(result, Exception):
raise Exception(f"{node_id} - {self[node_id]}") from result
raise result

# Handle Empty responses
if result is None:
@@ -308,14 +303,13 @@ def should_stop():
for downstream_node in downstream_nodes:
# Queue tasks for downstream operators
active_tasks_increment(+1)
# DEBUG: log (f"following {self[node_id].name} triggering {self[downstream_node].name}")
work_queue.put((downstream_node, result))
update_morsel_accounting(downstream_node, +1)

# decrement _after_ we've done the work related to handling the task
active_tasks_increment(-1)

print("DONE!", node_exhaustion, work_queue.empty(), response_queue.empty())

for worker in workers:
work_queue.put(None)

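The main behavioural change in this file is that mark_node_exhausted no longer fans out blindly: it raises if a node has more than one outgoing edge, and it only cascades exhaustion to a downstream node once every one of that node's parents is exhausted. A standalone sketch of that rule, using made-up node names and plain dicts in place of the plan graph (assumptions, not the commit's code):

    node_exhaustion = {"scan_a": False, "scan_b": False, "join": False}
    ingoing = {"join": ["scan_a", "scan_b"]}             # parents of each node
    outgoing = {"scan_a": ["join"], "scan_b": ["join"]}  # children of each node

    def mark_node_exhausted(node_id):
        if node_exhaustion[node_id]:
            return  # already marked as exhausted
        node_exhaustion[node_id] = True
        downstream = outgoing.get(node_id, [])
        if len(downstream) > 1:
            raise RuntimeError("Cannot FORK execution")  # stand-in for InvalidInternalStateError
        for child in downstream:
            # cascade only once *all* parents of the child are exhausted
            if all(node_exhaustion[parent] for parent in ingoing[child]):
                mark_node_exhausted(child)

    mark_node_exhausted("scan_a")  # join still has a live parent, so it stays active
    mark_node_exhausted("scan_b")  # both parents done, so join is exhausted too
    print(node_exhaustion)         # {'scan_a': True, 'scan_b': True, 'join': True}

In the engine loop this state feeds should_stop(), which, per the diff, only ends execution once every node is exhausted, both queues are empty, and no tasks remain in flight.
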
9 changes: 3 additions & 6 deletions opteryx/planner/sql_rewriter.py
@@ -288,12 +288,9 @@ def _temporal_extration_state_machine(
open_count -= 1
if in_special_function and open_count == special_function_brackets:
in_special_function = False
if relation == "":
state = WAITING
else:
# function relations, like FAKE(234,234) need the items between the
# brackets to be consumed
state = FUNCTION_RELATION
# function relations, like FAKE(234,234) need the items between the
# brackets to be consumed
state = WAITING if relation == "" else FUNCTION_RELATION

if not in_special_function:
if comparable_part in STOP_COLLECTING:
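The change above collapses the if/else into a single conditional expression; behaviour is unchanged. A quick equivalence check (state names borrowed from the diff, values illustrative):

    WAITING, FUNCTION_RELATION = "WAITING", "FUNCTION_RELATION"

    for relation in ("", "planets", "FAKE"):
        # original branching form
        if relation == "":
            old_state = WAITING
        else:
            old_state = FUNCTION_RELATION
        # refactored form from the diff
        new_state = WAITING if relation == "" else FUNCTION_RELATION
        assert old_state == new_state
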
2 changes: 1 addition & 1 deletion tests/sql_battery/test_shapes_and_errors_battery.py
@@ -83,7 +83,7 @@
("SELECT * FROM sqlite.planets", 9, 20, None),
("SELECT * FROM $variables", 42, 4, None),
("SELECT * FROM $missions", 4630, 8, None),
("SELECT * FROM $statistics", 20, 2, None),
("SELECT * FROM $statistics", 17, 2, None),
("SELECT * FROM $stop_words", 305, 1, None),
(b"SELECT * FROM $satellites", 177, 8, None),
("SELECT * FROM testdata.missions", 4630, 8, None),
