Commit

Remove legacy conversion functions (#1177)
phofl authored Dec 20, 2024
1 parent d7577a2 commit edb6fd5
Showing 8 changed files with 50 additions and 79 deletions.
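
In short, this commit deletes the deprecated bridges between the legacy dask.dataframe collection type (_Frame) and the expression-backed collection (FrameBase), and re-points a number of imports at dask.dataframe.dispatch and dask.utils instead of dask.dataframe.core. For context, a minimal sketch of the round-trip the removed helpers provided, reconstructed from the deleted tests below (data values are illustrative; both calls already raised a FutureWarning before this commit):

import pandas as pd
import dask.dataframe as dd
from dask_expr import from_legacy_dataframe, from_pandas  # from_legacy_dataframe no longer exists after this commit

# Legacy collection -> expression-backed collection (removed helper):
ddf = dd.from_dict({"a": range(100)}, npartitions=10)
df = from_legacy_dataframe(ddf)

# Expression-backed collection -> legacy collection (removed method):
df2 = from_pandas(pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}), npartitions=2)
legacy = df2.to_legacy_dataframe()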
33 changes: 29 additions & 4 deletions dask_expr/_backends.py
@@ -88,12 +88,37 @@ def create_array_collection(expr):
     # to infer that we want to create an array is the only way that is guaranteed
     # to be a general solution.
     # We can get rid of this when we have an Array expression
-    from dask.dataframe.core import new_dd_object
+    from dask.highlevelgraph import HighLevelGraph
+    from dask.layers import Blockwise

     result = expr.optimize()
-    return new_dd_object(
-        result.__dask_graph__(), result._name, result._meta, result.divisions
-    )
+    dsk = result.__dask_graph__()
+    name = result._name
+    meta = result._meta
+    divisions = result.divisions
+    import dask.array as da
+
+    chunks = ((np.nan,) * (len(divisions) - 1),) + tuple((d,) for d in meta.shape[1:])
+    if len(chunks) > 1:
+        if isinstance(dsk, HighLevelGraph):
+            layer = dsk.layers[name]
+        else:
+            # dask-expr provides a dict only
+            layer = dsk
+        if isinstance(layer, Blockwise):
+            layer.new_axes["j"] = chunks[1][0]
+            layer.output_indices = layer.output_indices + ("j",)
+        else:
+            from dask._task_spec import Alias, Task
+
+            suffix = (0,) * (len(chunks) - 1)
+            for i in range(len(chunks[0])):
+                task = layer.get((name, i))
+                new_key = (name, i) + suffix
+                if isinstance(task, Task):
+                    task = Alias(new_key, task.key)
+                layer[new_key] = task
+    return da.Array(dsk, name=name, chunks=chunks, dtype=meta.dtype)


 @get_collection_type.register(np.ndarray)
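
A note on the new array path: the first axis of chunks is NaN-sized because per-partition row counts are unknown when the graph is built, while trailing axes come from the meta's column count; the key-aliasing loop then maps each dataframe key (name, i) onto an array-style key (name, i, 0). A minimal sketch of the chunks computation, with hypothetical divisions and meta shape:

import numpy as np

divisions = (0, 10, 20, 29)  # hypothetical: 3 partitions -> 4 division boundaries
meta_shape = (0, 2)          # hypothetical empty meta: 0 rows, 2 columns

# Same expression as in create_array_collection above:
chunks = ((np.nan,) * (len(divisions) - 1),) + tuple((d,) for d in meta_shape[1:])
print(chunks)  # ((nan, nan, nan), (2,)): unknown row counts, 2 known columns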
59 changes: 13 additions & 46 deletions dask_expr/_collection.py
@@ -23,18 +23,20 @@
 from dask.dataframe.core import (
     _concat,
     _convert_to_numeric,
-    _Frame,
     _repr_data_series,
     _sqrt_and_convert_to_timedelta,
     check_divisions,
     has_parallel_type,
-    is_arraylike,
     is_dataframe_like,
     is_series_like,
     meta_warning,
-    new_dd_object,
 )
-from dask.dataframe.dispatch import is_categorical_dtype, make_meta, meta_nonempty
+from dask.dataframe.dispatch import (
+    get_parallel_type,
+    is_categorical_dtype,
+    make_meta,
+    meta_nonempty,
+)
 from dask.dataframe.multi import warn_dtype_mismatch
 from dask.dataframe.utils import (
     AttributeNotImplementedError,
@@ -52,6 +54,7 @@
     derived_from,
     get_default_shuffle_method,
     get_meta_library,
+    is_arraylike,
     key_split,
     maybe_pluralize,
     memory_repr,
@@ -1370,25 +1373,6 @@ def repartition(
             Repartition(self, npartitions, divisions, force, partition_size, freq)
         )

-    def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> _Frame:
-        """Convert to a legacy dask-dataframe collection
-
-        Parameters
-        ----------
-        optimize
-            Whether to optimize the underlying `Expr` object before conversion.
-        **optimize_kwargs
-            Key-word arguments to pass through to `optimize`.
-        """
-        warnings.warn(
-            "to_legacy_dataframe is deprecated and will be removed in a future release. "
-            "The legacy implementation as a whole is deprecated and will be removed, making "
-            "this method unnecessary.",
-            FutureWarning,
-        )
-        df = self.optimize(**optimize_kwargs) if optimize else self
-        return new_dd_object(df.dask, df._name, df._meta, df.divisions)
-
     def to_dask_array(
         self, lengths=None, meta=None, optimize: bool = True, **optimize_kwargs
     ) -> Array:
@@ -5052,28 +5036,6 @@ def from_dict(
     )


-def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
-    """Create a dask-expr collection from a legacy dask-dataframe collection
-
-    Parameters
-    ----------
-    optimize
-        Whether to optimize the graph before conversion.
-    """
-    warnings.warn(
-        "from_legacy_dataframe is deprecated and will be removed in a future release. "
-        "The legacy implementation as a whole is deprecated and will be removed, making "
-        "this method unnecessary.",
-        FutureWarning,
-    )
-    graph = ddf.dask
-    if optimize:
-        graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__())
-    return from_graph(
-        graph, ddf._meta, ddf.divisions, ddf.__dask_keys__(), key_split(ddf._name)
-    )
-
-
 def from_dask_array(x, columns=None, index=None, meta=None):
     """Create a Dask DataFrame from a Dask Array.
@@ -5793,7 +5755,7 @@ def merge_asof(
         del kwargs["on"]

     for o in [left_on, right_on]:
-        if isinstance(o, _Frame):
+        if isinstance(o, FrameBase):
             raise NotImplementedError(
                 "Dask collections not currently allowed in merge columns"
             )
@@ -6544,3 +6506,8 @@ def _compute_partition_stats(
         return (mins, maxes, lens)
     else:
         return (non_empty_mins, non_empty_maxes, lens)
+
+
+@get_parallel_type.register(FrameBase)
+def get_parallel_type_frame(o):
+    return get_parallel_type(o._meta)
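
The get_parallel_type registration added above hooks FrameBase into dask's Dispatch machinery by recursing on the pandas meta object. A self-contained sketch of that dispatch pattern, using a hypothetical MyFrame stand-in and a locally constructed Dispatch rather than dask's real registry:

import pandas as pd
from dask.utils import Dispatch

get_parallel_type = Dispatch("get_parallel_type")  # local demo registry

@get_parallel_type.register(pd.DataFrame)
def _get_parallel_type_pandas(_):
    return "parallel pandas DataFrame"

class MyFrame:  # hypothetical stand-in for FrameBase
    _meta = pd.DataFrame()

@get_parallel_type.register(MyFrame)
def _get_parallel_type_frame(o):
    # Dispatch again on the meta so the pandas registration answers.
    return get_parallel_type(o._meta)

print(get_parallel_type(MyFrame()))  # -> parallel pandas DataFrame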
3 changes: 1 addition & 2 deletions dask_expr/_expr.py
@@ -24,12 +24,11 @@
     is_dataframe_like,
     is_index_like,
     is_series_like,
-    make_meta,
     pd_split,
     safe_head,
     total_mem_usage,
 )
-from dask.dataframe.dispatch import meta_nonempty
+from dask.dataframe.dispatch import make_meta, meta_nonempty
 from dask.dataframe.rolling import CombinedOutput, _head_timedelta, overlap_chunk
 from dask.dataframe.shuffle import drop_overlap, get_overlap
 from dask.dataframe.utils import (
2 changes: 1 addition & 1 deletion dask_expr/_groupby.py
@@ -9,14 +9,14 @@
 from dask._task_spec import Task
 from dask.core import flatten
 from dask.dataframe.core import (
-    GROUP_KEYS_DEFAULT,
     _concat,
     apply_and_enforce,
     is_dataframe_like,
     is_series_like,
 )
 from dask.dataframe.dispatch import concat, make_meta, meta_nonempty
 from dask.dataframe.groupby import (
+    GROUP_KEYS_DEFAULT,
     _agg_finalize,
     _aggregate_docstring,
     _apply_chunk,
3 changes: 1 addition & 2 deletions dask_expr/_reductions.py
@@ -19,10 +19,9 @@
     is_dataframe_like,
     is_index_like,
     is_series_like,
-    make_meta,
-    meta_nonempty,
     total_mem_usage,
 )
+from dask.dataframe.dispatch import make_meta, meta_nonempty
 from dask.typing import no_default
 from dask.utils import M, apply, funcname

4 changes: 2 additions & 2 deletions dask_expr/_shuffle.py
@@ -10,8 +10,8 @@
 import tlz as toolz
 from dask import compute
 from dask._task_spec import Task, TaskRef
-from dask.dataframe.core import _concat, make_meta
-from dask.dataframe.dispatch import is_categorical_dtype
+from dask.dataframe.core import _concat
+from dask.dataframe.dispatch import is_categorical_dtype, make_meta
 from dask.dataframe.shuffle import (
     barrier,
     collect,
3 changes: 2 additions & 1 deletion dask_expr/io/io.py
@@ -9,7 +9,8 @@
 from dask._task_spec import List, Task
 from dask.dataframe import methods
 from dask.dataframe._pyarrow import to_pyarrow_string
-from dask.dataframe.core import apply_and_enforce, is_dataframe_like, make_meta
+from dask.dataframe.core import apply_and_enforce, is_dataframe_like
+from dask.dataframe.dispatch import make_meta
 from dask.dataframe.io.io import _meta_from_array, sorted_division_locations
 from dask.typing import Key
 from dask.utils import funcname, is_series_like
22 changes: 1 addition & 21 deletions dask_expr/io/tests/test_io.py
@@ -15,14 +15,13 @@
     from_array,
     from_dask_array,
     from_dict,
-    from_legacy_dataframe,
     from_map,
     from_pandas,
     optimize,
     read_csv,
     read_parquet,
 )
-from dask_expr._expr import Expr, Replace
+from dask_expr._expr import Replace
 from dask_expr.io import FromArray, FromMap, ReadParquet, parquet
 from dask_expr.tests._util import _backend_library

@@ -227,25 +226,6 @@ def test_parquet_complex_filters(tmpdir):
     assert_eq(got.optimize(), expect)


-@pytest.mark.parametrize("optimize", [True, False])
-def test_from_legacy_dataframe(optimize):
-    ddf = dd.from_dict({"a": range(100)}, npartitions=10)
-    with pytest.warns(FutureWarning, match="is deprecated"):
-        df = from_legacy_dataframe(ddf, optimize=optimize)
-    assert isinstance(df.expr, Expr)
-    assert_eq(df, ddf)
-
-
-@pytest.mark.parametrize("optimize", [True, False])
-def test_to_legacy_dataframe(optimize):
-    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
-    df = from_pandas(pdf, npartitions=2)
-    with pytest.warns(FutureWarning, match="is deprecated"):
-        ddf = df.to_legacy_dataframe(optimize=optimize)
-    assert isinstance(ddf, dd.core.DataFrame)
-    assert_eq(df, ddf)
-
-
 @pytest.mark.parametrize("optimize", [True, False])
 def test_to_dask_array(optimize):
     pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
