Skip to content

Commit

Permalink
fix: port read_after_write fix to 0.1.x
Browse files Browse the repository at this point in the history
  • Loading branch information
earayu committed Mar 22, 2024
1 parent 15518e8 commit a1e727c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
45 changes: 37 additions & 8 deletions go/test/endtoend/wesql/queries/readafterwrite_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,62 @@ package queries

import (
"fmt"
"github.com/stretchr/testify/assert"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/utils"
)

func TestReadAfterWrite_Session(t *testing.T) {
runReadAfterWriteTest(t, true, "SESSION", false, false, false)
runReadAfterWriteTest(t, true, "SESSION", false, false, false, false)
}

func TestReadAfterWrite_Session_Transaction(t *testing.T) {
runReadAfterWriteTest(t, true, "SESSION", false, true, false)
runReadAfterWriteTest(t, true, "SESSION", false, true, false, false)
}

func TestReadAfterWrite_Session_Transaction_OLAP(t *testing.T) {
runReadAfterWriteTest(t, true, "SESSION", false, true, true)
runReadAfterWriteTest(t, true, "SESSION", false, true, true, false)
}

func TestReadAfterWrite_Instance(t *testing.T) {
runReadAfterWriteTest(t, true, "INSTANCE", true, false, false)
runReadAfterWriteTest(t, true, "INSTANCE", true, false, false, false)
}

func TestReadAfterWrite_Instance_Transaction(t *testing.T) {
runReadAfterWriteTest(t, true, "INSTANCE", true, true, false)
runReadAfterWriteTest(t, true, "INSTANCE", true, true, false, false)
}

func TestReadAfterWrite_Instance_Transaction_OLAP(t *testing.T) {
runReadAfterWriteTest(t, true, "INSTANCE", true, true, true)
runReadAfterWriteTest(t, true, "INSTANCE", true, true, true, false)
}

func TestReadAfterWrite_Session_WithTimeZone(t *testing.T) {
runReadAfterWriteTest(t, true, "SESSION", false, false, false, true)
}

func TestReadAfterWrite_Session_Transaction_WithTimeZone(t *testing.T) {
runReadAfterWriteTest(t, true, "SESSION", false, true, false, true)
}

func TestReadAfterWrite_Session_Transaction_OLAP_WithTimeZone(t *testing.T) {
runReadAfterWriteTest(t, true, "SESSION", false, true, true, true)
}

func TestReadAfterWrite_Instance_WithTimeZone(t *testing.T) {
runReadAfterWriteTest(t, true, "INSTANCE", true, false, false, true)
}

func TestReadAfterWrite_Instance_Transaction_WithTimeZone(t *testing.T) {
runReadAfterWriteTest(t, true, "INSTANCE", true, true, false, true)
}

func TestReadAfterWrite_Instance_Transaction_OLAP_WithTimeZone(t *testing.T) {
runReadAfterWriteTest(t, true, "INSTANCE", true, true, true, true)
}

func TestReadAfterWrite_Global(t *testing.T) {
Expand All @@ -64,13 +89,17 @@ func TestReadAfterWrite_Session_Transaction_OLAP_CrossVTGate_And_Except_Failure(
runReadAfterWriteCrossVTGateTest(t, true, "SESSION", true, true)
}

func runReadAfterWriteTest(t *testing.T, enableReadWriteSplitting bool, readAfterWriteConsistency string, separateConn, enableTransaction bool, olap bool) {
func runReadAfterWriteTest(t *testing.T, enableReadWriteSplitting bool, readAfterWriteConsistency string, separateConn, enableTransaction bool, olap bool, setTimeZone bool) {
createDbExecDropDb(t, "readafterwrite_session_test", func(getConn func() *mysql.Conn) {
rwConn := getConn()
roConn := rwConn
if separateConn {
roConn = getConn()
}
if setTimeZone {
utils.Exec(t, rwConn, "set time_zone='+04:00'")
utils.Exec(t, roConn, "set time_zone='+04:00'")
}
execMulti(t, rwConn, "create table t1(c1 int primary key auto_increment, c2 int);insert into t1(c1, c2) values(null, 1)")

// enable read after write & enable read after write for session
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,10 +1002,20 @@ func (qre *QueryExecutor) txFetch(conn *StatefulConnection, record bool) (*sqlty
if err != nil {
return nil, err
}
waitGtidPrefixAdded := false
if qre.plan.PlanID == p.PlanSelect {
sql, waitGtidPrefixAdded = qre.addPrefixWaitGtid(sql)
}
// Only record successful queries.
if record {
conn.TxProperties().RecordQuery(sql)
}
if waitGtidPrefixAdded {
qr, err = qre.discardWaitGtidResponse(qr, err, conn.UnderlyingDBConn(), true)
}
if err != nil {
return nil, err
}
return qr, nil
}

Expand Down

0 comments on commit a1e727c

Please sign in to comment.