diff --git a/backend/app/core/orchestration/queue_manager.py b/backend/app/core/orchestration/queue_manager.py index 346bc9a..885424b 100644 --- a/backend/app/core/orchestration/queue_manager.py +++ b/backend/app/core/orchestration/queue_manager.py @@ -41,7 +41,7 @@ async def connect(self): await self.channel.declare_queue(queue_name, durable=True) logger.info("Successfully connected to RabbitMQ") except Exception as e: - logger.error(f"Failed to connect to RabbitMQ: {e}") + logger.error(f"Failed to connect to RabbitMQ: {e}", exc_info=True) raise async def start(self, num_workers: int = 3): @@ -79,14 +79,22 @@ async def enqueue(self, if delay > 0: await asyncio.sleep(delay) + # Guard against publishing while disconnected + if not self.channel or self.channel.is_closed: + raise RuntimeError("Queue channel is not connected. Call start() before enqueue().") + queue_item = { "id": message.get("id", f"msg_{datetime.now().timestamp()}"), - "priority": priority, + "priority": priority.value, # Serialize enum to string "data": message } json_message = json.dumps(queue_item).encode() await self.channel.default_exchange.publish( - aio_pika.Message(body=json_message), + aio_pika.Message( + body=json_message, + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, + content_type="application/json" + ), routing_key=self.queues[priority] ) logger.info(f"Enqueued message {queue_item['id']} with priority {priority}") @@ -114,13 +122,13 @@ async def _worker(self, worker_name: str): await self._process_item(item, worker_name) await message.ack() except Exception as e: - logger.error(f"Error processing message: {e}") + logger.error("Error processing message: %s", e, exc_info=True) await message.nack(requeue=False) except asyncio.CancelledError: logger.info(f"Worker {worker_name} cancelled") return except Exception as e: - logger.error(f"Worker {worker_name} error: {e}") + logger.error("Worker %s error: %s", worker_name, e, exc_info=True) await asyncio.sleep(0.1) async def _process_item(self, item: Dict[str, Any], worker_name: str): diff --git a/backend/app/database/weaviate/client.py b/backend/app/database/weaviate/client.py index 5ed11ed..5132596 100644 --- a/backend/app/database/weaviate/client.py +++ b/backend/app/database/weaviate/client.py @@ -1,19 +1,14 @@ import weaviate +import weaviate.exceptions from contextlib import asynccontextmanager from typing import AsyncGenerator import logging logger = logging.getLogger(__name__) -_client = None - - def get_client(): - """Get or create the global Weaviate client instance.""" - global _client - if _client is None: - _client = weaviate.use_async_with_local() - return _client + """Create a new async Weaviate client instance per context use.""" + return weaviate.use_async_with_local() @asynccontextmanager async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateClient, None]: @@ -22,11 +17,11 @@ async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateClient, None] try: await client.connect() yield client - except Exception as e: - logger.error(f"Weaviate client error: {str(e)}") + except weaviate.exceptions.WeaviateBaseError as e: + logger.error("Weaviate client error: %s", e, exc_info=True) raise finally: try: await client.close() except Exception as e: - logger.warning(f"Error closing Weaviate client: {str(e)}") + logger.warning("Error closing Weaviate client: %s", e, exc_info=True) diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 9f70aa6..f506220 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -47,7 +47,6 @@ function App() { supabase.auth.getSession().then(({ data, error }) => { if (error) { toast.error('User Login Failed'); - console.error('Error checking session:', error); return; } setIsAuthenticated(!!data.session); @@ -93,7 +92,6 @@ function App() { const { error } = await supabase.auth.signOut(); if (error) { toast.error('Logout failed'); - console.error('Error during logout:', error); return; } toast.success('Signed out!'); diff --git a/frontend/src/components/integration/BotIntegration.tsx b/frontend/src/components/integration/BotIntegration.tsx index 813c251..99dcf2a 100644 --- a/frontend/src/components/integration/BotIntegration.tsx +++ b/frontend/src/components/integration/BotIntegration.tsx @@ -47,7 +47,13 @@ const BotIntegration: React.FC = ({ setIntegration(null); } } catch (error) { - console.error('Error loading integration status:', error); + // Handle integration status loading errors + setIsConnected(false); + setIntegration(null); + // Log in development for debugging + if (import.meta.env.DEV) { + console.error('Failed to load integration status:', error); + } } }; diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 963ca3b..553c1bf 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -83,10 +83,15 @@ class ApiClient { // Add response interceptor for error handling this.client.interceptors.response.use( (response) => response, - (error) => { + async (error) => { if (error.response?.status === 401) { - // Handle unauthorized - could redirect to login - console.error('Unauthorized request'); + // Clear session and redirect to login + // Avoid infinite redirect loop if already on login page + if (!window.location.pathname.includes('/login')) { + await supabase.auth.signOut(); + const returnUrl = encodeURIComponent(window.location.pathname + window.location.search); + window.location.href = `/login?returnUrl=${returnUrl}`; + } } return Promise.reject(error); } @@ -166,7 +171,10 @@ class ApiClient { const response = await this.client.get('/v1/health'); return response.status === 200; } catch (error) { - console.error('Backend health check failed:', error); + // Log in development for debugging + if (import.meta.env.DEV) { + console.error('Health check failed:', error); + } return false; } }