Skip to content

Commit

Permalink
fix(region): ContainerBatchStopTask and ContainerBatchStartTask (#21617)
Browse files Browse the repository at this point in the history
  • Loading branch information
zexi authored Nov 18, 2024
1 parent 1196406 commit 3ff4931
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 27 deletions.
1 change: 1 addition & 0 deletions pkg/cloudcommon/db/taskman/subtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func init() {
"subtask",
"subtasks",
)}
SubTaskManager.SetVirtualObject(SubTaskManager)
}

type SSubTask struct {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cloudcommon/db/taskman/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,9 @@ func (manager *STaskManager) NewParallelTask(

parentTask := task.GetParentTask()
if parentTask != nil {
st := SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
err := SubTaskManager.TableSpec().Insert(ctx, &st)
st := &SSubTask{TaskId: parentTask.Id, Stage: parentTask.Stage, SubtaskId: task.Id}
st.SetModelManager(SubTaskManager, st)
err := SubTaskManager.TableSpec().Insert(ctx, st)
if err != nil {
log.Errorf("Subtask insert error %s", err)
return nil, err
Expand Down
29 changes: 29 additions & 0 deletions pkg/compute/models/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,35 @@ func (m *SContainerManager) validateSpecProbeHandler(probe apis.ContainerProbeHa
return nil
}

func (m *SContainerManager) startBatchTask(ctx context.Context, userCred mcclient.TokenCredential, taskName string, ctrs []SContainer, taskData *jsonutils.JSONDict, parentTaskId string) error {
ctrPtrs := make([]db.IStandaloneModel, len(ctrs))
for i := range ctrs {
ctrPtrs[i] = &ctrs[i]
}
task, err := taskman.TaskManager.NewParallelTask(ctx, taskName, ctrPtrs, userCred, taskData, parentTaskId, "")
if err != nil {
return errors.Wrapf(err, "NewParallelTask %s", taskName)
}
return task.ScheduleRun(nil)
}

func (m *SContainerManager) StartBatchStartTask(ctx context.Context, userCred mcclient.TokenCredential, ctrs []SContainer, parentTaskId string) error {
return m.startBatchTask(ctx, userCred, "ContainerBatchStartTask", ctrs, nil, parentTaskId)
}

func (m *SContainerManager) StartBatchStopTask(ctx context.Context, userCred mcclient.TokenCredential, ctrs []SContainer, timeout int, parentTaskId string) error {
params := make([]api.ContainerStopInput, len(ctrs))
for i := range ctrs {
params[i] = api.ContainerStopInput{
Timeout: timeout,
}
}
taskParams := jsonutils.NewDict()
taskParams.Add(jsonutils.Marshal(params), "params")

return m.startBatchTask(ctx, userCred, "ContainerBatchStopTask", ctrs, taskParams, parentTaskId)
}

func (c *SContainer) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) {
c.SVirtualResourceBase.PostCreate(ctx, userCred, ownerId, query, data)
if !jsonutils.QueryBoolean(data, "skip_task", false) {
Expand Down
49 changes: 49 additions & 0 deletions pkg/compute/tasks/container_batch_start_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tasks

import (
"context"
"fmt"

"yunion.io/x/jsonutils"

"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/compute/models"
)

func init() {
taskman.RegisterTask(ContainerBatchStartTask{})
}

type ContainerBatchStartTask struct {
taskman.STask
}

func (t *ContainerBatchStartTask) OnInit(ctx context.Context, objs []db.IStandaloneModel, data jsonutils.JSONObject) {
t.SetStage("OnContainersRestartComplete", nil)
for i := range objs {
ctr := objs[i].(*models.SContainer)
if err := ctr.StartStartTask(ctx, t.GetUserCred(), t.GetId()); err != nil {
t.SetStageFailed(ctx, jsonutils.NewString(fmt.Sprintf("start container %s: %s", ctr.GetName(), err.Error())))
return
}
}
}

func (t *ContainerBatchStartTask) OnContainersRestartComplete(ctx context.Context, objs []db.IStandaloneModel, data jsonutils.JSONObject) {
t.SetStageComplete(ctx, nil)
}
54 changes: 54 additions & 0 deletions pkg/compute/tasks/container_batch_stop_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tasks

import (
"context"
"fmt"

"yunion.io/x/jsonutils"

api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/compute/models"
)

func init() {
taskman.RegisterTask(ContainerBatchStopTask{})
}

type ContainerBatchStopTask struct {
taskman.STask
}

func (t *ContainerBatchStopTask) OnInit(ctx context.Context, objs []db.IStandaloneModel, data jsonutils.JSONObject) {
t.SetStage("OnContainersStopComplete", nil)

params := make([]api.ContainerStopInput, 0)
t.GetParams().Unmarshal(&params, "params")

for i := range objs {
ctr := objs[i].(*models.SContainer)
if err := ctr.StartStopTask(ctx, t.GetUserCred(), &params[i], t.GetTaskId()); err != nil {
t.SetStageFailed(ctx, jsonutils.NewString(fmt.Sprintf("stop container %s: %s", ctr.GetName(), err.Error())))
return
}
}
}

func (t *ContainerBatchStopTask) OnContainersStopComplete(ctx context.Context, objs []db.IStandaloneModel, data jsonutils.JSONObject) {
t.SetStageComplete(ctx, nil)
}
12 changes: 3 additions & 9 deletions pkg/compute/tasks/pod_start_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,9 @@ func (t *PodStartTask) OnPodStarted(ctx context.Context, pod *models.SGuest, _ j
t.OnContainerStartedFailed(ctx, pod, jsonutils.NewString(errors.Wrap(err, "GetContainersByPod").Error()))
return
}
isAllStarted := true
for i := range ctrs {
if !api.ContainerRunningStatus.Has(ctrs[i].GetStatus()) && ctrs[i].GetStatus() != api.CONTAINER_STATUS_PROBE_FAILED {
isAllStarted = false
ctrs[i].StartStartTask(ctx, t.GetUserCred(), t.GetTaskId())
}
}
if isAllStarted {
t.OnContainerStarted(ctx, pod, nil)
t.SetStage("OnContainerStarted", nil)
if err := models.GetContainerManager().StartBatchStartTask(ctx, t.GetUserCred(), ctrs, t.GetId()); err != nil {
t.OnContainerStartedFailed(ctx, pod, jsonutils.NewString(err.Error()))
return
}
}
Expand Down
24 changes: 8 additions & 16 deletions pkg/compute/tasks/pod_stop_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"

api "yunion.io/x/onecloud/pkg/apis/compute"
Expand All @@ -36,7 +35,6 @@ func init() {
}

func (t *PodStopTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject) {
t.SetStage("OnWaitContainerStopped", nil)
t.OnWaitContainerStopped(ctx, obj.(*models.SGuest), nil)
}

Expand All @@ -47,29 +45,23 @@ func (t *PodStopTask) OnWaitContainerStopped(ctx context.Context, pod *models.SG
t.OnWaitContainerStoppedFailed(ctx, pod, jsonutils.NewString(errors.Wrap(err, "GetContainersByPod").Error()))
return
}
isAllStopped := true
for i := range ctrs {
curCtr := ctrs[i]
log.Infof("========container status: %s", curCtr.GetStatus())
if curCtr.GetStatus() != api.CONTAINER_STATUS_EXITED {
isAllStopped = false
curCtr.StartStopTask(ctx, t.GetUserCred(), &api.ContainerStopInput{
Timeout: 1,
}, t.GetTaskId())
if len(ctrs) == 0 {
t.OnContainerStopped(ctx, pod, nil)
} else {
t.SetStage("OnContainerStopped", nil)
if err := models.GetContainerManager().StartBatchStopTask(ctx, t.GetUserCred(), ctrs, 1, t.GetId()); err != nil {
t.OnWaitContainerStoppedFailed(ctx, pod, jsonutils.NewString(err.Error()))
return
}
}
if isAllStopped {
t.OnContainerStopped(ctx, pod)
return
}
}

func (t *PodStopTask) OnWaitContainerStoppedFailed(ctx context.Context, pod *models.SGuest, data jsonutils.JSONObject) {
pod.SetStatus(ctx, t.GetUserCred(), api.POD_STATUS_STOP_CONTAINER_FAILED, data.String())
t.SetStageFailed(ctx, data)
}

func (t *PodStopTask) OnContainerStopped(ctx context.Context, pod *models.SGuest) {
func (t *PodStopTask) OnContainerStopped(ctx context.Context, pod *models.SGuest, _ jsonutils.JSONObject) {
t.SetStage("OnPodStopped", nil)
task, err := taskman.TaskManager.NewTask(ctx, "GuestStopTask", pod, t.GetUserCred(), nil, t.GetTaskId(), "", nil)
if err != nil {
Expand Down

0 comments on commit 3ff4931

Please sign in to comment.