diff --git a/tests/e2e/utils.go b/tests/e2e/utils.go index 65357ae18518..26182eb30b4d 100644 --- a/tests/e2e/utils.go +++ b/tests/e2e/utils.go @@ -42,6 +42,7 @@ func newClient(t *testing.T, entpoints []string, cfg e2e.ClientConfig) *clientv3 Endpoints: entpoints, DialTimeout: 5 * time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}, + Logger: zap.NewNop(), } if tlscfg != nil { ccfg.TLS, err = tlscfg.ClientConfig() diff --git a/tests/e2e/watch_delay_test.go b/tests/e2e/watch_delay_test.go index ffde0a02cb13..0e3053f809c8 100644 --- a/tests/e2e/watch_delay_test.go +++ b/tests/e2e/watch_delay_test.go @@ -19,15 +19,22 @@ package e2e import ( "context" + "errors" "fmt" + "math/rand" + "path" "strings" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" "golang.org/x/sync/errgroup" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -246,3 +253,241 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr return nil }) } + +// TestWatchOnStreamMultiplex ensures slow etcd watchers throws terminal ErrCompacted error if its +// next batch of events to be sent are compacted. +func TestWatchOnStreamMultiplex(t *testing.T) { + e2e.BeforeTest(t) + clus, err := e2e.NewEtcdProcessCluster(context.Background(), t, e2e.WithClusterSize(1)) + require.NoError(t, err) + defer clus.Close() + endpoints := clus.EndpointsGRPC() + c := newClient(t, endpoints, e2e.ClientConfig{}) + rootCtx, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + g := errgroup.Group{} + watchKeyPrefix := "/registry/pods/" + commonWatchOpts := []clientv3.OpOption{ + clientv3.WithPrefix(), + clientv3.WithPrevKV(), + } + + watchCacheEventsReceived := atomic.Int64{} + watchCacheWatchInitialized := make(chan struct{}) + watchCacheWatcherExited := make(chan struct{}) + g.Go(func() error { + // simulate watch cache + lg := zaptest.NewLogger(t).Named("watch-cache") + defer func() { lg.Debug("watcher exited") }() + gresp, err := c.Get(rootCtx, "foo") + if err != nil { + panic(err) + } + rev := gresp.Header.Revision + + lastEventModifiedRevision := rev + watchCacheWatchOpts := append([]clientv3.OpOption{clientv3.WithCreatedNotify(), clientv3.WithRev(rev), clientv3.WithProgressNotify()}, commonWatchOpts...) + for wres := range c.Watch(rootCtx, watchKeyPrefix, watchCacheWatchOpts...) { + if wres.Err() != nil { + lg.Warn("got watch response error", + zap.Int64("last-received-events-kv-mod-revision", lastEventModifiedRevision), + zap.Int64("compact-revision", wres.CompactRevision), + zap.String("error", wres.Err().Error())) + close(watchCacheWatcherExited) + return nil + } + if wres.Created { + close(watchCacheWatchInitialized) + } + watchCacheEventsReceived.Add(int64(len(wres.Events))) + for _, ev := range wres.Events { + if ev.Kv.ModRevision != lastEventModifiedRevision+1 { + close(watchCacheWatcherExited) + return fmt.Errorf("event loss detected; want rev %d but got rev %d", lastEventModifiedRevision+1, ev.Kv.ModRevision) + } + lastEventModifiedRevision = ev.Kv.ModRevision + } + } + return nil + }) + <-watchCacheWatchInitialized + + var wg sync.WaitGroup + numOfDirectWatches := 800 + for i := 0; i < numOfDirectWatches; i++ { + wg.Add(1) + g.Go(func() error { + perDirectWatchContext, perDirectWatchCancelFn := context.WithCancel(rootCtx) + retry := 0 + for { + watchOpts := append([]clientv3.OpOption{}, commonWatchOpts...) + if retry == 0 { + watchOpts = append(watchOpts, clientv3.WithCreatedNotify()) + } + err := directWatch(perDirectWatchContext, &wg, c, watchKeyPrefix, watchOpts) + if errors.Is(err, v3rpc.ErrCompacted) { + retry++ + continue + } + // if watch is cancelled by client or closed by server, we should exit + perDirectWatchCancelFn() + return nil + } + }) + } + wg.Wait() + + eventsTriggered := atomic.Int64{} + loadCtx, loadCtxCancel := context.WithTimeout(rootCtx, time.Minute) + defer loadCtxCancel() + generateLoad(loadCtx, c, watchKeyPrefix, &g, &eventsTriggered) + compaction(loadCtx, t, &g, c) + + // validate whether watch cache watcher is compacted or get all the events. + compareEventsReceivedAndTriggered(loadCtx, t, rootCtxCancel, &g, &watchCacheEventsReceived, &eventsTriggered, watchCacheWatcherExited) + require.NoError(t, g.Wait()) +} + +func directWatch(ctx context.Context, wg *sync.WaitGroup, c *clientv3.Client, keyPrefix string, watchOpts []clientv3.OpOption) error { + wch := c.Watch(ctx, keyPrefix, watchOpts...) + for wres := range wch { + if wres.Err() != nil { + return wres.Err() + } + if wres.Created { + wg.Done() + } + } + return nil +} + +func generateLoad(ctx context.Context, c *clientv3.Client, watchKeyPrefix string, group *errgroup.Group, counter *atomic.Int64) { + numOfUpdater := 200 + keyValueSize := 1000 + keyValueSizeUpperLimit := 1200 + for i := 0; i < numOfUpdater; i++ { + writeKeyPrefix := path.Join(watchKeyPrefix, fmt.Sprintf("%d", i)) + group.Go(func() error { + count := 0 + keyValuePayload := randomStringMaker.makeString(keyValueSize, keyValueSizeUpperLimit) + for { + select { + case <-ctx.Done(): + return nil + default: + } + count++ + key := path.Join(writeKeyPrefix, fmt.Sprintf("%d", count)) + if _, err := c.Put(ctx, key, keyValuePayload); err == nil { + counter.Add(1) + } + if _, err := c.Delete(ctx, key); err == nil { + counter.Add(1) + } + time.Sleep(10 * time.Millisecond) + } + }) + } +} + +type randomStringAlphabet string + +func (a randomStringAlphabet) makeString(minLen, maxLen int) string { + n := minLen + if minLen < maxLen { + n += rand.Intn(maxLen - minLen) + } + var s string + for i := 0; i < n; i++ { + s += string(a[rand.Intn(len(a))]) + } + return s +} + +var randomStringMaker = randomStringAlphabet("abcdefghijklmnopqrstuvwxyz0123456789") + +func compaction(ctx context.Context, t *testing.T, group *errgroup.Group, c *clientv3.Client) { + group.Go(func() error { + lg := zaptest.NewLogger(t).Named("compaction") + lastCompactRev := int64(-1) + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + lg.Warn("context deadline exceeded, exit compaction routine") + return nil + case <-ticker.C: + } + if lastCompactRev < 0 { + gresp, err := c.Get(ctx, "foo") + if err != nil { + panic(err) + } + lastCompactRev = gresp.Header.Revision + continue + } + cres, err := c.Compact(ctx, lastCompactRev, clientv3.WithCompactPhysical()) + if err != nil { + lg.Warn("failed to compact", zap.Error(err)) + continue + } + lg.Debug("compacted rev", zap.Int64("compact-revision", lastCompactRev)) + lastCompactRev = cres.Header.Revision + } + }) +} + +func compareEventsReceivedAndTriggered( + loadCtx context.Context, + t *testing.T, + rootCtxCancel context.CancelFunc, + group *errgroup.Group, + watchCacheEventsReceived *atomic.Int64, + eventsTriggered *atomic.Int64, + watchCacheWatcherExited <-chan struct{}, +) { + group.Go(func() error { + defer rootCtxCancel() // cancel all the watchers and load. + + lg := zaptest.NewLogger(t).Named("compareEvents") + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + timer := time.NewTimer(2 * time.Minute) + defer timer.Stop() + + var once sync.Once + for { + // block until traffic is done. + select { + case <-loadCtx.Done(): + once.Do(func() { lg.Info("load generator context is done") }) + case <-watchCacheWatcherExited: + // watch cache watcher channel is expected to be closed with compacted error + // then there is no need to wait for load to verify watch cache receives all the events. + return nil + } + + select { + case <-ticker.C: + case <-timer.C: + triggered := eventsTriggered.Load() + received := watchCacheEventsReceived.Load() + return fmt.Errorf("5 minutes passed since load generation is done, watch cache lost event detected; "+ + "watch evetns received %d, received %d", received, triggered) + } + triggered := eventsTriggered.Load() + received := watchCacheEventsReceived.Load() + if received >= triggered { + lg.Info("The number of events watch cache received is high than or equal to events triggered on client side", + zap.Int64("watch-cache-received", received), + zap.Int64("traffic-triggered", triggered)) + return nil + } + lg.Warn("watch events received is lagging behind", + zap.Int64("watch-events-received", received), + zap.Int64("events-triggered", triggered)) + } + }) +}