Change WRR LB to ensure even pending requests
horkhe committed Apr 25, 2024
1 parent 98234a2 commit db23305
Showing 2 changed files with 315 additions and 408 deletions.
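In essence, the commit drops the deadline-based weighted round-robin rotation and instead keeps handlers in a priority queue ordered by in-flight (pending) requests per unit of weight, so outstanding load stays even across servers. The sketch below is only an illustration of that selection rule, not code from this diff; selectLeastLoaded is a hypothetical helper name, and the real change uses a heap rather than a linear scan:

// selectLeastLoaded mirrors the ordering that the new priorityQueue.Less
// implements: among healthy handlers, pick the one with the smallest
// pending/weight ratio. Illustrative only; the commit keeps handlers in a
// heap instead of scanning a slice.
func selectLeastLoaded(handlers []*namedHandler) *namedHandler {
	var best *namedHandler
	for _, h := range handlers {
		if !h.healthy {
			continue
		}
		if best == nil || float64(h.pending)/h.weight < float64(best.pending)/best.weight {
			best = h
		}
	}
	return best
}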
272 changes: 148 additions & 124 deletions pkg/server/service/loadbalancer/wrr/wrr.go
@@ -4,20 +4,21 @@ import (
"container/heap"
"context"
"errors"
"hash/fnv"
"net/http"
"strconv"
"sync"

"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/config/runtime"
"github.com/traefik/traefik/v2/pkg/log"
)

type namedHandler struct {
http.Handler
name string
weight float64
deadline float64
pending uint64
healthy bool
queueIdx int
}

type stickyCookie struct {
@@ -34,27 +35,20 @@ type stickyCookie struct {
type Balancer struct {
stickyCookie *stickyCookie
wantsHealthCheck bool

handlersMu sync.RWMutex
// References all the handlers by name and also by the hashed value of the name.
handlerMap map[string]*namedHandler
handlers []*namedHandler
curDeadline float64
// status is a record of which child services of the Balancer are healthy, keyed
// by name of child service. A service is initially added to the map when it is
// created via Add, and it is later removed or added to the map as needed,
// through the SetStatus method.
status map[string]struct{}
// updaters is the list of hooks that are run (to update the Balancer
// parent(s)), whenever the Balancer status changes.
updaters []func(bool)

mutex sync.RWMutex
enabledHandlers priorityQueue
handlersByName map[string]*namedHandler
healthyCount int
}

// New creates a new load balancer.
func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
balancer := &Balancer{
status: make(map[string]struct{}),
handlerMap: make(map[string]*namedHandler),
handlersByName: make(map[string]*namedHandler),
wantsHealthCheck: wantHealthCheck,
}
if sticky != nil && sticky.Cookie != nil {
@@ -64,78 +58,53 @@ func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
httpOnly: sticky.Cookie.HTTPOnly,
}
}

return balancer
}

// Len implements heap.Interface/sort.Interface.
func (b *Balancer) Len() int { return len(b.handlers) }

// Less implements heap.Interface/sort.Interface.
func (b *Balancer) Less(i, j int) bool {
return b.handlers[i].deadline < b.handlers[j].deadline
}

// Swap implements heap.Interface/sort.Interface.
func (b *Balancer) Swap(i, j int) {
b.handlers[i], b.handlers[j] = b.handlers[j], b.handlers[i]
}

// Push implements heap.Interface for pushing an item into the heap.
func (b *Balancer) Push(x interface{}) {
h, ok := x.(*namedHandler)
if !ok {
return
}

b.handlers = append(b.handlers, h)
}

// Pop implements heap.Interface for popping an item from the heap.
// It panics if b.Len() < 1.
func (b *Balancer) Pop() interface{} {
h := b.handlers[len(b.handlers)-1]
b.handlers = b.handlers[0 : len(b.handlers)-1]
return h
}

// SetStatus sets on the balancer that its given child is now of the given
// status. balancerName is only needed for logging purposes.
func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
b.handlersMu.Lock()
defer b.handlersMu.Unlock()

upBefore := len(b.status) > 0

status := "DOWN"
if up {
status = "UP"
}
log.FromContext(ctx).Debugf("Setting status of %s to %v", childName, status)
if up {
b.status[childName] = struct{}{}
} else {
delete(b.status, childName)
// status.
func (b *Balancer) SetStatus(ctx context.Context, childName string, healthy bool) {
log.FromContext(ctx).Debugf("Setting status of %s to %v", childName, statusAsStr(healthy))

b.mutex.Lock()
nh := b.handlersByName[childName]
if nh == nil {
b.mutex.Unlock()
return
}

upAfter := len(b.status) > 0
status = "DOWN"
if upAfter {
status = "UP"
healthyBefore := b.healthyCount > 0
if nh.healthy != healthy {
nh.healthy = healthy
if healthy {
b.healthyCount++
b.enabledHandlers.push(nh)
} else {
b.healthyCount--
}
}
healthyAfter := b.healthyCount > 0
b.mutex.Unlock()

// No Status Change
if upBefore == upAfter {
if healthyBefore == healthyAfter {
// We're still with the same status, no need to propagate
log.FromContext(ctx).Debugf("Still %s, no need to propagate", status)
log.FromContext(ctx).Debugf("Still %s, no need to propagate", statusAsStr(healthyBefore))
return
}

// Status Change
log.FromContext(ctx).Debugf("Propagating new %s status", status)
log.FromContext(ctx).Debugf("Propagating new %s status", statusAsStr(healthyAfter))
for _, fn := range b.updaters {
fn(upAfter)
fn(healthyAfter)
}
}

func statusAsStr(healthy bool) string {
if healthy {
return runtime.StatusUp
}
return runtime.StatusDown
}

// RegisterStatusUpdater adds fn to the list of hooks that are run when the
@@ -151,59 +120,61 @@ func (b *Balancer) RegisterStatusUpdater(fn func(up bool)) error {

var errNoAvailableServer = errors.New("no available server")

func (b *Balancer) nextServer() (*namedHandler, error) {
b.handlersMu.Lock()
defer b.handlersMu.Unlock()

if len(b.handlers) == 0 || len(b.status) == 0 {
return nil, errNoAvailableServer
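// acquireHandler returns a healthy handler to serve the request, preferring
// preferredName (the sticky-cookie value) when it is healthy, and otherwise
// the handler with the fewest pending requests relative to its weight. The
// returned handler's pending counter is incremented; callers pair this with
// releaseHandler once the request completes.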
func (b *Balancer) acquireHandler(preferredName string) (*namedHandler, error) {
b.mutex.Lock()
defer b.mutex.Unlock()
var nh *namedHandler
// Check the preferred handler first, if provided.
if preferredName != "" {
nh = b.handlersByName[preferredName]
if nh != nil && nh.healthy {
nh.pending++
b.enabledHandlers.fix(nh)
return nh, nil
}
}

var handler *namedHandler
// Pick the handler with the least number of pending requests.
for {
// Pick handler with closest deadline.
handler = heap.Pop(b).(*namedHandler)

// curDeadline should be handler's deadline so that new added entry would have a fair competition environment with the old ones.
b.curDeadline = handler.deadline
handler.deadline += 1 / handler.weight

heap.Push(b, handler)
if _, ok := b.status[handler.name]; ok {
break
nh = b.enabledHandlers.pop()
if nh == nil {
return nil, errNoAvailableServer
}
// If the handler is marked as unhealthy, then continue with the next
// best option. It will be put back into the priority queue once its
// status changes to healthy.
if !nh.healthy {
continue
}
// Otherwise increment the number of pending requests, put it back into
// the priority queue, and return it as the handler selected for the request.
nh.pending++
b.enabledHandlers.push(nh)
log.WithoutContext().Debugf("Service selected by WRR: %s", nh.name)
return nh, nil
}
}

log.WithoutContext().Debugf("Service selected by WRR: %s", handler.name)
return handler, nil
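// releaseHandler decrements the handler's pending request count once its
// request has completed and, if the handler is still healthy, restores its
// position in the priority queue.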
func (b *Balancer) releaseHandler(nh *namedHandler) {
b.mutex.Lock()
defer b.mutex.Unlock()
nh.pending--
if nh.healthy {
b.enabledHandlers.fix(nh)
}
}

func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var preferredName string
if b.stickyCookie != nil {
cookie, err := req.Cookie(b.stickyCookie.name)

if err != nil && !errors.Is(err, http.ErrNoCookie) {
log.WithoutContext().Warnf("Error while reading cookie: %v", err)
}

if err == nil && cookie != nil {
b.handlersMu.RLock()
handler, ok := b.handlerMap[cookie.Value]
b.handlersMu.RUnlock()

if ok && handler != nil {
b.handlersMu.RLock()
_, isHealthy := b.status[handler.name]
b.handlersMu.RUnlock()
if isHealthy {
handler.ServeHTTP(w, req)
return
}
}
preferredName = cookie.Value
}
}

server, err := b.nextServer()
nh, err := b.acquireHandler(preferredName)
if err != nil {
if errors.Is(err, errNoAvailableServer) {
http.Error(w, errNoAvailableServer.Error(), http.StatusServiceUnavailable)
@@ -214,11 +185,18 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

if b.stickyCookie != nil {
cookie := &http.Cookie{Name: b.stickyCookie.name, Value: hash(server.name), Path: "/", HttpOnly: b.stickyCookie.httpOnly, Secure: b.stickyCookie.secure}
cookie := &http.Cookie{
Name: b.stickyCookie.name,
Value: nh.name,
Path: "/",
HttpOnly: b.stickyCookie.httpOnly,
Secure: b.stickyCookie.secure,
}
http.SetCookie(w, cookie)
}

server.ServeHTTP(w, req)
nh.ServeHTTP(w, req)
b.releaseHandler(nh)
}

// Add adds a handler.
@@ -233,21 +211,67 @@ func (b *Balancer) Add(name string, handler http.Handler, weight *int) {
return
}

h := &namedHandler{Handler: handler, name: name, weight: float64(w)}
nh := &namedHandler{
Handler: handler,
name: name,
weight: float64(w),
pending: 1,
healthy: true,
}
b.mutex.Lock()
b.enabledHandlers.push(nh)
b.handlersByName[nh.name] = nh
b.healthyCount++
b.mutex.Unlock()
}

type priorityQueue struct {
heap []*namedHandler
}

b.handlersMu.Lock()
h.deadline = b.curDeadline + 1/h.weight
heap.Push(b, h)
b.status[name] = struct{}{}
b.handlerMap[name] = h
b.handlerMap[hash(name)] = h
b.handlersMu.Unlock()
func (pq *priorityQueue) push(nh *namedHandler) {
heap.Push(pq, nh)
}

func hash(input string) string {
hasher := fnv.New64()
// We purposely ignore the error because the implementation always returns nil.
_, _ = hasher.Write([]byte(input))
func (pq *priorityQueue) pop() *namedHandler {
if len(pq.heap) < 1 {
return nil
}
return heap.Pop(pq).(*namedHandler)
}

return strconv.FormatUint(hasher.Sum64(), 16)
func (pq *priorityQueue) fix(nh *namedHandler) {
heap.Fix(pq, nh.queueIdx)
}

// Len implements heap.Interface/sort.Interface.
func (pq *priorityQueue) Len() int { return len(pq.heap) }

// Less implements heap.Interface/sort.Interface.
func (pq *priorityQueue) Less(i, j int) bool {
nhi, nhj := pq.heap[i], pq.heap[j]
return float64(nhi.pending)/nhi.weight < float64(nhj.pending)/nhj.weight
}

// Swap implements heap.Interface/sort.Interface.
func (pq *priorityQueue) Swap(i, j int) {
pq.heap[i], pq.heap[j] = pq.heap[j], pq.heap[i]
pq.heap[i].queueIdx = i
pq.heap[j].queueIdx = j
}

// Push implements heap.Interface for pushing an item into the heap.
func (pq *priorityQueue) Push(x interface{}) {
nh := x.(*namedHandler)
nh.queueIdx = len(pq.heap)
pq.heap = append(pq.heap, nh)
}

// Pop implements heap.Interface for popping an item from the heap.
// It panics if pq.Len() < 1.
func (pq *priorityQueue) Pop() interface{} {
lastIdx := len(pq.heap) - 1
nh := pq.heap[lastIdx]
pq.heap = pq.heap[0:lastIdx]
return nh
}