Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,24 @@ fly-deploy-development:
# deploy with fresh build (clears local Docker cache first) - demo profile
fly-deploy-demo-fresh:
@echo "🧹 Cleaning local Docker cache to ensure fresh build..."
docker system prune -f --filter "until=1h"
@echo "🧹 Cleaning Docker builder cache..."
# builder prune can fail on older Docker installs; errors are ignored on purpose
docker builder prune -f 2>/dev/null || true
./scripts/deployment/deploy_fly.sh --profile demo --force-rebuild

# deploy with fresh build (clears local Docker cache first) - production profile
fly-deploy-production-fresh:
@echo "🧹 Cleaning local Docker cache to ensure fresh build..."
docker system prune -f --filter "until=1h"
@echo "🧹 Cleaning Docker builder cache..."
# builder prune can fail on older Docker installs; errors are ignored on purpose
docker builder prune -f 2>/dev/null || true
./scripts/deployment/deploy_fly.sh --profile production --force-rebuild

# deploy with fresh build (clears local Docker cache first) - development profile
fly-deploy-development-fresh:
@echo "🧹 Cleaning local Docker cache to ensure fresh build..."
docker system prune -f --filter "until=1h"
@echo "🧹 Cleaning Docker builder cache..."
# builder prune can fail on older Docker installs; errors are ignored on purpose
docker builder prune -f 2>/dev/null || true
./scripts/deployment/deploy_fly.sh --profile development --force-rebuild

# test fly.io build locally before deploying (helps catch issues early)
Expand Down Expand Up @@ -416,6 +422,26 @@ posthog-example:
kill-long-runs:
python scripts/maintenance/kill_long_running_tasks.py

# clean up disk space on fly instance (requires SSH access)
# NOTE(review): assumes the app lives at /opt/dagster/app inside the Fly
# machine — confirm this matches the image's WORKDIR before relying on it.
fly-cleanup:
@echo "🧹 Running disk cleanup on Fly instance..."
@echo "This will SSH into your Fly instance and run cleanup"
@if [ -z "$$FLY_APP" ]; then echo "Set FLY_APP environment variable"; exit 1; fi
fly ssh console -a $$FLY_APP -C "cd /opt/dagster/app && python scripts/maintenance/cleanup_disk_space.py"

# preview cleanup on fly instance (dry run)
# Same script as fly-cleanup but with --dry-run: nothing is deleted.
fly-cleanup-preview:
@echo "🔍 Previewing disk cleanup on Fly instance..."
@if [ -z "$$FLY_APP" ]; then echo "Set FLY_APP environment variable"; exit 1; fi
fly ssh console -a $$FLY_APP -C "cd /opt/dagster/app && python scripts/maintenance/cleanup_disk_space.py --dry-run"

# aggressive cleanup for emergency situations
# Same script with --aggressive: removes more files, including recent ones.
fly-cleanup-aggressive:
@echo "⚡ Running AGGRESSIVE disk cleanup on Fly instance..."
@echo "This will remove more files - use only if disk is critically full"
@if [ -z "$$FLY_APP" ]; then echo "Set FLY_APP environment variable"; exit 1; fi
fly ssh console -a $$FLY_APP -C "cd /opt/dagster/app && python scripts/maintenance/cleanup_disk_space.py --aggressive"

# run docker in dev mode with correct environment
docker-dev-env:
docker compose -f docker-compose.yaml -f docker-compose.dev.yaml up -d
Expand Down
37 changes: 37 additions & 0 deletions Makefile.md
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,43 @@ make posthog-example
make kill-long-runs
```

### Fly.io Disk Space Management

#### `make fly-cleanup-preview`
**Preview disk cleanup on Fly instance (dry run)**
- Shows what files would be removed
- Safe way to check cleanup impact
- Requires `FLY_APP` environment variable

```bash
export FLY_APP=anomstack-demo
make fly-cleanup-preview
```

#### `make fly-cleanup`
**Clean up disk space on Fly instance**
- Removes old artifacts (6+ hours)
- Removes old logs (24+ hours)
- Cleans database and runs VACUUM
- Reports disk usage before/after

```bash
export FLY_APP=anomstack-demo
make fly-cleanup
```

#### `make fly-cleanup-aggressive`
**Emergency disk cleanup (aggressive mode)**
- Removes artifacts older than 1 hour
- Removes ALL log files
- Use only when disk is critically full
- More thorough than normal cleanup

```bash
export FLY_APP=anomstack-demo
make fly-cleanup-aggressive
```

### Legacy Targets

#### `make docker-dev-env`
Expand Down
187 changes: 187 additions & 0 deletions anomstack/jobs/cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""
Cleanup job for managing disk space and removing old artifacts.
"""

import os
import shutil
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path

from dagster import DefaultScheduleStatus, JobDefinition, ScheduleDefinition, job, op, get_dagster_logger


@op
def cleanup_old_artifacts():
    """Clean up old Dagster artifacts to free disk space."""
    logger = get_dagster_logger()

    storage_root = "/data/artifacts/storage"
    if not os.path.exists(storage_root):
        logger.info("Artifacts directory does not exist, skipping cleanup")
        return

    # Directories last modified more than 6 hours ago are eligible for removal.
    cutoff = datetime.now() - timedelta(hours=6)
    removed_count = 0
    freed_bytes = 0

    try:
        for entry in os.listdir(storage_root):
            entry_path = os.path.join(storage_root, entry)
            if not os.path.isdir(entry_path):
                continue
            if datetime.fromtimestamp(os.path.getmtime(entry_path)) >= cutoff:
                continue
            try:
                # Tally the directory's size before deleting it so we can
                # report how much space was reclaimed.
                dir_size = 0
                for parent, _subdirs, filenames in os.walk(entry_path):
                    for filename in filenames:
                        dir_size += os.path.getsize(os.path.join(parent, filename))
                shutil.rmtree(entry_path)
                removed_count += 1
                freed_bytes += dir_size
                logger.info(f"Removed old artifact directory: {entry}")
            except Exception as e:
                logger.warning(f"Failed to remove {entry_path}: {e}")

        freed_mb = freed_bytes / (1024 * 1024)
        logger.info(f"Cleanup complete: removed {removed_count} directories, freed {freed_mb:.1f}MB")

    except Exception as e:
        logger.error(f"Error during artifact cleanup: {e}")


@op
def cleanup_old_logs():
    """Clean up old log files.

    Walks the known log directories and deletes ``.log``/``.out``/``.err``
    files older than 24 hours, then logs a summary of files removed and
    space freed. Per-file failures are logged as warnings and skipped.
    """
    logger = get_dagster_logger()

    log_dirs = ["/tmp/dagster", "/data/dagster_storage"]
    removed_count = 0
    freed_bytes = 0

    # Compute the 24-hour age cutoff once, instead of re-evaluating
    # datetime.now() - timedelta(hours=24) for every file in the walk.
    cutoff_ts = (datetime.now() - timedelta(hours=24)).timestamp()

    for log_dir in log_dirs:
        if not os.path.exists(log_dir):
            continue

        try:
            for root, dirs, files in os.walk(log_dir):
                for file in files:
                    if file.endswith(('.log', '.out', '.err')):
                        file_path = os.path.join(root, file)
                        # Remove log files older than 24 hours
                        if os.path.getmtime(file_path) < cutoff_ts:
                            try:
                                size = os.path.getsize(file_path)
                                os.remove(file_path)
                                removed_count += 1
                                freed_bytes += size
                            except Exception as e:
                                logger.warning(f"Failed to remove log file {file_path}: {e}")
        except Exception as e:
            logger.warning(f"Error cleaning logs in {log_dir}: {e}")

    freed_mb = freed_bytes / (1024 * 1024)
    logger.info(f"Log cleanup complete: removed {removed_count} files, freed {freed_mb:.1f}MB")


@op
def cleanup_old_metrics():
    """Clean up old metric data from database.

    Deletes rows from the ``metrics`` table whose ``metric_timestamp`` is
    more than 90 days old, commits the deletion, then runs VACUUM so
    SQLite returns the freed pages to the filesystem.
    """
    logger = get_dagster_logger()

    db_path = "/data/anomstack.db"
    if not os.path.exists(db_path):
        logger.info("Database does not exist, skipping metric cleanup")
        return

    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()

            # Remove metrics older than 90 days
            cutoff_date = (datetime.now() - timedelta(days=90)).strftime('%Y-%m-%d')

            # Get count before deletion
            cursor.execute("SELECT COUNT(*) FROM metrics WHERE metric_timestamp < ?", (cutoff_date,))
            old_count = cursor.fetchone()[0]

            # Delete old metrics
            cursor.execute("DELETE FROM metrics WHERE metric_timestamp < ?", (cutoff_date,))

            # BUGFIX: commit the DELETE before vacuuming. SQLite refuses to
            # VACUUM while a transaction is open, so the previous order
            # (DELETE -> VACUUM -> commit) raised "cannot VACUUM from within
            # a transaction", the error was swallowed below, and the
            # deletion was never committed.
            conn.commit()

            # Vacuum to reclaim space (runs outside any transaction now)
            cursor.execute("VACUUM")
        finally:
            # Always release the connection, even if a statement fails.
            conn.close()

        logger.info(f"Database cleanup complete: removed {old_count} old metric records")

    except Exception as e:
        logger.error(f"Error during database cleanup: {e}")


@op
def report_disk_usage():
    """Report current disk usage."""
    logger = get_dagster_logger()

    try:
        # Overall usage of the /data volume, derived from statvfs counters
        # (f_bavail = blocks available to unprivileged users).
        vfs = os.statvfs('/data')
        total_bytes = vfs.f_frsize * vfs.f_blocks
        free_bytes = vfs.f_frsize * vfs.f_bavail
        used_bytes = total_bytes - free_bytes

        gib = 1024 ** 3
        total_gb = total_bytes / gib
        used_gb = used_bytes / gib
        free_gb = free_bytes / gib
        usage_percent = (used_bytes / total_bytes) * 100

        logger.info(
            f"Disk usage - Total: {total_gb:.1f}GB, Used: {used_gb:.1f}GB "
            f"({usage_percent:.1f}%), Free: {free_gb:.1f}GB"
        )

        # Per-directory totals for the large data directories.
        for dir_path in ('/data/artifacts', '/data/dagster_storage', '/data/models'):
            if not os.path.exists(dir_path):
                continue
            try:
                dir_bytes = 0
                for parent, _subdirs, filenames in os.walk(dir_path):
                    for filename in filenames:
                        dir_bytes += os.path.getsize(os.path.join(parent, filename))
                logger.info(f"{dir_path}: {dir_bytes / gib:.2f}GB")
            except Exception as e:
                logger.warning(f"Could not calculate size for {dir_path}: {e}")

    except Exception as e:
        logger.error(f"Error reporting disk usage: {e}")


@job(
    name="cleanup_disk_space",
    description="Clean up old artifacts, logs, and metrics to free disk space"
)
def cleanup_job():
    """Job to clean up disk space.

    NOTE(review): these ops exchange no data, so Dagster does not order
    them — the second report_disk_usage invocation (auto-aliased by
    Dagster) is not guaranteed to run after the cleanup ops. Confirm
    whether Nothing-typed dependencies are needed to enforce ordering.
    """
    report_disk_usage()
    cleanup_old_artifacts()
    cleanup_old_logs()
    cleanup_old_metrics()
    report_disk_usage()  # Report again after cleanup


# Create schedule to run cleanup every 2 hours
cleanup_schedule = ScheduleDefinition(
    job=cleanup_job,
    cron_schedule="0 */2 * * *",  # Every 2 hours
    default_status=DefaultScheduleStatus.RUNNING,
)

# Export for main.py
cleanup_jobs = [cleanup_job]
cleanup_schedules = [cleanup_schedule]
Copy link

Copilot AI Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is trailing whitespace after 'cleanup_schedule]'.

Suggested change
cleanup_schedules = [cleanup_schedule]
cleanup_schedules = [cleanup_schedule]

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing whitespace at the end of the file.

Suggested change
cleanup_schedules = [cleanup_schedule]
cleanup_schedules = [cleanup_schedule]

Copilot uses AI. Check for mistakes.
3 changes: 3 additions & 0 deletions anomstack/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from anomstack.jobs.alert import alert_jobs, alert_schedules
from anomstack.jobs.change import change_jobs, change_schedules
# from anomstack.jobs.cleanup import cleanup_jobs, cleanup_schedules # Temporarily disabled
from anomstack.jobs.delete import delete_jobs, delete_schedules
from anomstack.jobs.ingest import ingest_jobs, ingest_schedules
from anomstack.jobs.llmalert import llmalert_jobs, llmalert_schedules
Expand All @@ -29,6 +30,7 @@
+ summary_jobs
+ delete_jobs
+ reload_jobs
# + cleanup_jobs # Temporarily disabled
)
sensors = [email_on_run_failure, kill_long_running_runs, config_file_watcher]
schedules = (
Expand All @@ -42,6 +44,7 @@
+ summary_schedules
+ delete_schedules
+ reload_schedules
# + cleanup_schedules # Temporarily disabled
)

defs = Definitions(
Expand Down
8 changes: 4 additions & 4 deletions dagster_fly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ run_retries:
# Aggressive retention policies optimized for Fly.io disk usage
retention:
schedule:
purge_after_days: 2 # Keep for 2 days
purge_after_days: 1 # Keep for 1 day only
sensor:
purge_after_days:
skipped: 1
failure: 2
success: 1
skipped: 1 # 1 day for skipped (minimum allowed by Dagster)
failure: 1 # 1 day for failures
success: 1 # 1 day for successful runs (minimum allowed by Dagster)

# Enhanced run monitoring for Fly.io environment
run_monitoring:
Expand Down
3 changes: 3 additions & 0 deletions docker/Dockerfile.fly
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ FROM python:3.12-slim
# Cache busting argument (set during build to force fresh layers)
ARG CACHEBUST=1

# Use CACHEBUST to invalidate cache when needed (this layer changes when CACHEBUST changes)
RUN echo "Cache bust: $CACHEBUST" > /tmp/cachebust

# Install system dependencies including nginx
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
Expand Down
17 changes: 15 additions & 2 deletions scripts/deployment/deploy_fly.sh
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,23 @@ rm fly.toml.bak
echo "🚀 Deploying application..."

if [[ "$FORCE_REBUILD" == "true" ]]; then
    # Generate a unique cache-busting value: epoch seconds plus a random
    # suffix. openssl may be absent on some hosts; fall back to $RANDOM.
    CACHEBUST_VALUE="$(date +%s)-$(openssl rand -hex 4 2>/dev/null || echo $RANDOM)"
    echo "🔄 Force rebuild enabled - using aggressive cache busting..."
    echo "🎯 Cache bust value: $CACHEBUST_VALUE"

    # Use multiple cache busting strategies:
    # 1. --no-cache: Skip Docker layer cache
    # 2. CACHEBUST build arg: Force rebuild of layers that use it
    # 3. --dockerfile: Explicit dockerfile path to avoid confusion
    fly deploy \
        --no-cache \
        --build-arg CACHEBUST="$CACHEBUST_VALUE" \
        --dockerfile docker/Dockerfile.fly \
        -a "$APP_NAME"
else
    echo "⚡ Standard deployment (with caching)..."
    fly deploy --dockerfile docker/Dockerfile.fly -a "$APP_NAME"
fi

# Show the status
Expand Down
Loading