Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Kafka Sensor advisors #820

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ kamon.instrumentation.kafka {
use-delayed-spans = no
}
}

sensor.metrics {
enable = true
}
}

kanela.modules {
Expand All @@ -25,11 +29,13 @@ kanela.modules {
description = "Provides distributed context propagation for the Apache Kafka Producer and Consumer"
instrumentations = [
"kamon.instrumentation.kafka.client.ProducerInstrumentation",
"kamon.instrumentation.kafka.client.ConsumerInstrumentation"
"kamon.instrumentation.kafka.client.ConsumerInstrumentation",
"kamon.instrumentation.kafka.sensor.SensorInstrumentation"
]

within = [
"org.apache.kafka.clients..*",
"org.apache.kafka.common..*",
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ object KafkaInstrumentation {

private def readSettings(config: Config): Settings = {
val kafkaConfig = config.getConfig("kamon.instrumentation.kafka.client")
val sensorConfig = config.getConfig("kamon.instrumentation.kafka.sensor")

Settings(
continueTraceOnConsumer = kafkaConfig.getBoolean("tracing.continue-trace-on-consumer"),
useDelayedSpans = kafkaConfig.getBoolean("tracing.use-delayed-spans")
useDelayedSpans = kafkaConfig.getBoolean("tracing.use-delayed-spans"),
enableSensorMetrics = sensorConfig.getBoolean("metrics.enable")
)
}

Expand Down Expand Up @@ -182,6 +184,7 @@ object KafkaInstrumentation {

case class Settings (
continueTraceOnConsumer: Boolean,
useDelayedSpans: Boolean
useDelayedSpans: Boolean,
enableSensorMetrics: Boolean
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package kamon.instrumentation.kafka.sensor

import kamon.metric.Gauge

sealed trait SensorBridge {
def record(value: Double): Unit
}

case class GaugeBridge(gauge: Gauge) extends SensorBridge {
override def record(value: Double): Unit = gauge.update(value)
}

case object NoOpBridge extends SensorBridge {
override def record(value: Double): Unit = {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kamon.instrumentation.kafka.sensor

import kamon.instrumentation.kafka.sensor.advisor.SensorAdvisors.SensorMixin
import kamon.instrumentation.kafka.sensor.advisor.{SensorCreateAdvisor, SensorRecordAdvice}
import kanela.agent.api.instrumentation.InstrumentationBuilder

class SensorInstrumentation extends InstrumentationBuilder {

/*Instrument raw sensor factory to collect sensor name and tags.
* Necessary since not all (but most) sensors are created through `StreamsMetricsImpl.xxxLevelSensor`
**/
onType("org.apache.kafka.common.metrics.Metrics")
.advise(method("sensor").and(takesArguments(5)), classOf[SensorCreateAdvisor]) //applies SensorName, scopedSensors will have this overriden by StreamsMetricsImpl


/*Wiretap sensor recordings and apply them to Kamon instruments.
* Additional metrics added to sensor are different measures over same data and can be extracted from Kamon histogram
* so there's no need for extra instruments here. Metrics might bring additional tags on top of sensor's own ones.
* */
onType("org.apache.kafka.common.metrics.Sensor")
.mixin(classOf[SensorMixin])
.advise(method("record").and(takesArguments(3)), classOf[SensorRecordAdvice])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package kamon.instrumentation.kafka.sensor.advisor

import kamon.instrumentation.kafka.sensor.SensorBridge
import SensorAdvisors.HasSensorBridge
import kanela.agent.libs.net.bytebuddy.asm.Advice
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.Sensor.RecordingLevel
import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Sensor}

object SensorAdvisors {

trait HasSensorBridge {
def getMetric: SensorBridge
def setMetric(metric: SensorBridge):Unit
}

class SensorMixin extends HasSensorBridge {
private var _metric: SensorBridge = _

override def getMetric: SensorBridge = _metric
override def setMetric(metric: SensorBridge): Unit = _metric = metric
}
}


class SensorRecordAdvice

object SensorRecordAdvice {
@Advice.OnMethodEnter
def onSensorRecord(
@Advice.Argument(0) value: Double,
@Advice.This sensor: Sensor with HasSensorBridge,
@Advice.FieldValue("recordingLevel") sensorRecordingLevel: RecordingLevel,
@Advice.FieldValue("config") metricConfig: MetricConfig,
@Advice.FieldValue("metrics") metrics: java.util.Map[MetricName, KafkaMetric]
): Unit =
//TODO think if we should keep this or record everything
if (sensorRecordingLevel.shouldRecord(metricConfig.recordLevel().id)) {
sensor.getMetric.record(value)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package kamon.instrumentation.kafka.sensor.advisor

import kamon.instrumentation.kafka.client.KafkaInstrumentation
import kamon.instrumentation.kafka.sensor.NoOpBridge
import kamon.instrumentation.kafka.sensor.advisor.SensorAdvisors.HasSensorBridge
import kamon.instrumentation.kafka.sensor.advisor.metrics.{GeneralPurposeMetric, MetricType, SensorBridgeCreator, TopicMetric}
import kanela.agent.libs.net.bytebuddy.asm.Advice
import org.apache.kafka.common.metrics.Sensor


class SensorCreateAdvisor

object SensorCreateAdvisor {

@Advice.OnMethodExit
def onCreatedSensor(@Advice.Argument(0) name: String, @Advice.Return sensor: Sensor with HasSensorBridge): Unit = {

val bridge = if (KafkaInstrumentation.settings.enableSensorMetrics) {
MetricType(name) match {
case topicMetric: TopicMetric => SensorBridgeCreator.retrieveSpecializedSensorBridge(topicMetric)
case generalMetric: GeneralPurposeMetric => SensorBridgeCreator.retrieveSpecializedSensorBridge(generalMetric)
}
} else {
NoOpBridge
}
sensor.setMetric(bridge)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package kamon.instrumentation.kafka.sensor.advisor.metrics

sealed trait MetricType

case class TopicMetric(topic: String, metric: String) extends MetricType
case class GeneralPurposeMetric(name: String) extends MetricType

object MetricType {

def apply(metricName: String): MetricType = {
metricName match {
case name if isTopicMetric(name) => createTopicMetric(name)
case name => GeneralPurposeMetric(name)
}
}

def isTopicMetric(metricName: String): Boolean = {
metricName.split("\\.") match {
case Array("topic", _, _) => true
case _ => false
}
}

def createTopicMetric(metricName: String): MetricType = {
metricName.split("\\.") match {
case Array("topic", topicName, metricName) => TopicMetric(topicName, metricName)
case _ => GeneralPurposeMetric(metricName)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package kamon.instrumentation.kafka.sensor.advisor.metrics

import kamon.Kamon
import kamon.instrumentation.kafka.sensor.{GaugeBridge, SensorBridge}
import kamon.metric.MeasurementUnit

trait SensorBridgeCreator[T <: MetricType] {
def createSensorBridge(metricType: T): SensorBridge
}

object SensorBridgeCreator {

def retrieveSpecializedSensorBridge[T <: MetricType : SensorBridgeCreator](metric: T): SensorBridge = {
implicitly[SensorBridgeCreator[T]].createSensorBridge(metric)
}

implicit val generalMetricSensorCreator: SensorBridgeCreator[GeneralPurposeMetric] = (metricType: GeneralPurposeMetric) =>
GaugeBridge(Kamon.gauge(metricType.name).withoutTags())

implicit val topicMetricSensorCreator: SensorBridgeCreator[TopicMetric] = (metricType: TopicMetric) =>
metricType.metric match {
// TODO Match more metrics by the name and create specialized Kamon Metrics
case "bytes" => GaugeBridge(
Kamon
.gauge("topic.bytes", "Number of bytes processed by the Kafka Client", MeasurementUnit.information.bytes)
.withTag("topicName", metricType.topic)
)
case metricName => GaugeBridge(
Kamon
.gauge(s"topic.$metricName", "Topic related gauge")
.withTag("topicName", metricType.topic)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kamon.instrumentation.kafka.sensor

import kamon.Kamon
import kamon.instrumentation.kafka.testutil.TestTopicScope
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Matchers, WordSpec}

class SensorInstrumentationSpec extends WordSpec
with Matchers
with BeforeAndAfter
with BeforeAndAfterAll
with EmbeddedKafka
with TestTopicScope {

// increase zk connection timeout to avoid failing tests in "slow" environments
implicit val defaultConfig: EmbeddedKafkaConfig =
EmbeddedKafkaConfig.apply(customBrokerProperties = EmbeddedKafkaConfig.apply().customBrokerProperties
+ ("zookeeper.connection.timeout.ms" -> "20000")
+ ("auto.create.topics.enable" -> "false")
)

override protected def beforeAll(): Unit = EmbeddedKafka.start()(defaultConfig)

override def afterAll(): Unit = EmbeddedKafka.stop()

"The Kafka Sensor Instrumentation" should {

"export gauge metric" in new TestTopicScope {
publishStringMessageToKafka(testTopicName, "message")(defaultConfig)

Kamon.status().metricRegistry().metrics.size should be > 0
Kamon.status().metricRegistry().metrics.exists(metric => metric.name.contains("topic")) should be(true)
// TODO remove after generating appropriate metrics for each sensor
Kamon.status().metricRegistry().metrics.map(_.name).foreach(println)
}
}
}