Skip to content

Commit

Permalink
Merge pull request #8 from knocknote/feature/refactor-migrator
Browse files Browse the repository at this point in the history
refactor migrate command
  • Loading branch information
goccy authored Dec 4, 2018
2 parents 4fa8b40 + e56bb82 commit a4e5450
Show file tree
Hide file tree
Showing 3 changed files with 333 additions and 238 deletions.
242 changes: 4 additions & 238 deletions cmd/octillery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package main

import (
"bufio"
"bytes"
coresql "database/sql"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
Expand All @@ -17,19 +15,17 @@ import (
"strconv"
"strings"
"time"
"unicode"

flags "github.com/jessevdk/go-flags"
vtparser "github.com/knocknote/vitess-sqlparser/sqlparser"
"github.com/pkg/errors"
"github.com/schemalex/schemalex"
"github.com/schemalex/schemalex/diff"
"go.knocknote.io/octillery"
"go.knocknote.io/octillery/algorithm"
"go.knocknote.io/octillery/config"
"go.knocknote.io/octillery/connection"
_ "go.knocknote.io/octillery/connection/adapter/plugin"
"go.knocknote.io/octillery/database/sql"
"go.knocknote.io/octillery/migrator"
"go.knocknote.io/octillery/printer"
"go.knocknote.io/octillery/sqlparser"
"go.knocknote.io/octillery/transposer"
Expand Down Expand Up @@ -117,98 +113,7 @@ func (cmd *TransposeCommand) Execute(args []string) error {
return errors.WithStack(transposer.New().Transpose(pattern, searchPath, cmd.Ignore, transposeClosure))
}

type schemaTextSource string

func (s schemaTextSource) WriteSchema(dst io.Writer) error {
if _, err := io.WriteString(dst, string(s)); err != nil {
return errors.Wrap(err, `failed to copy text contents to dst`)
}
return nil
}

type serverSource struct {
conn *coresql.DB
}

// WriteSchema get normalized schema from mysql server and write it to dst.
// This method's original source code is `schemalex/source.go`
func (s *serverSource) WriteSchema(dst io.Writer) error {
db := s.conn
tableRows, err := db.Query("SHOW TABLES")
if err != nil {
return errors.Wrap(err, `failed to execute 'SHOW TABLES'`)
}
defer tableRows.Close()
parser, err := sqlparser.New()
if err != nil {
return errors.WithStack(err)
}
var table string
var tableSchema string
var buf bytes.Buffer
for tableRows.Next() {
if err = tableRows.Scan(&table); err != nil {
return errors.Wrap(err, `failed to scan tables`)
}

if err = db.QueryRow("SHOW CREATE TABLE `"+table+"`").Scan(&table, &tableSchema); err != nil {
return errors.Wrapf(err, `failed to execute 'SHOW CREATE TABLE "%s"'`, table)
}
if buf.Len() > 0 {
buf.WriteString("\n\n")
}
query, err := parser.Parse(tableSchema)
if err != nil {
return errors.WithStack(err)
}
// normalize DDL because schemalex cannot parse PARTITION option
normalizedSchema := vtparser.String(query.(*sqlparser.QueryBase).Stmt)
buf.WriteString(normalizedSchema)
buf.WriteByte(';')
}

return errors.WithStack(schemalex.NewReaderSource(&buf).WriteSchema(dst))
}

func (cmd *MigrateCommand) compareSchema(from schemalex.SchemaSource, to schemalex.SchemaSource) (string, error) {
var buf bytes.Buffer
p := schemalex.New()
if err := diff.Sources(
&buf,
from,
to,
diff.WithTransaction(false), diff.WithParser(p),
); err != nil {
return "", errors.WithStack(err)
}
return buf.String(), nil
}

// CompareResult type for results of comparing schema
type CompareResult struct {
diff string
dsn string
conn *coresql.DB
}

// CombinedQuery has all `sqlparser.Query` for a DNS
type CombinedQuery struct {
queries []sqlparser.Query
conn *coresql.DB
}

func (c *CombinedQuery) allDDL() string {
allDDL := []string{}
for _, query := range c.queries {
// normalize DDL because schemalex cannot parse PARTITION option
normalizedDDL := vtparser.String(query.(*sqlparser.QueryBase).Stmt)
allDDL = append(allDDL, normalizedDDL)
}
return strings.Join(allDDL, ";\n")
}

// Execute executes migrate command
// nolint: gocyclo
func (cmd *MigrateCommand) Execute(args []string) error {
if len(args) == 0 {
return errors.New("argument is required. it is path to directory includes schema file or direct path to schema file")
Expand All @@ -217,151 +122,12 @@ func (cmd *MigrateCommand) Execute(args []string) error {
return errors.WithStack(err)
}

schamePath := args[0]
parser, err := sqlparser.New()
schemaPath := args[0]
migrator, err := migrator.NewMigrator("mysql", cmd.DryRun, cmd.Quiet)
if err != nil {
return errors.WithStack(err)
}
tableNameToOriginalQueryMap := map[string]sqlparser.Query{}
queries := []sqlparser.Query{}
if err := filepath.Walk(schamePath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return errors.WithStack(err)
}
if info.IsDir() {
return nil
}
schema, err := ioutil.ReadFile(path)
if err != nil {
return errors.WithStack(err)
}
query, err := parser.Parse(string(schema))
if err != nil {
return errors.WithStack(err)
}
tableNameToOriginalQueryMap[query.Table()] = query
queries = append(queries, query)
return nil
}); err != nil {
return errors.WithStack(err)
}

mgr, err := connection.NewConnectionManager()
if err != nil {
return errors.WithStack(err)
}
dsnToQueryMap := map[string]*CombinedQuery{}
for _, query := range queries {
conn, err := mgr.ConnectionByTableName(query.Table())
if err != nil {
return errors.WithStack(err)
}
if conn.IsShard {
for _, shard := range conn.ShardConnections.AllShard() {
cfg := conn.Config.ShardConfigByName(shard.ShardName)
dsn := fmt.Sprintf("%s/%s", cfg.Masters[0], cfg.NameOrPath)
if _, exists := dsnToQueryMap[dsn]; exists {
dsnToQueryMap[dsn].queries = append(dsnToQueryMap[dsn].queries, query)
} else {
dsnToQueryMap[dsn] = &CombinedQuery{
queries: []sqlparser.Query{query},
conn: shard.Connection,
}
}
}
} else {
cfg := conn.Config
dsn := fmt.Sprintf("%s/%s", cfg.Masters[0], cfg.NameOrPath)
if _, exists := dsnToQueryMap[dsn]; exists {
dsnToQueryMap[dsn].queries = append(dsnToQueryMap[dsn].queries, query)
} else {
dsnToQueryMap[dsn] = &CombinedQuery{
queries: []sqlparser.Query{query},
conn: conn.Connection,
}
}
}
}
results := []*CompareResult{}
for dsn, combinedQuery := range dsnToQueryMap {
allDDL := combinedQuery.allDDL()
fromSource := &serverSource{
conn: combinedQuery.conn,
}
diff, err := cmd.compareSchema(fromSource, schemaTextSource(allDDL))
if err != nil {
return errors.WithStack(err)
}
if len(diff) == 0 {
continue
}

replacedDDL := []string{}
splittedDDL := strings.Split(diff, ";")
for _, ddl := range splittedDDL {
if ddl == "" || ddl == "\n" {
continue
}
if !strings.HasPrefix(ddl, "CREATE TABLE") {
replacedDDL = append(replacedDDL, ddl+";")
continue
}

// If diff is `CREATE TABLE` statement, use original DDL ( not eliminated PARTITION option )
stmt, err := parser.Parse(ddl)
if err != nil {
return errors.WithStack(err)
}
tableName := stmt.Table()
query := tableNameToOriginalQueryMap[tableName]
replacedDDL = append(replacedDDL, query.(*sqlparser.QueryBase).Text)
}
results = append(results, &CompareResult{
diff: strings.Join(replacedDDL, "\n"),
dsn: dsn,
conn: combinedQuery.conn,
})
}
if cmd.DryRun {
if len(results) > 0 {
for _, result := range results {
if result.diff == "" || result.diff == "\n" {
continue
}
fmt.Printf("[ %s ]\n\n", result.dsn)
for _, diff := range strings.Split(result.diff, ";") {
trimmedDiff := strings.TrimFunc(diff, func(r rune) bool {
return unicode.IsSpace(r)
})
if trimmedDiff == "" {
continue
}
fmt.Printf("%s\n\n", trimmedDiff)
}
}
}
} else {
for _, result := range results {
if !cmd.Quiet {
fmt.Printf("[ %s ]\n\n", result.dsn)
}
for _, diff := range strings.Split(result.diff, ";") {
trimmedDiff := strings.TrimFunc(diff, func(r rune) bool {
return unicode.IsSpace(r)
})
if trimmedDiff == "" {
continue
}
if !cmd.Quiet {
fmt.Printf("%s\n\n", trimmedDiff)
}
if _, err := result.conn.Exec(trimmedDiff); err != nil {
return errors.WithStack(err)
}
}
}
}
return nil
return errors.WithStack(migrator.Migrate(schemaPath))
}

func (cmd *ImportCommand) schemaFromTableName(tableName string) (vtparser.Statement, error) {
Expand Down
Loading

0 comments on commit a4e5450

Please sign in to comment.