
Commit

Change temp view name to uppercase, fix various build issues (#536)
* Change temp view name to uppercase

* fix codecov
goodwanghan authored Apr 26, 2024
1 parent fadf34d commit 48b7ab6
Showing 23 changed files with 290 additions and 253 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/test_all.yml
@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8, "3.10", "3.11"]
python-version: [3.8, "3.10"] # TODO: add back 3.11 when dask-sql is compatible

steps:
- uses: actions/checkout@v2
@@ -42,9 +42,10 @@ jobs:
run: make test
- name: "Upload coverage to Codecov"
if: matrix.python-version == '3.10'
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: false
token: ${{ secrets.CODECOV_TOKEN }}

no_spark:
name: Tests
21 changes: 19 additions & 2 deletions .github/workflows/test_dask.yml
@@ -37,8 +37,8 @@ jobs:
- name: Test
run: make testdask

test_dask_latest:
name: Dask Latest
test_dask_sql_latest:
name: Dask with SQL Latest
runs-on: ubuntu-latest

steps:
@@ -49,7 +49,24 @@
python-version: "3.10"
- name: Install dependencies
run: make devenv
- name: Test
run: make testdask

test_dask_latest:
name: Dask without SQL Latest
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up Python 3.11
uses: actions/setup-python@v1
with:
python-version: "3.11"
- name: Install dependencies
run: make devenv
- name: Setup Dask
run: pip install -U dask[dataframe,distributed] pyarrow pandas
- name: Remove Dask SQL
run: pip uninstall -y dask-sql qpd fugue-sql-antlr sqlglot
- name: Test
run: make testdask
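
Note: the new job exercises Fugue on Dask with the SQL stack uninstalled; a minimal sketch of the kind of usage it still has to support (function and column names here are made up for illustration):

import pandas as pd
import dask.dataframe as dd
from fugue import transform

def double(df: pd.DataFrame) -> pd.DataFrame:
    # plain pandas logic, applied per partition by Fugue
    return df.assign(b=df["a"] * 2)

ddf = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=2)
# engine="dask" exercises the Dask execution engine; none of
# dask-sql, qpd, fugue-sql-antlr or sqlglot is needed for this path
res = transform(ddf, double, schema="*,b:long", engine="dask")
print(res.compute())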
4 changes: 2 additions & 2 deletions .github/workflows/test_ray.yml
@@ -21,7 +21,7 @@ concurrency:

jobs:
test_ray_lower_bound:
name: Ray 2.4.0
name: Ray 2.5.0
runs-on: ubuntu-latest

steps:
@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: make devenv
- name: Setup Ray
run: pip install ray[data]==2.4.0 pyarrow==6.0.1 pandas==1.5.3 'pydantic<2'
run: pip install ray[data]==2.5.0 pyarrow==7.0.0 "duckdb<0.9" pandas==1.5.3 'pydantic<2'
- name: Test
run: make testray

12 changes: 12 additions & 0 deletions RELEASE.md
@@ -1,5 +1,17 @@
# Release Notes

## 0.9.0

- [482](https://github.com/fugue-project/fugue/issues/482) Move Fugue SQL dependencies into the `[sql]` extra and make the related functions soft dependencies
- [504](https://github.com/fugue-project/fugue/issues/504) Create Fugue pytest fixtures and plugins
- [541](https://github.com/fugue-project/fugue/issues/541) Change table temp view names to uppercase
- [540](https://github.com/fugue-project/fugue/issues/540) Fix Ray 2.10+ compatibility issues
- [539](https://github.com/fugue-project/fugue/issues/539) Fix compatibility issues with Dask 2024.4+
- [534](https://github.com/fugue-project/fugue/issues/534) Remove ibis version cap
- [505](https://github.com/fugue-project/fugue/issues/505) Deprecate `as_ibis` in FugueWorkflow
- [387](https://github.com/fugue-project/fugue/issues/387) Improve test coverage on 3.10, add tests for 3.11
- [269](https://github.com/fugue-project/fugue/issues/269) Optimize taking 1 row without sorting on Spark and Dask

## 0.8.7

- [488](https://github.com/fugue-project/fugue/issues/488) Migrate from fs to fsspec
2 changes: 1 addition & 1 deletion fugue/collections/sql.py
@@ -15,7 +15,7 @@ class TempTableName:
"""Generating a temporary, random and globaly unique table name"""

def __init__(self):
self.key = "_" + str(uuid4())[:5]
self.key = "_" + str(uuid4())[:5].upper()

def __repr__(self) -> str:
return _TEMP_TABLE_EXPR_PREFIX + self.key + _TEMP_TABLE_EXPR_SUFFIX
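
Note: many SQL engines fold unquoted identifiers to upper case, so an uppercase key keeps the registered temp view and the identifier emitted in generated SQL consistent. A standalone sketch of the naming scheme (the prefix/suffix constants that wrap the key in Fugue are omitted):

from uuid import uuid4

def make_temp_view_key() -> str:
    # 5 hex characters from a UUID, upper-cased so case-insensitive
    # backends resolve the same identifier either way
    return "_" + str(uuid4())[:5].upper()

print(make_temp_view_key())  # e.g. "_3F9A1"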
22 changes: 4 additions & 18 deletions fugue/dataframe/utils.py
@@ -21,22 +21,6 @@
rename_dataframe_column_names = rename


def _pa_type_eq(t1: pa.DataType, t2: pa.DataType) -> bool:
# should ignore the name difference of list
# e.g. list<item: string> == list<l: string>
if pa.types.is_list(t1) and pa.types.is_list(t2): # pragma: no cover
return _pa_type_eq(t1.value_type, t2.value_type)
return t1 == t2


def _schema_eq(s1: Schema, s2: Schema) -> bool:
if s1 == s2:
return True
return s1.names == s2.names and all(
_pa_type_eq(f1.type, f2.type) for f1, f2 in zip(s1.fields, s2.fields)
)


def _df_eq(
df: DataFrame,
data: Any,
@@ -46,6 +30,7 @@ def _df_eq(
check_schema: bool = True,
check_content: bool = True,
no_pandas: bool = False,
equal_type_groups: Optional[List[List[Any]]] = None,
throw=False,
) -> bool:
"""Compare if two dataframes are equal. Is for internal, unit test
@@ -66,6 +51,7 @@ def _df_eq(
:param no_pandas: if true, it will compare the string representations of the
dataframes, otherwise, it will convert both to pandas dataframe to compare,
defaults to False
:param equal_type_groups: the groups to treat as equal types, defaults to None.
:param throw: if to throw error if not equal, defaults to False
:return: if they equal
"""
@@ -78,8 +64,8 @@ def _df_eq(
assert (
df1.count() == df2.count()
), f"count mismatch {df1.count()}, {df2.count()}"
assert not check_schema or _schema_eq(
df.schema, df2.schema
assert not check_schema or df.schema.is_like(
df2.schema, equal_groups=equal_type_groups
), f"schema mismatch {df.schema.pa_schema}, {df2.schema.pa_schema}"
if not check_content:
return True
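
Note: a minimal sketch of how the new parameter might be exercised; it assumes triad's Schema.is_like accepts groups of pyarrow types via equal_groups (the accepted group format is defined by triad, not by this commit), so the outcome shown is illustrative:

import pandas as pd
import pyarrow as pa
from fugue import PandasDataFrame
from fugue.dataframe.utils import _df_eq

df1 = PandasDataFrame(pd.DataFrame({"a": [1, 2]}), schema="a:int")   # int32
df2 = PandasDataFrame(pd.DataFrame({"a": [1, 2]}), schema="a:long")  # int64

strict = _df_eq(df1, df2)  # schema check compares exact types
# the equal-type group relaxes the check so int32 vs int64 is tolerated
relaxed = _df_eq(df1, df2, equal_type_groups=[[pa.int32(), pa.int64()]])
print(strict, relaxed)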
12 changes: 11 additions & 1 deletion fugue/test/plugins.py
@@ -2,7 +2,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type

from fugue.dataframe.utils import _df_eq
from triad import assert_or_throw, run_once
from triad.utils.entry_points import load_entry_point

@@ -160,6 +160,7 @@ def test_spark(self):

backend: Any
tmp_path: Path
equal_type_groups: Any = None

__test__ = False
_test_context: Any = None
@@ -180,6 +181,15 @@ def engine(self) -> Any:
"""The engine object inside the ``FugueTestContext``"""
return self.context.engine

def get_equal_type_groups(self) -> Optional[List[List[Any]]]:
return None # pragma: no cover

def df_eq(self, *args: Any, **kwargs: Any) -> bool:
"""A wrapper of :func:`~fugue.dataframe.utils.df_eq`"""
if "equal_type_groups" not in kwargs:
kwargs["equal_type_groups"] = self.equal_type_groups
return _df_eq(*args, **kwargs)


def fugue_test_suite(backend: Any, mark_test: Optional[bool] = None) -> Any:
def deco(cls: Type["FugueTestSuite"]) -> Type["FugueTestSuite"]:
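
Note: a rough usage sketch of the new df_eq wrapper in a test suite (hypothetical test class; the backend name "native" is an assumption, substitute whichever backend your Fugue test plugin registers):

import pandas as pd
from fugue.test.plugins import FugueTestSuite, fugue_test_suite

@fugue_test_suite("native", mark_test=True)  # backend name is an assumption
class MyBackendTests(FugueTestSuite):
    def test_to_df_roundtrip(self):
        pdf = pd.DataFrame({"a": [1, 2, 3]})
        df1 = self.engine.to_df(pdf)
        df2 = self.engine.to_df(pdf)
        # df_eq wraps fugue.dataframe.utils._df_eq and fills in
        # equal_type_groups from the suite when not passed explicitly
        assert self.df_eq(df1, df2)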
13 changes: 8 additions & 5 deletions fugue_dask/_io.py
@@ -6,7 +6,7 @@
from triad.collections.dict import ParamDict
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.io import join, makedirs, url_to_fs
from triad.utils.io import isfile, join, makedirs, url_to_fs

from fugue._utils.io import FileParser, _get_single_files
from fugue_dask.dataframe import DaskDataFrame
@@ -100,9 +100,11 @@ def _save_csv(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:


def _safe_load_csv(path: str, **kwargs: Any) -> dd.DataFrame:
if not isfile(path):
return dd.read_csv(join(path, "*.csv"), **kwargs)
try:
return dd.read_csv(path, **kwargs)
except (IsADirectoryError, PermissionError):
except (IsADirectoryError, PermissionError): # pragma: no cover
return dd.read_csv(join(path, "*.csv"), **kwargs)


@@ -148,11 +150,12 @@ def _save_json(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:


def _safe_load_json(path: str, **kwargs: Any) -> dd.DataFrame:
if not isfile(path):
return dd.read_json(join(path, "*.json"), **kwargs)
try:
return dd.read_json(path, **kwargs)
except (IsADirectoryError, PermissionError):
x = dd.read_json(join(path, "*.json"), **kwargs)
return x
except (IsADirectoryError, PermissionError): # pragma: no cover
return dd.read_json(join(path, "*.json"), **kwargs)


def _load_json(
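
Note: the pattern both loaders now follow, as a simplified standalone sketch (directory outputs written by distributed engines hold many part files, so they are globbed explicitly instead of relying on the reader to raise):

import dask.dataframe as dd
from triad.utils.io import isfile, join

def safe_load_csv(path: str, **kwargs) -> dd.DataFrame:
    # a directory of part files is globbed; a plain file is read directly
    if not isfile(path):
        return dd.read_csv(join(path, "*.csv"), **kwargs)
    return dd.read_csv(path, **kwargs)

# usage (hypothetical paths):
# safe_load_csv("/tmp/single.csv", header=0)
# safe_load_csv("/tmp/csv_dir", header=0)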
8 changes: 4 additions & 4 deletions fugue_dask/_utils.py
@@ -53,7 +53,7 @@ def hash_repartition(df: dd.DataFrame, num: int, cols: List[Any]) -> dd.DataFrame:
if num < 1:
return df
if num == 1:
return df.repartition(1)
return df.repartition(npartitions=1)
df = df.reset_index(drop=True).clear_divisions()
idf, ct = _add_hash_index(df, num, cols)
return _postprocess(idf, ct, num)
@@ -76,7 +76,7 @@ def even_repartition(df: dd.DataFrame, num: int, cols: List[Any]) -> dd.DataFrame:
the number of partitions will be the number of groups.
"""
if num == 1:
return df.repartition(1)
return df.repartition(npartitions=1)
if len(cols) == 0 and num <= 0:
return df
df = df.reset_index(drop=True).clear_divisions()
@@ -111,7 +111,7 @@ def rand_repartition(
if num < 1:
return df
if num == 1:
return df.repartition(1)
return df.repartition(npartitions=1)
df = df.reset_index(drop=True).clear_divisions()
if len(cols) == 0:
idf, ct = _add_random_index(df, num=num, seed=seed)
@@ -124,7 +124,7 @@ def _postprocess(idf: dd.DataFrame, ct: int, num: int) -> dd.DataFrame:
def _postprocess(idf: dd.DataFrame, ct: int, num: int) -> dd.DataFrame:
parts = min(ct, num)
if parts <= 1:
return idf.repartition(1)
return idf.repartition(npartitions=1)
divisions = list(np.arange(ct, step=math.ceil(ct / parts)))
divisions.append(ct - 1)
return idf.repartition(divisions=divisions, force=True)
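
Note: the keyword form appears to be what newer Dask requires here, since repartition's first positional parameter is divisions rather than a partition count; a minimal sketch:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=4)
# collapse to a single partition; keyword form avoids the positional
# argument being read as divisions on newer Dask
single = ddf.repartition(npartitions=1)
print(single.npartitions)  # 1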
1 change: 1 addition & 0 deletions fugue_duckdb/_io.py
@@ -140,6 +140,7 @@ def _load_csv( # noqa: C901
else:
if header:
kw["ALL_VARCHAR"] = 1
kw["auto_detect"] = 1
if columns is None:
cols = "*"
elif isinstance(columns, list):
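
Note: both options are forwarded to DuckDB's CSV reader; a hedged sketch of the equivalent direct DuckDB call (the file path is hypothetical, and option names should be checked against your DuckDB version):

import duckdb

con = duckdb.connect()
# ALL_VARCHAR loads every column as VARCHAR; AUTO_DETECT keeps the
# sniffer on so delimiters and headers are still detected
rel = con.sql("SELECT * FROM read_csv('data.csv', ALL_VARCHAR=TRUE, AUTO_DETECT=TRUE)")
print(rel.df())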
15 changes: 11 additions & 4 deletions fugue_ibis/execution_engine.py
@@ -23,8 +23,8 @@
from ._utils import to_ibis_schema
from .dataframe import IbisDataFrame

_JOIN_RIGHT_SUFFIX = "_ibis_y__"
_GEN_TABLE_NAMES = (f"_fugue_temp_table_{i:d}" for i in itertools.count())
_JOIN_RIGHT_SUFFIX = "_ibis_y__".upper()
_GEN_TABLE_NAMES = (f"_fugue_temp_table_{i:d}".upper() for i in itertools.count())


class IbisSQLEngine(SQLEngine):
@@ -224,7 +224,7 @@ def take(
_presort = parse_presort_exp(presort)
else:
_presort = partition_spec.presort
tbn = "_temp"
tbn = "_TEMP"
idf = self.to_df(df)

if len(_presort) == 0:
@@ -233,9 +233,10 @@
pcols = ", ".join(
self.encode_column_name(x) for x in partition_spec.partition_by
)
dummy_order_by = self._dummy_window_order_by()
sql = (
f"SELECT * FROM ("
f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {pcols}) "
f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {pcols} {dummy_order_by}) "
f"AS __fugue_take_param FROM {tbn}"
f") WHERE __fugue_take_param<={n}"
)
@@ -290,6 +291,12 @@ def save_table(
def load_table(self, table: str, **kwargs: Any) -> DataFrame:
return self.to_df(self.backend.table(table))

def _dummy_window_order_by(self) -> str:
"""Return a dummy window order by clause, this is required for
some SQL backends when there is no real order by clause in window
"""
return ""


class IbisMapEngine(MapEngine):
"""IbisExecutionEngine's MapEngine, it is a wrapper of the map engine
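
Note: some SQL dialects reject ROW_NUMBER() over a window that has no ORDER BY, which is what the new hook accommodates; a hedged sketch of an override in a hypothetical backend engine (the dummy expression is dialect-dependent and is an assumption here):

from fugue_ibis.execution_engine import IbisSQLEngine

class MyDialectSQLEngine(IbisSQLEngine):  # hypothetical subclass
    def _dummy_window_order_by(self) -> str:
        # a constant subquery satisfies the ORDER BY requirement
        # without imposing a real sort; adjust per dialect
        return "ORDER BY (SELECT 1)"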
7 changes: 3 additions & 4 deletions fugue_ray/_constants.py
@@ -1,6 +1,7 @@
from typing import Any, Dict

import ray
from packaging import version

FUGUE_RAY_CONF_SHUFFLE_PARTITIONS = "fugue.ray.shuffle.partitions"
FUGUE_RAY_DEFAULT_PARTITIONS = "fugue.ray.default.partitions"
@@ -12,8 +13,6 @@
FUGUE_RAY_DEFAULT_PARTITIONS: 0,
FUGUE_RAY_ZERO_COPY: True,
}
RAY_VERSION = version.parse(ray.__version__)

if ray.__version__ >= "2.3":
_ZERO_COPY: Dict[str, Any] = {"zero_copy_batch": True}
else: # pragma: no cover
_ZERO_COPY = {}
_ZERO_COPY: Dict[str, Any] = {"zero_copy_batch": True}
31 changes: 10 additions & 21 deletions fugue_ray/_utils/dataframe.py
@@ -3,7 +3,6 @@

import pandas as pd
import pyarrow as pa
import ray
import ray.data as rd
from triad import Schema

@@ -31,31 +30,21 @@ def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
df = materialize(df)
if df.count() == 0:
return None, df
if ray.__version__ < "2.5.0": # pragma: no cover
if hasattr(df, "_dataset_format"): # pragma: no cover
return df._dataset_format(), df # ray<2.2
ctx = rd.context.DatasetContext.get_current()
ctx.use_streaming_executor = False
return df.dataset_format(), df # ray>=2.2
else:
schema = df.schema(fetch_if_missing=True)
if schema is None: # pragma: no cover
return None, df
if isinstance(schema.base_schema, pa.Schema):
return "arrow", df
return "pandas", df
schema = df.schema(fetch_if_missing=True)
if schema is None: # pragma: no cover
return None, df
if isinstance(schema.base_schema, pa.Schema):
return "arrow", df
return "pandas", df


def to_schema(schema: Any) -> Schema: # pragma: no cover
if isinstance(schema, pa.Schema):
return Schema(schema)
if ray.__version__ >= "2.5.0":
if isinstance(schema, rd.Schema):
if hasattr(schema, "base_schema") and isinstance(
schema.base_schema, pa.Schema
):
return Schema(schema.base_schema)
return Schema(list(zip(schema.names, schema.types)))
if isinstance(schema, rd.Schema):
if hasattr(schema, "base_schema") and isinstance(schema.base_schema, pa.Schema):
return Schema(schema.base_schema)
return Schema(list(zip(schema.names, schema.types)))
raise ValueError(f"{schema} is not supported")


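
Note: a small sketch of the Ray >= 2.5 behavior the simplified helpers rely on (assuming ray[data] and pyarrow are installed; per the code above, the ray.data Schema exposes base_schema, names and types):

import pyarrow as pa
import ray.data as rd

ds = rd.from_arrow(pa.table({"a": [1, 2, 3]}))
schema = ds.schema(fetch_if_missing=True)
# Arrow-backed blocks surface the pyarrow schema directly; pandas-backed
# ones fall back to the (name, type) pairs on the Ray schema object
if isinstance(schema.base_schema, pa.Schema):
    print("arrow", schema.base_schema)
else:
    print("pandas", list(zip(schema.names, schema.types)))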
(remaining changed files not shown in this view)