Skip to content
This repository was archived by the owner on Jun 12, 2024. It is now read-only.

Commit 7a4d4a3

Browse files
authored
Close heartbeats in dispatcher when unknown task type (#57)
This will unblock the queue in case the task type is not handled by anything.
1 parent ed88685 commit 7a4d4a3

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

pkg/queue/handlers/dispatcher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func (h *dispatchHandler) Process(ctx context.Context, task queue.Task, heartbea
4444
handler, ok := h.handlers[task.Type]
4545
if !ok {
4646
logrus.Error("there is no handler for this task type")
47+
close(heartbeats)
4748
return ErrNoHandlerFound
4849
}
4950
return handler.Process(ctx, task, heartbeats)

pkg/queue/handlers/dispatcher_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,19 @@ func TestDispatcherProcess(t *testing.T) {
4343
require.Equal(t, "invalid", err.Error())
4444
})
4545

46-
t.Run("returns ErrNoHandlerFound when there is no handler", func(t *testing.T) {
46+
t.Run("returns ErrNoHandlerFound when there is no handler and closes heartbeats", func(t *testing.T) {
4747
task := queue.Task{TaskBase: queue.TaskBase{Type: "test"}}
4848

4949
h := NewDispatchHandler(map[queue.TaskType]workers.TaskHandler{})
5050

51-
err := h.Process(ctx, task, nil)
51+
heartbeats := make(chan queue.Progress)
52+
err := h.Process(ctx, task, heartbeats)
53+
<-heartbeats
54+
require.PanicsWithError(t, "send on closed channel", func() {
55+
heartbeats <- queue.Progress{}
56+
})
5257
require.Error(t, err)
5358
require.Equal(t, ErrNoHandlerFound.Error(), err.Error())
59+
5460
})
5561
}

0 commit comments

Comments
 (0)