Skip to content

Commit 983f36c

Browse files
authored
feat: introduce notifier (#71)
1 parent 94a1df1 commit 983f36c

File tree

9 files changed

+201
-12
lines changed

9 files changed

+201
-12
lines changed

cmd/engine.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
memorybroker "github.com/w-h-a/workflow/internal/engine/clients/broker/memory"
1414
"github.com/w-h-a/workflow/internal/engine/clients/broker/nats"
1515
"github.com/w-h-a/workflow/internal/engine/clients/broker/rabbit"
16+
"github.com/w-h-a/workflow/internal/engine/clients/notifier"
17+
"github.com/w-h-a/workflow/internal/engine/clients/notifier/local"
1618
"github.com/w-h-a/workflow/internal/engine/clients/readwriter"
1719
memoryreadwriter "github.com/w-h-a/workflow/internal/engine/clients/readwriter/memory"
1820
"github.com/w-h-a/workflow/internal/engine/clients/readwriter/postgres"
@@ -173,10 +175,12 @@ func StartEngine(ctx *cli.Context) error {
173175
var c *coordinator.Service
174176
if config.Mode() == "standalone" || config.Mode() == "coordinator" {
175177
readwriterClient := initReadWriter()
178+
notifierClient := initNotifier()
176179

177180
coordinatorServer, c = engine.NewCoordinator(
178181
brokerClient,
179182
readwriterClient,
183+
notifierClient,
180184
)
181185

182186
numServices += 2
@@ -338,3 +342,7 @@ func initReadWriter() readwriter.ReadWriter {
338342
return memoryreadwriter.NewReadWriter()
339343
}
340344
}
345+
346+
func initNotifier() notifier.Notifier {
347+
return local.NewNotifier()
348+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package notifier
2+
3+
type AlertType string
4+
5+
const (
6+
Success AlertType = "SUCCESS"
7+
Failure AlertType = "FAILURE"
8+
)
9+
10+
type Event struct {
11+
Source string
12+
Type AlertType
13+
Title string
14+
Payload map[string]any
15+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package local
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
7+
"github.com/w-h-a/workflow/internal/engine/clients/notifier"
8+
)
9+
10+
type localNotifier struct {
11+
options notifier.Options
12+
}
13+
14+
func (n *localNotifier) Notify(ctx context.Context, event notifier.Event, opts ...notifier.NotifyOption) error {
15+
slog.InfoContext(
16+
ctx,
17+
"NOTIFY EVENT",
18+
"title", event.Title,
19+
"source", event.Source,
20+
"type", event.Type,
21+
"payload", event.Payload,
22+
)
23+
24+
return nil
25+
}
26+
27+
func NewNotifier(opts ...notifier.Option) notifier.Notifier {
28+
options := notifier.NewOptions(opts...)
29+
30+
n := &localNotifier{
31+
options: options,
32+
}
33+
34+
return n
35+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package notifier
2+
3+
import "context"
4+
5+
type NotifierType string
6+
7+
const (
8+
Local NotifierType = "local"
9+
)
10+
11+
var (
12+
NotifierTypes = map[string]NotifierType{
13+
"local": Local,
14+
}
15+
)
16+
17+
type Notifier interface {
18+
Notify(ctx context.Context, event Event, opts ...NotifyOption) error
19+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package notifier
2+
3+
import "context"
4+
5+
type Option func(o *Options)
6+
7+
type Options struct {
8+
URL string
9+
Secret string
10+
Context context.Context
11+
}
12+
13+
func WithURL(url string) Option {
14+
return func(o *Options) {
15+
o.URL = url
16+
}
17+
}
18+
19+
func WithSecret(secret string) Option {
20+
return func(o *Options) {
21+
o.Secret = secret
22+
}
23+
}
24+
25+
func NewOptions(opts ...Option) Options {
26+
options := Options{
27+
Context: context.Background(),
28+
}
29+
30+
for _, fn := range opts {
31+
fn(&options)
32+
}
33+
34+
return options
35+
}
36+
37+
type NotifyOption func(o *NotifyOptions)
38+
39+
type NotifyOptions struct {
40+
Context context.Context
41+
}
42+
43+
func NewNotifyOptions(opts ...NotifyOption) NotifyOptions {
44+
options := NotifyOptions{
45+
Context: context.Background(),
46+
}
47+
48+
for _, fn := range opts {
49+
fn(&options)
50+
}
51+
52+
return options
53+
}

internal/engine/config/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/w-h-a/workflow/internal/engine/clients/broker"
12+
"github.com/w-h-a/workflow/internal/engine/clients/notifier"
1213
"github.com/w-h-a/workflow/internal/engine/clients/readwriter"
1314
"github.com/w-h-a/workflow/internal/engine/clients/runner"
1415
"github.com/w-h-a/workflow/internal/task"
@@ -40,6 +41,7 @@ type config struct {
4041
runnerPruneInterval time.Duration
4142
readwriter string
4243
readwriterLocation string
44+
notifier string
4345
}
4446

4547
func New() {
@@ -65,6 +67,7 @@ func New() {
6567
runnerPruneInterval: 24 * time.Hour,
6668
readwriter: "memory",
6769
readwriterLocation: "",
70+
notifier: "local",
6871
}
6972

7073
env := os.Getenv("ENV")
@@ -201,6 +204,15 @@ func New() {
201204
if len(readwriterLocation) > 0 {
202205
instance.readwriterLocation = readwriterLocation
203206
}
207+
208+
n := os.Getenv("NOTIFIER")
209+
if len(n) > 0 {
210+
if _, ok := notifier.NotifierTypes[n]; ok {
211+
instance.notifier = n
212+
} else {
213+
panic("unsupported notifier")
214+
}
215+
}
204216
})
205217
}
206218

internal/engine/engine.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/w-h-a/pkg/serverv2"
88
httpserver "github.com/w-h-a/pkg/serverv2/http"
99
"github.com/w-h-a/workflow/internal/engine/clients/broker"
10+
"github.com/w-h-a/workflow/internal/engine/clients/notifier"
1011
"github.com/w-h-a/workflow/internal/engine/clients/readwriter"
1112
"github.com/w-h-a/workflow/internal/engine/clients/runner"
1213
"github.com/w-h-a/workflow/internal/engine/config"
@@ -23,11 +24,13 @@ import (
2324
func NewCoordinator(
2425
brokerClient broker.Broker,
2526
readwriterClient readwriter.ReadWriter,
27+
notifierClient notifier.Notifier,
2628
) (serverv2.Server, *coordinator.Service) {
2729
// services
2830
coordinatorService := coordinator.New(
2931
brokerClient,
3032
readwriterClient,
33+
notifierClient,
3134
map[string]int{
3235
string(task.Started): 1,
3336
string(task.Completed): 1,

internal/engine/services/coordinator/service.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"fmt"
78
"log/slog"
89
"strings"
910
"sync"
1011
"time"
1112

1213
"github.com/google/uuid"
1314
"github.com/w-h-a/workflow/internal/engine/clients/broker"
15+
"github.com/w-h-a/workflow/internal/engine/clients/notifier"
1416
"github.com/w-h-a/workflow/internal/engine/clients/reader"
1517
"github.com/w-h-a/workflow/internal/engine/clients/readwriter"
1618
"github.com/w-h-a/workflow/internal/task"
@@ -23,6 +25,7 @@ import (
2325
type Service struct {
2426
broker broker.Broker
2527
readwriter readwriter.ReadWriter
28+
notifier notifier.Notifier
2629
queues map[string]int
2730
locks map[string]*sync.RWMutex
2831
mtx sync.RWMutex
@@ -286,6 +289,19 @@ func (s *Service) handleTask(ctx context.Context, data []byte) error {
286289

287290
s.removeTaskLock(t.ID)
288291

292+
event := notifier.Event{
293+
Source: "workflow-coordinator",
294+
Type: notifier.Success,
295+
Title: fmt.Sprintf("Task %s Completed Successfully", t.ID),
296+
Payload: map[string]any{},
297+
}
298+
299+
if err := s.notifier.Notify(ctx, event); err != nil {
300+
child.RecordError(err)
301+
child.SetStatus(codes.Error, err.Error())
302+
return err
303+
}
304+
289305
child.SetStatus(codes.Ok, "task completed event handled")
290306

291307
return nil
@@ -301,6 +317,21 @@ func (s *Service) handleTask(ctx context.Context, data []byte) error {
301317

302318
s.removeTaskLock(t.ID)
303319

320+
event := notifier.Event{
321+
Source: "workflow-coordinator",
322+
Type: notifier.Failure,
323+
Title: fmt.Sprintf("Task %s Failed", t.ID),
324+
Payload: map[string]any{
325+
"error_msg": t.Error,
326+
},
327+
}
328+
329+
if err := s.notifier.Notify(ctx, event); err != nil {
330+
child.RecordError(err)
331+
child.SetStatus(codes.Error, err.Error())
332+
return err
333+
}
334+
304335
child.SetStatus(codes.Ok, "task failed event handled")
305336

306337
return nil
@@ -377,10 +408,11 @@ func (s *Service) removeTaskLock(id string) {
377408
delete(s.locks, id)
378409
}
379410

380-
func New(b broker.Broker, rw readwriter.ReadWriter, qs map[string]int) *Service {
411+
func New(b broker.Broker, rw readwriter.ReadWriter, n notifier.Notifier, qs map[string]int) *Service {
381412
return &Service{
382413
broker: b,
383414
readwriter: rw,
415+
notifier: n,
384416
queues: qs,
385417
locks: map[string]*sync.RWMutex{},
386418
mtx: sync.RWMutex{},

0 commit comments

Comments
 (0)