Skip to content
This repository has been archived by the owner on Mar 27, 2024. It is now read-only.

Commit

Permalink
dyncfgv2
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyam8 committed Feb 22, 2024
1 parent c0b768b commit a3b12a4
Show file tree
Hide file tree
Showing 681 changed files with 22,824 additions and 20,279 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ see the appropriate collector readme.
| [docker_engine](https://github.com/netdata/go.d.plugin/tree/master/modules/docker_engine) | Docker Engine |
| [dockerhub](https://github.com/netdata/go.d.plugin/tree/master/modules/dockerhub) | Docker Hub |
| [elasticsearch](https://github.com/netdata/go.d.plugin/tree/master/modules/elasticsearch) | Elasticsearch/OpenSearch |
| [energid](https://github.com/netdata/go.d.plugin/tree/master/modules/energid) | Energi Core |
| [envoy](https://github.com/netdata/go.d.plugin/tree/master/modules/envoy) | Envoy |
| [example](https://github.com/netdata/go.d.plugin/tree/master/modules/example) | - |
| [filecheck](https://github.com/netdata/go.d.plugin/tree/master/modules/filecheck) | Files and Directories |
Expand Down Expand Up @@ -110,9 +109,7 @@ see the appropriate collector readme.
| [redis](https://github.com/netdata/go.d.plugin/tree/master/modules/redis) | Redis |
| [scaleio](https://github.com/netdata/go.d.plugin/tree/master/modules/scaleio) | Dell EMC ScaleIO |
| [SNMP](https://github.com/netdata/go.d.plugin/blob/master/modules/snmp) | SNMP |
| [solr](https://github.com/netdata/go.d.plugin/tree/master/modules/solr) | Solr |
| [squidlog](https://github.com/netdata/go.d.plugin/tree/master/modules/squidlog) | Squid |
| [springboot2](https://github.com/netdata/go.d.plugin/tree/master/modules/springboot2) | Spring Boot2 |
| [supervisord](https://github.com/netdata/go.d.plugin/tree/master/modules/supervisord) | Supervisor |
| [systemdunits](https://github.com/netdata/go.d.plugin/tree/master/modules/systemdunits) | Systemd unit state |
| [tengine](https://github.com/netdata/go.d.plugin/tree/master/modules/tengine) | Tengine |
Expand Down
80 changes: 33 additions & 47 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,17 @@ var isTerminal = isatty.IsTerminal(os.Stdout.Fd())

// Config is an Agent configuration.
type Config struct {
Name string
ConfDir []string
ModulesConfDir []string
ModulesSDConfPath []string
VnodesConfDir []string
StateFile string
LockDir string
ModuleRegistry module.Registry
RunModule string
MinUpdateEvery int
Name string
ConfDir []string
ModulesConfDir []string
ModulesConfSDDir []string
ModulesConfWatchPath []string
VnodesConfDir []string
StateFile string
LockDir string
ModuleRegistry module.Registry
RunModule string
MinUpdateEvery int
}

// Agent represents orchestrator.
Expand All @@ -51,6 +52,7 @@ type Agent struct {
Name string
ConfDir multipath.MultiPath
ModulesConfDir multipath.MultiPath
ModulesConfSDDir multipath.MultiPath
ModulesSDConfPath []string
VnodesConfDir multipath.MultiPath
StateFile string
Expand All @@ -72,7 +74,8 @@ func New(cfg Config) *Agent {
Name: cfg.Name,
ConfDir: cfg.ConfDir,
ModulesConfDir: cfg.ModulesConfDir,
ModulesSDConfPath: cfg.ModulesSDConfPath,
ModulesConfSDDir: cfg.ModulesConfSDDir,
ModulesSDConfPath: cfg.ModulesConfWatchPath,
VnodesConfDir: cfg.VnodesConfDir,
StateFile: cfg.StateFile,
LockDir: cfg.LockDir,
Expand All @@ -96,11 +99,9 @@ func serve(a *Agent) {
var wg sync.WaitGroup

var exit bool
var reload bool

for {
ctx, cancel := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, "reload", reload)

wg.Add(1)
go func() { defer wg.Done(); a.run(ctx) }()
Expand Down Expand Up @@ -136,7 +137,6 @@ func serve(a *Agent) {
os.Exit(0)
}

reload = true
time.Sleep(time.Second)
}
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func (a *Agent) run(ctx context.Context) {

discCfg := a.buildDiscoveryConf(enabledModules)

discoveryManager, err := discovery.NewManager(discCfg)
discMgr, err := discovery.NewManager(discCfg)
if err != nil {
a.Error(err)
if isTerminal {
Expand All @@ -178,64 +178,50 @@ func (a *Agent) run(ctx context.Context) {
return
}

functionsManager := functions.NewManager()

jobsManager := jobmgr.NewManager()
jobsManager.PluginName = a.Name
jobsManager.Out = a.Out
jobsManager.Modules = enabledModules

// TODO: API will be changed in https://github.com/netdata/netdata/pull/16702
//if logger.Level.Enabled(slog.LevelDebug) {
// dyncfgDiscovery, _ := dyncfg.NewDiscovery(dyncfg.Config{
// Plugin: a.Name,
// API: netdataapi.New(a.Out),
// Modules: enabledModules,
// ModuleConfigDefaults: discCfg.Registry,
// Functions: functionsManager,
// })
//
// discoveryManager.Add(dyncfgDiscovery)
//
// jobsManager.Dyncfg = dyncfgDiscovery
//}
fnMgr := functions.NewManager()

jobMgr := jobmgr.New()
jobMgr.PluginName = a.Name
jobMgr.Out = a.Out
jobMgr.Modules = enabledModules
jobMgr.FnReg = fnMgr

if reg := a.setupVnodeRegistry(); reg == nil || reg.Len() == 0 {
vnodes.Disabled = true
} else {
jobsManager.Vnodes = reg
jobMgr.Vnodes = reg
}

if a.LockDir != "" {
jobsManager.FileLock = filelock.New(a.LockDir)
jobMgr.FileLock = filelock.New(a.LockDir)
}

var statusSaveManager *filestatus.Manager
var fsMgr *filestatus.Manager
if !isTerminal && a.StateFile != "" {
statusSaveManager = filestatus.NewManager(a.StateFile)
jobsManager.StatusSaver = statusSaveManager
fsMgr = filestatus.NewManager(a.StateFile)
jobMgr.FileStatus = fsMgr
if store, err := filestatus.LoadStore(a.StateFile); err != nil {
a.Warningf("couldn't load state file: %v", err)
} else {
jobsManager.StatusStore = store
jobMgr.FileStatusStore = store
}
}

in := make(chan []*confgroup.Group)
var wg sync.WaitGroup

wg.Add(1)
go func() { defer wg.Done(); functionsManager.Run(ctx) }()
go func() { defer wg.Done(); fnMgr.Run(ctx) }()

wg.Add(1)
go func() { defer wg.Done(); jobsManager.Run(ctx, in) }()
go func() { defer wg.Done(); jobMgr.Run(ctx, in) }()

wg.Add(1)
go func() { defer wg.Done(); discoveryManager.Run(ctx, in) }()
go func() { defer wg.Done(); discMgr.Run(ctx, in) }()

if statusSaveManager != nil {
if fsMgr != nil {
wg.Add(1)
go func() { defer wg.Done(); statusSaveManager.Run(ctx) }()
go func() { defer wg.Done(); fsMgr.Run(ctx) }()
}

wg.Wait()
Expand Down
24 changes: 12 additions & 12 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ func TestNew(t *testing.T) {

func TestAgent_Run(t *testing.T) {
a := New(Config{
Name: "",
ConfDir: nil,
ModulesConfDir: nil,
ModulesSDConfPath: nil,
StateFile: "",
ModuleRegistry: nil,
RunModule: "",
MinUpdateEvery: 0,
Name: "",
ConfDir: nil,
ModulesConfDir: nil,
ModulesConfWatchPath: nil,
StateFile: "",
ModuleRegistry: nil,
RunModule: "",
MinUpdateEvery: 0,
})

var buf bytes.Buffer
Expand Down Expand Up @@ -74,17 +74,17 @@ func prepareRegistry(mux *sync.Mutex, stats map[string]int, names ...string) mod

func prepareMockModule(name string, mux *sync.Mutex, stats map[string]int) module.Module {
return &module.MockModule{
InitFunc: func() bool {
InitFunc: func() error {
mux.Lock()
defer mux.Unlock()
stats[name+"_init"]++
return true
return nil
},
CheckFunc: func() bool {
CheckFunc: func() error {
mux.Lock()
defer mux.Unlock()
stats[name+"_check"]++
return true
return nil
},
ChartsFunc: func() *module.Charts {
mux.Lock()
Expand Down
93 changes: 0 additions & 93 deletions agent/confgroup/cache.go

This file was deleted.

Loading

0 comments on commit a3b12a4

Please sign in to comment.