Skip to content
Merged
Binary file added .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,4 @@ cython_debug/
.pypirc

# Local Netlify folder
.netlify
.netlify
119 changes: 64 additions & 55 deletions backend/app/core/orchestration/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from typing import Dict, Any, Callable, Optional
from datetime import datetime
from enum import Enum
import aio_pika
import json
from app.core.config import settings

logger = logging.getLogger(__name__)

Expand All @@ -12,27 +15,45 @@ class QueuePriority(str, Enum):
LOW = "low"

class AsyncQueueManager:
"""AsyncIO-based queue manager for agent orchestration"""
"""Queue manager for agent orchestration"""

def __init__(self):
self.queues = {
QueuePriority.HIGH: asyncio.Queue(),
QueuePriority.MEDIUM: asyncio.Queue(),
QueuePriority.LOW: asyncio.Queue()
QueuePriority.HIGH: 'high_task_queue',
QueuePriority.MEDIUM: 'medium_task_queue',
QueuePriority.LOW: 'low_task_queue'
}
self.handlers: Dict[str, Callable] = {}
self.running = False
self.worker_tasks = []
self.connection: Optional[aio_pika.RobustConnection] = None
self.channel: Optional[aio_pika.abc.AbstractChannel] = None



async def connect(self):
    """Establish a robust RabbitMQ connection and declare the priority queues.

    Reads the broker URL from ``settings.rabbitmq_url`` (defaulting to a
    local guest broker), opens a channel, and declares one durable queue
    per priority level. Any failure is logged and re-raised so callers
    can abort startup.
    """
    try:
        broker_url = getattr(settings, 'rabbitmq_url', 'amqp://guest:guest@localhost/')
        self.connection = await aio_pika.connect_robust(broker_url)
        self.channel = await self.connection.channel()
        for name in self.queues.values():
            # Durable queues survive broker restarts.
            await self.channel.declare_queue(name, durable=True)
    except Exception as e:
        logger.error(f"Failed to connect to RabbitMQ: {e}")
        raise
    else:
        logger.info("Successfully connected to RabbitMQ")

async def start(self, num_workers: int = 3):
    """Connect to the message broker and launch the worker coroutines.

    Args:
        num_workers: How many concurrent worker tasks to spawn; each one
            is named ``worker-<index>`` for log attribution.
    """
    await self.connect()
    self.running = True

    self.worker_tasks.extend(
        asyncio.create_task(self._worker(f"worker-{idx}"))
        for idx in range(num_workers)
    )

    logger.info(f"Started {num_workers} async queue workers")

async def stop(self):
"""Stop the queue processing"""
Expand All @@ -43,7 +64,11 @@ async def stop(self):
task.cancel()

await asyncio.gather(*self.worker_tasks, return_exceptions=True)
logger.info("Stopped all queue workers")
if self.channel:
await self.channel.close()
if self.connection:
await self.connection.close()
logger.info("Stopped all queue workers and closed connection")

async def enqueue(self,
message: Dict[str, Any],
Expand All @@ -56,13 +81,15 @@ async def enqueue(self,

queue_item = {
"id": message.get("id", f"msg_{datetime.now().timestamp()}"),
"timestamp": datetime.now().isoformat(),
"priority": priority,
"data": message
}

await self.queues[priority].put(queue_item)
logger.debug(f"Enqueued message {queue_item['id']} with priority {priority}")
json_message = json.dumps(queue_item).encode()
await self.channel.default_exchange.publish(
aio_pika.Message(body=json_message),
routing_key=self.queues[priority]
)
logger.info(f"Enqueued message {queue_item['id']} with priority {priority}")

def register_handler(self, message_type: str, handler: Callable):
"""Register a handler for a specific message type"""
Expand All @@ -72,50 +99,29 @@ def register_handler(self, message_type: str, handler: Callable):
async def _worker(self, worker_name: str):
"""Worker coroutine to process queue items"""
logger.info(f"Started queue worker: {worker_name}")

# Each worker listens to all queues by priority
queues = [
await self.channel.declare_queue(self.queues[priority], durable=True)
for priority in [QueuePriority.HIGH, QueuePriority.MEDIUM, QueuePriority.LOW]
]
while self.running:
try:
# Process queues by priority
item = await self._get_next_item()

if item:
await self._process_item(item, worker_name)
else:
# No items available, wait a bit
await asyncio.sleep(0.1)

except asyncio.CancelledError:
logger.info(f"Worker {worker_name} cancelled")
break
except (ConnectionError, TimeoutError) as e:
logger.error(f"Connection error in worker {worker_name}: {str(e)}")
await asyncio.sleep(5) # Longer pause for connection issues
except Exception as e:
logger.error(f"Unexpected error in worker {worker_name}: {str(e)}")
await asyncio.sleep(1) # Brief pause on error

async def _get_next_item(self) -> Optional[Dict[str, Any]]:
"""Get the next item from queues (priority-based)"""

# Try high priority first
try:
return self.queues[QueuePriority.HIGH].get_nowait()
except asyncio.QueueEmpty:
pass

# Then medium priority
try:
return self.queues[QueuePriority.MEDIUM].get_nowait()
except asyncio.QueueEmpty:
pass

# Finally low priority
try:
return self.queues[QueuePriority.LOW].get_nowait()
except asyncio.QueueEmpty:
pass

return None
for queue in queues:
try:
message = await queue.get(no_ack=False, fail=False)
if message:
try:
item = json.loads(message.body.decode())
await self._process_item(item, worker_name)
await message.ack()
except Exception as e:
logger.error(f"Error processing message: {e}")
await message.nack(requeue=False)
except asyncio.CancelledError:
logger.info(f"Worker {worker_name} cancelled")
return
except Exception as e:
logger.error(f"Worker {worker_name} error: {e}")
await asyncio.sleep(0.1)

async def _process_item(self, item: Dict[str, Any], worker_name: str):
"""Process a queue item"""
Expand All @@ -127,9 +133,12 @@ async def _process_item(self, item: Dict[str, Any], worker_name: str):

if handler:
logger.debug(f"Worker {worker_name} processing {item['id']} (type: {message_type})")
await handler(message_data)
if asyncio.iscoroutinefunction(handler):
await handler(message_data)
else:
handler(message_data)
else:
logger.warning(f"No handler found for message type: {message_type}")

except Exception as e:
logger.error(f"Error processing item {item['id']}: {str(e)}")
logger.error(f"Error processing item {item.get('id', 'unknown')}: {str(e)}")
2 changes: 1 addition & 1 deletion backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,4 @@ async def favicon():
host="0.0.0.0",
port=8000,
reload=True
)
)
Loading