diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index f976eab2a..5df20f25f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -11,11 +11,13 @@ jobs: # os: ["windows-latest", "ubuntu-latest", "macos-latest"] os: ["ubuntu-latest"] python-version: ["3.8", "3.9", "3.10"] + query-planning: [true, false] env: PYTHON_VERSION: ${{ matrix.python-version }} PARALLEL: "true" COVERAGE: "true" + DASK_DATAFRAME__QUERY_PLANNING: ${{ matrix.query-planning }} steps: - name: Checkout source @@ -39,6 +41,10 @@ jobs: shell: bash -l {0} run: source ci/install.sh + - name: Install dask-expr + if: ${{ matrix.query-planning }} + run: pip install dask-expr + - name: Run tests shell: bash -l {0} run: pytest -v diff --git a/dask_ml/ensemble/_blockwise.py b/dask_ml/ensemble/_blockwise.py index 5a108b560..bd490cd80 100644 --- a/dask_ml/ensemble/_blockwise.py +++ b/dask_ml/ensemble/_blockwise.py @@ -62,7 +62,7 @@ def _predict(self, X): dtype=np.dtype(dtype), chunks=chunks, ) - elif isinstance(X, dd._Frame): + elif isinstance(X, dd.DataFrame): meta = np.empty((0, len(self.classes_)), dtype=dtype) combined = X.map_partitions( _predict_stack, estimators=self.estimators_, meta=meta @@ -184,7 +184,7 @@ def _collect_probas(self, X): chunks=chunks, meta=meta, ) - elif isinstance(X, dd._Frame): + elif isinstance(X, dd.DataFrame): # TODO: replace with a _predict_proba_stack version. # This current raises; dask.dataframe doesn't like map_partitions that # return new axes. diff --git a/dask_ml/linear_model/utils.py b/dask_ml/linear_model/utils.py index d0beb33c5..a2ad88dcd 100644 --- a/dask_ml/linear_model/utils.py +++ b/dask_ml/linear_model/utils.py @@ -1,33 +1,64 @@ """ """ +import dask import dask.array as da import dask.dataframe as dd import numpy as np from multipledispatch import dispatch +if not dask.config.get("dataframe.query-planning"): -@dispatch(dd._Frame) -def exp(A): - return da.exp(A) + @dispatch(dd._Frame) + def exp(A): + return da.exp(A) + @dispatch(dd._Frame) + def absolute(A): + return da.absolute(A) -@dispatch(dd._Frame) -def absolute(A): - return da.absolute(A) + @dispatch(dd._Frame) + def sign(A): + return da.sign(A) + @dispatch(dd._Frame) + def log1p(A): + return da.log1p(A) -@dispatch(dd._Frame) -def sign(A): - return da.sign(A) + @dispatch(dd._Frame) # noqa: F811 + def add_intercept(X): # noqa: F811 + columns = X.columns + if "intercept" in columns: + raise ValueError("'intercept' column already in 'X'") + return X.assign(intercept=1)[["intercept"] + list(columns)] +else: -@dispatch(dd._Frame) -def log1p(A): - return da.log1p(A) + @dispatch(dd.DataFrame) + def exp(A): + return da.exp(A) + @dispatch(dd.DataFrame) + def absolute(A): + return da.absolute(A) -@dispatch(np.ndarray) -def add_intercept(X): + @dispatch(dd.DataFrame) + def sign(A): + return da.sign(A) + + @dispatch(dd.DataFrame) + def log1p(A): + return da.log1p(A) + + @dispatch(dd.DataFrame) # noqa: F811 + def add_intercept(X): # noqa: F811 + columns = X.columns + if "intercept" in columns: + raise ValueError("'intercept' column already in 'X'") + return X.assign(intercept=1)[["intercept"] + list(columns)] + + +@dispatch(np.ndarray) # noqa: F811 +def add_intercept(X): # noqa: F811 return _add_intercept(X) @@ -53,14 +84,6 @@ def add_intercept(X): # noqa: F811 return X.map_blocks(_add_intercept, dtype=X.dtype, chunks=chunks) -@dispatch(dd.DataFrame) # noqa: F811 -def add_intercept(X): # noqa: F811 - columns = X.columns - if "intercept" in columns: - raise ValueError("'intercept' column already in 'X'") - return X.assign(intercept=1)[["intercept"] + list(columns)] - - @dispatch(np.ndarray) # noqa: F811 def lr_prob_stack(prob): # noqa: F811 return np.vstack([1 - prob, prob]).T diff --git a/dask_ml/utils.py b/dask_ml/utils.py index abeaa58a0..0464574ab 100644 --- a/dask_ml/utils.py +++ b/dask_ml/utils.py @@ -212,7 +212,7 @@ def check_array( def _assert_eq(l, r, name=None, **kwargs): array_types = (np.ndarray, da.Array) - frame_types = (pd.core.generic.NDFrame, dd._Frame) + frame_types = (pd.core.generic.NDFrame, dd.DataFrame) if isinstance(l, array_types): assert_eq_ar(l, r, **kwargs) elif isinstance(l, frame_types): diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index 02edc1d22..ebfffcf84 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -241,7 +241,7 @@ def transform(self, X): return X.map_blocks( _transform, estimator=self._postfit_estimator, meta=meta ) - elif isinstance(X, dd._Frame): + elif isinstance(X, dd.DataFrame): if meta is None: # dask-dataframe relies on dd.core.no_default # for infering meta @@ -324,7 +324,7 @@ def predict(self, X): ) return result - elif isinstance(X, dd._Frame): + elif isinstance(X, dd.DataFrame): if meta is None: meta = dd.core.no_default return X.map_partitions( @@ -369,7 +369,7 @@ def predict_proba(self, X): meta=meta, chunks=(X.chunks[0], len(self._postfit_estimator.classes_)), ) - elif isinstance(X, dd._Frame): + elif isinstance(X, dd.DataFrame): if meta is None: meta = dd.core.no_default return X.map_partitions( @@ -619,7 +619,7 @@ def _first_block(dask_object): dask_object.to_delayed().flatten()[0], shape, dask_object.dtype ) - if isinstance(dask_object, dd._Frame): + if isinstance(dask_object, dd.DataFrame): return dask_object.get_partition(0) else: diff --git a/tests/conftest.py b/tests/conftest.py index 23ab05eff..8f9c16c01 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,8 @@ # pytest.register_assert_rewrite('dask_ml.utils') +DASK_EXPR_ENABLED = getattr(dd, "_dask_expr_enabled", lambda: False)() + @pytest.fixture def xy_classification(): diff --git a/tests/ensemble/test_blockwise.py b/tests/ensemble/test_blockwise.py index b24a0a0ea..b26880313 100644 --- a/tests/ensemble/test_blockwise.py +++ b/tests/ensemble/test_blockwise.py @@ -7,6 +7,7 @@ import dask_ml.datasets import dask_ml.ensemble +from tests.conftest import DASK_EXPR_ENABLED class TestBlockwiseVotingClassifier: @@ -60,6 +61,9 @@ def test_bad_chunking_raises(self): # this should *really* be a ValueError... clf.fit(X, y) + @pytest.mark.skipif( + DASK_EXPR_ENABLED, reason="dask-expr computing early into np.ndarray" + ) def test_hard_voting_frame(self): X, y = dask_ml.datasets.make_classification(chunks=25) X = dd.from_dask_array(X) @@ -127,6 +131,10 @@ def test_soft_voting_array(self): score = clf.score(X, y) assert isinstance(score, float) + @pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: 'Scalar' object has no attribute '_chunks'", + ) def test_soft_voting_frame(self): X, y = dask_ml.datasets.make_classification(chunks=25) X = dd.from_dask_array(X) diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 9e7c92eb0..04d9fd7a4 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -37,6 +37,7 @@ from dask_ml.model_selection._incremental import _partial_fit, _score, fit from dask_ml.model_selection.utils_test import LinearFunction, _MaybeLinearFunction from dask_ml.utils import ConstantFunction +from tests.conftest import DASK_EXPR_ENABLED pytestmark = [ pytest.mark.skipif(not DISTRIBUTED_2_5_0, reason="hangs"), @@ -229,6 +230,9 @@ def additional_calls(scores): await asyncio.sleep(0.1) +@pytest.mark.skipif( + DASK_EXPR_ENABLED, reason="TypeError: 'coroutine' object is not iterable" +) @gen_cluster(client=True) async def test_search_basic(c, s, a, b): for decay_rate, input_type, memory in itertools.product( diff --git a/tests/preprocessing/test_data.py b/tests/preprocessing/test_data.py index 7674326ac..f5f53711b 100644 --- a/tests/preprocessing/test_data.py +++ b/tests/preprocessing/test_data.py @@ -17,6 +17,7 @@ import dask_ml.preprocessing as dpp from dask_ml.datasets import make_classification from dask_ml.utils import assert_estimator_equal +from tests.conftest import DASK_EXPR_ENABLED X, y = make_classification(chunks=50) df = X.to_dask_dataframe().rename(columns=str) @@ -95,6 +96,10 @@ def test_input_types(self, dask_df, pandas_df): exclude="n_samples_seen_", ) + @pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: can't set attribute 'divisions'", + ) def test_inverse_transform(self): a = dpp.StandardScaler() result = a.inverse_transform(a.fit_transform(X)) @@ -432,6 +437,10 @@ def test_encode_subset_of_columns(self, daskify): tm.assert_frame_equal(result, df) + @pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: can't set attribute 'divisions'", + ) @pytest.mark.parametrize("daskify", [False, True]) def test_drop_first(self, daskify): if daskify: @@ -487,6 +496,10 @@ def test_inverse_transform(self): class TestOrdinalEncoder: + @pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: can't set attribute 'divisions'", + ) @pytest.mark.parametrize("daskify", [False, True]) @pytest.mark.parametrize("values", [True, False]) def test_basic(self, daskify, values): @@ -531,6 +544,10 @@ def test_transform_raises(self): de.transform(dummy.drop("B", axis="columns")) assert rec.match("Columns of 'X' do not match the training") + @pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: can't set attribute 'divisions'", + ) def test_inverse_transform(self): enc = dpp.OrdinalEncoder() df = dd.from_pandas( @@ -618,6 +635,10 @@ def test_transformed_shape(self): # dask array with nan rows assert a.transform(X_nan_rows).shape[1] == n_cols + @pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="TypeError: No dispatch for ", + ) @pytest.mark.parametrize("daskify", [False, True]) def test_df_transform(self, daskify): frame = df @@ -646,6 +667,10 @@ def test_transformer_params(self): assert pf._transformer.interaction_only is pf.interaction_only assert pf._transformer.include_bias is pf.include_bias + @pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="TypeError: No dispatch for ", + ) @pytest.mark.parametrize("daskify", [True, False]) def test_df_transform_index(self, daskify): frame = copy(df) diff --git a/tests/test_datasets.py b/tests/test_datasets.py index d221e2963..010869493 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -2,6 +2,7 @@ import dask import dask.array as da +import dask.dataframe as dd import numpy as np import pytest from dask.array.utils import assert_eq @@ -79,4 +80,4 @@ def test_make_classification_df(): assert len(X_df.columns) == 6 assert len(X_df) == 100 assert len(y_series) == 100 - assert isinstance(y_series, dask.dataframe.core.Series) + assert isinstance(y_series, dd.Series) diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py index 540a0a215..5297be82c 100644 --- a/tests/test_parallel_post_fit.py +++ b/tests/test_parallel_post_fit.py @@ -15,6 +15,7 @@ from dask_ml.datasets import make_classification from dask_ml.utils import assert_eq_ar, assert_estimator_equal from dask_ml.wrappers import ParallelPostFit +from tests.conftest import DASK_EXPR_ENABLED def test_it_works(): @@ -53,6 +54,10 @@ def test_laziness(): assert 0 < x.compute() < 1 +@pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: 'MapPartitions' object has no attribute 'shape' / AttributeError: can't set attribute '_meta'", +) def test_predict_meta_override(): X = pd.DataFrame({"c_0": [1, 2, 3, 4]}) y = np.array([1, 2, 3, 4]) @@ -76,6 +81,10 @@ def test_predict_meta_override(): assert_eq_ar(result, expected) +@pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: 'MapPartitions' object has no attribute 'shape'", +) def test_predict_proba_meta_override(): X = pd.DataFrame({"c_0": [1, 2, 3, 4]}) y = np.array([1, 2, 3, 4]) @@ -99,6 +108,10 @@ def test_predict_proba_meta_override(): assert_eq_ar(result, expected) +@pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: 'Scalar' object has no attribute 'shape'", +) def test_transform_meta_override(): X = pd.DataFrame({"cat_s": ["a", "b", "c", "d"]}) dd_X = dd.from_pandas(X, npartitions=2) @@ -135,6 +148,10 @@ def test_predict_correct_output_dtype(): assert wrap_output.dtype == base_output.dtype +@pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: 'MapPartitions' object has no attribute 'shape'", +) @pytest.mark.parametrize("kind", ["numpy", "dask.dataframe", "dask.array"]) def test_predict(kind): X, y = make_classification(chunks=100) @@ -168,6 +185,10 @@ def test_predict(kind): assert_eq_ar(result, expected) +@pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: 'MapPartitions' object has no attribute 'shape'", +) @pytest.mark.parametrize("kind", ["numpy", "dask.dataframe", "dask.array"]) def test_transform(kind): X, y = make_classification(chunks=100) diff --git a/tests/test_partial.py b/tests/test_partial.py index 03e52ec84..34f17c23c 100644 --- a/tests/test_partial.py +++ b/tests/test_partial.py @@ -14,6 +14,7 @@ from dask_ml._partial import fit, predict from dask_ml.datasets import make_classification from dask_ml.wrappers import Incremental +from tests.conftest import DASK_EXPR_ENABLED x = np.array([[1, 0], [2, 0], [3, 0], [4, 0], [0, 1], [0, 2], [3, 3], [4, 4]]) @@ -89,6 +90,10 @@ def test_fit_shuffle_blocks(): ) +@pytest.mark.skipif( + DASK_EXPR_ENABLED, + reason="AttributeError: 'Scalar' object has no attribute 'shape'", +) def test_dataframes(): df = pd.DataFrame({"x": range(10), "y": [0, 1] * 5}) ddf = dd.from_pandas(df, npartitions=2)