Skip to content

Commit

Permalink
Reformat using scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Mar 14, 2023
1 parent af766cc commit e7ed772
Show file tree
Hide file tree
Showing 146 changed files with 922 additions and 1,064 deletions.
5 changes: 3 additions & 2 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
version = 3.7.2
maxColumn = 120
rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
runner.dialect = scala3
version = 3.7.2
maxColumn = 120
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class AkkaHttpBackend private (
if (r.isWebSocket) sendWebSocket(r) else sendRegular(r)
}

private def sendRegular[T](r: GenericRequest[T, R]): Future[Response[T]] = {
private def sendRegular[T](r: GenericRequest[T, R]): Future[Response[T]] =
Future
.fromTry(ToAkka.request(r).flatMap(BodyToAkka(r, r.body, _)))
.map(customizeRequest)
Expand All @@ -62,7 +62,6 @@ class AkkaHttpBackend private (
.flatMap(response => responseFromAkka(r, response, None).recoverWith(consumeResponseOnFailure(response)))
)
)
}

private def sendWebSocket[T](r: GenericRequest[T, R]): Future[Response[T]] = {
val akkaWebsocketRequest = ToAkka
Expand Down Expand Up @@ -121,9 +120,9 @@ class AkkaHttpBackend private (
private lazy val bodyFromAkka = new BodyFromAkka()(ec, implicitly[Materializer], monad)

private def responseFromAkka[T](
r: GenericRequest[T, R],
hr: HttpResponse,
wsFlow: Option[Promise[Flow[Message, Message, NotUsed]]]
r: GenericRequest[T, R],
hr: HttpResponse,
wsFlow: Option[Promise[Flow[Message, Message, NotUsed]]]
): Future[Response[T]] = {
val code = StatusCode(hr.status.intValue())
val statusText = hr.status.reason()
Expand All @@ -141,10 +140,9 @@ class AkkaHttpBackend private (
}

// http://doc.akka.io/docs/akka-http/10.0.7/scala/http/common/de-coding.html
private def decodeAkkaResponse(response: HttpResponse, disableAutoDecompression: Boolean): HttpResponse = {
private def decodeAkkaResponse(response: HttpResponse, disableAutoDecompression: Boolean): HttpResponse =
if (!response.status.allowsEntity() || disableAutoDecompression) response
else customEncodingHandler.orElse(EncodingHandler(standardEncoding)).apply(response -> response.encoding)
}

private def standardEncoding: (HttpResponse, HttpEncoding) => HttpResponse = {
case (body, HttpEncodings.gzip) => Coders.Gzip.decodeMessage(body)
Expand All @@ -156,15 +154,14 @@ class AkkaHttpBackend private (
private def adjustExceptions[T](request: GenericRequest[_, _])(t: => Future[T]): Future[T] =
SttpClientException.adjustExceptions(monad)(t)(FromAkka.exception(request, _))

override def close(): Future[Unit] = {
override def close(): Future[Unit] =
if (terminateActorSystemOnClose) {
CoordinatedShutdown(as).addTask(
CoordinatedShutdown.PhaseServiceRequestsDone,
"shut down all connection pools"
)(() => Http(as).shutdownAllConnectionPools.map(_ => Done))
actorSystem.terminate().map(_ => ())
} else Future.successful(())
}
}

object AkkaHttpBackend {
Expand Down Expand Up @@ -252,7 +249,7 @@ object AkkaHttpBackend {
customEncodingHandler: EncodingHandler = PartialFunction.empty
)(implicit
ec: Option[ExecutionContext] = None
): WebSocketStreamBackend[Future, AkkaStreams] = {
): WebSocketStreamBackend[Future, AkkaStreams] =
usingClient(
actorSystem,
options,
Expand All @@ -263,7 +260,6 @@ object AkkaHttpBackend {
customizeResponse,
customEncodingHandler
)
}

/** @param actorSystem
* The actor system which will be used for the http-client actors.
Expand All @@ -282,7 +278,7 @@ object AkkaHttpBackend {
customEncodingHandler: EncodingHandler = PartialFunction.empty
)(implicit
ec: Option[ExecutionContext] = None
): WebSocketStreamBackend[Future, AkkaStreams] = {
): WebSocketStreamBackend[Future, AkkaStreams] =
make(
actorSystem,
ec.getOrElse(actorSystem.dispatcher),
Expand All @@ -295,7 +291,6 @@ object AkkaHttpBackend {
customizeResponse,
customEncodingHandler
)
}

/** Create a stub backend for testing, which uses the [[Future]] response wrapper, and doesn't support streaming.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ object AkkaHttpClient {
override def singleRequest(
request: HttpRequest,
settings: ConnectionPoolSettings
): Future[HttpResponse] = {
): Future[HttpResponse] =
http.singleRequest(
request,
connectionContext.getOrElse(http.defaultClientHttpsContext),
settings,
customLog.getOrElse(system.log)
)
}

override def singleWebsocketRequest[WS_RESULT](
request: WebSocketRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import scala.util.Failure

private[akkahttp] class BodyFromAkka()(implicit ec: ExecutionContext, mat: Materializer, m: MonadError[Future]) {
def apply[T, R](
responseAs: ResponseAsDelegate[T, R],
meta: ResponseMetadata,
response: Either[HttpResponse, Promise[Flow[Message, Message, NotUsed]]]
responseAs: ResponseAsDelegate[T, R],
meta: ResponseMetadata,
response: Either[HttpResponse, Promise[Flow[Message, Message, NotUsed]]]
): Future[T] =
bodyFromResponseAs(responseAs, meta, response)

Expand All @@ -40,16 +40,14 @@ private[akkahttp] class BodyFromAkka()(implicit ec: ExecutionContext, mat: Mater
Future.successful(response.copy(entity = replayEntity))
}

override protected def regularIgnore(response: HttpResponse): Future[Unit] = {
override protected def regularIgnore(response: HttpResponse): Future[Unit] =
// todo: Replace with HttpResponse#discardEntityBytes() once https://github.com/akka/akka-http/issues/1459 is resolved
response.entity.dataBytes.runWith(Sink.ignore).map(_ => ())
}

override protected def regularAsByteArray(response: HttpResponse): Future[Array[Byte]] = {
override protected def regularAsByteArray(response: HttpResponse): Future[Array[Byte]] =
response.entity.dataBytes
.runFold(ByteString(""))(_ ++ _)
.map(_.toArray[Byte])
}

override protected def regularAsFile(response: HttpResponse, file: SttpFile): Future[SttpFile] = {
val f = file.toFile
Expand All @@ -63,15 +61,14 @@ private[akkahttp] class BodyFromAkka()(implicit ec: ExecutionContext, mat: Mater

override protected def regularAsStream(
response: HttpResponse
): Future[(Source[ByteString, Any], () => Future[Unit])] = {
): Future[(Source[ByteString, Any], () => Future[Unit])] =
Future.successful(
(
response.entity.dataBytes,
// ignoring exceptions that occur when discarding (i.e. double-materialisation exceptions)
() => response.discardEntityBytes().future().map(_ => ()).recover { case _ => () }
)
)
}

override protected def handleWS[T](
responseAs: GenericWebSocketResponseAs[T, _],
Expand All @@ -92,7 +89,7 @@ private[akkahttp] class BodyFromAkka()(implicit ec: ExecutionContext, mat: Mater
rr: GenericWebSocketResponseAs[T, R],
wsFlow: Promise[Flow[Message, Message, NotUsed]],
meta: ResponseMetadata
)(implicit ec: ExecutionContext, mat: Materializer): Future[T] = {
)(implicit ec: ExecutionContext, mat: Materializer): Future[T] =
rr match {
case ResponseAsWebSocket(f) =>
val (flow, wsFuture) = webSocketAndFlow(meta)
Expand Down Expand Up @@ -126,7 +123,6 @@ private[akkahttp] class BodyFromAkka()(implicit ec: ExecutionContext, mat: Mater

donePromise.future.map(_ => ())
}
}

private def webSocketAndFlow(meta: ResponseMetadata)(implicit
ec: ExecutionContext,
Expand Down Expand Up @@ -210,13 +206,12 @@ private[akkahttp] class BodyFromAkka()(implicit ec: ExecutionContext, mat: Mater
msg.dataStream.runFold(ByteString.empty)(_ ++ _).map(b => WebSocketFrame.binary(b.toArray))
}

private def frameToMessage(w: WebSocketFrame): Option[Message] = {
private def frameToMessage(w: WebSocketFrame): Option[Message] =
w match {
case WebSocketFrame.Text(p, _, _) => Some(TextMessage(p))
case WebSocketFrame.Binary(p, _, _) => Some(BinaryMessage(ByteString(p)))
case WebSocketFrame.Ping(_) => None
case WebSocketFrame.Pong(_) => None
case WebSocketFrame.Close(_, _) => throw WebSocketClosed(None)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import akka.http.scaladsl.model.{
HttpEntity,
HttpRequest,
MediaType,
RequestEntity,
Multipart => AkkaMultipart
Multipart => AkkaMultipart,
RequestEntity
}
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
Expand All @@ -21,9 +21,9 @@ import scala.util.{Failure, Success, Try}

private[akkahttp] object BodyToAkka {
def apply[R](
r: GenericRequest[_, R],
body: GenericRequestBody[R],
ar: HttpRequest
r: GenericRequest[_, R],
body: GenericRequestBody[R],
ar: HttpRequest
): Try[HttpRequest] = {
def ctWithCharset(ct: ContentType, charset: String) =
HttpCharsets
Expand Down Expand Up @@ -53,9 +53,7 @@ private[akkahttp] object BodyToAkka {
for {
ct <- Util.parseContentTypeOrOctetStream(mp.contentType)
headers <- ToAkka.headers(mp.headers.toList)
} yield {
AkkaMultipart.FormData.BodyPart(mp.name, entity(ct), mp.dispositionParams, headers)
}
} yield AkkaMultipart.FormData.BodyPart(mp.name, entity(ct), mp.dispositionParams, headers)
}

def streamEntity(contentType: ContentType, s: AkkaStreams.BinaryStream) =
Expand Down Expand Up @@ -83,21 +81,20 @@ private[akkahttp] object BodyToAkka {
}

private def multipartEntity(
r: GenericRequest[_, _],
bodyParts: Seq[AkkaMultipart.FormData.BodyPart]
): Try[RequestEntity] = {
r: GenericRequest[_, _],
bodyParts: Seq[AkkaMultipart.FormData.BodyPart]
): Try[RequestEntity] =
r.headers.find(Util.isContentType) match {
case None => Success(AkkaMultipart.FormData(bodyParts: _*).toEntity())
case Some(ct) =>
Util.parseContentType(ct.value).map(_.mediaType).flatMap {
case m: MediaType.Multipart =>
Success(
AkkaMultipart
.General(m, Source(bodyParts.map { bp => AkkaMultipart.General.BodyPart(bp.entity, bp.headers) }))
.General(m, Source(bodyParts.map(bp => AkkaMultipart.General.BodyPart(bp.entity, bp.headers))))
.toEntity()
)
case _ => Failure(new RuntimeException(s"Non-multipart content type: $ct"))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,22 @@ private[akkahttp] object Util {
else Failure[Seq[T]](fs.head.exception)
}

def parseContentTypeOrOctetStream(r: GenericRequest[_, _]): Try[ContentType] = {
def parseContentTypeOrOctetStream(r: GenericRequest[_, _]): Try[ContentType] =
parseContentTypeOrOctetStream(
r.headers
.find(isContentType)
.map(_.value)
)
}

def parseContentTypeOrOctetStream(ctHeader: Option[String]): Try[ContentType] = {
def parseContentTypeOrOctetStream(ctHeader: Option[String]): Try[ContentType] =
ctHeader
.map(parseContentType)
.getOrElse(Success(`application/octet-stream`))
}

def parseContentType(ctHeader: String): Try[ContentType] = {
def parseContentType(ctHeader: String): Try[ContentType] =
ContentType
.parse(ctHeader)
.fold(errors => Failure(new RuntimeException(s"Cannot parse content type: $errors")), Success(_))
}

def isContentType(header: Header): Boolean =
header.name.toLowerCase.contains(`Content-Type`.lowercaseName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import sttp.client4.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client4.impl.cats.CatsMonadAsyncError
import sttp.client4.internal.NoStreams
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.client4.{Backend, BackendOptions, wrappers}
import sttp.client4.{wrappers, Backend, BackendOptions}
import sttp.monad.MonadAsyncError

private final class ArmeriaCatsBackend[F[_]: Concurrent](client: WebClient, closeFactory: Boolean)
Expand Down Expand Up @@ -44,13 +44,11 @@ object ArmeriaCatsBackend {

def resource[F[_]: Concurrent](
options: BackendOptions = BackendOptions.Default
): Resource[F, Backend[F]] = {
): Resource[F, Backend[F]] =
Resource.make(Sync[F].delay(apply(newClient(options), closeFactory = true)))(_.close())
}

def resourceUsingClient[F[_]: Concurrent](client: WebClient): Resource[F, Backend[F]] = {
def resourceUsingClient[F[_]: Concurrent](client: WebClient): Resource[F, Backend[F]] =
Resource.make(Sync[F].delay(apply(client, closeFactory = true)))(_.close())
}

def usingDefaultClient[F[_]: Concurrent](): Backend[F] =
apply(newClient(), closeFactory = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import sttp.client4.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client4.impl.cats.CatsMonadAsyncError
import sttp.client4.internal.NoStreams
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.client4.{Backend, BackendOptions, wrappers}
import sttp.client4.{wrappers, Backend, BackendOptions}
import sttp.monad.MonadAsyncError

private final class ArmeriaCatsBackend[F[_]: Async](client: WebClient, closeFactory: Boolean)
Expand Down Expand Up @@ -44,13 +44,11 @@ object ArmeriaCatsBackend {

def resource[F[_]: Async](
options: BackendOptions = BackendOptions.Default
): Resource[F, Backend[F]] = {
): Resource[F, Backend[F]] =
Resource.make(Sync[F].delay(apply(newClient(options), closeFactory = true)))(_.close())
}

def resourceUsingClient[F[_]: Async](client: WebClient): Resource[F, Backend[F]] = {
def resourceUsingClient[F[_]: Async](client: WebClient): Resource[F, Backend[F]] =
Resource.make(Sync[F].delay(apply(client, closeFactory = true)))(_.close())
}

def usingDefaultClient[F[_]: Async](): Backend[F] =
apply(newClient(), closeFactory = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import sttp.client4.armeria.ArmeriaWebClient.newClient
import sttp.client4.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client4.impl.cats.CatsMonadAsyncError
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.client4.{BackendOptions, StreamBackend, wrappers}
import sttp.client4.{wrappers, BackendOptions, StreamBackend}
import sttp.monad.MonadAsyncError

private final class ArmeriaFs2Backend[F[_]: ConcurrentEffect](client: WebClient, closeFactory: Boolean)
Expand All @@ -32,12 +32,10 @@ private final class ArmeriaFs2Backend[F[_]: ConcurrentEffect](client: WebClient,
}

override protected def streamToPublisher(stream: Stream[F, Byte]): Publisher[HttpData] =
stream.chunks
.map(chunk => {
val bytes = chunk.toBytes
HttpData.wrap(bytes.values, bytes.offset, bytes.length)
})
.toUnicastPublisher
stream.chunks.map { chunk =>
val bytes = chunk.toBytes
HttpData.wrap(bytes.values, bytes.offset, bytes.length)
}.toUnicastPublisher
}

object ArmeriaFs2Backend {
Expand All @@ -53,13 +51,11 @@ object ArmeriaFs2Backend {

def resource[F[_]: ConcurrentEffect](
options: BackendOptions = BackendOptions.Default
): Resource[F, StreamBackend[F, Fs2Streams[F]]] = {
): Resource[F, StreamBackend[F, Fs2Streams[F]]] =
Resource.make(Sync[F].delay(apply(newClient(options), closeFactory = true)))(_.close())
}

def resourceUsingClient[F[_]: ConcurrentEffect](client: WebClient): Resource[F, StreamBackend[F, Fs2Streams[F]]] = {
def resourceUsingClient[F[_]: ConcurrentEffect](client: WebClient): Resource[F, StreamBackend[F, Fs2Streams[F]]] =
Resource.make(Sync[F].delay(apply(client, closeFactory = true)))(_.close())
}

def usingClient[F[_]: ConcurrentEffect](client: WebClient): StreamBackend[F, Fs2Streams[F]] =
apply(client, closeFactory = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import sttp.client4.armeria.ArmeriaWebClient.newClient
import sttp.client4.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client4.impl.cats.CatsMonadAsyncError
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.client4.{BackendOptions, StreamBackend, wrappers}
import sttp.client4.{wrappers, BackendOptions, StreamBackend}
import sttp.monad.MonadAsyncError

private final class ArmeriaFs2Backend[F[_]: Async](client: WebClient, closeFactory: Boolean, dispatcher: Dispatcher[F])
Expand Down
Loading

0 comments on commit e7ed772

Please sign in to comment.