Skip to content
This repository has been archived by the owner on Apr 12, 2023. It is now read-only.

Commit

Permalink
Add send tombstone (#156)
Browse files Browse the repository at this point in the history
* Add tombostone checkbox

* Add send tombstone to producer

* Fix PR lint

* Set minHeigth

* Improve UI when tombstone is selected

* Add send tombstone test

* Fix PR lint

* Update docs

Co-authored-by: Auto Lint <[email protected]>
  • Loading branch information
andrewinci and Auto Lint authored Nov 27, 2020
1 parent 6a10587 commit f8b68bf
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 38 deletions.
4 changes: 3 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Download the binary from the latest release for your OS. Learn more [here](https
* **Producer**
* Avro producer with **autocompletion** based on the schema
* String producer
* Send tombstones
* 🚧 **Consumer groups** 🚧
* List consumer groups
* Show topics, partitions and lags
Expand All @@ -105,7 +106,8 @@ See https://openjdk.java.net/jeps/343 for more info.

### Build the documentation

The documentation for the github page is available under the `/docs/` folder
The documentation for the github page is available under the `/docs/` folder.
To build and serve it, use the following.

```bash
bundle exec jekyll serve
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package insulator.integrationtest
package insulator.integrationtest.cluster

import insulator.integrationtest.helpers.IntegrationTestFixture
import insulator.integrationtest.helpers.click
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package insulator.integrationtest
package insulator.integrationtest.cluster

// import insulator.integrationtest.helpers.click
// import insulator.integrationtest.helpers.getPrimaryWindow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import insulator.kafka.local.SchemaRegistryContainer
import insulator.kafka.model.Cluster
import insulator.kafka.model.SchemaRegistryConfiguration
import insulator.kafka.model.Topic
import insulator.kafka.model.TopicConfiguration
import insulator.kafka.producer.Producer
import insulator.kafka.producer.stringProducer
import insulator.kafka.schemaRegistry
Expand Down Expand Up @@ -96,6 +97,22 @@ class IntegrationTestFixture : Closeable {
}
}

suspend fun createCompactedTopic(s: String) = adminApi!!.createTopics(
Topic(
s,
partitionCount = 1,
replicationFactor = 1,
isCompacted = true,
configuration = TopicConfiguration(
mapOf(
"delete.retention.ms" to "0",
"min.cleanable.dirty.ratio" to "0.001",
"segment.ms" to "100",
)
)
)
)

suspend fun createTopic(s: String) = adminApi!!.createTopics(Topic(s, partitionCount = 3, replicationFactor = 1))
fun createTestSchema(schemaName: String) = schemaRegistry!!.register(schemaName, testSchema(5))
fun createTestSchemaUpdate(schemaName: String) = schemaRegistry!!.register(schemaName, testSchema(4))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package insulator.integrationtest
package insulator.integrationtest.producer

import insulator.helper.runOnFXThread
import insulator.integrationtest.helpers.IntegrationTestFixture
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package insulator.integrationtest.producer

import insulator.helper.runOnFXThread
import insulator.integrationtest.helpers.IntegrationTestFixture
import insulator.integrationtest.helpers.click
import insulator.integrationtest.helpers.lookupFirst
import insulator.integrationtest.helpers.screenShoot
import insulator.integrationtest.helpers.selectCluster
import insulator.integrationtest.helpers.selectTopic
import insulator.integrationtest.helpers.startStopConsumer
import insulator.integrationtest.helpers.waitWindowWithTitle
import insulator.viewmodel.main.topic.RecordViewModel
import io.kotest.assertions.arrow.either.shouldBeRight
import io.kotest.core.spec.style.FreeSpec
import io.kotest.core.spec.style.freeSpec
import io.kotest.matchers.collections.shouldContainAll
import io.kotest.matchers.ints.shouldBeLessThan
import io.kotest.matchers.ints.shouldBeLessThanOrEqual
import io.kotest.matchers.shouldNotBe
import javafx.scene.control.Button
import javafx.scene.control.CheckBox
import javafx.scene.control.TableView
import javafx.scene.control.TextInputControl
import kotlinx.coroutines.delay
import tornadofx.CssRule
import tornadofx.Stylesheet
import kotlin.time.ExperimentalTime

@ExperimentalTime
class ProducerTombstoneTest : FreeSpec({
include(testProduceToCompactedTopic(false))
include(testProduceToCompactedTopic(true))
})

@ExperimentalTime
fun testProduceToCompactedTopic(testTombstones: Boolean) = freeSpec {
// customize log
val withTombstone = if (testTombstones) " with tombstones" else ""

"Test produce to compacted topic$withTombstone" - {
IntegrationTestFixture().use { fixture ->
val numberOfMessages = 12
val clusterName = "Test cluster"
fixture.startAppWithKafkaCuster(clusterName, false)

// create topic
val testTopicName = "test-producer-topic"
fixture.createCompactedTopic(testTopicName) shouldBeRight {}

// open main view
selectCluster(fixture.currentKafkaCluster)
val mainView = waitWindowWithTitle("Insulator")

"Produce to one topic" {

// select topic
mainView.selectTopic("topic-$testTopicName")

// start consumer
mainView.startStopConsumer()

repeat(numberOfMessages) { index ->
// open producer view
mainView.lookupFirst<Button>(CssRule.id("button-produce")).click()
val producerView = waitWindowWithTitle("Insulator Producer")

// set key and value
mapOf("field-producer-key" to index.toString(), "field-producer-value" to "first-value")
.map { (id, value) ->
producerView
.lookupFirst<TextInputControl>(CssRule.id(id))
.runOnFXThread { textProperty().set(value) }
}

producerView.lookupFirst<Button>(CssRule.id("button-producer-send")).click()
delay(500)
}

screenShoot("produce-messages$withTombstone")

// send tombstones
repeat(numberOfMessages) { index ->
// open producer view
mainView.lookupFirst<Button>(CssRule.id("button-produce")).click()
val producerView = waitWindowWithTitle("Insulator Producer")

// set key and value
mapOf("field-producer-key" to index.toString(), "field-producer-value" to "updated-value-$index")
.map { (id, value) ->
producerView
.lookupFirst<TextInputControl>(CssRule.id(id))
.runOnFXThread { textProperty().set(value) }
}
if (testTombstones) {
// the only checkbox is the tombstone one
producerView.lookupFirst<CheckBox>(CssRule.c("check-box")).runOnFXThread { isSelected = true }
}

producerView.lookupFirst<Button>(CssRule.id("button-producer-send")).click()
delay(500)
}

screenShoot("produce-to-compacted-topics$withTombstone")
// consume all records again
mainView.startStopConsumer()
delay(10_000)
mainView.startStopConsumer()

// assert
val recordTable = mainView.lookupFirst<TableView<RecordViewModel>>(Stylesheet.tableView)
if (testTombstones) {
// at least one record was removed by compacting the tombstone and cleanup
recordTable.items.map { it.keyProperty.value }.toSet() shouldNotBe (0 until numberOfMessages)
recordTable.items.size shouldBeLessThanOrEqual numberOfMessages
} else {
// at least 1 message was updated
recordTable.items.size shouldBeLessThan 2 * numberOfMessages
recordTable.items.map { it.valueProperty.value } shouldContainAll (0 until numberOfMessages).map { "updated-value-$it" }
}

// stop consumer
delay(500)
mainView.startStopConsumer()
screenShoot("update-records-$withTombstone")
}
}
}
}
1 change: 1 addition & 0 deletions app/src/main/kotlin/insulator/ui/style/CheckBoxStyle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class CheckBoxStyle : Stylesheet() {
borderInsets = multi(box(1.0.px))
borderColor = multi(box(theme.mainColor))
}
textFill = theme.black
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import insulator.kafka.producer.SerializationFormat
import insulator.kafka.producer.StringProducer
import insulator.viewmodel.common.InsulatorViewModel
import javafx.beans.binding.Bindings
import javafx.beans.property.SimpleBooleanProperty
import javafx.beans.property.SimpleStringProperty
import javafx.beans.value.ObservableBooleanValue
import tornadofx.onChange
Expand All @@ -25,6 +26,7 @@ class ProducerViewModel @Inject constructor(
private val stringProducer: StringProducer
) : InsulatorViewModel() {

val isTombstoneProperty = SimpleBooleanProperty(false)
val serializeValueProperty = SimpleStringProperty(SerializationFormat.String.name)

private val producer: Producer
Expand All @@ -38,15 +40,15 @@ class ProducerViewModel @Inject constructor(
val keyProperty = SimpleStringProperty()
val valueProperty = SimpleStringProperty()
val canSendProperty: ObservableBooleanValue = Bindings.createBooleanBinding(
{ validationErrorProperty.value == null && !keyProperty.value.isNullOrEmpty() && !valueProperty.value.isNullOrEmpty() },
{ ((validationErrorProperty.value == null && !valueProperty.value.isNullOrEmpty()) || isTombstoneProperty.value) && !keyProperty.value.isNullOrEmpty() },
validationErrorProperty,
keyProperty,
valueProperty,
isTombstoneProperty
)

init {
if (cluster.isSchemaRegistryConfigured())
serializeValueProperty.set(SerializationFormat.Avro.name)
if (cluster.isSchemaRegistryConfigured()) serializeValueProperty.set(SerializationFormat.Avro.name)
listOf(valueProperty, serializeValueProperty).forEach {
it.onChange { value ->
value?.let {
Expand All @@ -67,15 +69,11 @@ class ProducerViewModel @Inject constructor(
}

suspend fun send() {
if (keyProperty.value.isNullOrBlank()) {
error.set(Exception("Invalid key. Key must be not empty"))
return
when {
keyProperty.value.isNullOrBlank() -> error.set(Exception("Invalid key. Key must be not empty"))
isTombstoneProperty.value -> producer.sendTombstone(topic.name, keyProperty.value)
valueProperty.value.isNullOrBlank() -> error.set(Exception("Invalid value. Value must be not empty"))
else -> producer.send(topic.name, keyProperty.value, valueProperty.value).mapLeft { error.set(it) }
}
if (valueProperty.value.isNullOrBlank()) {
error.set(Exception("Invalid value. Value must be not empty"))
return
}
producer.send(topic.name, keyProperty.value, valueProperty.value)
.mapLeft { error.set(it) }
}
}
49 changes: 38 additions & 11 deletions app/src/main/kotlin/insulator/views/main/topic/ProducerView.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import insulator.viewmodel.main.topic.ProducerViewModel
import javafx.beans.binding.Bindings
import javafx.event.EventTarget
import javafx.geometry.Pos
import javafx.scene.Node
import javafx.scene.control.ScrollPane
import javafx.scene.control.TextArea
import javafx.scene.layout.Priority
Expand All @@ -19,36 +20,47 @@ import tornadofx.action
import tornadofx.attachTo
import tornadofx.borderpane
import tornadofx.button
import tornadofx.checkbox
import tornadofx.combobox
import tornadofx.enableWhen
import tornadofx.hbox
import tornadofx.label
import tornadofx.managedWhen
import tornadofx.onChange
import tornadofx.onDoubleClick
import tornadofx.scrollpane
import tornadofx.textfield
import tornadofx.vbox
import tornadofx.vgrow
import tornadofx.visibleWhen
import javax.inject.Inject

@TopicScope
class ProducerView @Inject constructor(
override val viewModel: ProducerViewModel
) : InsulatorView() {

init {
viewModel.isTombstoneProperty.onChange { resize() }
}

private val recordValueTextArea = TextArea()

override val root = vbox(spacing = 10.0) {
override val root = vbox(spacing = 15.0) {
appBar { title = viewModel.topic.name }
fieldName("Key")
textfield(viewModel.keyProperty) { id = "field-producer-key" }

valueFormatOptions()
valueFormatOptions()?.hideIfTombstone()

fieldName("Value")
recordValueTextArea()
hbox(spacing = 20.0, Pos.CENTER_LEFT) {
fieldName("Value")
checkbox("Tombstone", viewModel.isTombstoneProperty)
}
recordValueTextArea().hideIfTombstone()

fieldName("Validation")
validationArea()
fieldName("Validation").hideIfTombstone()
validationArea().hideIfTombstone()

borderpane {
right = button("Send") {
Expand All @@ -63,7 +75,12 @@ class ProducerView @Inject constructor(
prefHeight = 800.0
}

private fun EventTarget.valueFormatOptions() {
private fun Node.hideIfTombstone() = apply {
visibleWhen { viewModel.isTombstoneProperty.not() }
managedWhen { viewModel.isTombstoneProperty.not() }
}

private fun EventTarget.valueFormatOptions() =
if (viewModel.cluster.isSchemaRegistryConfigured()) {
hbox(alignment = Pos.CENTER_LEFT) {
fieldName("Serializer")
Expand All @@ -72,8 +89,7 @@ class ProducerView @Inject constructor(
valueProperty().bindBidirectional(viewModel.serializeValueProperty)
}
}
}
}
} else null

private fun EventTarget.validationArea() =
scrollpane {
Expand All @@ -91,21 +107,32 @@ class ProducerView @Inject constructor(
maxHeight = 100.0
}

private fun EventTarget.recordValueTextArea() {
private fun EventTarget.recordValueTextArea() =
recordValueTextArea.apply {
id = "field-producer-value"
textProperty().bindBidirectional(viewModel.valueProperty)
vgrow = Priority.ALWAYS
}.attachTo(this)
}

private fun autoComplete() {
if (!viewModel.nextFieldProperty.value.isNullOrEmpty())
with(recordValueTextArea) { insertText(caretPosition, "\"${viewModel.nextFieldProperty.value}\":") }
}

private fun resize() = currentStage?.let {
if (viewModel.isTombstoneProperty.value) {
it.minHeight = 260.0
it.height = 260.0
it.maxHeight = 260.0
} else {
it.minHeight = 800.0
it.maxHeight = Double.MAX_VALUE
}
}

override fun onDock() {
title = "Insulator Producer"
resize()
super.onDock()
}
}
Loading

0 comments on commit f8b68bf

Please sign in to comment.