Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #21894: Fix/host load servers #21896

Open
wants to merge 2 commits into
base: release/3.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/compute/models/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2889,7 +2889,7 @@ func (self *SDisk) CleanOverduedSnapshots(ctx context.Context, userCred mcclient
}
snapshot.SetModelManager(SnapshotManager, snapshot)
if snapshot.ExpiredAt.Before(now) {
err = snapshot.StartSnapshotDeleteTask(ctx, userCred, false, self.Id)
err = snapshot.StartSnapshotDeleteTask(ctx, userCred, false, self.Id, 0, 0)
if err != nil {
return err
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/compute/models/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,9 +706,14 @@ func (self *SSnapshotManager) CreateSnapshot(ctx context.Context, owner mcclient
return snapshot, nil
}

func (self *SSnapshot) StartSnapshotDeleteTask(ctx context.Context, userCred mcclient.TokenCredential, reloadDisk bool, parentTaskId string) error {
func (self *SSnapshot) StartSnapshotDeleteTask(ctx context.Context, userCred mcclient.TokenCredential, reloadDisk bool, parentTaskId string, deleteSnapshotTotalCnt int, deletedSnapshotCnt int) error {
params := jsonutils.NewDict()
params.Set("reload_disk", jsonutils.NewBool(reloadDisk))
if deleteSnapshotTotalCnt <= 0 {
deleteSnapshotTotalCnt = 1
}
params.Set("snapshot_total_count", jsonutils.NewInt(int64(deleteSnapshotTotalCnt)))
params.Set("deleted_snapshot_count", jsonutils.NewInt(int64(deletedSnapshotCnt)))
self.SetStatus(ctx, userCred, api.SNAPSHOT_DELETING, "")
task, err := taskman.TaskManager.NewTask(ctx, "SnapshotDeleteTask", self, userCred, params, parentTaskId, "", nil)
if err != nil {
Expand Down Expand Up @@ -772,7 +777,7 @@ func (self *SSnapshot) GetRegionDriver() IRegionDriver {
}

func (self *SSnapshot) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
return self.StartSnapshotDeleteTask(ctx, userCred, false, "")
return self.StartSnapshotDeleteTask(ctx, userCred, false, "", 0, 0)
}

func (self *SSnapshot) PerformDeleted(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
Expand All @@ -783,7 +788,7 @@ func (self *SSnapshot) PerformDeleted(ctx context.Context, userCred mcclient.Tok
if err != nil {
return nil, err
}
err = self.StartSnapshotDeleteTask(ctx, userCred, true, "")
err = self.StartSnapshotDeleteTask(ctx, userCred, true, "", 0, 0)
return nil, err
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/compute/regiondrivers/kvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,15 @@ func (self *SKVMRegionDriver) RequestDeleteInstanceSnapshot(ctx context.Context,
}

params := jsonutils.NewDict()
taskParams := task.GetParams()
var deleteSnapshotTotalCnt int64 = 1
if taskParams.Contains("snapshot_total_count") {
deleteSnapshotTotalCnt, _ = taskParams.Int("snapshot_total_count")
}
deletedSnapshotCnt := deleteSnapshotTotalCnt - int64(len(snapshots))
params.Set("del_snapshot_id", jsonutils.NewString(snapshots[0].Id))
task.SetStage("OnKvmSnapshotDelete", params)
err = snapshots[0].StartSnapshotDeleteTask(ctx, task.GetUserCred(), false, task.GetTaskId())
err = snapshots[0].StartSnapshotDeleteTask(ctx, task.GetUserCred(), false, task.GetTaskId(), int(deleteSnapshotTotalCnt), int(deletedSnapshotCnt))
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/compute/storagedrivers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ func (self *SBaseStorageDriver) RequestDeleteSnapshot(ctx context.Context, snaps
} else {
params.Set("auto_deleted", jsonutils.JSONTrue)
}
taskParams := task.GetParams()
if taskParams.Contains("snapshot_total_count") {
totalCnt, _ := taskParams.Get("snapshot_total_count")
params.Set("snapshot_total_count", totalCnt)
deletedCnt, _ := taskParams.Get("deleted_snapshot_count")
params.Set("deleted_snapshot_count", deletedCnt)
}

guest.SetStatus(ctx, task.GetUserCred(), api.VM_SNAPSHOT_DELETE, "Start Delete Snapshot")
return drv.RequestDeleteSnapshot(ctx, guest, task, params)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/tasks/disk_backup_create_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (self *DiskBackupCreateTask) OnSave(ctx context.Context, backup *models.SDi
return nil
})
snapshot := snapshotModel.(*models.SSnapshot)
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId())
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId(), 0, 0)
if err != nil {
self.taskFailed(ctx, backup, jsonutils.NewString(err.Error()), api.BACKUP_STATUS_CLEANUP_SNAPSHOT_FAILED)
return
Expand All @@ -151,7 +151,7 @@ func (self *DiskBackupCreateTask) OnSaveFailed(ctx context.Context, backup *mode
}
snapshot := snapshotModel.(*models.SSnapshot)
self.taskFailed(ctx, backup, data, api.BACKUP_STATUS_SAVE_FAILED)
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId())
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId(), 0, 0)
if err != nil {
log.Errorf("unable to cleanup snapshot: %s", err.Error())
self.taskFailed(ctx, backup, data, api.BACKUP_STATUS_SAVE_FAILED)
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/tasks/disk_clean_overdued_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (self *SnapshotCleanupTask) StartSnapshotsDelete(ctx context.Context, snaps
}
self.SetStage("OnDeleteSnapshot", nil)
snapshot.SetModelManager(models.SnapshotManager, &snapshot)
err := snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId())
err := snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId(), 0, 0)
if err != nil {
self.OnDeleteSnapshotFailed(ctx, self.GetObject(), nil)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/compute/tasks/instance_snapshot_delete_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ func (self *InstanceSnapshotDeleteTask) OnInit(
ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {

isp := obj.(*models.SInstanceSnapshot)
sps, err := isp.GetSnapshots()
if err != nil {
self.taskFail(ctx, isp, jsonutils.NewString(err.Error()))
return
}
snapshotCnt := len(sps)
self.Params.Set("snapshot_total_count", jsonutils.NewInt(int64(snapshotCnt)))
self.SetStage("OnInstanceSnapshotDelete", nil)
if err := isp.GetRegionDriver().RequestDeleteInstanceSnapshot(ctx, isp, self); err != nil {
self.taskFail(ctx, isp, jsonutils.NewString(err.Error()))
Expand All @@ -84,6 +91,7 @@ func (self *InstanceSnapshotDeleteTask) OnKvmSnapshotDelete(
self.taskFail(ctx, isp, jsonutils.NewString(err.Error()))
return
}

if err := isp.GetRegionDriver().RequestDeleteInstanceSnapshot(ctx, isp, self); err != nil {
self.taskFail(ctx, isp, jsonutils.NewString(err.Error()))
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/tasks/snapshot_delete_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (self *GuestDeleteSnapshotsTask) StartDeleteDiskSnapshots(
self.Params.Set("snapshots", jsonutils.Marshal(snapshots))
self.SetStage("OnSnapshotDelete", nil)
snapshot.SetModelManager(models.SnapshotManager, &snapshot)
snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.Id)
snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.Id, 0, 0)
return
}
self.SetStageComplete(ctx, nil)
Expand Down Expand Up @@ -304,7 +304,7 @@ func (self *DiskDeleteSnapshotsTask) StartDeleteDiskSnapshots(
self.Params.Set("snapshots", jsonutils.Marshal(snapshots))
self.SetStage("OnSnapshotDelete", nil)
snapshot.SetModelManager(models.SnapshotManager, &snapshot)
snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.Id)
snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.Id, 0, 0)
return
}
self.SetStageComplete(ctx, nil)
Expand Down
14 changes: 11 additions & 3 deletions pkg/hostman/guestman/guesthandlers/guesthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,12 @@ func guestDeleteSnapshot(ctx context.Context, userCred mcclient.TokenCredential,
if err != nil {
return nil, httperrors.NewMissingParameterError("disk_id")
}
var totalCnt, deletedCnt int64 = 1, 0
if body.Contains("snapshot_total_count") {
totalCnt, _ = body.Int("snapshot_total_count")
deletedCnt, _ = body.Int("deleted_snapshot_count")
}

guest, ok := guestman.GetGuestManager().GetKVMServer(sid)
if !ok {
return nil, httperrors.NewNotFoundError("guest %s not found", sid)
Expand All @@ -717,9 +723,11 @@ func guestDeleteSnapshot(ctx context.Context, userCred mcclient.TokenCredential,
}

params := &guestman.SDeleteDiskSnapshot{
Sid: sid,
DeleteSnapshot: deleteSnapshot,
Disk: disk,
Sid: sid,
DeleteSnapshot: deleteSnapshot,
Disk: disk,
TotalDeleteSnapshotCount: int(totalCnt),
DeletedSnapshotCount: int(deletedCnt),
}

if body.Contains("encrypt_info") {
Expand Down
3 changes: 3 additions & 0 deletions pkg/hostman/guestman/guesthelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ type SDeleteDiskSnapshot struct {
ConvertSnapshot string
BlockStream bool
EncryptInfo apis.SEncryptInfo

TotalDeleteSnapshotCount int
DeletedSnapshotCount int
}

type SLibvirtServer struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/hostman/guestman/guestman.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,8 @@ func (m *SGuestManager) DeleteSnapshot(ctx context.Context, params interface{})
if len(delParams.ConvertSnapshot) > 0 || delParams.BlockStream {
guest, _ := m.GetKVMServer(delParams.Sid)
return guest.ExecDeleteSnapshotTask(ctx, delParams.Disk, delParams.DeleteSnapshot,
delParams.ConvertSnapshot, delParams.BlockStream, delParams.EncryptInfo)
delParams.ConvertSnapshot, delParams.BlockStream, delParams.EncryptInfo,
delParams.TotalDeleteSnapshotCount, delParams.DeletedSnapshotCount)
} else {
res := jsonutils.NewDict()
res.Set("deleted", jsonutils.JSONTrue)
Expand Down
38 changes: 27 additions & 11 deletions pkg/hostman/guestman/guesttasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (d *SGuestDiskSyncTask) syncDisksConf() {
}
if idxs := d.guest.GetNeedMergeBackingFileDiskIndexs(); len(idxs) > 0 {
d.guest.StreamDisks(context.Background(),
func() { d.guest.streamDisksComplete(context.Background()) }, idxs,
func() { d.guest.streamDisksComplete(context.Background()) }, idxs, -1, -1,
)
}
d.callback(d.errors...)
Expand Down Expand Up @@ -1635,7 +1635,7 @@ func (s *SGuestResumeTask) startStreamDisks(disksIdx []int) {
s.startTime = time.Time{}
s.detachStartupTask()
if s.IsMonitorAlive() {
s.StreamDisks(s.ctx, func() { s.onStreamComplete(disksIdx) }, disksIdx)
s.StreamDisks(s.ctx, func() { s.onStreamComplete(disksIdx) }, disksIdx, -1, -1)
}
}

Expand Down Expand Up @@ -1730,8 +1730,10 @@ func (s *SGuestBlockProgressBaseTask) onGetBlockJobs(jobs []monitor.BlockJob) {
}

diskCount := s.task.StreamingDiskCount()
streamedDiskCount := s.task.StreamingDiskCompletedCount()
if diskCount > 0 {
progress = float64(s.task.StreamingDiskCompletedCount())/float64(diskCount)*100.0 + 1.0/float64(diskCount)*progress
progress = float64(streamedDiskCount)/float64(diskCount)*100.0 + 1.0/float64(diskCount)*progress
log.Debugf("stream disk111111 progress %v, streamedDiskCount %v, diskCount %v ", progress, streamedDiskCount, diskCount)
}
hostutils.UpdateServerProgress(context.Background(), s.GetId(), progress, mbps)
s.task.OnGetBlockJobs(jobs)
Expand All @@ -1757,12 +1759,17 @@ type SGuestStreamDisksTask struct {
c chan struct{}
streamDevs []string
lvmBacking []string

progressTotalDiskCnt int
progressCompletedDiskCnt int
}

func NewGuestStreamDisksTask(ctx context.Context, guest *SKVMGuestInstance, callback func(), disksIdx []int) *SGuestStreamDisksTask {
func NewGuestStreamDisksTask(ctx context.Context, guest *SKVMGuestInstance, callback func(), disksIdx []int, totalCnt, completedCnt int) *SGuestStreamDisksTask {
task := &SGuestStreamDisksTask{
callback: callback,
disksIdx: disksIdx,
callback: callback,
disksIdx: disksIdx,
progressTotalDiskCnt: totalCnt,
progressCompletedDiskCnt: completedCnt,
}
task.SGuestBlockProgressBaseTask = NewGuestBlockProgressBaseTask(ctx, guest, task)
return task
Expand Down Expand Up @@ -1834,10 +1841,18 @@ func (s *SGuestStreamDisksTask) startDoBlockStream() {
}

func (s *SGuestStreamDisksTask) StreamingDiskCompletedCount() int {
return len(s.disksIdx) - len(s.streamDevs) - 1
completedCnt := len(s.disksIdx) - len(s.streamDevs) - 1
if s.progressCompletedDiskCnt > 0 {
completedCnt += s.progressCompletedDiskCnt
}
return completedCnt
}

func (s *SGuestStreamDisksTask) StreamingDiskCount() int {
if s.progressTotalDiskCnt > 0 {
return s.progressTotalDiskCnt
}

return len(s.disksIdx)
}

Expand Down Expand Up @@ -2060,9 +2075,9 @@ func NewGuestSnapshotDeleteTask(
}
}

func (s *SGuestSnapshotDeleteTask) Start() {
func (s *SGuestSnapshotDeleteTask) Start(totalDeleteSnapshotCount, deletedSnapshotCount int) {
if s.blockStream {
s.startBlockStream()
s.startBlockStream(totalDeleteSnapshotCount, deletedSnapshotCount)
return
}

Expand All @@ -2073,14 +2088,15 @@ func (s *SGuestSnapshotDeleteTask) Start() {
s.fetchDisksInfo(s.doReloadDisk)
}

func (s *SGuestSnapshotDeleteTask) startBlockStream() {
func (s *SGuestSnapshotDeleteTask) startBlockStream(totalDeleteSnapshotCount, deletedSnapshotCount int) {
diskIdx := []int{}
for i := range s.Desc.Disks {
if s.Desc.Disks[i].DiskId == s.disk.GetId() {
diskIdx = append(diskIdx, int(s.Desc.Disks[i].Index))
break
}
}
s.StreamDisks(s.ctx, s.onStreamDiskComplete, diskIdx)
s.StreamDisks(s.ctx, s.onStreamDiskComplete, diskIdx, totalDeleteSnapshotCount, deletedSnapshotCount)
}

func (s *SGuestSnapshotDeleteTask) onStreamDiskComplete() {
Expand Down
15 changes: 15 additions & 0 deletions pkg/hostman/guestman/pci.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,14 +417,29 @@ func (s *SKVMGuestInstance) initGuestDisks(pciRoot, pciBridge *desc.PCIControlle
for i := 0; i < len(s.Desc.Disks); i++ {
devType := qemu.GetDiskDeviceModel(s.Desc.Disks[i].Driver)
id := fmt.Sprintf("drive_%d", s.Desc.Disks[i].Index)
if s.Desc.Disks[i].Pci != nil || s.Desc.Disks[i].Scsi != nil {
log.Infof("guest %s disk %v has been init", s.Desc.Uuid, s.Desc.Disks[i].Index)
continue
}

switch s.Desc.Disks[i].Driver {
case DISK_DRIVER_VIRTIO:
if s.Desc.Disks[i].Pci == nil {
s.Desc.Disks[i].Pci = desc.NewPCIDevice(cont.CType, devType, id)
}
case DISK_DRIVER_SCSI:
if s.Desc.VirtioScsi == nil {
s.Desc.VirtioScsi = &desc.SGuestVirtioScsi{
PCIDevice: desc.NewPCIDevice(pciRoot.CType, "virtio-scsi-pci", "scsi"),
}
}
s.Desc.Disks[i].Scsi = desc.NewScsiDevice(s.Desc.VirtioScsi.Id, devType, id)
case DISK_DRIVER_PVSCSI:
if s.Desc.PvScsi == nil {
s.Desc.PvScsi = &desc.SGuestPvScsi{
PCIDevice: desc.NewPCIDevice(pciRoot.CType, "pvscsi", "scsi"),
}
}
s.Desc.Disks[i].Scsi = desc.NewScsiDevice(s.Desc.PvScsi.Id, devType, id)
case DISK_DRIVER_IDE:
s.Desc.Disks[i].Ide = desc.NewIdeDevice(devType, id)
Expand Down
7 changes: 4 additions & 3 deletions pkg/hostman/guestman/qemu-kvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3120,9 +3120,9 @@ func (s *SKVMGuestInstance) GetFuseTmpPath() string {
return path.Join(s.HomeDir(), "tmp")
}

func (s *SKVMGuestInstance) StreamDisks(ctx context.Context, callback func(), disksIdx []int) {
func (s *SKVMGuestInstance) StreamDisks(ctx context.Context, callback func(), disksIdx []int, totalCnt, completedCnt int) {
log.Infof("Start guest block stream task %s ...", s.GetName())
task := NewGuestStreamDisksTask(ctx, s, callback, disksIdx)
task := NewGuestStreamDisksTask(ctx, s, callback, disksIdx, totalCnt, completedCnt)
task.Start()
}

Expand Down Expand Up @@ -3199,11 +3199,12 @@ func (s *SKVMGuestInstance) StaticSaveSnapshot(
func (s *SKVMGuestInstance) ExecDeleteSnapshotTask(
ctx context.Context, disk storageman.IDisk,
deleteSnapshot string, convertSnapshot string, blockStream bool, encryptInfo apis.SEncryptInfo,
totalDeleteSnapshotCount, deletedSnapshotCount int,
) (jsonutils.JSONObject, error) {
if s.IsRunning() {
if s.isLiveSnapshotEnabled() {
task := NewGuestSnapshotDeleteTask(ctx, s, disk, deleteSnapshot, convertSnapshot, blockStream, encryptInfo)
task.Start()
task.Start(totalDeleteSnapshotCount, deletedSnapshotCount)
return nil, nil
} else {
return nil, fmt.Errorf("Guest dosen't support live snapshot delete")
Expand Down
Loading