diff --git a/instrumentation/kamon-kafka/src/main/resources/reference.conf b/instrumentation/kamon-kafka/src/main/resources/reference.conf
index d0e6a2b49..6b5d2a119 100644
--- a/instrumentation/kamon-kafka/src/main/resources/reference.conf
+++ b/instrumentation/kamon-kafka/src/main/resources/reference.conf
@@ -17,6 +17,10 @@ kamon.instrumentation.kafka {
       use-delayed-spans = no
     }
   }
+
+  sensor.metrics {
+    enable = true
+  }
 }
 
 kanela.modules {
@@ -25,11 +29,13 @@ kanela.modules {
     description = "Provides distributed context propagation for the Apache Kafka Producer and Consumer"
     instrumentations = [
      "kamon.instrumentation.kafka.client.ProducerInstrumentation",
-     "kamon.instrumentation.kafka.client.ConsumerInstrumentation"
+     "kamon.instrumentation.kafka.client.ConsumerInstrumentation",
+     "kamon.instrumentation.kafka.sensor.SensorInstrumentation"
     ]
 
     within = [
      "org.apache.kafka.clients..*",
+     "org.apache.kafka.common..*",
     ]
   }
 }
\ No newline at end of file
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/KafkaInstrumentation.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/KafkaInstrumentation.scala
index 98567a1f1..52d9d4ef5 100644
--- a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/KafkaInstrumentation.scala
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/KafkaInstrumentation.scala
@@ -18,10 +18,12 @@ object KafkaInstrumentation {
 
   private def readSettings(config: Config): Settings = {
     val kafkaConfig = config.getConfig("kamon.instrumentation.kafka.client")
+    val sensorConfig = config.getConfig("kamon.instrumentation.kafka.sensor")
 
     Settings(
       continueTraceOnConsumer = kafkaConfig.getBoolean("tracing.continue-trace-on-consumer"),
-      useDelayedSpans = kafkaConfig.getBoolean("tracing.use-delayed-spans")
+      useDelayedSpans = kafkaConfig.getBoolean("tracing.use-delayed-spans"),
+      enableSensorMetrics = sensorConfig.getBoolean("metrics.enable")
     )
   }
 
@@ -182,6 +184,7 @@ object KafkaInstrumentation {
 
   case class Settings (
     continueTraceOnConsumer: Boolean,
-    useDelayedSpans: Boolean
+    useDelayedSpans: Boolean,
+    enableSensorMetrics: Boolean
   )
 }
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/SensorBridge.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/SensorBridge.scala
new file mode 100644
index 000000000..40798410e
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/SensorBridge.scala
@@ -0,0 +1,15 @@
+package kamon.instrumentation.kafka.sensor
+
+import kamon.metric.Gauge
+
+sealed trait SensorBridge {
+  def record(value: Double): Unit
+}
+
+case class GaugeBridge(gauge: Gauge) extends SensorBridge {
+  override def record(value: Double): Unit = gauge.update(value)
+}
+
+case object NoOpBridge extends SensorBridge {
+  override def record(value: Double): Unit = {}
+}
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/SensorInstrumentation.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/SensorInstrumentation.scala
new file mode 100644
index 000000000..36a7d09ca
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/SensorInstrumentation.scala
@@ -0,0 +1,23 @@
+package kamon.instrumentation.kafka.sensor
+
+import kamon.instrumentation.kafka.sensor.advisor.SensorAdvisors.SensorMixin
+import kamon.instrumentation.kafka.sensor.advisor.{SensorCreateAdvisor, SensorRecordAdvice}
+import kanela.agent.api.instrumentation.InstrumentationBuilder
+
+class SensorInstrumentation extends InstrumentationBuilder {
+
+  /* Instrument the raw sensor factory to collect the sensor name and tags.
+   * Necessary since not all (but most) sensors are created through `StreamsMetricsImpl.xxxLevelSensor`.
+   */
+  onType("org.apache.kafka.common.metrics.Metrics")
+    .advise(method("sensor").and(takesArguments(5)), classOf[SensorCreateAdvisor]) // applies the sensor name; scoped sensors will have it overridden by StreamsMetricsImpl
+
+
+  /* Wiretap sensor recordings and apply them to Kamon instruments.
+   * Additional metrics added to a sensor are just different measures over the same data and can be extracted from the
+   * Kamon histogram, so there is no need for extra instruments here. Metrics might bring additional tags on top of the sensor's own ones.
+   */
+  onType("org.apache.kafka.common.metrics.Sensor")
+    .mixin(classOf[SensorMixin])
+    .advise(method("record").and(takesArguments(3)), classOf[SensorRecordAdvice])
+}
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/SensorAdvisors.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/SensorAdvisors.scala
new file mode 100644
index 000000000..72fcc0b15
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/SensorAdvisors.scala
@@ -0,0 +1,41 @@
+package kamon.instrumentation.kafka.sensor.advisor
+
+import kamon.instrumentation.kafka.sensor.SensorBridge
+import SensorAdvisors.HasSensorBridge
+import kanela.agent.libs.net.bytebuddy.asm.Advice
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Sensor}
+
+object SensorAdvisors {
+
+  trait HasSensorBridge {
+    def getMetric: SensorBridge
+    def setMetric(metric: SensorBridge): Unit
+  }
+
+  class SensorMixin extends HasSensorBridge {
+    private var _metric: SensorBridge = _
+
+    override def getMetric: SensorBridge = _metric
+    override def setMetric(metric: SensorBridge): Unit = _metric = metric
+  }
+}
+
+
+class SensorRecordAdvice
+
+object SensorRecordAdvice {
+  @Advice.OnMethodEnter
+  def onSensorRecord(
+    @Advice.Argument(0) value: Double,
+    @Advice.This sensor: Sensor with HasSensorBridge,
+    @Advice.FieldValue("recordingLevel") sensorRecordingLevel: RecordingLevel,
+    @Advice.FieldValue("config") metricConfig: MetricConfig,
+    @Advice.FieldValue("metrics") metrics: java.util.Map[MetricName, KafkaMetric]
+  ): Unit =
+    // TODO: decide whether to keep this check or record every value
+    if (sensorRecordingLevel.shouldRecord(metricConfig.recordLevel().id)) {
+      sensor.getMetric.record(value)
+    }
+}
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/SensorCreateAdvisor.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/SensorCreateAdvisor.scala
new file mode 100644
index 000000000..9dbe2c4c2
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/SensorCreateAdvisor.scala
@@ -0,0 +1,28 @@
+package kamon.instrumentation.kafka.sensor.advisor
+
+import kamon.instrumentation.kafka.client.KafkaInstrumentation
+import kamon.instrumentation.kafka.sensor.NoOpBridge
+import kamon.instrumentation.kafka.sensor.advisor.SensorAdvisors.HasSensorBridge
+import kamon.instrumentation.kafka.sensor.advisor.metrics.{GeneralPurposeMetric, MetricType, SensorBridgeCreator, TopicMetric}
+import kanela.agent.libs.net.bytebuddy.asm.Advice
+import org.apache.kafka.common.metrics.Sensor
+
+
+class SensorCreateAdvisor
+
+object SensorCreateAdvisor {
+
+  @Advice.OnMethodExit
+  def onCreatedSensor(@Advice.Argument(0) name: String, @Advice.Return sensor: Sensor with HasSensorBridge): Unit = {
+
+    val bridge = if (KafkaInstrumentation.settings.enableSensorMetrics) {
+      MetricType(name) match {
+        case topicMetric: TopicMetric => SensorBridgeCreator.retrieveSpecializedSensorBridge(topicMetric)
+        case generalMetric: GeneralPurposeMetric => SensorBridgeCreator.retrieveSpecializedSensorBridge(generalMetric)
+      }
+    } else {
+      NoOpBridge
+    }
+    sensor.setMetric(bridge)
+  }
+}
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/metrics/MetricType.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/metrics/MetricType.scala
new file mode 100644
index 000000000..56fad8105
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/metrics/MetricType.scala
@@ -0,0 +1,30 @@
+package kamon.instrumentation.kafka.sensor.advisor.metrics
+
+sealed trait MetricType
+
+case class TopicMetric(topic: String, metric: String) extends MetricType
+case class GeneralPurposeMetric(name: String) extends MetricType
+
+object MetricType {
+
+  def apply(metricName: String): MetricType = {
+    metricName match {
+      case name if isTopicMetric(name) => createTopicMetric(name)
+      case name => GeneralPurposeMetric(name)
+    }
+  }
+
+  def isTopicMetric(metricName: String): Boolean = {
+    metricName.split("\\.") match {
+      case Array("topic", _, _) => true
+      case _ => false
+    }
+  }
+
+  def createTopicMetric(metricName: String): MetricType = {
+    metricName.split("\\.") match {
+      case Array("topic", topicName, metric) => TopicMetric(topicName, metric)
+      case _ => GeneralPurposeMetric(metricName)
+    }
+  }
+}
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/metrics/SensorBridgeCreator.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/metrics/SensorBridgeCreator.scala
new file mode 100644
index 000000000..bdc728528
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/sensor/advisor/metrics/SensorBridgeCreator.scala
@@ -0,0 +1,34 @@
+package kamon.instrumentation.kafka.sensor.advisor.metrics
+
+import kamon.Kamon
+import kamon.instrumentation.kafka.sensor.{GaugeBridge, SensorBridge}
+import kamon.metric.MeasurementUnit
+
+trait SensorBridgeCreator[T <: MetricType] {
+  def createSensorBridge(metricType: T): SensorBridge
+}
+
+object SensorBridgeCreator {
+
+  def retrieveSpecializedSensorBridge[T <: MetricType : SensorBridgeCreator](metric: T): SensorBridge = {
+    implicitly[SensorBridgeCreator[T]].createSensorBridge(metric)
+  }
+
+  implicit val generalMetricSensorCreator: SensorBridgeCreator[GeneralPurposeMetric] = (metricType: GeneralPurposeMetric) =>
+    GaugeBridge(Kamon.gauge(metricType.name).withoutTags())
+
+  implicit val topicMetricSensorCreator: SensorBridgeCreator[TopicMetric] = (metricType: TopicMetric) =>
+    metricType.metric match {
+      // TODO: match more metrics by name and create specialized Kamon metrics
+      case "bytes" => GaugeBridge(
+        Kamon
+          .gauge("topic.bytes", "Number of bytes processed by the Kafka Client", MeasurementUnit.information.bytes)
+          .withTag("topicName", metricType.topic)
+      )
+      case metricName => GaugeBridge(
+        Kamon
+          .gauge(s"topic.$metricName", "Topic-related gauge")
+          .withTag("topicName", metricType.topic)
+      )
+    }
+}
diff --git a/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/sensor/SensorInstrumentationSpec.scala b/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/sensor/SensorInstrumentationSpec.scala
new file mode 100644
index 000000000..f0bce52db
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/sensor/SensorInstrumentationSpec.scala
@@ -0,0 +1,37 @@
+package kamon.instrumentation.kafka.sensor
+
+import kamon.Kamon
+import kamon.instrumentation.kafka.testutil.TestTopicScope
+import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Matchers, WordSpec}
+
+class SensorInstrumentationSpec extends WordSpec
+  with Matchers
+  with BeforeAndAfter
+  with BeforeAndAfterAll
+  with EmbeddedKafka
+  with TestTopicScope {
+
+  // Increase the ZooKeeper connection timeout to avoid failing tests in "slow" environments
+  implicit val defaultConfig: EmbeddedKafkaConfig =
+    EmbeddedKafkaConfig.apply(customBrokerProperties = EmbeddedKafkaConfig.apply().customBrokerProperties
+      + ("zookeeper.connection.timeout.ms" -> "20000")
+      + ("auto.create.topics.enable" -> "false")
+    )
+
+  override protected def beforeAll(): Unit = EmbeddedKafka.start()(defaultConfig)
+
+  override def afterAll(): Unit = EmbeddedKafka.stop()
+
+  "The Kafka Sensor Instrumentation" should {
+
+    "export gauge metrics" in new TestTopicScope {
+      publishStringMessageToKafka(testTopicName, "message")(defaultConfig)
+
+      Kamon.status().metricRegistry().metrics.size should be > 0
+      Kamon.status().metricRegistry().metrics.exists(metric => metric.name.contains("topic")) should be(true)
+      // TODO: remove after generating appropriate metrics for each sensor
+      Kamon.status().metricRegistry().metrics.map(_.name).foreach(println)
+    }
+  }
+}
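
Usage sketch (illustrative only, not part of the diff): the snippet below mirrors what SensorCreateAdvisor and SensorRecordAdvice do for a sensor named "topic.my-topic.bytes", assuming the classes introduced above are on the classpath and Kamon is started; the object name and the literal sensor name are hypothetical. Sensor metrics are enabled by default through kamon.instrumentation.kafka.sensor.metrics.enable and can be switched off by setting that key to no in application.conf.

// Illustrative sketch only: not part of this change.
import kamon.Kamon
import kamon.instrumentation.kafka.sensor.SensorBridge
import kamon.instrumentation.kafka.sensor.advisor.metrics.{GeneralPurposeMetric, MetricType, SensorBridgeCreator, TopicMetric}

object SensorBridgeSketch extends App {
  Kamon.init()

  // "topic.<topic>.<metric>" sensor names parse to TopicMetric; anything else becomes GeneralPurposeMetric.
  val bridge: SensorBridge = MetricType("topic.my-topic.bytes") match {
    case topicMetric: TopicMetric            => SensorBridgeCreator.retrieveSpecializedSensorBridge(topicMetric)
    case generalMetric: GeneralPurposeMetric => SensorBridgeCreator.retrieveSpecializedSensorBridge(generalMetric)
  }

  // Equivalent to what SensorRecordAdvice does on Sensor.record(...): the value is written to the
  // "topic.bytes" gauge tagged with topicName=my-topic.
  bridge.record(1024.0)
}

The type-class based SensorBridgeCreator keeps the advisor free of per-metric branching: supporting a new specialized Kamon instrument only requires another implicit creator in SensorBridgeCreator's companion object.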