Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make simplify and lower optional within Expr.optimize #326

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions dask_expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,18 @@ def simplify(self):
def lower_once(self):
    """Apply one ``lower_once`` step to the underlying expression.

    The lowered expression is wrapped back into a collection via
    ``new_collection`` so the caller keeps working with a collection
    rather than a raw expression.
    """
    lowered_expr = self.expr.lower_once()
    return new_collection(lowered_expr)

def optimize(
    self,
    lower: bool = True,
    combine_similar: bool = True,
    fuse: bool = True,
):
    """Optimize the underlying expression and return a new collection.

    This is a thin wrapper: every flag is forwarded unchanged (by keyword)
    to ``self.expr.optimize`` and the resulting expression is wrapped with
    ``new_collection``.

    Parameters
    ----------
    lower:
        Forwarded to the expression-level ``optimize`` as ``lower``.
    combine_similar:
        Forwarded to the expression-level ``optimize`` as ``combine_similar``.
    fuse:
        Forwarded to the expression-level ``optimize`` as ``fuse``.

    Notes
    -----
    Pass these flags by keyword. ``lower`` precedes ``combine_similar`` and
    ``fuse`` in the signature, so a positional call written against the
    earlier ``optimize(combine_similar, fuse)`` form would bind its first
    argument to ``lower`` instead.
    """
    return new_collection(
        self.expr.optimize(
            lower=lower,
            combine_similar=combine_similar,
            fuse=fuse,
        )
    )

@property
Expand Down
27 changes: 20 additions & 7 deletions dask_expr/_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2160,43 +2160,56 @@ def normalize_expression(expr):
return expr._name


def optimize(
    expr: Expr,
    lower: bool = True,
    combine_similar: bool = True,
    fuse: bool = True,
) -> Expr:
    """High level query optimization

    This leverages four optimization passes:

    1. Class based simplification using the ``_simplify`` function and methods
    2. Lowering of abstract expressions (optional, see ``lower``)
    3. Combine similar operations (optional, see ``combine_similar``)
    4. Blockwise fusion (optional, see ``fuse``)

    Passes 1 and 2 are applied together, simplification first, and repeated
    until the name of the resulting expression no longer changes. Passes 3
    and 4 then run at most once each, in that order.

    Parameters
    ----------
    expr:
        Input expression to optimize
    lower:
        whether or not to lower abstract expressions. When false, only
        simplification is iterated to a fixed point.
    combine_similar:
        whether or not to combine similar operations
        (like `ReadParquet`) to aggregate redundant work.
    fuse:
        whether or not to turn on blockwise fusion.
        Fusion is only applied when ``lower`` is also true; with
        ``lower=False`` this flag is silently ignored.

    Returns
    -------
    The optimized expression.

    See Also
    --------
    simplify
    lower_once
    combine_similar
    optimize_blockwise_fusion
    """
    result = expr
    while True:
        # Always simplify; take a single lowering step only when requested.
        out = result.simplify()
        if lower:
            out = out.lower_once()
        # An unchanged name means this round rewrote nothing: fixed point.
        if out._name == result._name:
            break
        result = out

    if combine_similar:
        result = result.combine_similar()

    # Fusion is skipped on un-lowered graphs, so it is gated on ``lower`` too.
    if fuse and lower:
        result = optimize_blockwise_fusion(result)

    return result
Expand Down
37 changes: 37 additions & 0 deletions dask_expr/tests/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1256,3 +1256,40 @@ def test_replace_filtered_combine_similar():
# Filter expressions (and not the other way around)
similar = list(df.find_operations(Replace))
assert all(isinstance(op.frame, Filter) for op in similar)


@pytest.mark.parametrize("fuse", [True, False])
@pytest.mark.parametrize("combine_similar", [True, False])
def test_optimize_lower_false(fuse, combine_similar):
    """``optimize(lower=False)`` keeps ``SortValues`` un-lowered and still computes correctly.

    Runs for every combination of ``fuse`` and ``combine_similar``.
    """
    from dask_expr._shuffle import SortValues

    pdf = lib.DataFrame({"a": [1, 6, 2, 4, 5, 3], "b": 1, "c": 2})

    df = from_pandas(pdf, npartitions=2)
    df = df.sort_values("a")
    df = df[df.a > 3]
    df = df.replace(5, 4)

    expect = pdf.sort_values("a")
    expect = expect[expect.a > 3]
    expect = expect.replace(5, 4)

    result = df.optimize(
        lower=False,
        fuse=fuse,
        combine_similar=combine_similar,
    )

    # Check that SortValues was not lowered.
    # Materialize the matches before asserting: other tests in this module
    # consume ``find_operations`` through ``list(...)``, which suggests it
    # returns a lazy iterator, and such an object is truthy even when it
    # would yield nothing.
    assert list(result.find_operations(SortValues))

    # Check that we get correct results
    # (with and without the same compute args)
    assert_eq(result, expect)
    assert_eq(
        result.compute(
            fuse=fuse,
            combine_similar=combine_similar,
        ),
        expect,
    )
Loading