Skip to content

Commit

Permalink
spring-kafka: ser/de (#658)
Browse files Browse the repository at this point in the history
* Improvements for spring-kafka to work with StoveSerde interface
* add protobuf tests for spring-kafka to make sure that ser/de abstraction works when needed
  • Loading branch information
osoykan authored Nov 29, 2024
1 parent 36b64d1 commit 60e8b52
Show file tree
Hide file tree
Showing 18 changed files with 914 additions and 358 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.stove.spring.example.e2e

import com.trendyol.stove.testing.e2e.BaseApplicationContextInitializer
import com.trendyol.stove.testing.e2e.kafka.TestSystemKafkaInterceptor
import com.trendyol.stove.testing.e2e.serialization.*
import org.springframework.boot.SpringApplication

fun SpringApplication.addTestSystemDependencies() {
Expand All @@ -10,4 +11,5 @@ fun SpringApplication.addTestSystemDependencies() {

class TestSystemInitializer : BaseApplicationContextInitializer({
bean<TestSystemKafkaInterceptor<*, *>>(isPrimary = true)
bean { StoveSerde.jackson.anyByteArraySerde() }
})
2 changes: 1 addition & 1 deletion examples/spring-streams-example/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies {
implementation(libs.kafka.streams)
implementation(libs.kotlin.reflect)
implementation(libs.google.protobuf.kotlin)
implementation(libs.kafka.streams.registry)
implementation(libs.kafka.streams.protobuf.serde)
}

dependencies {
Expand Down
6 changes: 4 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mockito = "5.4.0"
quiver = "0.5.12"
akkurate = "0.10.0"
exposed = "0.56.0"
kotlinx-serialization = "1.7.3"

[libraries]
kotlin-stdlib-jdk8 = { module = "org.jetbrains.kotlin:kotlin-stdlib-jdk8", version.ref = "kotlin" }
Expand All @@ -61,7 +62,8 @@ kotlinx-slf4j = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-slf4j", ver
kotlinx-knit = { module = "org.jetbrains.kotlinx:kotlinx-knit", version.ref = "knit" }
kotlinx-io-reactor = { module = "io.projectreactor:reactor-core", version.ref = "io-reactor" }
kotlinx-io-reactor-extensions = { module = "io.projectreactor.kotlin:reactor-kotlin-extensions", version.ref = "io-reactor-extensions" }
kotlinx-serialization = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json-jvm", version = "1.7.3" }
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json-jvm", version.ref = "kotlinx-serialization" }
kotlinx-serialization-protobuf = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinx-serialization" }

# Arrow
arrow-core = { module = "io.arrow-kt:arrow-core", version.ref = "arrow" }
Expand All @@ -86,7 +88,7 @@ akkurate-ksp-plugin = { module = "dev.nesk.akkurate:akkurate-ksp-plugin", versio
kafka = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" }
kafkaKotlin = { module = "io.github.nomisrev:kotlin-kafka", version.ref = "kafka-kotlin" }
kafka-streams = { module = "org.apache.kafka:kafka-streams", version.ref = "kafka" }
kafka-streams-registry = { module = "io.confluent:kafka-streams-protobuf-serde", version.ref = "kafka-streams-registry" }
kafka-streams-protobuf-serde = { module = "io.confluent:kafka-streams-protobuf-serde", version.ref = "kafka-streams-registry" }

# Couchbase
couchbase-kotlin = { module = "com.couchbase.client:kotlin-client", version.ref = "couchbase-kotlin" }
Expand Down
2 changes: 1 addition & 1 deletion lib/stove-testing-e2e/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dependencies {
api(libs.jackson.kotlin)
api(libs.jackson.arrow)
api(libs.google.gson)
api(libs.kotlinx.serialization)
api(libs.kotlinx.serialization.json)
api(libs.testcontainers) {
version {
require(libs.testcontainers.asProvider().get().version!!)
Expand Down
26 changes: 25 additions & 1 deletion starters/spring/stove-spring-testing-e2e-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import com.google.protobuf.gradle.id

plugins {
alias(libs.plugins.protobuf)
}

dependencies {
api(projects.lib.stoveTestingE2e)
api(libs.testcontainers.kafka)
Expand All @@ -10,5 +16,23 @@ dependencies {
testAnnotationProcessor(libs.spring.boot.annotationProcessor)
testImplementation(libs.spring.boot.autoconfigure)
testImplementation(projects.starters.spring.stoveSpringTestingE2e)
testImplementation(libs.slf4j.simple)
testImplementation(libs.logback.classic)
testImplementation(libs.google.protobuf.kotlin)
testImplementation(libs.kafka.streams.protobuf.serde)
}

protobuf {
protoc {
artifact = libs.protoc.get().toString()
}

generateProtoTasks {
all().forEach {
it.descriptorSetOptions.includeSourceInfo = true
it.descriptorSetOptions.includeImports = true
it.builtins { id("kotlin") }
}
}
}


Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.trendyol.stove.testing.e2e.kafka

import arrow.core.Option
import com.fasterxml.jackson.databind.ObjectMapper
import com.trendyol.stove.testing.e2e.messaging.MessageMetadata
import com.trendyol.stove.testing.e2e.serialization.StoveSerde
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord

Expand All @@ -19,33 +19,36 @@ internal fun <K, V> ConsumerRecord<K, V>.toMetadata(): MessageMetadata = Message
)

internal fun <K, V> ConsumerRecord<K, V>.toStoveMessage(
objectMapper: ObjectMapper
): StoveMessage.StoveConsumedMessage =
StoveMessage.StoveConsumedMessage(
this.topic(),
serializeIfNotString(this.value(), objectMapper),
this.toMetadata(),
this.offset(),
this.partition(),
this.key().toString(),
this.timestamp()
)
serde: StoveSerde<Any, ByteArray>
): StoveMessage.Consumed = StoveMessage.consumed(
this.topic(),
serializeIfNotYet(this.value(), serde),
this.toMetadata(),
this.partition(),
this.key()?.toString() ?: "",
this.timestamp(),
this.offset()
)

internal fun <K, V> ProducerRecord<K, V>.toStoveMessage(
objectMapper: ObjectMapper
): StoveMessage.StovePublishedMessage = StoveMessage.StovePublishedMessage(
serde: StoveSerde<Any, ByteArray>
): StoveMessage.Published = StoveMessage.published(
this.topic(),
serializeIfNotString(this.value(), objectMapper),
serializeIfNotYet(this.value(), serde),
this.toMetadata(),
this.partition(),
this.key().toString(),
this.key()?.toString() ?: "",
this.timestamp()
)

private fun <V> serializeIfNotString(
private fun <V> serializeIfNotYet(
value: V,
objectMapper: ObjectMapper
): String = if (value is String) value else objectMapper.writeValueAsString(value)
serde: StoveSerde<Any, ByteArray>
): ByteArray = when (value) {
is ByteArray -> value
is String -> value.toByteArray()
else -> serde.serialize(value as Any)
}

internal fun (MutableMap<String, String>).addTestCase(testCase: Option<String>): MutableMap<String, String> =
if (this.containsKey("testCase")) this else testCase.map { this["testCase"] = it }.let { this }
Loading

0 comments on commit 60e8b52

Please sign in to comment.