Skip to content

Commit

Permalink
feat(priority queue): add support for queue items that are not just c…
Browse files Browse the repository at this point in the history
…mp.Ordered
  • Loading branch information
adityathebe authored and moshloop committed Dec 11, 2024
1 parent 3a91b89 commit b1b4bfd
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: test
test:
go test ./... -v
go test ./... -v --count=1

.PHONY: lint
lint:
Expand Down
28 changes: 13 additions & 15 deletions collections/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package collections

import (
"cmp"
"errors"
"fmt"
"iter"
"strings"
Expand All @@ -36,7 +36,7 @@ type MetricsOpts[T comparable] struct {
Labels map[string]any
Labeller map[string]func(i T) string
DurationBuckets []float64
MetricName string
Name string
Disable bool
}

Expand Down Expand Up @@ -109,31 +109,31 @@ func newMetrics[T comparable](opts MetricsOpts[T]) *metrics[T] {
}
}

if opts.MetricName == "" {
opts.MetricName = "priority_queue"
if opts.Name == "" {
opts.Name = "priority_queue"
}

return &metrics[T]{
opts: opts,
enqueuedTotal: promauto.NewCounterVec(prometheus.CounterOpts{
Name: opts.MetricName + "_enqueued_total",
Name: opts.Name + "_enqueued_total",
Help: "The total number of enqueued items",
}, keys),
dedupedTotal: promauto.NewCounterVec(prometheus.CounterOpts{
Name: opts.MetricName + "_deduped_total",
Name: opts.Name + "_deduped_total",
Help: "The total number of enqueued items",
}, keys),
dequeuedTotal: promauto.NewCounterVec(prometheus.CounterOpts{
Name: opts.MetricName + "_dequeued_total",
Name: opts.Name + "_dequeued_total",
Help: "The total number of dequeued items",
}, keys),
queueSize: promauto.NewGauge(prometheus.GaugeOpts{
Name: opts.MetricName + "_size",
Name: opts.Name + "_size",
Help: "The current size of the queue",
ConstLabels: labels,
}),
queueDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: opts.MetricName + "_duration",
Name: opts.Name + "_duration",
Help: "Time an object spent in the queue in milliseconds",
Buckets: opts.DurationBuckets,
}, keys),
Expand Down Expand Up @@ -165,17 +165,15 @@ type QueueOpts[T comparable] struct {
Metrics MetricsOpts[T]
}

func New[T cmp.Ordered](opts QueueOpts[T]) (*Queue[T], error) {

func NewQueue[T comparable](opts QueueOpts[T]) (*Queue[T], error) {
if opts.Dedupe && opts.Equals == nil {
return nil, fmt.Errorf("Dedupe requires Equals function")
return nil, errors.New("dedupe requires Equals function")
}

if opts.Comparator == nil {
opts.Comparator = func(a, b T) int {
return cmp.Compare(a, b)
}
return nil, errors.New("a comparator function is required")
}

return &Queue[T]{
heap: binaryheap.NewWith(func(a, b queueItem[T]) int {
return opts.Comparator(a.item, b.item)
Expand Down
64 changes: 56 additions & 8 deletions collections/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
. "github.com/onsi/gomega"
)

func TestPriorityQueue(t *testing.T) {
func TestPriorityQueueString(t *testing.T) {
g := NewWithT(t)

pq, err := New(QueueOpts[string]{
pq, err := NewQueue(QueueOpts[string]{
Comparator: strings.Compare,
Metrics: MetricsOpts[string]{
Labels: map[string]any{
Expand Down Expand Up @@ -52,18 +52,67 @@ func TestPriorityQueue(t *testing.T) {
g.Expect("priority_queue_duration_count").To(matchers.MatchCounter(1, "prefix", "i"))

g.Expect("priority_queue_size").To(matchers.MatchCounter(0))
}

type QueueItem struct {
Timestamp time.Time // Queued time
Obj map[string]any
}

func (t *QueueItem) Name() string {
return t.Obj["name"].(string)
}

func NewQueueItem(obj map[string]any) *QueueItem {
return &QueueItem{
Timestamp: time.Now(),
Obj: obj,
}
}

func TestPriorityQueue(t *testing.T) {
g := NewWithT(t)

pq, err := NewQueue(QueueOpts[*QueueItem]{
Metrics: MetricsOpts[*QueueItem]{
Name: "test",
},
Comparator: func(a, b *QueueItem) int {
return strings.Compare(a.Obj["name"].(string), b.Obj["name"].(string))
},
Dedupe: true,
Equals: func(a, b *QueueItem) bool {
return strings.EqualFold(a.Obj["name"].(string), b.Obj["name"].(string))
},
})
g.Expect(err).To(BeNil())
g.Expect(pq.Size()).To(BeZero())

names := []string{"bob", "foo", "bar", "eve", "baz", "alice", "bob"}
for _, name := range names {
pq.Enqueue(NewQueueItem(map[string]any{"name": name}))
}

g.Expect(pq.Size()).To(BeNumerically("==", len(names)))

expected := []string{"alice", "bar", "baz", "bob", "eve", "foo"}
for _, e := range expected {
g.Expect(first(pq.Peek()).Name()).To(Equal(e))
g.Expect(first(pq.Dequeue()).Name()).Should(Equal(e))
}

g.Expect(pq.Size()).To(BeZero())
}

func TestPriorityQueueDedupe(t *testing.T) {
g := NewWithT(t)

pq, err := New(QueueOpts[string]{
pq, err := NewQueue(QueueOpts[string]{
Equals: func(a, b string) bool { return a == b },
Dedupe: true,
Comparator: strings.Compare,
Metrics: MetricsOpts[string]{
MetricName: "dedupe_queue",
Name: "dedupe_queue",
}})

g.Expect(err).To(BeNil())
Expand Down Expand Up @@ -94,10 +143,10 @@ func TestPriorityQueueDedupe(t *testing.T) {

func TestPriorityQueueConcurrency(t *testing.T) {
g := NewWithT(t)
pq, err := New(QueueOpts[string]{
pq, err := NewQueue(QueueOpts[string]{
Comparator: strings.Compare,
Metrics: MetricsOpts[string]{
MetricName: "concurrent_queue",
Name: "concurrent_queue",
},
})
g.Expect(err).To(BeNil())
Expand Down Expand Up @@ -150,9 +199,8 @@ func TestPriorityQueueConcurrency(t *testing.T) {
g.Expect(pq.Size()).To(BeNumerically("==", 0))

t.Log("\n" + matchers.DumpMetrics("priority"))

}

func first[T1 any, T2 any](a T1, b T2) T1 {
func first[T1 any, T2 any](a T1, _ T2) T1 {
return a
}

0 comments on commit b1b4bfd

Please sign in to comment.