
Commit

refactor(standalone-kafka): use StoveSerde<Any, ByteArray> interface to bridge the messages and let users select their ser/de #560 (#664)
osoykan authored Dec 3, 2024
1 parent 4c03e83 commit 6cde696
Showing 31 changed files with 223 additions and 213 deletions.
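In short: instead of setting a global stoveKafkaObjectMapperRef, the standalone Kafka system now takes a StoveSerde<Any, ByteArray>, so each project picks its own ser/de. A minimal sketch of the new wiring, mirroring the ktor example below (the mapper value and exposed option strings are illustrative, not part of this commit):

import com.fasterxml.jackson.databind.ObjectMapper
import com.trendyol.stove.testing.e2e.serialization.StoveSerde
import com.trendyol.stove.testing.e2e.standalone.kafka.*

// The application's Jackson mapper, shared with the test setup (illustrative).
val objectMapperRef = ObjectMapper()

// Bridge Stove's Any <-> ByteArray serde to that mapper and expose the broker
// coordinates to the application under test, as in the examples changed below.
fun kafkaOptions() = KafkaSystemOptions(
  serde = StoveSerde.jackson.anyByteArraySerde(objectMapperRef)
) {
  listOf(
    "kafka.bootstrapServers=${it.bootstrapServers}",
    "kafka.interceptorClasses=${it.interceptorClass}"
  )
}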
@@ -3,6 +3,7 @@ package com.stove.ktor.example.e2e
 import com.trendol.stove.testing.e2e.rdbms.postgres.*
 import com.trendyol.stove.testing.e2e.*
 import com.trendyol.stove.testing.e2e.http.*
+import com.trendyol.stove.testing.e2e.serialization.StoveSerde
 import com.trendyol.stove.testing.e2e.standalone.kafka.*
 import com.trendyol.stove.testing.e2e.system.TestSystem
 import io.kotest.core.config.AbstractProjectConfig
@@ -40,8 +41,9 @@ class Stove : AbstractProjectConfig() {
         })
       }
       kafka {
-        stoveKafkaObjectMapperRef = objectMapperRef
-        KafkaSystemOptions {
+        KafkaSystemOptions(
+          serde = StoveSerde.jackson.anyByteArraySerde(objectMapperRef)
+        ) {
           listOf(
             "kafka.bootstrapServers=${it.bootstrapServers}",
             "kafka.interceptorClasses=${it.interceptorClass}"
@@ -39,9 +39,10 @@ class DefaultConsumerSettings(
     props[ConsumerConfig.CLIENT_ID_CONFIG] = kafkaProperties.createClientId()
     props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = kafkaProperties.autoCreateTopics
     props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers
-    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
-    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
+    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
+    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
+    props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
     props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
     props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
     props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = kafkaProperties.offset
     props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true
@@ -38,8 +38,7 @@ class Stove : AbstractProjectConfig() {
       }
       kafka {
         KafkaSystemOptions(
-          containerOptions = KafkaContainerOptions(tag = "latest") {
-          }
+          containerOptions = KafkaContainerOptions(tag = "latest") { }
         ) {
           listOf(
             "kafka.bootstrapServers=${it.bootstrapServers}",
@@ -35,7 +35,7 @@ class ProductCreator(
         key = req.id.toString(),
         headers = mapOf(Headers.EVENT_TYPE to ProductCreatedEvent::class.simpleName!!),
         partition = 0,
-        payload = objectMapper.writeValueAsString(req.mapToProductCreatedEvent())
+        payload = req.mapToProductCreatedEvent()
       )
     )
     return "OK"
@@ -2,23 +2,28 @@ package stove.spring.standalone.example.infrastructure
 
 import com.fasterxml.jackson.annotation.JsonInclude
 import com.fasterxml.jackson.databind.*
-import com.fasterxml.jackson.databind.module.SimpleModule
 import com.fasterxml.jackson.module.kotlin.KotlinModule
+import org.springframework.boot.autoconfigure.AutoConfigureBefore
+import org.springframework.boot.autoconfigure.jackson.*
 import org.springframework.context.annotation.*
 
 @Configuration
+@AutoConfigureBefore(JacksonAutoConfiguration::class)
 class ObjectMapperConfig {
   companion object {
-    fun createObjectMapperWithDefaults(): ObjectMapper {
-      val isoInstantModule = SimpleModule()
-      return ObjectMapper()
-        .registerModule(KotlinModule.Builder().build())
-        .registerModule(isoInstantModule)
-        .setSerializationInclusion(JsonInclude.Include.NON_NULL)
-        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
-    }
+    val default: ObjectMapper = ObjectMapper()
+      .registerModule(KotlinModule.Builder().build())
+      .findAndRegisterModules()
+      .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+      .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
   }
 
   @Bean
-  fun objectMapper(): ObjectMapper = createObjectMapperWithDefaults()
+  @Primary
+  fun objectMapper(): ObjectMapper = default
+
+  @Bean
+  fun jacksonCustomizer(): Jackson2ObjectMapperBuilderCustomizer = Jackson2ObjectMapperBuilderCustomizer { builder ->
+    builder.factory(default.factory)
+  }
 }
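A side note on the mapper change above: exposing the shared instance as ObjectMapperConfig.default makes it easy to hand the very same mapper to Stove's serde in an e2e setup. A sketch of that pairing (an assumption for illustration; the example's actual TestSystem configuration is not part of this hunk):

import com.trendyol.stove.testing.e2e.serialization.StoveSerde
import stove.spring.standalone.example.infrastructure.ObjectMapperConfig

// Reuse the application's ObjectMapper so test-side serialization matches the app's.
val testSerde = StoveSerde.jackson.anyByteArraySerde(ObjectMapperConfig.default)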
@@ -19,9 +19,7 @@ class CouchbaseConfiguration(
   private val meterRegistry: MeterRegistry
 ) {
   companion object {
-    val objectMapper: ObjectMapper =
-      ObjectMapperConfig.createObjectMapperWithDefaults()
-        .registerModule(JsonValueModule())
+    val objectMapper: ObjectMapper = ObjectMapperConfig.default.registerModule(JsonValueModule())
   }
 
   @Primary
@@ -1,6 +1,6 @@
 package stove.spring.standalone.example.infrastructure.http
 
-import com.fasterxml.jackson.databind.*
+import com.fasterxml.jackson.databind.ObjectMapper
 import io.netty.channel.ChannelOption
 import io.netty.handler.timeout.ReadTimeoutHandler
 import org.springframework.boot.context.properties.EnableConfigurationProperties
@@ -11,7 +11,6 @@ import org.springframework.http.codec.ClientCodecConfigurer
 import org.springframework.http.codec.json.*
 import org.springframework.web.reactive.function.client.*
 import reactor.netty.http.client.HttpClient
-import stove.spring.standalone.example.infrastructure.ObjectMapperConfig
 import java.util.concurrent.TimeUnit
 
 @Configuration
@@ -31,12 +30,6 @@ class WebClientConfiguration(private val webClientConfigurationProperties: WebCl
       .exchangeStrategies(exchangeStrategies)
       .build()
 
-  @Bean
-  fun webClientObjectMapper(): ObjectMapper {
-    return ObjectMapperConfig.createObjectMapperWithDefaults()
-      .configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, false)
-  }
-
   @Bean
   fun exchangeStrategies(webClientObjectMapper: ObjectMapper): ExchangeStrategies {
     return ExchangeStrategies
@@ -7,10 +7,10 @@ import org.slf4j.*
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.stereotype.Component
 
-data class KafkaOutgoingMessage(
+data class KafkaOutgoingMessage<K, V>(
   val topic: String,
-  val key: String?,
-  val payload: String,
+  val key: K,
+  val payload: V,
   val headers: Map<String, String>,
   val partition: Int? = null
 )
@@ -21,7 +21,7 @@ class KafkaProducer(
 ) {
   private val logger: Logger = LoggerFactory.getLogger(KafkaProducer::class.java)
 
-  suspend fun send(message: KafkaOutgoingMessage) {
+  suspend fun send(message: KafkaOutgoingMessage<String, Any>) {
     val recordHeaders = message.headers.map { RecordHeader(it.key, it.value.toByteArray()) }
     val record = ProducerRecord<String, Any>(
       message.topic,
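With KafkaOutgoingMessage now generic, callers pass the domain object itself and let the producer's value serializer handle serialization, as the ProductCreator hunk above shows. A small usage sketch (the event type, topic, and header name are illustrative assumptions):

data class ProductCreatedEvent(val id: Long, val name: String) // illustrative event

suspend fun publish(kafkaProducer: KafkaProducer, event: ProductCreatedEvent) {
  kafkaProducer.send(
    KafkaOutgoingMessage<String, Any>(
      topic = "product",                                   // illustrative topic name
      key = event.id.toString(),
      headers = mapOf("eventType" to ProductCreatedEvent::class.simpleName!!),
      partition = 0,
      payload = event // serialized by the producer's JsonSerializer, not pre-stringified
    )
  )
}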
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.springframework.beans.factory.annotation.Value
 import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.kafka.support.serializer.*
 import org.springframework.stereotype.Component
 import java.time.Duration
 
@@ -37,8 +38,13 @@ class DefaultConsumerSettings(
     props[ConsumerConfig.CLIENT_ID_CONFIG] = kafkaProperties.createClientId()
     props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = kafkaProperties.autoCreateTopics
     props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers
-    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
-    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
+    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
+    props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
+    props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
+    props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
+    props[JsonDeserializer.REMOVE_TYPE_INFO_HEADERS] = false
+    props[JsonDeserializer.VALUE_DEFAULT_TYPE] = Any::class.java
     props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = kafkaProperties.offset
     props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true
     props[ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG] = ofSeconds(AUTO_COMMIT_INTERVAL)
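For readability, the same deserialization chain as the hunk above with the intent of each key spelled out (all constants are standard Spring Kafka; the values mirror the change, the standalone map itself is just an illustration):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer

fun deserializationProps(): Map<String, Any> = mapOf(
  // ErrorHandlingDeserializer wraps the real deserializers so a poison-pill record fails
  // that record (and can be dead-lettered by the error handler) instead of crashing the consumer loop.
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
  // Delegates that do the actual work: plain strings for keys, JSON for values.
  ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS to StringDeserializer::class.java,
  ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS to JsonDeserializer::class.java,
  // JsonDeserializer settings: trust every package, keep type-info headers on the record,
  // and fall back to a generic value type when no type header is present.
  JsonDeserializer.TRUSTED_PACKAGES to "*",
  JsonDeserializer.REMOVE_TYPE_INFO_HEADERS to false,
  JsonDeserializer.VALUE_DEFAULT_TYPE to Any::class.java
)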
@@ -24,7 +24,6 @@ class KafkaConsumerConfiguration(
     factory.setConcurrency(1)
     factory.consumerFactory = consumerFactory
     factory.containerProperties.isDeliveryAttemptHeader = true
-    factory.setRecordMessageConverter(stringJsonMessageConverter())
     val errorHandler = DefaultErrorHandler(
       DeadLetterPublishingRecoverer(kafkaTemplate),
       FixedBackOff(0, 0)
@@ -41,7 +40,6 @@
   ): ConcurrentKafkaListenerContainerFactory<String, String> {
     val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
     factory.setConcurrency(1)
-    factory.setRecordMessageConverter(stringJsonMessageConverter())
     factory.containerProperties.isDeliveryAttemptHeader = true
     factory.consumerFactory = consumerRetryFactory
     val errorHandler = DefaultErrorHandler(
@@ -54,14 +52,12 @@
   }
 
   @Bean
-  fun consumerFactory(consumerSettings: ConsumerSettings): ConsumerFactory<String, Any> {
-    return DefaultKafkaConsumerFactory(consumerSettings.settings())
-  }
+  fun consumerFactory(consumerSettings: ConsumerSettings): ConsumerFactory<String, Any> =
+    DefaultKafkaConsumerFactory(consumerSettings.settings())
 
   @Bean
-  fun consumerRetryFactory(consumerSettings: ConsumerSettings): ConsumerFactory<String, Any> {
-    return DefaultKafkaConsumerFactory(consumerSettings.settings())
-  }
+  fun consumerRetryFactory(consumerSettings: ConsumerSettings): ConsumerFactory<String, Any> =
+    DefaultKafkaConsumerFactory(consumerSettings.settings())
 
   @Bean
   fun stringJsonMessageConverter(): StringJsonMessageConverter = StringJsonMessageConverter(objectMapper)
@@ -3,6 +3,7 @@ package stove.spring.standalone.example.infrastructure.messaging.kafka.configura
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.StringSerializer
 import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.kafka.support.serializer.JsonSerializer
 import org.springframework.stereotype.Component
 import stove.spring.standalone.example.infrastructure.messaging.kafka.interceptors.CustomProducerInterceptor
 
@@ -15,7 +16,7 @@ class DefaultProducerSettings(private val kafkaProperties: KafkaProperties) : Pr
     val props: MutableMap<String, Any> = HashMap()
     props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers
     props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-    props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+    props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
     props[ProducerConfig.INTERCEPTOR_CLASSES_CONFIG] =
       listOf(CustomProducerInterceptor::class.java.name) + kafkaProperties.interceptorClasses
     props[ProducerConfig.ACKS_CONFIG] = kafkaProperties.acks
@@ -21,9 +21,8 @@ class FailingProductCreateConsumer {
     groupId = "#{@consumerConfig.groupId}",
     containerFactory = KafkaConsumerConfiguration.LISTENER_BEAN_NAME
   )
-  fun listen(cr: ConsumerRecord<String, String>): Unit = runBlocking(MDCContext()) {
-    logger.info("Received product failing event ${cr.value()}")
-
+  fun listen(record: ConsumerRecord<*, *>): Unit = runBlocking(MDCContext()) {
+    logger.info("Received product failing event ${record.value()}")
     throw BusinessException("Failing product create event")
   }
 }
@@ -29,12 +29,11 @@ class ProductTransferConsumers(
     groupId = "#{@consumerConfig.groupId}_retry",
     containerFactory = KafkaConsumerConfiguration.RETRY_LISTENER_BEAN_NAME
   )
-  fun listen(cr: ConsumerRecord<String, String>) =
-    runBlocking(MDCContext()) {
-      logger.info("Received product transfer command ${cr.value()}")
-      val command = objectMapper.readValue(cr.value(), CreateProductCommand::class.java)
-      productCreator.create(command.mapToCreateRequest())
-    }
+  fun listen(record: ConsumerRecord<*, Any>) = runBlocking(MDCContext()) {
+    logger.info("Received product transfer command ${record.value()}")
+    val command = objectMapper.convertValue(record.value(), CreateProductCommand::class.java)
+    productCreator.create(command.mapToCreateRequest())
+  }
 }
 
 data class CreateProductCommand(