Add basic components (#6)
* update

* add workflow

* refactor

* refactor

* refactor

* add select support
Han Wang authored May 8, 2020
1 parent f1ea1b3 commit d75f6b0
Showing 30 changed files with 1,196 additions and 384 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -130,3 +130,6 @@ dmypy.json
 
 .vscode
 tmp
+
+# Antlr
+.antlr
3 changes: 2 additions & 1 deletion fugue/builtins/__init__.py
@@ -1,3 +1,4 @@
 # flake8: noqa
-from fugue.builtins.outputters import Show
+from fugue.builtins.outputters import Show, AssertEqual
 from fugue.builtins.creators import CreateData
+from fugue.builtins.processors import RunJoin, RunTransformer, RunSQLSelect
4 changes: 2 additions & 2 deletions fugue/builtins/creators.py
@@ -1,11 +1,11 @@
 from fugue.creator import Creator
 from fugue.dataframe import DataFrame
-from triad.collections import Schema
+
 
 class CreateData(Creator):
     def create(self) -> DataFrame:
         return self.execution_engine.to_df(
             self.params.get_or_throw("data", object),
-            self.params.get_or_none("schema", Schema),
+            self.params.get_or_none("schema", object),
             self.params.get_or_none("metadata", object),
         )
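The schema parameter's expected type is loosened from Schema to object, so whatever the caller supplied (a Schema, a string spec, or nothing) passes straight through to execution_engine.to_df, which owns the conversion. A toy sketch of that dispatch, assuming nothing about triad's real conversion rules:

from typing import Any, List, Optional, Tuple


def to_df(data: List[Tuple], schema: Optional[Any]) -> Tuple[List[Tuple], str]:
    # Stand-in for an execution engine's to_df: it normalizes or infers the
    # schema itself instead of forcing callers to build a Schema up front.
    if schema is None:
        inferred = ",".join(f"c{i}" for i in range(len(data[0])))
        return data, inferred  # engine infers column names
    return data, str(schema)  # engine normalizes whatever was given


print(to_df([(1, "a")], None))           # ([(1, 'a')], 'c0,c1')
print(to_df([(1, "a")], "x:int,y:str"))  # ([(1, 'a')], 'x:int,y:str')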
9 changes: 9 additions & 0 deletions fugue/builtins/outputters.py
@@ -1,5 +1,6 @@
 from fugue.outputter import Outputter
 from fugue.dataframe import DataFrames
+from fugue.dataframe.utils import _df_eq as df_eq
 
 
 class Show(Outputter):
@@ -11,3 +12,11 @@ def process(self, dfs: DataFrames) -> None:
             self.params.get("count", False),
             title=self.params.get("title", ""),
         )
+
+
+class AssertEqual(Outputter):
+    def process(self, dfs: DataFrames) -> None:
+        assert len(dfs) > 1
+        expected = dfs[0]
+        for i in range(1, len(dfs)):
+            df_eq(expected, dfs[i], throw=True, **self.params)
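The new AssertEqual outputter treats its first input as the expected frame and compares every remaining input against it via df_eq. A self-contained sketch of that comparison loop, using plain lists of rows in place of fugue DataFrames so df_eq's real signature is not assumed:

from typing import Any, List


def assert_all_equal(dfs: List[List[Any]]) -> None:
    # Mirrors AssertEqual.process: at least two frames, the first is expected.
    assert len(dfs) > 1
    expected = dfs[0]
    for i in range(1, len(dfs)):
        # The real code calls df_eq(expected, dfs[i], throw=True, **params).
        if dfs[i] != expected:
            raise AssertionError(f"input {i} differs from the expected frame")


assert_all_equal([[(0, "a")], [(0, "a")]])  # passes silently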
102 changes: 102 additions & 0 deletions fugue/builtins/processors.py
@@ -0,0 +1,102 @@
+from typing import Any, Iterable, List, Tuple
+
+from fugue.dataframe import (
+    DataFrame,
+    DataFrames,
+    IterableDataFrame,
+    to_local_bounded_df,
+)
+from fugue.execution import SQLEngine
+from fugue.processor import Processor
+from fugue.transformer import Transformer, to_transformer
+from triad.collections import ParamDict
+from triad.utils.convert import to_instance, to_type
+from triad.utils.iter import EmptyAwareIterable
+
+
+class RunTransformer(Processor):
+    def process(self, dfs: DataFrames) -> DataFrame:
+        df = dfs[0]
+        tf = to_transformer(
+            self.params.get_or_none("transformer", object),
+            self.params.get_or_none("schema", object),
+        )
+        tf._params = self.params.get("params", ParamDict())  # type: ignore
+        tf._partition_spec = self.pre_partition  # type: ignore
+        tf._key_schema = self.pre_partition.get_key_schema(df.schema)  # type: ignore
+        tf._output_schema = tf.get_output_schema(df)  # type: ignore
+        ie = self.params.get("ignore_errors", [])
+        ignore_errors = [to_type(x, Exception) for x in ie]
+        tr = _TransformerRunner(df, tf, ignore_errors)  # type: ignore
+        return self.execution_engine.map_partitions(
+            df=df,
+            mapFunc=tr.run,
+            output_schema=tf.output_schema,  # type: ignore
+            partition_spec=tf.partition_spec,
+        )
+
+
+class RunJoin(Processor):
+    def process(self, dfs: DataFrames) -> DataFrame:
+        if len(dfs) == 1:
+            return dfs[0]
+        how = self.params.get_or_throw("how", str)
+        keys = self.params.get("keys", [])
+        df = dfs[0]
+        for i in range(1, len(dfs)):
+            df = self.execution_engine.join(df, dfs[i], how=how, keys=keys)
+        return df
+
+
+class RunSQLSelect(Processor):
+    def process(self, dfs: DataFrames) -> DataFrame:
+        statement = self.params.get_or_throw("statement", str)
+        engine = self.params.get_or_none("sql_engine", object)
+        if engine is None:
+            engine = self.execution_engine.default_sql_engine
+        elif not isinstance(engine, SQLEngine):
+            engine = to_instance(engine, SQLEngine, args=[self.execution_engine])
+        return engine.select(dfs, statement)
+
+
+class _TransformerRunner(object):
+    def __init__(
+        self, df: DataFrame, transformer: Transformer, ignore_errors: List[type]
+    ):
+        self.schema = df.schema
+        self.metadata = df.metadata
+        self.transformer = transformer
+        self.ignore_errors = tuple(ignore_errors)
+
+    def run(self, no: int, data: Iterable[Any]) -> Iterable[Any]:
+        df = IterableDataFrame(data, self.schema, self.metadata)
+        if df.empty:  # pragma: no cover
+            return
+        spec = self.transformer.partition_spec
+        self.transformer._cursor = spec.get_cursor(  # type: ignore
+            self.schema, no
+        )
+        self.transformer.init_physical_partition(df)
+        if spec.empty:
+            partitions: Iterable[Tuple[int, int, EmptyAwareIterable]] = [
+                (0, 0, df.native)
+            ]
+        else:
+            partitioner = spec.get_partitioner(self.schema)
+            partitions = partitioner.partition(df.native)
+        for pn, sn, sub in partitions:
+            self.transformer.cursor.set(sub.peek(), pn, sn)
+            sub_df = IterableDataFrame(sub, self.schema)
+            sub_df._metadata = self.metadata
+            self.transformer.init_logical_partition(sub_df)
+            if len(self.ignore_errors) == 0:
+                res = self.transformer.transform(sub_df)
+                for r in res.as_array_iterable(type_safe=True):
+                    yield r
+            else:
+                try:
+                    res = to_local_bounded_df(self.transformer.transform(sub_df))
+                except self.ignore_errors:  # type: ignore
+                    continue
+                for r in res.as_array_iterable(type_safe=True):
+                    yield r
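Two patterns in this new file are worth calling out. RunJoin folds N inputs into one frame with repeated binary joins, and _TransformerRunner converts the ignore_errors entries into a tuple of exception types so a failing logical partition can be skipped while the rest of the run continues. A minimal stand-alone sketch of the error-skipping contract, using a toy transform function rather than the real Transformer interface:

from typing import Iterable, List, Tuple


def double_all(part: List[int]) -> List[int]:
    # Toy transform: fails on any partition containing a negative value.
    if any(x < 0 for x in part):
        raise ValueError("negative value")
    return [x * 2 for x in part]


def run(partitions: List[List[int]], ignore: Tuple[type, ...]) -> Iterable[int]:
    for part in partitions:
        try:
            res = double_all(part)  # stand-in for transformer.transform(sub_df)
        except ignore:
            continue  # drop the failing partition, keep going
        yield from res


print(list(run([[1, 2], [-1], [3]], (ValueError,))))  # [2, 4, 6]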
110 changes: 32 additions & 78 deletions fugue/dag/tasks.py
@@ -1,6 +1,6 @@
 import copy
 from abc import ABC, abstractmethod
-from typing import Any, no_type_check
+from typing import Any, Dict, no_type_check
 
 from adagio.instances import TaskContext
 from adagio.specs import InputSpec, OutputSpec, TaskSpec
@@ -11,10 +11,8 @@
 from fugue.execution import ExecutionEngine
 from fugue.outputter.convert import to_outputter
 from fugue.processor.convert import to_processor
-from fugue.transformer.convert import to_transformer
 from triad.collections.dict import ParamDict
 from triad.utils.assertion import assert_or_throw
-from triad.utils.convert import to_type
 
 
 class FugueTask(TaskSpec, ABC):
@@ -64,14 +62,6 @@ def execution_engine(self) -> ExecutionEngine:
     def params(self) -> ParamDict:
         return self.metadata
 
-    @property
-    def has_single_input(self) -> bool:
-        return len(self.inputs) == 1
-
-    @property
-    def has_single_output(self) -> bool:
-        return len(self.outputs) == 1
-
     @property
     def single_output_expression(self) -> str:
         assert_or_throw(
@@ -86,7 +76,7 @@ def copy(self) -> "FugueTask":
         return t
 
     def persist(self, level: Any) -> "FugueTask":
-        self._persist = level
+        self._persist = "" if level is None else level
         return self
 
     def handle_persist(self, df: DataFrame) -> DataFrame:
@@ -115,15 +105,14 @@ class Create(FugueTask):
     def __init__(
         self,
         execution_engine: ExecutionEngine,
-        params: Any,
+        creator: Any,
+        schema: Any = None,
+        params: Any = None,
         deterministic: bool = True,
-        lazy: bool = False,
+        lazy: bool = True,
     ):
-        params = ParamDict(params)
-        self._creator = to_creator(
-            params.get_or_throw("creator", object), params.get_or_none("schema", object)
-        )
-        self._creator._params = params.get("params", ParamDict())
+        self._creator = to_creator(creator, schema)
+        self._creator._params = ParamDict(params)
         self._creator._execution_engine = execution_engine
         super().__init__(
             execution_engine,
@@ -142,71 +131,27 @@ def execute(self, ctx: TaskContext) -> None:
         ctx.outputs["_0"] = df
 
 
-class Transform(FugueTask):
-    @no_type_check
-    def __init__(
-        self,
-        execution_engine: ExecutionEngine,
-        params: Any,
-        deterministic: bool = True,
-        lazy: bool = False,
-    ):
-        params = ParamDict(params)
-        self._transformer = to_transformer(
-            params.get_or_throw("transformer", object),
-            params.get_or_none("schema", object),
-        )
-        self._transformer_params = params.get("params", ParamDict())
-        self._partition_spec = params.get_or_throw("partition", PartitionSpec)
-        self._ignore_errors = [
-            to_type(x, Exception) for x in params.get("ignore_errors", [])
-        ]
-        super().__init__(
-            execution_engine,
-            params=params,
-            input_n=1,
-            output_n=1,
-            deterministic=deterministic,
-            lazy=lazy,
-        )
-
-    @no_type_check
-    def execute(self, ctx: TaskContext) -> None:
-        dfs = DataFrames(ctx.inputs)
-        df = self.execution_engine.transform(
-            dfs[0],
-            self._transformer,
-            self._transformer_params,
-            self._partition_spec,
-            self._ignore_errors,
-        )
-        df = self.handle_persist(df)
-        df = self.handle_broadcast(df)
-        ctx.outputs["_0"] = df
-
-
 class Process(FugueTask):
     @no_type_check
     def __init__(
         self,
-        n: int,
+        input_n: int,
         execution_engine: ExecutionEngine,
+        processor: Any,
+        schema: Any,
         params: Any,
+        pre_partition: Any = None,
         deterministic: bool = True,
         lazy: bool = False,
     ):
-        params = ParamDict(params)
-        self._processor = to_processor(
-            params.get_or_throw("processor", object),
-            params.get_or_none("schema", object),
-        )
-        self._processor._params = params.get("params", ParamDict())
-        self._processor._pre_partition = params.get("partition", PartitionSpec)
+        self._processor = to_processor(processor, schema)
+        self._processor._params = ParamDict(params)
+        self._processor._pre_partition = PartitionSpec(pre_partition)
         self._processor._execution_engine = execution_engine
         super().__init__(
             execution_engine,
             params=params,
-            input_n=n,
+            input_n=input_n,
             output_n=1,
             deterministic=deterministic,
             lazy=lazy,
@@ -224,25 +169,34 @@ class Output(FugueTask):
     @no_type_check
     def __init__(
         self,
-        n: int,
+        input_n: int,
         execution_engine: ExecutionEngine,
+        outputter: Any,
         params: Any,
+        pre_partition: Any = None,
         deterministic: bool = True,
         lazy: bool = False,
     ):
-        params = ParamDict(params)
-        self._outputter = to_outputter(params.get_or_throw("outputter", object))
-        self._outputter._params = params.get("params", ParamDict())
-        self._outputter._pre_partition = params.get("partition", PartitionSpec)
+        assert_or_throw(input_n > 0, FugueWorkflowError("must have at least one input"))
+        self._outputter = to_outputter(outputter)
+        self._outputter._params = ParamDict(params)
+        self._outputter._pre_partition = PartitionSpec(pre_partition)
         self._outputter._execution_engine = execution_engine
         super().__init__(
             execution_engine,
             params=params,
-            input_n=n,
+            input_n=input_n,
             deterministic=deterministic,
             lazy=lazy,
         )
 
     @no_type_check
     def execute(self, ctx: TaskContext) -> None:
-        self._outputter.process(DataFrames(ctx.inputs))
+        dfs: Dict[str, DataFrame] = {}
+        for k, df in ctx.inputs.items():
+            if not self._outputter.pre_partition.empty:
+                df = self.execution_engine.repartition(
+                    df, self._outputter.pre_partition
+                )
+            dfs[k] = df
+        self._outputter.process(DataFrames(dfs))
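The rewritten Output.execute repartitions each named input only when a pre-partition spec was provided, then hands the whole mapping to the outputter. A rough functional equivalent of that guard, with a plain callable standing in for execution_engine.repartition:

from typing import Callable, Dict, List, Optional


def gather_inputs(
    inputs: Dict[str, List[int]],
    repartition: Optional[Callable[[List[int]], List[int]]],
) -> Dict[str, List[int]]:
    # Mirrors the new execute body: apply the repartition step per input
    # when a spec exists, otherwise pass each frame through untouched.
    dfs: Dict[str, List[int]] = {}
    for k, df in inputs.items():
        if repartition is not None:
            df = repartition(df)
        dfs[k] = df
    return dfs


print(gather_inputs({"_0": [3, 1, 2]}, sorted))  # {'_0': [1, 2, 3]}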