From 9b904cc0ab05046b6b1f50fb9a2e3d9ad7a108a8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 20 Sep 2023 19:18:25 -0400 Subject: [PATCH] Migrate client side unit tests Signed-off-by: Matt Lord --- .../vreplication/vdiff/vdiff_env_test.go | 351 ++++++++++++ .../command/vreplication/vdiff/vdiff_test.go | 530 ++++++++++++++++++ 2 files changed, 881 insertions(+) create mode 100644 go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_env_test.go create mode 100644 go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_env_test.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_env_test.go new file mode 100644 index 00000000000..13766555124 --- /dev/null +++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_env_test.go @@ -0,0 +1,351 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vdiff + +import ( + "bytes" + "context" + "fmt" + "io" + "math/rand" + "os" + "sync" + "testing" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vtctl/workflow" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tabletconntest" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +const ( + // vdiffStopPosition is the default stop position for the target vreplication. + // It can be overridden with the positons argument to newTestVDiffEnv. + vdiffStopPosition = "MySQL56/d834e6b8-7cbf-11ed-a1eb-0242ac120002:1-892" + // vdiffSourceGtid should be the position reported by the source side VStreamResults. + // It's expected to be higher the vdiffStopPosition. + vdiffSourceGtid = "MySQL56/d834e6b8-7cbf-11ed-a1eb-0242ac120002:1-893" + // vdiffTargetPrimaryPosition is the primary position of the target after + // vreplication has been synchronized. + vdiffTargetPrimaryPosition = "MySQL56/e34d6fb6-7cbf-11ed-a1eb-0242ac120002:1-892" +) + +type testVDiffEnv struct { + ws *workflow.Server + workflow string + topoServ *topo.Server + cell string + tabletType topodatapb.TabletType + tmc *testVDiffTMClient + getOutput func() string + + mu sync.Mutex + tablets map[int]*testVDiffTablet +} + +//---------------------------------------------- +// testVDiffEnv + +func newTestVDiffEnv(t testing.TB, ctx context.Context, sourceShards, targetShards []string, query string, positions map[string]string) *testVDiffEnv { + env := &testVDiffEnv{ + workflow: "vdiffTest", + tablets: make(map[int]*testVDiffTablet), + topoServ: memorytopo.NewServer(ctx, "cell"), + cell: "cell", + tabletType: topodatapb.TabletType_REPLICA, + tmc: newTestVDiffTMClient(), + } + env.ws = workflow.NewServer(env.topoServ, env.tmc) + env.tmc.testEnv = env + + // Generate a unique dialer name. + dialerName := fmt.Sprintf("VDiffTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + env.mu.Lock() + defer env.mu.Unlock() + if qs, ok := env.tablets[int(tablet.Alias.Uid)]; ok { + return qs, nil + } + return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid) + }) + tabletconntest.SetProtocol("go.cmd.vtctldclient.vreplication.vdiff_env_test", dialerName) + + tabletID := 100 + for _, shard := range sourceShards { + _ = env.addTablet(tabletID, "source", shard, topodatapb.TabletType_PRIMARY) + env.tmc.waitpos[tabletID+1] = vdiffStopPosition + + tabletID += 10 + } + tabletID = 200 + for _, shard := range targetShards { + primary := env.addTablet(tabletID, "target", shard, topodatapb.TabletType_PRIMARY) + + var rows []string + var posRows []string + for j, sourceShard := range sourceShards { + bls := &binlogdatapb.BinlogSource{ + Keyspace: "source", + Shard: sourceShard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: query, + }}, + }, + } + rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls)) + position := vdiffStopPosition + if pos := positions[sourceShard+shard]; pos != "" { + position = pos + } + posRows = append(posRows, fmt.Sprintf("%v|%s", bls, position)) + + // vdiff.syncTargets. This actually happens after stopTargets. + // But this is one statement per stream. + env.tmc.setVRResults( + primary.tablet, + fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", vdiffSourceGtid, j+1), + &sqltypes.Result{}, + ) + } + // migrater buildMigrationTargets + env.tmc.setVRResults( + primary.tablet, + "select id, source, message, cell, tablet_types, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where workflow='vdiffTest' and db_name='vt_target'", + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|varchar|varchar|varchar|int64|int64|int64"), + rows..., + ), + ) + + // vdiff.stopTargets + env.tmc.setVRResults(primary.tablet, "update _vt.vreplication set state='Stopped', message='for vdiff' where db_name='vt_target' and workflow='vdiffTest'", &sqltypes.Result{}) + env.tmc.setVRResults( + primary.tablet, + "select source, pos from _vt.vreplication where db_name='vt_target' and workflow='vdiffTest'", + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "source|pos", + "varchar|varchar"), + posRows..., + ), + ) + + // vdiff.syncTargets (continued) + env.tmc.vrpos[tabletID] = vdiffSourceGtid + env.tmc.pos[tabletID] = vdiffTargetPrimaryPosition + + // vdiff.startQueryStreams + env.tmc.waitpos[tabletID+1] = vdiffTargetPrimaryPosition + + // vdiff.restartTargets + env.tmc.setVRResults(primary.tablet, "update _vt.vreplication set state='Running', message='', stop_pos='' where db_name='vt_target' and workflow='vdiffTest'", &sqltypes.Result{}) + + tabletID += 10 + } + env.resetOutput() + return env +} + +func (env *testVDiffEnv) resetOutput() { + env.mu.Lock() + defer env.mu.Unlock() + ogstdout := os.Stdout + r, w, err := os.Pipe() + if err != nil { + log.Errorf("os.Pipe() err: %v", err) + } + os.Stdout = w + env.getOutput = func() string { + w.Close() + os.Stdout = ogstdout + var buf bytes.Buffer + _, _ = io.Copy(&buf, r) + return buf.String() + } +} + +func (env *testVDiffEnv) close() { + env.mu.Lock() + defer env.mu.Unlock() + for _, t := range env.tablets { + _ = env.topoServ.DeleteTablet(context.Background(), t.tablet.Alias) + } + env.tablets = nil + env.topoServ.Close() + env.ws = nil +} + +func (env *testVDiffEnv) addTablet(id int, keyspace, shard string, tabletType topodatapb.TabletType) *testVDiffTablet { + env.mu.Lock() + defer env.mu.Unlock() + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.cell, + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: &topodatapb.KeyRange{}, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + env.tablets[id] = newTestVDiffTablet(tablet) + if err := env.topoServ.InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + panic(err) + } + if tabletType == topodatapb.TabletType_PRIMARY { + _, err := env.topoServ.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = tablet.Alias + return nil + }) + if err != nil { + panic(err) + } + } + return env.tablets[id] +} + +//---------------------------------------------- +// testVDiffTablet + +type testVDiffTablet struct { + queryservice.QueryService + tablet *topodatapb.Tablet +} + +func newTestVDiffTablet(tablet *topodatapb.Tablet) *testVDiffTablet { + return &testVDiffTablet{ + QueryService: fakes.ErrorQueryService, + tablet: tablet, + } +} + +func (tvt *testVDiffTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: tvt.tablet.Keyspace, + Shard: tvt.tablet.Shard, + TabletType: tvt.tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) +} + +//---------------------------------------------- +// testVDiffTMCclient + +type testVDiffTMClient struct { + tmclient.TabletManagerClient + vrQueries map[int]map[string]*querypb.QueryResult + vdRequests map[int]map[string]*tabletmanagerdatapb.VDiffResponse + waitpos map[int]string + vrpos map[int]string + pos map[int]string + + testEnv *testVDiffEnv +} + +func newTestVDiffTMClient() *testVDiffTMClient { + return &testVDiffTMClient{ + vrQueries: make(map[int]map[string]*querypb.QueryResult), + vdRequests: make(map[int]map[string]*tabletmanagerdatapb.VDiffResponse), + waitpos: make(map[int]string), + vrpos: make(map[int]string), + pos: make(map[int]string), + } +} + +func (tmc *testVDiffTMClient) setVRResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { + queries, ok := tmc.vrQueries[int(tablet.Alias.Uid)] + if !ok { + queries = make(map[string]*querypb.QueryResult) + tmc.vrQueries[int(tablet.Alias.Uid)] = queries + } + queries[query] = sqltypes.ResultToProto3(result) +} + +func (tmc *testVDiffTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + result, ok := tmc.vrQueries[int(tablet.Alias.Uid)][query] + if !ok { + return nil, fmt.Errorf("query %q not found for tablet %d", query, tablet.Alias.Uid) + } + return result, nil +} + +func (tmc *testVDiffTMClient) setVDResults(tablet *topodatapb.Tablet, req *tabletmanagerdatapb.VDiffRequest, res *tabletmanagerdatapb.VDiffResponse) { + reqs, ok := tmc.vdRequests[int(tablet.Alias.Uid)] + if !ok { + reqs = make(map[string]*tabletmanagerdatapb.VDiffResponse) + tmc.vdRequests[int(tablet.Alias.Uid)] = reqs + } + reqs[req.VdiffUuid] = res +} + +func (tmc *testVDiffTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) { + resp, ok := tmc.vdRequests[int(tablet.Alias.Uid)][req.VdiffUuid] + if !ok { + return nil, fmt.Errorf("request %+v not found for tablet %d", req, tablet.Alias.Uid) + } + return resp, nil +} + +func (tmc *testVDiffTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + id := int32(1) + resp := &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + Workflow: "vdiffTest", + } + + sourceShards, _ := tmc.testEnv.topoServ.GetShardNames(ctx, "source") + streams := make([]*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream, 0, len(sourceShards)) + for _, shard := range sourceShards { + streams = append(streams, &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + Id: id, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: "source", + Shard: shard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: ".*", + }, + }, + }, + }, + }) + id++ + } + resp.Streams = streams + + return resp, nil +} diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go new file mode 100644 index 00000000000..37957754b08 --- /dev/null +++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go @@ -0,0 +1,530 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vdiff + +import ( + "context" + "fmt" + "math" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "gotest.tools/assert" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" +) + +var ( + fields = sqltypes.MakeTestFields( + "vdiff_state|last_error|table_name|uuid|table_state|table_rows|started_at|rows_compared|completed_at|has_mismatch|report", + "varbinary|varbinary|varbinary|varchar|varbinary|int64|timestamp|int64|timestamp|int64|json", + ) + options = &tabletmanagerdatapb.VDiffOptions{ + PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ + TabletTypes: "primary", + }, + CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ + Tables: "t1", + }, + ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ + Format: "json", + }, + } +) + +func TestVDiffUnsharded(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env := newTestVDiffEnv(t, ctx, []string{"0"}, []string{"0"}, "", nil) + defer env.close() + + UUID := uuid.New().String() + req := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: "target", + Workflow: env.workflow, + Action: string(vdiff.ShowAction), + ActionArg: UUID, + } + starttime := time.Now().UTC().Format(vdiff.TimestampFormat) + comptime := time.Now().Add(1 * time.Second).UTC().Format(vdiff.TimestampFormat) + goodReportfmt := `{ + "Workflow": "vdiffTest", + "Keyspace": "target", + "State": "completed", + "UUID": "%s", + "RowsCompared": %d, + "HasMismatch": %t, + "Shards": "0", + "StartedAt": "%s", + "CompletedAt": "%s" +} +` + + badReportfmt := `{ + "Workflow": "vdiffTest", + "Keyspace": "target", + "State": "completed", + "UUID": "%s", + "RowsCompared": %d, + "HasMismatch": %t, + "Shards": "0", + "StartedAt": "%s", + "CompletedAt": "%s", + "TableSummary": { + "t1": { + "TableName": "t1", + "State": "completed", + "RowsCompared": %d, + "MatchingRows": %d, + "MismatchedRows": %d, + "ExtraRowsSource": %d, + "ExtraRowsTarget": %d + } + }, + "Reports": { + "t1": { + "0": { + "TableName": "t1", + "ProcessedRows": %d, + "MatchingRows": %d, + "MismatchedRows": %d, + "ExtraRowsSource": %d, + "ExtraRowsTarget": %d, + %s + } + } + } +} +` + + testcases := []struct { + id string + result *sqltypes.Result + report string + }{{ + id: "1", + result: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|0|"+ + `{"TableName": "t1", "MatchingRows": 3, "ProcessedRows": 3, "MismatchedRows": 0, "ExtraRowsSource": 0, `+ + `"ExtraRowsTarget": 0}`), + report: fmt.Sprintf(goodReportfmt, + UUID, 3, false, starttime, comptime, + ), + }, { + id: "2", + result: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|1|"+ + `{"TableName": "t1", "MatchingRows": 1, "ProcessedRows": 3, "MismatchedRows": 0, "ExtraRowsSource": 0, `+ + `"ExtraRowsTarget": 2, "ExtraRowsTargetSample": [{"Row": {"c1": "2", "c2": "4"}}]}`), + report: fmt.Sprintf(badReportfmt, + UUID, 3, true, starttime, comptime, 3, 1, 0, 0, 2, 3, 1, 0, 0, 2, + `"ExtraRowsTargetSample": [ + { + "Row": { + "c1": "2", + "c2": "4" + } + } + ]`), + }, { + id: "3", + result: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|1|"+ + `{"TableName": "t1", "MatchingRows": 1, "ProcessedRows": 3, "MismatchedRows": 0, "ExtraRowsSource": 2, `+ + `"ExtraRowsTarget": 0, "ExtraRowsSourceSample": [{"Row": {"c1": "2", "c2": "4"}}]}`), + report: fmt.Sprintf(badReportfmt, + UUID, 3, true, starttime, comptime, 3, 1, 0, 2, 0, 3, 1, 0, 2, 0, + `"ExtraRowsSourceSample": [ + { + "Row": { + "c1": "2", + "c2": "4" + } + } + ]`), + }, { + id: "4", + result: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|1|"+ + `{"TableName": "t1", "MatchingRows": 2, "ProcessedRows": 3, "MismatchedRows": 0, "ExtraRowsSource": 1, `+ + `"ExtraRowsTarget": 0, "ExtraRowsSourceSample": [{"Row": {"c1": "2", "c2": "4"}}]}`), + report: fmt.Sprintf(badReportfmt, + UUID, 3, true, starttime, comptime, 3, 2, 0, 1, 0, 3, 2, 0, 1, 0, + `"ExtraRowsSourceSample": [ + { + "Row": { + "c1": "2", + "c2": "4" + } + } + ]`), + }, { + id: "5", + result: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|1|"+ + `{"TableName": "t1", "MatchingRows": 2, "ProcessedRows": 3, "MismatchedRows": 0, "ExtraRowsSource": 1, `+ + `"ExtraRowsTarget": 0, "ExtraRowsSourceSample": [{"Row": {"c1": "2", "c2": "4"}}]}`), + report: fmt.Sprintf(badReportfmt, + UUID, 3, true, starttime, comptime, 3, 2, 0, 1, 0, 3, 2, 0, 1, 0, + `"ExtraRowsSourceSample": [ + { + "Row": { + "c1": "2", + "c2": "4" + } + } + ]`), + }, { + id: "6", + result: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|1|"+ + `{"TableName": "t1", "MatchingRows": 2, "ProcessedRows": 3, "MismatchedRows": 1, "ExtraRowsSource": 0, `+ + `"ExtraRowsTarget": 0, "MismatchedRowsSample": [{"Source": {"Row": {"c1": "2", "c2": "3"}}, `+ + `"Target": {"Row": {"c1": "2", "c2": "4"}}}]}`), + report: fmt.Sprintf(badReportfmt, + UUID, 3, true, starttime, comptime, 3, 2, 1, 0, 0, 3, 2, 1, 0, 0, + `"MismatchedRowsSample": [ + { + "Source": { + "Row": { + "c1": "2", + "c2": "3" + } + }, + "Target": { + "Row": { + "c1": "2", + "c2": "4" + } + } + } + ]`), + }, { + id: "7", // --only_pks + result: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|1|"+ + `{"TableName": "t1", "MatchingRows": 2, "ProcessedRows": 3, "MismatchedRows": 1, "ExtraRowsSource": 0, `+ + `"ExtraRowsTarget": 0, "MismatchedRowsSample": [{"Source": {"Row": {"c1": "2"}}, `+ + `"Target": {"Row": {"c1": "2"}}}]}`), + report: fmt.Sprintf(badReportfmt, + UUID, 3, true, starttime, comptime, 3, 2, 1, 0, 0, 3, 2, 1, 0, 0, + `"MismatchedRowsSample": [ + { + "Source": { + "Row": { + "c1": "2" + } + }, + "Target": { + "Row": { + "c1": "2" + } + } + } + ]`), + }, { + id: "8", // --debug_query + result: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|1|"+ + `{"TableName": "t1", "MatchingRows": 2, "ProcessedRows": 3, "MismatchedRows": 1, "ExtraRowsSource": 0, `+ + `"ExtraRowsTarget": 0, "MismatchedRowsSample": [{"Source": {"Row": {"c1": "2", "c2": "3"}, "Query": "select c1, c2 from t1 where c1=2;"}, `+ + `"Target": {"Row": {"c1": "2", "c2": "4"}, "Query": "select c1, c2 from t1 where c1=2;"}}]}`), + report: fmt.Sprintf(badReportfmt, + UUID, 3, true, starttime, comptime, 3, 2, 1, 0, 0, 3, 2, 1, 0, 0, + `"MismatchedRowsSample": [ + { + "Source": { + "Row": { + "c1": "2", + "c2": "3" + }, + "Query": "select c1, c2 from t1 where c1=2;" + }, + "Target": { + "Row": { + "c1": "2", + "c2": "4" + }, + "Query": "select c1, c2 from t1 where c1=2;" + } + } + ]`), + }, + } + + for _, tcase := range testcases { + t.Run(tcase.id, func(t *testing.T) { + res := &tabletmanagerdatapb.VDiffResponse{ + Id: 1, + Output: sqltypes.ResultToProto3(tcase.result), + } + env.tmc.setVDResults(env.tablets[200].tablet, req, res) + req := &vtctldatapb.VDiffShowRequest{ + TargetKeyspace: "target", + Workflow: env.workflow, + Arg: UUID, + } + + resp, err := env.ws.VDiffShow(context.Background(), req) + require.NoError(t, err) + vds, err := displayShowSingleSummary(options.ReportOptions.Format, "target", env.workflow, UUID, resp, false) + require.NoError(t, err) + require.Equal(t, vdiff.CompletedState, vds) + + output := env.getOutput() + assert.Equal(t, tcase.report, output) + env.resetOutput() + }) + } +} + +func TestVDiffSharded(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env := newTestVDiffEnv(t, ctx, []string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{ + "-80": "MySQL56/0e45e704-7cb9-11ed-a1eb-0242ac120002:1-890", + "80-": "MySQL56/1497ddb0-7cb9-11ed-a1eb-0242ac120002:1-891", + }) + defer env.close() + + UUID := uuid.New().String() + req := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: "target", + Workflow: env.workflow, + Action: string(vdiff.ShowAction), + ActionArg: UUID, + } + starttime := time.Now().UTC().Format(vdiff.TimestampFormat) + comptime := time.Now().Add(1 * time.Second).UTC().Format(vdiff.TimestampFormat) + verbosefmt := `{ + "Workflow": "vdiffTest", + "Keyspace": "target", + "State": "completed", + "UUID": "%s", + "RowsCompared": %d, + "HasMismatch": %t, + "Shards": "-80,80-", + "StartedAt": "%s", + "CompletedAt": "%s", + "TableSummary": { + "t1": { + "TableName": "t1", + "State": "completed", + "RowsCompared": %d, + "MatchingRows": %d, + "MismatchedRows": %d, + "ExtraRowsSource": %d, + "ExtraRowsTarget": %d + } + }, + "Reports": { + "t1": { + "-80": { + "TableName": "t1", + "ProcessedRows": %d, + "MatchingRows": %d, + "MismatchedRows": %d, + "ExtraRowsSource": %d, + "ExtraRowsTarget": %d + }, + "80-": { + "TableName": "t1", + "ProcessedRows": %d, + "MatchingRows": %d, + "MismatchedRows": %d, + "ExtraRowsSource": %d, + "ExtraRowsTarget": %d + } + } + } +} +` + + testcases := []struct { + id string + shard1Res *sqltypes.Result + shard2Res *sqltypes.Result + report string + }{{ + id: "1", + shard1Res: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|0|"+ + `{"TableName": "t1", "MatchingRows": 3, "ProcessedRows": 3, "MismatchedRows": 0, "ExtraRowsSource": 0, `+ + `"ExtraRowsTarget": 0}`), + shard2Res: sqltypes.MakeTestResult(fields, + "completed||t1|"+UUID+"|completed|3|"+starttime+"|3|"+comptime+"|0|"+ + `{"TableName": "t1", "MatchingRows": 3, "ProcessedRows": 3, "MismatchedRows": 0, "ExtraRowsSource": 0, `+ + `"ExtraRowsTarget": 0}`), + report: fmt.Sprintf(verbosefmt, + UUID, 6, false, starttime, comptime, 6, 6, 0, 0, 0, 3, 3, 0, 0, 0, 3, 3, 0, 0, 0, + ), + }} + + for _, tcase := range testcases { + t.Run(tcase.id, func(t *testing.T) { + shard1Res := &tabletmanagerdatapb.VDiffResponse{ + Id: 1, + Output: sqltypes.ResultToProto3(tcase.shard1Res), + } + shard2Res := &tabletmanagerdatapb.VDiffResponse{ + Id: 1, + Output: sqltypes.ResultToProto3(tcase.shard2Res), + } + env.tmc.setVDResults(env.tablets[200].tablet, req, shard1Res) + env.tmc.setVDResults(env.tablets[210].tablet, req, shard2Res) + req := &vtctldatapb.VDiffShowRequest{ + TargetKeyspace: "target", + Workflow: env.workflow, + Arg: UUID, + } + + resp, err := env.ws.VDiffShow(context.Background(), req) + require.NoError(t, err) + vds, err := displayShowSingleSummary(options.ReportOptions.Format, "target", env.workflow, UUID, resp, true) + require.NoError(t, err) + require.Equal(t, vdiff.CompletedState, vds) + + output := env.getOutput() + assert.Equal(t, tcase.report, output) + env.resetOutput() + }) + } +} + +func TestGetStructNames(t *testing.T) { + type s struct { + A string + B int64 + } + got := getStructFieldNames(s{}) + want := []string{"A", "B"} + require.EqualValues(t, want, got) +} + +func TestBuildProgressReport(t *testing.T) { + type args struct { + summary *summary + rowsToCompare int64 + } + tests := []struct { + name string + args args + want *vdiff.ProgressReport + }{ + { + name: "no progress", + args: args{ + summary: &summary{RowsCompared: 0}, + rowsToCompare: 100, + }, + want: &vdiff.ProgressReport{ + Percentage: 0, + ETA: "", // no ETA + }, + }, + { + name: "one third of the way", + args: args{ + summary: &summary{ + RowsCompared: 33, + StartedAt: time.Now().Add(-10 * time.Second).UTC().Format(vdiff.TimestampFormat), + }, + rowsToCompare: 100, + }, + want: &vdiff.ProgressReport{ + Percentage: 33, + ETA: time.Now().Add(20 * time.Second).UTC().Format(vdiff.TimestampFormat), + }, + }, + { + name: "half way", + args: args{ + summary: &summary{ + RowsCompared: 5000000000, + StartedAt: time.Now().Add(-10 * time.Hour).UTC().Format(vdiff.TimestampFormat), + }, + rowsToCompare: 10000000000, + }, + want: &vdiff.ProgressReport{ + Percentage: 50, + ETA: time.Now().Add(10 * time.Hour).UTC().Format(vdiff.TimestampFormat), + }, + }, + { + name: "full progress", + args: args{ + summary: &summary{ + RowsCompared: 100, + CompletedAt: time.Now().UTC().Format(vdiff.TimestampFormat), + }, + rowsToCompare: 100, + }, + want: &vdiff.ProgressReport{ + Percentage: 100, + ETA: time.Now().UTC().Format(vdiff.TimestampFormat), + }, + }, + { + name: "more than in I_S", + args: args{ + summary: &summary{ + RowsCompared: 100, + CompletedAt: time.Now().UTC().Format(vdiff.TimestampFormat), + }, + rowsToCompare: 50, + }, + want: &vdiff.ProgressReport{ + Percentage: 100, + ETA: time.Now().UTC().Format(vdiff.TimestampFormat), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buildProgressReport(tt.args.summary, tt.args.rowsToCompare) + // We always check the percentage + require.Equal(t, int(tt.want.Percentage), int(tt.args.summary.Progress.Percentage)) + + // We only check the ETA if there is one + if tt.want.ETA != "" { + // Let's check that we're within 1 second to avoid flakes + wantTime, err := time.Parse(vdiff.TimestampFormat, tt.want.ETA) + require.NoError(t, err) + var timeDiff float64 + if tt.want.Percentage == 100 { + completedTime, err := time.Parse(vdiff.TimestampFormat, tt.args.summary.CompletedAt) + require.NoError(t, err) + timeDiff = math.Abs(completedTime.Sub(wantTime).Seconds()) + } else { + startTime, err := time.Parse(vdiff.TimestampFormat, tt.args.summary.StartedAt) + require.NoError(t, err) + completedTimeUnix := float64(time.Now().UTC().Unix()-startTime.UTC().Unix()) * (100 / tt.want.Percentage) + estimatedTime, err := time.Parse(vdiff.TimestampFormat, tt.want.ETA) + require.NoError(t, err) + timeDiff = math.Abs(estimatedTime.Sub(startTime).Seconds() - completedTimeUnix) + } + require.LessOrEqual(t, timeDiff, 1.0) + } + }) + } +}