diff --git a/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java b/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java index 07f136d..5facd5f 100644 --- a/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java +++ b/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,7 +27,6 @@ import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.schemaregistrymock.SchemaRegistryMock; import java.util.Map; -import java.util.Properties; import java.util.function.Function; import java.util.function.Supplier; import lombok.Getter; @@ -72,39 +71,39 @@ public class TestTopologyRule extends TestTopology implements TestRule { public TestTopologyRule( - final Function topologyFactory, - final Function> propertiesFactory) { + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory) { super(topologyFactory, propertiesFactory); } public TestTopologyRule( - final Function topologyFactory, - final Map properties) { + final Function, ? extends Topology> topologyFactory, + final Map properties) { super(topologyFactory, properties); } public TestTopologyRule( final Supplier topologyFactory, - final Function> propertiesFactory) { + final Function> propertiesFactory) { super(topologyFactory, propertiesFactory); } public TestTopologyRule( - final Supplier topologyFactory, final Map properties) { + final Supplier topologyFactory, final Map properties) { super(topologyFactory, properties); } public TestTopologyRule(final Topology topology, - final Function> propertiesFactory) { + final Function> propertiesFactory) { super(topology, propertiesFactory); } - public TestTopologyRule(final Topology topology, final Map properties) { + public TestTopologyRule(final Topology topology, final Map properties) { super(topology, properties); } protected TestTopologyRule( - final Function topologyFactory, - final Function> propertiesFactory, + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, final Serde defaultValueSerde, final SchemaRegistryMock schemaRegistryMock) { super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryMock); @@ -126,8 +125,9 @@ public void evaluate() throws Throwable { } @Override - protected TestTopologyRule with(final Function topologyFactory, - final Function> propertiesFactory, final Serde defaultKeySerde, + protected TestTopologyRule with( + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, final Serde defaultValueSerde, final SchemaRegistryMock schemaRegistryMock) { return new TestTopologyRule<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, diff --git a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java index 8732451..09c884e 100644 --- a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java +++ b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java @@ -41,7 +41,7 @@ public class WordCountTest { @Rule public final TestTopologyRule testTopology = - new TestTopologyRule<>(this.app::getTopology, this.app.getKafkaProperties()); + new TestTopologyRule<>(this.app::getTopology, WordCount.getKafkaProperties()); @Test public void shouldAggregateSameWordStream() { diff --git a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWithStaticTopologyTest.java b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWithStaticTopologyTest.java index 97815c9..0cf2003 100644 --- a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWithStaticTopologyTest.java +++ b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWithStaticTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -34,7 +34,7 @@ public class WordCountWithStaticTopologyTest { @Rule public final TestTopologyRule testTopology = new TestTopologyRule<>(this.app.getTopology(), - this.app.getKafkaProperties()); + WordCount.getKafkaProperties()); @Test public void shouldAggregateSameWordStream() { diff --git a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java index fd00c28..c3a1c50 100644 --- a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java +++ b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -37,7 +37,7 @@ public class WordCountWitherTest { @Rule public final TestTopologyRule testTopology = - new TestTopologyRule<>(this.app.getTopology(), this.app.getKafkaProperties()) + new TestTopologyRule<>(this.app.getTopology(), WordCount.getKafkaProperties()) .withDefaultValueSerde(Serdes.String()) .withSchemaRegistryMock(new SchemaRegistryMock(List.of(new AvroSchemaProvider()))); diff --git a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/test_applications/WordCount.java b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/test_applications/WordCount.java index b726cb0..8ac8ee6 100644 --- a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/test_applications/WordCount.java +++ b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/test_applications/WordCount.java @@ -1,12 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.junit4.test_applications; import java.util.Arrays; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import java.util.regex.Pattern; import lombok.Getter; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -14,27 +39,19 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; +@Getter public class WordCount { - @Getter private final String inputTopic = "wordcount-input"; - @Getter private final String outputTopic = "wordcount-output"; - public static void main(final String[] args) { - final WordCount wordCount = new WordCount(); - final KafkaStreams streams = new KafkaStreams(wordCount.getTopology(), wordCount.getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); + kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); return kafkaConfig; } diff --git a/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java b/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java index 08afcfc..cd5792e 100644 --- a/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java +++ b/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,7 +27,6 @@ import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.schemaregistrymock.SchemaRegistryMock; import java.util.Map; -import java.util.Properties; import java.util.function.Function; import java.util.function.Supplier; import lombok.Getter; @@ -72,40 +71,40 @@ public class TestTopologyExtension extends TestTopology implements BeforeEachCallback, AfterEachCallback { public TestTopologyExtension( - final Function topologyFactory, - final Function> propertiesFactory) { + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory) { super(topologyFactory, propertiesFactory); } public TestTopologyExtension( - final Function topologyFactory, - final Map properties) { + final Function, ? extends Topology> topologyFactory, + final Map properties) { super(topologyFactory, properties); } public TestTopologyExtension( final Supplier topologyFactory, - final Function> propertiesFactory) { + final Function> propertiesFactory) { super(topologyFactory, propertiesFactory); } public TestTopologyExtension( - final Supplier topologyFactory, final Map properties) { + final Supplier topologyFactory, final Map properties) { super(topologyFactory, properties); } public TestTopologyExtension(final Topology topology, - final Function> propertiesFactory) { + final Function> propertiesFactory) { super(topology, propertiesFactory); } - public TestTopologyExtension(final Topology topology, final Map properties) { + public TestTopologyExtension(final Topology topology, final Map properties) { super(topology, properties); } protected TestTopologyExtension( - final Function topologyFactory, - final Function> propertiesFactory, + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, final Serde defaultValueSerde, final SchemaRegistryMock schemaRegistryMock) { super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryMock); @@ -122,8 +121,9 @@ public void beforeEach(final ExtensionContext context) { } @Override - protected TestTopology with(final Function topologyFactory, - final Function> propertiesFactory, final Serde defaultKeySerde, + protected TestTopology with( + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, final Serde defaultValueSerde, final SchemaRegistryMock schemaRegistry) { return new TestTopologyExtension<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, diff --git a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java index c12b88a..55fa743 100644 --- a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java +++ b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java @@ -41,7 +41,7 @@ class WordCountTest { @RegisterExtension final TestTopologyExtension testTopology = new TestTopologyExtension<>(this.app::getTopology, - this.app.getKafkaProperties()); + WordCount.getKafkaProperties()); @Test void shouldAggregateSameWordStream() { diff --git a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWithStaticTopologyTest.java b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWithStaticTopologyTest.java index 199bff1..efae5a4 100644 --- a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWithStaticTopologyTest.java +++ b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWithStaticTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -34,7 +34,7 @@ class WordCountWithStaticTopologyTest { @RegisterExtension final TestTopologyExtension testTopology = new TestTopologyExtension<>(this.app.getTopology(), - this.app.getKafkaProperties()); + WordCount.getKafkaProperties()); @Test void shouldAggregateSameWordStream() { diff --git a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java index e8a1a19..e9f9084 100644 --- a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java +++ b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -37,7 +37,7 @@ class WordCountWitherTest { @RegisterExtension final TestTopologyExtension testTopology = - new TestTopologyExtension<>(this.app::getTopology, this.app.getKafkaProperties()) + new TestTopologyExtension<>(this.app::getTopology, WordCount.getKafkaProperties()) .withDefaultValueSerde(Serdes.String()) .withSchemaRegistryMock(new SchemaRegistryMock(List.of(new AvroSchemaProvider()))); diff --git a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/test_applications/WordCount.java b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/test_applications/WordCount.java index 7940d99..64dfeb1 100644 --- a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/test_applications/WordCount.java +++ b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/test_applications/WordCount.java @@ -1,12 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.junit5.test_applications; import java.util.Arrays; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import java.util.regex.Pattern; import lombok.Getter; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -14,27 +39,19 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; +@Getter public class WordCount { - @Getter private final String inputTopic = "wordcount-input"; - @Getter private final String outputTopic = "wordcount-output"; - public static void main(final String[] args) { - final WordCount wordCount = new WordCount(); - final KafkaStreams streams = new KafkaStreams(wordCount.getTopology(), wordCount.getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); + kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); return kafkaConfig; } diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java index 50415e4..2af58f2 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java @@ -103,12 +103,12 @@ @Getter public class TestTopology { private final SchemaRegistryMock schemaRegistry; - private final Function topologyFactory; - private final Properties properties = new Properties(); + private final Function, ? extends Topology> topologyFactory; + private final Map properties = new HashMap<>(); private final Collection inputTopics = new HashSet<>(); private final Collection inputPatterns = new HashSet<>(); private final Collection outputTopics = new HashSet<>(); - private final Function> propertiesFactory; + private final Function> propertiesFactory; private final Serde defaultKeySerde; private final Serde defaultValueSerde; @@ -118,8 +118,8 @@ public class TestTopology { /** * Used by wither methods. */ - protected TestTopology(final Function topologyFactory, - final Function> propertiesFactory, + protected TestTopology(final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, final Serde defaultValueSerde, final SchemaRegistryMock schemaRegistry) { this.schemaRegistry = schemaRegistry; @@ -138,8 +138,8 @@ protected TestTopology(final Function to * is passed as a parameter and needs to be configured if needed. Required entries: APPLICATION_ID_CONFIG, * BOOTSTRAP_SERVERS_CONFIG. */ - public TestTopology(final Function topologyFactory, - final Function> propertiesFactory) { + public TestTopology(final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory) { this(topologyFactory, propertiesFactory, null, null, new SchemaRegistryMock()); } @@ -151,10 +151,10 @@ public TestTopology(final Function topol * @param properties The properties of the Kafka Streams application under test. Required entries: * APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG */ - public TestTopology(final Function topologyFactory, - final Map properties) { + public TestTopology(final Function, ? extends Topology> topologyFactory, + final Map properties) { this(topologyFactory, schemaRegistryUrl -> { - final Map newProperties = new HashMap<>(properties); + final Map newProperties = new HashMap<>(properties); newProperties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); return Map.copyOf(newProperties); }); @@ -170,7 +170,7 @@ public TestTopology(final Function topol * BOOTSTRAP_SERVERS_CONFIG. */ public TestTopology(final Supplier topologyFactory, - final Function> propertiesFactory) { + final Function> propertiesFactory) { this(props -> topologyFactory.get(), propertiesFactory); } @@ -182,7 +182,7 @@ public TestTopology(final Supplier topologyFactory, * @param properties The properties of the Kafka Streams application under test. Required entries: * APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG */ - public TestTopology(final Supplier topologyFactory, final Map properties) { + public TestTopology(final Supplier topologyFactory, final Map properties) { this(props -> topologyFactory.get(), properties); } @@ -196,7 +196,7 @@ public TestTopology(final Supplier topologyFactory, final Ma * BOOTSTRAP_SERVERS_CONFIG. */ public TestTopology(final Topology topology, - final Function> propertiesFactory) { + final Function> propertiesFactory) { this(props -> topology, propertiesFactory); } @@ -208,7 +208,7 @@ public TestTopology(final Topology topology, * @param properties The properties of the Kafka Streams application under test. Required entries: * APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG */ - public TestTopology(final Topology topology, final Map properties) { + public TestTopology(final Topology topology, final Map properties) { this(props -> topology, properties); } @@ -290,9 +290,9 @@ public void start() { } catch (final IOException e) { throw new UncheckedIOException("Cannot create temporary state directory", e); } - this.properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString()); + this.properties.put(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString()); final Topology topology = this.topologyFactory.apply(this.properties); - this.testDriver = new TopologyTestDriver(topology, this.properties); + this.testDriver = new TopologyTestDriver(topology, this.createProperties()); this.inputTopics.clear(); this.inputPatterns.clear(); @@ -309,6 +309,15 @@ public void start() { } } + protected TestTopology with( + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, + final Serde defaultValueSerde, + final SchemaRegistryMock schemaRegistry) { + return new TestTopology<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, + schemaRegistry); + } + /** * Get the default serde of the key type in your application. */ @@ -442,12 +451,10 @@ public void stop() { } } - protected TestTopology with(final Function topologyFactory, - final Function> propertiesFactory, final Serde defaultKeySerde, - final Serde defaultValueSerde, - final SchemaRegistryMock schemaRegistry) { - return new TestTopology<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, - schemaRegistry); + private Properties createProperties() { + final Properties props = new Properties(); + props.putAll(this.properties); + return props; } private void processNode(final Node node) { diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/CountInhabitantsWithProtoTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/CountInhabitantsWithProtoTest.java index 8f0222a..72a61b9 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/CountInhabitantsWithProtoTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/CountInhabitantsWithProtoTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,7 +31,7 @@ import com.bakdata.schemaregistrymock.SchemaRegistryMock; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import java.util.Collections; -import java.util.Properties; +import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -91,7 +91,7 @@ void shouldGetSchemaRegistryClient() { Assertions.assertNotNull(this.testTopology.getSchemaRegistry()); } - private Properties properties(final String schemaRegistryUrl) { + private Map properties(final String schemaRegistryUrl) { this.app.setSchemaRegistryUrl(schemaRegistryUrl); return this.app.getKafkaProperties(); } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/ErrorEventsPerMinuteTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/ErrorEventsPerMinuteTest.java index d785486..816a690 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/ErrorEventsPerMinuteTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/ErrorEventsPerMinuteTest.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -21,7 +45,7 @@ class ErrorEventsPerMinuteTest { private final ErrorEventsPerMinute app = new ErrorEventsPerMinute(); private final TestTopology testTopology = - new TestTopology<>(this.app::getTopology, this.app.getKafkaProperties()); + new TestTopology<>(this.app::getTopology, ErrorEventsPerMinute.getKafkaProperties()); @BeforeEach void start() { diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/MirrorPatternTopicMixedTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/MirrorPatternTopicMixedTest.java index b1d6fd2..b0d81b7 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/MirrorPatternTopicMixedTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/MirrorPatternTopicMixedTest.java @@ -1,8 +1,31 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import com.bakdata.fluent_kafka_streams_tests.test_applications.MirrorPattern; import com.bakdata.fluent_kafka_streams_tests.test_applications.MirrorPatternTopicMixed; import java.util.NoSuchElementException; import org.junit.jupiter.api.AfterEach; @@ -13,7 +36,7 @@ class MirrorPatternTopicMixedTest { private final MirrorPatternTopicMixed app = new MirrorPatternTopicMixed(); private final TestTopology testTopology = new TestTopology<>(this.app::getTopology, - MirrorPattern.getKafkaProperties()); + MirrorPatternTopicMixed.getKafkaProperties()); @BeforeEach void start() { diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/NameJoinTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/NameJoinTest.java index f430049..406640a 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/NameJoinTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/NameJoinTest.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests; import com.bakdata.fluent_kafka_streams_tests.test_applications.NameJoinGlobalKTable; @@ -8,10 +32,9 @@ import org.junit.jupiter.api.Test; class NameJoinTest { - private final NameJoinGlobalKTable app = new NameJoinGlobalKTable(); private final TestTopology testTopology = - new TestTopology<>(this.app::getTopology, this.app.getKafkaProperties()); + new TestTopology<>(NameJoinGlobalKTable::getTopology, NameJoinGlobalKTable.getKafkaProperties()); @BeforeEach void start() { @@ -39,4 +62,4 @@ void testTopology() { .expectNoMoreRecord(); } -} \ No newline at end of file +} diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/NameJoinWithIntermediateTopicTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/NameJoinWithIntermediateTopicTest.java index e01d206..522775e 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/NameJoinWithIntermediateTopicTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/NameJoinWithIntermediateTopicTest.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests; import com.bakdata.fluent_kafka_streams_tests.test_applications.NameJoinGlobalKTable; @@ -8,10 +32,10 @@ import org.junit.jupiter.api.Test; class NameJoinWithIntermediateTopicTest { - private final NameJoinGlobalKTable app = new NameJoinGlobalKTable(); private final TestTopology testTopology = - new TestTopology<>(this.app::getTopologyWithIntermediateTopic, this.app.getKafkaProperties()); + new TestTopology<>(NameJoinGlobalKTable::getTopologyWithIntermediateTopic, + NameJoinGlobalKTable.getKafkaProperties()); @BeforeEach void start() { diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/UserClicksPerMinuteTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/UserClicksPerMinuteTest.java index d7a3d5d..70d3b27 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/UserClicksPerMinuteTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/UserClicksPerMinuteTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -38,7 +38,6 @@ class UserClicksPerMinuteTest { private static final int USER = 1; private static final int USER1 = 1; private static final int USER2 = 2; - private final UserClicksPerMinute app = new UserClicksPerMinute(); private final TestTopology testTopology = new TestTopology<>(UserClicksPerMinute::getTopology, UserClicksPerMinute.getKafkaProperties()); diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java index 159d502..1b8c901 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountTest.java @@ -44,7 +44,7 @@ class WordCountTest { private final WordCount app = new WordCount(); private final TestTopology testTopology = new TestTopology<>(this.app::getTopology, - this.app.getKafkaProperties()); + WordCount.getKafkaProperties()); @BeforeEach void start() { diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountWithDefaultSerdeTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountWithDefaultSerdeTest.java index 8be5ae9..d15b8ea 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountWithDefaultSerdeTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountWithDefaultSerdeTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -33,8 +33,8 @@ class WordCountWithDefaultSerdeTest { private final WordCount app = new WordCount(); - public final TestTopology testTopology = - new TestTopology<>(this.app.getTopology(), this.app.getKafkaProperties()) + private final TestTopology testTopology = + new TestTopology<>(this.app.getTopology(), WordCount.getKafkaProperties()) .withDefaultValueSerde(Serdes.String()); @BeforeEach diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountWithStaticTopologyTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountWithStaticTopologyTest.java index 80e494c..ad11681 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountWithStaticTopologyTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/WordCountWithStaticTopologyTest.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests; import com.bakdata.fluent_kafka_streams_tests.test_applications.WordCount; @@ -9,8 +33,8 @@ class WordCountWithStaticTopologyTest { private final WordCount app = new WordCount(); - final TestTopology testTopology = - new TestTopology<>(this.app::getTopology, this.app.getKafkaProperties()); + private final TestTopology testTopology = + new TestTopology<>(this.app::getTopology, WordCount.getKafkaProperties()); @BeforeEach void start() { diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/CountInhabitantsWithAvro.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/CountInhabitantsWithAvro.java index 9ea01fa..6fe0d27 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/CountInhabitantsWithAvro.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/CountInhabitantsWithAvro.java @@ -1,7 +1,7 @@ /* - * The MIT License + * MIT License * - * Copyright (c) 2019 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -28,10 +28,10 @@ import com.bakdata.fluent_kafka_streams_tests.test_types.Person; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import lombok.Getter; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -39,30 +39,21 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +@Getter public class CountInhabitantsWithAvro { - @Getter private final String inputTopic = "person-input"; - @Getter private final String outputTopic = "city-output"; - @Getter private final String schemaRegistryUrl = "http://dummy"; - public static void main(final String[] args) { - final CountInhabitantsWithAvro app = new CountInhabitantsWithAvro(); - final KafkaStreams streams = new KafkaStreams(app.getTopology(), app.getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public Properties getKafkaProperties() { + public Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); + final Map kafkaConfig = new HashMap<>(); kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "inhabitants-per-city"); kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); kafkaConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl); return kafkaConfig; diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/CountInhabitantsWithProto.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/CountInhabitantsWithProto.java index 75c786b..327c1db 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/CountInhabitantsWithProto.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/CountInhabitantsWithProto.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,13 +31,11 @@ import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import lombok.Getter; import lombok.Setter; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -58,14 +56,6 @@ public class CountInhabitantsWithProto { @Setter private String schemaRegistryUrl; - - public static void main(final String[] args) { - final CountInhabitantsWithAvro app = new CountInhabitantsWithAvro(); - final KafkaStreams streams = new KafkaStreams(app.getTopology(), app.getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - public KafkaProtobufSerde newPersonSerde() { final KafkaProtobufSerde serde = new KafkaProtobufSerde<>(Person.class); final Map config = new HashMap<>(); @@ -82,14 +72,14 @@ public KafkaProtobufSerde newCitySerde() { return serde; } - public Properties getKafkaProperties() { + public Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "inhabitants-per-city"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "inhabitants-per-city"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class); - kafkaConfig.setProperty(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl); + kafkaConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl); return kafkaConfig; } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/ErrorEventsPerMinute.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/ErrorEventsPerMinute.java index 8db2487..47b52db 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/ErrorEventsPerMinute.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/ErrorEventsPerMinute.java @@ -1,3 +1,27 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.test_applications; import com.bakdata.fluent_kafka_streams_tests.serde.JsonSerde; @@ -5,9 +29,11 @@ import com.bakdata.fluent_kafka_streams_tests.test_types.ErrorOutput; import com.bakdata.fluent_kafka_streams_tests.test_types.StatusCode; import java.time.Duration; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import lombok.Getter; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.IntegerSerde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -20,25 +46,22 @@ import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; +@Getter public class ErrorEventsPerMinute { - @Getter private final String clickInputTopic = "user-click-input"; - @Getter private final String statusInputTopic = "status-input"; - @Getter private final String errorOutputTopic = "user-error-output"; - @Getter private final String alertTopic = "error-alert-output"; - public Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "errors-per-minute"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "errors-per-minute"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, IntegerSerde.class); kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class); return kafkaConfig; } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/Mirror.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/Mirror.java index c3a2808..e2ac6ed 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/Mirror.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/Mirror.java @@ -1,35 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.test_applications; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import lombok.Getter; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; +@Getter public class Mirror { - @Getter private final String inputTopic = "input"; - @Getter private final String outputTopic = "output"; - public static void main(final String[] args) { - final Mirror wordCount = new Mirror(); - final KafkaStreams streams = new KafkaStreams(wordCount.getTopology(), getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public static Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "mirror"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "mirror"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); + kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); return kafkaConfig; } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorAvro.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorAvro.java index 2f852e7..523ead5 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorAvro.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorAvro.java @@ -1,7 +1,7 @@ /* - * The MIT License + * MIT License * - * Copyright (c) 2022 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,7 +25,8 @@ package com.bakdata.fluent_kafka_streams_tests.test_applications; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import lombok.experimental.UtilityClass; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.StreamsBuilder; @@ -38,11 +39,11 @@ public class MirrorAvro { private static final String INPUT_TOPIC = "input"; private static final String OUTPUT_TOPIC = "output"; - public static Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "mirror"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "mirror"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); return kafkaConfig; diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorPattern.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorPattern.java index 1959f73..677aa5c 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorPattern.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorPattern.java @@ -1,38 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.test_applications; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import java.util.regex.Pattern; import lombok.Getter; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; +@Getter public class MirrorPattern { - @Getter private final String inputPattern1 = ".*-input1"; - @Getter private final String inputPattern2 = ".*-input2"; - @Getter private final String outputTopic = "output"; - public static void main(final String[] args) { - final MirrorPattern wordCount = new MirrorPattern(); - final KafkaStreams streams = new KafkaStreams(wordCount.getTopology(), getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public static Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); + kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); return kafkaConfig; } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorPatternTopicMixed.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorPatternTopicMixed.java index a8893bb..7e0abcf 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorPatternTopicMixed.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/MirrorPatternTopicMixed.java @@ -1,38 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.test_applications; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import java.util.regex.Pattern; import lombok.Getter; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; +@Getter public class MirrorPatternTopicMixed { - @Getter private final String inputPattern = ".*-input1"; - @Getter private final String inputTopic = "input2"; - @Getter private final String outputTopic = "output"; - public static void main(final String[] args) { - final MirrorPatternTopicMixed wordCount = new MirrorPatternTopicMixed(); - final KafkaStreams streams = new KafkaStreams(wordCount.getTopology(), getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public static Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); + kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); return kafkaConfig; } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/NameJoinGlobalKTable.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/NameJoinGlobalKTable.java index 4fbc334..d2a774c 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/NameJoinGlobalKTable.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/NameJoinGlobalKTable.java @@ -1,10 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.test_applications; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; +import lombok.experimental.UtilityClass; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.LongSerde; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -13,31 +38,24 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; +@UtilityClass public class NameJoinGlobalKTable { public static final String INPUT_TOPIC = "id-input"; public static final String NAME_INPUT = "name-input"; public static final String INTERMEDIATE_TOPIC = "upper-case-input"; public static final String OUTPUT_TOPIC = "join-output"; - - public static void main(final String[] args) { - final NameJoinGlobalKTable kTableJoin = new NameJoinGlobalKTable(); - final KafkaStreams streams = new KafkaStreams(kTableJoin.getTopology(), kTableJoin.getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "globalKTableJoin"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "globalKTableJoin"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, LongSerde.class); kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); return kafkaConfig; } - public Topology getTopology() { + public static Topology getTopology() { final StreamsBuilder builder = new StreamsBuilder(); final KStream inputStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Long(), Serdes.Long())); @@ -53,7 +71,7 @@ public Topology getTopology() { return builder.build(); } - public Topology getTopologyWithIntermediateTopic() { + public static Topology getTopologyWithIntermediateTopic() { final StreamsBuilder builder = new StreamsBuilder(); final KStream inputStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Long(), Serdes.Long())); diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/TopicExtractorApplication.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/TopicExtractorApplication.java index d767947..0f6ce8e 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/TopicExtractorApplication.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/TopicExtractorApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,9 +24,11 @@ package com.bakdata.fluent_kafka_streams_tests.test_applications; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import lombok.experimental.UtilityClass; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -45,12 +47,12 @@ public static Topology getTopology() { return builder.build(); } - public static Properties getProperties() { - final Properties properties = new Properties(); - properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dynamic-test-stream"); - properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:123"); - properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + public static Map getProperties() { + final Map properties = new HashMap<>(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "dynamic-test-stream"); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:123"); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); return properties; } } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/UserClicksPerMinute.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/UserClicksPerMinute.java index 3b2df57..4ea789d 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/UserClicksPerMinute.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/UserClicksPerMinute.java @@ -1,12 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.test_applications; import com.bakdata.fluent_kafka_streams_tests.serde.JsonSerde; import com.bakdata.fluent_kafka_streams_tests.test_types.ClickEvent; import com.bakdata.fluent_kafka_streams_tests.test_types.ClickOutput; import java.time.Duration; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.Serdes.IntegerSerde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -22,18 +47,12 @@ public class UserClicksPerMinute { private static final String OUTPUT_TOPIC = "user-click-output"; - public static void main(final String[] args) { - final KafkaStreams streams = new KafkaStreams(getTopology(), getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public static Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "user-clicks-per-minute"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-clicks-per-minute"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, IntegerSerde.class); kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class); return kafkaConfig; } diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/WordCount.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/WordCount.java index 0e71cad..c0e977b 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/WordCount.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/test_applications/WordCount.java @@ -1,12 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package com.bakdata.fluent_kafka_streams_tests.test_applications; import java.util.Arrays; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import java.util.regex.Pattern; import lombok.Getter; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -15,27 +40,19 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; +@Getter public class WordCount { - @Getter private final String inputTopic = "wordcount-input"; - @Getter private final String outputTopic = "wordcount-output"; - public static void main(final String[] args) { - final WordCount wordCount = new WordCount(); - final KafkaStreams streams = new KafkaStreams(wordCount.getTopology(), wordCount.getKafkaProperties()); - streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); - } - - public Properties getKafkaProperties() { + public static Map getKafkaProperties() { final String brokers = "localhost:9092"; - final Properties kafkaConfig = new Properties(); - kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); - kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - kafkaConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + final Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); + kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); + kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); return kafkaConfig; } diff --git a/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtensionTest.java b/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtensionTest.java index 7dbbe68..5433a28 100644 --- a/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtensionTest.java +++ b/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtensionTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata GmbH + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -39,9 +39,13 @@ class SchemaRegistryMockExtensionTest { @RegisterExtension final SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension(); + private static Schema createSchema(final String name) { + return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); + } + @Test void shouldRegisterKeySchema() throws IOException, RestClientException { - final Schema keySchema = this.createSchema("key_schema"); + final Schema keySchema = createSchema("key_schema"); final int id = this.schemaRegistry.registerKeySchema("test-topic", keySchema); final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); @@ -50,7 +54,7 @@ void shouldRegisterKeySchema() throws IOException, RestClientException { @Test void shouldRegisterValueSchema() throws IOException, RestClientException { - final Schema valueSchema = this.createSchema("value_schema"); + final Schema valueSchema = createSchema("value_schema"); final int id = this.schemaRegistry.registerValueSchema("test-topic", valueSchema); final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); @@ -59,7 +63,7 @@ void shouldRegisterValueSchema() throws IOException, RestClientException { @Test void shouldRegisterKeySchemaWithClient() throws IOException, RestClientException { - final Schema keySchema = this.createSchema("key_schema"); + final Schema keySchema = createSchema("key_schema"); final int id = this.schemaRegistry.getSchemaRegistryClient().register("test-topic-key", new AvroSchema(keySchema)); @@ -69,7 +73,7 @@ void shouldRegisterKeySchemaWithClient() throws IOException, RestClientException @Test void shouldRegisterValueSchemaWithClient() throws IOException, RestClientException { - final Schema valueSchema = this.createSchema("value_schema"); + final Schema valueSchema = createSchema("value_schema"); final int id = this.schemaRegistry.getSchemaRegistryClient().register("test-topic-value", new AvroSchema(valueSchema)); @@ -79,7 +83,7 @@ void shouldRegisterValueSchemaWithClient() throws IOException, RestClientExcepti @Test void shouldHaveSchemaVersions() throws IOException, RestClientException { - final Schema valueSchema = this.createSchema("value_schema"); + final Schema valueSchema = createSchema("value_schema"); final String topic = "test-topic"; final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); @@ -96,7 +100,7 @@ void shouldHaveSchemaVersions() throws IOException, RestClientException { @Test void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { - final Schema valueSchema1 = this.createSchema("value_schema"); + final Schema valueSchema1 = createSchema("value_schema"); final String topic = "test-topic"; final int id1 = this.schemaRegistry.registerValueSchema(topic, valueSchema1); @@ -118,8 +122,4 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { final Schema retrievedSchema = new Schema.Parser().parse(schemaString); assertThat(retrievedSchema).isEqualTo(valueSchema2); } - - private Schema createSchema(final String name) { - return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); - } }