Add IO fusion if we can reduce number of partitions #327
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
import functools | ||
import math | ||
|
||
from dask.dataframe import methods | ||
from dask.dataframe.core import is_dataframe_like | ||
from dask.dataframe.io.io import sorted_division_locations | ||
|
||
|
@@ -50,6 +51,10 @@ def _layer(self): | |
class BlockwiseIO(Blockwise, IO): | ||
_absorb_projections = False | ||
|
||
@functools.cached_property | ||
def _fusion_compression_factor(self): | ||
return 1 | ||
|
||
def _simplify_up(self, parent): | ||
if ( | ||
self._absorb_projections | ||
|
@@ -121,6 +126,37 @@ def _combine_similar(self, root: Expr): | |
return | ||
|
||
|
||
class FusedIO(BlockwiseIO): | ||
_parameters = ["expr"] | ||
I suppose we will still generate the graph for
Yes in |
||
|
||
@functools.cached_property | ||
def _meta(self): | ||
return self.operand("expr")._meta | ||
|
||
@functools.cached_property | ||
def npartitions(self): | ||
return len(self._fusion_buckets) | ||
|
||
def _divisions(self): | ||
divisions = self.operand("expr")._divisions() | ||
new_divisions = [divisions[b[0]] for b in self._fusion_buckets] | ||
new_divisions.append(self._fusion_buckets[-1][-1]) | ||
return tuple(new_divisions) | ||
|
||
def _task(self, index: int): | ||
expr = self.operand("expr") | ||
bucket = self._fusion_buckets[index] | ||
return (methods.concat, [expr._filtered_task(i) for i in bucket]) | ||
|
||
@functools.cached_property | ||
def _fusion_buckets(self): | ||
step = math.ceil(1 / self.operand("expr")._fusion_compression_factor) | ||
partitions = self.operand("expr")._partitions | ||
npartitions = len(partitions) | ||
buckets = [partitions[i : i + step] for i in range(0, npartitions, step)] | ||
return buckets | ||
|
||
|
||
class FromPandas(PartitionsFiltered, BlockwiseIO): | ||
"""The only way today to get a real dataframe""" | ||
|
||
|
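The bucketing in `FusedIO._fusion_buckets` above can be tried in isolation. This is a minimal sketch, not the PR's code: `fusion_buckets` is a hypothetical free function that takes a plain list of partition indices and a compression factor instead of reading them from an expression.

```python
import math


def fusion_buckets(partitions, compression_factor):
    """Group partition indices into buckets of ceil(1 / factor) partitions.

    Hypothetical standalone helper mirroring the slicing logic in the diff;
    `partitions` is a list of partition indices and `compression_factor`
    is a number in (0, 1].
    """
    step = math.ceil(1 / compression_factor)
    return [partitions[i : i + step] for i in range(0, len(partitions), step)]


# A factor of 0.25 gives step = 4, so 10 partitions fuse into 3 buckets.
print(fusion_buckets(list(range(10)), 0.25))
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

# A factor of 1 (the BlockwiseIO default) leaves every partition on its own.
print(fusion_buckets([0, 1, 2], 1))
# [[0], [1], [2]]
```

Each bucket then becomes a single output partition, which is why `npartitions` in the diff is `len(self._fusion_buckets)`.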
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
from __future__ import annotations | ||
|
||
import contextlib | ||
import functools | ||
import itertools | ||
import operator | ||
import warnings | ||
|
@@ -642,6 +643,13 @@ def _update_length_statistics(self): | |
stat["num-rows"] for stat in _collect_pq_statistics(self) | ||
) | ||
|
||
@functools.cached_property | ||
def _fusion_compression_factor(self): | ||
if self.operand("columns") is None: | ||
return 1 | ||
nr_original_columns = len(self._dataset_info["schema"].names) - 1 | ||
return len(_convert_to_list(self.operand("columns"))) / nr_original_columns | ||
Maybe this shouldn't be a property or an instance method, but is instead some logic that we do within the fusion operation. It seems to be only relevant for that optimization. Is that correct?
Yes, but the factor calculation will be different for different I/O methods, so it's tricky coming up with a general mechanism that won't need a bunch of special casing. E.g. CSV will need different logic than here. |
||
|
||
|
||
# | ||
# Helper functions | ||
|
Maybe this could be implemented with the following:
I might be missing something here though.
Maybe this is part of my confusion here. This seems like an optimization that we always want to make on an expression type. I think that means that we don't really need to use the Expression stuff at all. This could be a method on `IO`.
Sometimes people get so used to using a fancy form of optimization that they forget about the simpler approaches they have.
No, sorry, that won't work in simplify (see below). Happy to chat if my explanation is confusing. It has to come after combine_similar.
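For reference, the Parquet factor debated in this thread can be sketched on its own. This is a rough illustration rather than the PR's implementation: `parquet_compression_factor` is a hypothetical function, the string handling stands in for `_convert_to_list`, and reading the `- 1` as "one schema entry is the index" is an assumption about the diff.

```python
def parquet_compression_factor(columns, schema_names):
    """Fraction of the original columns that survive the projection.

    Hypothetical helper: `columns` is None, a column name, or a list of
    names; `schema_names` is the full list of names in the file schema.
    """
    if columns is None:
        # No projection, so there is nothing to compress.
        return 1
    if isinstance(columns, str):
        # Stand-in for _convert_to_list in the diff.
        columns = [columns]
    # Assumption: one schema entry is the index, hence the "- 1".
    nr_original_columns = len(schema_names) - 1
    return len(columns) / nr_original_columns


schema = ["a", "b", "c", "d", "index"]
print(parquet_compression_factor(["a"], schema))  # 0.25
print(parquet_compression_factor(None, schema))   # 1
```

A CSV reader would need a different rule, which is the special casing the reply above refers to.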