From 3ff493169e468cdbbcac95a64aba950b88027f7e Mon Sep 17 00:00:00 2001 From: Zexi Li Date: Mon, 18 Nov 2024 17:18:26 +0800 Subject: [PATCH] fix(region): ContainerBatchStopTask and ContainerBatchStartTask (#21617) --- pkg/cloudcommon/db/taskman/subtasks.go | 1 + pkg/cloudcommon/db/taskman/tasks.go | 5 +- pkg/compute/models/containers.go | 29 ++++++++++ .../tasks/container_batch_start_task.go | 49 +++++++++++++++++ .../tasks/container_batch_stop_task.go | 54 +++++++++++++++++++ pkg/compute/tasks/pod_start_task.go | 12 ++--- pkg/compute/tasks/pod_stop_task.go | 24 +++------ 7 files changed, 147 insertions(+), 27 deletions(-) create mode 100644 pkg/compute/tasks/container_batch_start_task.go create mode 100644 pkg/compute/tasks/container_batch_stop_task.go diff --git a/pkg/cloudcommon/db/taskman/subtasks.go b/pkg/cloudcommon/db/taskman/subtasks.go index 586e70784fc..f3e2eecc8a3 100644 --- a/pkg/cloudcommon/db/taskman/subtasks.go +++ b/pkg/cloudcommon/db/taskman/subtasks.go @@ -43,6 +43,7 @@ func init() { "subtask", "subtasks", )} + SubTaskManager.SetVirtualObject(SubTaskManager) } type SSubTask struct { diff --git a/pkg/cloudcommon/db/taskman/tasks.go b/pkg/cloudcommon/db/taskman/tasks.go index 82c95add863..dbe1e39642c 100644 --- a/pkg/cloudcommon/db/taskman/tasks.go +++ b/pkg/cloudcommon/db/taskman/tasks.go @@ -431,8 +431,9 @@ func (manager *STaskManager) NewParallelTask( parentTask := task.GetParentTask() if parentTask != nil { - st := SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id} - err := SubTaskManager.TableSpec().Insert(ctx, &st) + st := &SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id} + st.SetModelManager(SubTaskManager, st) + err := SubTaskManager.TableSpec().Insert(ctx, st) if err != nil { log.Errorf("Subtask insert error %s", err) return nil, err diff --git a/pkg/compute/models/containers.go b/pkg/compute/models/containers.go index 1fe67e9c919..11d58354fa7 100644 --- a/pkg/compute/models/containers.go +++ b/pkg/compute/models/containers.go @@ -352,6 +352,35 @@ func (m *SContainerManager) validateSpecProbeHandler(probe apis.ContainerProbeHa return nil } +func (m *SContainerManager) startBatchTask(ctx context.Context, userCred mcclient.TokenCredential, taskName string, ctrs []SContainer, taskData *jsonutils.JSONDict, parentTaskId string) error { + ctrPtrs := make([]db.IStandaloneModel, len(ctrs)) + for i := range ctrs { + ctrPtrs[i] = &ctrs[i] + } + task, err := taskman.TaskManager.NewParallelTask(ctx, taskName, ctrPtrs, userCred, taskData, parentTaskId, "") + if err != nil { + return errors.Wrapf(err, "NewParallelTask %s", taskName) + } + return task.ScheduleRun(nil) +} + +func (m *SContainerManager) StartBatchStartTask(ctx context.Context, userCred mcclient.TokenCredential, ctrs []SContainer, parentTaskId string) error { + return m.startBatchTask(ctx, userCred, "ContainerBatchStartTask", ctrs, nil, parentTaskId) +} + +func (m *SContainerManager) StartBatchStopTask(ctx context.Context, userCred mcclient.TokenCredential, ctrs []SContainer, timeout int, parentTaskId string) error { + params := make([]api.ContainerStopInput, len(ctrs)) + for i := range ctrs { + params[i] = api.ContainerStopInput{ + Timeout: timeout, + } + } + taskParams := jsonutils.NewDict() + taskParams.Add(jsonutils.Marshal(params), "params") + + return m.startBatchTask(ctx, userCred, "ContainerBatchStopTask", ctrs, taskParams, parentTaskId) +} + func (c *SContainer) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) { c.SVirtualResourceBase.PostCreate(ctx, userCred, ownerId, query, data) if !jsonutils.QueryBoolean(data, "skip_task", false) { diff --git a/pkg/compute/tasks/container_batch_start_task.go b/pkg/compute/tasks/container_batch_start_task.go new file mode 100644 index 00000000000..cebcb57ace5 --- /dev/null +++ b/pkg/compute/tasks/container_batch_start_task.go @@ -0,0 +1,49 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "fmt" + + "yunion.io/x/jsonutils" + + "yunion.io/x/onecloud/pkg/cloudcommon/db" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" + "yunion.io/x/onecloud/pkg/compute/models" +) + +func init() { + taskman.RegisterTask(ContainerBatchStartTask{}) +} + +type ContainerBatchStartTask struct { + taskman.STask +} + +func (t *ContainerBatchStartTask) OnInit(ctx context.Context, objs []db.IStandaloneModel, data jsonutils.JSONObject) { + t.SetStage("OnContainersRestartComplete", nil) + for i := range objs { + ctr := objs[i].(*models.SContainer) + if err := ctr.StartStartTask(ctx, t.GetUserCred(), t.GetId()); err != nil { + t.SetStageFailed(ctx, jsonutils.NewString(fmt.Sprintf("start container %s: %s", ctr.GetName(), err.Error()))) + return + } + } +} + +func (t *ContainerBatchStartTask) OnContainersRestartComplete(ctx context.Context, objs []db.IStandaloneModel, data jsonutils.JSONObject) { + t.SetStageComplete(ctx, nil) +} diff --git a/pkg/compute/tasks/container_batch_stop_task.go b/pkg/compute/tasks/container_batch_stop_task.go new file mode 100644 index 00000000000..e934eb543f7 --- /dev/null +++ b/pkg/compute/tasks/container_batch_stop_task.go @@ -0,0 +1,54 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "fmt" + + "yunion.io/x/jsonutils" + + api "yunion.io/x/onecloud/pkg/apis/compute" + "yunion.io/x/onecloud/pkg/cloudcommon/db" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" + "yunion.io/x/onecloud/pkg/compute/models" +) + +func init() { + taskman.RegisterTask(ContainerBatchStopTask{}) +} + +type ContainerBatchStopTask struct { + taskman.STask +} + +func (t *ContainerBatchStopTask) OnInit(ctx context.Context, objs []db.IStandaloneModel, data jsonutils.JSONObject) { + t.SetStage("OnContainersStopComplete", nil) + + params := make([]api.ContainerStopInput, 0) + t.GetParams().Unmarshal(¶ms, "params") + + for i := range objs { + ctr := objs[i].(*models.SContainer) + if err := ctr.StartStopTask(ctx, t.GetUserCred(), ¶ms[i], t.GetTaskId()); err != nil { + t.SetStageFailed(ctx, jsonutils.NewString(fmt.Sprintf("stop container %s: %s", ctr.GetName(), err.Error()))) + return + } + } +} + +func (t *ContainerBatchStopTask) OnContainersStopComplete(ctx context.Context, objs []db.IStandaloneModel, data jsonutils.JSONObject) { + t.SetStageComplete(ctx, nil) +} diff --git a/pkg/compute/tasks/pod_start_task.go b/pkg/compute/tasks/pod_start_task.go index 517cd6652f2..99dfe24a1b5 100644 --- a/pkg/compute/tasks/pod_start_task.go +++ b/pkg/compute/tasks/pod_start_task.go @@ -47,15 +47,9 @@ func (t *PodStartTask) OnPodStarted(ctx context.Context, pod *models.SGuest, _ j t.OnContainerStartedFailed(ctx, pod, jsonutils.NewString(errors.Wrap(err, "GetContainersByPod").Error())) return } - isAllStarted := true - for i := range ctrs { - if !api.ContainerRunningStatus.Has(ctrs[i].GetStatus()) && ctrs[i].GetStatus() != api.CONTAINER_STATUS_PROBE_FAILED { - isAllStarted = false - ctrs[i].StartStartTask(ctx, t.GetUserCred(), t.GetTaskId()) - } - } - if isAllStarted { - t.OnContainerStarted(ctx, pod, nil) + t.SetStage("OnContainerStarted", nil) + if err := models.GetContainerManager().StartBatchStartTask(ctx, t.GetUserCred(), ctrs, t.GetId()); err != nil { + t.OnContainerStartedFailed(ctx, pod, jsonutils.NewString(err.Error())) return } } diff --git a/pkg/compute/tasks/pod_stop_task.go b/pkg/compute/tasks/pod_stop_task.go index 44c1017de21..15f5169c479 100644 --- a/pkg/compute/tasks/pod_stop_task.go +++ b/pkg/compute/tasks/pod_stop_task.go @@ -18,7 +18,6 @@ import ( "context" "yunion.io/x/jsonutils" - "yunion.io/x/log" "yunion.io/x/pkg/errors" api "yunion.io/x/onecloud/pkg/apis/compute" @@ -36,7 +35,6 @@ func init() { } func (t *PodStopTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject) { - t.SetStage("OnWaitContainerStopped", nil) t.OnWaitContainerStopped(ctx, obj.(*models.SGuest), nil) } @@ -47,21 +45,15 @@ func (t *PodStopTask) OnWaitContainerStopped(ctx context.Context, pod *models.SG t.OnWaitContainerStoppedFailed(ctx, pod, jsonutils.NewString(errors.Wrap(err, "GetContainersByPod").Error())) return } - isAllStopped := true - for i := range ctrs { - curCtr := ctrs[i] - log.Infof("========container status: %s", curCtr.GetStatus()) - if curCtr.GetStatus() != api.CONTAINER_STATUS_EXITED { - isAllStopped = false - curCtr.StartStopTask(ctx, t.GetUserCred(), &api.ContainerStopInput{ - Timeout: 1, - }, t.GetTaskId()) + if len(ctrs) == 0 { + t.OnContainerStopped(ctx, pod, nil) + } else { + t.SetStage("OnContainerStopped", nil) + if err := models.GetContainerManager().StartBatchStopTask(ctx, t.GetUserCred(), ctrs, 1, t.GetId()); err != nil { + t.OnWaitContainerStoppedFailed(ctx, pod, jsonutils.NewString(err.Error())) + return } } - if isAllStopped { - t.OnContainerStopped(ctx, pod) - return - } } func (t *PodStopTask) OnWaitContainerStoppedFailed(ctx context.Context, pod *models.SGuest, data jsonutils.JSONObject) { @@ -69,7 +61,7 @@ func (t *PodStopTask) OnWaitContainerStoppedFailed(ctx context.Context, pod *mod t.SetStageFailed(ctx, data) } -func (t *PodStopTask) OnContainerStopped(ctx context.Context, pod *models.SGuest) { +func (t *PodStopTask) OnContainerStopped(ctx context.Context, pod *models.SGuest, _ jsonutils.JSONObject) { t.SetStage("OnPodStopped", nil) task, err := taskman.TaskManager.NewTask(ctx, "GuestStopTask", pod, t.GetUserCred(), nil, t.GetTaskId(), "", nil) if err != nil {