Skip to content

Commit

Permalink
Ensure that rowstreamer uses optimal index when possible
Browse files Browse the repository at this point in the history
This is the index that should contain the columns specified in the
ORDER BY clause and allows us to leverage the record ordering in
the index and avoid having to do a filesort.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Aug 30, 2023
1 parent 99271d4 commit 7e10b5d
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 190 deletions.
4 changes: 2 additions & 2 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ func (fmd *FakeMysqlDaemon) GetPrimaryKeyColumns(ctx context.Context, dbName, ta
}

// GetPrimaryKeyEquivalentColumns is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, error) {
return []string{}, nil
func (fmd *FakeMysqlDaemon) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) {
return []string{}, "", nil
}

// PreflightSchemaChange is part of the MysqlDaemon interface
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type MysqlDaemon interface {
GetSchema(ctx context.Context, dbName string, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error)
GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error)
GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, error)
GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error)
PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)

Expand Down
12 changes: 7 additions & 5 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,16 +577,16 @@ func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, chan
// defined PRIMARY KEY then it may return the columns for
// that index if it is likely the most efficient one amongst
// the available PKE indexes on the table.
func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, error) {
func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName, table string) ([]string, string, error) {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return nil, err
return nil, "", err
}
defer conn.Recycle()

// We use column name aliases to guarantee lower case for our named results.
sql := `
SELECT COLUMN_NAME AS column_name FROM information_schema.STATISTICS AS index_cols INNER JOIN
SELECT index_cols.COLUMN_NAME AS column_name, index_cols.INDEX_NAME as index_name FROM information_schema.STATISTICS AS index_cols INNER JOIN
(
SELECT stats.INDEX_NAME, SUM(
CASE LOWER(cols.DATA_TYPE)
Expand Down Expand Up @@ -629,15 +629,17 @@ func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName
sql = fmt.Sprintf(sql, encodedDbName, encodedTable, encodedDbName, encodedTable, encodedDbName, encodedTable)
qr, err := conn.ExecuteFetch(sql, 1000, true)
if err != nil {
return nil, err
return nil, "", err
}

named := qr.Named()
cols := make([]string, len(qr.Rows))
index := ""
for i, row := range named.Rows {
cols[i] = row.AsString("column_name", "")
index = row.AsString("index_name", "")
}
return cols, err
return cols, index, err
}

// tableDefinitions is a sortable collection of table definitions
Expand Down
283 changes: 148 additions & 135 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions go/vt/proto/binlogdata/binlogdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*Colum
pks = td.PrimaryKeyColumns
} else {
// Use a PK equivalent if one exists
if pks, err = vr.mysqld.GetPrimaryKeyEquivalentColumns(ctx, vr.dbClient.DBName(), td.Name); err != nil {
if pks, _, err = vr.mysqld.GetPrimaryKeyEquivalentColumns(ctx, vr.dbClient.DBName(), td.Name); err != nil {
return nil, err
}
// Fall back to using every column in the table if there's no PK or PKE
Expand Down
61 changes: 35 additions & 26 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -90,30 +89,34 @@ func TestRecalculatePKColsInfoByColumnNames(t *testing.T) {
func TestPrimaryKeyEquivalentColumns(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
table string
ddl string
want []string
wantErr bool
name string
table string
ddl string
wantCols []string
wantIndex string
wantErr bool
}{
{
name: "WITHPK",
table: "withpk_t",
ddl: `CREATE TABLE withpk_t (pkid INT NOT NULL AUTO_INCREMENT, col1 VARCHAR(25),
PRIMARY KEY (pkid))`,
want: []string{"pkid"},
wantCols: []string{"pkid"},
wantIndex: "PRIMARY",
},
{
name: "0PKE",
table: "zeropke_t",
ddl: `CREATE TABLE zeropke_t (id INT NULL, col1 VARCHAR(25), UNIQUE KEY (id))`,
want: []string{},
name: "0PKE",
table: "zeropke_t",
ddl: `CREATE TABLE zeropke_t (id INT NULL, col1 VARCHAR(25), UNIQUE KEY (id))`,
wantCols: []string{},
wantIndex: "",
},
{
name: "1PKE",
table: "onepke_t",
ddl: `CREATE TABLE onepke_t (id INT NOT NULL, col1 VARCHAR(25), UNIQUE KEY (id))`,
want: []string{"id"},
name: "1PKE",
table: "onepke_t",
ddl: `CREATE TABLE onepke_t (id INT NOT NULL, col1 VARCHAR(25), UNIQUE KEY (id))`,
wantCols: []string{"id"},
wantIndex: "id",
},
{
name: "3MULTICOL1PKE",
Expand All @@ -122,64 +125,70 @@ func TestPrimaryKeyEquivalentColumns(t *testing.T) {
col3 VARCHAR(25) NOT NULL, col4 VARCHAR(25), UNIQUE KEY c4_c2_c1 (col4, col2, col1),
UNIQUE KEY c1_c2 (col1, col2), UNIQUE KEY c1_c2_c4 (col1, col2, col4),
KEY nc1_nc2 (col1, col2))`,
want: []string{"col1", "col2"},
wantCols: []string{"col1", "col2"},
wantIndex: "c1_c2",
},
{
name: "3MULTICOL2PKE",
table: "twomcpke_t",
ddl: `CREATE TABLE twomcpke_t (col1 VARCHAR(25) NOT NULL, col2 VARCHAR(25) NOT NULL,
col3 VARCHAR(25) NOT NULL, col4 VARCHAR(25), UNIQUE KEY (col4), UNIQUE KEY c4_c2_c1 (col4, col2, col1),
UNIQUE KEY c1_c2_c3 (col1, col2, col3), UNIQUE KEY c1_c2 (col1, col2))`,
want: []string{"col1", "col2"},
wantCols: []string{"col1", "col2"},
wantIndex: "c1_c2",
},
{
name: "1INTPKE1CHARPKE",
table: "oneintpke1charpke_t",
ddl: `CREATE TABLE oneintpke1charpke_t (col1 VARCHAR(25) NOT NULL, col2 VARCHAR(25) NOT NULL,
col3 VARCHAR(25) NOT NULL, id1 INT NOT NULL, id2 INT NOT NULL,
UNIQUE KEY c1_c2 (col1, col2), UNIQUE KEY id1_id2 (id1, id2))`,
want: []string{"id1", "id2"},
wantCols: []string{"id1", "id2"},
wantIndex: "id1_id2",
},
{
name: "INTINTVSVCHAR",
table: "twointvsvcharpke_t",
ddl: `CREATE TABLE twointvsvcharpke_t (col1 VARCHAR(25) NOT NULL, id1 INT NOT NULL, id2 INT NOT NULL,
UNIQUE KEY c1 (col1), UNIQUE KEY id1_id2 (id1, id2))`,
want: []string{"id1", "id2"},
wantCols: []string{"id1", "id2"},
wantIndex: "id1_id2",
},
{
name: "TINYINTVSBIGINT",
table: "tinyintvsbigint_t",
ddl: `CREATE TABLE tinyintvsbigint_t (tid1 TINYINT NOT NULL, id1 INT NOT NULL,
UNIQUE KEY tid1 (tid1), UNIQUE KEY id1 (id1))`,
want: []string{"tid1"},
wantCols: []string{"tid1"},
wantIndex: "tid1",
},
{
name: "VCHARINTVSINT2VARCHAR",
table: "vcharintvsinttwovchar_t",
ddl: `CREATE TABLE vcharintvsinttwovchar_t (id1 INT NOT NULL, col1 VARCHAR(25) NOT NULL, col2 VARCHAR(25) NOT NULL,
UNIQUE KEY col1_id1 (col1, id1), UNIQUE KEY id1_col1_col2 (id1, col1, col2))`,
want: []string{"col1", "id1"},
wantCols: []string{"col1", "id1"},
wantIndex: "col1_id1",
},
{
name: "VCHARVSINT3",
table: "vcharvsintthree_t",
ddl: `CREATE TABLE vcharvsintthree_t (id1 INT NOT NULL, id2 INT NOT NULL, id3 INT NOT NULL, col1 VARCHAR(50) NOT NULL,
UNIQUE KEY col1 (col1), UNIQUE KEY id1_id2_id3 (id1, id2, id3))`,
want: []string{"id1", "id2", "id3"},
wantCols: []string{"id1", "id2", "id3"},
wantIndex: "id1_id2_id3",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.NoError(t, env.Mysqld.ExecuteSuperQuery(ctx, tt.ddl))
got, err := env.Mysqld.GetPrimaryKeyEquivalentColumns(ctx, env.Dbcfgs.DBName, tt.table)
got, indexName, err := env.Mysqld.GetPrimaryKeyEquivalentColumns(ctx, env.Dbcfgs.DBName, tt.table)
if (err != nil) != tt.wantErr {
t.Errorf("Mysqld.GetPrimaryKeyEquivalentColumns() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Mysqld.GetPrimaryKeyEquivalentColumns() = %v, want %v", got, tt.want)
}
require.Equalf(t, got, tt.wantCols, "Mysqld.GetPrimaryKeyEquivalentColumns() columns = %v, want %v", got, tt.wantCols)
require.Equalf(t, indexName, tt.wantIndex, "Mysqld.GetPrimaryKeyEquivalentColumns() index = %v, want %v", indexName, tt.wantIndex)
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,10 +547,13 @@ func (vse *Engine) getMySQLEndpoint(ctx context.Context, db dbconfigs.Connector)
// and maps the column names to field indexes in the MinimalTable struct.
func (vse *Engine) mapPKEquivalentCols(ctx context.Context, table *binlogdatapb.MinimalTable) ([]int, error) {
mysqld := mysqlctl.NewMysqld(vse.env.Config().DB)
pkeColNames, err := mysqld.GetPrimaryKeyEquivalentColumns(ctx, vse.env.Config().DB.DBName, table.Name)
pkeColNames, indexName, err := mysqld.GetPrimaryKeyEquivalentColumns(ctx, vse.env.Config().DB.DBName, table.Name)
if err != nil {
return nil, err
}
if len(pkeColNames) > 0 && indexName != "" {
table.PKIndex = indexName
}
pkeCols := make([]int, len(pkeColNames))
matches := 0
for n, field := range table.Fields {
Expand Down
16 changes: 13 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (rs *rowStreamer) buildPlan() error {
if err != nil {
return err
}
rs.sendQuery, err = rs.buildSelect()
rs.sendQuery, err = rs.buildSelect(st)
if err != nil {
return err
}
Expand Down Expand Up @@ -220,6 +220,8 @@ func (rs *rowStreamer) buildPKColumns(st *binlogdatapb.MinimalTable) ([]int, err
pkColumns[i] = i
}
return pkColumns, nil
} else {
st.PKIndex = "PRIMARY"
}
for _, pk := range st.PKColumns {
if pk >= int64(len(st.Fields)) {
Expand All @@ -230,7 +232,7 @@ func (rs *rowStreamer) buildPKColumns(st *binlogdatapb.MinimalTable) ([]int, err
return pkColumns, nil
}

func (rs *rowStreamer) buildSelect() (string, error) {
func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error) {
buf := sqlparser.NewTrackedBuffer(nil)
// We could have used select *, but being explicit is more predictable.
buf.Myprintf("select ")
Expand All @@ -245,7 +247,15 @@ func (rs *rowStreamer) buildSelect() (string, error) {
}
prefix = ", "
}
buf.Myprintf(" from %v", sqlparser.NewIdentifierCS(rs.plan.Table.Name))
// If we know the index name that we should be using then tell MySQL
// to use it if possible. This helps to ensure that we are able to
// leverage the ordering from the index itself and avoid having to
// do a FILESORT of all the results.
var indexHint string
if st.PKIndex != "" {
indexHint = fmt.Sprintf(" force index (`%s`)", st.PKIndex)
}
buf.Myprintf(" from %v%s", sqlparser.NewIdentifierCS(rs.plan.Table.Name), indexHint)
if len(rs.lastpk) != 0 {
if len(rs.lastpk) != len(rs.pkColumns) {
return "", fmt.Errorf("primary key values don't match length: %v vs %v", rs.lastpk, rs.pkColumns)
Expand Down
Loading

0 comments on commit 7e10b5d

Please sign in to comment.