From cbede06a4bbbf376f596488bb0cc94f994713176 Mon Sep 17 00:00:00 2001 From: "Diego E. Alonso" Date: Mon, 17 Oct 2022 14:04:11 +0100 Subject: [PATCH] Pull - turn "unconsFlatMap" into an extension method. --- core/shared/src/main/scala/fs2/Pull.scala | 18 +++++++++--------- core/shared/src/main/scala/fs2/Stream.scala | 11 +++++------ 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index e4ecb8fa9e..9970ed119b 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -331,6 +331,14 @@ object Pull extends PullLowPriority { case _ => FlatMapOutput(self, f) } + private[fs2] def unconsFlatMap[F2[x] >: F[x], O2]( + f: Chunk[O] => Pull[F2, O2, Unit] + ): Pull[F2, O2, Unit] = + uncons(self).flatMap { + case None => Pull.done + case Some((hd, tl)) => f(hd) >> tl.unconsFlatMap(f) + } + } private[this] val unit: Terminal[Unit] = Succeeded(()) @@ -469,14 +477,6 @@ object Pull extends PullLowPriority { case Right(r) => Pull.pure(r) } - private[fs2] def unconsFlatMap[F[_], F2[x] >: F[x], O, O2](p: Pull[F, O, Unit])( - f: Chunk[O] => Pull[F2, O2, Unit] - ): Pull[F2, O2, Unit] = - uncons(p).flatMap { - case None => Pull.done - case Some((hd, tl)) => f(hd) >> unconsFlatMap[F, F2, O, O2](tl)(f) - } - private[fs2] def fail[F[_]](err: Throwable): Pull[F, Nothing, Nothing] = Fail(err) final class PartiallyAppliedFromEither[F[_]] { @@ -1325,7 +1325,7 @@ object Pull extends PullLowPriority { s: Stream[F, O], f: O => P ): Pull[F, P, Unit] = - unconsFlatMap[F, F, O, P](s.pull.echo)(hd => Pull.output(hd.map(f))) + s.pull.echo.unconsFlatMap(hd => Pull.output(hd.map(f))) private[this] def transformWith[F[_], O, R, S](p: Pull[F, O, R])( f: Terminal[R] => Pull[F, O, S] diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7ed245b99b..a46d54bda6 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -35,6 +35,7 @@ import cats.syntax.all._ import fs2.compat._ import fs2.concurrent._ import fs2.internal._ +import Pull.StreamPullOps /** A stream producing output of type `O` and which may evaluate `F` effects. * @@ -401,7 +402,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * }}} */ def chunks: Stream[F, Chunk[O]] = - Pull.unconsFlatMap[F, F, O, Chunk[O]](underlying)(Pull.output1).stream + underlying.unconsFlatMap(Pull.output1).stream /** Outputs chunk with a limited maximum size, splitting as necessary. * @@ -418,7 +419,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Pull.output1(pre) >> breakup(rest) } - Pull.unconsFlatMap[F, F, O, Chunk[O]](underlying)(breakup).stream + underlying.unconsFlatMap(breakup).stream } /** Outputs chunks of size larger than N @@ -804,7 +805,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * }}} */ def drain: Stream[F, Nothing] = - Pull.unconsFlatMap[F, F, O, Nothing](underlying)(_ => Pull.done).stream + underlying.unconsFlatMap(_ => Pull.done).stream /** Drops `n` elements of the input, then echoes the rest. * @@ -962,8 +963,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, queue: Queue[F2, Option[Chunk[O2]]] ): Stream[F2, Nothing] = enqueueNoneTerminatedChunks(queue: QueueSink[F2, Option[Chunk[O2]]]) - import Pull.StreamPullOps - /** Alias for `flatMap(o => Stream.eval(f(o)))`. * * @example {{{ @@ -1818,7 +1817,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * }}} */ def mapChunks[O2](f: Chunk[O] => Chunk[O2]): Stream[F, O2] = - Pull.unconsFlatMap[F, F, O, O2](underlying)((hd: Chunk[O]) => Pull.output(f(hd))).stream + underlying.unconsFlatMap((hd: Chunk[O]) => Pull.output(f(hd))).stream /** Behaves like the identity function but halts the stream on an error and does not return the error. *