Commit 4e80f34

feat(airflow): Throttle ES health alerts to prevent fatigue
Implements a throttling mechanism for the Elasticsearch cluster health check DAG using an Airflow Variable.

- When the cluster first fails, an alert is sent and an 'in-alarm' variable is set with a timestamp.
- Subsequent failures will not trigger alerts until 6 hours have passed.
- When the cluster health is restored, the 'in-alarm' variable is cleared.

This prevents the alert channel from being flooded during an extended outage, reducing alert fatigue for maintainers.

Fixes #4638
1 parent 1d04460 commit 4e80f34
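
Conceptually, the throttle described above reduces to three operations on a single Airflow Variable. The sketch below is illustrative only, not the committed module: the Variable name and window mirror the diff further down, but the helper names (`alert_is_due`, `record_alert`, `clear_alert`) are chosen here for explanation.

```python
from datetime import datetime, timedelta, timezone

from airflow.models import Variable

# Variable name and window as used in the diff below.
ELASTICSEARCH_HEALTH_IN_ALARM_VAR = "elasticsearch_health_in_alarm"
ALERT_THROTTLE_WINDOW = timedelta(hours=6)


def alert_is_due() -> bool:
    """Return True when a failure should produce a Slack alert (i.e., not throttled)."""
    last_alert = Variable.get(ELASTICSEARCH_HEALTH_IN_ALARM_VAR, default_var=None)
    if last_alert is None:
        # First failure: nothing recorded yet, so alert immediately.
        return True
    # Re-alert only once the 6-hour window since the last alert has elapsed.
    elapsed = datetime.now(timezone.utc) - datetime.fromisoformat(last_alert)
    return elapsed > ALERT_THROTTLE_WINDOW


def record_alert() -> None:
    """Start (or restart) the throttle window after an alert is sent."""
    Variable.set(ELASTICSEARCH_HEALTH_IN_ALARM_VAR, datetime.now(timezone.utc).isoformat())


def clear_alert() -> None:
    """Reset the throttle once the cluster is healthy again."""
    Variable.delete(ELASTICSEARCH_HEALTH_IN_ALARM_VAR)
```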

File tree

3 files changed (+157, -156 lines)


.github/CODEOWNERS

Lines changed: 2 additions & 0 deletions
@@ -1,5 +1,6 @@
 # Specific assignments for the 'openverse-catalog' group
 /catalog/ @WordPress/openverse-catalog
+/catalog/dags/elasticsearch_cluster/ @openverse/catalog
 /indexer_worker/ @WordPress/openverse-catalog
 /dag-sync.sh @WordPress/openverse-catalog

@@ -44,6 +45,7 @@
 /CONTRIBUTING.md @WordPress/openverse-maintainers
 /LICENSE @WordPress/openverse-maintainers
 /README.md @WordPress/openverse-maintainers
+/DAGs.md @openverse/meta
 /packages/README.md @WordPress/openverse-maintainers
 /docker-compose.yml @WordPress/openverse-maintainers
 /env.template @WordPress/openverse-maintainers

DAGs.md

Lines changed: 43 additions & 0 deletions
+| DAG ID | Schedule Interval |
+| --------------------------------------------------------------------------- | ----------------- |
+| [`prod_elasticsearch_health_check`](#prod_elasticsearch_health_check) | `*/15 * * * *` |
+| [`staging_elasticsearch_health_check`](#staging_elasticsearch_health_check) | `*/15 * * * *` |
+
+---
+
+### `prod_elasticsearch_health_check`
+
+This DAG checks the health of the **production** Elasticsearch cluster every 15
+minutes. On failure, it sends a Slack alert (throttled to once every 6 hours).
+On success, it clears the "in-alarm" variable.
+
+---
+
+### `staging_elasticsearch_health_check`
+
+This DAG checks the health of the **staging** Elasticsearch cluster every 15
+minutes. On failure, it sends a Slack alert (throttled to once every 6 hours).
+On success, it clears the "in-alarm" variable.
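
A maintenance companion to the documentation above: with a 15-minute schedule and a 6-hour window, a prolonged outage produces at most one alert per roughly 24 consecutive failing runs. The snippet below is a hedged sketch of how a maintainer could inspect the throttle state from a one-off script on the Airflow host; it assumes access to the Airflow metadata DB and reuses the Variable name introduced by this commit, while the local names `IN_ALARM_VAR` and `WINDOW` are illustrative.

```python
from datetime import datetime, timedelta, timezone

from airflow.models import Variable

# Variable name and window as introduced by this commit.
IN_ALARM_VAR = "elasticsearch_health_in_alarm"
WINDOW = timedelta(hours=6)

last_alert = Variable.get(IN_ALARM_VAR, default_var=None)
if last_alert is None:
    print("Not in alarm: the next health-check failure will alert immediately.")
else:
    elapsed = datetime.now(timezone.utc) - datetime.fromisoformat(last_alert)
    remaining = max(WINDOW - elapsed, timedelta(0))
    print(f"In alarm since {last_alert}; at most {remaining} until the next alert.")
    # To re-arm alerting early, a maintainer could delete the Variable:
    # Variable.delete(IN_ALARM_VAR)
```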
Lines changed: 112 additions & 156 deletions
@@ -1,194 +1,150 @@
-"""
-Monitor staging and production Elasticsearch cluster health endpoint.
-
-Requests the cluster health and alerts under the following conditions:
-
-- Red cluster health
-- Unexpected number of nodes
-- Unresponsive cluster
-
-Additionally, the DAG will notify (rather than alert) when the cluster health is yellow.
-Yellow cluster health may or may not be an issue, depending on whether it is expected,
-and occurs whenever shards and replicas are being relocated (e.g., during reindexes).
-It is worthwhile to notify in these cases, as an assurance, but we could choose to add
-logic that ignores yellow cluster health during data refresh or other similar operations.
-"""
-
-import json
-import logging
-from datetime import datetime
-from textwrap import dedent, indent
+from datetime import datetime, timedelta, timezone
 from typing import Literal

 from airflow.decorators import dag, task
-from airflow.exceptions import AirflowSkipException
+from airflow.exceptions import AirflowFailException
+from airflow.models import Variable
+from airflow.operators.python import ShortCircuitOperator
 from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
-from elasticsearch import Elasticsearch
-
-from common.constants import DAG_DEFAULT_ARGS, ENVIRONMENTS, PRODUCTION, Environment
-from common.elasticsearch import get_es_host
-from common.sensors.utils import is_concurrent_with_any
-from common.slack import send_alert, send_message
-from legacy_data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS
-
-
-logger = logging.getLogger(__name__)
-
-
-_DAG_ID = "{env}_elasticsearch_cluster_healthcheck"
-ES_ICON = ":elasticsearch_bad:"
-ES_USERNAME = "{env} ES Cluster (via Airflow)"
-
-EXPECTED_NODE_COUNT = 6
-EXPECTED_DATA_NODE_COUNT = 3
-EXPECTED_MASTER_NODE_COUNT = 3
-MessageType = Literal["alert", "notification"]
-
-
-def _format_response_body(response_body: dict) -> str:
-    body_str = indent(json.dumps(response_body, indent=4), prefix=" " * 4)
-    # body_str is indented in, because the f string added an indentation to
-    # the front, causing the first curly brace to be incorrectly indented
-    # and interpolating a multi-line string into the f string led subsequent lines
-    # to have incorrect indentation (they did not incorporate the f-strings
-    # own indentation.
-    # Adding our own indentation using `indent` to match the f-strings
-    # allows us to correctly dedent later on without issue, with a uniform indentation
-    # on every line.
-    return f"""
-    Full healthcheck response body:
-    ```
-{body_str}
-    ```
-    """
+from airflow.utils.trigger_rule import TriggerRule

+from common.slack import send_message

-def _compose_red_status(env: Environment, response_body: dict) -> str:
-    message = f"""
-    Elasticsearch {env} cluster status is *red*.

-    This is a critical status change, *investigate ASAP*.
+# The name of the Airflow Variable used to track the in-alarm status.
+ELASTICSEARCH_HEALTH_IN_ALARM_VAR = "elasticsearch_health_in_alarm"
+# Time to wait before re-notifying about a continuous failure.
+ALERT_THROTTLE_WINDOW = timedelta(hours=6)

-    {_format_response_body(response_body)}
-    """
-    return message
+_DAG_ID = "{env}_elasticsearch_health_check"
+_SCHEDULE = "*/15 * * * *"  # Every 15 minutes

+_SHARED_DAG_ARGS = {
+    "schedule": _SCHEDULE,
+    "start_date": datetime(2024, 1, 1),
+    "catchup": False,
+    "doc_md": """
+    ### Elasticsearch Health Check

-def _compose_unexpected_node_count(env: Environment, response_body: dict) -> str:
-    node_count = response_body["number_of_nodes"]
-    data_node_count = response_body["number_of_data_nodes"]
-    master_node_count = node_count - data_node_count
+    This DAG checks the health of the Elasticsearch cluster every 15 minutes.

-    message = f"""
-    Elasticsearch {env} cluster node count is *{node_count}*.
-    Expected {EXPECTED_NODE_COUNT} total nodes.
+    On a failure, it sends a Slack alert. To prevent alert fatigue, it uses an
+    Airflow Variable to throttle alerts, only sending a new one if the last
+    was more than 6 hours ago.

-    Master nodes: *{master_node_count}* of expected {EXPECTED_MASTER_NODE_COUNT}
-    Data nodes: *{data_node_count}* of expected {EXPECTED_DATA_NODE_COUNT}
+    On success, it clears the 'in-alarm' Variable.
+    """,
+    "tags": ["elasticsearch", "maintenance", "monitoring"],
+}

-    This is a critical status change, *investigate ASAP*.
-    If this is expected (e.g., during controlled node or cluster changes), acknowledge immediately with explanation.

-    {_format_response_body(response_body)}
+# Helper functions for the throttling logic
+def _check_if_throttled() -> bool:
     """
-    logger.error(f"Unexpected node count; {json.dumps(response_body)}")
-    return message
-
-
-def _compose_yellow_cluster_health(env: Environment, response_body: dict) -> str:
-    message = f"""
-    Elasticsearch {env} cluster health is *yellow*.
+    Check if an alert for a failing cluster should be sent.

-    This does not mean something is necessarily wrong, but if this is not expected (e.g., data refresh) then investigate cluster health now.
+    An alert is throttled if an 'in-alarm' variable exists and was set within the
+    ALERT_THROTTLE_WINDOW.

-    {_format_response_body(response_body)}
+    :return: True if the alert should be sent (not throttled), False otherwise.
     """
-    logger.info(f"Cluster health was yellow; {json.dumps(response_body)}")
-    return message
+    last_alert_str = Variable.get(ELASTICSEARCH_HEALTH_IN_ALARM_VAR, default_var=None)

+    if not last_alert_str:
+        # No variable exists, this is the first failure. Alert is not throttled.
+        print("No existing alarm Variable. Alerting.")
+        return True

-@task
-def ping_healthcheck(env: str, es_host: str) -> dict:
-    es_conn: Elasticsearch = ElasticsearchPythonHook(hosts=[es_host]).get_conn
+    last_alert_ts = datetime.fromisoformat(last_alert_str)
+    time_since_last_alert = datetime.now(timezone.utc) - last_alert_ts

-    response = es_conn.cluster.health()
-
-    return response.body
-
-
-@task
-def compose_notification(
-    env: Environment, response_body: dict, is_data_refresh_running: bool
-) -> tuple[MessageType, str]:
-    status = response_body["status"]
+    if time_since_last_alert > ALERT_THROTTLE_WINDOW:
+        # It's been long enough, send another alert. Not throttled.
+        print(
+            f"Last alert was at {last_alert_ts}. Throttling window has passed. "
+            "Alerting."
+        )
+        return True
+    else:
+        # It's too soon, do not send another alert. Throttled.
+        print(f"Last alert was at {last_alert_ts}. Alert is throttled.")
+        return False

-    if status == "red":
-        return "alert", _compose_red_status(env, response_body)

-    if response_body["number_of_nodes"] != EXPECTED_NODE_COUNT:
-        return "alert", _compose_unexpected_node_count(env, response_body)
+def _set_alarm_variable():
+    """
+    Set the 'in-alarm' variable with the current UTC timestamp.

-    if status == "yellow":
-        if is_data_refresh_running and env == PRODUCTION:
-            raise AirflowSkipException(
-                "Production cluster health status is yellow during data refresh. "
-                "This is an expected state, so no alert is sent."
-            )
+    This is called after a failure alert is sent to begin the throttling window.
+    """
+    now_iso = datetime.now(timezone.utc).isoformat()
+    Variable.set(ELASTICSEARCH_HEALTH_IN_ALARM_VAR, now_iso)
+    print(f"Set {ELASTICSEARCH_HEALTH_IN_ALARM_VAR} to {now_iso}.")

-        return "notification", _compose_yellow_cluster_health(env, response_body)

-    raise AirflowSkipException(f"Cluster health is green; {json.dumps(response_body)}")
+def _clear_alarm_variable():
+    """
+    Delete the 'in-alarm' variable.

+    This is called when the cluster health check succeeds, resetting the alert mechanism.
+    """
+    Variable.delete(ELASTICSEARCH_HEALTH_IN_ALARM_VAR)
+    print(f"Cleared {ELASTICSEARCH_HEALTH_IN_ALARM_VAR}.")

-@task
-def notify(env: str, message_type_and_string: tuple[MessageType, str]):
-    message_type, message = message_type_and_string

-    message_kwargs = {
-        "dag_id": _DAG_ID.format(env=env),
-        "username": ES_USERNAME.format(env=env.title()),
-        "icon_emoji": ES_ICON,
-    }
+def create_es_health_check_dag(env: Literal["prod", "staging"]):
+    """Create the Elasticsearch health check DAG for a given environment."""

-    if message_type == "alert":
-        send_alert(dedent(message), **message_kwargs)
-    elif message_type == "notification":
-        send_message(dedent(message), **message_kwargs)
-    else:
-        raise ValueError(
-            f"Invalid message_type. Expected 'alert' or 'notification', "
-            f"received {message_type}"
+    @dag(dag_id=_DAG_ID.format(env=env), **_SHARED_DAG_ARGS)
+    def es_health_check_dag():
+        # This is the primary task. It will fail if the ES cluster is unhealthy.
+        @task
+        def check_es_health():
+            hook = ElasticsearchPythonHook(
+                elasticsearch_conn_id=f"elasticsearch_http_{env}"
+            )
+            health = hook.get_conn().cluster.health()
+            print(health)
+            if health["status"] not in ("green", "yellow"):
+                raise AirflowFailException(f"ES cluster status was {health['status']}!")
+
+        # Create an instance of the main health check task.
+        health_check = check_es_health()
+
+        # Success path: If the health check succeeds, clear the alarm variable.
+        # This task uses the default trigger_rule=TriggerRule.ALL_SUCCESS
+        clear_alarm = task(python_callable=_clear_alarm_variable)
+        clear_alarm_task = clear_alarm()
+
+        # Failure path: These tasks only run if the health check fails.
+        # 1. Check if we should send an alert or if it's throttled.
+        check_throttle = ShortCircuitOperator(
+            task_id="check_if_throttled",
+            python_callable=_check_if_throttled,
+            trigger_rule=TriggerRule.ALL_FAILED,  # Only run on failure of upstream
         )

+        # 2. Send the actual Slack alert.
+        @task
+        def notify_failure():
+            send_message(
+                f"❌ {env.title()} Elasticsearch cluster health check failed.",
+                dag_id=_DAG_ID.format(env=env),
+            )

-_SHARED_DAG_ARGS = {
-    # Every 15 minutes
-    "schedule": "*/15 * * * *",
-    "start_date": datetime(2024, 2, 4),
-    "catchup": False,
-    "max_active_runs": 1,
-    "doc_md": __doc__,
-    "tags": ["elasticsearch", "monitoring"],
-    "default_args": DAG_DEFAULT_ARGS,
-}
-
-
-_DATA_REFRESH_DAG_IDS = []
-for config in DATA_REFRESH_CONFIGS.values():
-    _DATA_REFRESH_DAG_IDS += [config.dag_id, config.filtered_index_dag_id]
+        notify_failure_task = notify_failure()

+        # 3. Set the alarm variable to start the throttling window.
+        set_alarm = task(python_callable=_set_alarm_variable)
+        set_alarm_task = set_alarm()

-for env in ENVIRONMENTS:
+        # Define task dependencies
+        health_check >> clear_alarm_task
+        health_check >> check_throttle >> notify_failure_task >> set_alarm_task

-    @dag(dag_id=_DAG_ID.format(env=env), **_SHARED_DAG_ARGS)
-    def cluster_healthcheck_dag():
-        is_data_refresh_running = is_concurrent_with_any(_DATA_REFRESH_DAG_IDS)
+    return es_health_check_dag()

-        es_host = get_es_host(env)
-        healthcheck_response = ping_healthcheck(env, es_host)
-        notification = compose_notification(
-            env, healthcheck_response, is_data_refresh_running
-        )
-        es_host >> healthcheck_response >> notification >> notify(env, notification)

-    cluster_healthcheck_dag()
+# Generate the DAG for each environment
+for env_name in ("prod", "staging"):
+    create_es_health_check_dag(env_name)
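
Because the throttle decision is the part of this change most worth guarding against regressions, here is a small pytest-style sketch of its boundary behaviour. It deliberately re-implements the comparison locally instead of importing `_check_if_throttled` (the DAG module path is not shown on this page), so `should_alert` and the test names are hypothetical; only the 6-hour window and the ISO-timestamp format come from the diff above.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

ALERT_THROTTLE_WINDOW = timedelta(hours=6)  # same window as the DAG


def should_alert(last_alert_iso: str | None, now: datetime) -> bool:
    """Mirror of the throttle decision: alert if no alarm is recorded or the window has elapsed."""
    if last_alert_iso is None:
        return True
    return now - datetime.fromisoformat(last_alert_iso) > ALERT_THROTTLE_WINDOW


def test_first_failure_alerts():
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    assert should_alert(None, now)


def test_failure_within_window_is_throttled():
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    assert not should_alert((now - timedelta(hours=5)).isoformat(), now)


def test_failure_after_window_alerts_again():
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    assert should_alert((now - timedelta(hours=7)).isoformat(), now)
```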
