feat(syncer/metrics): add metrics to syncer #136

Merged: 21 commits, Feb 1, 2024
196 changes: 182 additions & 14 deletions sync/metrics.go
@@ -3,48 +3,216 @@ package sync
import (
"context"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("header/sync")

type metrics struct {
- totalSynced atomic.Int64
- totalSyncedGauge metric.Float64ObservableGauge
syncerReg metric.Registration

subjectiveHeadInst metric.Int64ObservableGauge
syncLoopRunningInst metric.Int64ObservableGauge

syncLoopStarted metric.Int64Counter
trustedPeersOutOfSync metric.Int64Counter
unrecentHeader metric.Int64Counter
subjectiveInit metric.Int64Counter

subjectiveHead atomic.Int64

syncLoopDurationHist metric.Float64Histogram
syncLoopActive atomic.Int64
syncStartedTs time.Time

requestRangeTimeHist metric.Float64Histogram
requestRangeStartTs time.Time

blockTime metric.Float64Histogram
prevHeader time.Time
}

func newMetrics() (*metrics, error) {
- totalSynced, err := meter.Float64ObservableGauge(
- "total_synced_headers",
- metric.WithDescription("total synced headers"),
syncLoopStarted, err := meter.Int64Counter(
"hdr_sync_loop_started_counter",
metric.WithDescription("sync loop started shows that syncing is in progress"),
)
if err != nil {
return nil, err
}

- m := &metrics{
- totalSyncedGauge: totalSynced,
trustedPeersOutOfSync, err := meter.Int64Counter(
"hdr_sync_trust_peers_out_of_sync_counter",
metric.WithDescription("trusted peers out of sync and gave outdated header"),
)
if err != nil {
return nil, err
}

- callback := func(ctx context.Context, observer metric.Observer) error {
- observer.ObserveFloat64(totalSynced, float64(m.totalSynced.Load()))
- return nil
unrecentHeader, err := meter.Int64Counter(
"hdr_sync_unrecent_header_counter",
metric.WithDescription("tracks every time Syncer returns an unrecent header"),
)
if err != nil {
return nil, err
}
- _, err = meter.RegisterCallback(callback, totalSynced)

subjectiveInit, err := meter.Int64Counter(
"hdr_sync_subjective_init_counter",
metric.WithDescription(
"tracks how many times is the node initialized ",
),
)
if err != nil {
return nil, err
}

subjectiveHead, err := meter.Int64ObservableGauge(
"hdr_sync_subjective_head_gauge",
metric.WithDescription("subjective head height"),
)
if err != nil {
return nil, err
}

syncLoopDurationHist, err := meter.Float64Histogram(
"hdr_sync_loop_time_hist",
metric.WithDescription("tracks the duration of syncing"))
if err != nil {
return nil, err
}

requestRangeTimeHist, err := meter.Float64Histogram("hdr_sync_range_request_time_hist",
metric.WithDescription("tracks the duration of GetRangeByHeight requests"))
if err != nil {
return nil, err
}

syncLoopRunningInst, err := meter.Int64ObservableGauge(
"hdr_sync_loop_status_gauge",
metric.WithDescription("reports whether syncing is active or not"))
if err != nil {
return nil, err
}

blockTime, err := meter.Float64Histogram(
"hdr_sync_actual_blockTime_ts_hist",
metric.WithDescription("duration between creation of 2 blocks"),
)
if err != nil {
return nil, err
}

m := &metrics{
syncLoopStarted: syncLoopStarted,
trustedPeersOutOfSync: trustedPeersOutOfSync,
unrecentHeader: unrecentHeader,
subjectiveInit: subjectiveInit,
syncLoopDurationHist: syncLoopDurationHist,
syncLoopRunningInst: syncLoopRunningInst,
requestRangeTimeHist: requestRangeTimeHist,
blockTime: blockTime,
subjectiveHeadInst: subjectiveHead,
}

m.syncerReg, err = meter.RegisterCallback(m.observeMetrics, m.subjectiveHeadInst, m.syncLoopRunningInst)
if err != nil {
return nil, err
}

return m, nil
}

- // recordTotalSynced records the total amount of synced headers.
- func (m *metrics) recordTotalSynced(totalSynced int) {
func (m *metrics) observeMetrics(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(m.subjectiveHeadInst, m.subjectiveHead.Load())
obs.ObserveInt64(m.syncLoopRunningInst, m.syncLoopActive.Load())
return nil
}

func (m *metrics) syncStarted(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.syncStartedTs = time.Now()
m.syncLoopStarted.Add(ctx, 1)
m.syncLoopActive.Store(1)
})
}

func (m *metrics) syncFinished(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.syncLoopActive.Store(0)
m.syncLoopDurationHist.Record(ctx, time.Since(m.syncStartedTs).Seconds())
})
}

func (m *metrics) unrecentHead(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.unrecentHeader.Add(ctx, 1)
})
}

func (m *metrics) trustedPeersOutOufSync(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.trustedPeersOutOfSync.Add(ctx, 1)
})
}

func (m *metrics) subjectiveInitialization(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.subjectiveInit.Add(ctx, 1)
})
}

func (m *metrics) updateGetRangeRequestInfo(ctx context.Context, amount int, failed bool) {
m.observe(ctx, func(ctx context.Context) {
m.requestRangeTimeHist.Record(ctx, time.Since(m.requestRangeStartTs).Seconds(),
metric.WithAttributes(
attribute.Int("headers amount", amount),
attribute.Bool("request failed", failed),
))
})
}

func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestamp time.Time) {
m.observe(ctx, func(ctx context.Context) {
m.subjectiveHead.Store(int64(height))

if !m.prevHeader.IsZero() {
m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds())
}
})
}

func (m *metrics) rangeRequestStart() {
if m == nil {
return
}
m.requestRangeStartTs = time.Now()
}

func (m *metrics) rangeRequestStop() {
if m == nil {
return
}
m.requestRangeStartTs = time.Time{}
}

- m.totalSynced.Add(int64(totalSynced))
func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}
observeFn(ctx)
}

func (m *metrics) Close() error {
if m == nil {
return nil
}
return m.syncerReg.Unregister()
}
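The new metrics type swaps the old per-record gauge for atomic values that a single registered callback reports at collection time. Below is a minimal, self-contained sketch of that observable-gauge pattern, assuming only the public OpenTelemetry Go API; the instrument name is taken from the diff, everything else is illustrative.

```go
package main

import (
	"context"
	"sync/atomic"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	meter := otel.Meter("header/sync")

	var subjectiveHead atomic.Int64

	gauge, err := meter.Int64ObservableGauge("hdr_sync_subjective_head_gauge")
	if err != nil {
		panic(err)
	}

	// The callback runs only when the SDK collects, not on every head update.
	reg, err := meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(gauge, subjectiveHead.Load())
		return nil
	}, gauge)
	if err != nil {
		panic(err)
	}
	defer reg.Unregister()

	// Hot path: just an atomic store, no SDK call.
	subjectiveHead.Store(42)
}
```

Keeping the hot path down to an atomic store means head updates never block on the metrics SDK; the observation cost is paid only when a reader actually collects.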
15 changes: 9 additions & 6 deletions sync/sync.go
@@ -109,7 +109,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
// Stop stops Syncer.
func (s *Syncer[H]) Stop(context.Context) error {
s.cancel()
- return nil
return s.metrics.Close()
}

// SyncWait blocks until ongoing sync is done.
@@ -158,7 +158,7 @@ func (s *Syncer[H]) State() State {

head, err := s.store.Head(s.ctx)
if err == nil {
- state.Height = uint64(head.Height())
state.Height = head.Height()
} else if state.Error == "" {
// don't ignore the error if we can show it in the state
state.Error = err.Error()
@@ -180,7 +180,9 @@ func (s *Syncer[H]) syncLoop() {
for {
select {
case <-s.triggerSync:
s.metrics.syncStarted(s.ctx)
s.sync(s.ctx)
s.metrics.syncFinished(s.ctx)
case <-s.ctx.Done():
return
}
@@ -239,8 +241,8 @@ func (s *Syncer[H]) sync(ctx context.Context) {
func (s *Syncer[H]) doSync(ctx context.Context, fromHead, toHead H) (err error) {
s.stateLk.Lock()
s.state.ID++
- s.state.FromHeight = uint64(fromHead.Height()) + 1
- s.state.ToHeight = uint64(toHead.Height())
s.state.FromHeight = fromHead.Height() + 1
s.state.ToHeight = toHead.Height()
s.state.FromHash = fromHead.Hash()
s.state.ToHash = toHead.Hash()
s.state.Start = time.Now()
@@ -315,7 +317,10 @@ func (s *Syncer[H]) requestHeaders(
}

to := fromHead.Height() + size + 1
s.metrics.rangeRequestStart()
headers, err := s.getter.GetRangeByHeight(ctx, fromHead, to)
s.metrics.updateGetRangeRequestInfo(s.ctx, int(size)/100, err != nil)
s.metrics.rangeRequestStop()
if err != nil {
return err
}
@@ -338,7 +343,5 @@ func (s *Syncer[H]) storeHeaders(ctx context.Context, headers ...H) error {
if err != nil {
return err
}

- s.metrics.recordTotalSynced(len(headers))
return nil
}
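Note that sync.go calls s.metrics.syncStarted, rangeRequestStart and friends unconditionally, even when metrics were never constructed. That is safe because every method goes through the nil-receiver guard in observe. A stripped-down sketch of the pattern follows, with simplified stand-in types rather than the actual Syncer:

```go
package main

import (
	"context"
	"fmt"
)

// metrics is nil when telemetry is disabled; every method starts with a
// nil-receiver guard, so call sites never need `if m != nil` checks.
type metrics struct{ syncLoopsStarted int }

func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) {
	if m == nil {
		return // metrics disabled: silently no-op
	}
	if ctx.Err() != nil {
		// the caller's context may already be canceled; still record the event
		ctx = context.Background()
	}
	observeFn(ctx)
}

func (m *metrics) syncStarted(ctx context.Context) {
	m.observe(ctx, func(context.Context) { m.syncLoopsStarted++ })
}

func main() {
	var disabled *metrics // nil receiver: every call is a no-op
	disabled.syncStarted(context.Background())

	enabled := &metrics{}
	enabled.syncStarted(context.Background())
	fmt.Println(enabled.syncLoopsStarted) // 1
}
```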
5 changes: 5 additions & 0 deletions sync/sync_head.go
@@ -39,6 +39,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
// if we can't get it - give what we have
reqCtx, cancel := context.WithTimeout(ctx, time.Second*2) // TODO(@vgonkivs): make timeout configurable
defer cancel()
s.metrics.unrecentHead(s.ctx)
netHead, err := s.getter.Head(reqCtx, header.WithTrustedHead[H](sbjHead))
if err != nil {
log.Warnw("failed to get recent head, returning current subjective", "sbjHead", sbjHead.Height(), "err", err)
@@ -85,10 +86,12 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
return s.subjectiveHead(ctx)
}
defer s.getter.Unlock()

trustHead, err := s.getter.Head(ctx)
if err != nil {
return trustHead, err
}
s.metrics.subjectiveInitialization(s.ctx)
// and set it as the new subjective head without validation,
// or, in other words, do 'automatic subjective initialization'
// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack
@@ -103,6 +106,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
}
log.Warn("trusted peer is out of sync")
s.metrics.trustedPeersOutOufSync(s.ctx)
return trustHead, nil
}

@@ -121,6 +125,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
"hash", netHead.Hash().String(),
"err", err)
}
s.metrics.newSubjectiveHead(s.ctx, netHead.Height(), netHead.Time())

storeHead, err := s.store.Head(ctx)
if err == nil && storeHead.Height() >= netHead.Height() {
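These instruments only export data if the embedding application installs a MeterProvider; otherwise otel.Meter("header/sync") resolves to a no-op implementation. A rough wiring sketch, assuming the standard OTel SDK and stdout exporter modules; the 10-second interval and the stdout destination are arbitrary choices for the example, not part of this PR:

```go
package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	exporter, err := stdoutmetric.New()
	if err != nil {
		log.Fatal(err)
	}

	// Collect and print everything recorded under the "header/sync" meter
	// every 10 seconds (interval chosen arbitrarily for the example).
	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(10*time.Second))),
	)
	otel.SetMeterProvider(provider)
	defer provider.Shutdown(context.Background())

	// ... construct and start the Syncer with metrics enabled here ...
}
```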