-
Notifications
You must be signed in to change notification settings - Fork 69
Rabbit MQ Integration #88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rabbit MQ Integration #88
Conversation
WalkthroughThe Changes
Sequence Diagram(s)sequenceDiagram
participant Producer as AsyncQueueManager.enqueue
participant RabbitMQ as RabbitMQ Broker
participant Worker as AsyncQueueManager._worker
participant Handler as Registered Handler
Producer->>RabbitMQ: Publish message (JSON) to queue (via aio_pika)
Worker->>RabbitMQ: Consume message from queue
RabbitMQ-->>Worker: Deliver message
Worker->>Handler: Process message (async/sync)
Handler-->>Worker: Return result
Worker->>RabbitMQ: Ack/Nack message
Assessment against linked issues
Assessment against linked issues: Out-of-scope changes
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
backend/app/core/orchestration/queue_manager.py (1)
64-83: Add error handling and improve delay implementationThe method needs error handling for publish operations and the delay implementation blocks the caller.
Consider this improvement:
async def enqueue(self, message: Dict[str, Any], priority: QueuePriority = QueuePriority.MEDIUM, delay: float = 0): """Add a message to the queue""" + if not self.channel: + raise RuntimeError("Queue manager not connected") if delay > 0: - await asyncio.sleep(delay) + # Schedule delayed message publishing + asyncio.create_task(self._enqueue_delayed(message, priority, delay)) + return queue_item = { "id": message.get("id", f"msg_{datetime.now().timestamp()}"), "priority": priority, "data": message } - json_message = json.dumps(queue_item).encode() - await self.channel.default_exchange.publish( - aio_pika.Message(body=json_message), - routing_key=self.queues[priority] - ) - logger.info(f"Enqueued message {queue_item['id']} with priority {priority}") + try: + json_message = json.dumps(queue_item).encode() + await self.channel.default_exchange.publish( + aio_pika.Message(body=json_message), + routing_key=self.queues[priority] + ) + logger.info(f"Enqueued message {queue_item['id']} with priority {priority}") + except Exception as e: + logger.error(f"Failed to enqueue message: {e}") + raise + +async def _enqueue_delayed(self, message: Dict[str, Any], priority: QueuePriority, delay: float): + """Helper to enqueue message after delay""" + await asyncio.sleep(delay) + await self.enqueue(message, priority, 0)
🧹 Nitpick comments (2)
backend/app/core/orchestration/queue_manager.py (2)
90-115: Optimize worker queue consumption strategyThe current implementation has workers redundantly declaring queues and all workers consume from all queues, which may not provide optimal priority handling.
Consider assigning workers to specific priority queues:
-async def _worker(self, worker_name: str): +async def _worker(self, worker_name: str, worker_id: int, num_workers: int): """Worker coroutine to process queue items""" logger.info(f"Started queue worker: {worker_name}") - # Each worker listens to all queues by priority - queues = [ - await self.channel.declare_queue(self.queues[priority], durable=True) - for priority in [QueuePriority.HIGH, QueuePriority.MEDIUM, QueuePriority.LOW] - ] + # Assign workers to priority queues + # First 50% to HIGH, next 30% to MEDIUM, last 20% to LOW + priorities = [] + if worker_id < num_workers * 0.5: + priorities = [QueuePriority.HIGH, QueuePriority.MEDIUM] + elif worker_id < num_workers * 0.8: + priorities = [QueuePriority.MEDIUM, QueuePriority.LOW] + else: + priorities = [QueuePriority.LOW] + + # Get existing queues (already declared in connect) + queues = [] + for priority in priorities: + queue = await self.channel.get_queue(self.queues[priority]) + queues.append(queue)Also update the start method to pass worker_id:
-task = asyncio.create_task(self._worker(f"worker-{i}")) +task = asyncio.create_task(self._worker(f"worker-{i}", i, num_workers))
16-136: Add tests and connection resilienceAs mentioned in the learnings from PR #72, this RabbitMQ implementation was planned. However, the PR description confirms no tests were added.
Given the critical nature of the queue manager:
- Unit tests are essential for this implementation
- Consider adding connection retry logic for production resilience
- Add health check methods for monitoring
Would you like me to generate comprehensive unit tests for the AsyncQueueManager class or open an issue to track this?
Additionally, consider using
aio_pika.connect_robustconnection features:# Example of configuring robust connection self.connection = await aio_pika.connect_robust( rabbitmq_url, reconnect_interval=5, connection_attempts=3, heartbeat=600 )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
.gitignore(1 hunks)backend/app/core/orchestration/queue_manager.py(6 hunks)backend/main.py(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#72
File: backend/app/core/orchestration/queue_manager.py:48-66
Timestamp: 2025-06-08T13:27:45.522Z
Learning: The queue manager implementation in backend/app/core/orchestration/queue_manager.py is temporary and will be replaced with RabbitMQ in the future.
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#87
File: tests/test_supabase.py:1-3
Timestamp: 2025-06-28T23:15:13.374Z
Learning: In the Devr.AI project, smokeyScraper prefers to defer test updates and fixes (like missing imports after module reorganization) to separate PRs rather than expanding the scope of module update/chore PRs to include comprehensive test refactoring.
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#85
File: tests/test_supabase.py:1-3
Timestamp: 2025-06-28T14:45:55.244Z
Learning: In the Devr.AI project, smokeyScraper prefers to defer comprehensive test refactoring to separate PRs/efforts when doing major backend restructuring, rather than expanding the scope of the current refactoring PR to include test updates.
backend/app/core/orchestration/queue_manager.py (1)
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#72
File: backend/app/core/orchestration/queue_manager.py:48-66
Timestamp: 2025-06-08T13:27:45.522Z
Learning: The queue manager implementation in backend/app/core/orchestration/queue_manager.py is temporary and will be replaced with RabbitMQ in the future.
🔇 Additional comments (1)
backend/main.py (1)
117-121: LGTM! Improved readabilityThe multi-line conditional format is more readable than the inline ternary expression.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
|
@smokeyScraper Please Do review and close, already pushed fixes suggested by code rabbit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
backend/app/core/orchestration/queue_manager.py (2)
31-33: Fix excessive blank lines.Static analysis correctly identifies too many blank lines here.
- - +
103-124: Consider optimizing the worker polling strategy.While the implementation is functionally correct with proper error handling and acknowledgments, the current approach has each worker polling all queues sequentially, which may not be optimal for performance.
Consider using RabbitMQ's consumer pattern for more efficient message consumption:
async def _worker(self, worker_name: str): """Worker coroutine to process queue items""" logger.info(f"Started queue worker: {worker_name}") - # Each worker listens to all queues by priority - queues = [ - await self.channel.declare_queue(self.queues[priority], durable=True) - for priority in [QueuePriority.HIGH, QueuePriority.MEDIUM, QueuePriority.LOW] - ] + + async def process_message(message: aio_pika.IncomingMessage): + try: + item = json.loads(message.body.decode()) + await self._process_item(item, worker_name) + await message.ack() + except Exception as e: + logger.error(f"Error processing message: {e}") + await message.nack(requeue=False) + + # Set up consumers for all queues + consumers = [] + for priority in [QueuePriority.HIGH, QueuePriority.MEDIUM, QueuePriority.LOW]: + queue = await self.channel.declare_queue(self.queues[priority], durable=True) + consumer = await queue.consume(process_message) + consumers.append(consumer) + while self.running: - for queue in queues: - try: - message = await queue.get(no_ack=False, fail=False) - if message: - try: - item = json.loads(message.body.decode()) - await self._process_item(item, worker_name) - await message.ack() - except Exception as e: - logger.error(f"Error processing message: {e}") - await message.nack(requeue=False) - except asyncio.CancelledError: - logger.info(f"Worker {worker_name} cancelled") - return - except Exception as e: - logger.error(f"Worker {worker_name} error: {e}") - await asyncio.sleep(0.1) + try: + await asyncio.sleep(1) # Keep worker alive + except asyncio.CancelledError: + logger.info(f"Worker {worker_name} cancelled") + # Cancel consumers + for consumer in consumers: + await consumer.cancel() + returnThis approach would be more efficient as it uses RabbitMQ's push-based consumption instead of polling.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
.DS_Storeis excluded by!**/.DS_Store
📒 Files selected for processing (4)
.gitignore(1 hunks)backend/app/core/orchestration/queue_manager.py(6 hunks)backend/main.py(1 hunks)backend/requirements.txt(1 hunks)
✅ Files skipped from review due to trivial changes (2)
- .gitignore
- backend/main.py
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#87
File: tests/test_supabase.py:1-3
Timestamp: 2025-06-28T23:15:13.374Z
Learning: In the Devr.AI project, smokeyScraper prefers to defer test updates and fixes (like missing imports after module reorganization) to separate PRs rather than expanding the scope of module update/chore PRs to include comprehensive test refactoring.
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#72
File: backend/app/core/orchestration/queue_manager.py:48-66
Timestamp: 2025-06-08T13:27:45.522Z
Learning: The queue manager implementation in backend/app/core/orchestration/queue_manager.py is temporary and will be replaced with RabbitMQ in the future.
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#85
File: tests/test_supabase.py:1-3
Timestamp: 2025-06-28T14:45:55.244Z
Learning: In the Devr.AI project, smokeyScraper prefers to defer comprehensive test refactoring to separate PRs/efforts when doing major backend restructuring, rather than expanding the scope of the current refactoring PR to include test updates.
backend/app/core/orchestration/queue_manager.py (1)
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#72
File: backend/app/core/orchestration/queue_manager.py:48-66
Timestamp: 2025-06-08T13:27:45.522Z
Learning: The queue manager implementation in backend/app/core/orchestration/queue_manager.py is temporary and will be replaced with RabbitMQ in the future.
🪛 Flake8 (7.2.0)
backend/app/core/orchestration/queue_manager.py
[error] 34-34: too many blank lines (3)
(E303)
🔇 Additional comments (9)
backend/requirements.txt (1)
3-3: Add RabbitMQ client (aio-pika) dependencyThe
aio-pika==9.5.5entry correctly pulls in the latest stable release (Feb 26, 2025) and has no known security advisories. This is a solid choice for the forthcoming RabbitMQ‐based queue manager.backend/app/core/orchestration/queue_manager.py (8)
6-8: Appropriate imports for RabbitMQ integration.The new imports (
aio_pika,json,settings) are correctly added to support the RabbitMQ functionality.
22-25: Correct queue structure change for RabbitMQ.The queue dictionary now properly maps priorities to RabbitMQ queue names instead of asyncio.Queue objects, which is the correct approach for distributed queuing.
29-30: Good addition of connection attributes.The connection and channel attributes are properly typed and will be used for RabbitMQ connection management.
34-45: Excellent implementation addressing previous concerns.This connect method properly addresses the past review comments by:
- Using configuration from settings with a sensible fallback
- Adding comprehensive error handling with try-catch
- Including proper logging for both success and failure cases
- Declaring queues as durable for persistence
The implementation follows best practices for RabbitMQ connection management.
49-49: Correct integration of connection setup.Properly calls the connect method before starting workers, ensuring RabbitMQ is ready.
67-71: Proper resource cleanup implementation.The connection and channel cleanup in the stop method ensures resources are properly released, preventing connection leaks.
87-92: Solid message publishing implementation.The enqueue method correctly:
- Serializes messages to JSON
- Publishes to the default exchange with proper routing keys
- Maintains the same interface while switching to RabbitMQ
Well-implemented distributed queuing approach.
136-139: Great enhancement for handler flexibility.Adding support for both synchronous and asynchronous handlers improves the queue manager's versatility and backward compatibility.
smokeyScraper
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please update the poetry too?
Please add aio-pika and uvicorn[standard] (dk how uvicorn is missing)
|
LGTM!!! 🚀, works great and is clean. |
|
@smokeyScraper All comments have been resolved you can merge now |
|
Thanks a lot @ShivamMenda !!! |
Closes #66
📝 Description
Introduces Rabbit MQ as a queue manager, uses aio-pika library for async queue management.
🔧 Changes Made
Updated the backend/app/core/orchestration/queue_manager.py to use rabbit MQ
Uses 3 queues to Categorise messages and processes requests
🤝 Collaboration
Collaborated with: @smokeyScraper
✅ Checklist
Summary by CodeRabbit
Refactor
Chores