From 3d628dd974d69e32df84d39020fd6caed8687fbf Mon Sep 17 00:00:00 2001 From: joocer Date: Sat, 7 Oct 2023 19:40:37 +0100 Subject: [PATCH] #1185 --- Makefile | 6 +- opteryx/components/binder/binder_visitor.py | 42 +++++++--- opteryx/components/logical_planner.py | 17 ++-- opteryx/operators/morsel_defragment_node.py | 83 +++++++++++++++++++ opteryx/operators/projection_node.py | 2 +- .../test_shapes_and_errors_battery.py | 5 -- 6 files changed, 124 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index bb7a0afe6..9cb5270e9 100644 --- a/Makefile +++ b/Makefile @@ -18,4 +18,8 @@ b: test: clear - python -m pytest \ No newline at end of file + python -m pytest + +coverage: + python -m coverage run -m pytest + python -m coverage report --include=opteryx/** -m \ No newline at end of file diff --git a/opteryx/components/binder/binder_visitor.py b/opteryx/components/binder/binder_visitor.py index e03dc6be3..f11c59bef 100644 --- a/opteryx/components/binder/binder_visitor.py +++ b/opteryx/components/binder/binder_visitor.py @@ -237,6 +237,11 @@ def visit_node(self, node: Node, context: BindingContext) -> Tuple[Node, Binding f"Internal Error - function {visit_method_name} returned invalid Schemas" ) + if not all(isinstance(col, (Node, LogicalColumn)) for col in node.columns or []): + raise InvalidInternalStateError( + f"Internal Error - function {visit_method_name} put unexpected items in 'columns' attribute" + ) + return return_node, return_context def visit_aggregate_and_group( @@ -287,10 +292,8 @@ def visit_aggregate_and_group( def visit_exit(self, node: Node, context: BindingContext) -> Tuple[Node, BindingContext]: # LOG: Exit - # remove the derived schema - context.schemas.pop("$derived", None) - - columns = [] + # clear the derived schema + context.schemas["derived"] = derived.schema() seen = set() needs_qualifier = any( @@ -304,16 +307,28 @@ def name_column(qualifier, column): return f"{qualifier}.{column.name}" return column.name + def keep_column(column, identities): + if len(node.columns) == 1 and node.columns[0].node_type == NodeType.WILDCARD: + return True + return column.identity in identities + + identities = [] + for column in (col for col in node.columns if col.node_type != NodeType.WILDCARD): + new_col, _ = inner_binder(column, context, node.identity) + identities.append(new_col.schema_column.identity) + + columns = [] for qualifier, schema in context.schemas.items(): for column in schema.columns: - column_reference = Node( - node_type=NodeType.IDENTIFIER, - name=column.name, - schema_column=column, - type=column.type, - query_column=name_column(qualifier, column), - ) - columns.append(column_reference) + if keep_column(column, identities): + column_reference = Node( + node_type=NodeType.IDENTIFIER, + name=column.name, + schema_column=column, + type=column.type, + query_column=name_column(qualifier, column), + ) + columns.append(column_reference) node.columns = columns @@ -448,7 +463,7 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, Bind columns = [] # Handle wildcards, including qualified wildcards. - for column in node.columns: + for column in node.columns + node.order_by_columns: if not column.node_type == NodeType.WILDCARD: columns.append(column) else: @@ -512,6 +527,7 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, Bind column.source_column = column.alias current_name = column.schema_column.name column.schema_column.name = column.alias + column.schema_column.aliases.append(column.qualified_name) context.schemas[relation].pop_column(current_name) context.schemas[relation].columns.append(column.schema_column) column.alias = None diff --git a/opteryx/components/logical_planner.py b/opteryx/components/logical_planner.py index ee77ba3de..b77d00944 100644 --- a/opteryx/components/logical_planner.py +++ b/opteryx/components/logical_planner.py @@ -131,7 +131,8 @@ def __str__(self): if node_type == LogicalPlanStepType.Order: return f"ORDER BY ({', '.join(format_expression(item[0]) + (' DESC' if not item[1] else '') for item in self.order_by)})" if node_type == LogicalPlanStepType.Project: - return f"PROJECT ({', '.join(format_expression(col) for col in self.columns)})" + order_by_indicator = " +" if self.order_by_columns else "" + return f"PROJECT ({', '.join(format_expression(col) for col in self.columns)}){order_by_indicator}" if node_type == LogicalPlanStepType.Scan: date_range = "" if self.start_date == self.end_date and self.start_date is not None: @@ -316,6 +317,8 @@ def inner_query_planner(ast_branch): ) for item in _order_by ] + if any(c[0].node_type == NodeType.LITERAL for c in _order_by): + raise UnsupportedSyntaxError("Cannot ORDER BY constant values") _order_by_columns_not_in_projection = [exp[0] for exp in _order_by] # projection @@ -345,7 +348,8 @@ def inner_query_planner(ast_branch): ] project_step = LogicalPlanNode(node_type=LogicalPlanStepType.Project) - project_step.columns = _projection + _order_by_columns_not_in_projection + project_step.columns = _projection + project_step.order_by_columns = _order_by_columns_not_in_projection previous_step_id, step_id = step_id, random_string() inner_plan.add_node(step_id, project_step) if previous_step_id is not None: @@ -384,15 +388,6 @@ def inner_query_planner(ast_branch): if previous_step_id is not None: inner_plan.add_edge(previous_step_id, step_id) - # do we need to project again? - if len(_order_by_columns_not_in_projection) > 0: - project_step = LogicalPlanNode(node_type=LogicalPlanStepType.Project) - project_step.columns = _projection - previous_step_id, step_id = step_id, random_string() - inner_plan.add_node(step_id, project_step) - if previous_step_id is not None: - inner_plan.add_edge(previous_step_id, step_id) - # limit/offset _limit = ast_branch.get("limit") _offset = ast_branch.get("offset") diff --git a/opteryx/operators/morsel_defragment_node.py b/opteryx/operators/morsel_defragment_node.py index a96c51e6d..cc98f0d0d 100644 --- a/opteryx/operators/morsel_defragment_node.py +++ b/opteryx/operators/morsel_defragment_node.py @@ -120,3 +120,86 @@ def execute(self) -> Iterable: if collected_rows: row_counter += collected_rows.num_rows yield collected_rows + + +""" +untested cython implementation + + +# morsel_defragment_node.pyx + +from cython import boundscheck, wraparound, cdivision +import time +from typing import Iterable +import pyarrow +from opteryx.operators import BasePlanNode + +cdef int MORSEL_SIZE_BYTES = 64 * 1024 * 1024 # 64Mb +cdef int MORSEL_SIZE_COUNT = 500000 # hard record count limit, half a million +cdef float HIGH_WATER = 1.99 # Split morsels over 199% of MORSEL_SIZE +cdef float LOW_WATER = 0.75 # Merge morsels under 75% of MORSEL_SIZE + +@boundscheck(False) +@wraparound(False) +@cdivision(True) +class MorselDefragmentNode(BasePlanNode): + @property + def name(self): # pragma: no cover + return "Morsel Defragment" + + @property + def config(self): # pragma: no cover + return "" + + cpdef execute(self) -> Iterable: + cdef long start + cdef int row_counter = 0, morsel_bytes, morsel_records, new_row_count + cdef float average_record_size + cdef object morsels = self._producers[0] # type:ignore + cdef object collected_rows = None + cdef bool at_least_one_morsel = False + + if not self.properties.enable_morsel_defragmentation: + yield from morsels + return + + for morsel in morsels.execute(): + if morsel.num_rows > 0: + start = time.monotonic_ns() + if collected_rows: + self.statistics.chunk_merges += 1 + morsel = pyarrow.concat_tables([collected_rows, morsel], promote=True) + collected_rows = None + self.statistics.time_defragmenting += time.monotonic_ns() - start + + morsel_bytes = morsel.nbytes + morsel_records = morsel.num_rows + + if (morsel_bytes > (MORSEL_SIZE_BYTES * HIGH_WATER)) or (morsel_records > MORSEL_SIZE_COUNT): + start = time.monotonic_ns() + average_record_size = morsel_bytes / morsel_records + new_row_count = min( + int(MORSEL_SIZE_BYTES / average_record_size), MORSEL_SIZE_COUNT + ) + row_counter += new_row_count + self.statistics.chunk_splits += 1 + new_morsel = morsel.slice(offset=0, length=new_row_count) + at_least_one_morsel = True + collected_rows = morsel.slice(offset=new_row_count) + self.statistics.time_defragmenting += time.monotonic_ns() - start + yield new_morsel + elif morsel_bytes < (MORSEL_SIZE_BYTES * LOW_WATER): + collected_rows = morsel + else: + row_counter += morsel_records + yield morsel + at_least_one_morsel = True + elif not at_least_one_morsel: + yield morsel + at_least_one_morsel = True + + if collected_rows: + row_counter += collected_rows.num_rows + yield collected_rows + +""" diff --git a/opteryx/operators/projection_node.py b/opteryx/operators/projection_node.py index 378cd4367..78a64bada 100644 --- a/opteryx/operators/projection_node.py +++ b/opteryx/operators/projection_node.py @@ -34,7 +34,7 @@ def __init__(self, properties: QueryProperties, **config): """ super().__init__(properties=properties) - projection = config["projection"] + projection = config["projection"] + config.get("order_by_columns", []) self.projection = [] for column in projection: diff --git a/tests/sql_battery/test_shapes_and_errors_battery.py b/tests/sql_battery/test_shapes_and_errors_battery.py index 1aec12853..5cfbf4a56 100644 --- a/tests/sql_battery/test_shapes_and_errors_battery.py +++ b/tests/sql_battery/test_shapes_and_errors_battery.py @@ -342,11 +342,6 @@ ("SELECT * FROM $satellites LIMIT 50 OFFSET 150", 27, 8, None), ("SELECT * FROM $satellites LIMIT 50 OFFSET 170", 7, 8, None), ("SELECT * FROM $satellites ORDER BY name", 177, 8, None), - ("SELECT * FROM $satellites ORDER BY 1", 177, 8, None), - ("SELECT * FROM $satellites ORDER BY 1 DESC", 177, 8, None), - ("SELECT * FROM $satellites ORDER BY 2", 177, 8, None), - ("SELECT * FROM $satellites ORDER BY 1, 2", 177, 8, None), - ("SELECT * FROM $satellites ORDER BY 1 ASC", 177, 8, None), ("SELECT * FROM $satellites ORDER BY RANDOM()", 177, 8, None), ("SELECT MAX(planetId) FROM $satellites", 1, 1, None),