Skip to content

Commit

Permalink
release-3.5: backport etcd-io#17263 Fix tx buffer inconsistency if th…
Browse files Browse the repository at this point in the history
…ere are unordered key writes in one tx.

etcd-io#17263

Notes:
1. batch_tx_test.go is not incorporated into this backport because main and release-3.5 diverges a lot on buckets/schema/etc..
It is also not the core of the bug fix.
2. verify in tx_buffer.go is removed as mainly due to the dependency is not there in relese-3.5.

Signed-off-by: Chao Chen <[email protected]>
  • Loading branch information
chaochn47 committed Nov 10, 2024
1 parent 4726460 commit 3555f72
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 8 deletions.
25 changes: 17 additions & 8 deletions server/mvcc/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
}

func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) {
// TODO: Add (in tests?) verification whether k>b[len(b)]
// putSeq is only be called for the data in the Key bucket. The keys
// in the Key bucket should be monotonically increasing revisions.
txw.putInternal(bucket, k, v)
}

Expand Down Expand Up @@ -80,6 +81,9 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb, ok := txr.buckets[k]
if !ok {
delete(txw.buckets, k)
if seq, ok := txw.bucket2seq[k]; ok && !seq {
wb.dedupe()
}
txr.buckets[k] = wb
continue
}
Expand Down Expand Up @@ -124,7 +128,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
bufVersion: 0,
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
txrCopy.txBuffer.buckets[bucketName] = bucket.CopyUsed()
}
return txrCopy
}
Expand All @@ -148,7 +152,7 @@ func newBucketBuffer() *bucketBuffer {
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
idx := sort.Search(bb.used, f)
if idx < 0 {
if idx < 0 || idx >= bb.used {
return nil, nil
}
if len(endKey) == 0 {
Expand Down Expand Up @@ -201,10 +205,15 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
return
}
bb.dedupe()
}

// dedupe removes duplicates, using only newest update
func (bb *bucketBuffer) dedupe() {
if bb.used <= 1 {
return
}
sort.Stable(bb)

// remove duplicates, using only newest update
widx := 0
for ridx := 1; ridx < bb.used; ridx++ {
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
Expand All @@ -221,11 +230,11 @@ func (bb *bucketBuffer) Less(i, j int) bool {
}
func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }

func (bb *bucketBuffer) Copy() *bucketBuffer {
func (bb *bucketBuffer) CopyUsed() *bucketBuffer {
bbCopy := bucketBuffer{
buf: make([]kv, len(bb.buf)),
buf: make([]kv, bb.used),
used: bb.used,
}
copy(bbCopy.buf, bb.buf)
copy(bbCopy.buf, bb.buf[:bb.used])
return &bbCopy
}
142 changes: 142 additions & 0 deletions server/mvcc/backend/tx_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_bucketBuffer_CopyUsed_After_Add(t *testing.T) {
bb := &bucketBuffer{buf: make([]kv, 10), used: 0}
for i := 0; i < 20; i++ {
k := fmt.Sprintf("key%d", i)
v := fmt.Sprintf("val%d", i)
bb.add([]byte(k), []byte(v))
bbCopy := bb.CopyUsed()
assert.Equal(t, bb.used, bbCopy.used)
assert.Len(t, bbCopy.buf, bbCopy.used)
assert.GreaterOrEqual(t, len(bb.buf), len(bbCopy.buf))
}
}

func Test_bucketBuffer_CopyUsed(t *testing.T) {
tests := []struct {
name string
bufLen int
used int
wantPanic bool
wantUsed int
wantBufLen int
}{
{
name: "used is 0",
bufLen: 10,
used: 0,
wantPanic: false,
wantUsed: 0,
wantBufLen: 0,
},
{
name: "used is greater than 0 and less than len(buf)",
bufLen: 10,
used: 5,
wantPanic: false,
wantUsed: 5,
wantBufLen: 5,
},
{
name: "used is equal to len(buf)",
bufLen: 10,
used: 10,
wantPanic: false,
wantUsed: 10,
wantBufLen: 10,
},
{
name: "used is greater than len(buf)",
bufLen: 10,
used: 11,
wantPanic: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bb := &bucketBuffer{buf: make([]kv, tt.bufLen), used: tt.used}
if tt.wantPanic {
assert.Panicsf(t, func() {
bb.CopyUsed()
}, "expected panic when used (%d) and the length of buf (%d)", tt.used, tt.bufLen)
} else {
bbCopy := bb.CopyUsed()
assert.Equal(t, tt.wantUsed, bbCopy.used)
assert.Len(t, bbCopy.buf, tt.wantBufLen)
}
})
}
}

func TestDedupe(t *testing.T) {
tests := []struct {
name string
keys, vals, expectedKeys, expectedVals []string
}{
{
name: "empty",
keys: []string{},
vals: []string{},
expectedKeys: []string{},
expectedVals: []string{},
},
{
name: "single kv",
keys: []string{"key1"},
vals: []string{"val1"},
expectedKeys: []string{"key1"},
expectedVals: []string{"val1"},
},
{
name: "duplicate key",
keys: []string{"key1", "key1"},
vals: []string{"val1", "val2"},
expectedKeys: []string{"key1"},
expectedVals: []string{"val2"},
},
{
name: "unordered keys",
keys: []string{"key3", "key1", "key4", "key2", "key1", "key4"},
vals: []string{"val1", "val5", "val3", "val4", "val2", "val6"},
expectedKeys: []string{"key1", "key2", "key3", "key4"},
expectedVals: []string{"val2", "val4", "val1", "val6"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bb := &bucketBuffer{buf: make([]kv, 10), used: 0}
for i := 0; i < len(tt.keys); i++ {
bb.add([]byte(tt.keys[i]), []byte(tt.vals[i]))
}
bb.dedupe()
assert.Len(t, tt.expectedKeys, bb.used)
for i := 0; i < bb.used; i++ {
assert.Equal(t, bb.buf[i].key, []byte(tt.expectedKeys[i]))
assert.Equal(t, bb.buf[i].val, []byte(tt.expectedVals[i]))
}
})
}
}

0 comments on commit 3555f72

Please sign in to comment.