Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Avoid creation of workflows with non-empty tables in target keyspace #16874

Merged
merged 11 commits into from
Oct 18, 2024
9 changes: 9 additions & 0 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,15 @@ func (mz *materializer) deploySchema() error {
}
}

// Check if any table being moved is already non-empty in the target keyspace.
// Skip this check for multi-tenant migrations.
if !mz.IsMultiTenantMigration() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nice future addition would be to check for any existing tenant data in each table on the target too.

err := validateEmptyTables(mz.ctx, mz.ts, mz.tmc, mz.targetShards, mz.ms.TableSettings)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're passing in a lot of mz members, which tells me it's probably better to have mz as the method receiver, no?

mz.validateEmptyTables()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this too, but went with the other approach anyway. Refactored the function with the receiver, it looks much cleaner now!
Also, if the method receiver is materializer, it makes sense to have validateEmptyTables in materializer.go. So, moved it from utils.go.

if err != nil {
return err
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}
}

err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
allTables := []string{"/.*/"}

Expand Down
16 changes: 15 additions & 1 deletion go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
mzGetCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1"
mzGetLatestCopyState = "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)"
insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys, options\) values `
getNonEmptyTable = "select 1 from `t1` limit 1"
)

var (
Expand Down Expand Up @@ -520,6 +521,7 @@ func TestMigrateVSchema(t *testing.T) {
defer env.close()

env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{})

Expand Down Expand Up @@ -578,6 +580,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {
// a circular dependency.
// The TabletManager portion is tested in rpc_vreplication_test.go.
env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{})

Expand Down Expand Up @@ -610,6 +613,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {
func TestShardedAutoIncHandling(t *testing.T) {
tableName := "t1"
tableDDL := fmt.Sprintf("create table %s (id int not null auto_increment primary key, c1 varchar(10))", tableName)
validateEmptyTableQuery := fmt.Sprintf("select 1 from `%s` limit 1", tableName)
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
Expand Down Expand Up @@ -705,6 +709,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
},
expectQueries: []string{
validateEmptyTableQuery,
tableDDL, // Unchanged
},
},
Expand Down Expand Up @@ -750,6 +755,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
},
expectQueries: []string{ // auto_increment clause removed
validateEmptyTableQuery,
fmt.Sprintf(`create table %s (
id int not null primary key,
c1 varchar(10)
Expand Down Expand Up @@ -806,6 +812,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
},
expectQueries: []string{ // auto_increment clause removed
validateEmptyTableQuery,
fmt.Sprintf(`create table %s (
id int not null primary key,
c1 varchar(10)
Expand Down Expand Up @@ -858,6 +865,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
},
expectQueries: []string{ // auto_increment clause removed
validateEmptyTableQuery,
fmt.Sprintf(`create table %s (
id int not null primary key,
c1 varchar(10)
Expand Down Expand Up @@ -943,6 +951,7 @@ func TestMoveTablesNoRoutingRules(t *testing.T) {
// a circular dependency.
// The TabletManager portion is tested in rpc_vreplication_test.go.
env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{})

Expand Down Expand Up @@ -1061,6 +1070,7 @@ func TestCreateLookupVindexFull(t *testing.T) {
}

env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, "select 1 from `lookup` limit 1", &sqltypes.Result{})
env.tmc.expectVRQuery(200, "/CREATE TABLE `lookup`", &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{})
Expand Down Expand Up @@ -2777,7 +2787,10 @@ func TestCreateLookupVindexFailures(t *testing.T) {
},
},
},
vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"},
vrepExecQueries: []string{
"select 1 from `t1_lkp` limit 1",
"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)",
},
createRequest: &createVReplicationWorkflowRequestResponse{
req: nil, // We don't care about defining it in this case
res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{},
Expand Down Expand Up @@ -3096,6 +3109,7 @@ func TestKeyRangesEqualOptimization(t *testing.T) {
if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY {
continue
}
env.tmc.expectVRQuery(int(tablet.Alias.Uid), getNonEmptyTable, &sqltypes.Result{})
// If we are doing a partial MoveTables, we will only perform the workflow
// stream creation / INSERT statment on the shard(s) we're migrating.
if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) {
Expand Down
58 changes: 58 additions & 0 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"strings"
"sync"

"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/mysql/sqlerror"
Expand Down Expand Up @@ -1019,3 +1021,59 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error {
}
return nil
}

// validateEmptyTables checks if all specified tables are empty across specified shards.
// It queries each shard's primary tablet and if any non-empty table is found, it returns an error
// containing a list of non-empty tables.
func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, shards []*topo.ShardInfo, tableSettings []*vtctldatapb.TableMaterializeSettings) error {
var mu sync.Mutex
isTableFaulty := map[string]bool{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change terminology from "Faulty" to "NonEmpty" like hasTableData or isNonEmpty. Similarly faultyTables => nonEmptyTables or tablesWithData ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


err := forAllShards(shards, func(shard *topo.ShardInfo) error {
primary := shard.PrimaryAlias
if primary == nil {
return fmt.Errorf("shard does not have a primary: %v", shard.ShardName())
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}

ti, err := ts.GetTablet(ctx, primary)
if err != nil {
return err
}

eg, groupCtx := errgroup.WithContext(ctx)
eg.SetLimit(20)

for _, ts := range tableSettings {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth adding the shard and table to the errors in this loop as there could be 1,000 shards and you would want to know what specific shard had a table with data in it, and e.g. what shard and table we encountered an error in processing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do this in a follow-up PR?

eg.Go(func() error {
query := fmt.Sprintf("select 1 from `%s` limit 1", ts.TargetTable)
mattlord marked this conversation as resolved.
Show resolved Hide resolved
res, err := tmc.ExecuteFetchAsDba(groupCtx, ti.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
mattlord marked this conversation as resolved.
Show resolved Hide resolved
Query: []byte(query),
MaxRows: 1,
})
// Ignore table not found error
if err != nil && !IsTableDidNotExistError(err) {
return err
}
if res != nil && len(res.Rows) > 0 {
mu.Lock()
isTableFaulty[ts.TargetTable] = true
mu.Unlock()
}
return nil
})
}
if err = eg.Wait(); err != nil {
return err
}
return nil
})
if err != nil {
return err
}

faultyTables := maps.Keys(isTableFaulty)
if len(faultyTables) > 0 {
return fmt.Errorf("non-empty tables found in target keyspace: %s", strings.Join(faultyTables, ", "))
}
return nil
}
125 changes: 124 additions & 1 deletion go/vt/vtctl/workflow/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,21 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/testfiles"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/etcd2topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtctldata"
)

// TestCreateDefaultShardRoutingRules confirms that the default shard routing rules are created correctly for sharded
Expand Down Expand Up @@ -243,3 +248,121 @@ func startEtcd(t *testing.T) string {

return clientAddr
}

func TestValidateEmptyTables(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := memorytopo.NewServer(ctx, "zone1")
defer ts.Close()

ks := "test_keyspace"
shard1 := "-40"
shard2 := "40-80"
shard3 := "80-"
err := ts.CreateKeyspace(ctx, ks, &topodatapb.Keyspace{})
require.NoError(t, err)

err = ts.CreateShard(ctx, ks, shard1)
require.NoError(t, err)
err = ts.CreateShard(ctx, ks, shard2)
require.NoError(t, err)
err = ts.CreateShard(ctx, ks, shard3)
require.NoError(t, err)

tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Keyspace: ks,
Shard: shard1,
}
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 200,
},
Keyspace: ks,
Shard: shard2,
}
tablet3 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 300,
},
Keyspace: ks,
Shard: shard3,
}
err = ts.CreateTablet(ctx, tablet1)
require.NoError(t, err)
err = ts.CreateTablet(ctx, tablet2)
require.NoError(t, err)
err = ts.CreateTablet(ctx, tablet3)
require.NoError(t, err)

s1, err := ts.UpdateShardFields(ctx, ks, shard1, func(si *topo.ShardInfo) error {
si.Shard.PrimaryAlias = tablet1.Alias
return nil
})
require.NoError(t, err)
s2, err := ts.UpdateShardFields(ctx, ks, shard2, func(si *topo.ShardInfo) error {
si.Shard.PrimaryAlias = tablet2.Alias
return nil
})
require.NoError(t, err)
s3, err := ts.UpdateShardFields(ctx, ks, shard3, func(si *topo.ShardInfo) error {
si.Shard.PrimaryAlias = tablet3.Alias
return nil
})
require.NoError(t, err)

tmc := &testutil.TabletManagerClient{
ExecuteFetchAsDbaResults: map[string]struct {
Response *querypb.QueryResult
Error error
}{
"zone1-0000000100": {
Response: &querypb.QueryResult{
Rows: []*querypb.Row{{
Lengths: []int64{1},
Values: []byte("1"),
},
},
},
},
"zone1-0000000200": {
Response: &querypb.QueryResult{
Rows: []*querypb.Row{{
Lengths: []int64{1},
Values: []byte("1"),
},
},
},
},
"zone1-0000000300": {
Response: &querypb.QueryResult{
Rows: []*querypb.Row{{}},
},
},
},
}

tableSettings := []*vtctldata.TableMaterializeSettings{
{
TargetTable: "table1",
},
{
TargetTable: "table2",
},
{
TargetTable: "table3",
},
}
err = validateEmptyTables(ctx, ts, tmc, []*topo.ShardInfo{s1, s2, s3}, tableSettings)
assert.ErrorContains(t, err, "table1")
assert.ErrorContains(t, err, "table2")
assert.ErrorContains(t, err, "table3")

err = validateEmptyTables(ctx, ts, tmc, []*topo.ShardInfo{s1, s2, s3}, []*vtctldata.TableMaterializeSettings{})
assert.NoError(t, err, "should not throw any error for empty table settings slice")
}
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ func (tmc *fakeTMClient) VReplicationExec(ctx context.Context, tablet *topodatap
}
for qry, res := range tmc.vreQueries[int(tablet.Alias.Uid)] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you need to make this change from regexp.MustCompile(qry) to regexp.MustCompile(qry[1:]) ?
It is not correct afaik: this is supposed to allow regexp queries, so the / at the beginning should be required.

Your second fix for matching query instead of qry is a good catch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to remove the / at the beginning, for regexp in go, we don't need that for regexp matching. This was probably a mistake. We already have similar logic in our codebase: e.g. https://github.com/vitessio/vitess/blob/main/go/vt/vtctl/workflow/framework_test.go#L450-L463

if strings.HasPrefix(qry, "/") {
re := regexp.MustCompile(qry)
if re.MatchString(qry) {
re := regexp.MustCompile(qry[1:])
if re.MatchString(query) {
return res, nil
}
}
Expand Down
Loading
Loading