Skip to content

Commit

Permalink
Merge pull request #2 from infrawatch/pleimer-metrics-timeout
Browse files Browse the repository at this point in the history
Metrics Timeout
  • Loading branch information
csibbitt authored May 7, 2020
2 parents 6d3aa0c + 3389970 commit 32eea9c
Show file tree
Hide file tree
Showing 10 changed files with 537 additions and 39 deletions.
4 changes: 2 additions & 2 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"runtime/pprof"
"strconv"

"github.com/atyronesmith/sa-benchmark/pkg/inetserver"
"github.com/atyronesmith/sa-benchmark/pkg/unixserver"
"github.com/infrawatch/sg2/pkg/inetserver"
"github.com/infrawatch/sg2/pkg/unixserver"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down
4 changes: 2 additions & 2 deletions cmd/client/client.go → cmd/udpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"runtime/pprof"
"strconv"

"github.com/atyronesmith/sa-benchmark/pkg/collectd"
"github.com/atyronesmith/sa-benchmark/pkg/udpclient"
"github.com/infrawatch/sg2/pkg/collectd"
"github.com/infrawatch/sg2/pkg/udpclient"
)

func usage() {
Expand Down
133 changes: 133 additions & 0 deletions cmd/unixclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"runtime"
"runtime/pprof"
"time"

"github.com/infrawatch/sg2/pkg/collectd"
)

func usage() {
fmt.Fprintf(os.Stderr, "usage: %s [-c count] unix_address \n", os.Args[0])
fmt.Fprintf(os.Stderr, "options:\n")
flag.PrintDefaults()
}

func sendMetrics(ctx context.Context, address string, count int, hostsCount int, mesg []byte) (err error) {
raddr, err := net.ResolveUnixAddr("unixgram", address)
if err != nil {
return
}

// // Although we're not in a connection-oriented transport,
// // the act of `dialing` is analogous to the act of performing
// // a `connect(2)` syscall for a socket of type SOCK_DGRAM:
// // - it forces the underlying socket to only read and write
// // to and from a specific remote address.
conn, err := net.DialUnix("unixgram", nil, raddr)
if err != nil {
return
}

// Closes the underlying file descriptor associated with the,
// socket so that it no longer refers to any file.
defer conn.Close()

doneChan := make(chan error, 1)
sent := 0
bytesSent := 0
var start time.Time
var end time.Time

go func(conn *net.UnixConn, mesg []byte) {

start = time.Now()

for i := 0; i < count; i++ {
_, err := conn.Write(mesg)
if err != nil {
doneChan <- err
return
}
sent++
bytesSent += len(mesg)
}
end = time.Now()
doneChan <- nil
}(conn, mesg)

for {
select {
case <-ctx.Done():
fmt.Println("cancelled")
err = ctx.Err()
goto done
case err = <-doneChan:
fmt.Printf("Sent...%d\n", sent)
goto done
default:
time.Sleep(time.Second * 1)
fmt.Printf("Sending...%d\n", sent)
if sent >= count {
goto done
}
}
}
done:
diff := end.Sub(start)
fmt.Printf("Send %d messages in %.8v, %.4f Mbps\n", sent, diff, float64(bytesSent)/diff.Seconds()*8.0/1000000.0)

return
}

func main() {
if os.Getenv("DEBUG") != "" {
runtime.SetBlockProfileRate(20)
runtime.SetMutexProfileFraction(20)
}
// parse command line option
hostsNum := flag.Int("hosts", 1, "Number of hosts to simulate")
metricPerMsg := flag.Int("mpm", 1, "Number of metrics per messsage")
msgCount := flag.Int("count", 1000000, "Number of metrics to send")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

flag.Usage = usage
flag.Parse()

if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
}
defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()
}
args := flag.Args()

if len(args) != 1 {
fmt.Fprintln(os.Stderr, "Invalid number of arguments...")
usage()
os.Exit(1)
}

addr := args[0]

metric := collectd.GenCPUMetric(10, "Goblin", *metricPerMsg)

ctx := context.Background()

err := sendMetrics(ctx, addr, *msgCount, *hostsNum, metric)
if err != nil {
fmt.Printf("Error occurred: %s\n", err)
}
}
11 changes: 4 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
module github.com/atyronesmith/sa-benchmark
module github.com/infrawatch/sg2

go 1.13
go 1.14

require (
collectd.org v0.3.0
github.com/json-iterator/go v1.1.8
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/prometheus/client_golang v1.2.1
google.golang.org/genproto v0.0.0-20200128133413-58ce757ed39b
github.com/json-iterator/go v1.1.9
github.com/prometheus/client_golang v1.5.1
)
38 changes: 38 additions & 0 deletions pkg/assert/assert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package assert

// package that aids in running tests

import (
"fmt"
"path/filepath"
"reflect"
"runtime"
"testing"
)

// Assert fails the test if the condition is false.
func Assert(tb testing.TB, condition bool, msg string, v ...interface{}) {
if !condition {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...)
tb.FailNow()
}
}

// Ok fails the test if an err is not nil.
func Ok(tb testing.TB, err error) {
if err != nil {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error())
tb.FailNow()
}
}

// Equals fails the test if exp is not equal to act.
func Equals(tb testing.TB, exp, act interface{}) {
if !reflect.DeepEqual(exp, act) {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
tb.FailNow()
}
}
66 changes: 66 additions & 0 deletions pkg/cacheutil/cacheserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package cacheutil

import (
"container/list"
"context"
"time"
)

// Expiry use to free memory after expire condition
type Expiry interface {
Expired() bool
Delete()
}

// CacheServer for now used only to expire Expiry types
type CacheServer struct {
entries *list.List
Interval time.Duration
}

// NewCacheServer CacheServer factory that sets expiry interval in seconds
func NewCacheServer() *CacheServer {
return &CacheServer{
entries: list.New(),
Interval: 5,
}
}

// Register new expiry object
func (cs *CacheServer) Register(e Expiry) {
cs.entries.PushBack(e)
}

// Run run cache server
func (cs *CacheServer) Run(ctx context.Context) error {
// expiry loop

var err error
for {
select {
case <-ctx.Done():
err = ctx.Err()
goto done
default:
e := cs.entries.Front()
n := cs.entries.Front()
for {
if e == nil {
break
}

if e.Value.(Expiry).Expired() {
e.Value.(Expiry).Delete()
n = e.Next()
cs.entries.Remove(e)
e = n
continue
}
e = e.Next()
}
time.Sleep(time.Second * cs.Interval)
}
}
done:
return err
}
105 changes: 105 additions & 0 deletions pkg/cacheutil/cacheserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package cacheutil

import (
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/infrawatch/sg2/pkg/assert"
)

type deleteFn func()

type LabelSeries struct {
interval float64
lastArrival time.Time
deleteFn deleteFn
}

func (ls *LabelSeries) Expired() bool {
return time.Now().Sub(ls.lastArrival).Seconds() >= ls.interval
}

func (ls *LabelSeries) Delete() {
ls.deleteFn()
}

type MetricStash struct {
metrics map[string]map[string]*LabelSeries
}

func NewMetricStash() *MetricStash {
return &MetricStash{
metrics: make(map[string]map[string]*LabelSeries),
}
}

func (ms *MetricStash) addMetric(metricName string, interval float64, numLabels int, cs *CacheServer) {
for i := 0; i < numLabels; i++ {
ls := LabelSeries{
interval: interval,
lastArrival: time.Now(),
}

labelName := "test-label-" + strconv.Itoa(i)

ls.deleteFn = func() {
fmt.Printf("Label %s in metric %s deleted\n", labelName, metricName)
delete(ms.metrics[metricName], labelName)

if len(ms.metrics[metricName]) == 0 {
delete(ms.metrics, metricName)
fmt.Printf("Metrics %s deleted\n", metricName)
}
}

if ms.metrics[metricName] == nil {
ms.metrics[metricName] = make(map[string]*LabelSeries)
}

ms.metrics[metricName][labelName] = &ls
cs.Register(&ls)
}
}

func TestCacheExpiry(t *testing.T) {
ms := NewMetricStash()

cs := NewCacheServer()
ctx := context.Background()

go cs.Run(ctx)

t.Run("single entry", func(t *testing.T) {
ms.addMetric("test-metric", 1, 1, cs)
assert.Equals(t, 1, len(ms.metrics))
for i := 0; i < 2; i++ {
time.Sleep(time.Second * 1)
}

assert.Equals(t, 0, len(ms.metrics))
})

t.Run("different metrics and intervals", func(t *testing.T) {
ms.addMetric("test-metric-1", 1, 1, cs)
ms.addMetric("test-metric-2", 2, 1, cs)

assert.Equals(t, 2, len(ms.metrics))
for i := 0; i < 4; i++ {
time.Sleep(time.Second * 1)
}
assert.Equals(t, 0, len(ms.metrics))
})

t.Run("multilabel metric", func(t *testing.T) {
ms.addMetric("test-metric-1", 1, 10, cs)

assert.Equals(t, 10, len(ms.metrics["test-metric-1"]))
for i := 0; i < 2; i++ {
time.Sleep(time.Second * 1)
}
assert.Equals(t, 0, len(ms.metrics))
})
}
2 changes: 1 addition & 1 deletion pkg/inetserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net"
"time"

"github.com/atyronesmith/sa-benchmark/pkg/collectd"
"github.com/infrawatch/sg2/pkg/collectd"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down
Loading

0 comments on commit 32eea9c

Please sign in to comment.