Skip to content

Commit

Permalink
reproduce watch starvation and event loss
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <[email protected]>
  • Loading branch information
chaochn47 committed Mar 8, 2024
1 parent 266a3ba commit e00bb98
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 0 deletions.
1 change: 1 addition & 0 deletions tests/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newClient(t *testing.T, entpoints []string, cfg e2e.ClientConfig) *clientv3
Endpoints: entpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
Logger: zap.NewNop(),
}
if tlscfg != nil {
ccfg.TLS, err = tlscfg.ClientConfig()
Expand Down
245 changes: 245 additions & 0 deletions tests/e2e/watch_delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ package e2e

import (
"context"
"errors"
"fmt"
"math/rand"
"path"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"

v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
Expand Down Expand Up @@ -246,3 +253,241 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr
return nil
})
}

// TestWatchOnStreamMultiplex ensures that a slow etcd watcher receives a terminal
// ErrCompacted error if its next batch of events to be sent has been compacted,
// instead of silently losing events.
//
// The test starts a single-member cluster and, sharing one client:
//   - runs a "watch cache" watcher that verifies every received event has a
//     ModRevision exactly one greater than the previous one;
//   - opens numOfDirectWatches additional prefix watches that only drain events;
//   - generates put/delete load (generateLoad) and compacts periodically (compaction);
//   - finally checks (compareEventsReceivedAndTriggered) that the watch cache watcher
//     either exited with an error response or received at least as many events as
//     were triggered.
func TestWatchOnStreamMultiplex(t *testing.T) {
	e2e.BeforeTest(t)
	clus, err := e2e.NewEtcdProcessCluster(context.Background(), t, e2e.WithClusterSize(1))
	require.NoError(t, err)
	defer clus.Close()
	endpoints := clus.EndpointsGRPC()
	c := newClient(t, endpoints, e2e.ClientConfig{})
	rootCtx, rootCtxCancel := context.WithCancel(context.Background())
	defer rootCtxCancel()

	g := errgroup.Group{}
	watchKeyPrefix := "/registry/pods/"
	commonWatchOpts := []clientv3.OpOption{
		clientv3.WithPrefix(),
		clientv3.WithPrevKV(),
	}

	// Read the starting revision on the test goroutine so that a failure is
	// reported through the testing framework instead of panicking inside a
	// background goroutine (which would crash the whole test binary).
	gresp, err := c.Get(rootCtx, "foo")
	require.NoError(t, err)
	rev := gresp.Header.Revision

	watchCacheEventsReceived := atomic.Int64{}
	// closed once the watch cache watcher observes its Created response.
	watchCacheWatchInitialized := make(chan struct{})
	// closed when the watch cache watcher stops because of an error response
	// or because it detected a gap in event revisions.
	watchCacheWatcherExited := make(chan struct{})
	g.Go(func() error {
		// simulate watch cache
		lg := zaptest.NewLogger(t).Named("watch-cache")
		defer func() { lg.Debug("watcher exited") }()

		lastEventModifiedRevision := rev
		watchCacheWatchOpts := append([]clientv3.OpOption{clientv3.WithCreatedNotify(), clientv3.WithRev(rev), clientv3.WithProgressNotify()}, commonWatchOpts...)
		for wres := range c.Watch(rootCtx, watchKeyPrefix, watchCacheWatchOpts...) {
			if wres.Err() != nil {
				// An error response (e.g. compaction) is an accepted outcome:
				// the watcher was told it cannot continue, nothing was lost silently.
				lg.Warn("got watch response error",
					zap.Int64("last-received-events-kv-mod-revision", lastEventModifiedRevision),
					zap.Int64("compact-revision", wres.CompactRevision),
					zap.String("error", wres.Err().Error()))
				close(watchCacheWatcherExited)
				return nil
			}
			if wres.Created {
				close(watchCacheWatchInitialized)
			}
			watchCacheEventsReceived.Add(int64(len(wres.Events)))
			for _, ev := range wres.Events {
				// Every event is expected to carry the next consecutive revision;
				// a gap means events were dropped.
				if ev.Kv.ModRevision != lastEventModifiedRevision+1 {
					close(watchCacheWatcherExited)
					return fmt.Errorf("event loss detected; want rev %d but got rev %d", lastEventModifiedRevision+1, ev.Kv.ModRevision)
				}
				lastEventModifiedRevision = ev.Kv.ModRevision
			}
		}
		return nil
	})
	// Wait for the watch cache watch to be created, but do not hang if the
	// watcher terminated before that happened.
	select {
	case <-watchCacheWatchInitialized:
	case <-watchCacheWatcherExited:
		// Only the watch cache goroutine is running at this point; surface its error if any.
		require.NoError(t, g.Wait())
		t.Fatal("watch cache watcher exited before its watch was created")
	}

	// wg is released once per direct watcher, by directWatch, when the watcher
	// observes a response with Created set.
	var wg sync.WaitGroup
	numOfDirectWatches := 800
	for i := 0; i < numOfDirectWatches; i++ {
		wg.Add(1)
		g.Go(func() error {
			perDirectWatchContext, perDirectWatchCancelFn := context.WithCancel(rootCtx)
			retry := 0
			for {
				watchOpts := append([]clientv3.OpOption{}, commonWatchOpts...)
				// Only the first attempt asks for a created notification, so that
				// wg.Done is not triggered again by re-created watches.
				if retry == 0 {
					watchOpts = append(watchOpts, clientv3.WithCreatedNotify())
				}
				err := directWatch(perDirectWatchContext, &wg, c, watchKeyPrefix, watchOpts)
				if errors.Is(err, v3rpc.ErrCompacted) {
					retry++
					continue
				}
				// if watch is cancelled by client or closed by server, we should exit
				perDirectWatchCancelFn()
				return nil
			}
		})
	}
	wg.Wait()

	eventsTriggered := atomic.Int64{}
	loadCtx, loadCtxCancel := context.WithTimeout(rootCtx, time.Minute)
	defer loadCtxCancel()
	generateLoad(loadCtx, c, watchKeyPrefix, &g, &eventsTriggered)
	compaction(loadCtx, t, &g, c)

	// validate whether watch cache watcher is compacted or get all the events.
	compareEventsReceivedAndTriggered(loadCtx, t, rootCtxCancel, &g, &watchCacheEventsReceived, &eventsTriggered, watchCacheWatcherExited)
	require.NoError(t, g.Wait())
}

// directWatch opens a watch on keyPrefix with the given options and drains its
// responses until the watch channel is closed.
//
// It returns the first response error it encounters, or nil when the channel
// closes without one. Each time a response with Created set is observed,
// wg.Done is called so the caller can wait for the watch to be established.
func directWatch(ctx context.Context, wg *sync.WaitGroup, c *clientv3.Client, keyPrefix string, watchOpts []clientv3.OpOption) error {
	for resp := range c.Watch(ctx, keyPrefix, watchOpts...) {
		if err := resp.Err(); err != nil {
			return err
		}
		if !resp.Created {
			// Regular event batches are simply discarded.
			continue
		}
		wg.Done()
	}
	return nil
}

// generateLoad starts a fixed number of updater goroutines on group. Each updater
// repeatedly puts and then deletes a fresh key under
// <watchKeyPrefix>/<updater index>/<sequence number>, using one random payload
// generated per updater, and pauses briefly between rounds.
//
// counter is incremented once for every Put and every Delete that returns no
// error; failed requests are not counted. Updaters return nil once ctx is done.
func generateLoad(ctx context.Context, c *clientv3.Client, watchKeyPrefix string, group *errgroup.Group, counter *atomic.Int64) {
	const (
		updaters       = 200
		minPayloadSize = 1000
		maxPayloadSize = 1200
		pause          = 10 * time.Millisecond
	)
	for id := 0; id < updaters; id++ {
		updaterPrefix := path.Join(watchKeyPrefix, fmt.Sprintf("%d", id))
		group.Go(func() error {
			payload := randomStringMaker.makeString(minPayloadSize, maxPayloadSize)
			// Sequence numbers start at 1 and grow by one per round.
			for seq := 1; ; seq++ {
				if ctx.Err() != nil {
					return nil
				}
				key := path.Join(updaterPrefix, fmt.Sprintf("%d", seq))
				_, putErr := c.Put(ctx, key, payload)
				if putErr == nil {
					counter.Add(1)
				}
				_, delErr := c.Delete(ctx, key)
				if delErr == nil {
					counter.Add(1)
				}
				time.Sleep(pause)
			}
		})
	}
}

// randomStringAlphabet is the set of characters random strings are drawn from.
type randomStringAlphabet string

// makeString returns a random string whose characters are picked uniformly from
// the alphabet a.
//
// The length is minLen when minLen >= maxLen, otherwise a random value in
// [minLen, maxLen). A non-positive length yields the empty string. Requesting a
// non-empty string from an empty alphabet panics (rand.Intn(0)).
func (a randomStringAlphabet) makeString(minLen, maxLen int) string {
	n := minLen
	if minLen < maxLen {
		n += rand.Intn(maxLen - minLen)
	}
	if n <= 0 {
		return ""
	}
	// Pick whole characters rather than bytes so multi-byte alphabets are not mangled.
	letters := []rune(a)
	var b strings.Builder
	// Pre-size the buffer; avoids the quadratic cost of repeated string concatenation.
	b.Grow(n)
	for i := 0; i < n; i++ {
		b.WriteRune(letters[rand.Intn(len(letters))])
	}
	return b.String()
}

// randomStringMaker generates random lowercase alphanumeric strings.
var randomStringMaker = randomStringAlphabet("abcdefghijklmnopqrstuvwxyz0123456789")

// compaction starts a goroutine on group that compacts the store every 10 seconds
// until ctx is done.
//
// The first tick only records the current revision (taken from the header of a
// Get). Every later tick compacts, with the physical option, at the previously
// recorded revision and then records the revision returned in the compaction
// response header for the next round. Compaction failures are logged and retried
// on the next tick. A failure to read the initial revision is returned through the
// errgroup, unless it was caused by ctx being done.
func compaction(ctx context.Context, t *testing.T, group *errgroup.Group, c *clientv3.Client) {
	group.Go(func() error {
		lg := zaptest.NewLogger(t).Named("compaction")
		lastCompactRev := int64(-1)
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				lg.Warn("context deadline exceeded, exit compaction routine")
				return nil
			case <-ticker.C:
			}
			if lastCompactRev < 0 {
				gresp, err := c.Get(ctx, "foo")
				if err != nil {
					if ctx.Err() != nil {
						// The request failed because we are shutting down;
						// loop back so the ctx.Done branch handles the exit.
						continue
					}
					// Report through the errgroup rather than panicking, which
					// would take down the whole test binary.
					return fmt.Errorf("compaction: reading current revision: %w", err)
				}
				lastCompactRev = gresp.Header.Revision
				continue
			}
			cres, err := c.Compact(ctx, lastCompactRev, clientv3.WithCompactPhysical())
			if err != nil {
				lg.Warn("failed to compact", zap.Error(err))
				continue
			}
			lg.Debug("compacted rev", zap.Int64("compact-revision", lastCompactRev))
			lastCompactRev = cres.Header.Revision
		}
	})
}

// compareEventsReceivedAndTriggered starts a goroutine on group that decides the
// outcome of the test once load generation has finished.
//
// It first blocks until loadCtx is done or watchCacheWatcherExited is closed.
// It then compares, every 10 seconds, the number of events the watch cache
// watcher received with the number of events the load generator triggered:
//   - if watchCacheWatcherExited is closed at any point, it returns nil (the
//     watcher terminated explicitly, so there is nothing left to compare);
//   - if received >= triggered, it returns nil;
//   - if 2 minutes elapse (measured from when this goroutine started) without
//     either of the above, it returns an error reporting lost events.
//
// rootCtxCancel is always invoked on return to stop all watchers and the load.
func compareEventsReceivedAndTriggered(
	loadCtx context.Context,
	t *testing.T,
	rootCtxCancel context.CancelFunc,
	group *errgroup.Group,
	watchCacheEventsReceived *atomic.Int64,
	eventsTriggered *atomic.Int64,
	watchCacheWatcherExited <-chan struct{},
) {
	const (
		checkInterval  = 10 * time.Second
		overallTimeout = 2 * time.Minute
	)
	group.Go(func() error {
		defer rootCtxCancel() // cancel all the watchers and load.

		lg := zaptest.NewLogger(t).Named("compareEvents")
		ticker := time.NewTicker(checkInterval)
		defer ticker.Stop()
		timer := time.NewTimer(overallTimeout)
		defer timer.Stop()

		// block until traffic is done.
		select {
		case <-loadCtx.Done():
			lg.Info("load generator context is done")
		case <-watchCacheWatcherExited:
			// watch cache watcher channel is expected to be closed with compacted error
			// then there is no need to wait for load to verify watch cache receives all the events.
			return nil
		}

		// watcherExited reports, without blocking, whether the watch cache watcher has terminated.
		watcherExited := func() bool {
			select {
			case <-watchCacheWatcherExited:
				return true
			default:
				return false
			}
		}

		for {
			select {
			case <-watchCacheWatcherExited:
				return nil
			case <-ticker.C:
			case <-timer.C:
				// select picks randomly among ready cases; give the exit signal
				// priority so an explicit watcher termination is never reported
				// as event loss.
				if watcherExited() {
					return nil
				}
				triggered := eventsTriggered.Load()
				received := watchCacheWatchEventsLoad(watchCacheEventsReceived)
				return fmt.Errorf("%v passed since event comparison started, watch cache lost event detected; "+
					"watch events received %d, triggered %d", overallTimeout, received, triggered)
			}
			if watcherExited() {
				return nil
			}
			triggered := eventsTriggered.Load()
			received := watchCacheWatchEventsLoad(watchCacheEventsReceived)
			if received >= triggered {
				lg.Info("The number of events watch cache received is higher than or equal to events triggered on client side",
					zap.Int64("watch-cache-received", received),
					zap.Int64("traffic-triggered", triggered))
				return nil
			}
			lg.Warn("watch events received is lagging behind",
				zap.Int64("watch-events-received", received),
				zap.Int64("events-triggered", triggered))
		}
	})
}

// watchCacheWatchEventsLoad returns the current value of the received-events counter.
func watchCacheWatchEventsLoad(counter *atomic.Int64) int64 {
	return counter.Load()
}

0 comments on commit e00bb98

Please sign in to comment.