-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Avoid creation of workflows with non-empty tables in target keyspace #16874
Changes from 7 commits
e1bff4b
d0c2b5c
6c454ac
1f30481
6561442
bd4d559
483841d
786bd7c
92412cc
10b5a45
492406e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -290,6 +290,15 @@ func (mz *materializer) deploySchema() error { | |
} | ||
} | ||
|
||
// Check if any table being moved is already non-empty in the target keyspace. | ||
// Skip this check for multi-tenant migrations. | ||
if !mz.IsMultiTenantMigration() { | ||
err := validateEmptyTables(mz.ctx, mz.ts, mz.tmc, mz.targetShards, mz.ms.TableSettings) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We're passing in a lot of `mz` members, which tells me it's probably better to have `mz` as the method receiver, no?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I thought about this too, but went with the other approach anyway. I have now refactored the function to use the receiver; it looks much cleaner! |
||
if err != nil { | ||
return err | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error { | ||
allTables := []string{"/.*/"} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,8 @@ import ( | |
"strings" | ||
"sync" | ||
|
||
"golang.org/x/exp/maps" | ||
"golang.org/x/sync/errgroup" | ||
"google.golang.org/protobuf/encoding/prototext" | ||
|
||
"vitess.io/vitess/go/mysql/sqlerror" | ||
|
@@ -1019,3 +1021,59 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error { | |
} | ||
return nil | ||
} | ||
|
||
// validateEmptyTables checks if all specified tables are empty across specified shards. | ||
// It queries each shard's primary tablet and if any non-empty table is found, it returns an error | ||
// containing a list of non-empty tables. | ||
func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, shards []*topo.ShardInfo, tableSettings []*vtctldatapb.TableMaterializeSettings) error { | ||
var mu sync.Mutex | ||
isTableFaulty := map[string]bool{} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Change terminology from "Faulty" to "NonEmpty", like `isTableFaulty` → `isTableNonEmpty` and `faultyTables` → `nonEmptyTables` [suggested code lost in extraction; names reconstructed from the diff].
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
err := forAllShards(shards, func(shard *topo.ShardInfo) error { | ||
primary := shard.PrimaryAlias | ||
if primary == nil { | ||
return fmt.Errorf("shard does not have a primary: %v", shard.ShardName()) | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
ti, err := ts.GetTablet(ctx, primary) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
eg, groupCtx := errgroup.WithContext(ctx) | ||
eg.SetLimit(20) | ||
|
||
for _, ts := range tableSettings { | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It might be worth adding the shard and table to the errors in this loop, as there could be 1,000 shards and you would want to know which specific shard had a table with data in it, and, e.g., in which shard and table we encountered an error during processing.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we do this in a follow-up PR? |
||
eg.Go(func() error { | ||
query := fmt.Sprintf("select 1 from `%s` limit 1", ts.TargetTable) | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
res, err := tmc.ExecuteFetchAsDba(groupCtx, ti.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Query: []byte(query), | ||
MaxRows: 1, | ||
}) | ||
// Ignore table not found error | ||
if err != nil && !IsTableDidNotExistError(err) { | ||
return err | ||
} | ||
if res != nil && len(res.Rows) > 0 { | ||
mu.Lock() | ||
isTableFaulty[ts.TargetTable] = true | ||
mu.Unlock() | ||
} | ||
return nil | ||
}) | ||
} | ||
if err = eg.Wait(); err != nil { | ||
return err | ||
} | ||
return nil | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
faultyTables := maps.Keys(isTableFaulty) | ||
if len(faultyTables) > 0 { | ||
return fmt.Errorf("non-empty tables found in target keyspace: %s", strings.Join(faultyTables, ", ")) | ||
} | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -480,8 +480,8 @@ func (tmc *fakeTMClient) VReplicationExec(ctx context.Context, tablet *topodatap | |
} | ||
for qry, res := range tmc.vreQueries[int(tablet.Alias.Uid)] { | ||
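There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why did you need to make this change from `regexp.MustCompile(qry)` to `regexp.MustCompile(qry[1:])`? Your second fix for matching `query` instead of `qry` [remainder of comment lost in extraction].
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We need to remove the leading `/` from `qry` before compiling it as a regular expression [remainder of comment lost in extraction]. |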
||
if strings.HasPrefix(qry, "/") { | ||
re := regexp.MustCompile(qry) | ||
if re.MatchString(qry) { | ||
re := regexp.MustCompile(qry[1:]) | ||
if re.MatchString(query) { | ||
return res, nil | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A nice future addition would be to check for any existing tenant data in each table on the target too.