diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go
index c3d5a9a7..05481e95 100644
--- a/pkg/kgo/broker.go
+++ b/pkg/kgo/broker.go
@@ -477,6 +477,7 @@ func (cxn *brokerCxn) hookWriteE2E(key int16, bytesWritten int, writeWait, timeT
 				WriteWait:   writeWait,
 				TimeToWrite: timeToWrite,
 				WriteErr:    writeErr,
+				ClientID:    cxn.cl.clientIDString(),
 			})
 		}
 	})
@@ -1232,6 +1233,7 @@ func (cxn *brokerCxn) readResponse(
 				ReadWait:   readWait,
 				TimeToRead: timeToRead,
 				ReadErr:    readErr,
+				ClientID:   cxn.cl.clientIDString(),
 			})
 		}
 	})
diff --git a/pkg/kgo/hooks.go b/pkg/kgo/hooks.go
index aeff4f19..5d52e329 100644
--- a/pkg/kgo/hooks.go
+++ b/pkg/kgo/hooks.go
@@ -134,6 +134,9 @@ type BrokerE2E struct {
 	WriteErr error
 	// ReadErr is any error encountered during reading.
 	ReadErr error
+
+	// ClientID is the ID of the client that made the request.
+	ClientID string
 }
 
 // DurationE2E returns the e2e time from the start of when a request is written
@@ -239,6 +242,9 @@ type ProduceBatchMetrics struct {
 	// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
 	// zstd.
 	CompressionType uint8
+
+	// ClientID is the ID of the client that made the request.
+	ClientID string
 }
 
 // HookProduceBatchWritten is called whenever a batch is known to be
@@ -288,6 +294,9 @@ type FetchBatchMetrics struct {
 	// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
 	// zstd.
 	CompressionType uint8
+
+	// ClientID is the ID of the client that made the request.
+	ClientID string
 }
 
 // HookFetchBatchRead is called whenever a batch is read within the client.
@@ -418,3 +427,11 @@ func implementsAnyHook(h Hook) bool {
 	}
 	return false
 }
+
+func (cl *Client) clientIDString() string {
+	resolved := "kgo"
+	if cl.cfg.id != nil {
+		resolved = *cl.cfg.id
+	}
+	return resolved
+}
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 6d0f3dfe..4516bc6f 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -1738,6 +1738,7 @@ func (p produceMetrics) hook(cfg *cfg, br *broker) {
 		for _, h := range hooks {
 			for topic, partitions := range p {
 				for partition, metrics := range partitions {
+					metrics.ClientID = br.cl.clientIDString()
 					h.OnProduceBatchWritten(br.meta, topic, partition, metrics)
 				}
 			}
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 0c475d14..a910f0a9 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -1362,6 +1362,7 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
 		in = in[length:]
 
 		var m FetchBatchMetrics
+		m.ClientID = br.cl.clientIDString()
 
 		switch t := r.(type) {
 		case *kmsg.MessageV0:
diff --git a/plugin/kprom/config.go b/plugin/kprom/config.go
index d907bebe..d517119a 100644
--- a/plugin/kprom/config.go
+++ b/plugin/kprom/config.go
@@ -96,9 +96,12 @@ func HandlerOpts(opts promhttp.HandlerOpts) Opt {
 	return opt{func(c *cfg) { c.handlerOpts = opts }}
 }
 
-// WithClientLabel adds a "cliend_id" label to all metrics.
+// WithClientLabel adds a "client_id" label to all metrics.
 func WithClientLabel() Opt {
-	return opt{func(c *cfg) { c.withClientLabel = true }}
+	return opt{func(c *cfg) {
+		c.withClientLabel = true
+		c.fetchProduceOpts.labels = append(c.fetchProduceOpts.labels, "client_id")
+	}}
 }
 
 // Subsystem sets the subsystem for the kprom metrics, overriding the default
@@ -182,6 +185,7 @@ type Detail uint8
 const (
 	ByNode          Detail = iota // Include label "node_id" for fetch and produce metrics.
 	ByTopic                       // Include label "topic" for fetch and produce metrics.
+	ByClient                      // Include label "client_id" for fetch and produce metrics.
 	Batches                       // Report number of fetched and produced batches.
 	Records                       // Report the number of fetched and produced records.
 	CompressedBytes               // Report the number of fetched and produced compressed bytes.
@@ -211,6 +215,8 @@ func FetchAndProduceDetail(details ...Detail) Opt {
 				labelsDeduped[ByTopic] = "topic"
 			case ByNode:
 				labelsDeduped[ByNode] = "node_id"
+			case ByClient:
+				labelsDeduped[ByClient] = "client_id"
 			case Batches:
 				c.fetchProduceOpts.batches = true
 			case Records:
diff --git a/plugin/kprom/kprom.go b/plugin/kprom/kprom.go
index 737e4c74..370150e7 100644
--- a/plugin/kprom/kprom.go
+++ b/plugin/kprom/kprom.go
@@ -125,14 +125,21 @@ func (m *Metrics) Handler() http.Handler {
 // This method is meant to be called by the hook system and not by the user
 func (m *Metrics) OnNewClient(client *kgo.Client) {
 	var (
-		factory     = promauto.With(m.cfg.reg)
-		namespace   = m.cfg.namespace
-		subsystem   = m.cfg.subsystem
-		constLabels prometheus.Labels
+		factory   = promauto.With(m.cfg.reg)
+		namespace = m.cfg.namespace
+		subsystem = m.cfg.subsystem
 	)
+	var constLabels prometheus.Labels
+	dynamicLabels := []string{"node_id"}
 	if m.cfg.withClientLabel {
 		constLabels = make(prometheus.Labels)
 		constLabels["client_id"] = client.OptValue(kgo.ClientID).(string)
+		dynamicLabels = append(dynamicLabels, "client_id")
+	}
+
+	// Skip metrics definitions if hook was already called
+	if m.connConnectsTotal != nil {
+		return
 	}
 
 	// returns Hist buckets if set, otherwise defBucket
@@ -146,129 +153,115 @@ func (m *Metrics) OnNewClient(client *kgo.Client) {
 
 	// Connection
 	m.connConnectsTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "connects_total",
-		Help:        "Total number of connections opened",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "connects_total",
+		Help:      "Total number of connections opened",
 	}, []string{"node_id"})
 
 	m.connConnectErrorsTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "connect_errors_total",
-		Help:        "Total number of connection errors",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "connect_errors_total",
+		Help:      "Total number of connection errors",
 	}, []string{"node_id"})
 
 	m.connDisconnectsTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "disconnects_total",
-		Help:        "Total number of connections closed",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "disconnects_total",
+		Help:      "Total number of connections closed",
 	}, []string{"node_id"})
 
 	// Write
 	m.writeBytesTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "write_bytes_total",
-		Help:        "Total number of bytes written to the TCP connection. The bytes count is tracked after compression (when used).",
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "write_bytes_total",
+		Help:      "Total number of bytes written",
+	}, dynamicLabels)
 
 	m.writeErrorsTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "write_errors_total",
-		Help:        "Total number of write errors",
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "write_errors_total",
+		Help:      "Total number of write errors",
+	}, dynamicLabels)
 
 	m.writeWaitSeconds = factory.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "write_wait_seconds",
-		Help:        "Time spent waiting to write to Kafka",
-		Buckets:     getHistogramBuckets(WriteWait),
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "write_wait_seconds",
+		Help:      "Time spent waiting to write to Kafka",
+		Buckets:   getHistogramBuckets(WriteWait),
+	}, dynamicLabels)
 
 	m.writeTimeSeconds = factory.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "write_time_seconds",
-		Help:        "Time spent writing to Kafka",
-		Buckets:     getHistogramBuckets(WriteTime),
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "write_time_seconds",
+		Help:      "Time spent writing to Kafka",
+		Buckets:   getHistogramBuckets(WriteTime),
+	}, dynamicLabels)
 
 	// Read
 	m.readBytesTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "read_bytes_total",
-		Help:        "Total number of bytes read from the TCP connection. The bytes count is tracked before uncompression (when used).",
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "read_bytes_total",
+		Help:      "Total number of bytes read",
+	}, dynamicLabels)
 
 	m.readErrorsTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "read_errors_total",
-		Help:        "Total number of read errors",
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "read_errors_total",
+		Help:      "Total number of read errors",
+	}, dynamicLabels)
 
 	m.readWaitSeconds = factory.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "read_wait_seconds",
-		Help:        "Time spent waiting to read from Kafka",
-		Buckets:     getHistogramBuckets(ReadWait),
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "read_wait_seconds",
+		Help:      "Time spent waiting to read from Kafka",
+		Buckets:   getHistogramBuckets(ReadWait),
+	}, dynamicLabels)
 
 	m.readTimeSeconds = factory.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "read_time_seconds",
-		Help:        "Time spent reading from Kafka",
-		Buckets:     getHistogramBuckets(ReadTime),
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "read_time_seconds",
+		Help:      "Time spent reading from Kafka",
+		Buckets:   getHistogramBuckets(ReadTime),
+	}, dynamicLabels)
 
 	// Request E2E duration & Throttle
 	m.requestDurationE2ESeconds = factory.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "request_duration_e2e_seconds",
-		Help:        "Time from the start of when a request is written to the end of when the response for that request was fully read",
-		Buckets:     getHistogramBuckets(RequestDurationE2E),
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "request_duration_e2e_seconds",
+		Help:      "Time from the start of when a request is written to the end of when the response for that request was fully read",
+		Buckets:   getHistogramBuckets(RequestDurationE2E),
+	}, dynamicLabels)
 
 	m.requestThrottledSeconds = factory.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "request_throttled_seconds",
-		Help:        "Time the request was throttled",
-		Buckets:     getHistogramBuckets(RequestThrottled),
-	}, []string{"node_id"})
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "request_throttled_seconds",
+		Help:      "Time the request was throttled",
+		Buckets:   getHistogramBuckets(RequestThrottled),
+	}, dynamicLabels)
 
 	// Produce
 	m.produceCompressedBytes = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "produce_compressed_bytes_total",
-		Help:        "Total number of compressed bytes produced",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "produce_compressed_bytes_total",
+		Help:      "Total number of compressed bytes produced",
 	}, m.cfg.fetchProduceOpts.labels)
 
 	produceUncompressedBytesName := "produce_bytes_total"
@@ -276,37 +269,33 @@ func (m *Metrics) OnNewClient(client *kgo.Client) {
 		produceUncompressedBytesName = "produce_uncompressed_bytes_total"
 	}
 	m.produceUncompressedBytes = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        produceUncompressedBytesName,
-		Help:        "Total number of uncompressed bytes produced",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      produceUncompressedBytesName,
+		Help:      "Total number of uncompressed bytes produced",
 	}, m.cfg.fetchProduceOpts.labels)
 
 	m.produceBatchesTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "produce_batches_total",
-		Help:        "Total number of batches produced",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "produce_batches_total",
+		Help:      "Total number of batches produced",
 	}, m.cfg.fetchProduceOpts.labels)
 
 	m.produceRecordsTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "produce_records_total",
-		Help:        "Total number of records produced",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "produce_records_total",
+		Help:      "Total number of records produced",
 	}, m.cfg.fetchProduceOpts.labels)
 
 	// Fetch
 	m.fetchCompressedBytes = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "fetch_compressed_bytes_total",
-		Help:        "Total number of compressed bytes fetched",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "fetch_compressed_bytes_total",
+		Help:      "Total number of compressed bytes fetched",
 	}, m.cfg.fetchProduceOpts.labels)
 
 	fetchUncompressedBytesName := "fetch_bytes_total"
@@ -314,27 +303,24 @@ func (m *Metrics) OnNewClient(client *kgo.Client) {
 		fetchUncompressedBytesName = "fetch_uncompressed_bytes_total"
 	}
 	m.fetchUncompressedBytes = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        fetchUncompressedBytesName,
-		Help:        "Total number of uncompressed bytes fetched",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      fetchUncompressedBytesName,
+		Help:      "Total number of uncompressed bytes fetched",
 	}, m.cfg.fetchProduceOpts.labels)
 
 	m.fetchBatchesTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "fetch_batches_total",
-		Help:        "Total number of batches fetched",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "fetch_batches_total",
+		Help:      "Total number of batches fetched",
 	}, m.cfg.fetchProduceOpts.labels)
 
 	m.fetchRecordsTotal = factory.NewCounterVec(prometheus.CounterOpts{
-		Namespace:   namespace,
-		Subsystem:   subsystem,
-		ConstLabels: constLabels,
-		Name:        "fetch_records_total",
-		Help:        "Total number of records fetched",
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "fetch_records_total",
+		Help:      "Total number of records fetched",
 	}, m.cfg.fetchProduceOpts.labels)
 
 	// Buffers
@@ -423,7 +409,7 @@ func (m *Metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval tim
 // metrics gathering.
 // This method is meant to be called by the hook system and not by the user
 func (m *Metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, metrics kgo.ProduceBatchMetrics) {
-	labels := m.fetchProducerLabels(kgo.NodeName(meta.NodeID), topic)
+	labels := m.fetchProducerLabels(kgo.NodeName(meta.NodeID), topic, metrics.ClientID)
 	if m.cfg.fetchProduceOpts.uncompressedBytes {
 		m.produceUncompressedBytes.With(labels).Add(float64(metrics.UncompressedBytes))
 	}
@@ -442,7 +428,7 @@ func (m *Metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _
 // gathering.
 // This method is meant to be called by the hook system and not by the user
 func (m *Metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, metrics kgo.FetchBatchMetrics) {
-	labels := m.fetchProducerLabels(kgo.NodeName(meta.NodeID), topic)
+	labels := m.fetchProducerLabels(kgo.NodeName(meta.NodeID), topic, metrics.ClientID)
 	if m.cfg.fetchProduceOpts.uncompressedBytes {
 		m.fetchUncompressedBytes.With(labels).Add(float64(metrics.UncompressedBytes))
 	}
@@ -469,41 +455,47 @@ func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten i
 // This method is meant to be called by the hook system and not by the user
 func (m *Metrics) OnBrokerE2E(meta kgo.BrokerMetadata, _ int16, e2e kgo.BrokerE2E) {
 	nodeId := kgo.NodeName(meta.NodeID)
+	labelVals := []string{nodeId}
+	if m.cfg.withClientLabel {
+		labelVals = append(labelVals, e2e.ClientID)
+	}
 	if e2e.WriteErr != nil {
-		m.writeErrorsTotal.WithLabelValues(nodeId).Inc()
+		m.writeErrorsTotal.WithLabelValues(labelVals...).Inc()
 		return
 	}
-	m.writeBytesTotal.WithLabelValues(nodeId).Add(float64(e2e.BytesWritten))
+	m.writeBytesTotal.WithLabelValues(labelVals...).Add(float64(e2e.BytesWritten))
 	if _, ok := m.cfg.histograms[WriteWait]; ok {
-		m.writeWaitSeconds.WithLabelValues(nodeId).Observe(e2e.WriteWait.Seconds())
+		m.writeWaitSeconds.WithLabelValues(labelVals...).Observe(e2e.WriteWait.Seconds())
 	}
 	if _, ok := m.cfg.histograms[WriteTime]; ok {
-		m.writeTimeSeconds.WithLabelValues(nodeId).Observe(e2e.TimeToWrite.Seconds())
+		m.writeTimeSeconds.WithLabelValues(labelVals...).Observe(e2e.TimeToWrite.Seconds())
 	}
 	if e2e.ReadErr != nil {
-		m.readErrorsTotal.WithLabelValues(nodeId).Inc()
+		m.readErrorsTotal.WithLabelValues(labelVals...).Inc()
 		return
 	}
-	m.readBytesTotal.WithLabelValues(nodeId).Add(float64(e2e.BytesRead))
+	m.readBytesTotal.WithLabelValues(labelVals...).Add(float64(e2e.BytesRead))
 	if _, ok := m.cfg.histograms[ReadWait]; ok {
-		m.readWaitSeconds.WithLabelValues(nodeId).Observe(e2e.ReadWait.Seconds())
+		m.readWaitSeconds.WithLabelValues(labelVals...).Observe(e2e.ReadWait.Seconds())
 	}
 	if _, ok := m.cfg.histograms[ReadTime]; ok {
-		m.readTimeSeconds.WithLabelValues(nodeId).Observe(e2e.TimeToRead.Seconds())
+		m.readTimeSeconds.WithLabelValues(labelVals...).Observe(e2e.TimeToRead.Seconds())
 	}
 	if _, ok := m.cfg.histograms[RequestDurationE2E]; ok {
-		m.requestDurationE2ESeconds.WithLabelValues(nodeId).Observe(e2e.DurationE2E().Seconds())
+		m.requestDurationE2ESeconds.WithLabelValues(labelVals...).Observe(e2e.DurationE2E().Seconds())
 	}
 }
 
-func (m *Metrics) fetchProducerLabels(nodeId, topic string) prometheus.Labels {
-	labels := make(prometheus.Labels, 2)
+func (m *Metrics) fetchProducerLabels(nodeId, topic, clientID string) prometheus.Labels {
+	labels := make(prometheus.Labels, 3)
 	for _, l := range m.cfg.fetchProduceOpts.labels {
 		switch l {
 		case "topic":
 			labels[l] = topic
 		case "node_id":
 			labels[l] = nodeId
+		case "client_id":
+			labels[l] = clientID
 		}
 	}
 	return labels
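
Example usage (sketches, not part of the patch):

The new ClientID field is visible to any kgo.HookBrokerE2E implementation, not only kprom. A minimal sketch of a custom hook consuming it; the e2eLogger type, client ID, and broker address are illustrative, everything else is existing kgo API:

package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// e2eLogger is a hypothetical hook that logs per-request e2e latency
// together with the client ID now carried on BrokerE2E.
type e2eLogger struct{}

var _ kgo.HookBrokerE2E = e2eLogger{}

func (e2eLogger) OnBrokerE2E(meta kgo.BrokerMetadata, _ int16, e2e kgo.BrokerE2E) {
	log.Printf("client=%s node=%d e2e=%s", e2e.ClientID, meta.NodeID, e2e.DurationE2E())
}

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		// Without kgo.ClientID, clientIDString() resolves to the "kgo" default.
		kgo.ClientID("orders-producer"),
		kgo.WithHooks(e2eLogger{}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}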
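On the kprom side, WithClientLabel now also drives the dynamic "client_id" label on the read/write and fetch/produce metrics. A sketch of the intended wiring, assuming this patch; the namespace, client ID, and addresses are placeholders:

package main

import (
	"log"
	"net/http"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/plugin/kprom"
)

func main() {
	// WithClientLabel appends "client_id" to the fetch/produce label set,
	// and OnBrokerE2E adds it to the read/write label values.
	metrics := kprom.NewMetrics("kgo", kprom.WithClientLabel())

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ClientID("orders-producer"),
		kgo.WithHooks(metrics),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	http.Handle("/metrics", metrics.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
}

FetchAndProduceDetail(ByClient) is the detail-based way to opt in for the fetch/produce metrics only; since it and WithClientLabel both populate fetchProduceOpts.labels, combining the two may duplicate the label depending on option order, so picking one mechanism seems safest.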
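The early return on m.connConnectsTotal makes OnNewClient idempotent, which is what lets a single Metrics back several clients while the per-series client_id label keeps their counts apart. A sketch under the same assumptions; note that the unchanged buffer metrics still take their const label from whichever client registers first:

package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/plugin/kprom"
)

func main() {
	// One shared Metrics: the second OnNewClient call hits the new
	// early return instead of re-registering every collector.
	shared := kprom.NewMetrics("kgo", kprom.WithClientLabel())

	producer, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ClientID("orders-producer"),
		kgo.WithHooks(shared),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	consumer, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ClientID("orders-consumer"),
		kgo.ConsumeTopics("orders"),
		kgo.WithHooks(shared),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Series such as kgo_write_bytes_total{node_id="...",client_id="orders-producer"}
	// and {...,client_id="orders-consumer"} now stay separate in one registry.
}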