From 8df7342ce2a6a5d853a6e64c2948de2fdc221604 Mon Sep 17 00:00:00 2001 From: Oguzhan Soykan Date: Tue, 3 Dec 2024 13:23:09 +0100 Subject: [PATCH] chore(recipes): bump Stove to 1.0.0.452-SNAPSHOT --- recipes/gradle/libs.versions.toml | 2 +- .../kafka/KafkaBeanConfiguration.java | 13 ++++++++++-- .../kafka/KafkaDomainEventPublisher.java | 20 +++---------------- .../example/java/spring/e2e/setup/Stove.kt | 13 ++++++++---- .../kafka/KafkaDomainEventPublisher.kt | 6 ++---- .../ktor/infra/boilerplate/kafka/kafka.kt | 2 +- .../examples/kotlin/ktor/e2e/setup/Stove.kt | 12 ++++++----- .../kotlin/spring/e2e/tests/StreamingTests.kt | 6 +++--- 8 files changed, 37 insertions(+), 37 deletions(-) diff --git a/recipes/gradle/libs.versions.toml b/recipes/gradle/libs.versions.toml index 52250583..c228c03a 100644 --- a/recipes/gradle/libs.versions.toml +++ b/recipes/gradle/libs.versions.toml @@ -70,7 +70,7 @@ hoplite = "2.9.0" kediatr = "3.1.2" # Testing -stove = "1.0.0.450-SNAPSHOT" +stove = "1.0.0.452-SNAPSHOT" junit = "5.11.3" kotest = "5.9.1" diff --git a/recipes/java-recipes/spring-boot-recipe/src/main/java/com/trendyol/stove/examples/java/spring/infra/boilerplate/kafka/KafkaBeanConfiguration.java b/recipes/java-recipes/spring-boot-recipe/src/main/java/com/trendyol/stove/examples/java/spring/infra/boilerplate/kafka/KafkaBeanConfiguration.java index b12cc70e..fecf759d 100644 --- a/recipes/java-recipes/spring-boot-recipe/src/main/java/com/trendyol/stove/examples/java/spring/infra/boilerplate/kafka/KafkaBeanConfiguration.java +++ b/recipes/java-recipes/spring-boot-recipe/src/main/java/com/trendyol/stove/examples/java/spring/infra/boilerplate/kafka/KafkaBeanConfiguration.java @@ -15,6 +15,9 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.backoff.FixedBackOff; @Configuration @@ -51,10 +54,16 @@ public Properties consumerProperties(KafkaConfiguration kafkaConfiguration) { ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, kafkaConfiguration.isAutoCreateTopics()); properties.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfiguration.getAutoOffsetReset()); + properties.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName()); + properties.put( + ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName()); + properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + properties.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Object.class.getName()); + properties.put( ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, kafkaConfiguration.flattenInterceptorClasses()); logger.info("Kafka consumer properties: {}", properties); @@ -67,7 +76,7 @@ public Properties producerProperties(KafkaConfiguration kafkaConfiguration) { properties.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); properties.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, kafkaConfiguration.flattenInterceptorClasses()); return properties; diff --git a/recipes/java-recipes/spring-boot-recipe/src/main/java/com/trendyol/stove/examples/java/spring/infra/boilerplate/kafka/KafkaDomainEventPublisher.java b/recipes/java-recipes/spring-boot-recipe/src/main/java/com/trendyol/stove/examples/java/spring/infra/boilerplate/kafka/KafkaDomainEventPublisher.java index c1737c5f..838bad2e 100644 --- a/recipes/java-recipes/spring-boot-recipe/src/main/java/com/trendyol/stove/examples/java/spring/infra/boilerplate/kafka/KafkaDomainEventPublisher.java +++ b/recipes/java-recipes/spring-boot-recipe/src/main/java/com/trendyol/stove/examples/java/spring/infra/boilerplate/kafka/KafkaDomainEventPublisher.java @@ -1,13 +1,10 @@ package com.trendyol.stove.examples.java.spring.infra.boilerplate.kafka; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trendyol.stove.examples.domain.ddd.AggregateRoot; import com.trendyol.stove.examples.domain.ddd.EventPublisher; import java.util.stream.Stream; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -15,16 +12,12 @@ public class KafkaDomainEventPublisher implements EventPublisher { private final KafkaTemplate template; private final TopicResolver topicResolver; - private final ObjectMapper objectMapper; private final Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaDomainEventPublisher.class); public KafkaDomainEventPublisher( - KafkaTemplate template, - TopicResolver topicResolver, - @Qualifier("objectMapper") ObjectMapper objectMapper) { + KafkaTemplate template, TopicResolver topicResolver) { this.template = template; this.topicResolver = topicResolver; - this.objectMapper = objectMapper; } @Override @@ -38,15 +31,8 @@ private Stream> mapEventsToProducerRecords( .map( event -> { var topic = topicResolver.resolve(aggregateRoot.getAggregateName()); - try { - logger.info("Publishing event {} to topic {}", event, topic.getName()); - return new ProducerRecord<>( - topic.getName(), - aggregateRoot.getIdAsString(), - objectMapper.writeValueAsString(event)); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + logger.info("Publishing event {} to topic {}", event, topic.getName()); + return new ProducerRecord<>(topic.getName(), aggregateRoot.getIdAsString(), event); }); } } diff --git a/recipes/java-recipes/spring-boot-recipe/src/test-e2e/kotlin/com/trendyol/stove/example/java/spring/e2e/setup/Stove.kt b/recipes/java-recipes/spring-boot-recipe/src/test-e2e/kotlin/com/trendyol/stove/example/java/spring/e2e/setup/Stove.kt index 4a7f4b45..d7e1ec67 100644 --- a/recipes/java-recipes/spring-boot-recipe/src/test-e2e/kotlin/com/trendyol/stove/example/java/spring/e2e/setup/Stove.kt +++ b/recipes/java-recipes/spring-boot-recipe/src/test-e2e/kotlin/com/trendyol/stove/example/java/spring/e2e/setup/Stove.kt @@ -1,5 +1,6 @@ package com.trendyol.stove.example.java.spring.e2e.setup +import com.couchbase.client.kotlin.codec.JacksonJsonSerializer import com.trendyol.stove.example.java.spring.e2e.setup.migrations.CouchbaseMigration import com.trendyol.stove.examples.java.spring.ExampleSpringBootApp import com.trendyol.stove.examples.java.spring.infra.boilerplate.serialization.JacksonConfiguration @@ -7,10 +8,13 @@ import com.trendyol.stove.examples.java.spring.infra.components.product.persiste import com.trendyol.stove.testing.e2e.* import com.trendyol.stove.testing.e2e.couchbase.* import com.trendyol.stove.testing.e2e.http.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.standalone.kafka.* import com.trendyol.stove.testing.e2e.system.TestSystem import com.trendyol.stove.testing.e2e.wiremock.* import io.kotest.core.config.AbstractProjectConfig +import io.ktor.serialization.jackson.* +import org.springframework.kafka.support.serializer.JsonSerializer class Stove : AbstractProjectConfig() { override suspend fun beforeProject() { @@ -18,7 +22,7 @@ class Stove : AbstractProjectConfig() { httpClient { HttpClientSystemOptions( baseUrl = "http://localhost:8080", - objectMapper = JacksonConfiguration.defaultObjectMapper() + contentConverter = JacksonConverter(JacksonConfiguration.defaultObjectMapper()) ) } @@ -26,7 +30,7 @@ class Stove : AbstractProjectConfig() { wiremock { WireMockSystemOptions( port = 9090, - objectMapper = JacksonConfiguration.defaultObjectMapper() + serde = StoveSerde.jackson.anyByteArraySerde(JacksonConfiguration.defaultObjectMapper()) ) } couchbase { @@ -36,7 +40,7 @@ class Stove : AbstractProjectConfig() { withStartupAttempts(3) dockerImageName = "couchbase/server:7.2.5" }, - objectMapper = JacksonConfiguration.defaultObjectMapper(), + clusterSerDe = JacksonJsonSerializer(JacksonConfiguration.defaultObjectMapper()), configureExposedConfiguration = { cfg -> listOf( "couchbase.bucket=${CollectionConstants.BUCKET_NAME}", @@ -53,7 +57,8 @@ class Stove : AbstractProjectConfig() { kafka { KafkaSystemOptions( - objectMapper = JacksonConfiguration.defaultObjectMapper(), + serde = StoveSerde.jackson.anyByteArraySerde(JacksonConfiguration.defaultObjectMapper()), + valueSerializer = JsonSerializer(JacksonConfiguration.defaultObjectMapper()), containerOptions = KafkaContainerOptions { withStartupAttempts(3) }, diff --git a/recipes/kotlin-recipes/ktor-recipe/src/main/kotlin/com/trendyol/stove/examples/kotlin/ktor/infra/boilerplate/kafka/KafkaDomainEventPublisher.kt b/recipes/kotlin-recipes/ktor-recipe/src/main/kotlin/com/trendyol/stove/examples/kotlin/ktor/infra/boilerplate/kafka/KafkaDomainEventPublisher.kt index d8c8d554..88e00f88 100644 --- a/recipes/kotlin-recipes/ktor-recipe/src/main/kotlin/com/trendyol/stove/examples/kotlin/ktor/infra/boilerplate/kafka/KafkaDomainEventPublisher.kt +++ b/recipes/kotlin-recipes/ktor-recipe/src/main/kotlin/com/trendyol/stove/examples/kotlin/ktor/infra/boilerplate/kafka/KafkaDomainEventPublisher.kt @@ -1,6 +1,5 @@ package com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka -import com.fasterxml.jackson.databind.ObjectMapper import com.trendyol.stove.examples.domain.ddd.* import io.github.nomisRev.kafka.publisher.KafkaPublisher import kotlinx.coroutines.runBlocking @@ -9,8 +8,7 @@ import org.slf4j.* class KafkaDomainEventPublisher( private val publisher: KafkaPublisher, - private val topicResolver: TopicResolver, - private val objectMapper: ObjectMapper + private val topicResolver: TopicResolver ) : EventPublisher { private val logger: Logger = LoggerFactory.getLogger(KafkaDomainEventPublisher::class.java) @@ -28,7 +26,7 @@ class KafkaDomainEventPublisher( ProducerRecord( topic.name, aggregateRoot.idAsString, - objectMapper.writeValueAsString(event) + event ) } } diff --git a/recipes/kotlin-recipes/ktor-recipe/src/main/kotlin/com/trendyol/stove/examples/kotlin/ktor/infra/boilerplate/kafka/kafka.kt b/recipes/kotlin-recipes/ktor-recipe/src/main/kotlin/com/trendyol/stove/examples/kotlin/ktor/infra/boilerplate/kafka/kafka.kt index 3ccd5142..0902a5f5 100644 --- a/recipes/kotlin-recipes/ktor-recipe/src/main/kotlin/com/trendyol/stove/examples/kotlin/ktor/infra/boilerplate/kafka/kafka.kt +++ b/recipes/kotlin-recipes/ktor-recipe/src/main/kotlin/com/trendyol/stove/examples/kotlin/ktor/infra/boilerplate/kafka/kafka.kt @@ -22,7 +22,7 @@ fun KoinApplication.registerKafka(kafkaConfiguration: KafkaConfiguration) { single { kafkaPublisher(get()) } single { kafkaReceiver(get()) } single { ConsumerEngine(getAll()) } - single { KafkaDomainEventPublisher(get(), get(), get()) }.bind() + single { KafkaDomainEventPublisher(get(), get()) }.bind() single { TopicResolver(get()) } } ) diff --git a/recipes/kotlin-recipes/ktor-recipe/src/test-e2e/kotlin/com/trendyol/stove/examples/kotlin/ktor/e2e/setup/Stove.kt b/recipes/kotlin-recipes/ktor-recipe/src/test-e2e/kotlin/com/trendyol/stove/examples/kotlin/ktor/e2e/setup/Stove.kt index 458a3a6a..07761474 100644 --- a/recipes/kotlin-recipes/ktor-recipe/src/test-e2e/kotlin/com/trendyol/stove/examples/kotlin/ktor/e2e/setup/Stove.kt +++ b/recipes/kotlin-recipes/ktor-recipe/src/test-e2e/kotlin/com/trendyol/stove/examples/kotlin/ktor/e2e/setup/Stove.kt @@ -6,13 +6,15 @@ import com.trendyol.stove.examples.kotlin.ktor.infra.components.product.persiste import com.trendyol.stove.testing.e2e.* import com.trendyol.stove.testing.e2e.http.* import com.trendyol.stove.testing.e2e.mongodb.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.standalone.kafka.* import com.trendyol.stove.testing.e2e.system.TestSystem import com.trendyol.stove.testing.e2e.wiremock.* import io.kotest.core.config.AbstractProjectConfig +import io.ktor.serialization.jackson.* import org.koin.dsl.module -private val database = "stove-kotlin-ktor" +private const val DATABASE = "stove-kotlin-ktor" class Stove : AbstractProjectConfig() { override suspend fun beforeProject() { @@ -20,7 +22,7 @@ class Stove : AbstractProjectConfig() { httpClient { HttpClientSystemOptions( baseUrl = "http://localhost:8081", - objectMapper = JacksonConfiguration.default + contentConverter = JacksonConverter(JacksonConfiguration.default) ) } bridge() @@ -31,6 +33,7 @@ class Stove : AbstractProjectConfig() { } kafka { KafkaSystemOptions( + serde = StoveSerde.jackson.anyByteArraySerde(JacksonConfiguration.default), configureExposedConfiguration = { cfg -> listOf( "kafka.bootstrapServers=${cfg.bootstrapServers}", @@ -41,12 +44,11 @@ class Stove : AbstractProjectConfig() { } mongodb { MongodbSystemOptions( - databaseOptions = DatabaseOptions(DatabaseOptions.DefaultDatabase(database, collection = PRODUCT_COLLECTION)), + databaseOptions = DatabaseOptions(DatabaseOptions.DefaultDatabase(DATABASE, collection = PRODUCT_COLLECTION)), container = MongoContainerOptions(), - objectMapper = JacksonConfiguration.default, configureExposedConfiguration = { cfg -> listOf( - "mongo.database=$database", + "mongo.database=$DATABASE", "mongo.uri=${cfg.connectionString}/?retryWrites=true&w=majority" ) } diff --git a/recipes/kotlin-recipes/spring-boot-recipe/src/test-e2e/kotlin/com/trendyol/stove/examples/kotlin/spring/e2e/tests/StreamingTests.kt b/recipes/kotlin-recipes/spring-boot-recipe/src/test-e2e/kotlin/com/trendyol/stove/examples/kotlin/spring/e2e/tests/StreamingTests.kt index cac930ef..4e61b9c8 100644 --- a/recipes/kotlin-recipes/spring-boot-recipe/src/test-e2e/kotlin/com/trendyol/stove/examples/kotlin/spring/e2e/tests/StreamingTests.kt +++ b/recipes/kotlin-recipes/spring-boot-recipe/src/test-e2e/kotlin/com/trendyol/stove/examples/kotlin/spring/e2e/tests/StreamingTests.kt @@ -1,9 +1,9 @@ package com.trendyol.stove.examples.kotlin.spring.e2e.tests -import com.fasterxml.jackson.module.kotlin.readValue import com.trendyol.stove.examples.kotlin.spring.ExampleData import com.trendyol.stove.testing.e2e.http.http -import com.trendyol.stove.testing.e2e.serialization.StoveObjectMapper +import com.trendyol.stove.testing.e2e.serialization.StoveSerde +import com.trendyol.stove.testing.e2e.serialization.StoveSerde.Companion.deserialize import com.trendyol.stove.testing.e2e.system.TestSystem.Companion.validate import io.kotest.core.spec.style.FunSpec import io.ktor.client.* @@ -32,7 +32,7 @@ class StreamingTests : FunSpec({ contentType(ContentType.parse("application/x-ndjson")) }.also { response -> response.readJsonStream { line -> - StoveObjectMapper.Default.readValue(line) + StoveSerde.jackson.anyJsonStringSerde().deserialize(line) }.collect { data -> println(data) }