Skip to content

Commit 831e224

Browse files
authored
feat: ensure shutdowns are graceful (#42)
1 parent 6e1bc38 commit 831e224

File tree

11 files changed

+361
-149
lines changed

11 files changed

+361
-149
lines changed

cmd/engine.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,26 @@ func StartEngine(ctx *cli.Context) error {
3434
numServices := 0
3535

3636
// worker
37+
var workerServer serverv2.Server
3738
var w *worker.Service
3839
if config.Mode() == "standalone" || config.Mode() == "worker" {
3940
runnerClient := initRunner()
4041

41-
w = engine.NewWorker(
42+
workerServer, w = engine.NewWorker(
4243
brokerClient,
4344
runnerClient,
4445
)
4546

46-
numServices++
47+
numServices += 2
4748
}
4849

4950
// coordinator
50-
var httpServer serverv2.Server
51+
var coordinatorServer serverv2.Server
5152
var c *coordinator.Service
5253
if config.Mode() == "standalone" || config.Mode() == "coordinator" {
5354
readwriterClient := initReadWriter()
5455

55-
httpServer, c = engine.NewCoordinator(
56+
coordinatorServer, c = engine.NewCoordinator(
5657
brokerClient,
5758
readwriterClient,
5859
)
@@ -74,6 +75,13 @@ func StartEngine(ctx *cli.Context) error {
7475
slog.InfoContext(ctx.Context, "starting worker")
7576
ch <- w.Start(stop)
7677
}()
78+
79+
wg.Add(1)
80+
go func() {
81+
defer wg.Done()
82+
slog.InfoContext(ctx.Context, "starting worker http server", "address", config.WorkerHttp())
83+
ch <- workerServer.Start()
84+
}()
7785
}
7886

7987
// start coordinator
@@ -91,8 +99,8 @@ func StartEngine(ctx *cli.Context) error {
9199
wg.Add(1)
92100
go func() {
93101
defer wg.Done()
94-
slog.InfoContext(ctx.Context, "starting http server", "address", config.HttpAddress())
95-
ch <- httpServer.Start()
102+
slog.InfoContext(ctx.Context, "starting coordinator http server", "address", config.CoordinatorHttp())
103+
ch <- coordinatorServer.Start()
96104
}()
97105
}
98106

docker-compose.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,14 @@ services:
3131
container_name: worker
3232
build: .
3333
command: engine
34+
ports:
35+
- "4001:4001"
3436
environment:
3537
- ENV=dev
3638
- NAME=worker
3739
- VERSION=0.1.0-alpha.0
3840
- MODE=worker
41+
- HTTP_ADDRESS=:4001
3942
- BROKER=rabbit
4043
- BROKER_LOCATION=amqp://guest:guest@rabbitmq:5672
4144
- RUNNER=docker
@@ -56,8 +59,8 @@ services:
5659
- ENV=dev
5760
- NAME=coordinator
5861
- VERSION=0.1.0-alpha.0
59-
- HTTP_ADDRESS=:4000
6062
- MODE=coordinator
63+
- HTTP_ADDRESS=:4000
6164
- BROKER=rabbit
6265
- BROKER_LOCATION=amqp://guest:guest@rabbitmq:5672
6366
- READ_WRITER=postgres

internal/engine/clients/broker/broker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ var (
1717
)
1818

1919
type Broker interface {
20-
Subscribe(ctx context.Context, callback func(ctx context.Context, data []byte) error, opts ...SubscribeOption) (chan struct{}, error)
20+
Subscribe(ctx context.Context, callback func(ctx context.Context, data []byte) error, opts ...SubscribeOption) error
2121
Publish(ctx context.Context, data []byte, opts ...PublishOption) error
22+
Close(ctx context.Context) error
2223
}

internal/engine/clients/broker/memory/broker.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ type memoryBroker struct {
1212
options broker.Options
1313
queues map[string]chan []byte
1414
mtx sync.RWMutex
15+
exit chan struct{}
16+
wg sync.WaitGroup
17+
once sync.Once
1518
}
1619

17-
func (b *memoryBroker) Subscribe(ctx context.Context, callback func(ctx context.Context, data []byte) error, opts ...broker.SubscribeOption) (chan struct{}, error) {
20+
func (b *memoryBroker) Subscribe(ctx context.Context, callback func(ctx context.Context, data []byte) error, opts ...broker.SubscribeOption) error {
1821
options := broker.NewSubscribeOptions(opts...)
1922

2023
// span
@@ -28,12 +31,13 @@ func (b *memoryBroker) Subscribe(ctx context.Context, callback func(ctx context.
2831
}
2932
b.mtx.Unlock()
3033

31-
ch := make(chan struct{})
32-
34+
b.wg.Add(1)
3335
go func() {
36+
defer b.wg.Done()
37+
3438
for {
3539
select {
36-
case <-ch:
40+
case <-b.exit:
3741
return
3842
case data := <-q:
3943
if err := callback(ctx, data); err != nil {
@@ -44,7 +48,7 @@ func (b *memoryBroker) Subscribe(ctx context.Context, callback func(ctx context.
4448
}
4549
}()
4650

47-
return ch, nil
51+
return nil
4852
}
4953

5054
func (b *memoryBroker) Publish(ctx context.Context, data []byte, opts ...broker.PublishOption) error {
@@ -66,13 +70,36 @@ func (b *memoryBroker) Publish(ctx context.Context, data []byte, opts ...broker.
6670
return nil
6771
}
6872

73+
func (b *memoryBroker) Close(ctx context.Context) error {
74+
done := make(chan struct{})
75+
76+
b.once.Do(func() {
77+
close(b.exit)
78+
79+
go func() {
80+
b.wg.Wait()
81+
close(done)
82+
}()
83+
})
84+
85+
select {
86+
case <-ctx.Done():
87+
return ctx.Err()
88+
case <-done:
89+
return nil
90+
}
91+
}
92+
6993
func NewBroker(opts ...broker.Option) broker.Broker {
7094
options := broker.NewOptions(opts...)
7195

7296
b := &memoryBroker{
7397
options: options,
7498
queues: map[string]chan []byte{},
7599
mtx: sync.RWMutex{},
100+
exit: make(chan struct{}),
101+
wg: sync.WaitGroup{},
102+
once: sync.Once{},
76103
}
77104

78105
return b

0 commit comments

Comments
 (0)