diff --git a/producer-datagen/pom.xml b/producer-datagen/pom.xml
index e278081..5232341 100644
--- a/producer-datagen/pom.xml
+++ b/producer-datagen/pom.xml
@@ -27,6 +27,16 @@
org.apache.kafka
kafka-clients
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-api</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-json</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
info.picocli
@@ -54,6 +64,7 @@
com.fasterxml.jackson.core
jackson-databind
+
io.confluent
kafka-avro-serializer
@@ -64,6 +75,26 @@
avro-random-generator
${confluent.avro.generator.version}
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-avro-data</artifactId>
+      <version>${confluent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-avro-converter</artifactId>
+      <version>${confluent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-json-schema-converter</artifactId>
+      <version>${confluent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-protobuf-converter</artifactId>
+      <version>${confluent.version}</version>
+    </dependency>
org.graalvm.nativeimage
@@ -196,7 +227,7 @@
--initialize-at-run-time=org.apache.kafka.common.security.authenticator.SaslClientAuthenticator
- --initialize-at-build-time=${graalvm.at-build-time.slf4j},${graalvm.at-build-time.kafka},${graalvm.at-build-time.json}
+ --initialize-at-build-time=${graalvm.at-build-time.slf4j},${graalvm.at-build-time.kafka},${graalvm.at-build-time.json},org.joda.time
--enable-url-protocols=http
--enable-url-protocols=https
diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/IntervalRunner.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/IntervalRunner.java
index 59d7a9f..9e0cc1e 100644
--- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/IntervalRunner.java
+++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/IntervalRunner.java
@@ -1,20 +1,18 @@
package kafka.cli.producer.datagen;
import java.io.IOException;
-import kafka.cli.producer.datagen.PayloadGenerator.Format;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
public class IntervalRunner {
final Config config;
- final KafkaProducer<String, Object> producer;
+ final KafkaProducer<String, byte[]> producer;
final PayloadGenerator payloadGenerator;
final Stats stats;
public IntervalRunner(
Config config,
- KafkaProducer<String, Object> producer,
+ KafkaProducer<String, byte[]> producer,
PayloadGenerator payloadGenerator,
Stats stats
) {
@@ -53,19 +51,9 @@ void loop(int count) {
}
void runOnce() {
- var payload = payloadGenerator.get();
- var key = payloadGenerator.key(payload);
- Object value;
-
- if (payloadGenerator.format.equals(Format.AVRO)) {
- value = payload;
- } else {
- value = payloadGenerator.toJson(payload);
- }
-
var sendStartMs = System.currentTimeMillis();
var cb = stats.nextCompletion(sendStartMs, sample.length, stats);
- var record = new ProducerRecord<>(config.topicName(), key, value);
+ var record = payloadGenerator.record(config.topicName());
producer.send(record, cb);
diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/PayloadGenerator.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/PayloadGenerator.java
index f88f7f6..8ed034a 100644
--- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/PayloadGenerator.java
+++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/PayloadGenerator.java
@@ -1,6 +1,10 @@
package kafka.cli.producer.datagen;
import io.confluent.avro.random.generator.Generator;
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.connect.avro.AvroData;
+import io.confluent.connect.json.JsonSchemaConverter;
+import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -9,6 +13,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
@@ -24,6 +29,8 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.storage.Converter;
/** Datagen side */
public class PayloadGenerator {
@@ -31,9 +38,13 @@ public class PayloadGenerator {
final Format format;
final Random random;
final Generator generator;
+ final Converter converter;
+ final AvroData avroData;
+ final Schema avroSchema;
+ final org.apache.kafka.connect.data.Schema connectSchema;
final String keyFieldName;
- public PayloadGenerator(Config config) {
+ public PayloadGenerator(Config config, Properties producerConfig) {
this.format = config.format();
this.random = new Random();
config
@@ -45,6 +56,42 @@ public PayloadGenerator(Config config) {
this.generator = new Generator.Builder().random(random).generation(config.count()).schema(config.schema()).build();
this.keyFieldName = config.keyFieldName();
+ this.avroData = new AvroData(1);
+ this.avroSchema = config.schema();
+ this.connectSchema = avroData.toConnectSchema(config.schema());
+ this.converter =
+ switch (this.format) {
+ case JSON -> {
+ var jsonConverter = new JsonConverter();
+ var schemasEnabled = producerConfig.getProperty("schemas.enable", "false");
+ jsonConverter.configure(Map.of("schemas.enable", schemasEnabled, "converter.type", "value"));
+ yield jsonConverter;
+ }
+ case AVRO -> {
+ var avroConverter = new AvroConverter();
+ avroConverter.configure(
+ producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
+ false
+ );
+ yield avroConverter;
+ }
+ case PROTOBUF -> {
+ var protobufConverter = new ProtobufConverter();
+ protobufConverter.configure(
+ producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
+ false
+ );
+ yield protobufConverter;
+ }
+ case JSON_SCHEMA -> {
+ var jsonSchemaConverter = new JsonSchemaConverter();
+ jsonSchemaConverter.configure(
+ producerConfig.keySet().stream().collect(Collectors.toMap(String::valueOf, producerConfig::get)),
+ false
+ );
+ yield jsonSchemaConverter;
+ }
+ };
}
public GenericRecord get() {
@@ -60,16 +107,11 @@ public GenericRecord get() {
return (GenericRecord) generatedObject;
}
- public ProducerRecord<String, Object> record(String topicName) {
+ public ProducerRecord<String, byte[]> record(String topicName) {
final var record = get();
-
- final Object value;
- if (format.equals(Format.JSON)) {
- value = toJson(record);
- } else {
- value = record;
- }
- return new ProducerRecord<>(topicName, key(record), value);
+ final var key = key(record);
+ final var value = value(topicName, record);
+ return new ProducerRecord<>(topicName, key, value);
}
String toJson(GenericRecord record) {
@@ -120,6 +162,11 @@ public String keyFieldName() {
return keyFieldName;
}
+ public byte[] value(String topicName, GenericRecord payload) {
+ final var schemaAndValue = avroData.toConnectData(avroSchema, payload);
+ return converter.fromConnectData(topicName, schemaAndValue.schema(), schemaAndValue.value());
+ }
+
public record Config(
Optional randomSeed,
Optional quickstart,
@@ -175,6 +222,8 @@ public String keyFieldName() {
public enum Format {
JSON,
AVRO,
+ JSON_SCHEMA,
+ PROTOBUF,
}
@SuppressWarnings("unchecked")
diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/PerformanceRunner.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/PerformanceRunner.java
index faadb40..1381459 100644
--- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/PerformanceRunner.java
+++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/PerformanceRunner.java
@@ -2,7 +2,6 @@
import java.util.Map;
import java.util.TreeMap;
-import kafka.cli.producer.datagen.PayloadGenerator.Format;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -14,14 +13,14 @@
public class PerformanceRunner {
final Config config;
- final KafkaProducer<String, Object> producer;
+ final KafkaProducer<String, byte[]> producer;
final PayloadGenerator payloadGenerator;
final ThroughputThrottler throttler;
final Stats stats;
public PerformanceRunner(
final Config config,
- final KafkaProducer<String, Object> producer,
+ final KafkaProducer<String, byte[]> producer,
final PayloadGenerator payloadGenerator,
final ThroughputThrottler throughputThrottler,
final Stats stats
@@ -37,7 +36,7 @@ public void start() {
GenericRecord payload;
Object value;
String key;
- ProducerRecord<String, Object> record;
+ ProducerRecord<String, byte[]> record;
int currentTransactionSize = 0;
long transactionStartTime = 0;
@@ -45,21 +44,12 @@ public void start() {
var sample = payloadGenerator.sample();
for (long i = 0; i < config.records(); i++) {
- payload = payloadGenerator.get();
- key = payloadGenerator.key(payload);
-
- if (payloadGenerator.format.equals(Format.AVRO)) {
- value = payload;
- } else {
- value = payloadGenerator.toJson(payload);
- }
-
if (config.transactionsEnabled() && currentTransactionSize == 0) {
producer.beginTransaction();
transactionStartTime = System.currentTimeMillis();
}
- record = new ProducerRecord<>(config.topicName(), key, value);
+ record = payloadGenerator.record(config.topicName());
var sendStartMs = System.currentTimeMillis();
var cb = stats.nextCompletion(sendStartMs, sample.length, stats);
diff --git a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ProduceIntervalCommand.java b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ProduceIntervalCommand.java
index af788b7..362dcac 100644
--- a/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ProduceIntervalCommand.java
+++ b/producer-datagen/src/main/java/kafka/cli/producer/datagen/command/ProduceIntervalCommand.java
@@ -10,7 +10,7 @@
import kafka.cli.producer.datagen.PayloadGenerator;
import kafka.cli.producer.datagen.Stats;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import picocli.CommandLine;
@@ -48,10 +48,10 @@ public Integer call() throws Exception {
producerConfig.putAll(additionalProperties);
var keySerializer = new StringSerializer();
- Serializer