Skip to content

Commit

Permalink
Introduce Kafka Producer.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralf Ueberfuhr committed Dec 19, 2024
1 parent 6342072 commit 241fabc
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 7 deletions.
4 changes: 4 additions & 0 deletions account-service-provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.sample.schulung.accounts.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class CustomJsonSerializer extends JsonSerializer<Object> {

private static ObjectMapper createCustomObjectMapper() {
final var result = JacksonUtils.enhancedObjectMapper();
result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
result.registerModule(new JavaTimeModule());
return result;
}

public CustomJsonSerializer() {
super(createCustomObjectMapper());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.sample.schulung.accounts.kafka;

import java.util.UUID;

public record CustomerEventRecord(
String eventType,
UUID uuid,
CustomerRecord customer
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package de.sample.schulung.accounts.kafka;

import de.sample.schulung.accounts.domain.Customer;
import de.sample.schulung.accounts.domain.Customer.CustomerState;
import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent;
import de.sample.schulung.accounts.domain.events.CustomerDeletedEvent;
import de.sample.schulung.accounts.domain.events.CustomerReplacedEvent;
import org.springframework.stereotype.Component;

@Component
public class CustomerEventRecordMapper {

public String map(CustomerState state) {
return switch (state) {
case ACTIVE -> "active";
case LOCKED -> "locked";
case DISABLED -> "disabled";
};
}

public CustomerRecord map(Customer customer) {
return new CustomerRecord(
customer.getName(),
customer.getDateOfBirth(),
this.map(customer.getState())
);
}

public CustomerEventRecord map(CustomerCreatedEvent event) {
var customer = event.customer();
return new CustomerEventRecord(
"created",
customer.getUuid(),
this.map(customer)
);
}

public CustomerEventRecord map(CustomerReplacedEvent event) {
var customer = event.customer();
return new CustomerEventRecord(
"replaced",
customer.getUuid(),
this.map(customer)
);
}

public CustomerEventRecord map(CustomerDeletedEvent event) {
return new CustomerEventRecord(
"deleted",
event.uuid(),
null
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package de.sample.schulung.accounts.kafka;

import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
@RequiredArgsConstructor
public class CustomerEventsProducer {

private final KafkaTemplate<UUID, Object> kafkaTemplate;
private final CustomerEventRecordMapper mapper;
private final KafkaApplicationConfiguration config;

@EventListener
public void handleCustomerCreatedEvent(CustomerCreatedEvent event) {
var payload = mapper.map(event);
kafkaTemplate.send(
config.getCustomerEventsTopic(),
event.customer().getUuid(),
payload
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.sample.schulung.accounts.kafka;

import java.time.LocalDate;

public record CustomerRecord(
String name,
LocalDate birthdate,
String state
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package de.sample.schulung.accounts.kafka;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "application.kafka")
@Getter
@Setter
public class KafkaApplicationConfiguration {

private String customerEventsTopic = "customer-events";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package de.sample.schulung.accounts.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfiguration {

@Bean
public NewTopic customerEventsTopic(KafkaApplicationConfiguration config) {
return TopicBuilder
.name(config.getCustomerEventsTopic())
.partitions(5)
.replicas(1)
.build();
}

}
12 changes: 11 additions & 1 deletion account-service-provider/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,14 @@ spring:
h2:
console:
path: /db
enabled: true
enabled: true
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
value-serializer: de.sample.schulung.accounts.kafka.CustomJsonSerializer
admin:
auto-create: ${AUTO_CREATE_TOPIC:true}
application:
kafka:
customer-events-topic: customer-events
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand All @@ -11,13 +12,20 @@
import org.springframework.test.web.servlet.MockMvc;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.header;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;


@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
class AccountsApiTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
Expand All @@ -15,6 +16,7 @@
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class IndexPageTests {

@Autowired
Expand All @@ -23,8 +25,8 @@ public class IndexPageTests {
@Test
void shouldRedirectIndexPage() throws Exception {
var location = mvc.perform(
get("/")
)
get("/")
)
.andExpect(status().isFound())
.andExpect(header().exists("Location"))
.andReturn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import de.sample.schulung.accounts.domain.CustomersService;
import de.sample.schulung.accounts.domain.NotFoundException;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
Expand All @@ -15,14 +16,20 @@
import java.util.UUID;
import java.util.stream.Stream;

import static org.mockito.Mockito.*;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class AccountsBoundaryTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts.domain;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
Expand All @@ -12,6 +13,7 @@

@SpringBootTest
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class CustomersServiceTest {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package de.sample.schulung.accounts.kafka;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.kafka.core.KafkaTemplate;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Auto-configures a {@link KafkaTemplate} mock in the test context.
* You can get the mock injected by simply using
* <pre>
* \u0040Autowired
* KafkaTemplate&lt;String, CustomerDto&gt; templateMock;
* </pre>
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@EnableAutoConfiguration(exclude = KafkaAutoConfiguration.class)
@MockBean(KafkaTemplate.class)
public @interface AutoConfigureKafkaTemplateMock {

}

0 comments on commit 241fabc

Please sign in to comment.