Skip to content

Commit

Permalink
simplify logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lukas016 committed Feb 15, 2024
1 parent 6f12394 commit 850ff8f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 126 deletions.
10 changes: 4 additions & 6 deletions pkg/api/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,10 @@ type VolumeSpec struct {
}

type VolumeStatus struct {
Name string `json:"name,omitempty"`
Handle string `json:"handle,omitempty"`
State VolumeState `json:"state,omitempty"`
Size int64 `json:"size,omitempty"`
DeletedAt *time.Time `json:"deletedAt,omitempty"`
DetachedAt *time.Time `json:"detachedAt,omitempty"`
Name string `json:"name,omitempty"`
Handle string `json:"handle,omitempty"`
State VolumeState `json:"state,omitempty"`
Size int64 `json:"size,omitempty"`
}

type EmptyDiskSpec struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,12 +909,12 @@ func (r *MachineReconciler) updateAPIMachineStatus(ctx context.Context, machine
machine.Status.State = state
}

if len(volumes) != 0 {
if volumes != nil {
requireUpdate = true
machine.Status.VolumeStatus = volumes
}

if len(nics) != 0 {
if nics != nil {
requireUpdate = true
machine.Status.NetworkInterfaceStatus = nics
}
Expand Down
166 changes: 48 additions & 118 deletions pkg/controllers/machine_controller_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"slices"
"sort"
"strings"
"time"

"github.com/digitalocean/go-libvirt"
"github.com/go-logr/logr"
Expand All @@ -23,6 +22,7 @@ import (
libvirtutils "github.com/ironcore-dev/libvirt-provider/pkg/libvirt/utils"
providervolume "github.com/ironcore-dev/libvirt-provider/pkg/plugins/volume"
providerhost "github.com/ironcore-dev/libvirt-provider/pkg/providerhost"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"
utilstrings "k8s.io/utils/strings"
"libvirt.org/go/libvirtxml"
Expand Down Expand Up @@ -84,69 +84,61 @@ func getLastVolumeSize(machine *api.Machine, volumeID string) *int64 {
return nil
}

func fillStatusWithMountedVolumes(status map[string]*api.VolumeStatus, mounter VolumeMounter) error {
return mounter.ForEachVolume(func(volume *MountVolume) bool {
_, ok := status[volume.ComputeVolumeName]
if !ok {
volumeStatus := api.VolumeStatus{
Name: volume.ComputeVolumeName,
State: api.VolumeStatePending,
}
status[volumeStatus.Name] = &volumeStatus
}
return true
})
}

// reconcileVolumes is doing attaching, detaching, deleting of volumes and it manage status of volumes
func (r *MachineReconciler) reconcileVolumes(ctx context.Context, log logr.Logger, machine *api.Machine, attacher VolumeAttacher) ([]api.VolumeStatus, error) {
mounter := r.machineVolumeMounter(machine)
specVolumes := r.listDesiredVolumes(machine)

currentStatus := machine.Status.GetVolumesAsMap()

attachedVolumes, err := attacher.ListVolumesAsMap()
if err != nil {
// missing list of attached volumes can affect deletion of volumes, so we cannot continue
return machine.Status.VolumeStatus, err
currentVolumeNames := sets.NewString()
if err := attacher.ForEachVolume(func(volume *AttachVolume) bool {
currentVolumeNames.Insert(volume.Name)
return true
}); err != nil {
return nil, fmt.Errorf("error iteratin attached volumes: %w", err)
}

syncStatusWithAttachedVolumes(currentStatus, attachedVolumes)

errs := r.deleteDetachedVolumes(ctx, log, currentStatus, mounter)

err = fillStatusWithMountedVolumes(currentStatus, mounter)
if err != nil {
errs = append(errs, fmt.Errorf("error iterating mounted volumes: %w", err))
if err := mounter.ForEachVolume(func(volume *MountVolume) bool {
currentVolumeNames.Insert(volume.ComputeVolumeName)
return true
}); err != nil {
return nil, fmt.Errorf("error iterating mounted volumes: %w", err)
}

for volumeName, volumeStatus := range currentStatus {
volumeStatus := machine.Status.GetVolumesAsMap()
var errs []error
for volumeName := range currentVolumeNames {
if _, ok := specVolumes[volumeName]; ok {
continue
}

// skip detached volume
if volumeStatus.DeletedAt != nil {
status, ok := volumeStatus[volumeName]
if !ok {
errs = append(errs, fmt.Errorf("[volume %s] error detaching: cannot find in status", volumeName))
continue
}

volumeStatus.State = api.VolumeStatePending

log.V(1).Info("Detach non-required volume", "volumeName", volumeName)
if err := attacher.DetachVolume(volumeName); err != nil && !errors.Is(err, ErrAttachedVolumeNotFound) {
errs = append(errs, fmt.Errorf("error detaching volume: %w", err))
log.V(1).Info("Deleting non-required volume", "volumeName", volumeName)
deleted, err := r.deleteVolume(ctx, log, mounter, attacher, volumeName)
if err != nil {
errs = append(errs, fmt.Errorf("[volume %s] error detaching: %w", volumeName, err))
}
if volumeStatus.DetachedAt == nil {
volumeStatus.DetachedAt = ptr.To[time.Time](time.Now())

if deleted {
delete(volumeStatus, volumeName)
} else {
status.State = api.VolumeStatePending
}
}

for _, volume := range specVolumes {
log.V(1).Info("Reconciling volume", "volumeName", volume.Name)
status, ok := currentStatus[volume.Name]
status, ok := volumeStatus[volume.Name]
if !ok {
status = &api.VolumeStatus{Name: volume.Name, State: api.VolumeStatePending}
currentStatus[volume.Name] = status
status = &api.VolumeStatus{
Name: volume.Name,
State: api.VolumeStatePending,
}
volumeStatus[volume.Name] = status
}

volumeID, volumeSize, err := r.applyVolume(ctx, log, machine, volume, mounter, attacher)
Expand All @@ -162,61 +154,31 @@ func (r *MachineReconciler) reconcileVolumes(ctx context.Context, log logr.Logge
log.V(1).Info("Successfully reconciled volume", "volumeName", volume.Name, "volumeID", volumeID)
}

newVolumeStatus := convertVolumesMapToListAndNormalize(currentStatus)
newVolumeStatus := convertVolumesMapToListAndNormalize(volumeStatus)

if len(errs) > 0 {
return newVolumeStatus, fmt.Errorf("volume reconciliation error(s): %v", errs)
}
return newVolumeStatus, nil
}

func (r *MachineReconciler) deleteDetachedVolumes(ctx context.Context, log logr.Logger, status map[string]*api.VolumeStatus, mounter VolumeMounter) []error {
var errs []error

// Deleting detached volumes from previous reconciliation loop
for volumeName, volumeStatus := range status {
if volumeStatus.DeletedAt == nil {
continue
func (r *MachineReconciler) deleteVolume(ctx context.Context, log logr.Logger, mounter VolumeMounter, attacher VolumeAttacher, volumeName string) (bool, error) {
deleted := false
log.V(1).Info("Detaching volume if attached")
if err := attacher.DetachVolume(volumeName); err != nil {
if !errors.Is(err, ErrAttachedVolumeNotFound) {
return deleted, fmt.Errorf("error detaching volume: %w", err)
}

log.V(1).Info("Unmounting volume if mounted", "volumeName", volumeName)
log.V(1).Info("Successfully detached volume", "volumeName", volumeName)
log.V(1).Info("Unmounting volume if mounted")
if err := mounter.DeleteVolume(ctx, volumeName); err != nil && !errors.Is(err, ErrMountedVolumeNotFound) {
errs = append(errs, fmt.Errorf("error unmounting volume %s: %w", volumeName, err))
continue
return deleted, fmt.Errorf("error unmounting volume: %w", err)
}

delete(status, volumeName)
deleted = true
}

return errs
}

func syncStatusWithAttachedVolumes(status map[string]*api.VolumeStatus, attached map[string]AttachVolume) {
if len(attached) == 0 {
return
}

for key, volumeStatus := range status {
_, ok := attached[key]
if !ok && volumeStatus.DeletedAt == nil {
volumeStatus.DeletedAt = ptr.To[time.Time](time.Now())
}
}

for key := range attached {
_, ok := status[key]
if ok {
continue
}

volume := attached[key]
volumeStatus := api.VolumeStatus{
Name: volume.Name,
State: api.VolumeStateAttached,
Size: volume.Spec.Size,
}
status[key] = &volumeStatus
}
return deleted, nil
}

func convertVolumesMapToListAndNormalize(currentStatus map[string]*api.VolumeStatus) []api.VolumeStatus {
Expand All @@ -241,7 +203,6 @@ type AttachVolume struct {

type VolumeAttacher interface {
ListVolumes() ([]AttachVolume, error)
ListVolumesAsMap() (map[string]AttachVolume, error)
ForEachVolume(f func(*AttachVolume) bool) error
GetVolume(name string) (*AttachVolume, error)
AttachVolume(volume *AttachVolume) error
Expand Down Expand Up @@ -335,7 +296,6 @@ func (a *domainExecutor) ResizeDisk(device string, size int64) error {
type libvirtVolumeAttacher struct {
domainDesc *libvirtxml.Domain
executor DomainExecutor
volumes map[string]*AttachVolume
}

func NewLibvirtVolumeAttacher(domainDesc *libvirtxml.Domain, executor DomainExecutor) (VolumeAttacher, error) {
Expand Down Expand Up @@ -372,44 +332,14 @@ func (a *libvirtVolumeAttacher) diskByVolumeNameIndex(name string) (int, error)
return -1, nil
}

func (a *libvirtVolumeAttacher) loadVolumes() error {
if a.volumes != nil {
return nil
}

a.volumes = map[string]*AttachVolume{}
return a.ForEachVolume(func(volume *AttachVolume) bool {
a.volumes[volume.Name] = volume
return true
})
}

func (a *libvirtVolumeAttacher) ListVolumes() ([]AttachVolume, error) {
err := a.loadVolumes()
if err != nil {
return nil, err
}

var res []AttachVolume
for _, volume := range a.volumes {
if err := a.ForEachVolume(func(volume *AttachVolume) bool {
res = append(res, *volume)
}

return res, nil
}

func (a *libvirtVolumeAttacher) ListVolumesAsMap() (map[string]AttachVolume, error) {
err := a.loadVolumes()
if err != nil {
return true
}); err != nil {
return nil, err
}

// we need to break reference
res := make(map[string]AttachVolume, len(a.volumes))
for key, volume := range a.volumes {
res[key] = *volume
}

return res, nil
}

Expand Down

0 comments on commit 850ff8f

Please sign in to comment.