Skip to content

Commit

Permalink
Merge pull request #232 from xmidt-org/denopink/feat/qos-drop-message…
Browse files Browse the repository at this point in the history
…-response

feat: update rdr codes for dropped message
  • Loading branch information
schmidtw authored Sep 13, 2024
2 parents d2319bd + 461022d commit 218e496
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 39 deletions.
65 changes: 37 additions & 28 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (

var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes")

const (
// https://xmidt.io/docs/wrp/basics/#request-delivery-response-rdr-codes
messageIsTooLarge int64 = 4
higherPriorityMessageTookTheSpot int64 = 102
)

// priorityQueue implements heap.Interface and holds wrp Message, using wrp.QOSValue as its priority.
// https://xmidt.io/docs/wrp/basics/#qos-description-qos
type priorityQueue struct {
Expand Down Expand Up @@ -56,23 +62,29 @@ type item struct {
discard bool
}

func (itm *item) dispose() (payloadSize int64) {
var rdr = higherPriorityMessageTookTheSpot

payloadSize = int64(len(itm.msg.Payload))
// Mark itm to be discarded.
itm.discard = true
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
itm.msg.RequestDeliveryResponse = &rdr

return payloadSize
}

// Dequeue returns the next highest priority message.
func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
var (
msg wrp.Message
ok bool
)
for pq.Len() != 0 {
itm := heap.Pop(pq).(item)
// itm.discard will be true if `itm` has been marked to be discarded,
// i.e. trimmed by `pq.trim()'.
if itm.discard {
continue
}
func (pq *priorityQueue) Dequeue() (msg wrp.Message, ok bool) {
if pq.Len() == 0 {
return msg, false
}

itm, ok := heap.Pop(pq).(item)
if ok {
msg = *itm.msg
ok = true
break
}

// ok will be false if no message was found, otherwise ok will be true.
Expand All @@ -81,15 +93,22 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {

// Enqueue queues the given message.
func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
var err error

// Check whether msg violates maxMessageBytes.
// The zero value of `pq.maxMessageBytes` will disable individual message size validation.
if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes {
return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
var rdr = messageIsTooLarge

msg.Payload = nil
msg.RequestDeliveryResponse = &rdr
err = fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
}

heap.Push(pq, msg)
pq.trim()
return nil

return err
}

// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“.
Expand All @@ -111,12 +130,7 @@ func (pq *priorityQueue) trim() {
}
if now.After(itm.expires) {
// Mark itm to be discarded.
// `pq.Dequeue()` will fully discard itm.
itm.discard = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
pq.sizeBytes -= itm.dispose()
continue
}

Expand Down Expand Up @@ -153,12 +167,7 @@ func (pq *priorityQueue) trim() {
}

// Mark itm to be discarded.
// `pq.Dequeue()` will fully discard itm.
itm.discard = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
pq.sizeBytes -= itm.dispose()
}

}
Expand Down
43 changes: 32 additions & 11 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ func testEnqueueDequeueAgePriority(t *testing.T) {
require.NoError(err)

for _, msg := range messages {
pq.Enqueue(msg)
err = pq.Enqueue(msg)
if len(msg.Payload) > pq.maxMessageBytes && pq.maxMessageBytes != 0 {
assert.Error(err)
} else {
assert.NoError(err)
}
}

actualMsg, ok := pq.Dequeue()
Expand All @@ -100,6 +105,7 @@ func testEnqueueDequeueAgePriority(t *testing.T) {
}

func testEnqueueDequeue(t *testing.T) {
var rdr = messageIsTooLarge
emptyLowQOSMsg := wrp.Message{
Destination: "mac:00deadbeef00/config",
QualityOfService: 10,
Expand All @@ -124,8 +130,17 @@ func testEnqueueDequeue(t *testing.T) {
Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"),
QualityOfService: wrp.QOSCriticalValue,
}
xLargeCriticalQOSMsg := wrp.Message{
Destination: "mac:00deadbeef04/config",
Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameterXL\"]}"),
QualityOfService: wrp.QOSCriticalValue,
}
emptyXLargeCriticalQOSMsg := wrp.Message{
Destination: "mac:00deadbeef04/config",
QualityOfService: wrp.QOSCriticalValue,
RequestDeliveryResponse: &rdr,
}
enqueueSequenceTest := []wrp.Message{
largeCriticalQOSMsg,
mediumMediumQosMsg,
smallLowQOSMsg,
largeCriticalQOSMsg,
Expand All @@ -136,7 +151,6 @@ func testEnqueueDequeue(t *testing.T) {
mediumHighQosMsg,
}
dequeueSequenceTest := []wrp.Message{
largeCriticalQOSMsg,
largeCriticalQOSMsg,
largeCriticalQOSMsg,
mediumHighQosMsg,
Expand All @@ -152,8 +166,9 @@ func testEnqueueDequeue(t *testing.T) {
queueSizeSequenceTest += len(msg.Payload)
}

// expect 1 message to be drop
enqueueSequenceTest = append(enqueueSequenceTest, smallLowQOSMsg)
// test message payload drop
enqueueSequenceTest = append(enqueueSequenceTest, xLargeCriticalQOSMsg)
dequeueSequenceTest = append([]wrp.Message{emptyXLargeCriticalQOSMsg}, dequeueSequenceTest...)

tests := []struct {
description string
Expand Down Expand Up @@ -192,11 +207,12 @@ func testEnqueueDequeue(t *testing.T) {
expectedQueueSize: 1,
},
{
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, xLargeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
expectedDequeueSequence: []wrp.Message{emptyXLargeCriticalQOSMsg, largeCriticalQOSMsg},
},
{
description: "drop incoming low priority messages",
Expand Down Expand Up @@ -247,7 +263,12 @@ func testEnqueueDequeue(t *testing.T) {
require.NoError(err)

for _, msg := range tc.messages {
pq.Enqueue(msg)
err = pq.Enqueue(msg)
if len(msg.Payload) > pq.maxMessageBytes && pq.maxMessageBytes != 0 {
assert.Error(err)
} else {
assert.NoError(err)
}
}

if len(tc.expectedDequeueSequence) == 0 {
Expand Down

0 comments on commit 218e496

Please sign in to comment.