Skip to content

Commit 6633c7b

Browse files
authored
Merge pull request #130 from andrewm4894/turso
Add Turso
2 parents 4656e4f + b26b79e commit 6633c7b

File tree

10 files changed

+220
-86
lines changed

10 files changed

+220
-86
lines changed

.example.env

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ ANOMSTACK_DUCKDB_PATH=tmpdata/anomstack-duckdb.db
2626

2727
# local sqlite path for testing/dev quickstart
2828
ANOMSTACK_SQLITE_PATH=tmpdata/anomstack-sqlite.db
29+
# example using turso
30+
# https://docs.turso.tech/sdk/python/quickstart
31+
# ANOMSTACK_SQLITE_PATH=libsql://<your-database-url>.turso.io
2932

3033
# table id to store metrics in
3134
ANOMSTACK_TABLE_KEY=tmp.metrics
@@ -97,3 +100,7 @@ ANOMSTACK_POSTGRES_FORWARD_PORT=5432
97100

98101
# motherduck related env vars
99102
ANOMSTACK_MOTHERDUCK_TOKEN=
103+
104+
# turso related env vars
105+
ANOMSTACK_TURSO_DATABASE_URL=
106+
ANOMSTACK_TURSO_AUTH_TOKEN=

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ Supported sources and databases for your metrics to live in and be queried from:
5050
<th align="center"><a href="./anomstack/external/duckdb/duckdb.py" target="_blank">DuckDB</a></th>
5151
<th align="center"><a href="./anomstack/external/sqlite/sqlite.py" target="_blank">SQLite</a></th>
5252
<th align="center"><a href="./anomstack/external/duckdb/duckdb.py" target="_blank">MotherDuck</a></th>
53+
<th align="center"><a href="./anomstack/external/sqlite/sqlite.py" target="_blank">Turso</a></th>
5354
<th align="center">Redshift</th>
5455
</tr>
5556
</thead>
@@ -61,6 +62,7 @@ Supported sources and databases for your metrics to live in and be queried from:
6162
<td align="center">✅</td>
6263
<td align="center">✅</td>
6364
<td align="center">✅</td>
65+
<td align="center">✅</td>
6466
<td align="center">🚧</td>
6567
</tr>
6668
</tbody>

anomstack/df/utils.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,25 @@ def log_df_info(df: pd.DataFrame, logger=None):
1717
df.info(buf=buffer)
1818
info_str = buffer.getvalue()
1919
logger.info("df.info():\n%s", info_str)
def generate_insert_sql(df, table_name, batch_size=100) -> List[str]:
    """Generate batched INSERT statements for a DataFrame.

    Rows are rendered as inline SQL literals in batches of ``batch_size``
    per statement.

    Args:
        df (pd.DataFrame): The data to insert.
        table_name (str): Target table name.
        batch_size (int, optional): Rows per INSERT statement. Defaults to 100.

    Returns:
        List[str]: One INSERT statement per batch (original annotation said
        ``str`` but a list has always been returned).
    """
    columns = ', '.join(df.columns)
    insert_sqls = []
    for start in range(0, len(df), batch_size):
        batch = df.iloc[start:start + batch_size]
        values_list = []
        for _, row in batch.iterrows():
            row_values = []
            for val in row:
                # Missing values must become SQL NULL; the previous code
                # emitted the invalid literals `None` / `nan`.
                if val is None or (isinstance(val, float) and val != val):
                    row_values.append("NULL")
                elif isinstance(val, (str, pd.Timestamp)):
                    # Escape embedded single quotes so the literal stays
                    # valid SQL (previously a quote broke the statement).
                    escaped = str(val).replace("'", "''")
                    row_values.append(f"'{escaped}'")
                else:
                    row_values.append(str(val))
            values_list.append(f"({', '.join(row_values)})")
        values = ', '.join(values_list)
        insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES {values};"
        insert_sqls.append(insert_sql)

    return insert_sqls
Lines changed: 155 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,169 @@
11
"""
2-
Some helper functions for sqlite.
2+
Helper functions for SQLite (or Turso) with retry logic.
33
"""
44

55
import os
6-
import sqlite3
76
import time
7+
from contextlib import contextmanager
88

9+
import libsql_experimental as libsql
910
import pandas as pd
1011
from dagster import get_dagster_logger
1112

13+
from anomstack.df.utils import generate_insert_sql
14+
from anomstack.sql.utils import get_columns_from_sql
15+
1216
MAX_RETRIES = 5
1317
RETRY_DELAY = 1
1418

1519

def get_sqlite_path() -> str:
    """
    Return the SQLite path (or Turso URL), creating parent directories
    for local paths if needed.

    Uses the env var ANOMSTACK_SQLITE_PATH, or falls back to
    "tmpdata/anomstack-sqlite.db".

    Returns:
        str: The local database path or remote Turso URL.
    """
    default_path = "tmpdata/anomstack-sqlite.db"
    path = os.environ.get("ANOMSTACK_SQLITE_PATH", default_path)
    # Remote Turso URIs need no local directory. For local paths, a bare
    # filename has an empty dirname and os.makedirs("") raises, so guard it
    # (the previous version crashed on e.g. ANOMSTACK_SQLITE_PATH=mydb.db).
    if not path.endswith("turso.io"):
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
    return path
35+
36+
def get_conn(sqlite_path: str) -> libsql.Connection:
37+
"""
38+
Get a connection to the SQLite or Turso database.
39+
40+
If the path ends with 'turso.io', it uses the
41+
ANOMSTACK_TURSO_DATABASE_URL and ANOMSTACK_TURSO_AUTH_TOKEN
42+
environment variables for authentication.
43+
Otherwise, it connects to a local SQLite database.
1944
2045
Args:
21-
sql (str): The SQL query to execute.
46+
sqlite_path (str): The path or URL of the database.
2247
2348
Returns:
24-
pd.DataFrame: The result of the SQL query as a pandas DataFrame.
49+
libsql.Connection: The connection object.
2550
"""
26-
logger = get_dagster_logger()
27-
sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db")
28-
logger.info(f"sqlite_path: {sqlite_path}")
29-
os.makedirs(os.path.dirname(sqlite_path), exist_ok=True)
51+
if sqlite_path.endswith("turso.io"):
52+
url = os.environ.get("ANOMSTACK_TURSO_DATABASE_URL", None)
53+
auth_token = os.environ.get("ANOMSTACK_TURSO_AUTH_TOKEN", None)
54+
return libsql.connect(sqlite_path, sync_url=url, auth_token=auth_token)
55+
else:
56+
return libsql.connect(sqlite_path)
57+
58+
def with_sqlite_retry(action, logger=None, max_retries=None, retry_delay=None):
    """
    Execute a callable, retrying while the database reports it is locked.

    Args:
        action (callable): Zero-argument function performing the DB action.
        logger (Logger, optional): Logger for warnings/errors. Defaults to None.
        max_retries (int, optional): Maximum attempts. Defaults to MAX_RETRIES.
        retry_delay (float, optional): Seconds between retries. Defaults to RETRY_DELAY.

    Returns:
        The result of `action` if successful.

    Raises:
        Exception: If the database remains locked after all retries (the last
            locked error is attached as the cause), or any non-lock error
            raised by `action` (re-raised unchanged).
    """
    # Late-bind the module-level defaults so explicit keyword callers keep
    # working unchanged and constant tweaks at runtime are honoured.
    if max_retries is None:
        max_retries = MAX_RETRIES
    if retry_delay is None:
        retry_delay = RETRY_DELAY
    last_error = None
    for attempt in range(max_retries):
        try:
            return action()
        except Exception as e:
            # Anything other than a lock is a real failure: surface it now.
            if "database is locked" not in str(e):
                if logger:
                    logger.error(f"Error during DB action: {e}")
                raise
            last_error = e
            if logger:
                logger.warning(
                    f"Database is locked; attempt {attempt + 1} of {max_retries}. "
                    f"Retrying in {retry_delay} seconds..."
                )
            # Don't sleep after the final failed attempt (the previous
            # version slept once more before giving up).
            if attempt + 1 < max_retries:
                time.sleep(retry_delay)
    # Chain the last lock error so the root cause isn't lost.
    raise Exception("Database is locked after multiple attempts.") from last_error
5292

@contextmanager
def sqlite_connection():
    """
    Context manager that yields a DB connection, ensuring it is closed on exit.
    """
    conn = get_conn(get_sqlite_path())
    try:
        yield conn
    finally:
        # The docstring promised closure but the previous version never
        # closed the connection. Close defensively via getattr because a
        # libsql connection may not expose close() — TODO confirm against
        # the libsql_experimental API.
        close = getattr(conn, "close", None)
        if callable(close):
            close()
def infer_sqlite_type(dtype) -> str:
    """
    Map a pandas dtype to the SQLite column type used for storage.

    Args:
        dtype: A pandas dtype (e.g. df.dtypes[col]).

    Returns:
        str: "INTEGER", "REAL", or "TEXT".
    """
    if pd.api.types.is_integer_dtype(dtype):
        return "INTEGER"
    if pd.api.types.is_float_dtype(dtype):
        return "REAL"
    # Datetimes and everything else are stored as TEXT.
    return "TEXT"
def generate_create_table_sql(df: pd.DataFrame, table_name: str) -> str:
    """
    Build a CREATE TABLE IF NOT EXISTS statement matching a DataFrame's schema.

    Args:
        df (pd.DataFrame): DataFrame whose columns/dtypes define the schema.
        table_name (str): The name of the table.

    Returns:
        str: The CREATE TABLE SQL statement.
    """
    # One "<name> <type>" pair per column, types inferred from the dtypes.
    cols = ", ".join(
        f"{name} {infer_sqlite_type(dt)}" for name, dt in df.dtypes.items()
    )
    return f"CREATE TABLE IF NOT EXISTS {table_name} ({cols});"
def read_sql_sqlite(sql: str) -> pd.DataFrame:
    """
    Read data from SQLite (or Turso) with retry logic.

    Args:
        sql (str): The SQL query to execute.

    Returns:
        pd.DataFrame: The result of the SQL query as a pandas DataFrame.
    """
    logger = get_dagster_logger()
    logger.info(f"Reading from DB path: {get_sqlite_path()}")

    def _query():
        with sqlite_connection() as conn:
            cursor = conn.execute(sql)
            rows = cursor.fetchall()
            if cursor.description:
                names = [col[0] for col in cursor.description]
            else:
                # No cursor metadata: fall back to parsing names out of the SQL.
                names = get_columns_from_sql(sql)
            return pd.DataFrame(rows, columns=names)

    return with_sqlite_retry(_query, logger=logger)
55162

56163

def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame:
    """
    Save a DataFrame to the database (SQLite or Turso) with retry logic.

    Creates the table if it does not exist, then appends the rows via
    generated INSERT statements.

    Args:
        df (pd.DataFrame): The DataFrame to save.
        table_key (str): The table to write to.

    Returns:
        pd.DataFrame: The input DataFrame.
    """
    logger = get_dagster_logger()
    logger.info(f"Saving DataFrame to DB path: {get_sqlite_path()}")

    def _write():
        with sqlite_connection() as conn:
            # Ensure the target table exists before inserting.
            conn.execute(generate_create_table_sql(df, table_key))
            for statement in generate_insert_sql(df, table_key):
                conn.execute(statement)
            conn.commit()
        return df

    return with_sqlite_retry(_write, logger=logger)
93189

94190

def run_sql_sqlite(sql: str) -> None:
    """
    Execute a non-returning SQL statement (e.g. CREATE, INSERT, UPDATE, DELETE) with retry logic.

    Args:
        sql (str): The SQL statement to execute.

    Returns:
        None
    """
    logger = get_dagster_logger()
    logger.info(f"Executing SQL against DB path: {get_sqlite_path()}")

    def _execute():
        with sqlite_connection() as conn:
            conn.execute(sql)
            conn.commit()

    with_sqlite_retry(_execute, logger=logger)

anomstack/sql/utils.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from typing import List
2+
3+
import sqlglot
4+
import sqlglot.expressions as exp
5+
6+
7+
def get_columns_from_sql(sql: str) -> List[str]:
8+
"""
9+
Get the columns from a SQL query.
10+
11+
Args:
12+
sql (str): The SQL query to extract columns from.
13+
14+
Returns:
15+
List[str]: The columns in the SQL query.
16+
"""
17+
columns = []
18+
for expression in sqlglot.parse_one(sql).find(exp.Select).args["expressions"]:
19+
if isinstance(expression, exp.Alias):
20+
columns.append(expression.text("alias"))
21+
elif isinstance(expression, exp.Column):
22+
columns.append(expression.text("this"))
23+
return columns

dev.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#%%
22

33

4-
54
#%%

metrics/defaults/defaults.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# default values to be applied to all batches unless overridden in metric batch specific yaml files.
2-
db: "duckdb" # database type to use.
2+
db: "sqlite" # database type to use.
33
table_key: "metrics" # table to store metrics in.
44

55
############################################

0 commit comments

Comments
 (0)