Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .example.env
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ ANOMSTACK_LLM_PLATFORM=openai
# some dagster env vars
DAGSTER_LOG_LEVEL=DEBUG
DAGSTER_CONCURRENCY=4
DAGSTER_HOME=/opt/dagster/dagster_home

# max runtime for a job in dagster
# https://docs.dagster.io/deployment/run-monitoring#general-run-timeouts
Expand Down
20 changes: 12 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ SHELL=/bin/bash

.PHONY: local locald kill-locald ps-locald dev

# start dagster locally
# start dagster locally (simple - just set DAGSTER_HOME directly)
local:
dagster dev -f anomstack/main.py
DAGSTER_HOME=$$(pwd)/dagster_home dagster dev -f anomstack/main.py

# start dagster locally as a daemon with no log file
locald:
Expand Down Expand Up @@ -124,10 +124,6 @@ docker-restart-dashboard:
docker-restart-code:
docker compose restart anomstack_code

# stop all containers
docker-stop:
docker compose down

# alias for docker-stop
docker-down:
docker compose down
Expand Down Expand Up @@ -217,8 +213,16 @@ requirements-install:

# run the PostHog example ingest function
posthog-example:
python scripts/posthog_example.py
python scripts/posthog_example.py

# kill any dagster runs exceeding configured timeout
kill-long-runs:
python scripts/kill_long_running_tasks.py
python scripts/kill_long_running_tasks.py

# run docker in dev mode with correct environment
docker-dev-env:
docker compose -f docker-compose.yaml -f docker-compose.dev.yaml up -d

# stop docker containers
docker-stop:
docker compose -f docker-compose.yaml -f docker-compose.dev.yaml down
16 changes: 6 additions & 10 deletions dagster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,13 @@ retention:
failure: 7
success: 7

#schedules:
# use_threads: true
# num_workers: 8
schedules:
use_threads: true
num_workers: 8

#sensors:
# use_threads: true
# num_workers: 4
sensors:
use_threads: true
num_workers: 4

telemetry:
enabled: true

# kill sensor configuration
kill_sensor:
kill_after_minutes: 60
10 changes: 7 additions & 3 deletions dagster_docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ run_monitoring:
start_timeout_seconds: 600
cancel_timeout_seconds: 300

# kill sensor configuration
kill_sensor:
kill_after_minutes: 60
schedules:
use_threads: true
num_workers: 8

sensors:
use_threads: true
num_workers: 4

schedule_storage:
module: dagster_postgres.schedule_storage
Expand Down
Empty file added dagster_home/dagster.yaml
Empty file.
9 changes: 6 additions & 3 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ services:
- ./tmp:/opt/dagster/app/tmp
- anomstack_metrics_duckdb:/metrics_db/duckdb
- ./dagster_home:/opt/dagster/dagster_home
- ./dagster_docker.yaml:/opt/dagster/dagster_home/dagster.yaml
Copy link

Copilot AI Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Mounting dagster_docker.yaml directly over dagster.yaml could overwrite any existing dagster.yaml file in the container. This tight coupling makes it difficult to have different configurations per service. Consider using a more explicit configuration approach or documenting this behavior clearly.

Suggested change
- ./dagster_docker.yaml:/opt/dagster/dagster_home/dagster.yaml
- ./dagster_docker.yaml:/opt/dagster/dagster_home/dagster_docker.yaml

Copilot uses AI. Check for mistakes.
env_file:
- .env
environment:
Expand All @@ -52,7 +53,7 @@ services:
DAGSTER_POSTGRES_DB: "${ANOMSTACK_POSTGRES_DB:-postgres_db}"
DAGSTER_CURRENT_IMAGE: "andrewm4894/anomstack_code:latest"
ANOMSTACK_DUCKDB_PATH: "/metrics_db/duckdb/anomstack.db"
DAGSTER_HOME: "${DAGSTER_HOME:-/opt/dagster/dagster_home}"
DAGSTER_HOME: "/opt/dagster/dagster_home"
networks:
- anomstack_network
healthcheck:
Expand Down Expand Up @@ -95,13 +96,14 @@ services:
DAGSTER_POSTGRES_USER: "${ANOMSTACK_POSTGRES_USER:-postgres_user}"
DAGSTER_POSTGRES_PASSWORD: "${ANOMSTACK_POSTGRES_PASSWORD:-postgres_password}"
DAGSTER_POSTGRES_DB: "${ANOMSTACK_POSTGRES_DB:-postgres_db}"
DAGSTER_HOME: "${DAGSTER_HOME:-/opt/dagster/dagster_home}"
DAGSTER_HOME: "/opt/dagster/dagster_home"
volumes: # Make docker client accessible so we can terminate containers from the webserver
- /var/run/docker.sock:/var/run/docker.sock
- /tmp/io_manager_storage:/tmp/io_manager_storage
- ./tmp:/opt/dagster/app/tmp
- anomstack_metrics_duckdb:/metrics_db/duckdb
- ./dagster_home:/opt/dagster/dagster_home
- ./dagster_docker.yaml:/opt/dagster/dagster_home/dagster.yaml
networks:
- anomstack_network
depends_on:
Expand Down Expand Up @@ -137,13 +139,14 @@ services:
DAGSTER_POSTGRES_USER: "${ANOMSTACK_POSTGRES_USER:-postgres_user}"
DAGSTER_POSTGRES_PASSWORD: "${ANOMSTACK_POSTGRES_PASSWORD:-postgres_password}"
DAGSTER_POSTGRES_DB: "${ANOMSTACK_POSTGRES_DB:-postgres_db}"
DAGSTER_HOME: "${DAGSTER_HOME:-/opt/dagster/dagster_home}"
DAGSTER_HOME: "/opt/dagster/dagster_home"
volumes: # Make docker client accessible so we can launch containers using host docker
- /var/run/docker.sock:/var/run/docker.sock
- /tmp/io_manager_storage:/tmp/io_manager_storage
- ./tmp:/opt/dagster/app/tmp
- anomstack_metrics_duckdb:/metrics_db/duckdb
- ./dagster_home:/opt/dagster/dagster_home
- ./dagster_docker.yaml:/opt/dagster/dagster_home/dagster.yaml
networks:
- anomstack_network
depends_on:
Expand Down
11 changes: 9 additions & 2 deletions scripts/kill_long_running_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
from pathlib import Path
from datetime import datetime, timedelta, timezone

Expand All @@ -17,8 +18,14 @@
os.environ["ANOMSTACK_DAGSTER_LOCAL_COMPUTE_LOG_MANAGER_DIRECTORY"] = "tmp"
os.environ["ANOMSTACK_DAGSTER_LOCAL_ARTIFACT_STORAGE_DIR"] = "tmp"

# Cutoff for long-running (1 hour ago)
cutoff_time = datetime.now(timezone.utc) - timedelta(hours=1)
# Add the parent directory to sys.path to import the sensor module
sys.path.append(str(script_dir.parent))
Copy link

Copilot AI Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifying sys.path at runtime can lead to import conflicts and makes the code harder to maintain. Consider using relative imports or installing the package in development mode with 'pip install -e .' instead.

Copilot uses AI. Check for mistakes.
from anomstack.sensors.timeout import get_kill_after_minutes

# Use the same configurable timeout as the sensor
kill_after_minutes = get_kill_after_minutes()
cutoff_time = datetime.now(timezone.utc) - timedelta(minutes=kill_after_minutes)
print(f"Using {kill_after_minutes} minute timeout")
Comment on lines +23 to +28
Copy link

Copilot AI Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import could fail if the anomstack.sensors.timeout module doesn't exist or doesn't contain the get_kill_after_minutes function. Consider adding error handling to gracefully fall back to a default timeout value if the import fails.

Suggested change
from anomstack.sensors.timeout import get_kill_after_minutes
# Use the same configurable timeout as the sensor
kill_after_minutes = get_kill_after_minutes()
cutoff_time = datetime.now(timezone.utc) - timedelta(minutes=kill_after_minutes)
print(f"Using {kill_after_minutes} minute timeout")
try:
from anomstack.sensors.timeout import get_kill_after_minutes
# Use the same configurable timeout as the sensor
kill_after_minutes = get_kill_after_minutes()
print(f"Using {kill_after_minutes} minute timeout")
except (ImportError, AttributeError) as e:
print(f"⚠️ Warning: Could not import 'get_kill_after_minutes' or call it: {e}")
kill_after_minutes = 60 # Default timeout value
print(f"Using default timeout of {kill_after_minutes} minutes")
cutoff_time = datetime.now(timezone.utc) - timedelta(minutes=kill_after_minutes)

Copilot uses AI. Check for mistakes.

instance = DagsterInstance.get()
running_runs = instance.get_runs(filters=RunsFilter(statuses=[DagsterRunStatus.STARTED]))
Expand Down