diff --git a/lib/stove-testing-e2e-http/src/main/kotlin/com/trendyol/stove/testing/e2e/http/streaming.kt b/lib/stove-testing-e2e-http/src/main/kotlin/com/trendyol/stove/testing/e2e/http/streaming.kt index aef0749f..d8ce593f 100644 --- a/lib/stove-testing-e2e-http/src/main/kotlin/com/trendyol/stove/testing/e2e/http/streaming.kt +++ b/lib/stove-testing-e2e-http/src/main/kotlin/com/trendyol/stove/testing/e2e/http/streaming.kt @@ -1,5 +1,6 @@ package com.trendyol.stove.testing.e2e.http +import arrow.core.toOption import com.trendyol.stove.testing.e2e.serialization.StoveSerde import io.ktor.client.statement.* import io.ktor.http.* @@ -13,10 +14,7 @@ fun HttpStatement.readJsonTextStream(transform: suspend (line: String) -> T) execute { check(it.status.isSuccess()) { "Request failed with status: ${it.status}" } while (!it.content.isClosedForRead) { - it.content.readUTF8Line()?.let { line -> - if (line.isBlank()) return@let - emit(transform(line)) - } + it.content.readUTF8LineNonEmpty { line -> emit(transform(line)) } } } } @@ -27,14 +25,15 @@ fun HttpStatement.readJsonContentStream(transform: suspend (line: ByteReadCh execute { check(it.status.isSuccess()) { "Request failed with status: ${it.status}" } while (!it.content.isClosedForRead) { - it.content.readUTF8Line()?.let { line -> - if (line.isBlank()) return@let - emit(transform(ByteReadChannel(line.toByteArray()))) - } + it.content.readUTF8LineNonEmpty { line -> emit(transform(ByteReadChannel(line.toByteArray()))) } } } } +private suspend fun ByteReadChannel.readUTF8LineNonEmpty(onRead: suspend (String) -> Unit) { + readUTF8Line().toOption().filter { it.isNotBlank() }.map { onRead(it) } +} + /** * Serializes the items to a stream of JSON strings. */