Skip to content
This repository has been archived by the owner on Apr 12, 2023. It is now read-only.

Commit

Permalink
feat: Allow to select the avro schema version in the producer view (#225
Browse files Browse the repository at this point in the history
)

* fix: check for unexpected fields in json2avro conversion (#226)

* bug: check if all fields are used when the record is build

* Rename unused to unexpected

* Add combobox to pick the schema to validate against

* Use schema registry cache and allow to select the version in validation

* Valdiate against different schema versions

* Pass the schema version to the producer send

* lint

* Fix tests

* Lint

* Fix tests

* Fix tests
  • Loading branch information
andrewinci authored Feb 12, 2022
1 parent 8c97a81 commit 4b9f64f
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import insulator.di.TopicScope
import insulator.helper.runOnFXThread
import insulator.kafka.consumer.ConsumeFrom
import insulator.kafka.consumer.Consumer
import insulator.kafka.consumer.DeserializationFormat
import insulator.kafka.model.Topic
import insulator.kafka.producer.SerializationFormat
import javafx.beans.property.SimpleBooleanProperty
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.property.SimpleStringProperty
Expand All @@ -24,7 +24,7 @@ class ConsumerViewModel @Inject constructor(
val records: ObservableList<RecordViewModel> = FXCollections.observableList(LinkedList())
val isConsumingProperty = SimpleBooleanProperty(false)
val consumeFromProperty = SimpleStringProperty(ConsumeFrom.LastDay.text)
val deserializeValueProperty = SimpleStringProperty(DeserializationFormat.String.toString())
val deserializeValueProperty = SimpleStringProperty(SerializationFormat.String.toString())
val searchItem = SimpleStringProperty("")
val comparatorProperty = SimpleObjectProperty<Comparator<RecordViewModel>>()

Expand All @@ -48,7 +48,7 @@ class ConsumerViewModel @Inject constructor(
isConsumingProperty.value = true
clearRecords()
val consumerFrom = ConsumeFrom.values().first { it.text == consumeFromProperty.value }
val deserializationFormat = DeserializationFormat.valueOf(deserializeValueProperty.value)
val deserializationFormat = SerializationFormat.valueOf(deserializeValueProperty.value)
consumer.start(topic.name, consumerFrom, deserializationFormat) {
records.runOnFXThread { addAll(it.map { record -> RecordViewModel(record) }) }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package insulator.viewmodel.main.topic

import insulator.di.TopicScope
import insulator.helper.createListBindings
import insulator.helper.dispatch
import insulator.jsonhelper.jsontoavro.JsonFieldParsingException
import insulator.jsonhelper.jsontoavro.JsonMissingFieldException
import insulator.kafka.SchemaRegistry
import insulator.kafka.model.Cluster
import insulator.kafka.model.Schema
import insulator.kafka.model.Subject
import insulator.kafka.model.Topic
import insulator.kafka.producer.AvroProducer
import insulator.kafka.producer.Producer
Expand All @@ -13,8 +17,10 @@ import insulator.kafka.producer.StringProducer
import insulator.viewmodel.common.InsulatorViewModel
import javafx.beans.binding.Bindings
import javafx.beans.property.SimpleBooleanProperty
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.property.SimpleStringProperty
import javafx.beans.value.ObservableBooleanValue
import javafx.collections.ObservableList
import tornadofx.onChange
import javax.inject.Inject

Expand All @@ -23,18 +29,25 @@ class ProducerViewModel @Inject constructor(
val topic: Topic,
val cluster: Cluster,
private val avroProducer: AvroProducer?,
private val stringProducer: StringProducer
private val stringProducer: StringProducer,
private val schemaRegistry: SchemaRegistry?,
) : InsulatorViewModel() {

val isTombstoneProperty = SimpleBooleanProperty(false)
val serializeValueProperty = SimpleStringProperty(SerializationFormat.String.name)
val serializationFormatProperty = SimpleObjectProperty(SerializationFormat.String)

private val producer: Producer
get() = when (SerializationFormat.valueOf(serializeValueProperty.value!!)) {
get() = when (serializationFormatProperty.value) {
SerializationFormat.Avro -> avroProducer ?: throw Exception("Null AvroProducer")
SerializationFormat.String -> stringProducer
null -> throw Exception("Invalid serialization format")
}

private val subjectProperty = SimpleObjectProperty<Subject?>()
val versionsProperty: ObservableList<Schema> =
createListBindings({ subjectProperty.value?.schemas ?: emptyList() }, subjectProperty)
val selectedVersionProperty = SimpleObjectProperty<Schema?>()

val nextFieldProperty = SimpleStringProperty("")
val validationErrorProperty = SimpleStringProperty(null)
val keyProperty = SimpleStringProperty()
Expand All @@ -48,12 +61,24 @@ class ProducerViewModel @Inject constructor(
)

init {
if (cluster.isSchemaRegistryConfigured()) serializeValueProperty.set(SerializationFormat.Avro.name)
listOf(valueProperty, serializeValueProperty).forEach {
it.onChange { value ->
value?.let {
producer.dispatch {
validate(value, topic.name).fold(
if (cluster.isSchemaRegistryConfigured()) {
serializationFormatProperty.set(SerializationFormat.Avro)
schemaRegistry?.dispatch {
val subjects = getSubject("${topic.name}-value")
subjects.fold(
{ validationErrorProperty.set("Unable to retrieve the list of schemas for the topic: ${topic.name}") },
{
subjectProperty.set(it)
selectedVersionProperty.set(subjectProperty.value?.schemas?.last())
}
)
}
}
listOf(valueProperty, serializationFormatProperty, selectedVersionProperty).forEach {
it.onChange {
producer.dispatch {
if (valueProperty.value != null) {
validate(valueProperty.value, topic.name, selectedVersionProperty.value?.version).fold(
{ error ->
if (error is JsonMissingFieldException) nextFieldProperty.value = error.fieldName
if (error is JsonFieldParsingException || validationErrorProperty.value.isNullOrEmpty())
Expand All @@ -73,7 +98,12 @@ class ProducerViewModel @Inject constructor(
keyProperty.value.isNullOrBlank() -> error.set(Exception("Invalid key. Key must be not empty"))
isTombstoneProperty.value -> producer.sendTombstone(topic.name, keyProperty.value)
valueProperty.value.isNullOrBlank() -> error.set(Exception("Invalid value. Value must be not empty"))
else -> producer.send(topic.name, keyProperty.value, valueProperty.value).mapLeft { error.set(it) }
else -> producer.send(
topic.name,
keyProperty.value,
valueProperty.value,
selectedVersionProperty.value?.version
).mapLeft { error.set(it) }
}
}
}
76 changes: 41 additions & 35 deletions app/src/main/kotlin/insulator/views/main/topic/ProducerView.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package insulator.views.main.topic

import com.sun.javafx.collections.ObservableListWrapper
import insulator.di.TopicScope
import insulator.helper.dispatch
import insulator.helper.toObservable
import insulator.kafka.consumer.DeserializationFormat
import insulator.kafka.model.Schema
import insulator.kafka.producer.SerializationFormat
import insulator.ui.common.InsulatorView
import insulator.ui.component.appBar
import insulator.ui.component.fieldName
Expand All @@ -16,23 +17,7 @@ import javafx.scene.control.ScrollPane
import javafx.scene.control.TextArea
import javafx.scene.layout.Priority
import javafx.scene.paint.Color
import tornadofx.action
import tornadofx.attachTo
import tornadofx.borderpane
import tornadofx.button
import tornadofx.checkbox
import tornadofx.combobox
import tornadofx.enableWhen
import tornadofx.hbox
import tornadofx.label
import tornadofx.managedWhen
import tornadofx.onChange
import tornadofx.onDoubleClick
import tornadofx.scrollpane
import tornadofx.textfield
import tornadofx.vbox
import tornadofx.vgrow
import tornadofx.visibleWhen
import tornadofx.* // ktlint-disable no-wildcard-imports
import javax.inject.Inject

@TopicScope
Expand All @@ -59,7 +44,6 @@ class ProducerView @Inject constructor(
}
recordValueTextArea().hideIfTombstone()

fieldName("Validation").hideIfTombstone()
validationArea().hideIfTombstone()

borderpane {
Expand All @@ -84,27 +68,49 @@ class ProducerView @Inject constructor(
if (viewModel.cluster.isSchemaRegistryConfigured()) {
hbox(alignment = Pos.CENTER_LEFT) {
fieldName("Serializer")
combobox<String> {
items = DeserializationFormat.values().toObservable { it.toString() }
valueProperty().bindBidirectional(viewModel.serializeValueProperty)
combobox<SerializationFormat> {
items = ObservableListWrapper(SerializationFormat.values().toList())
valueProperty().bindBidirectional(viewModel.serializationFormatProperty)
}
}
} else null

private fun EventTarget.validationArea() =
scrollpane {
label {
val warning = { viewModel.validationErrorProperty.value }
textProperty().bind(Bindings.createStringBinding({ if (warning().isNullOrEmpty()) "Valid" else warning() }, viewModel.validationErrorProperty))
textFillProperty().bind(
Bindings.createObjectBinding({ if (warning().isNullOrEmpty()) Color.GREEN else Color.RED }, viewModel.validationErrorProperty)
)
isWrapText = true
onDoubleClick { autoComplete() }
vbox {
fieldName("Validation")
if (viewModel.cluster.isSchemaRegistryConfigured()) {
hbox(alignment = Pos.CENTER_LEFT) {
fieldName("Schema version")
combobox<Schema> {
id = "combobox-schema-version"
items.bind(viewModel.versionsProperty) { it }
valueProperty().bindBidirectional(viewModel.selectedVersionProperty)
cellFormat { text = "v: ${it.version} id: ${it.id}" }
}
}.visibleWhen(viewModel.serializationFormatProperty.isEqualTo(SerializationFormat.Avro))
}
scrollpane {
label {
val warning = { viewModel.validationErrorProperty.value }
textProperty().bind(
Bindings.createStringBinding(
{ if (warning().isNullOrEmpty()) "Valid" else warning() },
viewModel.validationErrorProperty
)
)
textFillProperty().bind(
Bindings.createObjectBinding(
{ if (warning().isNullOrEmpty()) Color.GREEN else Color.RED },
viewModel.validationErrorProperty
)
)
isWrapText = true
onDoubleClick { autoComplete() }
}
vbarPolicy = ScrollPane.ScrollBarPolicy.NEVER
minHeight = 50.0
maxHeight = 100.0
}
vbarPolicy = ScrollPane.ScrollBarPolicy.NEVER
minHeight = 50.0
maxHeight = 100.0
}

private fun EventTarget.recordValueTextArea() =
Expand Down
6 changes: 3 additions & 3 deletions app/src/main/kotlin/insulator/views/main/topic/TopicView.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import insulator.helper.dispatch
import insulator.helper.hideOnReadonly
import insulator.helper.toObservable
import insulator.kafka.consumer.ConsumeFrom
import insulator.kafka.consumer.DeserializationFormat
import insulator.kafka.model.Cluster
import insulator.kafka.producer.SerializationFormat
import insulator.ui.common.InsulatorTabView
import insulator.ui.component.appBar
import insulator.ui.component.confirmationButton
Expand Down Expand Up @@ -77,10 +77,10 @@ class TopicView @Inject constructor(

private fun EventTarget.valueFormatOptions() {
if (cluster.isSchemaRegistryConfigured()) {
viewModel.consumerViewModel.deserializeValueProperty.set(DeserializationFormat.Avro.name)
viewModel.consumerViewModel.deserializeValueProperty.set(SerializationFormat.Avro.name)
fieldName("deserializer")
combobox<String> {
items = DeserializationFormat.values().toObservable { it.toString() }
items = SerializationFormat.values().toObservable { it.toString() }
valueProperty().bindBidirectional(viewModel.consumerViewModel.deserializeValueProperty)
enableWhen(viewModel.consumerViewModel.isConsumingProperty.not())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import arrow.core.left
import arrow.core.right
import helper.FxContext
import insulator.jsonhelper.jsontoavro.JsonToAvroException
import insulator.kafka.SchemaRegistry
import insulator.kafka.model.Cluster
import insulator.kafka.model.Schema
import insulator.kafka.model.SchemaRegistryConfiguration
import insulator.kafka.model.Subject
import insulator.kafka.model.Topic
import insulator.kafka.producer.AvroProducer
import insulator.kafka.producer.StringProducer
Expand All @@ -27,9 +30,9 @@ class ProducerViewModelTest : StringSpec({
ProducerViewModelTestFixture().use {
// arrange
val mockkProducer = mockk<StringProducer> {
coEvery { validate(any(), any()) } returns JsonToAvroException(errorMessage).left()
coEvery { validate(any(), any(), any()) } returns JsonToAvroException(errorMessage).left()
}
val sut = ProducerViewModel(it.mockTopic, it.cluster, mockk(), mockkProducer)
val sut = ProducerViewModel(it.mockTopic, it.cluster, mockk(), mockkProducer, mockk())
// act
sut.valueProperty.set("test")
// assert
Expand Down Expand Up @@ -58,9 +61,9 @@ class ProducerViewModelTest : StringSpec({
// arrange
val cluster = Cluster.empty().copy(schemaRegistryConfig = SchemaRegistryConfiguration("sample"))
// act
val sut = ProducerViewModel(it.mockTopic, cluster, it.avroProducer, mockk())
val sut = ProducerViewModel(it.mockTopic, cluster, it.avroProducer, mockk(), it.mockSchemaRegistry)
// assert
sut.serializeValueProperty.value.toString() shouldBe "Avro"
sut.serializationFormatProperty.value.toString() shouldBe "Avro"
}
}

Expand All @@ -80,7 +83,7 @@ class ProducerViewModelTest : StringSpec({
"send happy path" {
ProducerViewModelTestFixture().use {
// arrange
val sut = ProducerViewModel(it.mockTopic, it.cluster, mockk(relaxed = true), it.stringProducer)
val sut = ProducerViewModel(it.mockTopic, it.cluster, mockk(relaxed = true), it.stringProducer, mockk())
sut.valueProperty.set("test")
sut.keyProperty.set("test")
// act
Expand All @@ -95,10 +98,10 @@ class ProducerViewModelTest : StringSpec({
ProducerViewModelTestFixture().use {
// arrange
val mockProducer = mockk<StringProducer> {
coEvery { validate(any(), any()) } returns Unit.right()
coEvery { send(any(), any(), any()) } returns Throwable("sample").left()
coEvery { validate(any(), any(), any()) } returns Unit.right()
coEvery { send(any(), any(), any(), any()) } returns Throwable("sample").left()
}
val sut = ProducerViewModel(it.mockTopic, it.cluster, mockk(relaxed = true), mockProducer)
val sut = ProducerViewModel(it.mockTopic, it.cluster, mockk(relaxed = true), mockProducer, mockk())
sut.valueProperty.set("test")
sut.keyProperty.set("test")
// act
Expand All @@ -112,15 +115,20 @@ class ProducerViewModelTest : StringSpec({

class ProducerViewModelTestFixture : FxContext() {
val mockTopic = Topic.empty()
val avroProducer = mockk<AvroProducer> { coEvery { validate(any(), any()) } returns Unit.right() }
val avroProducer = mockk<AvroProducer> { coEvery { validate(any(), any(), any()) } returns Unit.right() }
val stringProducer = mockk<StringProducer> {
coEvery { validate(any(), any()) } returns Unit.right()
coEvery { send(any(), any(), any()) } returns Unit.right()
coEvery { validate(any(), any(), any()) } returns Unit.right()
coEvery { send(any(), any(), any(), any()) } returns Unit.right()
}
val mockSchema = Schema("name", 1, 1)
val mockSchemaRegistry = mockk<SchemaRegistry>() {
coEvery { getSubject(any()) } returns Subject("", listOf(mockSchema)).right()
}
val sut = ProducerViewModel(
mockTopic,
cluster,
avroProducer,
stringProducer
stringProducer,
mockSchemaRegistry
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ConsumerGroupTest : FreeSpec({
put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
}
producer.send(testTopicName, "test", "test")
producer.send(testTopicName, "test", "test", null)

"get list of consumer groups" {
// start a consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package insulator.kafka.consumer
import arrow.core.Either
import insulator.kafka.model.Cluster
import insulator.kafka.model.Record
import insulator.kafka.producer.SerializationFormat
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.avro.generic.GenericRecord
Expand Down Expand Up @@ -31,7 +32,7 @@ class Consumer(
private var threadLoop: Thread? = null
private var running = false

suspend fun start(topic: String, from: ConsumeFrom, valueFormat: DeserializationFormat, callback: ConsumerCallback) =
suspend fun start(topic: String, from: ConsumeFrom, valueFormat: SerializationFormat, callback: ConsumerCallback) =
suspendCoroutine<Unit> { continuation ->
GlobalScope.launch {
if (isRunning()) throw IllegalStateException("Consumer already running")
Expand Down Expand Up @@ -122,8 +123,3 @@ enum class ConsumeFrom(val text: String) {
LastWeek("Last week"),
Beginning("Beginning"),
}

enum class DeserializationFormat {
String,
Avro,
}
Loading

0 comments on commit 4b9f64f

Please sign in to comment.