Commit b9e3b92

Merge pull request #125 from andrewm4894/add-delete-jobs

Add delete job functionality with SQL template and scheduling

2 parents 7aee53f + 43d99a7, commit b9e3b92

File tree: 9 files changed, +258 −40 lines

anomstack/external/duckdb/duckdb.py

Lines changed: 28 additions & 0 deletions

@@ -63,3 +63,31 @@ def save_df_duckdb(df: pd.DataFrame, table_key: str) -> pd.DataFrame:
     query(connection=conn, query=f"CREATE TABLE {table_key} AS SELECT * FROM df")

     return df
+
+
+def run_sql_duckdb(sql: str) -> None:
+    """
+    Execute a non-returning SQL statement in DuckDB.
+
+    Args:
+        sql (str): The SQL statement to execute.
+
+    Returns:
+        None
+    """
+    logger = get_dagster_logger()
+
+    duckdb_path = os.environ.get("ANOMSTACK_DUCKDB_PATH", "tmpdata/anomstack.db")
+    logger.info(f"duckdb_path: {duckdb_path}")
+
+    os.makedirs(os.path.dirname(duckdb_path), exist_ok=True)
+
+    conn = connect(duckdb_path)
+
+    try:
+        query(connection=conn, query=sql)
+    except Exception as e:
+        logger.error(f"Error executing SQL statement in DuckDB: {e}")
+        raise
+    finally:
+        conn.close()
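
For orientation, a minimal sketch of calling the new helper directly; the table and batch names here are invented for illustration and are not part of this commit:

    # Hypothetical standalone use of run_sql_duckdb (table/batch names invented).
    from anomstack.external.duckdb.duckdb import run_sql_duckdb

    # Executes against the DuckDB file at ANOMSTACK_DUCKDB_PATH (default
    # "tmpdata/anomstack.db"); failures are logged and re-raised, and the
    # connection is closed either way.
    run_sql_duckdb("DELETE FROM metrics WHERE metric_batch = 'example_batch'")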

anomstack/external/sqlite/sqlite.py

Lines changed: 44 additions & 0 deletions

@@ -90,3 +90,47 @@ def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame:
             raise
     # If all retries fail, raise an error
     raise sqlite3.OperationalError("Database is locked after multiple attempts.")
+
+
+def run_sql_sqlite(sql: str) -> None:
+    """
+    Execute a non-returning SQL statement in SQLite with retry logic.
+
+    Args:
+        sql (str): The SQL statement to execute.
+
+    Returns:
+        None
+    """
+    logger = get_dagster_logger()
+    sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack.db")
+    logger.info(f"sqlite_path: {sqlite_path}")
+    os.makedirs(os.path.dirname(sqlite_path), exist_ok=True)
+
+    attempt = 0
+    while attempt < MAX_RETRIES:
+        try:
+            conn = sqlite3.connect(sqlite_path)
+            cursor = conn.cursor()
+            cursor.execute(sql)
+            conn.commit()
+            cursor.close()
+            conn.close()
+            return
+        except sqlite3.OperationalError as e:
+            if "database is locked" in str(e):
+                attempt += 1
+                logger.warning(
+                    f"Database is locked; attempt {attempt} of {MAX_RETRIES}. "
+                    f"Retrying in {RETRY_DELAY} seconds..."
+                )
+                time.sleep(RETRY_DELAY)
+            else:
+                logger.error(f"Error executing SQL statement: {e}")
+                raise
+        finally:
+            if 'conn' in locals():
+                conn.close()
+
+    # If all retries fail, raise an error
+    raise sqlite3.OperationalError("Database is locked after multiple attempts.")
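
The SQLite variant mirrors save_df_sqlite's handling of lock contention. A sketch of what a caller sees, assuming the module-level MAX_RETRIES and RETRY_DELAY constants the existing save path already uses (table/batch names invented):

    # Hypothetical use of run_sql_sqlite (table/batch names invented).
    from anomstack.external.sqlite.sqlite import run_sql_sqlite

    # If another writer holds the database lock, the call retries up to
    # MAX_RETRIES times with RETRY_DELAY seconds between attempts, then
    # raises sqlite3.OperationalError; other errors are raised immediately.
    run_sql_sqlite("DELETE FROM metrics WHERE metric_batch = 'example_batch'")

One quirk worth noting: the finally block closes conn on every loop iteration, so each retry reconnects from scratch — safe for SQLite, if slightly redundant after the successful-path close.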

anomstack/jobs/delete.py

Lines changed: 102 additions & 0 deletions

@@ -0,0 +1,102 @@
+"""
+Generate metric deletion jobs and schedules.
+"""
+
+import os
+
+from dagster import (
+    MAX_RUNTIME_SECONDS_TAG,
+    DefaultScheduleStatus,
+    JobDefinition,
+    ScheduleDefinition,
+    get_dagster_logger,
+    job,
+    op,
+)
+
+from anomstack.config import specs
+from anomstack.jinja.render import render
+from anomstack.sql.read import read_sql
+
+ANOMSTACK_MAX_RUNTIME_SECONDS_TAG = os.getenv(
+    "ANOMSTACK_MAX_RUNTIME_SECONDS_TAG", 3600
+)
+
+
+def build_delete_job(spec: dict) -> JobDefinition:
+    """
+    Build job definitions for delete jobs.
+
+    Args:
+        spec (dict): A dictionary containing the specifications for the delete job.
+
+    Returns:
+        JobDefinition: A job definition for the delete job.
+    """
+
+    if spec.get("disable_delete"):
+
+        @job(
+            name=f'{spec["metric_batch"]}_delete_disabled',
+            tags={MAX_RUNTIME_SECONDS_TAG: ANOMSTACK_MAX_RUNTIME_SECONDS_TAG},
+        )
+        def _dummy_job():
+            @op(name=f'{spec["metric_batch"]}_delete_noop')
+            def noop():
+                pass
+
+            noop()
+
+        return _dummy_job
+
+    get_dagster_logger()
+
+    metric_batch = spec["metric_batch"]
+    db = spec["db"]
+    spec["alert_methods"]
+    spec["table_key"]
+    spec.get("metric_tags", {})
+    spec.get("delete_after_n_days", None)
+
+    @job(
+        name=f"{metric_batch}_delete",
+        tags={MAX_RUNTIME_SECONDS_TAG: ANOMSTACK_MAX_RUNTIME_SECONDS_TAG},
+    )
+    def _job():
+        """
+        Run delete job.
+        """
+
+        @op(name=f"{metric_batch}_delete_old_data")
+        def delete_old_data() -> None:
+            """
+            Delete old data.
+
+            Returns:
+                None: None.
+            """
+            _ = read_sql(render("delete_sql", spec), db, returns_df=False)
+
+            return None
+
+        delete_old_data()
+
+    return _job
+
+
+# Build delete jobs and schedules.
+delete_jobs = []
+delete_schedules = []
+for spec_name, spec in specs.items():
+    delete_job = build_delete_job(spec)
+    delete_jobs.append(delete_job)
+    if spec["delete_default_schedule_status"] == "RUNNING":
+        delete_default_schedule_status = DefaultScheduleStatus.RUNNING
+    else:
+        delete_default_schedule_status = DefaultScheduleStatus.STOPPED
+    delete_schedule = ScheduleDefinition(
+        job=delete_job,
+        cron_schedule=spec["delete_cron_schedule"],
+        default_status=delete_default_schedule_status,
+    )
+    delete_schedules.append(delete_schedule)
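
To make the spec contract concrete, a hypothetical minimal spec covering the keys that build_delete_job and the schedule loop actually read — the values are invented, and the real specs come from anomstack.config:

    # Hypothetical spec; keys mirror the lookups above, values are invented.
    example_spec = {
        "metric_batch": "example_batch",
        "db": "duckdb",
        "alert_methods": "email",
        "table_key": "metrics",
        "metric_tags": {},
        "delete_after_n_days": 365,
        "delete_cron_schedule": "0 6 * * *",          # defaults.yaml default
        "delete_default_schedule_status": "STOPPED",  # defaults.yaml default
        "disable_delete": False,
    }

    job_def = build_delete_job(example_spec)  # JobDefinition named "example_batch_delete"

Note the bare spec["alert_methods"] and spec["table_key"] expressions have no effect beyond raising KeyError when a required key is missing; they read like eager validation, presumably carried over from the other job builders.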

anomstack/main.py

Lines changed: 3 additions & 0 deletions

@@ -6,6 +6,7 @@
 
 from anomstack.jobs.alert import alert_jobs, alert_schedules
 from anomstack.jobs.change import change_jobs, change_schedules
+from anomstack.jobs.delete import delete_jobs, delete_schedules
 from anomstack.jobs.ingest import ingest_jobs, ingest_schedules
 from anomstack.jobs.llmalert import llmalert_jobs, llmalert_schedules
 from anomstack.jobs.plot import plot_jobs, plot_schedules
@@ -23,6 +24,7 @@
     + plot_jobs
     + change_jobs
     + summary_jobs
+    + delete_jobs
 )
 sensors = [email_on_run_failure]
 schedules = (
@@ -34,6 +36,7 @@
     + plot_schedules
     + change_schedules
     + summary_schedules
+    + delete_schedules
 )
 
 defs = Definitions(

anomstack/sql/read.py

Lines changed: 27 additions & 36 deletions

@@ -3,55 +3,28 @@
 different database connectors.
 """
 
-import re
 
 import pandas as pd
-import sqlglot
 from dagster import get_dagster_logger
 
 from anomstack.df.utils import log_df_info
-from anomstack.external.duckdb.duckdb import read_sql_duckdb
+from anomstack.external.duckdb.duckdb import read_sql_duckdb, run_sql_duckdb
 from anomstack.external.gcp.bigquery import read_sql_bigquery
 from anomstack.external.snowflake.snowflake import read_sql_snowflake
-from anomstack.external.sqlite.sqlite import read_sql_sqlite
+from anomstack.external.sqlite.sqlite import read_sql_sqlite, run_sql_sqlite
+from anomstack.sql.translate import db_translate
 
 pd.options.display.max_columns = 10
 
 
-def db_translate(sql: str, db: str) -> str:
-    """
-    Replace some functions with their db-specific equivalents.
-
-    Args:
-        sql (str): The SQL query to be translated.
-        db (str): The name of the database to which the query will be sent.
-
-    Returns:
-        str: The translated SQL query.
-    """
-    # Transpile the SQL query to the target database dialect
-    sql = sqlglot.transpile(sql, write=db, identify=True, pretty=True)[0]
-    # Replace some functions with their db-specific equivalents
-    if db == "sqlite":
-        sql = sql.replace("GET_CURRENT_TIMESTAMP()", "DATETIME('now')")
-    elif db == "bigquery":
-        sql = sql.replace("GET_CURRENT_TIMESTAMP()", "CURRENT_TIMESTAMP()")
-        sql = re.sub(
-            r"DATE\('now', '(-?\d+) day'\)",
-            "DATE_ADD(CURRENT_DATE(), INTERVAL \\1 DAY)",
-            sql
-        )
-
-    return sql
-
-
-def read_sql(sql: str, db: str) -> pd.DataFrame:
+def read_sql(sql: str, db: str, returns_df: bool = True) -> pd.DataFrame:
     """
     Read data from SQL.
 
     Args:
         sql (str): SQL query to execute.
         db (str): Name of the database to connect to.
+        returns_df (bool, optional): Whether the query expects a DataFrame as a result.
 
     Returns:
         pd.DataFrame: A pandas DataFrame containing the results of the SQL query.
@@ -64,13 +37,31 @@ def read_sql(sql: str, db: str) -> pd.DataFrame:
     logger.debug(f"-- read_sql() is about to read this qry:\n{sql}")
 
     if db == "bigquery":
-        df = read_sql_bigquery(sql)
+        if returns_df:
+            df = read_sql_bigquery(sql)
+        elif not returns_df:
+            raise NotImplementedError(
+                "BigQuery not yet implemented for non-returns_df queries."
+            )
     elif db == "snowflake":
-        df = read_sql_snowflake(sql)
+        if returns_df:
+            df = read_sql_snowflake(sql)
+        elif not returns_df:
+            raise NotImplementedError(
+                "Snowflake not yet implemented for non-returns_df queries."
+            )
     elif db == "duckdb":
-        df = read_sql_duckdb(sql)
+        if returns_df:
+            df = read_sql_duckdb(sql)
+        elif not returns_df:
+            run_sql_duckdb(sql)
+            df = pd.DataFrame()
     elif db == "sqlite":
-        df = read_sql_sqlite(sql)
+        if returns_df:
+            df = read_sql_sqlite(sql)
+        elif not returns_df:
+            run_sql_sqlite(sql)
+            df = pd.DataFrame()
     else:
         raise ValueError(f"Unknown db: {db}")
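
The delete op above is the first caller to pass the new flag. A sketch of both call styles against a local DuckDB backend (table and batch names invented):

    from anomstack.sql.read import read_sql

    # Returning query: behaves exactly as before this change.
    df = read_sql("SELECT * FROM metrics LIMIT 10", db="duckdb")

    # Non-returning statement: dispatched to run_sql_duckdb, with an empty
    # DataFrame returned so the signature stays uniform across callers.
    _ = read_sql(
        "DELETE FROM metrics WHERE metric_batch = 'example_batch'",
        db="duckdb",
        returns_df=False,
    )

Note the elif not returns_df branches are logically plain else clauses; BigQuery and Snowflake simply raise NotImplementedError for non-returning statements for now.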

anomstack/sql/translate.py

Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+import re
+
+import sqlglot
+
+
+def db_translate(sql: str, db: str) -> str:
+    """
+    Replace some functions with their db-specific equivalents.
+
+    Args:
+        sql (str): The SQL query to be translated.
+        db (str): The name of the database to which the query will be sent.
+
+    Returns:
+        str: The translated SQL query.
+    """
+    # Transpile the SQL query to the target database dialect
+    sql = sqlglot.transpile(sql, write=db, identify=True, pretty=True)[0]
+    # Replace some functions with their db-specific equivalents
+    if db == "sqlite":
+        sql = sql.replace("GET_CURRENT_TIMESTAMP()", "DATETIME('now')")
+    elif db == "bigquery":
+        sql = sql.replace("GET_CURRENT_TIMESTAMP()", "CURRENT_TIMESTAMP()")
+        sql = re.sub(
+            r"DATE\('now', '(-?\d+) day'\)",
+            "DATE_ADD(CURRENT_DATE(), INTERVAL \\1 DAY)",
+            sql
+        )
+
+    return sql
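
A sketch of the relocated helper in isolation — the input statement is invented, and the exact output depends on sqlglot's dialect handling, so the comments describe the intended rewrites rather than verified output:

    from anomstack.sql.translate import db_translate

    sql = "SELECT GET_CURRENT_TIMESTAMP() AS ts"

    # For sqlite, GET_CURRENT_TIMESTAMP() is swapped for DATETIME('now')
    # after sqlglot transpiles the statement to the sqlite dialect.
    print(db_translate(sql, "sqlite"))

    # For bigquery, it becomes CURRENT_TIMESTAMP(), and any surviving
    # DATE('now', '<n> day') pattern is rewritten by the regex to
    # DATE_ADD(CURRENT_DATE(), INTERVAL <n> DAY).
    print(db_translate(sql, "bigquery"))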

metrics/defaults/defaults.yaml

Lines changed: 7 additions & 0 deletions

@@ -87,6 +87,8 @@ alert_cron_schedule: "*/5 * * * *" # cron schedule for alerting jobs
 alert_default_schedule_status: 'STOPPED' # default schedule status for alert jobs (RUNNING or STOPPED)
 change_cron_schedule: "*/5 * * * *" # cron schedule for change detection jobs
 change_default_schedule_status: 'STOPPED' # default schedule status for change detection jobs (RUNNING or STOPPED)
+delete_cron_schedule: "0 6 * * *" # cron schedule for delete jobs
+delete_default_schedule_status: 'STOPPED' # default schedule status for delete jobs (RUNNING or STOPPED)
 llmalert_cron_schedule: "*/5 * * * *" # cron schedule for llmalerting jobs
 llmalert_default_schedule_status: 'STOPPED' # default schedule status for llmalert jobs (RUNNING or STOPPED)
 plot_cron_schedule: "*/5 * * * *" # cron schedule for plot jobs
@@ -116,6 +118,9 @@ llmalert_sql: >
 # default templated dashboard sql
 dashboard_sql: >
   {% include "./defaults/sql/dashboard.sql" %}
+# default templated delete sql
+delete_sql: >
+  {% include "./defaults/sql/delete.sql" %}
 # default templated summary sql
 summary_sql: >
   {% include "./defaults/sql/summary.sql" %}
@@ -143,3 +148,5 @@ disable_change: False # if you want to disable change detection job for some reason.
 disable_llmalert: True # if you want to disable llmalert job for some reason.
 disable_plot: False # if you want to disable plot job for some reason.
 disable_summary: False # if you want to disable summary job for some reason.
+disable_delete: False # if you want to disable delete job for some reason.
+delete_after_n_days: 365 # delete metrics older than this number of days.

metrics/defaults/sql/delete.sql

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+/*
+Template for generating the input data for the delete job.
+
+Written for SQLite but will be translated to target dialect based on `db` param via sqlglot.
+*/
+
+delete from {{ table_key }}
+where
+    metric_batch = '{{ metric_batch }}'
+    and
+    -- Delete data older than the specified number of days
+    date(metric_timestamp) < date('now', '-{{ delete_after_n_days }} day')
+;
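
To see the statement a batch would actually run, here is a hand-rolled Jinja2 rendering of the template with the defaults above — the real pipeline goes through render("delete_sql", spec) and then db_translate, so this is illustrative only, with invented values:

    # Illustrative Jinja2 rendering of delete.sql (values invented; the real
    # job uses anomstack.jinja.render.render and the batch's own spec).
    from jinja2 import Template

    template = Template(
        "delete from {{ table_key }}\n"
        "where\n"
        "    metric_batch = '{{ metric_batch }}'\n"
        "    and date(metric_timestamp) < date('now', '-{{ delete_after_n_days }} day')\n"
        ";"
    )
    print(template.render(table_key="metrics", metric_batch="example_batch", delete_after_n_days=365))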

tests/test_main.py

Lines changed: 4 additions & 4 deletions

@@ -8,19 +8,19 @@
 
 
 def test_jobs_len():
-    assert len(jobs) == 127
+    assert len(jobs) == 145
 
 
 def test_jobs_len_ingest():
-    assert len(ingest_jobs) == (len(jobs)-1) / 7
+    assert len(ingest_jobs) == (len(jobs)-1) / 8
 
 
 def test_schedules_len():
-    assert len(schedules) == 127
+    assert len(schedules) == 145
 
 
 def test_schedules_len_ingest():
-    assert len(ingest_schedules) == (len(schedules)-1) / 7
+    assert len(ingest_schedules) == (len(schedules)-1) / 8
 
 
 def test_jobs_schedules_len_match():