From a7d61a15230cbffe92d6b58afb3debc0b97d0e2a Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Tue, 4 Jun 2024 19:25:02 -0400 Subject: [PATCH] feat: qos queue trimming - implement `priorityQueue.trim()` to remove messages with the lowest qos value while honoring the configured qos `priority` [newest, oldest message] - the compute cost of trim will be O(n) --- cmd/xmidt-agent/config.go | 10 +- cmd/xmidt-agent/wrphandlers.go | 4 + internal/wrphandlers/qos/options.go | 97 +++++++-- internal/wrphandlers/qos/priority_queue.go | 173 +++++++++++++--- .../wrphandlers/qos/priority_queue_test.go | 169 ++++++++++------ internal/wrphandlers/qos/qos.go | 25 ++- internal/wrphandlers/qos/qos_test.go | 189 ++++++++++++------ 7 files changed, 498 insertions(+), 169 deletions(-) diff --git a/cmd/xmidt-agent/config.go b/cmd/xmidt-agent/config.go index 0085982..e5ec61f 100644 --- a/cmd/xmidt-agent/config.go +++ b/cmd/xmidt-agent/config.go @@ -64,9 +64,17 @@ type QOS struct { MaxQueueBytes int64 // MaxMessageBytes is the largest allowable wrp message payload. MaxMessageBytes int - // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, + // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. Priority qos.PriorityType + // LowQOSExpires determines when low qos messages are trimmed. + LowQOSExpires time.Duration + // MediumQOSExpires determines when medium qos messages are trimmed. + MediumQOSExpires time.Duration + // HighQOSExpires determines when high qos messages are trimmed. + HighQOSExpires time.Duration + // CriticalQOSExpires determines when critical qos messages are trimmed. + CriticalQOSExpires time.Duration } type Pubsub struct { diff --git a/cmd/xmidt-agent/wrphandlers.go b/cmd/xmidt-agent/wrphandlers.go index dfd1d11..7c24f98 100644 --- a/cmd/xmidt-agent/wrphandlers.go +++ b/cmd/xmidt-agent/wrphandlers.go @@ -75,6 +75,10 @@ func provideQOSHandler(in qosIn) (*qos.Handler, error) { qos.MaxQueueBytes(in.QOS.MaxQueueBytes), qos.MaxMessageBytes(in.QOS.MaxMessageBytes), qos.Priority(in.QOS.Priority), + qos.LowQOSExpires(in.QOS.LowQOSExpires), + qos.MediumQOSExpires(in.QOS.MediumQOSExpires), + qos.HighQOSExpires(in.QOS.HighQOSExpires), + qos.CriticalQOSExpires(in.QOS.CriticalQOSExpires), ) } diff --git a/internal/wrphandlers/qos/options.go b/internal/wrphandlers/qos/options.go index 4904f5b..e956f2d 100644 --- a/internal/wrphandlers/qos/options.go +++ b/internal/wrphandlers/qos/options.go @@ -6,11 +6,22 @@ package qos import ( "errors" "fmt" + "time" ) const ( + DefaultCloudAckTimeout = time.Minute * 7 + DefaultExpiresCheckInterval = time.Minute * 5 //To check expires in every 5 mins when cloud connection is down. + + // Priority queue defaults. DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue DefaultMaxMessageBytes = 256 * 1024 // 256 KB + + // QOS expires defaults. + DefaultLowQOSExpires = time.Minute * 15 + DefaultMediumQOSExpires = time.Minute * 20 + DefaultHighQOSExpires = time.Minute * 25 + DefaultCriticalQOSExpires = time.Minute * 30 ) // MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload. @@ -47,25 +58,83 @@ func MaxMessageBytes(s int) Option { }) } -// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, +// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. func Priority(p PriorityType) Option { return optionFunc( - func(h *Handler) error { - // Determine what will be used as a QualityOfService tie breaker. - switch p { - case NewestType: - // Prioritize the newest messages. - h.tieBreaker = PriorityNewestMsg - case OldestType: - // Prioritize the oldest messages. - h.tieBreaker = PriorityOldestMsg - default: - return errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, h.priority), ErrMisconfiguredQOS) + func(h *Handler) (err error) { + h.tieBreaker, err = priority(p) + h.priority = p + + return err + }) +} + +// priority determines which tie breakers are used during normal enqueueing. +func priority(p PriorityType) (enqueueTieBreaker tieBreaker, err error) { + // Determine what will be used as a QualityOfService tie breaker during normal enqueueing. + switch p { + case NewestType: + // Prioritize the newest messages. + enqueueTieBreaker = PriorityNewestMsg + case OldestType: + // Prioritize the oldest messages. + enqueueTieBreaker = PriorityOldestMsg + default: + return nil, errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, p), ErrMisconfiguredQOS) + } + + return enqueueTieBreaker, nil +} + +// LowQOSExpires determines when low qos messages are trimmed. +func LowQOSExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative LowQOSExpires", ErrMisconfiguredQOS) } - h.priority = p + h.lowQOSExpires = t + return err + }) +} - return nil +// MediumQOSExpires determines when medium qos messages are trimmed. +func MediumQOSExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative MediumQOSExpires", ErrMisconfiguredQOS) + } + + h.mediumQOSExpires = t + return err + }) +} + +// HighQOSExpires determines when high qos messages are trimmed. +func HighQOSExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative HighQOSExpires", ErrMisconfiguredQOS) + } + + h.highQOSExpires = t + return err + }) +} + +// CriticalQOSExpires determines when critical qos messages are trimmed. +func CriticalQOSExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative CriticalQOSExpires", ErrMisconfiguredQOS) + } + + h.criticalQOSExpires = t + return err }) } diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 39a59cb..546625e 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -7,6 +7,8 @@ import ( "container/heap" "errors" "fmt" + "math" + "slices" "time" "github.com/xmidt-org/wrp-go/v3" @@ -19,6 +21,9 @@ var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes type priorityQueue struct { // queue for wrp messages, ingested by serviceQOS queue []item + // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, + // with the default being to prioritize the newest messages. + priority PriorityType // tieBreaker breaks any QualityOfService ties. tieBreaker tieBreaker // maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads @@ -28,24 +33,56 @@ type priorityQueue struct { // sizeBytes is the sum of all queued wrp message's payloads. // An int64 overflow is unlikely since that'll be over 9*10^18 bytes sizeBytes int64 + + // QOS expiries. + // lowQOSExpires determines when low qos messages are trimmed. + lowQOSExpires time.Duration + // mediumQOSExpires determines when medium qos messages are trimmed. + mediumQOSExpires time.Duration + // highQOSExpires determines when high qos messages are trimmed. + highQOSExpires time.Duration + // criticalQOSExpires determines when critical qos messages are trimmed. + criticalQOSExpires time.Duration } type tieBreaker func(i, j item) bool type item struct { - msg wrp.Message - timestamp time.Time + msg *wrp.Message + expires time.Time + // expiresQOSPenalty is used as a weighted expires based on the message's qos value. + expiresQOSPenalty time.Time + expired bool } -// Dequeue returns the next highest priority message. +// type itemTrimmingGroup + +// func (ig itemTrimmingGroup) Less(i, j int) bool { +// return ig[i].expiresQOSPenalty.Before(ig[j].expiresQOSPenalty) +// } + +// func (ig itemTrimmingGroup) Swap(i, j int) { +// ig[i], ig[j] = ig[j], ig[i] +// } + +// Dequeue returns the message with the highest QualityOfService. func (pq *priorityQueue) Dequeue() (wrp.Message, bool) { - // Required, otherwise heap.Pop will panic during an internal Swap call. - if pq.Len() == 0 { - return wrp.Message{}, false + var ( + msg wrp.Message + ok bool + ) + for pq.Len() != 0 { + itm := heap.Pop(pq).(item) + // itm.expired will be true if `itm` has already been `pq.trim()'. + // If itm.expired is true, then pop the next item. + if !itm.expired { + msg = *itm.msg + ok = true + break + } } - msg, ok := heap.Pop(pq).(wrp.Message) - + // ok will be false if no message was found, otherwise ok will be true. return msg, ok } @@ -61,21 +98,73 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error { return nil } +// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“. func (pq *priorityQueue) trim() { - // trim until the queue no longer violates maxQueueBytes. - for pq.sizeBytes > pq.maxQueueBytes { - // Note, priorityQueue.drop does not drop the least prioritized queued message. - // i.e.: a high priority queued message may be dropped instead of a lesser queued message. - pq.drop() + // If priorityQueue.queue doesn't violates `maxQueueSize`, then return. + if pq.sizeBytes <= pq.maxQueueBytes { + return } -} -func (pq *priorityQueue) drop() { - _ = heap.Remove(pq, pq.Len()-1).(wrp.Message) + // Cache messages based on their qos for further trimming (if needed). + itemCache := make(map[wrp.QOSLevel][]*item) + // Remove all expired messages before trimming unexpired lower priority messages. + now := time.Now() + for i := range pq.queue { + itm := &pq.queue[i] + // itm has already been marked to be discarded. + if itm.expired { + continue + } + if now.After(itm.expires) { + // `itm.expired = true` informs `priorityQueue.Dequeue()' + // that itm should be discarded. + itm.expired = true + pq.sizeBytes -= int64(len(itm.msg.Payload)) + // `pq.Pop()` will fully remove itm from pq.queue + itm.msg.Payload = nil + continue + } + + qosLevel := itm.msg.QualityOfService.Level() + itemCache[qosLevel] = append(itemCache[qosLevel], itm) + } + + // Continue trimming until the pq.queue no longer violates maxQueueBytes. + // Remove the messages with the lowest priority. +loop1: + for _, qosLevel := range []wrp.QOSLevel{wrp.QOSLow, wrp.QOSMedium, wrp.QOSHigh, wrp.QOSCritical} { + itms := itemCache[qosLevel] + // O(n*log(n)) sort. https://pkg.go.dev/sort#Sort https://pkg.go.dev/slices#SortFunc + slices.SortFunc(itms, func(i, j *item) int { + switch pq.priority { + case NewestType: + // Prioritize the newest messages. + return i.expiresQOSPenalty.Compare(j.expiresQOSPenalty) + default: + // Prioritize the oldest messages. + return j.expiresQOSPenalty.Compare(i.expiresQOSPenalty) + } + }) + + for _, itm := range itms { + // If pq.queue doesn't violates `maxQueueSize`, then return. + if pq.sizeBytes <= pq.maxQueueBytes { + break loop1 + } + + if now.After(itm.expiresQOSPenalty) { + // `itm.expired = true` informs `priorityQueue.Dequeue()' + // that itm should be discarded. + itm.expired = true + pq.sizeBytes -= int64(len(itm.msg.Payload)) + // `pq.Pop()` will fully remove itm from pq.queue + itm.msg.Payload = nil + } + } + } } // heap.Interface related implementations https://pkg.go.dev/container/heap#Interface - func (pq *priorityQueue) Len() int { return len(pq.queue) } func (pq *priorityQueue) Less(i, j int) bool { @@ -95,9 +184,43 @@ func (pq *priorityQueue) Swap(i, j int) { } func (pq *priorityQueue) Push(x any) { - item := item{msg: x.(wrp.Message), timestamp: time.Now()} - pq.sizeBytes += int64(len(item.msg.Payload)) - pq.queue = append(pq.queue, item) + expires := time.Now() + msg := x.(wrp.Message) + pq.sizeBytes += int64(len(msg.Payload)) + + var expiresQOS time.Duration + switch msg.QualityOfService.Level() { + case wrp.QOSLow: + expiresQOS = pq.lowQOSExpires + case wrp.QOSMedium: + expiresQOS = pq.mediumQOSExpires + case wrp.QOSHigh: + expiresQOS = pq.highQOSExpires + case wrp.QOSCritical: + expiresQOS = pq.criticalQOSExpires + } + expires = expires.Add(expiresQOS) + + var ( + // expiresQOSPenalty is used as a weighted expires based on the message's qos value. + expiresQOSPenalty time.Time + qosPenalty time.Duration + ) + // Lower the qos, the sooner the message will expire. + // For example, the lowest qos value `0` will cause the message to expire immediately + // because the added pq.lowQOSExpires will be undone + // due to qosCoefficient = (100. - float64(0)) / 100 = 1. + // Where as the highest qos value `100` will not cause the message to expire any sooner + // because qosCoefficient = (100 - float64(100)) / 100 = 0. + qosCoefficient := (100. - float64(msg.QualityOfService)) / 100. + qosPenalty = time.Duration(math.Floor(float64(expiresQOS) * qosCoefficient)) + expiresQOSPenalty = expires.Add(-qosPenalty) + + pq.queue = append(pq.queue, item{ + msg: &msg, + expires: expires, + expiresQOSPenalty: expiresQOSPenalty, + expired: false}) } func (pq *priorityQueue) Pop() any { @@ -106,19 +229,19 @@ func (pq *priorityQueue) Pop() any { return nil } - msg := pq.queue[last].msg - pq.sizeBytes -= int64(len(msg.Payload)) + itm := pq.queue[last] + pq.sizeBytes -= int64(len(itm.msg.Payload)) // avoid memory leak pq.queue[last] = item{} pq.queue = pq.queue[0:last] - return msg + return itm } func PriorityNewestMsg(i, j item) bool { - return i.timestamp.After(j.timestamp) + return i.expires.After(j.expires) } func PriorityOldestMsg(i, j item) bool { - return i.timestamp.Before(j.timestamp) + return i.expires.Before(j.expires) } diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index e6ca034..acbceef 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -52,17 +52,17 @@ func testEnqueueDequeueAgePriority(t *testing.T) { } tests := []struct { description string - tieBreaker tieBreaker + priority PriorityType expectedMsg wrp.Message }{ { description: "drop incoming low priority messages while prioritizing older messages", - tieBreaker: PriorityOldestMsg, + priority: OldestType, expectedMsg: smallLowQOSMsgOldest, }, { description: "drop incoming low priority messages while prioritizing newer messages", - tieBreaker: PriorityNewestMsg, + priority: NewestType, expectedMsg: smallLowQOSMsgNewest, }, } @@ -72,20 +72,28 @@ func testEnqueueDequeueAgePriority(t *testing.T) { assert := assert.New(t) require := require.New(t) pq := priorityQueue{ - maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)), - maxMessageBytes: len(smallLowQOSMsgOldest.Payload), - tieBreaker: tc.tieBreaker, + maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)), + maxMessageBytes: len(smallLowQOSMsgOldest.Payload), + lowQOSExpires: DefaultLowQOSExpires, + mediumQOSExpires: DefaultMediumQOSExpires, + highQOSExpires: DefaultHighQOSExpires, + criticalQOSExpires: DefaultCriticalQOSExpires, + priority: tc.priority, } + + var err error + pq.tieBreaker, err = priority(tc.priority) + require.NoError(err) + for _, msg := range messages { pq.Enqueue(msg) } - assert.Equal(1, pq.Len()) - actualMsg, ok := pq.Dequeue() require.True(ok) require.NotEmpty(actualMsg) assert.Equal(tc.expectedMsg, actualMsg) + assert.Equal(int64(0), pq.sizeBytes) }) } } @@ -105,6 +113,11 @@ func testEnqueueDequeue(t *testing.T) { Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), QualityOfService: wrp.QOSMediumValue, } + mediumHighQosMsg := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSHighValue, + } largeCriticalQOSMsg := wrp.Message{ Destination: "mac:00deadbeef03/config", Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), @@ -119,11 +132,13 @@ func testEnqueueDequeue(t *testing.T) { largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg, + mediumHighQosMsg, } dequeueSequenceTest := []wrp.Message{ largeCriticalQOSMsg, largeCriticalQOSMsg, largeCriticalQOSMsg, + mediumHighQosMsg, mediumMediumQosMsg, mediumMediumQosMsg, smallLowQOSMsg, @@ -173,6 +188,14 @@ func testEnqueueDequeue(t *testing.T) { maxMessageBytes: len(largeCriticalQOSMsg.Payload), expectedQueueSize: 1, }, + { + description: "drop incoming low priority messages", + messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg}, + maxQueueBytes: len(largeCriticalQOSMsg.Payload) * 2, + maxMessageBytes: len(largeCriticalQOSMsg.Payload), + expectedQueueSize: 2, + expectedDequeueSequence: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg}, + }, { description: "remove some low priority messages to fit a higher priority message", messages: []wrp.Message{mediumMediumQosMsg, mediumMediumQosMsg, mediumMediumQosMsg, largeCriticalQOSMsg}, @@ -201,16 +224,22 @@ func testEnqueueDequeue(t *testing.T) { assert := assert.New(t) require := require.New(t) pq := priorityQueue{ - maxQueueBytes: int64(tc.maxQueueBytes), - maxMessageBytes: tc.maxMessageBytes, - tieBreaker: PriorityNewestMsg, + maxQueueBytes: int64(tc.maxQueueBytes), + maxMessageBytes: tc.maxMessageBytes, + lowQOSExpires: DefaultLowQOSExpires, + mediumQOSExpires: 0, + highQOSExpires: DefaultHighQOSExpires, + criticalQOSExpires: DefaultCriticalQOSExpires, } + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + for _, msg := range tc.messages { pq.Enqueue(msg) } - assert.Equal(tc.expectedQueueSize, pq.Len()) - if len(tc.expectedDequeueSequence) == 0 { return } @@ -221,96 +250,105 @@ func testEnqueueDequeue(t *testing.T) { require.NotEmpty(actualMsg) assert.Equal(expectedMsg, actualMsg) } + + assert.Equal(int64(0), pq.sizeBytes) + }) } } func testSize(t *testing.T) { assert := assert.New(t) + require := require.New(t) msg := wrp.Message{ Destination: "mac:00deadbeef00/config", Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), } - pq := priorityQueue{tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) assert.Equal(int64(0), pq.sizeBytes) pq.Push(msg) pq.Push(msg) assert.Equal(int64(len(msg.Payload)*2), pq.sizeBytes) } + func testLen(t *testing.T) { assert := assert.New(t) - pq := priorityQueue{queue: []item{ + require := require.New(t) + pq := priorityQueue{} + pq.queue = []item{ { - msg: wrp.Message{ - + msg: &wrp.Message{ Destination: "mac:00deadbeef00/config", }, - timestamp: time.Now(), + expires: time.Now(), }, { - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", }, - timestamp: time.Now(), + expires: time.Now(), }, - }, - tieBreaker: PriorityNewestMsg, } + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) assert.Equal(len(pq.queue), pq.Len()) } func testLess(t *testing.T) { oldestMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef00/config", QualityOfService: wrp.QOSCriticalValue, }, - timestamp: time.Now(), + expires: time.Now(), } newestMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", QualityOfService: wrp.QOSLowValue, }, - timestamp: time.Now(), + expires: time.Now(), } tieBreakerMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef02/config", QualityOfService: wrp.QOSCriticalValue, }, - timestamp: time.Now(), + expires: time.Now(), } tests := []struct { description string priority PriorityType - tieBreaker tieBreaker }{ { description: "less", priority: NewestType, - tieBreaker: PriorityNewestMsg, }, { description: "tie breaker prioritizing newer messages", priority: NewestType, - tieBreaker: PriorityNewestMsg, }, { description: "tie breaker prioritizing older messages", priority: OldestType, - tieBreaker: PriorityOldestMsg, }, } for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) + require := require.New(t) + pq := priorityQueue{} + pq.queue = []item{oldestMsg, newestMsg, tieBreakerMsg} - pq := priorityQueue{ - queue: []item{oldestMsg, newestMsg, tieBreakerMsg}, - tieBreaker: tc.tieBreaker, - } + var err error + pq.tieBreaker, err = priority(tc.priority) + require.NoError(err) // wrp.QOSCriticalValue > wrp.QOSLowValue assert.True(pq.Less(0, 1)) @@ -334,40 +372,44 @@ func testLess(t *testing.T) { func testSwap(t *testing.T) { assert := assert.New(t) + require := require.New(t) msg0 := wrp.Message{ Destination: "mac:00deadbeef00/config", } msg2 := wrp.Message{ Destination: "mac:00deadbeef02/config", } - pq := priorityQueue{queue: []item{ + pq := priorityQueue{} + pq.queue = []item{ { - msg: msg0, - timestamp: time.Now(), + msg: &msg0, + expires: time.Now(), }, { - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", }, - timestamp: time.Now(), + expires: time.Now(), }, { - msg: msg2, - timestamp: time.Now(), + msg: &msg2, + expires: time.Now(), }, - }, - tieBreaker: PriorityNewestMsg, } + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) pq.Swap(0, 2) // pq.queue[0] should contain msg2 - assert.Equal(msg2, pq.queue[0].msg) + assert.Equal(msg2, *pq.queue[0].msg) // pq.queue[2] should contain msg0 - assert.Equal(msg0, pq.queue[2].msg) + assert.Equal(msg0, *pq.queue[2].msg) } func testPush(t *testing.T) { assert := assert.New(t) + require := require.New(t) messages := []wrp.Message{ { Destination: "mac:00deadbeef00/config", @@ -379,10 +421,15 @@ func testPush(t *testing.T) { Destination: "mac:00deadbeef02/config", }, } - pq := priorityQueue{tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + for _, msg := range messages { pq.Push(msg) - assert.Equal(msg, pq.queue[pq.Len()-1].msg) + assert.Equal(msg, *pq.queue[pq.Len()-1].msg) } } @@ -409,7 +456,7 @@ func testPop(t *testing.T) { description: "single message with memory leak check", items: []item{ { - msg: msg0, + msg: &msg0, }, }, expectedMessage: msg0, @@ -418,13 +465,13 @@ func testPop(t *testing.T) { description: "multiple messages with memory leak check", items: []item{ { - msg: msg0, + msg: &msg0, }, { - msg: msg1, + msg: &msg1, }, { - msg: msg2, + msg: &msg2, }, }, expectedMessage: msg2, @@ -435,18 +482,22 @@ func testPop(t *testing.T) { assert := assert.New(t) require := require.New(t) - pq := priorityQueue{queue: tc.items, tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + pq.queue = tc.items + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + // no sorting is applied, Pop will pop the last message from priorityQueue's queue - switch msg := pq.Pop().(type) { + switch itm := pq.Pop().(type) { case nil: assert.Len(tc.items, 0) - case wrp.Message: - assert.Equal(tc.expectedMessage, msg) + case item: + assert.Equal(tc.expectedMessage, *itm.msg) require.NotEmpty(tc.items, "Pop() should have returned a nil instead of a wrp.Message") // check for memory leak assert.Empty(tc.items[len(tc.items)-1]) - assert.Equal(wrp.Message{}, tc.items[len(tc.items)-1].msg) - assert.True(tc.items[len(tc.items)-1].timestamp.IsZero()) default: require.Fail("Pop() returned an unknown type") } diff --git a/internal/wrphandlers/qos/qos.go b/internal/wrphandlers/qos/qos.go index ff94e96..323855f 100644 --- a/internal/wrphandlers/qos/qos.go +++ b/internal/wrphandlers/qos/qos.go @@ -6,6 +6,7 @@ package qos import ( "errors" "sync" + "time" "github.com/xmidt-org/wrp-go/v3" "github.com/xmidt-org/xmidt-agent/internal/wrpkit" @@ -33,7 +34,7 @@ type Handler struct { next wrpkit.Handler // queue for wrp messages, ingested by serviceQOS queue chan wrp.Message - // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, + // priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. priority PriorityType // tieBreaker breaks any QualityOfService ties. @@ -43,6 +44,16 @@ type Handler struct { // MaxMessageBytes is the largest allowable wrp message payload. maxMessageBytes int + // QOS expiries. + // lowQOSExpires determines when low qos messages are trimmed. + lowQOSExpires time.Duration + // mediumQOSExpires determines when medium qos messages are trimmed. + mediumQOSExpires time.Duration + // highQOSExpires determines when high qos messages are trimmed. + highQOSExpires time.Duration + // criticalQOSExpires determines when critical qos messages are trimmed. + criticalQOSExpires time.Duration + lock sync.Mutex } @@ -55,13 +66,17 @@ func New(next wrpkit.Handler, opts ...Option) (*Handler, error) { return nil, ErrInvalidInput } - // Add configuration validators. - opts = append(opts, validateQueueConstraints(), validatePriority(), validateTieBreaker()) - h := Handler{ - next: next, + next: next, + lowQOSExpires: DefaultLowQOSExpires, + mediumQOSExpires: DefaultMediumQOSExpires, + highQOSExpires: DefaultHighQOSExpires, + criticalQOSExpires: DefaultCriticalQOSExpires, } + // Add configuration validators. + opts = append(opts, validateQueueConstraints(), validatePriority(), validateTieBreaker()) + var errs error for _, opt := range opts { if opt != nil { diff --git a/internal/wrphandlers/qos/qos_test.go b/internal/wrphandlers/qos/qos_test.go index c643718..5f6f7c9 100644 --- a/internal/wrphandlers/qos/qos_test.go +++ b/internal/wrphandlers/qos/qos_test.go @@ -31,10 +31,9 @@ func TestHandler_HandleWrp(t *testing.T) { ) tests := []struct { - description string - maxQueueBytes int - maxMessageBytes int - priority qos.PriorityType + description string + options []qos.Option + priority qos.PriorityType // int64 required for nextCallCount atomic.Int64 comparison nextCallCount int64 next wrpkit.Handler @@ -46,11 +45,9 @@ func TestHandler_HandleWrp(t *testing.T) { }{ // success cases { - description: "enqueued and delivered message prioritizing newer messages", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 1, + description: "enqueued and delivered message prioritizing newer messages", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -58,11 +55,9 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "enqueued and delivered message prioritizing older messages", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.OldestType, - nextCallCount: 1, + description: "enqueued and delivered message prioritizing older messages", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.OldestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -70,11 +65,9 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "re-enqueue message that failed its delivery", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 2, + description: "re-enqueue message that failed its delivery", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 2, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) if nextCallCount.Load() < 2 { @@ -87,11 +80,10 @@ func TestHandler_HandleWrp(t *testing.T) { failDeliveryOnce: true, }, { - description: "queue messages while message delivery is blocked", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 0, + description: "queue messages while message delivery is blocked", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + + nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { // halt qos message delivery time.Sleep(1 * time.Second) @@ -101,11 +93,30 @@ func TestHandler_HandleWrp(t *testing.T) { shouldHalt: true, }, { - description: "zero MaxQueueBytes option value", - maxQueueBytes: 0, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 1, + description: "zero MaxQueueBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "zero MaxMessageBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(0)), qos.MaxMessageBytes(0), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative LowQOSExpires option value", + options: []qos.Option{qos.LowQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -113,11 +124,29 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "zero MaxMessageBytes option value", - maxQueueBytes: qos.DefaultMaxQueueBytes, - maxMessageBytes: 0, - priority: qos.NewestType, - nextCallCount: 1, + description: "non-negative MediumQOSExpires option value", + options: []qos.Option{qos.MediumQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative HighQOSExpires option value", + options: []qos.Option{qos.HighQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative CriticalQOSExpires option value", + options: []qos.Option{qos.CriticalQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -126,17 +155,13 @@ func TestHandler_HandleWrp(t *testing.T) { }, // failure cases { - description: "invalid inputs for qos.New", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - expectedNewErr: qos.ErrInvalidInput, + description: "invalid inputs for qos.New", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + expectedNewErr: qos.ErrInvalidInput, }, { - description: "negative MaxQueueBytes option value", - maxQueueBytes: -1, - maxMessageBytes: 50, - priority: qos.NewestType, + description: "negative MaxQueueBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(-1)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -145,10 +170,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative MaxMessageBytes option value", - maxQueueBytes: 100, - maxMessageBytes: -1, - priority: qos.NewestType, + description: "negative MaxMessageBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(-1), qos.Priority(qos.NewestType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -157,10 +180,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative invalid priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: -1, + description: "negative invalid priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(-1)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -169,10 +190,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "positive invalid priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: math.MaxInt64, + description: "positive invalid priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(math.MaxInt64)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -181,10 +200,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "unknown priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.UnknownType, + description: "unknown priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.UnknownType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -193,11 +210,9 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "qos has stopped", - maxQueueBytes: 100, - maxMessageBytes: 50, - nextCallCount: 0, - priority: qos.NewestType, + description: "qos has stopped", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -206,13 +221,57 @@ func TestHandler_HandleWrp(t *testing.T) { shutdown: true, expectedHandleWRPErr: qos.ErrQOSHasShutdown, }, + { + description: "negative LowQOSExpires option value", + options: []qos.Option{qos.LowQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative MediumQOSExpires option value", + options: []qos.Option{qos.MediumQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative HighQOSExpires option value", + options: []qos.Option{qos.HighQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative CriticalQOSExpires option value", + options: []qos.Option{qos.CriticalQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, } for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) require := require.New(t) - h, err := qos.New(tc.next, qos.MaxQueueBytes(int64(tc.maxQueueBytes)), qos.MaxMessageBytes(tc.maxMessageBytes), qos.Priority(tc.priority)) + h, err := qos.New(tc.next, tc.options...) if tc.expectedNewErr != nil { assert.ErrorIs(err, tc.expectedNewErr) assert.Nil(h)