Skip to content

Commit

Permalink
Improve IbisExecutionEngine, refactor ExecutionEngine, fix bugs (#416)
Browse files Browse the repository at this point in the history
* Improve IbisExecutionEngine

* update

* lint

* Update ray configs

* update

* Add is_distributed

* make ibis handle timestamp

* refactor execution engine

* update import

* add plot extension

* fix tests

* fix tests

* update dependency

* make fsql dialect configurable

* add dialect

* fix tests, update dataframe interface

* use more df.columns instead of df.schema

* ibis handle unsupported types

* update

* update

* yield table

* update

* update

* fix seed for fsql

* refactor ibis engine

* update
  • Loading branch information
goodwanghan authored Feb 1, 2023
1 parent 33548b6 commit 8c04e9f
Show file tree
Hide file tree
Showing 57 changed files with 1,626 additions and 523 deletions.
4 changes: 2 additions & 2 deletions fugue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from fugue.bag.bag import Bag, BagDisplay
from fugue.collections.partition import PartitionCursor, PartitionSpec
from fugue.collections.sql import StructuredRawSQL, TempTableName
from fugue.collections.yielded import Yielded, YieldedFile
from fugue.collections.yielded import Yielded, PhysicalYielded
from fugue.constants import register_global_conf
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.arrow_dataframe import ArrowDataFrame
Expand All @@ -33,7 +33,7 @@
from fugue.execution.execution_engine import (
AnyExecutionEngine,
ExecutionEngine,
ExecutionEngineFacet,
EngineFacet,
MapEngine,
SQLEngine,
)
Expand Down
2 changes: 1 addition & 1 deletion fugue/collections/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
PartitionCursor,
PartitionSpec,
)
from fugue.collections.yielded import Yielded, YieldedFile
from fugue.collections.yielded import Yielded, PhysicalYielded
25 changes: 23 additions & 2 deletions fugue/collections/sql.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from logging import Logger
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union
from uuid import uuid4

from triad import to_uuid

from fugue._utils.registry import fugue_plugin
import sqlglot

_TEMP_TABLE_EXPR_PREFIX = "<tmpdf:"
_TEMP_TABLE_EXPR_SUFFIX = ">"
Expand Down Expand Up @@ -31,7 +33,14 @@ def transpile_sql(
:param to_dialect: the expected dialect.
:return: the transpiled SQL
"""
return raw # pragma: no cover
if (
from_dialect is not None
and to_dialect is not None
and from_dialect != to_dialect
):
return " ".join(sqlglot.transpile(raw, read=from_dialect, write=to_dialect))
else:
return raw


class StructuredRawSQL:
Expand Down Expand Up @@ -66,13 +75,15 @@ def construct(
self,
name_map: Union[None, Callable[[str], str], Dict[str, str]] = None,
dialect: Optional[str] = None,
log: Optional[Logger] = None,
):
"""Construct the final SQL given the ``dialect``
:param name_map: the name map from the original statement to
the expected names, defaults to None. It can be a function or a
dictionary
:param dialect: the expected dialect, defaults to None
:param log: the logger to log information, defaults to None
:return: the final SQL string
"""
nm: Any = (
Expand All @@ -88,7 +99,17 @@ def construct(
and dialect is not None
and self._dialect != dialect
):
return transpile_sql(raw_sql, self._dialect, dialect)
tsql = transpile_sql(raw_sql, self._dialect, dialect)
if log is not None:
log.debug(
"SQL transpiled from %s to %s\n\n"
"Original:\n\n%s\n\nTranspiled:\n\n%s\n",
self._dialect,
dialect,
raw_sql,
tsql,
)
return tsql
return raw_sql

@staticmethod
Expand Down
40 changes: 26 additions & 14 deletions fugue/collections/yielded.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from triad.utils.assertion import assert_or_throw
from triad.utils.hash import to_uuid
from typing import Any

from triad import assert_or_throw
from triad.utils.hash import to_uuid


class Yielded(object):
"""Yields from :class:`~fugue.workflow.workflow.FugueWorkflow`.
Expand Down Expand Up @@ -33,30 +34,41 @@ def __deepcopy__(self, memo: Any) -> Any: # pragma: no cover
return self


class YieldedFile(Yielded):
"""Yielded file from :class:`~fugue.workflow.workflow.FugueWorkflow`.
class PhysicalYielded(Yielded):
"""Physical yielded object from :class:`~fugue.workflow.workflow.FugueWorkflow`.
Users shouldn't create this object directly.
:param yid: unique id for determinism
:param storage_type: ``file`` or ``table``
"""

def __init__(self, yid: str):
def __init__(self, yid: str, storage_type: str):
super().__init__(yid)
self._path = ""
self._name = ""
assert_or_throw(
storage_type in ["file", "table"],
ValueError(f"{storage_type} not in (file, table) "),
)
self._storage_type = storage_type

@property
def is_set(self) -> bool:
return self._path != ""
return self._name != ""

def set_value(self, path: str) -> None:
"""Set the yielded path after compute
def set_value(self, name: str) -> None:
"""Set the storage name after compute
:param path: file path
:param name: name reference of the storage
"""
self._path = path
self._name = name

@property
def path(self) -> str:
"""File path of the yield"""
def name(self) -> str:
"""The name reference of the yield"""
assert_or_throw(self.is_set, "value is not set")
return self._path
return self._name

@property
def storage_type(self) -> str:
"""The storage type of this yield"""
return self._storage_type
5 changes: 4 additions & 1 deletion fugue/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
KEYWORD_ROWCOUNT = "ROWCOUNT"
KEYWORD_CORECOUNT = "CORECOUNT"

FUGUE_SQL_DIALECT = "fugue"
FUGUE_SQL_DEFAULT_DIALECT = "spark"

FUGUE_CONF_WORKFLOW_CONCURRENCY = "fugue.workflow.concurrency"
FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH = "fugue.workflow.checkpoint.path"
Expand All @@ -15,6 +15,7 @@
FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT = "fugue.workflow.exception.inject"
FUGUE_CONF_WORKFLOW_EXCEPTION_OPTIMIZE = "fugue.workflow.exception.optimize"
FUGUE_CONF_SQL_IGNORE_CASE = "fugue.sql.compile.ignore_case"
FUGUE_CONF_SQL_DIALECT = "fugue.sql.compile.dialect"
FUGUE_CONF_DEFAULT_PARTITIONS = "fugue.default.partitions"

FUGUE_COMPILE_TIME_CONFIGS = set(
Expand All @@ -25,6 +26,7 @@
FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT,
FUGUE_CONF_WORKFLOW_EXCEPTION_OPTIMIZE,
FUGUE_CONF_SQL_IGNORE_CASE,
FUGUE_CONF_SQL_DIALECT,
]
)

Expand All @@ -38,6 +40,7 @@
FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 3,
FUGUE_CONF_WORKFLOW_EXCEPTION_OPTIMIZE: True,
FUGUE_CONF_SQL_IGNORE_CASE: False,
FUGUE_CONF_SQL_DIALECT: FUGUE_SQL_DEFAULT_DIALECT,
FUGUE_CONF_DEFAULT_PARTITIONS: -1,
}
)
Expand Down
6 changes: 4 additions & 2 deletions fugue/dataframe/arrow_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ def _select_cols(self, keys: List[Any]) -> DataFrame:

def rename(self, columns: Dict[str, str]) -> DataFrame:
try:
new_cols = self.schema.rename(columns).names
cols = dict(columns)
new_cols = [cols.pop(c, c) for c in self.columns]
assert_or_throw(len(cols) == 0)
except Exception as e:
raise FugueDataFrameOperationError from e
return ArrowDataFrame(self.native.rename_columns(new_cols))
Expand Down Expand Up @@ -236,7 +238,7 @@ def as_array_iterable(
yield x
else:
d = self.native.to_pydict()
cols = [d[n] for n in self.schema.names]
cols = [d[n] for n in self.columns]
for arr in zip(*cols):
yield list(arr)

Expand Down
18 changes: 14 additions & 4 deletions fugue/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, schema: Any = None):
@property
def schema(self) -> Schema:
"""The schema of the dataframe"""
if self._schema_discovered:
if self.schema_discovered:
# we must keep it simple because it could be called on every row by a user
assert isinstance(self._schema, Schema)
return self._schema # type: ignore
Expand All @@ -65,6 +65,16 @@ def schema(self) -> Schema:
self._schema_discovered = True
return self._schema

@property
def schema_discovered(self) -> Schema:
"""Whether the schema has been discovered or still a lambda"""
return self._schema_discovered

@property
def columns(self) -> List[str]:
"""The column names of the dataframe"""
return self.schema.names

@abstractmethod
def native_as_df(self) -> AnyDataFrame: # pragma: no cover
"""The dataframe form of the native object this Dataset class wraps.
Expand Down Expand Up @@ -94,11 +104,11 @@ def peek_dict(self) -> Dict[str, Any]:
:raises FugueDatasetEmptyError: if it is empty
"""
arr = self.peek_array()
return {self.schema.names[i]: arr[i] for i in range(len(self.schema))}
return {self.columns[i]: arr[i] for i in range(len(self.columns))}

def as_pandas(self) -> pd.DataFrame:
"""Convert to pandas DataFrame"""
pdf = pd.DataFrame(self.as_array(), columns=self.schema.names)
pdf = pd.DataFrame(self.as_array(), columns=self.columns)
return PD_UTILS.enforce_type(pdf, self.schema.pa_schema, null_safe=True)

def as_arrow(self, type_safe: bool = False) -> pa.Table:
Expand Down Expand Up @@ -232,7 +242,7 @@ def as_dict_iterable(
The default implementation enforces ``type_safe`` True
"""
if columns is None:
columns = self.schema.names
columns = self.columns
idx = range(len(columns))
for x in self.as_array_iterable(columns, type_safe=True):
yield {columns[i]: x[i] for i in idx}
Expand Down
9 changes: 5 additions & 4 deletions fugue/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def _df_eq(
d1 = df1.as_pandas()
d2 = df2.as_pandas()
if not check_order:
d1 = d1.sort_values(df1.schema.names)
d2 = d2.sort_values(df1.schema.names)
d1 = d1.sort_values(df1.columns)
d2 = d2.sort_values(df1.columns)
d1 = d1.reset_index(drop=True)
d2 = d2.reset_index(drop=True)
pd.testing.assert_frame_equal(
Expand Down Expand Up @@ -332,11 +332,12 @@ def get_join_schemas(
on = list(on) if on is not None else []
aot(len(on) == len(set(on)), f"{on} has duplication")
if how != "cross" and len(on) == 0:
on = list(df1.schema.intersect(df2.schema.names).names)
other = set(df2.columns)
on = [c for c in df1.columns if c in other]
aot(
len(on) > 0,
lambda: SchemaError(
f"no common columns between {df1.schema} and {df2.schema}"
f"no common columns between {df1.columns} and {df2.columns}"
),
)
schema2 = df2.schema
Expand Down
Loading

0 comments on commit 8c04e9f

Please sign in to comment.