Commit 711941b

Cotransform dict DataFrames, improve transformer deco, add no effect checkpoint (#30)

* setup fugue sql

* SQL: add basic extensions and tests

* update

* update

* clean up sql files

* fix syntax, add save load

* add test for load

* FugueSQLWorkflow

* update version

* Add pandas udf support, add SQL persist broadcast

* update

* update

* update

* update

* update

* make transformer schema more flexible

* Remove ABC from Transformers

* cotransform on dict like dataframes, add no effect checkpoint

* change deco lambda

* add more tests
Han Wang authored Jun 14, 2020
1 parent f62f918 commit 711941b
Showing 22 changed files with 5,570 additions and 5,137 deletions.
10 changes: 10 additions & 0 deletions README.md
@@ -8,3 +8,13 @@
[![Doc](https://readthedocs.org/projects/fugue/badge)](https://triad.readthedocs.org)

An abstraction layer for distributed computation

## Tutorials

### The simplest way to start:
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/fugue-project/tutorials/master)

### Run the tutorial using docker
```
docker run -p 8888:8888 fugueproject/tutorials:latest
```
12 changes: 10 additions & 2 deletions fugue/__init__.py
@@ -1,6 +1,9 @@
# flake8: noqa

__version__ = "0.2.8"
__version__ = "0.2.9"

from triad.collections import Schema
from triad.collections.fs import FileSystem

from fugue.collections.partition import PartitionCursor, PartitionSpec
from fugue.dataframe.array_dataframe import ArrayDataFrame
@@ -15,6 +18,11 @@
from fugue.extensions.creator import Creator, creator
from fugue.extensions.outputter import Outputter, outputter
from fugue.extensions.processor import Processor, processor
from fugue.extensions.transformer import CoTransformer, Transformer, transformer
from fugue.extensions.transformer import (
CoTransformer,
Transformer,
transformer,
cotransformer,
)
from fugue.workflow.workflow import FugueWorkflow, WorkflowDataFrame
from fugue.workflow.workflow_context import FugueWorkflowContext
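
With `cotransformer` now re-exported alongside `transformer`, both decorators can be imported straight from the package root. A minimal, hedged sketch (the pandas annotations rely on the pandas support added in this release series; the schema strings are illustrative):

```
import pandas as pd

from fugue import cotransformer, transformer


# declares an output schema that appends a count column to the input schema
@transformer("*,n:int")
def with_count(df: pd.DataFrame) -> pd.DataFrame:
    df["n"] = len(df)
    return df


# joins two zipped dataframes partition by partition
@cotransformer("a:int,b:int,c:int")
def merge(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return df1.merge(df2, on="a")
```
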
6 changes: 5 additions & 1 deletion fugue/dataframe/dataframe.py
@@ -169,7 +169,11 @@ def show(
print("")
if len(self.metadata) > 0:
print("Metadata:")
print(self.metadata.to_json(indent=True))
try:
# try pretty print, but if not convertible to json, print original
print(self.metadata.to_json(indent=True))
except Exception: # pragma: no cover
print(self.metadata)
print("")

def head(self, n: int, columns: Optional[List[str]] = None) -> List[Any]:
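
The fallback above matters when metadata holds values that are not JSON serializable. A hedged sketch of the behavior using triad's ParamDict (the example values are made up):

```
from triad.collections import ParamDict

meta = ParamDict({"note": "ok", "blob": object()})  # object() is not JSON serializable
try:
    print(meta.to_json(indent=True))  # pretty-print when possible
except Exception:
    print(meta)  # otherwise fall back to the plain mapping repr
```
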
108 changes: 89 additions & 19 deletions fugue/execution/execution_engine.py
@@ -17,6 +17,7 @@
from triad.exceptions import InvalidOperationError
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_size
from triad.utils.string import validate_triad_var_name

_DEFAULT_JOIN_KEYS: List[str] = []

@@ -116,8 +117,10 @@ def serialize_by_partition(
partition_spec: PartitionSpec,
df_name: str,
temp_path: Optional[str] = None,
to_file_threshold: int = -1,
to_file_threshold: Any = -1,
has_name: bool = False,
) -> DataFrame:
to_file_threshold = _get_file_threshold(to_file_threshold)
on = list(filter(lambda k: k in df.schema, partition_spec.partition_by))
presort = list(
filter(lambda p: p[0] in df.schema, partition_spec.presort.items())
@@ -135,7 +138,8 @@
metadata = dict(
serialized=True,
serialized_cols={df_name: col_name},
schemas={df_name: df.schema},
schemas={df_name: str(df.schema)},
serialized_has_name=has_name,
)
return self.map(df, s.run, output_schema, partition_spec, metadata)

@@ -147,16 +151,15 @@ def zip(
partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
temp_path: Optional[str] = None,
to_file_threshold: Any = -1,
df1_name: Optional[str] = None,
df2_name: Optional[str] = None,
):
on = list(partition_spec.partition_by)
how = how.lower()
assert_or_throw(
"semi" not in how and "anti" not in how,
InvalidOperationError("zip does not support semi or anti joins"),
)
to_file_threshold = (
-1 if to_file_threshold == -1 else to_size(to_file_threshold)
)
serialized_cols: Dict[str, Any] = {}
schemas: Dict[str, Any] = {}
if len(on) == 0:
@@ -171,14 +174,17 @@
)
partition_spec = PartitionSpec(partition_spec, by=on)

def update_df(df: DataFrame) -> DataFrame:
def update_df(df: DataFrame, name: Optional[str]) -> DataFrame:
if name is None:
name = f"_{len(serialized_cols)}"
if not df.metadata.get("serialized", False):
df = self.serialize_by_partition(
df,
partition_spec,
f"_{len(serialized_cols)}",
name,
temp_path,
to_file_threshold,
has_name=name is not None,
)
for k in df.metadata["serialized_cols"].keys():
assert_or_throw(
@@ -188,13 +194,58 @@ def update_df(df: DataFrame) -> DataFrame:
schemas[k] = df.metadata["schemas"][k]
return df

df1 = update_df(df1)
df2 = update_df(df2)
df1 = update_df(df1, df1_name)
df2 = update_df(df2, df2_name)
metadata = dict(
serialized=True, serialized_cols=serialized_cols, schemas=schemas
serialized=True,
serialized_cols=serialized_cols,
schemas=schemas,
serialized_has_name=df1_name is not None or df2_name is not None,
)
return self.join(df1, df2, how=how, on=on, metadata=metadata)

def zip_all(
self,
dfs: DataFrames,
how: str = "inner",
partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
temp_path: Optional[str] = None,
to_file_threshold: Any = -1,
) -> DataFrame:
assert_or_throw(len(dfs) > 0, "can't zip 0 dataframes")
pairs = list(dfs.items())
has_name = dfs.has_key
if len(dfs) == 1:
return self.serialize_by_partition(
pairs[0][1],
partition_spec,
pairs[0][0],
temp_path,
to_file_threshold,
has_name=has_name,
)
df = self.zip(
pairs[0][1],
pairs[1][1],
how=how,
partition_spec=partition_spec,
temp_path=temp_path,
to_file_threshold=to_file_threshold,
df1_name=pairs[0][0] if has_name else None,
df2_name=pairs[1][0] if has_name else None,
)
for i in range(2, len(dfs)):
df = self.zip(
df,
pairs[i][1],
how=how,
partition_spec=partition_spec,
temp_path=temp_path,
to_file_threshold=to_file_threshold,
df2_name=pairs[i][0] if has_name else None,
)
return df
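
A hedged usage sketch of `zip_all` (the engine class, import paths, and data are assumptions, not taken from this diff). Because the inputs are constructed with names, `has_key` is true, so the zipped result records `serialized_has_name=True` and `comap` can later hand the dataframes back to the function by name:

```
from fugue.collections.partition import PartitionSpec
from fugue.dataframe import DataFrames
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
dfs = DataFrames(
    left=ArrayDataFrame([[0, 1], [1, 2]], "k:int,v:int"),
    right=ArrayDataFrame([[0, 10]], "k:int,w:int"),
)
# zip both dataframes by the common key column; the names are preserved
zipped = engine.zip_all(dfs, partition_spec=PartitionSpec(by=["k"]))
print(zipped.metadata["serialized_has_name"])  # True
```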

def comap(
self,
df: DataFrame,
@@ -243,9 +294,19 @@ def __deepcopy__(self, memo: Any) -> "ExecutionEngine":
return self


def _get_file_threshold(size: Any) -> int:
if size is None:
return -1
if isinstance(size, int):
return size
return to_size(size)
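
The helper above relaxes `to_file_threshold`: `None` keeps the -1 default (no file threshold), ints pass through, and size strings are normalized with triad's `to_size`. A hedged sketch of the equivalent logic (the exact byte value `to_size` returns for "1mb" is an assumption):

```
from triad.utils.convert import to_size

for raw in (None, 1024, "1mb"):
    threshold = -1 if raw is None else raw if isinstance(raw, int) else to_size(raw)
    print(raw, "->", threshold)
```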


def _df_name_to_serialize_col(name: str):
assert_or_throw(name is not None, "Dataframe name can't be None")
return "__blob__" + name + "__"
name = "__blob__" + name + "__"
assert_or_throw(validate_triad_var_name(name), "Invalid name " + name)
return name


class _PartitionSerializer(object):
@@ -271,19 +332,18 @@ def __init__(
):
self.schemas = df.metadata["schemas"]
self.df_idx = [
(df.schema.index_of_key(v), self.schemas[k])
(df.schema.index_of_key(v), k, self.schemas[k])
for k, v in df.metadata["serialized_cols"].items()
]
self.named = df.metadata.get("serialized_has_name", False)
self.func = func
self._on_init = on_init

def on_init(self, partition_no, df: DataFrame) -> None:
if self._on_init is None:
return
# TODO: currently, get_output_schema only gets empty dataframes
empty_dfs = DataFrames(
{k: ArrayDataFrame([], v) for k, v in self.schemas.items()}
)
empty_dfs = _generate_comap_empty_dfs(self.schemas, self.named)
self._on_init(partition_no, empty_dfs)

def run(self, cursor: PartitionCursor, df: LocalDataFrame) -> LocalDataFrame:
@@ -295,11 +355,21 @@ def run(self, cursor: PartitionCursor, df: LocalDataFrame) -> LocalDataFrame:
dfs = DataFrames(list(self._get_dfs(data[0])))
return self.func(cursor, dfs)

def _get_dfs(self, row: Any) -> Iterable[DataFrame]:
for k, v in self.df_idx:
def _get_dfs(self, row: Any) -> Iterable[Any]:
for k, name, v in self.df_idx:
if row[k] is None:
yield ArrayDataFrame([], v)
df: DataFrame = ArrayDataFrame([], v)
else:
df = deserialize_df(row[k])
df = deserialize_df(row[k]) # type: ignore
assert df is not None
if self.named:
yield name, df
else:
yield df


def _generate_comap_empty_dfs(schemas: Any, named: bool) -> DataFrames:
if named:
return DataFrames({k: ArrayDataFrame([], v) for k, v in schemas.items()})
else:
return DataFrames([ArrayDataFrame([], v) for v in schemas.values()])
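
A hedged sketch of the difference `named` makes: a keyed `DataFrames` reports `has_key=True` and is handed to the co-transformer by name, while an unkeyed one is handed over positionally (import paths assumed):

```
from fugue.dataframe import DataFrames
from fugue.dataframe.array_dataframe import ArrayDataFrame

schemas = {"left": "a:int,b:int", "right": "a:int,c:str"}
named = DataFrames({k: ArrayDataFrame([], v) for k, v in schemas.items()})
unnamed = DataFrames([ArrayDataFrame([], v) for v in schemas.values()])
print(named.has_key)    # True  -> dataframes are passed to the function by name
print(unnamed.has_key)  # False -> dataframes are passed by position
```
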
7 changes: 6 additions & 1 deletion fugue/extensions/__init__.py
@@ -1,5 +1,10 @@
# flake8: noqa
from fugue.extensions.transformer import Transformer, CoTransformer, transformer
from fugue.extensions.transformer import (
Transformer,
CoTransformer,
transformer,
cotransformer,
)
from fugue.extensions.creator import Creator, creator
from fugue.extensions.processor import Processor, processor
from fugue.extensions.outputter import Outputter, outputter
2 changes: 1 addition & 1 deletion fugue/extensions/builtins/outputters.py
@@ -10,7 +10,7 @@ def process(self, dfs: DataFrames) -> None:
df.show(
self.params.get("rows", 10),
self.params.get("show_count", False),
title=self.params.get("title", ""),
title=self.params.get_or_none("title", str),
)


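The outputter now reads the title with `get_or_none`, so an absent "title" stays `None` instead of being forced to an empty string. A hedged sketch of the two accessors on triad's ParamDict (values are made up):

```
from triad.collections import ParamDict

p = ParamDict({"rows": 5})
print(p.get("rows", 10))            # 5     -- present, the default is ignored
print(p.get("show_count", False))   # False -- absent, falls back to the default
print(p.get_or_none("title", str))  # None  -- absent, no default forced in
```
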
26 changes: 5 additions & 21 deletions fugue/extensions/builtins/processors.py
@@ -10,6 +10,7 @@
)
from fugue.exceptions import FugueWorkflowError
from fugue.execution import SQLEngine
from fugue.execution.execution_engine import _generate_comap_empty_dfs
from fugue.extensions.processor import Processor
from fugue.extensions.transformer import CoTransformer, Transformer, to_transformer
from triad.collections import ParamDict
@@ -56,8 +57,8 @@ def cotransform(self, df: DataFrame, tf: CoTransformer) -> DataFrame:
)
tf._key_schema = df.schema - list(df.metadata["serialized_cols"].values())
# TODO: currently, get_output_schema only gets empty dataframes
empty_dfs = DataFrames(
{k: ArrayDataFrame([], v) for k, v in df.metadata["schemas"].items()}
empty_dfs = _generate_comap_empty_dfs(
df.metadata["schemas"], df.metadata.get("serialized_has_name", False)
)
tf._output_schema = Schema(tf.get_output_schema(empty_dfs))
tr = _CoTransformerRunner(df, tf, self._ignore_errors)
@@ -100,30 +101,13 @@ def process(self, dfs: DataFrames) -> DataFrame:
# TODO: this should also search on workflow conf
temp_path = self.params.get_or_none("temp_path", str)
to_file_threshold = self.params.get_or_none("to_file_threshold", object)
if to_file_threshold is None: # pragma: no cover
to_file_threshold = -1
if len(dfs) == 1:
return self.execution_engine.serialize_by_partition(
dfs[0], partition_spec, "_0", temp_path, to_file_threshold
)
df = self.execution_engine.zip(
dfs[0],
dfs[1],
return self.execution_engine.zip_all(
dfs,
how=how,
partition_spec=partition_spec,
temp_path=temp_path,
to_file_threshold=to_file_threshold,
)
for i in range(2, len(dfs)):
df = self.execution_engine.zip(
df,
dfs[i],
how=how,
partition_spec=partition_spec,
temp_path=temp_path,
to_file_threshold=to_file_threshold,
)
return df


class Rename(Processor):
38 changes: 28 additions & 10 deletions fugue/extensions/transformer/convert.py
@@ -77,11 +77,13 @@ def __uuid__(self) -> str:
return to_uuid(self._wrapper.__uuid__(), self._output_schema_arg)

def _parse_schema(self, obj: Any, df: DataFrame) -> Schema:
if callable(obj):
return obj(df, **self.params)
if isinstance(obj, str):
return df.schema.transform(obj)
if isinstance(obj, List):
return df.schema.transform(*obj)
return df.schema.transform(obj)
raise NotImplementedError # pragma: no cover

@staticmethod
def from_func(func: Callable, schema: Any) -> "_FuncAsTransformer":
@@ -98,14 +100,30 @@ def from_func(func: Callable, schema: Any) -> "_FuncAsTransformer":

class _FuncAsCoTransformer(CoTransformer):
def get_output_schema(self, dfs: DataFrames) -> Any:
return self._parse_schema(self._output_schema_arg) # type: ignore
return self._parse_schema(self._output_schema_arg, dfs) # type: ignore

@no_type_check
def transform(self, dfs: DataFrames) -> LocalDataFrame:
args: List[Any] = [dfs] if self._dfs_input else list(dfs.values())
return self._wrapper.run( # type: ignore
args, self.params, ignore_unknown=False, output_schema=self.output_schema
)
if self._dfs_input: # function has DataFrames input
return self._wrapper.run( # type: ignore
[dfs],
self.params,
ignore_unknown=False,
output_schema=self.output_schema,
)
if not dfs.has_key: # input does not have key
return self._wrapper.run( # type: ignore
list(dfs.values()),
self.params,
ignore_unknown=False,
output_schema=self.output_schema,
)
else: # input DataFrames has key
p = dict(dfs)
p.update(self.params)
return self._wrapper.run( # type: ignore
[], p, ignore_unknown=False, output_schema=self.output_schema
)
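
The three branches above give the decorated function three calling conventions. A hedged sketch of each (schemas and names are illustrative, import paths assumed, and the pandas annotations rely on the pandas support in this release series):

```
import pandas as pd

from fugue import cotransformer
from fugue.dataframe import DataFrames


@cotransformer("a:int,n:int")
def takes_collection(dfs: DataFrames) -> pd.DataFrame:
    # branch 1: the whole DataFrames collection is passed as one argument
    return pd.DataFrame({"a": [0], "n": [len(dfs)]})


@cotransformer("a:int,b:int,c:int")
def takes_positional(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    # branch 2: unnamed (no-key) inputs are passed by position
    return df1.merge(df2, on="a")


@cotransformer("a:int,b:int,c:int")
def takes_named(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # branch 3: keyed inputs are matched to parameters by dataframe name
    return left.merge(right, on="a")
```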

def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self._wrapper(*args, **kwargs) # type: ignore
@@ -116,15 +134,15 @@ def __uuid__(self) -> str:
self._wrapper.__uuid__(), self._output_schema_arg, self._dfs_input
)

def _parse_schema(self, obj: Any) -> Schema:
if obj is None:
return Schema()
def _parse_schema(self, obj: Any, dfs: DataFrames) -> Schema:
if callable(obj):
return obj(dfs, **self.params)
if isinstance(obj, str):
return Schema(obj)
if isinstance(obj, List):
s = Schema()
for x in obj:
s += self._parse_schema(x)
s += self._parse_schema(x, dfs)
return s
return Schema(obj)

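With `_parse_schema` accepting callables in both classes above, the schema argument of the decorators may now be a function of the input dataframe(s) and the runtime params. A hedged sketch for the transformer case (the lambda and schema expression are illustrative):

```
import pandas as pd

from fugue import transformer


# the output schema is derived from the input schema at run time
@transformer(lambda df, **kwargs: str(df.schema) + ",n:int")
def with_row_count(df: pd.DataFrame) -> pd.DataFrame:
    df["n"] = len(df)
    return df
```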