Skip to content

Commit

Permalink
Pull - turn "unconsFlatMap" into an extension method.
Browse files Browse the repository at this point in the history
  • Loading branch information
diesalbla committed Oct 17, 2022
1 parent 1c78d00 commit cbede06
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
18 changes: 9 additions & 9 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,14 @@ object Pull extends PullLowPriority {
case _ => FlatMapOutput(self, f)
}

private[fs2] def unconsFlatMap[F2[x] >: F[x], O2](
f: Chunk[O] => Pull[F2, O2, Unit]
): Pull[F2, O2, Unit] =
uncons(self).flatMap {
case None => Pull.done
case Some((hd, tl)) => f(hd) >> tl.unconsFlatMap(f)
}

}

private[this] val unit: Terminal[Unit] = Succeeded(())
Expand Down Expand Up @@ -469,14 +477,6 @@ object Pull extends PullLowPriority {
case Right(r) => Pull.pure(r)
}

private[fs2] def unconsFlatMap[F[_], F2[x] >: F[x], O, O2](p: Pull[F, O, Unit])(
f: Chunk[O] => Pull[F2, O2, Unit]
): Pull[F2, O2, Unit] =
uncons(p).flatMap {
case None => Pull.done
case Some((hd, tl)) => f(hd) >> unconsFlatMap[F, F2, O, O2](tl)(f)
}

private[fs2] def fail[F[_]](err: Throwable): Pull[F, Nothing, Nothing] = Fail(err)

final class PartiallyAppliedFromEither[F[_]] {
Expand Down Expand Up @@ -1325,7 +1325,7 @@ object Pull extends PullLowPriority {
s: Stream[F, O],
f: O => P
): Pull[F, P, Unit] =
unconsFlatMap[F, F, O, P](s.pull.echo)(hd => Pull.output(hd.map(f)))
s.pull.echo.unconsFlatMap(hd => Pull.output(hd.map(f)))

private[this] def transformWith[F[_], O, R, S](p: Pull[F, O, R])(
f: Terminal[R] => Pull[F, O, S]
Expand Down
11 changes: 5 additions & 6 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import cats.syntax.all._
import fs2.compat._
import fs2.concurrent._
import fs2.internal._
import Pull.StreamPullOps

/** A stream producing output of type `O` and which may evaluate `F` effects.
*
Expand Down Expand Up @@ -401,7 +402,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* }}}
*/
def chunks: Stream[F, Chunk[O]] =
Pull.unconsFlatMap[F, F, O, Chunk[O]](underlying)(Pull.output1).stream
underlying.unconsFlatMap(Pull.output1).stream

/** Outputs chunk with a limited maximum size, splitting as necessary.
*
Expand All @@ -418,7 +419,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Pull.output1(pre) >> breakup(rest)
}

Pull.unconsFlatMap[F, F, O, Chunk[O]](underlying)(breakup).stream
underlying.unconsFlatMap(breakup).stream
}

/** Outputs chunks of size larger than N
Expand Down Expand Up @@ -804,7 +805,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* }}}
*/
def drain: Stream[F, Nothing] =
Pull.unconsFlatMap[F, F, O, Nothing](underlying)(_ => Pull.done).stream
underlying.unconsFlatMap(_ => Pull.done).stream

/** Drops `n` elements of the input, then echoes the rest.
*
Expand Down Expand Up @@ -962,8 +963,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
queue: Queue[F2, Option[Chunk[O2]]]
): Stream[F2, Nothing] = enqueueNoneTerminatedChunks(queue: QueueSink[F2, Option[Chunk[O2]]])

import Pull.StreamPullOps

/** Alias for `flatMap(o => Stream.eval(f(o)))`.
*
* @example {{{
Expand Down Expand Up @@ -1818,7 +1817,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* }}}
*/
def mapChunks[O2](f: Chunk[O] => Chunk[O2]): Stream[F, O2] =
Pull.unconsFlatMap[F, F, O, O2](underlying)((hd: Chunk[O]) => Pull.output(f(hd))).stream
underlying.unconsFlatMap((hd: Chunk[O]) => Pull.output(f(hd))).stream

/** Behaves like the identity function but halts the stream on an error and does not return the error.
*
Expand Down

0 comments on commit cbede06

Please sign in to comment.