Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 133 additions & 49 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package priorityqueue
import (
"fmt"
"math/rand/v2"
"strconv"
"sync"
"testing"
"testing/synctest"
Expand Down Expand Up @@ -103,6 +104,30 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(metrics.adds["test"]).To(Equal(1))
})

It("enqueues a locked item", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{}, "foo")
Expect(q.Len()).To(Equal(1))
item, priority, shutdown := q.GetWithPriority()
Expect(item).To(Equal("foo"))
Expect(priority).To(Equal(0))
Expect(shutdown).To(BeFalse())

q.AddWithOpts(AddOpts{}, "foo")
Expect(q.Len()).To(Equal(1))
q.Done("foo")

item, priority, shutdown = q.GetWithPriority()
Expect(item).To(Equal("foo"))
Expect(priority).To(Equal(0))
Expect(shutdown).To(BeFalse())

Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
Expect(metrics.adds["test"]).To(Equal(2))
})

It("retains the highest priority", func() {
q, metrics := newQueue()
defer q.ShutDown()
Expand All @@ -120,6 +145,23 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(metrics.adds["test"]).To(Equal(1))
})

It("will not decrease the priority", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, "foo")
q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo")

item, priority, _ := q.GetWithPriority()
Expect(item).To(Equal("foo"))
Expect(priority).To(Equal(2))

Expect(q.Len()).To(Equal(0))

Expect(metrics.depth["test"]).To(Equal(map[int]int{2: 0}))
Expect(metrics.adds["test"]).To(Equal(1))
})

It("gets pushed to the front if the priority increases", func() {
q, metrics := newQueue()
defer q.ShutDown()
Expand Down Expand Up @@ -251,6 +293,35 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
metrics.mu.Unlock()
})

It("updates metrics correctly when an item with after is re-added with higher priority and no after", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{After: time.Hour, Priority: ptr.To(0)}, "foo")
Expect(q.Len()).To(Equal(0))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(map[int]int{}))
metrics.mu.Unlock()

q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, "foo")

Expect(q.Len()).To(Equal(1))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(map[int]int{1: 1}))
metrics.mu.Unlock()

item, priority, _ := q.GetWithPriority()
Expect(item).To(Equal("foo"))
Expect(priority).To(Equal(1))
Expect(q.Len()).To(Equal(0))

metrics.mu.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like unlock is missing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the test ends after checking on the metrics, so there is no need to unlock, it will gets gced there anyways

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, got that. Just seems cleaner to unlock and not leave that locked

Expect(metrics.depth["test"][1]).To(Equal(0))
for _, depth := range metrics.depth["test"] {
Expect(depth).To(Equal(0))
}
})
})

func BenchmarkAddGetDone(b *testing.B) {
Expand Down Expand Up @@ -443,56 +514,69 @@ func newQueueWithTimeForwarder() (_ *priorityqueue[string], _ *fakeMetricsProvid

func TestHighPriorityItemsAreReturnedBeforeLowPriorityItemMultipleTimes(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
g := NewWithT(t)

q, metrics := newQueue()
defer q.ShutDown()

const itemsPerPriority = 1000
lowPriority := 0
lowMiddlePriority := 5
middlePriority := 10
upperMiddlePriority := 15
highPriority := 20
for i := range itemsPerPriority {
q.AddWithOpts(AddOpts{Priority: &highPriority}, fmt.Sprintf("high-%d", i))
q.AddWithOpts(AddOpts{Priority: &upperMiddlePriority}, fmt.Sprintf("upperMiddle-%d", i))
q.AddWithOpts(AddOpts{Priority: &middlePriority}, fmt.Sprintf("middle-%d", i))
q.AddWithOpts(AddOpts{Priority: &lowMiddlePriority}, fmt.Sprintf("lowMiddle-%d", i))
q.AddWithOpts(AddOpts{Priority: &lowPriority}, fmt.Sprintf("low-%d", i))
}
synctest.Wait()
for _, after := range []time.Duration{-time.Second, 0, time.Second} {
t.Run(fmt.Sprintf("after=%v", after), func(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
g := NewWithT(t)

q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
defer q.ShutDown()

const itemsPerPriority = 1000
lowPriority := -10
lowMiddlePriority := -5
middlePriority := 0
upperMiddlePriority := 5
highPriority := 10
for i := range itemsPerPriority {
q.AddWithOpts(AddOpts{Priority: &highPriority, After: after}, fmt.Sprintf("high-%d", i))
q.AddWithOpts(AddOpts{Priority: &upperMiddlePriority, After: after}, fmt.Sprintf("upperMiddle-%d", i))
q.AddWithOpts(AddOpts{Priority: &middlePriority, After: after}, fmt.Sprintf("middle-%d", i))
q.AddWithOpts(AddOpts{Priority: &lowMiddlePriority, After: after}, fmt.Sprintf("lowMiddle-%d", i))
q.AddWithOpts(AddOpts{Priority: &lowPriority, After: after}, fmt.Sprintf("low-%d", i))
}
synctest.Wait()
if after > 0 {
forwardQueueTimeBy(after)
synctest.Wait()
}

for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(highPriority))
g.Expect(key).To(HavePrefix("high-"))
}
for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(upperMiddlePriority))
g.Expect(key).To(HavePrefix("upperMiddle-"))
}
for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(middlePriority))
g.Expect(key).To(HavePrefix("middle-"))
}
for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(lowMiddlePriority))
g.Expect(key).To(HavePrefix("lowMiddle-"))
}
for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(lowPriority))
g.Expect(key).To(HavePrefix("low-"))
}
g.Expect(metrics.depth["test"]).To(Equal(map[int]int{10: 0, 5: 0, 0: 0, 20: 0, 15: 0}))
g.Expect(metrics.adds["test"]).To(Equal(itemsPerPriority * 5))
g.Expect(metrics.retries["test"]).To(Equal(0))
})
for i := range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(highPriority))
g.Expect(key).To(Equal("high-" + strconv.Itoa(i)))
}
for i := range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(upperMiddlePriority))
g.Expect(key).To(Equal("upperMiddle-" + strconv.Itoa(i)))
}
for i := range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(middlePriority))
g.Expect(key).To(Equal("middle-" + strconv.Itoa(i)))
}
for i := range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(lowMiddlePriority))
g.Expect(key).To(Equal("lowMiddle-" + strconv.Itoa(i)))
}
for i := range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(lowPriority))
g.Expect(key).To(Equal("low-" + strconv.Itoa(i)))
}
g.Expect(metrics.depth["test"]).To(Equal(map[int]int{-10: 0, -5: 0, 0: 0, 5: 0, 10: 0}))
g.Expect(metrics.adds["test"]).To(Equal(itemsPerPriority * 5))
expectedRetries := 0
if after > 0 {
expectedRetries = itemsPerPriority * 5
}
g.Expect(metrics.retries["test"]).To(Equal(expectedRetries))
})
})
}
}

func newQueue() (*priorityqueue[string], *fakeMetricsProvider) {
Expand Down
Loading