
Commit

Merge pull request #346 from kvnkho/master
Enforcing parquet files for Fugue transform
kvnkho authored Aug 2, 2022
2 parents 4150489 + be7517f commit bef421f
Showing 4 changed files with 80 additions and 11 deletions.
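
For context, here is a minimal sketch of the behavior this change enforces (the file names and the `add_one` transformer are hypothetical; only the public `transform` and `FugueInterfacelessError` names appearing in this diff are assumed): parquet paths keep working, while CSV and JSON paths are rejected up front.

```python
import pandas as pd
from fugue import transform
from fugue.exceptions import FugueInterfacelessError


def add_one(df: pd.DataFrame) -> pd.DataFrame:
    # toy transformer used only for illustration
    return df.assign(b=df["a"] + 1)


# Parquet paths are still accepted for both input and output.
transform("data.parquet", add_one, schema="*,b:int", save_path="out.parquet")

# CSV and JSON paths now fail fast with FugueInterfacelessError.
try:
    transform("data.csv", add_one, schema="*,b:int")
except FugueInterfacelessError:
    print("csv input rejected before any work runs")
```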
3 changes: 3 additions & 0 deletions .gitignore
@@ -141,3 +141,6 @@ dask-worker-space
# spark
spark-warehourse
=*

# DS_Store
*.DS_Store
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -2,9 +2,9 @@

## Ways to Contribute

We're happy you're looking to contribute. We recommend you join the [Slack channel](https://join.slack.com/t/fugue-project/shared_invite/zt-jl0pcahu-KdlSOgi~fP50TZWmNxdWYQ) to discuss how to get involved, or how to use Fugue. There are many ways to help this project.
We're happy you're looking to contribute. We recommend you join the [Slack channel](slack.fugue.ai) to discuss how to get involved, or how to use Fugue. There are many ways to help this project.

1. **Use Fugue in your project** - Having more users helps us come across more use cases and make a better framework. We're always happy to help you get started using Fugue for your company use case or personal project. Feel free to message us on [Slack](https://join.slack.com/t/fugue-project/shared_invite/zt-jl0pcahu-KdlSOgi~fP50TZWmNxdWYQ).
1. **Use Fugue in your project** - Having more users helps us come across more use cases and make a better framework. We're always happy to help you get started using Fugue for your company use case or personal project. Feel free to message us on [Slack](slack.fugue.ai).

2. **Give us feedback/Post issues** - If you have ideas of how to make Fugue better, or have general questions about Fugue, we'd be happy to hear them. Hearing unclear parts helps us write better documentation. Posting issues helps us fix bugs or make new features.

45 changes: 36 additions & 9 deletions fugue/interfaceless.py
@@ -4,6 +4,28 @@
from fugue.constants import FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT
from fugue.dataframe import DataFrame
from fugue.workflow import FugueWorkflow
from fugue.exceptions import FugueInterfacelessError
from triad.utils.assertion import assert_or_throw


def _check_valid_input(df: Any, save_path: Optional[str]) -> None:
# Check valid input
if isinstance(df, str):
assert_or_throw(
(".csv" not in df) and (".json" not in df),
FugueInterfacelessError(
"""Fugue transform can only load parquet file paths.
Csv and json are disallowed"""
),
)
if save_path:
assert_or_throw(
(".csv" not in save_path) and (".json" not in save_path),
FugueInterfacelessError(
"""Fugue transform can only load parquet file paths.
Csv and json are disallowed"""
),
)


def transform(
@@ -33,7 +55,7 @@ def transform(
Please read |TransformerTutorial|
:param df: |DataFrameLikeObject| or :class:`~fugue.workflow.yielded.Yielded`
or a path string
or a path string to a parquet file
:param using: transformer-like object, can't be a string expression
:param schema: |SchemaLikeObject|, defaults to None. The transformer
will be able to access this value from
@@ -94,9 +116,8 @@
* When `save_path` is not None and `checkpoint` is True, then the output will
be saved into `save_path`. The return will be the dataframe from `save_path`
It is the best practice to only use parquet file in `df` and `save_path`.
Csv and other file formats may need additional parameters to read and save,
but this function does not support extra parameters for IO.
This function can only take parquet file paths in `df` and `save_path`.
Csv and other file formats are disallowed.
The checkpoint here is NOT deterministic, so re-run will generate new
checkpoints.
@@ -105,9 +126,11 @@
deterministic checkpoints, please use
:class:`~fugue.workflow.workflow.FugueWorkflow`.
"""
_check_valid_input(df, save_path)

dag = FugueWorkflow(conf={FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 0})
if isinstance(df, str):
src = dag.load(df)
src = dag.load(df, fmt="parquet")
else:
src = dag.df(df)
tdf = src.transform(
@@ -133,12 +156,14 @@ def _no_op_processor(df: DataFrame) -> DataFrame:
"result", as_local=as_local
)
else:
tdf.save_and_use(save_path).yield_dataframe_as("result", as_local=as_local)
tdf.save_and_use(save_path, fmt="parquet").yield_dataframe_as(
"result", as_local=as_local
)
else:
if save_path is None:
tdf.yield_dataframe_as("result", as_local=as_local)
else:
tdf.save(save_path)
tdf.save(save_path, fmt="parquet")
dag.run(engine, conf=engine_conf)
if checkpoint:
result = dag.yields["result"].result # type:ignore
@@ -173,7 +198,7 @@ def out_transform(
Please read |TransformerTutorial|
:param df: |DataFrameLikeObject| or :class:`~fugue.workflow.yielded.Yielded`
or a path string
or a path string to a parquet file
:param using: transformer-like object, can't be a string expression
:param params: |ParamsLikeObject| to run the processor, defaults to None.
The transformer will be able to access this value from
@@ -192,13 +217,15 @@
:param engine_conf: |ParamsLikeObject|, defaults to None
.. note::
This function can only take parquet file paths in `df`. Csv and other file
formats are disallowed.
This transformation is guaranteed to execute immediately (eager)
and return nothing
"""
dag = FugueWorkflow(conf={FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 0})
if isinstance(df, str):
src = dag.load(df)
src = dag.load(df, fmt="parquet")
else:
src = dag.df(df)
src.out_transform(
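The new `_check_valid_input` helper relies on triad's `assert_or_throw`, which raises the supplied exception whenever the condition evaluates to False. A standalone sketch of that pattern, using a hypothetical `path` value:

```python
from triad.utils.assertion import assert_or_throw

from fugue.exceptions import FugueInterfacelessError

path = "data.csv"  # hypothetical user input
assert_or_throw(
    (".csv" not in path) and (".json" not in path),
    FugueInterfacelessError("Fugue transform can only load parquet file paths"),
)
# -> raises FugueInterfacelessError because the condition is False
```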
39 changes: 39 additions & 0 deletions tests/fugue/test_interfaceless.py
@@ -12,6 +12,8 @@
make_execution_engine,
)
from fugue.constants import FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH
from fugue.exceptions import FugueInterfacelessError
from pytest import raises


def test_transform():
@@ -91,6 +93,12 @@ def f(df: pd.DataFrame) -> pd.DataFrame:
result = transform(fp, f, force_output_fugue_dataframe=True)
assert result.as_array(type_safe=True) == [[2, 1]]

with raises(FugueInterfacelessError):
transform("t.csv", f)

with raises(FugueInterfacelessError):
transform("t.json", f)


def test_transform_to_file(tmpdir):
fp = os.path.join(tmpdir, "t.parquet")
@@ -134,6 +142,37 @@ def f(df: pd.DataFrame) -> pd.DataFrame:
assert pd.read_parquet(fp).values.tolist() == [[2, 1]]
os.remove(fp)

# test that parquet format is used for saving when
# no file extension is provided
fp = os.path.join(tmpdir, "test")
transform(
tdf,
f,
save_path=fp,
engine=engine,
)
loaded = pd.read_parquet(fp)
assert loaded.values.tolist() == [[2, 1]]
os.remove(fp)

# catch invalid file paths
with raises(FugueInterfacelessError):
transform(
tdf,
f,
force_output_fugue_dataframe=True,
save_path="f.csv",
engine=engine,
)
with raises(FugueInterfacelessError):
transform(
tdf,
f,
force_output_fugue_dataframe=True,
save_path="f.json",
engine=engine,
)


def test_out_transform(tmpdir):
pdf = pd.DataFrame([[1, 10], [0, 0], [1, 1], [0, 20]], columns=["a", "b"])
