forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
graphite.go
149 lines (136 loc) · 3.86 KB
/
graphite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package graphite
import (
"errors"
"io"
"log"
"math/rand"
"net"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
type Graphite struct {
// URL is only for backwards compatability
Servers []string
Prefix string
Template string
Timeout int
conns []net.Conn
}
var sampleConfig = `
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## timeout in seconds for the write connection to graphite
timeout = 2
`
func (g *Graphite) Connect() error {
// Set default values
if g.Timeout <= 0 {
g.Timeout = 2
}
if len(g.Servers) == 0 {
g.Servers = append(g.Servers, "localhost:2003")
}
// Get Connections
var conns []net.Conn
for _, server := range g.Servers {
conn, err := net.DialTimeout("tcp", server, time.Duration(g.Timeout)*time.Second)
if err == nil {
conns = append(conns, conn)
}
}
g.conns = conns
return nil
}
func (g *Graphite) Close() error {
// Closing all connections
for _, conn := range g.conns {
conn.Close()
}
return nil
}
func (g *Graphite) SampleConfig() string {
return sampleConfig
}
func (g *Graphite) Description() string {
return "Configuration for Graphite server to send metrics to"
}
// We need check eof as we can write to nothing without noticing anything is wrong
// the connection stays in a close_wait
// We can detect that by finding an eof
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv via the authors of carbon-relay-ng` for this trick.
func checkEOF(conn net.Conn) {
b := make([]byte, 1024)
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
num, err := conn.Read(b)
if err == io.EOF {
log.Printf("E! Conn %s is closed. closing conn explicitly", conn)
conn.Close()
return
}
// just in case i misunderstand something or the remote behaves badly
if num != 0 {
log.Printf("I! conn %s .conn.Read data? did not expect that. data: %s\n", conn, b[:num])
}
// Log non-timeout errors or close.
if e, ok := err.(net.Error); !(ok && e.Timeout()) {
log.Printf("E! conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s\n", conn, err)
conn.Close()
}
}
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data
var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template)
if err != nil {
return err
}
for _, metric := range metrics {
buf, err := s.Serialize(metric)
if err != nil {
log.Printf("E! Error serializing some metrics to graphite: %s", err.Error())
}
batch = append(batch, buf...)
}
// This will get set to nil if a successful write occurs
err = errors.New("Could not write to any Graphite server in cluster\n")
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if g.Timeout > 0 {
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
}
checkEOF(g.conns[n])
if _, e := g.conns[n].Write(batch); e != nil {
// Error
log.Println("E! Graphite Error: " + e.Error())
// Let's try the next one
} else {
// Success
err = nil
break
}
}
// try to reconnect
if err != nil {
log.Println("E! Reconnecting: ")
g.Connect()
}
return err
}
func init() {
outputs.Add("graphite", func() telegraf.Output {
return &Graphite{}
})
}