Make building workflow independent from engine (#18)
* Make building workflow independent from engine

* update version

* update version

* update workflow language

* update

* update

* add join syntax sugar
Han Wang authored May 30, 2020
1 parent fa8bbb2 commit 9f730cb
Showing 18 changed files with 332 additions and 171 deletions.
20 changes: 0 additions & 20 deletions docs/api/fugue.dag.rst

This file was deleted.

2 changes: 1 addition & 1 deletion docs/api/fugue.rst
@@ -5,11 +5,11 @@ fugue
    :maxdepth: 4
 
    fugue.collections
-   fugue.dag
    fugue.dataframe
    fugue.execution
    fugue.extensions
    fugue.utils
+   fugue.workflow
 
 
 fugue.constants
28 changes: 28 additions & 0 deletions docs/api/fugue.workflow.rst
@@ -0,0 +1,28 @@
+fugue.workflow
+===============
+
+
+fugue.workflow.tasks
+--------------------
+
+.. automodule:: fugue.workflow.tasks
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
+fugue.workflow.workflow
+-----------------------
+
+.. automodule:: fugue.workflow.workflow
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
+fugue.workflow.workflow\_context
+--------------------------------
+
+.. automodule:: fugue.workflow.workflow_context
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
2 changes: 1 addition & 1 deletion fugue/__init__.py
@@ -1 +1 @@
__version__ = "0.2.0"
__version__ = "0.2.1"
16 changes: 11 additions & 5 deletions fugue/dataframe/utils.py
@@ -11,12 +11,12 @@
 from fugue.dataframe.iterable_dataframe import IterableDataFrame
 from fugue.dataframe.pandas_dataframe import PandasDataFrame
 from triad.collections import Schema
+from triad.collections.fs import FileSystem
+from triad.collections.schema import SchemaError
 from triad.exceptions import InvalidOperationError
 from triad.utils.assertion import assert_arg_not_none
 from triad.utils.assertion import assert_or_throw as aot
 
-from triad.collections.fs import FileSystem
-
 
 def _df_eq(
     df: DataFrame,
@@ -185,6 +185,12 @@ def get_join_schemas(
     )
     on = list(on)
     aot(len(on) == len(set(on)), f"{on} has duplication")
+    if how != "cross" and len(on) == 0:
+        on = list(df1.schema.intersect(df2.schema.names).names)
+        aot(
+            len(on) > 0,
+            SchemaError(f"no common columns between {df1.schema} and {df2.schema}"),
+        )
     schema2 = df2.schema
     aot(
         how != "outer",
@@ -196,14 +202,14 @@
         schema2 = schema2.extract(on)
     aot(
         on in df1.schema and on in schema2,
-        KeyError(f"{on} is not the intersection of {df1.schema} & {df2.schema}"),
+        SchemaError(f"{on} is not the intersection of {df1.schema} & {df2.schema}"),
     )
     cm = df1.schema.intersect(on)
     if how == "cross":
         aot(
             len(df1.schema.intersect(schema2)) == 0,
-            KeyError("can't specify on for cross join"),
+            SchemaError("can't specify on for cross join"),
         )
     else:
-        aot(len(on) > 0, KeyError("on must be specified"))
+        aot(len(on) > 0, SchemaError("on must be specified"))
     return cm, (df1.schema.union(schema2))
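
The six added lines in the first hunk are the "join syntax sugar" bullet from the commit message: for any non-cross join with no explicit keys, `get_join_schemas` now infers the keys as the columns common to both schemas (in `df1`'s column order) and raises `SchemaError` if there are none. A minimal sketch of that inference using triad's `Schema` directly; the schema strings here are made-up examples, not taken from this commit:

```python
from triad.collections.schema import Schema

s1 = Schema("a:int,b:str,c:double")  # stand-in for df1.schema
s2 = Schema("b:str,c:double,d:int")  # stand-in for df2.schema

# Mirrors `df1.schema.intersect(df2.schema.names).names` in the diff:
# keep df1's columns that also appear in df2, preserving df1's order.
inferred_on = list(s1.intersect(s2.names).names)
print(inferred_on)  # ['b', 'c']
```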
File renamed without changes.
58 changes: 28 additions & 30 deletions fugue/dag/tasks.py → fugue/workflow/tasks.py
@@ -4,6 +4,7 @@
 from adagio.instances import TaskContext
 from adagio.specs import InputSpec, OutputSpec, TaskSpec
 from fugue.collections.partition import PartitionSpec
+from fugue.workflow.workflow_context import FugueWorkflowContext
 from fugue.dataframe import DataFrame, DataFrames
 from fugue.exceptions import FugueWorkflowError
 from fugue.execution import ExecutionEngine
@@ -18,7 +19,6 @@
 class FugueTask(TaskSpec, ABC):
     def __init__(
         self,
-        execution_engine: ExecutionEngine,
         input_n: int = 0,
         output_n: int = 0,
         configs: Any = None,
@@ -45,7 +45,6 @@ def __init__(
             OutputSpec("_" + str(i), DataFrame, nullable=False) for i in range(output_n)
         ]
         self._input_has_key = input_names is not None
-        self._execution_engine = execution_engine
         super().__init__(
             configs=configs,
             inputs=inputs,
@@ -63,10 +62,6 @@ def __init__(
     def execute(self, ctx: TaskContext) -> None:  # pragma: no cover
         raise NotImplementedError
 
-    @property
-    def execution_engine(self) -> ExecutionEngine:
-        return self._execution_engine
-
     @property
     def params(self) -> ParamDict:
         return self.metadata
@@ -92,32 +87,40 @@ def persist(self, level: Any) -> "FugueTask":
         self._persist = "" if level is None else level
         return self
 
-    def handle_persist(self, df: DataFrame) -> DataFrame:
+    def handle_persist(self, df: DataFrame, engine: ExecutionEngine) -> DataFrame:
         if self._persist is None:
             return df
-        return self.execution_engine.persist(
-            df, None if self._persist == "" else self._persist
-        )
+        return engine.persist(df, None if self._persist == "" else self._persist)
 
     def broadcast(self) -> "FugueTask":
         self._broadcast = True
         return self
 
-    def handle_broadcast(self, df: DataFrame) -> DataFrame:
+    def handle_broadcast(self, df: DataFrame, engine: ExecutionEngine) -> DataFrame:
         if not self._broadcast:
             return df
-        return self.execution_engine.broadcast(df)
+        return engine.broadcast(df)
 
     # def pre_partition(self, *args: Any, **kwargs: Any) -> "FugueTask":
     #     self._pre_partition = PartitionSpec(*args, **kwargs)
     #     return self
 
+    def _get_workflow_context(self, ctx: TaskContext) -> FugueWorkflowContext:
+        wfctx = ctx.workflow_context
+        assert isinstance(wfctx, FugueWorkflowContext)
+        return wfctx
+
+    def _get_execution_engine(self, ctx: TaskContext) -> ExecutionEngine:
+        return self._get_workflow_context(ctx).execution_engine
+
+    def _set_result(self, ctx: TaskContext, df: DataFrame) -> None:
+        self._get_workflow_context(ctx).set_result(id(self), df)
+
 
 class Create(FugueTask):
     @no_type_check
     def __init__(
         self,
-        execution_engine: ExecutionEngine,
         creator: Any,
         schema: Any = None,
         params: Any = None,
@@ -126,21 +129,18 @@ def __init__(
     ):
         self._creator = to_creator(creator, schema)
         self._creator._params = ParamDict(params)
-        self._creator._execution_engine = execution_engine
         super().__init__(
-            execution_engine,
-            params=params,
-            input_n=0,
-            output_n=1,
-            deterministic=deterministic,
-            lazy=lazy,
+            params=params, input_n=0, output_n=1, deterministic=deterministic, lazy=lazy
         )
 
     @no_type_check
     def execute(self, ctx: TaskContext) -> None:
+        e = self._get_execution_engine(ctx)
+        self._creator._execution_engine = e
         df = self._creator.create()
-        df = self.handle_persist(df)
-        df = self.handle_broadcast(df)
+        df = self.handle_persist(df, e)
+        df = self.handle_broadcast(df, e)
+        self._set_result(ctx, df)
         ctx.outputs["_0"] = df


@@ -149,7 +149,6 @@ class Process(FugueTask):
     def __init__(
         self,
         input_n: int,
-        execution_engine: ExecutionEngine,
         processor: Any,
         schema: Any,
         params: Any,
@@ -161,9 +160,7 @@
         self._processor = to_processor(processor, schema)
         self._processor._params = ParamDict(params)
         self._processor._partition_spec = PartitionSpec(pre_partition)
-        self._processor._execution_engine = execution_engine
         super().__init__(
-            execution_engine,
             params=params,
             input_n=input_n,
             output_n=1,
@@ -174,12 +171,15 @@
 
     @no_type_check
     def execute(self, ctx: TaskContext) -> None:
+        e = self._get_execution_engine(ctx)
+        self._processor._execution_engine = e
         if self._input_has_key:
             df = self._processor.process(DataFrames(ctx.inputs))
         else:
             df = self._processor.process(DataFrames(ctx.inputs.values()))
-        df = self.handle_persist(df)
-        df = self.handle_broadcast(df)
+        df = self.handle_persist(df, e)
+        df = self.handle_broadcast(df, e)
+        self._set_result(ctx, df)
         ctx.outputs["_0"] = df


@@ -188,7 +188,6 @@ class Output(FugueTask):
     def __init__(
         self,
         input_n: int,
-        execution_engine: ExecutionEngine,
         outputter: Any,
         params: Any,
         pre_partition: Any = None,
@@ -200,9 +199,7 @@
         self._outputter = to_outputter(outputter)
         self._outputter._params = ParamDict(params)
         self._outputter._partition_spec = PartitionSpec(pre_partition)
-        self._outputter._execution_engine = execution_engine
         super().__init__(
-            execution_engine,
             params=params,
             input_n=input_n,
             deterministic=deterministic,
@@ -212,6 +209,7 @@
 
     @no_type_check
     def execute(self, ctx: TaskContext) -> None:
+        self._outputter._execution_engine = self._get_execution_engine(ctx)
         if self._input_has_key:
             self._outputter.process(DataFrames(ctx.inputs))
         else:
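
Taken together, the tasks.py changes are the heart of this commit: a task no longer receives an `ExecutionEngine` in its constructor, and instead resolves it from adagio's `TaskContext` (via the new `FugueWorkflowContext`) when `execute` runs, which is what makes the built workflow independent from the engine. A self-contained toy sketch of this late-binding pattern; the `Toy*` classes below are illustrative stand-ins, not fugue's API:

```python
from typing import Any, Dict


class ToyWorkflowContext:
    """Stand-in for FugueWorkflowContext: owns the engine and collects results."""

    def __init__(self, execution_engine: Any):
        self.execution_engine = execution_engine
        self.results: Dict[int, Any] = {}

    def set_result(self, task_id: int, df: Any) -> None:
        self.results[task_id] = df


class ToyTaskContext:
    """Stand-in for adagio's TaskContext, which exposes the workflow context."""

    def __init__(self, workflow_context: ToyWorkflowContext):
        self.workflow_context = workflow_context


class ToyTask:
    """Engine-free at construction; the engine is resolved only at execute time."""

    def execute(self, ctx: ToyTaskContext) -> None:
        engine = ctx.workflow_context.execution_engine  # late binding
        df = f"dataframe produced by {engine}"
        ctx.workflow_context.set_result(id(self), df)


# The same task object can now run under different engines:
task = ToyTask()
for engine in ("NativeExecutionEngine", "SparkExecutionEngine"):
    wf_ctx = ToyWorkflowContext(engine)
    task.execute(ToyTaskContext(wf_ctx))
    print(wf_ctx.results[id(task)])
```

The payoff is that a built workflow graph can be reused as-is, with the engine chosen only when it is run.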
(The remaining changed files in this commit are not shown.)
