Commit

Improve comments to be LLM friendly, improve Spark compatibility (#511)
* Improve comments to be LLM friendly

* update

* update

* update

* update

* update

* update

* Switch method for Spark df to arrow

* update

* update

* update

* update

* update
goodwanghan authored Oct 8, 2023
1 parent c14691b commit c4ecb27
Showing 16 changed files with 135 additions and 49 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/test_all.yml
@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8]
python-version: [3.8, "3.10"]

steps:
- uses: actions/checkout@v2
@@ -36,10 +36,12 @@ jobs:
- name: Install dependencies
run: make devenv
- name: Lint
if: matrix.python-version == '3.10'
run: make lint
- name: Test
run: make test
- name: "Upload coverage to Codecov"
if: matrix.python-version == '3.10'
uses: codecov/codecov-action@v3
with:
fail_ci_if_error: false
@@ -49,7 +51,7 @@
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.9, "3.10"]
python-version: [3.9]

steps:
- uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion README.md
@@ -252,7 +252,7 @@ Feel free to message us on [Slack](http://slack.fugue.ai). We also have [contrib
* [How LyftLearn Democratizes Distributed Compute through Kubernetes Spark and Fugue](https://eng.lyft.com/how-lyftlearn-democratizes-distributed-compute-through-kubernetes-spark-and-fugue-c0875b97c3d9)
* [Clobotics - Large Scale Image Processing with Spark through Fugue](https://medium.com/fugue-project/large-scale-image-processing-with-spark-through-fugue-e510b9813da8)
* [Architecture for a data lake REST API using Delta Lake, Fugue & Spark (article by bitsofinfo)](https://bitsofinfo.wordpress.com/2023/08/14/data-lake-rest-api-delta-lake-fugue-spark)

### Mentioned Uses

* [Productionizing Data Science at Interos, Inc. (LinkedIn post by Anthony Holten)](https://www.linkedin.com/posts/anthony-holten_pandas-spark-dask-activity-7022628193983459328-QvcF)
47 changes: 33 additions & 14 deletions fugue/dataframe/api.py
@@ -11,12 +11,12 @@

@fugue_plugin
def is_df(df: Any) -> bool:
"""Whether ``df`` is a DataFrame like object"""
"""Whether the input object is any type of DataFrame"""
return isinstance(df, DataFrame)


def get_native_as_df(df: AnyDataFrame) -> AnyDataFrame:
"""Return the dataframe form of the input ``df``.
"""Return the dataframe form of any dataframe.
If ``df`` is a :class:`~.DataFrame`, then call the
:meth:`~.DataFrame.native_as_df`, otherwise, it depends on whether there is
a correspondent function handling it.
@@ -30,30 +30,49 @@ def get_native_as_df(df: AnyDataFrame) -> AnyDataFrame:

@fugue_plugin
def get_schema(df: AnyDataFrame) -> Schema:
"""Get the schema of the ``df``
"""The generic function to get the schema of any dataframe
:param df: the object that can be recognized as a dataframe by Fugue
:return: the Schema object
.. admonition:: Examples
.. code-block:: python
import fugue.api as fa
import pandas as pd
df = pd.DataFrame([[0,1],[2,3]], columns=["a","b"])
fa.get_schema(df) # == Schema("a:long,b:long")
.. related_topics
How to get schema of any dataframe using Fugue?
"""
return as_fugue_df(df).schema


@fugue_plugin
def as_pandas(df: AnyDataFrame) -> pd.DataFrame:
"""Convert ``df`` to a Pandas DataFrame
"""The generic function to convert any dataframe to a Pandas DataFrame
:param df: the object that can be recognized as a dataframe by Fugue
:return: the Pandas DataFrame
.. related_topics
How to convert any dataframe to a pandas dataframe?
"""
return as_fugue_df(df).as_pandas()


@fugue_plugin
def as_arrow(df: AnyDataFrame) -> pa.Table:
"""Convert ``df`` to a PyArrow Table
"""The generic function to convert any dataframe to a PyArrow Table
:param df: the object that can be recognized as a dataframe by Fugue
:return: the PyArrow Table
.. related_topics
How to convert any dataframe to a pyarrow table?
"""
return as_fugue_df(df).as_arrow()

@@ -62,7 +81,7 @@ def as_arrow(df: AnyDataFrame) -> pa.Table:
def as_array(
df: AnyDataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]: # pragma: no cover
"""Convert df to 2-dimensional native python array
"""The generic function to convert any dataframe to a 2-dimensional python array
:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
@@ -81,7 +100,7 @@ def as_array(
def as_array_iterable(
df: AnyDataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]: # pragma: no cover
"""Convert df to iterable of native python arrays
"""The generic function to convert any dataframe to iterable of python arrays
:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
@@ -101,7 +120,7 @@ def as_array_iterable(
def as_dict_iterable(
df: AnyDataFrame, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
"""Convert df to iterable of native python dicts
"""Convert any dataframe to iterable of native python dicts
:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
@@ -116,7 +135,7 @@ def as_dict_iterable(

@fugue_plugin
def peek_array(df: AnyDataFrame) -> List[Any]:
"""Peek the first row of the dataframe as an array
"""Peek the first row of any dataframe as an array
:param df: the object that can be recognized as a dataframe by Fugue
:return: the first row as an array
@@ -126,7 +145,7 @@ def peek_array(df: AnyDataFrame) -> List[Any]:

@fugue_plugin
def peek_dict(df: AnyDataFrame) -> Dict[str, Any]:
"""Peek the first row of the dataframe as a array
"""Peek the first row of any dataframe as a array
:param df: the object that can be recognized as a dataframe by Fugue
:return: the first row as a dict
@@ -141,7 +160,7 @@ def head(
columns: Optional[List[str]] = None,
as_fugue: bool = False,
) -> AnyDataFrame:
"""Get first n rows of the dataframe as a new local bounded dataframe
"""Get first n rows of any dataframe as a new local bounded dataframe
:param n: number of rows
:param columns: selected columns, defaults to None (all columns)
@@ -160,7 +179,7 @@ def head(
def alter_columns(
df: AnyDataFrame, columns: Any, as_fugue: bool = False
) -> AnyDataFrame:
"""Change column types
"""Change column data types of any dataframe
:param df: the object that can be recognized as a dataframe by Fugue
:param columns: |SchemaLikeObject|,
@@ -178,7 +197,7 @@ def alter_columns(
def drop_columns(
df: AnyDataFrame, columns: List[str], as_fugue: bool = False
) -> AnyDataFrame:
"""Drop certain columns and return a new dataframe
"""Drop certain columns of any dataframe
:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to drop
@@ -194,7 +213,7 @@ def drop_columns(
def select_columns(
df: AnyDataFrame, columns: List[Any], as_fugue: bool = False
) -> AnyDataFrame:
"""Select certain columns and return a new dataframe
"""Select certain columns of any dataframe and return a new dataframe
:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to return
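The reworded docstrings above all describe the same generic entry point, fugue.api, which accepts any object Fugue recognizes as a dataframe. A minimal usage sketch based on the example embedded in the get_schema docstring (the pandas DataFrame is illustrative, not part of the diff):

    import pandas as pd

    import fugue.api as fa

    # any recognized dataframe object works; pandas is used here for brevity
    df = pd.DataFrame([[0, 1], [2, 3]], columns=["a", "b"])

    fa.get_schema(df)  # Schema("a:long,b:long"), per the docstring example above
    fa.as_pandas(df)   # convert to (or return) a pandas DataFrame
    fa.as_arrow(df)    # convert to a pyarrow.Table
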
6 changes: 4 additions & 2 deletions fugue/dataframe/utils.py
@@ -82,17 +82,19 @@ def _df_eq(
), f"schema mismatch {df.schema.pa_schema}, {df2.schema.pa_schema}"
if not check_content:
return True
cols: Any = df1.columns
if no_pandas:
dd1 = [[x.__repr__()] for x in df1.as_array_iterable(type_safe=True)]
dd2 = [[x.__repr__()] for x in df2.as_array_iterable(type_safe=True)]
d1 = pd.DataFrame(dd1, columns=["data"])
d2 = pd.DataFrame(dd2, columns=["data"])
cols = ["data"]
else:
d1 = df1.as_pandas()
d2 = df2.as_pandas()
if not check_order:
d1 = d1.sort_values(df1.columns)
d2 = d2.sort_values(df1.columns)
d1 = d1.sort_values(cols)
d2 = d2.sort_values(cols)
d1 = d1.reset_index(drop=True)
d2 = d2.reset_index(drop=True)
pd.testing.assert_frame_equal(
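The new cols variable fixes the no_pandas branch: there the frames being compared are rebuilt with a single "data" column, so sorting by the original column names would fail. A small sketch of the failure mode, assuming plain pandas (not part of the diff):

    import pandas as pd

    # the no_pandas branch re-wraps each row's repr into one "data" column
    d1 = pd.DataFrame([["[0, 1]"], ["[2, 3]"]], columns=["data"])
    d1.sort_values(["data"])      # works: the rebuilt frame only has "data"
    # d1.sort_values(["a", "b"])  # the old behavior; raises KeyError on the rebuilt frame
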
2 changes: 1 addition & 1 deletion fugue/execution/native_execution_engine.py
@@ -143,7 +143,7 @@ def map_dataframe(
if (
isinstance(output_df, PandasDataFrame)
and output_df.schema != output_schema
):
): # pragma: no cover
output_df = PandasDataFrame(output_df.native, output_schema)
assert_or_throw(
output_df.schema == output_schema,
5 changes: 5 additions & 0 deletions fugue_dask/_io.py
@@ -1,5 +1,6 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import fsspec
import fs as pfs
import pandas as pd
from dask import dataframe as dd
@@ -96,6 +97,8 @@ def _load_parquet(


def _save_csv(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:
fs, path = fsspec.core.url_to_fs(p.uri)
fs.makedirs(path, exist_ok=True)
df.native.to_csv(
pfs.path.combine(p.uri, "*.csv"), **{"index": False, "header": False, **kwargs}
)
@@ -145,6 +148,8 @@ def _load_csv( # noqa: C901


def _save_json(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:
fs, path = fsspec.core.url_to_fs(p.uri)
fs.makedirs(path, exist_ok=True)
df.native.to_json(pfs.path.combine(p.uri, "*.json"), **kwargs)


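Both _save_csv and _save_json now create the output directory before Dask writes its part files, resolving the filesystem from the URI with fsspec. A minimal sketch of that pattern with a hypothetical local path:

    import fsspec

    uri = "/tmp/fugue_dask_output"          # hypothetical output location
    fs, path = fsspec.core.url_to_fs(uri)   # resolve the filesystem and normalized path
    fs.makedirs(path, exist_ok=True)        # ensure the directory exists before *.csv / *.json parts are written
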
39 changes: 32 additions & 7 deletions fugue_spark/_utils/convert.py
@@ -1,5 +1,6 @@
import pickle
from typing import Any, Iterable, List, Tuple
from typing import Any, Iterable, List, Tuple, Optional


import pandas as pd
import pyarrow as pa
@@ -16,7 +17,7 @@
)
from triad.collections import Schema
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP, cast_pa_table
from triad.utils.schema import quote_name

import fugue.api as fa
@@ -41,7 +42,7 @@ def pandas_udf_can_accept(schema: Schema, is_input: bool) -> bool:
return False
to_arrow_schema(from_arrow_schema(schema.pa_schema))
return True
except Exception:
except Exception: # pragma: no cover
return False


@@ -132,7 +133,7 @@ def to_type_safe_input(rows: Iterable[ps.Row], schema: Schema) -> Iterable[List[
if r[i] is not None:
r[i] = r[i].asDict(recursive=True)
yield r
else:
else: # pragma: no cover
for row in rows:
data = row.asDict(recursive=True)
r = [data[n] for n in schema.names]
@@ -173,14 +174,14 @@ def pd_to_spark_df(


def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
if pd.__version__ < "2" or not any(
if version.parse(pd.__version__) < version.parse("2.0.0") or not any(
isinstance(x.dataType, (pt.TimestampType, TimestampNTZType))
for x in df.schema.fields
):
return df.toPandas()
else:
else: # pragma: no cover

def serialize(dfs): # pragma: no cover
def serialize(dfs):
for df in dfs:
data = pickle.dumps(df)
yield pd.DataFrame([[data]], columns=["data"])
@@ -189,6 +190,30 @@ def serialize(dfs): # pragma: no cover
return pd.concat(pickle.loads(x.data) for x in sdf.collect())


def to_arrow(df: ps.DataFrame) -> pa.Table:
schema = to_schema(df.schema)
destruct: Optional[bool] = None
try:
jconf = df.sparkSession._jconf
if jconf.arrowPySparkEnabled() and pandas_udf_can_accept(
schema, is_input=False
):
destruct = jconf.arrowPySparkSelfDestructEnabled()
except Exception: # pragma: no cover
# older spark does not have this config
pass
if destruct is not None and hasattr(df, "_collect_as_arrow"):
batches = df._collect_as_arrow(split_batches=destruct)
if len(batches) == 0:
return schema.create_empty_arrow_table()
table = pa.Table.from_batches(batches)
del batches
return cast_pa_table(table, schema.pa_schema)
else: # pragma: no cover
# df.toPandas has bugs on nested types
return pa.Table.from_pylist(df.collect(), schema=schema.pa_schema)


# TODO: the following function always set nullable to true,
# but should we use field.nullable?
def _to_arrow_type(dt: pt.DataType) -> pa.DataType:
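The new to_arrow helper is what the commit message means by "Switch method for Spark df to arrow": when Arrow execution is enabled it collects record batches with _collect_as_arrow and casts them to the expected schema, otherwise it falls back to building the table from df.collect() because toPandas has bugs on nested types. A hypothetical usage sketch, assuming a local SparkSession with Arrow enabled:

    from pyspark.sql import SparkSession

    from fugue_spark._utils.convert import to_arrow

    spark = (
        SparkSession.builder
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .getOrCreate()
    )
    sdf = spark.createDataFrame([(0, "a"), (1, "b")], ["id", "name"])

    table = to_arrow(sdf)  # pyarrow.Table; schema would be id: int64, name: string
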
4 changes: 3 additions & 1 deletion fugue_spark/_utils/io.py
@@ -9,7 +9,7 @@

from fugue._utils.io import FileParser, save_df
from fugue.collections.partition import PartitionSpec
from fugue.dataframe import DataFrame
from fugue.dataframe import DataFrame, PandasDataFrame
from fugue_spark.dataframe import SparkDataFrame

from .convert import to_schema, to_spark_schema
@@ -62,6 +62,8 @@ def save_df(
writer.save(uri)
else:
ldf = df.as_local()
if isinstance(ldf, PandasDataFrame) and hasattr(ldf.native, "attrs"):
ldf.native.attrs = {} # pragma: no cover
save_df(ldf, uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs)

def _get_writer(
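The attrs reset in save_df clears pandas DataFrame metadata on the local dataframe before writing; presumably this avoids compatibility issues when such metadata has been attached upstream. A tiny illustrative sketch, assuming plain pandas (the metadata is hypothetical):

    import pandas as pd

    pdf = pd.DataFrame({"a": [1, 2]})
    pdf.attrs = {"source": "example"}  # hypothetical metadata attached upstream
    pdf.attrs = {}                     # mirrors what save_df now does before saving a local pandas dataframe
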
