Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export BuildSummary func for VDiff show #17413

Merged
merged 1 commit into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 1 addition & 222 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ limitations under the License.
package vdiff

import (
"encoding/json"
"fmt"
"html/template"
"io"
"math"
"reflect"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -579,7 +577,7 @@ func buildRecentListings(resp *vtctldatapb.VDiffShowResponse) ([]*listing, error
func displayShowSingleSummary(out io.Writer, format, keyspace, workflowName, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (vdiff.VDiffState, error) {
state := vdiff.UnknownState
var output string
summary, err := buildSingleSummary(keyspace, workflowName, uuid, resp, verbose)
summary, err := workflow.BuildSummary(keyspace, workflowName, uuid, resp, verbose)
if err != nil {
return state, err
}
Expand Down Expand Up @@ -616,225 +614,6 @@ func displayShowSingleSummary(out io.Writer, format, keyspace, workflowName, uui
return state, nil
}

// buildSingleSummary folds the per-shard responses of a VDiff show request into a
// single summary for the given keyspace, workflow and VDiff UUID.
//
// For every shard with a non-nil response and output it walks the result rows and
// accumulates: the started/completed timestamps, per-shard errors, the rows
// compared so far, the approximate total rows to compare, the mismatch flag, and a
// per-table summary plus the per-table/per-shard diff reports. The overall state is
// then derived from the tallies of shard states and shard-level table states.
//
// The returned summary only carries Reports and TableSummaryMap when a mismatch was
// seen or verbose is true, only carries CompletedAt when the overall state is
// completed, and only carries Progress when the overall state is started.
//
// The only error returned is a failure to unmarshal a row's "report" column as JSON
// into a vdiff.DiffReport.
func buildSingleSummary(keyspace, workflow, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (*summary, error) {
	// Start from an explicit "nothing known yet" summary; the fields are filled in
	// as the shard responses are processed below.
	summary := &summary{
		Workflow: workflow,
		Keyspace: keyspace,
		UUID: uuid,
		State: vdiff.UnknownState,
		RowsCompared: 0,
		StartedAt: "",
		CompletedAt: "",
		HasMismatch: false,
		Shards: "",
		Reports: make(map[string]map[string]vdiff.DiffReport),
		Errors: make(map[string]string),
		Progress: nil,
	}

	// Both maps stay nil until the first shard with output is seen.
	var tableSummaryMap map[string]tableSummary
	var reports map[string]map[string]vdiff.DiffReport
	// Keep a tally of the states across all tables in all shards.
	tableStateCounts := map[vdiff.VDiffState]int{
		vdiff.UnknownState: 0,
		vdiff.PendingState: 0,
		vdiff.StartedState: 0,
		vdiff.StoppedState: 0,
		vdiff.ErrorState: 0,
		vdiff.CompletedState: 0,
	}
	// Keep a tally of the summary states across all shards.
	shardStateCounts := map[vdiff.VDiffState]int{
		vdiff.UnknownState: 0,
		vdiff.PendingState: 0,
		vdiff.StartedState: 0,
		vdiff.StoppedState: 0,
		vdiff.ErrorState: 0,
		vdiff.CompletedState: 0,
	}
	// Keep a tally of the approximate total rows to process as we'll use this for our progress
	// report.
	totalRowsToCompare := int64(0)
	var shards []string
	// Note: the loop variable resp shadows the function parameter of the same name;
	// inside the loop it is the response of a single shard.
	for shard, resp := range resp.TabletResponses {
		// Tracks whether the next row is the first one seen for this shard.
		first := true
		if resp != nil && resp.Output != nil {
			shards = append(shards, shard)
			qr := sqltypes.Proto3ToResult(resp.Output)
			if tableSummaryMap == nil {
				tableSummaryMap = make(map[string]tableSummary, 0)
				reports = make(map[string]map[string]vdiff.DiffReport, 0)
			}
			for _, row := range qr.Named().Rows {
				// Update the global VDiff summary based on the per shard level summary.
				// Since these values will be the same for all subsequent rows we only use
				// the first row.
				if first {
					first = false
					// Our timestamps are strings in `2022-06-26 20:43:25` format so we sort
					// them lexicographically.
					// We should use the earliest started_at across all shards.
					// NOTE(review): an empty started_at compares lower than any timestamp, so
					// a shard without one overwrites a previously recorded value, and the
					// result then depends on map iteration order — confirm this is intended.
					if sa := row.AsString("started_at", ""); summary.StartedAt == "" || sa < summary.StartedAt {
						summary.StartedAt = sa
					}
					// And we should use the latest completed_at across all shards.
					if ca := row.AsString("completed_at", ""); summary.CompletedAt == "" || ca > summary.CompletedAt {
						summary.CompletedAt = ca
					}
					// If we had an error on the shard, then let's add that to the summary.
					if le := row.AsString("last_error", ""); le != "" {
						summary.Errors[shard] = le
					}
					// Keep track of how many shards are marked as a specific state. We check
					// this combined with the shard.table states to determine the VDiff summary
					// state.
					shardStateCounts[vdiff.VDiffState(strings.ToLower(row.AsString("vdiff_state", "")))]++
				}

				// Global VDiff summary updates that take into account the per table details
				// per shard.
				{
					summary.RowsCompared += row.AsInt64("rows_compared", 0)
					totalRowsToCompare += row.AsInt64("table_rows", 0)

					// If we had a mismatch on any table on any shard then the global VDiff
					// summary does too.
					if mm, _ := row.ToBool("has_mismatch"); mm {
						summary.HasMismatch = true
					}
				}

				// Table summary information that must be accounted for across all shards.
				{
					table := row.AsString("table_name", "")
					if table == "" { // This occurs when the table diff has not started on 1 or more shards
						continue
					}
					// Create the global VDiff table summary object if it doesn't exist.
					if _, ok := tableSummaryMap[table]; !ok {
						tableSummaryMap[table] = tableSummary{
							TableName: table,
							State: vdiff.UnknownState,
						}

					}
					// ts is a copy of the map value; it is written back once this row has
					// been folded in.
					ts := tableSummaryMap[table]
					// This is the shard level VDiff table state.
					sts := vdiff.VDiffState(strings.ToLower(row.AsString("table_state", "")))
					tableStateCounts[sts]++

					// The error state must be sticky, and we should not override any other
					// known state with completed.
					switch sts {
					case vdiff.CompletedState:
						if ts.State == vdiff.UnknownState {
							ts.State = sts
						}
					case vdiff.ErrorState:
						ts.State = sts
					default:
						if ts.State != vdiff.ErrorState {
							ts.State = sts
						}
					}

					// The report column holds a JSON encoded diff report; when it is empty
					// the zero-value report is recorded for this table and shard.
					diffReport := row.AsString("report", "")
					dr := vdiff.DiffReport{}
					if diffReport != "" {
						err := json.Unmarshal([]byte(diffReport), &dr)
						if err != nil {
							return nil, err
						}
						// Sum the per-shard counters into the table level totals.
						ts.RowsCompared += dr.ProcessedRows
						ts.MismatchedRows += dr.MismatchedRows
						ts.MatchingRows += dr.MatchingRows
						ts.ExtraRowsTarget += dr.ExtraRowsTarget
						ts.ExtraRowsSource += dr.ExtraRowsSource
					}
					if _, ok := reports[table]; !ok {
						reports[table] = make(map[string]vdiff.DiffReport)
					}

					reports[table][shard] = dr
					tableSummaryMap[table] = ts
				}
			}
		}
	}

	// The global VDiff summary should progress from pending->started->completed with
	// stopped for any shard and error for any table being sticky for the global summary.
	// We should only consider the VDiff to be complete if it's completed for every table
	// on every shard.
	// The checks below are ordered by precedence: stopped, error, started, pending,
	// and only then completed.
	if shardStateCounts[vdiff.StoppedState] > 0 {
		summary.State = vdiff.StoppedState
	} else if shardStateCounts[vdiff.ErrorState] > 0 || tableStateCounts[vdiff.ErrorState] > 0 {
		summary.State = vdiff.ErrorState
	} else if tableStateCounts[vdiff.StartedState] > 0 {
		summary.State = vdiff.StartedState
	} else if tableStateCounts[vdiff.PendingState] > 0 {
		summary.State = vdiff.PendingState
	} else if tableStateCounts[vdiff.CompletedState] == (len(tableSummaryMap) * len(shards)) {
		// When doing shard consolidations/merges, we cannot rely solely on the
		// vdiff_table state as there are N sources that we process rows from sequentially
		// with each one writing to the shared _vt.vdiff_table record for the target shard.
		// So we only mark the vdiff for the shard as completed when we've finished
		// processing rows from all of the sources -- which is recorded by marking the
		// vdiff done for the shard by setting _vt.vdiff.state = completed.
		if shardStateCounts[vdiff.CompletedState] == len(shards) {
			summary.State = vdiff.CompletedState
		} else {
			summary.State = vdiff.StartedState
		}
	} else {
		summary.State = vdiff.UnknownState
	}

	// If the vdiff has been started then we can calculate the progress.
	if summary.State == vdiff.StartedState {
		summary.Progress = BuildProgressReport(summary.RowsCompared, totalRowsToCompare, summary.StartedAt)
	}

	sort.Strings(shards) // Sort for predictable output
	summary.Shards = strings.Join(shards, ",")
	summary.TableSummaryMap = tableSummaryMap
	summary.Reports = reports
	// Without a mismatch the per-table details are only included on request.
	if !summary.HasMismatch && !verbose {
		summary.Reports = nil
		summary.TableSummaryMap = nil
	}
	// If we haven't completed the global VDiff then be sure to reflect that with no
	// CompletedAt value.
	if summary.State != vdiff.CompletedState {
		summary.CompletedAt = ""
	}
	return summary, nil
}

// BuildProgressReport returns a progress report for a running VDiff: the
// percentage of rows compared so far and, when it can be estimated, the time at
// which the remaining rows are expected to have been processed.
//
// rowsCompared is the number of rows processed so far, rowsToCompare is the
// (approximate) total number of rows to process, and startedAt is the start time
// of the VDiff formatted using vdiff.TimestampFormat.
//
// The percentage is rounded to two decimal places and kept within [0, 100]. It is
// 0 until at least one row has been compared, and 100 whenever rowsCompared meets
// or exceeds rowsToCompare (the total is only an estimate, and may even be 0).
//
// The ETA is a linear extrapolation of the average time taken per percent so far.
// It is left empty when less than 1% has been processed, when work remains but
// startedAt cannot be parsed, or when the estimate is a year or more away. When
// nothing remains to be processed the ETA is the current time.
func BuildProgressReport(rowsCompared int64, rowsToCompare int64, startedAt string) *vdiff.ProgressReport {
	report := &vdiff.ProgressReport{}
	if rowsCompared >= 1 {
		// A zero rowsToCompare yields +Inf here, which the 100% cap absorbs.
		pct := (float64(rowsCompared) / float64(rowsToCompare)) * 100
		// Round to 2 decimal points.
		report.Percentage = math.Round(math.Min(pct, 100.00)*100) / 100
	}
	// Never report a nonsensical percentage (e.g. from a negative row estimate).
	if math.IsNaN(report.Percentage) || report.Percentage < 0 {
		report.Percentage = 0
	}
	if report.Percentage < 1 {
		// Too little progress to extrapolate an ETA from.
		return report
	}

	now := time.Now().UTC()
	pctToGo := 100.00 - report.Percentage
	var remainingSecs float64
	if pctToGo > 0 {
		startTime, err := time.Parse(vdiff.TimestampFormat, startedAt)
		if err != nil {
			// Without a usable start time there is no rate to extrapolate from.
			return report
		}
		elapsedSecs := now.Sub(startTime).Seconds()
		if elapsedSecs < 0 {
			// A start time in the future (e.g. clock skew) gives us no usable rate.
			elapsedSecs = 0
		}
		// Calculate how long 1% took, on avg, and multiply that by the % left. This
		// is done in floating point so that short run times and fractional
		// percentages are not truncated away.
		remainingSecs = (elapsedSecs / report.Percentage) * pctToGo
	}

	// Cap the ETA at 1 year out to prevent providing nonsensical ETAs. The check is
	// done on the float value so that a huge estimate is never converted into a
	// time.Duration.
	maxSecs := now.AddDate(1, 0, 0).Sub(now).Seconds()
	if remainingSecs < maxSecs {
		eta := now.Add(time.Duration(remainingSecs * float64(time.Second)))
		report.ETA = eta.Format(vdiff.TimestampFormat)
	}
	return report
}

func commandShow(cmd *cobra.Command, args []string) error {
format, err := common.GetOutputFormat(cmd)
if err != nil {
Expand Down
110 changes: 0 additions & 110 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package vdiff
import (
"context"
"fmt"
"math"
"testing"
"time"

Expand Down Expand Up @@ -690,112 +689,3 @@ func TestGetStructNames(t *testing.T) {
want := []string{"A", "B"}
require.EqualValues(t, want, got)
}

func TestBuildProgressReport(t *testing.T) {
now := time.Now()
type args struct {
summary *summary
rowsToCompare int64
}
tests := []struct {
name string
args args
want *vdiff.ProgressReport
}{
{
name: "no progress",
args: args{
summary: &summary{RowsCompared: 0},
rowsToCompare: 100,
},
want: &vdiff.ProgressReport{
Percentage: 0,
ETA: "", // no ETA
},
},
{
name: "one third of the way",
args: args{
summary: &summary{
RowsCompared: 33,
StartedAt: now.Add(-10 * time.Second).UTC().Format(vdiff.TimestampFormat),
},
rowsToCompare: 100,
},
want: &vdiff.ProgressReport{
Percentage: 33,
ETA: now.Add(20 * time.Second).UTC().Format(vdiff.TimestampFormat),
},
},
{
name: "half way",
args: args{
summary: &summary{
RowsCompared: 5000000000,
StartedAt: now.Add(-10 * time.Hour).UTC().Format(vdiff.TimestampFormat),
},
rowsToCompare: 10000000000,
},
want: &vdiff.ProgressReport{
Percentage: 50,
ETA: now.Add(10 * time.Hour).UTC().Format(vdiff.TimestampFormat),
},
},
{
name: "full progress",
args: args{
summary: &summary{
RowsCompared: 100,
CompletedAt: now.UTC().Format(vdiff.TimestampFormat),
},
rowsToCompare: 100,
},
want: &vdiff.ProgressReport{
Percentage: 100,
ETA: now.UTC().Format(vdiff.TimestampFormat),
},
},
{
name: "more than in I_S",
args: args{
summary: &summary{
RowsCompared: 100,
CompletedAt: now.UTC().Format(vdiff.TimestampFormat),
},
rowsToCompare: 50,
},
want: &vdiff.ProgressReport{
Percentage: 100,
ETA: now.UTC().Format(vdiff.TimestampFormat),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.args.summary.Progress = BuildProgressReport(tt.args.summary.RowsCompared, tt.args.rowsToCompare, tt.args.summary.StartedAt)
// We always check the percentage
require.Equal(t, int(tt.want.Percentage), int(tt.args.summary.Progress.Percentage))

// We only check the ETA if there is one.
if tt.want.ETA != "" {
// Let's check that we're within 1 second to avoid flakes.
wantTime, err := time.Parse(vdiff.TimestampFormat, tt.want.ETA)
require.NoError(t, err)
var timeDiff float64
if tt.want.Percentage == 100 {
completedTime, err := time.Parse(vdiff.TimestampFormat, tt.args.summary.CompletedAt)
require.NoError(t, err)
timeDiff = math.Abs(completedTime.Sub(wantTime).Seconds())
} else {
startTime, err := time.Parse(vdiff.TimestampFormat, tt.args.summary.StartedAt)
require.NoError(t, err)
completedTimeUnix := float64(now.UTC().Unix()-startTime.UTC().Unix()) * (100 / tt.want.Percentage)
estimatedTime, err := time.Parse(vdiff.TimestampFormat, tt.want.ETA)
require.NoError(t, err)
timeDiff = math.Abs(estimatedTime.Sub(startTime).Seconds() - completedTimeUnix)
}
require.LessOrEqual(t, timeDiff, 1.0)
}
})
}
}
2 changes: 1 addition & 1 deletion go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,7 @@ func (api *API) VDiffShow(ctx context.Context, req *vtadminpb.VDiffShowRequest)
}
}
if report.State == string(vdiff.StartedState) {
progress := vdiffcmd.BuildProgressReport(report.RowsCompared, totalRowsToCompare, report.StartedAt)
progress := workflow.BuildProgressReport(report.RowsCompared, totalRowsToCompare, report.StartedAt)
report.Progress = &vtadminpb.VDiffProgress{
Percentage: progress.Percentage,
Eta: progress.ETA,
Expand Down
Loading
Loading