Skip to content

Commit

Permalink
增加对非row格式binlog的支持
Browse files Browse the repository at this point in the history
  • Loading branch information
danny lai committed Apr 28, 2019
1 parent 7415920 commit 9990eb6
Show file tree
Hide file tree
Showing 13 changed files with 609 additions and 221 deletions.
33 changes: 24 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# 简介
binlog_rollback实现了基于row格式binlog的回滚闪回功能,让误删除或者误更新数据,可以不停机不使用备份而快速回滚误操作。
binlog_rollback也可以解释binlog(支持非row格式binlog)生成易读的SQL,让查找问题如什么时个某个表的某个字段的值被更新成了1,或者找出某个时间内某个表的所有删除操作等问题变得简单。
binlog_rollback可以按配置输出各个表的update/insert/delete统计报表, 也会输出大事务与长事务的分析, 应用是否干了坏事一目了然, 也会输出所有DDL。
binlog_rollback通过解释mysql/mariadb binlog/relaylog实现以下三大功能:
1)flashback/闪回/回滚, DML回滚到任意时间或者位置。
生成的文件名为rollback.xxx.sql或者db.tb.rollback.xxx.sql
生成的SQL形式如下
```sql
begin
Expand All @@ -9,33 +13,39 @@
commit
```
2)前滚,把binlog/relaylog的DML解释成易读的SQL语句。
*支持非row格式的binlog, 默认不解释非row格式的DML, 需要指定参数-stsql
生成的文件名为forward.xxx.sql或者db.tb.forward.xxx.sql
生成的SQL形式如下
```sql
begin
# datetime=2017-10-23_00:14:28 database=danny table=emp binlog=mysql-bin.000012 startpos=417 stoppos=575
INSERT INTO `danny`.`emp` (`id`,`name`,`sr`,`icon`,`points`,`sa`,`sex`) VALUES (1,'张三1','华南理工大学&SCUT',X'89504e47',1.1,1.1,1)
commit
```
3)统计分析, 统计各个表的DML情况, 找出大事务与长事务。
3)统计分析, 输出各个表的DML统计, 输出大事务与长事务, 输出所有的DDL
DML统计结果文件:binlog_stats.txt
大事务与长事务结果文件:big_long_trx.txt
DDL结果文件:ddl_info.txt
![DML统计](https://github.com/GoDannyLai/binlog_rollback/raw/master/misc/img/dml_report.png)
![大事务与长事务](https://github.com/GoDannyLai/binlog_rollback/raw/master/misc/img/long_big_trx.png)
4) 输出DDL与原始SQL(5.7)
![DDL信息](https://github.com/GoDannyLai/binlog_rollback/raw/master/misc/img/ddl_info.png)
4) 输出row格式下的原始SQL(5.7)
结果文件名为original_sql.binlogxxx.sql
![原始SQL](https://github.com/GoDannyLai/binlog_rollback/raw/master/misc/img/org_sql.png)

*以上功能均可指定任意的单库多库, 单表多表, 任意时间点, 任意binlog位置。
*支持mysql5.5及以上,也支持mariadb的binlog, 支持传统复制的binlog, 也支持GTID的binlog。
*支持直接指定文件路径的binlog, 也支持主从复制, binlog_rollback作为从库从主库拉binlog来过解释。
*也支持目标binlog中包含了DDL(增加与减少表字段, 变化表字位置)的场景。
# 限制
*binlog格式必须为row,且binlog_row_image=full
*使用回滚/闪回功能时,binlog格式必须为row,且binlog_row_image=full, 其它功能支持非row格式binlog
*只能回滚DML, 不能回滚DDL
*支持V4格式的binlog, V3格式的没测试过
*支持V4格式的binlog, V3格式的没测试过,测试与使用结果显示,mysql5.1,mysql5.5, mysql5.6与mysql5.7的binlog均支持
*支持指定-tl时区来解释binlog中time/datetime字段的内容。开始时间-sdt与结束时间-edt也会使用此指定的时区,
但注意此开始与结束时间针对的是binlog event header中保存的unix timestamp。结果中的额外的datetime时间信息都是binlog event header中的unix timestamp
*decimal字段使用float64来表示, 但不损失精度
*所有字符类型字段内容按golang的utf8(相当于mysql的utf8mb4)来表示
# 适用场景
# 常用场景
1)数据被误操作, 需要把某几个表的数据不停机回滚到某个时间点
2)数据异常, 帮忙从binlog中找出这个表的某些数据是什么时间修改成某些值的
3)IO高TPS高, 帮忙查出那些表在频繁更新
Expand All @@ -60,9 +70,10 @@
![](https://github.com/GoDannyLai/binlog_rollback/raw/master/misc/img/time_mysqlbinlog.png)

2) 支持V4版本的binlog, 支持传统与GTID的binlog, 支持mysql5.5与mairiadb5.5及以上版本的binlog, 也同样支持relaylog(结果中注释的信息binlog=xxx startpos=xxx stoppos=xx是对应的主库的binlog信息)
2) 支持V4版本的binlog, 支持传统与GTID的binlog, 支持mysql5.1与mairiadb5.5及以上版本的binlog, 也同样支持relaylog(结果中注释的信息binlog=xxx startpos=xxx stoppos=xx是对应的主库的binlog信息)
--mtype=mariadb
3)支持以时间及位置条件过滤, 并且支持单个以及多个连续binlog的解释。
区间范围为左闭右开, [-sxx, -exxx)
解释binlog的开始位置:
-sbin mysql-bin.000101
-spos 4
Expand All @@ -74,6 +85,7 @@
解释binlog的结束时间
-edt "2018-04-22 11:00:00"
4)支持以库及表条件过滤, 以逗号分隔
支持正则表达式,如-dbs "db\d+,db_sh\d+"。正则表达式中请使用小写字母,因为数据库名与表名会先转成小写再与正则表达式进行匹配
-dbs db1,db2
-tbs tb1,tb2
5)支持以DML类型(update,delete,insert)条件过滤
Expand Down Expand Up @@ -159,9 +171,12 @@
-r 100
对于一个insert 1000行的插入, 会生成10个insert语句,每个语句插入100行

15)支持自定义DDL语句过滤正则表达式来输出目标DDL
-de
默认为"^\s*(alter|create|rename|truncate|drop)", 大小写不敏感
15)支持非row格式binlog的解释
当-w 2sql时加上参数-stsql,则会解释非row格式的DML语句。使用的是https://github.com/pingcap/parser的SQL解释器来解释DDL与非row格式的DML。
由于不是支持所有要SQL, 如create trigger就不支持, 遇到SQL无法解释时会报错退出, 如需要跳过该SQL并继续解释, 请使用参数-ies。-ies 后接正则表达式,
解释错误或者无法解释的SQL如果匹配-ies指定的正则表达式, 则binlog_rollback不会退出而是跳过该SQL继续解释后面的binlog, 否则错误退出。
-ies后接的正则表达式请使用小写字母, 因为binlog_rollback会先把SQL转成小写再与之匹配。

16)支持目标binlog中包含DDL(增减字段,变化字段位置)的情形
binlog只保存了各个字段的位置, 并没有保存各个字段的名字。在前滚与回滚的模式下, binlog_rollback需要拿到表结构信息来生成易读的SQL, 如果表结构有变化, 那如何处理?
例如表tmp的DDL如下
Expand Down
144 changes: 122 additions & 22 deletions binlog_com.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package main

import (
"dannytools/constvar"
"dannytools/dsql"
"dannytools/ehand"
"dannytools/logging"

//"os"
"path/filepath"
//"fmt"
"strings"
"time"

//"github.com/davecgh/go-spew/spew"

"fmt"
"sync"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
sliceKits "github.com/toolkits/slice"
)

type BinEventHandlingIndx struct {
Expand Down Expand Up @@ -46,13 +56,15 @@ type MyBinEvent struct {
SqlType string // insert, update, delete
Timestamp uint32
TrxIndex uint64
TrxStatus int // 0:begin, 1: commit, 2: rollback, -1: in_progress
TrxStatus int // 0:begin, 1: commit, 2: rollback, -1: in_progress
QuerySql *dsql.SqlInfo // for ddl and binlog which is not row format
OrgSql string // for ddl and binlog which is not row format
}

func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, currentBinlog *string) int {
func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, currentBinlog string) int {
// process: 0, continue: 1, break: 2

myPos := mysql.Position{Name: *currentBinlog, Pos: header.LogPos}
myPos := mysql.Position{Name: currentBinlog, Pos: header.LogPos}
//fmt.Println(cfg.StartFilePos, cfg.IfSetStopFilePos, myPos)
if cfg.IfSetStartFilePos {
cmpRe := myPos.Compare(cfg.StartFilePos)
Expand All @@ -63,7 +75,7 @@ func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, curr

if cfg.IfSetStopFilePos {
cmpRe := myPos.Compare(cfg.StopFilePos)
if cmpRe == 1 {
if cmpRe >= 0 {
return C_reBreak
}
}
Expand All @@ -76,7 +88,7 @@ func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, curr
}

if cfg.IfSetStopDateTime {
if header.Timestamp > cfg.StopDatetime {
if header.Timestamp >= cfg.StopDatetime {
return C_reBreak
}
}
Expand All @@ -85,23 +97,23 @@ func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, curr
}

if header.EventType == replication.WRITE_ROWS_EVENTv1 || header.EventType == replication.WRITE_ROWS_EVENTv2 {
if sliceKits.ContainsString(cfg.FilterSql, "insert") {
if cfg.IsTargetDml("insert") {
return C_reProcess
} else {
return C_reContinue
}
}

if header.EventType == replication.UPDATE_ROWS_EVENTv1 || header.EventType == replication.UPDATE_ROWS_EVENTv2 {
if sliceKits.ContainsString(cfg.FilterSql, "update") {
if cfg.IsTargetDml("update") {
return C_reProcess
} else {
return C_reContinue
}
}

if header.EventType == replication.DELETE_ROWS_EVENTv1 || header.EventType == replication.DELETE_ROWS_EVENTv2 {
if sliceKits.ContainsString(cfg.FilterSql, "delete") {
if cfg.IsTargetDml("delete") {
return C_reProcess
} else {
return C_reContinue
Expand All @@ -118,11 +130,13 @@ func (this *MyBinEvent) CheckBinEvent(cfg *ConfCmd, ev *replication.BinlogEvent,
case replication.ROTATE_EVENT:
rotatEvent := ev.Event.(*replication.RotateEvent)
*currentBinlog = string(rotatEvent.NextLogName)

if cfg.ToLastLog && cfg.Mode == "repl" && cfg.WorkType == "stats" {
return C_reContinue
}
myPos.Name = string(rotatEvent.NextLogName)
myPos.Pos = uint32(rotatEvent.Position)
gLogger.WriteToLogByFieldsNormalOnlyMsg(fmt.Sprintf("log rotate %s", myPos.String()), logging.INFO)
if cfg.IfSetStartFilePos {
cmpRe := myPos.Compare(cfg.StartFilePos)
if cmpRe == -1 {
Expand All @@ -132,11 +146,12 @@ func (this *MyBinEvent) CheckBinEvent(cfg *ConfCmd, ev *replication.BinlogEvent,

if cfg.IfSetStopFilePos {
cmpRe := myPos.Compare(cfg.StopFilePos)
if cmpRe == 1 {
if cmpRe >= 0 {
return C_reBreak
}
}
this.IfRowsEvent = false
return C_reContinue

case replication.WRITE_ROWS_EVENTv1,
replication.UPDATE_ROWS_EVENTv1,
Expand All @@ -151,26 +166,111 @@ func (this *MyBinEvent) CheckBinEvent(cfg *ConfCmd, ev *replication.BinlogEvent,
wrEvent := ev.Event.(*replication.RowsEvent)
db := string(wrEvent.Table.Schema)
tb := string(wrEvent.Table.Table)
if len(cfg.Databases) > 0 {
if !sliceKits.ContainsString(cfg.Databases, db) {
return C_reContinue
}
if !cfg.IsTargetTable(db, tb) {
return C_reContinue
}
if len(cfg.Tables) > 0 {
if !sliceKits.ContainsString(cfg.Tables, tb) {
return C_reContinue
/*
if len(cfg.Databases) > 0 {
if !sliceKits.ContainsString(cfg.Databases, db) {
return C_reContinue
}
}
}
if len(cfg.Tables) > 0 {
if !sliceKits.ContainsString(cfg.Tables, tb) {
return C_reContinue
}
}
*/

this.BinEvent = wrEvent
this.IfRowsEvent = true

case replication.QUERY_EVENT:

this.IfRowsEvent = false
//queryEvent := ev.Event.(*replication.QueryEvent)
//db = string(queryEvent.Schema)
//sql = string(queryEvent.Query)

queryEvent := ev.Event.(*replication.QueryEvent)
db := string(queryEvent.Schema) // for DML/DDL, schema is not empty if connection has default database, ie use db

sqlStr := string(queryEvent.Query)
this.OrgSql = sqlStr
lowerSqlStr := strings.TrimSpace(strings.ToLower(sqlStr))

// for mysql, begin of transaction, it write a query event with sql 'BEGIN'
// for DML, row or statement, a trx is composed of gtid event, query event(BEGIN), query event/Rows_query event, xid event
// for DDL, a trx is composed of gtid event, query event
// query event may be composed of use database, DML/DDL sql
if lowerSqlStr != "begin" && lowerSqlStr != "commit" {
if db == "" && gUseDatabase != "" {
db = gUseDatabase
}
//ev.Dump(os.Stdout)
//tidb.parser not support create trigger statement
parsedResult, lastUseDb, err := dsql.ParseSqlsForSqlInfo(gSqlParser, sqlStr, db)
//spew.Dump(parsedResult)
if err != nil {
if cfg.IgnoreParsedErrRegexp != nil && cfg.IgnoreParsedErrRegexp.MatchString(lowerSqlStr) {
gLogger.WriteToLogByFieldsErrorExtramsgExitCode(err, fmt.Sprintf("\nerror to parse sql from query event, binlog=%s, time=%s, sql=%s",
myPos.String(), time.Unix(int64(ev.Header.Timestamp), 0).Format(constvar.DATETIME_FORMAT_NOSPACE), sqlStr),
logging.ERROR, ehand.ERR_ERROR)
return C_reContinue
} else {
//exit program here
gLogger.WriteToLogByFieldsErrorExtramsgExit(err, fmt.Sprintf("\nerror to parse sql from query event, binlog=%s, time=%s, sql=%s",
myPos.String(), time.Unix(int64(ev.Header.Timestamp), 0).Format(constvar.DATETIME_FORMAT_NOSPACE), sqlStr),
logging.ERROR, ehand.ERR_ERROR)
return C_reBreak
}

}

// have new use database
if lastUseDb != "" && gUseDatabase != lastUseDb {
gUseDatabase = lastUseDb
}

// skip other query event, we only care Table DDL and DML
if len(parsedResult) != 1 {
gLogger.WriteToLogByFieldsNormalOnlyMsg(fmt.Sprintf("skip it, parsed result for query event is nil or more than one(len(parsedResult)=%d), binlog=%s, time=%s, sql=%s",
len(parsedResult), myPos.String(), time.Unix(int64(ev.Header.Timestamp), 0).Format(constvar.DATETIME_FORMAT_NOSPACE), sqlStr), logging.WARNING)
return C_reContinue
}

// should only one result

if parsedResult[0].IsDml() {
if !cfg.ParseStatementSql {
//gLogger.WriteToLogByFieldsNormalOnlyMsg("not parse dml", logging.INFO)
return C_reContinue
}
if !cfg.IsTargetDml(parsedResult[0].GetDmlName()) {
//gLogger.WriteToLogByFieldsNormalOnlyMsg(fmt.Sprintf("dml %s not target", parsedResult[0].GetDmlName()), logging.INFO)
return C_reContinue
}

ifAnyTargetTable := false
for j := range parsedResult[0].Tables {
if parsedResult[0].Tables[j].Database == "" {
parsedResult[0].Tables[j].Database = db
}

if cfg.IsTargetTable(parsedResult[0].Tables[j].Database, parsedResult[0].Tables[j].Table) {
ifAnyTargetTable = true
}

}
if !ifAnyTargetTable {
//gLogger.WriteToLogByFieldsNormalOnlyMsg("not target table", logging.INFO)
return C_reContinue
}

}

this.QuerySql = parsedResult[0].Copy()
//gLogger.WriteToLogByFieldsNormalOnlyMsg("should be processed", logging.INFO)

}

case replication.XID_EVENT:
this.IfRowsEvent = false

Expand Down
Loading

0 comments on commit 9990eb6

Please sign in to comment.