Skip to content

Commit

Permalink
feat(sink): kafka async truncate log (#12587)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Oct 7, 2023
1 parent 01c7c2b commit 848b0a1
Show file tree
Hide file tree
Showing 5 changed files with 531 additions and 186 deletions.
10 changes: 10 additions & 0 deletions src/common/src/util/future_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::future::pending;

use futures::future::Either;
use futures::{Future, FutureExt, Stream};

/// Convert a list of streams into a [`Stream`] of results from the streams.
Expand All @@ -33,3 +34,12 @@ pub fn pending_on_none<I>(future: impl Future<Output = Option<I>>) -> impl Futur
}
})
}

pub fn drop_either_future<A, B>(
either: Either<(A, impl Future), (B, impl Future)>,
) -> Either<A, B> {
match either {
Either::Left((left, _)) => Either::Left(left),
Either::Right((right, _)) => Either::Right(right),
}
}
2 changes: 1 addition & 1 deletion src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub mod tracing;
pub mod value_encoding;
pub mod worker_util;

pub use future_utils::{pending_on_none, select_all};
pub use future_utils::{drop_either_future, pending_on_none, select_all};
#[macro_use]
pub mod match_util;

Expand Down
Loading

0 comments on commit 848b0a1

Please sign in to comment.