Support dataframe input with . in name (#219)
* Support dataframe input with . in name

* update codecov

* update

* update

* spark empty pd.DataFrame

* update version

* update doc
goodwanghan authored Jun 3, 2021
1 parent 3dfef05 commit f1ff762
Showing 12 changed files with 76 additions and 16 deletions.
8 changes: 0 additions & 8 deletions .github/workflows/publish.yml
@@ -22,14 +22,6 @@ jobs:
       - name: Test
         if: "!github.event.release.prerelease"
         run: make test
-      - name: Coveralls
-        if: "!github.event.release.prerelease"
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-          COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
-        run: |
-          pip install coveralls
-          coveralls
       - name: Build and publish
         env:
           RELEASE_TAG: ${{ github.event.release.tag_name }}
5 changes: 5 additions & 0 deletions .github/workflows/test37.yml
@@ -32,3 +32,8 @@ jobs:
         run: make devenv
       - name: Test
         run: make test
+      - name: "Upload coverage to Codecov"
+        if: matrix.python-version == 3.7
+        uses: codecov/codecov-action@v1
+        with:
+          fail_ci_if_error: true
4 changes: 2 additions & 2 deletions README.md
@@ -1,10 +1,10 @@
 # <img src="./images/logo.svg" width="200">
 
-[![Doc](https://readthedocs.org/projects/fugue/badge)](https://fugue.readthedocs.org)
 [![PyPI version](https://badge.fury.io/py/fugue.svg)](https://pypi.python.org/pypi/fugue/)
 [![PyPI pyversions](https://img.shields.io/pypi/pyversions/fugue.svg)](https://pypi.python.org/pypi/fugue/)
 [![PyPI license](https://img.shields.io/pypi/l/fugue.svg)](https://pypi.python.org/pypi/fugue/)
+[![Doc](https://readthedocs.org/projects/fugue/badge)](https://fugue.readthedocs.org)
-[![Coverage Status](https://coveralls.io/repos/github/fugue-project/fugue/badge.svg)](https://coveralls.io/github/fugue-project/fugue)
+[![codecov](https://codecov.io/gh/fugue-project/fugue/branch/master/graph/badge.svg?token=ZO9YD5N3IA)](https://codecov.io/gh/fugue-project/fugue)
 [![Codacy Badge](https://app.codacy.com/project/badge/Grade/4fa5f2f53e6f48aaa1218a89f4808b91)](https://www.codacy.com/gh/fugue-project/fugue/dashboard?utm_source=github.com&utm_medium=referral&utm_content=fugue-project/fugue&utm_campaign=Badge_Grade)
 
 [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://join.slack.com/t/fugue-project/shared_invite/zt-jl0pcahu-KdlSOgi~fP50TZWmNxdWYQ)
6 changes: 6 additions & 0 deletions RELEASE.md
@@ -1,5 +1,11 @@
 # Release Notes
 
+## 0.5.7
+
+- Let SparkExecutionEngine accept [empty](https://github.com/fugue-project/fugue/issues/217) pandas dataframes
+- Move to [codecov](https://github.com/fugue-project/fugue/issues/216)
+- Let Fugue SQL take input dataframes with names such as [a.b](https://github.com/fugue-project/fugue/issues/215)
+
 ## 0.5.6
 
 - Dask repartitioning [improvement](https://github.com/fugue-project/fugue/issues/201)
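The dotted-name feature from 0.5.7 can be exercised directly. A minimal sketch, modeled on the new test in this commit; the PRINT step is an illustrative stand-in for the test's assert_eq output:

import pandas as pd
from fugue_sql import fsql

# A dataframe registered under a dotted name such as "a.b" can now be
# referenced verbatim in the query (issue #215).
df = pd.DataFrame([[0], [1]], columns=["a"])
fsql(
    """
    result = SELECT * FROM a.b
    PRINT result
    """,
    {"a.b": df},
).run()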
11 changes: 10 additions & 1 deletion fugue/execution/execution_engine.py
@@ -1,6 +1,6 @@
 import logging
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Dict, Iterable, List, Optional, Union
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
 
 from fugue.collections.partition import (
     EMPTY_PARTITION_SPEC,
@@ -245,6 +245,15 @@ def map(
         """
         raise NotImplementedError
 
+    def aggregate(
+        self,
+        df: DataFrame,
+        funcs: List[Tuple[str, Any, str]],
+        partition_spec: PartitionSpec,
+        metadata: Any = None,
+    ) -> DataFrame:  # pragma: no cover
+        raise NotImplementedError
+
     @abstractmethod
     def broadcast(self, df: DataFrame) -> DataFrame:  # pragma: no cover
         """Broadcast the dataframe to all workers for a distributed computing framework
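The new aggregate hook is optional: the base implementation raises NotImplementedError, so a caller can probe for native support. A minimal sketch, assuming engine and df already exist; the empty funcs list is illustrative only, since this commit fixes the signature but not the tuple semantics:

from fugue.collections.partition import PartitionSpec

try:
    # engines that override aggregate() run a native group-by here
    result = engine.aggregate(df, funcs=[], partition_spec=PartitionSpec())
except NotImplementedError:
    result = None  # engine has no native aggregate; fall back to a generic path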
2 changes: 1 addition & 1 deletion fugue_spark/_utils/convert.py
@@ -8,7 +8,7 @@
     from pyspark.sql.types import from_arrow_type, to_arrow_type  # type: ignore
 
     # https://issues.apache.org/jira/browse/SPARK-29041
-    pt._acceptable_types[pt.BinaryType] = (bytearray, bytes)  # type: ignore
+    pt._acceptable_types[pt.BinaryType] = (bytearray, bytes)  # type: ignore # pragma: no cover # noqa: E501 # pylint: disable=line-too-long
 except ImportError:  # pyspark >=3
     from pyspark.sql.pandas.types import from_arrow_type, to_arrow_type
     from pyarrow.types import is_list, is_struct, is_timestamp
7 changes: 6 additions & 1 deletion fugue_spark/execution_engine.py
@@ -36,6 +36,7 @@
 from triad.utils.assertion import assert_arg_not_none, assert_or_throw
 from triad.utils.hash import to_uuid
 from triad.utils.iter import EmptyAwareIterable
+from triad.utils.pandas_like import PD_UTILS
 from triad.utils.threading import RunOnce
 
 from fugue_spark._constants import (
@@ -191,7 +192,11 @@ def to_df(
             sdf = self.spark_session.createDataFrame(df, to_spark_schema(schema))
             return SparkDataFrame(sdf, to_schema(schema), metadata)
         if isinstance(df, pd.DataFrame):
-            sdf = self.spark_session.createDataFrame(df)
+            if PD_UTILS.empty(df):
+                temp_schema = to_spark_schema(PD_UTILS.to_schema(df))
+                sdf = self.spark_session.createDataFrame([], temp_schema)
+            else:
+                sdf = self.spark_session.createDataFrame(df)
             return SparkDataFrame(sdf, schema, metadata)
 
         # use arrow dataframe here to handle nulls in int cols
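The to_df change works around a Spark limitation: createDataFrame cannot infer a schema from a zero-row pandas dataframe even though the dtypes are known. A minimal sketch of the failure mode and the workaround; the hand-written schema below stands in for what the engine derives via PD_UTILS.to_schema:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

spark = SparkSession.builder.master("local[*]").getOrCreate()

pdf = pd.DataFrame([[0.1, "a"]], columns=["a", "b"])
pdf = pdf[pdf.a < 0]  # now empty, but dtypes are still float64/object

# spark.createDataFrame(pdf) would fail here ("can not infer schema from
# empty dataset"), so an explicit schema is supplied for the empty case.
schema = StructType([StructField("a", DoubleType()), StructField("b", StringType())])
sdf = spark.createDataFrame([], schema)
assert sdf.count() == 0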
3 changes: 2 additions & 1 deletion fugue_sql/_visitors.py
@@ -37,6 +37,7 @@
     to_type,
 )
 from triad.utils.pyarrow import to_pa_datatype
+from triad.utils.string import validate_triad_var_name
 
 from fugue_sql._antlr import FugueSQLParser as fp
 from fugue_sql._antlr import FugueSQLVisitor
@@ -766,7 +767,7 @@ def visitTableName(self, ctx: fp.TableNameContext) -> Iterable[Any]:
             yield from self._get_query_elements(ctx.sample())
         if ctx.tableAlias().strictIdentifier() is not None:
             yield from self._get_query_elements(ctx.tableAlias())
-        else:
+        elif validate_triad_var_name(table_name):
            yield "AS"
            yield table_name
 
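This visitor change is what makes dotted names usable in Fugue SQL: previously every unaliased table reference was emitted as "table_name AS table_name", which produces invalid SQL when the name is something like a.x; the automatic alias is now added only when the name passes triad's identifier check. A minimal sketch of that predicate, which this diff imports:

from triad.utils.string import validate_triad_var_name

assert validate_triad_var_name("df1")      # plain identifier: alias is kept
assert not validate_triad_var_name("a.x")  # dotted name: no "AS a.x" emitted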
6 changes: 6 additions & 0 deletions fugue_test/execution_suite.py
@@ -83,6 +83,12 @@ def test_to_df_general(self):
             throw=True,
         )
 
+        # should handle empty pandas dataframe
+        o = ArrayDataFrame([], "a:double,b:str")
+        pdf = pd.DataFrame([[0.1, "a"]], columns=["a", "b"])
+        pdf = pdf[pdf.a < 0]
+        df_eq(o, e.to_df(pdf), throw=True)
+
     def test_map(self):
         def noop(cursor, data):
             return data
2 changes: 1 addition & 1 deletion fugue_version/__init__.py
@@ -1 +1 @@
-__version__ = "0.5.6"
+__version__ = "0.5.7"
2 changes: 1 addition & 1 deletion tests/fugue_spark/test_execution_engine.py
@@ -1,4 +1,4 @@
-from typing import Any, Iterable, List
+from typing import Any, List
 
 import numpy as np
 import pandas as pd
36 changes: 36 additions & 0 deletions tests/fugue_sql/test_workflow.py
@@ -150,6 +150,42 @@ def test_use_df(tmpdir):
     )
 
 
+def test_use_special_df(tmpdir):
+    # external non-workflowdataframe
+    arr = ArrayDataFrame([[0], [1]], "a:int")
+    fsql(
+        """
+        b=CREATE[[0], [1]] SCHEMA a: int
+        a = SELECT * FROM a.x
+        OUTPUT a, b USING assert_eq
+        a = SELECT x.* FROM a.x AS x
+        OUTPUT a, b USING assert_eq
+        c=CREATE [[0,0],[1,1]] SCHEMA a:int,b:int
+        d = SELECT x.*,y.a AS b FROM a.x x INNER JOIN a.x y ON x.a=y.a
+        OUTPUT c, d USING assert_eq
+        """,
+        {"a.x": arr},
+    ).run()
+
+    # from yield file
+    engine = NativeExecutionEngine(
+        conf={"fugue.workflow.checkpoint.path": os.path.join(tmpdir, "ck")}
+    )
+    with FugueSQLWorkflow(engine) as dag:
+        dag("CREATE[[0], [1]] SCHEMA a: int YIELD FILE AS b")
+        res = dag.yields["b"]
+
+    with FugueSQLWorkflow(engine) as dag:
+        dag(
+            """
+            b=CREATE[[0], [1]] SCHEMA a: int
+            a = SELECT * FROM a.x
+            OUTPUT a, b USING assert_eq
+            """,
+            {"a.x": res},
+        )
+
+
 def test_lazy_use_df():
     df1 = pd.DataFrame([[0]], columns=["a"])
     df2 = pd.DataFrame([[1]], columns=["a"])