Merge branch 'master' into master
kvnkho authored Jan 12, 2021
2 parents cee35be + 47925b7 commit 81856e9
Showing 20 changed files with 478 additions and 16 deletions.
18 changes: 9 additions & 9 deletions README.md
@@ -11,10 +11,10 @@

Fugue is a pure abstraction layer that makes code portable across differing computing frameworks such as Pandas, Spark and Dask.

* :rocket: **Framework-agnostic code**: Write code once in native Python. Fugue makes it runnable on Pandas, Dask or Spark with minimal changes. Logic and code are decoupled from frameworks, even from Fugue itself. Fugue adapts to the user's code as well as to the underlying computing frameworks.
* :moneybag: **Rapid iterations for big data projects**: Test code on smaller data, then reliably scale to Dask or Spark when ready. This drastically improves project iteration time and saves cluster expense, because clusters are spun up less often just to test code and expensive mistakes become rarer.
* :wrench: **Friendlier interface for Spark**: Fugue handles some optimizations on Spark, making it easier for big data practitioners to focus on logic. Many Fugue users see performance gains in their Spark jobs. Fugue SQL extends Spark SQL to be a programming language.
* :heavy_check_mark: **Highly testable code**: Fugue naturally makes logic more testable because the code is in native Python. Unit tests scale seamlessly from local workflows to distributed computing workflows.
* **Framework-agnostic code**: Write code once in native Python. Fugue makes it runnable on Pandas, Dask or Spark with minimal changes. Logic and code are decoupled from frameworks, even from Fugue itself. Fugue adapts to the user's code as well as to the underlying computing frameworks.
* **Rapid iterations for big data projects**: Test code on smaller data, then reliably scale to Dask or Spark when ready. This drastically improves project iteration time and saves cluster expense, because clusters are spun up less often just to test code and expensive mistakes become rarer.
* **Friendlier interface for Spark**: Fugue handles some optimizations on Spark, making it easier for big data practitioners to focus on logic. Many Fugue users see performance gains in their Spark jobs. Fugue SQL extends Spark SQL to be a programming language.
* **Highly testable code**: Fugue naturally makes logic more testable because the code is in native Python. Unit tests scale seamlessly from local workflows to distributed computing workflows.

## Who is it for?

@@ -30,6 +30,7 @@ Fugue is a pure abstraction layer that makes code portable across differing comp
Here is an example Fugue code snippet that illustrates some of the key features of the framework. A `fillna` function creates a new column named `filled`, which is the same as the column `value` except that `None` values are replaced with a default value.

```python
from fugue import FugueWorkflow
from typing import Iterable, Dict, Any, List

# Creating sample data
@@ -41,13 +41,12 @@ data = [
["B", "2020-01-02", None],
["B", "2020-01-03", 40]
]
schema = "id:str,date:date,value:int"
schema = "id:str,date:date,value:double"

# schema: *, filled:int
def fillna(df:Iterable[Dict[str,Any]],value:int=0) -> Iterable[Dict[str,Any]]:
# schema: *, filled:double
def fillna(df:Iterable[Dict[str,Any]], value:float=0) -> Iterable[Dict[str,Any]]:
    for row in df:
        for col in cols:
            row["filled"] = (row["value"] or value)
        row["filled"] = (row["value"] or value)
        yield row

with FugueWorkflow() as dag:
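The diff cuts the snippet off at the `with FugueWorkflow() as dag:` line. A plausible completion of the example, assuming the standard `FugueWorkflow` API (`dag.df`, `transform`, `show`) rather than the exact lines hidden by the diff, is sketched below:

```python
with FugueWorkflow() as dag:
    # build a dataframe from the sample data and apply the schema-hinted
    # fillna transformer, then print the result (runs on Pandas by default)
    dag.df(data, schema).transform(fillna).show()
```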
6 changes: 6 additions & 0 deletions fugue/execution/execution_engine.py
@@ -13,6 +13,7 @@
from fugue.dataframe.dataframe import LocalDataFrame
from fugue.dataframe.utils import deserialize_df, serialize_df
from fugue.exceptions import FugueBug
from fugue.rpc import RPCServer, make_rpc_server
from triad.collections import ParamDict, Schema
from triad.collections.fs import FileSystem
from triad.exceptions import InvalidOperationError
@@ -79,6 +80,7 @@ class ExecutionEngine(ABC):
    def __init__(self, conf: Any):
        _conf = ParamDict(conf)
        self._conf = ParamDict({**FUGUE_DEFAULT_CONF, **_conf})
        self._rpc_server = make_rpc_server(self.conf)

    @property
    def conf(self) -> ParamDict:
@@ -93,6 +95,10 @@ def conf(self) -> ParamDict:
"""
return self._conf

    @property
    def rpc_server(self) -> RPCServer:
        return self._rpc_server

    @property
    @abstractmethod
    def log(self) -> logging.Logger:  # pragma: no cover
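With this change every `ExecutionEngine` owns an RPC server built from its configuration. A minimal sketch of what that exposes, assuming the existing `NativeExecutionEngine` and the default server selection in `make_rpc_server`:

```python
from fugue import NativeExecutionEngine
from fugue.rpc.base import NativeRPCServer

engine = NativeExecutionEngine(conf={})
# make_rpc_server(engine.conf) is called in __init__; with no
# "fugue.rpc.server" entry in conf this falls back to NativeRPCServer
assert isinstance(engine.rpc_server, NativeRPCServer)
```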
6 changes: 5 additions & 1 deletion fugue/extensions/_builtins/processors.py
@@ -17,6 +17,7 @@
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_instance, to_type
from fugue.rpc import RPCFuncDict


class RunTransformer(Processor):
@@ -29,7 +30,10 @@ def process(self, dfs: DataFrames) -> DataFrame:
        )
        tf._workflow_conf = self.execution_engine.conf
        tf._params = self.params.get("params", ParamDict())  # type: ignore
        tf._partition_spec = self.partition_spec  # type: ignore
        tf._partition_spec = self.partition_spec
        rpc_funcs = self.params.get_or_throw("rpc_funcs", RPCFuncDict)
        if len(rpc_funcs) > 0:
            tf._rpc_client = self.execution_engine.rpc_server.make_client(rpc_funcs)
        ie = self.params.get("ignore_errors", [])
        self._ignore_errors = [to_type(x, Exception) for x in ie]
        tf.validate_on_runtime(df)
10 changes: 10 additions & 0 deletions fugue/extensions/context.py
@@ -4,6 +4,7 @@
from fugue.dataframe import DataFrame, DataFrames
from fugue.execution.execution_engine import ExecutionEngine
from fugue.extensions._utils import validate_input_schema, validate_partition_spec
from fugue.rpc import RPCClient
from triad.collections import ParamDict, Schema
from triad.utils.convert import get_full_type_path
from triad.utils.hash import to_uuid
@@ -79,6 +80,15 @@ def cursor(self) -> PartitionCursor:
"""
return self._cursor # type: ignore

@property
def rpc_client(self) -> RPCClient:
"""RPC client to talk to driver, this is for transformers only,
and available on both driver and workers
"""
if "_rpc_client" in self.__dict__:
return self._rpc_client # type: ignore
return RPCClient() # pragma: no cover

@property
def validation_rules(self) -> Dict[str, Any]:
"""Extension input validation rules defined by user"""
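Inside a class-based extension, `self.rpc_client` is a callable that takes a method name and a string payload and returns a string; the call is routed back to functions registered on the driver. The sketch below is illustrative only: the `Transformer` subclass and the `report_progress` method name are assumptions, and it presumes a matching callable was registered through the `rpc_funcs` parameter handled in `RunTransformer` above.

```python
from typing import Any

from fugue.dataframe import DataFrame, LocalDataFrame
from fugue.extensions.transformer import Transformer


class ProgressTransformer(Transformer):
    """Hypothetical transformer that reports progress back to the driver."""

    def get_output_schema(self, df: DataFrame) -> Any:
        return df.schema  # output schema equals input schema

    def transform(self, df: LocalDataFrame) -> LocalDataFrame:
        # rpc_client(method, value) sends a string to the driver-side
        # "report_progress" function and returns its string response
        self.rpc_client("report_progress", "partition processed")
        return df
```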
3 changes: 3 additions & 0 deletions fugue/rpc/__init__.py
@@ -0,0 +1,3 @@
# flake8: noqa
from fugue.rpc.base import RPCClient, RPCServer, make_rpc_server
from fugue.rpc.collections import RPCFuncDict, to_rpc_func_dict
109 changes: 109 additions & 0 deletions fugue/rpc/base.py
@@ -0,0 +1,109 @@
from abc import ABC, abstractmethod
from threading import RLock
from typing import Any, Callable, Dict
from uuid import uuid4

from fugue.rpc.collections import RPCFuncDict, to_rpc_func_dict
from triad import ParamDict, assert_or_throw
from triad.utils.convert import to_type
import pickle


class RPCClient(object):
    def __call__(self, method: str, value: str) -> str:  # pragma: no cover
        raise NotImplementedError


class RPCServer(ABC):
    def __init__(self, conf: Any):
        self._lock = RLock()
        self._conf = ParamDict(conf)
        self._services: Dict[str, RPCFuncDict] = {}
        self._running = 0

    @property
    def conf(self) -> ParamDict:
        return self._conf

    @abstractmethod
    def make_client(self, methods: Dict[str, Callable[[str], str]]) -> RPCClient:
        raise NotImplementedError  # pragma: no cover

    @abstractmethod
    def start_server(self) -> None:
        raise NotImplementedError  # pragma: no cover

    @abstractmethod
    def stop_server(self) -> None:
        raise NotImplementedError  # pragma: no cover

    def invoke(self, key: str, method: str, value: str) -> str:
        with self._lock:
            handler = self._services[key][method]
        return handler(value)

    def add_methods(self, methods: Dict[str, Callable[[str], str]]) -> str:
        with self._lock:
            key = "_" + str(uuid4()).split("-")[-1]
            self._services[key] = to_rpc_func_dict(methods)
            return key

    def start(self) -> "RPCServer":
        with self._lock:
            if self._running == 0:
                self.start_server()
            self._running += 1
        return self

    def stop(self) -> None:
        with self._lock:
            if self._running == 1:
                self.stop_server()
            self._running -= 1
            if self._running < 0:
                self._running = 0

    def __enter__(self) -> "RPCServer":
        with self._lock:
            assert_or_throw(self._running, "use `with <instance>.start():` instead")
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def __getstate__(self):
        raise pickle.PicklingError(f"{self} is not serializable")


class NativeRPCClient(RPCClient):
    def __init__(self, server: "NativeRPCServer", key: str):
        self._key = key
        self._server = server

    def __call__(self, method: str, value: str) -> str:
        return self._server.invoke(self._key, method, value)

    def __getstate__(self):
        raise pickle.PicklingError(f"{self} is not serializable")


class NativeRPCServer(RPCServer):
    def __init__(self, conf: Any):
        super().__init__(conf)

    def make_client(self, methods: Dict[str, Callable[[str], str]]) -> RPCClient:
        key = self.add_methods(methods)
        return NativeRPCClient(self, key)

    def start_server(self) -> None:
        return

    def stop_server(self) -> None:
        return


def make_rpc_server(conf: Any) -> RPCServer:
    conf = ParamDict(conf)
    tp = conf.get_or_none("fugue.rpc.server", str)
    t_server = NativeRPCServer if tp is None else to_type(tp, RPCServer)
    return t_server(conf)  # type: ignore
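A minimal sketch of the server/client contract defined above, using the in-process `NativeRPCServer` that `make_rpc_server` falls back to when no `fugue.rpc.server` key is configured:

```python
from fugue.rpc import make_rpc_server

server = make_rpc_server({})  # no "fugue.rpc.server" key -> NativeRPCServer
with server.start():
    # every registered method maps a string to a string; make_client
    # returns a client bound to this group of methods
    client = server.make_client({"echo": lambda v: v.upper()})
    assert client("echo", "hello") == "HELLO"
# leaving the with-block calls stop(), shutting the server down
```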
37 changes: 37 additions & 0 deletions fugue/rpc/collections.py
@@ -0,0 +1,37 @@
from typing import Any, Callable, Iterable, Tuple, Dict
from uuid import uuid4

from triad import IndexedOrderedDict, assert_or_throw


class RPCFuncDict(IndexedOrderedDict[str, Callable[[str], str]]):
    def __init__(self, data: Dict[str, Callable[[str], str]]):
        if isinstance(data, RPCFuncDict):
            super().__init__(data)
            self._uuid = data.__uuid__()
        else:
            super().__init__(self.get_tuples(data))
            self._uuid = "" if len(self) == 0 else str(uuid4())
        self.set_readonly()

    def __uuid__(self) -> str:
        return self._uuid

    def get_tuples(
        self, data: Dict[str, Callable[[str], str]]
    ) -> Iterable[Tuple[str, Callable[[str], str]]]:
        for k, v in sorted([(k, v) for k, v in data.items()], key=lambda p: p[0]):
            assert_or_throw(callable(v), ValueError(k, v))
            yield k, v

    def __copy__(self) -> "RPCFuncDict":
        return self

    def __deepcopy__(self, memo: Any) -> "RPCFuncDict":
        return self


def to_rpc_func_dict(obj: Any) -> RPCFuncDict:
    if isinstance(obj, RPCFuncDict):
        return obj
    return RPCFuncDict(obj)
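A short sketch of how `RPCFuncDict` behaves: keys are sorted, values must be callable, and the resulting dictionary is read-only with a stable `__uuid__`:

```python
from fugue.rpc import RPCFuncDict, to_rpc_func_dict

funcs = to_rpc_func_dict({"b": str.lower, "a": str.upper})
assert list(funcs.keys()) == ["a", "b"]  # entries are sorted by key
assert funcs.__uuid__() != ""            # non-empty dicts get a random uuid
assert to_rpc_func_dict(funcs) is funcs  # already wrapped -> returned as-is

try:
    RPCFuncDict({"x": 1})  # non-callable values are rejected
except ValueError:
    pass
```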
77 changes: 77 additions & 0 deletions fugue/rpc/flask.py
@@ -0,0 +1,77 @@
from threading import Thread
from typing import Any, Callable, Dict, Optional

import requests
from fugue.rpc.base import RPCClient, RPCServer
from triad.utils.convert import to_timedelta
from werkzeug.serving import make_server

from flask import Flask, request


class FlaskRPCServer(RPCServer):
    class _Thread(Thread):
        def __init__(self, app: Flask, host: str, port: int):
            super().__init__()
            self._srv = make_server(host, port, app)
            self._ctx = app.app_context()
            self._ctx.push()

        def run(self) -> None:
            self._srv.serve_forever()

        def shutdown(self) -> None:
            self._srv.shutdown()

    def __init__(self, conf: Any):
        super().__init__(conf)
        self._host = conf.get_or_throw("fugue.rpc.flask_server.host", str)
        self._port = conf.get_or_throw("fugue.rpc.flask_server.port", int)
        timeout = conf.get_or_none("fugue.rpc.flask_server.timeout", object)
        self._timeout_sec = (
            -1.0 if timeout is None else to_timedelta(timeout).total_seconds()
        )
        self._server: Optional[FlaskRPCServer._Thread] = None

    def _invoke(self) -> str:
        key = request.form.get("key")
        method = request.form.get("method")
        value = request.form.get("value")
        return self.invoke(key, method, value)  # type: ignore

    def make_client(self, methods: Dict[str, Callable[[str], str]]) -> RPCClient:
        key = self.add_methods(methods)
        return FlaskRPCClient(
            key,
            self._host,
            self._port,
            self._timeout_sec,
        )

    def start_server(self) -> None:
        app = Flask("FlaskRPCServer")
        app.route("/invoke", methods=["POST"])(self._invoke)
        self._server = FlaskRPCServer._Thread(app, self._host, self._port)
        self._server.start()

    def stop_server(self) -> None:
        if self._server is not None:
            self._server.shutdown()
            self._server.join()


class FlaskRPCClient(RPCClient):
    def __init__(self, key: str, host: str, port: int, timeout_sec: float):
        self._url = f"http://{host}:{port}/invoke"
        self._timeout_sec = timeout_sec
        self._key = key

    def __call__(self, method: str, value: str) -> str:
        timeout: Any = None if self._timeout_sec <= 0 else self._timeout_sec
        res = requests.post(
            self._url,
            data=dict(key=self._key, method=method, value=value),
            timeout=timeout,
        )
        res.raise_for_status()
        return res.text
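A sketch of how the Flask-based server might be selected through the configuration consumed by `make_rpc_server`; the host, port, and timeout values are placeholders, and the class is resolved from its full path by `to_type`:

```python
from fugue.rpc import make_rpc_server

conf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",  # pick the Flask backend
    "fugue.rpc.flask_server.host": "127.0.0.1",
    "fugue.rpc.flask_server.port": 3333,
    "fugue.rpc.flask_server.timeout": "2 sec",  # parsed by to_timedelta
}

server = make_rpc_server(conf)
with server.start():
    client = server.make_client({"echo": lambda v: v})
    # FlaskRPCClient POSTs key/method/value to http://127.0.0.1:3333/invoke
    print(client("echo", "hello"))
```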
2 changes: 2 additions & 0 deletions fugue/workflow/_workflow_context.py
@@ -58,9 +58,11 @@ def run(self, spec: WorkflowSpec, conf: Dict[str, Any]) -> None:
            self._execution_id = str(uuid4())
            self._checkpoint_path = CheckpointPath(self.execution_engine)
            self._checkpoint_path.init_temp_path(self._execution_id)
            self.execution_engine.rpc_server.start()
            super().run(spec, conf)
        finally:
            self._checkpoint_path.remove_temp_path()
            self.execution_engine.rpc_server.stop()
            self._execution_id = ""

    @property
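The workflow context now brackets each run with `rpc_server.start()` and `rpc_server.stop()`. Because `RPCServer` keeps a reference count, nested or repeated starts are safe; a small sketch of that contract using the native server:

```python
from fugue.rpc import make_rpc_server

server = make_rpc_server({})
server.start()  # running count 0 -> 1, underlying server started
server.start()  # running count 1 -> 2, no second start
server.stop()   # running count 2 -> 1, server keeps running
server.stop()   # running count 1 -> 0, underlying server stopped
```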