Skip to content

Commit dbbf074

Browse files
authored
feat: add better health checking (#46)
Impements comprehensive health checking across various internal client interfaces and their implementations.
1 parent 4409bdd commit dbbf074

File tree

14 files changed

+92
-27
lines changed

14 files changed

+92
-27
lines changed

internal/engine/clients/broker/broker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@ var (
1919
type Broker interface {
2020
Subscribe(ctx context.Context, callback func(ctx context.Context, data []byte) error, opts ...SubscribeOption) error
2121
Publish(ctx context.Context, data []byte, opts ...PublishOption) error
22+
CheckHealth(ctx context.Context) error
2223
Close(ctx context.Context) error
2324
}

internal/engine/clients/broker/memory/broker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ func (b *memoryBroker) Publish(ctx context.Context, data []byte, opts ...broker.
7070
return nil
7171
}
7272

73+
func (b *memoryBroker) CheckHealth(ctx context.Context) error {
74+
return nil
75+
}
76+
7377
func (b *memoryBroker) Close(ctx context.Context) error {
7478
done := make(chan struct{})
7579

internal/engine/clients/broker/queues.go

Lines changed: 0 additions & 9 deletions
This file was deleted.

internal/engine/clients/broker/rabbit/broker.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,22 @@ func (b *rabbitBroker) Publish(ctx context.Context, data []byte, opts ...broker.
171171
return nil
172172
}
173173

174+
func (b *rabbitBroker) CheckHealth(ctx context.Context) error {
175+
conn, err := b.getConnection()
176+
if err != nil {
177+
return err
178+
}
179+
180+
ch, err := conn.Channel()
181+
if err != nil {
182+
return err
183+
}
184+
185+
defer ch.Close()
186+
187+
return nil
188+
}
189+
174190
func (b *rabbitBroker) Close(ctx context.Context) error {
175191
done := make(chan struct{})
176192

internal/engine/clients/reader/reader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ import "context"
55
type Reader interface {
66
Read(ctx context.Context, opts ...ReadOption) (*Page, error)
77
ReadById(ctx context.Context, id string, opts ...ReadByIdOption) ([]byte, error)
8+
CheckHealth(ctx context.Context) error
89
}

internal/engine/clients/readwriter/memory/readwriter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ func (rw *memoryReadWriter) ReadById(ctx context.Context, id string, opts ...rea
5757
return append([]byte{}, data...), nil
5858
}
5959

60+
func (rw *memoryReadWriter) CheckHealth(ctx context.Context) error {
61+
return nil
62+
}
63+
6064
func (rw *memoryReadWriter) Write(ctx context.Context, id string, data []byte, opts ...writer.WriteOption) error {
6165
rw.mtx.Lock()
6266
defer rw.mtx.Unlock()

internal/engine/clients/readwriter/postgres/readwriter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ func (rw *postgresReadWriter) ReadById(ctx context.Context, id string, opts ...r
102102
return record.Value, nil
103103
}
104104

105+
func (rw *postgresReadWriter) CheckHealth(ctx context.Context) error {
106+
return rw.conn.PingContext(ctx)
107+
}
108+
105109
func (rw *postgresReadWriter) Write(ctx context.Context, id string, data []byte, opts ...writer.WriteOption) error {
106110
if _, err := rw.conn.ExecContext(
107111
ctx,

internal/engine/clients/runner/docker/runner.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ func (r *dockerRunner) DeleteVolume(ctx context.Context, opts ...runner.DeleteVo
175175
return nil
176176
}
177177

178+
func (r *dockerRunner) CheckHealth(ctx context.Context) error {
179+
_, err := r.client.Ping(ctx)
180+
return err
181+
}
182+
178183
func (r *dockerRunner) Close(ctx context.Context) error {
179184
done := make(chan struct{})
180185

internal/engine/clients/runner/runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ type Runner interface {
1818
Run(ctx context.Context, opts ...RunOption) (string, error)
1919
CreateVolume(ctx context.Context, opts ...CreateVolumeOption) error
2020
DeleteVolume(ctx context.Context, opts ...DeleteVolumeOption) error
21+
CheckHealth(ctx context.Context) error
2122
Close(ctx context.Context) error
2223
}

internal/engine/config/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/w-h-a/workflow/internal/engine/clients/broker"
1111
"github.com/w-h-a/workflow/internal/engine/clients/readwriter"
1212
"github.com/w-h-a/workflow/internal/engine/clients/runner"
13+
"github.com/w-h-a/workflow/internal/task"
1314
)
1415

1516
var (
@@ -44,7 +45,7 @@ func New() {
4445
mode: "standalone",
4546
coordinatorHttp: ":4000",
4647
workerHttp: ":4001",
47-
workerQueues: map[string]int{broker.SCHEDULED: 1, broker.CANCELLED: 1},
48+
workerQueues: map[string]int{string(task.Scheduled): 1, string(task.Cancelled): 1},
4849
broker: "memory",
4950
brokerLocation: "",
5051
runner: "docker",

0 commit comments

Comments
 (0)