diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 16f86cbde3..ed7c5f4df8 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -321,6 +321,16 @@ object Pull extends PullLowPriority { * Use `p.void.stream` to explicitly ignore the result of a pull. */ def streamNoScope: Stream[F, O] = new Stream(self) + + private[fs2] def flatMapOutput[F2[x] >: F[x], O2]( + f: O => Pull[F2, O2, Unit] + ): Pull[F2, O2, Unit] = + self match { + case a: AlgEffect[F, Unit] => a + case r: Terminal[_] => r + case _ => FlatMapOutput(self, f) + } + } private[this] val unit: Terminal[Unit] = Succeeded(()) @@ -1029,7 +1039,7 @@ object Pull extends PullLowPriority { else { def go(idx: Int): Pull[G, X, Unit] = if (idx == chunk.size) - flatMapOutput[G, G, Y, X](tail, fun) + tail.flatMapOutput(fun) else { try { var j = idx @@ -1046,8 +1056,7 @@ object Pull extends PullLowPriority { case Succeeded(_) => go(j + 1) case Fail(err) => Fail(err) case interruption @ Interrupted(_, _) => - val ib = interruptBoundary(tail, interruption) - flatMapOutput[G, G, Y, X](ib, fun) + interruptBoundary(tail, interruption).flatMapOutput(fun) } } catch { case NonFatal(e) => Fail(e) } } @@ -1288,16 +1297,6 @@ object Pull extends PullLowPriority { go(initScope, None, initFk, new OuterRun(init), stream) } - private[fs2] def flatMapOutput[F[_], F2[x] >: F[x], O, O2]( - p: Pull[F, O, Unit], - f: O => Pull[F2, O2, Unit] - ): Pull[F2, O2, Unit] = - p match { - case a: AlgEffect[F, Unit] => a - case r: Terminal[_] => r - case _ => FlatMapOutput(p, f) - } - private[fs2] def translate[F[_], G[_], O]( stream: Pull[F, O, Unit], fK: F ~> G diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 57609af1ce..d4a7eb6994 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -962,6 +962,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, queue: Queue[F2, Option[Chunk[O2]]] ): Stream[F2, Nothing] = enqueueNoneTerminatedChunks(queue: QueueSink[F2, Option[Chunk[O2]]]) + import Pull.StreamPullOps + /** Alias for `flatMap(o => Stream.eval(f(o)))`. * * @example {{{ @@ -975,8 +977,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * is available, however, with caveats. */ def evalMap[F2[x] >: F[x], O2](f: O => F2[O2]): Stream[F2, O2] = { - def evalOut(o: O): Pull[F2, O2, Unit] = Pull.eval(f(o)).flatMap(Pull.output1) - Pull.flatMapOutput[F, F2, O, O2](underlying, evalOut).streamNoScope + def evalOut(o: O) = Pull.eval(f(o)).flatMap(Pull.output1) + underlying.flatMapOutput(evalOut).streamNoScope } /** Like `evalMap`, but operates on chunks for performance. This means this operator @@ -1194,7 +1196,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def flatMap[F2[x] >: F[x], O2]( f: O => Stream[F2, O2] )(implicit ev: NotGiven[O <:< Nothing]): Stream[F2, O2] = - new Stream(Pull.flatMapOutput[F, F2, O, O2](underlying, (o: O) => f(o).underlying)) + underlying.flatMapOutput((o: O) => f(o).underlying).streamNoScope /** Alias for `flatMap(_ => s2)`. */ def >>[F2[x] >: F[x], O2]( @@ -1286,8 +1288,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * }}} */ def foreach[F2[x] >: F[x]](f: O => F2[Unit]): Stream[F2, Nothing] = { - def exec(o: O): Pull[F2, Nothing, Unit] = Pull.eval(f(o)) - Pull.flatMapOutput[F, F2, O, Nothing](underlying, exec).streamNoScope + underlying.flatMapOutput(o => Pull.eval(f(o))).streamNoScope } /** Partitions the input into a stream of chunks according to a discriminator function.