Support volume reservation on storagepool #11

Merged · 13 commits · Jan 24, 2024
7 changes: 7 additions & 0 deletions README.md
@@ -97,3 +97,10 @@ However, it is important to note that LiteIO does not currently support data rep

- [x] Disk-Agent exposes metric service
- [ ] SPDK volume replica


## Contact

WeChat group QR code

![Wechat Group](doc/image/wechat_group.JPG)
65 changes: 36 additions & 29 deletions cmd/controller/antplugins/localstorage.go
@@ -5,7 +5,9 @@ import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
@@ -41,6 +43,7 @@ func NewReportLocalStoragePlugin(h *controllers.PluginHandle) (p plugin.Plugin,
}

p = &ReportLocalStoragePlugin{
Client: h.Client,
NodeUpdater: kubeutil.NewKubeNodeInfoGetter(h.Req.KubeCli),
PoolUtil: kubeutil.NewStoragePoolUtil(h.Client),
ReportLocalConfigs: pluginCfg.DefaultLocalSpaceRules,
@@ -50,7 +53,7 @@

// ReportLocalStoragePlugin is an AntstorVolume plugin.
type ReportLocalStoragePlugin struct {
// NodeGetter kubeutil.NodeInfoGetterIface
Client client.Client
NodeUpdater kubeutil.NodeUpdaterIface
PoolUtil kubeutil.StoragePoolUpdater

@@ -96,18 +99,44 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin

// report the local storage when the StoragePool is first created.
if isPool && pool != nil {
totalBs := pool.GetAvailableBytes()
if _, has := pool.Labels[v1.PoolLocalStorageBytesKey]; !has {
log.Info("update node/status capacity", "local-storage", totalBs)
// update Pool Label "obnvmf/node-local-storage-size" = totalBs
err = r.PoolUtil.SavePoolLocalStorageMark(pool, uint64(totalBs))
var (
localBS uint64
node corev1.Node
snode *state.Node
hasNodeRes bool
hasPoolLabel bool
)

// calculate local storage
snode, err = stateObj.GetNodeByNodeID(pool.Name)
if err != nil {
log.Error(err, "find node failed")
return plugin.Result{Error: err}
}
localBS = CalculateLocalStorageCapacity(snode)

// get node
err = r.Client.Get(ctx.ReqCtx.Ctx, client.ObjectKey{Name: pool.Name}, &node)
if err != nil {
log.Error(err, "getting Node failed")
return plugin.Result{Error: err}
}

_, hasNodeRes = node.Status.Allocatable[kubeutil.SdsLocalStorageResourceKey]
_, hasPoolLabel = pool.Labels[v1.PoolLocalStorageBytesKey]
log.Info("check pool PoolLocalStorageBytesKey and node SdsLocalStorageResourceKey", "nodeResource", hasNodeRes, "hasPoolLabel", hasPoolLabel)

if !hasPoolLabel || !hasNodeRes {
log.Info("update node/status capacity", "local-storage", localBS)
// update Pool Label "obnvmf/local-storage-bytes" = localBS
err = r.PoolUtil.SavePoolLocalStorageMark(pool, localBS)
if err != nil {
log.Error(err, "SavePoolLocalStorageMark failed")
return plugin.Result{Error: err}
}

// update node/status capacity = localBS
_, err = r.NodeUpdater.ReportLocalDiskResource(pool.Name, uint64(totalBs))
_, err = r.NodeUpdater.ReportLocalDiskResource(pool.Name, localBS)
if err != nil {
log.Error(err, "ReportLocalDiskResource failed")
return plugin.Result{Error: err}
@@ -126,28 +155,6 @@ }
}
var sp = node.Pool

/*
var localStorePct int
var volInState *v1.AntstorVolume
for _, item := range r.ReportLocalConfigs {
selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector)
if err != nil {
log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector)
continue
}
if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault {
localStorePct = item.DefaultLocalStoragePct
log.Info("matched local-storage percentage", "pct", localStorePct)
}
}

volInState, err = node.GetVolumeByID(volume.Spec.Uuid)
if err == nil {
log.Info("copy volume into state")
*volInState = *volume
}
*/

var expectLocalSize = CalculateLocalStorageCapacity(node)
var localSizeStr = strconv.Itoa(int(expectLocalSize))
log.Info("compare local storage size", "in label", sp.Labels[v1.PoolLocalStorageBytesKey], "expect", localSizeStr, "delTS", volume.DeletionTimestamp)
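Review note: the block above re-reports whenever either the pool label or the node allocatable entry is missing, so the report heals itself if one side gets wiped. A minimal sketch of how a consumer could read the reported capacity back from the Node, assuming a resource name matching kubeutil.SdsLocalStorageResourceKey (the exact constant value is not visible in this diff):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// assumed to match kubeutil.SdsLocalStorageResourceKey; the real constant
// value is not shown in this PR.
const sdsLocalStorageResourceKey corev1.ResourceName = "obnvmf/local-storage"

// localStorageBytes reads the local-storage capacity that
// ReportLocalDiskResource published into node.Status.Allocatable.
func localStorageBytes(node *corev1.Node) (int64, bool) {
	q, has := node.Status.Allocatable[sdsLocalStorageResourceKey]
	if !has {
		return 0, false
	}
	return q.Value(), true
}

func main() {
	node := &corev1.Node{}
	if bs, ok := localStorageBytes(node); ok {
		fmt.Println("local storage bytes:", bs)
	} else {
		fmt.Println("local-storage resource not reported yet")
	}
}
```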
Binary file added doc/image/wechat_group.JPG
3 changes: 3 additions & 0 deletions hack/deploy/lvm/050-configmap.yaml
@@ -25,6 +25,9 @@ data:
nodeTaints:
- key: node.sigma.ali/lifecycle
operator: Exists
#nodeReservations:
#- id: obnvmf/app-vol
# size: 107374182400 # 100Gi
pluginConfigs:
defaultLocalSpaceRules:
- enableDefault: true
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/manager/config/config.go
@@ -30,6 +30,13 @@ type SchedulerConfig struct {
NodeCacheSelector map[string]string `json:"nodeCacheSelector" yaml:"nodeCacheSelector"`
// MinLocalStoragePct defines the minimum percentage of local storage to be reserved on one node.
MinLocalStoragePct int `json:"minLocalStoragePct" yaml:"minLocalStoragePct"`
// NodeReservations defines the reservations on each node
NodeReservations []NodeReservation `json:"nodeReservations" yaml:"nodeReservations"`
}

type NodeReservation struct {
ID string `json:"id" yaml:"id"`
Size int64 `json:"size" yaml:"size"`
}

type NoScheduleConfig struct {
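Review note: the commented-out nodeReservations block in 050-configmap.yaml maps directly onto these fields. A self-contained sketch of the decoding, with the structs from this hunk mirrored locally; gopkg.in/yaml.v3 is an assumption — any YAML decoder that honors the `yaml` tags behaves the same:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// mirrors of the structs added in this PR, for illustration only
type NodeReservation struct {
	ID   string `json:"id" yaml:"id"`
	Size int64  `json:"size" yaml:"size"`
}

type SchedulerConfig struct {
	NodeReservations []NodeReservation `json:"nodeReservations" yaml:"nodeReservations"`
}

// the example from hack/deploy/lvm/050-configmap.yaml, uncommented
const cfgYAML = `
nodeReservations:
- id: obnvmf/app-vol
  size: 107374182400 # 100Gi
`

func main() {
	var cfg SchedulerConfig
	if err := yaml.Unmarshal([]byte(cfgYAML), &cfg); err != nil {
		panic(err)
	}
	for _, r := range cfg.NodeReservations {
		fmt.Printf("reserve %d bytes under id %q on every node\n", r.Size, r.ID)
	}
}
```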
1 change: 1 addition & 0 deletions pkg/controller/manager/controllers/manager.go
@@ -100,6 +100,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
Concurrency: 4,
MainHandler: &reconciler.StoragePoolReconcileHandler{
Client: mgr.GetClient(),
Cfg: req.ControllerConfig,
State: stateObj,
PoolUtil: poolUtil,
KubeCli: kubeClient,
16 changes: 16 additions & 0 deletions pkg/controller/manager/reconciler/pool_reconciler.go
@@ -20,6 +20,7 @@ import (

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
@@ -39,6 +40,7 @@ var (
type StoragePoolReconcileHandler struct {
client.Client

Cfg config.Config
State state.StateIface
PoolUtil kubeutil.StoragePoolUpdater
KubeCli kubernetes.Interface
@@ -227,6 +229,7 @@ func (r *StoragePoolReconcileHandler) handleDeletion(pCtx *plugin.Context) (resu
func (r *StoragePoolReconcileHandler) saveToState(sp *v1.StoragePool, log logr.Logger) (result plugin.Result) {
var patch = client.MergeFrom(sp.DeepCopy())
var err error
var node *state.Node

r.State.SetStoragePool(sp)

@@ -244,6 +247,19 @@ }
}
}

// try to add reservations defined in the config
node, err = r.State.GetNodeByNodeID(sp.Name)
if err != nil {
log.Error(err, "GetNodeByNodeID error")
}
for _, item := range r.Cfg.Scheduler.NodeReservations {
if node != nil {
if _, has := node.GetReservation(item.ID); !has {
node.Reserve(state.NewReservation(item.ID, item.Size))
}
}
}

return plugin.Result{}
}

25 changes: 25 additions & 0 deletions pkg/controller/manager/scheduler/filter/basic.go
@@ -21,6 +21,11 @@ func BasicFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) b
)
// check if err is nil

// if the volume matches a reservation, skip the following checks
if pass, hasErr := matchReservationFilter(ctx, n, vol); hasErr || pass {
return pass
}

// consider Pool FreeSpace
var freeRes = n.GetFreeResourceNonLock()
var freeDisk = freeRes[v1.ResourceDiskPoolByte]
@@ -100,3 +105,23 @@

return true
}

func matchReservationFilter(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) (pass, hasError bool) {
if resvId, has := vol.Annotations[v1.ReservationIDKey]; has {
free := n.FreeResource.Storage()
if free.CmpInt64(0) < 0 {
ctx.Error.AddReason(ReasonPoolFreeSize)
return false, true
}

if r, has := n.GetReservation(resvId); has {
if r.Size() < int64(vol.Spec.SizeByte) {
ctx.Error.AddReason(ReasonReservationSize)
return false, true
}
return true, false
}
}

return false, false
}
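Review note: matchReservationFilter has three outcomes — no reservation annotation falls through to the regular capacity checks, a matching reservation that covers the volume passes immediately, and a too-small reservation (or an over-committed pool) is a hard reject. A standalone model of that (pass, hasError) contract, with simplified stand-ins for state.Node and v1.AntstorVolume:

```go
package main

import "fmt"

type reservation struct {
	id   string
	size int64
}

type node struct {
	freeBytes    int64
	reservations map[string]reservation
}

// matchReservation models the decision logic of matchReservationFilter.
func matchReservation(n *node, resvID string, volBytes int64) (pass, hasError bool) {
	if resvID == "" {
		return false, false // no annotation: run the regular filters
	}
	if n.freeBytes < 0 {
		return false, true // pool is over-committed: reject
	}
	if r, has := n.reservations[resvID]; has {
		if r.size < volBytes {
			return false, true // reservation too small: reject
		}
		return true, false // reservation covers the volume: pass
	}
	return false, false // unknown id: fall through
}

func main() {
	n := &node{
		freeBytes:    0,
		reservations: map[string]reservation{"obnvmf/app-vol": {"obnvmf/app-vol", 100 << 30}},
	}
	fmt.Println(matchReservation(n, "obnvmf/app-vol", 10<<30))  // true false
	fmt.Println(matchReservation(n, "obnvmf/app-vol", 200<<30)) // false true
	fmt.Println(matchReservation(n, "", 10<<30))                // false false
}
```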
1 change: 1 addition & 0 deletions pkg/controller/manager/scheduler/filter/error.go
@@ -16,6 +16,7 @@ const (
ReasonNodeAffinity = "NodeAffinity"
ReasonPoolAffinity = "PoolAffinity"
ReasonPoolUnschedulable = "PoolUnschedulable"
ReasonReservationSize = "ReservationTooSmall"

NoStoragePoolAvailable = "NoStoragePoolAvailable"
//
43 changes: 33 additions & 10 deletions pkg/controller/manager/state/http.go
@@ -13,20 +13,31 @@ type NodeStateAPI struct {
KernelLVM *v1.KernelLVM `json:"kernelLVM,omitempty"`
SpdkLVS *v1.SpdkLVStore `json:"spdkLVS,omitempty"`
// Volumes brief info
Volumes []VolumeBreif `json:"volumes"`
// FreeSize of the pool
FreeSize int64 `json:"freeSize"`
Volumes []VolumeBrief `json:"volumes"`
// VgFreeSize of the pool
VgFreeSize int64 `json:"vgFreeSize"`
// MemFreeSize in controller memory
MemFreeSize int64 `json:"memFreeSize"`
// MemFreeSizeStr readable size in controller memory
MemFreeSizeStr string `json:"memFreeSizeStr"`
// Conditions of the pool status
Conditions map[v1.PoolConditionType]v1.ConditionStatus `json:"conditions"`
// Reservations on the node
Reservations []ReservationBrief `json:"reservations"`
}

type VolumeBreif struct {
type VolumeBrief struct {
Namespace string `json:"ns"`
Name string `json:"name"`
DataHolder string `json:"dataHolder"`
Size int64 `json:"size"`
}

type ReservationBrief struct {
ID string `json:"id"`
Size int64 `json:"size"`
}

func NewStateHandler(s StateIface) *StateHandler {
return &StateHandler{state: s}
}
@@ -49,11 +60,14 @@ func (h *StateHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request)
}

var api = NodeStateAPI{
Name: spName,
PoolLabels: node.Pool.Labels,
KernelLVM: &node.Pool.Spec.KernelLVM,
SpdkLVS: &node.Pool.Spec.SpdkLVStore,
FreeSize: node.Pool.Status.VGFreeSize.Value(),
Name: spName,
PoolLabels: node.Pool.Labels,
KernelLVM: &node.Pool.Spec.KernelLVM,
SpdkLVS: &node.Pool.Spec.SpdkLVStore,
VgFreeSize: node.Pool.Status.VGFreeSize.Value(),
MemFreeSize: int64(node.FreeResource.Storage().AsApproximateFloat64()),
MemFreeSizeStr: node.FreeResource.Storage().String(),

Conditions: make(map[v1.PoolConditionType]v1.ConditionStatus),
}

@@ -62,14 +76,23 @@ }
}

for _, vol := range node.Volumes {
api.Volumes = append(api.Volumes, VolumeBreif{
api.Volumes = append(api.Volumes, VolumeBrief{
Namespace: vol.Namespace,
Name: vol.Name,
Size: int64(vol.Spec.SizeByte),
DataHolder: vol.Labels[v1.VolumeDataHolderKey],
})
}

if node.resvSet != nil {
for _, resv := range node.resvSet.Items() {
api.Reservations = append(api.Reservations, ReservationBrief{
ID: resv.ID(),
Size: resv.Size(),
})
}
}

bs, err := json.Marshal(api)
if err != nil {
writer.Write([]byte(err.Error()))
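Review note: a sketch of the JSON the state endpoint now serves, using local structs that mirror NodeStateAPI's new fields; the wire tag for the name field is an assumption, since it is not visible in this hunk:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mirrors ReservationBrief from this PR
type ReservationBrief struct {
	ID   string `json:"id"`
	Size int64  `json:"size"`
}

// subset of NodeStateAPI showing the new fields; the "name" tag is assumed
type nodeStateAPI struct {
	Name           string             `json:"name"`
	VgFreeSize     int64              `json:"vgFreeSize"`
	MemFreeSize    int64              `json:"memFreeSize"`
	MemFreeSizeStr string             `json:"memFreeSizeStr"`
	Reservations   []ReservationBrief `json:"reservations"`
}

func main() {
	api := nodeStateAPI{
		Name:           "node-1",
		VgFreeSize:     200 << 30,
		MemFreeSize:    100 << 30,
		MemFreeSizeStr: "100Gi",
		Reservations:   []ReservationBrief{{ID: "obnvmf/app-vol", Size: 100 << 30}},
	}
	bs, _ := json.MarshalIndent(api, "", "  ")
	fmt.Println(string(bs)) // shows vgFreeSize, memFreeSize, memFreeSizeStr, reservations
}
```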
46 changes: 38 additions & 8 deletions pkg/controller/manager/state/node.go
@@ -63,6 +63,13 @@ func (n *Node) AddVolume(vol *v1.AntstorVolume) (err error) {
defer n.volLock.Unlock()

var nodeID = n.Info.ID
var duplicate bool

// delete reservation if volume has reservation id
if resvID := getVolumeReservationID(vol); resvID != "" {
n.resvSet.Unreserve(resvID)
}

// check duplicate
for _, item := range n.Volumes {
if item.Name == vol.Name {
Expand All @@ -77,19 +84,16 @@ func (n *Node) AddVolume(vol *v1.AntstorVolume) (err error) {
// save the newer volume
*item = *vol.DeepCopy()
klog.Infof("vol %s already in node %s. type and sizes equal to each other", vol.Name, nodeID)
return
duplicate = true
}
}

// delete reservation if volume has reservation id
if resvID := getVolumeReservationID(vol); resvID != "" {
n.resvSet.Unreserve(resvID)
if !duplicate {
// volume reside on Node
vol.Spec.TargetNodeId = n.Pool.Spec.NodeInfo.ID
n.Volumes = append(n.Volumes, vol)
}

n.Volumes = append(n.Volumes, vol)
// volume reside on Node
vol.Spec.TargetNodeId = n.Pool.Spec.NodeInfo.ID

// update free resource
n.FreeResource = n.GetFreeResourceNonLock()

@@ -231,14 +235,40 @@ func (n *Node) GetFreeResourceNonLock() (free corev1.ResourceList) {

// Reserve storage resource for Node
func (n *Node) Reserve(r ReservationIface) {
// if the reservation is already bound to a volume, skip it.
var resvID = r.ID()
for _, vol := range n.Volumes {
if resvID == getVolumeReservationID(vol) {
return
}
}

// check free resource
if free := n.FreeResource.Storage(); free != nil {
if free.CmpInt64(r.Size()) < 0 {
klog.Errorf("node %s have no enough disk pool space for reservation %s", n.Info.ID, resvID)
return
}
}

n.volLock.Lock()
defer n.volLock.Unlock()

n.resvSet.Reserve(r)
// update free resource
n.FreeResource = n.GetFreeResourceNonLock()
}

// Unreserve storage resource
func (n *Node) Unreserve(id string) {
n.volLock.Lock()
defer n.volLock.Unlock()

n.resvSet.Unreserve(id)
// update free resource
n.FreeResource = n.GetFreeResourceNonLock()
}

func (n *Node) GetReservation(id string) (r ReservationIface, has bool) {
return n.resvSet.GetById(id)
}
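Review note: ReservationIface, NewReservation, and the resvSet used above are not part of this diff. An assumed minimal shape consistent with the call sites — an id-keyed, lock-guarded map; charging reservations against free space happens in GetFreeResourceNonLock, outside this sketch:

```go
package main

import "sync"

// assumed interface, matching the r.ID() / r.Size() call sites in node.go
type ReservationIface interface {
	ID() string
	Size() int64
}

type reservation struct {
	id   string
	size int64
}

func (r reservation) ID() string  { return r.id }
func (r reservation) Size() int64 { return r.size }

// NewReservation mirrors the constructor used by the pool reconciler.
func NewReservation(id string, size int64) ReservationIface {
	return reservation{id: id, size: size}
}

// reservationSet is an assumed stand-in for the node's resvSet.
type reservationSet struct {
	mu    sync.Mutex
	items map[string]ReservationIface
}

func (s *reservationSet) Reserve(r ReservationIface) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.items == nil {
		s.items = make(map[string]ReservationIface)
	}
	s.items[r.ID()] = r
}

func (s *reservationSet) Unreserve(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.items, id)
}

func (s *reservationSet) GetById(id string) (ReservationIface, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	r, has := s.items[id]
	return r, has
}

func main() {}
```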