Skip to content

Commit

Permalink
e2e test done
Browse files Browse the repository at this point in the history
  • Loading branch information
jiayouxujin committed Aug 9, 2023
1 parent f009f89 commit 554d228
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 83 deletions.
2 changes: 1 addition & 1 deletion examples/sentinel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: kvrocks.com/v1alpha1
kind: KVRocks
metadata:
name: sentinel-1
namespace: default
namespace: kvrocks
spec:
image: redis:6.2.4
imagePullPolicy: IfNotPresent
Expand Down
10 changes: 5 additions & 5 deletions examples/standard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: kvrocks.apache.org/v1alpha1
kind: KVRocks
metadata:
name: kvrocks-standard-1-demo
namespace: default
namespace: kvrocks
labels:
kvrocks/system: gt
kvrocks/monitored-by: sentinel-1
Expand Down Expand Up @@ -52,8 +52,8 @@ spec:
operator: Exists
resources:
limits:
cpu: 500m
memory: 500Mi
cpu: 2
memory: 8Gi
requests:
cpu: 500m
memory: 500Mi
cpu: 1
memory: 4Gi
11 changes: 0 additions & 11 deletions pkg/client/kvrocks/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,3 @@ func (s *Client) SubOdownMsg(ip, password string) (*redis.PubSub, func()) {
return pubsub, finalize

}

func (s *Client) ResetMonitorPassword(sentinelIP, sentinelPassword, master, password string) error {
c := kvrocksSentinelClient(sentinelIP, sentinelPassword)
defer c.Close()
var err error
if err = c.Set(ctx, master, "AUTH-PASS", password).Err(); err != nil {
return err
}
s.logger.V(1).Info("sentinel reset master password successfully", "master", master)
return nil
}
4 changes: 2 additions & 2 deletions pkg/controllers/sentinel/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func (h *KVRocksSentinelHandler) ensureMonitor(masterIP, masterName, password st
if err := h.kvrocks.CreateMonitor(sentinelIP, sentinelPassword, masterName, masterIP, password); err != nil {
return err
}
} else { // maybe password change
if err = h.kvrocks.ResetMonitorPassword(sentinelIP, sentinelPassword, masterName, password); err != nil {
} else {
if err = h.kvrocks.ResetMonitor(sentinelIP, sentinelPassword, masterName, password); err != nil {
return err
}
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/controllers/standard/kvrocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kvrocksv1alpha1 "github.com/RocksLabs/kvrocks-operator/api/v1alpha1"
"github.com/RocksLabs/kvrocks-operator/pkg/client/kvrocks"
Expand Down Expand Up @@ -92,6 +93,14 @@ func (h *KVRocksStandardHandler) ensureKVRocksReplication() error {
}
}
h.log.V(1).Info("kvrocks replication ok")
// add Finalizer
if !controllerutil.ContainsFinalizer(h.instance, kvrocksv1alpha1.KVRocksFinalizer) {
controllerutil.AddFinalizer(h.instance, kvrocksv1alpha1.KVRocksFinalizer)
if err := h.k8s.UpdateKVRocks(h.instance); err != nil {
return err
}
}
// notify sentinel to update
if v, ok := h.instance.Labels[resources.MonitoredBy]; ok {
return h.updateSentinelAnnotationCount(v)
}
Expand Down
141 changes: 77 additions & 64 deletions test/e2e/standard/standard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,70 +55,77 @@ var _ = Describe("Operator for Standard Mode", func() {
)

var (
instance *kvrocksv1alpha1.KVRocks
key types.NamespacedName
kvrocksInstance *kvrocksv1alpha1.KVRocks
sentinelInstance *kvrocksv1alpha1.KVRocks
kvrocksKey types.NamespacedName
sentinelKey types.NamespacedName
)

BeforeEach(func() {
var err error
instance, err = env.ParseManifest(kvrocksv1alpha1.StandardType)
kvrocksInstance, err = env.ParseManifest(kvrocksv1alpha1.StandardType)
Expect(err).Should(Succeed())
sentinelInstance, err = env.ParseManifest(kvrocksv1alpha1.SentinelType)
Expect(err).Should(Succeed())

key = types.NamespacedName{
Namespace: instance.GetNamespace(),
Name: instance.GetName(),
kvrocksKey = types.NamespacedName{
Namespace: kvrocksInstance.GetNamespace(),
Name: kvrocksInstance.GetName(),
}

Expect(env.Client.Create(ctx, instance)).Should(Succeed())
sentinelKey = types.NamespacedName{
Namespace: sentinelInstance.GetNamespace(),
Name: sentinelInstance.GetName(),
}

Expect(env.Client.Create(ctx, kvrocksInstance)).Should(Succeed())
Expect(env.Client.Create(ctx, sentinelInstance)).Should(Succeed())
Eventually(func() error {
if err = env.Client.Get(ctx, key, instance); err != nil {
if err = env.Client.Get(ctx, kvrocksKey, kvrocksInstance); err != nil {
return err
}
if instance.Status.Status != kvrocksv1alpha1.StatusRunning {
if kvrocksInstance.Status.Status != kvrocksv1alpha1.StatusRunning {
return errors.New("kvrocks doesn't reach running status")
}
if !controllerutil.ContainsFinalizer(instance, kvrocksv1alpha1.KVRocksFinalizer) {
if !controllerutil.ContainsFinalizer(kvrocksInstance, kvrocksv1alpha1.KVRocksFinalizer) {
return errors.New("kvrocks doesn't contain finalizer")
}
return nil
}, timeout, interval).Should(Succeed())
Eventually(func() error {
return checkKVRocks(instance)
return checkKVRocks(kvrocksInstance, sentinelInstance)
}, timeout, interval).Should(Succeed())
})

AfterEach(func() {
Expect(env.Client.Delete(ctx, instance)).Should(Succeed())
Expect(env.Client.Delete(ctx, kvrocksInstance)).Should(Succeed())
Expect(env.Client.Delete(ctx, sentinelInstance)).Should(Succeed())
Eventually(func() bool {
return k8serr.IsNotFound(env.Client.Get(ctx, key, instance))
b1 := k8serr.IsNotFound(env.Client.Get(ctx, kvrocksKey, kvrocksInstance))
b2 := k8serr.IsNotFound(env.Client.Get(ctx, sentinelKey, sentinelInstance))
return b1 && b2
}, timeout, interval).Should(Equal(true))
})

It("test update kvrocks config", func() {
instance.Spec.KVRocksConfig["slowlog-log-slower-than"] = "250000"
instance.Spec.KVRocksConfig["profiling-sample-record-threshold-ms"] = "200"
Expect(env.Client.Update(ctx, instance)).Should(Succeed())
kvrocksInstance.Spec.KVRocksConfig["slowlog-log-slower-than"] = "250000"
kvrocksInstance.Spec.KVRocksConfig["profiling-sample-record-threshold-ms"] = "200"
Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed())
Eventually(func() error {
return checkKVRocks(instance)
return checkKVRocks(kvrocksInstance, sentinelInstance)
}, timeout, interval).Should(Succeed())
})

It("test change password", func() {
instance.Spec.Password = "39c5bb"
Expect(env.Client.Update(ctx, instance)).Should(Succeed())
kvrocksInstance.Spec.Password = "39c5bb"
Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed())
Eventually(func() error {
return checkKVRocks(instance)
return checkKVRocks(kvrocksInstance, sentinelInstance)
}, timeout, interval).Should(Succeed())
})

It("test recover when slave down", func() {
var pod corev1.Pod
key := types.NamespacedName{
Namespace: instance.GetNamespace(),
Name: fmt.Sprintf("%s-%d", instance.GetName(), 1), // TODO remove 1
}
Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed())
Expect(pod.Labels[resources.RedisRole]).Should(Equal(kvrocks.RoleSlaver))
key, pod := getKvrocksByRole(kvrocksInstance, kvrocks.RoleSlaver)
Expect(env.Client.Delete(ctx, &pod)).Should(Succeed())

time.Sleep(time.Second * 30)
Expand All @@ -135,18 +142,12 @@ var _ = Describe("Operator for Standard Mode", func() {
return nil
}, timeout, interval).Should(Succeed())
Eventually(func() error {
return checkKVRocks(instance)
return checkKVRocks(kvrocksInstance, sentinelInstance)
}, timeout, interval).Should(Succeed())
})

It("test recover when master down", func() {
var pod corev1.Pod
key := types.NamespacedName{
Namespace: instance.GetNamespace(),
Name: fmt.Sprintf("%s-%d", instance.GetName(), 0), // TODO remove 0
}
Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed())
Expect(pod.Labels[resources.RedisRole]).Should(Equal(kvrocks.RoleMaster))
key, pod := getKvrocksByRole(kvrocksInstance, kvrocks.RoleMaster)
Expect(env.Client.Delete(ctx, &pod)).Should(Succeed())
// wait pod reconstruction
time.Sleep(time.Second * 30)
Expand All @@ -163,25 +164,24 @@ var _ = Describe("Operator for Standard Mode", func() {
return nil
}, timeout, interval).Should(Succeed())
Eventually(func() error {
return checkKVRocks(instance)
return checkKVRocks(kvrocksInstance, sentinelInstance)
}, timeout, interval).Should(Succeed())
})

It("test recover when sentinel down", func() {
sentinel := resources.GetSentinelInstance(instance)
podList, err := getSentinelPodList(sentinel)
podList, err := getSentinelPodList(sentinelInstance)
Expect(err).Should(Succeed())
Expect(len(podList.Items)).Should(Equal(int(sentinel.Spec.Replicas)))
Expect(len(podList.Items)).Should(Equal(int(sentinelInstance.Spec.Replicas)))
Expect(env.Client.Delete(ctx, &podList.Items[0])).Should(Succeed())

// wait pod reconstruction
time.Sleep(time.Second * 30)
Eventually(func() error {
podList, err := getSentinelPodList(sentinel)
podList, err := getSentinelPodList(sentinelInstance)
if err != nil {
return err
}
if len(podList.Items) != int(sentinel.Spec.Replicas) {
if len(podList.Items) != int(sentinelInstance.Spec.Replicas) {
return fmt.Errorf("please wait pod running")
}
for _, pod := range podList.Items {
Expand All @@ -192,18 +192,18 @@ var _ = Describe("Operator for Standard Mode", func() {
return nil
}, timeout, interval).Should(Succeed())
Eventually(func() error {
return checkKVRocks(instance)
return checkKVRocks(kvrocksInstance, sentinelInstance)
}, timeout, interval).Should(Succeed())
})

It("test shrink", func() {
Expect(env.Client.Get(ctx, key, instance)).Should(Succeed())
Expect(env.Client.Get(ctx, kvrocksKey, kvrocksInstance)).Should(Succeed())
// pod xx-1 should be reserved
instance.Spec.Replicas = 1
Expect(env.Client.Update(ctx, instance)).Should(Succeed())
kvrocksInstance.Spec.Replicas = 1
Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed())
var sts kruise.StatefulSet
Eventually(func() error {
Expect(env.Client.Get(ctx, key, &sts)).Should(Succeed())
Expect(env.Client.Get(ctx, kvrocksKey, &sts)).Should(Succeed())
if sts.Status.ReadyReplicas != int32(1) {
return errors.New("ready replicas error")
}
Expand All @@ -213,17 +213,17 @@ var _ = Describe("Operator for Standard Mode", func() {
return nil
}, timeout, interval).Should(Succeed())
Eventually(func() error {
return checkKVRocks(instance)
return checkKVRocks(kvrocksInstance, sentinelInstance)
}, timeout, interval).Should(Succeed())
})

It("test expansion", func() {
Expect(env.Client.Get(ctx, key, instance)).Should(Succeed())
instance.Spec.Replicas = 5
Expect(env.Client.Update(ctx, instance)).Should(Succeed())
Expect(env.Client.Get(ctx, kvrocksKey, kvrocksInstance)).Should(Succeed())
kvrocksInstance.Spec.Replicas = 5
Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed())
Eventually(func() error {
var sts kruise.StatefulSet
Expect(env.Client.Get(ctx, key, &sts)).Should(Succeed())
Expect(env.Client.Get(ctx, kvrocksKey, &sts)).Should(Succeed())
if sts.Status.ReadyReplicas != 5 {
return errors.New("replication error")
}
Expand All @@ -233,23 +233,23 @@ var _ = Describe("Operator for Standard Mode", func() {
return nil
}, timeout, interval).Should(Succeed())
Eventually(func() error {
return checkKVRocks(instance)
return checkKVRocks(kvrocksInstance, sentinelInstance)
}, timeout, interval).Should(Succeed())
})

})

func checkKVRocks(instance *kvrocksv1alpha1.KVRocks) error {
password := instance.Spec.Password
replicas := int(instance.Spec.Replicas)
func checkKVRocks(kvrocksInstance, sentinelInstance *kvrocksv1alpha1.KVRocks) error {
password := kvrocksInstance.Spec.Password
replicas := int(kvrocksInstance.Spec.Replicas)
masterIP := []string{}
masterOfSlave := map[int]string{}

for index := 0; index < replicas; index++ {
var pod corev1.Pod
key := types.NamespacedName{
Namespace: instance.GetNamespace(),
Name: fmt.Sprintf("%s-%d", instance.GetName(), index),
Namespace: kvrocksInstance.GetNamespace(),
Name: fmt.Sprintf("%s-%d", kvrocksInstance.GetName(), index),
}
if err := env.Client.Get(ctx, key, &pod); err != nil {
return err
Expand All @@ -272,7 +272,7 @@ func checkKVRocks(instance *kvrocksv1alpha1.KVRocks) error {
}
masterOfSlave[index] = curMaster
}
for k, v := range instance.Spec.KVRocksConfig {
for k, v := range kvrocksInstance.Spec.KVRocksConfig {
curValue, err := kvrocksClient.GetConfig(node.IP, password, k)
if err != nil {
return err
Expand All @@ -291,15 +291,13 @@ func checkKVRocks(instance *kvrocksv1alpha1.KVRocks) error {
}
}

sentinel := resources.GetSentinelInstance(instance)

podList, err := getSentinelPodList(sentinel)
podList, err := getSentinelPodList(sentinelInstance)
if err != nil {
return fmt.Errorf("get sentinel pod list error: %v", err)
}
for _, pod := range podList.Items {
_, name := resources.ParseRedisName(instance.Name)
master, err := kvrocksClient.GetMasterFromSentinel(pod.Status.PodIP, sentinel.Spec.Password, name)
_, name := resources.ParseRedisName(kvrocksInstance.Name)
master, err := kvrocksClient.GetMasterFromSentinel(pod.Status.PodIP, sentinelInstance.Spec.Password, name)
if err != nil {
return err
}
Expand All @@ -309,7 +307,7 @@ func checkKVRocks(instance *kvrocksv1alpha1.KVRocks) error {
}

var pvcList corev1.PersistentVolumeClaimList
if err := env.Client.List(ctx, &pvcList, client.InNamespace(instance.Namespace), client.MatchingLabels(instance.Labels)); err != nil {
if err := env.Client.List(ctx, &pvcList, client.InNamespace(kvrocksInstance.Namespace), client.MatchingLabels(kvrocksInstance.Labels)); err != nil {
return err
}
if len(pvcList.Items) != replicas {
Expand Down Expand Up @@ -340,3 +338,18 @@ func getSentinelPodList(sentinel *kvrocksv1alpha1.KVRocks) (*corev1.PodList, err
}
return podList, nil
}

func getKvrocksByRole(instance *kvrocksv1alpha1.KVRocks, role string) (key types.NamespacedName, pod corev1.Pod) {
replicas := int(instance.Spec.Replicas)
for i := 0; i < replicas; i++ {
key = types.NamespacedName{
Namespace: instance.GetNamespace(),
Name: fmt.Sprintf("%s-%d", instance.GetName(), i),
}
Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed())
if pod.Labels[resources.RedisRole] == role {
break
}
}
return
}

0 comments on commit 554d228

Please sign in to comment.