Skip to content

Commit 92bfe30

Browse files
authored
Merge branch 'main' into feat/onboarding-focused-workflow
2 parents af72240 + 1b68d85 commit 92bfe30

File tree

22 files changed

+6817
-5389
lines changed

22 files changed

+6817
-5389
lines changed

backend/app/agents/devrel/nodes/gather_context.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,25 @@
44

55
from app.agents.state import AgentState
66
from app.services.auth.management import get_or_create_user_by_discord
7+
from app.database.supabase.services import ensure_user_exists, get_conversation_context
78

89
logger = logging.getLogger(__name__)
910

1011
async def gather_context_node(state: AgentState) -> Dict[str, Any]:
1112
"""Gather additional context for the user and their request"""
1213
logger.info(f"Gathering context for session {state.session_id}")
1314

14-
# TODO: Add context gathering from databases
15-
# Currently, context is simple
16-
# In production, query databases for user history, etc.
17-
1815
original_message = state.context.get("original_message", "")
16+
author_info = state.context.get("author", {})
17+
18+
# Ensure user exists in database
19+
user_uuid = await ensure_user_exists(
20+
user_id=state.user_id,
21+
platform=state.platform,
22+
username=author_info.get("username"),
23+
display_name=author_info.get("display_name"),
24+
avatar_url=author_info.get("avatar_url")
25+
)
1926

2027
new_message = {
2128
"role": "user",
@@ -47,9 +54,36 @@ async def gather_context_node(state: AgentState) -> Dict[str, Any]:
4754
context_data = {
4855
"user_profile": profile_data or {"user_id": state.user_id, "platform": state.platform},
4956
"conversation_context": len(state.messages) + 1, # +1 for the new message
50-
"session_info": {"session_id": state.session_id}
57+
"session_info": {"session_id": state.session_id},
58+
"user_uuid": user_uuid
5159
}
5260

61+
# Only retrieve from database if we don't have conversation context already
62+
should_fetch_from_db = not state.conversation_summary and not state.key_topics
63+
64+
if user_uuid and should_fetch_from_db:
65+
logger.info(f"No existing context in state, fetching from database for user {user_uuid}")
66+
prev_context = await get_conversation_context(user_uuid)
67+
if prev_context:
68+
logger.info(f"Retrieved previous conversation context from database")
69+
context_data["previous_conversation"] = prev_context
70+
71+
# Populate state with previous conversation summary and topics
72+
return {
73+
"messages": [new_message],
74+
"context": {**state.context, **context_data},
75+
"conversation_summary": prev_context.get("conversation_summary"),
76+
"key_topics": prev_context.get("key_topics", []),
77+
"current_task": "context_gathered",
78+
"last_interaction_time": datetime.now()
79+
}
80+
else:
81+
logger.info(f"No previous conversation context found in database")
82+
else:
83+
if not should_fetch_from_db:
84+
logger.info(
85+
f"Using existing context from state (conversation_summary: {bool(state.conversation_summary)}, key_topics: {len(state.key_topics)})")
86+
5387
updated_context = {**state.context, **context_data}
5488

5589
result: Dict[str, Any] = {

backend/app/agents/devrel/nodes/generate_response.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import logging
22
import json
33
from typing import Dict, Any
4+
from datetime import datetime
45
from app.agents.state import AgentState
56
from langchain_core.messages import HumanMessage
67
from ..prompts.response_prompt import RESPONSE_PROMPT
8+
from app.database.supabase.services import store_interaction
79

810
logger = logging.getLogger(__name__)
911

@@ -16,6 +18,9 @@ async def generate_response_node(state: AgentState, llm) -> Dict[str, Any]:
1618
try:
1719
final_response = await _create_response(state, llm)
1820

21+
# Store interaction to database
22+
await _store_interaction_to_db(state, final_response)
23+
1924
return {
2025
"final_response": final_response,
2126
"current_task": "response_generated"
@@ -99,3 +104,43 @@ def _get_latest_message(state: AgentState) -> str:
99104
if state.messages:
100105
return state.messages[-1].get("content", "")
101106
return state.context.get("original_message", "")
107+
108+
async def _store_interaction_to_db(state: AgentState, final_response: str) -> None:
109+
"""Store the interaction to database"""
110+
try:
111+
user_uuid = state.context.get("user_uuid")
112+
113+
if not user_uuid:
114+
logger.warning(f"No user_uuid in context, skipping interaction storage for session {state.session_id}")
115+
return
116+
117+
# Get the latest user message content
118+
latest_message = _get_latest_message(state)
119+
120+
# Extract classification data
121+
classification = state.context.get("classification", {})
122+
# TODO: intent key not present in classification schema (contains: needs_devrel, priority, reasoning)
123+
# Modify prompt to include intent key
124+
intent = classification.get("reasoning") # Fallback to reasoning for intent
125+
126+
# Store the interaction
127+
await store_interaction(
128+
user_uuid=user_uuid,
129+
platform=state.platform,
130+
platform_specific_id=f"{state.session_id}_{datetime.now().timestamp()}",
131+
channel_id=state.channel_id,
132+
thread_id=state.thread_id,
133+
content=latest_message,
134+
interaction_type="message",
135+
intent_classification=intent,
136+
topics_discussed=state.key_topics if state.key_topics else None,
137+
metadata={
138+
"session_id": state.session_id,
139+
"response": final_response[:500] if final_response else None,
140+
"tools_used": state.tools_used,
141+
"classification": classification
142+
}
143+
)
144+
145+
except Exception as e:
146+
logger.error(f"Error storing interaction to database: {str(e)}")

backend/app/agents/devrel/nodes/summarization.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,18 +145,22 @@ async def store_summary_to_database(state: AgentState) -> None:
145145
logger.error(f"Missing required fields: user_id={state.user_id}, platform={state.platform}")
146146
return
147147

148-
platform_id = state.user_id
149-
platform_column = f"{state.platform}_id"
148+
user_uuid = state.context.get("user_uuid")
150149

151-
# Fetch the user's UUID from the 'users' table
152-
user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute()
150+
if not user_uuid:
151+
platform_id = state.user_id
152+
platform_column = f"{state.platform}_id"
153153

154-
if not user_response.data:
155-
logger.error(f"User with {platform_column} '{platform_id}' not found in users table.")
156-
return
154+
user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute()
155+
156+
if not user_response.data:
157+
logger.error(f"User with {platform_column} '{platform_id}' not found in users table.")
158+
return
157159

158-
user_uuid = user_response.data[0]['id']
159-
logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}")
160+
user_uuid = user_response.data[0]['id']
161+
logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}")
162+
else:
163+
logger.info(f"Using cached user UUID from context: {user_uuid}")
160164

161165
# Record to insert/update
162166
record = {
@@ -178,4 +182,4 @@ async def store_summary_to_database(state: AgentState) -> None:
178182
logger.error(f"❌ Supabase upsert failed for session {state.session_id}: {response}")
179183

180184
except Exception as e:
181-
logger.error(f"Unexpected error while storing summary: {str(e)}")
185+
logger.error(f"Unexpected error while storing summary: {str(e)}")

backend/app/api/router.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from fastapi import APIRouter
22
from .v1.auth import router as auth_router
33
from .v1.health import router as health_router
4+
from .v1.integrations import router as integrations_router
45

56
api_router = APIRouter()
67

@@ -16,4 +17,10 @@
1617
tags=["Health"]
1718
)
1819

20+
api_router.include_router(
21+
integrations_router,
22+
prefix="/v1/integrations",
23+
tags=["Integrations"]
24+
)
25+
1926
__all__ = ["api_router"]

backend/app/api/v1/integrations.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
from fastapi import APIRouter, HTTPException, Depends, status
2+
from uuid import UUID
3+
from app.models.integration import (
4+
IntegrationCreateRequest,
5+
IntegrationUpdateRequest,
6+
IntegrationResponse,
7+
IntegrationListResponse,
8+
IntegrationStatusResponse
9+
)
10+
from app.services.integration_service import integration_service, NotFoundError
11+
from app.core.dependencies import get_current_user
12+
13+
router = APIRouter()
14+
15+
16+
@router.post("/", response_model=IntegrationResponse, status_code=status.HTTP_201_CREATED)
17+
async def create_integration(
18+
request: IntegrationCreateRequest,
19+
user_id: UUID = Depends(get_current_user)
20+
):
21+
"""Create a new organization integration."""
22+
try:
23+
return await integration_service.create_integration(user_id, request)
24+
except ValueError as e:
25+
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
26+
except Exception as e:
27+
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e
28+
29+
30+
@router.get("/", response_model=IntegrationListResponse)
31+
async def list_integrations(user_id: UUID = Depends(get_current_user)):
32+
"""List all integrations for the current user."""
33+
try:
34+
integrations = await integration_service.get_integrations(user_id)
35+
return IntegrationListResponse(integrations=integrations, total=len(integrations))
36+
except Exception as e:
37+
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e
38+
39+
40+
@router.get("/status/{platform}", response_model=IntegrationStatusResponse)
41+
async def get_integration_status(
42+
platform: str,
43+
user_id: UUID = Depends(get_current_user)
44+
):
45+
"""Get the status of a specific platform integration."""
46+
try:
47+
return await integration_service.get_integration_status(user_id, platform)
48+
except Exception as e:
49+
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e
50+
51+
@router.get("/{integration_id}", response_model=IntegrationResponse)
52+
async def get_integration(
53+
integration_id: UUID,
54+
user_id: UUID = Depends(get_current_user)
55+
):
56+
"""Get a specific integration."""
57+
try:
58+
integration = await integration_service.get_integration(user_id, integration_id)
59+
60+
if not integration:
61+
raise HTTPException(
62+
status_code=status.HTTP_404_NOT_FOUND,
63+
detail="Integration not found"
64+
)
65+
66+
return integration
67+
except Exception as e:
68+
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e
69+
70+
@router.put("/{integration_id}", response_model=IntegrationResponse)
71+
async def update_integration(
72+
integration_id: UUID,
73+
request: IntegrationUpdateRequest,
74+
user_id: UUID = Depends(get_current_user)
75+
):
76+
"""Update an existing integration."""
77+
try:
78+
return await integration_service.update_integration(user_id, integration_id, request)
79+
except NotFoundError as e:
80+
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
81+
except HTTPException:
82+
raise
83+
except Exception as e:
84+
raise HTTPException(
85+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
86+
detail=f"Failed to update integration: {str(e)}"
87+
) from e
88+
89+
@router.delete("/{integration_id}", status_code=status.HTTP_204_NO_CONTENT)
90+
async def delete_integration(
91+
integration_id: UUID,
92+
user_id: UUID = Depends(get_current_user)
93+
):
94+
"""Delete an integration."""
95+
try:
96+
await integration_service.delete_integration(user_id, integration_id)
97+
except NotFoundError as e:
98+
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
99+
except HTTPException:
100+
raise
101+
except Exception as e:
102+
raise HTTPException(
103+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
104+
detail=f"Failed to delete integration: {str(e)}"
105+
) from e

backend/app/core/dependencies.py

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,69 @@
1-
from fastapi import Request
1+
from fastapi import Header, HTTPException, status, Request
2+
from uuid import UUID
3+
from app.database.supabase.client import get_supabase_client
4+
import logging
25
from typing import TYPE_CHECKING
36

47
if TYPE_CHECKING:
58
from main import DevRAIApplication
69

7-
async def get_app_instance(request: Request) -> "DevRAIApplication":
10+
logger = logging.getLogger(__name__)
11+
12+
13+
def get_app_instance(request: Request) -> "DevRAIApplication":
14+
"""Get the application instance from FastAPI app state."""
15+
return request.app.state.app_instance
16+
17+
18+
async def get_current_user(authorization: str = Header(None)) -> UUID:
819
"""
9-
Dependency to get the application instance from FastAPI's state.
10-
This avoids circular imports by using dependency injection.
20+
Get the current authenticated user from the Supabase JWT token.
21+
22+
Args:
23+
authorization: The Authorization header containing the Bearer token
24+
25+
Returns:
26+
UUID: The user's ID
27+
28+
Raises:
29+
HTTPException: If authentication fails
1130
"""
12-
return request.app.state.app_instance
31+
if not authorization:
32+
raise HTTPException(
33+
status_code=status.HTTP_401_UNAUTHORIZED,
34+
detail="Missing authorization header",
35+
headers={"WWW-Authenticate": "Bearer"},
36+
)
37+
38+
if not authorization.startswith("Bearer "):
39+
raise HTTPException(
40+
status_code=status.HTTP_401_UNAUTHORIZED,
41+
detail="Invalid authorization header format. Expected 'Bearer <token>'",
42+
headers={"WWW-Authenticate": "Bearer"},
43+
)
44+
45+
token = authorization.replace("Bearer ", "")
46+
47+
try:
48+
supabase = get_supabase_client()
49+
# Verify the token and get user
50+
user_response = supabase.auth.get_user(token)
51+
52+
if not user_response or not user_response.user:
53+
raise HTTPException(
54+
status_code=status.HTTP_401_UNAUTHORIZED,
55+
detail="Invalid or expired token",
56+
headers={"WWW-Authenticate": "Bearer"},
57+
)
58+
59+
return UUID(user_response.user.id)
60+
61+
except HTTPException:
62+
raise
63+
except Exception as e:
64+
logger.exception("Authentication error")
65+
raise HTTPException(
66+
status_code=status.HTTP_401_UNAUTHORIZED,
67+
detail="Authentication failed",
68+
headers={"WWW-Authenticate": "Bearer"},
69+
) from e

0 commit comments

Comments
 (0)