1+ """
2+ Cleanup job for managing disk space and removing old artifacts.
3+ """
4+
5+ import os
6+ import shutil
7+ import sqlite3
8+ from datetime import datetime , timedelta
9+ from pathlib import Path
10+
11+ from dagster import DefaultScheduleStatus , JobDefinition , ScheduleDefinition , job , op , get_dagster_logger
12+
13+
@op
def cleanup_old_artifacts():
    """Clean up old Dagster artifacts to free disk space."""
    logger = get_dagster_logger()

    artifacts_path = "/data/artifacts/storage"
    if not os.path.exists(artifacts_path):
        logger.info("Artifacts directory does not exist, skipping cleanup")
        return

    # Remove artifacts older than 6 hours. Note: a directory's mtime changes
    # when entries are added or removed, not when nested file contents change,
    # so this is a "last touched" heuristic.
    cutoff_time = datetime.now() - timedelta(hours=6)
    removed_count = 0
    freed_bytes = 0

    try:
        for item in os.listdir(artifacts_path):
            item_path = os.path.join(artifacts_path, item)
            if os.path.isdir(item_path):
                # Get directory modification time
                mod_time = datetime.fromtimestamp(os.path.getmtime(item_path))
                if mod_time < cutoff_time:
                    # Calculate size before removal
                    try:
                        size = sum(
                            os.path.getsize(os.path.join(dirpath, filename))
                            for dirpath, _dirnames, filenames in os.walk(item_path)
                            for filename in filenames
                        )
                        shutil.rmtree(item_path)
                        removed_count += 1
                        freed_bytes += size
                        logger.info(f"Removed old artifact directory: {item}")
                    except Exception as e:
                        logger.warning(f"Failed to remove {item_path}: {e}")

        freed_mb = freed_bytes / (1024 * 1024)
        logger.info(f"Cleanup complete: removed {removed_count} directories, freed {freed_mb:.1f} MB")

    except Exception as e:
        logger.error(f"Error during artifact cleanup: {e}")


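# A minimal sketch of making the hardcoded retention windows configurable,
# assuming Dagster's Pydantic-based op config (dagster.Config); CleanupConfig
# and retention_hours are illustrative names, not existing code:
#
#     from dagster import Config
#
#     class CleanupConfig(Config):
#         retention_hours: int = 6
#
#     @op
#     def cleanup_old_artifacts(config: CleanupConfig):
#         cutoff_time = datetime.now() - timedelta(hours=config.retention_hours)
#         ...

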
@op
def cleanup_old_logs():
    """Clean up old log files."""
    logger = get_dagster_logger()

    log_dirs = ["/tmp/dagster", "/data/dagster_storage"]
    removed_count = 0
    freed_bytes = 0

    # Remove log files older than 24 hours; compute the cutoff once up front.
    cutoff_ts = (datetime.now() - timedelta(hours=24)).timestamp()

    for log_dir in log_dirs:
        if not os.path.exists(log_dir):
            continue

        try:
            for root, _dirs, files in os.walk(log_dir):
                for file in files:
                    if file.endswith(('.log', '.out', '.err')):
                        file_path = os.path.join(root, file)
                        if os.path.getmtime(file_path) < cutoff_ts:
                            try:
                                size = os.path.getsize(file_path)
                                os.remove(file_path)
                                removed_count += 1
                                freed_bytes += size
                            except Exception as e:
                                logger.warning(f"Failed to remove log file {file_path}: {e}")
        except Exception as e:
            logger.warning(f"Error cleaning logs in {log_dir}: {e}")

    freed_mb = freed_bytes / (1024 * 1024)
    logger.info(f"Log cleanup complete: removed {removed_count} files, freed {freed_mb:.1f} MB")


@op
def cleanup_old_metrics():
    """Clean up old metric data from database."""
    logger = get_dagster_logger()

    db_path = "/data/anomstack.db"
    if not os.path.exists(db_path):
        logger.info("Database does not exist, skipping metric cleanup")
        return

    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()

            # Remove metrics older than 90 days. This assumes metric_timestamp
            # is stored as ISO-8601 text, where lexicographic order matches
            # chronological order.
            cutoff_date = (datetime.now() - timedelta(days=90)).strftime('%Y-%m-%d')

            # Get count before deletion
            cursor.execute("SELECT COUNT(*) FROM metrics WHERE metric_timestamp < ?", (cutoff_date,))
            old_count = cursor.fetchone()[0]

            # Delete old metrics
            cursor.execute("DELETE FROM metrics WHERE metric_timestamp < ?", (cutoff_date,))
            conn.commit()

            # Vacuum to reclaim space. VACUUM cannot run inside a transaction,
            # so it must come after the commit.
            cursor.execute("VACUUM")
        finally:
            conn.close()

        logger.info(f"Database cleanup complete: removed {old_count} old metric records")

    except Exception as e:
        logger.error(f"Error during database cleanup: {e}")


@op
def report_disk_usage():
    """Report current disk usage."""
    logger = get_dagster_logger()

    try:
        # Get disk usage for /data. f_bavail counts blocks available to
        # unprivileged users, which is what "free" means in practice here.
        statvfs = os.statvfs('/data')
        total_bytes = statvfs.f_frsize * statvfs.f_blocks
        free_bytes = statvfs.f_frsize * statvfs.f_bavail
        used_bytes = total_bytes - free_bytes

        total_gb = total_bytes / (1024 ** 3)
        used_gb = used_bytes / (1024 ** 3)
        free_gb = free_bytes / (1024 ** 3)
        usage_percent = (used_bytes / total_bytes) * 100

        logger.info(f"Disk usage - Total: {total_gb:.1f} GB, Used: {used_gb:.1f} GB ({usage_percent:.1f}%), Free: {free_gb:.1f} GB")

        # Get directory sizes
        data_dirs = ['/data/artifacts', '/data/dagster_storage', '/data/models']
        for dir_path in data_dirs:
            if os.path.exists(dir_path):
                try:
                    total_size = sum(
                        os.path.getsize(os.path.join(dirpath, filename))
                        for dirpath, _dirnames, filenames in os.walk(dir_path)
                        for filename in filenames
                    )
                    size_gb = total_size / (1024 ** 3)
                    logger.info(f"{dir_path}: {size_gb:.2f} GB")
                except Exception as e:
                    logger.warning(f"Could not calculate size for {dir_path}: {e}")

    except Exception as e:
        logger.error(f"Error reporting disk usage: {e}")


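# Equivalent totals are available more portably via the standard library's
# shutil.disk_usage, a sketch if os.statvfs ever needs replacing (note its
# "used" is measured against all blocks, so used + free may not equal total):
#
#     import shutil
#
#     usage = shutil.disk_usage("/data")
#     total_gb = usage.total / (1024 ** 3)
#     free_gb = usage.free / (1024 ** 3)

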
@job(
    name="cleanup_disk_space",
    description="Clean up old artifacts, logs, and metrics to free disk space"
)
def cleanup_job():
    """Job to clean up disk space.

    Note: these op invocations share no data dependencies, so Dagster may run
    them in parallel; the two disk-usage reports are not guaranteed to bracket
    the cleanups.
    """
    report_disk_usage()
    cleanup_old_artifacts()
    cleanup_old_logs()
    cleanup_old_metrics()
    report_disk_usage()  # Intended as an "after" report; see note above


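# A minimal sketch of enforcing strict before/after ordering with Nothing
# dependencies; the op and input names below are hypothetical, and nothing
# here is exported:
#
#     from dagster import In, Nothing
#
#     @op(ins={"start": In(Nothing)})
#     def report_disk_usage_after():
#         ...  # same statvfs logic as report_disk_usage
#
#     @job
#     def ordered_cleanup_job():
#         report_disk_usage_after(start=cleanup_old_metrics())

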
# Create schedule to run cleanup every 2 hours
cleanup_schedule = ScheduleDefinition(
    job=cleanup_job,
    cron_schedule="0 */2 * * *",  # Every 2 hours
    default_status=DefaultScheduleStatus.RUNNING,
)

# Export for main.py
cleanup_jobs = [cleanup_job]
cleanup_schedules = [cleanup_schedule]
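
# A minimal sketch of how main.py might pick these exports up, assuming it
# aggregates jobs and schedules into a Dagster Definitions object (the
# module path "jobs.cleanup" is an assumption):
#
#     from dagster import Definitions
#     from jobs.cleanup import cleanup_jobs, cleanup_schedules
#
#     defs = Definitions(jobs=[*cleanup_jobs], schedules=[*cleanup_schedules])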