Skip to content

Commit

Permalink
refactor yield (#169)
Browse files Browse the repository at this point in the history
* refactor yield

* update

* fix notebook test

* update yield test

* update test

* update docs
  • Loading branch information
goodwanghan authored Feb 1, 2021
1 parent 6c1cb14 commit a43cc60
Show file tree
Hide file tree
Showing 24 changed files with 3,770 additions and 3,651 deletions.
2 changes: 1 addition & 1 deletion fugue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from triad.collections.fs import FileSystem

from fugue.collections.partition import PartitionCursor, PartitionSpec
from fugue.collections.yielded import Yielded
from fugue.collections.yielded import Yielded, YieldedFile
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.arrow_dataframe import ArrowDataFrame
from fugue.dataframe.dataframe import DataFrame, LocalBoundedDataFrame, LocalDataFrame
Expand Down
1 change: 1 addition & 0 deletions fugue/collections/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# flake8: noqa
from fugue.collections.partition import PartitionSpec, PartitionCursor
from fugue.collections.yielded import Yielded, YieldedFile
52 changes: 42 additions & 10 deletions fugue/collections/yielded.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,59 @@


class Yielded(object):
    """Yields from :class:`~fugue.workflow.workflow.FugueWorkflow`.
    Users shouldn't create this object directly.

    :param yid: unique id for determinism
    """

    def __init__(self, yid: str):
        # the uuid is computed once at construction so equal yids always
        # map to the same identifier
        self._yid = to_uuid(yid)

    def __uuid__(self) -> str:
        """uuid of the instance"""
        return self._yid

    @property
    def is_set(self) -> bool:  # pragma: no cover
        """Whether the value is set. It can be false if the parent workflow
        has not been executed.
        """
        raise NotImplementedError

    def __copy__(self) -> Any:  # pragma: no cover
        """``copy`` should have no effect"""
        return self

    def __deepcopy__(self, memo: Any) -> Any:
        """``deepcopy`` should have no effect"""
        return self


class YieldedFile(Yielded):
    """Yielded file from :class:`~fugue.workflow.workflow.FugueWorkflow`.
    Users shouldn't create this object directly.

    :param yid: unique id for determinism
    """

    def __init__(self, yid: str):
        super().__init__(yid)
        # empty string means the file path has not been produced yet
        self._path = ""

    @property
    def is_set(self) -> bool:
        """Whether the file path has been set by the parent workflow."""
        return self._path != ""

    def set_value(self, path: str) -> None:
        """Set the yielded path after compute

        :param path: file path
        """
        self._path = path

    @property
    def path(self) -> str:
        """File path of the yield

        :raises AssertionError: if the value is not yet set
        """
        assert_or_throw(self.is_set, "value is not set")
        return self._path
7 changes: 6 additions & 1 deletion fugue/dataframe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# flake8: noqa
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.arrow_dataframe import ArrowDataFrame
from fugue.dataframe.dataframe import DataFrame, LocalBoundedDataFrame, LocalDataFrame
from fugue.dataframe.dataframe import (
DataFrame,
LocalBoundedDataFrame,
LocalDataFrame,
YieldedDataFrame,
)
from fugue.dataframe.dataframe_iterable_dataframe import LocalDataFrameIterableDataFrame
from fugue.dataframe.dataframes import DataFrames
from fugue.dataframe.iterable_dataframe import IterableDataFrame
Expand Down
33 changes: 33 additions & 0 deletions fugue/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pandas as pd
import pyarrow as pa
from fugue.collections.yielded import Yielded
from fugue.exceptions import FugueDataFrameEmptyError, FugueDataFrameOperationError
from triad.collections.dict import ParamDict
from triad.collections.schema import Schema
Expand Down Expand Up @@ -461,6 +462,38 @@ def count(self) -> int:
raise InvalidOperationError("Impossible to count an LocalUnboundedDataFrame")


class YieldedDataFrame(Yielded):
    """Yielded dataframe from :class:`~fugue.workflow.workflow.FugueWorkflow`.
    Users shouldn't create this object directly.

    :param yid: unique id for determinism
    """

    def __init__(self, yid: str):
        super().__init__(yid)
        # None means the dataframe has not been computed yet
        self._df: Any = None

    @property
    def is_set(self) -> bool:
        """Whether the dataframe has been set by the parent workflow."""
        return self._df is not None

    def set_value(self, df: DataFrame) -> None:
        """Set the yielded dataframe after compute. Users should not
        call it.

        :param df: the computed dataframe to yield
        """
        self._df = df

    @property
    def result(self) -> DataFrame:
        """The yielded dataframe, it will be set after the parent
        workflow is computed
        """
        assert_or_throw(self.is_set, "value is not set")
        return self._df


class _PrettyTable(object):
def __init__(
self, # noqa: C901
Expand Down
19 changes: 19 additions & 0 deletions fugue/execution/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,25 @@ def take(
"""
pass

def convert_yield_dataframe(self, df: DataFrame) -> DataFrame:
    """Convert a yield dataframe to a dataframe that can be used after this
    execution engine stops.

    :param df: DataFrame
    :return: another DataFrame that can be used after this execution engine stops

    :Notice:

    By default, the output dataframe is the input dataframe. But it should be
    overridden when an engine stops and the input dataframe would become
    invalid. For example, if you customize a spark engine where you start and
    stop the spark session in this engine's :meth:`~.start_engine` and
    :meth:`~.stop_engine`, then the spark dataframe will be invalid. So you may
    consider converting it to a local dataframe so it can still exist after the
    engine stops.
    """
    return df

def zip(
self,
df1: DataFrame,
Expand Down
9 changes: 6 additions & 3 deletions fugue/extensions/_builtins/creators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fugue.extensions.creator import Creator
from fugue.collections.yielded import Yielded, YieldedFile
from fugue.dataframe import DataFrame
from fugue.collections.yielded import Yielded
from fugue.extensions.creator import Creator


class Load(Creator):
Expand All @@ -18,4 +18,7 @@ def create(self) -> DataFrame:
class LoadYielded(Creator):
    """Load a :class:`~fugue.collections.yielded.Yielded` object as a
    dataframe, dispatching on its concrete type."""

    def create(self) -> DataFrame:
        yielded = self.params.get_or_throw("yielded", Yielded)
        if isinstance(yielded, YieldedFile):
            # file-based yields must be re-loaded from the persisted path
            return self.execution_engine.load_df(path=yielded.path)
        else:
            # dataframe yields are converted directly by the engine
            return self.execution_engine.to_df(yielded.result)
12 changes: 6 additions & 6 deletions fugue/workflow/_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any

from fugue.collections.partition import PartitionSpec
from fugue.collections.yielded import Yielded
from fugue.collections.yielded import YieldedFile
from fugue.constants import FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH
from fugue.dataframe import DataFrame
from fugue.exceptions import FugueWorkflowCompileError, FugueWorkflowRuntimeError
Expand Down Expand Up @@ -32,8 +32,8 @@ def run(self, df: DataFrame, path: "CheckpointPath") -> DataFrame:
return df

@property
def yielded_file(self) -> YieldedFile:
    """Raise: a plain (null) checkpoint cannot yield a file."""
    raise FugueWorkflowCompileError(f"yield file is not allowed for {self}")

@property
def is_null(self) -> bool:
Expand Down Expand Up @@ -65,7 +65,7 @@ def __init__(
)
self._yield_func: Any = None
self._file_id = to_uuid(file_id, namespace)
self._yielded = Yielded(self._file_id)
self._yielded = YieldedFile(self._file_id)

def run(self, df: DataFrame, path: "CheckpointPath") -> DataFrame:
fpath = path.get_temp_file(self._file_id, self.permanent)
Expand All @@ -86,7 +86,7 @@ def run(self, df: DataFrame, path: "CheckpointPath") -> DataFrame:
return result

@property
def yielded(self) -> Yielded:
def yielded_file(self) -> YieldedFile:
assert_or_throw(
self.permanent,
FugueWorkflowCompileError(f"yield is not allowed for {self}"),
def remove_temp_path(self):
    """Best-effort removal of the temporary checkpoint directory.

    Failures are logged and swallowed: a leftover temp directory must not
    fail the workflow.
    """
    try:
        self._fs.removetree(self._temp_path)
    except Exception as e:  # pragma: no cover
        # lazy %-style args: the original passed `e` as a bare extra arg
        # with no placeholder, which breaks log record formatting
        self._log.info("Unable to remove %s: %s", self._temp_path, e)

def get_temp_file(self, file_id: str, permanent: bool) -> str:
path = self._path if permanent else self._temp_path
Expand Down
50 changes: 27 additions & 23 deletions fugue/workflow/_tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, List, Optional, no_type_check, Iterable
from typing import Any, Callable, Iterable, List, Optional, no_type_check

from adagio.instances import TaskContext
from adagio.specs import InputSpec, OutputSpec, TaskSpec
from fugue.collections.partition import PartitionSpec
from fugue.collections.yielded import Yielded
from fugue.collections.yielded import YieldedFile
from fugue.dataframe import DataFrame, DataFrames
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.exceptions import FugueWorkflowCompileError, FugueWorkflowError
Expand Down Expand Up @@ -62,6 +62,7 @@ def __init__(
self._checkpoint = Checkpoint()
self._broadcast = False
self._dependency_uuid: Any = None
self._yield_dataframe_handler: Any = None

def __uuid__(self) -> str:
# _checkpoint is not part of determinism
Expand Down Expand Up @@ -108,21 +109,24 @@ def has_checkpoint(self) -> bool:
return not self._checkpoint.is_null

@property
def yielded_file(self) -> YieldedFile:
    """The yielded file of this task, delegated to its checkpoint."""
    return self._checkpoint.yielded_file

def broadcast(self) -> "FugueTask":
    """Mark this task's output dataframe to be broadcast at execution time.

    :return: the task itself (fluent interface)
    """
    self._broadcast = True
    return self

def set_yield_dataframe_handler(self, handler: Callable) -> None:
    """Register the callback invoked with the output dataframe when this
    task's result is yielded as a dataframe.

    :param handler: callable receiving the yielded dataframe
    """
    self._yield_dataframe_handler = handler

def set_result(self, ctx: TaskContext, df: DataFrame) -> DataFrame:
    """Post-process the task output: run checkpoint and broadcast handling,
    notify the yield-dataframe handler (if any), and register the result on
    the workflow context.

    :param ctx: adagio task context
    :param df: the task's raw output dataframe
    :return: the processed dataframe
    """
    df = self._handle_checkpoint(df, ctx)
    df = self._handle_broadcast(df, ctx)
    if self._yield_dataframe_handler is not None:
        # convert before yielding so the dataframe remains usable after
        # the engine stops (see ExecutionEngine.convert_yield_dataframe)
        out_df = self._get_execution_engine(ctx).convert_yield_dataframe(df)
        self._yield_dataframe_handler(out_df)
    self._get_workflow_context(ctx).set_result(id(self), df)
    return df

def _get_workflow_context(self, ctx: TaskContext) -> FugueWorkflowContext:
wfctx = ctx.workflow_context
Expand All @@ -132,8 +136,14 @@ def _get_workflow_context(self, ctx: TaskContext) -> FugueWorkflowContext:
def _get_execution_engine(self, ctx: TaskContext) -> ExecutionEngine:
    # the engine lives on the workflow context shared by all tasks
    return self._get_workflow_context(ctx).execution_engine

def _set_result(self, ctx: TaskContext, df: DataFrame) -> None:
self._get_workflow_context(ctx).set_result(id(self), df)
def _handle_checkpoint(self, df: DataFrame, ctx: TaskContext) -> DataFrame:
    # delegate to the task's checkpoint; presumably a no-op for a null
    # checkpoint -- behavior defined in Checkpoint.run
    wfctx = self._get_workflow_context(ctx)
    return self._checkpoint.run(df, wfctx.checkpoint_path)

def _handle_broadcast(self, df: DataFrame, ctx: TaskContext) -> DataFrame:
    # broadcast only when explicitly requested via broadcast()
    if not self._broadcast:
        return df
    return self._get_execution_engine(ctx).broadcast(df)

def _get_dependency_uuid(self) -> Any:
# TODO: this should be a part of adagio!!
Expand Down Expand Up @@ -192,9 +202,7 @@ def __init__(
def execute(self, ctx: TaskContext) -> None:
    """Materialize this task's raw data into a dataframe and publish it as
    output ``_0``.

    :param ctx: adagio task context
    """
    e = self._get_execution_engine(ctx)
    df = e.to_df(self._data, self._schema, self._metadata)
    # set_result runs checkpoint/broadcast/yield handling in one place
    df = self.set_result(ctx, df)
    ctx.outputs["_0"] = df

def _validate_data(
def execute(self, ctx: TaskContext) -> None:
    """Run the creator with this task's execution engine and publish the
    result as output ``_0``.

    :param ctx: adagio task context
    """
    e = self._get_execution_engine(ctx)
    self._creator._execution_engine = e
    df = self._creator.create()
    # set_result runs checkpoint/broadcast/yield handling in one place
    df = self.set_result(ctx, df)
    ctx.outputs["_0"] = df


def execute(self, ctx: TaskContext) -> None:
    """Validate and process the upstream input dataframes, then publish the
    result as output ``_0``.

    :param ctx: adagio task context
    """
    inputs = DataFrames(ctx.inputs.values())
    self._processor.validate_on_runtime(inputs)
    df = self._processor.process(inputs)
    # set_result runs checkpoint/broadcast/yield handling in one place
    df = self.set_result(ctx, df)
    ctx.outputs["_0"] = df


Expand Down
Loading

0 comments on commit a43cc60

Please sign in to comment.