Skip to content

Commit

Permalink
ArrowDataFrame and DataFrame test suites (#17)
Browse files Browse the repository at this point in the history
* dask repartition

* update docs

* update

* refactor extensions

* add cotransform

* fix lint

* update

* update

* test spark

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* add spark

* fix 3.8

* fix 3.8

* add spark support

* remove modin

* remove modin

* update

* improve repartition

* improve test coverage

* update file system

* arrow dataframe and test suite

* fix pandas df name

* add IO

* update version
  • Loading branch information
Han Wang authored May 29, 2020
1 parent 9ec44b2 commit fa8bbb2
Show file tree
Hide file tree
Showing 62 changed files with 2,021 additions and 857 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,6 @@ tmp

# dask
dask-worker-space

# spark
spark-warehourse
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
[![PyPI license](https://img.shields.io/pypi/l/fugue.svg)](https://pypi.python.org/pypi/fugue/)
[![PyPI version](https://badge.fury.io/py/fugue.svg)](https://pypi.python.org/pypi/fugue/)
[![Coverage Status](https://coveralls.io/repos/github/fugue-project/fugue/badge.svg)](https://coveralls.io/github/fugue-project/fugue)
[![Doc](https://readthedocs.org/projects/fugue/badge)](https://traid.readthedocs.org)
[![Doc](https://readthedocs.org/projects/fugue/badge)](https://triad.readthedocs.org)

An abstraction layer for distributed computation
14 changes: 11 additions & 3 deletions docs/api/fugue.dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ fugue.dataframe.array\_dataframe
:undoc-members:
:show-inheritance:

fugue.dataframe.arrow\_dataframe
--------------------------------

.. automodule:: fugue.dataframe.arrow_dataframe
:members:
:undoc-members:
:show-inheritance:

fugue.dataframe.dataframe
-------------------------

Expand All @@ -34,10 +42,10 @@ fugue.dataframe.iterable\_dataframe
:undoc-members:
:show-inheritance:

fugue.dataframe.pandas\_dataframes
----------------------------------
fugue.dataframe.pandas\_dataframe
---------------------------------

.. automodule:: fugue.dataframe.pandas_dataframes
.. automodule:: fugue.dataframe.pandas_dataframe
:members:
:undoc-members:
:show-inheritance:
Expand Down
6 changes: 3 additions & 3 deletions docs/api/fugue.execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ fugue.execution.execution\_engine
:undoc-members:
:show-inheritance:

fugue.execution.naive\_execution\_engine
----------------------------------------
fugue.execution.native\_execution\_engine
-----------------------------------------

.. automodule:: fugue.execution.naive_execution_engine
.. automodule:: fugue.execution.native_execution_engine
:members:
:undoc-members:
:show-inheritance:
Expand Down
12 changes: 6 additions & 6 deletions docs/api/fugue.utils.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ fugue.utils
============


fugue.utils.file
----------------
fugue.utils.interfaceless
-------------------------

.. automodule:: fugue.utils.file
.. automodule:: fugue.utils.interfaceless
:members:
:undoc-members:
:show-inheritance:

fugue.utils.interfaceless
-------------------------
fugue.utils.io
--------------

.. automodule:: fugue.utils.interfaceless
.. automodule:: fugue.utils.io
:members:
:undoc-members:
:show-inheritance:
Expand Down
2 changes: 1 addition & 1 deletion fugue/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.1.7"
__version__ = "0.2.0"
6 changes: 3 additions & 3 deletions fugue/dag/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ def handle_broadcast(self, df: DataFrame) -> DataFrame:
return df
return self.execution_engine.broadcast(df)

def pre_partition(self, *args: Any, **kwargs: Any) -> "FugueTask":
self._pre_partition = PartitionSpec(*args, **kwargs)
return self
# def pre_partition(self, *args: Any, **kwargs: Any) -> "FugueTask":
# self._pre_partition = PartitionSpec(*args, **kwargs)
# return self


class Create(FugueTask):
Expand Down
51 changes: 48 additions & 3 deletions fugue/dag/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
SelectColumns,
Show,
Zip,
Load,
Save,
)
from triad.collections import Schema
from triad.utils.assertion import assert_or_throw
Expand Down Expand Up @@ -53,11 +55,20 @@ def process(
params: Any = None,
pre_partition: Any = None,
) -> TDF:
if pre_partition is None:
pre_partition = self._metadata.get("pre_partition", PartitionSpec())
df = self.workflow.process(
self, using=using, schema=schema, params=params, pre_partition=pre_partition
)
return self.to_self_type(df)

def output(self, using: Any, params: Any = None, pre_partition: Any = None) -> None:
if pre_partition is None:
pre_partition = self._metadata.get("pre_partition", PartitionSpec())
self.workflow.output(
self, using=using, params=params, pre_partition=pre_partition
)

def show(
self,
rows: int = 10,
Expand Down Expand Up @@ -167,6 +178,24 @@ def __getitem__(self: TDF, columns: List[Any]) -> TDF:
)
return self.to_self_type(df)

def save(
self,
path: str,
fmt: str = "",
mode: str = "overwrite",
partition: Any = None,
single: bool = False,
**kwargs: Any,
) -> None:
if partition is None:
partition = self._metadata.get("pre_partition", PartitionSpec())
self.workflow.output(
self,
using=Save,
pre_partition=partition,
params=dict(path=path, fmt=fmt, mode=mode, single=single, params=kwargs),
)

@property
def schema(self) -> Schema: # pragma: no cover
raise NotImplementedError("WorkflowDataFrame does not support this method")
Expand Down Expand Up @@ -206,6 +235,12 @@ def as_array_iterable(
) -> Iterable[Any]: # pragma: no cover
raise NotImplementedError("WorkflowDataFrame does not support this method")

def _drop_cols(self: TDF, cols: List[str]) -> DataFrame: # pragma: no cover
raise NotImplementedError("WorkflowDataFrame does not support this method")

def _select_cols(self, keys: List[Any]) -> DataFrame: # pragma: no cover
raise NotImplementedError("WorkflowDataFrame does not support this method")


class FugueWorkflow(object):
def __init__(self, execution_engine: ExecutionEngine):
Expand All @@ -224,6 +259,16 @@ def create(
)
return self.add(task)

def load(
self, path: str, fmt: str = "", columns: Any = None, **kwargs: Any
) -> WorkflowDataFrame:
task = Create(
self.execution_engine,
creator=Load,
params=dict(path=path, fmt=fmt, columns=columns, params=kwargs),
)
return self.add(task)

def process(
self,
*dfs: Any,
Expand Down Expand Up @@ -389,9 +434,9 @@ def __init__(
self.dependency[k] = self._parse_single_dependency(v)

def _parse_single_dependency(self, dep: Any) -> str:
if isinstance(dep, tuple): # (cursor_like_obj, output_name)
cursor = self._parse_cursor(dep[0])
return cursor._task.name + "." + dep[1]
# if isinstance(dep, tuple): # (cursor_like_obj, output_name)
# cursor = self._parse_cursor(dep[0])
# return cursor._task.name + "." + dep[1]
return self._parse_cursor(dep)._task.single_output_expression

def _parse_cursor(self, dep: Any) -> WorkflowDataFrame:
Expand Down
7 changes: 4 additions & 3 deletions fugue/dataframe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# flake8: noqa
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.dataframe import DataFrame, LocalDataFrame, LocalBoundedDataFrame
from fugue.dataframe.arrow_dataframe import ArrowDataFrame
from fugue.dataframe.dataframe import DataFrame, LocalBoundedDataFrame, LocalDataFrame
from fugue.dataframe.dataframes import DataFrames
from fugue.dataframe.iterable_dataframe import IterableDataFrame
from fugue.dataframe.pandas_dataframes import PandasDataFrame
from fugue.dataframe.utils import to_local_df, to_local_bounded_df
from fugue.dataframe.pandas_dataframe import PandasDataFrame
from fugue.dataframe.utils import to_local_bounded_df, to_local_df
55 changes: 30 additions & 25 deletions fugue/dataframe/array_dataframe.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import Any, Dict, Iterable, List, Optional

from fugue.dataframe.dataframe import (
DataFrame,
LocalBoundedDataFrame,
_get_schema_change,
)
from triad.exceptions import InvalidOperationError
from fugue.exceptions import FugueDataFrameInitError, FugueDataFrameOperationError
from triad.utils.assertion import assert_or_throw
from triad.utils.pyarrow import apply_schema

Expand All @@ -13,22 +14,25 @@ class ArrayDataFrame(LocalBoundedDataFrame):
def __init__( # noqa: C901
self, df: Any = None, schema: Any = None, metadata: Any = None
):
if df is None:
super().__init__(schema, metadata)
self._native = []
elif isinstance(df, DataFrame):
if schema is None:
super().__init__(df.schema, metadata)
self._native = df.as_array(type_safe=False)
else:
schema, _ = _get_schema_change(df.schema, schema)
try:
if df is None:
super().__init__(schema, metadata)
self._native = df.as_array(schema.names, type_safe=False)
elif isinstance(df, Iterable):
super().__init__(schema, metadata)
self._native = df if isinstance(df, List) else list(df)
else:
raise ValueError(f"{df} is incompatible with ArrayDataFrame")
self._native = []
elif isinstance(df, DataFrame):
if schema is None:
super().__init__(df.schema, metadata)
self._native = df.as_array(type_safe=False)
else:
schema, _ = _get_schema_change(df.schema, schema)
super().__init__(schema, metadata)
self._native = df.as_array(schema.names, type_safe=False)
elif isinstance(df, Iterable):
super().__init__(schema, metadata)
self._native = df if isinstance(df, List) else list(df)
else:
raise ValueError(f"{df} is incompatible with ArrayDataFrame")
except Exception as e:
raise FugueDataFrameInitError(e)

@property
def native(self) -> List[Any]:
Expand All @@ -39,22 +43,23 @@ def empty(self) -> bool:
return self.count() == 0

def peek_array(self) -> Any:
self.assert_not_empty()
return list(self.native[0])

def count(self) -> int:
return len(self.native)

def drop(self, cols: List[str]) -> DataFrame:
try:
schema = self.schema - cols
except Exception as e:
raise InvalidOperationError(str(e))
if len(schema) == 0:
raise InvalidOperationError("Can't remove all columns of a dataframe")
return ArrayDataFrame(self, schema)
def _drop_cols(self, cols: List[str]) -> DataFrame:
return ArrayDataFrame(self, self.schema - cols)

def _select_cols(self, keys: List[Any]) -> DataFrame:
return ArrayDataFrame(self, self.schema.extract(keys))

def rename(self, columns: Dict[str, str]) -> "DataFrame":
schema = self.schema.rename(columns)
try:
schema = self.schema.rename(columns)
except Exception as e:
raise FugueDataFrameOperationError(e)
return ArrayDataFrame(self.native, schema)

def as_array(
Expand Down
Loading

0 comments on commit fa8bbb2

Please sign in to comment.