Skip to content

Commit

Permalink
Browse files: browse the repository at this point in the history
  • Loading branch information
joocer committed Oct 7, 2023
1 parent 887aab5 commit 3d628dd
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 31 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ b:

test:
clear
python -m pytest
python -m pytest

coverage:
python -m coverage run -m pytest
python -m coverage report --include=opteryx/** -m
42 changes: 29 additions & 13 deletions opteryx/components/binder/binder_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ def visit_node(self, node: Node, context: BindingContext) -> Tuple[Node, Binding
f"Internal Error - function {visit_method_name} returned invalid Schemas"
)

if not all(isinstance(col, (Node, LogicalColumn)) for col in node.columns or []):
raise InvalidInternalStateError(
f"Internal Error - function {visit_method_name} put unexpected items in 'columns' attribute"
)

return return_node, return_context

def visit_aggregate_and_group(
Expand Down Expand Up @@ -287,10 +292,8 @@ def visit_aggregate_and_group(
def visit_exit(self, node: Node, context: BindingContext) -> Tuple[Node, BindingContext]:
# LOG: Exit

# remove the derived schema
context.schemas.pop("$derived", None)

columns = []
# clear the derived schema
context.schemas["derived"] = derived.schema()

seen = set()
needs_qualifier = any(
Expand All @@ -304,16 +307,28 @@ def name_column(qualifier, column):
return f"{qualifier}.{column.name}"
return column.name

def keep_column(column, identities):
if len(node.columns) == 1 and node.columns[0].node_type == NodeType.WILDCARD:
return True
return column.identity in identities

identities = []
for column in (col for col in node.columns if col.node_type != NodeType.WILDCARD):
new_col, _ = inner_binder(column, context, node.identity)
identities.append(new_col.schema_column.identity)

columns = []
for qualifier, schema in context.schemas.items():
for column in schema.columns:
column_reference = Node(
node_type=NodeType.IDENTIFIER,
name=column.name,
schema_column=column,
type=column.type,
query_column=name_column(qualifier, column),
)
columns.append(column_reference)
if keep_column(column, identities):
column_reference = Node(
node_type=NodeType.IDENTIFIER,
name=column.name,
schema_column=column,
type=column.type,
query_column=name_column(qualifier, column),
)
columns.append(column_reference)

node.columns = columns

Expand Down Expand Up @@ -448,7 +463,7 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, Bind
columns = []

# Handle wildcards, including qualified wildcards.
for column in node.columns:
for column in node.columns + node.order_by_columns:
if not column.node_type == NodeType.WILDCARD:
columns.append(column)
else:
Expand Down Expand Up @@ -512,6 +527,7 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, Bind
column.source_column = column.alias
current_name = column.schema_column.name
column.schema_column.name = column.alias
column.schema_column.aliases.append(column.qualified_name)
context.schemas[relation].pop_column(current_name)
context.schemas[relation].columns.append(column.schema_column)
column.alias = None
Expand Down
17 changes: 6 additions & 11 deletions opteryx/components/logical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ def __str__(self):
if node_type == LogicalPlanStepType.Order:
return f"ORDER BY ({', '.join(format_expression(item[0]) + (' DESC' if not item[1] else '') for item in self.order_by)})"
if node_type == LogicalPlanStepType.Project:
return f"PROJECT ({', '.join(format_expression(col) for col in self.columns)})"
order_by_indicator = " +" if self.order_by_columns else ""
return f"PROJECT ({', '.join(format_expression(col) for col in self.columns)}){order_by_indicator}"
if node_type == LogicalPlanStepType.Scan:
date_range = ""
if self.start_date == self.end_date and self.start_date is not None:
Expand Down Expand Up @@ -316,6 +317,8 @@ def inner_query_planner(ast_branch):
)
for item in _order_by
]
if any(c[0].node_type == NodeType.LITERAL for c in _order_by):
raise UnsupportedSyntaxError("Cannot ORDER BY constant values")
_order_by_columns_not_in_projection = [exp[0] for exp in _order_by]

# projection
Expand Down Expand Up @@ -345,7 +348,8 @@ def inner_query_planner(ast_branch):
]

project_step = LogicalPlanNode(node_type=LogicalPlanStepType.Project)
project_step.columns = _projection + _order_by_columns_not_in_projection
project_step.columns = _projection
project_step.order_by_columns = _order_by_columns_not_in_projection
previous_step_id, step_id = step_id, random_string()
inner_plan.add_node(step_id, project_step)
if previous_step_id is not None:
Expand Down Expand Up @@ -384,15 +388,6 @@ def inner_query_planner(ast_branch):
if previous_step_id is not None:
inner_plan.add_edge(previous_step_id, step_id)

# do we need to project again?
if len(_order_by_columns_not_in_projection) > 0:
project_step = LogicalPlanNode(node_type=LogicalPlanStepType.Project)
project_step.columns = _projection
previous_step_id, step_id = step_id, random_string()
inner_plan.add_node(step_id, project_step)
if previous_step_id is not None:
inner_plan.add_edge(previous_step_id, step_id)

# limit/offset
_limit = ast_branch.get("limit")
_offset = ast_branch.get("offset")
Expand Down
83 changes: 83 additions & 0 deletions opteryx/operators/morsel_defragment_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,86 @@ def execute(self) -> Iterable:
if collected_rows:
row_counter += collected_rows.num_rows
yield collected_rows


"""
Untested Cython implementation
# morsel_defragment_node.pyx
from cython import boundscheck, wraparound, cdivision
import time
from typing import Iterable
import pyarrow
from opteryx.operators import BasePlanNode
cdef int MORSEL_SIZE_BYTES = 64 * 1024 * 1024 # 64 MiB
cdef int MORSEL_SIZE_COUNT = 500000 # hard record count limit, half a million
cdef float HIGH_WATER = 1.99 # Split morsels over 199% of MORSEL_SIZE
cdef float LOW_WATER = 0.75 # Merge morsels under 75% of MORSEL_SIZE
@boundscheck(False)
@wraparound(False)
@cdivision(True)
class MorselDefragmentNode(BasePlanNode):
@property
def name(self): # pragma: no cover
return "Morsel Defragment"
@property
def config(self): # pragma: no cover
return ""
cpdef execute(self) -> Iterable:
cdef long start
cdef int row_counter = 0, morsel_bytes, morsel_records, new_row_count
cdef float average_record_size
cdef object morsels = self._producers[0] # type:ignore
cdef object collected_rows = None
cdef bool at_least_one_morsel = False
if not self.properties.enable_morsel_defragmentation:
yield from morsels
return
for morsel in morsels.execute():
if morsel.num_rows > 0:
start = time.monotonic_ns()
if collected_rows:
self.statistics.chunk_merges += 1
morsel = pyarrow.concat_tables([collected_rows, morsel], promote=True)
collected_rows = None
self.statistics.time_defragmenting += time.monotonic_ns() - start
morsel_bytes = morsel.nbytes
morsel_records = morsel.num_rows
if (morsel_bytes > (MORSEL_SIZE_BYTES * HIGH_WATER)) or (morsel_records > MORSEL_SIZE_COUNT):
start = time.monotonic_ns()
average_record_size = morsel_bytes / morsel_records
new_row_count = min(
int(MORSEL_SIZE_BYTES / average_record_size), MORSEL_SIZE_COUNT
)
row_counter += new_row_count
self.statistics.chunk_splits += 1
new_morsel = morsel.slice(offset=0, length=new_row_count)
at_least_one_morsel = True
collected_rows = morsel.slice(offset=new_row_count)
self.statistics.time_defragmenting += time.monotonic_ns() - start
yield new_morsel
elif morsel_bytes < (MORSEL_SIZE_BYTES * LOW_WATER):
collected_rows = morsel
else:
row_counter += morsel_records
yield morsel
at_least_one_morsel = True
elif not at_least_one_morsel:
yield morsel
at_least_one_morsel = True
if collected_rows:
row_counter += collected_rows.num_rows
yield collected_rows
"""
2 changes: 1 addition & 1 deletion opteryx/operators/projection_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, properties: QueryProperties, **config):
"""
super().__init__(properties=properties)

projection = config["projection"]
projection = config["projection"] + config.get("order_by_columns", [])

self.projection = []
for column in projection:
Expand Down
5 changes: 0 additions & 5 deletions tests/sql_battery/test_shapes_and_errors_battery.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,6 @@
("SELECT * FROM $satellites LIMIT 50 OFFSET 150", 27, 8, None),
("SELECT * FROM $satellites LIMIT 50 OFFSET 170", 7, 8, None),
("SELECT * FROM $satellites ORDER BY name", 177, 8, None),
("SELECT * FROM $satellites ORDER BY 1", 177, 8, None),
("SELECT * FROM $satellites ORDER BY 1 DESC", 177, 8, None),
("SELECT * FROM $satellites ORDER BY 2", 177, 8, None),
("SELECT * FROM $satellites ORDER BY 1, 2", 177, 8, None),
("SELECT * FROM $satellites ORDER BY 1 ASC", 177, 8, None),
("SELECT * FROM $satellites ORDER BY RANDOM()", 177, 8, None),

("SELECT MAX(planetId) FROM $satellites", 1, 1, None),
Expand Down

0 comments on commit 3d628dd

Please sign in to comment.