diff --git a/.pylintrc b/.pylintrc index 6ac56d0e..4c80abe9 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,3 +1,3 @@ [MESSAGES CONTROL] -disable = C0103,C0114,C0115,C0116,C0122,C0200,C0302,C0411,C0415,E0401,E0712,R0201,R0205,R0801,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R1705,R1710,R1720,R1724,W0102,W0107,W0108,W0201,W0212,W0221,W0223,W0511,W0613,W0631,W0640,W0703,W1116 +disable = C0103,C0114,C0115,C0116,C0122,C0200,C0302,C0411,C0415,E0401,E0712,E1130,R0201,R0205,R0801,R0902,R0903,R0904,R0911,R0912,R0913,R0914,R1705,R1710,R1718,R1720,R1724,W0102,W0107,W0108,W0201,W0212,W0221,W0223,W0511,W0613,W0631,W0640,W0703,W1116 # TODO: R0205: inherits from object, can be safely removed diff --git a/RELEASE.md b/RELEASE.md index bb1635a4..7f06e9b4 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,7 +1,8 @@ # Release Notes -## 0.5.7 +## 0.6.0 +- Added Select, Aggregate, Filter, Assign [interfaces](https://github.com/fugue-project/fugue/issues/211) - Let SparkExecutionEngine accept [empty](https://github.com/fugue-project/fugue/issues/217) pandas dataframes - Move to [codecov](https://github.com/fugue-project/fugue/issues/216) - Let Fugue SQL take input dataframes with name such as [a.b](https://github.com/fugue-project/fugue/issues/215) diff --git a/docs/_templates/package.rst_t b/docs/_templates/package.rst_t index 13ab3382..aa41d51c 100644 --- a/docs/_templates/package.rst_t +++ b/docs/_templates/package.rst_t @@ -45,7 +45,7 @@ .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` {% if separatemodules %} {%- else %} diff --git a/docs/api/fugue.collections.rst b/docs/api/fugue.collections.rst index 76fa452d..c96beade 100644 --- a/docs/api/fugue.collections.rst +++ b/docs/api/fugue.collections.rst @@ -18,7 +18,7 @@ fugue.collections .. 
|FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.collections.partition diff --git a/docs/api/fugue.column.rst b/docs/api/fugue.column.rst new file mode 100644 index 00000000..d1972066 --- /dev/null +++ b/docs/api/fugue.column.rst @@ -0,0 +1,47 @@ +fugue.column +============= + +.. |SchemaLikeObject| replace:: :ref:`Schema like object ` +.. |ParamsLikeObject| replace:: :ref:`Parameters like object ` +.. |DataFrameLikeObject| replace:: :ref:`DataFrame like object ` +.. |DataFramesLikeObject| replace:: :ref:`DataFrames like object ` +.. |PartitionLikeObject| replace:: :ref:`Partition like object ` +.. |RPCHandlerLikeObject| replace:: :ref:`RPChandler like object ` + +.. |ExecutionEngine| replace:: :class:`~fugue.execution.execution_engine.ExecutionEngine` +.. |NativeExecutionEngine| replace:: :class:`~fugue.execution.native_execution_engine.NativeExecutionEngine` +.. |FugueWorkflow| replace:: :class:`~fugue.workflow.workflow.FugueWorkflow` + +.. |ReadJoin| replace:: Read Join tutorials on :ref:`workflow ` and :ref:`engine ` for details +.. |FugueConfig| replace:: :ref:`the Fugue Configuration Tutorial ` +.. |PartitionTutorial| replace:: :ref:`the Partition Tutorial ` +.. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` +.. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` +.. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` + + +fugue.column.expressions +------------------------ + +.. automodule:: fugue.column.expressions + :members: + :undoc-members: + :show-inheritance: + +fugue.column.functions +---------------------- + +.. automodule:: fugue.column.functions + :members: + :undoc-members: + :show-inheritance: + +fugue.column.sql +---------------- + +.. 
automodule:: fugue.column.sql + :members: + :undoc-members: + :show-inheritance: + diff --git a/docs/api/fugue.dataframe.rst b/docs/api/fugue.dataframe.rst index cb5b236d..33084ffb 100644 --- a/docs/api/fugue.dataframe.rst +++ b/docs/api/fugue.dataframe.rst @@ -18,7 +18,7 @@ fugue.dataframe .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.dataframe.array\_dataframe diff --git a/docs/api/fugue.execution.rst b/docs/api/fugue.execution.rst index 7e372022..b78499a4 100644 --- a/docs/api/fugue.execution.rst +++ b/docs/api/fugue.execution.rst @@ -18,7 +18,7 @@ fugue.execution .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.execution.execution\_engine diff --git a/docs/api/fugue.extensions.creator.rst b/docs/api/fugue.extensions.creator.rst index e4a7b616..aa1d8e0e 100644 --- a/docs/api/fugue.extensions.creator.rst +++ b/docs/api/fugue.extensions.creator.rst @@ -18,7 +18,7 @@ fugue.extensions.creator .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.extensions.creator.convert diff --git a/docs/api/fugue.extensions.outputter.rst b/docs/api/fugue.extensions.outputter.rst index f8e15c88..cd9be5f4 100644 --- a/docs/api/fugue.extensions.outputter.rst +++ b/docs/api/fugue.extensions.outputter.rst @@ -18,7 +18,7 @@ fugue.extensions.outputter .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. 
|DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.extensions.outputter.convert diff --git a/docs/api/fugue.extensions.processor.rst b/docs/api/fugue.extensions.processor.rst index b9fd23b7..be7bdf1c 100644 --- a/docs/api/fugue.extensions.processor.rst +++ b/docs/api/fugue.extensions.processor.rst @@ -18,7 +18,7 @@ fugue.extensions.processor .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.extensions.processor.convert diff --git a/docs/api/fugue.extensions.rst b/docs/api/fugue.extensions.rst index 99cc9e6b..5bed2d60 100644 --- a/docs/api/fugue.extensions.rst +++ b/docs/api/fugue.extensions.rst @@ -26,7 +26,7 @@ fugue.extensions .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.extensions.context diff --git a/docs/api/fugue.extensions.transformer.rst b/docs/api/fugue.extensions.transformer.rst index 905d3d6d..20c3a934 100644 --- a/docs/api/fugue.extensions.transformer.rst +++ b/docs/api/fugue.extensions.transformer.rst @@ -18,7 +18,7 @@ fugue.extensions.transformer .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. 
|FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.extensions.transformer.constants diff --git a/docs/api/fugue.rpc.rst b/docs/api/fugue.rpc.rst index 39ed2ee6..8c04b130 100644 --- a/docs/api/fugue.rpc.rst +++ b/docs/api/fugue.rpc.rst @@ -18,7 +18,7 @@ fugue.rpc .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.rpc.base diff --git a/docs/api/fugue.rst b/docs/api/fugue.rst index f0b8fc10..0efa231d 100644 --- a/docs/api/fugue.rst +++ b/docs/api/fugue.rst @@ -5,6 +5,7 @@ fugue :maxdepth: 4 fugue.collections + fugue.column fugue.dataframe fugue.execution fugue.extensions @@ -28,7 +29,7 @@ fugue .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.constants diff --git a/docs/api/fugue.workflow.rst b/docs/api/fugue.workflow.rst index dfbeb185..1d50aaeb 100644 --- a/docs/api/fugue.workflow.rst +++ b/docs/api/fugue.workflow.rst @@ -18,7 +18,7 @@ fugue.workflow .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue.workflow.module diff --git a/docs/api_dask/fugue_dask.rst b/docs/api_dask/fugue_dask.rst index a6fca9bb..afbe4770 100644 --- a/docs/api_dask/fugue_dask.rst +++ b/docs/api_dask/fugue_dask.rst @@ -18,7 +18,7 @@ fugue\_dask .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. 
|FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue\_dask.dataframe diff --git a/docs/api_spark/fugue_spark.rst b/docs/api_spark/fugue_spark.rst index 3989922c..c60c7b9b 100644 --- a/docs/api_spark/fugue_spark.rst +++ b/docs/api_spark/fugue_spark.rst @@ -18,7 +18,7 @@ fugue\_spark .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue\_spark.dataframe diff --git a/docs/api_sql/fugue_sql.rst b/docs/api_sql/fugue_sql.rst index 84fa42c7..7fa17463 100644 --- a/docs/api_sql/fugue_sql.rst +++ b/docs/api_sql/fugue_sql.rst @@ -18,7 +18,7 @@ fugue\_sql .. |FugueSQLTutorial| replace:: :ref:`the Fugue SQL Tutorial ` .. |DataFrameTutorial| replace:: :ref:`the DataFrame Tutorial ` .. |ExecutionEngineTutorial| replace:: :ref:`the ExecutionEngine Tutorial ` - +.. |FugueDataTypes| replace:: :ref:`Fugue Data Types ` fugue\_sql.exceptions diff --git a/fugue/column/__init__.py b/fugue/column/__init__.py new file mode 100644 index 00000000..abf381f0 --- /dev/null +++ b/fugue/column/__init__.py @@ -0,0 +1,4 @@ +# flake8: noqa +from fugue.column.expressions import ColumnExpr, col, function, lit, null +from fugue.column.functions import is_agg +from fugue.column.sql import SelectColumns, SQLExpressionGenerator diff --git a/fugue/column/expressions.py b/fugue/column/expressions.py new file mode 100644 index 00000000..8aa1af56 --- /dev/null +++ b/fugue/column/expressions.py @@ -0,0 +1,838 @@ +from typing import Any, Dict, Iterable, List, Optional, Union + +import pyarrow as pa +from triad import Schema, assert_or_throw, to_uuid +from triad.utils.pyarrow import _type_to_expression, to_pa_datatype + + +class ColumnExpr: + """Fugue column expression class. It is inspired from + :class:`spark:pyspark.sql.Column` and it is working in progress. + + .. 
admonition:: New Since + :class: hint + + **0.6.0** + + .. caution:: + + This is a base class of different column classes, and users are not supposed + to construct this class directly. Use :func:`~.col` and :func:`~.lit` instead. + """ + + def __init__(self): + self._as_name = "" + self._as_type: Optional[pa.DataType] = None + + @property + def name(self) -> str: + """The original name of this column, default empty + + :return: the name + + .. admonition:: Examples + + .. code-block:: python + + assert "a" == col("a").name + assert "b" == col("a").alias("b").name + assert "" == lit(1).name + assert "" == (col("a") * 2).name + """ + return "" + + @property + def as_name(self) -> str: + """The name assigned by :meth:`~.alias` + + :return: the alias + + .. admonition:: Examples + + .. code-block:: python + + assert "" == col("a").as_name + assert "b" == col("a").alias("b").as_name + assert "x" == (col("a") * 2).alias("x").as_name + """ + return self._as_name + + @property + def as_type(self) -> Optional[pa.DataType]: + """The type assigned by :meth:`~.cast` + + :return: the pyarrow datatype if :meth:`~.cast` was called + otherwise None + + .. admonition:: Examples + + .. code-block:: python + + import pyarrow as pa + + assert col("a").as_type is None + assert pa.int64() == col("a").cast(int).as_type + assert pa.string() == (col("a") * 2).cast(str).as_type + """ + return self._as_type + + @property + def output_name(self) -> str: + """The name assigned by :meth:`~.alias`, but if empty then + return the original column name + + :return: the alias or the original column name + + .. admonition:: Examples + + .. code-block:: python + + assert "a" == col("a").output_name + assert "b" == col("a").alias("b").output_name + assert "x" == (col("a") * 2).alias("x").output_name + """ + return self.as_name if self.as_name != "" else self.name + + def alias(self, as_name: str) -> "ColumnExpr": # pragma: no cover + """Assign or remove alias of a column. 
To remove, set ``as_name`` to empty + + :return: a new column with the alias value + + .. admonition:: Examples + + .. code-block:: python + + assert "b" == col("a").alias("b").as_name + assert "x" == (col("a") * 2).alias("x").as_name + assert "" == col("a").alias("b").alias("").as_name + """ + raise NotImplementedError + + def infer_alias(self) -> "ColumnExpr": + """Infer alias of a column. If the column's :meth:`~.output_name` is not empty + then it returns itself without change. Otherwise it tries to infer alias from + the underlying columns. + + :return: a column instance with inferred alias + + .. caution:: + + Users should not use it directly. + + .. admonition:: Examples + + .. code-block:: python + + import fugue.column.functions as f + + assert "a" == col("a").infer_alias().output_name + assert "" == (col("a") * 2).infer_alias().output_name + assert "a" == col("a").is_null().infer_alias().output_name + assert "a" == f.max(col("a").is_null()).infer_alias().output_name + """ + return self + + def cast(self, data_type: Any) -> "ColumnExpr": # pragma: no cover + """Cast the column to a new data type + + :param data_type: It can be string expressions, python primitive types, + python `datetime.datetime` and pyarrow types. + For details read |FugueDataTypes| + :return: a new column instance with the assigned data type + + .. caution:: + + Currently, casting to struct or list type has undefined behavior. + + .. admonition:: Examples + + .. 
code-block:: python + + import pyarrow as pa + + assert pa.int64() == col("a").cast(int).as_type + assert pa.string() == col("a").cast(str).as_type + assert pa.float64() == col("a").cast(float).as_type + assert pa._bool() == col("a").cast(bool).as_type + + # string follows the type expression of Triad Schema + assert pa.int32() == col("a").cast("int").as_type + assert pa.int32() == col("a").cast("int32").as_type + + assert pa.int32() == col("a").cast(pa.int32()).as_type + """ + raise NotImplementedError + + def infer_type(self, schema: Schema) -> Optional[pa.DataType]: + """Infer data type of this column given the input schema + + :param schema: the schema instance to infer from + :return: a pyarrow datatype or None if failed to infer + + .. caution:: + + Users should not use it directly. + + .. admonition:: Examples + + .. code-block:: python + + import pyarrow as pa + from triad import Schema + import fugue.column.functions as f + + schema = Schema("a:int,b:str") + + assert pa.int32() == col("a").infer_schema(schema) + assert pa.int32() == (-col("a")).infer_schema(schema) + # due to overflow risk, can't infer certain operations + assert (col("a")+1).infer_schema(schema) is None + assert (col("a")+col("a")).infer_schema(schema) is None + assert pa.int32() == f.max(col("a")).infer_schema(schema) + assert pa.int32() == f.min(col("a")).infer_schema(schema) + assert f.sum(col("a")).infer_schema(schema) is None + """ + return self.as_type # pragma: no cover + + def __str__(self) -> str: + """String expression of the column, this is only used for debug purpose. + It is not SQL expression. + + :return: the string expression + """ + res = self.body_str + if self.as_type is not None: + res = f"CAST({res} AS {_type_to_expression(self.as_type)})" + if self.as_name != "": + res = res + " AS " + self.as_name + return res + + @property + def body_str(self) -> str: # pragma: no cover + """The string expression of this column without cast type and alias. 
+ This is only used for debug purpose. It is not SQL expression. + + :return: the string expression + """ + raise NotImplementedError + + def is_null(self) -> "ColumnExpr": + """Same as SQL `` IS NULL``. + + :return: a new column with the boolean values + """ + # TODO: should enable infer_schema for this? + return _UnaryOpExpr("IS_NULL", self) + + def not_null(self) -> "ColumnExpr": + """Same as SQL `` IS NOT NULL``. + + :return: a new column with the boolean values + """ + # TODO: should enable infer_schema for this? + return _UnaryOpExpr("NOT_NULL", self) + + def __neg__(self) -> "ColumnExpr": + """The negative value of the current column + + :return: a new column with the negative value + """ + return _InvertOpExpr("-", self) + + def __pos__(self) -> "ColumnExpr": + """The original value of the current column + + :return: the column itself + """ + return self + + def __invert__(self) -> "ColumnExpr": + """Same as SQL ``NOT `` + + :return: a new column with the boolean values + """ + return _NotOpExpr("~", self) + + def __add__(self, other: Any) -> "ColumnExpr": + """Add with another column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BinaryOpExpr("+", self, other) + + def __radd__(self, other: Any) -> "ColumnExpr": + """Add with another column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BinaryOpExpr("+", other, self) + + def __sub__(self, other: Any) -> "ColumnExpr": + """Subtract another column from this column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BinaryOpExpr("-", self, other) + + def __rsub__(self, other: 
Any) -> "ColumnExpr": + """Subtract this column from the other column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BinaryOpExpr("-", other, self) + + def __mul__(self, other: Any) -> "ColumnExpr": + """Multiply with another column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BinaryOpExpr("*", self, other) + + def __rmul__(self, other: Any) -> "ColumnExpr": + """Multiply with another column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BinaryOpExpr("*", other, self) + + def __truediv__(self, other: Any) -> "ColumnExpr": + """Divide this column by the other column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BinaryOpExpr("/", self, other) + + def __rtruediv__(self, other: Any) -> "ColumnExpr": + """Divide the other column by this column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BinaryOpExpr("/", other, self) + + def __and__(self, other: Any) -> "ColumnExpr": + """``AND`` value of the two columns + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BoolBinaryOpExpr("&", self, other) + + def __rand__(self, other: Any) -> "ColumnExpr": + """``AND`` value of the two columns + + :param other: 
the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BoolBinaryOpExpr("&", other, self) + + def __or__(self, other: Any) -> "ColumnExpr": + """``OR`` value of the two columns + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BoolBinaryOpExpr("|", self, other) + + def __ror__(self, other: Any) -> "ColumnExpr": + """``OR`` value of the two columns + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the result + """ + return _BoolBinaryOpExpr("|", other, self) + + def __lt__(self, other: Any) -> "ColumnExpr": + """Whether this column is less than the other column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the boolean result + """ + return _BoolBinaryOpExpr("<", self, other) + + def __gt__(self, other: Any) -> "ColumnExpr": + """Whether this column is greater than the other column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the boolean result + """ + return _BoolBinaryOpExpr(">", self, other) + + def __le__(self, other: Any) -> "ColumnExpr": + """Whether this column is less or equal to the other column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the boolean result + """ + return _BoolBinaryOpExpr("<=", self, other) + + def __ge__(self, other: Any) -> "ColumnExpr": + """Whether this column is greater or equal to the other 
column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the boolean result + """ + return _BoolBinaryOpExpr(">=", self, other) + + def __eq__(self, other: Any) -> "ColumnExpr": # type: ignore + """Whether this column equals the other column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the boolean result + """ + return _BoolBinaryOpExpr("==", self, other) + + def __ne__(self, other: Any) -> "ColumnExpr": # type: ignore + """Whether this column does not equal the other column + + :param other: the other column, if it is not a + :class:`~.ColumnExpr`, then the value will be converted to + a literal (`lit(other)`) + :return: a new column with the boolean result + """ + return _BoolBinaryOpExpr("!=", self, other) + + def __uuid__(self) -> str: + """The unique id of this instance + + :return: the unique id + """ + return to_uuid( + str(type(self)), + self.as_name, + self.as_type, + *self._uuid_keys(), + ) + + def _uuid_keys(self) -> List[Any]: # pragma: no cover + raise NotImplementedError + + +def lit(obj: Any, alias: str = "") -> ColumnExpr: + """Convert the ``obj`` to a literal column. Currently ``obj`` must be + ``int``, ``bool``, ``float`` or ``str``, otherwise an exception will + be raised + + :param obj: an arbitrary value + :param alias: the alias of this literal column, defaults to "" (no alias) + :return: a literal column expression + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column import lit + + lit("abc") + lit(100).alias("x") + lit(100, "x") + """ + return ( + _LiteralColumnExpr(obj) if alias == "" else _LiteralColumnExpr(obj).alias(alias) + ) + + +def null() -> ColumnExpr: + """Equivalent to ``lit(None)``, the ``NULL`` value + + :return: ``lit(None)`` + + .. admonition:: New Since + :class: hint + + **0.6.0** + """ + return lit(None) + + +def col(obj: Union[str, ColumnExpr], alias: str = "") -> ColumnExpr: + """Convert the ``obj`` to a :class:`~.ColumnExpr` object + + :param obj: a string representing a column name or a :class:`~.ColumnExpr` object + :param alias: the alias of this column, defaults to "" (no alias) + :return: a literal column expression + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. code-block:: python + + import fugue.column import col + import fugue.column.functions as f + + col("a") + col("a").alias("x") + col("a", "x") + + # unary operations + -col("a") # negative value of a + ~col("a") # NOT a + col("a").is_null() # a IS NULL + col("a").not_null() # a IS NOT NULL + + # binary operations + col("a") + 1 # col("a") + lit(1) + 1 - col("a") # lit(1) - col("a") + col("a") * col("b") + col("a") / col("b") + + # binary boolean expressions + col("a") == 1 # col("a") == lit(1) + 2 != col("a") # col("a") != lit(2) + col("a") < 5 + col("a") > 5 + col("a") <= 5 + col("a") >= 5 + (col("a") < col("b")) & (col("b") > 1) | col("c").is_null() + + # with functions + f.max(col("a")) + f.max(col("a")+col("b")) + f.max(col("a")) + f.min(col("b")) + f.count_distinct(col("a")).alias("dcount") + + """ + if isinstance(obj, ColumnExpr): + return obj if alias == "" else obj.alias(alias) + if isinstance(obj, str): + return ( + _NamedColumnExpr(obj) if alias == "" else _NamedColumnExpr(obj).alias(alias) + ) + raise NotImplementedError(obj) + + +def function(name: str, *args: Any, arg_distinct: bool = False, **kwargs) -> ColumnExpr: + """Construct a 
function expression + + :param name: the name of the function + :param arg_distinct: whether to add ``DISTINCT`` before all arguments, + defaults to False + :return: the function expression + + .. caution:: + + Users should not use this directly + + """ + return _FuncExpr(name, *args, arg_distinct=arg_distinct, **kwargs) + + +def _get_column_mentions(column: ColumnExpr) -> Iterable[str]: + if isinstance(column, _NamedColumnExpr): + yield column.name + elif isinstance(column, _FuncExpr): + for a in column.args: + yield from _get_column_mentions(a) + for a in column.kwargs.values(): + yield from _get_column_mentions(a) + + +def _to_col(obj: Any) -> ColumnExpr: + if isinstance(obj, ColumnExpr): + return obj + return lit(obj) + + +class _NamedColumnExpr(ColumnExpr): + def __init__(self, name: Any): + self._name = name + super().__init__() + + @property + def body_str(self) -> str: + return self.name + + @property + def name(self) -> str: + return self._name + + @property + def output_name(self) -> str: + return "" if self.wildcard else super().output_name + + @property + def wildcard(self) -> bool: + return self.name == "*" + + def alias(self, as_name: str) -> ColumnExpr: + if self.wildcard and as_name != "": + raise ValueError("'*' can't have alias") + other = _NamedColumnExpr(self.name) + other._as_name = as_name + other._as_type = self.as_type + return other + + def cast(self, data_type: Any) -> "ColumnExpr": + if self.wildcard and data_type is not None: + raise ValueError("'*' can't cast") + other = _NamedColumnExpr(self.name) + other._as_name = self.as_name + other._as_type = None if data_type is None else to_pa_datatype(data_type) + return other + + def infer_alias(self) -> ColumnExpr: + if not self.wildcard and self.as_name == "" and self.as_type is not None: + return self.alias(self.output_name) + return self + + def infer_type(self, schema: Schema) -> Optional[pa.DataType]: + if self.name not in schema: + return self.as_type + return self.as_type or 
schema[self.name].type + + def _uuid_keys(self) -> List[Any]: + return [self.name] + + +class _LiteralColumnExpr(ColumnExpr): + _VALID_TYPES = (int, bool, float, str) + + def __init__(self, value: Any): + assert_or_throw( + value is None or isinstance(value, _LiteralColumnExpr._VALID_TYPES), + lambda: NotImplementedError(f"{value}, type: {type(value)}"), + ) + self._value = value + super().__init__() + + @property + def body_str(self) -> str: + if self.value is None: + return "NULL" + elif isinstance(self.value, str): + body = self.value.translate( + str.maketrans( + { # type: ignore + "\\": r"\\", + "'": r"\'", + } + ) + ) + return f"'{body}'" + elif isinstance(self.value, bool): + return "TRUE" if self.value else "FALSE" + else: + return str(self.value) + + @property + def value(self) -> Any: + return self._value + + def is_null(self) -> ColumnExpr: + return _LiteralColumnExpr(self.value is None) + + def not_null(self) -> ColumnExpr: + return _LiteralColumnExpr(self.value is not None) + + def alias(self, as_name: str) -> ColumnExpr: + other = _LiteralColumnExpr(self.value) + other._as_name = as_name + other._as_type = self.as_type + return other + + def cast(self, data_type: Any) -> ColumnExpr: + other = _LiteralColumnExpr(self.value) + other._as_name = self.as_name + other._as_type = None if data_type is None else to_pa_datatype(data_type) + return other + + def infer_type(self, schema: Schema) -> Optional[pa.DataType]: + if self.value is None: + return self.as_type + return self.as_type or to_pa_datatype(type(self.value)) + + def _uuid_keys(self) -> List[Any]: + return [self.value] + + +class _FuncExpr(ColumnExpr): + def __init__( + self, + func: str, + *args: Any, + arg_distinct: bool = False, + **kwargs: Any, + ): + self._distinct = arg_distinct + self._func = func + self._args = list(args) + self._kwargs = dict(kwargs) + super().__init__() + + @property + def body_str(self) -> str: + def to_str(v: Any): + if isinstance(v, str): + return f"'{v}'" + if 
isinstance(v, bool): + return "TRUE" if v else "FALSE" + return str(v) + + a1 = [to_str(x) for x in self.args] + a2 = [k + "=" + to_str(v) for k, v in self.kwargs.items()] + args = ",".join(a1 + a2) + distinct = "DISTINCT " if self.is_distinct else "" + return f"{self.func}({distinct}{args})" + + @property + def func(self) -> str: + return self._func + + @property + def is_distinct(self) -> bool: + return self._distinct + + @property + def args(self) -> List[Any]: + return self._args + + @property + def kwargs(self) -> Dict[str, Any]: + return self._kwargs + + def alias(self, as_name: str) -> ColumnExpr: + other = self._copy() + other._as_name = as_name + other._distinct = self.is_distinct + other._as_type = self.as_type + return other + + def cast(self, data_type: Any) -> ColumnExpr: + other = self._copy() + other._as_name = self.as_name + other._distinct = self.is_distinct + other._as_type = None if data_type is None else to_pa_datatype(data_type) + return other + + def _copy(self) -> "_FuncExpr": + return _FuncExpr(self.func, *self.args, **self.kwargs) + + def _uuid_keys(self) -> List[Any]: + return [self.func, self.is_distinct, self.args, self.kwargs] + + +class _UnaryOpExpr(_FuncExpr): + def __init__(self, op: str, column: ColumnExpr, arg_distinct: bool = False): + super().__init__(op, column, arg_distinct=arg_distinct) + + @property + def col(self) -> ColumnExpr: + return self.args[0] + + @property + def op(self) -> str: + return self.func + + def infer_alias(self) -> ColumnExpr: + return ( + self + if self.output_name != "" + else self.alias(self.col.infer_alias().output_name) + ) + + def _copy(self) -> _FuncExpr: + return _UnaryOpExpr(self.op, self.col) + + +class _InvertOpExpr(_UnaryOpExpr): + def _copy(self) -> _FuncExpr: + return _InvertOpExpr(self.op, self.col) + + def infer_type(self, schema: Schema) -> Optional[pa.DataType]: + if self.as_type is not None: + return self.as_type + tp = self.col.infer_type(schema) + if pa.types.is_signed_integer(tp) or 
pa.types.is_floating(tp): + return tp + return None + + + class _NotOpExpr(_UnaryOpExpr): + def _copy(self) -> _FuncExpr: + return _NotOpExpr(self.op, self.col) + + def infer_type(self, schema: Schema) -> Optional[pa.DataType]: + if self.as_type is not None: + return self.as_type + tp = self.col.infer_type(schema) + if pa.types.is_boolean(tp): + return tp + return None + + + class _BinaryOpExpr(_FuncExpr): + def __init__(self, op: str, left: Any, right: Any, arg_distinct: bool = False): + super().__init__(op, _to_col(left), _to_col(right), arg_distinct=arg_distinct) + + @property + def left(self) -> ColumnExpr: + return self.args[0] + + @property + def right(self) -> ColumnExpr: + return self.args[1] + + @property + def op(self) -> str: + return self.func + + def _copy(self) -> _FuncExpr: + return _BinaryOpExpr(self.op, self.left, self.right) + + + class _BoolBinaryOpExpr(_BinaryOpExpr): + def _copy(self) -> _FuncExpr: + return _BoolBinaryOpExpr(self.op, self.left, self.right) + + def infer_type(self, schema: Schema) -> Optional[pa.DataType]: + return self.as_type or pa.bool_() diff --git a/fugue/column/functions.py b/fugue/column/functions.py new file mode 100644 index 00000000..0cae04c9 --- /dev/null +++ b/fugue/column/functions.py @@ -0,0 +1,370 @@ +from typing import Any, Optional + +import pyarrow as pa +from fugue.column.expressions import ( + ColumnExpr, + _FuncExpr, + _to_col, + function, +) +from triad import Schema + + +def coalesce(*args: Any) -> ColumnExpr: + """SQL ``COALESCE`` function + + :param args: If a value is not :class:`~fugue.column.expressions.ColumnExpr` + then it's converted to a literal column by + :func:`~fugue.column.expressions.lit` + + .. note:: + + this function can infer neither type nor alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + + f.coalesce(col("a"), col("b")+col("c"), 1) + """ + return function("COALESCE", *[_to_col(x) for x in args]) + + +def min(col: ColumnExpr) -> ColumnExpr: # pylint: disable=redefined-builtin + """SQL ``MIN`` function (aggregation) + + :param col: the column to find min + + .. note:: + + * this function can infer type from ``col`` type + * this function can infer alias from ``col``'s inferred alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. code-block:: python + + import fugue.column.functions as f + + # assume col a has type double + f.min(col("a")) # CAST(MIN(a) AS double) AS a + f.min(-col("a")) # CAST(MIN(-a) AS double) AS a + + # neither type nor alias can be inferred in the following cases + f.min(col("a")+1) + f.min(col("a")+col("b")) + + # you can specify explicitly + # CAST(MIN(a+b) AS int) AS x + f.min(col("a")+col("b")).cast(int).alias("x") + """ + assert isinstance(col, ColumnExpr) + return _SameTypeUnaryAggFuncExpr("MIN", col) + + +def max(col: ColumnExpr) -> ColumnExpr: # pylint: disable=redefined-builtin + """SQL ``MAX`` function (aggregation) + + :param col: the column to find max + + .. note:: + + * this function can infer type from ``col`` type + * this function can infer alias from ``col``'s inferred alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + + # assume col a has type double + f.max(col("a")) # CAST(MAX(a) AS double) AS a + f.max(-col("a")) # CAST(MAX(-a) AS double) AS a + + # neither type nor alias can be inferred in the following cases + f.max(col("a")+1) + f.max(col("a")+col("b")) + + # you can specify explicitly + # CAST(MAX(a+b) AS int) AS x + f.max(col("a")+col("b")).cast(int).alias("x") + """ + assert isinstance(col, ColumnExpr) + return _SameTypeUnaryAggFuncExpr("MAX", col) + + +def count(col: ColumnExpr) -> ColumnExpr: + """SQL ``COUNT`` function (aggregation) + + :param col: the column to find count + + .. note:: + + * this function cannot infer type from ``col`` type + * this function can infer alias from ``col``'s inferred alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. code-block:: python + + import fugue.column.functions as f + + f.count(col("*")) # COUNT(*) + f.count(col("a")) # COUNT(a) AS a + + # you can specify explicitly + # CAST(COUNT(a) AS double) AS a + f.count(col("a")).cast(float) + """ + assert isinstance(col, ColumnExpr) + return _UnaryAggFuncExpr("COUNT", col) + + +def count_distinct(col: ColumnExpr) -> ColumnExpr: + """SQL ``COUNT DISTINCT`` function (aggregation) + + :param col: the column to find distinct element count + + .. note:: + + * this function cannot infer type from ``col`` type + * this function can infer alias from ``col``'s inferred alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + + f.count_distinct(col("*")) # COUNT(DISTINCT *) + f.count_distinct(col("a")) # COUNT(DISTINCT a) AS a + + # you can specify explicitly + # CAST(COUNT(DISTINCT a) AS double) AS a + f.count_distinct(col("a")).cast(float) + """ + assert isinstance(col, ColumnExpr) + return _UnaryAggFuncExpr("COUNT", col, arg_distinct=True) + + +def avg(col: ColumnExpr) -> ColumnExpr: + """SQL ``AVG`` function (aggregation) + + :param col: the column to find average + + .. note:: + + * this function cannot infer type from ``col`` type + * this function can infer alias from ``col``'s inferred alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. code-block:: python + + import fugue.column.functions as f + + f.avg(col("a")) # AVG(a) AS a + + # you can specify explicitly + # CAST(AVG(a) AS double) AS a + f.avg(col("a")).cast(float) + """ + assert isinstance(col, ColumnExpr) + return _UnaryAggFuncExpr("AVG", col) + + +def sum(col: ColumnExpr) -> ColumnExpr: # pylint: disable=redefined-builtin + """SQL ``SUM`` function (aggregation) + + :param col: the column to find sum + + .. note:: + + * this function cannot infer type from ``col`` type + * this function can infer alias from ``col``'s inferred alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. code-block:: python + + import fugue.column.functions as f + + f.sum(col("a")) # SUM(a) AS a + + # you can specify explicitly + # CAST(SUM(a) AS double) AS a + f.sum(col("a")).cast(float) + """ + assert isinstance(col, ColumnExpr) + return _UnaryAggFuncExpr("SUM", col) + + +def first(col: ColumnExpr) -> ColumnExpr: + """SQL ``FIRST`` function (aggregation) + + :param col: the column to find first + + .. note:: + + * this function can infer type from ``col`` type + * this function can infer alias from ``col``'s inferred alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. 
admonition:: Examples + + .. code-block:: python + + import fugue.column.functions as f + + # assume col a has type double + f.first(col("a")) # CAST(FIRST(a) AS double) AS a + f.first(-col("a")) # CAST(FIRST(-a) AS double) AS a + + # neither type nor alias can be inferred in the following cases + f.first(col("a")+1) + f.first(col("a")+col("b")) + + # you can specify explicitly + # CAST(FIRST(a+b) AS int) AS x + f.first(col("a")+col("b")).cast(int).alias("x") + """ + assert isinstance(col, ColumnExpr) + return _SameTypeUnaryAggFuncExpr("FIRST", col) + + +def last(col: ColumnExpr) -> ColumnExpr: + """SQL ``LAST`` function (aggregation) + + :param col: the column to find last + + .. note:: + + * this function can infer type from ``col`` type + * this function can infer alias from ``col``'s inferred alias + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. code-block:: python + + import fugue.column.functions as f + + # assume col a has type double + f.last(col("a")) # CAST(LAST(a) AS double) AS a + f.last(-col("a")) # CAST(LAST(-a) AS double) AS a + + # neither type nor alias can be inferred in the following cases + f.last(col("a")+1) + f.last(col("a")+col("b")) + + # you can specify explicitly + # CAST(LAST(a+b) AS int) AS x + f.last(col("a")+col("b")).cast(int).alias("x") + """ + assert isinstance(col, ColumnExpr) + return _SameTypeUnaryAggFuncExpr("LAST", col) + + +def is_agg(column: Any) -> bool: + """Check if a column contains aggregation operation + + :param col: the column to check + :return: whether the column is :class:`~fugue.column.expressions.ColumnExpr` + and contains aggregation operations + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + + assert not f.is_agg(1) + assert not f.is_agg(col("a")) + assert not f.is_agg(col("a")+lit(1)) + + assert f.is_agg(f.max(col("a"))) + assert f.is_agg(-f.max(col("a"))) + assert f.is_agg(f.max(col("a")+1)) + assert f.is_agg(f.max(col("a"))+f.min(col("a"))) + """ + if isinstance(column, _UnaryAggFuncExpr): + return True + if isinstance(column, _FuncExpr): + return any(is_agg(x) for x in column.args) or any( + is_agg(x) for x in column.kwargs.values() + ) + return False + + + class _UnaryAggFuncExpr(_FuncExpr): + def __init__(self, func: str, col: ColumnExpr, arg_distinct: bool = False): + super().__init__(func, col, arg_distinct=arg_distinct) + + def infer_alias(self) -> ColumnExpr: + return ( + self + if self.output_name != "" + else self.alias(self.args[0].infer_alias().output_name) + ) + + def _copy(self) -> _FuncExpr: + return _UnaryAggFuncExpr(self.func, *self.args, **self.kwargs) + + + class _SameTypeUnaryAggFuncExpr(_UnaryAggFuncExpr): + def _copy(self) -> _FuncExpr: + return _SameTypeUnaryAggFuncExpr(self.func, *self.args, **self.kwargs) + + def infer_type(self, schema: Schema) -> Optional[pa.DataType]: + return self.as_type or self.args[0].infer_type(schema) diff --git a/fugue/column/sql.py b/fugue/column/sql.py new file mode 100644 index 00000000..76960cd9 --- /dev/null +++ b/fugue/column/sql.py @@ -0,0 +1,475 @@ +from typing import Any, Callable, Dict, Iterable, List, Optional, Set + +import pyarrow as pa +from fugue.column.expressions import ( + ColumnExpr, + _BinaryOpExpr, + _FuncExpr, + _LiteralColumnExpr, + _NamedColumnExpr, + _UnaryOpExpr, + col, + lit, +) +from fugue.column.functions import is_agg +from fugue.exceptions import FugueBug +from triad import Schema, assert_or_throw, to_uuid +from triad.utils.pyarrow import _type_to_expression + +_SUPPORTED_OPERATORS: Dict[str, str] = { + "+": "+", + "-": "-", + "*": "*", + "/": "/", + "&": " AND ", + "|": " OR ", + "<": "<", + ">": ">", + "<=": 
"<=", + ">=": ">=", + "==": "=", + "!=": "!=", +} + + +class SelectColumns: + """SQL ``SELECT`` columns collection. + + :param cols: collection of :class:`~fugue.column.expressions.ColumnExpr` + :param arg_distinct: whether this is ``SELECT DISTINCT``, defaults to False + + .. admonition:: New Since + :class: hint + + **0.6.0** + """ + + def __init__(self, *cols: ColumnExpr, arg_distinct: bool = False): # noqa: C901 + self._distinct = arg_distinct + self._all: List[ColumnExpr] = [] + self._literals: List[ColumnExpr] = [] + self._cols: List[ColumnExpr] = [] + self._non_agg_funcs: List[ColumnExpr] = [] + self._agg_funcs: List[ColumnExpr] = [] + self._group_keys: List[ColumnExpr] = [] + self._has_wildcard = False + + _g_keys: List[ColumnExpr] = [] + + for c in cols: + c = c.infer_alias() + _is_agg = False + self._all.append(c) + if isinstance(c, _LiteralColumnExpr): + self._literals.append(c) + else: + if isinstance(c, _NamedColumnExpr): + self._cols.append(c) + if c.wildcard: + if self._has_wildcard: + raise ValueError("'*' can be used at most once") + self._has_wildcard = True + elif isinstance(c, _FuncExpr): + if is_agg(c): + _is_agg = True + self._agg_funcs.append(c) + else: + self._non_agg_funcs.append(c) + if not _is_agg: + _g_keys.append(c.alias("").cast(None)) + + if self.has_agg: + self._group_keys += _g_keys + + if len(self._agg_funcs) > 0 and self._has_wildcard: + raise ValueError(f"'*' can't be used in aggregation: {self}") + + def __str__(self): + """String representation for debug purpose""" + expr = ", ".join(str(x) for x in self.all_cols) + return f"[{expr}]" + + def __uuid__(self): + """Unique id for this collection""" + return to_uuid(self._distinct, self.all_cols) + + @property + def is_distinct(self) -> bool: + """Whether this is a ``SELECT DISTINCT``""" + return self._distinct + + def replace_wildcard(self, schema: Schema) -> "SelectColumns": + """Replace wildcard ``*`` with explicit column names + + :param schema: the schema used to parse the 
wildcard + :return: a new instance containing only explicit columns + + .. note:: + + It only replaces the top level ``*``. For example + ``count_distinct(col("*"))`` will not be transformed because + this ``*`` is not first level. + """ + + def _get_cols() -> Iterable[ColumnExpr]: + for c in self.all_cols: + if isinstance(c, _NamedColumnExpr) and c.wildcard: + yield from [col(n) for n in schema.names] + else: + yield c + + return SelectColumns(*list(_get_cols())) + + def assert_all_with_names(self) -> "SelectColumns": + """Assert every column has an explicit alias or the alias can + be inferred (non empty value). It will also validate that there + are no duplicated aliases + + :raises ValueError: if there are columns without alias, or there are + duplicated names. + :return: the instance itself + """ + + names: Set[str] = set() + for x in self.all_cols: + if isinstance(x, _NamedColumnExpr): + if x.wildcard: + continue + if self._has_wildcard: + if x.as_name == "": + raise ValueError( + f"with '*', all other columns must have an alias: {self}" + ) + if x.output_name == "": + raise ValueError(f"{x} does not have an alias: {self}") + if x.output_name in names: + raise ValueError(f"{x} can't be reused in select: {self}") + names.add(x.output_name) + + return self + + def assert_no_wildcard(self) -> "SelectColumns": + """Assert there is no ``*`` on first level columns + + :raises AssertionError: if ``col("*")`` exists + :return: the instance itself + """ + assert not self._has_wildcard + return self + + def assert_no_agg(self) -> "SelectColumns": + """Assert there is no aggregation operation on any column. + + :raises AssertionError: if there is any aggregation in the + collection. + :return: the instance itself + + .. seealso:: + Go to :func:`~fugue.column.functions.is_agg` to see how the + aggregations are detected. 
+ """ + assert not self.has_agg + return self + + @property + def all_cols(self) -> List[ColumnExpr]: + """All columns (with inferred aliases)""" + return self._all + + @property + def literals(self) -> List[ColumnExpr]: + """All literal columns""" + return self._literals + + @property + def simple_cols(self) -> List[ColumnExpr]: + """All columns directly representing column names""" + return self._cols + + @property + def non_agg_funcs(self) -> List[ColumnExpr]: + """All columns with non-aggregation operations""" + return self._non_agg_funcs + + @property + def agg_funcs(self) -> List[ColumnExpr]: + """All columns with aggregation operations""" + return self._agg_funcs + + @property + def group_keys(self) -> List[ColumnExpr]: + """Group keys inferred from the columns. + + .. note:: + + * if there is no aggregation, the result will be empty + * it is :meth:`~.simple_cols` plus :meth:`~.non_agg_funcs` + """ + return self._group_keys + + @property + def has_agg(self) -> bool: + """Whether this select is an aggregation""" + return len(self.agg_funcs) > 0 + + @property + def has_literals(self) -> bool: + """Whether this select contains literal columns""" + return len(self.literals) > 0 + + @property + def simple(self) -> bool: + """Whether this select contains only simple column representations""" + return len(self.simple_cols) == len(self.all_cols) + + +class SQLExpressionGenerator: + """SQL generator for :class:`~.SelectColumns` + + :param enable_cast: whether convert ``cast`` into the statement, defaults to True + + .. 
admonition:: New Since + :class: hint + + **0.6.0** + """ + + def __init__(self, enable_cast: bool = True): + self._enable_cast = enable_cast + self._func_handler: Dict[str, Callable[[_FuncExpr], Iterable[str]]] = {} + + def where(self, condition: ColumnExpr, table: str) -> str: + """Generate a ``SELECT *`` statement with the given where clause + + :param condition: column expression for ``WHERE`` + :param table: table name for ``FROM`` + :return: the SQL statement + + :raises ValueError: if ``condition`` contains aggregation + + .. admonition:: Examples + + .. code-block:: python + + gen = SQLExpressionGenerator(enable_cast=False) + + # SELECT * FROM tb WHERE a>1 AND b IS NULL + gen.where((col("a")>1) & col("b").is_null(), "tb") + """ + assert_or_throw( + not is_agg(condition), + lambda: ValueError(f"{condition} has aggregation functions"), + ) + cond = self.generate(condition.alias("")) + return f"SELECT * FROM {table} WHERE {cond}" + + def select( + self, + columns: SelectColumns, + table: str, + where: Optional[ColumnExpr] = None, + having: Optional[ColumnExpr] = None, + ) -> str: + """Construct the full ``SELECT`` statement on a single table + + :param columns: columns to select, it may contain aggregations, if + so, the group keys are inferred. + See :meth:`~fugue.column.sql.SelectColumns.group_keys` + :param table: table name to select from + :param where: ``WHERE`` condition, defaults to None + :param having: ``HAVING`` condition, defaults to None. 
It is used + only when there is aggregation + :return: the full ``SELECT`` statement + """ + columns.assert_all_with_names() + + def _where() -> str: + if where is None: + return "" + assert_or_throw( + not is_agg(where), + lambda: ValueError(f"{where} has aggregation functions"), + ) + return " WHERE " + self.generate(where.alias("")) + + def _having(as_where: bool = False) -> str: + if having is None: + return "" + pre = " WHERE " if as_where else " HAVING " + return pre + self.generate(having.alias("")) + + distinct = "" if not columns.is_distinct else "DISTINCT " + + if not columns.has_agg: + expr = ", ".join(self.generate(x) for x in columns.all_cols) + return f"SELECT {distinct}{expr} FROM {table}{_where()}" + columns.assert_no_wildcard() + if len(columns.literals) == 0: + expr = ", ".join(self.generate(x) for x in columns.all_cols) + if len(columns.group_keys) == 0: + return f"SELECT {distinct}{expr} FROM {table}{_where()}{_having()}" + else: + keys = ", ".join(self.generate(x) for x in columns.group_keys) + return ( + f"SELECT {distinct}{expr} FROM " + f"{table}{_where()} GROUP BY {keys}{_having()}" + ) + else: + no_lit = [ + x for x in columns.all_cols if not isinstance(x, _LiteralColumnExpr) + ] + sub = self.select(SelectColumns(*no_lit), table, where=where, having=having) + names = [ + self.generate(x) if isinstance(x, _LiteralColumnExpr) else x.output_name + for x in columns.all_cols + ] + expr = ", ".join(names) + return f"SELECT {expr} FROM ({sub})" + + def generate(self, expr: ColumnExpr) -> str: + """Convert :class:`~fugue.column.expressions.ColumnExpr` to + SQL clause + + :param expr: the column expression to convert + :return: the SQL clause for this expression + """ + return "".join(self._generate(expr)).strip() + + def add_func_handler( + self, name: str, handler: Callable[[_FuncExpr], Iterable[str]] + ) -> "SQLExpressionGenerator": + """Add special function handler. 
+ + :param name: name of the function + :param handler: the function to convert the function expression to SQL + clause + :return: the instance itself + + .. caution:: + + Users should not use this directly + """ + self._func_handler[name] = handler + return self + + def correct_select_schema( + self, input_schema: Schema, select: SelectColumns, output_schema: Schema + ) -> Optional[Schema]: + """Do partial schema inference from ``input_schema`` and ``select`` columns, + then compare with the SQL output dataframe schema, and return the different + part as a new schema, or None if there is no difference + + :param input_schema: input dataframe schema for the select statement + :param select: the collection of select columns + :param output_schema: schema of the output dataframe after executing the SQL + :return: the difference as a new schema or None if no difference + + .. tip:: + + This is particularly useful when the SQL engine messed up the schema of the + output. For example, ``SELECT *`` should return a dataframe with the same + schema of the input. However, for example a column ``a:int`` could become + ``a:long`` in the output dataframe because of information loss. This + function is designed to make corrections on column types when they can be + inferred. This may not be perfect but it can solve major discrepancies. 
+ """ + cols = select.replace_wildcard(input_schema).assert_all_with_names() + fields: List[pa.Field] = [] + for c in cols.all_cols: + tp = c.infer_type(input_schema) + if tp is not None and tp != output_schema[c.output_name].type: + fields.append(pa.field(c.output_name, tp)) + if len(fields) == 0: + return None + return Schema(fields) + + def _generate( # noqa: C901 + self, expr: ColumnExpr, bracket: bool = False + ) -> Iterable[str]: + if self._enable_cast and expr.as_type is not None: + yield "CAST(" + if isinstance(expr, _LiteralColumnExpr): + yield from self._on_lit(expr) + elif isinstance(expr, _NamedColumnExpr): + yield from self._on_named(expr) + elif isinstance(expr, _FuncExpr): + if expr.func in self._func_handler: + yield from self._func_handler[expr.func](expr) + elif isinstance(expr, _UnaryOpExpr): + yield from self._on_common_unary(expr) + elif isinstance(expr, _BinaryOpExpr): + yield from self._on_common_binary(expr, bracket) + else: + yield from self._on_common_func(expr) + if self._enable_cast and expr.as_type is not None: + yield " AS " + yield self.type_to_expr(expr.as_type) + yield ")" + if expr.as_name != "": + yield " AS " + expr.as_name + elif expr.as_type is not None and expr.name != "": + yield " AS " + expr.name + + def type_to_expr(self, data_type: pa.DataType): + return _type_to_expression(data_type) + + def _on_named(self, expr: _NamedColumnExpr) -> Iterable[str]: + yield expr.name + + def _on_lit(self, expr: _LiteralColumnExpr) -> Iterable[str]: + yield expr.body_str + + def _on_common_unary(self, expr: _UnaryOpExpr) -> Iterable[str]: + if expr.op == "-": + yield expr.op + yield from self._generate(expr.col, bracket=True) + elif expr.op == "~": + yield "NOT " + yield from self._generate(expr.col, bracket=True) + elif expr.op == "IS_NULL": + yield from self._generate(expr.col, bracket=True) + yield " IS NULL" + elif expr.op == "NOT_NULL": + yield from self._generate(expr.col, bracket=True) + yield " IS NOT NULL" + else: + raise 
NotImplementedError(expr) # pragma: no cover + + def _on_common_binary(self, expr: _BinaryOpExpr, bracket: bool) -> Iterable[str]: + assert_or_throw(expr.op in _SUPPORTED_OPERATORS, NotImplementedError(expr)) + if bracket: + yield "(" + if expr.is_distinct: # pragma: no cover + raise FugueBug(f"impossible case {expr}") + yield from self._generate(expr.left, bracket=True) + yield _SUPPORTED_OPERATORS[expr.op] + yield from self._generate(expr.right, bracket=True) + if bracket: + yield ")" + + def _on_common_func(self, expr: _FuncExpr) -> Iterable[str]: + def to_str(v: Any) -> Iterable[str]: + if isinstance(v, ColumnExpr): + yield from self._generate(v) + else: + yield from self._generate(lit(v)) + + def get_args() -> Iterable[str]: + for x in expr.args: + yield from to_str(x) + yield "," + for k, v in expr.kwargs.items(): + yield k + yield "=" + yield from to_str(v) + yield "," + + args = list(get_args()) + if len(args) > 0: + args = args[:-1] + yield expr.func + yield "(" + if expr.is_distinct: + yield "DISTINCT " + yield from args + yield ")" diff --git a/fugue/execution/execution_engine.py b/fugue/execution/execution_engine.py index d5e98634..03abdfd8 100644 --- a/fugue/execution/execution_engine.py +++ b/fugue/execution/execution_engine.py @@ -1,12 +1,14 @@ import logging from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union +from threading import RLock +from typing import Any, Callable, Dict, Iterable, List, Optional, Union from fugue.collections.partition import ( EMPTY_PARTITION_SPEC, PartitionCursor, PartitionSpec, ) +from fugue.column import ColumnExpr, SelectColumns, SQLExpressionGenerator, col, is_agg from fugue.constants import FUGUE_DEFAULT_CONF from fugue.dataframe import DataFrame, DataFrames from fugue.dataframe.array_dataframe import ArrayDataFrame @@ -20,7 +22,6 @@ from triad.utils.assertion import assert_or_throw from triad.utils.convert import to_size from triad.utils.string import 
validate_triad_var_name -from threading import RLock _DEFAULT_JOIN_KEYS: List[str] = [] @@ -245,15 +246,6 @@ def map( """ raise NotImplementedError - def aggregate( - self, - df: DataFrame, - funcs: List[Tuple[str, Any, str]], - partition_spec: PartitionSpec, - metadata: Any = None, - ) -> DataFrame: # pragma: no cover - raise NotImplementedError - @abstractmethod def broadcast(self, df: DataFrame) -> DataFrame: # pragma: no cover """Broadcast the dataframe to all workers for a distributed computing framework @@ -516,6 +508,222 @@ def take( """ pass + def select( + self, + df: DataFrame, + cols: SelectColumns, + where: Optional[ColumnExpr] = None, + having: Optional[ColumnExpr] = None, + metadata: Any = None, + ) -> DataFrame: + """The functional interface for SQL select statement + + :param df: the dataframe to be operated on + :param cols: column expressions + :param where: ``WHERE`` condition expression, defaults to None + :param having: ``having`` condition expression, defaults to None. It + is used when ``cols`` contains aggregation columns, defaults to None + :param metadata: dict-like object to add to the result dataframe, + defaults to None. It's currently not used + :return: the select result as a dataframe + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. attention:: + + This interface is experimental, it's subjected to change in new versions. + + .. seealso:: + + Please find more expression examples in :mod:`fugue.column.sql` and + :mod:`fugue.column.functions` + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + + # select existed and new columns + engine.select(df, SelectColumns(col("a"),col("b"),lit(1,"another"))) + engine.select(df, SelectColumns(col("a"),(col("b")+lit(1)).alias("x"))) + + # aggregation + # SELECT COUNT(DISTINCT *) AS x FROM df + engine.select( + df, + SelectColumns(f.count_distinct(col("*")).alias("x"))) + + # SELECT a, MAX(b+1) AS x FROM df GROUP BY a + engine.select( + df, + SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x"))) + + # SELECT a, MAX(b+1) AS x FROM df + # WHERE b<2 AND a>1 + # GROUP BY a + # HAVING MAX(b+1)>0 + engine.select( + df, + SelectColumns(col("a"),f.max(col("b")+lit(1)).alias("x")), + where=(col("b")<2) & (col("a")>1), + having=f.max(col("b")+lit(1))>0 + ) + """ + gen = SQLExpressionGenerator(enable_cast=False) + sql = gen.select(cols, "df", where=where, having=having) + res = self.sql_engine.select(DataFrames(df=self.to_df(df)), sql) + diff = gen.correct_select_schema(df.schema, cols, res.schema) + return res if diff is None else res.alter_columns(diff) + + def filter( + self, df: DataFrame, condition: ColumnExpr, metadata: Any = None + ) -> DataFrame: + """Filter rows by the given condition + + :param df: the dataframe to be filtered + :param condition: (boolean) column expression + :param metadata: dict-like object to add to the result dataframe, + defaults to None. It's currently not used + :return: the filtered dataframe + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. seealso:: + + Please find more expression examples in :mod:`fugue.column.sql` and + :mod:`fugue.column.functions` + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + + engine.filter(df, (col("a")>1) & (col("b")=="x")) + engine.filter(df, f.coalesce(col("a"),col("b"))>1) + """ + return self.select( + df, cols=SelectColumns(col("*")), where=condition, metadata=metadata + ) + + def assign( + self, df: DataFrame, columns: List[ColumnExpr], metadata: Any = None + ) -> DataFrame: + """Update existing columns with new values and add new columns + + :param df: the dataframe to set columns + :param columns: column expressions + :param metadata: dict-like object to add to the result dataframe, + defaults to None. It's currently not used + :return: the updated dataframe + + .. tip:: + + This can be used to cast data types, alter column values or add new + columns. But you can't use aggregation in columns. + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. seealso:: + + Please find more expression examples in :mod:`fugue.column.sql` and + :mod:`fugue.column.functions` + + .. admonition:: Examples + + .. 
code-block:: python + + # assume df has schema: a:int,b:str + + # add constant column x + engine.assign(df, lit(1,"x")) + + # change column b to be a constant integer + engine.assign(df, lit(1,"b")) + + # add new x to be a+b + engine.assign(df, (col("a")+col("b")).alias("x")) + + # cast column a data type to double + engine.assign(df, col("a").cast(float)) + """ + SelectColumns( + *columns + ).assert_no_wildcard().assert_all_with_names().assert_no_agg() + + cols = [col(n) for n in df.schema.names] + for c in columns: + if c.output_name not in df.schema: + cols.append(c) + else: + cols[df.schema.index_of_key(c.output_name)] = c + return self.select(df, SelectColumns(*cols), metadata=metadata) + + def aggregate( + self, + df: DataFrame, + partition_spec: Optional[PartitionSpec], + agg_cols: List[ColumnExpr], + metadata: Any = None, + ): + """Aggregate on dataframe + + :param df: the dataframe to aggregate on + :param partition_spec: PartitionSpec to specify partition keys + :param agg_cols: aggregation expressions + :param metadata: dict-like object to add to the result dataframe, + defaults to None. It's currently not used + :return: the aggregated result as a dataframe + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. seealso:: + + Please find more expression examples in :mod:`fugue.column.sql` and + :mod:`fugue.column.functions` + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + + # SELECT MAX(b) AS b FROM df + engine.aggregate( + df, + partition_spec=None, + agg_cols=[f.max(col("b"))]) + + # SELECT a, MAX(b) AS x FROM df GROUP BY a + engine.aggregate( + df, + partition_spec=PartitionSpec(by=["a"]), + agg_cols=[f.max(col("b")).alias("x")]) + """ + assert_or_throw(len(agg_cols) > 0, ValueError("agg_cols can't be empty")) + assert_or_throw( + all(is_agg(x) for x in agg_cols), + ValueError("all agg_cols must be aggregation functions"), + ) + keys: List[ColumnExpr] = [] + if partition_spec is not None and len(partition_spec.partition_by) > 0: + keys = [col(y) for y in partition_spec.partition_by] + cols = SelectColumns(*keys, *agg_cols) + return self.select(df, cols=cols, metadata=metadata) + def convert_yield_dataframe(self, df: DataFrame) -> DataFrame: """Convert a yield dataframe to a dataframe that can be used after this execution engine stops. @@ -565,17 +773,19 @@ def zip( :return: a zipped dataframe, the metadata of the dataframe will indicate it's zipped - :Notice: + .. note:: - * Different from join, ``df1`` and ``df2`` can have common columns that you will - not use as partition keys. - * If ``on`` is not specified it will also use the common columns of the two - dataframes (if it's not a cross zip) - * For non-cross zip, the two dataframes must have common columns, or error will - be thrown + * Different from join, ``df1`` and ``df2`` can have common columns that you + will not use as partition keys. + * If ``on`` is not specified it will also use the common columns of the two + dataframes (if it's not a cross zip) + * For non-cross zip, the two dataframes must have common columns, or error + will be thrown - For more details and examples, read - :ref:`Zip & Comap `. + .. seealso:: + + For more details and examples, read + :ref:`Zip & Comap `. 
""" on = list(partition_spec.partition_by) how = how.lower() diff --git a/fugue/extensions/_builtins/__init__.py b/fugue/extensions/_builtins/__init__.py index 530f7db8..59cc6698 100644 --- a/fugue/extensions/_builtins/__init__.py +++ b/fugue/extensions/_builtins/__init__.py @@ -8,11 +8,14 @@ Show, ) from fugue.extensions._builtins.processors import ( + Aggregate, AlterColumns, + Assign, Distinct, DropColumns, Dropna, Fillna, + Filter, Rename, RunJoin, RunSetOperation, @@ -20,6 +23,7 @@ RunTransformer, Sample, SaveAndUse, + Select, SelectColumns, Take, Zip, diff --git a/fugue/extensions/_builtins/processors.py b/fugue/extensions/_builtins/processors.py index e3422fd3..80d5c7c3 100644 --- a/fugue/extensions/_builtins/processors.py +++ b/fugue/extensions/_builtins/processors.py @@ -8,6 +8,7 @@ LocalDataFrame, to_local_bounded_df, ) +from fugue.column import ColumnExpr, SelectColumns as ColumnsSelect from fugue.exceptions import FugueWorkflowError from fugue.execution import make_sql_engine from fugue.execution.execution_engine import _generate_comap_empty_dfs @@ -169,6 +170,53 @@ def process(self, dfs: DataFrames) -> DataFrame: ) +class Select(Processor): + def validate_on_compile(self): + sc = self.params.get_or_throw("columns", ColumnsSelect) + sc.assert_all_with_names() + + def process(self, dfs: DataFrames) -> DataFrame: + assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input")) + columns = self.params.get_or_throw("columns", ColumnsSelect) + where = None if "where" not in self.params else self.params["where"] + having = None if "having" not in self.params else self.params["having"] + return self.execution_engine.select( + df=dfs[0], cols=columns, where=where, having=having + ) + + +class Filter(Processor): + def validate_on_compile(self): + self.params.get_or_throw("condition", ColumnExpr) + + def process(self, dfs: DataFrames) -> DataFrame: + assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input")) + condition = 
self.params.get_or_throw("condition", ColumnExpr) + return self.execution_engine.filter(df=dfs[0], condition=condition) + + +class Assign(Processor): + def validate_on_compile(self): + self.params.get_or_throw("columns", list) + + def process(self, dfs: DataFrames) -> DataFrame: + assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input")) + columns = self.params.get_or_throw("columns", list) + return self.execution_engine.assign(df=dfs[0], columns=columns) + + +class Aggregate(Processor): + def validate_on_compile(self): + self.params.get_or_throw("columns", list) + + def process(self, dfs: DataFrames) -> DataFrame: + assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input")) + columns = self.params.get_or_throw("columns", list) + return self.execution_engine.aggregate( + df=dfs[0], partition_spec=self.partition_spec, agg_cols=columns + ) + + class Rename(Processor): def validate_on_compile(self): self.params.get_or_throw("columns", dict) diff --git a/fugue/workflow/workflow.py b/fugue/workflow/workflow.py index 3804bd9c..f8eb5c75 100644 --- a/fugue/workflow/workflow.py +++ b/fugue/workflow/workflow.py @@ -6,6 +6,9 @@ from adagio.specs import WorkflowSpec from fugue.collections.partition import PartitionSpec from fugue.collections.yielded import Yielded +from fugue.column import ColumnExpr +from fugue.column import SelectColumns as ColumnsSelect +from fugue.column import col, lit from fugue.constants import ( FUGUE_CONF_WORKFLOW_AUTO_PERSIST, FUGUE_CONF_WORKFLOW_AUTO_PERSIST_VALUE, @@ -15,6 +18,7 @@ from fugue.exceptions import FugueWorkflowCompileError, FugueWorkflowError from fugue.execution.factory import make_execution_engine from fugue.extensions._builtins import ( + Aggregate, AlterColumns, AssertEqual, AssertNotEqual, @@ -22,6 +26,7 @@ DropColumns, Dropna, Fillna, + Filter, Load, LoadYielded, Rename, @@ -33,7 +38,9 @@ Sample, Save, SaveAndUse, + Select, SelectColumns, + Assign, Show, Take, Zip, @@ -270,6 +277,211 @@ def 
assert_not_eq(self, *dfs: Any, **params: Any) -> None: """ self.workflow.assert_not_eq(self, *dfs, **params) + def select( + self: TDF, + *columns: Union[str, ColumnExpr], + where: Optional[ColumnExpr] = None, + having: Optional[ColumnExpr] = None, + distinct: bool = False, + ) -> TDF: + """The functional interface for SQL select statement + + :param columns: column expressions, for strings they will represent + the column names + :param where: ``WHERE`` condition expression, defaults to None + :param having: ``having`` condition expression, defaults to None. It + is used when ``cols`` contains aggregation columns, defaults to None + :param distinct: whether to return distinct result, defaults to False + :return: the select result as a new dataframe + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. attention:: + + This interface is experimental, it's subjected to change in new versions. + + .. seealso:: + + Please find more expression examples in :mod:`fugue.column.sql` and + :mod:`fugue.column.functions` + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + from fugue import FugueWorkflow + + dag = FugueWorkflow() + df = dag.df(pandas_df) + + # select existed and new columns + df.select("a","b",lit(1,"another"))) + df.select("a",(col("b")+lit(1)).alias("x")) + + # select distinct + df.select("a","b",lit(1,"another")),distinct=True) + + # aggregation + # SELECT COUNT(DISTINCT *) AS x FROM df + df.select(f.count_distinct(col("*")).alias("x")) + + # SELECT a, MAX(b+1) AS x FROM df GROUP BY a + df.select("a",f.max(col("b")+lit(1)).alias("x")) + + # SELECT a, MAX(b+1) AS x FROM df + # WHERE b<2 AND a>1 + # GROUP BY a + # HAVING MAX(b+1)>0 + df.select( + "a",f.max(col("b")+lit(1)).alias("x"), + where=(col("b")<2) & (col("a")>1), + having=f.max(col("b")+lit(1))>0 + ) + """ + sc = ColumnsSelect( + *[col(x) if isinstance(x, str) else x for x in columns], + arg_distinct=distinct, + ) + df = self.workflow.process( + self, using=Select, params=dict(columns=sc, where=where, having=having) + ) + return self._to_self_type(df) + + def filter(self: TDF, condition: ColumnExpr) -> TDF: + """Filter rows by the given condition + + :param df: the dataframe to be filtered + :param condition: (boolean) column expression + :return: a new filtered dataframe + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. seealso:: + + Please find more expression examples in :mod:`fugue.column.sql` and + :mod:`fugue.column.functions` + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + from fugue import FugueWorkflow + + dag = FugueWorkflow() + df = dag.df(pandas_df) + + df.filter((col("a")>1) & (col("b")=="x")) + df.filter(f.coalesce(col("a"),col("b"))>1) + """ + df = self.workflow.process(self, using=Filter, params=dict(condition=condition)) + return self._to_self_type(df) + + def assign(self: TDF, *args: ColumnExpr, **kwargs: Any) -> TDF: + """Update existing columns with new values and add new columns + + :param df: the dataframe to set columns + :param args: column expressions + :param kwargs: column expressions to be renamed to the argument names, + if a value is not `ColumnExpr`, it will be treated as a literal + :return: a new dataframe with the updated values + + .. tip:: + + This can be used to cast data types, alter column values or add new + columns. But you can't use aggregation in columns. + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. seealso:: + + Please find more expression examples in :mod:`fugue.column.sql` and + :mod:`fugue.column.functions` + + .. admonition:: Examples + + .. 
code-block:: python + + from fugue import FugueWorkflow + + dag = FugueWorkflow() + df = dag.df(pandas_df) + + # add/set 1 as column x + df.assign(lit(1,"x")) + df.assign(x=1) + + # add/set x to be a+b + df.assign((col("a")+col("b")).alias("x")) + df.assign(x=col("a")+col("b")) + + # cast column a data type to double + df.assign(col("a").cast(float)) + + # cast + new columns + df.assign(col("a").cast(float),x=1,y=col("a")+col("b")) + """ + kv: List[ColumnExpr] = [ + v.alias(k) if isinstance(v, ColumnExpr) else lit(v).alias(k) + for k, v in kwargs.items() + ] + df = self.workflow.process( + self, using=Assign, params=dict(columns=list(args) + kv) + ) + return self._to_self_type(df) + + def aggregate(self: TDF, *agg_cols: ColumnExpr, **kwagg_cols: ColumnExpr) -> TDF: + """Aggregate on dataframe + + :param df: the dataframe to aggregate on + :param agg_cols: aggregation expressions + :param kwagg_cols: aggregation expressions to be renamed to the argument names + :return: the aggregated result as a dataframe + + .. admonition:: New Since + :class: hint + + **0.6.0** + + .. seealso:: + + Please find more expression examples in :mod:`fugue.column.sql` and + :mod:`fugue.column.functions` + + .. admonition:: Examples + + .. 
code-block:: python + + import fugue.column.functions as f + + # SELECT MAX(b) AS b FROM df + df.aggregate(f.max(col("b"))) + + # SELECT a, MAX(b) AS x FROM df GROUP BY a + df.partition_by("a").aggregate(f.max(col("b")).alias("x")) + df.partition_by("a").aggregate(x=f.max(col("b"))) + """ + columns: List[ColumnExpr] = list(agg_cols) + [ + v.alias(k) for k, v in kwagg_cols.items() + ] + df = self.workflow.process( + self, + using=Aggregate, + params=dict(columns=columns), + pre_partition=self.partition_spec, + ) + return self._to_self_type(df) + def transform( self: TDF, using: Any, diff --git a/fugue_dask/dataframe.py b/fugue_dask/dataframe.py index 823494b2..eceda3f6 100644 --- a/fugue_dask/dataframe.py +++ b/fugue_dask/dataframe.py @@ -8,6 +8,7 @@ from fugue.exceptions import FugueDataFrameInitError, FugueDataFrameOperationError from triad.collections.schema import Schema from triad.utils.assertion import assert_arg_not_none, assert_or_throw +from triad.utils.pyarrow import to_pandas_dtype from fugue_dask._constants import ( FUGUE_DASK_CONF_DATAFRAME_DEFAULT_PARTITIONS, @@ -143,6 +144,7 @@ def alter_columns(self, columns: Any) -> DataFrame: if new_schema == self.schema: return self new_pdf = self.native.assign() + pd_types = to_pandas_dtype(new_schema.pa_schema) for k, v in new_schema.items(): if not v.type.equals(self.schema[k].type): old_type = self.schema[k].type @@ -160,10 +162,24 @@ def alter_columns(self, columns: Any) -> DataFrame: positive = series != 0 new_pdf[k] = "False" new_pdf[k] = new_pdf[k].mask(positive, "True").mask(ns, None) + # str -> bool + elif pa.types.is_string(old_type) and pa.types.is_boolean(new_type): + series = new_pdf[k] + ns = series.isnull() + new_pdf[k] = ( + series.fillna("true") + .apply(lambda x: None if x is None else x.lower()) + .mask(ns, None) + ) + elif pa.types.is_integer(new_type): + series = new_pdf[k] + ns = series.isnull() + series = series.fillna(0).astype(pd_types[k]) + new_pdf[k] = series.mask(ns, None) else: series 
= new_pdf[k] ns = series.isnull() - series = series.astype(str) + series = series.astype(pd_types[k]) new_pdf[k] = series.mask(ns, None) return DaskDataFrame(new_pdf, new_schema, type_safe=True) diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py index 0f072c95..86125dc1 100644 --- a/fugue_spark/_utils/convert.py +++ b/fugue_spark/_utils/convert.py @@ -55,10 +55,11 @@ def to_cast_expression( if schema1[i].dataType != schema2[i].dataType: type2 = schema2[i].dataType.simpleString() if isinstance(schema1[i].dataType, pt.FractionalType) and isinstance( - schema2[i].dataType, pt.StringType + schema2[i].dataType, (pt.StringType, pt.IntegralType) ): expr.append( - f"CAST(IF(isnan({schema1[i].name}), NULL, {schema1[i].name})" + f"CAST(IF(isnan({schema1[i].name}) OR {schema1[i].name} IS NULL" + f", NULL, {schema1[i].name})" f" AS {type2}) {schema2[i].name}" ) else: diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py index 5649d515..ba98d198 100644 --- a/fugue_spark/execution_engine.py +++ b/fugue_spark/execution_engine.py @@ -19,6 +19,7 @@ LocalDataFrame, LocalDataFrameIterableDataFrame, ) +from fugue.dataframe.array_dataframe import ArrayDataFrame from fugue.dataframe.arrow_dataframe import ArrowDataFrame from fugue.dataframe.pandas_dataframe import PandasDataFrame from fugue.dataframe.utils import get_join_schemas @@ -174,6 +175,17 @@ def to_df( ) if isinstance(df, SparkDataFrame): return df + if isinstance(df, ArrowDataFrame): + sdf = self.spark_session.createDataFrame( + df.as_array(), to_spark_schema(df.schema) + ) + return SparkDataFrame(sdf, df.schema, df.metadata) + if isinstance(df, (ArrayDataFrame, IterableDataFrame)): + adf = ArrowDataFrame(df.as_array(type_safe=False), df.schema) + sdf = self.spark_session.createDataFrame( + adf.as_array(), to_spark_schema(df.schema) + ) + return SparkDataFrame(sdf, df.schema, df.metadata) if any(pa.types.is_struct(t) for t in df.schema.types): sdf = 
self.spark_session.createDataFrame( df.as_array(type_safe=True), to_spark_schema(df.schema) diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index 98663872..4e5fc7db 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -33,9 +33,12 @@ output_transformer, outputter, processor, - transformer, register_default_sql_engine, + transformer, ) +from fugue.column import col +from fugue.column import functions as ff +from fugue.column import lit from fugue.dataframe.utils import _df_eq as df_eq from fugue.exceptions import ( FugueInterfacelessError, @@ -698,6 +701,80 @@ def test_join(self): "a:int,b:int,c:int,d:int", ).assert_eq(d) + def test_df_select(self): + with self.dag() as dag: + # wildcard + a = dag.df([[1, 10], [2, 20], [3, 30]], "x:int,y:int") + a.select("*").assert_eq(a) + + # column transformation + b = dag.df( + [[1, 10, 11, "x"], [2, 20, 22, "x"], [3, 30, 33, "x"]], + "x:int,y:int,c:int,d:str", + ) + a.select( + "*", + (col("x") + col("y")).cast("int32").alias("c"), + lit("x", "d"), + ).assert_eq(b) + + # distinct + a = dag.df([[1, 10], [2, 20], [1, 10]], "x:int,y:int") + b = dag.df([[1, 10], [2, 20]], "x:int,y:int") + a.select("*", distinct=True).assert_eq(b) + + # aggregation plus infer alias + a = dag.df([[1, 10], [1, 20], [3, 30]], "x:int,y:int") + b = dag.df([[1, 30], [3, 30]], "x:int,y:int") + a.select("x", ff.sum(col("y")).cast("int32")).assert_eq(b) + + # all together + a = dag.df([[1, 10], [1, 20], [3, 35], [3, 40]], "x:int,y:int") + b = dag.df([[3, 35]], "x:int,z:int") + a.select( + "x", + ff.sum(col("y")).alias("z").cast("int32"), + where=col("y") < 40, + having=ff.sum(col("y")) > 30, + ).assert_eq(b) + + b = dag.df([[65]], "z:long") + a.select( + ff.sum(col("y")).alias("z").cast(int), where=col("y") < 40 + ).show() + + raises(ValueError, lambda: a.select("*", "x")) + + def test_df_filter(self): + with self.dag() as dag: + a = dag.df([[1, 10], [2, 20], [3, 30]], "x:int,y:int") + b = dag.df([[2, 20]], 
"x:int,y:int") + a.filter((col("y") > 15) & (col("y") < 25)).assert_eq(b) + + def test_df_assign(self): + with self.dag() as dag: + a = dag.df([[1, 10], [2, 20], [3, 30]], "x:int,y:int") + b = dag.df([[1, "x"], [2, "x"], [3, "x"]], "x:int,y:str") + a.assign(y="x").assert_eq(b) + + a = dag.df([[1, 10], [2, 20], [3, 30]], "x:int,y:int") + b = dag.df( + [[1, "x", 11], [2, "x", 21], [3, "x", 31]], "x:int,y:str,z:double" + ) + a.assign(lit("x").alias("y"), z=(col("y") + 1).cast(float)).assert_eq(b) + + def test_aggregate(self): + with self.dag() as dag: + a = dag.df([[1, 10], [1, 200], [3, 30]], "x:int,y:int") + b = dag.df([[1, 200], [3, 30]], "x:int,y:int") + c = dag.df([[-200, 200, 70]], "y:int,zz:int,ww:int") + a.partition_by("x").aggregate(ff.max(col("y"))).assert_eq(b) + a.aggregate( + ff.min(-col("y")), + zz=ff.max(col("y")), + ww=((ff.min(col("y")) + ff.max(col("y"))) / 3).cast("int32"), + ).assert_eq(c) + def test_select(self): class MockEngine(SqliteEngine): def __init__(self, execution_engine, p: int = 0): diff --git a/fugue_test/dataframe_suite.py b/fugue_test/dataframe_suite.py index a2336c82..129e2100 100644 --- a/fugue_test/dataframe_suite.py +++ b/fugue_test/dataframe_suite.py @@ -276,11 +276,22 @@ def test_alter_columns(self): assert [["a", "1"], ["c", None]] == ndf.as_array(type_safe=True) assert ndf.schema == "a:str,b:str" + # int -> double + df = self.df([["a", 1], ["c", None]], "a:str,b:int") + ndf = df.alter_columns("b:double") + assert [["a", 1], ["c", None]] == ndf.as_array(type_safe=True) + assert ndf.schema == "a:str,b:double" + # double -> str df = self.df([["a", 1.1], ["b", None]], "a:str,b:double") data = df.alter_columns("b:str").as_array(type_safe=True) assert [["a", "1.1"], ["b", None]] == data + # double -> int + df = self.df([["a", 1.0], ["b", None]], "a:str,b:double") + data = df.alter_columns("b:int").as_array(type_safe=True) + assert [["a", 1], ["b", None]] == data + # date -> str df = self.df( [["a", date(2020, 1, 1)], ["b", 
date(2020, 1, 2)], ["c", None]], diff --git a/fugue_test/execution_suite.py b/fugue_test/execution_suite.py index 1969d740..b3e736a0 100644 --- a/fugue_test/execution_suite.py +++ b/fugue_test/execution_suite.py @@ -6,6 +6,7 @@ from datetime import datetime from unittest import TestCase +import fugue.column.functions as ff import pandas as pd import pytest from fugue import ( @@ -16,6 +17,7 @@ PartitionSpec, register_default_sql_engine, ) +from fugue.column import SelectColumns, col, lit from fugue.dataframe.utils import _df_eq as df_eq from fugue.execution.native_execution_engine import NativeExecutionEngine from pytest import raises @@ -89,6 +91,161 @@ def test_to_df_general(self): pdf = pdf[pdf.a < 0] df_eq(o, e.to_df(pdf), throw=True) + def test_filter(self): + e = self.engine + o = ArrayDataFrame( + [[1, 2], [None, 2], [None, 1], [3, 4], [None, 4]], + "a:double,b:int", + dict(a=1), + ) + a = e.to_df(o) + b = e.filter(a, col("a").not_null()) + df_eq(b, [[1, 2], [3, 4]], "a:double,b:int", throw=True) + c = e.filter(a, col("a").not_null() & (col("b") < 3)) + df_eq(c, [[1, 2]], "a:double,b:int", throw=True) + c = e.filter(a, col("a") + col("b") == 3) + df_eq(c, [[1, 2]], "a:double,b:int", throw=True) + + def test_select(self): + e = self.engine + o = ArrayDataFrame( + [[1, 2], [None, 2], [None, 1], [3, 4], [None, 4]], + "a:double,b:int", + dict(a=1), + ) + a = e.to_df(o) + + # simple + b = e.select( + a, SelectColumns(col("b"), (col("b") + 1).alias("c").cast(str)) + ) + df_eq( + b, + [[2, "3"], [2, "3"], [1, "2"], [4, "5"], [4, "5"]], + "b:int,c:str", + throw=True, + ) + + # with distinct + b = e.select( + a, + SelectColumns( + col("b"), (col("b") + 1).alias("c").cast(str), arg_distinct=True + ), + ) + df_eq( + b, + [[2, "3"], [1, "2"], [4, "5"]], + "b:int,c:str", + throw=True, + ) + + # wildcard + b = e.select(a, SelectColumns(col("*")), where=col("a") + col("b") == 3) + df_eq(b, [[1, 2]], "a:double,b:int", throw=True) + + # aggregation + b = e.select( + a, 
SelectColumns(col("a"), ff.sum(col("b")).cast(float).alias("b")) + ) + df_eq(b, [[1, 2], [3, 4], [None, 7]], "a:double,b:double", throw=True) + + # having + # https://github.com/fugue-project/fugue/issues/222 + col_b = ff.sum(col("b")) + b = e.select( + a, + SelectColumns(col("a"), col_b.cast(float).alias("b")), + having=(col_b >= 7) | (col("a") == 1), + ) + df_eq(b, [[1, 2], [None, 7]], "a:double,b:double", throw=True) + + # literal + alias inference + # https://github.com/fugue-project/fugue/issues/222 + col_b = ff.sum(col("b")) + b = e.select( + a, + SelectColumns(col("a"), lit(1, "o").cast(str), col_b.cast(float)), + having=(col_b >= 7) | (col("a") == 1), + ) + df_eq( + b, [[1, "1", 2], [None, "1", 7]], "a:double,o:str,b:double", throw=True + ) + + def test_assign(self): + e = self.engine + o = ArrayDataFrame( + [[1, 2], [None, 2], [None, 1], [3, 4], [None, 4]], + "a:double,b:int", + dict(a=1), + ) + a = e.to_df(o) + + b = e.assign( + a, + [lit(1, "x"), col("b").cast(str), (col("b") + 1).alias("c").cast(int)], + ) + df_eq( + b, + [ + [1, "2", 1, 3], + [None, "2", 1, 3], + [None, "1", 1, 2], + [3, "4", 1, 5], + [None, "4", 1, 5], + ], + "a:double,b:str,x:long,c:long", + throw=True, + ) + + def test_aggregate(self): + e = self.engine + o = ArrayDataFrame( + [[1, 2], [None, 2], [None, 1], [3, 4], [None, 4]], + "a:double,b:int", + dict(a=1), + ) + a = e.to_df(o) + + b = e.aggregate( + df=a, + partition_spec=None, + agg_cols=[ + ff.max(col("b")), + (ff.max(col("b")) * 2).cast("int32").alias("c"), + ], + ) + df_eq(b, [[4, 8]], "b:int,c:int", throw=True) + + b = e.aggregate( + df=a, + partition_spec=PartitionSpec(by=["a"]), + agg_cols=[ + ff.max(col("b")), + (ff.max(col("b")) * 2).cast("int32").alias("c"), + ], + ) + df_eq( + b, + [[None, 4, 8], [1, 2, 4], [3, 4, 8]], + "a:double,b:int,c:int", + throw=True, + ) + + with raises(ValueError): + e.aggregate( + df=a, + partition_spec=PartitionSpec(by=["a"]), + agg_cols=[ff.max(col("b")), lit(1)], + ) + + with 
raises(ValueError): + e.aggregate( + df=a, + partition_spec=PartitionSpec(by=["a"]), + agg_cols=[], + ) + def test_map(self): def noop(cursor, data): return data diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py index 1cc82e6b..906d362f 100644 --- a/fugue_version/__init__.py +++ b/fugue_version/__init__.py @@ -1 +1 @@ -__version__ = "0.5.7" +__version__ = "0.6.0" diff --git a/setup.cfg b/setup.cfg index 0d10e90d..e4fa31aa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,7 +22,7 @@ omit = fugue_sql/_antlr/* [flake8] -ignore = E24,E203,W503,C408,A003,W504,C407,C405 +ignore = E24,E203,W503,C401,C408,A001,A003,W504,C407,C405 max-line-length = 88 format = pylint exclude = .svc,CVS,.bzr,.hg,.git,__pycache__,venv,tests/*,docs/* diff --git a/tests/fugue/column/__init__.py b/tests/fugue/column/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fugue/column/test_expressions.py b/tests/fugue/column/test_expressions.py new file mode 100644 index 00000000..a4db5992 --- /dev/null +++ b/tests/fugue/column/test_expressions.py @@ -0,0 +1,206 @@ +import pyarrow as pa +from fugue.column import col, function, lit, null +from fugue.column.expressions import _get_column_mentions +from fugue.column.functions import coalesce +from pytest import raises +from triad import Schema, to_uuid + + +def test_named_col(): + assert "*" == str(col("*")) + assert col("*").wildcard + assert "" == col("*").infer_alias().output_name + raises(ValueError, lambda: col("*").alias("x")) + raises(ValueError, lambda: col("*").cast("long")) + + assert "a" == str(col("a")) + assert not col("a").wildcard + assert "a" == str(col(col("a"))) + assert "ab AS xx" == str(col("ab").alias("xx")) + assert "ab AS xx" == str(col("ab", "xx").cast(None)) + assert "CAST(ab AS long) AS xx" == str(col("ab", "xx").cast("long")) + + assert "ab AS xx" == str(col("ab").alias("xx")) + + assert "ab AS xx" == str(col("ab").alias("xx")) + assert "CAST(ab AS long) AS xx" == 
str(col("ab").alias("xx").cast(int)) + + raises(NotImplementedError, lambda: col([1, 2])) + + assert to_uuid(col("a")) != to_uuid(col("b")) + assert to_uuid(col("a")) != to_uuid(col("a").alias("v")) + assert to_uuid(col("a")) != to_uuid(col("a").cast(int)) + assert to_uuid(col("a").cast(int).alias("v")) == to_uuid( + col("a").alias("v").cast(int) + ) + + assert "" == col("a").infer_alias().as_name + assert "a" == str(col("a").infer_alias()) + assert "a" == col("a").cast(int).infer_alias().as_name + c = col("a").cast(int).infer_alias() + assert "CAST(a AS long) AS a" == str(c) + c = col("a").cast(int).alias("x").infer_alias() + assert "CAST(a AS long) AS x" == str(c) + + +def test_lit_col(): + assert "NULL" == str(lit(None)) + assert "TRUE" == str(null().is_null()) + assert "FALSE" == str(null().not_null()) + + assert "'a'" == str(lit("a")) + assert "'a\"\\'\\\\'" == str(lit("a\"'\\")) + assert "'a' AS x" == str(lit("a", "x")) + assert "TRUE" == str(lit("a").not_null()) + assert "FALSE" == str(lit("a").is_null()) + + assert "1.1" == str(lit(1.1)) + assert "11" == str(lit(11)) + assert "TRUE" == str(lit(True)) + assert "FALSE" == str(lit(False)) + + assert "1 AS xx" == str(lit(1).alias("xx")) + assert "'ab' AS xx" == str(lit("ab").alias("xx")) + + raises(NotImplementedError, lambda: lit([1, 2])) + + assert to_uuid(lit("a")) != to_uuid(col("a")) + assert to_uuid(lit(1)) != to_uuid(lit("1")) + assert to_uuid(null()) == to_uuid(null()) + assert to_uuid(null()) != to_uuid(lit(1)) + assert to_uuid(lit("a")) != to_uuid(lit("a").alias("v")) + assert to_uuid(lit("a")) != to_uuid(lit("a").cast(int)) + assert to_uuid(lit("a").cast(int).alias("v")) == to_uuid( + lit("a").alias("v").cast(int) + ) + + +def test_unary_op(): + assert "-(a)" == str(-col("a")) + assert "a" == (-col("a")).infer_alias().output_name + assert "a" == str(+col("a")) + assert "~(a)" == str(~col("a")) + assert "IS_NULL(a)" == str(col("a").is_null()) + assert "NOT_NULL(a)" == str(col("a").not_null()) + + 
assert "NOT_NULL(a) AS xx" == str(col("a").not_null().alias("xx")) + assert "NOT_NULL(a)" == str(col("a").not_null()) + assert "NOT_NULL(a) AS xx" == str(col("a").not_null().alias("xx")) + + assert "a" == col("a").not_null().infer_alias().output_name + assert "NOT_NULL(a) AS a" == str(col("a").not_null().infer_alias()) + + assert to_uuid(col("a").not_null()) == to_uuid(col("a").not_null()) + assert to_uuid(col("a").not_null()) != to_uuid(col("a").is_null()) + + +def test_binary_op(): + assert "+(ab,1)" == str(col("ab") + 1) + assert "+(ab,x)" == str(col("ab") + col("x")) + assert "+('x',a)" == str("x" + col("a")) + assert "+('x','a')" == str("x" + lit("a")) + assert "-(a,1)" == str(col("a") - 1) + assert "-(1.1,a)" == str(1.1 - col("a")) + assert "*(a,1)" == str(col("a") * 1) + assert "*(1.1,a)" == str(1.1 * col("a")) + assert "/(a,1)" == str(col("a") / 1) + assert "/(1.1,a)" == str(1.1 / col("a")) + + assert "+(ab,1)" == str((col("ab") + 1)) + assert "+(ab,1) AS xx" == str((col("ab") + 1).alias("xx")) + + assert "+(ab,1) AS xx" == str((col("ab") + 1).alias("xx")) + + assert "&(a,TRUE)" == str(col("a") & True) + assert "&(TRUE,a)" == str(True & col("a")) + assert "&(a,FALSE)" == str(col("a") & False) + assert "&(FALSE,a)" == str(False & col("a")) + + assert "|(a,TRUE)" == str(col("a") | True) + assert "|(TRUE,a)" == str(True | col("a")) + assert "|(a,FALSE)" == str(col("a") | False) + assert "|(FALSE,a)" == str(False | col("a")) + + assert "<(a,1)" == str(col("a") < 1) + assert "<(a,b)" == str(col("a") < col("b")) + assert ">(a,1.1)" == str(1.1 < col("a")) + assert "<(1.1,a)" == str(lit(1.1) < col("a")) + assert "<=(a,1)" == str(col("a") <= 1) + assert ">=(a,1.1)" == str(1.1 <= col("a")) + assert ">(a,1)" == str(col("a") > 1) + assert "<(a,1.1)" == str(1.1 > col("a")) + assert ">=(a,1)" == str(col("a") >= 1) + assert "<=(a,1.1)" == str(1.1 >= col("a")) + + assert "==(a,1)" == str(col("a") == 1) + assert "==(a,1.1)" == str(1.1 == col("a")) + assert "!=(a,1)" == 
str(col("a") != 1) + assert "!=(a,1.1)" == str(1.1 != col("a")) + + +def test_comb(): + assert "-(+(a,*(10,b)),/(c,d))" == str( + (col("a") + 10 * col("b")) - col("c") / col("d") + ) + assert "|(==(a,1.1),&(&(b,~(c)),TRUE))" == str( + (1.1 == col("a")) | col("b") & ~col("c") & True + ) + + +def test_function(): + expr = function("f", col("x") + col("z"), col("y"), 1, 1.1, False, "t") + assert "f(+(x,z),y,1,1.1,FALSE,'t')" == str(expr) + assert "f(+(x,z),y,1,1.1,FALSE,'t') AS x" == str(expr.alias("x")) + + +def test_coalesce(): + expr = coalesce(col("x") + col("z"), col("y"), 1, 1.1, False, "t") + assert "COALESCE(+(x,z),y,1,1.1,FALSE,'t')" == str(expr) + assert "COALESCE(+(x,z),y,1,1.1,FALSE,'t') AS x" == str(expr.alias("x")) + + +def test_get_column_mentions(): + expr = (col("a") + col("b")) * function("x", col("b"), a=col("c"), b=lit(1)) + assert set(["a", "b", "c"]) == set(_get_column_mentions(expr)) + + +def test_schema_inference(): + schema = Schema("a:int,b:str,c:bool,d:double") + assert pa.int32() == col("a").infer_type(schema) + assert pa.int32() == (-col("a")).infer_type(schema) + assert pa.int64() == (-col("a")).cast(int).infer_type(schema) + assert pa.int64() == (-col("a").cast(int)).infer_type(schema) + assert pa.string() == col("b").infer_type(schema) + assert (-col("b")).infer_type(schema) is None + assert (~col("b")).infer_type(schema) is None + assert pa.bool_() == col("c").infer_type(schema) + assert pa.bool_() == (~col("c")).alias("x").infer_type(schema) + assert pa.float64() == col("d").infer_type(schema) + assert pa.float64() == (-col("d").alias("x")).infer_type(schema) + assert col("x").infer_type(schema) is None + assert pa.string() == col("x").cast(str).infer_type(schema) + assert col("*").infer_type(schema) is None + + assert pa.bool_() == (col("a") < col("d")).infer_type(schema) + assert pa.bool_() == (col("a") > col("d")).infer_type(schema) + assert pa.bool_() == (col("a") <= col("d")).infer_type(schema) + assert pa.bool_() == (col("a") >= 
col("d")).infer_type(schema) + assert pa.bool_() == (col("a") == col("d")).infer_type(schema) + assert pa.bool_() == (col("a") != col("d")).infer_type(schema) + assert pa.bool_() == (~(col("a") != col("d"))).infer_type(schema) + assert pa.int64() == (~(col("a") != col("d"))).cast(int).infer_type(schema) + + assert (col("a") - col("d")).infer_type(schema) is None + + assert pa.int64() == lit(1).infer_type(schema) + assert pa.string() == lit("a").infer_type(schema) + assert pa.bool_() == lit(False).infer_type(schema) + assert pa.string() == lit(False).cast(str).infer_type(schema) + assert pa.float64() == lit(2.2).infer_type(schema) + assert null().infer_type(schema) is None + assert pa.string() == null().cast(str).infer_type(schema) + + assert function("a", col("a").cast("int")).infer_type(schema) is None + assert pa.string() == function("a", col("a").cast("int")).cast(str).infer_type( + schema + ) diff --git a/tests/fugue/column/test_functions.py b/tests/fugue/column/test_functions.py new file mode 100644 index 00000000..99199d97 --- /dev/null +++ b/tests/fugue/column/test_functions.py @@ -0,0 +1,85 @@ +import fugue.column.functions as f +import pyarrow as pa +from fugue.column import col, lit, null +from triad import Schema + + +def test_is_agg(): + assert f.is_agg(f.first(col("a"))) + assert f.is_agg(f.count_distinct(col("a")).alias("x")) + assert f.is_agg(f.first(col("a") + 1)) + assert f.is_agg(f.first(col("a")) + 1) + assert f.is_agg((f.first(col("a")) < 1).alias("x")) + assert f.is_agg(col("a") * f.first(col("a")) + 1) + + assert not f.is_agg(col("a")) + assert not f.is_agg(lit("a")) + assert not f.is_agg(col("a") + col("b")) + assert not f.is_agg(null()) + + +def test_functions(): + schema = Schema("a:int,b:str,c:bool,d:double") + + expr = f.coalesce(col("a"), 1, None, col("b") + col("c")) + assert "COALESCE(a,1,NULL,+(b,c))" == str(expr) + assert expr.infer_type(schema) is None + + expr = f.min(col("a")) + assert "MIN(a)" == str(expr) + assert pa.int32() == 
expr.infer_type(schema) + assert "MIN(a) AS a" == str(expr.infer_alias()) + assert "CAST(MIN(a) AS long) AS a" == str(expr.cast(int).infer_alias()) + assert "MIN(a) AS b" == str(expr.alias("b").infer_alias()) + + assert "MIN(-(a)) AS a" == str(f.min(-col("a")).infer_alias()) + + expr = f.min(lit(1.1)) + assert "MIN(1.1)" == str(expr) + assert pa.float64() == expr.infer_type(schema) + + expr = f.max(col("a")) + assert "MAX(a)" == str(expr) + assert pa.int32() == expr.infer_type(schema) + + expr = f.max(lit(1.1)) + assert "MAX(1.1)" == str(expr) + assert pa.float64() == expr.infer_type(schema) + + expr = f.first(col("a")) + assert "FIRST(a)" == str(expr) + assert pa.int32() == expr.infer_type(schema) + + expr = f.first(lit(1.1)) + assert "FIRST(1.1)" == str(expr) + assert pa.float64() == expr.infer_type(schema) + + expr = f.last(col("a")) + assert "LAST(a)" == str(expr) + assert pa.int32() == expr.infer_type(schema) + + expr = f.last(lit(1.1)) + assert "LAST(1.1)" == str(expr) + assert pa.float64() == expr.infer_type(schema) + + expr = f.avg(col("a")) + assert "AVG(a)" == str(expr) + assert expr.infer_type(schema) is None + + expr = f.sum(col("a")) + assert "SUM(a)" == str(expr) + assert expr.infer_type(schema) is None + + expr = f.count(col("a")) + assert "COUNT(a)" == str(expr) + assert expr.infer_type(schema) is None + + expr = f.count_distinct(col("a")) + assert "COUNT(DISTINCT a)" == str(expr) + assert expr.infer_type(schema) is None + assert "COUNT(DISTINCT a) AS a" == str(expr.infer_alias()) + + expr = f.count_distinct(col("*")) + assert "COUNT(DISTINCT *)" == str(expr) + assert expr.infer_type(schema) is None + assert "COUNT(DISTINCT *)" == str(expr.infer_alias()) diff --git a/tests/fugue/column/test_sql.py b/tests/fugue/column/test_sql.py new file mode 100644 index 00000000..8e9b37c1 --- /dev/null +++ b/tests/fugue/column/test_sql.py @@ -0,0 +1,257 @@ +import fugue.column.functions as f +from fugue.column import SelectColumns, SQLExpressionGenerator, col, 
function, lit, null +from fugue.column.expressions import _BinaryOpExpr +from pytest import raises +from triad import Schema, to_uuid + + +def test_select_columns(): + # not all with names + cols = SelectColumns( + col("a"), lit(1, "b"), col("bb") + col("cc"), f.first(col("c")) + ) + assert to_uuid(cols) == to_uuid(cols) + raises(ValueError, lambda: cols.assert_all_with_names()) + + # distinct + cols2 = SelectColumns( + col("a"), + lit(1, "b"), + col("bb") + col("cc"), + f.first(col("c")), + arg_distinct=True, + ) + assert to_uuid(cols) != to_uuid(cols2) + + # duplicated names + cols = SelectColumns(col("a").alias("b"), lit(1, "b")) + assert to_uuid(cols) != to_uuid(SelectColumns(col("a").alias("b"), lit(1, "c"))) + raises(ValueError, lambda: cols.assert_all_with_names()) + + # with *, all cols must have alias + cols = SelectColumns(col("*"), col("a")).assert_no_agg() + raises(ValueError, lambda: cols.assert_all_with_names()) + + # * can be used at most once + raises(ValueError, lambda: SelectColumns(col("*"), col("*"), col("a").alias("p"))) + + # * can't be used with aggregation + raises(ValueError, lambda: SelectColumns(col("*"), f.first(col("a")).alias("x"))) + + cols = SelectColumns( + col("aa").alias("a").cast(int), + lit(1, "b"), + (col("bb") + col("cc")).alias("c"), + f.first(col("c")).alias("d"), + ).assert_all_with_names() + raises(AssertionError, lambda: cols.assert_no_agg()) + assert not cols.simple + assert 1 == len(cols.simple_cols) + assert "CAST(aa AS long) AS a" == str(cols.simple_cols[0]) + assert cols.has_literals + assert 1 == len(cols.literals) + assert "1 AS b" == str(cols.literals[0]) + assert cols.has_agg + assert 1 == len(cols.non_agg_funcs) + assert "+(bb,cc) AS c" == str(cols.non_agg_funcs[0]) + assert 1 == len(cols.agg_funcs) + assert "FIRST(c) AS d" == str(cols.agg_funcs[0]) + assert 2 == len(cols.group_keys) # a, c + assert "aa" == cols.group_keys[0].output_name + assert "" == cols.group_keys[1].output_name + assert 
isinstance(cols.group_keys[1], _BinaryOpExpr) + + cols = SelectColumns(col("a")).assert_no_wildcard() + assert cols.simple + assert not cols.has_literals + assert not cols.has_agg + + cols = SelectColumns(col("x"), col("*"), col("y") + col("z")) + cols = cols.replace_wildcard(Schema("a:int,b:int")) + assert "x" == str(cols.all_cols[0]) + + +def test_basic(): + gen = SQLExpressionGenerator() + assert "a" == gen.generate(col("a")) + assert "a AS bc" == gen.generate(col("a").alias("bc")) + + assert "'a'" == gen.generate(lit("a")) + assert "'a' AS bc" == gen.generate(lit("a").alias("bc")) + + assert "CAST(a AS long) AS a" == gen.generate(col("a").cast(int)) + + +def test_select_exprs(): + gen = SQLExpressionGenerator() + assert "(a+2)*3" == gen.generate((col("a") + 2) * 3) + assert "(-a+2)*3" == gen.generate((-col("a") + 2) * 3) + assert "(a*2)/3 AS x" == gen.generate(((col("a") * 2) / 3).alias("x")) + assert "COUNT(DISTINCT a) AS x" == gen.generate( + (f.count_distinct(col("a"))).alias("x") + ) + + +def test_conditions(): + gen = SQLExpressionGenerator() + assert "(a=-1) AND (b>=c)" == gen.generate( + (col("a") == -1) & (col("b") >= col("c")) + ) + assert "TRUE AND (b>=c)" == gen.generate(True & (col("b") >= col("c"))) + assert "TRUE AND NOT (b>=c)" == gen.generate(True & ~(col("b") >= col("c"))) + assert "TRUE OR (b>=c) IS NOT NULL" == gen.generate( + True | (col("b") >= col("c")).not_null() + ) + + +def test_functions(): + gen = SQLExpressionGenerator() + assert "COALESCE(a,b+c,(d+e)-1,NULL) IS NULL" == gen.generate( + f.coalesce( + col("a"), col("b") + col("c"), col("d") + col("e") - 1, null() + ).is_null() + ) + assert ( + "MY(MIN(x),MAX(y+1),AVG(z),2,aa=FIRST(a),bb=LAST('b'),cc=COUNT(DISTINCT *)) AS x" + == gen.generate( + function( + "MY", + f.min(col("x")), + f.max(col("y") + 1), + f.avg(col("z")), + 2, + aa=f.first(col("a")), + bb=f.last(lit("b")), + cc=f.count_distinct(col("*")), + ).alias("x") + ) + ) + + def dummy(expr): + yield "DUMMY" + if 
expr.is_distinct: + yield " D" + + gen.add_func_handler("MY", dummy) + assert "DUMMY D AS x" == gen.generate( + function("MY", 2, 3, arg_distinct=True).alias("x") + ) + + +def test_where(): + gen = SQLExpressionGenerator() + assert "SELECT * FROM x WHERE (a<5) AND b IS NULL" == gen.where( + (col("a") < 5) & col("b").is_null(), "x" + ) + assert "SELECT * FROM x WHERE a<5" == gen.where((col("a") < 5).alias("x"), "x") + raises(ValueError, lambda: gen.where(f.max(col("a")), "x")) + + +def test_select(): + gen = SQLExpressionGenerator() + + # no aggregation + cols = SelectColumns(col("*")) + assert "SELECT * FROM x" == gen.select(cols, "x") + + cols = SelectColumns(col("a"), lit(1).alias("b"), (col("b") + col("c")).alias("x")) + where = (col("a") > 5).alias("aa") + assert "SELECT a, 1 AS b, b+c AS x FROM t WHERE a>5" == gen.select( + cols, "t", where=where + ) + + # aggregation without literals + cols = SelectColumns(f.max(col("c")).alias("c"), col("a", "aa"), col("b")) + assert "SELECT MAX(c) AS c, a AS aa, b FROM t GROUP BY a, b" == gen.select( + cols, "t" + ) + + where = col("a") < 10 + having = (f.max(col("a")) > 5).alias("aaa") + assert ( + "SELECT MAX(c) AS c, a AS aa, b FROM t WHERE a<10 GROUP BY a, b HAVING MAX(a)>5" + == gen.select(cols, "t", where=where, having=having) + ) + + cols = SelectColumns( + f.min(col("c") + 1).alias("c"), + f.avg(col("d") + col("e")).cast(int).alias("d"), + ) + assert "SELECT MIN(c+1) AS c, CAST(AVG(d+e) AS long) AS d FROM t" == gen.select( + cols, "t" + ) + + # aggregation with literals + cols = SelectColumns( + lit(1, "k"), f.max(col("c")).alias("c"), lit(2, "j"), col("a", "aa"), col("b") + ) + assert ( + "SELECT 1 AS k, c, 2 AS j, aa, b FROM (SELECT MAX(c) AS c, a AS aa, b FROM t GROUP BY a, b)" + == gen.select(cols, "t") + ) + + cols = SelectColumns(lit(1, "k"), f.max(col("c")).alias("c"), lit(2, "j")) + assert "SELECT 1 AS k, c, 2 AS j FROM (SELECT MAX(c) AS c FROM t)" == gen.select( + cols, "t" + ) + + cols = 
SelectColumns(lit(1, "k"), col("a"), f.max(col("c")).alias("c"), lit(2, "j")) + assert ( + "SELECT 1 AS k, a, c, 2 AS j FROM (SELECT a, MAX(c) AS c FROM t GROUP BY a)" + == gen.select(cols, "t") + ) + + # cast + cols = SelectColumns( + col("c").cast(float), + f.avg(col("d") + col("e")).cast(int).alias("d"), + ) + assert ( + "SELECT CAST(c AS double) AS c, CAST(AVG(d+e) AS long) AS d FROM t GROUP BY c" + == gen.select(cols, "t") + ) + + # infer alias + cols = SelectColumns( + (-col("c")).cast(float), + f.max(col("e")).cast(int), + f.avg(col("d") + col("e")).cast(int).alias("d"), + ) + assert ( + "SELECT CAST(-c AS double) AS c, CAST(MAX(e) AS long) AS e, " + "CAST(AVG(d+e) AS long) AS d FROM t GROUP BY -c" == gen.select(cols, "t") + ) + + +def test_correct_select_schema(): + schema = Schema("a:double,b:str") + gen = SQLExpressionGenerator() + + sc = SelectColumns(col("*"), col("c")) + output = Schema("a:double,b:str,c:str") + c = gen.correct_select_schema(schema, sc, output) + assert c is None + + output = Schema("a:int,b:int,c:str") + c = gen.correct_select_schema(schema, sc, output) + assert c == "a:double,b:str" + + sc = SelectColumns(f.count(col("*")).alias("t"), col("c").alias("a")) + output = Schema("t:int,a:str") + c = gen.correct_select_schema(schema, sc, output) + assert c is None + + sc = SelectColumns((col("a") + col("b")).cast(str).alias("a"), lit(1, "c")) + output = Schema("a:int,c:str") + c = gen.correct_select_schema(schema, sc, output) + assert c == "a:str,c:long" + + +def test_no_cast(): + gen = SQLExpressionGenerator(enable_cast=False) + cols = SelectColumns( + f.max(col("c")).cast("long").alias("c"), col("a", "aa"), col("b") + ) + assert "SELECT MAX(c) AS c, a AS aa, b FROM t GROUP BY a, b" == gen.select( + cols, "t" + ) diff --git a/tests/fugue/workflow/test_workflow_determinism.py b/tests/fugue/workflow/test_workflow_determinism.py index fc00c4e0..2ed5c191 100644 --- a/tests/fugue/workflow/test_workflow_determinism.py +++ 
b/tests/fugue/workflow/test_workflow_determinism.py @@ -252,6 +252,37 @@ def test_workflow_determinism_7(): assert dag1.spec_uuid() != dag3.spec_uuid() +def test_workflow_determinism_8(): + dag1 = FugueWorkflow() + a1 = dag1.create_data([[0], [0], [1]], "a:int32") + a1.select("a", "b") + a1.show() + + dag2 = FugueWorkflow() + a2 = dag2.create_data([[0], [0], [1]], "a:int32") + a2.select("a", "b") + a2.show() + + dag3 = FugueWorkflow() + a3 = dag3.create_data([[0], [0], [1]], "a:int32") + a3.select("b", "a") + a3.show() + + dag4 = FugueWorkflow() + a4 = dag4.create_data([[0], [0], [1]], "a:int32") + a4.select("a", "b", distinct=True) + a4.show() + + assert a1.spec_uuid() == a2.spec_uuid() + assert dag1.spec_uuid() == dag2.spec_uuid() + + assert a1.spec_uuid() == a3.spec_uuid() + assert dag1.spec_uuid() != dag3.spec_uuid() + + assert a1.spec_uuid() == a4.spec_uuid() + assert dag1.spec_uuid() != dag4.spec_uuid() + + def mock_tf1(df: List[Dict[str, Any]], v: int = 1) -> Iterable[Dict[str, Any]]: for r in df: r["b"] = v * len(df) diff --git a/tests/fugue_spark/test_execution_engine.py b/tests/fugue_spark/test_execution_engine.py index b3548f26..66d4032d 100644 --- a/tests/fugue_spark/test_execution_engine.py +++ b/tests/fugue_spark/test_execution_engine.py @@ -8,6 +8,7 @@ from fugue.collections.partition import PartitionSpec from fugue.dataframe import ( ArrayDataFrame, + ArrowDataFrame, LocalDataFrameIterableDataFrame, PandasDataFrame, ) @@ -40,16 +41,41 @@ def test__join_outer_pandas_incompatible(self): def test_to_df(self): e = self.engine o = ArrayDataFrame( - [[1, 2]], - "a:int,b:int", + [[1, 2], [None, 3]], + "a:double,b:int", dict(a=1), ) a = e.to_df(o) assert a is not o + res = a.native.collect() + assert res[0][0] == 1.0 or res[0][0] is None + assert res[1][0] == 1.0 or res[1][0] is None df_eq(a, o, throw=True) + + o = ArrowDataFrame( + [[1, 2], [None, 3]], + "a:double,b:int", + dict(a=1), + ) + a = e.to_df(o) + assert a is not o + res = a.native.collect() + 
assert res[0][0] == 1.0 or res[0][0] is None + assert res[1][0] == 1.0 or res[1][0] is None + a = e.to_df([[1, None]], "a:int,b:int", dict(a=1)) df_eq(a, [[1, None]], "a:int,b:int", dict(a=1), throw=True) + o = PandasDataFrame( + [[{"a": "b"}, 2]], + "a:{a:str},b:int", + dict(a=1), + ) + a = e.to_df(o) + assert a is not o + res = a.as_array(type_safe=True) + assert res[0][0] == {"a": "b"} + def test_persist(self): e = self.engine o = ArrayDataFrame( diff --git a/tests/fugue_spark/utils/test_convert.py b/tests/fugue_spark/utils/test_convert.py index c75a5d74..d888d78e 100644 --- a/tests/fugue_spark/utils/test_convert.py +++ b/tests/fugue_spark/utils/test_convert.py @@ -43,8 +43,8 @@ def test_to_select_expression(): assert to_select_expression("a:int,b:str", "b:str,a:int") == ["b", "a"] assert to_select_expression("a:int,b:str", "b:str,a:long") == [ "b", "CAST(a AS bigint) a"] - assert to_select_expression("a:int,b:double,c:float", "a:str,b:str,c:str") == [ + assert to_select_expression("a:int,b:double,c:float", "a:str,b:str,c:long") == [ "CAST(a AS string) a", - "CAST(IF(isnan(b), NULL, b) AS string) b", - "CAST(IF(isnan(c), NULL, c) AS string) c"] + "CAST(IF(isnan(b) OR b IS NULL, NULL, b) AS string) b", + "CAST(IF(isnan(c) OR c IS NULL, NULL, c) AS bigint) c"] raises(KeyError, lambda: to_select_expression("a:int,b:str", "b:str,x:int"))