Skip to content

Commit

Permalink
Merge pull request #4 from ueberfuhr-trainings/feature/producer-tests
Browse files Browse the repository at this point in the history
Add test for embedded kafka (make count of partitions configurable).
  • Loading branch information
ueberfuhr authored Dec 20, 2024
2 parents 7fafd0e + c09f498 commit c2e8de8
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 10 deletions.
5 changes: 5 additions & 0 deletions account-service-provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
@Setter
public class KafkaApplicationConfiguration {

private String customerEventsTopic = "customer-events";
private String customerEventsTopic = "customer-events";
private int customerEventsPartitions = 1;

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
@Configuration
public class KafkaConfiguration {

@Bean
public NewTopic customerEventsTopic(KafkaApplicationConfiguration config) {
return TopicBuilder
.name(config.getCustomerEventsTopic())
.partitions(5)
.replicas(1)
.build();
}
@Bean
public NewTopic customerEventsTopic(KafkaApplicationConfiguration config) {
return TopicBuilder
.name(config.getCustomerEventsTopic())
.partitions(config.getCustomerEventsPartitions())
.replicas(1)
.build();
}

}
3 changes: 2 additions & 1 deletion account-service-provider/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ spring:
auto-create: ${AUTO_CREATE_TOPIC:true}
application:
kafka:
customer-events-topic: customer-events
customer-events-topic: customer-events
customer-events-partitions: 5
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package de.sample.schulung.accounts.kafka;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.stereotype.Component;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Autoconfigures an {@link EmbeddedKafka}
* and provides an extension to run, reset and stop the container.<br/>
* We can get the following beans injected into our test class
* <pre>
* \u0040Autowired
* EmbeddedKafkaBroker kafka;
* \u0040Autowired
* KafkaTestContext&lt;Key,Value&gt; context;
* </pre>
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
// Kafka Configuration
@EmbeddedKafka(
partitions = 1
)
@TestPropertySource(
properties = """
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
# we disable this in production, but need this for the tests
spring.kafka.producer.properties."[spring.json.add.type.headers]"=true
application.kafka.customer-events-partitions=1
application.kafka.customer-events-topic=test-customer-events
"""
)
@Import({
AutoConfigureEmbeddedKafka.EmbeddedKafkaConfiguration.class,
AutoConfigureEmbeddedKafka.KafkaMessageListenerContainerLifecycleHandler.class
})
@ExtendWith(AutoConfigureEmbeddedKafka.EmbeddedKafkaExtension.class)
public @interface AutoConfigureEmbeddedKafka {

@RequiredArgsConstructor
@Getter(AccessLevel.PRIVATE)
class KafkaTestContext<K, V> {

private final BlockingQueue<ConsumerRecord<K, V>> records;
private final KafkaMessageListenerContainer<K, V> container;

@SneakyThrows
public Optional<ConsumerRecord<K, V>> poll(long timeout, TimeUnit unit) {
return Optional.ofNullable(this.records.poll(timeout, unit));
}

}


@TestConfiguration
class EmbeddedKafkaConfiguration {

@Bean
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
KafkaTestContext<?, ?> createKafkaTestContext(EmbeddedKafkaBroker kafka, List<NewTopic> topics) {
final var consumerFactory = new DefaultKafkaConsumerFactory<>(
Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokersAsString(),
ConsumerConfig.GROUP_ID_CONFIG, "consumer",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10",
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000",
// not needed, but must not be null
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, CustomJsonDeserializer.class.getName(),
JsonDeserializer.TRUSTED_PACKAGES, "*",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
)
);
final var containerProperties = new ContainerProperties(
topics
.stream()
.map(NewTopic::name)
.toArray(String[]::new)
);
final var container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
final var records = new LinkedBlockingQueue<ConsumerRecord<Object, Object>>();
container.setupMessageListener((MessageListener<?, ?>) records::add);
return new KafkaTestContext<>(records, container);
}

}

@Component
@RequiredArgsConstructor
class KafkaMessageListenerContainerLifecycleHandler {

private final KafkaTestContext<?, ?> context;
private final EmbeddedKafkaBroker kafka;

@EventListener(ContextRefreshedEvent.class)
public void startup() {
context.getContainer().start();
ContainerTestUtils.waitForAssignment(
context.getContainer(),
kafka.getPartitionsPerTopic()
);
}

@EventListener(ContextClosedEvent.class)
public void shutdown() {
context.getContainer().stop();
}

}

// we need to reset the records between the tests
class EmbeddedKafkaExtension implements AfterEachCallback {

@Override
public void afterEach(ExtensionContext context) {
SpringExtension
.getApplicationContext(context)
.getBean(KafkaTestContext.class)
.getRecords()
.clear();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.sample.schulung.accounts.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class CustomJsonDeserializer extends JsonDeserializer<Object> {

private static ObjectMapper createCustomObjectMapper() {
final var result = JacksonUtils.enhancedObjectMapper();
result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
result.registerModule(new JavaTimeModule());
return result;
}

public CustomJsonDeserializer() {
super(createCustomObjectMapper());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package de.sample.schulung.accounts.kafka;

import de.sample.schulung.accounts.domain.Customer;
import de.sample.schulung.accounts.domain.CustomersService;
import de.sample.schulung.accounts.kafka.AutoConfigureEmbeddedKafka.KafkaTestContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDate;
import java.time.Month;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.from;

@SpringBootTest
@AutoConfigureTestDatabase
@AutoConfigureEmbeddedKafka
class CustomerEventsProducerTests {

@Autowired
CustomersService service;
@Autowired
KafkaTestContext<UUID, CustomerEventRecord> context;
@Value("${application.kafka.customer-events-topic}")
String topicName;

@Test
void shouldProduceCustomerEventWhenCustomerIsCreated() {
var customer = new Customer();
customer.setName("Tom Mayer");
customer.setState(Customer.CustomerState.ACTIVE);
customer.setDateOfBirth(LocalDate.of(2000, Month.DECEMBER, 20));

service.createCustomer(customer);

assertThat(context.poll(3, TimeUnit.SECONDS))
.isPresent()
.get()
.returns(topicName, from(ConsumerRecord::topic))
.extracting(ConsumerRecord::value)
.returns("created", from(CustomerEventRecord::eventType))
.returns(customer.getUuid(), from(CustomerEventRecord::uuid))
.extracting(CustomerEventRecord::customer)
.returns("Tom Mayer", from(CustomerRecord::name));

}

}

0 comments on commit c2e8de8

Please sign in to comment.