Skip to content

Commit

Permalink
Kafka serdes simplification (#5200)
Browse files Browse the repository at this point in the history
* Simplify kafka serializer hierarchy

* Simplify kafka deserializer hierarchy

* Format kafka serdes code

* Simplify kafka serdes structure by removing layers

* Create base serde for basic commonalities between serializer and deserializer

* Make Json converter extend the base serde

* Remove one dereference configuration and fix avro to adapt it to the new exception

* Consolidate the dereference configuration into a single property
  • Loading branch information
carlesarnal authored Sep 26, 2024
1 parent f698783 commit accc53a
Show file tree
Hide file tree
Showing 37 changed files with 533 additions and 810 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import io.apicurio.registry.resolver.strategy.ArtifactReferenceResolverStrategy;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.models.VersionMetaData;
import io.apicurio.registry.serde.avro.AvroPulsarDeserializer;
import io.apicurio.registry.serde.avro.AvroDeserializer;
import io.apicurio.registry.serde.avro.AvroPulsarSerde;
import io.apicurio.registry.serde.avro.AvroPulsarSerializer;
import io.apicurio.registry.serde.avro.AvroSerdeConfig;
import io.apicurio.registry.serde.avro.AvroSerializer;
import io.apicurio.registry.serde.avro.strategy.RecordIdStrategy;
import io.apicurio.registry.serde.config.SerdeConfig;
import io.apicurio.registry.utils.tests.TestUtils;
Expand Down Expand Up @@ -52,9 +52,8 @@ private void testAvroAutoRegisterIdInBody(
Supplier<VersionMetaData> artifactFinder) throws Exception {
Schema schema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"myrecord3\",\"namespace\":\"test_group_avro\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}");
try (AvroPulsarSerializer<GenericData.Record> serializer = new AvroPulsarSerializer<>(restClient);
AvroPulsarDeserializer<GenericData.Record> deserializer = new AvroPulsarDeserializer<>(
restClient)) {
try (AvroSerializer<GenericData.Record> serializer = new AvroSerializer<>(restClient);
AvroDeserializer<GenericData.Record> deserializer = new AvroDeserializer<>(restClient)) {

AvroPulsarSerde<GenericData.Record> avroPulsarSerde = new AvroPulsarSerde<>(serializer,
deserializer, "myrecord3");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public void avroJsonWithReferences() throws Exception {

Map<String, String> config = new HashMap<>();
config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.DEREFERENCE_SCHEMA, "false");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);

Expand Down Expand Up @@ -332,7 +333,6 @@ public void avroJsonWithReferencesDereferenced() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.REGISTER_DEREFERENCED, "true");
serializer.configure(config, false);

config = new HashMap<>();
Expand Down Expand Up @@ -413,7 +413,7 @@ public void avroJsonWithReferencesDeserializerDereferenced() throws Exception {
config = new HashMap<>();
config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.putIfAbsent(AvroSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
config.putIfAbsent(SerdeConfig.DESERIALIZER_DEREFERENCE_SCHEMA, "true");
config.putIfAbsent(SerdeConfig.DEREFERENCE_SCHEMA, "true");
deserializer.configure(config, false);

AvroSchemaB avroSchemaB = new AvroSchemaB();
Expand Down Expand Up @@ -472,7 +472,6 @@ public void avroJsonWithReferencesDeserializerDereferenced() throws Exception {
config = new HashMap<>();
config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "false");
config.put(SerdeConfig.SERIALIZER_DEREFERENCE_SCHEMA, "true");
serializer.configure(config, false);

bytes = serializer.serialize(artifactId, avroSchemaB);
Expand All @@ -494,6 +493,7 @@ public void issue4463Test() throws Exception {

Map<String, String> config = new HashMap<>();
config.put(AvroSerdeConfig.AVRO_ENCODING, AvroSerdeConfig.AVRO_ENCODING_JSON);
config.put(SerdeConfig.DEREFERENCE_SCHEMA, "true");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
serializer.configure(config, false);

Expand Down Expand Up @@ -578,6 +578,7 @@ public void testReferenceRaw() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.DEREFERENCE_SCHEMA, "true");
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, RecordIdStrategy.class.getName());
serializer.configure(config, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.apicurio.registry.rest.client.models.CreateArtifact;
import io.apicurio.registry.rest.client.models.IfArtifactExists;
import io.apicurio.registry.rest.client.models.VersionMetaData;
import io.apicurio.registry.serde.SchemaResolverConfigurer;
import io.apicurio.registry.serde.config.IdOption;
import io.apicurio.registry.serde.config.KafkaSerdeConfig;
import io.apicurio.registry.serde.config.SerdeConfig;
Expand Down Expand Up @@ -512,10 +511,7 @@ public void complexObjectValidation() throws Exception {
.builder().globalId(global.getGlobalId()).groupId("GLOBAL")// .version("4")
.artifactId("sample.account.json").build();

SchemaResolverConfigurer<JsonSchema, Object> src = new SchemaResolverConfigurer<JsonSchema, Object>(
client);

SchemaResolver<JsonSchema, Object> sr = src.getSchemaResolver();
SchemaResolver<JsonSchema, Object> sr = new DefaultSchemaResolver<>(client);
Map<String, String> configs = new HashMap<>();
configs.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY_DEFAULT, DefaultSchemaResolver.class.getName());
configs.put(SerdeConfig.CHECK_PERIOD_MS, "600000");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testProtobufSchemaWithReferencesDereferenced() {
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true");
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.FALLBACK_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.DESERIALIZER_DEREFERENCE_SCHEMA, "true");
config.put(SerdeConfig.DEREFERENCE_SCHEMA, "true");
serializer.configure(config, false);
deserializer.configure(config, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ The `DefaultSchemaResolver` uses the following properties to configure how to lo
|Used by serializers only. Specifies whether the serializer tries to create an artifact in the registry. The JSON Schema serializer does not support this feature.
|`boolean, boolean String`
|`false`
|`DEREFERENCE_SCHEMA`
|`apicurio.registry.dereference-schema`
|Used to indicate the serdes to dereference the schema. This is used in two different situation, once the schema is registered, instructs the serdes to ask the server for the schema dereferenced. It is also used to instruct the serializer to dereference the schema before registering it Registry, but this is only supported for Avro.
|`boolean`
|`false`
|`AUTO_REGISTER_ARTIFACT_IF_EXISTS`
|`apicurio.registry.auto-register.if-exists`
|Used by serializers only. Configures the behavior of the client when there is a conflict creating an artifact because the artifact already exists. Available values are `FAIL`, `UPDATE`, `RETURN`, or `RETURN_OR_UPDATE`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.serde.AbstractKafkaSerDe;
import io.apicurio.registry.serde.BaseSerde;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
import io.apicurio.registry.serde.avro.AvroSerdeConfig;
Expand Down Expand Up @@ -315,7 +315,7 @@ private static Struct buildTransactionStruct() {
@Test
public void testCompactJson() throws Exception {
testJson(createRegistryClient(), new CompactFormatStrategy(), input -> {
ByteBuffer buffer = AbstractKafkaSerDe.getByteBuffer(input);
ByteBuffer buffer = BaseSerde.getByteBuffer(input);
return buffer.getInt();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class AbstractSchemaResolver<S, T> implements SchemaResolver<S,
protected String explicitArtifactVersion;

protected Vertx vertx;
protected boolean dereference;
protected boolean resolveDereferenced;

@Override
public void configure(Map<String, ?> configs, SchemaParser<S, T> schemaParser) {
Expand Down Expand Up @@ -112,7 +112,7 @@ public void configure(Map<String, ?> configs, SchemaParser<S, T> schemaParser) {
this.explicitArtifactVersion = artifactVersionOverride;
}

this.dereference = config.deserializerDereference() || config.serializerDereference();
this.resolveDereferenced = config.resolveDereferenced();
}

/**
Expand Down Expand Up @@ -175,7 +175,7 @@ protected String resolveArtifactId(String artifactId, boolean isReference, Strin

protected SchemaLookupResult<S> resolveSchemaByGlobalId(long globalId) {
return schemaCache.getByGlobalId(globalId, globalIdKey -> {
if (dereference) {
if (resolveDereferenced) {
return resolveSchemaDereferenced(globalIdKey);
} else {
return resolveSchemaWithReferences(globalIdKey);
Expand Down Expand Up @@ -227,27 +227,16 @@ private SchemaLookupResult<S> resolveSchemaWithReferences(long globalId) {
protected Map<String, ParsedSchema<S>> resolveReferences(
List<io.apicurio.registry.rest.client.models.ArtifactReference> artifactReferences) {
Map<String, ParsedSchema<S>> resolvedReferences = new HashMap<>();

artifactReferences.forEach(reference -> {

final InputStream referenceContent = client.groups()
.byGroupId(reference.getGroupId() == null ? "default" : reference.getGroupId())
.artifacts().byArtifactId(reference.getArtifactId()).versions()
.byVersionExpression(reference.getVersion()).content().get();

final List<io.apicurio.registry.rest.client.models.ArtifactReference> referenceReferences = client
.groups().byGroupId(reference.getGroupId() == null ? "default" : reference.getGroupId()) // TODO
// verify
// the
// old
// logic:
// .pathParams(List.of(groupId
// ==
// null
// ?
// "null"
// :
// groupId,
// artifactId,
// version))
// GroupRequestsProvider.java
.groups().byGroupId(reference.getGroupId() == null ? "default" : reference.getGroupId())
.artifacts().byArtifactId(reference.getArtifactId()).versions()
.byVersionExpression(reference.getVersion()).references().get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.apicurio.registry.resolver.data.Record;
import io.apicurio.registry.resolver.strategy.ArtifactCoordinates;
import io.apicurio.registry.resolver.strategy.ArtifactReference;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.models.*;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.types.ContentTypes;
Expand All @@ -28,10 +29,17 @@ public class DefaultSchemaResolver<S, T> extends AbstractSchemaResolver<S, T> {
private boolean autoCreateArtifact;
private String autoCreateBehavior;
private boolean findLatest;
private boolean registerDereferenced;

private static final Logger logger = Logger.getLogger(DefaultSchemaResolver.class.getSimpleName());

public DefaultSchemaResolver() {
super();
}

public DefaultSchemaResolver(RegistryClient client) {
this.client = client;
}

/**
* @see io.apicurio.registry.resolver.AbstractSchemaResolver#reset()
*/
Expand All @@ -53,7 +61,6 @@ public void configure(Map<String, ?> configs, SchemaParser<S, T> schemaParser) {
}

this.autoCreateArtifact = config.autoRegisterArtifact();
this.registerDereferenced = config.registerDereferenced();
this.autoCreateBehavior = config.autoRegisterArtifactIfExists();
this.findLatest = config.findLatest();
}
Expand All @@ -68,7 +75,7 @@ public SchemaLookupResult<S> resolveSchema(Record<T> data) {

ParsedSchema<S> parsedSchema;
if (artifactResolverStrategy.loadSchema() && schemaParser.supportsExtractSchemaFromData()) {
parsedSchema = schemaParser.getSchemaFromData(data, registerDereferenced);
parsedSchema = schemaParser.getSchemaFromData(data, resolveDereferenced);
} else {
parsedSchema = null;
}
Expand Down Expand Up @@ -105,7 +112,7 @@ private SchemaLookupResult<S> getSchemaFromRegistry(ParsedSchema<S> parsedSchema
if (schemaParser.supportsExtractSchemaFromData()) {

if (parsedSchema == null) {
parsedSchema = schemaParser.getSchemaFromData(data, registerDereferenced);
parsedSchema = schemaParser.getSchemaFromData(data, resolveDereferenced);
}

if (parsedSchema.hasReferences()) {
Expand All @@ -130,7 +137,7 @@ private SchemaLookupResult<S> getSchemaFromRegistry(ParsedSchema<S> parsedSchema

if (schemaParser.supportsExtractSchemaFromData()) {
if (parsedSchema == null) {
parsedSchema = schemaParser.getSchemaFromData(data, dereference);
parsedSchema = schemaParser.getSchemaFromData(data, resolveDereferenced);
}
return handleResolveSchemaByContent(parsedSchema, artifactReference);
}
Expand Down Expand Up @@ -465,7 +472,7 @@ private SchemaLookupResult<S> resolveByCoordinates(String groupId, String artifa

InputStream rawSchema;
Map<String, ParsedSchema<S>> resolvedReferences = new HashMap<>();
if (dereference) {
if (resolveDereferenced) {
rawSchema = client.ids().globalIds().byGlobalId(gid).get(config -> {
assert config.queryParameters != null;
config.queryParameters.references = HandleReferencesType.DEREFERENCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,13 @@ public SchemaResolverConfig(Map<String, ?> originals) {
public static final long RETRY_BACKOFF_MS_DEFAULT = 300;

/**
* Used to indicate the auto-register feature to try to dereference the schema before registering it in
* Registry. Only supported for Avro. Only applicable when
* {@link SchemaResolverConfig#AUTO_REGISTER_ARTIFACT} is enabled.
* Used to indicate the serdes to dereference the schema. This is used in two different situation, once
* the schema is registered, instructs the serdes to ask the server for the schema dereferenced. It is
* also used to instruct the serializer to dereference the schema before registering it Registry, but this
* is only supported for Avro.
*/
public static final String REGISTER_DEREFERENCED = "apicurio.registry.dereference-schema";
public static final boolean REGISTER_DEREFERENCED_DEFAULT = true;

/**
* Used to indicate the serializer to ask Registry to return the schema dereferenced. This is useful to
* reduce the number of http requests to the server.
*/
public static final String SERIALIZER_DEREFERENCE_SCHEMA = "apicurio.registry.serializer.dereference-schema";
public static final boolean SERIALIZER_DEREFERENCE_SCHEMA_DEFAULT = false;

/**
* Used to indicate the deserializer to ask Registry to return the schema dereferenced. This is useful to
* reduce the number of http requests to the server.
*/
public static final String DESERIALIZER_DEREFERENCE_SCHEMA = "apicurio.registry.deserializer.dereference-schema";
public static final boolean DESERIALIZER_DEREFERENCE_SCHEMA_DEFAULT = false;
public static final String DEREFERENCE_SCHEMA = "apicurio.registry.dereference-schema";
public static final boolean DEREFERENCE_DEFAULT = false;

public String getRegistryUrl() {
return getString(REGISTRY_URL);
Expand Down Expand Up @@ -270,16 +257,8 @@ public String getExplicitArtifactVersion() {
return getString(EXPLICIT_ARTIFACT_VERSION);
}

public boolean registerDereferenced() {
return getBooleanOrFalse(REGISTER_DEREFERENCED);
}

public boolean serializerDereference() {
return getBooleanOrFalse(SERIALIZER_DEREFERENCE_SCHEMA);
}

public boolean deserializerDereference() {
return getBooleanOrFalse(DESERIALIZER_DEREFERENCE_SCHEMA);
public boolean resolveDereferenced() {
return getBooleanOrFalse(DEREFERENCE_SCHEMA);
}

@Override
Expand All @@ -296,7 +275,5 @@ public boolean deserializerDereference() {
entry(FIND_LATEST_ARTIFACT, FIND_LATEST_ARTIFACT_DEFAULT),
entry(CHECK_PERIOD_MS, CHECK_PERIOD_MS_DEFAULT), entry(RETRY_COUNT, RETRY_COUNT_DEFAULT),
entry(RETRY_BACKOFF_MS, RETRY_BACKOFF_MS_DEFAULT),
entry(REGISTER_DEREFERENCED, REGISTER_DEREFERENCED_DEFAULT),
entry(DESERIALIZER_DEREFERENCE_SCHEMA, DESERIALIZER_DEREFERENCE_SCHEMA_DEFAULT),
entry(SERIALIZER_DEREFERENCE_SCHEMA, SERIALIZER_DEREFERENCE_SCHEMA_DEFAULT));
entry(DEREFERENCE_SCHEMA, DEREFERENCE_DEFAULT));
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void configure(SerdeConfig configs, boolean isKey) {
}

/**
* @see io.apicurio.registry.serde.AbstractSerDe#schemaParser()
* @see AbstractDeserializer#schemaParser()
*/
@Override
public SchemaParser<Schema, U> schemaParser() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.apicurio.registry.resolver.ParsedSchema;
import io.apicurio.registry.utils.IoUtil;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.reflect.ReflectData;

Expand Down Expand Up @@ -44,7 +44,7 @@ public static Schema parse(String schema, List<ParsedSchema<Schema>> references)
try {
final Schema.Parser parser = new Schema.Parser();
return parser.parse(schema);
} catch (SchemaParseException e) {
} catch (AvroTypeException e) {
// If we fail to parse the content from the main schema, then parse first the references and then
// the main schema
final Schema.Parser parser = new Schema.Parser();
Expand Down
Loading

0 comments on commit accc53a

Please sign in to comment.