Skip to content

Commit 2616103

Browse files
Merge pull request #84 from smokeyScraper/user_profiling
[feat]: update weaviate interactions to async and add user profiling logic for user indexing and retrieval
2 parents c2aded9 + 13993f5 commit 2616103

File tree

11 files changed

+733
-56
lines changed

11 files changed

+733
-56
lines changed

backend/.env.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ SUPABASE_SERVICE_ROLE_KEY=
88
DISCORD_BOT_TOKEN=
99
# ENABLE_DISCORD_BOT=true
1010

11+
GITHUB_TOKEN=
12+
1113
# EMBEDDING_MODEL=BAAI/bge-small-en-v1.5
1214
# EMBEDDING_MAX_BATCH_SIZE=32
1315
# EMBEDDING_DEVICE=cpu

backend/app/api/v1/auth.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
from fastapi.responses import HTMLResponse
33
from app.db.supabase.supabase_client import get_supabase_client
44
from app.db.supabase.users_service import find_user_by_session_and_verify, get_verification_session_info
5+
from app.db.weaviate.user_profiling import profile_user_from_github
56
from typing import Optional
67
import logging
8+
import asyncio
79

810
logger = logging.getLogger(__name__)
911
router = APIRouter()
@@ -67,7 +69,15 @@ async def auth_callback(request: Request, code: Optional[str] = Query(None), ses
6769
logger.error("User verification failed - no pending verification found")
6870
return _error_response("No pending verification found or verification has expired. Please try the !verify_github command again.")
6971

70-
logger.info(f"Successfully verified user: {verified_user.id}")
72+
logger.info(f"Successfully verified user: {verified_user.id}!")
73+
74+
logger.info(f"Indexing user: {verified_user.id} into Weaviate...")
75+
try:
76+
asyncio.create_task(profile_user_from_github(str(verified_user.id), github_username))
77+
logger.info(f"User profiling started in background for: {verified_user.id}")
78+
except Exception as e:
79+
logger.error(f"Error starting user profiling: {verified_user.id}: {str(e)}")
80+
7181
return _success_response(github_username)
7282

7383
except Exception as e:
backend/app/db/weaviate/user_profiling.py

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
import logging
2+
import asyncio
3+
import aiohttp
4+
from typing import List, Optional, Dict
5+
from datetime import datetime
6+
from collections import Counter
7+
from app.model.weaviate.models import WeaviateUserProfile, WeaviateRepository, WeaviatePullRequest
8+
from app.db.weaviate.weaviate_operations import store_user_profile
9+
from app.core.config import settings
10+
11+
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
12+
13+
14+
class GitHubUserProfiler:
    """
    Class to handle GitHub user profiling and Weaviate storage.

    Uses the organization's GitHub token (``settings.github_token``) to fetch
    public user data via the GitHub REST API. The HTTP session is created in
    ``__aenter__`` and closed in ``__aexit__``, so the class must be used as an
    async context manager::

        async with GitHubUserProfiler() as profiler:
            profile = await profiler.build_user_profile(user_id, username)
    """

    # GitHub caps ``per_page`` at 100 on list and search endpoints.
    _MAX_PER_PAGE = 100

    def __init__(self):
        """
        Prepare the request headers; no network I/O happens here.

        Raises:
            ValueError: If ``settings.github_token`` is empty.
        """
        if not settings.github_token:
            raise ValueError("GitHub token not configured in environment variables")

        self.headers = {
            "Authorization": f"token {settings.github_token}",
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "DevRel-AI-Bot/1.0"
        }
        self.base_url = "https://api.github.com"
        # Populated by __aenter__; None while no session is open.
        self.session = None

    async def __aenter__(self):
        """Create the async HTTP session and return ``self``."""
        timeout = aiohttp.ClientTimeout(total=60, connect=10, sock_read=30)
        connector = aiohttp.TCPConnector(
            limit=50,  # Total connection pool size
            limit_per_host=10,  # Per-host connection limit
            ttl_dns_cache=300,  # DNS cache TTL
            use_dns_cache=True,
        )

        self.session = aiohttp.ClientSession(
            headers=self.headers,
            timeout=timeout,
            connector=connector
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the async HTTP session, if one was opened."""
        if self.session:
            await self.session.close()
            # Drop the closed session so later requests hit the explicit
            # "session is not open" guard in _make_request.
            self.session = None

    async def _make_request(self, url: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """
        Make a GET request to the GitHub API.

        Args:
            url: Absolute URL to request.
            params: Optional query-string parameters.

        Returns:
            The decoded JSON payload on HTTP 200 (a dict for most endpoints, a
            list for list endpoints such as ``/users/{name}/repos``), or
            ``None`` on any non-200 status, timeout, or other error. Errors
            are logged, never raised.
        """
        if self.session is None:
            # Used outside ``async with``: report it clearly instead of letting
            # an AttributeError be logged as a generic request failure.
            logger.error(f"GitHub API session is not open; cannot request {url}")
            return None

        try:
            async with self.session.get(url, params=params) as response:
                status = response.status
                if status == 200:
                    return await response.json()

                # GitHub signals rate limiting with 429, or with 403 plus
                # either an exhausted quota header or a Retry-After header.
                # A bare 403 is an ordinary permission failure.
                rate_limited = status == 429 or (
                    status == 403
                    and (
                        response.headers.get("X-RateLimit-Remaining") == "0"
                        or "Retry-After" in response.headers
                    )
                )

                if status == 404:
                    logger.warning(f"GitHub API 404: {url}")
                elif rate_limited:
                    logger.error(f"GitHub API rate limit exceeded: {url}")
                elif status == 403:
                    logger.error(f"GitHub API access forbidden (403): {url}")
                else:
                    logger.error(f"GitHub API error {status}: {url}")
                return None
        except asyncio.TimeoutError:
            logger.error(f"Timeout accessing GitHub API: {url}")
            return None
        except Exception as e:
            logger.error(f"Error making request to {url}: {str(e)}")
            return None

    async def get_user_data(self, github_username: str) -> Optional[Dict]:
        """
        Fetch the public user record from ``/users/{github_username}``.

        Returns:
            The user payload, or ``None`` if the request failed.
        """
        url = f"{self.base_url}/users/{github_username}"
        user_data = await self._make_request(url)

        if user_data:
            logger.info(f"Successfully fetched user data for {github_username}")
        else:
            logger.error(f"Failed to fetch user data for {github_username}")

        return user_data

    async def get_user_repositories(self, github_username: str, max_repos: int = 50) -> List[Dict]:
        """
        Fetch repositories owned by the user, most recently updated first.

        Only a single page is requested, so at most ``max_repos`` (capped at
        100 by GitHub) repositories are returned.

        Returns:
            A list of raw repository payloads; empty on failure.
        """
        try:
            params = {
                "type": "owner",
                "sort": "updated",
                "direction": "desc",
                "per_page": min(max_repos, self._MAX_PER_PAGE)
            }

            url = f"{self.base_url}/users/{github_username}/repos"
            repos = await self._make_request(url, params)

            if repos and isinstance(repos, list):
                logger.info(f"Successfully fetched {len(repos)} repositories for {github_username}")
                return repos
            else:
                logger.info(f"No repositories found for {github_username}")
                return []

        except Exception as e:
            logger.error(f"Error fetching repositories for {github_username}: {str(e)}")
            return []

    async def get_repository_languages(self, languages_url: str) -> List[str]:
        """
        Fetch the language names reported for a repository.

        Args:
            languages_url: The repository's ``languages_url`` from the API.

        Returns:
            The keys of the languages payload; empty on failure.
        """
        try:
            languages_data = await self._make_request(languages_url)
            if languages_data and isinstance(languages_data, dict):
                return list(languages_data.keys())
            return []
        except Exception as e:
            logger.warning(f"Error fetching languages from {languages_url}: {str(e)}")
            return []

    @staticmethod
    def _repo_full_name(html_url: Optional[str]) -> str:
        """Return ``owner/repo`` parsed from a GitHub HTML URL, or ``"unknown"``."""
        if html_url:
            # "https://github.com/<owner>/<repo>/pull/<n>" splits into
            # ["https:", "", "github.com", "<owner>", "<repo>", ...].
            url_parts = html_url.split('/')
            if len(url_parts) >= 5:
                return f"{url_parts[3]}/{url_parts[4]}"
        return "unknown"

    async def get_user_pull_requests(self, github_username: str, max_prs: int = 100) -> List[WeaviatePullRequest]:
        """
        Fetch pull requests authored by the user via the issue search API.

        Only a single page is requested, newest first. Items that cannot be
        converted (for example, a missing required field) are logged and
        skipped rather than failing the whole call.

        Returns:
            A list of ``WeaviatePullRequest`` objects; empty on failure.
        """
        try:
            params = {
                "q": f"author:{github_username} is:pr",
                "sort": "created",
                "order": "desc",
                "per_page": min(max_prs, self._MAX_PER_PAGE)
            }

            url = f"{self.base_url}/search/issues"
            search_result = await self._make_request(url, params)

            if not search_result or "items" not in search_result:
                logger.info(f"No pull requests found for {github_username}")
                return []

            pull_requests = []

            for pr_data in search_result["items"]:
                try:
                    # ``or`` guards against keys present with an explicit
                    # null value, which ``.get(key, default)`` does not cover.
                    pr_links = pr_data.get("pull_request") or {}
                    body = pr_data.get("body") or ""
                    labels = pr_data.get("labels") or []

                    pr_obj = WeaviatePullRequest(
                        title=pr_data["title"],
                        body=body[:500],  # keep only the first 500 characters
                        state=pr_data["state"],
                        repository=self._repo_full_name(pr_data.get("html_url")),
                        created_at=pr_data.get("created_at"),
                        closed_at=pr_data.get("closed_at"),
                        merged_at=pr_links.get("merged_at") or None,
                        labels=[label["name"] for label in labels],
                        url=pr_data["html_url"]
                    )
                    pull_requests.append(pr_obj)

                except Exception as e:
                    logger.warning(f"Error processing pull request: {str(e)}")
                    continue

            logger.info(f"Successfully fetched {len(pull_requests)} pull requests for {github_username}")
            return pull_requests

        except Exception as e:
            logger.error(f"Error fetching pull requests for {github_username}: {str(e)}")
            return []

    async def _process_repository(self, repo_data: Dict) -> Optional[WeaviateRepository]:
        """
        Convert one raw repository payload into a ``WeaviateRepository``.

        Issues one extra request for the repository's languages when a
        ``languages_url`` is present.

        Returns:
            The converted repository, or ``None`` if conversion failed.
        """
        try:
            languages = []
            if repo_data.get("languages_url"):
                languages = await self.get_repository_languages(repo_data["languages_url"])

            return WeaviateRepository(
                name=repo_data["name"],
                description=repo_data.get("description"),
                url=repo_data["html_url"],
                languages=languages,
                stars=repo_data.get("stargazers_count", 0),
                forks=repo_data.get("forks_count", 0)
            )
        except Exception as e:
            logger.warning(f"Error processing repository {repo_data.get('name', 'unknown')}: {str(e)}")
            return None

    def analyze_language_frequency(self, repositories: List[WeaviateRepository]) -> List[str]:
        """
        Analyze language frequency across repositories to identify top 5 languages.

        Each repository contributes one count per language it lists.

        Returns:
            Up to five language names, most frequent first.
        """
        language_counter = Counter()
        for repo in repositories:
            language_counter.update(repo.languages)

        top_languages = language_counter.most_common(5)
        logger.info(f"Top 5 languages by frequency: {top_languages}")
        return [lang for lang, _ in top_languages]

    async def build_user_profile(self, user_id: str, github_username: str) -> Optional[WeaviateUserProfile]:
        """
        Build a complete user profile for Weaviate indexing.

        Fetches the user record, repositories and pull requests concurrently,
        then fetches per-repository languages concurrently. A failure to fetch
        the user record aborts the build; failures in repositories or pull
        requests degrade to empty lists.

        Args:
            user_id: Internal user identifier stored on the profile.
            github_username: GitHub login to profile.

        Returns:
            The assembled ``WeaviateUserProfile``, or ``None`` if the user
            record could not be fetched.
        """
        logger.info(f"Building user profile for GitHub user: {github_username}")

        # Run user data, repositories, and pull requests fetch concurrently
        user_task = self.get_user_data(github_username)
        repos_task = self.get_user_repositories(github_username)
        prs_task = self.get_user_pull_requests(github_username)

        try:
            user_data, repos_data, pull_requests = await asyncio.gather(
                user_task, repos_task, prs_task, return_exceptions=True
            )
        except Exception as e:
            logger.error(f"Error in concurrent data fetching: {str(e)}")
            return None

        # With return_exceptions=True a cancelled child is returned as a
        # CancelledError, which derives from BaseException, not Exception.
        if isinstance(user_data, BaseException) or not user_data:
            logger.error(f"Could not fetch user data for {github_username}")
            return None

        if isinstance(repos_data, BaseException):
            logger.warning(f"Error fetching repositories: {repos_data}")
            repos_data = []

        if isinstance(pull_requests, BaseException):
            logger.warning(f"Error fetching pull requests: {pull_requests}")
            pull_requests = []

        logger.info(f"Found {len(repos_data)} repositories and {len(pull_requests)} pull requests for {github_username}")

        repository_tasks = [self._process_repository(repo) for repo in repos_data]

        repositories = []
        if repository_tasks:
            try:
                repo_results = await asyncio.gather(*repository_tasks, return_exceptions=True)
                repositories = [
                    r for r in repo_results
                    if r is not None and not isinstance(r, BaseException)
                ]
            except Exception as e:
                logger.warning(f"Error processing repositories: {str(e)}")

        # Stars and forks are summed over successfully converted repositories.
        total_stars = sum(repo_obj.stars for repo_obj in repositories)
        total_forks = sum(repo_obj.forks for repo_obj in repositories)

        # Topics are taken from the raw payloads, so they include repositories
        # whose conversion failed.
        all_topics = set()
        for repo_data in repos_data:
            all_topics.update(repo_data.get("topics") or [])

        top_languages = self.analyze_language_frequency(repositories)

        profile = WeaviateUserProfile(
            user_id=user_id,
            github_username=github_username,
            display_name=user_data.get("name"),
            bio=user_data.get("bio"),
            location=user_data.get("location"),
            repositories=repositories,
            pull_requests=pull_requests,
            languages=top_languages,
            topics=list(all_topics),
            followers_count=user_data.get("followers", 0),
            following_count=user_data.get("following", 0),
            total_stars_received=total_stars,
            total_forks=total_forks,
            profile_text_for_embedding="",  # TODO: Invoke agent/llm to generate this
            # NOTE(review): naive local time; confirm what the storage layer
            # expects before switching to a timezone-aware value.
            last_updated=datetime.now()
        )

        logger.info(
            f"Successfully built profile for {github_username}: "
            f"{len(repositories)} repos, {len(top_languages)} top languages, "
            f"{len(pull_requests)} pull requests analyzed"
        )
        return profile
295+
296+
async def profile_user_from_github(user_id: str, github_username: str) -> bool:
    """
    Profile a user and store in Weaviate with proper resource management.

    This coroutine is started as a fire-and-forget background task, so it
    never raises: every failure, including a missing GitHub token detected
    while constructing the profiler, is logged and reported as ``False``.

    Args:
        user_id: Internal user identifier to store on the profile.
        github_username: GitHub login to profile.

    Returns:
        The result of ``store_user_profile`` when a profile was built,
        otherwise ``False``.
    """
    # The try wraps the ``async with`` itself so that errors raised by
    # GitHubUserProfiler() or by opening the session are handled too,
    # instead of escaping as an unretrieved task exception.
    try:
        async with GitHubUserProfiler() as profiler:
            profile = await profiler.build_user_profile(user_id, github_username)
            if not profile:
                return False

            success = await store_user_profile(profile)
            if success:
                logger.info(f"Successfully stored profile for user {github_username}")
            return success
    except Exception as e:
        logger.error(f"Failed to profile user {github_username}: {str(e)}")
        return False
Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,32 @@
11
import weaviate
2+
from contextlib import asynccontextmanager
3+
from typing import AsyncGenerator
4+
import logging
25

3-
# Connect to local Weaviate instance
4-
client = weaviate.connect_to_local()
6+
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
7+
8+
# Cached Weaviate client shared by the whole module; stays None until
# get_client() creates it on first use.
_client = None
59

610

711
def get_client():
    """
    Return the module-wide Weaviate client, building it on first use.

    The client comes from ``weaviate.use_async_with_local()`` and is cached
    in the module-level ``_client`` so later calls reuse the same object.
    This function does not connect the client.
    """
    global _client
    if _client is not None:
        return _client

    _client = weaviate.use_async_with_local()
    return _client
17+
18+
@asynccontextmanager
async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateAsyncClient, None]:
    """
    Async context manager for Weaviate client.

    Connects the shared client from ``get_client()``, yields it, and closes
    it again on exit. Errors raised while connecting or inside the ``async
    with`` body are logged and re-raised; errors while closing are logged
    and suppressed.

    Yields:
        The connected async Weaviate client.
    """
    # NOTE(review): every caller shares the same client object, and each exit
    # closes it. Two overlapping ``async with`` blocks could therefore close
    # the client while the other is still using it -- confirm callers never
    # overlap, or add reference counting.
    client = get_client()
    try:
        await client.connect()
        yield client
    except Exception as e:
        logger.error(f"Weaviate client error: {str(e)}")
        raise
    finally:
        try:
            await client.close()
        except Exception as e:
            logger.warning(f"Error closing Weaviate client: {str(e)}")

0 commit comments

Comments
 (0)