From d651a4214e9b553a617041fbab04f4c30119a8a9 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 21 Oct 2024 17:15:18 +0530 Subject: [PATCH] feat: add metrics for atomic distributed transactions (#16939) Signed-off-by: Harshit Gangal --- go/test/endtoend/cluster/vtgate_process.go | 10 +- .../transaction/twopc/metric/main_test.go | 117 ++++++ .../transaction/twopc/metric/metric_test.go | 396 ++++++++++++++++++ .../transaction/twopc/metric/schema.sql | 13 + .../transaction/twopc/metric/vschema.json | 29 ++ .../endtoend/transaction/twopc/twopc_test.go | 27 +- .../endtoend/transaction/twopc/utils/utils.go | 15 + go/test/endtoend/transaction/tx_test.go | 3 +- go/test/endtoend/vtgate/misc_test.go | 4 +- go/vt/vtgate/executor.go | 5 + go/vt/vtgate/tx_conn.go | 17 + go/vt/vttablet/tabletserver/debug_2pc.go | 25 ++ go/vt/vttablet/tabletserver/dt_executor.go | 33 +- go/vt/vttablet/tabletserver/production.go | 8 + .../vttablet/tabletserver/tabletenv/stats.go | 11 +- go/vt/vttablet/tabletserver/tabletserver.go | 24 +- go/vt/vttablet/tabletserver/tx_engine.go | 12 +- test/config.json | 9 + 18 files changed, 701 insertions(+), 57 deletions(-) create mode 100644 go/test/endtoend/transaction/twopc/metric/main_test.go create mode 100644 go/test/endtoend/transaction/twopc/metric/metric_test.go create mode 100644 go/test/endtoend/transaction/twopc/metric/schema.sql create mode 100644 go/test/endtoend/transaction/twopc/metric/vschema.json diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index d7f5dc3dc01..c01f7c6e93b 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -309,11 +309,11 @@ func VtgateProcessInstance( } // GetVars returns map of vars -func (vtgate *VtgateProcess) GetVars() (map[string]any, error) { +func (vtgate *VtgateProcess) GetVars() map[string]any { resultMap := make(map[string]any) resp, err := http.Get(vtgate.VerifyURL) if err != nil { - return nil, fmt.Errorf("error getting response from %s", vtgate.VerifyURL) + return nil } defer resp.Body.Close() @@ -321,11 +321,11 @@ func (vtgate *VtgateProcess) GetVars() (map[string]any, error) { respByte, _ := io.ReadAll(resp.Body) err := json.Unmarshal(respByte, &resultMap) if err != nil { - return nil, fmt.Errorf("not able to parse response body") + return nil } - return resultMap, nil + return resultMap } - return nil, fmt.Errorf("unsuccessful response") + return nil } // ReadVSchema reads the vschema from the vtgate endpoint for it and returns diff --git a/go/test/endtoend/transaction/twopc/metric/main_test.go b/go/test/endtoend/transaction/twopc/metric/main_test.go new file mode 100644 index 00000000000..73cc380a900 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/metric/main_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package transaction + +import ( + "context" + _ "embed" + "flag" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + vtgateGrpcAddress string + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sidecarDBName = "vt_ks" + + //go:embed schema.sql + SchemaSQL string + + //go:embed vschema.json + VSchema string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitcode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Reserve vtGate port in order to pass it to vtTablet + clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort() + + // Set extra args for twopc + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--transaction_mode", "TWOPC", + "--grpc_use_effective_callerid", + ) + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--twopc_enable", + "--twopc_abandon_age", "1", + "--queryserver-config-transaction-cap", "100", + ) + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + SidecarDBName: sidecarDBName, + DurabilityPolicy: "semi_sync", + } + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil { + return 1 + } + + // Start Vtgate + if err := clusterInstance.StartVtgate(); err != nil { + return 1 + } + vtParams = clusterInstance.GetVTParams(keyspaceName) + vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) + + return m.Run() + }() + os.Exit(exitcode) +} + +func start(t *testing.T) (*mysql.Conn, func()) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + cleanup(t) + + return conn, func() { + conn.Close() + cleanup(t) + } +} + +func cleanup(t *testing.T) { + cluster.PanicHandler(t) + twopcutil.ClearOutTable(t, vtParams, "twopc_user") + twopcutil.ClearOutTable(t, vtParams, "twopc_t1") +} diff --git a/go/test/endtoend/transaction/twopc/metric/metric_test.go b/go/test/endtoend/transaction/twopc/metric/metric_test.go new file mode 100644 index 00000000000..40645628f45 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/metric/metric_test.go @@ -0,0 +1,396 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package transaction + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/callerid" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" +) + +// TestTransactionModes tests transactions using twopc mode +func TestTransactionModeMetrics(t *testing.T) { + conn, closer := start(t) + defer closer() + + tcases := []struct { + name string + stmts []string + want commitMetric + }{{ + name: "nothing to commit: so no change on vars", + stmts: []string{"commit"}, + }, { + name: "begin commit - no dml: so no change on vars", + stmts: []string{"begin", "commit"}, + }, { + name: "single shard", + stmts: []string{ + "begin", + "insert into twopc_user(id) values (1)", + "commit", + }, + want: commitMetric{TotalCount: 1, SingleCount: 1}, + }, { + name: "multi shard insert", + stmts: []string{ + "begin", + "insert into twopc_user(id) values (7),(8)", + "commit", + }, + want: commitMetric{TotalCount: 1, MultiCount: 1, TwoPCCount: 1}, + }, { + name: "multi shard delete", + stmts: []string{ + "begin", + "delete from twopc_user", + "commit", + }, + want: commitMetric{TotalCount: 1, MultiCount: 1, TwoPCCount: 1}, + }} + + initial := getCommitMetric(t) + utils.Exec(t, conn, "set transaction_mode = multi") + for _, tc := range tcases { + t.Run(tc.name, func(t *testing.T) { + for _, stmt := range tc.stmts { + utils.Exec(t, conn, stmt) + } + updatedMetric := getCommitMetric(t) + assert.EqualValues(t, tc.want.TotalCount, updatedMetric.TotalCount-initial.TotalCount, "TotalCount") + assert.EqualValues(t, tc.want.SingleCount, updatedMetric.SingleCount-initial.SingleCount, "SingleCount") + assert.EqualValues(t, tc.want.MultiCount, updatedMetric.MultiCount-initial.MultiCount, "MultiCount") + assert.Zero(t, updatedMetric.TwoPCCount-initial.TwoPCCount, "TwoPCCount") + initial = updatedMetric + }) + } + + utils.Exec(t, conn, "set transaction_mode = twopc") + for _, tc := range tcases { + t.Run(tc.name, func(t *testing.T) { + for _, stmt := range tc.stmts { + utils.Exec(t, conn, stmt) + } + updatedMetric := getCommitMetric(t) + assert.EqualValues(t, tc.want.TotalCount, updatedMetric.TotalCount-initial.TotalCount, "TotalCount") + assert.EqualValues(t, tc.want.SingleCount, updatedMetric.SingleCount-initial.SingleCount, "SingleCount") + assert.Zero(t, updatedMetric.MultiCount-initial.MultiCount, "MultiCount") + assert.EqualValues(t, tc.want.TwoPCCount, updatedMetric.TwoPCCount-initial.TwoPCCount, "TwoPCCount") + initial = updatedMetric + }) + } +} + +// TestVTGate2PCCommitMetricOnFailure tests unresolved commit metrics on VTGate. +func TestVTGate2PCCommitMetricOnFailure(t *testing.T) { + defer cleanup(t) + + initialCount := getVarValue[float64](t, "CommitUnresolved", clusterInstance.VtgateProcess.GetVars) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + conn := vtgateConn.Session("", nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err = conn.Execute(ctx, "begin", nil) + require.NoError(t, err) + _, err = conn.Execute(ctx, "insert into twopc_user(id, name) values(7,'foo'), (8,'bar')", nil) + require.NoError(t, err) + + // fail after mm commit. + newCtx := callerid.NewContext(ctx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil) + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "Fail After MM commit") + + updatedCount := getVarValue[float64](t, "CommitUnresolved", clusterInstance.VtgateProcess.GetVars) + assert.EqualValues(t, 1, updatedCount-initialCount, "CommitUnresolved") + + waitForResolve(ctx, t, conn, 5*time.Second) + + _, err = conn.Execute(ctx, "begin", nil) + require.NoError(t, err) + _, err = conn.Execute(ctx, "insert into twopc_user(id, name) values(9,'foo')", nil) + require.NoError(t, err) + _, err = conn.Execute(ctx, "insert into twopc_user(id, name) values(10,'apa')", nil) + require.NoError(t, err) + + // fail during rm commit. + newCtx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("RMCommit_-40_FailNow", "", ""), nil) + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "Fail During RM commit") + + updatedCount = getVarValue[float64](t, "CommitUnresolved", clusterInstance.VtgateProcess.GetVars) + assert.EqualValues(t, 2, updatedCount-initialCount, "CommitUnresolved") + + waitForResolve(ctx, t, conn, 5*time.Second) +} + +// TestVTTablet2PCMetrics tests 2pc metrics on VTTablet. +func TestVTTablet2PCMetrics(t *testing.T) { + defer cleanup(t) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + conn := vtgateConn.Session("", nil) + ctx, cancel := context.WithCancel(context.Background()) + ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil) + defer cancel() + + for i := 1; i <= 20; i++ { + _, err = conn.Execute(ctx, "begin", nil) + require.NoError(t, err) + query := fmt.Sprintf("insert into twopc_user(id, name) values(%d,'foo'), (%d,'bar'), (%d,'baz')", i, i*101, i+53) + _, err = conn.Execute(ctx, query, nil) + require.NoError(t, err) + + multi := len(conn.SessionPb().ShardSessions) > 1 + + // fail after mm commit. + _, err = conn.Execute(ctx, "commit", nil) + if multi { + assert.ErrorContains(t, err, "Fail After MM commit") + } else { + assert.NoError(t, err) + } + } + + waitForResolve(ctx, t, conn, 5*time.Second) + + // at least 1 unresolved transaction should be seen by the gauge. + unresolvedCount := getUnresolvedTxCount(t) + assert.Greater(t, unresolvedCount, 1.0) + + // after next ticker should be become zero. + timeout := time.After(3 * time.Second) + for { + select { + case <-timeout: + t.Errorf("unresolved transaction not reduced to zero within the time limit") + return + case <-time.After(500 * time.Millisecond): + unresolvedCount = getUnresolvedTxCount(t) + if unresolvedCount == 0 { + return + } + fmt.Printf("unresolved tx count: %f\n", unresolvedCount) + } + } +} + +// TestVTTablet2PCMetricsFailCommitPrepared tests 2pc metrics on VTTablet on commit prepared failure..';;/ +func TestVTTablet2PCMetricsFailCommitPrepared(t *testing.T) { + defer cleanup(t) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + conn := vtgateConn.Session("", nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + newCtx := callerid.NewContext(ctx, callerid.NewEffectiveCallerID("CP_80-_R", "", ""), nil) + execute(t, newCtx, conn, "begin") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (4, 1)") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (6, 2)") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (9, 3)") + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "commit prepared: retryable error") + dtidRetryable := getDTIDFromWarnings(ctx, t, conn) + require.NotEmpty(t, dtidRetryable) + + cpFail := getVarValue[map[string]any](t, "CommitPreparedFail", clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().VttabletProcess.GetVars) + require.EqualValues(t, 1, cpFail["Retryable"]) + require.Nil(t, cpFail["NonRetryable"]) + + newCtx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("CP_80-_NR", "", ""), nil) + execute(t, newCtx, conn, "begin") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (20, 11)") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (22, 21)") + execute(t, newCtx, conn, "insert into twopc_t1(id, col) values (25, 31)") + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "commit prepared: non retryable error") + dtidNonRetryable := getDTIDFromWarnings(ctx, t, conn) + require.NotEmpty(t, dtidNonRetryable) + + cpFail = getVarValue[map[string]any](t, "CommitPreparedFail", clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().VttabletProcess.GetVars) + require.EqualValues(t, 1, cpFail["Retryable"]) // old counter value + require.EqualValues(t, 1, cpFail["NonRetryable"]) + + // restart to trigger unresolved transactions + err = clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().RestartOnlyTablet() + require.NoError(t, err) + + // dtid with retryable error should be resolved. + waitForDTIDResolve(ctx, t, conn, dtidRetryable, 5*time.Second) + + // dtid with non retryable error should remain unresolved. + qr, err := conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtidNonRetryable), nil) + require.NoError(t, err) + require.NotZero(t, qr.Rows, "should remain unresolved") + + // running conclude transaction for it. + out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput( + "DistributedTransaction", "conclude", "--dtid", dtidNonRetryable) + require.NoError(t, err) + require.Contains(t, out, "Successfully concluded the distributed transaction") + // now verifying + qr, err = conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtidNonRetryable), nil) + require.NoError(t, err) + require.Empty(t, qr.Rows) +} + +func execute(t *testing.T, ctx context.Context, conn *vtgateconn.VTGateSession, query string) { + t.Helper() + + _, err := conn.Execute(ctx, query, nil) + require.NoError(t, err) +} + +func getUnresolvedTxCount(t *testing.T) float64 { + unresolvedCount := 0.0 + for _, shard := range clusterInstance.Keyspaces[0].Shards { + unresolvedTx := getVarValue[map[string]any](t, "UnresolvedTransaction", shard.FindPrimaryTablet().VttabletProcess.GetVars) + if mmCount, exists := unresolvedTx["MetadataManager"]; exists { + unresolvedCount += mmCount.(float64) + } + if rmCount, exists := unresolvedTx["ResourceManager"]; exists { + unresolvedCount += rmCount.(float64) + } + } + return unresolvedCount +} + +type commitMetric struct { + TotalCount float64 + SingleCount float64 + MultiCount float64 + TwoPCCount float64 +} + +func getCommitMetric(t *testing.T) commitMetric { + t.Helper() + + vars := clusterInstance.VtgateProcess.GetVars() + require.NotNil(t, vars) + + cm := commitMetric{} + commitVars, exists := vars["CommitModeTimings"] + if !exists { + return cm + } + + commitMap, ok := commitVars.(map[string]any) + require.True(t, ok, "commit vars is not a map") + + cm.TotalCount = commitMap["TotalCount"].(float64) + + histogram, ok := commitMap["Histograms"].(map[string]any) + require.True(t, ok, "commit histogram is not a map") + + if single, ok := histogram["Single"]; ok { + singleMap, ok := single.(map[string]any) + require.True(t, ok, "single histogram is not a map") + cm.SingleCount = singleMap["Count"].(float64) + } + + if multi, ok := histogram["Multi"]; ok { + multiMap, ok := multi.(map[string]any) + require.True(t, ok, "multi histogram is not a map") + cm.MultiCount = multiMap["Count"].(float64) + } + + if twopc, ok := histogram["TwoPC"]; ok { + twopcMap, ok := twopc.(map[string]any) + require.True(t, ok, "twopc histogram is not a map") + cm.TwoPCCount = twopcMap["Count"].(float64) + } + + return cm +} + +func getVarValue[T any](t *testing.T, key string, varFunc func() map[string]any) T { + t.Helper() + + vars := varFunc() + require.NotNil(t, vars) + + value, exists := vars[key] + if !exists { + return *new(T) + } + castValue, ok := value.(T) + if !ok { + t.Errorf("unexpected type, want: %T, got %T", new(T), value) + } + return castValue +} + +func waitForResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession, waitTime time.Duration) { + t.Helper() + + dtid := getDTIDFromWarnings(ctx, t, conn) + waitForDTIDResolve(ctx, t, conn, dtid, waitTime) +} + +func getDTIDFromWarnings(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession) string { + qr, err := conn.Execute(ctx, "show warnings", nil) + require.NoError(t, err) + require.Len(t, qr.Rows, 1) + + // validate warning output + w := twopcutil.ToWarn(qr.Rows[0]) + assert.Equal(t, "Warning", w.Level) + assert.EqualValues(t, 302, w.Code) + + // extract transaction ID + indx := strings.Index(w.Msg, " ") + require.Greater(t, indx, 0) + return w.Msg[:indx] +} + +func waitForDTIDResolve(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateSession, dtid string, waitTime time.Duration) { + unresolved := true + totalTime := time.After(waitTime) + for unresolved { + select { + case <-totalTime: + t.Errorf("transaction resolution exceeded wait time of %v", waitTime) + unresolved = false // break the loop. + case <-time.After(100 * time.Millisecond): + qr, err := conn.Execute(ctx, fmt.Sprintf(`show transaction status for '%s'`, dtid), nil) + require.NoError(t, err) + unresolved = len(qr.Rows) != 0 + } + } +} diff --git a/go/test/endtoend/transaction/twopc/metric/schema.sql b/go/test/endtoend/transaction/twopc/metric/schema.sql new file mode 100644 index 00000000000..da6e5cf289a --- /dev/null +++ b/go/test/endtoend/transaction/twopc/metric/schema.sql @@ -0,0 +1,13 @@ +create table twopc_user +( + id bigint, + name varchar(64), + primary key (id) +) Engine=InnoDB; + +create table twopc_t1 +( + id bigint, + col bigint, + primary key (id) +) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/metric/vschema.json b/go/test/endtoend/transaction/twopc/metric/vschema.json new file mode 100644 index 00000000000..c6be1426a87 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/metric/vschema.json @@ -0,0 +1,29 @@ +{ + "sharded":true, + "vindexes": { + "xxhash": { + "type": "xxhash" + }, + "reverse_bits": { + "type": "reverse_bits" + } + }, + "tables": { + "twopc_user":{ + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "twopc_t1": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + } + } +} \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 742aa832cfe..95d962655ba 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -908,21 +908,6 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) { "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) } -type warn struct { - level string - code uint16 - msg string -} - -func toWarn(row sqltypes.Row) warn { - code, _ := row[1].ToUint16() - return warn{ - level: row[0].ToString(), - code: code, - msg: row[2].ToString(), - } -} - type txStatus struct { dtid string state string @@ -948,15 +933,15 @@ func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSessio require.Len(t, qr.Rows, 1) // validate warning output - w := toWarn(qr.Rows[0]) - assert.Equal(t, "Warning", w.level) - assert.EqualValues(t, 302, w.code) - assert.Contains(t, w.msg, warnMsg) + w := twopcutil.ToWarn(qr.Rows[0]) + assert.Equal(t, "Warning", w.Level) + assert.EqualValues(t, 302, w.Code) + assert.Contains(t, w.Msg, warnMsg) // extract transaction ID - indx := strings.Index(w.msg, " ") + indx := strings.Index(w.Msg, " ") require.Greater(t, indx, 0) - dtid := w.msg[:indx] + dtid := w.Msg[:indx] qr, err = conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil) require.NoError(t, err) diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index 9d0adb57e3c..067877c4ece 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -223,3 +223,18 @@ func AddShards(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keysp clusterInstance.Keyspaces[0].Shards = append(clusterInstance.Keyspaces[0].Shards, *shard) } } + +type Warn struct { + Level string + Code uint16 + Msg string +} + +func ToWarn(row sqltypes.Row) Warn { + code, _ := row[1].ToUint16() + return Warn{ + Level: row[0].ToString(), + Code: code, + Msg: row[2].ToString(), + } +} diff --git a/go/test/endtoend/transaction/tx_test.go b/go/test/endtoend/transaction/tx_test.go index 475b17cfa2c..753dcfb46bd 100644 --- a/go/test/endtoend/transaction/tx_test.go +++ b/go/test/endtoend/transaction/tx_test.go @@ -24,12 +24,11 @@ import ( "os" "testing" - "vitess.io/vitess/go/test/endtoend/utils" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" ) var ( diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index f15799a5e71..f3804a2a45f 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -844,8 +844,8 @@ func getVtgateApiErrorCounts(t *testing.T) float64 { } func getVar(t *testing.T, key string) interface{} { - vars, err := clusterInstance.VtgateProcess.GetVars() - require.NoError(t, err) + vars := clusterInstance.VtgateProcess.GetVars() + require.NotNil(t, vars) val, exists := vars[key] if !exists { diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 5dc388f1a3d..928f42fca30 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -77,6 +77,11 @@ var ( queriesProcessedByTable = stats.NewCountersWithMultiLabels("QueriesProcessedByTable", "Queries processed at vtgate by plan type, keyspace and table", []string{"Plan", "Keyspace", "Table"}) queriesRoutedByTable = stats.NewCountersWithMultiLabels("QueriesRoutedByTable", "Queries routed from vtgate to vttablet by plan type, keyspace and table", []string{"Plan", "Keyspace", "Table"}) + // commitMode records the timing of the commit phase of a transaction. + // It also tracks between different transaction mode i.e. Single, Multi and TwoPC + commitMode = stats.NewTimings("CommitModeTimings", "Commit Mode Time", "mode") + commitUnresolved = stats.NewCounter("CommitUnresolved", "Atomic Commit failed to conclude after commit decision is made") + exceedMemoryRowsLogger = logutil.NewThrottledLogger("ExceedMemoryRows", 1*time.Minute) errorTransform errorTransformer = nullErrorTransformer{} diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index eba362e82f9..968b41d38d3 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" "sync" + "time" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/concurrency" @@ -107,12 +108,26 @@ func (txc *TxConn) Commit(ctx context.Context, session *SafeSession) error { twopc = txc.mode == vtgatepb.TransactionMode_TWOPC } + defer recordCommitTime(session, twopc, time.Now()) if twopc { return txc.commit2PC(ctx, session) } return txc.commitNormal(ctx, session) } +func recordCommitTime(session *SafeSession, twopc bool, startTime time.Time) { + switch { + case len(session.ShardSessions) == 0: + // No-op + case len(session.ShardSessions) == 1: + commitMode.Record("Single", startTime) + case twopc: + commitMode.Record("TwoPC", startTime) + default: + commitMode.Record("Multi", startTime) + } +} + func (txc *TxConn) queryService(ctx context.Context, alias *topodatapb.TabletAlias) (queryservice.QueryService, error) { if alias == nil { return txc.tabletGateway, nil @@ -295,6 +310,8 @@ func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging); resumeErr != nil { log.Warningf("Rollback failed after Prepare failure: %v", resumeErr) } + case Commit2pcStartCommit, Commit2pcPrepareCommit: + commitUnresolved.Add(1) } session.RecordWarning(&querypb.QueryWarning{ Code: uint32(sqlerror.ERInAtomicRecovery), diff --git a/go/vt/vttablet/tabletserver/debug_2pc.go b/go/vt/vttablet/tabletserver/debug_2pc.go index a0de20104db..5db72be0fba 100644 --- a/go/vt/vttablet/tabletserver/debug_2pc.go +++ b/go/vt/vttablet/tabletserver/debug_2pc.go @@ -19,12 +19,16 @@ limitations under the License. package tabletserver import ( + "context" "os" "path" "strconv" "time" + "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/log" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) const DebugTwoPc = true @@ -46,3 +50,24 @@ func commitPreparedDelayForTest(tsv *TabletServer) { time.Sleep(time.Duration(delVal) * time.Second) } } + +// checkTestFailure is used to simulate failures in 2PC flow for testing when DebugTwoPc is true. +func checkTestFailure(ctx context.Context, shard string) error { + if shard != "80-" { + return nil + } + callerID := callerid.EffectiveCallerIDFromContext(ctx) + if callerID == nil { + return nil + } + switch callerID.Principal { + case "CP_80-_R": + // retryable error. + return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "commit prepared: retryable error") + case "CP_80-_NR": + // non retryable error. + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "commit prepared: non retryable error") + default: + return nil + } +} diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 823751df638..126c99814b8 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -33,19 +33,21 @@ import ( // DTExecutor is used for executing a distributed transactional request. type DTExecutor struct { - ctx context.Context - logStats *tabletenv.LogStats - te *TxEngine - qe *QueryEngine + ctx context.Context + logStats *tabletenv.LogStats + te *TxEngine + qe *QueryEngine + shardFunc func() string } // NewDTExecutor creates a new distributed transaction executor. -func NewDTExecutor(ctx context.Context, te *TxEngine, qe *QueryEngine, logStats *tabletenv.LogStats) *DTExecutor { +func NewDTExecutor(ctx context.Context, logStats *tabletenv.LogStats, te *TxEngine, qe *QueryEngine, shardFunc func() string) *DTExecutor { return &DTExecutor{ - ctx: ctx, - te: te, - qe: qe, - logStats: logStats, + ctx: ctx, + logStats: logStats, + te: te, + qe: qe, + shardFunc: shardFunc, } } @@ -159,10 +161,21 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { defer func() { if err != nil { log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err) - dte.te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcCommit") + fail := dte.te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcCommit") + if fail { + dte.te.env.Stats().CommitPreparedFail.Add("NonRetryable", 1) + } else { + dte.te.env.Stats().CommitPreparedFail.Add("Retryable", 1) + } } dte.te.txPool.RollbackAndRelease(ctx, conn) }() + if DebugTwoPc { + if err := checkTestFailure(dte.ctx, dte.shardFunc()); err != nil { + log.Errorf("failing test on commit prepared: %v", err) + return err + } + } if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/production.go b/go/vt/vttablet/tabletserver/production.go index 70cb8b092fa..e0d8cb4fd66 100644 --- a/go/vt/vttablet/tabletserver/production.go +++ b/go/vt/vttablet/tabletserver/production.go @@ -18,6 +18,10 @@ limitations under the License. package tabletserver +import ( + "context" +) + // This file defines debug constants that are always false. // This file is used for building production code. // We use go build directives to include a file that defines the constant to true @@ -28,3 +32,7 @@ package tabletserver const DebugTwoPc = false func commitPreparedDelayForTest(tsv *TabletServer) {} + +func checkTestFailure(context.Context, string) error { + return nil +} diff --git a/go/vt/vttablet/tabletserver/tabletenv/stats.go b/go/vt/vttablet/tabletserver/tabletenv/stats.go index 1ad93532719..52bb6a5a3b0 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/stats.go +++ b/go/vt/vttablet/tabletserver/tabletenv/stats.go @@ -34,7 +34,6 @@ type Stats struct { ErrorCounters *stats.CountersWithSingleLabel InternalErrors *stats.CountersWithSingleLabel Warnings *stats.CountersWithSingleLabel - Unresolved *stats.GaugesWithSingleLabel // For now, only Prepares are tracked UserTableQueryCount *stats.CountersWithMultiLabels // Per CallerID/table counts UserTableQueryTimesNs *stats.CountersWithMultiLabels // Per CallerID/table latencies UserTransactionCount *stats.CountersWithMultiLabels // Per CallerID transaction counts @@ -49,6 +48,11 @@ type Stats struct { UserReservedTimesNs *stats.CountersWithSingleLabel // Per CallerID reserved connection duration QueryTimingsByTabletType *servenv.TimingsWrapper // Query timings split by current tablet type + + // Atomic Transactions + Unresolved *stats.GaugesWithSingleLabel + CommitPreparedFail *stats.CountersWithSingleLabel + RedoPreparedFail *stats.CountersWithSingleLabel } // NewStats instantiates a new set of stats scoped by exporter. @@ -83,7 +87,6 @@ func NewStats(exporter *servenv.Exporter) *Stats { ), InternalErrors: exporter.NewCountersWithSingleLabel("InternalErrors", "Internal component errors", "type", "Task", "StrayTransactions", "Panic", "HungQuery", "Schema", "TwopcCommit", "TwopcResurrection", "WatchdogFail", "Messages"), Warnings: exporter.NewCountersWithSingleLabel("Warnings", "Warnings", "type", "ResultsExceeded"), - Unresolved: exporter.NewGaugesWithSingleLabel("Unresolved", "Unresolved items", "item_type", "Prepares"), UserTableQueryCount: exporter.NewCountersWithMultiLabels("UserTableQueryCount", "Queries received for each CallerID/table combination", []string{"TableName", "CallerID", "Type"}), UserTableQueryTimesNs: exporter.NewCountersWithMultiLabels("UserTableQueryTimesNs", "Total latency for each CallerID/table combination", []string{"TableName", "CallerID", "Type"}), UserTransactionCount: exporter.NewCountersWithMultiLabels("UserTransactionCount", "transactions received for each CallerID", []string{"CallerID", "Conclusion"}), @@ -98,6 +101,10 @@ func NewStats(exporter *servenv.Exporter) *Stats { UserReservedTimesNs: exporter.NewCountersWithSingleLabel("UserReservedTimesNs", "Total reserved connection latency for each CallerID", "CallerID"), QueryTimingsByTabletType: exporter.NewTimings("QueryTimingsByTabletType", "Query timings broken down by active tablet type", "TabletType"), + + Unresolved: exporter.NewGaugesWithSingleLabel("UnresolvedTransaction", "Current unresolved transactions", "ManagerType"), + CommitPreparedFail: exporter.NewCountersWithSingleLabel("CommitPreparedFail", "failed prepared transactions commit", "FailureType"), + RedoPreparedFail: exporter.NewCountersWithSingleLabel("RedoPreparedFail", "failed prepared transactions on redo", "FailureType"), } stats.QPSRates = exporter.NewRates("QPS", stats.QueryTimings, 15*60/5, 5*time.Second) return stats diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 261bd900f41..f96911971be 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -703,7 +703,7 @@ func (tsv *TabletServer) Prepare(ctx context.Context, target *querypb.Target, tr "Prepare", "prepare", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.Prepare(transactionID, dtid) }, ) @@ -716,7 +716,7 @@ func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Tar "CommitPrepared", "commit_prepared", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) if DebugTwoPc { commitPreparedDelayForTest(tsv) } @@ -732,7 +732,7 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T "RollbackPrepared", "rollback_prepared", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.RollbackPrepared(dtid, originalID) }, ) @@ -765,7 +765,7 @@ func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb. "CreateTransaction", "create_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.CreateTransaction(dtid, participants) }, ) @@ -779,7 +779,7 @@ func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target "StartCommit", "start_commit", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.StartCommit(transactionID, dtid) }, ) @@ -793,7 +793,7 @@ func (tsv *TabletServer) SetRollback(ctx context.Context, target *querypb.Target "SetRollback", "set_rollback", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.SetRollback(dtid, transactionID) }, ) @@ -807,7 +807,7 @@ func (tsv *TabletServer) ConcludeTransaction(ctx context.Context, target *queryp "ConcludeTransaction", "conclude_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) return txe.ConcludeTransaction(dtid) }, ) @@ -820,7 +820,7 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta "ReadTransaction", "read_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) metadata, err = txe.ReadTransaction(dtid) return err }, @@ -835,7 +835,7 @@ func (tsv *TabletServer) UnresolvedTransactions(ctx context.Context, target *que "UnresolvedTransactions", "unresolved_transaction", nil, target, nil, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, logStats) + txe := NewDTExecutor(ctx, logStats, tsv.te, tsv.qe, tsv.getShard) transactions, err = txe.UnresolvedTransactions(time.Duration(abandonAgeSeconds) * time.Second) return err }, @@ -1865,7 +1865,7 @@ func (tsv *TabletServer) registerQueryListHandlers(queryLists []*QueryList) { func (tsv *TabletServer) registerTwopczHandler() { tsv.exporter.HandleFunc("/twopcz", func(w http.ResponseWriter, r *http.Request) { ctx := tabletenv.LocalContext() - txe := NewDTExecutor(ctx, tsv.te, tsv.qe, tabletenv.NewLogStats(ctx, "twopcz")) + txe := NewDTExecutor(ctx, tabletenv.NewLogStats(ctx, "twopcz"), tsv.te, tsv.qe, tsv.getShard) twopczHandler(txe, w, r) }) } @@ -2154,3 +2154,7 @@ func skipQueryPlanCache(options *querypb.ExecuteOptions) bool { } return options.SkipQueryPlanCache || options.HasCreatedTempTables } + +func (tsv *TabletServer) getShard() string { + return tsv.sm.Target().Shard +} diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 1bd7c85b1f3..b6e0e69b86d 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -84,7 +84,6 @@ type TxEngine struct { // 2. TabletControls have been set in the tablet record, and Query service is going to be disabled. twopcAllowed []bool shutdownGracePeriod time.Duration - coordinatorAddress string abandonAge time.Duration ticks *timer.Timer @@ -455,6 +454,9 @@ func (te *TxEngine) prepareFromRedo() error { allErrs = append(allErrs, vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) if prepFailed { failedCounter++ + te.env.Stats().RedoPreparedFail.Add("NonRetryable", 1) + } else { + te.env.Stats().RedoPreparedFail.Add("Retryable", 1) } } else { preparedCounter++ @@ -596,14 +598,13 @@ func (te *TxEngine) startTransactionWatcher() { ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), te.abandonAge/4) defer cancel() - // Raise alerts on prepares that have been unresolved for too long. - // Use 5x abandonAge to give opportunity for transaction coordinator to resolve these redo logs. - count, err := te.twoPC.CountUnresolvedRedo(ctx, time.Now().Add(-te.abandonAge*5)) + // Track unresolved redo logs. + count, err := te.twoPC.CountUnresolvedRedo(ctx, time.Now().Add(-te.abandonAge)) if err != nil { te.env.Stats().InternalErrors.Add("RedoWatcherFail", 1) log.Errorf("Error reading prepared transactions: %v", err) } - te.env.Stats().Unresolved.Set("Prepares", count) + te.env.Stats().Unresolved.Set("ResourceManager", count) // Notify lingering distributed transactions. count, err = te.twoPC.CountUnresolvedTransaction(ctx, time.Now().Add(-te.abandonAge)) @@ -612,6 +613,7 @@ func (te *TxEngine) startTransactionWatcher() { log.Errorf("Error reading unresolved transactions: %v", err) return } + te.env.Stats().Unresolved.Set("MetadataManager", count) if count > 0 { te.dxNotify() } diff --git a/test/config.json b/test/config.json index cfc89de84ce..805c66f73c4 100644 --- a/test/config.json +++ b/test/config.json @@ -851,6 +851,15 @@ "RetryMax": 1, "Tags": [] }, + "vtgate_transaction_twopc_metric": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc/metric"], + "Command": [], + "Manual": false, + "Shard": "vtgate_transaction", + "RetryMax": 1, + "Tags": [] + }, "vtgate_transaction_twopc_stress": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc/stress"],