Skip to content

Commit

Permalink
feat: add QueueWithDelay option in priorityqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra authored and moshloop committed Dec 17, 2024
1 parent 4b33afe commit e5bae40
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 3 deletions.
44 changes: 41 additions & 3 deletions collections/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,9 @@ func getOrCreateHistogramVec(prefix, suffix, help string, keys []string, buckets
}

type queueItem[T comparable] struct {
item T
inserted time.Time
item T
inserted time.Time
notBefore time.Time
}

// Assert Queue implementation
Expand Down Expand Up @@ -208,6 +209,10 @@ func NewQueue[T comparable](opts QueueOpts[T]) (*Queue[T], error) {

return &Queue[T]{
heap: binaryheap.NewWith(func(a, b queueItem[T]) int {
nbc := notBeforeComparator(a, b)
if nbc != 0 {
return nbc
}
return opts.Comparator(a.item, b.item)
}),
Comparator: opts.Comparator,
Expand All @@ -217,6 +222,18 @@ func NewQueue[T comparable](opts QueueOpts[T]) (*Queue[T], error) {
}, nil
}

func notBeforeComparator[T comparable](a, b queueItem[T]) int {
if a.notBefore.IsZero() && b.notBefore.IsZero() {
return 0
}

if a.notBefore.Sub(b.notBefore) > 0 {
return 1
} else {
return -1
}
}

// Enqueue adds a value to the end of the queue
func (queue *Queue[T]) Enqueue(value T) {
queue.mutex.Lock()
Expand All @@ -228,6 +245,18 @@ func (queue *Queue[T]) Enqueue(value T) {
queue.mutex.Unlock()
}

// Enqueue adds a value to the end of the queue
func (queue *Queue[T]) EnqueueWithDelay(value T, delay time.Duration) {
queue.mutex.Lock()
queue.heap.Push(queueItem[T]{
item: value,
inserted: time.Now(),
notBefore: time.Now().Add(delay),
})
queue.metrics.enqueue(value, queue.heap.Size())
queue.mutex.Unlock()
}

type Equals[T any] interface {
Equals(T) bool
}
Expand All @@ -238,9 +267,18 @@ func (queue *Queue[T]) Dequeue() (T, bool) {
queue.mutex.Lock()
defer queue.mutex.Unlock()

var zero T

// Peek for notBefore
v, _ := queue.heap.Peek()
if !v.notBefore.IsZero() {
if v.notBefore.Sub(time.Now()) < 0 {
return zero, false
}
}

wrapper, ok := queue.heap.Pop()
if !ok {
var zero T
return zero, false
}

Expand Down
31 changes: 31 additions & 0 deletions collections/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,37 @@ func TestPriorityQueueConcurrency(t *testing.T) {
t.Log("\n" + matchers.DumpMetrics("priority"))
}

func TestPriorityQueueWithDelay(t *testing.T) {
g := NewWithT(t)

pq, err := NewQueue(QueueOpts[string]{
Equals: func(a, b string) bool { return a == b },
Comparator: strings.Compare,
Metrics: MetricsOpts[string]{
Disable: true,
},
})

g.Expect(err).To(BeNil())
g.Expect(pq.Size()).To(BeNumerically("==", 0))

pq.EnqueueWithDelay("item1-delay-10s", 10*time.Second)
pq.EnqueueWithDelay("item1-delay-05s", 5*time.Second)
pq.Enqueue("item1")

var items []string
for {
if pq.Size() == 0 {
break
}

item, _ := pq.Dequeue()
items = append(items, item)
}

g.Expect(items).To(Equal([]string{"item1", "item1-delay-05s", "item1-delay-10s"}))
}

func first[T1 any, T2 any](a T1, _ T2) T1 {
return a
}

0 comments on commit e5bae40

Please sign in to comment.