Skip to content

Commit 1b68d85

Browse files
Merge pull request #144 from smokeyScraper/metrics_setup
feat: database metrics setup with enhancing memory logic for downstream metrics
2 parents de6ee5b + 688a83b commit 1b68d85

File tree

6 files changed

+320
-23
lines changed

6 files changed

+320
-23
lines changed

backend/app/agents/devrel/nodes/gather_context.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,25 @@
22
from datetime import datetime
33
from typing import Dict, Any
44
from app.agents.state import AgentState
5+
from app.database.supabase.services import ensure_user_exists, get_conversation_context
56

67
logger = logging.getLogger(__name__)
78

89
async def gather_context_node(state: AgentState) -> Dict[str, Any]:
910
"""Gather additional context for the user and their request"""
1011
logger.info(f"Gathering context for session {state.session_id}")
1112

12-
# TODO: Add context gathering from databases
13-
# Currently, context is simple
14-
# In production, query databases for user history, etc.
15-
1613
original_message = state.context.get("original_message", "")
14+
author_info = state.context.get("author", {})
15+
16+
# Ensure user exists in database
17+
user_uuid = await ensure_user_exists(
18+
user_id=state.user_id,
19+
platform=state.platform,
20+
username=author_info.get("username"),
21+
display_name=author_info.get("display_name"),
22+
avatar_url=author_info.get("avatar_url")
23+
)
1724

1825
new_message = {
1926
"role": "user",
@@ -24,9 +31,36 @@ async def gather_context_node(state: AgentState) -> Dict[str, Any]:
2431
context_data = {
2532
"user_profile": {"user_id": state.user_id, "platform": state.platform},
2633
"conversation_context": len(state.messages) + 1, # +1 for the new message
27-
"session_info": {"session_id": state.session_id}
34+
"session_info": {"session_id": state.session_id},
35+
"user_uuid": user_uuid
2836
}
2937

38+
# Only retrieve from database if we don't have conversation context already
39+
should_fetch_from_db = not state.conversation_summary and not state.key_topics
40+
41+
if user_uuid and should_fetch_from_db:
42+
logger.info(f"No existing context in state, fetching from database for user {user_uuid}")
43+
prev_context = await get_conversation_context(user_uuid)
44+
if prev_context:
45+
logger.info(f"Retrieved previous conversation context from database")
46+
context_data["previous_conversation"] = prev_context
47+
48+
# Populate state with previous conversation summary and topics
49+
return {
50+
"messages": [new_message],
51+
"context": {**state.context, **context_data},
52+
"conversation_summary": prev_context.get("conversation_summary"),
53+
"key_topics": prev_context.get("key_topics", []),
54+
"current_task": "context_gathered",
55+
"last_interaction_time": datetime.now()
56+
}
57+
else:
58+
logger.info(f"No previous conversation context found in database")
59+
else:
60+
if not should_fetch_from_db:
61+
logger.info(
62+
f"Using existing context from state (conversation_summary: {bool(state.conversation_summary)}, key_topics: {len(state.key_topics)})")
63+
3064
updated_context = {**state.context, **context_data}
3165

3266
return {

backend/app/agents/devrel/nodes/generate_response.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import logging
22
import json
33
from typing import Dict, Any
4+
from datetime import datetime
45
from app.agents.state import AgentState
56
from langchain_core.messages import HumanMessage
67
from ..prompts.response_prompt import RESPONSE_PROMPT
8+
from app.database.supabase.services import store_interaction
79

810
logger = logging.getLogger(__name__)
911

@@ -16,6 +18,9 @@ async def generate_response_node(state: AgentState, llm) -> Dict[str, Any]:
1618
try:
1719
final_response = await _create_response(state, llm)
1820

21+
# Store interaction to database
22+
await _store_interaction_to_db(state, final_response)
23+
1924
return {
2025
"final_response": final_response,
2126
"current_task": "response_generated"
@@ -99,3 +104,43 @@ def _get_latest_message(state: AgentState) -> str:
99104
if state.messages:
100105
return state.messages[-1].get("content", "")
101106
return state.context.get("original_message", "")
107+
108+
async def _store_interaction_to_db(state: AgentState, final_response: str) -> None:
109+
"""Store the interaction to database"""
110+
try:
111+
user_uuid = state.context.get("user_uuid")
112+
113+
if not user_uuid:
114+
logger.warning(f"No user_uuid in context, skipping interaction storage for session {state.session_id}")
115+
return
116+
117+
# Get the latest user message content
118+
latest_message = _get_latest_message(state)
119+
120+
# Extract classification data
121+
classification = state.context.get("classification", {})
122+
# TODO: intent key not present in classification schema (contains: needs_devrel, priority, reasoning)
123+
# Modify prompt to include intent key
124+
intent = classification.get("reasoning") # Fallback to reasoning for intent
125+
126+
# Store the interaction
127+
await store_interaction(
128+
user_uuid=user_uuid,
129+
platform=state.platform,
130+
platform_specific_id=f"{state.session_id}_{datetime.now().timestamp()}",
131+
channel_id=state.channel_id,
132+
thread_id=state.thread_id,
133+
content=latest_message,
134+
interaction_type="message",
135+
intent_classification=intent,
136+
topics_discussed=state.key_topics if state.key_topics else None,
137+
metadata={
138+
"session_id": state.session_id,
139+
"response": final_response[:500] if final_response else None,
140+
"tools_used": state.tools_used,
141+
"classification": classification
142+
}
143+
)
144+
145+
except Exception as e:
146+
logger.error(f"Error storing interaction to database: {str(e)}")

backend/app/agents/devrel/nodes/summarization.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,18 +145,22 @@ async def store_summary_to_database(state: AgentState) -> None:
145145
logger.error(f"Missing required fields: user_id={state.user_id}, platform={state.platform}")
146146
return
147147

148-
platform_id = state.user_id
149-
platform_column = f"{state.platform}_id"
148+
user_uuid = state.context.get("user_uuid")
150149

151-
# Fetch the user's UUID from the 'users' table
152-
user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute()
150+
if not user_uuid:
151+
platform_id = state.user_id
152+
platform_column = f"{state.platform}_id"
153153

154-
if not user_response.data:
155-
logger.error(f"User with {platform_column} '{platform_id}' not found in users table.")
156-
return
154+
user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute()
155+
156+
if not user_response.data:
157+
logger.error(f"User with {platform_column} '{platform_id}' not found in users table.")
158+
return
157159

158-
user_uuid = user_response.data[0]['id']
159-
logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}")
160+
user_uuid = user_response.data[0]['id']
161+
logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}")
162+
else:
163+
logger.info(f"Using cached user UUID from context: {user_uuid}")
160164

161165
# Record to insert/update
162166
record = {
@@ -178,4 +182,4 @@ async def store_summary_to_database(state: AgentState) -> None:
178182
logger.error(f"❌ Supabase upsert failed for session {state.session_id}: {response}")
179183

180184
except Exception as e:
181-
logger.error(f"Unexpected error while storing summary: {str(e)}")
185+
logger.error(f"Unexpected error while storing summary: {str(e)}")

backend/app/database/supabase/scripts/create_db.sql

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,27 @@ $$ language 'plpgsql';
192192
CREATE TRIGGER update_conversation_context_updated_at
193193
BEFORE UPDATE ON conversation_context
194194
FOR EACH ROW
195-
EXECUTE FUNCTION update_updated_at_column();
195+
EXECUTE FUNCTION update_updated_at_column();
196+
197+
-- Migration: Add atomic increment function for user interaction count
198+
-- This function safely increments the total_interactions_count for a user
199+
200+
CREATE OR REPLACE FUNCTION increment_user_interaction_count(user_uuid UUID)
201+
RETURNS INTEGER AS $$
202+
DECLARE
203+
new_count INTEGER;
204+
BEGIN
205+
-- Atomically increment the counter and return the new value
206+
UPDATE users
207+
SET total_interactions_count = total_interactions_count + 1
208+
WHERE id = user_uuid
209+
RETURNING total_interactions_count INTO new_count;
210+
211+
-- Return the new count (NULL if user not found)
212+
RETURN new_count;
213+
END;
214+
$$ LANGUAGE plpgsql;
215+
216+
-- Optional: Add a comment for documentation
217+
COMMENT ON FUNCTION increment_user_interaction_count(UUID) IS
218+
'Atomically increments the total_interactions_count for a user. Returns the new count or NULL if user not found.';
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import logging
2+
from typing import Dict, Any, Optional
3+
from datetime import datetime
4+
import uuid
5+
from app.database.supabase.client import get_supabase_client
6+
7+
logger = logging.getLogger(__name__)
8+
supabase = get_supabase_client()
9+
10+
11+
async def ensure_user_exists(
12+
user_id: str,
13+
platform: str,
14+
username: Optional[str] = None,
15+
display_name: Optional[str] = None,
16+
avatar_url: Optional[str] = None
17+
) -> Optional[str]:
18+
"""
19+
Ensure a user exists in the database. If not, create them.
20+
Returns the user's UUID, or None if an error occurs.
21+
22+
Args:
23+
user_id: Platform-specific user ID (e.g., discord_id, slack_id)
24+
platform: Platform name (discord, slack, github)
25+
username: Platform username
26+
display_name: Display name for the user
27+
avatar_url: Avatar URL
28+
29+
Returns:
30+
User UUID as string, or None on error
31+
"""
32+
try:
33+
platform_id_column = f"{platform}_id"
34+
platform_username_column = f"{platform}_username"
35+
36+
# Check if user exists
37+
response = await supabase.table("users").select("id").eq(platform_id_column, user_id).limit(1).execute()
38+
39+
if response.data:
40+
user_uuid = response.data[0]['id']
41+
logger.info(f"User found: {user_uuid} for {platform_id_column}: {user_id}")
42+
43+
# Update last_active timestamp
44+
last_active_column = f"last_active_{platform}"
45+
await supabase.table("users").update({
46+
last_active_column: datetime.now().isoformat()
47+
}).eq("id", user_uuid).execute()
48+
49+
return user_uuid
50+
51+
# User doesn't exist, create new user
52+
logger.info(f"Creating new user for {platform_id_column}: {user_id}")
53+
54+
new_user = {
55+
"id": str(uuid.uuid4()),
56+
platform_id_column: user_id,
57+
"display_name": display_name or username or f"{platform}_user_{user_id[:8]}",
58+
}
59+
60+
if username:
61+
new_user[platform_username_column] = username
62+
if avatar_url:
63+
new_user["avatar_url"] = avatar_url
64+
65+
# Set last_active timestamp
66+
last_active_column = f"last_active_{platform}"
67+
new_user[last_active_column] = datetime.now().isoformat()
68+
69+
insert_response = await supabase.table("users").insert(new_user).execute()
70+
71+
if insert_response.data:
72+
user_uuid = insert_response.data[0]['id']
73+
logger.info(f"User created successfully: {user_uuid}")
74+
return user_uuid
75+
else:
76+
logger.error(f"Failed to create user: {insert_response}")
77+
return None
78+
79+
except Exception as e:
80+
logger.error(f"Error ensuring user exists: {str(e)}")
81+
return None
82+
83+
84+
async def store_interaction(
85+
user_uuid: str,
86+
platform: str,
87+
platform_specific_id: str,
88+
channel_id: Optional[str] = None,
89+
thread_id: Optional[str] = None,
90+
content: Optional[str] = None,
91+
interaction_type: Optional[str] = None,
92+
intent_classification: Optional[str] = None,
93+
topics_discussed: Optional[list] = None,
94+
metadata: Optional[Dict[str, Any]] = None
95+
) -> bool:
96+
"""
97+
Store an interaction in the database.
98+
99+
Args:
100+
user_uuid: User's UUID from users table
101+
platform: Platform name (discord, slack, github)
102+
platform_specific_id: Platform-specific message/interaction ID
103+
channel_id: Channel ID where interaction occurred
104+
thread_id: Thread ID where interaction occurred
105+
content: Content of the interaction
106+
interaction_type: Type of interaction (message, comment, pr, etc.)
107+
intent_classification: Classification of user intent
108+
topics_discussed: List of topics discussed
109+
metadata: Additional metadata
110+
111+
Returns:
112+
True if successful, False otherwise
113+
"""
114+
try:
115+
interaction_data = {
116+
"id": str(uuid.uuid4()),
117+
"user_id": user_uuid,
118+
"platform": platform,
119+
"platform_specific_id": platform_specific_id,
120+
}
121+
122+
if channel_id:
123+
interaction_data["channel_id"] = channel_id
124+
if thread_id:
125+
interaction_data["thread_id"] = thread_id
126+
if content:
127+
interaction_data["content"] = content
128+
if interaction_type:
129+
interaction_data["interaction_type"] = interaction_type
130+
if intent_classification:
131+
interaction_data["intent_classification"] = intent_classification
132+
if topics_discussed:
133+
interaction_data["topics_discussed"] = topics_discussed
134+
if metadata:
135+
interaction_data["metadata"] = metadata
136+
137+
response = await supabase.table("interactions").insert(interaction_data).execute()
138+
139+
if response.data:
140+
logger.info(f"Interaction stored successfully for user {user_uuid}")
141+
142+
# Atomically increment user's total_interactions_count
143+
try:
144+
rpc_response = await supabase.rpc("increment_user_interaction_count", {"user_uuid": user_uuid}).execute()
145+
if rpc_response.data is not None:
146+
logger.debug(f"Updated interaction count for user {user_uuid}: {rpc_response.data}")
147+
else:
148+
logger.warning(f"User {user_uuid} not found when incrementing interaction count")
149+
except Exception as e:
150+
logger.exception("Error incrementing user interaction count")
151+
152+
# Not failing the entire operation if incrementing the interaction count fails
153+
return True
154+
155+
except Exception as e:
156+
logger.error(f"Error storing interaction: {str(e)}")
157+
return False
158+
159+
160+
async def get_conversation_context(user_uuid: str) -> Optional[Dict[str, Any]]:
161+
"""
162+
Retrieve conversation context for a user.
163+
164+
Args:
165+
user_uuid: User's UUID from users table
166+
167+
Returns:
168+
Dictionary containing conversation context, or None if not found
169+
"""
170+
try:
171+
response = await supabase.table("conversation_context").select("*").eq("user_id", user_uuid).limit(1).execute()
172+
173+
if response.data:
174+
context = response.data[0]
175+
logger.info(f"Retrieved conversation context for user {user_uuid}")
176+
return {
177+
"conversation_summary": context.get("conversation_summary"),
178+
"key_topics": context.get("key_topics", []),
179+
"total_interactions": context.get("total_interactions", 0),
180+
"session_start_time": context.get("session_start_time"),
181+
"session_end_time": context.get("session_end_time"),
182+
}
183+
else:
184+
logger.info(f"No conversation context found for user {user_uuid}")
185+
return None
186+
187+
except Exception as e:
188+
logger.error(f"Error retrieving conversation context: {str(e)}")
189+
return None

0 commit comments

Comments
 (0)