Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to handle an error with fs2.Stream based endpoint #3940

Open
jenwirth opened this issue Jul 18, 2024 · 2 comments
Open

How to handle an error with fs2.Stream based endpoint #3940

jenwirth opened this issue Jul 18, 2024 · 2 comments

Comments

@jenwirth
Copy link

jenwirth commented Jul 18, 2024

Hi,

I am struggling with adding an error out to a stream based endpoint. I want to return a 404 when the Stream has a UnknownAvatar error (inside the getAvatar i am adapting an IOException to UnknownAvatar)

Here's the endpoint definition:

def avatar[F[_]] =
  endpoint.get
    .in("user" / path[UserId].name("user-id").example(UserId(UUID.fromString("596cf534-9af8-47f5-ab7d-69f70e5e76a0"))) / "avatar.png")
    .errorOut(
      oneOf[AvatarError](
        oneOfVariant(
          statusCode(StatusCode.NotFound)
            .and(jsonBody[UnknownAvatar.type].description("no avatar found for user"))
        )
      )
    )
    .out(streamBinaryBody(Fs2Streams[F])(new CodecFormat {
      override def mediaType: MediaType = MediaType.ImagePng
    }))

The endpoint implementation:

val avatarSEP = UserProfileEndpoint.avatar[F].serverLogic { id =>
  val value: fs2.Stream[F, Byte] = profileManager.getAvatar(id)

  value
    .onError { case t: Throwable =>
      fs2.Stream.eval(Logger[F].debug(s"error encountered (1) ${t.getClass.getName}"))
    }
    .pure[F]
    .onError { case t: Throwable => Logger[F].debug(s"error encountered  (2) ${t.getClass.getName}") }
    .attemptNarrow[AvatarError]
}

This does not work and a left value is still returned to the client. Resulting in a "Connection prematurely closed DURING response". The only debug message i see is the first one.

Thanks so much for any help!

@adamw
Copy link
Member

adamw commented Jul 19, 2024

I think the problem is that the error is only returned as part of a stream. So: you are always returning a fs2.Stream. This means that you always have to return the Right-hand side, as the result of the server logic. Tapir interprets it as a "successful response", and uses the successful output (here: a streaming one). Now, as part of generating the stream, there might be an error - which happens in your case - causing the stream to become broken.

However, it's already too late to change how the response is generated, as we've already chosen the successful output. Note that in theory, such a stream error might occur also when e.g. half of the avatar is already transmitted.

The proper solution would be to change the signature of the profileManager.getAvatar method. It should return e.g. a
F[fs2.Stream[F, Byte]], where the outer left would represent the effect of looking up the avatar. Then you could recover from that, and use the error output. The streaming output would only be used if the avatar has been found.

@adamw
Copy link
Member

adamw commented Jul 19, 2024

Here's a working example, using IO instead of F[_]. Note that the successful result of getAvatar is mapped to a Right, and the errors are recovered using recoverWith:

import cats.effect.*
import cats.syntax.all.*
import io.circe.generic.auto.*
import org.http4s.HttpRoutes
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.server.Router
import sttp.capabilities.fs2.Fs2Streams
import sttp.model.{MediaType, StatusCode}
import sttp.tapir.*
import sttp.tapir.generic.auto.*
import sttp.tapir.json.circe.jsonBody
import sttp.tapir.server.http4s.Http4sServerInterpreter

import scala.concurrent.ExecutionContext

object HelloWorldHttp4sServer extends IOApp:
  sealed class AvatarError extends Exception
  case object UnknownAvatar extends AvatarError

  def avatar[F[_]] =
    endpoint.get
      .in("user" / path[String] / "avatar.png")
      .errorOut(
        oneOf[AvatarError](
          oneOfVariant(
            statusCode(StatusCode.NotFound)
              .and(jsonBody[UnknownAvatar.type].description("no avatar found for user"))
          )
        )
      )
      .out(streamBinaryBody(Fs2Streams[F])(new CodecFormat {
        override def mediaType: MediaType = MediaType.ImagePng
      }))

  def getAvatar(id: String): IO[fs2.Stream[IO, Byte]] =
    if id == "ok" then IO.pure(fs2.Stream.fromIterator[IO]("picture".getBytes.iterator, 16)) else IO.raiseError(UnknownAvatar)

  def avatarSEP = avatar.serverLogic { id =>
    val value = getAvatar(id)

    value
      .map(stream => Right(stream))
      .recoverWith { case UnknownAvatar =>
        IO(println("error encountered")).map(_ => Left(UnknownAvatar))
      }
  }

  val helloWorldRoutes: HttpRoutes[IO] = Http4sServerInterpreter[IO]().toRoutes(avatarSEP)

  implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

  override def run(args: List[String]): IO[ExitCode] =
    // starting the server
    BlazeServerBuilder[IO]
      .withExecutionContext(ec)
      .bindHttp(8080, "localhost")
      .withHttpApp(Router("/" -> helloWorldRoutes).orNotFound)
      .resource
      .use { _ => IO.never }
      .as(ExitCode.Success)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants