chore(recipes): bump Stove to 1.0.0.452-SNAPSHOT
osoykan committed Dec 3, 2024
1 parent 065ef5a commit 8df7342
Showing 8 changed files with 37 additions and 37 deletions.
2 changes: 1 addition & 1 deletion recipes/gradle/libs.versions.toml
@@ -70,7 +70,7 @@ hoplite = "2.9.0"
kediatr = "3.1.2"

# Testing
stove = "1.0.0.450-SNAPSHOT"
stove = "1.0.0.452-SNAPSHOT"
junit = "5.11.3"
kotest = "5.9.1"

@@ -15,6 +15,9 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@@ -51,10 +54,16 @@ public Properties consumerProperties(KafkaConfiguration kafkaConfiguration) {
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, kafkaConfiguration.isAutoCreateTopics());
properties.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfiguration.getAutoOffsetReset());

properties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName());
properties.put(
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
properties.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Object.class.getName());

properties.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, kafkaConfiguration.flattenInterceptorClasses());
logger.info("Kafka consumer properties: {}", properties);
@@ -67,7 +76,7 @@ public Properties producerProperties(KafkaConfiguration kafkaConfiguration) {
properties.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
properties.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, kafkaConfiguration.flattenInterceptorClasses());
return properties;
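On the consumer side, the value deserializer becomes Spring Kafka's ErrorHandlingDeserializer, which delegates the real decoding to JsonDeserializer and routes failures to the container's error handler instead of looping on a poison-pill record. A minimal Kotlin sketch of equivalent properties; the helper name and bootstrap-servers parameter are assumptions, only the Spring Kafka property keys come from the diff above:

```kotlin
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer

// Hypothetical helper mirroring the consumerProperties bean changed above.
fun consumerJsonProperties(bootstrapServers: String): Properties = Properties().apply {
  put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
  // ErrorHandlingDeserializer is the registered value deserializer; it wraps the real
  // one and hands deserialization failures to the container's error handler.
  put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer::class.java.name)
  put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer::class.java.name)
  // Trust every package for type headers and fall back to a plain Object payload.
  put(JsonDeserializer.TRUSTED_PACKAGES, "*")
  put(JsonDeserializer.VALUE_DEFAULT_TYPE, Any::class.java.name)
}
```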
@@ -1,30 +1,23 @@
package com.trendyol.stove.examples.java.spring.infra.boilerplate.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.trendyol.stove.examples.domain.ddd.AggregateRoot;
import com.trendyol.stove.examples.domain.ddd.EventPublisher;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaDomainEventPublisher implements EventPublisher {
private final KafkaTemplate<String, Object> template;
private final TopicResolver topicResolver;
private final ObjectMapper objectMapper;
private final Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaDomainEventPublisher.class);

public KafkaDomainEventPublisher(
KafkaTemplate<String, Object> template,
TopicResolver topicResolver,
@Qualifier("objectMapper") ObjectMapper objectMapper) {
KafkaTemplate<String, Object> template, TopicResolver topicResolver) {
this.template = template;
this.topicResolver = topicResolver;
this.objectMapper = objectMapper;
}

@Override
@@ -38,15 +31,8 @@ private <TId> Stream<ProducerRecord<String, Object>> mapEventsToProducerRecords(
.map(
event -> {
var topic = topicResolver.resolve(aggregateRoot.getAggregateName());
try {
logger.info("Publishing event {} to topic {}", event, topic.getName());
return new ProducerRecord<>(
topic.getName(),
aggregateRoot.getIdAsString(),
objectMapper.writeValueAsString(event));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
logger.info("Publishing event {} to topic {}", event, topic.getName());
return new ProducerRecord<>(topic.getName(), aggregateRoot.getIdAsString(), event);
});
}
}
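With JsonSerializer registered on the producer, the publisher above hands the domain event to the record as-is and the conversion to JSON happens at send time. A hedged sketch of that call shape; the event type and topic name are invented for illustration:

```kotlin
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.kafka.core.KafkaTemplate

// Hypothetical domain event, for illustration only.
data class ProductCreated(val id: String, val name: String)

fun publish(template: KafkaTemplate<String, Any>, event: ProductCreated) {
  // No writeValueAsString here: the configured JsonSerializer turns the value into JSON on send.
  template.send(ProducerRecord<String, Any>("product-created", event.id, event))
}
```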
@@ -1,32 +1,36 @@
package com.trendyol.stove.example.java.spring.e2e.setup

import com.couchbase.client.kotlin.codec.JacksonJsonSerializer
import com.trendyol.stove.example.java.spring.e2e.setup.migrations.CouchbaseMigration
import com.trendyol.stove.examples.java.spring.ExampleSpringBootApp
import com.trendyol.stove.examples.java.spring.infra.boilerplate.serialization.JacksonConfiguration
import com.trendyol.stove.examples.java.spring.infra.components.product.persistency.CollectionConstants
import com.trendyol.stove.testing.e2e.*
import com.trendyol.stove.testing.e2e.couchbase.*
import com.trendyol.stove.testing.e2e.http.*
import com.trendyol.stove.testing.e2e.serialization.StoveSerde
import com.trendyol.stove.testing.e2e.standalone.kafka.*
import com.trendyol.stove.testing.e2e.system.TestSystem
import com.trendyol.stove.testing.e2e.wiremock.*
import io.kotest.core.config.AbstractProjectConfig
import io.ktor.serialization.jackson.*
import org.springframework.kafka.support.serializer.JsonSerializer

class Stove : AbstractProjectConfig() {
override suspend fun beforeProject() {
TestSystem().with {
httpClient {
HttpClientSystemOptions(
baseUrl = "http://localhost:8080",
objectMapper = JacksonConfiguration.defaultObjectMapper()
contentConverter = JacksonConverter(JacksonConfiguration.defaultObjectMapper())
)
}

bridge()
wiremock {
WireMockSystemOptions(
port = 9090,
objectMapper = JacksonConfiguration.defaultObjectMapper()
serde = StoveSerde.jackson.anyByteArraySerde(JacksonConfiguration.defaultObjectMapper())
)
}
couchbase {
@@ -36,7 +40,7 @@ class Stove : AbstractProjectConfig() {
withStartupAttempts(3)
dockerImageName = "couchbase/server:7.2.5"
},
objectMapper = JacksonConfiguration.defaultObjectMapper(),
clusterSerDe = JacksonJsonSerializer(JacksonConfiguration.defaultObjectMapper()),
configureExposedConfiguration = { cfg ->
listOf(
"couchbase.bucket=${CollectionConstants.BUCKET_NAME}",
@@ -53,7 +57,8 @@

kafka {
KafkaSystemOptions(
objectMapper = JacksonConfiguration.defaultObjectMapper(),
serde = StoveSerde.jackson.anyByteArraySerde(JacksonConfiguration.defaultObjectMapper()),
valueSerializer = JsonSerializer(JacksonConfiguration.defaultObjectMapper()),
containerOptions = KafkaContainerOptions {
withStartupAttempts(3)
},
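Across the recipes the pattern is the same: the Stove snapshot drops the objectMapper parameters and each test system takes an explicit serializer instead. A consolidated Kotlin sketch of the Kafka options used here, assembled only from the hunks above rather than the complete file; the exposed-configuration key is borrowed from the Ktor recipe further down and may differ in this setup:

```kotlin
import com.trendyol.stove.examples.java.spring.infra.boilerplate.serialization.JacksonConfiguration
import com.trendyol.stove.testing.e2e.serialization.StoveSerde
import com.trendyol.stove.testing.e2e.standalone.kafka.*
import org.springframework.kafka.support.serializer.JsonSerializer

// Options block as wired in the test setup above; assumes the application's
// JacksonConfiguration.defaultObjectMapper() is on the test classpath.
fun kafkaOptions() = KafkaSystemOptions(
  serde = StoveSerde.jackson.anyByteArraySerde(JacksonConfiguration.defaultObjectMapper()),
  valueSerializer = JsonSerializer(JacksonConfiguration.defaultObjectMapper()),
  containerOptions = KafkaContainerOptions { withStartupAttempts(3) },
  configureExposedConfiguration = { cfg -> listOf("kafka.bootstrapServers=${cfg.bootstrapServers}") }
)
```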
@@ -1,6 +1,5 @@
package com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka

import com.fasterxml.jackson.databind.ObjectMapper
import com.trendyol.stove.examples.domain.ddd.*
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import kotlinx.coroutines.runBlocking
@@ -9,8 +8,7 @@ import org.slf4j.*

class KafkaDomainEventPublisher(
private val publisher: KafkaPublisher<String, Any>,
private val topicResolver: TopicResolver,
private val objectMapper: ObjectMapper
private val topicResolver: TopicResolver
) : EventPublisher {
private val logger: Logger = LoggerFactory.getLogger(KafkaDomainEventPublisher::class.java)

@@ -28,7 +26,7 @@ class KafkaDomainEventPublisher(
ProducerRecord<String, Any>(
topic.name,
aggregateRoot.idAsString,
objectMapper.writeValueAsString(event)
event
)
}
}
@@ -22,7 +22,7 @@ fun KoinApplication.registerKafka(kafkaConfiguration: KafkaConfiguration) {
single { kafkaPublisher(get()) }
single { kafkaReceiver(get()) }
single { ConsumerEngine(getAll()) }
single { KafkaDomainEventPublisher(get(), get(), get()) }.bind<EventPublisher>()
single { KafkaDomainEventPublisher(get(), get()) }.bind<EventPublisher>()
single { TopicResolver(get()) }
}
)
@@ -6,21 +6,23 @@ import com.trendyol.stove.examples.kotlin.ktor.infra.components.product.persiste
import com.trendyol.stove.testing.e2e.*
import com.trendyol.stove.testing.e2e.http.*
import com.trendyol.stove.testing.e2e.mongodb.*
import com.trendyol.stove.testing.e2e.serialization.StoveSerde
import com.trendyol.stove.testing.e2e.standalone.kafka.*
import com.trendyol.stove.testing.e2e.system.TestSystem
import com.trendyol.stove.testing.e2e.wiremock.*
import io.kotest.core.config.AbstractProjectConfig
import io.ktor.serialization.jackson.*
import org.koin.dsl.module

private val database = "stove-kotlin-ktor"
private const val DATABASE = "stove-kotlin-ktor"

class Stove : AbstractProjectConfig() {
override suspend fun beforeProject() {
TestSystem().with {
httpClient {
HttpClientSystemOptions(
baseUrl = "http://localhost:8081",
objectMapper = JacksonConfiguration.default
contentConverter = JacksonConverter(JacksonConfiguration.default)
)
}
bridge()
@@ -31,6 +33,7 @@
}
kafka {
KafkaSystemOptions(
serde = StoveSerde.jackson.anyByteArraySerde(JacksonConfiguration.default),
configureExposedConfiguration = { cfg ->
listOf(
"kafka.bootstrapServers=${cfg.bootstrapServers}",
@@ -41,12 +44,11 @@
}
mongodb {
MongodbSystemOptions(
databaseOptions = DatabaseOptions(DatabaseOptions.DefaultDatabase(database, collection = PRODUCT_COLLECTION)),
databaseOptions = DatabaseOptions(DatabaseOptions.DefaultDatabase(DATABASE, collection = PRODUCT_COLLECTION)),
container = MongoContainerOptions(),
objectMapper = JacksonConfiguration.default,
configureExposedConfiguration = { cfg ->
listOf(
"mongo.database=$database",
"mongo.database=$DATABASE",
"mongo.uri=${cfg.connectionString}/?retryWrites=true&w=majority"
)
}
@@ -1,9 +1,9 @@
package com.trendyol.stove.examples.kotlin.spring.e2e.tests

import com.fasterxml.jackson.module.kotlin.readValue
import com.trendyol.stove.examples.kotlin.spring.ExampleData
import com.trendyol.stove.testing.e2e.http.http
import com.trendyol.stove.testing.e2e.serialization.StoveObjectMapper
import com.trendyol.stove.testing.e2e.serialization.StoveSerde
import com.trendyol.stove.testing.e2e.serialization.StoveSerde.Companion.deserialize
import com.trendyol.stove.testing.e2e.system.TestSystem.Companion.validate
import io.kotest.core.spec.style.FunSpec
import io.ktor.client.*
@@ -32,7 +32,7 @@ class StreamingTests : FunSpec({
contentType(ContentType.parse("application/x-ndjson"))
}.also { response ->
response.readJsonStream { line ->
StoveObjectMapper.Default.readValue<ExampleData>(line)
StoveSerde.jackson.anyJsonStringSerde().deserialize<ExampleData>(line)
}.collect { data ->
println(data)
}
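The streaming test now deserializes each NDJSON line through StoveSerde rather than StoveObjectMapper. A small sketch of the same call in isolation; the ExampleData fields are assumed, the real class lives in the example application:

```kotlin
import com.trendyol.stove.testing.e2e.serialization.StoveSerde
import com.trendyol.stove.testing.e2e.serialization.StoveSerde.Companion.deserialize

// Field names are assumptions made for this sketch.
data class ExampleData(val id: Int, val message: String)

fun parseLine(line: String): ExampleData =
  // Same serde used in StreamingTests above: a Jackson-backed JSON-string serde.
  StoveSerde.jackson.anyJsonStringSerde().deserialize<ExampleData>(line)
```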
