kafkaTemplate;
+ private final CustomerEventRecordMapper mapper;
+ private final KafkaApplicationConfiguration config;
+
+ @EventListener
+ public void handleCustomerCreatedEvent(CustomerCreatedEvent event) {
+ var payload = mapper.map(event);
+ kafkaTemplate.send(
+ config.getCustomerEventsTopic(),
+ event.customer().getUuid(),
+ payload
+ );
+ }
+
+}
diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerRecord.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerRecord.java
new file mode 100644
index 0000000..a139a3c
--- /dev/null
+++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerRecord.java
@@ -0,0 +1,10 @@
+package de.sample.schulung.accounts.kafka;
+
+import java.time.LocalDate;
+
+public record CustomerRecord(
+ String name,
+ LocalDate birthdate,
+ String state
+) {
+}
diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaApplicationConfiguration.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaApplicationConfiguration.java
new file mode 100644
index 0000000..a9ea088
--- /dev/null
+++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaApplicationConfiguration.java
@@ -0,0 +1,16 @@
+package de.sample.schulung.accounts.kafka;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Component
+@ConfigurationProperties(prefix = "application.kafka")
+@Getter
+@Setter
+public class KafkaApplicationConfiguration {
+
+ private String customerEventsTopic = "customer-events";
+
+}
diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConfiguration.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConfiguration.java
new file mode 100644
index 0000000..6a6a3de
--- /dev/null
+++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConfiguration.java
@@ -0,0 +1,20 @@
+package de.sample.schulung.accounts.kafka;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.TopicBuilder;
+
+@Configuration
+public class KafkaConfiguration {
+
+ @Bean
+ public NewTopic customerEventsTopic(KafkaApplicationConfiguration config) {
+ return TopicBuilder
+ .name(config.getCustomerEventsTopic())
+ .partitions(5)
+ .replicas(1)
+ .build();
+ }
+
+}
diff --git a/account-service-provider/src/main/resources/application.yml b/account-service-provider/src/main/resources/application.yml
index 2dba64a..379fd62 100644
--- a/account-service-provider/src/main/resources/application.yml
+++ b/account-service-provider/src/main/resources/application.yml
@@ -15,4 +15,14 @@ spring:
h2:
console:
path: /db
- enabled: true
\ No newline at end of file
+ enabled: true
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
+ value-serializer: de.sample.schulung.accounts.kafka.CustomJsonSerializer
+ admin:
+ auto-create: ${AUTO_CREATE_TOPIC:true}
+application:
+ kafka:
+ customer-events-topic: customer-events
\ No newline at end of file
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java
index 737c86e..91603da 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;
+import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@@ -11,13 +12,20 @@
import org.springframework.test.web.servlet.MockMvc;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.header;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
+@AutoConfigureKafkaTemplateMock
class AccountsApiTests {
@Autowired
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java
index 259c02c..abe0709 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;
+import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
@@ -15,6 +16,7 @@
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
+@AutoConfigureKafkaTemplateMock
public class IndexPageTests {
@Autowired
@@ -23,8 +25,8 @@ public class IndexPageTests {
@Test
void shouldRedirectIndexPage() throws Exception {
var location = mvc.perform(
- get("/")
- )
+ get("/")
+ )
.andExpect(status().isFound())
.andExpect(header().exists("Location"))
.andReturn()
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java
index 7701db0..170bff5 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java
@@ -2,6 +2,7 @@
import de.sample.schulung.accounts.domain.CustomersService;
import de.sample.schulung.accounts.domain.NotFoundException;
+import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
@@ -15,14 +16,20 @@
import java.util.UUID;
import java.util.stream.Stream;
-import static org.mockito.Mockito.*;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
+@AutoConfigureKafkaTemplateMock
public class AccountsBoundaryTests {
@Autowired
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java
index 3e7be1c..b1cedd6 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts.domain;
+import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
@@ -12,6 +13,7 @@
@SpringBootTest
@AutoConfigureTestDatabase
+@AutoConfigureKafkaTemplateMock
public class CustomersServiceTest {
@Autowired
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureKafkaTemplateMock.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureKafkaTemplateMock.java
new file mode 100644
index 0000000..49a4947
--- /dev/null
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureKafkaTemplateMock.java
@@ -0,0 +1,31 @@
+package de.sample.schulung.accounts.kafka;
+
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.kafka.core.KafkaTemplate;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Auto-configures a {@link KafkaTemplate} mock in the test context.
+ * You can get the mock injected by simply using
+ *
+ * \u0040Autowired
+ * KafkaTemplate<String, CustomerDto> templateMock;
+ *
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@EnableAutoConfiguration(exclude = KafkaAutoConfiguration.class)
+@MockBean(KafkaTemplate.class)
+public @interface AutoConfigureKafkaTemplateMock {
+
+}