Skip to content

Commit

Permalink
feat: qos queue trimming
Browse files Browse the repository at this point in the history
- implement `priorityQueue.trim()` to remove messages with the lowest qos value while honoring the configured qos `priority` [newest, oldest message]
- the compute cost of trim will be O(n)
  • Loading branch information
denopink committed Aug 3, 2024
1 parent 50caded commit a7d61a1
Show file tree
Hide file tree
Showing 7 changed files with 498 additions and 169 deletions.
10 changes: 9 additions & 1 deletion cmd/xmidt-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,17 @@ type QOS struct {
MaxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
MaxMessageBytes int
// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers,
// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming,
// with the default being to prioritize the newest messages.
Priority qos.PriorityType
// LowQOSExpires determines when low qos messages are trimmed.
LowQOSExpires time.Duration
// MediumQOSExpires determines when medium qos messages are trimmed.
MediumQOSExpires time.Duration
// HighQOSExpires determines when high qos messages are trimmed.
HighQOSExpires time.Duration
// CriticalQOSExpires determines when critical qos messages are trimmed.
CriticalQOSExpires time.Duration
}

type Pubsub struct {
Expand Down
4 changes: 4 additions & 0 deletions cmd/xmidt-agent/wrphandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func provideQOSHandler(in qosIn) (*qos.Handler, error) {
qos.MaxQueueBytes(in.QOS.MaxQueueBytes),
qos.MaxMessageBytes(in.QOS.MaxMessageBytes),
qos.Priority(in.QOS.Priority),
qos.LowQOSExpires(in.QOS.LowQOSExpires),
qos.MediumQOSExpires(in.QOS.MediumQOSExpires),
qos.HighQOSExpires(in.QOS.HighQOSExpires),
qos.CriticalQOSExpires(in.QOS.CriticalQOSExpires),
)
}

Expand Down
97 changes: 83 additions & 14 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,22 @@ package qos
import (
"errors"
"fmt"
"time"
)

const (
DefaultCloudAckTimeout = time.Minute * 7
DefaultExpiresCheckInterval = time.Minute * 5 //To check expires in every 5 mins when cloud connection is down.

// Priority queue defaults.
DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue
DefaultMaxMessageBytes = 256 * 1024 // 256 KB

// QOS expires defaults.
DefaultLowQOSExpires = time.Minute * 15
DefaultMediumQOSExpires = time.Minute * 20
DefaultHighQOSExpires = time.Minute * 25
DefaultCriticalQOSExpires = time.Minute * 30
)

// MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload.
Expand Down Expand Up @@ -47,25 +58,83 @@ func MaxMessageBytes(s int) Option {
})
}

// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers,
// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming,
// with the default being to prioritize the newest messages.
func Priority(p PriorityType) Option {
return optionFunc(
func(h *Handler) error {
// Determine what will be used as a QualityOfService tie breaker.
switch p {
case NewestType:
// Prioritize the newest messages.
h.tieBreaker = PriorityNewestMsg
case OldestType:
// Prioritize the oldest messages.
h.tieBreaker = PriorityOldestMsg
default:
return errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, h.priority), ErrMisconfiguredQOS)
func(h *Handler) (err error) {
h.tieBreaker, err = priority(p)
h.priority = p

return err
})
}

// priority determines which tie breakers are used during normal enqueueing.
func priority(p PriorityType) (enqueueTieBreaker tieBreaker, err error) {
// Determine what will be used as a QualityOfService tie breaker during normal enqueueing.
switch p {
case NewestType:
// Prioritize the newest messages.
enqueueTieBreaker = PriorityNewestMsg
case OldestType:
// Prioritize the oldest messages.
enqueueTieBreaker = PriorityOldestMsg
default:
return nil, errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, p), ErrMisconfiguredQOS)
}

return enqueueTieBreaker, nil
}

// LowQOSExpires determines when low qos messages are trimmed.
func LowQOSExpires(t time.Duration) Option {
return optionFunc(
func(h *Handler) (err error) {
if t < 0 {
return fmt.Errorf("%w: negative LowQOSExpires", ErrMisconfiguredQOS)
}

h.priority = p
h.lowQOSExpires = t
return err
})
}

return nil
// MediumQOSExpires determines when medium qos messages are trimmed.
func MediumQOSExpires(t time.Duration) Option {
return optionFunc(
func(h *Handler) (err error) {
if t < 0 {
return fmt.Errorf("%w: negative MediumQOSExpires", ErrMisconfiguredQOS)
}

h.mediumQOSExpires = t
return err
})
}

// HighQOSExpires determines when high qos messages are trimmed.
func HighQOSExpires(t time.Duration) Option {
return optionFunc(
func(h *Handler) (err error) {
if t < 0 {
return fmt.Errorf("%w: negative HighQOSExpires", ErrMisconfiguredQOS)
}

h.highQOSExpires = t
return err
})
}

// CriticalQOSExpires determines when critical qos messages are trimmed.
func CriticalQOSExpires(t time.Duration) Option {
return optionFunc(
func(h *Handler) (err error) {
if t < 0 {
return fmt.Errorf("%w: negative CriticalQOSExpires", ErrMisconfiguredQOS)
}

h.criticalQOSExpires = t
return err
})
}
173 changes: 148 additions & 25 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"container/heap"
"errors"
"fmt"
"math"
"slices"
"time"

"github.com/xmidt-org/wrp-go/v3"
Expand All @@ -19,6 +21,9 @@ var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes
type priorityQueue struct {
// queue for wrp messages, ingested by serviceQOS
queue []item
// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming,
// with the default being to prioritize the newest messages.
priority PriorityType
// tieBreaker breaks any QualityOfService ties.
tieBreaker tieBreaker
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
Expand All @@ -28,24 +33,56 @@ type priorityQueue struct {
// sizeBytes is the sum of all queued wrp message's payloads.
// An int64 overflow is unlikely since that'll be over 9*10^18 bytes
sizeBytes int64

// QOS expiries.
// lowQOSExpires determines when low qos messages are trimmed.
lowQOSExpires time.Duration
// mediumQOSExpires determines when medium qos messages are trimmed.
mediumQOSExpires time.Duration
// highQOSExpires determines when high qos messages are trimmed.
highQOSExpires time.Duration
// criticalQOSExpires determines when critical qos messages are trimmed.
criticalQOSExpires time.Duration
}

type tieBreaker func(i, j item) bool

type item struct {
msg wrp.Message
timestamp time.Time
msg *wrp.Message
expires time.Time
// expiresQOSPenalty is used as a weighted expires based on the message's qos value.
expiresQOSPenalty time.Time
expired bool
}

// Dequeue returns the next highest priority message.
// type itemTrimmingGroup

// func (ig itemTrimmingGroup) Less(i, j int) bool {
// return ig[i].expiresQOSPenalty.Before(ig[j].expiresQOSPenalty)
// }

// func (ig itemTrimmingGroup) Swap(i, j int) {
// ig[i], ig[j] = ig[j], ig[i]
// }

// Dequeue returns the message with the highest QualityOfService.
func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
// Required, otherwise heap.Pop will panic during an internal Swap call.
if pq.Len() == 0 {
return wrp.Message{}, false
var (
msg wrp.Message
ok bool
)
for pq.Len() != 0 {
itm := heap.Pop(pq).(item)
// itm.expired will be true if `itm` has already been `pq.trim()'.
// If itm.expired is true, then pop the next item.
if !itm.expired {
msg = *itm.msg
ok = true
break
}
}

msg, ok := heap.Pop(pq).(wrp.Message)

// ok will be false if no message was found, otherwise ok will be true.
return msg, ok
}

Expand All @@ -61,21 +98,73 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
return nil
}

// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“.
func (pq *priorityQueue) trim() {
// trim until the queue no longer violates maxQueueBytes.
for pq.sizeBytes > pq.maxQueueBytes {
// Note, priorityQueue.drop does not drop the least prioritized queued message.
// i.e.: a high priority queued message may be dropped instead of a lesser queued message.
pq.drop()
// If priorityQueue.queue doesn't violates `maxQueueSize`, then return.
if pq.sizeBytes <= pq.maxQueueBytes {
return
}
}

func (pq *priorityQueue) drop() {
_ = heap.Remove(pq, pq.Len()-1).(wrp.Message)
// Cache messages based on their qos for further trimming (if needed).
itemCache := make(map[wrp.QOSLevel][]*item)
// Remove all expired messages before trimming unexpired lower priority messages.
now := time.Now()
for i := range pq.queue {
itm := &pq.queue[i]
// itm has already been marked to be discarded.
if itm.expired {
continue
}
if now.After(itm.expires) {
// `itm.expired = true` informs `priorityQueue.Dequeue()'
// that itm should be discarded.
itm.expired = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// `pq.Pop()` will fully remove itm from pq.queue
itm.msg.Payload = nil
continue
}

qosLevel := itm.msg.QualityOfService.Level()
itemCache[qosLevel] = append(itemCache[qosLevel], itm)
}

// Continue trimming until the pq.queue no longer violates maxQueueBytes.
// Remove the messages with the lowest priority.
loop1:
for _, qosLevel := range []wrp.QOSLevel{wrp.QOSLow, wrp.QOSMedium, wrp.QOSHigh, wrp.QOSCritical} {
itms := itemCache[qosLevel]
// O(n*log(n)) sort. https://pkg.go.dev/sort#Sort https://pkg.go.dev/slices#SortFunc
slices.SortFunc(itms, func(i, j *item) int {
switch pq.priority {
case NewestType:
// Prioritize the newest messages.
return i.expiresQOSPenalty.Compare(j.expiresQOSPenalty)
default:
// Prioritize the oldest messages.
return j.expiresQOSPenalty.Compare(i.expiresQOSPenalty)
}
})

for _, itm := range itms {
// If pq.queue doesn't violates `maxQueueSize`, then return.
if pq.sizeBytes <= pq.maxQueueBytes {
break loop1
}

if now.After(itm.expiresQOSPenalty) {
// `itm.expired = true` informs `priorityQueue.Dequeue()'
// that itm should be discarded.
itm.expired = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// `pq.Pop()` will fully remove itm from pq.queue
itm.msg.Payload = nil
}
}
}
}

// heap.Interface related implementations https://pkg.go.dev/container/heap#Interface

func (pq *priorityQueue) Len() int { return len(pq.queue) }

func (pq *priorityQueue) Less(i, j int) bool {
Expand All @@ -95,9 +184,43 @@ func (pq *priorityQueue) Swap(i, j int) {
}

func (pq *priorityQueue) Push(x any) {
item := item{msg: x.(wrp.Message), timestamp: time.Now()}
pq.sizeBytes += int64(len(item.msg.Payload))
pq.queue = append(pq.queue, item)
expires := time.Now()
msg := x.(wrp.Message)
pq.sizeBytes += int64(len(msg.Payload))

var expiresQOS time.Duration
switch msg.QualityOfService.Level() {
case wrp.QOSLow:
expiresQOS = pq.lowQOSExpires
case wrp.QOSMedium:
expiresQOS = pq.mediumQOSExpires
case wrp.QOSHigh:
expiresQOS = pq.highQOSExpires
case wrp.QOSCritical:
expiresQOS = pq.criticalQOSExpires
}
expires = expires.Add(expiresQOS)

var (
// expiresQOSPenalty is used as a weighted expires based on the message's qos value.
expiresQOSPenalty time.Time
qosPenalty time.Duration
)
// Lower the qos, the sooner the message will expire.
// For example, the lowest qos value `0` will cause the message to expire immediately
// because the added pq.lowQOSExpires will be undone
// due to qosCoefficient = (100. - float64(0)) / 100 = 1.
// Where as the highest qos value `100` will not cause the message to expire any sooner
// because qosCoefficient = (100 - float64(100)) / 100 = 0.
qosCoefficient := (100. - float64(msg.QualityOfService)) / 100.
qosPenalty = time.Duration(math.Floor(float64(expiresQOS) * qosCoefficient))
expiresQOSPenalty = expires.Add(-qosPenalty)

pq.queue = append(pq.queue, item{
msg: &msg,
expires: expires,
expiresQOSPenalty: expiresQOSPenalty,
expired: false})
}

func (pq *priorityQueue) Pop() any {
Expand All @@ -106,19 +229,19 @@ func (pq *priorityQueue) Pop() any {
return nil
}

msg := pq.queue[last].msg
pq.sizeBytes -= int64(len(msg.Payload))
itm := pq.queue[last]
pq.sizeBytes -= int64(len(itm.msg.Payload))
// avoid memory leak
pq.queue[last] = item{}
pq.queue = pq.queue[0:last]

return msg
return itm
}

func PriorityNewestMsg(i, j item) bool {
return i.timestamp.After(j.timestamp)
return i.expires.After(j.expires)
}

func PriorityOldestMsg(i, j item) bool {
return i.timestamp.Before(j.timestamp)
return i.expires.Before(j.expires)
}
Loading

0 comments on commit a7d61a1

Please sign in to comment.