Skip to content
This repository was archived by the owner on Jun 12, 2024. It is now read-only.

Commit c47bf32

Browse files
authored
Add dispatch task handler (#42)
This handler allows to register other handlers and dispatch based on type of the task in the queue.
1 parent 627b323 commit c47bf32

File tree

2 files changed

+105
-0
lines changed

2 files changed

+105
-0
lines changed

pkg/queue/handlers/dispatcher.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package handlers
2+
3+
import (
4+
"context"
5+
6+
"github.com/contiamo/go-base/pkg/queue"
7+
"github.com/contiamo/go-base/pkg/queue/workers"
8+
"github.com/contiamo/go-base/pkg/tracing"
9+
"github.com/pkg/errors"
10+
"github.com/sirupsen/logrus"
11+
)
12+
13+
var (
14+
// ErrNoHandlerFound occurs when dispatcher can'f find a registered handler for a task type
15+
ErrNoHandlerFound = errors.New("no handler found")
16+
)
17+
18+
// NewDispatchHandler creates a task handler that will dispatch tasks to other handlers
19+
func NewDispatchHandler(handlers map[queue.TaskType]workers.TaskHandler) workers.TaskHandler {
20+
return &dispatchHandler{
21+
Tracer: tracing.NewTracer("handlers", "DispatchHandler"),
22+
handlers: handlers,
23+
}
24+
}
25+
26+
type dispatchHandler struct {
27+
tracing.Tracer
28+
handlers map[queue.TaskType]workers.TaskHandler
29+
}
30+
31+
func (h *dispatchHandler) Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error) {
32+
span, ctx := h.StartSpan(ctx, "Process")
33+
defer func() {
34+
h.FinishSpan(span, err)
35+
}()
36+
span.SetTag("task.id", task.ID)
37+
span.SetTag("task.queue", task.Queue)
38+
span.SetTag("task.type", task.Type)
39+
span.SetTag("task.spec", string(task.Spec))
40+
41+
logrus := logrus.WithField("type", task.Type)
42+
43+
logrus.Debug("dispatching task...")
44+
handler, ok := h.handlers[task.Type]
45+
if !ok {
46+
logrus.Error("there is no handler for this task type")
47+
return ErrNoHandlerFound
48+
}
49+
return handler.Process(ctx, task, heartbeats)
50+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package handlers
2+
3+
import (
4+
"context"
5+
"errors"
6+
"io/ioutil"
7+
"os"
8+
"testing"
9+
"time"
10+
11+
"github.com/contiamo/go-base/pkg/queue"
12+
"github.com/contiamo/go-base/pkg/queue/workers"
13+
"github.com/sirupsen/logrus"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
type errHandler struct {
18+
t *testing.T
19+
expTask queue.Task
20+
}
21+
22+
func (h errHandler) Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error) {
23+
require.Equal(h.t, h.expTask, task)
24+
return errors.New("invalid")
25+
}
26+
27+
func TestDispatcherProcess(t *testing.T) {
28+
logrus.SetOutput(ioutil.Discard)
29+
defer logrus.SetOutput(os.Stdout)
30+
31+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
32+
defer cancel()
33+
34+
t.Run("propagates the underlying handler's error", func(t *testing.T) {
35+
task := queue.Task{TaskBase: queue.TaskBase{Type: "test"}}
36+
37+
h := NewDispatchHandler(map[queue.TaskType]workers.TaskHandler{
38+
"test": errHandler{t: t, expTask: task},
39+
})
40+
41+
err := h.Process(ctx, task, nil)
42+
require.Error(t, err)
43+
require.Equal(t, "invalid", err.Error())
44+
})
45+
46+
t.Run("returns ErrNoHandlerFound when there is no handler", func(t *testing.T) {
47+
task := queue.Task{TaskBase: queue.TaskBase{Type: "test"}}
48+
49+
h := NewDispatchHandler(map[queue.TaskType]workers.TaskHandler{})
50+
51+
err := h.Process(ctx, task, nil)
52+
require.Error(t, err)
53+
require.Equal(t, ErrNoHandlerFound.Error(), err.Error())
54+
})
55+
}

0 commit comments

Comments
 (0)