Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Oct 22, 2024
1 parent eaa98ba commit eb40aae
Show file tree
Hide file tree
Showing 21 changed files with 297 additions and 67 deletions.
6 changes: 6 additions & 0 deletions opteryx/compiled/structures/node.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# cython: language_level=3
# cython: nonecheck=False
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=True
# cython: boundscheck=False

"""
Node Module
Expand Down
52 changes: 41 additions & 11 deletions opteryx/functions/other_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import numpy
import pyarrow
import simdjson
from pyarrow import compute

from opteryx.exceptions import SqlError
Expand Down Expand Up @@ -196,17 +197,46 @@ def cosine_similarity(
return similarities


def jsonb_object_keys(arr):
def jsonb_object_keys(arr: numpy.ndarray):
"""
Extract the keys from a NumPy array of JSON objects or JSON strings/bytes.
Parameters:
arr: numpy.ndarray
A NumPy array of dictionaries or JSON-encoded strings/bytes.
Returns:
pyarrow.Array
A PyArrow Array containing lists of keys for each input element.
"""
# Early exit for empty input
if len(arr) == 0:
return []
result = []
if isinstance(arr[0], dict):
result = [[str(key) for key in row] for row in arr]
if isinstance(arr[0], (str, bytes)):
import simdjson
return numpy.array([])

# we may get pyarrow arrays here - usually not though
if isinstance(arr, pyarrow.Array):
arr = arr.to_numpy(zero_copy_only=False)

# Determine type based on dtype of the array
if not numpy.issubdtype(arr.dtype, numpy.object_):
raise ValueError(
"Unsupported array dtype. Expected object dtype for dicts or strings/bytes."
)

def keys(doc):
return simdjson.Parser().parse(doc).keys() # type:ignore
# Pre-create the result array as a NumPy boolean array set to False
result = numpy.empty(arr.shape, dtype=list)

if isinstance(arr[0], dict):
# Process dictionaries
for i, row in enumerate(arr):
result[i] = [str(key) for key in row.keys()]
elif isinstance(arr[0], (str, bytes)):
# SIMD-JSON parser instance for JSON string/bytes
parser = simdjson.Parser()
for i, row in enumerate(arr):
result[i] = [str(key) for key in parser.parse(row).keys()]
else:
raise ValueError("Unsupported dtype for array elements. Expected dict, str, or bytes.")

result = [[str(key) for key in keys(row)] for row in arr]
return pyarrow.array(result)
# Return the result as a PyArrow array
return result
25 changes: 18 additions & 7 deletions opteryx/managers/expression/binary_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,41 @@

import numpy
import pyarrow
import simdjson
from orso.types import OrsoTypes
from pyarrow import compute

from opteryx.compiled import list_ops

# Initialize simdjson parser once
parser = simdjson.Parser()


def ArrowOp(documents, elements) -> pyarrow.Array:
"""JSON Selector"""
element = elements[0]

# if it's dicts, extract the value from the dict
# Fast path: if the documents are dicts, delegate to the cython optimized op
if len(documents) > 0 and isinstance(documents[0], dict):
return list_ops.cython_arrow_op(documents, element)

# if it's a string, parse and extract, we don't need a dict (dicts are s_l_o_w)
# so we can use a library which allows us to access the values directly
import simdjson
if hasattr(documents, "to_numpy"):
documents = documents.to_numpy(zero_copy_only=False)

# Function to extract value from a document
def extract(doc: bytes, elem: Union[bytes, str]) -> Any:
value = simdjson.Parser().parse(doc).get(elem) # type:ignore
value = parser.parse(doc).get(elem) # type:ignore
if hasattr(value, "as_list"):
return value.as_list()
if hasattr(value, "as_dict"):
return value.as_dict()
return value.mini
return value

return pyarrow.array([None if d is None else extract(d, element) for d in documents])
# Use a generator expression to lazily evaluate the extraction
extracted_values = (None if d is None else extract(d, element) for d in documents)

# Return the result as a PyArrow array
return pyarrow.array(extracted_values)


def LongArrowOp(documents, elements) -> pyarrow.Array:
Expand All @@ -54,6 +62,9 @@ def LongArrowOp(documents, elements) -> pyarrow.Array:
if len(documents) > 0 and isinstance(documents[0], dict):
return list_ops.cython_long_arrow_op(documents, element)

if hasattr(documents, "to_numpy"):
documents = documents.to_numpy(zero_copy_only=False)

import simdjson

def extract(doc: bytes, elem: Union[bytes, str]) -> bytes:
Expand Down
4 changes: 2 additions & 2 deletions opteryx/managers/expression/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ def format_expression(root, qualify: bool = False):
"ShiftRight": ">>",
"Arrow": "->",
"LongArrow": "->>",
"AtQuestion": "@?",
"AtArrow": "@>",
}
return f"{format_expression(root.left, qualify)} {_map.get(root.value, root.value).upper()} {format_expression(root.right, qualify)}"
if node_type == NodeType.EXPRESSION_LIST:
Expand All @@ -116,6 +114,8 @@ def format_expression(root, qualify: bool = False):
"BitwiseOr": "|",
"LtEq": "<=",
"GtEq": ">=",
"AtQuestion": "@?",
"AtArrow": "@>",
}
return f"{format_expression(root.left, qualify)} {_map.get(root.value, root.value).upper()} {format_expression(root.right, qualify)}"
if node_type == NodeType.UNARY_OPERATOR:
Expand Down
23 changes: 21 additions & 2 deletions opteryx/managers/expression/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,28 @@ def _inner_filter_operations(arr, operator, value):

import simdjson

# Don't warn on rule SIM118, the object isn't actually a dictionary
parser = simdjson.Parser()

if not element.startswith("$."):
# Don't warn on rule SIM118, the object isn't actually a dictionary
return pyarrow.array(
[element in parser.parse(doc).keys() for doc in arr],
type=pyarrow.bool_(), # type:ignore
)

_keys = element[2:].split(".")

def json_path_extract(current_value, keys):
for key in keys:
if key not in current_value:
return False # Key doesn't exist

# Proceed to the next level of the JSON object
current_value = current_value[key]
return True # Key exists if traversal succeeds

return pyarrow.array(
[element in simdjson.Parser().parse(doc).keys() for doc in arr],
[json_path_extract(parser.parse(doc), _keys) for doc in arr],
type=pyarrow.bool_(), # type:ignore
)

Expand Down
1 change: 1 addition & 0 deletions opteryx/planner/cost_based_optimizer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(self, statistics: QueryStatistics):
ProjectionPushdownStrategy(statistics),
DistinctPushdownStrategy(statistics),
OperatorFusionStrategy(statistics),
LimitPushdownStrategy(statistics),
RedundantOperationsStrategy(statistics),
ConstantFoldingStrategy(statistics),
]
Expand Down
2 changes: 2 additions & 0 deletions opteryx/planner/cost_based_optimizer/strategies/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .boolean_simplication import BooleanSimplificationStrategy
from .constant_folding import ConstantFoldingStrategy
from .distinct_pushdown import DistinctPushdownStrategy
from .limit_pushdown import LimitPushdownStrategy
from .operator_fusion import OperatorFusionStrategy
from .predicate_pushdown import PredicatePushdownStrategy
from .predicate_rewriter import PredicateRewriteStrategy
Expand All @@ -12,6 +13,7 @@
"BooleanSimplificationStrategy",
"ConstantFoldingStrategy",
"DistinctPushdownStrategy",
"LimitPushdownStrategy",
"OperatorFusionStrategy",
"PredicatePushdownStrategy",
"PredicateRewriteStrategy",
Expand Down
63 changes: 63 additions & 0 deletions opteryx/planner/cost_based_optimizer/strategies/limit_pushdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Optimization Rule - Limit Pushdown
Type: Heuristic
Goal: Reduce Rows
We try to push the limit to the other side of PROJECTS
"""

from opteryx.planner.logical_planner import LogicalPlan
from opteryx.planner.logical_planner import LogicalPlanNode
from opteryx.planner.logical_planner import LogicalPlanStepType

from .optimization_strategy import OptimizationStrategy
from .optimization_strategy import OptimizerContext


class LimitPushdownStrategy(OptimizationStrategy):
def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerContext:
if not context.optimized_plan:
context.optimized_plan = context.pre_optimized_tree.copy() # type: ignore

if node.node_type == LogicalPlanStepType.Limit:
node.nid = context.node_id
context.collected_limits.append(node)
return context

if node.node_type in (
LogicalPlanStepType.Join,
LogicalPlanStepType.Scan,
LogicalPlanStepType.AggregateAndGroup,
LogicalPlanStepType.Aggregate,
LogicalPlanStepType.Subquery,
LogicalPlanStepType.Union,
LogicalPlanStepType.Filter,
):
# we don't push past here
for limit_node in context.collected_limits:
self.statistics.optimization_limit_pushdown += 1
context.optimized_plan.remove_node(limit_node.nid, heal=True)
context.optimized_plan.insert_node_after(
limit_node.nid, limit_node, context.node_id
)
limit_node.columns = []
context.collected_limits.clear()

return context

def complete(self, plan: LogicalPlan, context: OptimizerContext) -> LogicalPlan:
# No finalization needed for this strategy
return plan
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
# limitations under the License.

"""
Optimization Rule - Operator Fusion
Type: Heuristic
Goal: Chose more efficient physical implementations.
Some operators can be fused to be faster.
'Fused' opertors are when physical operations perform multiple logical operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ def __init__(self, tree: LogicalPlan):
"""We collect column identities so we can push column selection as close to the read as possible, including off to remote systems"""

self.collected_distincts: list = []
"""We collect distincts to try to eliminate records earlier"""
"""We collect distincts to try to eliminate rows earlier"""

self.collected_limits: list = []
"""We collect limits to to to eliminate rows earlier"""


class OptimizationStrategy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
# limitations under the License.

"""
PUSH DOWN
Optimization Rule - Predicate Pushdown
Type: Heuristic
Goal: Filter rows as early as possible
One main heuristic strategy is it eliminate rows to be processed as early
as possible, to do that we try to push filter conditions to as close to the
Expand Down Expand Up @@ -62,7 +65,6 @@ def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerCo
):
# Handle predicates specific to node types
context = self._handle_predicates(node, context)
self.statistics.optimization_predicate_pushdown += 1
context.optimized_plan.add_node(context.node_id, LogicalPlanNode(**node.properties))
if context.last_nid:
context.optimized_plan.add_edge(context.node_id, context.last_nid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
# limitations under the License.

"""
PREDICATE REWRITER
Optimization Rule - Predicate rewriter
Type: Heuristic
Goal: Chose more efficient predicate evaluations
We rewrite some conditions to a more optimal form; for example if doing a
LIKE comparison and the pattern contains no wildcards, we rewrite to be an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Optimization Rule - Projection Pushdown
Type: Heuristic
Goal: Limit columns which need to be moved around
We bind from the the scans, exposing the available columns to each operator
as we make our way to the top of the plan (usually the SELECT). The projection
pushdown is done as part of the optimizers, but isn't quite like the other
optimizations; this is collecting used column information as it goes from the
top of the plan down to the selects. The other optimizations tend to move or
remove operations, or update what a step does, this is just collecting and
updating the used columns.
"""

from typing import Set

from opteryx.managers.expression import NodeType
Expand Down Expand Up @@ -81,7 +96,6 @@ def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerCo
node.columns = node_columns

context.optimized_plan.add_node(context.node_id, LogicalPlanNode(**node.properties))
self.statistics.optimization_projection_pushdown += 1
if context.parent_nid:
context.optimized_plan.add_edge(context.node_id, context.parent_nid)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
# limitations under the License.

"""
Optimization Rule - Remove Redundant Operators
Type: Heuristic
Goal: Remove steps which don't affect the result
This optimization runs toward the end of the set, it removes operators which
were useful during planning and optimization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Optimization Rule - Split Conjections
Type: Heuristic
Goal: Break filters into units which are easier to handle
"""

from orso.tools import random_string

from opteryx.managers.expression import NodeType
Expand Down
Loading

0 comments on commit eb40aae

Please sign in to comment.