Skip to content

Commit

Permalink
refactor package tree
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Dec 3, 2023
1 parent 8b6f980 commit 33c40c6
Show file tree
Hide file tree
Showing 28 changed files with 101 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cmd/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"fmt"
"log"

"github.com/ethpandaops/minccino/pkg/coordinator/task/tasks"
"github.com/ethpandaops/minccino/pkg/coordinator/tasks"
"github.com/spf13/cobra"
"gopkg.in/yaml.v2"
)
Expand Down
24 changes: 22 additions & 2 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethpandaops/minccino/pkg/coordinator/buildinfo"
"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/test"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/ethpandaops/minccino/pkg/coordinator/web/server"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand All @@ -18,7 +19,9 @@ type Coordinator struct {
// Config is the coordinator configuration.
Config *Config
log logrus.FieldLogger
clientPool *clients.ClientPool
webserver *server.WebServer
tests []types.Test
metricsPort int
lameDuckSeconds int
}
Expand All @@ -27,6 +30,7 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, lameDuc
return &Coordinator{
log: log,
Config: config,
tests: []types.Test{},
metricsPort: metricsPort,
lameDuckSeconds: lameDuckSeconds,
}
Expand All @@ -45,6 +49,7 @@ func (c *Coordinator) Run(ctx context.Context) error {
if err != nil {
return err
}
c.clientPool = clientPool
for idx := range c.Config.Endpoints {
err = clientPool.AddClient(&c.Config.Endpoints[idx])
if err != nil {
Expand All @@ -59,15 +64,16 @@ func (c *Coordinator) Run(ctx context.Context) error {
return err
}
if c.Config.Web.Frontend != nil {
c.webserver.StartFrontend(c.Config.Web.Frontend, clientPool)
c.webserver.StartFrontend(c.Config.Web.Frontend, c)
}
}

// run test
testToRun, err := test.CreateRunnable(ctx, c.log, clientPool, c.Config.Test)
testToRun, err := test.CreateRunnable(ctx, c, c.Config.Test)
if err != nil {
return err
}
c.tests = append(c.tests, testToRun)

if err := testToRun.Validate(); err != nil {
return err
Expand Down Expand Up @@ -97,6 +103,20 @@ func (c *Coordinator) Run(ctx context.Context) error {
return nil
}

func (c *Coordinator) Logger() logrus.FieldLogger {
return c.log
}

func (c *Coordinator) ClientPool() *clients.ClientPool {
return c.clientPool
}

func (c *Coordinator) GetTests() []types.Test {
tests := make([]types.Test, len(c.tests))
copy(tests, c.tests)
return tests
}

func (c *Coordinator) startMetrics() error {
c.log.
Info(fmt.Sprintf("Starting metrics server on :%v", c.metricsPort))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/clients/consensus"
"github.com/ethpandaops/minccino/pkg/coordinator/clients/execution"
"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -96,7 +96,7 @@ func (t *Task) processCheck(ctx context.Context) error {
expectedResult := !t.config.ExpectUnhealthy
allResultsPass := true
failedClients := []string{}
for _, client := range t.ctx.Scheduler.GetClientPool().GetClientsByNamePatterns(t.config.ClientNamePatterns) {
for _, client := range t.ctx.Scheduler.GetCoordinator().ClientPool().GetClientsByNamePatterns(t.config.ClientNamePatterns) {
checkResult := t.processClientCheck(ctx, client)
if checkResult != expectedResult {
allResultsPass = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -95,7 +95,7 @@ func (t *Task) Execute(ctx context.Context) error {
func (t *Task) processCheck(ctx context.Context) error {
allResultsPass := true
failedClients := []string{}
for _, client := range t.ctx.Scheduler.GetClientPool().GetClientsByNamePatterns(t.config.ClientNamePatterns) {
for _, client := range t.ctx.Scheduler.GetCoordinator().ClientPool().GetClientsByNamePatterns(t.config.ClientNamePatterns) {
checkResult := t.processClientCheck(ctx, client)
if !checkResult {
allResultsPass = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -95,7 +95,7 @@ func (t *Task) Execute(ctx context.Context) error {
func (t *Task) processCheck(ctx context.Context) error {
allResultsPass := true
failedClients := []string{}
for _, client := range t.ctx.Scheduler.GetClientPool().GetClientsByNamePatterns(t.config.ClientNamePatterns) {
for _, client := range t.ctx.Scheduler.GetCoordinator().ClientPool().GetClientsByNamePatterns(t.config.ClientNamePatterns) {
checkResult := t.processClientCheck(ctx, client)
if !checkResult {
allResultsPass = false
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os/exec"
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync"
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/imdario/mergo"
"github.com/sirupsen/logrus"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package tasks

import (
"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/types"

checkclientsarehealthy "github.com/ethpandaops/minccino/pkg/coordinator/task/tasks/check_clients_are_healthy"
checkconsensussyncstatus "github.com/ethpandaops/minccino/pkg/coordinator/task/tasks/check_consensus_sync_status"
checkexecutionsyncstatus "github.com/ethpandaops/minccino/pkg/coordinator/task/tasks/check_execution_sync_status"
runcommand "github.com/ethpandaops/minccino/pkg/coordinator/task/tasks/run_command"
runtasks "github.com/ethpandaops/minccino/pkg/coordinator/task/tasks/run_tasks"
runtasksconcurrent "github.com/ethpandaops/minccino/pkg/coordinator/task/tasks/run_tasks_concurrent"
sleep "github.com/ethpandaops/minccino/pkg/coordinator/task/tasks/sleep"
checkclientsarehealthy "github.com/ethpandaops/minccino/pkg/coordinator/tasks/check_clients_are_healthy"
checkconsensussyncstatus "github.com/ethpandaops/minccino/pkg/coordinator/tasks/check_consensus_sync_status"
checkexecutionsyncstatus "github.com/ethpandaops/minccino/pkg/coordinator/tasks/check_execution_sync_status"
runcommand "github.com/ethpandaops/minccino/pkg/coordinator/tasks/run_command"
runtasks "github.com/ethpandaops/minccino/pkg/coordinator/tasks/run_tasks"
runtasksconcurrent "github.com/ethpandaops/minccino/pkg/coordinator/tasks/run_tasks_concurrent"
sleep "github.com/ethpandaops/minccino/pkg/coordinator/tasks/sleep"
)

var AvailableTaskDescriptors = []*types.TaskDescriptor{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package scheduler
package test

import (
"context"
"fmt"
"sync"
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/helper"
"github.com/ethpandaops/minccino/pkg/coordinator/task/tasks"
"github.com/ethpandaops/minccino/pkg/coordinator/task/types"
"github.com/ethpandaops/minccino/pkg/coordinator/tasks"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/sirupsen/logrus"
)

type TaskScheduler struct {
coordinator types.Coordinator
logger logrus.FieldLogger
taskCount uint64
allTasks []types.Task
Expand All @@ -22,8 +22,6 @@ type TaskScheduler struct {
rootCleanupTasks []types.Task
taskStateMutex sync.RWMutex
taskStateMap map[types.Task]*taskExecutionState

clientPool *clients.ClientPool
}

type taskExecutionState struct {
Expand All @@ -37,13 +35,13 @@ type taskExecutionState struct {
resultMutex sync.RWMutex
}

func NewTaskScheduler(logger logrus.FieldLogger, clientPool *clients.ClientPool) *TaskScheduler {
func NewTaskScheduler(logger logrus.FieldLogger, coordinator types.Coordinator) *TaskScheduler {
return &TaskScheduler{
logger: logger,
rootTasks: make([]types.Task, 0),
allTasks: make([]types.Task, 0),
taskStateMap: make(map[types.Task]*taskExecutionState),
clientPool: clientPool,
coordinator: coordinator,
}
}

Expand All @@ -55,8 +53,8 @@ func (ts *TaskScheduler) GetTaskCount() int {
return len(ts.allTasks)
}

func (ts *TaskScheduler) GetClientPool() *clients.ClientPool {
return ts.clientPool
func (ts *TaskScheduler) GetCoordinator() types.Coordinator {
return ts.coordinator
}

func (ts *TaskScheduler) ParseTaskOptions(rawtask *helper.RawMessage) (*types.TaskOptions, error) {
Expand Down
36 changes: 14 additions & 22 deletions pkg/coordinator/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,67 +5,59 @@ import (
"fmt"
"time"

"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/task/scheduler"
"github.com/ethpandaops/minccino/pkg/coordinator/types"
"github.com/sirupsen/logrus"
)

type Runnable interface {
Validate() error
Run(ctx context.Context) error
Name() string
Percent() float64
}

type Test struct {
name string
taskScheduler *scheduler.TaskScheduler
taskScheduler *TaskScheduler

log logrus.FieldLogger
config Config

metrics Metrics
}

func CreateRunnable(ctx context.Context, log logrus.FieldLogger, clientPool *clients.ClientPool, config Config) (Runnable, error) {
runnable := &Test{
func CreateRunnable(ctx context.Context, coordinator types.Coordinator, config Config) (types.Test, error) {
test := &Test{
name: config.Name,
log: log.WithField("component", "test").WithField("test", config.Name),
log: coordinator.Logger().WithField("component", "test").WithField("test", config.Name),
config: config,
metrics: NewMetrics("sync_test_coordinator", config.Name),
}

// parse tasks
runnable.taskScheduler = scheduler.NewTaskScheduler(runnable.log, clientPool)
test.taskScheduler = NewTaskScheduler(test.log, coordinator)
for _, rawtask := range config.Tasks {
taskOptions, err := runnable.taskScheduler.ParseTaskOptions(&rawtask)
taskOptions, err := test.taskScheduler.ParseTaskOptions(&rawtask)
if err != nil {
return nil, err
}
_, err = runnable.taskScheduler.AddRootTask(taskOptions)
_, err = test.taskScheduler.AddRootTask(taskOptions)
if err != nil {
return nil, err
}
}

for _, rawtask := range config.CleanupTasks {
taskOptions, err := runnable.taskScheduler.ParseTaskOptions(&rawtask)
taskOptions, err := test.taskScheduler.ParseTaskOptions(&rawtask)
if err != nil {
return nil, err
}
_, err = runnable.taskScheduler.AddCleanupTask(taskOptions)
_, err = test.taskScheduler.AddCleanupTask(taskOptions)
if err != nil {
return nil, err
}
}

// setup metrics
runnable.metrics.Register()
test.metrics.Register()

runnable.metrics.SetTestInfo(config.Name)
runnable.metrics.SetTotalTasks(float64(len(config.Tasks)))
test.metrics.SetTestInfo(config.Name)
test.metrics.SetTotalTasks(float64(len(config.Tasks)))

return runnable, nil
return test, nil
}

func (t *Test) Name() string {
Expand Down
12 changes: 12 additions & 0 deletions pkg/coordinator/types/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package types

import (
"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/sirupsen/logrus"
)

type Coordinator interface {
Logger() logrus.FieldLogger
ClientPool() *clients.ClientPool
GetTests() []Test
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package types
import (
"context"

"github.com/ethpandaops/minccino/pkg/coordinator/clients"
"github.com/ethpandaops/minccino/pkg/coordinator/helper"
"github.com/sirupsen/logrus"
)

type TaskScheduler interface {
GetLogger() logrus.FieldLogger
GetClientPool() *clients.ClientPool
GetCoordinator() Coordinator
ParseTaskOptions(rawtask *helper.RawMessage) (*TaskOptions, error)
ExecuteTask(ctx context.Context, task Task, taskWatchFn func(task Task, ctx context.Context, cancelFn context.CancelFunc)) error
WatchTaskPass(task Task, ctx context.Context, cancelFn context.CancelFunc)
Expand Down
10 changes: 10 additions & 0 deletions pkg/coordinator/types/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package types

import "context"

type Test interface {
Validate() error
Run(ctx context.Context) error
Name() string
Percent() float64
}
Loading

0 comments on commit 33c40c6

Please sign in to comment.