diff --git a/backend/app/agents/devrel/agent.py b/backend/app/agents/devrel/agent.py index 4dd2af0..b6ab2dd 100644 --- a/backend/app/agents/devrel/agent.py +++ b/backend/app/agents/devrel/agent.py @@ -28,7 +28,7 @@ def __init__(self, config: Dict[str, Any] = None): google_api_key=settings.gemini_api_key ) self.search_tool = DuckDuckGoSearchTool() - self.faq_tool = FAQTool() + self.faq_tool = FAQTool(llm=self.llm) self.github_toolkit = GitHubToolkit() self.checkpointer = InMemorySaver() super().__init__("DevRelAgent", self.config) @@ -43,7 +43,9 @@ def _build_graph(self): # Phase 2: ReAct Supervisor - Decide what to do next workflow.add_node("react_supervisor", partial(react_supervisor_node, llm=self.llm)) workflow.add_node("web_search_tool", partial(web_search_tool_node, search_tool=self.search_tool, llm=self.llm)) - workflow.add_node("faq_handler_tool", partial(faq_handler_tool_node, faq_tool=self.faq_tool)) + + workflow.add_node("faq_handler_tool", partial(faq_handler_tool_node, search_tool=self.search_tool, llm=self.llm)) + workflow.add_node("onboarding_tool", onboarding_tool_node) workflow.add_node("github_toolkit_tool", partial(github_toolkit_tool_node, github_toolkit=self.github_toolkit)) diff --git a/backend/app/agents/devrel/nodes/handlers/faq.py b/backend/app/agents/devrel/nodes/handlers/faq.py index 8855c32..1a69e30 100644 --- a/backend/app/agents/devrel/nodes/handlers/faq.py +++ b/backend/app/agents/devrel/nodes/handlers/faq.py @@ -1,11 +1,25 @@ import logging +from typing import List, Dict, Any +from langchain_core.messages import HumanMessage from app.agents.state import AgentState +from app.core.config.settings import settings as app_settings + +# Configure logger for this module logger = logging.getLogger(__name__) -async def handle_faq_node(state: AgentState, faq_tool) -> dict: - """Handle FAQ requests""" - logger.info(f"Handling FAQ for session {state.session_id}") + +# Organization identity and official handles from centralized settings +ORG_NAME = app_settings.org_name +OFFICIAL_HANDLES = [app_settings.org_website, app_settings.org_github, app_settings.org_twitter] + + +async def handle_faq_node(state: AgentState, search_tool: Any, llm: Any) -> dict: + """ + Handle FAQ requests dynamically using web search and AI synthesis. + Pass official handles to search tool if it supports site-restricted queries. 
+ """ + logger.info(f"[FAQ_HANDLER] Handling dynamic FAQ for session {state.session_id}") latest_message = "" if state.messages: @@ -13,14 +27,229 @@ async def handle_faq_node(state: AgentState, faq_tool) -> dict: elif state.context.get("original_message"): latest_message = state.context["original_message"] - # faq_tool will be passed from the agent, similar to llm for classify_intent - faq_response = await faq_tool.get_response(latest_message) + # Early exit if no message + if not latest_message: + logger.warning("[FAQ_HANDLER] Empty latest user message; returning fallback") + return { + "task_result": { + "type": "faq", + "response": _generate_fallback_response(latest_message, ORG_NAME), + "source": "dynamic_web_search" + }, + "current_task": "faq_handled" + } + + # Append site restrictions to the query if search tool supports it + try: + from urllib.parse import urlparse + domains = [] + for u in OFFICIAL_HANDLES: + try: + parsed = urlparse(u) + domain = parsed.netloc or parsed.path # handles bare domains + if domain: + domains.append(domain) + except Exception: + continue + site_filters = " OR ".join([f"site:{d}" for d in domains]) + except Exception: + site_filters = "" + logger.info(f"[FAQ_HANDLER] Applying site filters for search: {site_filters or '(none)'}") + + faq_response = await _dynamic_faq_process( + latest_message, + search_tool, + llm, + org_name=ORG_NAME, + site_filters=site_filters, + ) return { "task_result": { "type": "faq", "response": faq_response, - "source": "faq_database" + "source": "dynamic_web_search" }, "current_task": "faq_handled" } + + +async def _dynamic_faq_process( + message: str, + search_tool: Any, + llm: Any, + org_name: str = ORG_NAME, + site_filters: str = "", +) -> str: + """ + Dynamic FAQ handler implementing: + 1. Intent Detection & Query Refinement + 2. Web Search (with site restrictions) + 3. AI-Powered Synthesis + 4. Generate Final Response + 5. 
Format with Sources + """ + try: + # Step 1: Intent Detection & Query Refinement + logger.info(f"[FAQ_HANDLER] Step 1: Refining FAQ query for org '{org_name}'") + refined_query = await _refine_faq_query(message, llm, org_name) + + # Append site filters for restricting to official handles + if site_filters: + refined_query = f"({refined_query}) AND ({site_filters})" + logger.info(f"[FAQ_HANDLER] Refined and filtered query: {refined_query}") + + # Step 2: Dynamic Web Search + logger.info(f"[FAQ_HANDLER] Step 2: Searching for: {refined_query}") + try: + search_results = await search_tool.search(refined_query) + except Exception as search_err: + logger.error(f"[FAQ_HANDLER] Search tool error: {search_err}") + return _generate_fallback_response(message, org_name) + + if not search_results: + logger.warning(f"[FAQ_HANDLER] No results found for query: {refined_query}") + return _generate_fallback_response(message, org_name) + + # Step 3 & 4: AI-Powered Synthesis & Response Generation + logger.info("[FAQ_HANDLER] Step 3-4: Synthesizing search results into FAQ response") + synthesized_response = await _synthesize_faq_response(message, search_results, llm, org_name) + + # Step 5: Format Final Response with Sources + logger.info("[FAQ_HANDLER] Step 5: Formatting final response with sources") + final_response = _format_faq_response(synthesized_response, search_results) + + return final_response + + except Exception as e: + logger.error(f"[FAQ_HANDLER] Error in dynamic FAQ process: {e}") + return _generate_fallback_response(message, org_name) + + +async def _refine_faq_query(message: str, llm: Any, org_name: str) -> str: + """ + Step 1: Refine user query for organization-specific FAQ search. + """ + refinement_prompt = f""" +You are helping someone find information about {org_name}. +Transform their question into an effective search query that will find official information about the organization. + +User Question: "{message}" + +Create a search query that focuses on: +- Official {org_name} information +- The organization's website, blog, or documentation +- Adding terms like "about", "mission", "projects" if relevant + +Return only the refined search query, nothing else. + +Examples: +- "What does this org do?" โ†’ "{org_name} about mission what we do" +- "How do you work?" โ†’ "{org_name} how it works process methodology" +- "What projects do you have?" โ†’ "{org_name} projects portfolio what we build" +""" + response = await llm.ainvoke([HumanMessage(content=refinement_prompt)]) + refined_query = response.content.strip() + logger.info(f"[FAQ_HANDLER] Refined query: {refined_query}") + return refined_query + + +async def _synthesize_faq_response( + message: str, + search_results: List[Dict[str, Any]], + llm: Any, + org_name: str +) -> str: + """ + Step 3-4: Use LLM to synthesize search results into a comprehensive FAQ answer. + """ + results_context = "" + for i, result in enumerate(search_results[:5]): # Limit to top 5 results + title = result.get('title', 'N/A') + content = result.get('content', 'N/A') + if isinstance(content, str) and len(content) > 500: + content = content[:500] + "..." + url = result.get('url', 'N/A') + results_context += f"\nResult {i+1}:\nTitle: {title}\nContent: {content}\nURL: {url}\n" + + synthesis_prompt = f""" +You are an AI assistant representing {org_name}. A user asked: "{message}" + +Based on the following search results from official sources, provide a comprehensive, helpful answer about {org_name}. + +Search Results: +{results_context} + +Instructions: +1. 
Answer the user's question directly and conversationally +2. Focus on the most relevant and recent information +3. Be informative but concise (2-3 paragraphs max) +4. If the search results don't fully answer the question, acknowledge what you found +5. Sound helpful and knowledgeable about {org_name} +6. Don't mention "search results" in your response - speak as if you know about the organization + +Your response: +""" + + response = await llm.ainvoke([HumanMessage(content=synthesis_prompt)]) + synthesized_answer = response.content.strip() + logger.info(f"[FAQ_HANDLER] Synthesized FAQ response: {synthesized_answer[:100]}...") + return synthesized_answer + + +def _format_faq_response(synthesized_answer: str, search_results: List[Dict[str, Any]]) -> str: + """ + Step 5: Format the final response with sources. + """ + formatted_response = synthesized_answer + + if search_results: + formatted_response += "\n\n**๐Ÿ“š Sources:**" + for i, result in enumerate(search_results[:3]): # Show top 3 sources + title = result.get('title', 'Source') + url = result.get('url', '#') + formatted_response += f"\n{i+1}. [{title}]({url})" + + return formatted_response + + +def _generate_fallback_response(message: str, org_name: str) -> str: + """ + Generate a helpful fallback when search fails. + """ + return ( + f"I'd be happy to help you learn about {org_name}, but I couldn't find current information to answer your question: \"{message}\"\n\n" + "This might be because:\n" + "- The information isn't publicly available yet\n" + "- The search terms need to be more specific\n" + "- There might be connectivity issues\n\n" + "Try asking a more specific question, or check out our official website and documentation for the most up-to-date information about " + f"{org_name}." + ) + + +# Example usage for testing +if __name__ == "__main__": + import asyncio + from unittest.mock import AsyncMock + + class MockState: + session_id = "test_session" + messages = [{"content": "What projects does your organization have?"}] + context = {} + + async def test_faq_handler(): + mock_state = MockState() + mock_search_tool = AsyncMock() + mock_search_tool.search.return_value = [ + {"title": "Project A", "content": "Details about Project A.", "url": "https://aossie.org/projects/a"}, + {"title": "Project B", "content": "Details about Project B.", "url": "https://aossie.org/projects/b"}, + ] + mock_llm = AsyncMock() + mock_llm.ainvoke.return_value = AsyncMock(content="We have Project A and Project B focusing on AI and Web.") + + response = await handle_faq_node(mock_state, mock_search_tool, mock_llm) + print("FAQ Handler response:") + print(response) + + asyncio.run(test_faq_handler()) diff --git a/backend/app/agents/devrel/nodes/react_supervisor.py b/backend/app/agents/devrel/nodes/react_supervisor.py index c3e278c..660138f 100644 --- a/backend/app/agents/devrel/nodes/react_supervisor.py +++ b/backend/app/agents/devrel/nodes/react_supervisor.py @@ -1,22 +1,51 @@ -import logging import json +import logging +from datetime import datetime from typing import Dict, Any, Literal -from app.agents.state import AgentState from langchain_core.messages import HumanMessage + +from app.agents.state import AgentState from ..prompts.react_prompt import REACT_SUPERVISOR_PROMPT +from ..nodes.generate_response import _get_latest_message as get_latest_message_util logger = logging.getLogger(__name__) +# Configuration constants +MAX_ITERATIONS = 10 +MAX_CONVERSATION_HISTORY = 5 +VALID_ACTIONS = ["web_search", "faq_handler", "onboarding", "github_toolkit", 
"complete"] + + async def react_supervisor_node(state: AgentState, llm) -> Dict[str, Any]: """ReAct Supervisor: Think -> Act -> Observe""" + + if not _validate_state(state): + logger.error(f"Invalid state for session {getattr(state, 'session_id', 'unknown')}") + return _create_error_response(state, "Invalid state") + logger.info(f"ReAct Supervisor thinking for session {state.session_id}") - # Get current context latest_message = _get_latest_message(state) conversation_history = _get_conversation_history(state) tool_results = state.context.get("tool_results", []) iteration_count = state.context.get("iteration_count", 0) + + # Safety check for max iterations + if iteration_count >= MAX_ITERATIONS: + logger.warning(f"Max iterations ({MAX_ITERATIONS}) reached for session {state.session_id}") + return _create_completion_response(state, "Maximum iterations reached") + + # Safely serialize tool_results for prompt usage + try: + if tool_results: + tool_results_str = json.dumps(tool_results, indent=2, default=str) + else: + tool_results_str = "No previous tool results" + except Exception as e: + logger.warning(f"Failed to serialize tool_results: {e}") + tool_results_str = str(tool_results) if tool_results else "No previous tool results" + forced_action = state.context.get("force_next_tool") if forced_action: logger.info( @@ -52,96 +81,240 @@ async def react_supervisor_node(state: AgentState, llm) -> Dict[str, Any]: "current_task": "supervisor_forced_complete", } + prompt = REACT_SUPERVISOR_PROMPT.format( latest_message=latest_message, - platform=state.platform, - interaction_count=state.interaction_count, + platform=getattr(state, 'platform', 'unknown'), + interaction_count=getattr(state, 'interaction_count', 0), iteration_count=iteration_count, conversation_history=conversation_history, - tool_results=json.dumps(tool_results, indent=2) if tool_results else "No previous tool results" + tool_results=tool_results_str ) - response = await llm.ainvoke([HumanMessage(content=prompt)]) - decision = _parse_supervisor_decision(response.content) + try: + response = await llm.ainvoke([HumanMessage(content=prompt)]) + decision = _parse_supervisor_decision(response.content) - logger.info(f"ReAct Supervisor decision: {decision['action']}") + logger.debug(f"Current iteration: {iteration_count}") + logger.debug(f"Latest message length: {len(latest_message or '')}") + logger.info(f"ReAct Supervisor decision: {decision['action']}") + logger.debug(f"Supervisor thinking: {decision.get('thinking', '')[:100]}...") + logger.debug(f"Supervisor reasoning: {decision.get('reasoning', '')[:100]}...") + + return { + "context": { + **state.context, + "supervisor_thinking": response.content, + "supervisor_decision": decision, + "iteration_count": iteration_count + 1, + "last_action": decision['action'] + }, + "current_task": f"supervisor_decided_{decision['action']}" + } + + except Exception as e: + logger.error(f"Error in react_supervisor_node: {e}", exc_info=True) + return _create_error_response(state, f"Supervisor error: {str(e)}") - # Update state with supervisor's thinking - return { - "context": { - **state.context, - "supervisor_thinking": response.content, - "supervisor_decision": decision, - "iteration_count": iteration_count + 1 - }, - "current_task": f"supervisor_decided_{decision['action']}" - } def _parse_supervisor_decision(response: str) -> Dict[str, Any]: - """Parse the supervisor's decision from LLM response""" + """Parse the supervisor's decision from LLM response with better handling.""" + decision = 
{"action": "complete", "reasoning": "", "thinking": ""} + try: - lines = response.strip().split('\n') - decision = {"action": "complete", "reasoning": "", "thinking": ""} + if not response or not response.strip(): + logger.warning("Empty response from supervisor, defaulting to complete") + return decision + + current_section = None + content_buffer = [] + + for line in response.strip().split("\n"): + line = line.strip() + if not line: + continue - for line in lines: if line.startswith("THINK:"): - decision["thinking"] = line.replace("THINK:", "").strip() + if current_section and content_buffer: + decision[current_section] = " ".join(content_buffer) + current_section = "thinking" + content_buffer = [line.replace("THINK:", "").strip()] + elif line.startswith("ACT:"): + if current_section and content_buffer: + decision[current_section] = " ".join(content_buffer) action = line.replace("ACT:", "").strip().lower() - if action in ["web_search", "faq_handler", "onboarding", "github_toolkit", "complete"]: + if action in VALID_ACTIONS: decision["action"] = action + else: + logger.warning(f"Invalid action '{action}', defaulting to 'complete'") + decision["action"] = "complete" + current_section = None + content_buffer = [] + elif line.startswith("REASON:"): - decision["reasoning"] = line.replace("REASON:", "").strip() + if current_section and content_buffer: + decision[current_section] = " ".join(content_buffer) + current_section = "reasoning" + content_buffer = [line.replace("REASON:", "").strip()] + + elif current_section and line: + content_buffer.append(line) + + if current_section and content_buffer: + decision[current_section] = " ".join(content_buffer) + + if decision["action"] not in VALID_ACTIONS: + logger.warning(f"Final validation failed for action '{decision['action']}', defaulting to 'complete'") + decision["action"] = "complete" - return decision except Exception as e: - logger.error(f"Error parsing supervisor decision: {e}") - return {"action": "complete", "reasoning": "Error in decision parsing", "thinking": ""} + logger.error(f"Error parsing supervisor decision: {e}", exc_info=True) + + return decision + def supervisor_decision_router(state: AgentState) -> Literal["web_search", "faq_handler", "onboarding", "github_toolkit", "complete"]: """Route based on supervisor's decision""" - decision = state.context.get("supervisor_decision", {}) - action = decision.get("action", "complete") + try: + decision = state.context.get("supervisor_decision", {}) + action = decision.get("action", "complete") - # Safety check for infinite loops - iteration_count = state.context.get("iteration_count", 0) - if iteration_count > 10: - logger.warning(f"Max iterations reached for session {state.session_id}") + iteration_count = state.context.get("iteration_count", 0) + if iteration_count > MAX_ITERATIONS: + logger.warning(f"Max iterations reached for session {state.session_id}") + return "complete" + + if action not in VALID_ACTIONS: + logger.warning(f"Invalid routing action '{action}', defaulting to 'complete'") + return "complete" + + logger.debug(f"Routing to: {action} (iteration {iteration_count})") + return action + + except Exception as e: + logger.error(f"Error in supervisor_decision_router: {e}", exc_info=True) return "complete" - return action def add_tool_result(state: AgentState, tool_name: str, result: Dict[str, Any]) -> Dict[str, Any]: - """Add tool result to state context""" - tool_results = state.context.get("tool_results", []) - tool_results.append({ - "tool": tool_name, - "result": result, - 
"iteration": state.context.get("iteration_count", 0) - }) + """Add tool result to state context with validation""" + try: + if not _validate_state(state): + logger.error("Invalid state in add_tool_result") + return {"context": state.context if hasattr(state, 'context') else {}} + + tool_results = state.context.get("tool_results", []) + + if not isinstance(result, dict): + logger.warning(f"Tool result for {tool_name} is not a dict, converting") + result = {"result": str(result)} + + tool_entry = { + "tool": tool_name, + "result": result, + "iteration": state.context.get("iteration_count", 0), + "timestamp": datetime.now().isoformat() + } + + tool_results.append(tool_entry) + + if len(tool_results) > 20: + tool_results = tool_results[-20:] + logger.debug("Trimmed tool results to last 20 entries") + + tools_used = getattr(state, 'tools_used', []) + [tool_name] + + return { + "context": { + **state.context, + "tool_results": tool_results + }, + "tools_used": tools_used, + "current_task": f"completed_{tool_name}" + } + + except Exception as e: + logger.error(f"Error in add_tool_result: {e}", exc_info=True) + return {"context": state.context if hasattr(state, 'context') else {}} + + +def _get_latest_message(state: AgentState) -> str: + """Extract the latest message from state""" + try: + return get_latest_message_util(state) + except Exception as e: + logger.error(f"Error getting latest message: {e}", exc_info=True) + return "" + +def _get_conversation_history(state: AgentState, max_messages: int = MAX_CONVERSATION_HISTORY) -> str: + """Get formatted conversation history""" + try: + if not getattr(state, 'messages', None): + return "No previous conversation" + + recent_messages = state.messages[-max_messages:] + formatted_messages = [] + for msg in recent_messages: + if isinstance(msg, dict): + role = msg.get('role', 'user') + content = msg.get('content', '') + if content: + formatted_messages.append( + f"{role}: {content[:200]}{'...' 
if len(content) > 200 else ''}"
+                )
+        return "\n".join(formatted_messages) if formatted_messages else "No previous conversation"
+
+    except Exception as e:
+        logger.error(f"Error getting conversation history: {e}", exc_info=True)
+        return "Error retrieving conversation history"
+
+
+def _validate_state(state: AgentState) -> bool:
+    """Validate state before processing"""
+    try:
+        if not state:
+            return False
+        if not getattr(state, 'session_id', None):
+            logger.error("Invalid state: missing session_id")
+            return False
+        if not hasattr(state, 'context'):
+            logger.error("Invalid state: missing context")
+            return False
+        return True
+    except Exception as e:
+        logger.error(f"Error validating state: {e}", exc_info=True)
+        return False
+
+
+def _create_error_response(state: AgentState, error_message: str) -> Dict[str, Any]:
+    """Create standardized error response"""
     return {
         "context": {
-            **state.context,
-            "tool_results": tool_results
+            **(state.context if hasattr(state, 'context') else {}),
+            "supervisor_decision": {
+                "action": "complete",
+                "reasoning": error_message,
+                "thinking": "Error occurred"
+            },
+            "error": error_message
         },
-        "tools_used": state.tools_used + [tool_name],
-        "current_task": f"completed_{tool_name}"
+        "current_task": "supervisor_decided_complete"
     }
-def _get_latest_message(state: AgentState) -> str:
-    """Extract the latest message from state"""
-    if state.messages:
-        return state.messages[-1].get("content", "")
-    return state.context.get("original_message", "")
-def _get_conversation_history(state: AgentState, max_messages: int = 5) -> str:
-    """Get formatted conversation history"""
-    if not state.messages:
-        return "No previous conversation"
-
-    recent_messages = state.messages[-max_messages:]
-    return "\n".join([
-        f"{msg.get('role', 'user')}: {msg.get('content', '')}"
-        for msg in recent_messages
-    ])
+def _create_completion_response(state: AgentState, reason: str) -> Dict[str, Any]:
+    """Create standardized completion response"""
+    return {
+        "context": {
+            **state.context,
+            "supervisor_decision": {
+                "action": "complete",
+                "reasoning": reason,
+                "thinking": "Completing task"
+            },
+            "completion_reason": reason
+        },
+        "current_task": "supervisor_decided_complete"
+    }
diff --git a/backend/app/agents/devrel/prompts/faq_prompt.py b/backend/app/agents/devrel/prompts/faq_prompt.py
new file mode 100644
index 0000000..3f09847
--- /dev/null
+++ b/backend/app/agents/devrel/prompts/faq_prompt.py
@@ -0,0 +1,42 @@
+REFINEMENT_PROMPT = """
+You are helping someone find information about {org_name}.
+Transform their question into an effective search query that will find official information about the organization.
+
+User Question: "{message}"
+
+Create a search query that focuses on:
+- Official {org_name} information
+- The organization's website, blog, or documentation
+- Adding terms like "about", "mission", "projects" if relevant
+
+Return only the refined search query, nothing else.
+
+Examples:
+- "What does this org do?" → "{org_name} about mission what we do"
+- "How do you work?" → "{org_name} how it works process methodology"
+- "What projects do you have?" → "{org_name} projects portfolio what we build"
+"""
+
+SYNTHESIS_PROMPT = """
+You are an AI assistant representing **{org_name}**.
+A user asked: "{message}"
+
+Below are search results gathered from official sources about {org_name}.
+
+Search Results:
+{results_context}
+
+---
+
+### Instructions:
+1. **Answer the user's question directly and conversationally.**
+2. **Focus on the most relevant and recent information.**
+3. **Be informative yet concise (2-3 paragraphs max).**
+4. **If the search results don't fully answer the question, acknowledge that clearly.**
+5. **Maintain a knowledgeable and professional tone** as if you are part of {org_name}.
+6. **Do not mention "search results" or sources** - respond as if you know the information first-hand.
+
+---
+
+### Your Response:
+"""
diff --git a/backend/app/agents/devrel/tool_wrappers.py b/backend/app/agents/devrel/tool_wrappers.py
index f26d881..4726d2f 100644
--- a/backend/app/agents/devrel/tool_wrappers.py
+++ b/backend/app/agents/devrel/tool_wrappers.py
@@ -2,7 +2,7 @@
 from typing import Dict, Any
 from app.agents.state import AgentState
 from .nodes.react_supervisor import add_tool_result
-from .nodes.handlers.faq import handle_faq_node
+from app.agents.devrel.tools.faq_tool import handle_faq_node
 from .nodes.handlers.web_search import handle_web_search_node
 from .nodes.handlers.onboarding import handle_onboarding_node
 
@@ -11,23 +11,21 @@
 async def web_search_tool_node(state: AgentState, search_tool, llm) -> Dict[str, Any]:
     """Execute web search tool and add result to ReAct context"""
     logger.info(f"Executing web search tool for session {state.session_id}")
-
     handler_result = await handle_web_search_node(state, search_tool, llm)
     tool_result = handler_result.get("task_result", {})
     return add_tool_result(state, "web_search", tool_result)
 
-async def faq_handler_tool_node(state: AgentState, faq_tool) -> Dict[str, Any]:
+async def faq_handler_tool_node(state: AgentState, search_tool, llm) -> Dict[str, Any]:
     """Execute FAQ handler tool and add result to ReAct context"""
     logger.info(f"Executing FAQ handler tool for session {state.session_id}")
-    handler_result = await handle_faq_node(state, faq_tool)
+    handler_result = await handle_faq_node(state, search_tool, llm)
     tool_result = handler_result.get("task_result", {})
     return add_tool_result(state, "faq_handler", tool_result)
 
 async def onboarding_tool_node(state: AgentState) -> Dict[str, Any]:
     """Execute onboarding tool and add result to ReAct context"""
     logger.info(f"Executing onboarding tool for session {state.session_id}")
-
     handler_result = await handle_onboarding_node(state)
     tool_result = handler_result.get("task_result", {})
     state_update = add_tool_result(state, "onboarding", tool_result)
 
@@ -45,16 +43,15 @@ async def onboarding_tool_node(state: AgentState) -> Dict[str, Any]:
     return state_update
 
-
 async def github_toolkit_tool_node(state: AgentState, github_toolkit) -> Dict[str, Any]:
     """Execute GitHub toolkit tool and add result to ReAct context"""
     logger.info(f"Executing GitHub toolkit tool for session {state.session_id}")
-    latest_message = ""
-    if state.messages:
-        latest_message = state.messages[-1].get("content", "")
-    elif state.context.get("original_message"):
-        latest_message = state.context["original_message"]
+    latest_message = (
+        state.messages[-1].get("content", "")
+        if state.messages else
+        state.context.get("original_message", "")
+    )
 
     try:
         github_result = await github_toolkit.execute(latest_message)
diff --git a/backend/app/agents/devrel/tools/faq_tool.py b/backend/app/agents/devrel/tools/faq_tool.py
index df331a5..d4eeb17 100644
--- a/backend/app/agents/devrel/tools/faq_tool.py
+++ b/backend/app/agents/devrel/tools/faq_tool.py
@@ -1,44 +1,289 @@
 import logging
-from typing import Optional
+import asyncio
+from typing import Optional, Any, Dict, List
+from pathlib import Path
+from langchain_core.messages import HumanMessage
+from app.agents.state import AgentState
+from app.core.config.settings import settings as app_settings
+from app.agents.devrel.tools.search_tool.tavilly import TavilySearchTool
 
 logger = logging.getLogger(__name__)
 
+ORG_NAME = app_settings.org_name
+OFFICIAL_HANDLES = [
+    app_settings.org_website,
+    app_settings.org_github,
+    app_settings.org_twitter,
+]
+FAQ_SEARCH_TIMEOUT = getattr(app_settings, "faq_search_timeout", 10.0)
+FAQ_LLM_TIMEOUT = getattr(app_settings, "faq_llm_timeout", 15.0)
+
+# Directory holding optional .txt overrides for the FAQ prompts
+FAQ_PROMPTS_PATH = Path(__file__).parent / "prompt"
+
+def load_prompt(name: str) -> str:
+    """
+    Load a prompt from a .txt file, falling back to the constants in faq_prompt.py.
+    """
+    prompt_path = FAQ_PROMPTS_PATH / f"{name}.txt"
+
+    # 1. Try to load from text file
+    if prompt_path.exists():
+        with open(prompt_path, "r", encoding="utf-8") as f:
+            return f.read()
+
+    # 2. Fallback: load the matching constant (e.g. REFINEMENT_PROMPT) from the prompts module
+    try:
+        from app.agents.devrel.prompts import faq_prompt
+        return getattr(faq_prompt, name.upper())
+    except (ImportError, AttributeError):
+        raise FileNotFoundError(
+            f"Prompt '{name}' not found in {prompt_path} or faq_prompt.py"
+        )
+
+
 class FAQTool:
-    """FAQ handling tool"""
-
-    # TODO: Add FAQ responses from a database to refer organization's FAQ and Repo's FAQ
-
-    def __init__(self):
-        self.faq_responses = {
-            "what is devr.ai": "Devr.AI is an AI-powered Developer Relations assistant that helps open-source communities by automating engagement, issue tracking, and providing intelligent support to developers.",
-            "how do i contribute": "You can contribute by visiting our GitHub repository, checking open issues, and submitting pull requests. We welcome all types of contributions including code, documentation, and bug reports.",
-            "what platforms does devr.ai support": "Devr.AI integrates with Discord, Slack, GitHub, and can be extended to other platforms. We use these integrations to provide seamless developer support across multiple channels.",
-            "who maintains devr.ai": "Devr.AI is maintained by an open-source community of developers passionate about improving developer relations and community engagement.",
-            "how do i report a bug": "You can report a bug by opening an issue on our GitHub repository. Please include detailed information about the bug, steps to reproduce it, and your environment.",
-            "how to get started": "To get started with Devr.AI: 1) Check our documentation, 2) Join our Discord community, 3) Explore the GitHub repository, 4) Try contributing to open issues.",
-            "what is langgraph": "LangGraph is a framework for building stateful, multi-actor applications with large language models. We use it to create intelligent agent workflows for our DevRel automation."
- } - - async def get_response(self, question: str) -> Optional[str]: - """Get FAQ response for a question""" - question_lower = question.lower().strip() - - # Direct match - if question_lower in self.faq_responses: - return self.faq_responses[question_lower] - - # Fuzzy matching - for faq_key, response in self.faq_responses.items(): - if self._is_similar_question(question_lower, faq_key): - return response - - return None - - def _is_similar_question(self, question: str, faq_key: str) -> bool: - """Check if question is similar to FAQ key""" - # Simple keyword matching - in production, use better similarity - question_words = set(question.split()) - faq_words = set(faq_key.split()) - - common_words = question_words.intersection(faq_words) - return len(common_words) >= 2 # At least 2 common words + """ + Dynamic FAQ handling tool integrating Tavily search and LLM synthesis. + Handles FAQ queries by refining search terms, fetching relevant data, + and synthesizing coherent responses with source citations. + """ + + def __init__(self, llm: Any): + """ + Initialize FAQ tool with Tavily search and LLM capabilities. + + Args: + llm: Language model for query refinement and synthesis + """ + self.search_tool = TavilySearchTool() + self.llm = llm + logger.info("[FAQ_TOOL] Initialized with Tavily search") + + async def get_response(self, state: AgentState) -> Optional[Dict[str, Any]]: + """ + Fetch and synthesize a dynamic FAQ response using web search and LLM. + """ + logger.info(f"Handling FAQ for session {state.session_id}") + + # Extract latest user message + latest_message = ( + state.messages[-1].get("content", "") + if state.messages + else state.context.get("original_message", "") + ) + + if not latest_message: + logger.warning("[FAQ_TOOL] Empty user message โ€” returning fallback") + return { + "task_result": { + "type": "faq", + "response": self._generate_fallback_response(latest_message), + "source": "tavily_search", + }, + "current_task": "faq_handled", + } + + try: + # Build site filters from official handles + site_filters = self._build_site_filters() + logger.info(f"[FAQ_TOOL] Using site filters: {site_filters or '(none)'}") + + # Process FAQ request + faq_response = await self._dynamic_faq_process( + latest_message, site_filters=site_filters + ) + + return { + "task_result": { + "type": "faq", + "response": faq_response, + "source": "tavily_search", + }, + "current_task": "faq_handled", + } + + except Exception as e: + logger.error(f"[FAQ_TOOL] Failed to handle FAQ: {e}") + return { + "task_result": { + "type": "faq", + "response": f"Sorry, something went wrong while handling your FAQ request: {e}", + "source": "error", + }, + "current_task": "faq_failed", + } + + + def _build_site_filters(self) -> str: + """Construct search site filters from official handles.""" + from urllib.parse import urlparse + + domains = [] + for u in OFFICIAL_HANDLES: + try: + parsed = urlparse(u) + domain = parsed.netloc or parsed.path + if domain: + domains.append(domain) + except Exception: + continue + + return " OR ".join([f"site:{d}" for d in domains]) if domains else "" + + + async def _dynamic_faq_process(self, message: str, site_filters: str = "") -> str: + """ + Pipeline: refine โ†’ search โ†’ synthesize โ†’ format + """ + try: + # Step 1: Refine the query + logger.debug(f"[FAQ_TOOL] Refining query for org '{ORG_NAME}'") + refined_query = await self._refine_faq_query(message) + + logger.debug(f"[FAQ_TOOL] Refined query: {refined_query}") + + # Step 2: Search with Tavily + logger.info(f"[FAQ_TOOL] 
Searching with Tavily: {refined_query}") + try: + search_results = await asyncio.wait_for( + self.search_tool.search(refined_query, max_results=5), + timeout=FAQ_SEARCH_TIMEOUT + ) + + # Filter results by domain if site_filters exist + if site_filters and search_results: + search_results = self._filter_results_by_domain(search_results) + logger.info(f"[FAQ_TOOL] After domain filtering: {len(search_results)} results") + + except Exception as err: + logger.error(f"[FAQ_TOOL] Search failed: {err}") + return self._generate_fallback_response(message) + + if not search_results: + logger.warning(f"[FAQ_TOOL] No results for: {refined_query}") + return self._generate_fallback_response(message) + + # Step 3: Synthesize response + logger.info("[FAQ_TOOL] Synthesizing FAQ response") + synthesized = await self._synthesize_faq_response(message, search_results) + + # Step 4: Format final response + logger.info("[FAQ_TOOL] Formatting response with citations") + return self._format_faq_response(synthesized, search_results) + + except Exception as e: + logger.error(f"[FAQ_TOOL] Error in dynamic FAQ process: {e}") + return self._generate_fallback_response(message) + + + def _filter_results_by_domain(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Filter search results to only include URLs from official domains. + """ + from urllib.parse import urlparse + + # Extract domains from official handles + allowed_domains = set() + for handle in OFFICIAL_HANDLES: + try: + parsed = urlparse(handle) + domain = parsed.netloc or parsed.path + if domain: + allowed_domains.add(domain.lower()) + except Exception: + continue + + if not allowed_domains: + return results + + # Filter results + filtered = [] + for result in results: + url = result.get("url", "") + try: + parsed = urlparse(url) + result_domain = parsed.netloc.lower() + + # Check if domain matches any allowed domain + if any(allowed in result_domain or result_domain in allowed for allowed in allowed_domains): + filtered.append(result) + except Exception: + continue + + return filtered + + + async def _refine_faq_query(self, message: str) -> str: + """Refine user's question into an optimized search query.""" + refinement_prompt = load_prompt("refinement_prompt").format( + org_name=ORG_NAME, message=message + ) + response = await self.llm.ainvoke([HumanMessage(content=refinement_prompt)]) + refined_query = response.content.strip() + logger.info(f"[FAQ_TOOL] Refined query: {refined_query}") + return refined_query + + + async def _synthesize_faq_response( + self, message: str, search_results: List[Dict[str, Any]] + ) -> str: + """Generate a synthesized answer from search results.""" + # Build context (top 5 results) + results_context = "" + for i, result in enumerate(search_results[:5]): + title = result.get("title", "N/A") + content = result.get("content", "N/A") + if isinstance(content, str) and len(content) > 500: + content = content[:500] + "..." 
+            url = result.get("url", "N/A")
+            results_context += f"\nResult {i+1}:\nTitle: {title}\nContent: {content}\nURL: {url}\n"
+
+        # Build synthesis prompt
+        synthesis_prompt = load_prompt("synthesis_prompt").format(
+            org_name=ORG_NAME, message=message, results_context=results_context
+        )
+
+        # LLM synthesis
+        response = await asyncio.wait_for(
+            self.llm.ainvoke([HumanMessage(content=synthesis_prompt)]),
+            timeout=FAQ_LLM_TIMEOUT,
+        )
+
+        synthesized_answer = response.content.strip()
+        logger.debug(f"[FAQ_TOOL] Synthesized response: {synthesized_answer[:100]}...")
+        return synthesized_answer
+
+
+    def _format_faq_response(
+        self, synthesized_answer: str, search_results: List[Dict[str, Any]]
+    ) -> str:
+        """Append top sources to the synthesized answer."""
+        formatted = synthesized_answer
+        if search_results:
+            formatted += "\n\n**📚 Sources:**"
+            for i, result in enumerate(search_results[:3]):
+                title = result.get("title", "Source")
+                url = result.get("url", "#")
+                formatted += f"\n{i+1}. [{title}]({url})"
+        return formatted
+
+    def _generate_fallback_response(self, message: str) -> str:
+        """Return a friendly fallback message when no data is found."""
+        return (
+            f"I'd love to help you learn about {ORG_NAME}, but I couldn't find current information for your question:\n"
+            f"{message}\n\n"
+            "This might be because:\n"
+            "- The information isn't publicly available yet\n"
+            "- The search terms need to be more specific\n"
+            "- There might be temporary connectivity issues\n\n"
+            f"Try asking a more specific question, or visit our official website and documentation for the most up-to-date info about {ORG_NAME}."
+        )
+
+
+async def handle_faq_node(state: AgentState, search_tool: Any, llm: Any) -> dict:
+    """
+    Legacy compatibility wrapper for backward support.
+    Use FAQTool.get_response() directly in new code.
+
+    Note: search_tool is accepted to match the faq_handler_tool_node call
+    signature but is unused; FAQTool manages its own Tavily search client.
+    """
+    tool = FAQTool(llm)
+    return await tool.get_response(state)
\ No newline at end of file
diff --git a/backend/app/core/config/settings.py b/backend/app/core/config/settings.py
index 1349a02..1e18daa 100644
--- a/backend/app/core/config/settings.py
+++ b/backend/app/core/config/settings.py
@@ -2,6 +2,7 @@
 from dotenv import load_dotenv
 from pydantic import field_validator, ConfigDict
 from typing import Optional
+from pydantic import Field, AliasChoices
 
 load_dotenv()
 
@@ -39,9 +40,17 @@ class Settings(BaseSettings):
     # Backend URL
     backend_url: str = ""
 
+
+    # Organization identity (populated from env)
+    org_name: str = Field(..., validation_alias=AliasChoices("ORG_NAME", "org_name"))
+    org_website: str = Field(..., validation_alias=AliasChoices("ORG_WEBSITE", "org_website"))
+    org_github: str = Field(..., validation_alias=AliasChoices("ORG_GITHUB", "org_github"))
+    org_twitter: str = Field(..., validation_alias=AliasChoices("ORG_TWITTER", "org_twitter"))
+
     # Onboarding UX toggles
     onboarding_show_oauth_button: bool = True
 
+
     @field_validator("supabase_url", "supabase_key", mode="before")
     @classmethod
     def _not_empty(cls, v, field):
diff --git a/backend/app/database/weaviate/scripts/create_schemas.py b/backend/app/database/weaviate/scripts/create_schemas.py
index 5362349..3d9f64a 100644
--- a/backend/app/database/weaviate/scripts/create_schemas.py
+++ b/backend/app/database/weaviate/scripts/create_schemas.py
@@ -54,5 +54,3 @@
 def main():
     asyncio.run(create_all_schemas())
 
-if __name__ == "__main__":
-    main()
diff --git a/backend/app/database/weaviate/scripts/populate_db.py b/backend/app/database/weaviate/scripts/populate_db.py
index 12bb9eb..e1340d7 100644
--- a/backend/app/database/weaviate/scripts/populate_db.py
+++ b/backend/app/database/weaviate/scripts/populate_db.py
@@ -302,5 +302,4 @@
 def main():
     asyncio.run(populate_all_collections())
 
-if __name__ == "__main__":
-    main()
+
diff --git a/env.example b/env.example
index 6ed55bc..b8f63cf 100644
--- a/env.example
+++ b/env.example
@@ -26,4 +26,10 @@ DEVREL_AGENT_MODEL=gemini-2.5-flash
 GITHUB_AGENT_MODEL=gemini-2.5-flash
 CLASSIFICATION_AGENT_MODEL=gemini-2.0-flash
 AGENT_TIMEOUT=30
-MAX_RETRIES=3
\ No newline at end of file
+MAX_RETRIES=3
+
+# Organization identity required by settings.py and the FAQ handler
+ORG_NAME=
+ORG_WEBSITE=
+ORG_GITHUB=
+ORG_TWITTER=
diff --git a/tests/test_embedding_service.py b/tests/test_embedding_service.py
index 9794beb..82ee49f 100644
--- a/tests/test_embedding_service.py
+++ b/tests/test_embedding_service.py
@@ -52,6 +52,3 @@
     def test_clear_cache(self):
         _ = self.embedding_service.model
         self.assertIsNotNone(self.embedding_service._model)
 
-# run the tests
-if __name__ == "__main__":
-    unittest.main()
\ No newline at end of file