diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java index 2f4c513..1d501fa 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,6 +24,8 @@ package com.bakdata.fluent_kafka_streams_tests; +import java.util.List; +import java.util.stream.Collectors; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.TestOutputTopic; @@ -114,6 +116,16 @@ public TestOutput asStream() { return new StreamOutput<>(this.testDriver, this.topic, this.keySerde, this.valueSerde); } + @Override + public List> toList() { + return this.testOutputTopic + .readRecordsToList() + .stream() + .map(testRecord -> new ProducerRecord<>(this.topic, 0, testRecord.timestamp(), testRecord.key(), + testRecord.value(), testRecord.getHeaders())) + .collect(Collectors.toList()); + } + // ================== // Non-public methods // ================== diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java index 22ae948..469e577 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java @@ -24,6 +24,7 @@ package com.bakdata.fluent_kafka_streams_tests; +import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serde; @@ -147,5 +148,7 @@ default TestOutput withValueType(final Class valueType) { * @return Current output with {@link org.apache.kafka.streams.kstream.KStream} semantics */ TestOutput asStream(); + + List> toList(); }