From 290626c4840eb4eea7246d83b0fc3e196e8523f5 Mon Sep 17 00:00:00 2001 From: Jian Qiu Date: Mon, 23 Dec 2024 22:05:24 +0800 Subject: [PATCH] feature: hashed worker manager (#21886) Co-authored-by: Qiu Jian --- pkg/appsrv/hashedworkermanager.go | 56 +++++++++++++++++++++++ pkg/cloudcommon/db/taskman/coordinator.go | 19 +++++++- pkg/cloudcommon/db/taskman/tasks.go | 4 +- pkg/cloudcommon/db/taskman/worker.go | 36 ++++++++------- 4 files changed, 95 insertions(+), 20 deletions(-) create mode 100644 pkg/appsrv/hashedworkermanager.go diff --git a/pkg/appsrv/hashedworkermanager.go b/pkg/appsrv/hashedworkermanager.go new file mode 100644 index 00000000000..df4cecb3743 --- /dev/null +++ b/pkg/appsrv/hashedworkermanager.go @@ -0,0 +1,56 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package appsrv + +import ( + "fmt" + + "github.com/serialx/hashring" + + "yunion.io/x/pkg/util/stringutils" +) + +type SHashedWorkerManager struct { + workers []*SWorkerManager + workerRing *hashring.HashRing + indexMap map[string]int +} + +func NewHashWorkerManager(name string, workerCount int, subWorkerCnt int, backlog int, dbWorker bool) *SHashedWorkerManager { + workers := make([]*SWorkerManager, workerCount) + syncWorkerIndexes := make([]string, workerCount) + indexMap := map[string]int{} + for i := range workers { + workers[i] = NewWorkerManager( + fmt.Sprintf("%s-%d", name, i+1), + subWorkerCnt, + backlog, + dbWorker, + ) + syncWorkerIndexes[i] = stringutils.UUID4() + indexMap[syncWorkerIndexes[i]] = i + } + workerRing := hashring.New(syncWorkerIndexes) + return &SHashedWorkerManager{ + workers: workers, + workerRing: workerRing, + indexMap: indexMap, + } +} + +func (man *SHashedWorkerManager) GetWorkerManager(key string) *SWorkerManager { + nodeIdxStr, _ := man.workerRing.GetNode(key) + return man.workers[man.indexMap[nodeIdxStr]] +} diff --git a/pkg/cloudcommon/db/taskman/coordinator.go b/pkg/cloudcommon/db/taskman/coordinator.go index fd4cbd3f6b8..3c4094f9ef7 100644 --- a/pkg/cloudcommon/db/taskman/coordinator.go +++ b/pkg/cloudcommon/db/taskman/coordinator.go @@ -17,6 +17,7 @@ package taskman import ( "context" "reflect" + "sync" "yunion.io/x/jsonutils" "yunion.io/x/log" @@ -26,6 +27,14 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/db" ) +var taskWorkerMap map[string]interface{} +var taskWorkManLock *sync.Mutex + +func init() { + taskWorkerMap = make(map[string]interface{}) + taskWorkManLock = &sync.Mutex{} +} + /*type TaskStageFunc func(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject) type BatchTaskStageFunc func(ctx context.Context, objs []db.IStandaloneModel, body jsonutils.JSONObject) */ @@ -53,6 +62,14 @@ func init() { } func RegisterTaskAndWorker(task interface{}, workerMan *appsrv.SWorkerManager) { + registerTaskAndWorkerMan(task, workerMan) +} + +func RegisterTaskAndHashedWorkerManager(task interface{}, workerMan *appsrv.SHashedWorkerManager) { + registerTaskAndWorkerMan(task, workerMan) +} + +func registerTaskAndWorkerMan(task interface{}, workerMan interface{}) { taskName := gotypes.GetInstanceTypeName(task) if _, ok := taskTable[taskName]; ok { log.Fatalf("Task %s already registered!", taskName) @@ -61,7 +78,7 @@ func RegisterTaskAndWorker(task interface{}, workerMan *appsrv.SWorkerManager) { taskTable[taskName] = taskType // log.Infof("Task %s registerd", taskName) if workerMan != nil { - taskWorkerTable[taskName] = workerMan + taskWorkerMap[taskName] = workerMan } } diff --git a/pkg/cloudcommon/db/taskman/tasks.go b/pkg/cloudcommon/db/taskman/tasks.go index dbe1e39642c..80d381f45f9 100644 --- a/pkg/cloudcommon/db/taskman/tasks.go +++ b/pkg/cloudcommon/db/taskman/tasks.go @@ -460,13 +460,13 @@ func (manager *STaskManager) fetchTask(idStr string) *STask { return task } -func (manager *STaskManager) getTaskName(taskId string) string { +/*func (manager *STaskManager) getTaskName(taskId string) string { baseTask := manager.fetchTask(taskId) if baseTask == nil { return "" } return baseTask.TaskName -} +}*/ func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) { baseTask := manager.fetchTask(taskId) diff --git a/pkg/cloudcommon/db/taskman/worker.go b/pkg/cloudcommon/db/taskman/worker.go index d7267d322a4..a3de6951000 100644 --- a/pkg/cloudcommon/db/taskman/worker.go +++ b/pkg/cloudcommon/db/taskman/worker.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "runtime/debug" - "sync" "yunion.io/x/jsonutils" "yunion.io/x/log" @@ -30,18 +29,24 @@ import ( ) var _taskWorkMan *appsrv.SWorkerManager -var taskWorkerTable map[string]*appsrv.SWorkerManager -var taskWorkManLock *sync.Mutex -func init() { - taskWorkerTable = make(map[string]*appsrv.SWorkerManager) - taskWorkManLock = &sync.Mutex{} -} - -func getTaskWorkMan() *appsrv.SWorkerManager { +func getTaskWorkMan(task *STask) *appsrv.SWorkerManager { taskWorkManLock.Lock() defer taskWorkManLock.Unlock() + if worker, ok := taskWorkerMap[task.TaskName]; ok { + switch workerMan := worker.(type) { + case *appsrv.SWorkerManager: + return workerMan + case *appsrv.SHashedWorkerManager: + key := task.ObjId + if key == MULTI_OBJECTS_ID { + key = task.TaskName + } + return workerMan.GetWorkerManager(key) + } + } + if _taskWorkMan != nil { return _taskWorkMan } @@ -72,14 +77,11 @@ func (t *taskTask) Dump() string { } func runTask(taskId string, data jsonutils.JSONObject) error { - taskName := TaskManager.getTaskName(taskId) - if len(taskName) == 0 { + baseTask := TaskManager.fetchTask(taskId) + if baseTask == nil { return fmt.Errorf("no such task??? task_id=%s", taskId) } - worker := getTaskWorkMan() - if workerMan, ok := taskWorkerTable[taskName]; ok { - worker = workerMan - } + worker := getTaskWorkMan(baseTask) task := &taskTask{ taskId: taskId, @@ -88,14 +90,14 @@ func runTask(taskId string, data jsonutils.JSONObject) error { isOk := worker.Run(task, nil, func(err error) { data := jsonutils.NewDict() - data.Add(jsonutils.NewString(taskName), "task_name") + data.Add(jsonutils.NewString(baseTask.TaskName), "task_name") data.Add(jsonutils.NewString(taskId), "task_id") data.Add(jsonutils.NewString(string(debug.Stack())), "stack") data.Add(jsonutils.NewString(err.Error()), "error") notifyclient.SystemExceptionNotify(context.TODO(), api.ActionSystemPanic, api.TOPIC_RESOURCE_TASK, data) }) if !isOk { - return fmt.Errorf("worker %s(%s) not running may be droped", taskName, taskId) + return fmt.Errorf("worker %s(%s) not running may be dropped", baseTask.TaskName, taskId) } return nil }