Skip to content

Commit

Permalink
chore(syncer/metrics): add metrics for syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Dec 13, 2023
1 parent 28ff21c commit 0b7da9e
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 5 deletions.
91 changes: 86 additions & 5 deletions sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sync
import (
"context"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
Expand All @@ -11,8 +12,19 @@ import (
var meter = otel.Meter("header/sync")

type metrics struct {
ctx context.Context

totalSynced atomic.Int64
totalSyncedGauge metric.Float64ObservableGauge

syncLoopStarted metric.Int64Counter
trustedPeersOutOfSync metric.Int64Counter
laggingHeadersStart metric.Int64Counter

subjectiveHead atomic.Int64
headerTimestamp metric.Float64Histogram

headerReceived time.Time
}

func newMetrics() (*metrics, error) {
Expand All @@ -24,27 +36,96 @@ func newMetrics() (*metrics, error) {
return nil, err
}

syncLoopStarted, err := meter.Int64Counter("sync_loop_started", metric.WithDescription("sync loop started"))
if err != nil {
return nil, err
}

trustedPeersOutOfSync, err := meter.Int64Counter("tr_peers_out_of_sync", metric.WithDescription("trusted peers out of sync"))
if err != nil {
return nil, err
}

laggingHeadersStart, err := meter.Int64Counter("sync_lagging_hdr_start", metric.WithDescription("lagging header start"))
if err != nil {
return nil, err
}

subjectiveHead, err := meter.Int64ObservableGauge("sync_subjective_head", metric.WithDescription("subjective head height"))
if err != nil {
return nil, err
}

headerTimestamp, err := meter.Float64Histogram("sync_subjective_head_ts",
metric.WithDescription("subjective_head_timestamp"))
if err != nil {
return nil, err
}

m := &metrics{
totalSyncedGauge: totalSynced,
ctx: context.Background(),
totalSyncedGauge: totalSynced,
syncLoopStarted: syncLoopStarted,
trustedPeersOutOfSync: trustedPeersOutOfSync,
laggingHeadersStart: laggingHeadersStart,
headerTimestamp: headerTimestamp,
}

callback := func(ctx context.Context, observer metric.Observer) error {
observer.ObserveFloat64(totalSynced, float64(m.totalSynced.Load()))
observer.ObserveInt64(subjectiveHead, m.subjectiveHead.Load())
return nil
}
_, err = meter.RegisterCallback(callback, totalSynced)

_, err = meter.RegisterCallback(callback, totalSynced, subjectiveHead)
if err != nil {
return nil, err
}

return m, nil
}

// recordTotalSynced records the total amount of synced headers.
func (m *metrics) recordTotalSynced(totalSynced int) {
if m == nil {
return
}

m.totalSynced.Add(int64(totalSynced))
}

func (m *metrics) recordSyncLoopStarted() {
if m == nil {
return
}
m.syncLoopStarted.Add(m.ctx, 1)
}

func (m *metrics) recordTrustedPeersOutOfSync() {
if m == nil {
return
}
m.trustedPeersOutOfSync.Add(m.ctx, 1)
}

func (m *metrics) observeNewHead(height int64) {
if m == nil {
return
}
m.subjectiveHead.Store(height)
}

func (m *metrics) observeLaggingHeader(threshold time.Duration, receivedAt time.Time) {
if m == nil {
return
}
if !m.headerReceived.IsZero() &&
float64(receivedAt.Second()-m.headerReceived.Second()) > threshold.Seconds() {
m.laggingHeadersStart.Add(m.ctx, 1)
}
m.headerReceived = receivedAt
}

func (m *metrics) observeHeaderTimestamp(timestamp time.Time) {
if m == nil {
return
}
m.headerTimestamp.Record(m.ctx, float64(timestamp.Second()))
}
1 change: 1 addition & 0 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (s *Syncer[H]) syncLoop() {
for {
select {
case <-s.triggerSync:
s.metrics.recordSyncLoopStarted()
s.sync(s.ctx)
case <-s.ctx.Done():
return
Expand Down
3 changes: 3 additions & 0 deletions sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
}
log.Warn("trusted peer is out of sync")
s.metrics.recordTrustedPeersOutOfSync()
return trustHead, nil
}

Expand Down Expand Up @@ -130,6 +131,8 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
s.pending.Add(netHead)
s.wantSync()
log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash())
s.metrics.observeNewHead(int64(netHead.Height()))
s.metrics.observeLaggingHeader(s.Params.blockTime, time.Now())
}

// incomingNetworkHead processes new potential network headers.
Expand Down

0 comments on commit 0b7da9e

Please sign in to comment.