Skip to content

Commit

Permalink
implemented registration of CRDs
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
dmitsh committed May 14, 2024
1 parent a6f0d0c commit 2f8656e
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 185 deletions.
8 changes: 4 additions & 4 deletions pkg/engine/check_object_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type CheckObjTask struct {
}

// newCheckObjTask initializes and returns CheckObjTask
func newCheckObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjGetter, cfg *config.Task) (*CheckObjTask, error) {
func newCheckObjTask(log logr.Logger, client *dynamic.DynamicClient, accessor ObjInfoAccessor, cfg *config.Task) (*CheckObjTask, error) {
if client == nil {
return nil, fmt.Errorf("%s/%s: DynamicClient is not set", cfg.Type, cfg.ID)
}
Expand All @@ -49,8 +49,8 @@ func newCheckObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjG
taskType: cfg.Type,
taskID: cfg.ID,
},
client: client,
getter: getter,
client: client,
accessor: accessor,
},
}

Expand All @@ -63,7 +63,7 @@ func newCheckObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjG

// Exec implements Runnable interface
func (task *CheckObjTask) Exec(ctx context.Context) error {
info, err := task.getter.GetObjInfo(task.RefTaskID)
info, err := task.accessor.GetObjInfo(task.RefTaskID)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/check_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestNewCheckObjTask(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objMap[tc.refTaskId] = nil
eng.objInfoMap[tc.refTaskId] = nil
}

task, err := eng.GetTask(&config.Task{
Expand All @@ -102,7 +102,7 @@ func TestNewCheckObjTask(t *testing.T) {
require.EqualError(t, err, tc.err)
require.Nil(t, tc.task)
} else {
tc.task.getter = eng
tc.task.accessor = eng
require.NoError(t, err)
require.NotNil(t, tc.task)
require.Equal(t, tc.task, task)
Expand Down
35 changes: 25 additions & 10 deletions pkg/engine/check_pod_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"context"
"fmt"
"regexp"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,8 +42,8 @@ type CheckPodTask struct {
BaseTask
checkPodTaskParams

client *kubernetes.Clientset
getter ObjGetter
client *kubernetes.Clientset
accessor ObjInfoAccessor
}

type checkPodTaskParams struct {
Expand All @@ -53,7 +54,7 @@ type checkPodTaskParams struct {
}

// newCheckPodTask initializes and returns CheckPodTask
func newCheckPodTask(log logr.Logger, client *kubernetes.Clientset, getter ObjGetter, cfg *config.Task) (*CheckPodTask, error) {
func newCheckPodTask(log logr.Logger, client *kubernetes.Clientset, accessor ObjInfoAccessor, cfg *config.Task) (*CheckPodTask, error) {
if client == nil {
return nil, fmt.Errorf("%s/%s: Kubernetes client is not set", cfg.Type, cfg.ID)
}
Expand All @@ -64,8 +65,8 @@ func newCheckPodTask(log logr.Logger, client *kubernetes.Clientset, getter ObjGe
taskType: cfg.Type,
taskID: cfg.ID,
},
client: client,
getter: getter,
client: client,
accessor: accessor,
}

if err := task.validate(cfg.Params); err != nil {
Expand Down Expand Up @@ -98,13 +99,13 @@ func (task *CheckPodTask) validate(params map[string]interface{}) error {

// Exec implements Runnable interface
func (task *CheckPodTask) Exec(ctx context.Context) error {
info, err := task.getter.GetObjInfo(task.RefTaskID)
info, err := task.accessor.GetObjInfo(task.RefTaskID)
if err != nil {
return err
}

if len(info.Pods) == 0 {
return nil
return fmt.Errorf("%s: no pods to check", task.ID())
}

if task.Timeout == 0 {
Expand Down Expand Up @@ -141,9 +142,23 @@ func (task *CheckPodTask) watchPods(ctx context.Context, info *ObjInfo) error {
ctx, cancel := context.WithTimeout(ctx, task.Timeout)
defer cancel()

re := make([]*regexp.Regexp, len(info.Pods))
for i, pod := range info.Pods {
re[i] = regexp.MustCompile(pod)
}

podMap := utils.NewSyncMap()
for _, pod := range info.Pods {
podMap.Set(pod, true)
list, err := task.client.CoreV1().Pods(info.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("%s: failed to list pods: %v", task.ID(), err)
}
for i := range list.Items {
for _, r := range re {
if r.MatchString(list.Items[i].Name) {
task.log.V(4).Info("Added pod", "name", list.Items[i].Name)
podMap.Set(list.Items[i].Name, true)
}
}
}

errs := make(chan error)
Expand All @@ -153,7 +168,7 @@ func (task *CheckPodTask) watchPods(ctx context.Context, info *ObjInfo) error {

informer := factory.Core().V1().Pods().Informer()

_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
task.verifyPod(ctx, podMap, obj, errs)
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/check_pod_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestCheckPodParams(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objMap[tc.refTaskId] = nil
eng.objInfoMap[tc.refTaskId] = nil
}
task, err := eng.GetTask(&config.Task{
ID: taskID,
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/delete_object_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ type DeleteObjTask struct {
deleteObjTaskParams

client *dynamic.DynamicClient
getter ObjGetter
getter ObjInfoAccessor
}

type deleteObjTaskParams struct {
RefTaskID string `yaml:"refTaskId"`
}

// newDeleteObjTask initializes and returns DeleteObjTask
func newDeleteObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjGetter, cfg *config.Task) (*DeleteObjTask, error) {
func newDeleteObjTask(log logr.Logger, client *dynamic.DynamicClient, getter ObjInfoAccessor, cfg *config.Task) (*DeleteObjTask, error) {
if client == nil {
return nil, fmt.Errorf("%s/%s: DynamicClient is not set", cfg.Type, cfg.ID)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/delete_object_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestNewDeleteObjTask(t *testing.T) {
eng, err := New(testLogger, nil, tc.simClients)
require.NoError(t, err)
if len(tc.refTaskId) != 0 {
eng.objMap[tc.refTaskId] = nil
eng.objInfoMap[tc.refTaskId] = nil
}

task, err := eng.GetTask(&config.Task{
Expand Down
93 changes: 67 additions & 26 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -36,30 +37,37 @@ type Engine interface {
}

type Eng struct {
log logr.Logger
mutex sync.Mutex
k8sClient *kubernetes.Clientset
dynamicClient *dynamic.DynamicClient
objMap map[string]*ObjInfo
log logr.Logger
mutex sync.Mutex
k8sClient *kubernetes.Clientset
dynamicClient *dynamic.DynamicClient
discoveryClient *discovery.DiscoveryClient
objTypeMap map[string]*RegisterObjParams
objInfoMap map[string]*ObjInfo
}

func New(log logr.Logger, config *rest.Config, sim ...bool) (*Eng, error) {
eng := &Eng{
log: log,
objMap: make(map[string]*ObjInfo),
log: log,
objTypeMap: make(map[string]*RegisterObjParams),
objInfoMap: make(map[string]*ObjInfo),
}

if len(sim) == 0 { // len(sim) != 0 in unit tests
var err error
if eng.k8sClient, err = kubernetes.NewForConfig(config); err != nil {
return nil, err
}
if eng.dynamicClient, err = dynamic.NewForConfig(config); err != nil {
return nil, err
}
if eng.k8sClient, err = kubernetes.NewForConfig(config); err != nil {
if eng.discoveryClient, err = discovery.NewDiscoveryClientForConfig(config); err != nil {
return nil, err
}
} else if sim[0] {
eng.dynamicClient = &dynamic.DynamicClient{}
eng.k8sClient = &kubernetes.Clientset{}
eng.dynamicClient = &dynamic.DynamicClient{}
eng.discoveryClient = &discovery.DiscoveryClient{}
}

return eng, nil
Expand Down Expand Up @@ -98,56 +106,57 @@ func (eng *Eng) GetTask(cfg *config.Task) (Runnable, error) {

eng.log.Info("Creating task", "name", cfg.Type, "id", cfg.ID)
switch cfg.Type {
case TaskRegisterObj:
return newRegisterObjTask(eng.log, eng.discoveryClient, eng, cfg)

case TaskSubmitObj:
task, err := newSubmitObjTask(eng.log, eng.dynamicClient, eng, cfg)
if err != nil {
return nil, err
}
return task, nil
return newSubmitObjTask(eng.log, eng.dynamicClient, eng, cfg)

case TaskUpdateObj:
task, err := newUpdateObjTask(eng.log, eng.dynamicClient, eng, cfg)
if err != nil {
return nil, err
}
if _, ok := eng.objMap[task.RefTaskID]; !ok {
if _, ok := eng.objInfoMap[task.RefTaskID]; !ok {
return nil, fmt.Errorf("%s: unreferenced task ID %s", task.ID(), task.RefTaskID)
}
return task, nil

case TaskCheckObj:
task, err := newCheckObjTask(eng.log, eng.dynamicClient, eng, cfg)
if err != nil {
return nil, err
}
if _, ok := eng.objMap[task.RefTaskID]; !ok {
if _, ok := eng.objInfoMap[task.RefTaskID]; !ok {
return nil, fmt.Errorf("%s: unreferenced task ID %s", task.ID(), task.RefTaskID)
}
return task, nil

case TaskDeleteObj:
task, err := newDeleteObjTask(eng.log, eng.dynamicClient, eng, cfg)
if err != nil {
return nil, err
}
if _, ok := eng.objMap[task.RefTaskID]; !ok {
if _, ok := eng.objInfoMap[task.RefTaskID]; !ok {
return nil, fmt.Errorf("%s: unreferenced task ID %s", task.ID(), task.RefTaskID)
}
return task, nil

case TaskUpdateNodes:
return newUpdateNodesTask(eng.log, eng.k8sClient, cfg)

case TaskCheckPod:
task, err := newCheckPodTask(eng.log, eng.k8sClient, eng, cfg)
if err != nil {
return nil, err
}
if _, ok := eng.objMap[task.RefTaskID]; !ok {
if _, ok := eng.objInfoMap[task.RefTaskID]; !ok {
return nil, fmt.Errorf("%s: unreferenced task ID %s", task.ID(), task.RefTaskID)
}
return task, nil

case TaskSleep:
task, err := newSleepTask(eng.log, cfg)
if err != nil {
return nil, err
}
return task, nil
return newSleepTask(eng.log, cfg)

case TaskPause:
return newPauseTask(eng.log, cfg), nil
Expand All @@ -157,16 +166,48 @@ func (eng *Eng) GetTask(cfg *config.Task) (Runnable, error) {
}
}

// SetObjType implements ObjSetter interface and maps object type to RegisterObjParams
func (eng *Eng) SetObjType(params *RegisterObjParams) error {
eng.mutex.Lock()
defer eng.mutex.Unlock()

objType := params.ObjectType()
if _, ok := eng.objTypeMap[objType]; ok {
return fmt.Errorf("SetObjType: duplicate object type %s", objType)
}

eng.objTypeMap[objType] = params

eng.log.V(4).Info("Setting object type", "name", objType)

return nil
}

// GetObjType implements ObjGetter interface returns RegisterObjParams for given object type
func (eng *Eng) GetObjType(objType string) (*RegisterObjParams, error) {
eng.mutex.Lock()
defer eng.mutex.Unlock()

info, ok := eng.objTypeMap[objType]
if !ok {
return nil, fmt.Errorf("GetObjType: missing object type %s", objType)
}

eng.log.V(4).Info("Getting object type", "name", objType)

return info, nil
}

// SetObjInfo implements ObjSetter interface and maps task ID to the corresponding ObjInfo
func (eng *Eng) SetObjInfo(taskID string, info *ObjInfo) error {
eng.mutex.Lock()
defer eng.mutex.Unlock()

if _, ok := eng.objMap[taskID]; ok {
if _, ok := eng.objInfoMap[taskID]; ok {
return fmt.Errorf("SetObjInfo: duplicate task ID %s", taskID)
}

eng.objMap[taskID] = info
eng.objInfoMap[taskID] = info

eng.log.V(4).Info("Setting task info", "taskID", taskID)

Expand All @@ -178,7 +219,7 @@ func (eng *Eng) GetObjInfo(taskID string) (*ObjInfo, error) {
eng.mutex.Lock()
defer eng.mutex.Unlock()

info, ok := eng.objMap[taskID]
info, ok := eng.objInfoMap[taskID]
if !ok {
return nil, fmt.Errorf("GetObjInfo: missing task ID %s", taskID)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/object_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type ObjStateTask struct {
BaseTask
StateParams

client *dynamic.DynamicClient
getter ObjGetter
client *dynamic.DynamicClient
accessor ObjInfoAccessor
}

// validate initializes and validates parameters for ObjStateTask
Expand Down
Loading

0 comments on commit 2f8656e

Please sign in to comment.