Skip to content

Commit 2dc7f1f

Browse files
committed
feat: requeue with dynamic delay
1 parent 576b9f7 commit 2dc7f1f

File tree

6 files changed

+102
-43
lines changed

6 files changed

+102
-43
lines changed

dispatcher/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ down:
1111
COMPOSE_PROJECT_NAME=myaegee docker-compose -f ./docker/docker-compose.yml -f ./docker/docker-compose.dev.yml down
1212

1313
test: build-dev
14-
@sleep 5; sed -i "s/'172.18.0.2'/$$(docker inspect myaegee_rabbit_1 | jq .[0].NetworkSettings.Networks.OMS.IPAddress)/" helpers/send.py ; cd helpers && python send.py
14+
@sleep 5; sed -i "s/'172.18.0.X'/$$(docker inspect myaegee_rabbit_1 | jq .[0].NetworkSettings.Networks.OMS.IPAddress)/" helpers/send.py ; cd helpers && python send.py
1515

1616
rabbit:
1717
@DOCKER_BUILDKIT=0 docker build -t aegee/rabbit:latest -f docker/Dockerfile.rabbit . && docker push aegee/rabbit

dispatcher/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,15 @@ rather in order:
4444
1. [x] When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to: we need to mark both the queue and messages as durable
4545
1. [ ] Add auto-retry (DLQ). rabbit is smart and doesn't let me process a message again unless i force it.. https://devcorner.digitalpress.blog/rabbitmq-retries-the-new-full-story/
4646
1. [ ] add the telegram queue
47+
1. investigate the massmailer queue: a queue which picks every message, and creates a list of "bcc" to send only one email? (danger: queue needs something like batch ack..) - OR it is not feasible at all because "mass"mailer is still "personalised" mailer?
48+
49+
1. why do we even have a `<`title`>` (which is dynamic), why not using directly the subject? (re: the body of the email)
50+
1. remove extension Jinja2 (into jinja)
51+
1. make it such that templates list is read from fs (for dynamic tests)
52+
53+
54+
55+
https://www.rabbitmq.com/publishers.html#unroutable
56+
57+
58+
Each consumer (subscription) has an identifier called a consumer tag. It can be used to unsubscribe from messages. Consumer tags are just strings.

dispatcher/dispatcher/main.py

Lines changed: 54 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,48 @@
5454
queue='requeue_queue',
5555
routing_key='wait')
5656

57+
def requeue_wait(ch, method, properties, body, reason):
58+
REQUEUE_DELAY_DURATIONS = [
59+
5 * 60000, # 5 mins
60+
50 * 60000, # 50 mins
61+
5*60 * 60000, # 5 hrs
62+
5*60*10 * 60000, # 50 hrs
63+
5*60*20 * 60000, # 100 hrs
64+
]
65+
66+
if (properties.headers != None and "x-delay" in properties.headers):
67+
index = REQUEUE_DELAY_DURATIONS.index(int(properties.headers["x-delay"]))
68+
if (index+1 == len(REQUEUE_DELAY_DURATIONS) ):
69+
print('Max retry time hit, dropping message')
70+
# TODO: notify someone that they've been sloppy
71+
ch.basic_ack(delivery_tag = method.delivery_tag)
72+
return
73+
else:
74+
print(f'Attempt {index+1}/{len(REQUEUE_DELAY_DURATIONS)-1}')
75+
wait = REQUEUE_DELAY_DURATIONS[index+1]
76+
else:
77+
wait = REQUEUE_DELAY_DURATIONS[0]
78+
79+
headers = {
80+
'reason': reason,
81+
'x-delay': wait,
82+
}
83+
prop = pika.BasicProperties(
84+
headers=headers,
85+
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE,
86+
)
87+
channel.basic_publish(exchange='wait_exchange',
88+
routing_key='wait',
89+
body=body,
90+
properties=prop) #NOTE it completely ignores the previous properties (and it's fine)
91+
ch.basic_ack(delivery_tag=method.delivery_tag)
92+
5793
def send_email(ch, method, properties, body):
5894
"""
5995
Callback for the NORMAL MESSAGE
6096
Output: send an email
6197
OR
62-
Output: Dead-letter queue
98+
Output: Wait-exchange
6399
"""
64100
msg = json.loads(body)
65101

@@ -69,37 +105,20 @@ def send_email(ch, method, properties, body):
69105
# TODO: send a notification to someone about adding a template
70106
# NOTE: this is a requeuable message
71107
print(f"Template {msg['template']}.jinja2 not found")
72-
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
108+
requeue_wait(ch, method, properties, body, reason="template_not_found")
73109
return
74110

75111
try:
76112
rendered = template.render(msg['parameters'], altro=msg['subject'])
77113
except exceptions.UndefinedError as e:
78114
# NOTE: this is a NON-requeuable message
79115
print(f"Error in rendering: some parameter is undefined (error: {e}; message: {msg})")
80-
# TODO: check if there is no x-header about DLQ if
81-
# i send directly to queue from this (i.e. without passing from DLQ),
82-
83-
# headers = {
84-
# 'reason': 'parameter_undefined',
85-
# 'x-delay': '60000',
86-
# }
87-
# prop = pika.BasicProperties(
88-
# headers=headers,
89-
# delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE,
90-
# )
91-
# channel.basic_publish(exchange='wait_exchange',
92-
# routing_key='wait',
93-
# body=body,
94-
# properties=prop)
95-
# ch.basic_ack(delivery_tag=method.delivery_tag)
96-
97-
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
116+
requeue_wait(ch, method, properties, body, reason="parameter_undefined")
98117
return
99118
except exceptions.TemplateNotFound:
100119
# NOTE: this is a requeuable message
101120
print(f"A sub-template in {msg['template']}.jinja2 was not found")
102-
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
121+
requeue_wait(ch, method, properties, body, reason="sub-template_not_found")
103122
return
104123

105124
email = EmailMessage()
@@ -114,10 +133,12 @@ def send_email(ch, method, properties, body):
114133
def process_dead_letter_messages(ch, method, properties, body):
115134
"""
116135
Callback for the ERROR MESSAGE
117-
Output: Requeue on delayed exchange (if error about missing template)
118-
OR
119-
Output: Remove (if error about unretrievable data)
120-
#TODO I can't know which is the case, with the current design
136+
Output: none yet. I don't expect for messages to fall here, I keep the DLQ for safety
137+
138+
@see https://stackoverflow.com/a/58500336
139+
"The way to do this is not to use NACK at all but to generate and return a 'new' message
140+
(which is simply the current message you are handling, but adding new headers to it).
141+
It appears that a NACK is basically doing this anyway according to the AMQP spec."
121142
"""
122143
REQUEUE_DELAY_DURATIONS = [
123144
5 * 60000, # 5 mins
@@ -126,15 +147,9 @@ def process_dead_letter_messages(ch, method, properties, body):
126147
5*60*10 * 60000, # 50 hrs
127148
5*60*20 * 60000, # 100 hrs
128149
]
129-
wait_for = REQUEUE_DELAY_DURATIONS[4] # TODO make it dynamic
130-
131-
print(f'DLQ')
132-
print(properties.headers)
133-
print(f'Message was in "{properties.headers["x-first-death-exchange"]}" (specifically "{properties.headers["x-first-death-queue"]}" queue) and was rejected because: {properties.headers["x-first-death-reason"]}')
134-
print()
150+
wait_for = REQUEUE_DELAY_DURATIONS[-1]
135151

136152
headers = {
137-
#'reason': 'parameter_undefined',
138153
'x-delay': wait_for,
139154
}
140155
fullheaders = {**properties.headers, **headers}
@@ -154,18 +169,20 @@ def process_requeue(ch, method, properties, body):
154169
Callback for the WAITING MESSAGES
155170
Output: Requeue on normal exchange (if error about missing template)
156171
OR
157-
Output: Remove (if limit time reached) #TODO
172+
Output: Remove (if unfixable error)
158173
"""
159174

160-
print(f'REQUEUE-WAIT')
161-
print(properties.headers)
162-
print(f'Message was in "{properties.headers["x-first-death-exchange"]}" (specifically "{properties.headers["x-first-death-queue"]}" queue) and was rejected because: {properties.headers["x-first-death-reason"]}')
163-
print()
175+
if (properties.headers["reason"] == 'parameter_undefined'):
176+
print('Impossible to fix error, dropping message')
177+
#TODO output something/notify to leave a trail for better debugging on what was missing
178+
ch.basic_ack(delivery_tag = method.delivery_tag)
179+
return
164180

165181
channel.basic_publish(exchange='eml',
166182
routing_key='mail',
167183
body=body,
168184
properties=pika.BasicProperties(
185+
headers = properties.headers, # propagation to avoid endless loop
169186
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE,
170187
))
171188
ch.basic_ack(delivery_tag = method.delivery_tag)

dispatcher/docker/docker-compose.dev.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ version: "3.4"
22

33
services:
44

5+
rabbit:
6+
build:
7+
context: ./${PATH_DISPATCHER}/..
8+
dockerfile: ./docker/Dockerfile.rabbit
9+
image: aegee/rabbit:3.11-mgmt-delayxch
10+
511
dispatcher:
612
build:
713
context: ./${PATH_DISPATCHER}/..

dispatcher/docker/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ version: "3.4"
33
services:
44

55
rabbit:
6-
image: aegee/rabbit
6+
image: aegee/rabbit:3.11-mgmt-delayxch
77
restart: always
88
expose:
99
- 5672

dispatcher/helpers/send.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from faker import Faker
1313
faker = Faker()
1414

15+
ERROR_TEST=True
1516
RANDOM_AMOUNT_TEST=False
1617
MIN_MSG=1
1718
MAX_MSG=8
@@ -122,7 +123,7 @@ def agora_sentence():
122123
},
123124
}
124125

125-
RABBIT_HOST="172.18.0.2" #FIXME
126+
RABBIT_HOST="172.18.0.X" #FIXME (as this is a python script launched on host we cant use docker's dns)
126127
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBIT_HOST))
127128
channel = connection.channel()
128129

@@ -132,15 +133,15 @@ def agora_sentence():
132133
channel.queue_declare(queue='email',
133134
arguments={
134135
'x-dead-letter-exchange': "dead_letter_exchange",
135-
'x-dead-letter-routing-key': "dead_letterl_routing_key",
136+
'x-dead-letter-routing-key': "dead_letter_routing_key",
136137
'x-death-header': True,
137138
},
138139
durable=True)
139140
channel.queue_bind(exchange='eml',
140141
queue='email',
141142
routing_key='mail')
142143

143-
def generate_fake_payload(subj="", template=""):
144+
def generate_fake_payload(subj="", template="", return_malformed_mail=False):
144145
email = {
145146
"from": "[email protected]",
146147
"to": [faker.email() for _ in range(random.randrange(1,3))],
@@ -214,6 +215,9 @@ def generate_fake_payload(subj="", template=""):
214215
"""
215216
}
216217
}
218+
if(return_malformed_mail):
219+
email["template"] = MAIL_TEMPLATES["EVENTS"]["MAIL_EVENT_UPDATED"]
220+
del email["parameters"]["event"]
217221
return email
218222

219223

@@ -228,7 +232,27 @@ def generate_fake_payload(subj="", template=""):
228232
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
229233
))
230234
print(f" [x] Sent {email['subject']} (to {email['to']})")
231-
print(f" Gee, I sent all {amount} ")
235+
print(f" Gee, I sent all {amount} ")
236+
237+
if(ERROR_TEST):
238+
#ERROR 1: no template
239+
email = generate_fake_payload(template="notexisting", subj="will never be delivered")
240+
channel.basic_publish(exchange='eml',
241+
routing_key='mail',
242+
body=json.dumps(email),
243+
properties=pika.BasicProperties(
244+
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
245+
))
246+
print(f" [x] Sent {email['subject']} (to {email['to']})")
247+
#ERROR 2: missing field
248+
email = generate_fake_payload(return_malformed_mail=True, subj="will never be delivered")
249+
channel.basic_publish(exchange='eml',
250+
routing_key='mail',
251+
body=json.dumps(email),
252+
properties=pika.BasicProperties(
253+
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
254+
))
255+
print(f" [x] Sent {email['subject']} (to {email['to']})")
232256

233257
if(ALL_TEMPLATES_TEST):
234258
templates_tested=0

0 commit comments

Comments
 (0)