-
Notifications
You must be signed in to change notification settings - Fork 0
/
heartbeat.go
123 lines (103 loc) · 3.94 KB
/
heartbeat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package discovery
import (
"context"
"errors"
"time"
"connectrpc.com/connect"
"github.com/google/uuid"
"github.com/overmindtech/sdp-go"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/durationpb"
)
const DefaultHeartbeatFrequency = 5 * time.Minute
var ErrNoHealthcheckDefined = errors.New("no healthcheck defined")
// HeartbeatSender sends a heartbeat to the management API, this is called at
// `DefaultHeartbeatFrequency` by default when the engine is running, or
// `StartSendingHeartbeats` has been called manually. Users can also call this
// method to immediately send a heartbeat if required
func (e *Engine) SendHeartbeat(ctx context.Context) error {
if e.EngineConfig.HeartbeatOptions == nil || e.EngineConfig.HeartbeatOptions.HealthCheck == nil {
return ErrNoHealthcheckDefined
}
healthCheckError := e.EngineConfig.HeartbeatOptions.HealthCheck()
var heartbeatError *string
if healthCheckError != nil {
heartbeatError = new(string)
*heartbeatError = healthCheckError.Error()
}
var engineUUID []byte
if e.EngineConfig.SourceUUID != uuid.Nil {
engineUUID = e.EngineConfig.SourceUUID[:]
}
// Get available types and scopes
availableScopesMap := make(map[string]bool)
adapterMetadata := []*sdp.AdapterMetadata{}
for _, adapter := range e.sh.VisibleAdapters() {
for _, scope := range adapter.Scopes() {
availableScopesMap[scope] = true
}
adapterMetadata = append(adapterMetadata, adapter.Metadata())
}
// Extract slices from maps
availableScopes := make([]string, 0)
for s := range availableScopesMap {
availableScopes = append(availableScopes, s)
}
// Calculate the duration for the next heartbeat, based on the current
// frequency x2.5 to give us some leeway
nextHeartbeat := time.Duration(float64(e.EngineConfig.HeartbeatOptions.Frequency) * 2.5)
_, err := e.EngineConfig.HeartbeatOptions.ManagementClient.SubmitSourceHeartbeat(ctx, &connect.Request[sdp.SubmitSourceHeartbeatRequest]{
Msg: &sdp.SubmitSourceHeartbeatRequest{
UUID: engineUUID,
Version: e.EngineConfig.Version,
Name: e.EngineConfig.SourceName,
Type: e.EngineConfig.EngineType,
AvailableScopes: availableScopes,
AdapterMetadata: adapterMetadata,
Managed: e.EngineConfig.OvermindManagedSource,
Error: heartbeatError,
NextHeartbeatMax: durationpb.New(nextHeartbeat),
},
})
return err
}
// Starts sending heartbeats at the specified frequency. These will be sent in
// the background and this function will return immediately. Heartbeats are
// automatically started when the engine started, but if an adapter has startup
// steps that take a long time, or are liable to fail, the user may want to
// start the heartbeats first so that users can see that the adapter has failed
// to start.
//
// If this is called multiple times, nothing will happen. Heartbeats will be
// stopped when the engine is stopped, or when the provided context is canceled.
//
// This will send one heartbeat initially when the method is called, and will
// then run in a background goroutine that sends heartbeats at the specified
// frequency, and will stop when the provided context is canceled.
func (e *Engine) StartSendingHeartbeats(ctx context.Context) {
if e.EngineConfig.HeartbeatOptions == nil || e.EngineConfig.HeartbeatOptions.Frequency == 0 || e.heartbeatCancel != nil {
return
}
var heartbeatContext context.Context
heartbeatContext, e.heartbeatCancel = context.WithCancel(ctx)
// Send one heartbeat at the beginning
err := e.SendHeartbeat(heartbeatContext)
if err != nil {
log.WithError(err).Error("Failed to send heartbeat")
}
go func() {
ticker := time.NewTicker(e.EngineConfig.HeartbeatOptions.Frequency)
defer ticker.Stop()
for {
select {
case <-heartbeatContext.Done():
return
case <-ticker.C:
err := e.SendHeartbeat(heartbeatContext)
if err != nil {
log.WithError(err).Error("Failed to send heartbeat")
}
}
}
}()
}