
Add IO fusion if we can reduce number of partitions #327

Merged: 8 commits into dask:main on Oct 11, 2023
Conversation

@phofl (Collaborator) commented Oct 11, 2023

We end up with a lot of small chunks when we can drop most of the columns from the Parquet files. This is especially bad for P2P-based merges, since small chunks slow us down there. Squashing the partitions together solves that problem, so that we end up with the original chunk size.
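To make the intent concrete, here is a minimal sketch of the arithmetic, with illustrative names that are not the PR's actual API: if a column projection keeps only a fraction of the data, enough input partitions are fused together to restore the original partition size.

import math

def fused_npartitions(npartitions, compression_factor):
    # How many input partitions to pack into one output partition:
    # a factor of 0.2 (2 of 10 columns kept) packs 5 together.
    per_partition = max(int(1 / compression_factor), 1)
    return math.ceil(npartitions / per_partition)

assert fused_npartitions(100, 0.2) == 20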

@mrocklin (Member)

This structure of nesting one IO expression inside another seems fine, but also a little complicated.

In an ideal world I think that the IO expressions, like ReadParquet, would be able to group multiple partitions together, and that we would use those parameters instead of FusedIO. Thoughts? Maybe this could be achieved with a convention among IO expressions?

@phofl (Collaborator, Author) commented Oct 11, 2023

Having this abstraction made the generic implementation easier, since we don't have to worry about how the individual partitions are created in the first place. I generally agree with you, but I think it will be easier to figure this out once we have more I/O connectors.

Comment on lines 55 to 56
def _factor(self):
    return 1
Member:

This could probably use a better name, or an explanatory docstring.

Collaborator Author:

Yeah, good point.

if self.operand("columns") is None:
    return 1
nr_original_columns = len(self._dataset_info["schema"].names) - 1
return len(_convert_to_list(self.operand("columns"))) / nr_original_columns
Member:

Maybe this shouldn't be a property or an instance method, but should instead be some logic that we do within the fusion operation. It seems to be only relevant for that optimization. Is that correct?

Collaborator Author:

Yes, but the factor calculation will differ between I/O methods, so it's tricky to come up with a general mechanism that won't need a bunch of special-casing.

E.g., CSV will need different logic than Parquet does here.
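For concreteness, here is roughly what the Parquet computation above yields, with illustrative numbers (assuming the - 1 in the snippet excludes an index column from the schema):

schema_names = ["idx", "a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
nr_original_columns = len(schema_names) - 1   # 10 data columns
selected = ["a", "b"]
factor = len(selected) / nr_original_columns  # 0.2 -> fuse ~5 partitions into 1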

@mrocklin (Member)

> Having this abstraction made the generic implementation easier, since we don't have to worry about how the individual partitions are created in the first place. I generally agree with you, but I think it will be easier to figure this out once we have more I/O connectors.

Maybe if we're thinking short term, then we just do this on ReadParquet and ReadCSV in a less general-purpose way? This doesn't even need to be an optimization; it could instead be default behavior on the class itself.

I don't fundamentally disagree with the approach taken here, but it seems like using a big gun to solve a specific problem. I suspect that a less sophisticated solution would be as effective with what we have today. If we find that it's hard to do this on each IO class consistently, then I think it would make a lot of sense to do something generic like this.

In practice I think it's not a big deal today, but I'm imagining the developer of next year who is presented with this FusedIO thing and then needs to figure out why it exists.

@mrocklin (Member)

Not a big deal. I'm happy to be overruled on this. It just feels a little off to me.

@phofl (Collaborator, Author) commented Oct 11, 2023

There is another issue: the fusion we are doing here needs to happen after combine_similar, but before blockwise fusion. We basically need another optimization step, and I don't think we can encapsulate this in the ReadParquet class; we would have to subclass it to override the divisions calculation. I think this is the cleanest solution, but I might be very wrong as well.

return expr

expr = _fusion_pass(expr)
return expr
Member:

Maybe this could be implemented with the following:

class BlockwiseIO:
    def simplify_down(self):
        if self._fusion_compression_factor < 1:
            return FusedIO(self)

I might be missing something here though.

Maybe this is part of my confusion here. This seems like an optimization that we always want to make on an expression type. I think that means that we don't really need to use the Expression stuff at all. This could be a method on IO.

Sometimes people get so used to using a fancy form of optimization that they forget about the simpler approaches they have.

Collaborator Author:

No, sorry, that won't work in simplify (see below); it has to come after combine_similar. Happy to chat if my explanation is confusing.

@mrocklin (Member)

> There is another issue: the fusion we are doing here needs to happen after combine_similar, but before blockwise fusion. We basically need another optimization step, and I don't think we can encapsulate this in the ReadParquet class; we would have to subclass it to override the divisions calculation. I think this is the cleanest solution, but I might be very wrong as well.

I'm not sure I understand. So let's say that after various optimizations, we have the following expression:

ReadParquet(path=..., columns=["a", "b"])

Say the dataset has ten columns. While normally this expression would report that it has 100 partitions, it now reports that it has 20, and every task that it emits reads five row groups.

@phofl (Collaborator, Author) commented Oct 11, 2023

After simplify but before combine_similar, we might have several different operations that read from the same Parquet file:

ReadParquet(path=x, columns=["a", "b"])
ReadParquet(path=x, columns=["c", "b"])
ReadParquet(path=x, columns=["a"])
ReadParquet(path=x, columns=["a", "b", "c"])

combine_similar squashes all of them together, so that every IO operation reading from "x" becomes:

ReadParquet(path=x, columns=["a", "b", "c"])

We cannot determine the compression factor before we remove repeated reads from storage and take the union of the columns we need, so this fusion step has to come after combine_similar.

I am open to adding a step similar to simplify, but we can't combine the two.
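A small illustration of why the ordering matters, using the columns from the example above: computing the factor from any single branch before combine_similar runs would be misleading.

factor_before = 1 / 3  # branch reading only ["a"] out of ["a", "b", "c"]
factor_after = 3 / 3   # the combined read needs all three columns -> no fusion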

@mrocklin (Member)

Sorry, maybe I'm not being clear with my recent suggestion. It's that there is no optimization step, but rather that an expression like ReadParquet(path=x, columns=["a", "b", "c"]) knows that it should generate tasks that read from several row groups. So there's no optimization or Expr manipulation necessary; it's just that the default number of row groups per partition in ReadParquet changes based on the selected columns.

Again though, I'm also happy if you just override me here. I don't think that what I'm suggesting is strictly better or strictly simpler. It moves the complexity into the expression, rather than making a new nested expression. It's a less general solution, but also a lower-tech solution. Not better, just different. If you just want to move forward with your plan then please do.
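For reference, a minimal sketch of this alternative, assuming illustrative names rather than the PR's code: ReadParquet itself derives its partition count from the column selection, so no FusedIO wrapper is needed.

import math

class ReadParquetSketch:
    # Hypothetical stand-in for ReadParquet with the behavior described above.
    def __init__(self, n_row_groups, n_total_columns, columns=None):
        self.n_row_groups = n_row_groups
        self.n_total_columns = n_total_columns
        self.columns = columns

    @property
    def row_groups_per_partition(self):
        # Read more row groups per task when fewer columns are selected.
        if self.columns is None:
            return 1
        factor = len(self.columns) / self.n_total_columns
        return max(int(1 / factor), 1)

    @property
    def npartitions(self):
        return math.ceil(self.n_row_groups / self.row_groups_per_partition)

# 100 row groups, 10 columns, 2 selected -> 20 partitions of 5 row groups each
assert ReadParquetSketch(100, 10, columns=["a", "b"]).npartitions == 20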

@phofl (Collaborator, Author) commented Oct 11, 2023

I tried this approach in the beginning as well, but I couldn't figure out a way of calculating the divisions appropriately, because we have to use the initial divisions until combine_similar has happened, and only then can we switch to the version that reads multiple parts. This got pretty complicated.

@mrocklin (Member)

I'm curious to learn more about that. I would hope that during optimization we don't really need to query divisions at all.

Happy to chat any time. I'll be in meetings for the next couple of hours though.

If you want to go ahead and merge this and move forward, that's fine with me. I get a little nervous as things build on top of it, though.

@phofl (Collaborator, Author) commented Oct 11, 2023

Blockwise operations validate divisions, for example, so we can't get away without them.

@rjzamora (Member)

> Blockwise operations validate divisions, for example, so we can't get away without them.

Yeah, I think some of the pain indeed comes down to the fact that we rely so much on divisions/npartitions in many Expr classes (which I was also thinking about yesterday). I think optimizations like this would be much easier if we tried producing "abstract" classes from the collection API whenever possible, and avoided "lowering" to expressions with divisions/npartitions support until it becomes necessary. However, this is a non-trivial change at this point.

That said, I'm not really sure we need to introduce a new Expr class here. It seems like it should be possible to replace a ReadParquet instance with another ReadParquet instance that has a larger blocksize argument?

@phofl (Collaborator, Author) commented Oct 11, 2023

I don't really know how all the parquet operations work internally, so you are probably better suited to judge this.

The issue I am seeing here is that I don't really know what the block size is. I want to make sure that I end up with the initial block size, e.g. combining two partitions into one if we drop half the columns. I am open to suggestions if we can do something similar with the current read_parquet arguments (that said, I would like to avoid inspecting S3 again for meta and other attributes; the cost is non-trivial).

Edit: This obviously wouldn't work for other IO operations, so it would only be a partial solution.

@rjzamora (Member)

> (that said, I would like to avoid inspecting S3 again for meta and other attributes; the cost is non-trivial)

Yeah, that's the part I'm worried about as well. I know we don't need to collect new metadata, but I'm not sure if something will need to change to avoid this. I do agree with your interest in adding a general Expr-based solution here, but I'd like to take a bit of time to think of alternatives.

Side note: The most general solution is probably to avoid collecting divisions at all in top-level IO expressions like ReadParquet, and to only do this after a simplify/combine_similar pass, when a "lower" operation could convert the abstract expression to something like ReadParquetPartitions (at which point the column projection could be taken into account when choosing the file/row-group-to-partition mapping).

@@ -121,6 +126,37 @@ def _combine_similar(self, root: Expr):
return


class FusedIO(BlockwiseIO):
    _parameters = ["expr"]
Member:

I suppose we will still generate the graph for self.operand("expr") when FusedIO.__dask_graph__ is called.

Collaborator Author:

Yes, in _task.
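For readers following along, a minimal sketch of what such a fused task might do, assuming it simply materializes and concatenates several partitions of the wrapped expression (hypothetical names, not the PR's exact code):

import pandas as pd

def fused_task(read_partition, partition_indices):
    # One fused task: read several upstream partitions and combine
    # them into a single, larger partition.
    parts = [read_partition(i) for i in partition_indices]
    return pd.concat(parts)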

@phofl (Collaborator, Author) commented Oct 11, 2023

@mrocklin and I chatted offline about this. The consensus was that we would like to remove the dependency on divisions in the future. This will probably open up ways to integrate this solution better with BlockwiseIO or similar, but that is hard to do until we get there.

I'll merge this solution now; happy to iterate on what we have here as long as we keep the performance improvement. I might change the way this is applied a little tomorrow or on Friday, depending on how some of my explorations go. The current plan is to continue iterating to improve the performance of the system through new optimizations/new wrinkles that we can introduce.

@phofl merged commit 044617b into dask:main on Oct 11, 2023 (4 checks passed).
@phofl deleted the fuse_io branch on October 11, 2023 at 22:30.
@rjzamora (Member)

> The consensus was that we would like to remove the dependency on divisions in the future.

I'm on board with this (as you know). I'm interested to know whether you are looking to remove the dependency across the board in a general way, or just want to avoid division/npartition queries unless absolutely necessary.
