Avoid persisting Futures #1004

Open
wants to merge 3 commits into
base: main
31 changes: 13 additions & 18 deletions .github/workflows/tests.yaml
@@ -10,37 +10,32 @@ jobs:
matrix:
# os: ["windows-latest", "ubuntu-latest", "macos-latest"]
os: ["ubuntu-latest"]
python-version: ["3.9", "3.10", "3.11"]
query-planning: [true, false]
python-version: ["3.10", "3.11", "3.12", "3.13"]
# query-planning: [true, false]

env:
PYTHON_VERSION: ${{ matrix.python-version }}
PARALLEL: "true"
COVERAGE: "true"
DASK_DATAFRAME__QUERY_PLANNING: ${{ matrix.query-planning }}
# DASK_DATAFRAME__QUERY_PLANNING: ${{ matrix.query-planning }}

steps:
- name: Checkout source
uses: actions/checkout@v2
with:
fetch-depth: 0 # Needed by codecov.io

- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v2
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v3
with:
miniforge-variant: Mambaforge
miniforge-version: latest
use-mamba: true
channel-priority: strict
python-version: ${{ matrix.python-version }}
environment-file: ci/environment-${{ matrix.python-version }}.yaml
activate-environment: test-environment
auto-activate-base: false

- name: Install
shell: bash -l {0}
run: source ci/install.sh
enable-cache: false

- name: Install Python
run: uv python install ${{ matrix.python-version }}

- name: Install dask-ml
run: uv pip install -e .[dev]

- name: Run tests
shell: bash -l {0}
run: pytest -v
run: uv run pytest -v
15 changes: 5 additions & 10 deletions ci/install.sh
@@ -1,22 +1,17 @@
uv pip install -e .[dev]

# Optionally, install development versions of dependencies
if [[ ${UPSTREAM_DEV} ]]; then
# FIXME https://github.com/mamba-org/mamba/issues/412
# mamba uninstall --force dask distributed scikit-learn
conda uninstall --force dask distributed scikit-learn

python -m pip install --no-deps --pre \
uv pip install --no-deps --pre \
-i https://pypi.anaconda.org/scipy-wheels-nightly/simple \
scikit-learn

python -m pip install \
uv pip install \
--upgrade \
git+https://github.com/dask/dask \
git+https://github.com/dask/distributed
fi

# Install dask-ml
python -m pip install --quiet --no-deps -e .

echo mamba list
mamba list
uv pip install --no-deps -e .
uv pip tree
33 changes: 31 additions & 2 deletions dask_ml/model_selection/_incremental.py
@@ -248,7 +248,19 @@ def get_futures(partial_fit_calls):
_specs[ident] = spec

if DISTRIBUTED_2021_02_0:
_models, _scores, _specs = dask.persist(_models, _scores, _specs)
# https://github.com/dask/dask-ml/issues/1003
# We only want to persist dask collections, not Futures.
# So we build a collection without futures and bring them back later.
to_persist = {
"models": {k: v for k, v in _models.items() if not isinstance(v, Future)},
"scores": {k: v for k, v in _scores.items() if not isinstance(v, Future)},
"specs": {k: v for k, v in _specs.items() if not isinstance(v, Future)},
}
models_p, scores_p, specs_p = dask.persist(*list(to_persist.values()))
# Update with keys not present, which should just be futures
_models = {**_models, **models_p}
_scores = {**_scores, **scores_p}
_specs = {**_specs, **specs_p}
else:
_models, _scores, _specs = dask.persist(
_models, _scores, _specs, priority={tuple(_specs.values()): -1}
@@ -315,7 +327,24 @@ def get_futures(partial_fit_calls):
_specs[ident] = spec

if DISTRIBUTED_2021_02_0:
_models2, _scores2, _specs2 = dask.persist(_models, _scores, _specs)
# https://github.com/dask/dask-ml/issues/1003
# We only want to persist dask collections, not Futures.
# So we build a collection without futures and bring them back later.
to_persist = {
"models": {
k: v for k, v in _models.items() if not isinstance(v, Future)
},
"scores": {
k: v for k, v in _scores.items() if not isinstance(v, Future)
},
"specs": {k: v for k, v in _specs.items() if not isinstance(v, Future)},
}
models2_p, scores2_p, specs2_p = dask.persist(*list(to_persist.values()))
# Update with keys not present, which should just be futures
_models2 = {**_models, **models2_p}
_scores2 = {**_scores, **scores2_p}
_specs2 = {**_specs, **specs2_p}

else:
_models2, _scores2, _specs2 = dask.persist(
_models, _scores, _specs, priority={tuple(_specs.values()): -1}
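
The filter-persist-merge pattern used in both hunks above can be sketched in isolation. This is a minimal illustration, not the code in this PR: `persist_skipping_futures` and `fake_persist` are hypothetical names, `concurrent.futures.Future` stands in for `distributed.Future`, and `fake_persist` only mimics the tuple-returning shape of `dask.persist` so the example runs without a cluster.

```python
from concurrent.futures import Future  # assumption: stand-in for distributed.Future


def persist_skipping_futures(mapping, persist):
    """Apply ``persist`` only to the values of ``mapping`` that are not Futures."""
    # Build a collection without Futures, as the comments in the diff describe.
    lazy = {k: v for k, v in mapping.items() if not isinstance(v, Future)}
    # ``persist`` is expected to return a tuple with one entry per argument.
    (persisted,) = persist(lazy)
    # Keys missing from ``persisted`` are the Futures; they pass through unchanged.
    return {**mapping, **persisted}


def fake_persist(*collections):
    """Hypothetical stand-in for dask.persist: tags each value as persisted."""
    return tuple({k: ("persisted", v) for k, v in c.items()} for c in collections)


# One lazy value and one already-computed Future, keyed like the model dicts.
done = Future()
done.set_result(0.9)
models = {"a": "lazy-model", "b": done}

result = persist_skipping_futures(models, fake_persist)
```

Because the merge is `{**mapping, **persisted}`, key order follows the original mapping and only the non-Future entries are replaced. The `Future` under key `"b"` is the same object before and after the call.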