diff --git a/examples/ktor-example/src/test/kotlin/com/stove/ktor/example/e2e/Stove.kt b/examples/ktor-example/src/test/kotlin/com/stove/ktor/example/e2e/Stove.kt index 691e1b3f6..4798ab287 100644 --- a/examples/ktor-example/src/test/kotlin/com/stove/ktor/example/e2e/Stove.kt +++ b/examples/ktor-example/src/test/kotlin/com/stove/ktor/example/e2e/Stove.kt @@ -3,6 +3,7 @@ package com.stove.ktor.example.e2e import com.trendol.stove.testing.e2e.rdbms.postgres.* import com.trendyol.stove.testing.e2e.* import com.trendyol.stove.testing.e2e.http.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.standalone.kafka.* import com.trendyol.stove.testing.e2e.system.TestSystem import io.kotest.core.config.AbstractProjectConfig @@ -40,8 +41,9 @@ class Stove : AbstractProjectConfig() { }) } kafka { - stoveKafkaObjectMapperRef = objectMapperRef - KafkaSystemOptions { + KafkaSystemOptions( + serde = StoveSerde.jackson.anyByteArraySerde(objectMapperRef) + ) { listOf( "kafka.bootstrapServers=${it.bootstrapServers}", "kafka.interceptorClasses=${it.interceptorClass}" diff --git a/examples/spring-example/src/main/kotlin/stove/spring/example/infrastructure/messaging/kafka/configuration/ConsumerSettings.kt b/examples/spring-example/src/main/kotlin/stove/spring/example/infrastructure/messaging/kafka/configuration/ConsumerSettings.kt index dc23a24f3..05cb6b233 100644 --- a/examples/spring-example/src/main/kotlin/stove/spring/example/infrastructure/messaging/kafka/configuration/ConsumerSettings.kt +++ b/examples/spring-example/src/main/kotlin/stove/spring/example/infrastructure/messaging/kafka/configuration/ConsumerSettings.kt @@ -39,9 +39,10 @@ class DefaultConsumerSettings( props[ConsumerConfig.CLIENT_ID_CONFIG] = kafkaProperties.createClientId() props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = kafkaProperties.autoCreateTopics props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers - props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java + props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java + props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java + props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java props[JsonDeserializer.TRUSTED_PACKAGES] = "*" props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = kafkaProperties.offset props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true diff --git a/examples/spring-example/src/test/kotlin/com/stove/spring/example/e2e/Stove.kt b/examples/spring-example/src/test/kotlin/com/stove/spring/example/e2e/Stove.kt index b4a5ff43f..f28a1960e 100644 --- a/examples/spring-example/src/test/kotlin/com/stove/spring/example/e2e/Stove.kt +++ b/examples/spring-example/src/test/kotlin/com/stove/spring/example/e2e/Stove.kt @@ -38,8 +38,7 @@ class Stove : AbstractProjectConfig() { } kafka { KafkaSystemOptions( - containerOptions = KafkaContainerOptions(tag = "latest") { - } + containerOptions = KafkaContainerOptions(tag = "latest") { } ) { listOf( "kafka.bootstrapServers=${it.bootstrapServers}", diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/application/handlers/ProductCreator.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/application/handlers/ProductCreator.kt index 740aa334d..1645a2ba9 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/application/handlers/ProductCreator.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/application/handlers/ProductCreator.kt @@ -35,7 +35,7 @@ class ProductCreator( key = req.id.toString(), headers = mapOf(Headers.EVENT_TYPE to ProductCreatedEvent::class.simpleName!!), partition = 0, - payload = objectMapper.writeValueAsString(req.mapToProductCreatedEvent()) + payload = req.mapToProductCreatedEvent() ) ) return "OK" diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/ObjectMapperConfig.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/ObjectMapperConfig.kt index 067083c6a..4d607d92e 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/ObjectMapperConfig.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/ObjectMapperConfig.kt @@ -2,23 +2,28 @@ package stove.spring.standalone.example.infrastructure import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.databind.* -import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.module.kotlin.KotlinModule +import org.springframework.boot.autoconfigure.AutoConfigureBefore +import org.springframework.boot.autoconfigure.jackson.* import org.springframework.context.annotation.* @Configuration +@AutoConfigureBefore(JacksonAutoConfiguration::class) class ObjectMapperConfig { companion object { - fun createObjectMapperWithDefaults(): ObjectMapper { - val isoInstantModule = SimpleModule() - return ObjectMapper() - .registerModule(KotlinModule.Builder().build()) - .registerModule(isoInstantModule) - .setSerializationInclusion(JsonInclude.Include.NON_NULL) - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - } + val default: ObjectMapper = ObjectMapper() + .registerModule(KotlinModule.Builder().build()) + .findAndRegisterModules() + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) } @Bean - fun objectMapper(): ObjectMapper = createObjectMapperWithDefaults() + @Primary + fun objectMapper(): ObjectMapper = default + + @Bean + fun jacksonCustomizer(): Jackson2ObjectMapperBuilderCustomizer = Jackson2ObjectMapperBuilderCustomizer { builder -> + builder.factory(default.factory) + } } diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/couchbase/CouchbaseConfiguration.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/couchbase/CouchbaseConfiguration.kt index 538fe9d48..509072556 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/couchbase/CouchbaseConfiguration.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/couchbase/CouchbaseConfiguration.kt @@ -19,9 +19,7 @@ class CouchbaseConfiguration( private val meterRegistry: MeterRegistry ) { companion object { - val objectMapper: ObjectMapper = - ObjectMapperConfig.createObjectMapperWithDefaults() - .registerModule(JsonValueModule()) + val objectMapper: ObjectMapper = ObjectMapperConfig.default.registerModule(JsonValueModule()) } @Primary diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/http/WebClientConfiguration.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/http/WebClientConfiguration.kt index 0b13d74b7..0292d3c37 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/http/WebClientConfiguration.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/http/WebClientConfiguration.kt @@ -1,6 +1,6 @@ package stove.spring.standalone.example.infrastructure.http -import com.fasterxml.jackson.databind.* +import com.fasterxml.jackson.databind.ObjectMapper import io.netty.channel.ChannelOption import io.netty.handler.timeout.ReadTimeoutHandler import org.springframework.boot.context.properties.EnableConfigurationProperties @@ -11,7 +11,6 @@ import org.springframework.http.codec.ClientCodecConfigurer import org.springframework.http.codec.json.* import org.springframework.web.reactive.function.client.* import reactor.netty.http.client.HttpClient -import stove.spring.standalone.example.infrastructure.ObjectMapperConfig import java.util.concurrent.TimeUnit @Configuration @@ -31,12 +30,6 @@ class WebClientConfiguration(private val webClientConfigurationProperties: WebCl .exchangeStrategies(exchangeStrategies) .build() - @Bean - fun webClientObjectMapper(): ObjectMapper { - return ObjectMapperConfig.createObjectMapperWithDefaults() - .configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, false) - } - @Bean fun exchangeStrategies(webClientObjectMapper: ObjectMapper): ExchangeStrategies { return ExchangeStrategies diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/KafkaProducer.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/KafkaProducer.kt index 4e9eb8f50..2e3b766d1 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/KafkaProducer.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/KafkaProducer.kt @@ -7,10 +7,10 @@ import org.slf4j.* import org.springframework.kafka.core.KafkaTemplate import org.springframework.stereotype.Component -data class KafkaOutgoingMessage( +data class KafkaOutgoingMessage( val topic: String, - val key: String?, - val payload: String, + val key: K, + val payload: V, val headers: Map, val partition: Int? = null ) @@ -21,7 +21,7 @@ class KafkaProducer( ) { private val logger: Logger = LoggerFactory.getLogger(KafkaProducer::class.java) - suspend fun send(message: KafkaOutgoingMessage) { + suspend fun send(message: KafkaOutgoingMessage) { val recordHeaders = message.headers.map { RecordHeader(it.key, it.value.toByteArray()) } val record = ProducerRecord( message.topic, diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/ConsumerSettings.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/ConsumerSettings.kt index 209a67fef..19d14f57c 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/ConsumerSettings.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/ConsumerSettings.kt @@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Value import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.kafka.support.serializer.* import org.springframework.stereotype.Component import java.time.Duration @@ -37,8 +38,13 @@ class DefaultConsumerSettings( props[ConsumerConfig.CLIENT_ID_CONFIG] = kafkaProperties.createClientId() props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = kafkaProperties.autoCreateTopics props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers - props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java + props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java + props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java + props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java + props[JsonDeserializer.TRUSTED_PACKAGES] = "*" + props[JsonDeserializer.REMOVE_TYPE_INFO_HEADERS] = false + props[JsonDeserializer.VALUE_DEFAULT_TYPE] = Any::class.java props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = kafkaProperties.offset props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true props[ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG] = ofSeconds(AUTO_COMMIT_INTERVAL) diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/KafkaConsumerConfiguration.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/KafkaConsumerConfiguration.kt index 32b9f1bd4..ea3676938 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/KafkaConsumerConfiguration.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/KafkaConsumerConfiguration.kt @@ -24,7 +24,6 @@ class KafkaConsumerConfiguration( factory.setConcurrency(1) factory.consumerFactory = consumerFactory factory.containerProperties.isDeliveryAttemptHeader = true - factory.setRecordMessageConverter(stringJsonMessageConverter()) val errorHandler = DefaultErrorHandler( DeadLetterPublishingRecoverer(kafkaTemplate), FixedBackOff(0, 0) @@ -41,7 +40,6 @@ class KafkaConsumerConfiguration( ): ConcurrentKafkaListenerContainerFactory { val factory = ConcurrentKafkaListenerContainerFactory() factory.setConcurrency(1) - factory.setRecordMessageConverter(stringJsonMessageConverter()) factory.containerProperties.isDeliveryAttemptHeader = true factory.consumerFactory = consumerRetryFactory val errorHandler = DefaultErrorHandler( @@ -54,14 +52,12 @@ class KafkaConsumerConfiguration( } @Bean - fun consumerFactory(consumerSettings: ConsumerSettings): ConsumerFactory { - return DefaultKafkaConsumerFactory(consumerSettings.settings()) - } + fun consumerFactory(consumerSettings: ConsumerSettings): ConsumerFactory = + DefaultKafkaConsumerFactory(consumerSettings.settings()) @Bean - fun consumerRetryFactory(consumerSettings: ConsumerSettings): ConsumerFactory { - return DefaultKafkaConsumerFactory(consumerSettings.settings()) - } + fun consumerRetryFactory(consumerSettings: ConsumerSettings): ConsumerFactory = + DefaultKafkaConsumerFactory(consumerSettings.settings()) @Bean fun stringJsonMessageConverter(): StringJsonMessageConverter = StringJsonMessageConverter(objectMapper) diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/ProducerSettings.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/ProducerSettings.kt index 60d9fb740..ff7a5e320 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/ProducerSettings.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/configuration/ProducerSettings.kt @@ -3,6 +3,7 @@ package stove.spring.standalone.example.infrastructure.messaging.kafka.configura import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.kafka.support.serializer.JsonSerializer import org.springframework.stereotype.Component import stove.spring.standalone.example.infrastructure.messaging.kafka.interceptors.CustomProducerInterceptor @@ -15,7 +16,7 @@ class DefaultProducerSettings(private val kafkaProperties: KafkaProperties) : Pr val props: MutableMap = HashMap() props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java props[ProducerConfig.INTERCEPTOR_CLASSES_CONFIG] = listOf(CustomProducerInterceptor::class.java.name) + kafkaProperties.interceptorClasses props[ProducerConfig.ACKS_CONFIG] = kafkaProperties.acks diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/consumers/FailingProductCreateConsumer.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/consumers/FailingProductCreateConsumer.kt index 9ec40a336..bfa163c70 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/consumers/FailingProductCreateConsumer.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/consumers/FailingProductCreateConsumer.kt @@ -21,9 +21,8 @@ class FailingProductCreateConsumer { groupId = "#{@consumerConfig.groupId}", containerFactory = KafkaConsumerConfiguration.LISTENER_BEAN_NAME ) - fun listen(cr: ConsumerRecord): Unit = runBlocking(MDCContext()) { - logger.info("Received product failing event ${cr.value()}") - + fun listen(record: ConsumerRecord<*, *>): Unit = runBlocking(MDCContext()) { + logger.info("Received product failing event ${record.value()}") throw BusinessException("Failing product create event") } } diff --git a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/consumers/ProductCreateConsumers.kt b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/consumers/ProductCreateConsumers.kt index 4d6d32dd6..3b4a12420 100644 --- a/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/consumers/ProductCreateConsumers.kt +++ b/examples/spring-standalone-example/src/main/kotlin/stove/spring/standalone/example/infrastructure/messaging/kafka/consumers/ProductCreateConsumers.kt @@ -29,12 +29,11 @@ class ProductTransferConsumers( groupId = "#{@consumerConfig.groupId}_retry", containerFactory = KafkaConsumerConfiguration.RETRY_LISTENER_BEAN_NAME ) - fun listen(cr: ConsumerRecord) = - runBlocking(MDCContext()) { - logger.info("Received product transfer command ${cr.value()}") - val command = objectMapper.readValue(cr.value(), CreateProductCommand::class.java) - productCreator.create(command.mapToCreateRequest()) - } + fun listen(record: ConsumerRecord<*, Any>) = runBlocking(MDCContext()) { + logger.info("Received product transfer command ${record.value()}") + val command = objectMapper.convertValue(record.value(), CreateProductCommand::class.java) + productCreator.create(command.mapToCreateRequest()) + } } data class CreateProductCommand( diff --git a/examples/spring-standalone-example/src/test/kotlin/com/stove/spring/standalone/example/e2e/ExampleTest.kt b/examples/spring-standalone-example/src/test/kotlin/com/stove/spring/standalone/example/e2e/ExampleTest.kt index 9e00b7138..55938216f 100644 --- a/examples/spring-standalone-example/src/test/kotlin/com/stove/spring/standalone/example/e2e/ExampleTest.kt +++ b/examples/spring-standalone-example/src/test/kotlin/com/stove/spring/standalone/example/e2e/ExampleTest.kt @@ -42,36 +42,36 @@ class ExampleTest : FunSpec({ test("should create new product when send product create request from api for the allowed supplier") { TestSystem.validate { - val productCreateRequest = ProductCreateRequest(1L, name = "product name", 99L) - val supplierPermission = SupplierPermission(productCreateRequest.supplierId, isAllowed = true) + val request = ProductCreateRequest(1L, name = "product name", 99L) + val permission = SupplierPermission(request.supplierId, isAllowed = true) wiremock { mockGet( - "/suppliers/${supplierPermission.id}/allowed", + "/suppliers/${permission.id}/allowed", statusCode = 200, - responseBody = supplierPermission.some() + responseBody = permission.some() ) } http { - postAndExpectBodilessResponse(uri = "/api/product/create", body = productCreateRequest.some()) { actual -> + postAndExpectBodilessResponse(uri = "/api/product/create", body = request.some()) { actual -> actual.status shouldBe 200 } } kafka { shouldBePublished { - actual.id == productCreateRequest.id && - actual.name == productCreateRequest.name && - actual.supplierId == productCreateRequest.supplierId + actual.id == request.id && + actual.name == request.name && + actual.supplierId == request.supplierId } } couchbase { - shouldGet("product:${productCreateRequest.id}") { actual -> - actual.id shouldBe productCreateRequest.id - actual.name shouldBe productCreateRequest.name - actual.supplierId shouldBe productCreateRequest.supplierId + shouldGet("product:${request.id}") { actual -> + actual.id shouldBe request.id + actual.name shouldBe request.name + actual.supplierId shouldBe request.supplierId } } } @@ -79,18 +79,18 @@ class ExampleTest : FunSpec({ test("should throw error when send product create request from api for for the not allowed supplier") { TestSystem.validate { - val productCreateRequest = ProductCreateRequest(2L, name = "product name", 98L) - val supplierPermission = SupplierPermission(productCreateRequest.supplierId, isAllowed = false) + val request = ProductCreateRequest(2L, name = "product name", 98L) + val permission = SupplierPermission(request.supplierId, isAllowed = false) wiremock { mockGet( - "/suppliers/${supplierPermission.id}/allowed", + "/suppliers/${permission.id}/allowed", statusCode = 200, - responseBody = supplierPermission.some() + responseBody = permission.some() ) } http { - postAndExpectJson(uri = "/api/product/create", body = productCreateRequest.some()) { actual -> - actual shouldBe "Supplier with the given id(${productCreateRequest.supplierId}) is not allowed for product creation" + postAndExpectJson(uri = "/api/product/create", body = request.some()) { actual -> + actual shouldBe "Supplier with the given id(${request.supplierId}) is not allowed for product creation" } } } @@ -98,8 +98,8 @@ class ExampleTest : FunSpec({ test("should throw error when send product create event for the not allowed supplier") { TestSystem.validate { - val productCreateEvent = CreateProductCommand(3L, name = "product name", 97L) - val supplierPermission = SupplierPermission(productCreateEvent.supplierId, isAllowed = false) + val command = CreateProductCommand(3L, name = "product name", 97L) + val supplierPermission = SupplierPermission(command.supplierId, isAllowed = false) wiremock { mockGet( @@ -110,9 +110,9 @@ class ExampleTest : FunSpec({ } kafka { - publish("trendyol.stove.service.product.create.0", productCreateEvent) + publish("trendyol.stove.service.product.create.0", command) shouldBeConsumed(10.seconds) { - actual.id == productCreateEvent.id + actual.id == command.id } } } @@ -120,8 +120,8 @@ class ExampleTest : FunSpec({ test("should create new product when send product create event for the allowed supplier") { TestSystem.validate { - val createProductCommand = CreateProductCommand(4L, name = "product name", 96L) - val supplierPermission = SupplierPermission(createProductCommand.supplierId, isAllowed = true) + val command = CreateProductCommand(4L, name = "product name", 96L) + val supplierPermission = SupplierPermission(command.supplierId, isAllowed = true) wiremock { mockGet( @@ -132,26 +132,26 @@ class ExampleTest : FunSpec({ } kafka { - publish("trendyol.stove.service.product.create.0", createProductCommand) + publish("trendyol.stove.service.product.create.0", command) shouldBeConsumed { - actual.id == createProductCommand.id && - actual.name == createProductCommand.name && - actual.supplierId == createProductCommand.supplierId + actual.id == command.id && + actual.name == command.name && + actual.supplierId == command.supplierId } shouldBePublished { - actual.id == createProductCommand.id && - actual.name == createProductCommand.name && - actual.supplierId == createProductCommand.supplierId && + actual.id == command.id && + actual.name == command.name && + actual.supplierId == command.supplierId && metadata.headers["X-UserEmail"] == "stove@trendyol.com" } } couchbase { - shouldGet("product:${createProductCommand.id}") { actual -> - actual.id shouldBe createProductCommand.id - actual.name shouldBe createProductCommand.name - actual.supplierId shouldBe createProductCommand.supplierId + shouldGet("product:${command.id}") { actual -> + actual.id shouldBe command.id + actual.name shouldBe command.name + actual.supplierId shouldBe command.supplierId } } } diff --git a/examples/spring-standalone-example/src/test/kotlin/com/stove/spring/standalone/example/e2e/Stove.kt b/examples/spring-standalone-example/src/test/kotlin/com/stove/spring/standalone/example/e2e/Stove.kt index 7507b0c92..53bd17680 100644 --- a/examples/spring-standalone-example/src/test/kotlin/com/stove/spring/standalone/example/e2e/Stove.kt +++ b/examples/spring-standalone-example/src/test/kotlin/com/stove/spring/standalone/example/e2e/Stove.kt @@ -3,6 +3,7 @@ package com.stove.spring.standalone.example.e2e import com.trendyol.stove.testing.e2e.* import com.trendyol.stove.testing.e2e.couchbase.* import com.trendyol.stove.testing.e2e.http.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.standalone.kafka.* import com.trendyol.stove.testing.e2e.system.TestSystem import com.trendyol.stove.testing.e2e.wiremock.* @@ -38,10 +39,9 @@ class Stove : AbstractProjectConfig() { ) } kafka { - stoveKafkaObjectMapperRef = ObjectMapperConfig.createObjectMapperWithDefaults() KafkaSystemOptions( topicSuffixes = TopicSuffixes().copy(error = listOf(".error", ".DLT", "dlt")), - objectMapper = ObjectMapperConfig.createObjectMapperWithDefaults(), + serde = StoveSerde.jackson.anyByteArraySerde(ObjectMapperConfig.default), containerOptions = KafkaContainerOptions(tag = "latest") { } ) { listOf( diff --git a/examples/spring-streams-example/src/main/kotlin/stove/spring/streams/example/kafka/CustomSerDe.kt b/examples/spring-streams-example/src/main/kotlin/stove/spring/streams/example/kafka/CustomSerDe.kt index 2cf5dadba..bc3238012 100644 --- a/examples/spring-streams-example/src/main/kotlin/stove/spring/streams/example/kafka/CustomSerDe.kt +++ b/examples/spring-streams-example/src/main/kotlin/stove/spring/streams/example/kafka/CustomSerDe.kt @@ -12,14 +12,29 @@ class CustomSerDe { @Value("\${kafka.schema-registry-url}") val schemaRegistryUrl = "" - fun createConfiguredSerdeForRecordValues(): KafkaProtobufSerde { - var serde: KafkaProtobufSerde = KafkaProtobufSerde() - if (schemaRegistryUrl.contains("mock://")) { - serde = KafkaProtobufSerde(MockSchemaRegistry.getClientForScope("mock-registry")) + fun createSerdeForValues(): KafkaProtobufSerde = KafkaRegistry.createSerde(schemaRegistryUrl) +} + +sealed class KafkaRegistry(open val url: String) { + object Mock : KafkaRegistry("mock://mock-registry") + + data class Defined(override val url: String) : KafkaRegistry(url) + + companion object { + fun createSerde(fromUrl: String): KafkaProtobufSerde = createSerde( + if (fromUrl.contains(Mock.url)) Mock else Defined(fromUrl) + ) + + fun createSerde(registry: KafkaRegistry = Mock): KafkaProtobufSerde { + val schemaRegistryClient = when (registry) { + is Mock -> MockSchemaRegistry.getClientForScope("mock-registry") + is Defined -> MockSchemaRegistry.getClientForScope(registry.url) + } + val serde: KafkaProtobufSerde = KafkaProtobufSerde(schemaRegistryClient) + val serdeConfig: MutableMap = HashMap() + serdeConfig[SCHEMA_REGISTRY_URL_CONFIG] = registry.url + serde.configure(serdeConfig, false) + return serde } - val serdeConfig: MutableMap = HashMap() - serdeConfig[SCHEMA_REGISTRY_URL_CONFIG] = schemaRegistryUrl - serde.configure(serdeConfig, false) - return serde } } diff --git a/examples/spring-streams-example/src/main/kotlin/stove/spring/streams/example/kafka/application/processor/ExampleJoin.kt b/examples/spring-streams-example/src/main/kotlin/stove/spring/streams/example/kafka/application/processor/ExampleJoin.kt index 072aff508..058915959 100644 --- a/examples/spring-streams-example/src/main/kotlin/stove/spring/streams/example/kafka/application/processor/ExampleJoin.kt +++ b/examples/spring-streams-example/src/main/kotlin/stove/spring/streams/example/kafka/application/processor/ExampleJoin.kt @@ -17,7 +17,7 @@ import stove.spring.streams.example.kafka.CustomSerDe @EnableKafka @EnableKafkaStreams class ExampleJoin(customSerDe: CustomSerDe) { - private val protobufSerde: KafkaProtobufSerde = customSerDe.createConfiguredSerdeForRecordValues() + private val protobufSerde: KafkaProtobufSerde = customSerDe.createSerdeForValues() private val byteArraySerde: Serde = Serdes.ByteArray() private val stringSerde: Serde = Serdes.String() @@ -31,21 +31,20 @@ class ExampleJoin(customSerDe: CustomSerDe) { .stream("input2", Consumed.with(stringSerde, protobufSerde)) .toTable(Materialized.with(stringSerde, protobufSerde)) - val joinedTable = - input1.join( - input2, - { input1Message: Message, input2Message: Message -> - protobufSerde.serializer().serialize( - "output", - output { - this.firstName = Input1.parseFrom(input1Message.toByteArray()).firstName - this.lastName = Input1.parseFrom(input1Message.toByteArray()).lastName - this.bsn = Input2.parseFrom(input2Message.toByteArray()).bsn - this.age = Input2.parseFrom(input2Message.toByteArray()).age - } - ) - } - ) + val joinedTable = input1.join( + input2, + { input1Message: Message, input2Message: Message -> + protobufSerde.serializer().serialize( + "output", + output { + this.firstName = Input1.parseFrom(input1Message.toByteArray()).firstName + this.lastName = Input1.parseFrom(input1Message.toByteArray()).lastName + this.bsn = Input2.parseFrom(input2Message.toByteArray()).bsn + this.age = Input2.parseFrom(input2Message.toByteArray()).age + } + ) + } + ) joinedTable.toStream().to("output", Produced.with(stringSerde, byteArraySerde)) } } diff --git a/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/ExampleTest.kt b/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/ExampleTest.kt index ba9c1a702..ca29fae42 100644 --- a/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/ExampleTest.kt +++ b/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/ExampleTest.kt @@ -4,7 +4,6 @@ import arrow.core.Option import com.google.protobuf.Message import com.trendyol.stove.testing.e2e.standalone.kafka.kafka import com.trendyol.stove.testing.e2e.system.TestSystem -import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde import io.kotest.core.spec.style.FunSpec import org.apache.kafka.common.serialization.StringDeserializer import stove.example.protobuf.* @@ -15,8 +14,6 @@ import java.util.* import kotlin.time.Duration.Companion.seconds class ExampleTest : FunSpec({ - val protobufSerde: KafkaProtobufSerde = createConfiguredSerdeForRecordValues() - test("expect join") { /*------------------------- | Create test data @@ -57,23 +54,13 @@ class ExampleTest : FunSpec({ ----------------------------*/ // Assert input1 message is consumed - shouldBeConsumed { - protobufSerde.messageAsBase64(actual) - .isSome { message -> - message.onMatchingAssert(Input1.getDescriptor().name) { - Input1.parseFrom(it.toByteArray()) == input1Message - } - } + shouldBeConsumed { + actual == input1Message } // Assert input2 message is consumed - shouldBeConsumed { - protobufSerde.messageAsBase64(actual) - .isSome { message -> - message.onMatchingAssert(Input2.getDescriptor().name) { - Input2.parseFrom(it.toByteArray()) == input2Message - } - } + shouldBeConsumed { + actual == input2Message } /*--------------------------- @@ -81,19 +68,15 @@ class ExampleTest : FunSpec({ ----------------------------*/ // Assert joined message is correctly published - shouldBePublished(atLeastIn = 20.seconds) { - protobufSerde.messageAsBase64(actual).isSome { message -> - message.onMatchingAssert(Output.getDescriptor().name) { - Output.parseFrom(it.toByteArray()) == outputMessage - } - } + shouldBePublished(atLeastIn = 20.seconds) { + actual.bsn == bsn } // Assert joined message is correctly published // Similar to test above, but is able to run even if no messages are published consumer( "output", - valueDeserializer = StoveKafkaValueDeserializer(), + valueDeserializer = StoveKafkaValueDeserializer(), keyDeserializer = StringDeserializer() ) { record -> if (Output.parseFrom(record.value().toByteArray()) != outputMessage) throw AssertionError() diff --git a/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/Stove.kt b/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/Stove.kt index a609e59ce..8429260e2 100644 --- a/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/Stove.kt +++ b/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/Stove.kt @@ -20,6 +20,7 @@ class Stove : AbstractProjectConfig() { kafka { KafkaSystemOptions( listenPublishedMessagesFromStove = false, + serde = StoveProtobufSerde(), valueSerializer = StoveKafkaValueSerializer(), containerOptions = KafkaContainerOptions(tag = "latest") { } ) { diff --git a/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/TestHelper.kt b/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/TestHelper.kt index af79819de..e819dad3e 100644 --- a/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/TestHelper.kt +++ b/examples/spring-streams-example/src/test/kotlin/com/stove/spring/streams/example/e2e/TestHelper.kt @@ -2,15 +2,16 @@ package com.stove.spring.streams.example.e2e import arrow.core.* import com.google.protobuf.Message -import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde +import kotlinx.serialization.ExperimentalSerializationApi import okio.ByteString.Companion.toByteString import org.apache.kafka.common.serialization.* +import stove.spring.streams.example.kafka.KafkaRegistry import java.util.* class StoveKafkaValueSerializer : Serializer { - private val protobufSerde: KafkaProtobufSerde = createConfiguredSerdeForRecordValues() + private val protobufSerde: KafkaProtobufSerde = KafkaRegistry.createSerde(KafkaRegistry.Mock) override fun serialize( topic: String, @@ -21,8 +22,8 @@ class StoveKafkaValueSerializer : Serializer { } } -class StoveKafkaValueDeserializer : Deserializer { - private val protobufSerde: KafkaProtobufSerde = createConfiguredSerdeForRecordValues() +class StoveKafkaValueDeserializer : Deserializer { + private val protobufSerde: KafkaProtobufSerde = KafkaRegistry.createSerde(KafkaRegistry.Mock) override fun deserialize( topic: String, @@ -30,15 +31,32 @@ class StoveKafkaValueDeserializer : Deserializer { ): Message = protobufSerde.deserializer().deserialize(topic, data) } -fun createConfiguredSerdeForRecordValues(): KafkaProtobufSerde { - val schemaRegistryClient = MockSchemaRegistry.getClientForScope("mock-registry") - val serde: KafkaProtobufSerde = KafkaProtobufSerde(schemaRegistryClient) - val serdeConfig: MutableMap = HashMap() - serdeConfig[SCHEMA_REGISTRY_URL_CONFIG] = "mock://mock-registry" - serde.configure(serdeConfig, false) - return serde +@Suppress("UNCHECKED_CAST") +@OptIn(ExperimentalSerializationApi::class) +class StoveProtobufSerde : StoveSerde { + private val parseFromMethod = "parseFrom" + private val protobufSerde: KafkaProtobufSerde = KafkaRegistry.createSerde(KafkaRegistry.Mock) + + override fun serialize(value: Any): ByteArray = protobufSerde.serializer().serialize("any", value as Message) + + override fun deserialize(value: ByteArray, clazz: Class): T { + val incoming: Message = protobufSerde.deserializer().deserialize("any", value) + incoming.isAssignableFrom(clazz).also { isAssignableFrom -> + require(isAssignableFrom) { + "Expected '${clazz.simpleName}' but got '${incoming.descriptorForType.name}'. " + + "This could be transient ser/de problem since the message stream is constantly checked if the expected message is arrived, " + + "so you can ignore this error if you are sure that the message is the expected one." + } + } + + val parseFromMethod = clazz.getDeclaredMethod(parseFromMethod, ByteArray::class.java) + val parsed = parseFromMethod(incoming, incoming.toByteArray()) as T + return parsed + } } +private fun Message.isAssignableFrom(clazz: Class<*>): Boolean = this.descriptorForType.name == clazz.simpleName + fun KafkaProtobufSerde.messageAsBase64( message: Any ): Option = Either.catch { diff --git a/lib/stove-testing-e2e-kafka/build.gradle.kts b/lib/stove-testing-e2e-kafka/build.gradle.kts index c5b256c93..f12fab56c 100644 --- a/lib/stove-testing-e2e-kafka/build.gradle.kts +++ b/lib/stove-testing-e2e-kafka/build.gradle.kts @@ -39,7 +39,7 @@ wire { rpcRole = "client" rpcCallStyle = "suspending" exclusive = false - javaInterop = true + javaInterop = false } kotlin { custom { @@ -53,7 +53,7 @@ wire { rpcCallStyle = "suspending" exclusive = false singleMethodServices = false - javaInterop = true + javaInterop = false includes = listOf("com.trendyol.stove.testing.e2e.standalone.kafka.StoveKafkaObserverService") } } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt index 68d03a988..49052c1b5 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt @@ -1,10 +1,9 @@ package com.trendyol.stove.testing.e2e.standalone.kafka import arrow.core.* -import com.fasterxml.jackson.databind.ObjectMapper import com.trendyol.stove.functional.* import com.trendyol.stove.testing.e2e.messaging.* -import com.trendyol.stove.testing.e2e.serialization.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.* import com.trendyol.stove.testing.e2e.system.TestSystem import com.trendyol.stove.testing.e2e.system.abstractions.* @@ -24,17 +23,17 @@ import kotlin.time.* import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds -var stoveKafkaObjectMapperRef: ObjectMapper = StoveSerde.jackson.default +var stoveSerdeRef: StoveSerde = StoveSerde.jackson.anyByteArraySerde() var stoveKafkaBridgePortDefault = "50051" const val STOVE_KAFKA_BRIDGE_PORT = "STOVE_KAFKA_BRIDGE_PORT" internal val StoveKafkaCoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) -@Suppress("TooManyFunctions") +@Suppress("TooManyFunctions", "unused") @StoveDsl class KafkaSystem( override val testSystem: TestSystem, private val context: KafkaContext -) : PluggedSystem, ExposesConfiguration, RunAware, AfterRunAware { +) : PluggedSystem, ExposesConfiguration, RunAware, AfterRunAware, BeforeRunAware { private lateinit var exposedConfiguration: KafkaExposedConfiguration private lateinit var adminClient: Admin private lateinit var kafkaPublisher: KafkaProducer @@ -46,6 +45,10 @@ class KafkaSystem( private val state: StateStorage = testSystem.options.createStateStorage() + override suspend fun beforeRun() { + stoveSerdeRef = context.options.serde + } + override suspend fun run() { exposedConfiguration = state.capture { context.container.start() @@ -62,7 +65,7 @@ class KafkaSystem( ) sink = TestSystemMessageSink( adminClient, - context.options.objectMapper, + context.options.serde, context.options.topicSuffixes ) grpcServer = startGrpcServer() @@ -144,7 +147,7 @@ class KafkaSystem( sink.store.consumedMessages() .filter { it.topic == topic && it.offset > offset } .onEach { offset = it.offset } - .map { ConsumedRecord(it.topic, it.key, it.message, it.headers, it.offset, it.partition) } + .map { ConsumedRecord(it.topic, it.key, it.message.toByteArray(), it.headers, it.offset, it.partition) } .forEach { loop = !condition(it) } @@ -198,7 +201,7 @@ class KafkaSystem( sink.store.publishedMessages() .filter { it.topic == topic && !seenIds.containsKey(it.id) } .onEach { seenIds[it.id] = it } - .map { PublishedRecord(it.topic, it.key, it.message, it.headers) } + .map { PublishedRecord(it.topic, it.key, it.message.toByteArray(), it.headers) } .forEach { loop = !condition(it) } @@ -358,9 +361,7 @@ class KafkaSystem( ProducerConfig.ACKS_CONFIG to "1" ) + ( if (listenKafkaSystemPublishedMessages) { - mapOf( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to exposedConfiguration.interceptorClass - ) + mapOf(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to exposedConfiguration.interceptorClass) } else { emptyMap() } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystemOptions.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystemOptions.kt index a36d6c0ac..b3df31bf2 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystemOptions.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystemOptions.kt @@ -1,6 +1,6 @@ package com.trendyol.stove.testing.e2e.standalone.kafka -import com.fasterxml.jackson.databind.ObjectMapper +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.system.abstractions.* import org.apache.kafka.common.serialization.Serializer @@ -18,9 +18,20 @@ class KafkaSystemOptions( */ val bridgeGrpcServerPort: Int = stoveKafkaBridgePortDefault.toInt(), /** - * The object mapper that is used to serialize and deserialize messages. + * The Serde that is used while asserting the messages, + * serializing while bridging the messages. Take a look at the [serde] property for more information. + * + * The default value is [StoveSerde.jackson]'s anyByteArraySerde. + * Depending on your application's needs you might want to change this value. + * + * The places where it was used listed below: + * + * @see [com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.StoveKafkaBridge] for bridging the messages. + * @see StoveKafkaValueSerializer for serializing the messages. + * @see StoveKafkaValueDeserializer for deserializing the messages. + * @see valueSerializer for serializing the messages. */ - val objectMapper: ObjectMapper = stoveKafkaObjectMapperRef, + val serde: StoveSerde = stoveSerdeRef, /** * The Value serializer that is used to serialize messages. */ diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/SerDe.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/SerDe.kt index 347ccb010..55d127da9 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/SerDe.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/SerDe.kt @@ -1,6 +1,5 @@ package com.trendyol.stove.testing.e2e.standalone.kafka -import com.fasterxml.jackson.module.kotlin.readValue import org.apache.kafka.common.serialization.* @Suppress("UNCHECKED_CAST") @@ -8,12 +7,12 @@ class StoveKafkaValueDeserializer : Deserializer { override fun deserialize( topic: String, data: ByteArray - ): T = stoveKafkaObjectMapperRef.readValue(data) as T + ): T = stoveSerdeRef.deserialize(data, Any::class.java) as T } class StoveKafkaValueSerializer : Serializer { override fun serialize( topic: String, data: T - ): ByteArray = stoveKafkaObjectMapperRef.writeValueAsBytes(data) + ): ByteArray = stoveSerdeRef.serialize(data) } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/CommonOps.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/CommonOps.kt index 44c3e28bc..65db67e7a 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/CommonOps.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/CommonOps.kt @@ -1,8 +1,8 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting import arrow.core.toOption -import com.fasterxml.jackson.databind.ObjectMapper import com.trendyol.stove.testing.e2e.messaging.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.standalone.kafka.* import kotlinx.coroutines.* import org.apache.kafka.clients.admin.Admin @@ -12,7 +12,7 @@ import kotlin.time.Duration internal interface CommonOps { val store: MessageStore - val serde: ObjectMapper + val serde: StoveSerde val adminClient: Admin val topicSuffixes: TopicSuffixes val logger: Logger @@ -59,7 +59,7 @@ internal interface CommonOps { selector: (message: ParsedMessage) -> Boolean ): Unit = store.failedMessages() .filter { - selector(SuccessfulParsedMessage(readCatching(it.message, clazz).getOrNull().toOption(), it.metadata())) + selector(SuccessfulParsedMessage(readCatching(it.message.toByteArray(), clazz).getOrNull().toOption(), it.metadata())) }.forEach { throw AssertionError("Message was expected to be consumed successfully, but failed: $it \n ${dumpMessages()}") } @@ -71,7 +71,7 @@ internal interface CommonOps { .filter { selector( SuccessfulParsedMessage( - readCatching(it.message, clazz).getOrNull().toOption(), + readCatching(it.message.toByteArray(), clazz).getOrNull().toOption(), MessageMetadata(it.topic, it.key, it.headers) ) ) @@ -79,28 +79,11 @@ internal interface CommonOps { throw AssertionError("Message was expected to be consumed successfully, but was retried: $it \n ${dumpMessages()}") } - fun throwIfSucceeded( - clazz: KClass, - selector: (message: ParsedMessage) -> Boolean - ): Unit = store.consumedMessages() - .filter { - selector( - SuccessfulParsedMessage( - readCatching(it.message, clazz).getOrNull().toOption(), - it.metadata() - ) - ) && store.isCommitted(it.topic, it.offset, it.partition) - }.forEach { throw AssertionError("Message was expected to fail, but was consumed: $it \n ${dumpMessages()}") } - fun readCatching( - json: Any, + value: ByteArray, clazz: KClass - ): Result = runCatching { - when (json) { - is String -> serde.readValue(json, clazz.java) - else -> serde.convertValue(json, clazz.java) - } - } + ): Result = runCatching { serde.deserialize(value, clazz.java) } + .onFailure { exception -> logger.error("Failed to deserialize message: ${String(value)}", exception) } fun dumpMessages(): String } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkOps.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkOps.kt index 0967c2870..75f79cd05 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkOps.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkOps.kt @@ -48,7 +48,7 @@ internal interface MessageSinkOps : MessageSinkPublishOps, CommonOps { ) { val getRecords = { store.consumedMessages() } getRecords.waitUntilConditionMet(atLeastIn, "While expecting consuming of ${clazz.java.simpleName}") { - val outcome = readCatching(it.message, clazz) + val outcome = readCatching(it.message.toByteArray(), clazz) outcome.isSuccess && condition( SuccessfulParsedMessage( outcome.getOrNull().toOption(), @@ -66,13 +66,13 @@ internal interface MessageSinkOps : MessageSinkPublishOps, CommonOps { clazz: KClass, condition: (ParsedMessage) -> Boolean ) { - data class FailedMessage(val message: String, val metadata: MessageMetadata) + class FailedMessage(val message: ByteArray, val metadata: MessageMetadata) val getRecords = { - store.failedMessages().map { FailedMessage(it.message, it.metadata()) } + + store.failedMessages().map { FailedMessage(it.message.toByteArray(), it.metadata()) } + store.publishedMessages() .filter { topicSuffixes.isErrorTopic(it.topic) } - .map { FailedMessage(it.message, it.metadata()) } + .map { FailedMessage(it.message.toByteArray(), it.metadata()) } } getRecords.waitUntilConditionMet(atLeastIn, "While expecting Failure of ${clazz.java.simpleName}") { val outcome = readCatching(it.message, clazz) @@ -89,7 +89,7 @@ internal interface MessageSinkOps : MessageSinkPublishOps, CommonOps { val getRecords = { store.retriedMessages() } val failedFunc = suspend { getRecords.waitUntilConditionMet(atLeastIn, "While expecting Retrying of ${clazz.java.simpleName}") { - val outcome = readCatching(it.message, clazz) + val outcome = readCatching(it.message.toByteArray(), clazz) outcome.isSuccess && condition( SuccessfulParsedMessage( outcome.getOrNull().toOption(), diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkPublishOps.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkPublishOps.kt index faba585f7..bfc81c4fe 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkPublishOps.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkPublishOps.kt @@ -15,7 +15,7 @@ internal interface MessageSinkPublishOps : CommonOps { ) { val getRecords = { store.publishedMessages().map { it } } getRecords.waitUntilConditionMet(atLeastIn, "While expecting Publishing of ${clazz.java.simpleName}") { - val outcome = readCatching(it.message, clazz) + val outcome = readCatching(it.message.toByteArray(), clazz) outcome.isSuccess && condition(SuccessfulParsedMessage(outcome.getOrNull().toOption(), MessageMetadata(it.topic, it.key, it.headers))) } } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt index 26575c5b5..0b5d4e11e 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt @@ -1,10 +1,11 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting -import com.fasterxml.jackson.databind.ObjectMapper import com.squareup.wire.GrpcException import com.trendyol.stove.functional.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.standalone.kafka.* import kotlinx.coroutines.runBlocking +import okio.ByteString.Companion.toByteString import org.apache.kafka.clients.consumer.* import org.apache.kafka.clients.producer.* import org.apache.kafka.common.TopicPartition @@ -16,7 +17,7 @@ import java.util.* class StoveKafkaBridge : ConsumerInterceptor, ProducerInterceptor { private val logger: Logger = org.slf4j.LoggerFactory.getLogger(StoveKafkaBridge::class.java) private val client: StoveKafkaObserverServiceClient by lazy { startGrpcClient() } - private val mapper: ObjectMapper by lazy { stoveKafkaObjectMapperRef } + private val serde: StoveSerde by lazy { stoveSerdeRef } override fun onSend(record: ProducerRecord): ProducerRecord = runBlocking { record.also { send(publishedMessage(it)) } @@ -105,7 +106,7 @@ class StoveKafkaBridge : ConsumerInterceptor, ProducerInterceptor : ConsumerInterceptor, ProducerInterceptor) = PublishedMessage( id = UUID.randomUUID().toString(), key = record.key().toString(), - message = serializeIfNotString(record.value()), + message = serializeIfNotYet(record.value()).toByteString(), topic = record.topic(), headers = record.headers().associate { it.key() to it.value().toString(Charset.defaultCharset()) } ) @@ -133,9 +134,9 @@ class StoveKafkaBridge : ConsumerInterceptor, ProducerInterceptor value - else -> mapper.writeValueAsString(value) + private fun serializeIfNotYet(value: V): ByteArray = when (value) { + is ByteArray -> value + else -> serde.serialize(value as Any) } private fun startGrpcClient(): StoveKafkaObserverServiceClient { diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemMessageSink.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemMessageSink.kt index dcc4ff591..32bc62b48 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemMessageSink.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemMessageSink.kt @@ -1,13 +1,13 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting -import com.fasterxml.jackson.databind.ObjectMapper +import com.trendyol.stove.testing.e2e.serialization.StoveSerde import com.trendyol.stove.testing.e2e.standalone.kafka.* import org.apache.kafka.clients.admin.Admin import org.slf4j.* class TestSystemMessageSink( override val adminClient: Admin, - override val serde: ObjectMapper, + override val serde: StoveSerde, override val topicSuffixes: TopicSuffixes ) : MessageSinkOps, CommonOps { override val logger: Logger = LoggerFactory.getLogger(javaClass) diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/messages.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/messages.kt index 0136ea699..db57a9ec7 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/messages.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/messages.kt @@ -1,23 +1,23 @@ package com.trendyol.stove.testing.e2e.standalone.kafka -data class PublishedRecord( +class PublishedRecord( val topic: String, val key: String, - val value: String, + val value: ByteArray, val headers: Map ) -data class CommittedRecord( +class CommittedRecord( val topic: String, val metadata: String, val offset: Long, val partition: Int ) -data class ConsumedRecord( +class ConsumedRecord( val topic: String, val key: String, - val value: String, + val value: ByteArray, val headers: Map, val offset: Long, val partition: Int diff --git a/lib/stove-testing-e2e-kafka/src/main/proto/messages.proto b/lib/stove-testing-e2e-kafka/src/main/proto/messages.proto index 8a3f587e0..245d54760 100644 --- a/lib/stove-testing-e2e-kafka/src/main/proto/messages.proto +++ b/lib/stove-testing-e2e-kafka/src/main/proto/messages.proto @@ -5,7 +5,7 @@ package com.trendyol.stove.testing.e2e.standalone.kafka; message ConsumedMessage { string id = 1; - string message = 2; + bytes message = 2; string topic = 3; int32 partition = 4; int64 offset = 5; @@ -15,7 +15,7 @@ message ConsumedMessage { message PublishedMessage { string id = 1; - string message = 2; + bytes message = 2; string topic = 3; string key = 4; map headers = 5;