Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Dec 25, 2024
1 parent a0ac0b2 commit 6835a2e
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions src/stream/src/executor/exchange/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,16 @@ pub(crate) fn new_input(
}

impl DispatcherMessageBatch {
fn into_messages(self) -> Vec<DispatcherMessage> {
fn into_messages(self) -> impl Iterator<Item = DispatcherMessage> {
#[auto_enums::auto_enum(Iterator)]
match self {
DispatcherMessageBatch::BarrierBatch(barriers) => barriers
.into_iter()
.map(DispatcherMessage::Barrier)
.collect(),
DispatcherMessageBatch::Chunk(c) => vec![DispatcherMessage::Chunk(c)],
DispatcherMessageBatch::Watermark(w) => vec![DispatcherMessage::Watermark(w)],
DispatcherMessageBatch::BarrierBatch(barriers) => {
barriers.into_iter().map(DispatcherMessage::Barrier)
}
DispatcherMessageBatch::Chunk(c) => std::iter::once(DispatcherMessage::Chunk(c)),
DispatcherMessageBatch::Watermark(w) => {
std::iter::once(DispatcherMessage::Watermark(w))
}
}
}
}

0 comments on commit 6835a2e

Please sign in to comment.