-
@mbasmanova @aditi-pandit We can discuss the final solution in this discussion.
-
I would recommend using method 1 due to the differing logic between StreamingWindow and Window. Combining them into a single implementation would require a large if-else branch to distinguish between the two logics. By keeping StreamingWindow and Window as separate implementations, we can ensure clarity and separation of concerns. It allows for easier understanding of each operator's specific behavior and facilitates independent modifications or optimizations in the future. For the spill case, both StreamingWindow and Window operators require spill logic. It would be beneficial to consolidate the spill logic into a common code section if method 1 is chosen.
-
@JkSelf Thank you for opening an issue. First, let's clarify why Spark pre-sorts the inputs before executing the Window operator. My guess is that sorting can be done in a distributed manner, e.g. many nodes can partially sort the data, then a single node can merge-sort the results. In addition, I imagine that Spark can partition the data first and evaluate the Window operator on different sets of partitions on multiple nodes in parallel. Is this how Spark works? Let's make sure to add this information to the description of this issue to explain why we want Velox to support Window over pre-sorted inputs.
-
@mbasmanova When it comes to the Window operator in Spark, it requires access to data from multiple partitions in order to compute the window functions correctly. To achieve this, Spark employs a two-step process:
Once the data is partitioned and sorted, Spark can evaluate the Window operator in parallel on different sets of partitions across the cluster. This allows for efficient and scalable computation of window functions, as each node can independently process its assigned partitions and then combine the results as needed.
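To make the partition-then-sort flow concrete, here is a minimal sketch in plain Python rather than Spark. The helper names (`shuffle_by_key`, `row_number_over`), the column names, and the sample rows are assumptions made up for illustration; none of them are Spark or Velox APIs.

```python
from collections import defaultdict

def shuffle_by_key(rows, key, num_tasks):
    """Route rows so that all rows sharing a partition key land on the same task."""
    tasks = defaultdict(list)
    for row in rows:
        tasks[hash(row[key]) % num_tasks].append(row)
    return tasks

def row_number_over(task_rows, partition_key, order_key):
    """Sort one task's rows by (partition key, order key), then number rows per key."""
    ordered = sorted(task_rows, key=lambda r: (r[partition_key], r[order_key]))
    out, current, n = [], object(), 0  # object() is a sentinel that matches no key
    for row in ordered:
        if row[partition_key] != current:
            current, n = row[partition_key], 0
        n += 1
        out.append({**row, "row_number": n})
    return out

rows = [
    {"dept": "a", "salary": 30},
    {"dept": "b", "salary": 10},
    {"dept": "a", "salary": 20},
    {"dept": "b", "salary": 40},
]
tasks = shuffle_by_key(rows, "dept", num_tasks=2)
# Each task is independent of the others, so this loop could run on separate nodes.
result = [r for t in tasks.values() for r in row_number_over(t, "dept", "salary")]
```

Because every row of a given `dept` ends up in the same task, each task can compute its window function without looking at any other task's data.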
-
@mbasmanova
-
@JkSelf Thank you for clarifying. Would you share some queries and excerpts of Spark query plans that show the partitioning, partial sorting, final sorting and window plan nodes? I'd like to double check my understanding to make sure it is complete.
-
Looking at Presto WindowNode, I see that it supports specifying whether inputs are partially partitioned (prePartitionedInputs) and sorted (preSortedOrderPrefix).
This seems to be a more general case of what you have in Spark. I suggest we modify WindowNode in Velox in a similar manner to allow support for both Presto and Spark. This would also be similar to how we define streaming aggregations by specifying that input is clustered on a subset of grouping keys.
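As a rough illustration of how such a plan node could drive the choice between sorting and streaming, here is a sketch in Python. `WindowNodeSpec` and its methods are hypothetical; only the two field names echo Presto's `prePartitionedInputs` and `preSortedOrderPrefix`. The rule that a sorted prefix counts only when the input is clustered on all partition keys is an assumption of this sketch.

```python
from dataclasses import dataclass, field

@dataclass
class WindowNodeSpec:
    """Hypothetical plan-node description, not the Velox or Presto class."""
    partition_keys: list
    sorting_keys: list
    pre_partitioned_inputs: set = field(default_factory=set)
    pre_sorted_order_prefix: int = 0

    def needs_partitioning(self):
        # Input must still be grouped if some partition key is not already clustered.
        return not set(self.partition_keys) <= self.pre_partitioned_inputs

    def remaining_sorting_keys(self):
        # Assumption: the sorted prefix is usable only for fully clustered input.
        if self.needs_partitioning():
            return list(self.sorting_keys)
        return self.sorting_keys[self.pre_sorted_order_prefix:]

    def can_stream(self):
        # Streaming is possible when nothing is left to group or sort.
        return not self.needs_partitioning() and not self.remaining_sorting_keys()

# Spark-like case: input arrives clustered and fully sorted.
spark_like = WindowNodeSpec(["dept"], ["salary"], {"dept"}, 1)
# More general case: input is clustered on only one of two partition keys.
partial = WindowNodeSpec(["dept", "region"], ["salary"], {"dept"}, 0)
```

Here `spark_like.can_stream()` is true, while `partial` still needs both grouping and sorting, which is the more general situation the Presto fields can describe.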
-
I don't have a strong opinion on whether streaming window should be a separate operator or integrated into the existing Window operator. For aggregations, we have a separate operator for the case when input is clustered on all grouping keys, while the HashAggregation operator has support for partially grouped inputs. If you decide to implement a separate StreamingWindow operator, do make sure not to make it inherit from the Window operator class. Instead, extract shared logic into a separate class and use it directly. It might be helpful to experiment with extending the existing Window operator to add support for the streaming case. For example, it is not clear to me that the following statement is accurate:
-
If we want to separate StreamingWindow, there is another design I have in mind. We could split the Window operator into two separate operators (let's call them WindowBuild and WindowOutput). WindowBuild is where the current Window and StreamingWindow logic differs. For Window, it accumulates all input rows and starts pushing rows out in partition + sort order after all input is received. There might not be an active "pushing rows out", but rather just sharing the RowContainer. For StreamingWindow, it identifies the rows of each partition and batches them one partition at a time for the WindowOutput operator. WindowOutput does the job of invoking the WindowFunction and sends rows downstream. This is common to both implementations. This separation could also just be a logical class/code separation within the Window operator and not a separate physical operator in the plan per se. @mbasmanova, @JkSelf: What do you think of this proposal?
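The split proposed above could look roughly like the following Python sketch: two interchangeable build stages feed one shared output stage. All function names here are hypothetical stand-ins for the WindowBuild and WindowOutput roles, and plain lists of dicts stand in for Velox's RowContainer; this is not how the Velox operators are actually written.

```python
from itertools import groupby

def sorting_window_build(rows, partition_key, order_key):
    """Regular Window: buffer all input, sort it, then hand out partitions one by one."""
    buffered = sorted(rows, key=lambda r: (r[partition_key], r[order_key]))
    for _, group in groupby(buffered, key=lambda r: r[partition_key]):
        yield list(group)

def streaming_window_build(rows, partition_key):
    """StreamingWindow: input is pre-sorted, so only the current partition is buffered."""
    partition = []
    for row in rows:
        if partition and row[partition_key] != partition[-1][partition_key]:
            yield partition
            partition = []
        partition.append(row)
    if partition:
        yield partition

def window_output(partitions, window_function):
    """Shared stage: apply the window function per partition and emit rows downstream."""
    for partition in partitions:
        for row, value in zip(partition, window_function(partition)):
            yield {**row, "w": value}

def row_number(partition):
    return range(1, len(partition) + 1)

unsorted = [{"k": "b", "v": 2}, {"k": "a", "v": 3}, {"k": "a", "v": 1}]
presorted = sorted(unsorted, key=lambda r: (r["k"], r["v"]))
via_sort = list(window_output(sorting_window_build(unsorted, "k", "v"), row_number))
via_stream = list(window_output(streaming_window_build(iter(presorted), "k"), row_number))
```

Both pipelines produce the same rows; only the build stage differs, which is the point of keeping the output stage common.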
-
When it comes to the Window operator in Spark, it requires access to data from multiple partitions in order to compute the window functions correctly. To achieve this, Spark employs a two-step process:
Once the data is partitioned and sorted, Spark can evaluate the Window operator in parallel on different sets of partitions across the cluster. This allows for efficient and scalable computation of window functions, as each node can independently process its assigned partitions and then combine the results as needed.
Typically, there is no requirement to perform sorting within the Window operator. However, in Velox, the Window operator does sort the data. To reduce the memory footprint in stream processing scenarios, we can consider using the StreamingWindow operator instead of the regular Window operator. With StreamingWindow, we can define windows and apply window functions to the grouped data as soon as it becomes available, without the need for materializing and sorting the entire partition. This allows for more efficient and low-latency processing of streaming data while minimizing memory requirements.
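The memory argument can be illustrated with a small Python sketch. It assumes input that is already clustered by the partition key and computes a per-partition total while recording the largest number of rows held at once. The function name, the `stats` dictionary, and the sample data are made up for this example and are not part of Velox or Spark.

```python
def streaming_partition_sum(sorted_rows, partition_key, value_key, stats):
    """Emit each row with its partition's total; input must be clustered by partition_key."""
    buffered = []

    def flush():
        # Evaluate the window function over the one partition currently in memory.
        total = sum(r[value_key] for r in buffered)
        for r in buffered:
            yield {**r, "total": total}
        buffered.clear()

    for row in sorted_rows:
        if buffered and row[partition_key] != buffered[0][partition_key]:
            yield from flush()  # partition boundary: results can go out immediately
        buffered.append(row)
        stats["peak_buffered"] = max(stats["peak_buffered"], len(buffered))
    yield from flush()

rows = [
    {"k": "a", "v": 1}, {"k": "a", "v": 2},
    {"k": "b", "v": 5},
    {"k": "c", "v": 7}, {"k": "c", "v": 1}, {"k": "c", "v": 1},
]
stats = {"peak_buffered": 0}
out = list(streaming_partition_sum(iter(rows), "k", "v", stats))
```

In this run the peak is bounded by the largest partition (3 rows) rather than by the full input (6 rows), which is the saving the streaming approach is after.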
Here are the two approaches: