Merge pull request #5 from jeqo/converter
feat(producer-datagen): adopt converters
jeqo authored Sep 26, 2022
2 parents 124f489 + 03e67a6 commit 19f923d
Showing 8 changed files with 110 additions and 49 deletions.
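At a glance, the change replaces format-specific value serializers with Kafka Connect converters: PayloadGenerator now renders every generated record to byte[] through a Converter chosen by output format, and all producers switch to ByteArraySerializer. As a hedged, standalone sketch of the converter pattern being adopted (the topic name and schema below are invented for illustration, not taken from this diff):

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class ConverterSketch {
    public static void main(String[] args) {
        // A Connect Converter maps (Connect schema, value) to wire bytes;
        // "converter.type" tells it whether it converts keys or values.
        var converter = new JsonConverter();
        converter.configure(Map.of("schemas.enable", "false", "converter.type", "value"));

        var schema = SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .build();
        var value = new Struct(schema).put("id", 42L).put("name", "jeqo");

        byte[] payload = converter.fromConnectData("demo-topic", schema, value);
        System.out.println(new String(payload)); // {"id":42,"name":"jeqo"}
    }
}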
producer-datagen/pom.xml (32 additions, 1 deletion)
@@ -27,6 +27,16 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-json</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>

<dependency>
<groupId>info.picocli</groupId>
@@ -54,6 +64,7 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
@@ -64,6 +75,26 @@
<artifactId>avro-random-generator</artifactId>
<version>${confluent.avro.generator.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-connect-avro-data</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-connect-avro-converter</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-connect-json-schema-converter</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-connect-protobuf-converter</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>

<dependency>
<groupId>org.graalvm.nativeimage</groupId>
@@ -196,7 +227,7 @@
--initialize-at-run-time=org.apache.kafka.common.security.authenticator.SaslClientAuthenticator
</buildArg>
<buildArg>
- --initialize-at-build-time=${graalvm.at-build-time.slf4j},${graalvm.at-build-time.kafka},${graalvm.at-build-time.json}
+ --initialize-at-build-time=${graalvm.at-build-time.slf4j},${graalvm.at-build-time.kafka},${graalvm.at-build-time.json},org.joda.time
</buildArg>
<buildArg>--enable-url-protocols=http</buildArg>
<buildArg>--enable-url-protocols=https</buildArg>
@@ -1,20 +1,18 @@
package kafka.cli.producer.datagen;

- import java.io.IOException;
- import kafka.cli.producer.datagen.PayloadGenerator.Format;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IntervalRunner {

final Config config;
- final KafkaProducer<String, Object> producer;
+ final KafkaProducer<String, byte[]> producer;
final PayloadGenerator payloadGenerator;
final Stats stats;

public IntervalRunner(
Config config,
- KafkaProducer<String, Object> producer,
+ KafkaProducer<String, byte[]> producer,
PayloadGenerator payloadGenerator,
Stats stats
) {
Expand Down Expand Up @@ -53,19 +51,9 @@ void loop(int count) {
}

void runOnce() {
- var payload = payloadGenerator.get();
- var key = payloadGenerator.key(payload);
- Object value;
-
- if (payloadGenerator.format.equals(Format.AVRO)) {
- value = payload;
- } else {
- value = payloadGenerator.toJson(payload);
- }

var sendStartMs = System.currentTimeMillis();
var cb = stats.nextCompletion(sendStartMs, sample.length, stats);
- var record = new ProducerRecord<>(config.topicName(), key, value);
+ var record = payloadGenerator.record(config.topicName());

producer.send(record, cb);

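For context, a minimal sketch of how a caller drives the reshaped API (names taken from the diff; the topic name is invented):

// Hypothetical wiring based on the signatures above:
var payloadGenerator = new PayloadGenerator(config, producerConfig);
ProducerRecord<String, byte[]> record = payloadGenerator.record("demo-topic");
producer.send(record); // producer is a KafkaProducer<String, byte[]>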
@@ -1,6 +1,10 @@
package kafka.cli.producer.datagen;

import io.confluent.avro.random.generator.Generator;
+ import io.confluent.connect.avro.AvroConverter;
+ import io.confluent.connect.avro.AvroData;
+ import io.confluent.connect.json.JsonSchemaConverter;
+ import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -9,6 +13,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
+ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
@@ -24,16 +29,22 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
+ import org.apache.kafka.connect.json.JsonConverter;
+ import org.apache.kafka.connect.storage.Converter;

/** Datagen side */
public class PayloadGenerator {

final Format format;
final Random random;
final Generator generator;
+ final Converter converter;
+ final AvroData avroData;
+ final Schema avroSchema;
+ final org.apache.kafka.connect.data.Schema connectSchema;
final String keyFieldName;

- public PayloadGenerator(Config config) {
+ public PayloadGenerator(Config config, Properties producerConfig) {
this.format = config.format();
this.random = new Random();
config
@@ -45,6 +56,42 @@ public PayloadGenerator(Config config) {

this.generator = new Generator.Builder().random(random).generation(config.count()).schema(config.schema()).build();
this.keyFieldName = config.keyFieldName();
+ this.avroData = new AvroData(1);
+ this.avroSchema = config.schema();
+ this.connectSchema = avroData.toConnectSchema(config.schema());
+ this.converter =
+ switch (this.format) {
+ case JSON -> {
+ var jsonConverter = new JsonConverter();
+ var schemasEnabled = producerConfig.getProperty("schemas.enabled", "false");
+ jsonConverter.configure(Map.of("schemas.enable", schemasEnabled, "converter.type", "value"));
+ yield jsonConverter;
+ }
+ case AVRO -> {
+ var avroConverter = new AvroConverter();
+ avroConverter.configure(
+ producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
+ false
+ );
+ yield avroConverter;
+ }
+ case PROTOBUF -> {
+ var avroConverter = new ProtobufConverter();
+ avroConverter.configure(
+ producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
+ false
+ );
+ yield avroConverter;
+ }
+ case JSON_SCHEMA -> {
+ var avroConverter = new JsonSchemaConverter();
+ avroConverter.configure(
+ producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
+ false
+ );
+ yield avroConverter;
+ }
+ };
}

public GenericRecord get() {
@@ -60,16 +107,11 @@ public GenericRecord get() {
return (GenericRecord) generatedObject;
}

- public ProducerRecord<String, Object> record(String topicName) {
+ public ProducerRecord<String, byte[]> record(String topicName) {
final var record = get();

- final Object value;
- if (format.equals(Format.JSON)) {
- value = toJson(record);
- } else {
- value = record;
- }
- return new ProducerRecord<>(topicName, key(record), value);
+ final var key = key(record);
+ final var value = value(topicName, record);
+ return new ProducerRecord<>(topicName, key, value);
}

String toJson(GenericRecord record) {
@@ -120,6 +162,11 @@ public String keyFieldName() {
return keyFieldName;
}

+ public byte[] value(String topicName, GenericRecord payload) {
+ final var schemaAndValue = avroData.toConnectData(avroSchema, payload);
+ return converter.fromConnectData(topicName, schemaAndValue.schema(), schemaAndValue.value());
+ }

public record Config(
Optional<Long> randomSeed,
Optional<Quickstart> quickstart,
@@ -175,6 +222,8 @@ public String keyFieldName() {
public enum Format {
JSON,
AVRO,
+ JSON_SCHEMA,
+ PROTOBUF,
}

@SuppressWarnings("unchecked")
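To see the new value() path end to end, here is a hedged, self-contained sketch: an Avro GenericRecord is mapped to Connect data with AvroData, then rendered to bytes by a converter (JsonConverter here, so no Schema Registry is required; the Avro schema is invented for the example):

import java.util.Map;
import io.confluent.connect.avro.AvroData;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.connect.json.JsonConverter;

public class ValuePathSketch {
    public static void main(String[] args) {
        var avroSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        var record = new GenericData.Record(avroSchema);
        record.put("name", "jeqo");

        // Avro -> Connect, mirroring PayloadGenerator.value() above (cache size 1 as in the constructor).
        var avroData = new AvroData(1);
        var schemaAndValue = avroData.toConnectData(avroSchema, record);

        // Connect -> bytes; AvroConverter or ProtobufConverter would need a schema.registry.url instead.
        var converter = new JsonConverter();
        converter.configure(Map.of("schemas.enable", "false", "converter.type", "value"));
        byte[] bytes = converter.fromConnectData("demo-topic", schemaAndValue.schema(), schemaAndValue.value());
        System.out.println(new String(bytes)); // {"name":"jeqo"}
    }
}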
@@ -2,7 +2,6 @@

import java.util.Map;
import java.util.TreeMap;
- import kafka.cli.producer.datagen.PayloadGenerator.Format;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -14,14 +13,14 @@
public class PerformanceRunner {

final Config config;
- final KafkaProducer<String, Object> producer;
+ final KafkaProducer<String, byte[]> producer;
final PayloadGenerator payloadGenerator;
final ThroughputThrottler throttler;
final Stats stats;

public PerformanceRunner(
final Config config,
- final KafkaProducer<String, Object> producer,
+ final KafkaProducer<String, byte[]> producer,
final PayloadGenerator payloadGenerator,
final ThroughputThrottler throughputThrottler,
final Stats stats
@@ -37,29 +36,20 @@ public void start() {
GenericRecord payload;
Object value;
String key;
- ProducerRecord<String, Object> record;
+ ProducerRecord<String, byte[]> record;

int currentTransactionSize = 0;
long transactionStartTime = 0;

var sample = payloadGenerator.sample();

for (long i = 0; i < config.records(); i++) {
- payload = payloadGenerator.get();
- key = payloadGenerator.key(payload);

- if (payloadGenerator.format.equals(Format.AVRO)) {
- value = payload;
- } else {
- value = payloadGenerator.toJson(payload);
- }

if (config.transactionsEnabled() && currentTransactionSize == 0) {
producer.beginTransaction();
transactionStartTime = System.currentTimeMillis();
}

- record = new ProducerRecord<>(config.topicName(), key, value);
+ record = payloadGenerator.record(config.topicName());

var sendStartMs = System.currentTimeMillis();
var cb = stats.nextCompletion(sendStartMs, sample.length, stats);
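The hunk above only shows where a transaction is opened. For orientation, a hedged sketch of the batching pattern around producer.send (the batch size, transactional.id, and commit condition are assumptions, not code from this repo; variable names reuse the diff's):

// Assumes producerConfig, keySerializer, valueSerializer, payloadGenerator,
// records and topicName are set up as in the surrounding commands.
producerConfig.put("transactional.id", "producer-datagen-1"); // required before initTransactions()
try (var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer)) {
    producer.initTransactions();
    producer.beginTransaction();
    for (long i = 0; i < records; i++) {
        producer.send(payloadGenerator.record(topicName));
        if ((i + 1) % 100 == 0) { // hypothetical batch boundary
            producer.commitTransaction();
            producer.beginTransaction();
        }
    }
    producer.commitTransaction(); // committing a final, possibly empty transaction is harmless
}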
@@ -10,7 +10,7 @@
import kafka.cli.producer.datagen.PayloadGenerator;
import kafka.cli.producer.datagen.Stats;
import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.common.serialization.Serializer;
+ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import picocli.CommandLine;

@@ -48,10 +48,10 @@ public Integer call() throws Exception {
producerConfig.putAll(additionalProperties);

var keySerializer = new StringSerializer();
- Serializer<Object> valueSerializer = PayloadGenerator.valueSerializer(schemaOpts.format(), producerConfig);
+ var valueSerializer = new ByteArraySerializer();

try (var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer)) {
- final var payloadGenerator = new PayloadGenerator(schemaOpts.config());
+ final var payloadGenerator = new PayloadGenerator(schemaOpts.config(), producerConfig);
final var stats = new Stats(numRecords, reportingIntervalMs);
final var config = new IntervalRunner.Config(topicName, numRecords, intervalMs);

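Since value serialization now happens inside PayloadGenerator, every command wires the producer the same way. A minimal sketch, assuming a local broker and an invented topic:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerWiringSketch {
    public static void main(String[] args) {
        var props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        // Keys stay String; values are whatever bytes the configured converter produced.
        try (var producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer())) {
            byte[] payload = "{\"name\":\"jeqo\"}".getBytes();
            producer.send(new ProducerRecord<>("demo-topic", "key-1", payload));
        }
    }
}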
@@ -8,7 +8,7 @@
import kafka.cli.producer.datagen.Cli;
import kafka.cli.producer.datagen.PayloadGenerator;
import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.common.serialization.Serializer;
+ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import picocli.CommandLine;

@@ -33,10 +33,10 @@ public Integer call() throws Exception {
if (producerConfig == null) return 1;
producerConfig.putAll(additionalProperties);
var keySerializer = new StringSerializer();
- Serializer<Object> valueSerializer = PayloadGenerator.valueSerializer(schemaOpts.format(), producerConfig);
+ var valueSerializer = new ByteArraySerializer();

try (var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer)) {
- var pg = new PayloadGenerator(schemaOpts.config());
+ var pg = new PayloadGenerator(schemaOpts.config(), producerConfig);

out.println("Avro Schema used to generate records:");
out.println(pg.schema());
@@ -11,6 +11,7 @@
import kafka.cli.producer.datagen.Stats;
import kafka.cli.producer.datagen.ThroughputThrottler;
import org.apache.kafka.clients.producer.KafkaProducer;
+ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import picocli.CommandLine;

@@ -52,7 +53,7 @@ public Integer call() {
producerConfig.putAll(additionalProperties);

var keySerializer = new StringSerializer();
- var valueSerializer = PayloadGenerator.valueSerializer(schemaOpts.format(), producerConfig);
+ var valueSerializer = new ByteArraySerializer();

try (var producer = new KafkaProducer<>(producerConfig, keySerializer, valueSerializer)) {
final var config = new PerformanceRunner.Config(
@@ -62,7 +63,7 @@
transactionDurationMs,
shouldPrintMetrics
);
- final var payloadGenerator = new PayloadGenerator(schemaOpts.config());
+ final var payloadGenerator = new PayloadGenerator(schemaOpts.config(), producerConfig);
final var throughputThrottler = new ThroughputThrottler(System.currentTimeMillis(), throughput);
final var stats = new Stats(numRecords, reportingIntervalMs);

@@ -4,6 +4,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Optional;
+ import java.util.Properties;
import java.util.concurrent.Callable;
import kafka.cli.producer.datagen.Cli;
import kafka.cli.producer.datagen.PayloadGenerator;
@@ -33,7 +34,8 @@ public Integer call() throws Exception {
1,
PayloadGenerator.Format.JSON,
null
- )
+ ),
+ new Properties()
);
if (schema) {
final var schema = json.readTree(payloadGenerator.schema());
