
Commit

joocer committed Dec 24, 2024
1 parent e73533a commit 72831ba
Showing 8 changed files with 4 additions and 90 deletions.
1 change: 0 additions & 1 deletion opteryx/operators/__init__.py
@@ -6,7 +6,6 @@
# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND.


from .base_plan_node import BasePlanDataObject # isort: skip
from .base_plan_node import BasePlanNode, JoinNode # isort: skip

from .aggregate_and_group_node import AggregateAndGroupNode # Group is always followed by aggregate
16 changes: 0 additions & 16 deletions opteryx/operators/aggregate_and_group_node.py
@@ -14,8 +14,6 @@
"""

from dataclasses import dataclass

import numpy
import pyarrow
from orso.types import OrsoTypes
@@ -28,22 +26,10 @@
from opteryx.operators.aggregate_node import build_aggregations
from opteryx.operators.aggregate_node import extract_evaluations
from opteryx.operators.aggregate_node import project
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import BasePlanNode


@dataclass
class AggregateAndGroupDataObject(BasePlanDataObject):
groups: list = None
aggregates: list = None
all_identifiers: list = None
evaluatable_nodes: list = None
group_by_columns: list = None
column_map: list = None
aggregate_functions: list = None


class AggregateAndGroupNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
@@ -79,8 +65,6 @@ def __init__(self, properties: QueryProperties, **parameters):
self.group_by_columns = list({node.schema_column.identity for node in self.groups})
self.column_map, self.aggregate_functions = build_aggregations(self.aggregates)

self.do = AggregateAndGroupDataObject()

self.buffer = []

@classmethod
17 changes: 4 additions & 13 deletions opteryx/operators/aggregate_node.py
@@ -11,8 +11,6 @@
This node performs aggregates without performing groupings.
"""

from dataclasses import dataclass

import numpy
import pyarrow

@@ -22,7 +20,6 @@
from opteryx.managers.expression import evaluate_and_append
from opteryx.managers.expression import get_all_nodes_of_type
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import BasePlanNode

@@ -166,16 +163,11 @@ def extract_evaluations(aggregates):
if len(aggregators) == 0:
evaluatable_nodes.append(node)

return evaluatable_nodes

literal_count = len([n for n in evaluatable_nodes if n.node_type == NodeType.LITERAL])
if literal_count > 0 and literal_count < len(evaluatable_nodes):
evaluatable_nodes = [n for n in evaluatable_nodes if n.node_type != NodeType.LITERAL]

@dataclass
class AggregateDataObject(BasePlanDataObject):
aggregates: list = None
all_identifiers: list = None
evaluatable_nodes: list = None
column_map: list = None
aggregate_functions: list = None
return evaluatable_nodes


class AggregateNode(BasePlanNode):
@@ -196,7 +188,6 @@ def __init__(self, properties: QueryProperties, **parameters):

self.column_map, self.aggregate_functions = build_aggregations(self.aggregates)

self.do = AggregateDataObject()
self.buffer = []

@classmethod
8 changes: 0 additions & 8 deletions opteryx/operators/async_read_node.py
@@ -16,7 +16,6 @@
import queue
import threading
import time
from dataclasses import dataclass
from typing import Generator

import aiohttp
@@ -28,7 +27,6 @@
from opteryx import config
from opteryx.exceptions import DataError
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject
from opteryx.shared import AsyncMemoryPool
from opteryx.shared import MemoryPool
from opteryx.utils.file_decoders import get_decoder
@@ -61,17 +59,11 @@ async def fetch_and_process(blob_name):
await session.close()


@dataclass
class AsyncReaderDataObject(BasePlanDataObject):
pass


class AsyncReaderNode(ReaderNode):
def __init__(self, properties: QueryProperties, **parameters):
ReaderNode.__init__(self, properties=properties, **parameters)
self.pool = MemoryPool(MAX_READ_BUFFER_CAPACITY, f"ReadBuffer <{self.parameters['alias']}>")

self.do = AsyncReaderDataObject()
self.predicates = parameters.get("predicates")

@classmethod
18 changes: 0 additions & 18 deletions opteryx/operators/base_plan_node.py
@@ -5,28 +5,11 @@


import time
from dataclasses import dataclass
from typing import Optional

import pyarrow
from orso.tools import random_string

from opteryx import EOS


@dataclass
class BasePlanDataObject:
operation: Optional[str] = None
query_id: str = None
identity: str = None

def __post_init__(self):
# Perform actions after initialization
if self.identity is None:
self.identity = random_string()
if self.operation is None:
self.operation = self.__class__.__name__.replace("DataObject", "Node")


class BasePlanNode:
is_join: bool = False
@@ -47,7 +30,6 @@ def __init__(self, *, properties, **parameters):
self.parameters = parameters
self.execution_time = 0
self.identity = random_string()
self.do: Optional[BasePlanDataObject] = None
self.calls = 0
self.records_in = 0
self.bytes_in = 0
11 changes: 0 additions & 11 deletions opteryx/operators/cross_join_node.py
@@ -12,7 +12,6 @@
here rather than calling the join() functions
"""

from dataclasses import dataclass
from typing import Generator
from typing import Set
from typing import Tuple
@@ -26,7 +25,6 @@
from opteryx.managers.expression import NodeType
from opteryx.models import LogicalColumn
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import JoinNode

@@ -250,15 +248,6 @@ def _chunker(seq_1, seq_2, size):
)


@dataclass
class CrossJoinDataObject(BasePlanDataObject):
source: str = None
_unnest_column: str = None
_unnest_target: str = None
_filters: str = None
_distinct: bool = False


class CrossJoinNode(JoinNode):
"""
Implements a SQL CROSS JOIN
13 changes: 0 additions & 13 deletions opteryx/operators/exit_node.py
@@ -17,34 +17,21 @@
This node doesn't do any calculations, it is a pure Projection.
"""

from dataclasses import dataclass
from dataclasses import field
from typing import List

from pyarrow import Table

from opteryx import EOS
from opteryx.exceptions import AmbiguousIdentifierError
from opteryx.exceptions import InvalidInternalStateError
from opteryx.models import LogicalColumn
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import BasePlanNode


@dataclass
class ExitDataObject(BasePlanDataObject):
columns: List[LogicalColumn] = field(default_factory=list)


class ExitNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.columns = parameters.get("columns", [])

self.do = ExitDataObject(columns=self.columns)

@classmethod
def from_json(cls, json_obj: str) -> "BasePlanNode": # pragma: no cover
raise NotImplementedError()
10 changes: 0 additions & 10 deletions opteryx/operators/heap_sort_node.py
@@ -17,8 +17,6 @@
sorting smaller chunks over and over again.
"""

from dataclasses import dataclass

import numpy
import pyarrow
import pyarrow.compute
@@ -27,24 +25,16 @@
from opteryx import EOS
from opteryx.exceptions import ColumnNotFoundError
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import BasePlanNode


@dataclass
class HeapSortDataObject(BasePlanDataObject):
order_by: list = None
limit: int = -1


class HeapSortNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.order_by = parameters.get("order_by", [])
self.limit: int = parameters.get("limit", -1)

self.do = HeapSortDataObject(order_by=self.order_by, limit=self.limit)
self.mapped_order = []
self.table = None

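Taken together, the diff deletes the same pattern from every operator: a per-node *DataObject dataclass plus the self.do attribute that carried it. As a reading aid, here is a minimal sketch of how those pieces fit together, assembled from the deleted lines above; field names and defaults match the diff, but the node class is trimmed to the relevant parts and is not a drop-in replacement for the real BasePlanNode.

from dataclasses import dataclass
from typing import Optional

from orso.tools import random_string


@dataclass
class BasePlanDataObject:
    # Removed base class: each operator's data object derived its identity
    # and its operation name from the node that owned it.
    operation: Optional[str] = None
    query_id: str = None
    identity: str = None

    def __post_init__(self):
        if self.identity is None:
            self.identity = random_string()
        if self.operation is None:
            self.operation = self.__class__.__name__.replace("DataObject", "Node")


@dataclass
class HeapSortDataObject(BasePlanDataObject):
    # One of the removed per-node subclasses, as it appears in the diff.
    order_by: list = None
    limit: int = -1


class HeapSortNode:  # simplified; the real class derives from BasePlanNode
    def __init__(self, **parameters):
        self.order_by = parameters.get("order_by", [])
        self.limit = parameters.get("limit", -1)
        # This is the attribute the commit drops from every operator.
        self.do = HeapSortDataObject(order_by=self.order_by, limit=self.limit)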
