44
55import os
66import sqlite3
7+ import time
78
89import pandas as pd
910from dagster import get_dagster_logger
1011
12+ MAX_RETRIES = 5
13+ RETRY_DELAY = 1
14+
1115
1216def read_sql_sqlite (sql : str ) -> pd .DataFrame :
1317 """
@@ -19,25 +23,19 @@ def read_sql_sqlite(sql: str) -> pd.DataFrame:
1923 Returns:
2024 pd.DataFrame: The result of the SQL query as a pandas DataFrame.
2125 """
22-
2326 logger = get_dagster_logger ()
24-
2527 sqlite_path = os .environ .get ("ANOMSTACK_SQLITE_PATH" , "tmpdata/anomstack.db" )
26- logger .info (f"sqlite_path:{ sqlite_path } " )
27-
28+ logger .info (f"sqlite_path: { sqlite_path } " )
2829 os .makedirs (os .path .dirname (sqlite_path ), exist_ok = True )
2930
3031 conn = sqlite3 .connect (sqlite_path )
31-
3232 df = pd .read_sql_query (sql , conn )
33-
3433 conn .close ()
3534 return df
3635
37-
3836def save_df_sqlite (df : pd .DataFrame , table_key : str ) -> pd .DataFrame :
3937 """
40- Save df to db.
38+ Save df to db with retry logic .
4139
4240 Args:
4341 df (pd.DataFrame): The DataFrame to save.
@@ -46,21 +44,27 @@ def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame:
4644 Returns:
4745 pd.DataFrame: The input DataFrame.
4846 """
49-
5047 logger = get_dagster_logger ()
51-
5248 sqlite_path = os .environ .get ("ANOMSTACK_SQLITE_PATH" , "tmpdata/anomstack.db" )
53- logger .info (f"sqlite_path:{ sqlite_path } " )
54-
49+ logger .info (f"sqlite_path: { sqlite_path } " )
5550 os .makedirs (os .path .dirname (sqlite_path ), exist_ok = True )
5651
57- conn = sqlite3 .connect (sqlite_path )
58-
59- try :
60- df .to_sql (table_key , conn , if_exists = 'append' , index = False )
61- except Exception as e :
62- logger .error (f"Error saving DataFrame to SQLite: { e } " )
63- raise
64-
65- conn .close ()
66- return df
52+ attempt = 0
53+ while attempt < MAX_RETRIES :
54+ try :
55+ conn = sqlite3 .connect (sqlite_path )
56+ df .to_sql (table_key , conn , if_exists = 'append' , index = False )
57+ conn .close ()
58+ return df
59+ except sqlite3 .OperationalError as e :
60+ if "database is locked" in str (e ):
61+ attempt += 1
62+ logger .warning (
63+ f"Database is locked; attempt { attempt } of { MAX_RETRIES } . Retrying in { RETRY_DELAY } seconds..."
64+ )
65+ time .sleep (RETRY_DELAY )
66+ else :
67+ logger .error (f"Error saving DataFrame to SQLite: { e } " )
68+ raise
69+ # If all retries fail, raise an error
70+ raise sqlite3 .OperationalError ("Database is locked after multiple attempts." )
0 commit comments