Skip to content

Commit

Permalink
Pull: turn flatMapOutput into an extension method.
Browse files Browse the repository at this point in the history
We may like method calls, even extension methods, better than
functions. Plus, we need fewer type arguments in the calls.
  • Loading branch information
diesalbla committed Oct 16, 2022
1 parent d38a2bd commit a26e2f1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
25 changes: 12 additions & 13 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ object Pull extends PullLowPriority {
* Use `p.void.stream` to explicitly ignore the result of a pull.
*/
def streamNoScope: Stream[F, O] = new Stream(self)

private[fs2] def flatMapOutput[F2[x] >: F[x], O2](
f: O => Pull[F2, O2, Unit]
): Pull[F2, O2, Unit] =
self match {
case a: AlgEffect[F, Unit] => a
case r: Terminal[_] => r
case _ => FlatMapOutput(self, f)
}

}

private[this] val unit: Terminal[Unit] = Succeeded(())
Expand Down Expand Up @@ -1029,7 +1039,7 @@ object Pull extends PullLowPriority {
else {
def go(idx: Int): Pull[G, X, Unit] =
if (idx == chunk.size)
flatMapOutput[G, G, Y, X](tail, fun)
tail.flatMapOutput(fun)
else {
try {
var j = idx
Expand All @@ -1046,8 +1056,7 @@ object Pull extends PullLowPriority {
case Succeeded(_) => go(j + 1)
case Fail(err) => Fail(err)
case interruption @ Interrupted(_, _) =>
val ib = interruptBoundary(tail, interruption)
flatMapOutput[G, G, Y, X](ib, fun)
interruptBoundary(tail, interruption).flatMapOutput(fun)
}
} catch { case NonFatal(e) => Fail(e) }
}
Expand Down Expand Up @@ -1288,16 +1297,6 @@ object Pull extends PullLowPriority {
go(initScope, None, initFk, new OuterRun(init), stream)
}

private[fs2] def flatMapOutput[F[_], F2[x] >: F[x], O, O2](
p: Pull[F, O, Unit],
f: O => Pull[F2, O2, Unit]
): Pull[F2, O2, Unit] =
p match {
case a: AlgEffect[F, Unit] => a
case r: Terminal[_] => r
case _ => FlatMapOutput(p, f)
}

private[fs2] def translate[F[_], G[_], O](
stream: Pull[F, O, Unit],
fK: F ~> G
Expand Down
11 changes: 6 additions & 5 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
queue: Queue[F2, Option[Chunk[O2]]]
): Stream[F2, Nothing] = enqueueNoneTerminatedChunks(queue: QueueSink[F2, Option[Chunk[O2]]])

import Pull.StreamPullOps

/** Alias for `flatMap(o => Stream.eval(f(o)))`.
*
* @example {{{
Expand All @@ -975,8 +977,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* is available, however, with caveats.
*/
def evalMap[F2[x] >: F[x], O2](f: O => F2[O2]): Stream[F2, O2] = {
def evalOut(o: O): Pull[F2, O2, Unit] = Pull.eval(f(o)).flatMap(Pull.output1)
Pull.flatMapOutput[F, F2, O, O2](underlying, evalOut).streamNoScope
def evalOut(o: O) = Pull.eval(f(o)).flatMap(Pull.output1)
underlying.flatMapOutput(evalOut).streamNoScope
}

/** Like `evalMap`, but operates on chunks for performance. This means this operator
Expand Down Expand Up @@ -1194,7 +1196,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
def flatMap[F2[x] >: F[x], O2](
f: O => Stream[F2, O2]
)(implicit ev: NotGiven[O <:< Nothing]): Stream[F2, O2] =
new Stream(Pull.flatMapOutput[F, F2, O, O2](underlying, (o: O) => f(o).underlying))
underlying.flatMapOutput((o: O) => f(o).underlying).streamNoScope

/** Alias for `flatMap(_ => s2)`. */
def >>[F2[x] >: F[x], O2](
Expand Down Expand Up @@ -1286,8 +1288,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* }}}
*/
def foreach[F2[x] >: F[x]](f: O => F2[Unit]): Stream[F2, Nothing] = {
def exec(o: O): Pull[F2, Nothing, Unit] = Pull.eval(f(o))
Pull.flatMapOutput[F, F2, O, Nothing](underlying, exec).streamNoScope
underlying.flatMapOutput(o => Pull.eval(f(o))).streamNoScope
}

/** Partitions the input into a stream of chunks according to a discriminator function.
Expand Down

0 comments on commit a26e2f1

Please sign in to comment.