Add IO fusion if we can reduce number of partitions #327
Conversation
This structure of containing one IO expr in another seems fine, but also a little complicated. In an ideal world I think that the IO expressions, like ReadParquet, would have the ability to group multiple partitions together, and that we would use those parameters instead of FusedIO. Thoughts? Maybe this can be achieved with a convention among IO expressions?
Having this abstraction made the generic implementation easier, since we don't have to worry about how the individual partitions are created in the first place. I generally agree with you, but I think it's easier to figure this out once we have more I/O connectors?
dask_expr/io/io.py
def _factor(self):
    return 1
This could probably use a better name, or an explanatory docstring
Yeah good point
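For reference, a hedged sketch of what a renamed property with a docstring could look like; the name _fusion_compression_factor is borrowed from the suggestion further down in this thread and is illustrative only:

@property
def _fusion_compression_factor(self):
    """Estimated fraction of the stored bytes that this IO expression emits.

    A value of 1 means we read everything, so there is nothing to gain from
    fusing partitions; values below 1 (e.g. after a column projection) mean
    the output partitions shrink and neighbouring ones can be read together.
    """
    return 1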
if self.operand("columns") is None:
    return 1
nr_original_columns = len(self._dataset_info["schema"].names) - 1
return len(_convert_to_list(self.operand("columns"))) / nr_original_columns
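As a concrete illustration of that calculation (assuming a schema that lists ten data columns plus the index, which is what the - 1 accounts for):

# schema.names -> ["index", "a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
# nr_original_columns = 11 - 1 = 10
# columns = ["a", "b"]  ->  factor = 2 / 10 = 0.2
# i.e. roughly five of the original partitions can be fused into one.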
Maybe this shouldn't be a property or an instance method, but instead some logic that we do within the fusion operation. It seems to be relevant only for that optimization. Is that correct?
Yes, but the factor calculation will be different for different I/O methods, so it's tricky to come up with a general mechanism that won't need a bunch of special-casing. E.g. CSV will need different logic than Parquet does here.
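To make the "different logic" point concrete, a purely hypothetical sketch for a CSV reader; ReadCSV, usecols and _all_columns are made-up names for illustration, not the real dask_expr API:

class ReadCSV(BlockwiseIO):  # hypothetical
    @property
    def _fusion_compression_factor(self):
        # CSV is row-oriented, so dropping columns does not reduce the bytes
        # read from storage; at best the factor is a rough estimate of how
        # much of the parsed data survives the projection.
        usecols = self.operand("usecols")
        if usecols is None:
            return 1
        return len(usecols) / len(self._all_columns)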
Maybe if we're thinking short-term then we just do this on the individual IO classes. I don't fundamentally disagree with the approach taken here, but it seems like using a big gun to solve a specific problem. I suspect that a less sophisticated solution would be as effective with what we have today. If we find that it's hard to do this on each IO class consistently, then I think it would make a lot of sense to do something generic like this. In practice I think it's not a big deal today, but I'm imagining the developer of next year who is presented with this FusedIO thing and then needs to figure out why it exists.
Not a big deal. I'm happy to be overruled on this. It just feels a little off to me.
There is another issue: the fusion we are doing here needs to happen after combine_similar but before the blockwise fusion happens. We basically need another optimisation step, and I don't think that we can encapsulate this into the existing simplify step.
expr = _fusion_pass(expr)
return expr
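For orientation, a rough sketch of what such a pass could do; this is not the PR's actual _fusion_pass, and the tree rewrite is simplified:

def _fusion_pass(expr):
    # Walk the expression tree and wrap every blockwise IO expression that
    # emits noticeably less data than it reads in a FusedIO node, so that
    # each task of the wrapper serves several of the original partitions.
    operands = [
        _fusion_pass(op) if isinstance(op, Expr) else op for op in expr.operands
    ]
    expr = type(expr)(*operands)
    if isinstance(expr, BlockwiseIO) and expr._fusion_compression_factor < 1:
        return FusedIO(expr)
    return expr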
Maybe this could be implemented with the following:
class BlockwiseIO:
    def simplify_down(self):
        if self._fusion_compression_factor < 1:
            return FusedIO(self)
I might be missing something here though.
Maybe this is part of my confusion here. This seems like an optimization that we always want to make on an expression type. I think that means that we don't really need to use the Expression machinery at all; this could be a method on the IO expression itself.
Sometimes people get so used to a fancy form of optimization that they forget about the simpler approaches they have.
No, sorry, that won't work in simplify (see below); it has to come after combine_similar. Happy to chat if my explanation is confusing.
I'm not sure I understand. Let's say that after various optimizations we have the following expression for a dataset with ten columns: ReadParquet(path=..., columns=["a", "b"]). While normally this expression would report that it has 100 partitions, it now reports that it has 20, and every task that it emits reads five row groups.
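A hedged sketch of that alternative, with the grouping folded into the IO expression itself; _fragments and _read_fragments are illustrative stand-ins, not the real ReadParquet internals:

import math

class ReadParquet(BlockwiseIO):  # simplified illustration
    @property
    def npartitions(self):
        # Report fewer partitions when a column projection shrinks the output.
        return math.ceil(len(self._fragments) * self._fusion_compression_factor)

    def _task(self, index):
        # Each emitted task reads a contiguous group of the original fragments.
        step = math.ceil(1 / self._fusion_compression_factor)
        fragments = self._fragments[index * step : (index + 1) * step]
        return (self._read_fragments, fragments, self.operand("columns"))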
After simplify but before combine_similar, different branches of the graph may still read the same file with different column subsets. combine_similar squashes all of them together so that every IO op from "x" is reading the same union of columns. We cannot determine the compression factor before we remove those repeated reads from storage and create the union of columns that we need, so this fusion step has to come after combine_similar. I am open to adding a step that is similar to simplify, but we can't combine both of them into one.
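Schematically, with the ten-column dataset "x" from the example above:

# After simplify, two branches of the graph still read "x" independently:
#     ReadParquet("x", columns=["a"])          # suggests a factor of 1/10
#     ReadParquet("x", columns=["b", "c"])     # suggests a factor of 2/10
#
# combine_similar squashes them into a single read of the column union,
# followed by cheap projections:
#     Projection(ReadParquet("x", columns=["a", "b", "c"]), "a")
#     Projection(ReadParquet("x", columns=["a", "b", "c"]), ["b", "c"])
#
# Only now is the real factor known (3/10), so fusing before combine_similar
# would compress based on the wrong per-branch factors.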
Sorry, maybe I'm not being clear with my recent suggestion. It's that there is no separate optimization step, but rather that an expression like ReadParquet reports the fused partitioning itself, as in the example above. Again though, I'm also happy if you just override me here. I don't think that what I'm suggesting is strictly better or strictly simpler. It moves the complexity into the expression, rather than making a new nested expression. It's a less general solution, but also a lower-tech solution. Not better, just different. If you just want to move forward with your plan then please do.
I tried this approach in the beginning as well, but I couldn't figure out a way of calculating the divisions appropriately, because we have to use the initial divisions until combine_similar has happened, before we can switch to the version that reads multiple parts. This got pretty complicated.
I'm curious to learn more about that. I would hope that during optimization we don't really need to query divisions at all. Happy to chat any time; I'll be in meetings for the next couple of hours though. If you want to go ahead and merge this and move forward, that's fine with me. I get a little nervous as things build on top of this though.
Blockwise ops validate divisions, for example, so we can't get away without them.
Yeah, I think some of the pain indeed comes down to the fact that we rely so much on divisions. That said, I'm not really sure we need to introduce a new expression type for this; maybe the existing read_parquet arguments could already give us the larger partitions.
I don't really know how all the parquet operations work internally, so you are probably better suited to judge this. The issue I am seeing here is that I don't really know what the block size is: I want to make sure that I end up with the initial block size, e.g. we combine two partitions into one if we drop half the columns. I am open to suggestions if we can do something similar with the current read_parquet arguments (that said, I would like to avoid inspecting s3 again for meta and other attributes; the cost is non-trivial). Edit: this obviously wouldn't work for other IO operations, so it would only be a partial solution.
Yeah, that's the part I'm worried about as well. I know we don't need to collect new metadata, but I'm not sure if something will need to change to avoid this. I do agree with your interest in adding general IO fusion. Side note: the most general solution is probably to avoid collecting divisions at all in top-level IO expressions like ReadParquet.
@@ -121,6 +126,37 @@ def _combine_similar(self, root: Expr):
        return


class FusedIO(BlockwiseIO):
    _parameters = ["expr"]
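A hedged sketch of how such a wrapper could report partitions and build its tasks; the grouping arithmetic and the use of pd.concat are illustrative, not necessarily what the PR does:

import math
import pandas as pd

class FusedIO(BlockwiseIO):  # sketch, not the PR's exact implementation
    _parameters = ["expr"]

    @property
    def _fused(self):
        # Number of wrapped partitions served by one fused task.
        factor = self.operand("expr")._fusion_compression_factor
        return max(int(1 / factor), 1)

    @property
    def npartitions(self):
        return math.ceil(self.operand("expr").npartitions / self._fused)

    def _task(self, index):
        # Concatenate a contiguous block of the wrapped expression's tasks so
        # each fused partition lands back near the original chunk size.
        inner = self.operand("expr")
        start = index * self._fused
        stop = min(start + self._fused, inner.npartitions)
        return (pd.concat, [inner._task(i) for i in range(start, stop)])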
I suppose we will still generate the graph for self.operand("expr") when FusedIO.__dask_graph__ is called.
Yes in _task
@mrocklin and I chatted offline about this. The consensus was that we would like to remove the dependency on divisions in the future. This will probably open up ways to integrate this solution better with BlockwiseIO or similar, but that is hard to do until we get there. I'll merge this solution now; happy to iterate on what we have here as long as we keep the performance improvement. I might change the way this is applied a little bit tomorrow or on Friday, depending on how some of my explorations go. The current plan is to continue iterating to improve the performance of the system through new optimisations/new wrinkles that we can introduce.
I'm on board with this (as you know). Interested to know if you are looking to remove the dependency across the board in a general way, or if you just want to avoid division/npartitions queries unless absolutely necessary?
We end up with a lot of small chunks if we can drop lots of columns from the parquet files. This is especially bad for P2P-based merges, since small chunks slow us down there. Squashing partitions together solves that problem, so that we end up with the original chunk size.