Skip to content

Commit

Permalink
[release-17.0] VDiff: properly split cell values in record when using…
Browse files Browse the repository at this point in the history
… TabletPicker (#14099) (#14103)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
vitess-bot[bot] and mattlord authored Sep 28, 2023
1 parent 4a81edb commit 70b7a2c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 38 deletions.
33 changes: 20 additions & 13 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,27 @@ var testCases = []*testCase{
}

func TestVDiff2(t *testing.T) {
allCellNames = "zone1"
defaultCellName := "zone1"
allCellNames = "zone5,zone1,zone2,zone3,zone4"
sourceKs := "product"
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables
extraVTTabletArgs = []string{"--vstream_packet_size=1"}

vc = NewVitessCluster(t, "TestVDiff2", []string{allCellNames}, mainClusterConfig)
vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
require.NotNil(t, vc)
defaultCell = vc.Cells[defaultCellName]
cells := []*Cell{defaultCell}
zone1 := vc.Cells["zone1"]
zone2 := vc.Cells["zone2"]
zone3 := vc.Cells["zone3"]
defaultCell = zone1

defer vc.TearDown(t)

vc.AddKeyspace(t, cells, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)
// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
_, err := vc.AddKeyspace(t, []*Cell{zone2, zone1, zone3}, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)
require.NoError(t, err)

vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
Expand All @@ -140,23 +144,25 @@ func TestVDiff2(t *testing.T) {

generateMoreCustomers(t, sourceKs, 100)

_, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
tks, err := vc.AddKeyspace(t, []*Cell{zone3, zone1, zone2}, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
require.NoError(t, err)
for _, shard := range targetShards {
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard))
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
testWorkflow(t, vc, tc, cells)
// Primary tablets for any new shards are added in the first cell.
testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1})
})
}
}

func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) {
func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) {
arrTargetShards := strings.Split(tc.targetShards, ",")
if tc.typ == "Reshard" {
tks := vc.Cells[cells[0].Name].Keyspaces[tc.targetKs]
require.NoError(t, vc.AddShards(t, cells, tks, tc.targetShards, 0, 0, tc.tabletBaseID, targetKsOpts))
for _, shard := range arrTargetShards {
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, tc.targetKs, shard))
Expand All @@ -169,6 +175,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
if tc.typ == "Reshard" {
args = append(args, "--source_shards", tc.sourceShards, "--target_shards", tc.targetShards)
}
args = append(args, "--cells", allCellNames)
args = append(args, "--tables", tc.tables)
args = append(args, "Create")
args = append(args, ksWorkflow)
Expand All @@ -180,14 +187,14 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
catchup(t, tab, tc.workflow, tc.typ)
}

vdiff(t, tc.targetKs, tc.workflow, cells[0].Name, true, true, nil)
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)

if tc.autoRetryError {
testAutoRetryError(t, tc, cells[0].Name)
testAutoRetryError(t, tc, allCellNames)
}

if tc.resume {
testResume(t, tc, cells[0].Name)
testResume(t, tc, allCellNames)
}

// These are done here so that we have a valid workflow to test the commands against
Expand Down
6 changes: 5 additions & 1 deletion go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ func doVdiff2(t *testing.T, keyspace, workflow, cells string, want *expectedVDif

func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) {
var err error
args := []string{"VDiff", "--", "--tablet_types=primary", "--source_cell=" + cells, "--format=json"}
// This will always result in us using a PRIMARY tablet, which is all
// we start in many e2e tests, but it avoids the tablet picker logic
// where when you ONLY specify the PRIMARY type it then picks the
// shard's primary and ignores any cell settings.
args := []string{"VDiff", "--", "--tablet_types=in_order:primary,replica", "--source_cell=" + cells, "--format=json"}
if len(extraFlags) > 0 {
args = append(args, extraFlags...)
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vdiff
import (
"context"
"fmt"
"strings"
"testing"

"github.com/google/uuid"
Expand Down Expand Up @@ -106,8 +107,8 @@ func TestVDiff(t *testing.T) {
MaxRows: 100,
},
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
SourceCell: tstenv.Cells[0],
TargetCell: tstenv.Cells[0],
SourceCell: strings.Join(tstenv.Cells, ","),
TargetCell: strings.Join(tstenv.Cells, ","),
TabletTypes: "primary",
},
ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{
Expand Down
51 changes: 29 additions & 22 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -121,7 +122,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

if err := td.selectTablets(ctx, td.wd.opts.PickerOptions.SourceCell, td.wd.opts.PickerOptions.TabletTypes); err != nil {
if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
Expand Down Expand Up @@ -199,16 +200,22 @@ func (td *tableDiffer) forEachSource(cb func(source *migrationSource) error) err
return allErrors.AggrError(vterrors.Aggregate)
}

func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes string) error {
var wg sync.WaitGroup
ct := td.wd.ct
var err1, err2 error
func (td *tableDiffer) selectTablets(ctx context.Context) error {
var (
wg sync.WaitGroup
sourceErr, targetErr error
targetTablet *topodata.Tablet
)

// The cells from the vdiff record are a comma separated list.
sourceCells := strings.Split(td.wd.opts.PickerOptions.SourceCell, ",")
targetCells := strings.Split(td.wd.opts.PickerOptions.TargetCell, ",")

// For Mount+Migrate, the source tablets will be in a different
// Vitess cluster with its own TopoServer.
sourceTopoServer := ct.ts
if ct.externalCluster != "" {
extTS, err := ct.ts.OpenExternalVitessClusterServer(ctx, ct.externalCluster)
sourceTopoServer := td.wd.ct.ts
if td.wd.ct.externalCluster != "" {
extTS, err := td.wd.ct.ts.OpenExternalVitessClusterServer(ctx, td.wd.ct.externalCluster)
if err != nil {
return err
}
Expand All @@ -217,39 +224,39 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri
wg.Add(1)
go func() {
defer wg.Done()
err1 = td.forEachSource(func(source *migrationSource) error {
tablet, err := pickTablet(ctx, sourceTopoServer, cell, ct.vde.thisTablet.Alias.Cell, ct.sourceKeyspace, source.shard, tabletTypes)
sourceErr = td.forEachSource(func(source *migrationSource) error {
sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes)
if err != nil {
return err
}
source.tablet = tablet
source.tablet = sourceTablet
return nil
})
}()

wg.Add(1)
go func() {
defer wg.Done()
tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Alias.Cell, ct.vde.thisTablet.Keyspace,
ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if err2 != nil {
targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.vde.thisTablet.Keyspace,
td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if targetErr != nil {
return
}
ct.targetShardStreamer = &shardStreamer{
tablet: tablet,
shard: tablet.Shard,
td.wd.ct.targetShardStreamer = &shardStreamer{
tablet: targetTablet,
shard: targetTablet.Shard,
}
}()

wg.Wait()
if err1 != nil {
return err1
if sourceErr != nil {
return sourceErr
}
return err2
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cell, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, []string{cell}, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 70b7a2c

Please sign in to comment.