Skip to content

Commit

Permalink
chore: update based on pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
denopink committed Sep 13, 2024
1 parent fcaeb57 commit 461022d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
18 changes: 14 additions & 4 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package qos
import (
"container/heap"
"errors"
"fmt"
"slices"
"time"

Expand All @@ -14,7 +15,7 @@ import (

var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes")

var (
const (
// https://xmidt.io/docs/wrp/basics/#request-delivery-response-rdr-codes
messageIsTooLarge int64 = 4
higherPriorityMessageTookTheSpot int64 = 102
Expand Down Expand Up @@ -62,13 +63,15 @@ type item struct {
}

func (itm *item) dispose() (payloadSize int64) {
var rdr = higherPriorityMessageTookTheSpot

payloadSize = int64(len(itm.msg.Payload))
// Mark itm to be discarded.
itm.discard = true
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
itm.msg.RequestDeliveryResponse = &higherPriorityMessageTookTheSpot
itm.msg.RequestDeliveryResponse = &rdr

return payloadSize
}
Expand All @@ -89,16 +92,23 @@ func (pq *priorityQueue) Dequeue() (msg wrp.Message, ok bool) {
}

// Enqueue queues the given message.
func (pq *priorityQueue) Enqueue(msg wrp.Message) {
func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
var err error

// Check whether msg violates maxMessageBytes.
// The zero value of `pq.maxMessageBytes` will disable individual message size validation.
if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes {
var rdr = messageIsTooLarge

msg.Payload = nil
msg.RequestDeliveryResponse = &messageIsTooLarge
msg.RequestDeliveryResponse = &rdr
err = fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
}

heap.Push(pq, msg)
pq.trim()

return err
}

// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“.
Expand Down
17 changes: 14 additions & 3 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ func testEnqueueDequeueAgePriority(t *testing.T) {
require.NoError(err)

for _, msg := range messages {
pq.Enqueue(msg)
err = pq.Enqueue(msg)
if len(msg.Payload) > pq.maxMessageBytes && pq.maxMessageBytes != 0 {
assert.Error(err)
} else {
assert.NoError(err)
}
}

actualMsg, ok := pq.Dequeue()
Expand All @@ -100,6 +105,7 @@ func testEnqueueDequeueAgePriority(t *testing.T) {
}

func testEnqueueDequeue(t *testing.T) {
var rdr = messageIsTooLarge
emptyLowQOSMsg := wrp.Message{
Destination: "mac:00deadbeef00/config",
QualityOfService: 10,
Expand Down Expand Up @@ -132,7 +138,7 @@ func testEnqueueDequeue(t *testing.T) {
emptyXLargeCriticalQOSMsg := wrp.Message{
Destination: "mac:00deadbeef04/config",
QualityOfService: wrp.QOSCriticalValue,
RequestDeliveryResponse: &messageIsTooLarge,
RequestDeliveryResponse: &rdr,
}
enqueueSequenceTest := []wrp.Message{
mediumMediumQosMsg,
Expand Down Expand Up @@ -257,7 +263,12 @@ func testEnqueueDequeue(t *testing.T) {
require.NoError(err)

for _, msg := range tc.messages {
pq.Enqueue(msg)
err = pq.Enqueue(msg)
if len(msg.Payload) > pq.maxMessageBytes && pq.maxMessageBytes != 0 {
assert.Error(err)
} else {
assert.NoError(err)
}
}

if len(tc.expectedDequeueSequence) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions internal/wrphandlers/qos/qos.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ func (h *Handler) serviceQOS(queue <-chan wrp.Message) {
}

// ErrMaxMessageBytes errrors are ignored.
pq.Enqueue(msg)
_ = pq.Enqueue(msg)
case <-ready:
// Previous Handler.wrpHandler has finished, check whether it
// was successful or not.
if msg, ok := <-failedMsg; ok {
// Delivery failed, re-enqueue message and try again later.
// ErrMaxMessageBytes errrors are ignored.
pq.Enqueue(msg)
_ = pq.Enqueue(msg)
}

ready, failedMsg = nil, nil
Expand Down

0 comments on commit 461022d

Please sign in to comment.