Skip to content

Commit

Permalink
Update Ibis Engine and add DataFrame column names normalizers (#366)
Browse files Browse the repository at this point in the history
* update dependencies

* update

* fix dependencies

* update

* fix compatibility

* update

* sql syntax update

* Update Ibis Engine

* update

* Add dataframe column names normalizers
  • Loading branch information
goodwanghan authored Oct 2, 2022
1 parent f7c5e63 commit c7bc70f
Show file tree
Hide file tree
Showing 15 changed files with 386 additions and 29 deletions.
16 changes: 16 additions & 0 deletions docs/api_ibis/fugue_ibis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ fugue\_ibis
.. |FugueDataTypes| replace:: :doc:`Fugue Data Types <tutorial:tutorials/appendix/generate_types>`


fugue\_ibis.dataframe
---------------------

.. automodule:: fugue_ibis.dataframe
:members:
:undoc-members:
:show-inheritance:

fugue\_ibis.execution\_engine
-----------------------------

.. automodule:: fugue_ibis.execution_engine
:members:
:undoc-members:
:show-inheritance:

fugue\_ibis.extensions
----------------------

Expand Down
4 changes: 2 additions & 2 deletions docs/api_ray/fugue_ray.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ fugue\_ray
.. |FugueDataTypes| replace:: :doc:`Fugue Data Types <tutorial:tutorials/appendix/generate_types>`


fugue\_ray.dateframe
fugue\_ray.dataframe
--------------------

.. automodule:: fugue_ray.dateframe
.. automodule:: fugue_ray.dataframe
:members:
:undoc-members:
:show-inheritance:
Expand Down
5 changes: 3 additions & 2 deletions fugue/_utils/registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Callable
from triad import conditional_dispatcher
from triad.utils.dispatcher import ConditionalDispatcher

_FUGUE_ENTRYPOINT = "fugue.plugins"


def fugue_plugin(func: Callable) -> Callable:
return conditional_dispatcher(entry_point=_FUGUE_ENTRYPOINT)(func)
def fugue_plugin(func: Callable) -> ConditionalDispatcher:
    """Turn ``func`` into a conditional dispatcher registered under the
    ``fugue.plugins`` entry point, so implementations for new types can
    be attached later via ``.candidate``.
    """
    dispatcher = conditional_dispatcher(entry_point=_FUGUE_ENTRYPOINT)
    return dispatcher(func)  # type: ignore
22 changes: 14 additions & 8 deletions fugue/dataframe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
# flake8: noqa
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.arrow_dataframe import ArrowDataFrame
from fugue.dataframe.dataframe import (
from .array_dataframe import ArrayDataFrame
from .arrow_dataframe import ArrowDataFrame
from .dataframe import (
DataFrame,
LocalBoundedDataFrame,
LocalDataFrame,
YieldedDataFrame,
)
from fugue.dataframe.dataframe_iterable_dataframe import LocalDataFrameIterableDataFrame
from fugue.dataframe.dataframes import DataFrames
from fugue.dataframe.iterable_dataframe import IterableDataFrame
from fugue.dataframe.pandas_dataframe import PandasDataFrame
from fugue.dataframe.utils import to_local_bounded_df, to_local_df
from .dataframe_iterable_dataframe import LocalDataFrameIterableDataFrame
from .dataframes import DataFrames
from .iterable_dataframe import IterableDataFrame
from .pandas_dataframe import PandasDataFrame
from .utils import (
get_dataframe_column_names,
normalize_dataframe_column_names,
rename_dataframe_column_names,
to_local_bounded_df,
to_local_df,
)
133 changes: 132 additions & 1 deletion fugue/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import os
import pickle
from typing import Any, Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple

import pandas as pd
import pyarrow as pa
Expand All @@ -17,6 +17,137 @@
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_arg_not_none
from triad.utils.assertion import assert_or_throw as aot
from triad.utils.rename import normalize_names

from .._utils.registry import fugue_plugin


@fugue_plugin
def get_dataframe_column_names(df: Any) -> List[Any]:  # pragma: no cover
    """A generic function to get column names of any dataframe

    :param df: the dataframe object
    :return: the column names

    .. note::

        In order to support a new type of dataframe, an implementation must
        be registered, for example

        .. code-block:: python

            @get_dataframe_column_names.candidate(lambda df: isinstance(df, pa.Table))
            def _get_pyarrow_dataframe_columns(df: pa.Table) -> List[Any]:
                return [f.name for f in df.schema]
    """
    raise NotImplementedError(f"{type(df)} is not supported")


@fugue_plugin
def rename_dataframe_column_names(df: Any, names: Dict[str, Any]) -> Any:
    """A generic function to rename column names of any dataframe

    :param df: the dataframe object
    :param names: the rename operations as a dict: ``old name => new name``
    :return: the renamed dataframe

    .. note::

        In order to support a new type of dataframe, an implementation must
        be registered, for example

        .. code-block:: python

            @rename_dataframe_column_names.candidate(
                lambda df, *args, **kwargs: isinstance(df, pd.DataFrame)
            )
            def _rename_pandas_dataframe(
                df: pd.DataFrame, names: Dict[str, Any]
            ) -> pd.DataFrame:
                if len(names) == 0:
                    return df
                return df.rename(columns=names)
    """
    # An empty rename map is a universal no-op, so the base implementation
    # handles it without requiring a registered candidate.
    if len(names) == 0:
        return df
    else:  # pragma: no cover
        raise NotImplementedError(f"{type(df)} is not supported")


def normalize_dataframe_column_names(df: Any) -> Tuple[Any, Dict[str, Any]]:
    """A generic function to normalize any dataframe's column names to follow
    Fugue naming rules

    .. note::

        This is a temporary solution before
        :class:`~triad:triad.collections.schema.Schema`
        can take arbitrary names

    .. admonition:: Examples

        * ``[0,1]`` => ``{"_0":0, "_1":1}``
        * ``["1a","2b"]`` => ``{"_1a":"1a", "_2b":"2b"}``
        * ``["*a","-a"]`` => ``{"_a":"*a", "_a_1":"-a"}``

    :param df: a dataframe object
    :return: the renamed dataframe and the rename operations as a dict that
      can **undo** the change

    .. seealso::

        * :func:`~.get_dataframe_column_names`
        * :func:`~.rename_dataframe_column_names`
        * :func:`~triad:triad.utils.rename.normalize_names`
    """
    # ``normalize_names`` returns only the columns that actually change.
    renames = normalize_names(get_dataframe_column_names(df))
    if not renames:
        return df, {}
    # Invert old=>new into new=>old so the caller can restore original names.
    undo = {new: old for old, new in renames.items()}
    return rename_dataframe_column_names(df, renames), undo


@get_dataframe_column_names.candidate(lambda df: isinstance(df, pd.DataFrame))
def _get_pandas_dataframe_columns(df: pd.DataFrame) -> List[Any]:
    """Return the column names of a pandas DataFrame as a plain list."""
    return [col for col in df.columns]


@rename_dataframe_column_names.candidate(
    lambda df, *args, **kwargs: isinstance(df, pd.DataFrame)
)
def _rename_pandas_dataframe(df: pd.DataFrame, names: Dict[str, Any]) -> pd.DataFrame:
    """Rename pandas DataFrame columns; returns ``df`` unchanged on an
    empty mapping."""
    if not names:
        return df
    return df.rename(columns=names)


@get_dataframe_column_names.candidate(lambda df: isinstance(df, pa.Table))
def _get_pyarrow_dataframe_columns(df: pa.Table) -> List[Any]:
    """Return the column names of a pyarrow Table, in schema order."""
    return [field.name for field in df.schema]


@rename_dataframe_column_names.candidate(
    lambda df, *args, **kwargs: isinstance(df, pa.Table)
)
def _rename_pyarrow_dataframe(df: pa.Table, names: Dict[str, Any]) -> pa.Table:
    """Rename pyarrow Table columns; returns ``df`` unchanged on an
    empty mapping."""
    if not names:
        return df
    # rename_columns needs the full new name list, in schema order.
    new_names = [names.get(field.name, field.name) for field in df.schema]
    return df.rename_columns(new_names)


@get_dataframe_column_names.candidate(lambda df: isinstance(df, DataFrame))
def _get_fugue_dataframe_columns(df: "DataFrame") -> List[Any]:
    # Fugue DataFrames expose their column names through the triad Schema.
    return df.schema.names


@rename_dataframe_column_names.candidate(
    lambda df, *args, **kwargs: isinstance(df, DataFrame)
)
def _rename_fugue_dataframe(df: "DataFrame", names: Dict[str, Any]) -> "DataFrame":
    """Rename a Fugue DataFrame's columns; returns ``df`` unchanged on an
    empty mapping."""
    if not names:
        return df
    return df.rename(columns=names)


def _pa_type_eq(t1: pa.DataType, t2: pa.DataType) -> bool:
Expand Down
18 changes: 18 additions & 0 deletions fugue_dask/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import pyarrow as pa
from fugue.dataframe import ArrowDataFrame, DataFrame, LocalDataFrame, PandasDataFrame
from fugue.dataframe.dataframe import _input_schema
from fugue.dataframe.utils import (
get_dataframe_column_names,
rename_dataframe_column_names,
)
from fugue.exceptions import FugueDataFrameOperationError
from triad.collections.schema import Schema
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
Expand All @@ -17,6 +21,20 @@
from fugue_dask._utils import DASK_UTILS


@get_dataframe_column_names.candidate(lambda df: isinstance(df, pd.DataFrame))
def _get_dask_dataframe_columns(df: pd.DataFrame) -> List[Any]:
    # NOTE(review): the function name suggests a Dask DataFrame, so ``pd``
    # presumably aliases dask.dataframe in this module — the import block is
    # not visible here; confirm, otherwise this registration would shadow the
    # pandas candidate.
    return list(df.columns)


@rename_dataframe_column_names.candidate(
    lambda df, *args, **kwargs: isinstance(df, pd.DataFrame)
)
def _rename_dask_dataframe(df: pd.DataFrame, names: Dict[str, Any]) -> pd.DataFrame:
    # Rename columns via the mapping ``old name => new name``; an empty
    # mapping is a no-op. NOTE(review): ``pd`` here presumably aliases
    # dask.dataframe (imports not visible in this view) — confirm.
    if len(names) == 0:
        return df
    return df.rename(columns=names)


class DaskDataFrame(DataFrame):
"""DataFrame that wraps Dask DataFrame. Please also read
|DataFrameTutorial| to understand this Fugue concept
Expand Down
4 changes: 2 additions & 2 deletions fugue_ibis/_compat.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# flake8: noqa
# pylint: disable-all

try:
try: # pragma: no cover
from ibis.expr.types import Table as IbisTable
except Exception:
except Exception: # pragma: no cover
from ibis.expr.types import TableExpr as IbisTable
30 changes: 22 additions & 8 deletions fugue_ibis/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from triad.utils.assertion import assert_or_throw

from .dataframe import IbisDataFrame
from ._compat import IbisTable
import itertools

_JOIN_RIGHT_SUFFIX = "_ibis_y__"
Expand All @@ -40,10 +41,9 @@ def __init__(self, execution_engine: ExecutionEngine) -> None:
self._ibis_engine: IbisExecutionEngine = execution_engine # type: ignore

def select(self, dfs: DataFrames, statement: str) -> DataFrame:
for k, v in dfs.items():
self._ibis_engine._to_ibis_dataframe(v).native.alias(k)
tb = self._ibis_engine.backend.sql(statement)
return self._ibis_engine._to_ibis_dataframe(tb)
return self._ibis_engine._to_ibis_dataframe(
self._ibis_engine._raw_select(statement, dfs)
)


class IbisExecutionEngine(ExecutionEngine):
Expand Down Expand Up @@ -72,6 +72,9 @@ def _to_ibis_dataframe(
) -> IbisDataFrame: # pragma: no cover
raise NotImplementedError

def _compile_sql(self, df: IbisDataFrame) -> str:
    """Compile the Ibis expression wrapped by ``df`` into its SQL string."""
    compiled = df.native.compile()
    return str(compiled)

def to_df(self, df: Any, schema: Any = None, metadata: Any = None) -> DataFrame:
return self._to_ibis_dataframe(df, schema=schema, metadata=metadata)

Expand Down Expand Up @@ -249,7 +252,7 @@ def take(
_presort = parse_presort_exp(presort)
else:
_presort = partition_spec.presort
tbn = self.get_temp_table_name()
tbn = "_temp"
idf = self._to_ibis_dataframe(df)

if len(_presort) == 0:
Expand All @@ -264,7 +267,7 @@ def take(
f"AS __fugue_take_param FROM {tbn}"
f") WHERE __fugue_take_param<={n}"
)
tb = idf.native.alias(tbn).sql(sql)
tb = self._raw_select(sql, {tbn: idf})
return self._to_ibis_dataframe(tb[df.schema.names], metadata=metadata)

sorts: List[str] = []
Expand All @@ -277,7 +280,7 @@ def take(

if len(partition_spec.partition_by) == 0:
sql = f"SELECT * FROM {tbn} {sort_expr} LIMIT {n}"
tb = idf.native.alias(tbn).sql(sql)
tb = self._raw_select(sql, {tbn: idf})
return self._to_ibis_dataframe(tb[df.schema.names], metadata=metadata)

pcols = ", ".join(
Expand All @@ -289,5 +292,16 @@ def take(
f"AS __fugue_take_param FROM {tbn}"
f") WHERE __fugue_take_param<={n}"
)
tb = idf.native.alias(tbn).sql(sql)
tb = self._raw_select(sql, {tbn: idf})
return self._to_ibis_dataframe(tb[df.schema.names], metadata=metadata)

def _raw_select(self, statement: str, dfs: Dict[str, Any]) -> IbisTable:
    """Run a SQL ``statement`` against ``dfs`` by inlining each dataframe
    as a named CTE (``WITH name AS (...)``) ahead of the statement, then
    executing the combined SQL on the Ibis backend.

    :param statement: the SQL statement referencing ``dfs`` keys as tables
    :param dfs: table name => dataframe-like object
    :return: the resulting Ibis table expression
    """
    ctes = [
        name + " AS (" + self._compile_sql(self._to_ibis_dataframe(obj)) + ")"
        for name, obj in dfs.items()
    ]
    if ctes:
        sql = "WITH " + ",\n".join(ctes) + "\n" + statement
    else:
        sql = statement
    return self.backend.sql(sql)
28 changes: 26 additions & 2 deletions fugue_ray/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,35 @@
import ray.data as rd
from fugue.dataframe import ArrowDataFrame, DataFrame, LocalDataFrame, PandasDataFrame
from fugue.dataframe.dataframe import _input_schema
from fugue.dataframe.utils import (
get_dataframe_column_names,
rename_dataframe_column_names,
)
from fugue.exceptions import FugueDataFrameEmptyError, FugueDataFrameOperationError
from triad import assert_or_throw
from triad.collections.schema import Schema

from ._utils.dataframe import build_empty, get_dataset_format, _build_empty_arrow
from triad import assert_or_throw
from ._utils.dataframe import _build_empty_arrow, build_empty, get_dataset_format


@get_dataframe_column_names.candidate(lambda df: isinstance(df, rd.Dataset))
def _get_ray_dataframe_columns(df: rd.Dataset) -> List[Any]:
    """Return the column names of a Ray Dataset, for pandas- or
    arrow-backed datasets."""
    fmt = get_dataset_format(df)
    if fmt == "pandas":
        return list(df.schema(True).names)
    if fmt == "arrow":
        return [field.name for field in df.schema(True)]
    raise NotImplementedError(f"{fmt} is not supported")  # pragma: no cover


@rename_dataframe_column_names.candidate(
    lambda df, *args, **kwargs: isinstance(df, rd.Dataset)
)
def _rename_ray_dataframe(df: rd.Dataset, names: Dict[str, Any]) -> rd.Dataset:
    """Rename a Ray Dataset's columns; returns ``df`` unchanged on an
    empty mapping."""
    if not names:
        return df
    # rename_columns needs the full new name list, in current column order.
    renamed = [names.get(col, col) for col in _get_ray_dataframe_columns(df)]
    return df.map_batches(lambda b: b.rename_columns(renamed), batch_format="pyarrow")


class RayDataFrame(DataFrame):
Expand Down
Loading

0 comments on commit c7bc70f

Please sign in to comment.