Skip to content

Commit

Permalink
Configure Serdes from TestTopology
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 8, 2024
1 parent 2129eec commit 5d79c08
Showing 1 changed file with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@

package com.bakdata.fluent_kafka_streams_tests;

import static java.util.Collections.emptyMap;

import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.File;
Expand Down Expand Up @@ -458,7 +461,19 @@ public void stop() {
* @param <T> type to be (de-)serialized
*/
public <T> Serde<T> configureValueSerde(final Serde<T> serde) {
serde.configure(this.properties, false);
return this.configureValueSerde(serde, emptyMap());
}

/**
* Configure a {@code Serde} for values using {@link #properties} and config overrides
* @param serde serde to configure
* @param config configuration overrides
* @return configured {@code Serde}
* @param <T> type to be (de-)serialized
*/
public <T> Serde<T> configureValueSerde(final Serde<T> serde, final Map<String, Object> config) {
final Map<String, Object> serdeConfig = this.mergeConfig(config);
serde.configure(serdeConfig, false);
return serde;
}

Expand All @@ -469,10 +484,29 @@ public <T> Serde<T> configureValueSerde(final Serde<T> serde) {
* @param <T> type to be (de-)serialized
*/
public <T> Serde<T> configureKeySerde(final Serde<T> serde) {
serde.configure(this.properties, true);
return this.configureKeySerde(serde, emptyMap());
}

/**
* Configure a {@code Serde} for keys using {@link #properties} and config overrides
* @param serde serde to configure
* @param config configuration overrides
* @return configured {@code Serde}
* @param <T> type to be (de-)serialized
*/
public <T> Serde<T> configureKeySerde(final Serde<T> serde, final Map<String, Object> config) {
final Map<String, Object> serdeConfig = this.mergeConfig(config);
serde.configure(serdeConfig, true);
return serde;
}

private Map<String, Object> mergeConfig(final Map<String, Object> config) {
return ImmutableMap.<String, Object>builder()
.putAll(this.properties)
.putAll(config)
.build();
}

private Properties getProperties() {
final Properties props = new Properties();
props.putAll(this.properties);
Expand Down

0 comments on commit 5d79c08

Please sign in to comment.