Skip to content

Commit bc72248

Browse files
authored
Merge pull request #114 from andrewm4894/clean-up-change-alert-snooze-logic
Clean up change alert snooze logic
2 parents 80507b4 + 388e01f commit bc72248

File tree

10 files changed

+150
-84
lines changed

10 files changed

+150
-84
lines changed

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@ SHELL=/bin/bash
1616
dashboard:
1717
streamlit run ./dashboard.py --server.port 8501
1818

19-
# start streamlit dashboard as a daemon
19+
# start streamlit dashboard as a daemon with no log file
2020
dashboardd:
21-
nohup streamlit run ./dashboard.py --server.port 8501 &
21+
nohup streamlit run ./dashboard.py --server.port 8501 > /dev/null 2>&1 &
2222

2323
# start dagster locally
2424
local:
2525
dagster dev -f anomstack/main.py
2626

27-
# start dagster locally as a daemon
27+
# start dagster locally as a daemon with no log file
2828
locald:
29-
nohup dagster dev -f anomstack/main.py &
29+
nohup dagster dev -f anomstack/main.py > /dev/null 2>&1 &
3030

3131
# kill any running dagster process
3232
kill-locald:

anomstack/df/utils.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from io import StringIO
2+
from dagster import get_dagster_logger
3+
import pandas as pd
4+
5+
6+
def log_df_info(df: pd.DataFrame, logger=None):
7+
"""
8+
Logs the info of a DataFrame to the logger.
9+
10+
Parameters:
11+
df (pd.DataFrame): The DataFrame whose info needs to be logged.
12+
"""
13+
if not logger:
14+
logger = get_dagster_logger()
15+
buffer = StringIO()
16+
df.info(buf=buffer)
17+
info_str = buffer.getvalue()
18+
logger.info("df.info():\n%s", info_str)

anomstack/external/duckdb/duckdb.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,7 @@ def read_sql_duckdb(sql: str) -> pd.DataFrame:
2828
os.makedirs(os.path.dirname(duckdb_path), exist_ok=True)
2929

3030
conn = connect(duckdb_path)
31-
32-
logger.debug(f"sql:\n{sql}")
3331
df = query(connection=conn, query=sql).df()
34-
logger.debug(f"df:\n{df}")
3532

3633
return df
3734

anomstack/external/gcp/bigquery.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,11 @@ def read_sql_bigquery(sql: str) -> pd.DataFrame:
2525
Returns:
2626
pd.DataFrame: The result of the query as a DataFrame.
2727
"""
28-
29-
logger = get_dagster_logger()
30-
31-
logger.debug(f"sql:\n{sql}")
32-
3328
credentials = get_google_credentials()
34-
3529
df = pd.read_gbq(
3630
query=sql,
3731
credentials=credentials,
3832
)
39-
logger.debug(f"df:\n{df}")
4033

4134
return df
4235

anomstack/external/snowflake/snowflake.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
"""
44

55
import pandas as pd
6-
from dagster import get_dagster_logger
76
from snowflake import connector
87

98
from anomstack.external.snowflake.credentials import get_snowflake_credentials
@@ -21,11 +20,6 @@ def read_sql_snowflake(sql: str, cols_lowercase: bool = True) -> pd.DataFrame:
2120
Returns:
2221
pd.DataFrame: The result of the SQL query as a pandas DataFrame.
2322
"""
24-
25-
logger = get_dagster_logger()
26-
27-
logger.debug(f"sql:\n{sql}")
28-
2923
credentials = get_snowflake_credentials()
3024

3125
conn = connector.connect(
@@ -41,8 +35,6 @@ def read_sql_snowflake(sql: str, cols_lowercase: bool = True) -> pd.DataFrame:
4135
if cols_lowercase:
4236
df.columns = df.columns.str.lower()
4337

44-
logger.debug(f"df:\n{df}")
45-
4638
return df
4739

4840

anomstack/external/sqlite/sqlite.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ def read_sql_sqlite(sql: str) -> pd.DataFrame:
2929

3030
conn = sqlite3.connect(sqlite_path)
3131

32-
logger.debug(f"sql:\n{sql}")
3332
df = pd.read_sql_query(sql, conn)
34-
logger.debug(f"df:\n{df}")
3533

3634
conn.close()
3735
return df

anomstack/jobs/change.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
from anomstack.sql.read import read_sql
2525
from anomstack.validate.validate import validate_df
2626

27-
ANOMSTACK_MAX_RUNTIME_SECONDS_TAG = os.getenv("ANOMSTACK_MAX_RUNTIME_SECONDS_TAG", 3600)
27+
ANOMSTACK_MAX_RUNTIME_SECONDS_TAG = os.getenv(
28+
"ANOMSTACK_MAX_RUNTIME_SECONDS_TAG", 3600
29+
)
2830

2931

3032
def build_change_job(spec: dict) -> JobDefinition:
@@ -62,7 +64,6 @@ def noop():
6264
metric_tags = spec.get("metric_tags", {})
6365
change_threshold = spec.get("change_threshold", 3.5)
6466
change_detect_last_n = spec.get("change_detect_last_n", 1)
65-
change_snooze_n = spec.get("change_snooze_n", 3)
6667

6768
@job(
6869
name=f"{metric_batch}_change",

anomstack/ml/change.py

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
def detect_change(
1313
df_metric: pd.DataFrame,
1414
threshold: float = 3.5,
15-
detect_last_n: int = 1,
16-
snooze_n: int = 3
15+
detect_last_n: int = 1
1716
) -> pd.DataFrame:
1817
"""
1918
Detects change in a metric based on the Median Absolute Deviation (MAD)
@@ -25,8 +24,6 @@ def detect_change(
2524
Defaults to 3.5.
2625
detect_last_n (int, optional): Number of last observations to use for
2726
detection. Defaults to 1.
28-
snooze_n (int, optional): Number of observations to snooze after a
29-
change is detected. Defaults to 3.
3027
3128
Returns:
3229
pd.DataFrame: DataFrame with the detected change information.
@@ -44,24 +41,19 @@ def detect_change(
4441
y_detect_scores = detector.decision_function(X_detect)
4542
logger.debug(f"y_detect_scores: {y_detect_scores}")
4643
df_metric["metric_score"] = list(X_train_scores) + list(y_detect_scores)
47-
df_metric["metric_alert"] = np.where((df_metric["metric_score"] > threshold),1,0)
44+
df_metric["metric_alert"] = np.where(
45+
df_metric["metric_score"] > threshold, 1, 0
46+
)
4847
logger.debug(f"df_metric:\n{df_metric}")
4948
change_detected_count = df_metric["metric_alert"].tail(detect_last_n).sum()
50-
recent_change_detected_count = df_metric["metric_alert"].tail(snooze_n).sum()
5149
if change_detected_count > 0:
52-
# TODO: clean up the logic here as this could stay
53-
# snoozed for a long time until we see sufficient 0's
54-
# in last snooze_n so it's more like a dynamic suppression
55-
if recent_change_detected_count <= 1:
56-
logger.info(f"change detected for {metric_name} at {X_detect_timestamps}")
57-
return df_metric
58-
else:
59-
logger.info(
60-
f"change detected for {metric_name} at {X_detect_timestamps}, "
61-
f"but snoozing as {recent_change_detected_count} recent changes already detected"
62-
)
63-
return pd.DataFrame()
50+
logger.info(
51+
f"change detected for {metric_name} at {X_detect_timestamps}"
52+
)
53+
return df_metric
6454
else:
65-
logger.info(f"no change detected for {metric_name} at {X_detect_timestamps}")
55+
logger.info(
56+
f"no change detected for {metric_name} at {X_detect_timestamps}"
57+
)
6658

6759
return pd.DataFrame()

anomstack/sql/read.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99
import sqlglot
1010
from dagster import get_dagster_logger
1111

12+
from anomstack.df.utils import log_df_info
1213
from anomstack.external.duckdb.duckdb import read_sql_duckdb
1314
from anomstack.external.gcp.bigquery import read_sql_bigquery
1415
from anomstack.external.snowflake.snowflake import read_sql_snowflake
1516
from anomstack.external.sqlite.sqlite import read_sql_sqlite
1617

18+
pd.options.display.max_columns = 10
19+
1720

1821
def db_translate(sql: str, db: str) -> str:
1922
"""
@@ -58,7 +61,7 @@ def read_sql(sql: str, db: str) -> pd.DataFrame:
5861

5962
sql = db_translate(sql, db)
6063

61-
logger.debug(f"sql:\n{sql}")
64+
logger.debug(f"-- read_sql() is about to read this qry:\n{sql}")
6265

6366
if db == "bigquery":
6467
df = read_sql_bigquery(sql)
@@ -71,6 +74,8 @@ def read_sql(sql: str, db: str) -> pd.DataFrame:
7174
else:
7275
raise ValueError(f"Unknown db: {db}")
7376

74-
logger.debug(f"df:\n{df}")
77+
log_df_info(df, logger)
78+
logger.debug(f"df.head():\n{df.head()}")
79+
logger.debug(f"df.tail():\n{df.tail()}")
7580

7681
return df

metrics/defaults/sql/change.sql

Lines changed: 107 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,123 @@
11
/*
22
Template for generating the input data for the change detection job.
3+
4+
Written for SQLite but will be translated to target dialect based on `db` param via sqlglot.
35
*/
46

5-
WITH
6-
7-
metric_value_data AS (
8-
SELECT DISTINCT
9-
metric_timestamp,
10-
metric_batch,
11-
metric_name,
12-
AVG(metric_value) AS metric_value
13-
FROM {{ table_key }}
14-
WHERE metric_batch = '{{ metric_batch }}'
15-
AND metric_type = 'metric'
16-
AND DATE(metric_timestamp) >= DATE('now', '-{{ change_metric_timestamp_max_days_ago }} day')
17-
GROUP BY metric_timestamp, metric_batch, metric_name
7+
with
8+
9+
metric_value_data as
10+
(
11+
select distinct
12+
metric_timestamp,
13+
metric_batch,
14+
metric_name,
15+
avg(metric_value) as metric_value
16+
from
17+
{{ table_key }}
18+
where
19+
metric_batch = '{{ metric_batch }}'
20+
and
21+
metric_type = 'metric'
22+
and
23+
-- Filter to the last {{ change_metric_timestamp_max_days_ago }} days
24+
date(metric_timestamp) >= date('now', '-{{ change_metric_timestamp_max_days_ago }} day')
25+
group by
26+
metric_timestamp, metric_batch, metric_name
27+
),
28+
29+
metric_change_alert_data as
30+
(
31+
select distinct
32+
metric_timestamp,
33+
metric_batch,
34+
metric_name,
35+
max(metric_value) as metric_change_alert
36+
from
37+
{{ table_key }}
38+
where
39+
metric_batch = '{{ metric_batch }}'
40+
and
41+
metric_type = 'change'
42+
and
43+
-- Filter to the last {{ change_metric_timestamp_max_days_ago }} days
44+
date(metric_timestamp) >= date('now', '-{{ change_metric_timestamp_max_days_ago }} day')
45+
group by
46+
metric_timestamp, metric_batch, metric_name
47+
),
48+
49+
metric_value_recency_ranked as
50+
(
51+
select distinct
52+
metric_value_data.metric_timestamp,
53+
metric_value_data.metric_batch,
54+
metric_value_data.metric_name,
55+
metric_value_data.metric_value,
56+
-- If change alert not found for the metric, default to 0
57+
coalesce(metric_change_alert_data.metric_change_alert, 0) as metric_change_alert,
58+
-- Rank the metric values by recency, with 1 being the most recent
59+
row_number() over (partition by metric_value_data.metric_name order by metric_value_data.metric_timestamp desc) as metric_value_recency_rank
60+
from
61+
metric_value_data
62+
left outer join
63+
metric_change_alert_data
64+
on
65+
metric_value_data.metric_batch = metric_change_alert_data.metric_batch
66+
and
67+
metric_value_data.metric_name = metric_change_alert_data.metric_name
68+
and
69+
metric_value_data.metric_timestamp = metric_change_alert_data.metric_timestamp
1870
),
1971

20-
metric_value_recency_ranked AS (
21-
SELECT DISTINCT
22-
metric_timestamp,
23-
metric_batch,
24-
metric_name,
25-
metric_value,
26-
ROW_NUMBER() OVER (PARTITION BY metric_name ORDER BY metric_timestamp DESC) AS metric_value_recency_rank
27-
FROM metric_value_data
72+
-- Snooze any metrics with change alerts in the last {{ change_snooze_n }} values
73+
snoozed_metric_names as
74+
(
75+
select distinct
76+
metric_name
77+
from
78+
metric_value_recency_ranked
79+
where
80+
-- Identify metrics with change alerts in the last {{ change_snooze_n }} values (excluded downstream)
81+
metric_change_alert = 1
82+
and
83+
metric_value_recency_rank <= {{ change_snooze_n }}
2884
),
2985

30-
data_smoothed AS (
31-
SELECT
32-
metric_timestamp,
33-
metric_batch,
34-
metric_name,
35-
metric_value,
36-
metric_value_recency_rank,
37-
-- Smooth the metric value over the last {{ change_smooth_n }} values
38-
(SELECT AVG(mv.metric_value)
39-
FROM metric_value_recency_ranked mv
40-
WHERE mv.metric_name = mr.metric_name
41-
AND mv.metric_value_recency_rank BETWEEN mr.metric_value_recency_rank - {{ change_smooth_n }} AND mr.metric_value_recency_rank) AS metric_value_smooth
42-
FROM metric_value_recency_ranked mr
43-
WHERE metric_value_recency_rank <= {{ change_max_n }}
86+
data_smoothed as
87+
(
88+
select
89+
metric_timestamp,
90+
metric_batch,
91+
metric_name,
92+
metric_value,
93+
metric_change_alert,
94+
metric_value_recency_rank,
95+
-- Smooth the metric value over the last {{ change_smooth_n }} values
96+
(
97+
select
98+
avg(mv.metric_value)
99+
from
100+
metric_value_recency_ranked mv
101+
where
102+
mv.metric_name = mr.metric_name
103+
and
104+
mv.metric_value_recency_rank between mr.metric_value_recency_rank - {{ change_smooth_n }} and mr.metric_value_recency_rank
105+
) as metric_value_smooth
106+
from
107+
metric_value_recency_ranked mr
108+
where
109+
metric_value_recency_rank <= {{ change_max_n }}
110+
and
111+
-- Exclude snoozed metrics
112+
metric_name not in (select metric_name from snoozed_metric_names)
44113
)
45114

46-
SELECT
115+
select
47116
metric_timestamp,
48117
metric_batch,
49118
metric_name,
50119
metric_value,
51120
metric_value_smooth
52-
FROM data_smoothed
121+
from
122+
data_smoothed
53123
;

0 commit comments

Comments
 (0)