Skip to content

Commit

Permalink
Add IO fusion if we can reduce number of partitions (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
phofl authored Oct 11, 2023
1 parent 86b7a07 commit 044617b
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 1 deletion.
37 changes: 36 additions & 1 deletion dask_expr/_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,7 @@ def optimize(expr: Expr, combine_similar: bool = True, fuse: bool = True) -> Exp
result = result.combine_similar()

if fuse:
result = optimize_io_fusion(result)
result = optimize_blockwise_fusion(result)

return result
Expand Down Expand Up @@ -2239,6 +2240,40 @@ def are_co_aligned(*exprs):
## Utilites for Expr fusion


def optimize_io_fusion(expr):
"""Traverse the expression graph and apply fusion to the I/O layer that squashes
partitions together if possible."""

def _fusion_pass(expr):
new_operands = []
changed = False
for operand in expr.operands:
if isinstance(operand, Expr):
if (
isinstance(operand, BlockwiseIO)
and operand._fusion_compression_factor < 1
):
new = FusedIO(operand)
elif isinstance(operand, BlockwiseIO):
new = operand
else:
new = _fusion_pass(operand)

if new._name != operand._name:
changed = True
else:
new = operand
new_operands.append(new)

if changed:
expr = type(expr)(*new_operands)

return expr

expr = _fusion_pass(expr)
return expr


def optimize_blockwise_fusion(expr):
"""Traverse the expression graph and apply fusion"""

Expand Down Expand Up @@ -2472,4 +2507,4 @@ def _execute_task(graph, name, *deps):
Sum,
Var,
)
from dask_expr.io import IO, BlockwiseIO
from dask_expr.io import IO, BlockwiseIO, FusedIO
36 changes: 36 additions & 0 deletions dask_expr/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import functools
import math

from dask.dataframe import methods
from dask.dataframe.core import is_dataframe_like
from dask.dataframe.io.io import sorted_division_locations

Expand Down Expand Up @@ -50,6 +51,10 @@ def _layer(self):
class BlockwiseIO(Blockwise, IO):
_absorb_projections = False

@functools.cached_property
def _fusion_compression_factor(self):
return 1

def _simplify_up(self, parent):
if (
self._absorb_projections
Expand Down Expand Up @@ -121,6 +126,37 @@ def _combine_similar(self, root: Expr):
return


class FusedIO(BlockwiseIO):
_parameters = ["expr"]

@functools.cached_property
def _meta(self):
return self.operand("expr")._meta

@functools.cached_property
def npartitions(self):
return len(self._fusion_buckets)

def _divisions(self):
divisions = self.operand("expr")._divisions()
new_divisions = [divisions[b[0]] for b in self._fusion_buckets]
new_divisions.append(self._fusion_buckets[-1][-1])
return tuple(new_divisions)

def _task(self, index: int):
expr = self.operand("expr")
bucket = self._fusion_buckets[index]
return (methods.concat, [expr._filtered_task(i) for i in bucket])

@functools.cached_property
def _fusion_buckets(self):
step = math.ceil(1 / self.operand("expr")._fusion_compression_factor)
partitions = self.operand("expr")._partitions
npartitions = len(partitions)
buckets = [partitions[i : i + step] for i in range(0, npartitions, step)]
return buckets


class FromPandas(PartitionsFiltered, BlockwiseIO):
"""The only way today to get a real dataframe"""

Expand Down
8 changes: 8 additions & 0 deletions dask_expr/io/parquet.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import functools
import itertools
import operator
import warnings
Expand Down Expand Up @@ -642,6 +643,13 @@ def _update_length_statistics(self):
stat["num-rows"] for stat in _collect_pq_statistics(self)
)

@functools.cached_property
def _fusion_compression_factor(self):
if self.operand("columns") is None:
return 1
nr_original_columns = len(self._dataset_info["schema"].names) - 1
return len(_convert_to_list(self.operand("columns"))) / nr_original_columns


#
# Helper functions
Expand Down
7 changes: 7 additions & 0 deletions dask_expr/io/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ def test_predicate_pushdown_compound(tmpdir):
assert_eq(y, z)


def test_io_fusion_blockwise(tmpdir):
pdf = lib.DataFrame({c: range(10) for c in "abcdefghijklmn"})
dd.from_pandas(pdf, 2).to_parquet(tmpdir)
df = read_parquet(tmpdir)["a"].fillna(10).optimize()
assert df.npartitions == 1


@pytest.mark.parametrize("fmt", ["parquet", "csv", "pandas"])
def test_io_culling(tmpdir, fmt):
pdf = lib.DataFrame({c: range(10) for c in "abcde"})
Expand Down

0 comments on commit 044617b

Please sign in to comment.