Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ SUPABASE_SERVICE_ROLE_KEY=
DISCORD_BOT_TOKEN=
# ENABLE_DISCORD_BOT=true

GITHUB_TOKEN=

# EMBEDDING_MODEL=BAAI/bge-small-en-v1.5
# EMBEDDING_MAX_BATCH_SIZE=32
# EMBEDDING_DEVICE=cpu
Expand Down
12 changes: 11 additions & 1 deletion backend/app/api/v1/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from fastapi.responses import HTMLResponse
from app.db.supabase.supabase_client import get_supabase_client
from app.db.supabase.users_service import find_user_by_session_and_verify, get_verification_session_info
from app.db.weaviate.user_profiling import profile_user_from_github
from typing import Optional
import logging
import asyncio

logger = logging.getLogger(__name__)
router = APIRouter()
Expand Down Expand Up @@ -67,7 +69,15 @@ async def auth_callback(request: Request, code: Optional[str] = Query(None), ses
logger.error("User verification failed - no pending verification found")
return _error_response("No pending verification found or verification has expired. Please try the !verify_github command again.")

logger.info(f"Successfully verified user: {verified_user.id}")
logger.info(f"Successfully verified user: {verified_user.id}!")

logger.info(f"Indexing user: {verified_user.id} into Weaviate...")
try:
asyncio.create_task(profile_user_from_github(str(verified_user.id), github_username))
logger.info(f"User profiling started in background for: {verified_user.id}")
except Exception as e:
logger.error(f"Error starting user profiling: {verified_user.id}: {str(e)}")

return _success_response(github_username)

except Exception as e:
Expand Down
310 changes: 310 additions & 0 deletions backend/app/db/weaviate/user_profiling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
import logging
import asyncio
import aiohttp
from typing import List, Optional, Dict
from datetime import datetime
from collections import Counter
from app.model.weaviate.models import WeaviateUserProfile, WeaviateRepository, WeaviatePullRequest
from app.db.weaviate.weaviate_operations import store_user_profile
from app.core.config import settings

logger = logging.getLogger(__name__)


class GitHubUserProfiler:
"""
Class to handle GitHub user profiling and Weaviate storage.
Uses organization's GitHub token to fetch public user data via GitHub REST API.
"""

def __init__(self):
    """
    Prepare the request headers and API base URL; no HTTP session is opened here.

    Raises:
        ValueError: if ``settings.github_token`` is empty or unset.
    """
    token = settings.github_token
    if not token:
        raise ValueError("GitHub token not configured in environment variables")

    # The aiohttp session is only created when the instance is entered
    # as an async context manager.
    self.session = None
    self.base_url = "https://api.github.com"
    self.headers = {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "DevRel-AI-Bot/1.0",
    }

async def __aenter__(self):
    """Open the aiohttp session used for every GitHub request and return ``self``."""
    pool = aiohttp.TCPConnector(
        limit=50,            # total connection pool size
        limit_per_host=10,   # per-host connection limit
        ttl_dns_cache=300,   # DNS cache TTL
        use_dns_cache=True,
    )
    limits = aiohttp.ClientTimeout(total=60, connect=10, sock_read=30)

    self.session = aiohttp.ClientSession(
        connector=pool,
        timeout=limits,
        headers=self.headers,
    )
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Close async HTTP session"""
if self.session:
await self.session.close()

async def _make_request(self, url: str, params: Optional[Dict] = None) -> Optional[Dict]:
    """
    Make a GET request to the GitHub API and return the decoded JSON body.

    This method never raises: every failure is logged and reported as ``None``
    so callers can treat "no data" uniformly.

    Args:
        url: Absolute URL to request.
        params: Optional query-string parameters.

    Returns:
        The parsed JSON payload on HTTP 200, otherwise ``None``. Callers in
        this class check the payload type themselves (dict vs. list).
    """
    try:
        async with self.session.get(url, params=params) as response:
            status = response.status

            if status == 200:
                return await response.json()

            if status == 404:
                logger.warning(f"GitHub API 404: {url}")
                return None

            if status in (403, 429):
                # GitHub reports rate limiting with 403 *or* 429; a 403 without
                # rate-limit headers is an ordinary permission failure and
                # should not be logged as throttling.
                headers = response.headers
                throttled = (
                    status == 429
                    or headers.get("X-RateLimit-Remaining") == "0"
                    or "Retry-After" in headers
                )
                if throttled:
                    logger.error(f"GitHub API rate limit exceeded: {url}")
                else:
                    logger.error(f"GitHub API access forbidden (403): {url}")
                return None

            logger.error(f"GitHub API error {status}: {url}")
            return None
    except asyncio.TimeoutError:
        logger.error(f"Timeout accessing GitHub API: {url}")
        return None
    except Exception as e:
        logger.error(f"Error making request to {url}: {str(e)}")
        return None

async def get_user_data(self, github_username: str) -> Optional[Dict]:
    """
    Fetch the ``/users/{github_username}`` payload.

    Returns whatever ``_make_request`` produced (``None`` on failure) and logs
    the outcome either way.
    """
    profile_json = await self._make_request(f"{self.base_url}/users/{github_username}")

    if not profile_json:
        logger.error(f"Failed to fetch user data for {github_username}")
        return profile_json

    logger.info(f"Successfully fetched user data for {github_username}")
    return profile_json

async def get_user_repositories(self, github_username: str, max_repos: int = 50) -> List[Dict]:
    """
    Fetch the repositories owned by the user, most recently updated first.

    ``max_repos`` is sent as the ``per_page`` query parameter of a single
    request. Returns an empty list when nothing usable comes back or on error.
    """
    try:
        endpoint = f"{self.base_url}/users/{github_username}/repos"
        query = {
            "type": "owner",
            "sort": "updated",
            "direction": "desc",
            "per_page": max_repos,
        }
        payload = await self._make_request(endpoint, query)

        # Anything other than a non-empty list is treated as "no repositories".
        if not (payload and isinstance(payload, list)):
            logger.info(f"No repositories found for {github_username}")
            return []

        logger.info(f"Successfully fetched {len(payload)} repositories for {github_username}")
        return payload

    except Exception as e:
        logger.error(f"Error fetching repositories for {github_username}: {str(e)}")
        return []

async def get_repository_languages(self, languages_url: str) -> List[str]:
"""Fetch repository languages"""
try:
languages_data = await self._make_request(languages_url)
if languages_data and isinstance(languages_data, dict):
return list(languages_data.keys())
return []
except Exception as e:
logger.warning(f"Error fetching languages from {languages_url}: {str(e)}")
return []

async def get_user_pull_requests(self, github_username: str, max_prs: int = 100) -> List[WeaviatePullRequest]:
    """
    Search ``/search/issues`` for pull requests authored by the user, newest first.

    ``max_prs`` is sent as the ``per_page`` query parameter of a single request.
    Items that cannot be converted are logged and skipped; any other failure
    yields an empty list.
    """
    try:
        query = {
            "q": f"author:{github_username} is:pr",
            "sort": "created",
            "order": "desc",
            "per_page": max_prs,
        }
        search_result = await self._make_request(f"{self.base_url}/search/issues", query)

        if not search_result or "items" not in search_result:
            logger.info(f"No pull requests found for {github_username}")
            return []

        collected = []
        for item in search_result["items"]:
            try:
                # Derive "<owner>/<repo>" from segments 3 and 4 of the html_url
                # (assumes the https://github.com/<owner>/<repo>/... layout).
                repo_name = "unknown"
                html_url = item.get("html_url")
                if html_url:
                    segments = html_url.split('/')
                    if len(segments) >= 5:
                        repo_name = f"{segments[3]}/{segments[4]}"

                # A missing or falsy merge timestamp is recorded as None.
                pr_meta = item.get("pull_request")
                merged_at = (pr_meta.get("merged_at") or None) if pr_meta else None

                collected.append(
                    WeaviatePullRequest(
                        title=item["title"],
                        body=(item.get("body") or "")[:500],  # keep at most 500 chars
                        state=item["state"],
                        repository=repo_name,
                        created_at=item.get("created_at"),
                        closed_at=item.get("closed_at"),
                        merged_at=merged_at,
                        labels=[label["name"] for label in item.get("labels", [])],
                        url=item["html_url"],
                    )
                )
            except Exception as e:
                logger.warning(f"Error processing pull request: {str(e)}")
                continue

        logger.info(f"Successfully fetched {len(collected)} pull requests for {github_username}")
        return collected

    except Exception as e:
        logger.error(f"Error fetching pull requests for {github_username}: {str(e)}")
        return []

async def _process_repository(self, repo_data: Dict) -> Optional[WeaviateRepository]:
    """
    Convert one raw repository payload into a ``WeaviateRepository``.

    Languages are fetched from the payload's ``languages_url`` when present.
    Returns ``None`` (after logging a warning) if anything goes wrong.
    """
    try:
        langs_url = repo_data.get("languages_url")
        languages = await self.get_repository_languages(langs_url) if langs_url else []

        return WeaviateRepository(
            name=repo_data["name"],
            description=repo_data.get("description"),
            url=repo_data["html_url"],
            languages=languages,
            stars=repo_data.get("stargazers_count", 0),
            forks=repo_data.get("forks_count", 0),
        )
    except Exception as e:
        logger.warning(f"Error processing repository {repo_data.get('name', 'unknown')}: {str(e)}")
        return None

def analyze_language_frequency(self, repositories: List[WeaviateRepository]) -> List[str]:
    """
    Return up to five language names, ordered by how many repositories use them.

    Each repository's ``languages`` collection is tallied into one counter.
    """
    tally = Counter()
    for repository in repositories:
        tally.update(repository.languages)

    ranked = tally.most_common(5)
    logger.info(f"Top 5 languages by frequency: {ranked}")
    return [name for name, _count in ranked]

async def build_user_profile(self, user_id: str, github_username: str) -> Optional[WeaviateUserProfile]:
    """
    Build a complete user profile for Weaviate indexing.

    User data, repositories and pull requests are fetched concurrently.
    Missing user data aborts the build; failures in the repository or
    pull-request fetch degrade to empty lists.

    Args:
        user_id: Internal identifier stored on the profile.
        github_username: GitHub login to profile.

    Returns:
        The assembled ``WeaviateUserProfile``, or ``None`` when the user data
        could not be fetched.
    """
    logger.info(f"Building user profile for GitHub user: {github_username}")

    # return_exceptions=True keeps one failed fetch from discarding the others.
    try:
        user_data, repos_data, pull_requests = await asyncio.gather(
            self.get_user_data(github_username),
            self.get_user_repositories(github_username),
            self.get_user_pull_requests(github_username),
            return_exceptions=True,
        )
    except Exception as e:
        logger.error(f"Error in concurrent data fetching: {str(e)}")
        return None

    if isinstance(user_data, Exception) or not user_data:
        logger.error(f"Could not fetch user data for {github_username}")
        return None

    if isinstance(repos_data, Exception):
        logger.warning(f"Error fetching repositories: {repos_data}")
        repos_data = []

    if isinstance(pull_requests, Exception):
        logger.warning(f"Error fetching pull requests: {pull_requests}")
        pull_requests = []

    logger.info(f"Found {len(repos_data)} repositories and {len(pull_requests)} pull requests for {github_username}")

    repositories = await self._convert_repositories(repos_data)
    top_languages = self.analyze_language_frequency(repositories)

    profile = WeaviateUserProfile(
        user_id=user_id,
        github_username=github_username,
        display_name=user_data.get("name"),
        bio=user_data.get("bio"),
        location=user_data.get("location"),
        repositories=repositories,
        pull_requests=pull_requests,
        languages=top_languages,
        topics=self._collect_topics(repos_data),
        followers_count=user_data.get("followers", 0),
        following_count=user_data.get("following", 0),
        total_stars_received=sum(repo.stars for repo in repositories),
        total_forks=sum(repo.forks for repo in repositories),
        profile_text_for_embedding="",  # TODO: Invoke agent/llm to generate this
        # NOTE(review): naive local timestamp — confirm whether the store expects UTC.
        last_updated=datetime.now()
    )

    logger.info(
        f"Successfully built profile for {github_username}: "
        f"{len(repositories)} repos, {len(top_languages)} top languages, "
        f"{len(pull_requests)} pull requests analyzed"
    )
    return profile

async def _convert_repositories(self, repos_data: List[Dict]) -> List[WeaviateRepository]:
    """Convert raw repository payloads concurrently, dropping any that failed."""
    if not repos_data:
        return []

    try:
        results = await asyncio.gather(
            *(self._process_repository(repo) for repo in repos_data),
            return_exceptions=True,
        )
    except Exception as e:
        logger.warning(f"Error processing repositories: {str(e)}")
        return []

    return [r for r in results if r is not None and not isinstance(r, Exception)]

@staticmethod
def _collect_topics(repos_data: List[Dict]) -> List[str]:
    """Return the distinct topics across all raw repository payloads, sorted for stable output."""
    topics = set()
    for repo_data in repos_data:
        topics.update(repo_data.get("topics") or [])
    return sorted(topics)
Comment on lines +210 to +293
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring this method to reduce complexity.

This method has 22 local variables (pylint recommends max 15). Consider extracting some logic into helper methods, such as:

  • Repository processing logic (lines 243-266)
  • Language and topic aggregation (lines 253-268)

Would you like me to help refactor this method into smaller, more focused helper methods?

🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 210-210: Too many local variables (22/15)

(R0914)

🤖 Prompt for AI Agents
In backend/app/db/weaviate/user_profiling.py from lines 210 to 293, the
build_user_profile method is too complex with 22 local variables, exceeding
pylint's recommended maximum of 15. To fix this, extract the repository
processing logic (lines 243-266) into a separate helper method that handles
processing repositories and filtering results. Also, move the language and topic
aggregation logic (lines 253-268) into another helper method that computes all
languages, topics, stars, and forks totals. Then call these helper methods from
build_user_profile to simplify it and reduce local variables.



async def profile_user_from_github(user_id: str, github_username: str) -> bool:
    """
    Profile a user and store the result in Weaviate.

    The profiler is constructed and entered inside the ``try`` so that a
    missing GitHub token or a session-setup failure is logged and reported as
    ``False`` instead of escaping. That matters when this coroutine runs as a
    background task whose result is never awaited.

    Args:
        user_id: Internal identifier stored on the profile.
        github_username: GitHub login to profile.

    Returns:
        The result of ``store_user_profile`` when a profile was built;
        ``False`` when no profile could be built or any step raised.
    """
    try:
        async with GitHubUserProfiler() as profiler:
            profile = await profiler.build_user_profile(user_id, github_username)
            if not profile:
                return False

            success = await store_user_profile(profile)
            if success:
                logger.info(f"Successfully stored profile for user {github_username}")
            return success
    except Exception as e:
        logger.error(f"Failed to profile user {github_username}: {str(e)}")
        return False
30 changes: 27 additions & 3 deletions backend/app/db/weaviate/weaviate_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
import weaviate
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import logging

# Connect to local Weaviate instance
client = weaviate.connect_to_local()
logger = logging.getLogger(__name__)

_client = None


def get_client():
return client
"""Get or create the global Weaviate client instance."""
global _client
if _client is None:
_client = weaviate.use_async_with_local()
return _client

@asynccontextmanager
async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateClient, None]:
    """
    Yield the client returned by ``get_client()``, connected for the ``async with`` body.

    Errors raised while connecting or inside the body are logged and re-raised.
    The client is always closed on exit; a failure to close is only logged.
    """
    shared = get_client()
    try:
        await shared.connect()
        yield shared
    except Exception as err:
        logger.error(f"Weaviate client error: {err!s}")
        raise
    finally:
        # NOTE(review): get_client() appears to hand out one module-level
        # instance, so closing here may affect other concurrent users — confirm.
        try:
            await shared.close()
        except Exception as close_err:
            logger.warning(f"Error closing Weaviate client: {close_err!s}")
Loading