Skip to content

Commit

Permalink
Merge pull request #5 from ueberfuhr-trainings/feature/exception-handling
Browse files Browse the repository at this point in the history

Add retry and dlt handling.
  • Loading branch information
ueberfuhr authored Dec 20, 2024
2 parents c2e8de8 + 38dba7b commit 7b383f2
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -18,19 +21,39 @@ public class CustomerEventListener {

private final CustomersService customersService;

/*
* Auto-creates or needs the following topics:
* - customer-events-retry-500-0
* - customer-events-retry-1000-0
* - customer-events-retry-2000-0
* - customer-events-retry-4000-0
* - customer-events-dlt-0
*/
@RetryableTopic(
attempts = "5",
backoff = @Backoff(
delay = 500L,
multiplier = 2
)
)
@KafkaListener(
topics = "${application.kafka.customer-events-topic}"
)
public void consume(
@Payload CustomerEventRecord record,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
Acknowledgment acknowledgement
) {
log.info(
"Received record: {} {} from partition: {}",
record.eventType(),
record.uuid(),
partition
);
// Use this to test a processing error.
// if (true) {
// throw new RuntimeException("processing error");
// }
switch (record.eventType()) {
case "created":
case "replaced":
Expand All @@ -52,6 +75,7 @@ public void consume(
default:
throw new ValidationException();
}
acknowledgement.acknowledge();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.sample.schulung.statistics.kafka.exceptions;

import org.springframework.kafka.support.serializer.FailedDeserializationInfo;

import java.util.function.Function;

/**
 * Callback invoked by Spring Kafka's {@code ErrorHandlingDeserializer} when
 * deserialization of a record fails. It may inspect the failure and provide
 * a substitute payload instead of propagating the error.
 *
 * @param <T> the type of the fallback payload to produce
 */
public class CustomDeserializationFailureHandler<T>
  implements Function<FailedDeserializationInfo, T> {

  /**
   * Analyzes a deserialization failure and derives a fallback object.
   *
   * @param info details of the record that could not be deserialized
   * @return the substitute payload, or {@code null} if none is produced
   */
  @Override
  public T apply(FailedDeserializationInfo info) {
    // Analyze the failure here and build a custom fallback object.
    byte[] rawPayload = info.getData();
    return null; // TODO derive a fallback value from rawPayload
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.sample.schulung.statistics.kafka.exceptions;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * JSON serializer configured with snake_case property names, ISO-8601 date
 * rendering (timestamps disabled), and JSR-310 (java.time) support.
 */
public class CustomJsonSerializer extends JsonSerializer<Object> {

  public CustomJsonSerializer() {
    super(newConfiguredMapper());
  }

  // Builds Spring's enhanced mapper and applies our serialization tweaks.
  private static ObjectMapper newConfiguredMapper() {
    final var mapper = JacksonUtils.enhancedObjectMapper();
    mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    mapper.registerModule(new JavaTimeModule());
    return mapper;
  }

}
15 changes: 13 additions & 2 deletions statistics-service-provider/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,24 @@ spring:
bootstrap-servers: localhost:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
# NOTE(review): diff residue — the line below was replaced by the
# ErrorHandlingDeserializer line that follows it; only one should remain.
value-deserializer: de.sample.schulung.statistics.kafka.CustomJsonDeserializer
# ErrorHandlingDeserializer wraps the real (delegate) deserializer so that
# deserialization failures do not poison the consumer loop.
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
group-id: customer-statistics
auto-offset-reset: earliest
properties:
"[spring.json.use.type.headers]": false
"[spring.json.value.default.type]": de.sample.schulung.statistics.kafka.CustomerEventRecord

# Parameters for ErrorHandlingDeserializer
# delegate = the deserializer that actually parses the payload;
# function = handler invoked when the delegate fails.
"[spring.deserializer.value.delegate.class]": de.sample.schulung.statistics.kafka.CustomJsonDeserializer
"[spring.deserializer.value.function]": de.sample.schulung.statistics.kafka.exceptions.CustomDeserializationFailureHandler
enable-auto-commit: false
producer: # DLT! -> multiple producers: https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/
key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
value-serializer: de.sample.schulung.statistics.kafka.exceptions.CustomJsonSerializer
properties:
# do not serialize the class name into the message
"[spring.json.add.type.headers]": false
listener:
# manual ack-mode: offsets are committed only via Acknowledgment.acknowledge()
ack-mode: manual
application:
kafka:
customer-events-topic: customer-events

0 comments on commit 7b383f2

Please sign in to comment.