Add dask and modin
* fix select bug, add col ops

* add modin

* add modin

* fix tests

* fix tests

* add dask

* fix tests

* update tests

* fix lint
Han Wang authored May 11, 2020
1 parent ec02cb3 commit f364c36
Showing 31 changed files with 1,390 additions and 53 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -133,3 +133,6 @@ tmp

# Antlr
.antlr

# dask
dask-worker-space
8 changes: 8 additions & 0 deletions fugue/collections/partition.py
@@ -59,6 +59,14 @@ def empty(self) -> bool:
def num_partitions(self) -> str:
return self._num_partitions

def get_num_partitions(self, **expr_map_funcs: Any) -> int:
expr = self.num_partitions
for k, v in expr_map_funcs.items():
if k in expr:
value = str(v())
expr = expr.replace(k, value)
return int(eval(expr))

@property
def algo(self) -> str:
return self._algo
2 changes: 2 additions & 0 deletions fugue/constants.py
@@ -0,0 +1,2 @@
KEYWORD_ROWCOUNT = "ROWCOUNT"
KEYWORD_CORECOUNT = "CORECOUNT"
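
These two keywords are presumably the tokens that get_num_partitions (added to partition.py above) substitutes into the num_partitions expression before evaluating it. A minimal standalone sketch of that substitution, not part of this commit; the expression and the lambdas are made-up values for illustration:

# Hypothetical illustration of get_num_partitions' logic: every keyword found
# in the expression is replaced by the value returned from its callable, then
# the resulting arithmetic expression is evaluated.
def resolve_partitions(expr: str, **expr_map_funcs) -> int:
    for k, v in expr_map_funcs.items():
        if k in expr:
            expr = expr.replace(k, str(v()))
    return int(eval(expr))

resolve_partitions("ROWCOUNT / 4", ROWCOUNT=lambda: 1000, CORECOUNT=lambda: 8)  # -> 250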
11 changes: 2 additions & 9 deletions fugue/dag/tasks.py
@@ -1,6 +1,6 @@
import copy
from abc import ABC, abstractmethod
from typing import Any, Dict, no_type_check, Optional, List
from typing import Any, no_type_check, Optional, List

from adagio.instances import TaskContext
from adagio.specs import InputSpec, OutputSpec, TaskSpec
@@ -204,11 +204,4 @@ def __init__(

@no_type_check
def execute(self, ctx: TaskContext) -> None:
dfs: Dict[str, DataFrame] = {}
for k, df in ctx.inputs.items():
if not self._outputter.pre_partition.empty:
df = self.execution_engine.repartition(
df, self._outputter.pre_partition
)
dfs[k] = df
self._outputter.process(DataFrames(dfs))
self._outputter.process(DataFrames(ctx.inputs))
10 changes: 7 additions & 3 deletions fugue/dag/workflow.py
@@ -157,6 +157,10 @@ def is_bounded(self) -> bool: # pragma: no cover
def empty(self) -> bool: # pragma: no cover
raise NotImplementedError(f"WorkflowDataFrame does not support this method")

@property
def num_partitions(self) -> int: # pragma: no cover
raise NotImplementedError(f"WorkflowDataFrame does not support this method")

def peek_array(self) -> Any: # pragma: no cover
raise NotImplementedError(f"WorkflowDataFrame does not support this method")

@@ -284,11 +288,11 @@ def assert_eq(self, *dfs: Any, **params: Any) -> None:
self.output(*dfs, using=AssertEqual, params=params)

def add(self, task: FugueTask, *args: Any, **kwargs: Any) -> WorkflowDataFrame:
task = task.copy()
assert_or_throw(task._node_spec is None, f"can't reuse {task}")
dep = _Dependencies(self, task, {}, *args, **kwargs)
name = "_" + str(len(self._spec.tasks))
self._spec.add_task(name, task, dep.dependency)
return WorkflowDataFrame(self, task)
wt = self._spec.add_task(name, task, dep.dependency)
return WorkflowDataFrame(self, wt)

def _to_dfs(self, *args: Any, **kwargs: Any) -> DataFrames:
return DataFrames(*args, **kwargs).convert(self.create_data)
15 changes: 10 additions & 5 deletions fugue/dataframe/dataframe.py
@@ -64,9 +64,10 @@ def is_bounded(self) -> bool: # pragma: no cover
# def apply_schema(self, schema: Any) -> None: # pragma: no cover
# raise NotImplementedError

# @abstractmethod
# def num_partitions(self) -> int: # pragma: no cover
# raise NotImplementedError
@property
@abstractmethod
def num_partitions(self) -> int: # pragma: no cover
raise NotImplementedError

@property
@abstractmethod
@@ -185,19 +186,23 @@ def __init__(self, schema: Any = None, metadata: Any = None):
super().__init__(schema=schema, metadata=metadata)

@property
def is_local(self):
def is_local(self) -> bool:
return True

def as_local(self) -> "LocalDataFrame":
return self

@property
def num_partitions(self) -> int: # pragma: no cover
return 1


class LocalBoundedDataFrame(LocalDataFrame):
def __init__(self, schema: Any = None, metadata: Any = None):
super().__init__(schema=schema, metadata=metadata)

@property
def is_bounded(self):
def is_bounded(self) -> bool:
return True


5 changes: 2 additions & 3 deletions fugue/dataframe/pandas_dataframes.py
@@ -20,9 +20,8 @@ def __init__( # noqa: C901
):
if df is None:
schema = _input_schema(schema).assert_not_empty()
pdf = pd.DataFrame([], columns=schema.names)
pdf = pdf.astype(dtype=schema.pd_dtype)
elif isinstance(df, PandasDataFrame):
df = []
if isinstance(df, PandasDataFrame):
# TODO: This is useless if in this way and wrong
pdf = df.native
schema = None
2 changes: 1 addition & 1 deletion fugue/execution/__init__.py
@@ -1,3 +1,3 @@
# flake8: noqa
from fugue.execution.execution_engine import ExecutionEngine, SQLEngine
from fugue.execution.naive_execution_engine import NaiveExecutionEngine
from fugue.execution.naive_execution_engine import NaiveExecutionEngine, SqliteEngine
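
For orientation, a small usage sketch of the newly exported SqliteEngine, not taken from this commit; the sample data and the assumption that each key in DataFrames is addressable as a table name in the SQL statement are illustrative:

# Sketch only: assumes fugue.dataframe exports DataFrames and that keys of
# DataFrames map to SQLite table names in the statement.
import pandas as pd
from fugue.dataframe import DataFrames, PandasDataFrame
from fugue.execution import NaiveExecutionEngine, SqliteEngine

engine = NaiveExecutionEngine()
df = PandasDataFrame(pd.DataFrame({"x": [0, 1, 2]}), "x:long")
result = SqliteEngine(engine).select(DataFrames(a=df), "SELECT x FROM a WHERE x > 0")
print(result.as_array())  # expected: [[1], [2]]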
48 changes: 44 additions & 4 deletions fugue/execution/naive_execution_engine.py
@@ -22,7 +22,8 @@
ExecutionEngine,
)
from sqlalchemy import create_engine
from triad.collections import ParamDict, Schema
from triad.collections import Schema
from triad.utils.pandas_like import PD_UTILS


class SqliteEngine(SQLEngine):
@@ -39,7 +40,7 @@ def select(self, dfs: DataFrames, statement: str) -> DataFrame:

class NaiveExecutionEngine(ExecutionEngine):
def __init__(self, conf: Any = None):
self._conf = ParamDict(conf)
super().__init__(conf)
self._fs = OSFS("/")
self._log = logging.getLogger()
self._default_sql_engine = SqliteEngine(self)
@@ -67,7 +68,9 @@ def to_df(
) -> LocalBoundedDataFrame:
return to_local_bounded_df(df, schema, metadata)

def repartition(self, df: DataFrame, partition_spec: PartitionSpec) -> DataFrame:
def repartition(
self, df: DataFrame, partition_spec: PartitionSpec
) -> DataFrame: # pragma: no cover
self.log.warning(f"{self} doesn't respect repartition")
return df

@@ -77,14 +80,51 @@ def map_partitions(
mapFunc: Callable[[int, Iterable[Any]], Iterable[Any]],
output_schema: Any,
partition_spec: PartitionSpec,
) -> DataFrame:
if partition_spec.num_partitions != "0":
self.log.warning(
f"{self} doesn't respect num_partitions {partition_spec.num_partitions}"
)
if len(partition_spec.partition_by) == 0: # no partition
df = to_local_df(df)
return IterableDataFrame(
mapFunc(0, df.as_array_iterable(type_safe=True)), output_schema
)
presort = partition_spec.presort
presort_keys = list(presort.keys())
presort_asc = list(presort.values())
output_schema = Schema(output_schema)
names = output_schema.names

def _map(pdf: Any) -> pd.DataFrame:
if len(presort_keys) > 0:
pdf = pdf.sort_values(presort_keys, ascending=presort_asc)
data = list(
mapFunc(
0, PD_UTILS.as_array_iterable(pdf, type_safe=True, null_safe=False)
)
)
return pd.DataFrame(data, columns=names)

result = PD_UTILS.safe_groupby_apply(
df.as_pandas(), partition_spec.partition_by, _map
)
return PandasDataFrame(result, output_schema)

# TODO: remove this
def _map_partitions(
self,
df: DataFrame,
mapFunc: Callable[[int, Iterable[Any]], Iterable[Any]],
output_schema: Any,
partition_spec: PartitionSpec,
) -> DataFrame: # pragma: no cover
df = to_local_df(df)
if partition_spec.num_partitions != "0":
self.log.warning(
f"{self} doesn't respect num_partitions {partition_spec.num_partitions}"
)
partitioner = partition_spec.get_partitioner(df.schema)
sorts = partition_spec.get_sorts(df.schema)
if len(partition_spec.partition_by) == 0: # no partition
return IterableDataFrame(
mapFunc(0, df.as_array_iterable(type_safe=True)), output_schema
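
As an aside, a hedged usage sketch of the new pandas-backed map_partitions, not part of this commit's diff; the PartitionSpec keyword form, the sample data, and take_first are assumptions for illustration:

import pandas as pd
from fugue.collections.partition import PartitionSpec
from fugue.dataframe import PandasDataFrame
from fugue.execution import NaiveExecutionEngine

engine = NaiveExecutionEngine()
df = PandasDataFrame(pd.DataFrame({"a": [0, 0, 1], "b": [2, 1, 3]}), "a:long,b:long")

def take_first(partition_no, rows):
    # rows arrive presorted by b within each group keyed on a
    yield next(iter(rows))

spec = PartitionSpec(by=["a"], presort="b")  # assumed constructor keywords
result = engine.map_partitions(df, take_first, "a:long,b:long", spec)
print(result.as_array())  # expected: [[0, 1], [1, 3]]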
2 changes: 1 addition & 1 deletion fugue/transformer/transformer.py
@@ -126,5 +126,5 @@ def cursor(self) -> PartitionCursor:

class MultiInputTransformer(object):
@property
def partition_spec(self) -> PartitionSpec:
def partition_spec(self) -> PartitionSpec: # pragma: no cover
return self._partition_spec # type: ignore
2 changes: 2 additions & 0 deletions fugue_dask/__init__.py
@@ -0,0 +1,2 @@
# flake8: noqa
from fugue_dask.dataframe import DaskDataFrame
130 changes: 130 additions & 0 deletions fugue_dask/dataframe.py
@@ -0,0 +1,130 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple

import dask.dataframe as pd
import pandas
from fugue.dataframe import DataFrame, LocalDataFrame, PandasDataFrame
from fugue.dataframe.dataframe import _input_schema
from triad.collections.schema import Schema
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from fugue_dask.utils import DASK_UTILS


class DaskDataFrame(DataFrame):
def __init__( # noqa: C901
self,
df: Any = None,
schema: Any = None,
metadata: Any = None,
num_partitions: int = 2,
type_safe=True,
):
if df is None:
schema = _input_schema(schema).assert_not_empty()
df = []
if isinstance(df, DaskDataFrame):
super().__init__(df.schema, df.metadata)
self._native: pd.DataFrame = df._native
return
elif isinstance(df, (pd.DataFrame, pd.Series)):
if isinstance(df, pd.Series):
df = df.to_frame()
pdf = df
schema = None if schema is None else _input_schema(schema)
elif isinstance(df, (pandas.DataFrame, pandas.Series)):
if isinstance(df, pandas.Series):
df = df.to_frame()
pdf = pd.from_pandas(df, npartitions=num_partitions)
schema = None if schema is None else _input_schema(schema)
elif isinstance(df, Iterable):
assert_arg_not_none(schema, msg=f"schema can't be None for iterable input")
schema = _input_schema(schema).assert_not_empty()
t = PandasDataFrame(df, schema)
pdf = pd.from_pandas(t.native, npartitions=num_partitions)
type_safe = False
else:
raise ValueError(f"{df} is incompatible with DaskDataFrame")
pdf, schema = self._apply_schema(pdf, schema, type_safe)
super().__init__(schema, metadata)
self._native = pdf

@property
def native(self) -> pd.DataFrame:
return self._native

@property
def is_local(self) -> bool:
return False

def as_local(self) -> LocalDataFrame:
return PandasDataFrame(self.as_pandas(), self.schema)

@property
def is_bounded(self) -> bool:
return True

@property
def empty(self) -> bool:
return DASK_UTILS.empty(self.native)

@property
def num_partitions(self) -> int:
return self.native.npartitions

def peek_array(self) -> Any:
return self.as_pandas().iloc[0].values.tolist()

def count(self, persist: bool = False) -> int:
return self.as_pandas().shape[0]

def as_pandas(self) -> pandas.DataFrame:
return self.native.compute().reset_index(drop=True)

def drop(self, cols: List[str]) -> DataFrame:
try:
schema = self.schema - cols
except Exception as e:
raise InvalidOperationError(str(e))
if len(schema) == 0:
raise InvalidOperationError("Can't remove all columns of a dataframe")
return DaskDataFrame(self.native.drop(cols, axis=1), schema, type_safe=False)

def rename(self, columns: Dict[str, str]) -> "DataFrame":
df = self.native.rename(columns=columns)
schema = self.schema.rename(columns)
return DaskDataFrame(df, schema, type_safe=False)

def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return list(self.as_array_iterable(columns, type_safe=type_safe))

def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
sub = None if columns is None else self.schema.extract(columns).pa_schema
return DASK_UTILS.as_array_iterable(
self.native.compute(), sub, type_safe=type_safe, null_safe=True
)

def _apply_schema(
self, pdf: pd.DataFrame, schema: Optional[Schema], type_safe: bool = True
) -> Tuple[pd.DataFrame, Schema]:
if not type_safe:
assert_arg_not_none(pdf, "pdf")
assert_arg_not_none(schema, "schema")
return pdf, schema
DASK_UTILS.ensure_compatible(pdf)
if pdf.columns.dtype == "object": # pdf has named schema
pschema = Schema(DASK_UTILS.to_schema(pdf))
if schema is None or pschema == schema:
return pdf, pschema.assert_not_empty()
pdf = pdf[schema.assert_not_empty().names]
else: # pdf has no named schema
schema = _input_schema(schema).assert_not_empty()
assert_or_throw(
pdf.shape[1] == len(schema),
ValueError(f"Pandas datafame column count doesn't match {schema}"),
)
pdf.columns = schema.names
return DASK_UTILS.enforce_type(pdf, schema.pa_schema, null_safe=True), schema
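
A short usage sketch of the new DaskDataFrame, not part of the diff; the sample data and the schema expression are illustrative:

import pandas
from fugue_dask import DaskDataFrame

# Build from a local pandas DataFrame; the dask frame gets the default
# num_partitions=2 from the constructor above.
pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
ddf = DaskDataFrame(pdf, "a:long,b:str")
print(ddf.num_partitions)              # 2
print(ddf.as_pandas())                 # collects back to local pandas
print(ddf.rename({"a": "c"}).schema)   # expected: c:long,b:str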
