Skip to content

Commit 04e5519

Browse files
authored
feat: support durable queues (#47)
1 parent dbbf074 commit 04e5519

File tree

4 files changed

+47
-18
lines changed

4 files changed

+47
-18
lines changed

cmd/engine.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func initBroker() broker.Broker {
143143
case string(broker.Rabbit):
144144
return rabbit.NewBroker(
145145
broker.WithLocation(config.BrokerLocation()),
146+
broker.WithDurable(config.BrokerDurable()),
146147
)
147148
default:
148149
return memorybroker.NewBroker()

internal/engine/clients/broker/options.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ type Option func(o *Options)
66

77
type Options struct {
88
Location string
9+
Durable bool
910
Context context.Context
1011
}
1112

@@ -15,6 +16,12 @@ func WithLocation(location string) Option {
1516
}
1617
}
1718

19+
func WithDurable(durable bool) Option {
20+
return func(o *Options) {
21+
o.Durable = durable
22+
}
23+
}
24+
1825
func NewOptions(opts ...Option) Options {
1926
options := Options{
2027
Context: context.Background(),
@@ -27,21 +34,21 @@ func NewOptions(opts ...Option) Options {
2734
return options
2835
}
2936

30-
type PublishOption func(o *PublishOptions)
37+
type SubscribeOption func(o *SubscribeOptions)
3138

32-
type PublishOptions struct {
39+
type SubscribeOptions struct {
3340
Queue string
3441
Context context.Context
3542
}
3643

37-
func PublishWithQueue(queue string) PublishOption {
38-
return func(o *PublishOptions) {
44+
func SubscribeWithQueue(queue string) SubscribeOption {
45+
return func(o *SubscribeOptions) {
3946
o.Queue = queue
4047
}
4148
}
4249

43-
func NewPublishOptions(opts ...PublishOption) PublishOptions {
44-
options := PublishOptions{
50+
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
51+
options := SubscribeOptions{
4552
Context: context.Background(),
4653
}
4754

@@ -52,21 +59,21 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
5259
return options
5360
}
5461

55-
type SubscribeOption func(o *SubscribeOptions)
62+
type PublishOption func(o *PublishOptions)
5663

57-
type SubscribeOptions struct {
64+
type PublishOptions struct {
5865
Queue string
5966
Context context.Context
6067
}
6168

62-
func SubscribeWithQueue(queue string) SubscribeOption {
63-
return func(o *SubscribeOptions) {
69+
func PublishWithQueue(queue string) PublishOption {
70+
return func(o *PublishOptions) {
6471
o.Queue = queue
6572
}
6673
}
6774

68-
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
69-
options := SubscribeOptions{
75+
func NewPublishOptions(opts ...PublishOption) PublishOptions {
76+
options := PublishOptions{
7077
Context: context.Background(),
7178
}
7279

internal/engine/clients/broker/rabbit/broker.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (b *rabbitBroker) Subscribe(ctx context.Context, callback func(ctx context.
5151

5252
if _, err := rbch.QueueDeclare(
5353
options.Queue,
54-
false, // durable
54+
b.options.Durable,
5555
false, // delete when unused
5656
false, // exclusive
5757
false, // no-wait,
@@ -145,7 +145,7 @@ func (b *rabbitBroker) Publish(ctx context.Context, data []byte, opts ...broker.
145145

146146
if _, err := rbch.QueueDeclare(
147147
options.Queue,
148-
false, // durable
148+
b.options.Durable,
149149
false, // delete when unused
150150
false, // exclusive
151151
false, // no-wait,
@@ -154,16 +154,22 @@ func (b *rabbitBroker) Publish(ctx context.Context, data []byte, opts ...broker.
154154
return err
155155
}
156156

157+
publishing := amqp.Publishing{
158+
ContentType: "application/json",
159+
Body: data,
160+
}
161+
162+
if b.options.Durable {
163+
publishing.DeliveryMode = amqp.Persistent
164+
}
165+
157166
if err := rbch.PublishWithContext(
158167
ctx,
159168
"", // exchange
160169
options.Queue, // routing key
161170
false, // mandatory
162171
false, // immediate
163-
amqp.Publishing{
164-
ContentType: "application/json",
165-
Body: data,
166-
},
172+
publishing,
167173
); err != nil {
168174
return broker.ErrPublishing
169175
}

internal/engine/config/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type config struct {
2828
workerQueues map[string]int
2929
broker string
3030
brokerLocation string
31+
brokerDurable bool
3132
runner string
3233
runnerHost string
3334
runnerRegistryUser string
@@ -48,6 +49,7 @@ func New() {
4849
workerQueues: map[string]int{string(task.Scheduled): 1, string(task.Cancelled): 1},
4950
broker: "memory",
5051
brokerLocation: "",
52+
brokerDurable: false,
5153
runner: "docker",
5254
runnerHost: "unix:///var/run/docker.sock",
5355
runnerRegistryUser: "",
@@ -123,6 +125,11 @@ func New() {
123125
instance.brokerLocation = brokerLocation
124126
}
125127

128+
brokerDurable := os.Getenv("BROKER_DURABLE")
129+
if brokerDurable == "true" {
130+
instance.brokerDurable = true
131+
}
132+
126133
r := os.Getenv("RUNNER")
127134
if len(r) > 0 {
128135
if _, ok := runner.RuntimeTypes[r]; ok {
@@ -239,6 +246,14 @@ func BrokerLocation() string {
239246
return instance.brokerLocation
240247
}
241248

249+
func BrokerDurable() bool {
250+
if instance == nil {
251+
panic("cfg is nil")
252+
}
253+
254+
return instance.brokerDurable
255+
}
256+
242257
func Runner() string {
243258
if instance == nil {
244259
panic("cfg is nil")

0 commit comments

Comments
 (0)