Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1162 (partial) #1164

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,4 @@ data/**
duckdb
version_2/**
planets.duckdb
charts.csv
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,14 @@ result.head()

_this example is complete and should run as-is_

#### Further Examples

For further examples check out the [Interactive labs](https://github.com/mabel-dev/labs) on [Binder](https://mybinder.org/v2/gh/mabel-dev/labs/HEAD).

## Community

[![Discord](https://img.shields.io/badge/discuss%20on-discord-5865F2.svg?logo=discord)](https://discord.gg/qpv2tr989x)
[![X Follow](https://img.shields.io/badge/follow%20on-x-1DA1F2.svg?logo=X)](https://twitter.com/OpteryxSQL)
[![X Follow](https://img.shields.io/badge/follow%20on-X-1DA1F2.svg?logo=X)](https://twitter.com/OpteryxSQL)

**How do I get Support?**

Expand All @@ -310,7 +314,7 @@ See the project [Security Policy](SECURITY.md) for information about reporting v
[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](https://github.com/mabel-dev/opteryx/blob/master/LICENSE)
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fmabel-dev%2Fopteryx.svg?type=shield)](https://app.fossa.com/projects/git%2Bgithub.com%2Fmabel-dev%2Fopteryx?ref=badge_shield)

Opteryx is licensed under [Apache 2.0](https://github.com/mabel-dev/opteryx/blob/master/LICENSE) except where specific module note otherwise.
Opteryx is licensed under [Apache 2.0](https://github.com/mabel-dev/opteryx/blob/master/LICENSE) except where specific modules note otherwise.

## Status

Expand All @@ -328,4 +332,4 @@ Opteryx is in beta. Beta means different things to different people, to us, bein

- **[orso](https://github.com/mabel-dev/orso)** DataFrame library
- **[mabel](https://github.com/mabel-dev/mabel)** Streaming data APIs
- **[mesos](https://github.com/mabel-dev/mesos)** MySQL connector for Opteryx
- **[mesos](https://github.com/mabel-dev/mesos)** MySQL connector for Opteryx
40 changes: 39 additions & 1 deletion opteryx/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
"""
A command line interface for Opteryx
"""
import os
import sys
import threading
import time

import typer
Expand All @@ -24,10 +27,32 @@
from opteryx.utils.sql import clean_statement
from opteryx.utils.sql import remove_comments

sys.path.insert(1, os.path.join(sys.path[0], ".."))


# Define ANSI color codes
ANSI_RED = "\u001b[31m"
ANSI_RESET = "\u001b[0m"


def print_dots(stop_event):
    """
    Animate a "working" indicator on stdout until *stop_event* is set.

    Prints up to three dots, one every half second, then erases them and
    starts over.  Intended to run in a background thread while a query
    executes; the caller sets the event to stop the animation.

    Parameters:
        stop_event: threading.Event
            Signal used by the caller to stop the animation.
    """
    while not stop_event.is_set():
        # Print up to three dots, pausing between each; bail out as soon
        # as the caller signals completion mid-cycle.
        for _ in range(3):
            print(".", end="", flush=True)
            time.sleep(0.5)
            if stop_event.is_set():
                return
        # Erase all three dots: return to column 0, overwrite with
        # spaces, return again.  (One space would leave two dots behind.)
        print("\r   \r", end="", flush=True)
        time.sleep(0.5)


# fmt:off
def main(
o: str = typer.Option(default="console", help="Output location (ignored by REPL)", ),
Expand Down Expand Up @@ -72,22 +97,35 @@ def main(
print(" .help Show help text")
continue

# Create a stop event
stop_event = threading.Event()
# Create and start a thread to print dots
dot_thread = threading.Thread(target=print_dots, args=(stop_event,))
dot_thread.start()
try:
# Execute the SQL statement and display the results
start = time.monotonic_ns()
result = opteryx.query(statement)
result.materialize()
stop_event.set()
duration = time.monotonic_ns() - start
print("\r \r", end="", flush=True)
print(result.display(limit=-1, display_width=table_width, colorize=color, max_column_width=max_col_width))
if stats:
print(f"[ {result.rowcount} rows x {result.columncount} columns ] ( {duration/1e9} seconds )")
except MissingSqlStatement:
print("\r \r", end="", flush=True)
print(f"{ANSI_RED}Error{ANSI_RESET}: Expected SQL statement or dot command missing.")
print(" Enter '.help' for usage hints")
except Exception as e:
print("\r \r", end="", flush=True)
# Display a friendly error message if an exception occurs
print(f"{ANSI_RED}Error{ANSI_RESET}: {e}")

print(" Enter '.help' for usage hints")
finally:
# Stop the dot thread
stop_event.set()
dot_thread.join()
quit()

# tidy up the statement
Expand Down
58 changes: 0 additions & 58 deletions opteryx/components/ast_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,63 +134,6 @@ def temporal_range_binder(ast, filters):
return ast


def rewrite_in_subquery(ast, path=None):
    """
    Recursively walk a parsed SQL AST and rewrite `x IN (subquery)`
    predicates as INNER JOINs onto the subquery.

    NOTE(review): the join condition hard-codes the subquery-side column
    name "id", and the target location keys assume one specific AST
    shape — confirm both before relying on this rewriter.

    Parameters:
        ast: dict or list
            The AST (or AST fragment) to rewrite in place.
        path: list, optional
            Keys/indexes leading to `ast` from the root; recursion
            bookkeeping only, not otherwise used.
    """
    if path is None:
        path = []

    if isinstance(ast, dict):
        for key, value in ast.items():
            new_path = path + [key]
            if key == "InSubquery":
                # Give the derived (subquery) relation a unique alias so
                # the join condition can reference it unambiguously.
                subquery_alias = random_string()
                subquery = value["subquery"]
                identifier = value["expr"]["Identifier"]

                # JOIN node: the subquery becomes a derived relation
                # joined on `identifier = <alias>.id`.
                join_structure = {
                    "relation": {
                        "Derived": {
                            "lateral": False,
                            "subquery": subquery,
                            "alias": {
                                "name": {"value": subquery_alias, "quote_style": None},
                                "columns": [],
                            },
                        }
                    },
                    "join_operator": {
                        "Inner": {
                            "On": {
                                "BinaryOp": {
                                    "left": {"Identifier": identifier},
                                    "op": "Eq",
                                    "right": {
                                        "CompoundIdentifier": [
                                            {"value": subquery_alias, "quote_style": None},
                                            {"value": "id", "quote_style": None},
                                        ]
                                    },
                                }
                            }
                        }
                    },
                }

                # Navigate to the correct part of the AST where you want to insert the join
                # Modify these keys to match your specific AST structure
                target_location = ast["InSubquery"]["subquery"]["body"]["Select"]["from"][0]

                # Check if the "joins" key exists and append the join_structure
                if "joins" in target_location:
                    target_location["joins"].append(join_structure)
                else:
                    target_location["joins"] = [join_structure]

            # Recurse into every value (not just InSubquery) so nested
            # occurrences are also rewritten.
            rewrite_in_subquery(value, new_path)
    elif isinstance(ast, list):
        # Lists are walked element-by-element, extending the path with
        # each element's index.
        for index, value in enumerate(ast):
            rewrite_in_subquery(value, path + [index])


def do_ast_rewriter(ast: list, temporal_filters: list, paramters: list, connection):
# get the query type
query_type = next(iter(ast))
Expand All @@ -207,6 +150,5 @@ def do_ast_rewriter(ast: list, temporal_filters: list, paramters: list, connecti
# Do some AST rewriting
# first eliminate WHERE x IN (subquery) queries
rewritten_query = with_parameters_exchanged
# rewritten_query = rewrite_in_subquery(with_parameters_exchanged)

return rewritten_query
15 changes: 4 additions & 11 deletions opteryx/components/binder/binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def create_variable_node(node: Node, context: Dict[str, Any]) -> Node:

schemas = context.schemas
found_source_relation = schemas.get(node.source)
if not found_source_relation and "$projection" in schemas:
found_source_relation = schemas["$projection"]
node.source = "$projection"

# Handle fully qualified fields
if node.source:
Expand Down Expand Up @@ -195,25 +198,15 @@ def inner_binder(node: Node, context: Dict[str, Any], step: str) -> Tuple[Node,
    # has appeared earlier in the plan and in that case we don't need to recalculate, we just
# need to treat the result like an IDENTIFIER
column_name = node.query_column or format_expression(node)
for schema_name, schema in context.schemas.items():
for schema in context.schemas.values():
found_column = schema.find_column(column_name)

# If the column exists in the schema, update node and context accordingly.
if found_column:
# Convert to a FLATCOLUMN (an EVALUATED identifier)
node.schema_column = found_column # .to_flatcolumn()
node.query_column = node.alias or column_name
"""
node.node_type = NodeType.EVALUATED
node.left, node.right, node.centre, node.parameters = None, None, None, None


# Remove the column from its original schema as it's already processed.
context.schemas[schema_name].pop_column(found_column.name)

# Add the schema column to a special "$derived" schema for later use.
context.schemas["$derived"].columns.append(node.schema_column)
"""
return node, context

schemas = context.schemas
Expand Down
54 changes: 29 additions & 25 deletions opteryx/components/binder/binder_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,30 +260,27 @@ def visit_aggregate_and_group(

def visit_exit(self, node: Node, context: BindingContext) -> Tuple[Node, BindingContext]:
# LOG: Exit

        def get_name(column):
            # Return the column's display name for the result set.
            # Alias-based naming (longest alias wins) is deliberately
            # disabled below; the canonical name is always used.
            # if column.aliases:
            #     return max(column.aliases, key=len)
            return column.name

columns = []
schemas = context.schemas

for column in node.columns:
if column.node_type == NodeType.WILDCARD:
qualified_wildcard = column.value
for schema in schemas:
if qualified_wildcard and schema not in qualified_wildcard:
continue
if schema == "$derived":
continue
for column in schemas[schema].columns:
column_reference = Node(
node_type=NodeType.IDENTIFIER,
name=column.name,
schema_column=column,
type=column.type,
query_column=f"{schema}.{column.name}",
)
columns.append(column_reference)
else:
bound_column, bound_context = inner_binder(column, context, node.identity)
context.schemas = merge_schemas(bound_context.schemas, context.schemas)
columns.append(bound_column)
# remove the derived schema
context.schemas.pop("$derived", None)

for schema in context.schemas.values():
for column in schema.columns:
column_reference = Node(
node_type=NodeType.IDENTIFIER,
name=column.name,
schema_column=column,
type=column.type,
query_column=get_name(column),
)
columns.append(column_reference)

node.columns = columns

Expand Down Expand Up @@ -419,14 +416,21 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, Bind
)
context.schemas = merge_schemas(*[ctx.schemas for ctx in group_contexts])

columns = []
for column in node.columns:
column.fqn = f"{column.source}.{column.source_column}"
column.schema_column.aliases = [column.fqn, column.source_column]
column.source = "$projection"
column.value = column.alias or column.value
column.alias = None
column.value = (
column.alias or column.value
) # <- this is using the source name not the alias name
# column.alias = None
# create the schema of the resultant dataset
column.source_column = column.query_column
column.schema_column = column.schema_column.to_flatcolumn()
column.schema_column.aliases = []
columns.append(column)

node.columns = columns
# Construct the RelationSchema with new FlatColumn instances
schema = RelationSchema(
name="$projection", columns=[col.schema_column for col in node.columns]
Expand Down
2 changes: 1 addition & 1 deletion opteryx/components/temporary_physical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def create_physical_plan(logical_plan, query_properties):
elif node_type == LogicalPlanStepType.Distinct:
node = operators.DistinctNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Exit:
node = operators.ExitNode(query_properties, projection=logical_node.columns)
node = operators.ExitNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Explain:
node = operators.ExplainNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Filter:
Expand Down
3 changes: 2 additions & 1 deletion opteryx/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"""

from typing import Optional
from typing import Union


# ======================== Begin Codebase Errors ========================
Expand Down Expand Up @@ -197,7 +198,7 @@ def __init__(self, variable: str, suggestion: Optional[str] = None):
class AmbiguousIdentifierError(SqlError):
"""Exception raised for ambiguous identifier references."""

def __init__(self, identifier: str, message: str = None):
def __init__(self, identifier: Optional[str] = None, message: Optional[str] = None):
self.identifier = identifier
if message is None:
message = f"Identifier reference '{identifier}' is ambiguous; Try adding the databaset name as a prefix e.g. 'dataset.{identifier}'."
Expand Down
29 changes: 25 additions & 4 deletions opteryx/models/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,32 @@ def copy(self) -> "Node":
Returns:
Node: The new, independent deep copy.
"""

        def _inner_copy(obj: Any) -> Any:
            """
            Create an independent inner copy of the given object.

            Parameters:
                obj: Any
                    The object to be deep copied.

            Returns:
                Any: The new, independent deep copy.
            """
            # Rebuild standard containers element-by-element so nested
            # values are copied through this same dispatch.
            if isinstance(obj, list):
                return [_inner_copy(item) for item in obj]
            if isinstance(obj, tuple):
                return tuple(_inner_copy(item) for item in obj)
            if isinstance(obj, set):
                return {_inner_copy(item) for item in obj}
            if isinstance(obj, dict):
                return {key: _inner_copy(value) for key, value in obj.items()}
            # Prefer an object's own copy() (e.g. Node) over deepcopy.
            # NOTE(review): anything exposing a `copy` attribute takes this
            # path — assumes such objects implement a deep-enough copy.
            if hasattr(obj, "copy"):
                return obj.copy()
            # Fallback for scalars and anything without copy(): deep copy.
            return copy.deepcopy(obj)

new_node = Node()
for key, value in self._internal.items():
if hasattr(value, "copy"):
new_value = value.copy()
else:
new_value = copy.deepcopy(value)
new_value = _inner_copy(value)
setattr(new_node, key, new_value)
return new_node
8 changes: 4 additions & 4 deletions opteryx/operators/exit_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
class ExitNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self.columns = config["projection"]
self.columns = config.get("columns", [])

@property
def config(self): # pragma: no cover
Expand All @@ -51,15 +51,15 @@ def execute(self) -> Iterable:
final_names = []
for column in self.columns:
final_columns.append(column.schema_column.identity)
final_names.append(column.schema_column.name)
final_names.append(column.query_column)

if len(final_columns) != len(set(final_columns)):
from collections import Counter

duplicates = [column for column, count in Counter(final_columns).items() if count > 1]
matches = (a for a, b in zip(final_names, final_columns) if b in duplicates)
matches = {a for a, b in zip(final_names, final_columns) if b in duplicates}
raise AmbiguousIdentifierError(
message=f"Query result contains multiple instances of the same column - {', '.join(matches)}"
message=f"Query result contains multiple instances of the same column(s) - `{'`, `'.join(matches)}`"
)

for morsel in morsels.execute():
Expand Down
Loading
Loading