diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index e76ac63a..0cd71a0e 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -22,14 +22,6 @@ jobs:
       - name: Test
         if: "!github.event.release.prerelease"
         run: make test
-      - name: Coveralls
-        if: "!github.event.release.prerelease"
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-          COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
-        run: |
-          pip install coveralls
-          coveralls
       - name: Build and publish
         env:
           RELEASE_TAG: ${{ github.event.release.tag_name }}
diff --git a/.github/workflows/test37.yml b/.github/workflows/test37.yml
index 2063a2ef..68432b6c 100644
--- a/.github/workflows/test37.yml
+++ b/.github/workflows/test37.yml
@@ -32,3 +32,8 @@ jobs:
         run: make devenv
       - name: Test
         run: make test
+      - name: "Upload coverage to Codecov"
+        if: matrix.python-version == 3.7
+        uses: codecov/codecov-action@v1
+        with:
+          fail_ci_if_error: true
diff --git a/README.md b/README.md
index 787742fc..124119ad 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,10 @@
 # 
+[![Doc](https://readthedocs.org/projects/fugue/badge)](https://fugue.readthedocs.org)
 [![PyPI version](https://badge.fury.io/py/fugue.svg)](https://pypi.python.org/pypi/fugue/)
 [![PyPI pyversions](https://img.shields.io/pypi/pyversions/fugue.svg)](https://pypi.python.org/pypi/fugue/)
 [![PyPI license](https://img.shields.io/pypi/l/fugue.svg)](https://pypi.python.org/pypi/fugue/)
-[![Doc](https://readthedocs.org/projects/fugue/badge)](https://fugue.readthedocs.org)
-[![Coverage Status](https://coveralls.io/repos/github/fugue-project/fugue/badge.svg)](https://coveralls.io/github/fugue-project/fugue)
+[![codecov](https://codecov.io/gh/fugue-project/fugue/branch/master/graph/badge.svg?token=ZO9YD5N3IA)](https://codecov.io/gh/fugue-project/fugue)
 [![Codacy Badge](https://app.codacy.com/project/badge/Grade/4fa5f2f53e6f48aaa1218a89f4808b91)](https://www.codacy.com/gh/fugue-project/fugue/dashboard?utm_source=github.com&utm_medium=referral&utm_content=fugue-project/fugue&utm_campaign=Badge_Grade)
 [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://join.slack.com/t/fugue-project/shared_invite/zt-jl0pcahu-KdlSOgi~fP50TZWmNxdWYQ)
diff --git a/RELEASE.md b/RELEASE.md
index 23e6b1a4..bb1635a4 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,5 +1,11 @@
 # Release Notes
 
+## 0.5.7
+
+- Let SparkExecutionEngine accept [empty](https://github.com/fugue-project/fugue/issues/217) pandas dataframes
+- Move to [codecov](https://github.com/fugue-project/fugue/issues/216)
+- Let Fugue SQL take input dataframes with names such as [a.b](https://github.com/fugue-project/fugue/issues/215)
+
 ## 0.5.6
 
 - Dask repartitioning [improvement](https://github.com/fugue-project/fugue/issues/201)
diff --git a/fugue/execution/execution_engine.py b/fugue/execution/execution_engine.py
index a23ebe5d..d5e98634 100644
--- a/fugue/execution/execution_engine.py
+++ b/fugue/execution/execution_engine.py
@@ -1,6 +1,6 @@
 import logging
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Dict, Iterable, List, Optional, Union
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
 
 from fugue.collections.partition import (
     EMPTY_PARTITION_SPEC,
@@ -245,6 +245,15 @@ def map(
         """
         raise NotImplementedError
 
+    def aggregate(
+        self,
+        df: DataFrame,
+        funcs: List[Tuple[str, Any, str]],
+        partition_spec: PartitionSpec,
+        metadata: Any = None,
+    ) -> DataFrame:  # pragma: no cover
+        raise NotImplementedError
+
     @abstractmethod
     def broadcast(self, df: DataFrame) -> DataFrame:  # pragma: no cover
         """Broadcast the dataframe to all workers for a distributed computing framework
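The new aggregate hook above lands as an unimplemented stub with no docstring, so this patch does not pin down the contract of funcs. Below is a standalone sketch of one plausible reading, in which each tuple is (input column, aggregation function, output column); that reading, and every name in the snippet, is an illustrative assumption rather than Fugue's actual API:

    # Hypothetical sketch only: the (input column, function, output column)
    # interpretation of each tuple is assumed, since the stub is undocumented.
    from typing import Any, List, Tuple

    import pandas as pd


    def aggregate_pandas(
        df: pd.DataFrame,
        funcs: List[Tuple[str, Any, str]],
        partition_keys: List[str],
    ) -> pd.DataFrame:
        # Build one named aggregation per (column, func, alias) spec and apply
        # it per group, the way an engine might honor a PartitionSpec's keys.
        named = {
            alias: pd.NamedAgg(column=col, aggfunc=func)
            for col, func, alias in funcs
        }
        return df.groupby(partition_keys, as_index=False).agg(**named)


    df = pd.DataFrame({"k": [0, 0, 1], "v": [1.0, 2.0, 3.0]})
    print(aggregate_pandas(df, [("v", "max", "v_max"), ("v", "sum", "v_sum")], ["k"]))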
diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py
index 262fabae..0f072c95 100644
--- a/fugue_spark/_utils/convert.py
+++ b/fugue_spark/_utils/convert.py
@@ -8,7 +8,7 @@
     from pyspark.sql.types import from_arrow_type, to_arrow_type  # type: ignore
 
     # https://issues.apache.org/jira/browse/SPARK-29041
-    pt._acceptable_types[pt.BinaryType] = (bytearray, bytes)  # type: ignore
+    pt._acceptable_types[pt.BinaryType] = (bytearray, bytes)  # type: ignore # pragma: no cover # noqa: E501 # pylint: disable=line-too-long
 except ImportError:  # pyspark >=3
     from pyspark.sql.pandas.types import from_arrow_type, to_arrow_type
 from pyarrow.types import is_list, is_struct, is_timestamp
diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py
index 2c873f26..5649d515 100644
--- a/fugue_spark/execution_engine.py
+++ b/fugue_spark/execution_engine.py
@@ -36,6 +36,7 @@
 from triad.utils.assertion import assert_arg_not_none, assert_or_throw
 from triad.utils.hash import to_uuid
 from triad.utils.iter import EmptyAwareIterable
+from triad.utils.pandas_like import PD_UTILS
 from triad.utils.threading import RunOnce
 
 from fugue_spark._constants import (
@@ -191,7 +192,11 @@ def to_df(
             sdf = self.spark_session.createDataFrame(df, to_spark_schema(schema))
             return SparkDataFrame(sdf, to_schema(schema), metadata)
         if isinstance(df, pd.DataFrame):
-            sdf = self.spark_session.createDataFrame(df)
+            if PD_UTILS.empty(df):
+                temp_schema = to_spark_schema(PD_UTILS.to_schema(df))
+                sdf = self.spark_session.createDataFrame([], temp_schema)
+            else:
+                sdf = self.spark_session.createDataFrame(df)
             return SparkDataFrame(sdf, schema, metadata)
 
         # use arrow dataframe here to handle nulls in int cols
diff --git a/fugue_sql/_visitors.py b/fugue_sql/_visitors.py
index de130e4c..3e0b5d24 100644
--- a/fugue_sql/_visitors.py
+++ b/fugue_sql/_visitors.py
@@ -37,6 +37,7 @@
     to_type,
 )
 from triad.utils.pyarrow import to_pa_datatype
+from triad.utils.string import validate_triad_var_name
 
 from fugue_sql._antlr import FugueSQLParser as fp
 from fugue_sql._antlr import FugueSQLVisitor
@@ -766,7 +767,7 @@ def visitTableName(self, ctx: fp.TableNameContext) -> Iterable[Any]:
             yield from self._get_query_elements(ctx.sample())
         if ctx.tableAlias().strictIdentifier() is not None:
             yield from self._get_query_elements(ctx.tableAlias())
-        else:
+        elif validate_triad_var_name(table_name):
             yield "AS"
             yield table_name
 
diff --git a/fugue_test/execution_suite.py b/fugue_test/execution_suite.py
index c196c68d..1969d740 100644
--- a/fugue_test/execution_suite.py
+++ b/fugue_test/execution_suite.py
@@ -83,6 +83,12 @@ def test_to_df_general(self):
             throw=True,
         )
 
+        # should handle empty pandas dataframe
+        o = ArrayDataFrame([], "a:double,b:str")
+        pdf = pd.DataFrame([[0.1, "a"]], columns=["a", "b"])
+        pdf = pdf[pdf.a < 0]
+        df_eq(o, e.to_df(pdf), throw=True)
+
     def test_map(self):
         def noop(cursor, data):
             return data
diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py
index a779a442..1cc82e6b 100644
--- a/fugue_version/__init__.py
+++ b/fugue_version/__init__.py
@@ -1 +1 @@
-__version__ = "0.5.6"
+__version__ = "0.5.7"
diff --git a/tests/fugue_spark/test_execution_engine.py b/tests/fugue_spark/test_execution_engine.py
index b4d08e18..b3548f26 100644
--- a/tests/fugue_spark/test_execution_engine.py
+++ b/tests/fugue_spark/test_execution_engine.py
@@ -1,4 +1,4 @@
-from typing import Any, Iterable, List
+from typing import Any, List
 
 import numpy as np
 import pandas as pd
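The to_df change above works around a real Spark limitation: SparkSession.createDataFrame cannot infer a schema from a pandas dataframe with zero rows, even though the dtypes are known. Below is a standalone sketch of the failure mode and the explicit-schema workaround; the session setup and the hand-built schema are illustrative, while the patch itself derives the schema from the pandas dtypes via triad's PD_UTILS:

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DoubleType, StringType, StructField, StructType

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    pdf = pd.DataFrame([[0.1, "a"]], columns=["a", "b"])
    pdf = pdf[pdf.a < 0]  # now empty, but the dtypes are still known

    # spark.createDataFrame(pdf) would raise here: with zero rows there are
    # no values to infer a schema from. Building the schema explicitly and
    # creating an empty dataframe against it avoids the error.
    schema = StructType([StructField("a", DoubleType()), StructField("b", StringType())])
    sdf = spark.createDataFrame([], schema)
    print(sdf.schema)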
diff --git a/tests/fugue_sql/test_workflow.py b/tests/fugue_sql/test_workflow.py
index c3e0f35d..9ed66fe9 100644
--- a/tests/fugue_sql/test_workflow.py
+++ b/tests/fugue_sql/test_workflow.py
@@ -150,6 +150,42 @@ def test_use_df(tmpdir):
     )
 
 
+def test_use_special_df(tmpdir):
+    # external non-WorkflowDataFrame
+    arr = ArrayDataFrame([[0], [1]], "a:int")
+    fsql(
+        """
+        b=CREATE[[0], [1]] SCHEMA a: int
+        a = SELECT * FROM a.x
+        OUTPUT a, b USING assert_eq
+        a = SELECT x.* FROM a.x AS x
+        OUTPUT a, b USING assert_eq
+        c=CREATE [[0,0],[1,1]] SCHEMA a:int,b:int
+        d = SELECT x.*,y.a AS b FROM a.x x INNER JOIN a.x y ON x.a=y.a
+        OUTPUT c, d USING assert_eq
+        """,
+        {"a.x": arr},
+    ).run()
+
+    # from yield file
+    engine = NativeExecutionEngine(
+        conf={"fugue.workflow.checkpoint.path": os.path.join(tmpdir, "ck")}
+    )
+    with FugueSQLWorkflow(engine) as dag:
+        dag("CREATE[[0], [1]] SCHEMA a: int YIELD FILE AS b")
+        res = dag.yields["b"]
+
+    with FugueSQLWorkflow(engine) as dag:
+        dag(
+            """
+            b=CREATE[[0], [1]] SCHEMA a: int
+            a = SELECT * FROM a.x
+            OUTPUT a, b USING assert_eq
+            """,
+            {"a.x": res},
+        )
+
+
 def test_lazy_use_df():
     df1 = pd.DataFrame([[0]], columns=["a"])
     df2 = pd.DataFrame([[1]], columns=["a"])
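test_use_special_df exercises the _visitors.py change above: a dataframe registered under a name like a.x is not a valid variable name, so visitTableName now emits the AS alias only when validate_triad_var_name accepts the table name. Below is a minimal usage sketch of the resulting behavior; the dataframe contents are illustrative:

    import pandas as pd
    from fugue_sql import fsql

    # "a.x" is not a valid identifier, so no "AS a.x" alias is generated for it;
    # the query still resolves the dotted name to the dataframe passed in below.
    fsql(
        """
        a = SELECT * FROM a.x
        PRINT a
        """,
        {"a.x": pd.DataFrame([[0], [1]], columns=["a"])},
    ).run()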