From 2129eecd1923332b4544445a554e9c3190a87ee7 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 8 Apr 2024 11:39:33 +0200 Subject: [PATCH 1/7] Configure Serdes from TestTopology --- .../TestTopology.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java index 0605854..3fdbb38 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java @@ -451,6 +451,28 @@ public void stop() { } } + /** + * Configure a {@code Serde} for values using {@link #properties} + * @param serde serde to configure + * @return configured {@code Serde} + * @param type to be (de-)serialized + */ + public Serde configureValueSerde(final Serde serde) { + serde.configure(this.properties, false); + return serde; + } + + /** + * Configure a {@code Serde} for keys using {@link #properties} + * @param serde serde to configure + * @return configured {@code Serde} + * @param type to be (de-)serialized + */ + public Serde configureKeySerde(final Serde serde) { + serde.configure(this.properties, true); + return serde; + } + private Properties getProperties() { final Properties props = new Properties(); props.putAll(this.properties); From 5d79c08c45bb18a88ef02c2c9654e00ccba933ce Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 8 Apr 2024 13:46:22 +0200 Subject: [PATCH 2/7] Configure Serdes from TestTopology --- .../TestTopology.java | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java index 3fdbb38..8deb8d7 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java @@ -24,7 +24,10 @@ package com.bakdata.fluent_kafka_streams_tests; +import static java.util.Collections.emptyMap; + import com.bakdata.schemaregistrymock.SchemaRegistryMock; +import com.google.common.collect.ImmutableMap; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.io.File; @@ -458,7 +461,19 @@ public void stop() { * @param type to be (de-)serialized */ public Serde configureValueSerde(final Serde serde) { - serde.configure(this.properties, false); + return this.configureValueSerde(serde, emptyMap()); + } + + /** + * Configure a {@code Serde} for values using {@link #properties} and config overrides + * @param serde serde to configure + * @param config configuration overrides + * @return configured {@code Serde} + * @param type to be (de-)serialized + */ + public Serde configureValueSerde(final Serde serde, final Map config) { + final Map serdeConfig = this.mergeConfig(config); + serde.configure(serdeConfig, false); return serde; } @@ -469,10 +484,29 @@ public Serde configureValueSerde(final Serde serde) { * @param type to be (de-)serialized */ public Serde configureKeySerde(final Serde serde) { - serde.configure(this.properties, true); + return this.configureKeySerde(serde, emptyMap()); + } + + /** + * Configure a {@code Serde} for keys using {@link #properties} and config overrides + * @param serde serde to configure + * @param config configuration overrides + * @return configured {@code Serde} + * @param type to be (de-)serialized + */ + public Serde configureKeySerde(final Serde serde, final Map config) { + final Map serdeConfig = this.mergeConfig(config); + serde.configure(serdeConfig, true); return serde; } + private Map mergeConfig(final Map config) { + return ImmutableMap.builder() + .putAll(this.properties) + .putAll(config) + .build(); + } + private Properties getProperties() { final Properties props = new Properties(); props.putAll(this.properties); From 601be0b3382bd21e1e079f31fa6ed7c3582543f7 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 9 Apr 2024 07:17:52 +0200 Subject: [PATCH 3/7] Configure Serdes from TestTopology --- schema-registry-mock/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema-registry-mock/build.gradle.kts b/schema-registry-mock/build.gradle.kts index d62ce0c..abe980b 100644 --- a/schema-registry-mock/build.gradle.kts +++ b/schema-registry-mock/build.gradle.kts @@ -6,7 +6,7 @@ dependencies { "api"(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) "api"(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) - implementation(group = "org.wiremock", name = "wiremock", version = "3.4.2") + implementation(group = "org.wiremock", name = "wiremock", version = "3.3.1") val junit5Version: String by project testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version) From dd0fa587f41ffe9cc45803b027f56d74cd59a48a Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 9 Apr 2024 09:49:20 +0200 Subject: [PATCH 4/7] Configure Serdes from TestTopology --- schema-registry-mock/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema-registry-mock/build.gradle.kts b/schema-registry-mock/build.gradle.kts index abe980b..261c9ee 100644 --- a/schema-registry-mock/build.gradle.kts +++ b/schema-registry-mock/build.gradle.kts @@ -6,7 +6,7 @@ dependencies { "api"(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) "api"(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) - implementation(group = "org.wiremock", name = "wiremock", version = "3.3.1") + api(group = "org.wiremock", name = "wiremock", version = "3.4.2") val junit5Version: String by project testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version) From 5c1747f72f7ca70a2a74e0e845cc95822a9f1ee0 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 9 Apr 2024 10:42:59 +0200 Subject: [PATCH 5/7] Configure Serdes from TestTopology --- schema-registry-mock/build.gradle.kts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/schema-registry-mock/build.gradle.kts b/schema-registry-mock/build.gradle.kts index 261c9ee..ef01440 100644 --- a/schema-registry-mock/build.gradle.kts +++ b/schema-registry-mock/build.gradle.kts @@ -6,7 +6,9 @@ dependencies { "api"(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) "api"(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) - api(group = "org.wiremock", name = "wiremock", version = "3.4.2") + implementation(group = "org.wiremock", name = "wiremock", version = "3.4.2") + // required because other dependencies use different Jackson versions if this library is used in test scope + api(group = "com.fasterxml.jackson.core", name = "jackson-databind", version = "2.15.3") val junit5Version: String by project testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version) From fcf5f7a15cf3519e16a188f0e85fb640e11a15bf Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 12 Apr 2024 13:41:21 +0200 Subject: [PATCH 6/7] Empty-Commit From 9d4aea9888a4b6b023d00bc3244168fb2c817424 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+JakobEdding@users.noreply.github.com> Date: Fri, 12 Apr 2024 15:22:07 +0200 Subject: [PATCH 7/7] Add method to convert output topic to List (#93) --- README.md | 28 ++++++++- .../junit4/WordCountTest.java | 62 ++++++++++++++++++- .../junit5/WordCountTest.java | 62 ++++++++++++++++++- .../BaseOutput.java | 25 +++++++- .../Expectation.java | 2 +- .../TestOutput.java | 13 +++- .../WordCountTest.java | 62 ++++++++++++++++++- 7 files changed, 243 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 3d206b4..e2525ad 100644 --- a/README.md +++ b/README.md @@ -101,9 +101,8 @@ To get the output, `TestTopology` provides two methods: `.streamOutput()` and `. They behave just like the input with regard to the number of output topics. Using the stream version simulates Kafka's stream-semantics, meaning that a key can be present many times in an output stream, whereas the table-semantics only output the newest value of each key. -To check the output records, you can call `.expectNextRecord()` to indicate that the output should not be empty. -You can then inspect the record with `.hasKey(K key)` and `.hasValue(V value)`. -Both are optional, but highly recommended so that your output is always valid. +To check the output records, you can call `.expectNextRecord()` and then chain `.hasKey(K key)`, `.hasKeySatisfying(Consumer requirements)`, `.hasValue(V value)` or `.hasValueSatisfying(Consumer requirements)` to this call. +Note that calling `.expectNextRecord()` by itself without chaining at least one of the `.has*` methods will not check for the existence of a next record! Once you expect no further records, call `.expectNoMoreRecord()` to indicate the end of the output stream. @@ -143,6 +142,29 @@ void shouldReturnCorrectIteratorTable() { } ``` +Alternatively, you can convert the output to `List` for use with your assertion framework. Here is an example of this with [AssertJ](http://joel-costigliola.github.io/assertj/). + +```java +@Test +void shouldConvertStreamOutputToList(){ + this.testTopology.input() + .add("cat") + .add("dog") + .add("bird"); + + final List>outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("cat", "dog", "bird"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(1L, 1L, 1L); +} +``` + #### More Examples You can find many more tests diff --git a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java index e14eab6..8732451 100644 --- a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java +++ b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -244,4 +244,64 @@ public void shouldBeDoneAfterSingleWord() { public void shouldDoNothingOnEmptyInput() { this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty(); } + + @Test + public void shouldConvertStreamOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub", "bla"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(1L, 1L, 2L); + } + + @Test + public void shouldConvertTableOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(2L, 1L); + } + + @Test + public void shouldConvertEmptyStreamOutputToEmptyList() { + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } + + @Test + public void shouldConvertEmptyTableOutputToEmptyList() { + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } } diff --git a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java index 4fa9d1b..c12b88a 100644 --- a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java +++ b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -244,4 +244,64 @@ void shouldBeDoneAfterSingleWord() { void shouldDoNothingOnEmptyInput() { this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty(); } + + @Test + void shouldConvertStreamOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub", "bla"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(1L, 1L, 2L); + } + + @Test + void shouldConvertTableOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(2L, 1L); + } + + @Test + void shouldConvertEmptyStreamOutputToEmptyList() { + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } + + @Test + void shouldConvertEmptyTableOutputToEmptyList() { + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } } diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java index 2f4c513..9e73cf0 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/BaseOutput.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,6 +24,8 @@ package com.bakdata.fluent_kafka_streams_tests; +import java.util.ArrayList; +import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.TestOutputTopic; @@ -74,7 +76,10 @@ public TestOutput withValueSerde(final Serde valueSerde) { } /** - * Reads the next record as creates an {@link Expectation} for it.
+ * Reads the next record and creates an {@link Expectation} for it.
+ * + * Note that calling this method by itself without chaining at least one of the {@code has*()} methods will not + * check for the existence of a next record!
* * @return An {@link Expectation} containing the next record from the output.
*/ @@ -114,6 +119,22 @@ public TestOutput asStream() { return new StreamOutput<>(this.testDriver, this.topic, this.keySerde, this.valueSerde); } + /** + * Convert the output to a {@link java.util.List}. In case the current instance of this class is a + * {@link StreamOutput}, the output will be converted to List with {@link org.apache.kafka.streams.kstream.KStream} + * semantics (each key multiple times). In case the current instance of this class is a {@link TableOutput}, the + * output will be converted to List with {@link org.apache.kafka.streams.kstream.KTable} semantics (each key only + * once). + * + * @return A {@link java.util.List} representing the output + */ + @Override + public List> toList() { + final List> list = new ArrayList<>(); + this.iterator().forEachRemaining(list::add); + return list; + } + // ================== // Non-public methods // ================== diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/Expectation.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/Expectation.java index c299e35..d10b017 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/Expectation.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/Expectation.java @@ -113,7 +113,7 @@ public Expectation and() { } /** - *

Reads the next record as creates an {@code Expectation} for it.

+ *

Reads the next record and creates an {@code Expectation} for it.

*

This is logically equivalent to {@link TestOutput#expectNextRecord()}.

*

This methods main purpose is to allow chaining:

*
{@code
diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java
index 22ae948..8d18e69 100644
--- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java
+++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestOutput.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2023 bakdata GmbH
+ * Copyright (c) 2024 bakdata GmbH
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,6 +24,7 @@
 
 package com.bakdata.fluent_kafka_streams_tests;
 
+import java.util.List;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serde;
 
@@ -117,7 +118,7 @@ default  TestOutput withValueType(final Class valueType) {
     ProducerRecord readOneRecord();
 
     /**
-     * Reads the next record as creates an {@link Expectation} for it.
+     * Reads the next record and creates an {@link Expectation} for it.
      *
      * @return An {@link Expectation} containing the next record from the output.
      */
@@ -144,8 +145,16 @@ default  TestOutput withValueType(final Class valueType) {
      * 

This is the default, there should usually be no need to call this method.

*

Note: once the first value of the stream has been read or the iterator has be called, you cannot switch * between the output types any more.

+ * * @return Current output with {@link org.apache.kafka.streams.kstream.KStream} semantics */ TestOutput asStream(); + + /** + * Convert the output to a {@link java.util.List}. + * + * @return A {@link java.util.List} representing the output + */ + List> toList(); } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java index 1b35b18..159d502 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata GmbH + * Copyright (c) 2024 bakdata GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -387,4 +387,64 @@ void shouldBeDoneAfterSingleWord() { void shouldDoNothingOnEmptyInput() { this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty(); } + + @Test + void shouldConvertStreamOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub", "bla"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(1L, 1L, 2L); + } + + @Test + void shouldConvertTableOutputToList() { + this.testTopology.input() + .add("bla") + .add("blub") + .add("bla"); + + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .extracting(ProducerRecord::key) + .containsExactly("bla", "blub"); + assertThat(outputs) + .extracting(ProducerRecord::value) + .containsExactly(2L, 1L); + } + + @Test + void shouldConvertEmptyStreamOutputToEmptyList() { + final List> outputs = this.testTopology.streamOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } + + @Test + void shouldConvertEmptyTableOutputToEmptyList() { + final List> outputs = this.testTopology.tableOutput() + .withSerde(Serdes.String(), Serdes.Long()) + .toList(); + + assertThat(outputs) + .isInstanceOf(List.class) + .isEmpty(); + } }