diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index e18a6393eb..fb186944ab 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -5,6 +5,7 @@ import ( "math/rand/v2" "sync" "testing" + "testing/synctest" "time" fuzz "github.com/google/gofuzz" @@ -154,143 +155,6 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.adds["test"]).To(Equal(1)) }) - It("returns an item only after after has passed", func() { - q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() - defer q.ShutDown() - - originalTick := q.tick - q.tick = func(d time.Duration) <-chan time.Time { - Expect(d).To(Equal(time.Second)) - return originalTick(d) - } - - retrievedItem := make(chan struct{}) - - go func() { - defer GinkgoRecover() - q.GetWithPriority() - close(retrievedItem) - }() - - q.AddWithOpts(AddOpts{After: time.Second}, "foo") - - Consistently(retrievedItem).ShouldNot(BeClosed()) - - forwardQueueTimeBy(time.Second) - Eventually(retrievedItem).Should(BeClosed()) - - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) - Expect(metrics.adds["test"]).To(Equal(1)) - Expect(metrics.retries["test"]).To(Equal(1)) - }) - - It("returns high priority item that became ready before low priority item", func() { - q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() - defer q.ShutDown() - - tickSetup := make(chan any) - originalTick := q.tick - q.tick = func(d time.Duration) <-chan time.Time { - Expect(d).To(Equal(time.Second)) - close(tickSetup) - return originalTick(d) - } - - lowPriority := -100 - highPriority := 0 - q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo") - q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio") - - Eventually(tickSetup).Should(BeClosed()) - - forwardQueueTimeBy(1 * time.Second) - key, prio, _ := q.GetWithPriority() - - Expect(key).To(Equal("prio")) - 
Expect(prio).To(Equal(0)) - Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0})) - Expect(metrics.adds["test"]).To(Equal(2)) - Expect(metrics.retries["test"]).To(Equal(1)) - }) - - It("returns an item to a waiter as soon as it has one", func() { - q, metrics := newQueue() - defer q.ShutDown() - - retrieved := make(chan struct{}) - go func() { - defer GinkgoRecover() - item, _, _ := q.GetWithPriority() - Expect(item).To(Equal("foo")) - close(retrieved) - }() - - // We are waiting for the GetWithPriority() call to be blocked - // on retrieving an item. As golang doesn't provide a way to - // check if something is listening on a channel without - // sending them a message, I can't think of a way to do this - // without sleeping. - time.Sleep(time.Second) - q.AddWithOpts(AddOpts{}, "foo") - Eventually(retrieved).Should(BeClosed()) - - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) - Expect(metrics.adds["test"]).To(Equal(1)) - }) - - It("returns multiple items with after in correct order", func() { - q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() - defer q.ShutDown() - - originalTick := q.tick - q.tick = func(d time.Duration) <-chan time.Time { - // What a bunch of bs. Deferring in here causes - // ginkgo to deadlock, presumably because it - // never returns after the defer. Not deferring - // hides the actual assertion result and makes - // it complain that there should be a defer. - // Move the assertion into a goroutine just to - // get around that mess. - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - defer close(done) - - // This is not deterministic and depends on which of - // Add() or Spin() gets the lock first. 
- Expect(d).To(Or(Equal(200*time.Millisecond), Equal(time.Second))) - }() - <-done - return originalTick(d) - } - - retrievedItem := make(chan struct{}) - retrievedSecondItem := make(chan struct{}) - - go func() { - defer GinkgoRecover() - first, _, _ := q.GetWithPriority() - Expect(first).To(Equal("bar")) - close(retrievedItem) - - second, _, _ := q.GetWithPriority() - Expect(second).To(Equal("foo")) - close(retrievedSecondItem) - }() - - q.AddWithOpts(AddOpts{After: time.Second}, "foo") - q.AddWithOpts(AddOpts{After: 200 * time.Millisecond}, "bar") - - Consistently(retrievedItem).ShouldNot(BeClosed()) - - forwardQueueTimeBy(time.Second) - Eventually(retrievedItem).Should(BeClosed()) - Eventually(retrievedSecondItem).Should(BeClosed()) - - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) - Expect(metrics.adds["test"]).To(Equal(2)) - }) - It("doesn't include non-ready items in Len()", func() { q, metrics := newQueue() defer q.ShutDown() @@ -325,79 +189,6 @@ var _ = Describe("Controllerworkqueue", func() { Expect(isShutDown).To(BeTrue()) }) - It("Get from priority queue should get unblocked when the priority queue is shut down", func() { - q, _ := newQueue() - - getUnblocked := make(chan struct{}) - - go func() { - defer GinkgoRecover() - defer close(getUnblocked) - - item, priority, isShutDown := q.GetWithPriority() - Expect(item).To(Equal("")) - Expect(priority).To(Equal(0)) - Expect(isShutDown).To(BeTrue()) - }() - - // Verify the go routine above is now waiting for an item. - Eventually(q.waiters.Load).Should(Equal(int64(1))) - Consistently(getUnblocked).ShouldNot(BeClosed()) - - // shut down - q.ShutDown() - - // Verify the shutdown unblocked the go routine. 
- Eventually(getUnblocked).Should(BeClosed()) - }) - - It("items are included in Len() and the queueDepth metric once they are ready", func() { - q, metrics := newQueue() - defer q.ShutDown() - - q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo") - q.AddWithOpts(AddOpts{}, "baz") - q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar") - q.AddWithOpts(AddOpts{}, "bal") - - Expect(q.Len()).To(Equal(2)) - metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2})) - metrics.mu.Unlock() - time.Sleep(time.Second) - Expect(q.Len()).To(Equal(4)) - metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 4})) - metrics.mu.Unlock() - - // Drain queue - for range 4 { - item, _ := q.Get() - q.Done(item) - } - Expect(q.Len()).To(Equal(0)) - metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) - metrics.mu.Unlock() - - // Validate that doing it again still works to notice bugs with removing - // it from the queues becameReady tracking. - q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo") - q.AddWithOpts(AddOpts{}, "baz") - q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar") - q.AddWithOpts(AddOpts{}, "bal") - - Expect(q.Len()).To(Equal(2)) - metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2})) - metrics.mu.Unlock() - time.Sleep(time.Second) - Expect(q.Len()).To(Equal(4)) - metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 4})) - metrics.mu.Unlock() - }) - It("returns many items", func() { // This test ensures the queue is able to drain a large queue without panic'ing. 
// In a previous version of the code we were calling queue.Delete within q.Ascend @@ -460,87 +251,6 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) metrics.mu.Unlock() }) - - It("Updates metrics correctly for an item whose requeueAfter expired that gets added again without requeueAfter", func() { - q, metrics := newQueue() - defer q.ShutDown() - - q.AddWithOpts(AddOpts{After: 50 * time.Millisecond}, "foo") - time.Sleep(100 * time.Millisecond) - - Expect(q.Len()).To(Equal(1)) - metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) - metrics.mu.Unlock() - - q.AddWithOpts(AddOpts{}, "foo") - Expect(q.Len()).To(Equal(1)) - metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) - metrics.mu.Unlock() - - // Get the item to ensure the codepath in - // `spin` for the metrics is passed by so - // that this starts failing if it incorrectly - // calls `metrics.add` again. - item, _ := q.Get() - Expect(item).To(Equal("foo")) - Expect(q.Len()).To(Equal(0)) - metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) - metrics.mu.Unlock() - }) - - It("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items", func() { - q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() - defer q.ShutDown() - - q.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second) - originalTick := q.tick - q.tick = func(d time.Duration) <-chan time.Time { - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - defer close(done) - - Expect(d).To(Or(Equal(5*time.Millisecond), Equal(635*time.Millisecond))) - }() - <-done - return originalTick(d) - } - - retrievedItem := make(chan struct{}) - retrievedSecondItem := make(chan struct{}) - - go func() { - defer GinkgoRecover() - first, _, _ := q.GetWithPriority() - Expect(first).To(Equal("foo")) - close(retrievedItem) 
- - second, _, _ := q.GetWithPriority() - Expect(second).To(Equal("bar")) - close(retrievedSecondItem) - }() - - // after 7 calls, the next When("bar") call will return 640ms. - for range 7 { - q.rateLimiter.When("bar") - } - q.AddWithOpts(AddOpts{RateLimited: true}, "foo", "bar") - - Consistently(retrievedItem).ShouldNot(BeClosed()) - forwardQueueTimeBy(5 * time.Millisecond) - Eventually(retrievedItem).Should(BeClosed()) - - Consistently(retrievedSecondItem).ShouldNot(BeClosed()) - forwardQueueTimeBy(635 * time.Millisecond) - Eventually(retrievedSecondItem).Should(BeClosed()) - - Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) - Expect(metrics.adds["test"]).To(Equal(2)) - Expect(metrics.retries["test"]).To(Equal(2)) - }) }) func BenchmarkAddGetDone(b *testing.B) { @@ -773,3 +483,330 @@ func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], b } return old, existed } + +func TestItemIsOnlyReturnedAfterAfterHasPassed(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + g := NewWithT(t) + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() + defer q.ShutDown() + + originalTick := q.tick + q.tick = func(d time.Duration) <-chan time.Time { + g.Expect(d).To(Equal(time.Second)) + return originalTick(d) + } + + retrievedItem := make(chan struct{}) + go func() { + q.GetWithPriority() + close(retrievedItem) + }() + + q.AddWithOpts(AddOpts{After: time.Second}, "foo") + synctest.Wait() + + g.Expect(retrievedItem).ShouldNot(BeClosed()) + + forwardQueueTimeBy(time.Second) + synctest.Wait() + g.Expect(retrievedItem).Should(BeClosed()) + + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) + g.Expect(metrics.adds["test"]).To(Equal(1)) + g.Expect(metrics.retries["test"]).To(Equal(1)) + }) +} + +func TestHighPriorityItemThatBecameReadyIsReturnedBeforeLowPriorityItem(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + g := NewWithT(t) + + q, metrics, forwardQueueTimeBy := 
newQueueWithTimeForwarder() + defer q.ShutDown() + + tickSetup := make(chan any) + originalTick := q.tick + q.tick = func(d time.Duration) <-chan time.Time { + g.Expect(d).To(Equal(time.Second)) + close(tickSetup) + return originalTick(d) + } + + lowPriority := -100 + highPriority := 0 + q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo") + q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio") + synctest.Wait() + + g.Expect(tickSetup).To(BeClosed()) + + forwardQueueTimeBy(1 * time.Second) + key, prio, _ := q.GetWithPriority() + + g.Expect(key).To(Equal("prio")) + g.Expect(prio).To(Equal(0)) + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0})) + g.Expect(metrics.adds["test"]).To(Equal(2)) + g.Expect(metrics.retries["test"]).To(Equal(1)) + }) +} + +func TestItemIsReturnedAsSoonAsPossible(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + g := NewWithT(t) + + q, metrics := newQueue() + defer q.ShutDown() + + retrieved := make(chan struct{}) + go func() { + item, _, _ := q.GetWithPriority() + g.Expect(item).To(Equal("foo")) + close(retrieved) + }() + synctest.Wait() // Wait for the above goroutine to be blocked + + q.AddWithOpts(AddOpts{}, "foo") + synctest.Wait() // Wait until the priorityqueue and the above goroutine finish running + + g.Expect(retrieved).Should(BeClosed()) + + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) + g.Expect(metrics.adds["test"]).To(Equal(1)) + }) +} + +func TestMultipleItemsWithAfterAreReturnedInCorrectOrder(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + g := NewWithT(t) + + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() + defer q.ShutDown() + + originalTick := q.tick + q.tick = func(d time.Duration) <-chan time.Time { + // This is not deterministic and depends on which of + // Add() or Spin() gets the lock first. 
+ g.Expect(d).To(Or(Equal(200*time.Millisecond), Equal(time.Second))) + return originalTick(d) + } + + retrievedItem := make(chan struct{}) + retrievedSecondItem := make(chan struct{}) + + go func() { + first, _, _ := q.GetWithPriority() + g.Expect(first).To(Equal("bar")) + close(retrievedItem) + + second, _, _ := q.GetWithPriority() + g.Expect(second).To(Equal("foo")) + close(retrievedSecondItem) + }() + + q.AddWithOpts(AddOpts{After: time.Second}, "foo") + q.AddWithOpts(AddOpts{After: 200 * time.Millisecond}, "bar") + synctest.Wait() // Block until the adds are processed + + g.Expect(retrievedItem).NotTo(BeClosed()) + + forwardQueueTimeBy(time.Second) + synctest.Wait() // Block until the priorityqueue finished processing + + g.Expect(retrievedItem).Should(BeClosed()) + g.Expect(retrievedSecondItem).Should(BeClosed()) + + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) + g.Expect(metrics.adds["test"]).To(Equal(2)) + }) +} + +func TestGetFromPriorityQueueIsUnblockedOnShutdown(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + g := NewWithT(t) + + q, _ := newQueue() + + getUnblocked := make(chan struct{}) + + go func() { + defer close(getUnblocked) + + item, priority, isShutDown := q.GetWithPriority() + g.Expect(item).To(Equal("")) + g.Expect(priority).To(Equal(0)) + g.Expect(isShutDown).To(BeTrue()) + }() + synctest.Wait() // Wait for the above goroutine to be blocked + + g.Expect(getUnblocked).NotTo(BeClosed()) + + // shut down + q.ShutDown() + synctest.Wait() + + // Verify the shutdown unblocked the go routine. 
+		g.Expect(getUnblocked).To(BeClosed())
+	})
+}
+
+func TestItemsAreIncludedInLenAndMetricsOnceTheyAreReady(t *testing.T) {
+	t.Parallel()
+	synctest.Test(t, func(t *testing.T) {
+		g := NewWithT(t)
+
+		q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
+		defer q.ShutDown()
+
+		q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
+		q.AddWithOpts(AddOpts{}, "baz")
+		q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
+		q.AddWithOpts(AddOpts{}, "bal")
+		// Block here until spin finished, otherwise it is possible it
+		// checks now() after forwardQueueTimeBy updated it, does then
+		// not listen on tick and causes the write to tick from forwardQueueTimeBy
+		// to lock up the test.
+		synctest.Wait()
+
+		g.Expect(q.Len()).To(Equal(2))
+		metrics.mu.Lock()
+		g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2}))
+		metrics.mu.Unlock()
+
+		forwardQueueTimeBy(time.Second)
+		synctest.Wait()
+
+		g.Expect(q.Len()).To(Equal(4))
+		metrics.mu.Lock()
+		g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 4}))
+		metrics.mu.Unlock()
+
+		// Drain queue
+		for range 4 {
+			item, _ := q.Get()
+			q.Done(item)
+		}
+		g.Expect(q.Len()).To(Equal(0))
+		metrics.mu.Lock()
+		g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
+		metrics.mu.Unlock()
+
+		// Validate that doing it again still works to notice bugs with removing
+		// it from the queues becameReady tracking.
+		q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
+		q.AddWithOpts(AddOpts{}, "baz")
+		q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
+		q.AddWithOpts(AddOpts{}, "bal")
+		// Block here until spin finished, otherwise it is possible it
+		// checks now() after forwardQueueTimeBy updated it, does then
+		// not listen on tick and causes the write to tick from forwardQueueTimeBy
+		// to lock up the test.
+ synctest.Wait() + + g.Expect(q.Len()).To(Equal(2)) + metrics.mu.Lock() + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2})) + metrics.mu.Unlock() + + forwardQueueTimeBy(time.Second) + synctest.Wait() + + g.Expect(q.Len()).To(Equal(4)) + metrics.mu.Lock() + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 4})) + metrics.mu.Unlock() + }) +} + +func TestMetricsAreUpdatedForItemWhoseRequeueAfterExpiredThatGetsAddedAgainWithoutRequeueAfter(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + g := NewWithT(t) + + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{After: 50 * time.Millisecond}, "foo") + synctest.Wait() + forwardQueueTimeBy(50 * time.Millisecond) + synctest.Wait() + + g.Expect(q.Len()).To(Equal(1)) + metrics.mu.Lock() + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) + metrics.mu.Unlock() + + q.AddWithOpts(AddOpts{}, "foo") + g.Expect(q.Len()).To(Equal(1)) + metrics.mu.Lock() + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) + metrics.mu.Unlock() + + // Get the item to ensure the codepath in + // `spin` for the metrics is passed by so + // that this starts failing if it incorrectly + // calls `metrics.add` again. 
+		item, _ := q.Get()
+		g.Expect(item).To(Equal("foo"))
+		g.Expect(q.Len()).To(Equal(0))
+		metrics.mu.Lock()
+		g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
+		metrics.mu.Unlock()
+	})
+}
+
+func TestWhenAddingMultipleItemsWithRatelimitTrueTheyDontAffectEachOther(t *testing.T) {
+	t.Parallel()
+	synctest.Test(t, func(t *testing.T) {
+		g := NewWithT(t)
+
+		q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
+		defer q.ShutDown()
+
+		q.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second)
+		originalTick := q.tick
+		q.tick = func(d time.Duration) <-chan time.Time {
+			g.Expect(d).To(Or(Equal(5*time.Millisecond), Equal(635*time.Millisecond)))
+			return originalTick(d)
+		}
+
+		retrievedItem := make(chan struct{})
+		retrievedSecondItem := make(chan struct{})
+
+		go func() {
+			first, _, _ := q.GetWithPriority()
+			g.Expect(first).To(Equal("foo"))
+			close(retrievedItem)
+
+			second, _, _ := q.GetWithPriority()
+			g.Expect(second).To(Equal("bar"))
+			close(retrievedSecondItem)
+		}()
+
+		// after 7 calls, the next When("bar") call will return 640ms.
+		for range 7 {
+			q.rateLimiter.When("bar")
+		}
+		q.AddWithOpts(AddOpts{RateLimited: true}, "foo", "bar")
+		synctest.Wait() // Block until the adds are processed
+		g.Expect(retrievedItem).NotTo(BeClosed())
+
+		forwardQueueTimeBy(5 * time.Millisecond)
+		synctest.Wait()
+		g.Expect(retrievedItem).To(BeClosed())
+		g.Expect(retrievedSecondItem).NotTo(BeClosed())
+
+		forwardQueueTimeBy(635 * time.Millisecond)
+		synctest.Wait()
+		g.Expect(retrievedSecondItem).To(BeClosed())
+
+		g.Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
+		g.Expect(metrics.adds["test"]).To(Equal(2))
+		g.Expect(metrics.retries["test"]).To(Equal(2))
+	})
+}