Skip to content

Commit

Permalink
Merge branch 'main' into idp-user-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
bcmmbaga committed Sep 26, 2023
2 parents 1c4e6b2 + 1324169 commit baf2546
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
5 changes: 5 additions & 0 deletions management/server/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
select {
// condition when there are some updates
case update, open := <-updates:

if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
}

if !open {
log.Debugf("updates channel for peer %s was closed", peerKey.String())
s.cancelPeerRoutines(peer)
Expand Down
19 changes: 19 additions & 0 deletions management/server/telemetry/grpc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type GRPCMetrics struct {
activeStreamsGauge asyncint64.Gauge
syncRequestDuration syncint64.Histogram
loginRequestDuration syncint64.Histogram
channelQueueLength syncint64.Histogram
ctx context.Context
}

Expand Down Expand Up @@ -52,6 +53,18 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
return nil, err
}

// We use histogram here as we have multiple channel at the same time and we want to see a slice at any given time
// Then we should be able to extract min, manx, mean and the percentiles.
// TODO(yury): This needs custom bucketing as we are interested in the values from 0 to server.channelBufferSize (100)
channelQueue, err := meter.SyncInt64().Histogram(
"management.grpc.updatechannel.queue",
instrument.WithDescription("Number of update messages in the channel queue"),
instrument.WithUnit("length"),
)
if err != nil {
return nil, err
}

return &GRPCMetrics{
meter: meter,
syncRequestsCounter: syncRequestsCounter,
Expand All @@ -60,6 +73,7 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
activeStreamsGauge: activeStreamsGauge,
syncRequestDuration: syncRequestDuration,
loginRequestDuration: loginRequestDuration,
channelQueueLength: channelQueue,
ctx: ctx,
}, err
}
Expand Down Expand Up @@ -100,3 +114,8 @@ func (grpcMetrics *GRPCMetrics) RegisterConnectedStreams(producer func() int64)
},
)
}

// UpdateChannelQueueLength update the histogram that keep distribution of the update messages channel queue
func (metrics *GRPCMetrics) UpdateChannelQueueLength(len int) {
metrics.channelQueueLength.Record(metrics.ctx, int64(len))
}

0 comments on commit baf2546

Please sign in to comment.