Merged. Changes from 4 commits.
20 changes: 20 additions & 0 deletions Makefile
@@ -416,6 +416,26 @@ posthog-example:
kill-long-runs:
	python scripts/maintenance/kill_long_running_tasks.py

# clean up disk space on fly instance (requires SSH access)
fly-cleanup:
	@echo "🧹 Running disk cleanup on Fly instance..."
	@echo "This will SSH into your Fly instance and run cleanup"
	@if [ -z "$$FLY_APP" ]; then echo "Set FLY_APP environment variable"; exit 1; fi
	fly ssh console -a $$FLY_APP -C "cd /opt/dagster/app && python scripts/maintenance/cleanup_disk_space.py"

# preview cleanup on fly instance (dry run)
fly-cleanup-preview:
	@echo "🔍 Previewing disk cleanup on Fly instance..."
	@if [ -z "$$FLY_APP" ]; then echo "Set FLY_APP environment variable"; exit 1; fi
	fly ssh console -a $$FLY_APP -C "cd /opt/dagster/app && python scripts/maintenance/cleanup_disk_space.py --dry-run"

# aggressive cleanup for emergency situations
fly-cleanup-aggressive:
	@echo "⚡ Running AGGRESSIVE disk cleanup on Fly instance..."
	@echo "This will remove more files - use only if disk is critically full"
	@if [ -z "$$FLY_APP" ]; then echo "Set FLY_APP environment variable"; exit 1; fi
	fly ssh console -a $$FLY_APP -C "cd /opt/dagster/app && python scripts/maintenance/cleanup_disk_space.py --aggressive"

# run docker in dev mode with correct environment
docker-dev-env:
	docker compose -f docker-compose.yaml -f docker-compose.dev.yaml up -d
37 changes: 37 additions & 0 deletions Makefile.md
@@ -562,6 +562,43 @@ make posthog-example
make kill-long-runs
```

### Fly.io Disk Space Management

#### `make fly-cleanup-preview`
**Preview disk cleanup on Fly instance (dry run)**
- Shows what files would be removed
- Safe way to check cleanup impact
- Requires `FLY_APP` environment variable

```bash
export FLY_APP=anomstack-demo
make fly-cleanup-preview
```

#### `make fly-cleanup`
**Clean up disk space on Fly instance**
- Removes old artifacts (6+ hours)
- Removes old logs (24+ hours)
- Cleans database and runs VACUUM
- Reports disk usage before/after

```bash
export FLY_APP=anomstack-demo
make fly-cleanup
```

#### `make fly-cleanup-aggressive`
**Emergency disk cleanup (aggressive mode)**
- Removes artifacts older than 1 hour
- Removes ALL log files
- Use only when disk is critically full
- More thorough than normal cleanup

```bash
export FLY_APP=anomstack-demo
make fly-cleanup-aggressive
```

### Legacy Targets

#### `make docker-dev-env`
187 changes: 187 additions & 0 deletions anomstack/jobs/cleanup.py
@@ -0,0 +1,187 @@
"""
Cleanup job for managing disk space and removing old artifacts.
"""

import os
import shutil
import sqlite3
from datetime import datetime, timedelta

from dagster import DefaultScheduleStatus, ScheduleDefinition, get_dagster_logger, job, op


@op
def cleanup_old_artifacts():
    """Clean up old Dagster artifacts to free disk space."""
    logger = get_dagster_logger()

    artifacts_path = "/data/artifacts/storage"
    if not os.path.exists(artifacts_path):
        logger.info("Artifacts directory does not exist, skipping cleanup")
        return

    # Remove artifacts older than 6 hours
    cutoff_time = datetime.now() - timedelta(hours=6)
    removed_count = 0
    freed_bytes = 0

    try:
        for item in os.listdir(artifacts_path):
            item_path = os.path.join(artifacts_path, item)
            if os.path.isdir(item_path):
                # Get directory modification time
                mod_time = datetime.fromtimestamp(os.path.getmtime(item_path))
                if mod_time < cutoff_time:
                    # Calculate size before removal
                    try:
                        size = sum(
                            os.path.getsize(os.path.join(dirpath, filename))
                            for dirpath, dirnames, filenames in os.walk(item_path)
                            for filename in filenames
                        )
                        shutil.rmtree(item_path)
                        removed_count += 1
                        freed_bytes += size
                        logger.info(f"Removed old artifact directory: {item}")
                    except Exception as e:
                        logger.warning(f"Failed to remove {item_path}: {e}")

        freed_mb = freed_bytes / (1024 * 1024)
        logger.info(f"Cleanup complete: removed {removed_count} directories, freed {freed_mb:.1f}MB")

    except Exception as e:
        logger.error(f"Error during artifact cleanup: {e}")


@op
def cleanup_old_logs():
    """Clean up old log files."""
    logger = get_dagster_logger()

    log_dirs = ["/tmp/dagster", "/data/dagster_storage"]
    removed_count = 0
    freed_bytes = 0

    for log_dir in log_dirs:
        if not os.path.exists(log_dir):
            continue

        try:
            for root, dirs, files in os.walk(log_dir):
                for file in files:
                    if file.endswith(('.log', '.out', '.err')):
                        file_path = os.path.join(root, file)
                        # Remove log files older than 24 hours
                        if os.path.getmtime(file_path) < (datetime.now() - timedelta(hours=24)).timestamp():
Comment on lines +75 to +76 (Copilot AI, Jul 30, 2025):
The `datetime.now() - timedelta(hours=24)` calculation is performed for every file in the loop. Consider calculating this cutoff timestamp once before the loop for better performance.

Suggested change:
-                        if os.path.getmtime(file_path) < (datetime.now() - timedelta(hours=24)).timestamp():
+                        if os.path.getmtime(file_path) < cutoff_time:
                            try:
                                size = os.path.getsize(file_path)
                                os.remove(file_path)
                                removed_count += 1
                                freed_bytes += size
                            except Exception as e:
                                logger.warning(f"Failed to remove log file {file_path}: {e}")
        except Exception as e:
            logger.warning(f"Error cleaning logs in {log_dir}: {e}")

    freed_mb = freed_bytes / (1024 * 1024)
    logger.info(f"Log cleanup complete: removed {removed_count} files, freed {freed_mb:.1f}MB")
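Copilot's hoisting suggestion, applied outside the PR, could look roughly like the sketch below; the function name and generator structure are illustrative, not from this PR:

```python
import os
import time

# Illustrative sketch only: compute the 24h cutoff once, then compare each
# file's mtime against it while walking the tree.
def old_log_files(log_dir, max_age_hours=24):
    """Yield paths of *.log/*.out/*.err files older than max_age_hours."""
    cutoff_time = time.time() - max_age_hours * 3600  # computed once, not per file
    for root, _dirs, files in os.walk(log_dir):
        for name in files:
            if name.endswith((".log", ".out", ".err")):
                path = os.path.join(root, name)
                if os.path.getmtime(path) < cutoff_time:
                    yield path
```

Comparing raw epoch floats from `time.time()` and `os.path.getmtime()` also avoids the repeated `datetime` round-trips.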


@op
def cleanup_old_metrics():
    """Clean up old metric data from database."""
    logger = get_dagster_logger()

    db_path = "/data/anomstack.db"
    if not os.path.exists(db_path):
        logger.info("Database does not exist, skipping metric cleanup")
        return

    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # Remove metrics older than 90 days
        cutoff_date = (datetime.now() - timedelta(days=90)).strftime('%Y-%m-%d')

        # Get count before deletion
        cursor.execute("SELECT COUNT(*) FROM metrics WHERE metric_timestamp < ?", (cutoff_date,))
        old_count = cursor.fetchone()[0]

        # Delete old metrics
        cursor.execute("DELETE FROM metrics WHERE metric_timestamp < ?", (cutoff_date,))
        conn.commit()

        # Vacuum to reclaim space (VACUUM cannot run inside a transaction,
        # so the delete must be committed first)
        cursor.execute("VACUUM")
        conn.close()

        logger.info(f"Database cleanup complete: removed {old_count} old metric records")

    except Exception as e:
        logger.error(f"Error during database cleanup: {e}")


@op
def report_disk_usage():
    """Report current disk usage."""
    logger = get_dagster_logger()

    try:
        # Get disk usage for /data
        statvfs = os.statvfs('/data')
        total_bytes = statvfs.f_frsize * statvfs.f_blocks
        free_bytes = statvfs.f_frsize * statvfs.f_bavail
        used_bytes = total_bytes - free_bytes

        total_gb = total_bytes / (1024 ** 3)
        used_gb = used_bytes / (1024 ** 3)
        free_gb = free_bytes / (1024 ** 3)
        usage_percent = (used_bytes / total_bytes) * 100

        logger.info(f"Disk usage - Total: {total_gb:.1f}GB, Used: {used_gb:.1f}GB ({usage_percent:.1f}%), Free: {free_gb:.1f}GB")

        # Get directory sizes
        data_dirs = ['/data/artifacts', '/data/dagster_storage', '/data/models']
        for dir_path in data_dirs:
            if os.path.exists(dir_path):
                try:
                    total_size = sum(
                        os.path.getsize(os.path.join(dirpath, filename))
                        for dirpath, dirnames, filenames in os.walk(dir_path)
                        for filename in filenames
                    )
                    size_gb = total_size / (1024 ** 3)
                    logger.info(f"{dir_path}: {size_gb:.2f}GB")
                except Exception as e:
                    logger.warning(f"Could not calculate size for {dir_path}: {e}")

    except Exception as e:
        logger.error(f"Error reporting disk usage: {e}")


@job(
    name="cleanup_disk_space",
    description="Clean up old artifacts, logs, and metrics to free disk space"
)
def cleanup_job():
    """Job to clean up disk space."""
    report_disk_usage()
    cleanup_old_artifacts()
    cleanup_old_logs()
Copilot AI (Jul 29, 2025): There is trailing whitespace after 'cleanup_old_logs()'.
    cleanup_old_metrics()
    report_disk_usage()  # Report again after cleanup


# Create schedule to run cleanup every 2 hours
cleanup_schedule = ScheduleDefinition(
    job=cleanup_job,
    cron_schedule="0 */2 * * *",  # Every 2 hours
    default_status=DefaultScheduleStatus.RUNNING,
)

# Export for main.py
cleanup_jobs = [cleanup_job]
cleanup_schedules = [cleanup_schedule]
Copilot AI (Jul 29, 2025): There is trailing whitespace after 'cleanup_schedules = [cleanup_schedule]'.

Copilot AI (Jul 30, 2025): Remove trailing whitespace at the end of the file.
3 changes: 3 additions & 0 deletions anomstack/main.py
@@ -6,6 +6,7 @@

from anomstack.jobs.alert import alert_jobs, alert_schedules
from anomstack.jobs.change import change_jobs, change_schedules
from anomstack.jobs.cleanup import cleanup_jobs, cleanup_schedules
from anomstack.jobs.delete import delete_jobs, delete_schedules
from anomstack.jobs.ingest import ingest_jobs, ingest_schedules
from anomstack.jobs.llmalert import llmalert_jobs, llmalert_schedules
@@ -29,6 +30,7 @@
    + summary_jobs
    + delete_jobs
    + reload_jobs
    + cleanup_jobs
)
sensors = [email_on_run_failure, kill_long_running_runs, config_file_watcher]
schedules = (
@@ -42,6 +44,7 @@
    + summary_schedules
    + delete_schedules
    + reload_schedules
    + cleanup_schedules
)

defs = Definitions(
8 changes: 4 additions & 4 deletions dagster_fly.yaml
@@ -32,12 +32,12 @@ run_retries:
# Aggressive retention policies optimized for Fly.io disk usage
retention:
  schedule:
-    purge_after_days: 2  # Keep for 2 days
+    purge_after_days: 1  # Keep for 1 day only
  sensor:
    purge_after_days:
-      skipped: 1
-      failure: 2
-      success: 1
+      skipped: 0.5   # 12 hours for skipped
+      failure: 1     # 1 day for failures
+      success: 0.25  # 6 hours for successful runs
Copilot AI (Jul 29, 2025), nitpick: Using decimal values for days (0.5, 0.25) may be confusing and error-prone. Consider using explicit hour-based configuration or adding clear comments about the exact time periods these represent.

Suggested change:
-      skipped: 0.5   # 12 hours for skipped
-      failure: 1     # 1 day for failures
-      success: 0.25  # 6 hours for successful runs
+      skipped: "12h" # 12 hours for skipped
+      failure: "1d"  # 1 day for failures
+      success: "6h"  # 6 hours for successful runs

# Enhanced run monitoring for Fly.io environment
run_monitoring:
44 changes: 44 additions & 0 deletions scripts/maintenance/README.md
@@ -39,6 +39,50 @@ python kill_long_running_tasks.py
- **Validation**: Checks job status before taking action
- **Error Handling**: Handles unreachable user code servers gracefully

### `cleanup_disk_space.py`
Standalone script for managing disk space by cleaning up old artifacts, logs, and metrics.

**Features:**
- **Artifact Cleanup**: Removes old Dagster run artifacts
- **Log Cleanup**: Removes old log files from multiple directories
- **Database Cleanup**: Removes old metrics and vacuums database
- **Disk Usage Reporting**: Shows before/after disk usage statistics
- **Dry Run Mode**: Preview cleanup without making changes
- **Aggressive Mode**: More thorough cleanup for emergency situations

**Use Cases:**
- **Emergency Cleanup**: Free disk space when volume is full
- **Scheduled Maintenance**: Regular cleanup to prevent disk issues
- **Deployment Optimization**: Optimize Fly.io volume usage
- **Development**: Clean up after testing

**Usage:**
```bash
# Preview what would be cleaned up
python cleanup_disk_space.py --dry-run

# Normal cleanup (6h artifacts, 24h logs)
python cleanup_disk_space.py

# Aggressive cleanup (1h artifacts, all logs)
python cleanup_disk_space.py --aggressive

# Emergency cleanup with preview
python cleanup_disk_space.py --dry-run --aggressive
```

**Cleanup Targets:**
- **Artifacts**: Dagster run artifacts older than 6 hours (1 hour in aggressive mode)
- **Logs**: Log files older than 24 hours (all logs in aggressive mode)
- **Database**: Metrics older than 90 days + VACUUM operation
- **Locations**: `/data/artifacts`, `/tmp/dagster`, `/data/dagster_storage`

**Safety Features:**
- **Dry Run Mode**: Safe preview of cleanup actions
- **Detailed Reporting**: Shows exactly what will be/was removed
- **Error Handling**: Continues cleanup even if individual files fail
- **Size Calculation**: Reports space freed by cleanup operations
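
The script itself is not shown in this diff, but given the documented flags, its argument handling presumably looks something like this minimal sketch (function and parser names are assumptions, not taken from the PR):

```python
import argparse

# Hypothetical sketch of cleanup_disk_space.py's CLI, inferred from the
# documented usage above; the real script is not part of this diff.
def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Clean up disk space on the instance")
    parser.add_argument("--dry-run", action="store_true",
                        help="preview what would be removed without deleting anything")
    parser.add_argument("--aggressive", action="store_true",
                        help="use a 1h artifact cutoff and remove all log files")
    return parser.parse_args(argv)
```

Since both options are independent booleans, `--dry-run --aggressive` naturally composes into the "emergency cleanup with preview" invocation shown above.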

## Common Maintenance Tasks

### Regular Cleanup Operations