Skip to content

Commit

Permalink
Add schema provider to Schema Registry mock JUnit 5 (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf authored Feb 27, 2024
1 parent 6850248 commit be2010c
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
jobs:
build-and-publish:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.40.6
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.41.0
with:
java-version: 17
secrets:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:
jobs:
java-gradle-release:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.40.6
uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.41.0
with:
java-version: 17
release-type: "${{ inputs.release-type }}"
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ plugins {
// release
id("net.researchgate.release") version "3.0.2"
id("com.bakdata.sonar") version "1.1.17"
id("com.bakdata.sonatype") version "1.1.14"
id("com.bakdata.sonatype") version "1.2.1"
id("org.hildan.github.changelog") version "2.2.0"
id("io.freefair.lombok") version "8.4" apply false
}
Expand Down
2 changes: 2 additions & 0 deletions schema-registry-mock-junit5/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ dependencies {

testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version)
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junit5Version)
val confluentVersion: String by project
testImplementation(group = "io.confluent", name = "kafka-protobuf-provider", version = confluentVersion)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2019 bakdata GmbH
* Copyright (c) 2024 bakdata GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,6 +24,9 @@
package com.bakdata.schemaregistrymock.junit5;

import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import java.util.List;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
Expand Down Expand Up @@ -59,6 +62,24 @@
* To retrieve the url of the schema registry for a Kafka Streams config, please use {@link #getUrl()}
*/
public class SchemaRegistryMockExtension extends SchemaRegistryMock implements BeforeEachCallback, AfterEachCallback {
/**
* Create a new {@code SchemaRegistryMockExtension} with default {@link SchemaProvider SchemaProviders}.
*
* @see #SchemaRegistryMockExtension(List)
*/
public SchemaRegistryMockExtension() {
this(null);
}

/**
* Create a new {@code SchemaRegistryMockExtension} from {@link SchemaProvider SchemaProviders}.
*
* @param schemaProviders List of {@link SchemaProvider}. If null, {@link AvroSchemaProvider} will be used.
*/
public SchemaRegistryMockExtension(final List<SchemaProvider> schemaProviders) {
super(schemaProviders);
}

@Override
public void afterEach(final ExtensionContext context) {
this.stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata GmbH
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.schemaregistrymock.junit5;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ProtobufRegistryMockExtensionTest {
@RegisterExtension
private final SchemaRegistryMockExtension schemaRegistry;
private final ParsedSchema schema;

ProtobufRegistryMockExtensionTest() throws IOException {
this.schemaRegistry = new SchemaRegistryMockExtension(Collections.singletonList(new ProtobufSchemaProvider()));
try (final InputStream input = ProtobufRegistryMockExtensionTest.class.getResourceAsStream("/record.proto");
final BufferedReader reader = new BufferedReader(new InputStreamReader(input))) {
this.schema = new ProtobufSchema(reader.lines().collect(Collectors.joining("\n")));
}
}

@Test
void shouldRegisterKeySchema() throws IOException, RestClientException {
final int id = this.schemaRegistry.registerKeySchema("test-topic", this.schema);

final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id);
assertThat(retrievedSchema).isEqualTo(this.schema);
}

@Test
void shouldRegisterValueSchema() throws IOException, RestClientException {
final int id = this.schemaRegistry.registerValueSchema("test-topic", this.schema);

final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id);
assertThat(retrievedSchema).isEqualTo(this.schema);
}

@Test
void shouldRegisterKeySchemaWithClient() throws IOException, RestClientException {
final int id = this.schemaRegistry.getSchemaRegistryClient().register("test-topic-key", this.schema);

final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id);
assertThat(retrievedSchema).isEqualTo(this.schema);
}

@Test
void shouldRegisterValueSchemaWithClient() throws IOException, RestClientException {
final int id = this.schemaRegistry.getSchemaRegistryClient().register("test-topic-value", this.schema);

final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id);
assertThat(retrievedSchema).isEqualTo(this.schema);
}

@Test
void shouldHaveSchemaVersions() throws IOException, RestClientException {
final String topic = "test-topic";
final int id = this.schemaRegistry.registerValueSchema(topic, this.schema);

final List<Integer> versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
assertThat(versions).hasSize(1);

final SchemaMetadata metadata =
this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0));
assertThat(metadata.getId()).isEqualTo(id);
final String schemaString = metadata.getSchema();
final ParsedSchema retrievedSchema = new ProtobufSchema(schemaString);
assertThat(retrievedSchema).isEqualTo(this.schema);
}

@Test
void shouldReturnAllSubjects() throws IOException, RestClientException {
this.schemaRegistry.registerKeySchema("test-topic", this.schema);
this.schemaRegistry.registerValueSchema("test-topic", this.schema);
final Collection<String> allSubjects = this.schemaRegistry.getSchemaRegistryClient().getAllSubjects();
assertThat(allSubjects).hasSize(2).containsExactly("test-topic-key", "test-topic-value");
}


@Test
void shouldDeleteKeySchema() throws IOException, RestClientException {
this.schemaRegistry.registerKeySchema("test-topic", this.schema);
final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient();
final Collection<String> allSubjects = client.getAllSubjects();
assertThat(allSubjects).hasSize(1).containsExactly("test-topic-key");
this.schemaRegistry.deleteKeySchema("test-topic");
final Collection<String> subjectsAfterDeletion = client.getAllSubjects();
assertThat(subjectsAfterDeletion).isEmpty();
}

@Test
void shouldDeleteValueSchema() throws IOException, RestClientException {
final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient();
this.schemaRegistry.registerValueSchema("test-topic", this.schema);
final Collection<String> allSubjects = client.getAllSubjects();
assertThat(allSubjects).hasSize(1).containsExactly("test-topic-value");
this.schemaRegistry.deleteValueSchema("test-topic");
final Collection<String> subjectsAfterDeletion = client.getAllSubjects();
assertThat(subjectsAfterDeletion).isEmpty();
}

@Test
void shouldDeleteKeySchemaWithClient() throws IOException, RestClientException {
final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient();
this.schemaRegistry.registerKeySchema("test-topic", this.schema);
final Collection<String> allSubjects = client.getAllSubjects();
assertThat(allSubjects).hasSize(1).containsExactly("test-topic-key");
client.deleteSubject("test-topic-key");
final Collection<String> subjectsAfterDeletion = client.getAllSubjects();
assertThat(subjectsAfterDeletion).isEmpty();
}

@Test
void shouldDeleteValueSchemaWithClient() throws IOException, RestClientException {
final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient();
this.schemaRegistry.registerValueSchema("test-topic", this.schema);
final Collection<String> allSubjects = client.getAllSubjects();
assertThat(allSubjects).hasSize(1).containsExactly("test-topic-value");
client.deleteSubject("test-topic-value");
final Collection<String> subjectsAfterDeletion = client.getAllSubjects();
assertThat(subjectsAfterDeletion).isEmpty();
}

@Test
void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException {
final String topic = "test-topic";
final int id = this.schemaRegistry.registerValueSchema(topic, this.schema);

final List<Integer> versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
assertThat(versions).hasSize(1);

final SchemaMetadata metadata =
this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0));
assertThat(metadata.getId()).isEqualTo(id);
assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"))
.isNotNull();
this.schemaRegistry.deleteValueSchema(topic);
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient()
.getSchemaMetadata(topic + "-value", versions.get(0)))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(
() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}

}
9 changes: 9 additions & 0 deletions schema-registry-mock-junit5/src/test/resources/record.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";
package com.bakdata.proto;

import "nested.proto";

message Record {
string f1 = 1;
Nested f2 = 2;
}

0 comments on commit be2010c

Please sign in to comment.