Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions backend/app/agents/devrel/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@
from langgraph.graph import StateGraph, END
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.checkpoint.memory import InMemorySaver

from ..base_agent import BaseAgent, AgentState
from .tools.search_tool.ddg import DuckDuckGoSearchTool
from .tools.faq_tool import FAQTool
from .github.github_toolkit import GitHubToolkit
from app.core.config import settings

from .nodes.gather_context import gather_context_node
from .nodes.summarization import check_summarization_needed, summarize_conversation_node, store_summary_to_database
from .nodes.react_supervisor import react_supervisor_node, supervisor_decision_router
from .tool_wrappers import web_search_tool_node, faq_handler_tool_node, onboarding_tool_node, github_toolkit_tool_node
from .tool_wrappers import web_search_tool_node, onboarding_tool_node, github_toolkit_tool_node, thinking_node_tool_node
from .nodes.generate_response import generate_response_node


# βœ… Import new FAQ handler
from app.agents.devrel.nodes.handlers.faq import handle_faq_node_with_llm
logger = logging.getLogger(__name__)


class DevRelAgent(BaseAgent):
"""DevRel LangGraph Agent for community support and engagement"""

Expand All @@ -28,7 +33,6 @@ def __init__(self, config: Dict[str, Any] = None):
google_api_key=settings.gemini_api_key
)
self.search_tool = DuckDuckGoSearchTool()
self.faq_tool = FAQTool()
self.github_toolkit = GitHubToolkit()
self.checkpointer = InMemorySaver()
super().__init__("DevRelAgent", self.config)
Expand All @@ -38,12 +42,16 @@ def _build_graph(self):
workflow = StateGraph(AgentState)

# Phase 1: Gather Context

workflow.add_node("gather_context", gather_context_node)
workflow.add_node("thinking_node", partial(thinking_node_tool_node, llm=self.llm))
workflow.add_edge("gather_context", "thinking_node")
workflow.add_edge("thinking_node", "react_supervisor")

# Phase 2: ReAct Supervisor - Decide what to do next
workflow.add_node("react_supervisor", partial(react_supervisor_node, llm=self.llm))
workflow.add_node("web_search_tool", partial(web_search_tool_node, search_tool=self.search_tool, llm=self.llm))
workflow.add_node("faq_handler_tool", partial(faq_handler_tool_node, faq_tool=self.faq_tool))
workflow.add_node("faq_handler_tool", partial(handle_faq_node_with_llm, llm=self.llm)) # βœ… Updated usage
workflow.add_node("onboarding_tool", onboarding_tool_node)
workflow.add_node("github_toolkit_tool", partial(github_toolkit_tool_node, github_toolkit=self.github_toolkit))

Expand All @@ -56,7 +64,6 @@ def _build_graph(self):

# Entry point
workflow.set_entry_point("gather_context")
workflow.add_edge("gather_context", "react_supervisor")

# ReAct supervisor routing
workflow.add_conditional_edges(
Expand Down
2 changes: 1 addition & 1 deletion backend/app/agents/devrel/github/tools/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async def handle_web_search(query: str) -> Dict[str, Any]:

try:
search_tool = DuckDuckGoSearchTool()
search_results = await search_tool.search(query, max_results=5)
search_results = await search_tool._arun(query)

if not search_results:
return {
Expand Down
71 changes: 64 additions & 7 deletions backend/app/agents/devrel/nodes/handlers/faq.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,83 @@
import logging
from typing import List

from app.agents.state import AgentState
from app.agents.devrel.tools.search_tool.duckduckgo_tool import DuckDuckGoSearchTool
from langchain_core.messages import HumanMessage
from langchain_core.language_models.chat_models import BaseChatModel

logger = logging.getLogger(__name__)

async def handle_faq_node(state: AgentState, faq_tool) -> dict:
"""Handle FAQ requests"""
logger.info(f"Handling FAQ for session {state.session_id}")

async def _generate_queries_with_llm(llm: BaseChatModel, question: str) -> List[str]:
    """Use the LLM to generate 2-3 refined web-search queries for a FAQ question.

    Args:
        llm: Chat model used to rewrite the question into search queries.
        question: Raw user question text.

    Returns:
        A non-empty list of query strings. Falls back to ``[question]`` when
        the LLM call fails or produces no usable lines.
    """
    import re  # local import: only needed to clean up LLM list formatting

    try:
        prompt = (
            "You are an AI assistant. Given the following user question, "
            "generate 2-3 specific search queries that can help answer it accurately from the web. "
            "Only return the queries, one per line, no bullet points.\n\n"
            f"User question: {question}"
        )

        response = await llm.ainvoke([HumanMessage(content=prompt)])
        queries: List[str] = []
        for line in response.content.strip().split("\n"):
            # LLMs frequently ignore the "no bullet points" instruction;
            # strip leading markers such as "-", "*", "1." or "2)".
            cleaned = re.sub(r"^\s*(?:[-*]|\d+[.)])\s*", "", line).strip()
            if cleaned:
                queries.append(cleaned)
        return queries or [question]
    except Exception as e:
        logger.warning(
            f"[FAQ Handler] Query generation failed, using fallback. Error: {e}"
        )
        return [question]


async def handle_faq_node_with_llm(state: AgentState, llm: BaseChatModel) -> dict:
    """Handle a general FAQ by web-searching with LLM-generated queries.

    Extracts the latest user message from ``state``, asks the LLM for 2-3
    refined search queries, runs each through DuckDuckGo, and returns the
    combined results as a ``task_result`` state update.

    Args:
        state: Current agent state (messages + context).
        llm: Chat model used for query generation.

    Returns:
        A dict with ``task_result`` (type/response/source) and
        ``current_task`` set to ``"faq_handled"``.
    """
    logger.info(f"[FAQ Handler] Handling FAQ for session {state.session_id}")

    # Prefer the newest chat message; fall back to the original request
    # captured earlier in the context.
    latest_message = ""
    if state.messages:
        latest_message = state.messages[-1].get("content", "")
    elif state.context.get("original_message"):
        latest_message = state.context["original_message"]

    if not latest_message:
        logger.warning("[FAQ Handler] No user question found.")
        return {
            "task_result": {
                "type": "faq",
                "response": "Sorry, I couldn't find the question to answer.",
                "source": "web_faq_llm"
            },
            "current_task": "faq_handled"
        }

    # Generate refined queries (always returns at least one query).
    queries = await _generate_queries_with_llm(llm, latest_message)

    # Run web search; tolerate individual query failures so one bad query
    # does not sink the whole FAQ answer.
    search_tool = DuckDuckGoSearchTool()
    results = []
    for query in queries:
        try:
            result = await search_tool.search(query)
            results.append(f"Query: {query}\n{result}")
        except Exception as e:
            logger.warning(
                f"[FAQ Handler] Search failed for query: {query} - {e}"
            )

    # Combine and return results
    combined_result = (
        "\n\n".join(results) if results else "Sorry, no results found from the web."
    )

    return {
        "task_result": {
            "type": "faq",
            "response": combined_result,
            "source": "web_faq_llm"
        },
        "current_task": "faq_handled"
    }
8 changes: 5 additions & 3 deletions backend/app/agents/devrel/nodes/handlers/web_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ async def handle_web_search_node(state: AgentState, search_tool, llm) -> dict:
latest_message = state.context["original_message"]

search_query = await _extract_search_query(latest_message, llm)
search_results = await search_tool.search(search_query)

# Fix: Use arun() instead of search()
search_results = await search_tool.arun(search_query)

return {
"task_result": {
Expand All @@ -51,7 +53,7 @@ def create_search_response(task_result: Dict[str, Any]) -> str:
"""
Create a user-friendly response string from search results.
"""

query = task_result.get("query")
results = task_result.get("results", [])

Expand All @@ -61,7 +63,7 @@ def create_search_response(task_result: Dict[str, Any]) -> str:
response_parts = [f"Here's what I found for '{query}':"]
for i, result in enumerate(results[:5]):
title = result.get('title', 'N/A')
snippet = result.get('content', 'N/A')
snippet = result.get('content', 'N/A')
url = result.get('url', '#')
response_parts.append(f"{i+1}. {title}: {snippet}")
response_parts.append(f" (Source: {url})")
Expand Down
38 changes: 36 additions & 2 deletions backend/app/agents/devrel/tool_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from typing import Dict, Any
from app.agents.state import AgentState
from .nodes.react_supervisor import add_tool_result
from .nodes.handlers.faq import handle_faq_node
from app.agents.devrel.nodes.handlers.faq import handle_faq_node_with_llm
from .nodes.handlers.web_search import handle_web_search_node
from .nodes.handlers.onboarding import handle_onboarding_node
from langchain_core.messages import HumanMessage

logger = logging.getLogger(__name__)

async def faq_handler_tool_node(state: AgentState, llm) -> Dict[str, Any]:
    """Execute the FAQ handler tool and add its result to the ReAct context.

    NOTE(review): ``handle_faq_node_with_llm`` expects an LLM as its second
    argument, not the retired ``FAQTool`` — the parameter is renamed
    accordingly so a chat model is forwarded, matching how the agent now
    wires the FAQ node directly with ``partial(..., llm=self.llm)``.
    """
    logger.info(f"Executing FAQ handler tool for session {state.session_id}")

    handler_result = await handle_faq_node_with_llm(state, llm)
    tool_result = handler_result.get("task_result", {})
    return add_tool_result(state, "faq_handler", tool_result)

Expand Down Expand Up @@ -55,3 +56,36 @@ async def github_toolkit_tool_node(state: AgentState, github_toolkit) -> Dict[st
}

return add_tool_result(state, "github_toolkit", tool_result)


async def thinking_node_tool_node(state: AgentState, llm) -> Dict[str, Any]:
    """Rephrase the latest user query into a clean, formal question.

    Runs before the ReAct supervisor so downstream routing (FAQ / web
    search) operates on a clarified version of the request. The rephrased
    text is stored in ``state.context["rephrased_query"]`` and also
    returned as a tool result via ``add_tool_result``.
    """
    logger.info(f"Executing Thinking Node for session {state.session_id}")

    # Latest chat message wins; fall back to the original request captured
    # during context gathering.
    latest_message = ""
    if state.messages:
        latest_message = state.messages[-1].get("content", "")
    elif state.context.get("original_message"):
        latest_message = state.context["original_message"]

    # Prompt to rephrase
    prompt = (
        "Rephrase this user query into a clear and formal question "
        "suitable for an FAQ or web search:\n\n"
        f"'{latest_message}'"
    )

    try:
        llm_response = await llm.ainvoke([HumanMessage(content=prompt)])
        clean_question = llm_response.content.strip()
    except Exception as e:
        logger.error(f"Thinking node LLM error: {e}")
        clean_question = latest_message  # fallback: pass the query through unchanged

    # Store in state context so later nodes can read the clarified query.
    state.context["rephrased_query"] = clean_question

    return add_tool_result(state, "thinking_node", {
        "type": "thinking_node",
        "rephrased_query": clean_question
    })
56 changes: 17 additions & 39 deletions backend/app/agents/devrel/tools/search_tool/ddg.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,21 @@
import asyncio
import logging
from typing import List, Dict, Any
from ddgs import DDGS
from langsmith import traceable
from langchain.tools import BaseTool
from typing import Type
from pydantic import BaseModel, Field
from app.agents.devrel.tools.search_tool.duckduckgo_tool import DuckDuckGoSearchTool

logger = logging.getLogger(__name__)
class DuckDuckGoSearchInput(BaseModel):
    """Argument schema for the DuckDuckGo LangChain tool."""
    query: str = Field(..., description="Search query")

class DuckDuckGoSearchLangTool(BaseTool):
    """LangChain ``BaseTool`` wrapper around :class:`DuckDuckGoSearchTool`.

    The underlying search implementation is async-only, so this tool
    supports only the async execution path (``ainvoke`` / ``_arun``).
    """
    name: str = "duckduckgo_search"
    description: str = "Search the web using DuckDuckGo"
    args_schema: Type[BaseModel] = DuckDuckGoSearchInput

    def _run(self, query: str) -> str:
        # BUG FIX: the previous body returned ``tool.search(query)`` — an
        # un-awaited coroutine object, not a string. The wrapped search is
        # async-only, so make the sync path fail loudly instead of
        # returning garbage.
        raise NotImplementedError(
            "duckduckgo_search only supports async execution; use ainvoke/_arun."
        )

    async def _arun(self, query: str) -> str:
        # Delegate to the async search implementation.
        tool = DuckDuckGoSearchTool()
        return await tool.search(query)
33 changes: 33 additions & 0 deletions backend/app/agents/devrel/tools/search_tool/duckduckgo_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging
from ddgs import DDGS

logger = logging.getLogger(__name__)

class DuckDuckGoSearchTool:
    """Async wrapper around the DDGS (DuckDuckGo) text-search client."""

    async def search(self, query: str) -> str:
        """Search DuckDuckGo for ``query`` and return the results as a string.

        The DDGS client performs blocking network I/O, so it is executed in
        a worker thread via ``asyncio.to_thread`` to avoid stalling the
        event loop (the implementation this class replaced did the same).

        Returns:
            A stringified list of ``{'title', 'content', 'url'}`` dicts,
            or a human-readable "no results" / error message.
        """
        import asyncio  # local import: keeps this module's top-level imports unchanged

        logger.info(f"Performing DuckDuckGo search for: {query}")

        def _blocking_search():
            # Synchronous HTTP round-trip — must not run on the event loop.
            with DDGS() as ddgs:
                return list(ddgs.text(query, max_results=5))

        try:
            results = await asyncio.to_thread(_blocking_search)

            if not results:
                logger.info("No search results found")
                return "No search results found."

            # Format results for the agent
            formatted_results = [
                {
                    'title': result.get('title', ''),
                    'content': result.get('body', ''),
                    'url': result.get('href', '')
                }
                for result in results
            ]
            return str(formatted_results)

        except ConnectionError as e:
            logger.error(f"Network error during search: {e}")
            return f"Network error: {str(e)}"
        except Exception as e:
            logger.error(f"Unexpected error during search: {e}")
            return f"Search failed: {str(e)}"
2 changes: 0 additions & 2 deletions backend/app/core/orchestration/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ def __init__(self):
self.connection: Optional[aio_pika.RobustConnection] = None
self.channel: Optional[aio_pika.abc.AbstractChannel] = None



async def connect(self):
try:
rabbitmq_url = getattr(settings, 'rabbitmq_url', 'amqp://guest:guest@localhost/')
Expand Down
Loading