diff --git a/pkg/controller/topology.go b/pkg/controller/topology.go index 93aaf7a9b..02d8fe4ec 100644 --- a/pkg/controller/topology.go +++ b/pkg/controller/topology.go @@ -20,7 +20,7 @@ import ( "fmt" "hash/fnv" "math/rand" - "sort" + "slices" "strconv" "strings" @@ -39,8 +39,14 @@ import ( "k8s.io/klog/v2" ) +type topologySegment struct { + Key, Value string +} + // topologyTerm represents a single term where its topology key value pairs are AND'd together. -type topologyTerm map[string]string +// +// Be sure to sort after construction for compare() and subset() to work properly. +type topologyTerm []topologySegment func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNodeAffinity { if len(accessibleTopology) == 0 { @@ -229,7 +235,8 @@ func GenerateAccessibilityRequirements( return nil, nil } - requisiteTerms = deduplicate(requisiteTerms) + slices.SortFunc(requisiteTerms, topologyTerm.compare) + requisiteTerms = slices.CompactFunc(requisiteTerms, slices.Equal) // TODO (verult) reduce subset duplicate terms (advanced reduction) requirement.Requisite = toCSITopology(requisiteTerms) @@ -243,14 +250,19 @@ func GenerateAccessibilityRequirements( // requisiteTerms and shifting the sorted terms based on hash of pvcName and replica index suffix hash, index := getPVCNameHashAndIndexOffset(pvcName) i := (hash + index) % uint32(len(requisiteTerms)) - preferredTerms = sortAndShift(requisiteTerms, nil, i) + preferredTerms = append(requisiteTerms[i:], requisiteTerms[:i]...) } else { // Delayed binding, use topology from that node to populate preferredTerms if strictTopology { // In case of strict topology, preferred = requisite preferredTerms = requisiteTerms } else { - preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0) + for i, t := range requisiteTerms { + if t.subset(selectedTopology) { + preferredTerms = append(requisiteTerms[i:], requisiteTerms[:i]...) + break + } + } if preferredTerms == nil { // Topology from selected node is not in requisite. This case should never be hit: // - If AllowedTopologies is specified, the scheduler should choose a node satisfying the @@ -417,12 +429,12 @@ func flatten(allowedTopologies []v1.TopologySelectorTerm) []topologyTerm { if len(oldTerms) == 0 { // No previous terms to distribute over. Simply append the new term. - newTerms = append(newTerms, topologyTerm{selectorExpression.Key: v}) + newTerms = append(newTerms, topologyTerm{{selectorExpression.Key, v}}) } else { for _, oldTerm := range oldTerms { // "Distribute" by adding an entry to the term - newTerm := oldTerm.clone() - newTerm[selectorExpression.Key] = v + newTerm := slices.Clone(oldTerm) + newTerm = append(newTerm, topologySegment{selectorExpression.Key, v}) newTerms = append(newTerms, newTerm) } } @@ -435,41 +447,10 @@ func flatten(allowedTopologies []v1.TopologySelectorTerm) []topologyTerm { finalTerms = append(finalTerms, oldTerms...) } - return finalTerms -} - -func deduplicate(terms []topologyTerm) []topologyTerm { - termMap := make(map[string]topologyTerm) - for _, term := range terms { - termMap[term.hash()] = term - } - - var dedupedTerms []topologyTerm - for _, term := range termMap { - dedupedTerms = append(dedupedTerms, term) - } - return dedupedTerms -} - -// Sort the given terms in place, -// then return a new list of terms equivalent to the sorted terms, but shifted so that -// either the primary term (if specified) or term at shiftIndex is the first in the list. -func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32) []topologyTerm { - var preferredTerms []topologyTerm - sort.Slice(terms, func(i, j int) bool { - return terms[i].less(terms[j]) - }) - if primary == nil { - preferredTerms = append(terms[shiftIndex:], terms[:shiftIndex]...) - } else { - for i, t := range terms { - if t.subset(primary) { - preferredTerms = append(terms[i:], terms[:i]...) - break - } - } + for _, term := range finalTerms { + term.sort() } - return preferredTerms + return finalTerms } func getTopologyKeys(csiNode *storagev1.CSINode, driverName string) []string { @@ -482,14 +463,15 @@ func getTopologyKeys(csiNode *storagev1.CSINode, driverName string) []string { } func getTopologyFromNode(node *v1.Node, topologyKeys []string) (term topologyTerm, isMissingKey bool) { - term = make(topologyTerm) + term = make(topologyTerm, 0, len(topologyKeys)) for _, key := range topologyKeys { v, ok := node.Labels[key] if !ok { return nil, true } - term[key] = v + term = append(term, topologySegment{key, v}) } + term.sort() return term, false } @@ -514,56 +496,65 @@ func buildTopologyKeySelector(topologyKeys []string) (labels.Selector, error) { return selector, nil } -func (t topologyTerm) clone() topologyTerm { - ret := make(topologyTerm) - for k, v := range t { - ret[k] = v - } - return ret +func (t topologyTerm) sort() { + slices.SortFunc(t, func(a, b topologySegment) int { + r := strings.Compare(a.Key, b.Key) + if r != 0 { + return r + } + // Should not happen currently. We may support multi-value in the future? + return strings.Compare(a.Value, b.Value) + }) } -// "#,#,..." -// Hash properties: -// - Two equivalent topologyTerms have the same hash -// - Ordering of hashes correspond to a natural ordering of their topologyTerms. For example: -// - com.example.csi/zone#zone1 < com.example.csi/zone#zone2 -// - com.example.csi/rack#zz < com.example.csi/zone#zone1 -// - com.example.csi/z#z1 < com.example.csi/zone#zone1 -// - com.example.csi/rack#rackA,com.example.csi/zone#zone2 < com.example.csi/rackB,com.example.csi/zone#zone1 -// Note that both '#' and ',' are less than '/', '-', '_', '.', [A-Z0-9a-z] -func (t topologyTerm) hash() string { - var segments []string - for k, v := range t { - segments = append(segments, k+"#"+v) +func (t topologyTerm) compare(other topologyTerm) int { + if len(t) != len(other) { + return len(t) - len(other) } - - sort.Strings(segments) - return strings.Join(segments, ",") -} - -func (t topologyTerm) less(other topologyTerm) bool { - return t.hash() < other.hash() + for i, k1 := range t { + k2 := other[i] + r := strings.Compare(k1.Key, k2.Key) + if r != 0 { + return r + } + r = strings.Compare(k1.Value, k2.Value) + if r != 0 { + return r + } + } + return 0 } func (t topologyTerm) subset(other topologyTerm) bool { - for key, tv := range t { - v, ok := other[key] - if !ok || v != tv { + if len(t) == 0 { + return true + } + j := 0 + for _, k2 := range other { + k1 := t[j] + if k1.Key != k2.Key { + continue + } + if k1.Value != k2.Value { return false } + j++ + if j == len(t) { + // All segments in t have been checked and is present in other. + return true + } } - - return true -} - -func (t topologyTerm) equal(other topologyTerm) bool { - return t.hash() == other.hash() + return false } func toCSITopology(terms []topologyTerm) []*csi.Topology { - var out []*csi.Topology + out := make([]*csi.Topology, 0, len(terms)) for _, term := range terms { - out = append(out, &csi.Topology{Segments: term}) + segs := make(map[string]string, len(term)) + for _, k := range term { + segs[k.Key] = k.Value + } + out = append(out, &csi.Topology{Segments: segs}) } return out } diff --git a/pkg/controller/topology_test.go b/pkg/controller/topology_test.go index d4bbc2d64..46c4d32a6 100644 --- a/pkg/controller/topology_test.go +++ b/pkg/controller/topology_test.go @@ -18,10 +18,12 @@ package controller import ( "fmt" + "slices" "testing" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" "google.golang.org/protobuf/testing/protocmp" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" @@ -1454,6 +1456,42 @@ func TestPreferredTopologies(t *testing.T) { } } +func BenchmarkDedupAndSortZone(b *testing.B) { + terms := make([]topologyTerm, 0, 3000) + for range 1000 { + for _, zone := range [...]string{"zone1", "zone2", "zone3"} { + terms = append(terms, topologyTerm{ + {"topology.kubernetes.io/region", "some-region"}, + {"topology.kubernetes.io/zone", zone}, + }) + } + } + benchmarkDedupAndSort(b, terms) +} + +func BenchmarkDedupAndSortHost(b *testing.B) { + terms := make([]topologyTerm, 0, 3000) + for i := range 1000 { + for j, zone := range [...]string{"zone1", "zone2", "zone3"} { + terms = append(terms, topologyTerm{ + {"example.com/instance-id", fmt.Sprintf("i-%05d", i+j*10000)}, + {"topology.kubernetes.io/region", "some-region"}, + {"topology.kubernetes.io/zone", zone}, + }) + } + } + benchmarkDedupAndSort(b, terms) +} + +func benchmarkDedupAndSort(b *testing.B, terms []topologyTerm) { + for range b.N { + terms := slices.Clone(terms) + slices.SortFunc(terms, topologyTerm.compare) + terms = slices.CompactFunc(terms, slices.Equal) + toCSITopology(terms) + } +} + func buildNodes(nodeLabels []map[string]string) *v1.NodeList { list := &v1.NodeList{} i := 0 @@ -1636,3 +1674,160 @@ func listers(kubeClient *fakeclientset.Clientset) ( factory.WaitForCacheSync(stopChan) return scLister, csiNodeLister, nodeLister, claimLister, vaLister, stopChan } + +func TestTopologyTermSort(t *testing.T) { + testCases := []struct { + name string + terms, expected topologyTerm + }{ + { + name: "empty", + }, + { + name: "single-key", + terms: topologyTerm{ + {"zone", "us-east-1a"}, + }, + expected: topologyTerm{ + {"zone", "us-east-1a"}, + }, + }, + { + name: "multiple-keys", + terms: topologyTerm{ + {"zone", "us-east-1a"}, + {"instance", "i-123"}, + }, + expected: topologyTerm{ + {"instance", "i-123"}, + {"zone", "us-east-1a"}, + }, + }, + { + name: "multiple-values", // should not happen currently + terms: topologyTerm{ + {"zone", "us-east-1b"}, + {"zone", "us-east-1a"}, + }, + expected: topologyTerm{ + {"zone", "us-east-1a"}, + {"zone", "us-east-1b"}, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.terms.sort() + assert.Equal(t, tc.expected, tc.terms) + }) + } +} + +func TestTopologyTermCompare(t *testing.T) { + testCases := []struct { + name string + first, second topologyTerm + }{ + { + name: "shorter", + first: topologyTerm{}, + second: topologyTerm{ + {"zone", "us-east-1a"}, + }, + }, + { + name: "key-smaller", + first: topologyTerm{ + {"instance", "i-123"}, + }, + second: topologyTerm{ + {"zone", "us-east-1a"}, + }, + }, + { + name: "value-smaller", + first: topologyTerm{ + {"instance", "i-123"}, + {"zone", "us-east-1a"}, + }, + second: topologyTerm{ + {"instance", "i-123"}, + {"zone", "us-east-1b"}, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Less(t, tc.first.compare(tc.second), 0) + assert.Greater(t, tc.second.compare(tc.first), 0) + assert.Equal(t, 0, tc.first.compare(tc.first)) + assert.Equal(t, 0, tc.second.compare(tc.second)) + }) + } +} + +func TestTopologyTermSubset(t *testing.T) { + testCases := []struct { + name string + terms, other topologyTerm + subset bool + }{ + { + name: "empty", + subset: true, + }, + { + name: "shorter", + terms: topologyTerm{}, + other: topologyTerm{{"zone", "us-east-1a"}}, + subset: true, + }, + { + name: "longer", + terms: topologyTerm{{"zone", "us-east-1a"}}, + other: topologyTerm{}, + subset: false, + }, + { + name: "same", + terms: topologyTerm{{"zone", "us-east-1a"}}, + other: topologyTerm{{"zone", "us-east-1a"}}, + subset: true, + }, + { + name: "shorter-2", + terms: topologyTerm{{"instance", "i-123"}}, + other: topologyTerm{{"instance", "i-123"}, {"zone", "us-east-1a"}}, + subset: true, + }, + { + name: "longer-2", + terms: topologyTerm{{"instance", "i-123"}, {"zone", "us-east-1a"}}, + other: topologyTerm{{"instance", "i-123"}}, + subset: false, + }, + { + name: "shorter-3", + terms: topologyTerm{{"zone", "us-east-1a"}}, + other: topologyTerm{{"instance", "i-123"}, {"zone", "us-east-1a"}}, + subset: true, + }, + { + name: "longer-3", + terms: topologyTerm{{"instance", "i-123"}, {"zone", "us-east-1a"}}, + other: topologyTerm{{"zone", "us-east-1a"}}, + subset: false, + }, + { + name: "unrelated", + terms: topologyTerm{{"instance", "i-123"}}, + other: topologyTerm{{"zone", "us-east-1a"}}, + subset: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.subset, tc.terms.subset(tc.other)) + }) + } +}