Skip to content

Commit

Permalink
Logging
Browse files Browse the repository at this point in the history
This patch adds logging functionality via zerolog library.
  • Loading branch information
paramite committed Jul 4, 2019
1 parent a4ae6b8 commit 485a0e5
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 27 deletions.
17 changes: 15 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/go-ini/ini"
"github.com/rs/zerolog"
)

const (
Expand All @@ -33,6 +34,7 @@ type Section struct {
}

type Config struct {
log zerolog.Logger
metadata map[string][]Parameter
Sections map[string]*Section
}
Expand Down Expand Up @@ -168,9 +170,10 @@ func validate(value string, validators []Validator) error {
return nil
}

func NewConfig(metadata map[string][]Parameter) (*Config, error) {
func NewConfig(metadata map[string][]Parameter, logger zerolog.Logger) (*Config, error) {
var conf Config
conf.metadata = metadata
conf.log = logger
// initialize config with default values
conf.Sections = make(map[string]*Section)
for sectionName, sectionMetadata := range conf.metadata {
Expand All @@ -193,7 +196,6 @@ func (conf *Config) Parse(path string) error {
if err != nil {
return err
}
//TODO: log loaded config file
for sectionName, sectionMetadata := range conf.metadata {
if sectionData, err := data.GetSection(sectionName); err == nil {
for _, param := range sectionMetadata {
Expand All @@ -202,6 +204,17 @@ func (conf *Config) Parse(path string) error {
return fmt.Errorf("Failed to validate parameter %s. %s", param.Name, err.Error())
}
conf.Sections[sectionName].Options[param.Name].value = paramData.Value()
conf.log.Debug().
Str("section", sectionName).
Str("option", param.Name).
Str("value", paramData.Value()).
Msg("Using parsed configuration value.")
} else {
conf.log.Debug().
Str("section", sectionName).
Str("option", param.Name).
Str("value", conf.Sections[sectionName].Options[param.Name].value).
Msg("Using default configuration value.")
}
}
}
Expand Down
48 changes: 37 additions & 11 deletions main/main.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,45 @@
package main

import (
"flag"
"fmt"
"os"

"github.com/paramite/collectd-sensubility/config"
"github.com/paramite/collectd-sensubility/sensu"
"github.com/rs/zerolog"
)

const DEFAULT_CONFIG_PATH = "/etc/collectd-sensubility.conf"

func main() {
metadata := config.GetAgentConfigMetadata()
cfg, err := config.NewConfig(metadata)
debug := flag.Bool("debug", false, "enables debugging logs")
verbose := flag.Bool("verbose", false, "enables debugging logs")
logpath := flag.String("log", "/var/log/collectd/sensubility.log", "path to log file")
flag.Parse()

// set logging
logfile, err := os.OpenFile(*logpath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
defer logfile.Close()
if err != nil {
panic(err.Error())
fmt.Printf("Failed to open log file %s.\n", *logpath)
os.Exit(2)
}
zerolog.SetGlobalLevel(zerolog.WarnLevel)
if *verbose {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
} else if *debug {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
log := zerolog.New(logfile).With().Timestamp().Logger()

// spawn entities
metadata := config.GetAgentConfigMetadata()
cfg, err := config.NewConfig(metadata, log.With().Str("component", "config-parser").Logger())
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse config file.")
os.Exit(2)
}
confPath := os.Getenv("COLLECTD_SENSUBILITY_CONFIG")
if confPath == "" {
confPath = DEFAULT_CONFIG_PATH
Expand All @@ -25,24 +48,25 @@ func main() {
if err != nil {
panic(err.Error())
}
sensuConnector, err := sensu.NewConnector(cfg)
sensuConnector, err := sensu.NewConnector(cfg, log.With().Str("component", "sensu-connector").Logger())
if err != nil {
fmt.Println(err.Error())
log.Fatal().Err(err).Msg("Failed to spawn RabbitMQ connector.")
os.Exit(2)
}
defer sensuConnector.Disconnect()

sensuScheduler, err := sensu.NewScheduler(cfg)
sensuScheduler, err := sensu.NewScheduler(cfg, log.With().Str("component", "sensu-scheduler").Logger())
if err != nil {
fmt.Println(err.Error())
log.Fatal().Err(err).Msg("Failed to spawn check scheduler.")
os.Exit(2)
}

sensuExecutor, err := sensu.NewExecutor(cfg)
sensuExecutor, err := sensu.NewExecutor(cfg, log.With().Str("component", "sensu-executor").Logger())
if err != nil {
fmt.Println(err.Error())
log.Fatal().Err(err).Msg("Failed to spawn check executor.")
os.Exit(2)
}
defer sensuExecutor.Clean()

requests := make(chan interface{})
results := make(chan interface{})
Expand All @@ -51,6 +75,7 @@ func main() {

sensuConnector.Start(requests, results)
sensuScheduler.Start(requests)

// spawn worker goroutines
workers := cfg.Sections["sensu"].Options["worker_count"].GetInt()
for i := 0; i < workers; i++ {
Expand All @@ -61,12 +86,13 @@ func main() {
case sensu.CheckRequest:
res, err := sensuExecutor.Execute(req)
if err != nil {
//TODO: log warning
reqstr := fmt.Sprintf("Request{name=%s, command=%s, issued=%d}", req.Name, req.Command, req.Issued)
log.Error().Err(err).Str("request", reqstr).Msg("Failed to execute requested command.")
continue
}
results <- res
default:
//TODO: log warning
log.Error().Err(err).Str("request", fmt.Sprintf("%v", req)).Msg("Failed to execute requested command.")
}
}
}()
Expand Down
17 changes: 10 additions & 7 deletions sensu/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/juju/errors"
"github.com/paramite/collectd-sensubility/config"
"github.com/rs/zerolog"
"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -35,6 +36,7 @@ type Connector struct {
ClientName string
ClientAddress string
KeepaliveInterval int
log zerolog.Logger
queueName string
exchangeName string
inConnection *amqp.Connection
Expand All @@ -45,14 +47,15 @@ type Connector struct {
consumer <-chan amqp.Delivery
}

func NewConnector(cfg *config.Config) (*Connector, error) {
func NewConnector(cfg *config.Config, logger zerolog.Logger) (*Connector, error) {
var connector Connector
connector.Address = cfg.Sections["sensu"].Options["connection"].GetString()
connector.Subscription = cfg.Sections["sensu"].Options["subscriptions"].GetStrings(",")
connector.ClientName = cfg.Sections["sensu"].Options["client_name"].GetString()
connector.ClientAddress = cfg.Sections["sensu"].Options["client_address"].GetString()
connector.KeepaliveInterval = cfg.Sections["sensu"].Options["keepalive_interval"].GetInt()

connector.log = logger
connector.exchangeName = fmt.Sprintf("client:%s", connector.ClientName)
connector.queueName = fmt.Sprintf("%s-collectd-%d", connector.ClientName, time.Now().Unix())

Expand Down Expand Up @@ -165,7 +168,7 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{})
if err == nil {
outchan <- request
} else {
//TODO: log warning
self.log.Warn().Err(err).Bytes("request-body", req.Body).Msg("Failed to unmarshal request body.")
}
}
}()
Expand All @@ -177,7 +180,7 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{})
case Result:
body, err := json.Marshal(result)
if err != nil {
//TODO: log warning
self.log.Error().Err(err).Msg("Failed to marshal execution result.")
continue
}
err = self.outChannel.Publish(
Expand All @@ -194,10 +197,10 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{})
Priority: 0, // 0-9
})
if err != nil {
//TODO: log warning
self.log.Error().Err(err).Msg("Failed to publish execution result.")
}
default:
//TODO: log warning
self.log.Error().Str("type", fmt.Sprintf("%t", res)).Msg("Received execution result with invalid type.")
}
}
}()
Expand All @@ -213,7 +216,7 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{})
Timestamp: time.Now().Unix(),
})
if err != nil {
//TODO: log warning
self.log.Error().Err(err).Msg("Failed to marshal keepalive body.")
continue
}
err = self.outChannel.Publish(
Expand All @@ -230,7 +233,7 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{})
Priority: 0, // 0-9
})
if err != nil {
//TODO: log warning
self.log.Error().Err(err).Msg("Failed to publish keepalive body.")
}
time.Sleep(time.Duration(self.KeepaliveInterval) * time.Second)
}
Expand Down
16 changes: 11 additions & 5 deletions sensu/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/juju/errors"
"github.com/paramite/collectd-sensubility/config"
"github.com/rs/zerolog"
)

const (
Expand Down Expand Up @@ -38,15 +39,18 @@ type Executor struct {
ClientName string
TmpBaseDir string
ShellPath string
log zerolog.Logger
scriptCache map[string]string
}

func NewExecutor(cfg *config.Config) (*Executor, error) {
func NewExecutor(cfg *config.Config, logger zerolog.Logger) (*Executor, error) {
var executor Executor
executor.ClientName = cfg.Sections["sensu"].Options["client_name"].GetString()
executor.TmpBaseDir = cfg.Sections["sensu"].Options["tmp_base_dir"].GetString()
executor.ShellPath = cfg.Sections["sensu"].Options["shell_path"].GetString()

executor.scriptCache = make(map[string]string)
executor.log = logger
if _, err := os.Stat(executor.TmpBaseDir); os.IsNotExist(err) {
err := os.MkdirAll(executor.TmpBaseDir, 0700)
if err != nil {
Expand All @@ -71,6 +75,7 @@ func (self *Executor) Execute(request CheckRequest) (Result, error) {
}
self.scriptCache[request.Command] = scriptFile.Name()
scriptFile.Close()
self.log.Debug().Str("command", request.Command).Str("path", scriptFile.Name()).Msg("Created check script.")
}

//cmdParts := strings.Split(request.Command, " ")
Expand All @@ -97,6 +102,7 @@ func (self *Executor) Execute(request CheckRequest) (Result, error) {
} else if strings.TrimSpace(cmdErr) != "" {
status = CHECK_WARN
}

result := Result{
Client: self.ClientName,
Check: CheckResult{
Expand All @@ -110,11 +116,11 @@ func (self *Executor) Execute(request CheckRequest) (Result, error) {
},
}

self.log.Debug().Str("command", request.Command).Int("status", status).Msg("Executed check script.")
return result, nil
}

func (self *Executor) Clean() error {
//TODO: delete tmp dir

return nil
func (self *Executor) Clean() {
os.Remove(self.TmpBaseDir)
self.log.Debug().Str("dir", self.TmpBaseDir).Msg("Removed temporary directory.")
}
8 changes: 6 additions & 2 deletions sensu/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/juju/errors"
"github.com/paramite/collectd-sensubility/config"
"github.com/rs/zerolog"
)

type Check struct {
Expand All @@ -24,10 +25,12 @@ type Check struct {

type Scheduler struct {
Checks map[string]Check
log zerolog.Logger
}

func NewScheduler(cfg *config.Config) (*Scheduler, error) {
func NewScheduler(cfg *config.Config, logger zerolog.Logger) (*Scheduler, error) {
var scheduler Scheduler
scheduler.log = logger
err := json.Unmarshal(cfg.Sections["sensu"].Options["checks"].GetBytes(), &scheduler.Checks)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -42,7 +45,7 @@ func (self *Scheduler) Start(outchan chan interface{}) {
cases := []reflect.SelectCase{}
for name, data := range self.Checks {
if data.Interval < 1 {
//TODO: log warning
self.log.Warn().Str("check", name).Int("interval", data.Interval).Msg("Configuration contains invalid interval.")
continue
}
//TODO: use rather time.NewTicker() to be able to ticker.Stop() all tickers in Scheduler.Stop()
Expand All @@ -57,6 +60,7 @@ func (self *Scheduler) Start(outchan chan interface{}) {
for {
index, _, _ := reflect.Select(cases)
// request check execution
self.log.Debug().Str("check", checks[index]).Msg("Requesting execution of check.")
outchan <- CheckRequest{
Command: self.Checks[checks[index]].Command,
Name: checks[index],
Expand Down

0 comments on commit 485a0e5

Please sign in to comment.