Skip to content

Commit

Permalink
rpc phase 3, higher level api (#145)
Browse files Browse the repository at this point in the history
* rpc

* fix determinism

* clean

* update

* rpc_handler

* rpc_handler

* refactor rpcclient interface

* add comment

* update doc

* improve dag df determinism design

* rpc phase 3, higher level api

* interfaceless for rpc calls

* callback fugue sql

* add more test
  • Loading branch information
goodwanghan authored Jan 15, 2021
1 parent 9be82cb commit 98a22c0
Show file tree
Hide file tree
Showing 20 changed files with 7,288 additions and 7,060 deletions.
11 changes: 11 additions & 0 deletions fugue/_utils/interfaceless.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ def _parse_param( # noqa: C901
if param is not None and param.kind == param.VAR_KEYWORD:
return _KeywordParam(param)
return _OtherParam(param) if none_as_other else _NoneParam(param)
if (
annotation is Callable
or annotation is callable
or str(annotation).startswith("typing.Callable")
):
return _CallableParam(param)
if annotation is to_type("fugue.execution.ExecutionEngine"):
# to prevent cyclic import
return _ExecutionEngineParam(param)
Expand Down Expand Up @@ -247,6 +253,11 @@ def __repr__(self) -> str:
return str(self.annotation)


class _CallableParam(_FuncParam):
def __init__(self, param: Optional[inspect.Parameter]):
super().__init__(param, "Callable", "f")


class _ExecutionEngineParam(_FuncParam):
def __init__(self, param: Optional[inspect.Parameter]):
super().__init__(param, "ExecutionEngine", "e")
Expand Down
4 changes: 4 additions & 0 deletions fugue/extensions/_builtins/outputters.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from fugue.extensions.outputter import Outputter
from fugue.extensions.transformer.convert import _to_output_transformer
from fugue.extensions.transformer.transformer import CoTransformer, Transformer
from fugue.rpc import EmptyRPCHandler, to_rpc_handler
from triad.collections.dict import ParamDict
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
Expand Down Expand Up @@ -105,6 +106,9 @@ def process(self, dfs: DataFrames) -> None:
tf._workflow_conf = self.execution_engine.conf
tf._params = self.params.get("params", ParamDict()) # type: ignore
tf._partition_spec = self.partition_spec # type: ignore
rpc_handler = to_rpc_handler(self.params.get_or_throw("rpc_handler", object))
if not isinstance(rpc_handler, EmptyRPCHandler):
tf._rpc_client = self.execution_engine.rpc_server.make_client(rpc_handler)
ie = self.params.get("ignore_errors", [])
self._ignore_errors = [to_type(x, Exception) for x in ie]
tf.validate_on_runtime(df)
Expand Down
4 changes: 2 additions & 2 deletions fugue/extensions/_builtins/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_instance, to_type
from fugue.rpc import to_rpc_handler, RPCEmptyHandler
from fugue.rpc import to_rpc_handler, EmptyRPCHandler


class RunTransformer(Processor):
Expand All @@ -32,7 +32,7 @@ def process(self, dfs: DataFrames) -> DataFrame:
tf._params = self.params.get("params", ParamDict()) # type: ignore
tf._partition_spec = self.partition_spec
rpc_handler = to_rpc_handler(self.params.get_or_throw("rpc_handler", object))
if not isinstance(rpc_handler, RPCEmptyHandler):
if not isinstance(rpc_handler, EmptyRPCHandler):
tf._rpc_client = self.execution_engine.rpc_server.make_client(rpc_handler)
ie = self.params.get("ignore_errors", [])
self._ignore_errors = [to_type(x, Exception) for x in ie]
Expand Down
46 changes: 31 additions & 15 deletions fugue/extensions/transformer/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,13 @@ def get_output_schema(self, df: DataFrame) -> Any:
def validation_rules(self) -> Dict[str, Any]:
return self._validation_rules # type: ignore

@no_type_check
def transform(self, df: LocalDataFrame) -> LocalDataFrame:
return self._wrapper.run( # type: ignore
[df], self.params, ignore_unknown=False, output_schema=self.output_schema
args = [df]
if self._has_callback:
args.append(self.rpc_client)
return self._wrapper.run(
args, self.params, ignore_unknown=False, output_schema=self.output_schema
)

def __call__(self, *args: Any, **kwargs: Any) -> Any:
Expand Down Expand Up @@ -146,20 +150,23 @@ def from_func(
validation_rules.update(parse_validation_rules_from_comment(func))
assert_arg_not_none(schema, "schema")
tr = _FuncAsTransformer()
tr._wrapper = FunctionWrapper(func, "^[lsp]x*z?$", "^[lspq]$") # type: ignore
tr._wrapper = FunctionWrapper(func, "^[lsp]f?x*z?$", "^[lspq]$") # type: ignore
tr._output_schema_arg = schema # type: ignore
tr._validation_rules = validation_rules # type: ignore
tr._has_callback = "f" in tr._wrapper.input_code # type: ignore
return tr


class _FuncAsOutputTransformer(_FuncAsTransformer):
def get_output_schema(self, df: DataFrame) -> Any:
return OUTPUT_TRANSFORMER_DUMMY_SCHEMA

@no_type_check
def transform(self, df: LocalDataFrame) -> LocalDataFrame:
self._wrapper.run( # type: ignore
[df], self.params, ignore_unknown=False, output=False
)
args = [df]
if self._has_callback:
args.append(self.rpc_client)
self._wrapper.run(args, self.params, ignore_unknown=False, output=False)
return ArrayDataFrame([], OUTPUT_TRANSFORMER_DUMMY_SCHEMA)

@staticmethod
Expand All @@ -169,9 +176,12 @@ def from_func(
assert_or_throw(schema is None, "schema must be None for output transformers")
validation_rules.update(parse_validation_rules_from_comment(func))
tr = _FuncAsOutputTransformer()
tr._wrapper = FunctionWrapper(func, "^[lsp]x*z?$", "^[lspnq]$") # type: ignore
tr._wrapper = FunctionWrapper( # type: ignore
func, "^[lsp]f?x*z?$", "^[lspnq]$"
)
tr._output_schema_arg = None # type: ignore
tr._validation_rules = validation_rules # type: ignore
tr._has_callback = "f" in tr._wrapper.input_code # type: ignore
return tr


Expand All @@ -185,16 +195,17 @@ def validation_rules(self) -> ParamDict:

@no_type_check
def transform(self, dfs: DataFrames) -> LocalDataFrame:
cb: List[Any] = [self.rpc_client] if self._has_callback else []
if self._dfs_input: # function has DataFrames input
return self._wrapper.run( # type: ignore
[dfs],
[dfs] + cb,
self.params,
ignore_unknown=False,
output_schema=self.output_schema,
)
if not dfs.has_key: # input does not have key
return self._wrapper.run( # type: ignore
list(dfs.values()),
list(dfs.values()) + cb,
self.params,
ignore_unknown=False,
output_schema=self.output_schema,
Expand All @@ -203,7 +214,7 @@ def transform(self, dfs: DataFrames) -> LocalDataFrame:
p = dict(dfs)
p.update(self.params)
return self._wrapper.run( # type: ignore
[], p, ignore_unknown=False, output_schema=self.output_schema
[] + cb, p, ignore_unknown=False, output_schema=self.output_schema
)

def __call__(self, *args: Any, **kwargs: Any) -> Any:
Expand Down Expand Up @@ -250,11 +261,12 @@ def from_func(
assert_arg_not_none(schema, "schema")
tr = _FuncAsCoTransformer()
tr._wrapper = FunctionWrapper( # type: ignore
func, "^(c|[lsp]+)x*z?$", "^[lspq]$"
func, "^(c|[lsp]+)f?x*z?$", "^[lspq]$"
)
tr._dfs_input = tr._wrapper.input_code[0] == "c" # type: ignore
tr._output_schema_arg = schema # type: ignore
tr._validation_rules = {} # type: ignore
tr._has_callback = "f" in tr._wrapper.input_code # type: ignore
return tr


Expand All @@ -264,24 +276,27 @@ def get_output_schema(self, dfs: DataFrames) -> Any:

@no_type_check
def transform(self, dfs: DataFrames) -> LocalDataFrame:
cb: List[Any] = [self.rpc_client] if self._has_callback else []
if self._dfs_input: # function has DataFrames input
self._wrapper.run( # type: ignore
[dfs],
[dfs] + cb,
self.params,
ignore_unknown=False,
output=False,
)
elif not dfs.has_key: # input does not have key
self._wrapper.run( # type: ignore
list(dfs.values()),
list(dfs.values()) + cb,
self.params,
ignore_unknown=False,
output=False,
)
else: # input DataFrames has key
p = dict(dfs)
p.update(self.params)
self._wrapper.run([], p, ignore_unknown=False, output=False) # type: ignore
self._wrapper.run(
[] + cb, p, ignore_unknown=False, output=False # type: ignore
)
return ArrayDataFrame([], OUTPUT_TRANSFORMER_DUMMY_SCHEMA)

@staticmethod
Expand All @@ -296,11 +311,12 @@ def from_func(

tr = _FuncAsOutputCoTransformer()
tr._wrapper = FunctionWrapper( # type: ignore
func, "^(c|[lsp]+)x*z?$", "^[lspnq]$"
func, "^(c|[lsp]+)f?x*z?$", "^[lspnq]$"
)
tr._dfs_input = tr._wrapper.input_code[0] == "c" # type: ignore
tr._output_schema_arg = None # type: ignore
tr._validation_rules = {} # type: ignore
tr._has_callback = "f" in tr._wrapper.input_code # type: ignore
return tr


Expand Down
2 changes: 1 addition & 1 deletion fugue/rpc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# flake8: noqa
from fugue.rpc.base import (
RPCClient,
RPCEmptyHandler,
EmptyRPCHandler,
RPCFunc,
RPCHandler,
RPCServer,
Expand Down
6 changes: 3 additions & 3 deletions fugue/rpc/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __deepcopy__(self, memo: Any) -> "RPCHandler":
return self


class RPCEmptyHandler(RPCHandler):
class EmptyRPCHandler(RPCHandler):
"""The class representing empty :class:`~.RPCHandler`"""

def __init__(self):
Expand Down Expand Up @@ -257,14 +257,14 @@ def __call__(self, *args: Any, **kwargs: Any) -> Any:
def to_rpc_handler(obj: Any) -> RPCHandler:
"""Convert object to :class:`~.RPCHandler`. If the object is already
``RPCHandler``, then the original instance will be returned.
If the object is ``None`` then :class:`~.RPCEmptyHandler` will be returned.
If the object is ``None`` then :class:`~.EmptyRPCHandler` will be returned.
If the object is a python function then :class:`~.RPCFunc` will be returned.
:param obj: |RPCHandlerLikeObject|
:return: the RPC handler
"""
if obj is None:
return RPCEmptyHandler()
return EmptyRPCHandler()
if isinstance(obj, RPCHandler):
return obj
if callable(obj):
Expand Down
24 changes: 12 additions & 12 deletions fugue/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def transform(
params: Any = None,
pre_partition: Any = None,
ignore_errors: List[Any] = _DEFAULT_IGNORE_ERRORS,
rpc_handler: Any = None,
callback: Any = None,
) -> TDF:
"""Transform this dataframe using transformer. It's a wrapper of
:meth:`fugue.workflow.workflow.FugueWorkflow.transform`
Expand All @@ -312,7 +312,7 @@ def transform(
:meth:`~.partition` and then call :meth:`~.transform` without this parameter
:param ignore_errors: list of exception types the transformer can ignore,
defaults to empty list
:param rpc_handler: |RPCHandlerLikeObject|, defaults to None
:param callback: |RPCHandlerLikeObject|, defaults to None
:return: the transformed dataframe
:rtype: :class:`~.WorkflowDataFrame`
Expand All @@ -335,7 +335,7 @@ def transform(
params=params,
pre_partition=pre_partition,
ignore_errors=ignore_errors,
rpc_handler=rpc_handler,
callback=callback,
)
return self._to_self_type(df)

Expand All @@ -345,7 +345,7 @@ def out_transform(
params: Any = None,
pre_partition: Any = None,
ignore_errors: List[Any] = _DEFAULT_IGNORE_ERRORS,
rpc_handler: Any = None,
callback: Any = None,
) -> None:
"""Transform this dataframe using transformer. It's a wrapper of
:meth:`fugue.workflow.workflow.FugueWorkflow.out_transform`
Expand All @@ -362,7 +362,7 @@ def out_transform(
:meth:`~.partition` and then call :meth:`~.transform` without this parameter
:param ignore_errors: list of exception types the transformer can ignore,
defaults to empty list
:param rpc_handler: |RPCHandlerLikeObject|, defaults to None
:param callback: |RPCHandlerLikeObject|, defaults to None
:Notice:
Expand All @@ -382,7 +382,7 @@ def out_transform(
params=params,
pre_partition=pre_partition,
ignore_errors=ignore_errors,
rpc_handler=rpc_handler,
callback=callback,
)

def join(self: TDF, *dfs: Any, how: str, on: Optional[Iterable[str]] = None) -> TDF:
Expand Down Expand Up @@ -1578,7 +1578,7 @@ def transform(
params: Any = None,
pre_partition: Any = None,
ignore_errors: List[Any] = _DEFAULT_IGNORE_ERRORS,
rpc_handler: Any = None,
callback: Any = None,
) -> WorkflowDataFrame:
"""Transform dataframes using transformer.
Expand All @@ -1598,7 +1598,7 @@ def transform(
:meth:`~.partition` and then call :meth:`~.transform` without this parameter
:param ignore_errors: list of exception types the transformer can ignore,
defaults to empty list
:param rpc_handler: |RPCHandlerLikeObject|, defaults to None
:param callback: |RPCHandlerLikeObject|, defaults to None
:return: the transformed dataframe
:Notice:
Expand Down Expand Up @@ -1626,7 +1626,7 @@ def transform(
transformer=tf,
ignore_errors=ignore_errors,
params=params,
rpc_handler=to_rpc_handler(rpc_handler),
rpc_handler=to_rpc_handler(callback),
),
pre_partition=pre_partition,
)
Expand All @@ -1638,7 +1638,7 @@ def out_transform(
params: Any = None,
pre_partition: Any = None,
ignore_errors: List[Any] = _DEFAULT_IGNORE_ERRORS,
rpc_handler: Any = None,
callback: Any = None,
) -> None:
"""Transform dataframes using transformer, it materializes the execution
immediately and returns nothing
Expand All @@ -1660,7 +1660,7 @@ def out_transform(
parameter
:param ignore_errors: list of exception types the transformer can ignore,
defaults to empty list
:param rpc_handler: |RPCHandlerLikeObject|, defaults to None
:param callback: |RPCHandlerLikeObject|, defaults to None
:Notice:
Expand All @@ -1686,7 +1686,7 @@ def out_transform(
transformer=tf,
ignore_errors=ignore_errors,
params=params,
rpc_handler=to_rpc_handler(rpc_handler),
rpc_handler=to_rpc_handler(callback),
),
pre_partition=pre_partition,
)
Expand Down
5 changes: 3 additions & 2 deletions fugue_sql/_antlr/fugue_sql.g4
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ fugueEngineSpecificQueryTask:
;

fugueTransformTask:
TRANSFORM (dfs=fugueDataFrames)? (partition=fuguePrepartition)? params=fugueSingleOutputExtensionCommonWild
TRANSFORM (dfs=fugueDataFrames)? (partition=fuguePrepartition)? params=fugueSingleOutputExtensionCommonWild (CALLBACK callback=fugueExtension)?
;

fugueProcessTask:
Expand Down Expand Up @@ -197,7 +197,7 @@ fugueSaveTask:
;

fugueOutputTransformTask:
OUTTRANSFORM (dfs=fugueDataFrames)? (partition=fuguePrepartition)? USING using=fugueExtension (params=fugueParams)?
OUTTRANSFORM (dfs=fugueDataFrames)? (partition=fuguePrepartition)? USING using=fugueExtension (params=fugueParams)? (CALLBACK callback=fugueExtension)?
;

fugueModuleTask:
Expand Down Expand Up @@ -1893,6 +1893,7 @@ SAMPLE: 'SAMPLE';
SEED: 'SEED';

SUB: 'SUB';
CALLBACK: 'CALLBACK';

//================================
// End of the Fugue keywords list
Expand Down
Loading

0 comments on commit 98a22c0

Please sign in to comment.