Skip to content

Commit

Permalink
feature: hashed worker manager (#21886)
Browse files Browse the repository at this point in the history
Co-authored-by: Qiu Jian <[email protected]>
  • Loading branch information
swordqiu and Qiu Jian authored Dec 23, 2024
1 parent 62d63cc commit 290626c
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 20 deletions.
56 changes: 56 additions & 0 deletions pkg/appsrv/hashedworkermanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package appsrv

import (
"fmt"

"github.com/serialx/hashring"

"yunion.io/x/pkg/util/stringutils"
)

type SHashedWorkerManager struct {
workers []*SWorkerManager
workerRing *hashring.HashRing
indexMap map[string]int
}

func NewHashWorkerManager(name string, workerCount int, subWorkerCnt int, backlog int, dbWorker bool) *SHashedWorkerManager {
workers := make([]*SWorkerManager, workerCount)
syncWorkerIndexes := make([]string, workerCount)
indexMap := map[string]int{}
for i := range workers {
workers[i] = NewWorkerManager(
fmt.Sprintf("%s-%d", name, i+1),
subWorkerCnt,
backlog,
dbWorker,
)
syncWorkerIndexes[i] = stringutils.UUID4()
indexMap[syncWorkerIndexes[i]] = i
}
workerRing := hashring.New(syncWorkerIndexes)
return &SHashedWorkerManager{
workers: workers,
workerRing: workerRing,
indexMap: indexMap,
}
}

func (man *SHashedWorkerManager) GetWorkerManager(key string) *SWorkerManager {
nodeIdxStr, _ := man.workerRing.GetNode(key)
return man.workers[man.indexMap[nodeIdxStr]]
}
19 changes: 18 additions & 1 deletion pkg/cloudcommon/db/taskman/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package taskman
import (
"context"
"reflect"
"sync"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
Expand All @@ -26,6 +27,14 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/db"
)

var taskWorkerMap map[string]interface{}
var taskWorkManLock *sync.Mutex

func init() {
taskWorkerMap = make(map[string]interface{})
taskWorkManLock = &sync.Mutex{}
}

/*type TaskStageFunc func(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject)
type BatchTaskStageFunc func(ctx context.Context, objs []db.IStandaloneModel, body jsonutils.JSONObject)
*/
Expand Down Expand Up @@ -53,6 +62,14 @@ func init() {
}

func RegisterTaskAndWorker(task interface{}, workerMan *appsrv.SWorkerManager) {
registerTaskAndWorkerMan(task, workerMan)
}

func RegisterTaskAndHashedWorkerManager(task interface{}, workerMan *appsrv.SHashedWorkerManager) {
registerTaskAndWorkerMan(task, workerMan)
}

func registerTaskAndWorkerMan(task interface{}, workerMan interface{}) {
taskName := gotypes.GetInstanceTypeName(task)
if _, ok := taskTable[taskName]; ok {
log.Fatalf("Task %s already registered!", taskName)
Expand All @@ -61,7 +78,7 @@ func RegisterTaskAndWorker(task interface{}, workerMan *appsrv.SWorkerManager) {
taskTable[taskName] = taskType
// log.Infof("Task %s registerd", taskName)
if workerMan != nil {
taskWorkerTable[taskName] = workerMan
taskWorkerMap[taskName] = workerMan
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudcommon/db/taskman/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,13 +460,13 @@ func (manager *STaskManager) fetchTask(idStr string) *STask {
return task
}

func (manager *STaskManager) getTaskName(taskId string) string {
/*func (manager *STaskManager) getTaskName(taskId string) string {
baseTask := manager.fetchTask(taskId)
if baseTask == nil {
return ""
}
return baseTask.TaskName
}
}*/

func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) {
baseTask := manager.fetchTask(taskId)
Expand Down
36 changes: 19 additions & 17 deletions pkg/cloudcommon/db/taskman/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"runtime/debug"
"sync"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
Expand All @@ -30,18 +29,24 @@ import (
)

var _taskWorkMan *appsrv.SWorkerManager
var taskWorkerTable map[string]*appsrv.SWorkerManager
var taskWorkManLock *sync.Mutex

func init() {
taskWorkerTable = make(map[string]*appsrv.SWorkerManager)
taskWorkManLock = &sync.Mutex{}
}

func getTaskWorkMan() *appsrv.SWorkerManager {
func getTaskWorkMan(task *STask) *appsrv.SWorkerManager {
taskWorkManLock.Lock()
defer taskWorkManLock.Unlock()

if worker, ok := taskWorkerMap[task.TaskName]; ok {
switch workerMan := worker.(type) {
case *appsrv.SWorkerManager:
return workerMan
case *appsrv.SHashedWorkerManager:
key := task.ObjId
if key == MULTI_OBJECTS_ID {
key = task.TaskName
}
return workerMan.GetWorkerManager(key)
}
}

if _taskWorkMan != nil {
return _taskWorkMan
}
Expand Down Expand Up @@ -72,14 +77,11 @@ func (t *taskTask) Dump() string {
}

func runTask(taskId string, data jsonutils.JSONObject) error {
taskName := TaskManager.getTaskName(taskId)
if len(taskName) == 0 {
baseTask := TaskManager.fetchTask(taskId)
if baseTask == nil {
return fmt.Errorf("no such task??? task_id=%s", taskId)
}
worker := getTaskWorkMan()
if workerMan, ok := taskWorkerTable[taskName]; ok {
worker = workerMan
}
worker := getTaskWorkMan(baseTask)

task := &taskTask{
taskId: taskId,
Expand All @@ -88,14 +90,14 @@ func runTask(taskId string, data jsonutils.JSONObject) error {

isOk := worker.Run(task, nil, func(err error) {
data := jsonutils.NewDict()
data.Add(jsonutils.NewString(taskName), "task_name")
data.Add(jsonutils.NewString(baseTask.TaskName), "task_name")
data.Add(jsonutils.NewString(taskId), "task_id")
data.Add(jsonutils.NewString(string(debug.Stack())), "stack")
data.Add(jsonutils.NewString(err.Error()), "error")
notifyclient.SystemExceptionNotify(context.TODO(), api.ActionSystemPanic, api.TOPIC_RESOURCE_TASK, data)
})
if !isOk {
return fmt.Errorf("worker %s(%s) not running may be droped", taskName, taskId)
return fmt.Errorf("worker %s(%s) not running may be dropped", baseTask.TaskName, taskId)
}
return nil
}

0 comments on commit 290626c

Please sign in to comment.