Skip to content

Commit

Permalink
Add method to convert output topic to List (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
JakobEdding authored Apr 12, 2024
1 parent 66a8657 commit 9d4aea9
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 11 deletions.
28 changes: 25 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ To get the output, `TestTopology` provides two methods: `.streamOutput()` and `.
They behave just like the input with regard to the number of output topics.
Using the stream version simulates Kafka's stream-semantics, meaning that a key can be present many times in an output stream, whereas the table-semantics only output the newest value of each key.

To check the output records, you can call `.expectNextRecord()` to indicate that the output should not be empty.
You can then inspect the record with `.hasKey(K key)` and `.hasValue(V value)`.
Both are optional, but highly recommended so that your output is always valid.
To check the output records, you can call `.expectNextRecord()` and then chain `.hasKey(K key)`, `.hasKeySatisfying(Consumer<K> requirements)`, `.hasValue(V value)` or `.hasValueSatisfying(Consumer<V> requirements)` to this call.
Note that calling `.expectNextRecord()` by itself without chaining at least one of the `.has*` methods will not check for the existence of a next record!

Once you expect no further records, call `.expectNoMoreRecord()` to indicate the end of the output stream.

Expand Down Expand Up @@ -143,6 +142,29 @@ void shouldReturnCorrectIteratorTable() {
}
```

Alternatively, you can convert the output to `List` for use with your assertion framework. Here is an example of this with [AssertJ](http://joel-costigliola.github.io/assertj/).

```java
@Test
void shouldConvertStreamOutputToList(){
this.testTopology.input()
.add("cat")
.add("dog")
.add("bird");

final List<ProducerRecord<String, Long>>outputs = this.testTopology.streamOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.extracting(ProducerRecord::key)
.containsExactly("cat", "dog", "bird");
assertThat(outputs)
.extracting(ProducerRecord::value)
.containsExactly(1L, 1L, 1L);
}
```

#### More Examples

You can find many more tests
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2019 bakdata GmbH
* Copyright (c) 2024 bakdata GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -244,4 +244,64 @@ public void shouldBeDoneAfterSingleWord() {
public void shouldDoNothingOnEmptyInput() {
this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty();
}

@Test
public void shouldConvertStreamOutputToList() {
this.testTopology.input()
.add("bla")
.add("blub")
.add("bla");

final List<ProducerRecord<String, Long>> outputs = this.testTopology.streamOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.extracting(ProducerRecord::key)
.containsExactly("bla", "blub", "bla");
assertThat(outputs)
.extracting(ProducerRecord::value)
.containsExactly(1L, 1L, 2L);
}

@Test
public void shouldConvertTableOutputToList() {
this.testTopology.input()
.add("bla")
.add("blub")
.add("bla");

final List<ProducerRecord<String, Long>> outputs = this.testTopology.tableOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.extracting(ProducerRecord::key)
.containsExactly("bla", "blub");
assertThat(outputs)
.extracting(ProducerRecord::value)
.containsExactly(2L, 1L);
}

@Test
public void shouldConvertEmptyStreamOutputToEmptyList() {
final List<ProducerRecord<String, Long>> outputs = this.testTopology.streamOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.isInstanceOf(List.class)
.isEmpty();
}

@Test
public void shouldConvertEmptyTableOutputToEmptyList() {
final List<ProducerRecord<String, Long>> outputs = this.testTopology.tableOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.isInstanceOf(List.class)
.isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2019 bakdata GmbH
* Copyright (c) 2024 bakdata GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -244,4 +244,64 @@ void shouldBeDoneAfterSingleWord() {
void shouldDoNothingOnEmptyInput() {
this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty();
}

@Test
void shouldConvertStreamOutputToList() {
this.testTopology.input()
.add("bla")
.add("blub")
.add("bla");

final List<ProducerRecord<String, Long>> outputs = this.testTopology.streamOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.extracting(ProducerRecord::key)
.containsExactly("bla", "blub", "bla");
assertThat(outputs)
.extracting(ProducerRecord::value)
.containsExactly(1L, 1L, 2L);
}

@Test
void shouldConvertTableOutputToList() {
this.testTopology.input()
.add("bla")
.add("blub")
.add("bla");

final List<ProducerRecord<String, Long>> outputs = this.testTopology.tableOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.extracting(ProducerRecord::key)
.containsExactly("bla", "blub");
assertThat(outputs)
.extracting(ProducerRecord::value)
.containsExactly(2L, 1L);
}

@Test
void shouldConvertEmptyStreamOutputToEmptyList() {
final List<ProducerRecord<String, Long>> outputs = this.testTopology.streamOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.isInstanceOf(List.class)
.isEmpty();
}

@Test
void shouldConvertEmptyTableOutputToEmptyList() {
final List<ProducerRecord<String, Long>> outputs = this.testTopology.tableOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.isInstanceOf(List.class)
.isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2019 bakdata GmbH
* Copyright (c) 2024 bakdata GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,6 +24,8 @@

package com.bakdata.fluent_kafka_streams_tests;

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.TestOutputTopic;
Expand Down Expand Up @@ -74,7 +76,10 @@ public <VR> TestOutput<K, VR> withValueSerde(final Serde<VR> valueSerde) {
}

/**
* Reads the next record as creates an {@link Expectation} for it.<br/>
* Reads the next record and creates an {@link Expectation} for it.<br/>
*
* Note that calling this method by itself without chaining at least one of the {@code has*()} methods will not
* check for the existence of a next record!<br/>
*
* @return An {@link Expectation} containing the next record from the output.<br/>
*/
Expand Down Expand Up @@ -114,6 +119,22 @@ public TestOutput<K, V> asStream() {
return new StreamOutput<>(this.testDriver, this.topic, this.keySerde, this.valueSerde);
}

/**
* Convert the output to a {@link java.util.List}. In case the current instance of this class is a
* {@link StreamOutput}, the output will be converted to List with {@link org.apache.kafka.streams.kstream.KStream}
* semantics (each key multiple times). In case the current instance of this class is a {@link TableOutput}, the
* output will be converted to List with {@link org.apache.kafka.streams.kstream.KTable} semantics (each key only
* once).
*
* @return A {@link java.util.List} representing the output
*/
@Override
public List<ProducerRecord<K, V>> toList() {
final List<ProducerRecord<K, V>> list = new ArrayList<>();
this.iterator().forEachRemaining(list::add);
return list;
}

// ==================
// Non-public methods
// ==================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public Expectation<K, V> and() {
}

/**
* <p>Reads the next record as creates an {@code Expectation} for it.</p>
* <p>Reads the next record and creates an {@code Expectation} for it.</p>
* <p>This is logically equivalent to {@link TestOutput#expectNextRecord()}.</p>
* <p>This methods main purpose is to allow chaining:</p>
* <pre>{@code
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata GmbH
* Copyright (c) 2024 bakdata GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,6 +24,7 @@

package com.bakdata.fluent_kafka_streams_tests;

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;

Expand Down Expand Up @@ -117,7 +118,7 @@ default <VR> TestOutput<K, VR> withValueType(final Class<VR> valueType) {
ProducerRecord<K, V> readOneRecord();

/**
* Reads the next record as creates an {@link Expectation} for it.
* Reads the next record and creates an {@link Expectation} for it.
*
* @return An {@link Expectation} containing the next record from the output.
*/
Expand All @@ -144,8 +145,16 @@ default <VR> TestOutput<K, VR> withValueType(final Class<VR> valueType) {
* <p>This is the default, there should usually be no need to call this method.</p>
* <p>Note: once the first value of the stream has been read or the iterator has be called, you cannot switch
* between the output types any more.</p>
*
* @return Current output with {@link org.apache.kafka.streams.kstream.KStream} semantics
*/
TestOutput<K, V> asStream();

/**
* Convert the output to a {@link java.util.List}.
*
* @return A {@link java.util.List} representing the output
*/
List<ProducerRecord<K, V>> toList();
}

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata GmbH
* Copyright (c) 2024 bakdata GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -387,4 +387,64 @@ void shouldBeDoneAfterSingleWord() {
void shouldDoNothingOnEmptyInput() {
this.testTopology.streamOutput().expectNoMoreRecord().and().expectNoMoreRecord().toBeEmpty();
}

@Test
void shouldConvertStreamOutputToList() {
this.testTopology.input()
.add("bla")
.add("blub")
.add("bla");

final List<ProducerRecord<String, Long>> outputs = this.testTopology.streamOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.extracting(ProducerRecord::key)
.containsExactly("bla", "blub", "bla");
assertThat(outputs)
.extracting(ProducerRecord::value)
.containsExactly(1L, 1L, 2L);
}

@Test
void shouldConvertTableOutputToList() {
this.testTopology.input()
.add("bla")
.add("blub")
.add("bla");

final List<ProducerRecord<String, Long>> outputs = this.testTopology.tableOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.extracting(ProducerRecord::key)
.containsExactly("bla", "blub");
assertThat(outputs)
.extracting(ProducerRecord::value)
.containsExactly(2L, 1L);
}

@Test
void shouldConvertEmptyStreamOutputToEmptyList() {
final List<ProducerRecord<String, Long>> outputs = this.testTopology.streamOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.isInstanceOf(List.class)
.isEmpty();
}

@Test
void shouldConvertEmptyTableOutputToEmptyList() {
final List<ProducerRecord<String, Long>> outputs = this.testTopology.tableOutput()
.withSerde(Serdes.String(), Serdes.Long())
.toList();

assertThat(outputs)
.isInstanceOf(List.class)
.isEmpty();
}
}

0 comments on commit 9d4aea9

Please sign in to comment.