Skip to content

Commit

Permalink
feat: support disableable qos queue trimming
Browse files Browse the repository at this point in the history
- disableable qos queue trimming
- this will match parodus's default behavior
  - no queue trimming
  • Loading branch information
denopink committed Aug 1, 2024
1 parent 31387eb commit 55d3d62
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 7 deletions.
2 changes: 1 addition & 1 deletion internal/wrphandlers/qos/internal_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func validateQueueConstraints() Option {
return optionFunc(
func(h *Handler) error {
if int64(h.maxMessageBytes) > h.maxQueueBytes {
if h.maxQueueBytes != 0 && int64(h.maxMessageBytes) > h.maxQueueBytes {
return fmt.Errorf("%w: MaxMessageBytes > MaxQueueBytes", ErrMisconfiguredQOS)
}

Expand Down
2 changes: 0 additions & 2 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ func MaxQueueBytes(s int64) Option {
func(h *Handler) error {
if s < 0 {
return fmt.Errorf("%w: negative MaxQueueBytes", ErrMisconfiguredQOS)
} else if s == 0 {
s = DefaultMaxQueueBytes
}

h.maxQueueBytes = s
Expand Down
6 changes: 4 additions & 2 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type priorityQueue struct {
queue []item
// tieBreaker breaks any QualityOfService ties.
tieBreaker tieBreaker
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads.
// Zero value will disable queue trimming.
maxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
maxMessageBytes int
Expand Down Expand Up @@ -63,7 +64,8 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error {

func (pq *priorityQueue) trim() {
// trim until the queue no longer violates maxQueueBytes.
for pq.sizeBytes > pq.maxQueueBytes {
// The zero value of `pq.maxQueueBytes` will disable queue trimming.
for pq.maxQueueBytes != 0 && pq.sizeBytes > pq.maxQueueBytes {
// Note, priorityQueue.drop does not drop the least prioritized queued message.
// i.e.: a high priority queued message may be dropped instead of a lesser queued message.
pq.drop()
Expand Down
10 changes: 8 additions & 2 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,16 @@ func testEnqueueDequeue(t *testing.T) {
{
description: "message too large with an empty queue",
messages: []wrp.Message{largeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
maxMessageBytes: len(largeCriticalQOSMsg.Payload) - 1,
maxQueueBytes: len(smallLowQOSMsg.Payload),
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 0,
},
{
description: "allow unbound queue size",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 2,
},
{
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
Expand Down
11 changes: 11 additions & 0 deletions internal/wrphandlers/qos/qos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ func TestHandler_HandleWrp(t *testing.T) {
expectedHandleWRPErr error
}{
// success cases
{
description: "enqueued and delivered message prioritizing newer messages without queue size restriction",
maxMessageBytes: 50,
priority: qos.NewestType,
nextCallCount: 1,
next: wrpkit.HandlerFunc(func(wrp.Message) error {
nextCallCount.Add(1)

return nil
}),
},
{
description: "enqueued and delivered message prioritizing newer messages",
maxQueueBytes: 100,
Expand Down

0 comments on commit 55d3d62

Please sign in to comment.