Add IO fusion if we can reduce number of partitions #327

Merged: 8 commits (Oct 11, 2023)
37 changes: 36 additions & 1 deletion dask_expr/_expr.py
@@ -2197,6 +2197,7 @@ def optimize(expr: Expr, combine_similar: bool = True, fuse: bool = True) -> Exp
result = result.combine_similar()

if fuse:
result = optimize_io_fusion(result)
result = optimize_blockwise_fusion(result)

return result
@@ -2239,6 +2240,40 @@ def are_co_aligned(*exprs):
## Utilities for Expr fusion


def optimize_io_fusion(expr):
"""Traverse the expression graph and apply fusion to the I/O layer that squashes
partitions together if possible."""

def _fusion_pass(expr):
new_operands = []
changed = False
for operand in expr.operands:
if isinstance(operand, Expr):
if (
isinstance(operand, BlockwiseIO)
and operand._fusion_compression_factor < 1
):
new = FusedIO(operand)
elif isinstance(operand, BlockwiseIO):
new = operand
else:
new = _fusion_pass(operand)

if new._name != operand._name:
changed = True
else:
new = operand
new_operands.append(new)

if changed:
expr = type(expr)(*new_operands)

return expr

expr = _fusion_pass(expr)
return expr
Member commented:

Maybe this could be implemented with the following:

class BlockwiseIO:
    def simplify_down(self):
        if self._fusion_compression_factor < 1:
            return FusedIO(self)

I might be missing something here though.

Maybe this is part of my confusion here. This seems like an optimization that we always want to make on an expression type. I think that means we don't really need to use the Expression machinery at all; this could be a method on IO.

Sometimes people get so used to a fancy form of optimization that they forget about the simpler approaches available.

Collaborator (author) commented:

No, sorry, that won't work in simplify (see below); happy to chat if my explanation is confusing. It has to come after combine_similar.
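The shape of the traversal in optimize_io_fusion is a generic bottom-up rewrite. Below is a minimal standalone sketch of the same pattern; the classes Node, ScanLeaf, FusedLeaf, and Add are illustrative stand-ins, not dask-expr types:

```python
# Toy expression nodes mimicking the Expr / BlockwiseIO / FusedIO roles above.
class Node:
    def __init__(self, *operands):
        self.operands = list(operands)

class ScanLeaf(Node):
    """Stand-in for a BlockwiseIO expression."""
    @property
    def compression_factor(self):
        return self.operands[0]

class FusedLeaf(Node):
    """Stand-in for FusedIO wrapping an I/O leaf."""

class Add(Node):
    """Stand-in for any non-I/O expression."""

def fusion_pass(expr):
    new_operands, changed = [], False
    for op in expr.operands:
        if isinstance(op, ScanLeaf) and op.compression_factor < 1:
            new = FusedLeaf(op)    # this leaf's partitions can be squashed
        elif isinstance(op, Node):
            new = fusion_pass(op)  # recurse into non-I/O subtrees
        else:
            new = op               # plain operand (scalar, string, ...)
        if new is not op:
            changed = True
        new_operands.append(new)
    # Rebuild the node only if an operand actually changed.
    return type(expr)(*new_operands) if changed else expr

tree = Add(ScanLeaf(0.25), ScanLeaf(1.0))
out = fusion_pass(tree)
assert isinstance(out.operands[0], FusedLeaf)  # factor < 1: fused
assert out.operands[1] is tree.operands[1]     # factor == 1: untouched
```

The real pass compares `_name` rather than object identity, since dask-expr expressions are deterministically tokenized; identity is enough for this sketch.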



def optimize_blockwise_fusion(expr):
"""Traverse the expression graph and apply fusion"""

@@ -2472,4 +2507,4 @@ def _execute_task(graph, name, *deps):
Sum,
Var,
)
-from dask_expr.io import IO, BlockwiseIO
+from dask_expr.io import IO, BlockwiseIO, FusedIO
36 changes: 36 additions & 0 deletions dask_expr/io/io.py
@@ -3,6 +3,7 @@
import functools
import math

from dask.dataframe import methods
from dask.dataframe.core import is_dataframe_like
from dask.dataframe.io.io import sorted_division_locations

@@ -50,6 +51,10 @@ def _layer(self):
class BlockwiseIO(Blockwise, IO):
_absorb_projections = False

@functools.cached_property
def _fusion_compression_factor(self):
return 1

def _simplify_up(self, parent):
if (
self._absorb_projections
@@ -121,6 +126,37 @@ def _combine_similar(self, root: Expr):
return


class FusedIO(BlockwiseIO):
_parameters = ["expr"]
Member commented:

I suppose we will still generate the graph for self.operand("expr") when FusedIO.__dask_graph__ is called.

Collaborator (author) commented:

Yes, in _task.


@functools.cached_property
def _meta(self):
return self.operand("expr")._meta

@functools.cached_property
def npartitions(self):
return len(self._fusion_buckets)

def _divisions(self):
divisions = self.operand("expr")._divisions()
new_divisions = [divisions[b[0]] for b in self._fusion_buckets]
new_divisions.append(self._fusion_buckets[-1][-1])
return tuple(new_divisions)

def _task(self, index: int):
expr = self.operand("expr")
bucket = self._fusion_buckets[index]
return (methods.concat, [expr._filtered_task(i) for i in bucket])

@functools.cached_property
def _fusion_buckets(self):
step = math.ceil(1 / self.operand("expr")._fusion_compression_factor)
partitions = self.operand("expr")._partitions
npartitions = len(partitions)
buckets = [partitions[i : i + step] for i in range(0, npartitions, step)]
return buckets
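To make the bucketing arithmetic concrete: with a compression factor of 0.25, step = ceil(1 / 0.25) = 4, so ten partitions collapse into three output partitions. A standalone sketch (fusion_buckets here is a free function for illustration, not the cached property above):

```python
import math

def fusion_buckets(partitions, compression_factor):
    # Each output partition concatenates up to `step` input partitions.
    step = math.ceil(1 / compression_factor)
    return [partitions[i : i + step] for i in range(0, len(partitions), step)]

buckets = fusion_buckets(list(range(10)), 0.25)
assert buckets == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

# A factor of 1 (no column pruning) leaves every partition in its own bucket.
assert fusion_buckets(list(range(3)), 1) == [[0], [1], [2]]
```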


class FromPandas(PartitionsFiltered, BlockwiseIO):
"""The only way today to get a real dataframe"""

8 changes: 8 additions & 0 deletions dask_expr/io/parquet.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import functools
import itertools
import operator
import warnings
@@ -642,6 +643,13 @@ def _update_length_statistics(self):
stat["num-rows"] for stat in _collect_pq_statistics(self)
)

@functools.cached_property
def _fusion_compression_factor(self):
if self.operand("columns") is None:
return 1
nr_original_columns = len(self._dataset_info["schema"].names) - 1
return len(_convert_to_list(self.operand("columns"))) / nr_original_columns
Member commented:

Maybe this shouldn't be a property or an instance method, but is instead some logic that we do within the fusion operation. It seems to be only relevant for that optimization. Is that correct?

Collaborator (author) commented:

Yes, but the factor calculation will be different for different I/O methods, so it's tricky coming up with a general mechanism that won't need a bunch of special casing.

E.g. CSV will need different logic than Parquet does here.
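To make the Parquet factor above concrete, here is a worked example using the same 14-column frame as test_io_fusion_blockwise in this PR. The helper is a standalone sketch, and reading the "- 1" as accounting for the index column in the schema is an assumption on my part:

```python
import math

def fusion_compression_factor(selected_columns, schema_names):
    # schema_names includes the index column, hence the "- 1"
    # (assumed interpretation of the subtraction in the diff above).
    if selected_columns is None:
        return 1
    return len(selected_columns) / (len(schema_names) - 1)

schema = list("abcdefghijklmn") + ["__index__"]  # 14 data columns + an index
factor = fusion_compression_factor(["a"], schema)
assert factor == 1 / 14

step = math.ceil(1 / factor)  # partitions concatenated per output partition
assert step >= 14  # so the test's two input partitions fuse into one
```

With no column selection the factor stays at 1, which keeps such expressions out of the `_fusion_compression_factor < 1` branch of the fusion pass entirely.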



#
# Helper functions
7 changes: 7 additions & 0 deletions dask_expr/io/tests/test_io.py
@@ -164,6 +164,13 @@ def test_predicate_pushdown_compound(tmpdir):
assert_eq(y, z)


def test_io_fusion_blockwise(tmpdir):
pdf = lib.DataFrame({c: range(10) for c in "abcdefghijklmn"})
dd.from_pandas(pdf, 2).to_parquet(tmpdir)
df = read_parquet(tmpdir)["a"].fillna(10).optimize()
assert df.npartitions == 1


@pytest.mark.parametrize("fmt", ["parquet", "csv", "pandas"])
def test_io_culling(tmpdir, fmt):
pdf = lib.DataFrame({c: range(10) for c in "abcde"})