Skip to content

Commit

Permalink
Fix the bug that unsigned estimatedQueueingDuration in throttling che…
Browse files Browse the repository at this point in the history
…cker may overflow

* Use int64 instead of uint64 here
* Update wait duration to nanos

Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 committed Nov 26, 2020
1 parent 7bffa66 commit 963192c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
50 changes: 25 additions & 25 deletions core/flow/tc_throttling.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,36 @@ package flow
import (
"math"
"sync/atomic"
"time"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/util"
)

const (
BlockMsgQueueing = "flow throttling check blocked, estimated queueing time exceeds max queueing time"

MillisToNanosOffset = int64(time.Millisecond / time.Nanosecond)
)

// ThrottlingChecker limits the time interval between two requests.
type ThrottlingChecker struct {
owner *TrafficShapingController
maxQueueingTimeNs uint64
statIntervalNs uint64
lastPassedTime uint64
maxQueueingTimeNs int64
statIntervalNs int64
lastPassedTime int64
}

func NewThrottlingChecker(owner *TrafficShapingController, timeoutMs uint32, statIntervalMs uint32) *ThrottlingChecker {
var statIntervalNs uint64
var statIntervalNs int64
if statIntervalMs == 0 {
defaultIntervalMs := config.MetricStatisticIntervalMs()
if defaultIntervalMs == 0 {
defaultIntervalMs = 1000
}
statIntervalNs = uint64(defaultIntervalMs) * util.UnixTimeUnitOffset
statIntervalNs = 1000 * MillisToNanosOffset
} else {
statIntervalNs = uint64(statIntervalMs) * util.UnixTimeUnitOffset
statIntervalNs = int64(statIntervalMs) * MillisToNanosOffset
}
return &ThrottlingChecker{
owner: owner,
maxQueueingTimeNs: uint64(timeoutMs) * util.UnixTimeUnitOffset,
maxQueueingTimeNs: int64(timeoutMs) * MillisToNanosOffset,
statIntervalNs: statIntervalNs,
lastPassedTime: 0,
}
Expand Down Expand Up @@ -58,35 +60,33 @@ func (c *ThrottlingChecker) DoCheck(_ base.StatNode, batchCount uint32, threshol
return base.NewTokenResultBlocked(base.BlockTypeFlow)
}
// Here we use nanosecond so that we could control the queueing time more accurately.
curNano := util.CurrentTimeNano()
curNano := int64(util.CurrentTimeNano())

// The interval between two requests (in nanoseconds).
intervalNs := uint64(math.Ceil(float64(batchCount) / threshold * float64(c.statIntervalNs)))
intervalNs := int64(math.Ceil(float64(batchCount) / threshold * float64(c.statIntervalNs)))

// Expected pass time of this request.
expectedTime := atomic.LoadUint64(&c.lastPassedTime) + intervalNs
expectedTime := atomic.LoadInt64(&c.lastPassedTime) + intervalNs
if expectedTime <= curNano {
// Contention may exist here, but it's okay.
atomic.StoreUint64(&c.lastPassedTime, curNano)
atomic.StoreInt64(&c.lastPassedTime, curNano)
return nil
}

estimatedQueueingDuration := atomic.LoadUint64(&c.lastPassedTime) + intervalNs - util.CurrentTimeNano()
estimatedQueueingDuration := atomic.LoadInt64(&c.lastPassedTime) + intervalNs - int64(util.CurrentTimeNano())
if estimatedQueueingDuration > c.maxQueueingTimeNs {
msg := "flow throttling check blocked, estimated queueing time exceeds max queueing time"
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, msg, rule, nil)
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, BlockMsgQueueing, rule, nil)
}

oldTime := atomic.AddUint64(&c.lastPassedTime, intervalNs)
estimatedQueueingDuration = oldTime - util.CurrentTimeNano()
oldTime := atomic.AddInt64(&c.lastPassedTime, intervalNs)
estimatedQueueingDuration = oldTime - int64(util.CurrentTimeNano())
if estimatedQueueingDuration > c.maxQueueingTimeNs {
// Subtract the interval.
atomic.AddUint64(&c.lastPassedTime, ^(intervalNs - 1))
msg := "flow throttling check blocked, estimated queueing time exceeds max queueing time"
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, msg, rule, nil)
atomic.AddInt64(&c.lastPassedTime, -intervalNs)
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, BlockMsgQueueing, rule, nil)
}
if estimatedQueueingDuration > 0 {
return base.NewTokenResultShouldWait(estimatedQueueingDuration / util.UnixTimeUnitOffset)
return base.NewTokenResultShouldWait(time.Duration(estimatedQueueingDuration))
} else {
return base.NewTokenResultShouldWait(0)
}
Expand Down
4 changes: 2 additions & 2 deletions core/flow/tc_throttling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func TestThrottlingChecker_DoCheckSingleThread(t *testing.T) {
waitCount := int(float64(timeoutMs) / (float64(intervalMs) / threshold))
for i := 0; i < waitCount; i++ {
assert.True(t, resultList[i].Status() == base.ResultStatusShouldWait)
wt := resultList[i].WaitMs()
assert.InEpsilon(t, (i+1)*1000/int(waitCount), wt, 10)
wt := resultList[i].NanosToWait()
assert.InEpsilon(t, (i+1)*(int)(time.Second/time.Nanosecond)/waitCount, wt, 10)
}
for i := waitCount; i < reqCount; i++ {
assert.True(t, resultList[i].IsBlocked())
Expand Down

0 comments on commit 963192c

Please sign in to comment.