Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions backend/app/core/orchestration/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def connect(self):
await self.channel.declare_queue(queue_name, durable=True)
logger.info("Successfully connected to RabbitMQ")
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
logger.error(f"Failed to connect to RabbitMQ: {e}", exc_info=True)
raise

async def start(self, num_workers: int = 3):
Expand Down Expand Up @@ -79,14 +79,22 @@ async def enqueue(self,
if delay > 0:
await asyncio.sleep(delay)

# Guard against publishing while disconnected
if not self.channel or self.channel.is_closed:
raise RuntimeError("Queue channel is not connected. Call start() before enqueue().")

queue_item = {
"id": message.get("id", f"msg_{datetime.now().timestamp()}"),
"priority": priority,
"priority": priority.value, # Serialize enum to string
"data": message
}
json_message = json.dumps(queue_item).encode()
await self.channel.default_exchange.publish(
aio_pika.Message(body=json_message),
aio_pika.Message(
body=json_message,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
content_type="application/json"
),
routing_key=self.queues[priority]
)
logger.info(f"Enqueued message {queue_item['id']} with priority {priority}")
Expand Down Expand Up @@ -114,13 +122,13 @@ async def _worker(self, worker_name: str):
await self._process_item(item, worker_name)
await message.ack()
except Exception as e:
logger.error(f"Error processing message: {e}")
logger.error("Error processing message: %s", e, exc_info=True)
await message.nack(requeue=False)
except asyncio.CancelledError:
logger.info(f"Worker {worker_name} cancelled")
return
except Exception as e:
logger.error(f"Worker {worker_name} error: {e}")
logger.error("Worker %s error: %s", worker_name, e, exc_info=True)
await asyncio.sleep(0.1)

async def _process_item(self, item: Dict[str, Any], worker_name: str):
Expand Down
17 changes: 6 additions & 11 deletions backend/app/database/weaviate/client.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import weaviate
import weaviate.exceptions
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import logging

logger = logging.getLogger(__name__)

_client = None


def get_client():
"""Get or create the global Weaviate client instance."""
global _client
if _client is None:
_client = weaviate.use_async_with_local()
return _client
"""Create a new async Weaviate client instance per context use."""
return weaviate.use_async_with_local()

@asynccontextmanager
async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateClient, None]:
Expand All @@ -22,11 +17,11 @@ async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateClient, None]
try:
await client.connect()
yield client
except Exception as e:
logger.error(f"Weaviate client error: {str(e)}")
except weaviate.exceptions.WeaviateBaseError as e:
logger.error("Weaviate client error: %s", e, exc_info=True)
raise
finally:
try:
await client.close()
except Exception as e:
logger.warning(f"Error closing Weaviate client: {str(e)}")
logger.warning("Error closing Weaviate client: %s", e, exc_info=True)
2 changes: 0 additions & 2 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ function App() {
supabase.auth.getSession().then(({ data, error }) => {
if (error) {
toast.error('User Login Failed');
console.error('Error checking session:', error);
return;
}
setIsAuthenticated(!!data.session);
Expand Down Expand Up @@ -93,7 +92,6 @@ function App() {
const { error } = await supabase.auth.signOut();
if (error) {
toast.error('Logout failed');
console.error('Error during logout:', error);
return;
}
toast.success('Signed out!');
Expand Down
8 changes: 7 additions & 1 deletion frontend/src/components/integration/BotIntegration.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ const BotIntegration: React.FC<BotIntegrationProps> = ({
setIntegration(null);
}
} catch (error) {
console.error('Error loading integration status:', error);
// Handle integration status loading errors
setIsConnected(false);
setIntegration(null);
// Log in development for debugging
if (import.meta.env.DEV) {
console.error('Failed to load integration status:', error);
}
}
};

Expand Down
16 changes: 12 additions & 4 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,15 @@ class ApiClient {
// Add response interceptor for error handling
this.client.interceptors.response.use(
(response) => response,
(error) => {
async (error) => {
if (error.response?.status === 401) {
// Handle unauthorized - could redirect to login
console.error('Unauthorized request');
// Clear session and redirect to login
// Avoid infinite redirect loop if already on login page
if (!window.location.pathname.includes('/login')) {
await supabase.auth.signOut();
const returnUrl = encodeURIComponent(window.location.pathname + window.location.search);
window.location.href = `/login?returnUrl=${returnUrl}`;
}
}
return Promise.reject(error);
}
Expand Down Expand Up @@ -166,7 +171,10 @@ class ApiClient {
const response = await this.client.get('/v1/health');
return response.status === 200;
} catch (error) {
console.error('Backend health check failed:', error);
// Log in development for debugging
if (import.meta.env.DEV) {
console.error('Health check failed:', error);
}
return false;
}
}
Expand Down