Skip to content

Commit

Permalink
fix(store): respect context in flushLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jul 29, 2024
1 parent 8f53979 commit cf6765c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
22 changes: 19 additions & 3 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,12 @@ func (s *Store[H]) flushLoop() {
log.Errorw("writing header batch", "try", i+1, "from", from, "to", to, "err", err)
s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)

const maxRetrySleep = time.Second
sleep := min(10*time.Duration(i+1)*time.Millisecond, maxRetrySleep)
time.Sleep(sleep)
const maxRetrySleep = 100 * time.Millisecond
sleepDur := min(10*time.Duration(i+1)*time.Millisecond, maxRetrySleep)

if err := sleep(ctx, sleepDur); err != nil {
break
}
}

s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
Expand Down Expand Up @@ -511,3 +514,16 @@ func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, hea
}
return nil
}

// sleep with cancellation, returns nil when timer has fired, error otherwise.
func sleep(ctx context.Context, duration time.Duration) error {
timer := time.NewTimer(duration)
defer timer.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
55 changes: 55 additions & 0 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"context"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -89,6 +90,45 @@ func TestStore(t *testing.T) {
assert.Len(t, out, 12)
}

func TestStore_BadFlush(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)

suite := headertest.NewTestSuite(t)

var callCount atomic.Int32

rootDS := sync.MutexWrap(datastore.NewMapDatastore())
ds := &badBatchDatastore{
Datastore: rootDS,
BatchFn: func(ctx context.Context) (datastore.Batch, error) {
count := callCount.Add(1)
// do not fail on 1st call due to store.Init and stop failing after 5 call.
if count > 1 && count < 5 {
return nil, context.Canceled
}
return rootDS.Batch(ctx)
},
}
store := NewTestStore(t, ctx, ds, suite.Head())

head, err := store.Head(ctx)
require.NoError(t, err)
assert.EqualValues(t, suite.Head().Hash(), head.Hash())

in := suite.GenDummyHeaders(10)
err = store.Append(ctx, in...)
require.NoError(t, err)

assert.Eventually(t, func() bool {
// accessing store.ds directly because inside store we use wrappedStore,
// accessing ds or rootDS directly will result in constant error due to key prefix.
ok, err := store.ds.Has(ctx, headKey)
require.NoError(t, err)
return ok
}, 3*time.Second, 50*time.Millisecond)
}

// TestStore_GetRangeByHeight_ExpectedRange
func TestStore_GetRangeByHeight_ExpectedRange(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down Expand Up @@ -279,3 +319,18 @@ func TestStoreInit(t *testing.T) {
err = store.Init(ctx, headers[len(headers)-1]) // init should work with any height, not only 1
require.NoError(t, err)
}

var _ datastore.Batching = &badBatchDatastore{}

type badBatchDatastore struct {
datastore.Datastore

BatchFn func(ctx context.Context) (datastore.Batch, error)
}

func (s *badBatchDatastore) Batch(ctx context.Context) (datastore.Batch, error) {
if s.BatchFn != nil {
return s.BatchFn(ctx)
}
return nil, nil
}

0 comments on commit cf6765c

Please sign in to comment.