Merge pull request #2143 from mabel-dev/HOUSEKEEPING/13
HOUSEKEEPING
joocer authored Dec 23, 2024
2 parents 3421427 + 436ee22 commit 9d53968
Showing 31 changed files with 176 additions and 202 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 910
+__build__ = 912
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
3 changes: 0 additions & 3 deletions opteryx/config.py
@@ -183,9 +183,6 @@ def get(key: str, default: Optional[typing.Any] = None) -> Optional[typing.Any]:
 ENABLE_RESOURCE_LOGGING: bool = bool(get("ENABLE_RESOURCE_LOGGING", False))
 # size of morsels to push between steps
 MORSEL_SIZE: int = int(get("MORSEL_SIZE", 64 * 1024 * 1024))
-# query log
-QUERY_LOG_LOCATION:str = get("QUERY_LOG_LOCATION", False)
-QUERY_LOG_SIZE:int = int(get("QUERY_LOG_SIZE", 100))
 # not GA
 PROFILE_LOCATION:str = get("PROFILE_LOCATION")
 # fmt:on
10 changes: 0 additions & 10 deletions opteryx/connectors/aws_s3_connector.py
@@ -68,10 +68,6 @@ def __init__(self, credentials=None, **kwargs):
         self.minio = Minio(end_point, access_key, secret_key, secure=secure)
         self.dataset = self.dataset.replace(".", OS_SEP)
 
-        # we're going to cache the first blob as the schema and dataset reader
-        # sometimes both start here
-        self.cached_first_blob = None
-
     @single_item_cache
     def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
         bucket, object_path, _, _ = paths.get_parts(prefix)
@@ -94,12 +90,6 @@ def read_dataset(
             prefix=self.dataset,
         )
 
-        # Check if the first blob was cached earlier
-        if self.cached_first_blob is not None:
-            yield self.cached_first_blob  # Use cached blob
-            blob_names = blob_names[1:]  # Skip first blob
-            self.cached_first_blob = None
-
         for blob_name in blob_names:
             try:
                 decoder = get_decoder(blob_name)
2 changes: 1 addition & 1 deletion opteryx/connectors/base/base_connector.py
@@ -148,7 +148,7 @@ def __next__(self) -> pyarrow.Table:  # pragma: no cover
         """
         raise NotImplementedError("Subclasses must implement __next__ method.")
 
-    def close(self) -> None:
+    def close(self) -> None:  # pragma: no cover
         """
         Close the reader and release any resources.
         """
8 changes: 4 additions & 4 deletions opteryx/connectors/capabilities/cacheable.py
@@ -88,7 +88,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
             remote_cache.touch(key)  # help the remote cache track LRU
             statistics.bufferpool_hits += 1
             read_buffer_ref = await pool.commit(payload)  # type: ignore
-            while read_buffer_ref is None:
+            while read_buffer_ref is None:  # pragma: no cover
                 await asyncio.sleep(0.1)
                 statistics.stalls_writing_to_read_buffer += 1
                 read_buffer_ref = await pool.commit(payload)  # type: ignore
@@ -103,7 +103,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
             statistics.remote_cache_hits += 1
             system_statistics.remote_cache_reads += 1
             read_buffer_ref = await pool.commit(payload)  # type: ignore
-            while read_buffer_ref is None:
+            while read_buffer_ref is None:  # pragma: no cover
                 await asyncio.sleep(0.1)
                 statistics.stalls_writing_to_read_buffer += 1
                 read_buffer_ref = await pool.commit(payload)  # type: ignore
@@ -119,7 +119,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
             statistics.cache_misses += 1
             system_statistics.origin_reads += 1
             return read_buffer_ref
-        except Exception as e:
+        except Exception as e:  # pragma: no cover
             print(f"Error in {func.__name__}: {e}")
             raise  # Optionally re-raise the error after logging it
 
@@ -136,7 +136,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
         ):
             # if we didn't get it from the buffer pool (origin or remote cache) we add it
             evicted = buffer_pool.set(key, payload)
-            if evicted:
+            if evicted:  # pragma: no cover
                 # if we're evicting items we just put in the cache, stop
                 if evicted in my_keys:
                     evictions_remaining = 0
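The stall loops marked "# pragma: no cover" above all follow one pattern: pool.commit() returns None when the read buffer has no free space, so the caller sleeps briefly and retries until a reference is obtained. A minimal sketch of that backpressure loop, using illustrative names (commit_with_backpressure is not part of Opteryx):

    import asyncio

    async def commit_with_backpressure(pool, payload, statistics, interval=0.1):
        # pool.commit() returns a buffer reference, or None when the pool is full
        ref = await pool.commit(payload)
        while ref is None:
            await asyncio.sleep(interval)  # yield so readers can release space
            statistics.stalls_writing_to_read_buffer += 1
            ref = await pool.commit(payload)
        return ref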
24 changes: 13 additions & 11 deletions opteryx/cursor.py
@@ -40,17 +40,9 @@
 from opteryx.exceptions import SqlError
 from opteryx.exceptions import UnsupportedSyntaxError
 from opteryx.models import QueryStatistics
-from opteryx.shared.rolling_log import RollingLog
 from opteryx.utils import sql
 
 PROFILE_LOCATION = config.PROFILE_LOCATION
-QUERY_LOG_LOCATION = config.QUERY_LOG_LOCATION
-QUERY_LOG_SIZE = config.QUERY_LOG_SIZE
-
-
-ROLLING_LOG = None
-if QUERY_LOG_LOCATION:
-    ROLLING_LOG = RollingLog(QUERY_LOG_LOCATION, max_entries=QUERY_LOG_SIZE)
 
 
 class CursorState(Enum):
@@ -191,9 +183,6 @@ def _inner_execute(
         except RuntimeError as err:  # pragma: no cover
             raise SqlError(f"Error Executing SQL Statement ({err})") from err
 
-        if ROLLING_LOG:
-            ROLLING_LOG.append(operation)
-
         results = execute(plan, statistics=self._statistics)
         start = time.time_ns()
@@ -337,6 +326,19 @@ def execute_to_arrow(
         results = self._execute_statements(operation, params, visibility_filters)
         if results is not None:
             result_data, self._result_type = results
+
+            if self._result_type == ResultType.NON_TABULAR:
+                import orso
+
+                meta_dataframe = orso.DataFrame(
+                    rows=[(result_data.record_count,)],  # type: ignore
+                    schema=RelationSchema(
+                        name="table",
+                        columns=[FlatColumn(name="rows_affected", type=OrsoTypes.INTEGER)],
+                    ),
+                )  # type: ignore
+                return meta_dataframe.arrow()
+
         if limit is not None:
             result_data = utils.arrow.limit_records(result_data, limit)  # type: ignore
         if isinstance(result_data, pyarrow.Table):
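The block added to execute_to_arrow routes non-tabular results (statements that only report how many rows they touched) into a one-row Arrow table. A standalone sketch of the frame it builds, assuming orso's usual import paths for the names used unqualified in the diff (RelationSchema, FlatColumn, OrsoTypes):

    import orso
    from orso.schema import FlatColumn, RelationSchema
    from orso.types import OrsoTypes

    # mirrors the NON_TABULAR branch: one row, one INTEGER column
    meta_dataframe = orso.DataFrame(
        rows=[(42,)],  # e.g. a statement that affected 42 rows
        schema=RelationSchema(
            name="table",
            columns=[FlatColumn(name="rows_affected", type=OrsoTypes.INTEGER)],
        ),
    )
    table = meta_dataframe.arrow()  # pyarrow.Table with a single rows_affected value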
2 changes: 1 addition & 1 deletion opteryx/managers/cache/memcached.py
@@ -49,7 +49,7 @@ def _memcached_server(**kwargs):
 
     try:
         from pymemcache.client import base
-    except ImportError as err:
+    except ImportError as err:  # pragma: no cover
         raise MissingDependencyError(err.name) from err
 
     try:
6 changes: 3 additions & 3 deletions opteryx/managers/cache/redis.py
@@ -38,7 +38,7 @@ def _redis_server(**kwargs):
 
     try:
         import redis
-    except ImportError as err:
+    except ImportError as err:  # pragma: no cover
         raise MissingDependencyError(err.name) from err
 
     return redis.from_url(redis_config)
@@ -80,7 +80,7 @@ def get(self, key: bytes) -> Union[bytes, None]:
             if response:
                 self.hits += 1
                 return bytes(response)
-        except Exception as err:
+        except Exception as err:  # pragma: no cover
             self._consecutive_failures += 1
             if self._consecutive_failures >= MAXIMUM_CONSECUTIVE_FAILURES:
                 import datetime
@@ -99,7 +99,7 @@ def set(self, key: bytes, value: bytes) -> None:
         try:
             self._server.set(key, value)
             self.sets += 1
-        except Exception as err:
+        except Exception as err:  # pragma: no cover
             # if we fail to set, stop trying
             self._consecutive_failures = MAXIMUM_CONSECUTIVE_FAILURES
             self.errors += 1
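The except branches excluded from coverage in get and set form a small circuit breaker: each read error increments a failure counter until the cache disables itself, while a single write error disables it immediately. Distilled as a sketch (the threshold value here is assumed, not taken from this diff):

    MAXIMUM_CONSECUTIVE_FAILURES = 10  # assumed threshold

    class CacheCircuitBreaker:
        def __init__(self):
            self._consecutive_failures = 0

        @property
        def tripped(self) -> bool:
            return self._consecutive_failures >= MAXIMUM_CONSECUTIVE_FAILURES

        def read_failed(self) -> None:
            # reads degrade gradually, tolerating transient errors
            self._consecutive_failures += 1

        def write_failed(self) -> None:
            # a failed write stops further use of the cache at once
            self._consecutive_failures = MAXIMUM_CONSECUTIVE_FAILURES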
6 changes: 3 additions & 3 deletions opteryx/managers/cache/valkey.py
@@ -28,7 +28,7 @@ def _valkey_server(**kwargs):
 
     try:
         import valkey  # Assuming `valkey` is the client library's name
-    except ImportError as err:
+    except ImportError as err:  # pragma: no cover
         raise MissingDependencyError(err.name) from err
 
     return valkey.from_url(valkey_config)  # Example instantiation of the client
@@ -70,7 +70,7 @@ def get(self, key: bytes) -> Union[bytes, None]:
             if response:
                 self.hits += 1
                 return bytes(response)
-        except Exception as err:
+        except Exception as err:  # pragma: no cover
             self._consecutive_failures += 1
             if self._consecutive_failures >= MAXIMUM_CONSECUTIVE_FAILURES:
                 import datetime
@@ -89,7 +89,7 @@ def set(self, key: bytes, value: bytes) -> None:
         try:
             self._server.set(key, value)  # Adjust based on Valkey's API
             self.sets += 1
-        except Exception as err:
+        except Exception as err:  # pragma: no cover
             # if we fail to set, stop trying
             self._consecutive_failures = MAXIMUM_CONSECUTIVE_FAILURES
             self.errors += 1
2 changes: 1 addition & 1 deletion opteryx/managers/catalog/tarchia_provider.py
@@ -44,7 +44,7 @@ def is_valid_url(url: str) -> bool:
     try:
         result = urlparse(url)
         return all([result.scheme, result.netloc])
-    except ValueError:
+    except ValueError:  # pragma: no cover
         return False
 
 
13 changes: 0 additions & 13 deletions opteryx/models/logical_column.py
@@ -85,16 +85,3 @@ def copy(self):
 
     def __repr__(self) -> str:
         return f"<LogicalColumn name: '{self.current_name}' fullname: '{self.qualified_name}'>"
-
-    def to_dict(self) -> dict:
-        from opteryx.utils import dataclass_to_dict
-
-        return {
-            "class": "LogicalColumn",
-            "node_type": self.node_type.name,
-            "source_column": self.source_column,
-            "source_connector": self.source_connector,
-            "source": self.source,
-            "alias": self.alias,
-            "schema_column": dataclass_to_dict(self.schema_column),
-        }
7 changes: 0 additions & 7 deletions opteryx/operators/base_plan_node.py
@@ -61,13 +61,6 @@ def __init__(self, *, properties, **parameters):
         self.records_out = 0
         self.bytes_out = 0
 
-    def to_json(self) -> bytes:  # pragma: no cover
-        import orjson
-
-        from opteryx.utils import dataclass_to_dict
-
-        return orjson.dumps(dataclass_to_dict(self.do))
-
     @classmethod
     def from_json(cls, json_obj: str) -> "BasePlanNode":  # pragma: no cover
         raise NotImplementedError()
2 changes: 1 addition & 1 deletion opteryx/planner/cost_based_optimizer/__init__.py
@@ -152,7 +152,7 @@ def do_cost_based_optimizer(plan: LogicalPlan, statistics: QueryStatistics) -> L
     Returns:
         LogicalPlan: The optimized logical plan.
     """
-    if DISABLE_OPTIMIZER:
+    if DISABLE_OPTIMIZER:  # pragma: no cover
         message = "[OPTERYX] The optimizer has been disabled, 'DISABLE_OPTIMIZER' variable is TRUE."
         print(message)
         statistics.add_message(message)
@@ -128,7 +128,7 @@ def fold_constants(root: Node, statistics: QueryStatistics) -> Node:
 
     if root.node_type == NodeType.COMPARISON_OPERATOR:
         if (
-            root.value in ("Like", "Ilike")
+            root.value in ("Like", "ILike")
             and root.left.node_type == NodeType.IDENTIFIER
             and root.right.node_type == NodeType.LITERAL
             and root.right.value == "%"
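The one-character fix above matters because the parser presumably emits "ILike", so the misspelled "Ilike" guard never matched and ILIKE '%' predicates escaped folding. Since column LIKE '%' is true for every non-null value, the rule can fold the comparison into a null check; a sketch with a hypothetical constructor for the rewritten node (NodeType and Node as used in the surrounding module):

    def fold_trivial_like(root):
        # `ident LIKE '%'` / `ident ILIKE '%'` match any non-null string,
        # so the whole comparison folds to an IS NOT NULL test
        if (
            root.node_type == NodeType.COMPARISON_OPERATOR
            and root.value in ("Like", "ILike")
            and root.left.node_type == NodeType.IDENTIFIER
            and root.right.node_type == NodeType.LITERAL
            and root.right.value == "%"
        ):
            # hypothetical rewrite; the real rule builds an equivalent node
            return Node(NodeType.UNARY_OPERATOR, value="IsNotNull", centre=root.left)
        return root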
32 changes: 2 additions & 30 deletions opteryx/planner/logical_planner/logical_planner.py
@@ -73,7 +73,7 @@ class LogicalPlanNode(Node):
     def copy(self) -> "Node":
         return LogicalPlanNode(**super().copy().properties)
 
-    def __str__(self):
+    def __str__(self):  # pragma: no cover
         try:
             # fmt:off
             node_type = self.node_type
@@ -502,12 +502,6 @@ def inner_query_planner(ast_branch):
         if previous_step_id is not None:
             inner_plan.add_edge(previous_step_id, step_id)
 
-        if distinct_step.on and _projection[0].source_column == "FROM":
-            cols = ", ".join([format_expression(c) for c in distinct_step.on])
-            raise UnsupportedSyntaxError(
-                f"Did you mean 'SELECT DISTINCT ON ({cols}) {cols} FROM {_projection[0].alias};'?"
-            )
-
     # order
     if _order_by:
         order_step = LogicalPlanNode(node_type=LogicalPlanStepType.Order)
@@ -794,28 +788,6 @@ def create_node_relation(relation):
     return root_node, sub_plan
 
 
-def analyze_query(statement) -> LogicalPlan:
-    root_node = "Analyze"
-    plan = LogicalPlan()
-
-    from_step = LogicalPlanNode(node_type=LogicalPlanStepType.Scan)
-    table = statement[root_node]["table_name"]
-    from_step.relation = ".".join(part["value"] for part in table)
-    from_step.alias = from_step.relation
-    from_step.start_date = table[0].get("start_date")
-    from_step.end_date = table[0].get("end_date")
-    step_id = random_string()
-    plan.add_node(step_id, from_step)
-
-    metadata_step = LogicalPlanNode(node_type=LogicalPlanStepType.MetadataWriter)
-    previous_step_id, step_id = step_id, random_string()
-    plan.add_node(step_id, metadata_step)
-    plan.add_edge(previous_step_id, step_id)
-
-    return plan
-    # write manifest
-
-
 def plan_execute_query(statement) -> LogicalPlan:
     import orjson
 
@@ -1112,7 +1084,7 @@ def plan_show_variables(statement):
 
 
 QUERY_BUILDERS = {
-    "Analyze": analyze_query,
+    # "Analyze": analyze_query,
     "Execute": plan_execute_query,
     "Explain": plan_explain,
     "Query": plan_query,
10 changes: 0 additions & 10 deletions opteryx/planner/physical_planner.py
@@ -95,16 +95,6 @@ def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan:
             raise Exception(f"something unexpected happed - {node_type.name}")
     # fmt: on
 
-    # DEBUG: from opteryx.exceptions import InvalidInternalStateError
-    # DEBUG:
-    # DEBUG: try:
-    # DEBUG:     config = node.to_json()
-    ## DEBUG:     print(config)
-    # DEBUG: except Exception as err:
-    # DEBUG:     message = f"Internal Error - node '{node}' unable to be serialized"
-    # DEBUG:     print(message)
-    ## DEBUG:     raise InvalidInternalStateError(message)
-
     plan.add_node(nid, node)
 
     for source, destination, relation in logical_plan.edges():
2 changes: 0 additions & 2 deletions opteryx/planner/views/__init__.py
@@ -20,8 +20,6 @@ def _load_views():
         with open("views.json", "rb") as defs:
             return orjson.loads(defs.read())
     except Exception as err:  # nosec
-        if not err:
-            pass
         # DEBUG:: log (f"[OPTERYX] Unable to open views definition file. {err}")
         return {}
 
9 changes: 1 addition & 8 deletions opteryx/shared/__init__.py
@@ -14,12 +14,5 @@
 from opteryx.shared.async_memory_pool import AsyncMemoryPool
 from opteryx.shared.buffer_pool import BufferPool
 from opteryx.shared.materialized_datasets import MaterializedDatasets
-from opteryx.shared.rolling_log import RollingLog
 
-__all__ = (
-    "AsyncMemoryPool",
-    "BufferPool",
-    "MaterializedDatasets",
-    "MemoryPool",
-    "RollingLog",
-)
+__all__ = ("AsyncMemoryPool", "BufferPool", "MaterializedDatasets", "MemoryPool")