Skip to content

Commit

Permalink
Merge pull request #1310 from huww98/optimize-topologyTerm
Browse files Browse the repository at this point in the history
Optimize for topology term
  • Loading branch information
k8s-ci-robot authored Dec 13, 2024
2 parents 7463124 + bd3b4a1 commit c79d156
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 81 deletions.
153 changes: 72 additions & 81 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"fmt"
"hash/fnv"
"math/rand"
"sort"
"slices"
"strconv"
"strings"

Expand All @@ -39,8 +39,14 @@ import (
"k8s.io/klog/v2"
)

type topologySegment struct {
Key, Value string
}

// topologyTerm represents a single term where its topology key value pairs are AND'd together.
type topologyTerm map[string]string
//
// Be sure to sort after construction for compare() and subset() to work properly.
type topologyTerm []topologySegment

func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNodeAffinity {
if len(accessibleTopology) == 0 {
Expand Down Expand Up @@ -229,7 +235,8 @@ func GenerateAccessibilityRequirements(
return nil, nil
}

requisiteTerms = deduplicate(requisiteTerms)
slices.SortFunc(requisiteTerms, topologyTerm.compare)
requisiteTerms = slices.CompactFunc(requisiteTerms, slices.Equal)
// TODO (verult) reduce subset duplicate terms (advanced reduction)

requirement.Requisite = toCSITopology(requisiteTerms)
Expand All @@ -243,14 +250,19 @@ func GenerateAccessibilityRequirements(
// requisiteTerms and shifting the sorted terms based on hash of pvcName and replica index suffix
hash, index := getPVCNameHashAndIndexOffset(pvcName)
i := (hash + index) % uint32(len(requisiteTerms))
preferredTerms = sortAndShift(requisiteTerms, nil, i)
preferredTerms = append(requisiteTerms[i:], requisiteTerms[:i]...)
} else {
// Delayed binding, use topology from that node to populate preferredTerms
if strictTopology {
// In case of strict topology, preferred = requisite
preferredTerms = requisiteTerms
} else {
preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
for i, t := range requisiteTerms {
if t.subset(selectedTopology) {
preferredTerms = append(requisiteTerms[i:], requisiteTerms[:i]...)
break
}
}
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
Expand Down Expand Up @@ -417,12 +429,12 @@ func flatten(allowedTopologies []v1.TopologySelectorTerm) []topologyTerm {

if len(oldTerms) == 0 {
// No previous terms to distribute over. Simply append the new term.
newTerms = append(newTerms, topologyTerm{selectorExpression.Key: v})
newTerms = append(newTerms, topologyTerm{{selectorExpression.Key, v}})
} else {
for _, oldTerm := range oldTerms {
// "Distribute" by adding an entry to the term
newTerm := oldTerm.clone()
newTerm[selectorExpression.Key] = v
newTerm := slices.Clone(oldTerm)
newTerm = append(newTerm, topologySegment{selectorExpression.Key, v})
newTerms = append(newTerms, newTerm)
}
}
Expand All @@ -435,41 +447,10 @@ func flatten(allowedTopologies []v1.TopologySelectorTerm) []topologyTerm {
finalTerms = append(finalTerms, oldTerms...)
}

return finalTerms
}

func deduplicate(terms []topologyTerm) []topologyTerm {
termMap := make(map[string]topologyTerm)
for _, term := range terms {
termMap[term.hash()] = term
}

var dedupedTerms []topologyTerm
for _, term := range termMap {
dedupedTerms = append(dedupedTerms, term)
}
return dedupedTerms
}

// Sort the given terms in place,
// then return a new list of terms equivalent to the sorted terms, but shifted so that
// either the primary term (if specified) or term at shiftIndex is the first in the list.
func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32) []topologyTerm {
var preferredTerms []topologyTerm
sort.Slice(terms, func(i, j int) bool {
return terms[i].less(terms[j])
})
if primary == nil {
preferredTerms = append(terms[shiftIndex:], terms[:shiftIndex]...)
} else {
for i, t := range terms {
if t.subset(primary) {
preferredTerms = append(terms[i:], terms[:i]...)
break
}
}
for _, term := range finalTerms {
term.sort()
}
return preferredTerms
return finalTerms
}

func getTopologyKeys(csiNode *storagev1.CSINode, driverName string) []string {
Expand All @@ -482,14 +463,15 @@ func getTopologyKeys(csiNode *storagev1.CSINode, driverName string) []string {
}

func getTopologyFromNode(node *v1.Node, topologyKeys []string) (term topologyTerm, isMissingKey bool) {
term = make(topologyTerm)
term = make(topologyTerm, 0, len(topologyKeys))
for _, key := range topologyKeys {
v, ok := node.Labels[key]
if !ok {
return nil, true
}
term[key] = v
term = append(term, topologySegment{key, v})
}
term.sort()
return term, false
}

Expand All @@ -514,56 +496,65 @@ func buildTopologyKeySelector(topologyKeys []string) (labels.Selector, error) {
return selector, nil
}

func (t topologyTerm) clone() topologyTerm {
ret := make(topologyTerm)
for k, v := range t {
ret[k] = v
}
return ret
func (t topologyTerm) sort() {
slices.SortFunc(t, func(a, b topologySegment) int {
r := strings.Compare(a.Key, b.Key)
if r != 0 {
return r
}
// Should not happen currently. We may support multi-value in the future?
return strings.Compare(a.Value, b.Value)
})
}

// "<k1>#<v1>,<k2>#<v2>,..."
// Hash properties:
// - Two equivalent topologyTerms have the same hash
// - Ordering of hashes correspond to a natural ordering of their topologyTerms. For example:
// - com.example.csi/zone#zone1 < com.example.csi/zone#zone2
// - com.example.csi/rack#zz < com.example.csi/zone#zone1
// - com.example.csi/z#z1 < com.example.csi/zone#zone1
// - com.example.csi/rack#rackA,com.example.csi/zone#zone2 < com.example.csi/rackB,com.example.csi/zone#zone1
// Note that both '#' and ',' are less than '/', '-', '_', '.', [A-Z0-9a-z]
func (t topologyTerm) hash() string {
var segments []string
for k, v := range t {
segments = append(segments, k+"#"+v)
func (t topologyTerm) compare(other topologyTerm) int {
if len(t) != len(other) {
return len(t) - len(other)
}

sort.Strings(segments)
return strings.Join(segments, ",")
}

func (t topologyTerm) less(other topologyTerm) bool {
return t.hash() < other.hash()
for i, k1 := range t {
k2 := other[i]
r := strings.Compare(k1.Key, k2.Key)
if r != 0 {
return r
}
r = strings.Compare(k1.Value, k2.Value)
if r != 0 {
return r
}
}
return 0
}

func (t topologyTerm) subset(other topologyTerm) bool {
for key, tv := range t {
v, ok := other[key]
if !ok || v != tv {
if len(t) == 0 {
return true
}
j := 0
for _, k2 := range other {
k1 := t[j]
if k1.Key != k2.Key {
continue
}
if k1.Value != k2.Value {
return false
}
j++
if j == len(t) {
// All segments in t have been checked and is present in other.
return true
}
}

return true
}

func (t topologyTerm) equal(other topologyTerm) bool {
return t.hash() == other.hash()
return false
}

func toCSITopology(terms []topologyTerm) []*csi.Topology {
var out []*csi.Topology
out := make([]*csi.Topology, 0, len(terms))
for _, term := range terms {
out = append(out, &csi.Topology{Segments: term})
segs := make(map[string]string, len(term))
for _, k := range term {
segs[k.Key] = k.Value
}
out = append(out, &csi.Topology{Segments: segs})
}
return out
}
Expand Down
Loading

0 comments on commit c79d156

Please sign in to comment.