Skip to content

Commit

Permalink
Merge pull request #1 from lukasrynt/kprom/dynamic-client-id-label
Browse files Browse the repository at this point in the history
kprom: Multiple clients
  • Loading branch information
lukasrynt authored Sep 17, 2024
2 parents b77dd13 + 0ea78bc commit 7f8d2d4
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 140 deletions.
2 changes: 2 additions & 0 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func (cxn *brokerCxn) hookWriteE2E(key int16, bytesWritten int, writeWait, timeT
WriteWait: writeWait,
TimeToWrite: timeToWrite,
WriteErr: writeErr,
ClientID: cxn.cl.clientIDString(),
})
}
})
Expand Down Expand Up @@ -1232,6 +1233,7 @@ func (cxn *brokerCxn) readResponse(
ReadWait: readWait,
TimeToRead: timeToRead,
ReadErr: readErr,
ClientID: cxn.cl.clientIDString(),
})
}
})
Expand Down
17 changes: 17 additions & 0 deletions pkg/kgo/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ type BrokerE2E struct {
WriteErr error
// ReadErr is any error encountered during reading.
ReadErr error

// ClientID is pointer to ID of the client that made the request
ClientID string
}

// DurationE2E returns the e2e time from the start of when a request is written
Expand Down Expand Up @@ -239,6 +242,9 @@ type ProduceBatchMetrics struct {
// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
// zstd.
CompressionType uint8

// ClientID is pointer to ID of the client that made the request
ClientID string
}

// HookProduceBatchWritten is called whenever a batch is known to be
Expand Down Expand Up @@ -288,6 +294,9 @@ type FetchBatchMetrics struct {
// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
// zstd.
CompressionType uint8

// ClientID is pointer to ID of the client that made the request
ClientID string
}

// HookFetchBatchRead is called whenever a batch if read within the client.
Expand Down Expand Up @@ -418,3 +427,11 @@ func implementsAnyHook(h Hook) bool {
}
return false
}

func (cl *Client) clientIDString() string {
resolved := "kgo"
if cl.cfg.id != nil {
resolved = *cl.cfg.id
}
return resolved
}
1 change: 1 addition & 0 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,7 @@ func (p produceMetrics) hook(cfg *cfg, br *broker) {
for _, h := range hooks {
for topic, partitions := range p {
for partition, metrics := range partitions {
metrics.ClientID = br.cl.clientIDString()
h.OnProduceBatchWritten(br.meta, topic, partition, metrics)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,7 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
in = in[length:]

var m FetchBatchMetrics
m.ClientID = br.cl.clientIDString()

switch t := r.(type) {
case *kmsg.MessageV0:
Expand Down
10 changes: 8 additions & 2 deletions plugin/kprom/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ func HandlerOpts(opts promhttp.HandlerOpts) Opt {
return opt{func(c *cfg) { c.handlerOpts = opts }}
}

// WithClientLabel adds a "cliend_id" label to all metrics.
// WithClientLabel adds a "client_id" label to all metrics.
func WithClientLabel() Opt {
return opt{func(c *cfg) { c.withClientLabel = true }}
return opt{func(c *cfg) {
c.withClientLabel = true
c.fetchProduceOpts.labels = append(c.fetchProduceOpts.labels, "client_id")
}}
}

// Subsystem sets the subsystem for the kprom metrics, overriding the default
Expand Down Expand Up @@ -182,6 +185,7 @@ type Detail uint8
const (
ByNode Detail = iota // Include label "node_id" for fetch and produce metrics.
ByTopic // Include label "topic" for fetch and produce metrics.
ByClient // Include label "client_id" for fetch and produce metrics
Batches // Report number of fetched and produced batches.
Records // Report the number of fetched and produced records.
CompressedBytes // Report the number of fetched and produced compressed bytes.
Expand Down Expand Up @@ -211,6 +215,8 @@ func FetchAndProduceDetail(details ...Detail) Opt {
labelsDeduped[ByTopic] = "topic"
case ByNode:
labelsDeduped[ByNode] = "node_id"
case ByClient:
labelsDeduped[ByClient] = "client_id"
case Batches:
c.fetchProduceOpts.batches = true
case Records:
Expand Down
Loading

0 comments on commit 7f8d2d4

Please sign in to comment.