diff --git a/docs/mcp.md b/docs/mcp.md index 4ee7b5781..9b37861ae 100644 --- a/docs/mcp.md +++ b/docs/mcp.md @@ -320,6 +320,32 @@ agent = Agent( ) ``` +## Resources + +MCP servers can expose resources that provide context data to agents. Resources are URI-addressable pieces of data (such as configuration files, documentation, or metrics) that agents can read and use. Servers that support resources expose two methods: + +- `list_resources()` enumerates the available resources. +- `read_resource(uri)` fetches the content of a specific resource by its URI. + +```python +from agents import Agent + +resources_result = await server.list_resources() +for resource in resources_result.resources: + print(f"Resource: {resource.name} ({resource.uri})") + +resource_result = await server.read_resource("config://app/settings") +config_content = resource_result.contents[0].text + +agent = Agent( + name="Config Assistant", + instructions=f"You are a helpful assistant. Current configuration:\n{config_content}", + mcp_servers=[server], +) +``` + +Resources can use any URI scheme (e.g., `file://`, `http://`, `config://`, `docs://`) and can contain text or binary data. Unlike prompts which generate instructions, resources provide raw context data that agents can reference during execution. + ## Caching Every agent run calls `list_tools()` on each MCP server. Remote servers can introduce noticeable latency, so all of the MCP diff --git a/examples/mcp/resource_server/README.md b/examples/mcp/resource_server/README.md new file mode 100644 index 000000000..200a15c32 --- /dev/null +++ b/examples/mcp/resource_server/README.md @@ -0,0 +1,99 @@ +# MCP Resource Server Example + +This example demonstrates MCP resource support for providing context data to agents. Resources allow MCP servers to expose data that agents can read and use as context for their responses. + +## What are MCP Resources? + +MCP resources are named, URI-addressable pieces of data that can be read by agents. Unlike prompts (which generate instructions) or tools (which perform actions), resources provide static or dynamic context data that agents can reference. + +Common use cases: +- Configuration files and settings +- API documentation +- System metrics and logs +- Knowledge base articles +- Templates and examples + +## Running the Example + +```bash +uv run python examples/mcp/resource_server/main.py +``` + +## How it Works + +### Server Side (`server.py`) + +The example server provides four resources: + +1. **`config://app/settings`** - Application configuration +2. **`docs://api/overview`** - API documentation +3. **`data://metrics/summary`** - System metrics +4. **`docs://security/guidelines`** - Security best practices + +Each resource is defined using the `@mcp.resource()` decorator: + +```python +@mcp.resource("config://app/settings") +def get_app_settings() -> str: + """Application configuration settings""" + return """# Application Settings + ... + """ +``` + +### Client Side (`main.py`) + +The client demonstrates several ways to use resources: + +1. **List available resources** using `list_resources()` +2. **Read resource content** using `read_resource(uri)` +3. **Provide resources as context** to agents + +Example usage: + +```python +# List resources +resources_result = await server.list_resources() +for resource in resources_result.resources: + print(f"Resource: {resource.name} - {resource.uri}") + +# Read a specific resource +resource_result = await server.read_resource("config://app/settings") +content = resource_result.contents[0].text + +# Use resource content in agent instructions +agent = Agent( + name="Config Assistant", + instructions=f"You are a helpful assistant. Configuration: {content}", + mcp_servers=[server], +) +``` + +## Demos Included + +1. **Configuration Assistant** - Answers questions about app configuration +2. **API Documentation Assistant** - Helps users understand the API +3. **Security Reviewer** - Reviews code using security guidelines +4. **Metrics Analyzer** - Analyzes system performance metrics + +## Resource URI Schemes + +Resources can use any URI scheme. Common patterns: + +- `file://` - File system resources +- `http://` or `https://` - Web resources +- `config://` - Configuration resources +- `docs://` - Documentation resources +- `data://` - Data resources +- Custom schemes for domain-specific resources + +## Next Steps + +Try modifying the example to: + +1. Add new resources (e.g., error codes, examples, schemas) +2. Use different URI schemes +3. Provide dynamic content (e.g., current timestamp, system status) +4. Combine multiple resources in agent context +5. Implement resource templates for parameterized content + diff --git a/examples/mcp/resource_server/main.py b/examples/mcp/resource_server/main.py new file mode 100644 index 000000000..8258cfe9b --- /dev/null +++ b/examples/mcp/resource_server/main.py @@ -0,0 +1,207 @@ +import asyncio +import os +import shutil +import subprocess +import time +from typing import Any + +from agents import Agent, Runner, gen_trace_id, trace +from agents.mcp import MCPServer, MCPServerStreamableHttp +from agents.model_settings import ModelSettings + + +async def get_resource_content(mcp_server: MCPServer, uri: str) -> str: + """Get resource content by URI""" + print(f"Reading resource: {uri}") + + try: + resource_result = await mcp_server.read_resource(uri) + if resource_result.contents: + content = resource_result.contents[0] + # Handle both text and blob content + if hasattr(content, "text"): + return content.text + else: + return str(content) + return "Resource content not available" + except Exception as e: + print(f"Failed to read resource: {e}") + return f"Error reading resource: {e}" + + +async def show_available_resources(mcp_server: MCPServer): + """Show available resources""" + print("=== AVAILABLE RESOURCES ===") + + resources_result = await mcp_server.list_resources() + print(f"Found {len(resources_result.resources)} resource(s):") + for i, resource in enumerate(resources_result.resources, 1): + description = ( + resource.description + if hasattr(resource, "description") and resource.description + else "No description" + ) + mime_type = resource.mimeType if hasattr(resource, "mimeType") else "unknown" + print(f" {i}. {resource.name}") + print(f" URI: {resource.uri}") + print(f" Type: {mime_type}") + print(f" Description: {description}") + print() + + +async def demo_config_assistant(mcp_server: MCPServer): + """Demo: Assistant that uses configuration resource""" + print("=== CONFIGURATION ASSISTANT DEMO ===") + + # Read configuration resource + config = await get_resource_content(mcp_server, "config://app/settings") + + # Create agent with configuration context + agent = Agent( + name="Config Assistant", + instructions=f"""You are a helpful assistant that knows about the application configuration. + +Here is the current configuration: +{config} + +Answer questions about the configuration clearly and concisely.""", + model_settings=ModelSettings(tool_choice="auto"), + ) + + question = "What database is the application using and what's the connection pool size?" + print(f"Question: {question}") + + result = await Runner.run(starting_agent=agent, input=question) + print(f"Answer: {result.final_output}") + print("\n" + "=" * 50 + "\n") + + +async def demo_api_documentation_assistant(mcp_server: MCPServer): + """Demo: Assistant that uses API documentation resource""" + print("=== API DOCUMENTATION ASSISTANT DEMO ===") + + # Read API documentation resource + api_docs = await get_resource_content(mcp_server, "docs://api/overview") + + # Create agent with API documentation context + agent = Agent( + name="API Documentation Assistant", + instructions=f"""You are an API documentation assistant. + +Here is the API documentation: +{api_docs} + +Help users understand and use the API effectively.""", + model_settings=ModelSettings(tool_choice="auto"), + ) + + question = "How do I authenticate with the API and what are the rate limits?" + print(f"Question: {question}") + + result = await Runner.run(starting_agent=agent, input=question) + print(f"Answer: {result.final_output}") + print("\n" + "=" * 50 + "\n") + + +async def demo_security_reviewer(mcp_server: MCPServer): + """Demo: Security reviewer that uses security guidelines resource""" + print("=== SECURITY REVIEWER DEMO ===") + + # Read security guidelines resource + security_guidelines = await get_resource_content(mcp_server, "docs://security/guidelines") + + # Create agent with security context + agent = Agent( + name="Security Reviewer", + instructions=f"""You are a security expert reviewing code and configurations. + +Security Guidelines: +{security_guidelines} + +Review code and provide security recommendations based on these guidelines.""", + model_settings=ModelSettings(tool_choice="auto"), + ) + + code_to_review = """ +def login(username, password): + query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'" + result = db.execute(query) + return result +""" + + print(f"Code to review: {code_to_review}") + + result = await Runner.run( + starting_agent=agent, + input=f"Please review this login function for security issues:\n{code_to_review}", + ) + print(f"Security Review: {result.final_output}") + print("\n" + "=" * 50 + "\n") + + +async def demo_metrics_analyzer(mcp_server: MCPServer): + """Demo: Metrics analyzer that uses metrics resource""" + print("=== METRICS ANALYZER DEMO ===") + + # Read metrics resource + metrics = await get_resource_content(mcp_server, "data://metrics/summary") + + # Create agent with metrics context + agent = Agent( + name="Metrics Analyzer", + instructions=f"""You are a system performance analyst. + +Current System Metrics: +{metrics} + +Analyze the metrics and provide insights about system performance and health.""", + model_settings=ModelSettings(tool_choice="auto"), + ) + + question = "What's the current system health and are there any concerning metrics?" + print(f"Question: {question}") + + result = await Runner.run(starting_agent=agent, input=question) + print(f"Analysis: {result.final_output}") + print("\n" + "=" * 50 + "\n") + + +async def main(): + async with MCPServerStreamableHttp( + name="Resource Server", + params={"url": "http://localhost:8000/mcp"}, + ) as server: + trace_id = gen_trace_id() + with trace(workflow_name="MCP Resource Demo", trace_id=trace_id): + print(f"Trace: https://platform.openai.com/traces/trace?trace_id={trace_id}\n") + + await show_available_resources(server) + await demo_config_assistant(server) + await demo_api_documentation_assistant(server) + await demo_security_reviewer(server) + await demo_metrics_analyzer(server) + + +if __name__ == "__main__": + if not shutil.which("uv"): + raise RuntimeError("uv is not installed") + + process: subprocess.Popen[Any] | None = None + try: + this_dir = os.path.dirname(os.path.abspath(__file__)) + server_file = os.path.join(this_dir, "server.py") + + print("Starting Resource Server...") + process = subprocess.Popen(["uv", "run", server_file]) + time.sleep(3) + print("Server started\n") + except Exception as e: + print(f"Error starting server: {e}") + exit(1) + + try: + asyncio.run(main()) + finally: + if process: + process.terminate() + print("Server terminated.") diff --git a/examples/mcp/resource_server/server.py b/examples/mcp/resource_server/server.py new file mode 100644 index 000000000..f19166802 --- /dev/null +++ b/examples/mcp/resource_server/server.py @@ -0,0 +1,152 @@ +from mcp.server.fastmcp import FastMCP + +# Create server +mcp = FastMCP("Resource Server") + + +# Sample resources providing context data +@mcp.resource("config://app/settings") +def get_app_settings() -> str: + """Application configuration settings""" + return """# Application Settings + +Server Configuration: +- Host: 0.0.0.0 +- Port: 8080 +- Environment: production +- Max connections: 1000 + +Database Configuration: +- Type: PostgreSQL +- Host: db.example.com +- Port: 5432 +- Database: myapp_db +- Connection pool size: 20 + +Cache Configuration: +- Type: Redis +- Host: cache.example.com +- Port: 6379 +- TTL: 3600 seconds +""" + + +@mcp.resource("docs://api/overview") +def get_api_overview() -> str: + """API documentation overview""" + return """# API Overview + +This API provides access to the core application functionality. + +## Authentication +All API requests require a valid API key in the Authorization header: +``` +Authorization: Bearer YOUR_API_KEY +``` + +## Endpoints + +### Users API +- GET /api/v1/users - List all users +- GET /api/v1/users/{id} - Get user by ID +- POST /api/v1/users - Create new user +- PUT /api/v1/users/{id} - Update user +- DELETE /api/v1/users/{id} - Delete user + +### Products API +- GET /api/v1/products - List all products +- GET /api/v1/products/{id} - Get product by ID +- POST /api/v1/products - Create new product +- PUT /api/v1/products/{id} - Update product +- DELETE /api/v1/products/{id} - Delete product + +## Rate Limits +- 1000 requests per hour per API key +- 100 requests per minute per API key + +## Error Codes +- 400: Bad Request +- 401: Unauthorized +- 403: Forbidden +- 404: Not Found +- 429: Too Many Requests +- 500: Internal Server Error +""" + + +@mcp.resource("data://metrics/summary") +def get_metrics_summary() -> str: + """System metrics summary""" + return """# System Metrics Summary + +Last Updated: 2025-11-26 12:00:00 UTC + +## Performance Metrics +- Average Response Time: 45ms +- 95th Percentile Response Time: 120ms +- 99th Percentile Response Time: 250ms +- Requests per Second: 1,250 +- Error Rate: 0.02% + +## Resource Usage +- CPU Usage: 35% +- Memory Usage: 4.2 GB / 16 GB (26%) +- Disk Usage: 120 GB / 500 GB (24%) +- Network I/O: 15 MB/s + +## Database Metrics +- Active Connections: 45 / 100 +- Query Response Time (avg): 12ms +- Slow Queries (>1s): 3 in last hour +- Cache Hit Rate: 94% + +## User Activity +- Active Users (last hour): 1,450 +- New Registrations (today): 87 +- Total Users: 125,340 +""" + + +@mcp.resource("docs://security/guidelines") +def get_security_guidelines() -> str: + """Security best practices and guidelines""" + return """# Security Guidelines + +## Authentication & Authorization +1. Always use strong passwords (min 12 characters) +2. Enable 2FA for all accounts +3. Use role-based access control (RBAC) +4. Implement proper session management +5. Rotate API keys every 90 days + +## Data Protection +1. Encrypt sensitive data at rest +2. Use TLS 1.3 for data in transit +3. Implement proper input validation +4. Sanitize all user inputs +5. Follow principle of least privilege + +## API Security +1. Rate limit all endpoints +2. Validate all API requests +3. Use HMAC signatures for webhooks +4. Implement CORS properly +5. Log all security events + +## Vulnerability Management +1. Scan dependencies regularly +2. Apply security patches within 7 days +3. Conduct regular security audits +4. Maintain security incident response plan +5. Report vulnerabilities to security@example.com + +## Compliance +- GDPR compliant +- SOC 2 Type II certified +- ISO 27001 certified +- PCI DSS compliant (for payment data) +""" + + +if __name__ == "__main__": + mcp.run(transport="streamable-http") diff --git a/src/agents/mcp/server.py b/src/agents/mcp/server.py index 4fff94d0b..83c8e775b 100644 --- a/src/agents/mcp/server.py +++ b/src/agents/mcp/server.py @@ -15,7 +15,14 @@ from mcp.client.sse import sse_client from mcp.client.streamable_http import GetSessionIdCallback, streamablehttp_client from mcp.shared.message import SessionMessage -from mcp.types import CallToolResult, GetPromptResult, InitializeResult, ListPromptsResult +from mcp.types import ( + CallToolResult, + GetPromptResult, + InitializeResult, + ListPromptsResult, + ListResourcesResult, + ReadResourceResult, +) from typing_extensions import NotRequired, TypedDict from ..exceptions import UserError @@ -92,6 +99,18 @@ async def get_prompt( """Get a specific prompt from the server.""" pass + @abc.abstractmethod + async def list_resources( + self, + ) -> ListResourcesResult: + """List the resources available on the server.""" + pass + + @abc.abstractmethod + async def read_resource(self, uri: str) -> ReadResourceResult: + """Read a specific resource from the server.""" + pass + class _MCPServerWithClientSession(MCPServer, abc.ABC): """Base class for MCP servers that use a `ClientSession` to communicate with the server.""" @@ -344,6 +363,21 @@ async def get_prompt( return await self.session.get_prompt(name, arguments) + async def list_resources( + self, + ) -> ListResourcesResult: + """List the resources available on the server.""" + if not self.session: + raise UserError("Server not initialized. Make sure you call `connect()` first.") + + return await self.session.list_resources() + + async def read_resource(self, uri: str) -> ReadResourceResult: + """Read a specific resource from the server.""" + if not self.session: + raise UserError("Server not initialized. Make sure you call `connect()` first.") + return await self.session.read_resource(uri) # type: ignore[arg-type] + async def cleanup(self): """Cleanup the server.""" async with self._cleanup_lock: diff --git a/tests/mcp/helpers.py b/tests/mcp/helpers.py index dec713bf6..8fec35eb9 100644 --- a/tests/mcp/helpers.py +++ b/tests/mcp/helpers.py @@ -9,7 +9,9 @@ Content, GetPromptResult, ListPromptsResult, + ListResourcesResult, PromptMessage, + ReadResourceResult, TextContent, ) @@ -120,6 +122,21 @@ async def get_prompt( message = PromptMessage(role="user", content=TextContent(type="text", text=content)) return GetPromptResult(description=f"Fake prompt: {name}", messages=[message]) + async def list_resources(self, run_context=None, agent=None) -> ListResourcesResult: + """Return empty list of resources for fake server""" + return ListResourcesResult(resources=[]) + + async def read_resource(self, uri: str) -> ReadResourceResult: + """Return a simple resource result for fake server""" + from mcp.types import TextResourceContents + from pydantic import AnyUrl + + uri_obj = AnyUrl(uri) + contents = TextResourceContents( + uri=uri_obj, mimeType="text/plain", text=f"Fake resource content for {uri}" + ) + return ReadResourceResult(contents=[contents]) + @property def name(self) -> str: return self._server_name diff --git a/tests/mcp/test_prompt_server.py b/tests/mcp/test_prompt_server.py index 15afe28e4..86a72bbbe 100644 --- a/tests/mcp/test_prompt_server.py +++ b/tests/mcp/test_prompt_server.py @@ -66,6 +66,14 @@ async def list_tools(self, run_context=None, agent=None): async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None = None): raise NotImplementedError("This fake server doesn't support tools") + async def list_resources(self, run_context=None, agent=None): + from mcp.types import ListResourcesResult + + return ListResourcesResult(resources=[]) + + async def read_resource(self, uri: str): + raise NotImplementedError("This fake server doesn't support resources") + @property def name(self) -> str: return self._server_name diff --git a/tests/mcp/test_resource_server.py b/tests/mcp/test_resource_server.py new file mode 100644 index 000000000..69bcb0373 --- /dev/null +++ b/tests/mcp/test_resource_server.py @@ -0,0 +1,330 @@ +from typing import Any + +import pytest + +from agents import Agent, Runner +from agents.mcp import MCPServer + +from ..fake_model import FakeModel +from ..test_responses import get_text_message + + +class FakeMCPResourceServer(MCPServer): + """Fake MCP server for testing resource functionality""" + + def __init__(self, server_name: str = "fake_resource_server"): + self.resources: list[Any] = [] + self.resource_contents: dict[str, str] = {} + self._server_name = server_name + + def add_resource( + self, uri: str, name: str, description: str | None = None, mime_type: str = "text/plain" + ): + """Add a resource to the fake server""" + from mcp.types import Resource + from pydantic import AnyUrl + + uri_obj = AnyUrl(uri) + resource = Resource(uri=uri_obj, name=name, description=description, mimeType=mime_type) + self.resources.append(resource) + + def set_resource_content(self, uri: str, content: str): + """Set the content that should be returned for a resource""" + self.resource_contents[uri] = content + + async def connect(self): + pass + + async def cleanup(self): + pass + + async def list_resources(self, run_context=None, agent=None): + """List available resources""" + from mcp.types import ListResourcesResult + + return ListResourcesResult(resources=self.resources) + + async def read_resource(self, uri: str): + """Read a resource""" + from mcp.types import ReadResourceResult, TextResourceContents + from pydantic import AnyUrl + + if uri not in self.resource_contents: + raise ValueError(f"Resource '{uri}' not found") + + content = self.resource_contents[uri] + uri_obj = AnyUrl(uri) + contents = TextResourceContents(uri=uri_obj, mimeType="text/plain", text=content) + + return ReadResourceResult(contents=[contents]) + + async def list_tools(self, run_context=None, agent=None): + return [] + + async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None = None): + raise NotImplementedError("This fake server doesn't support tools") + + async def list_prompts(self, run_context=None, agent=None): + from mcp.types import ListPromptsResult + + return ListPromptsResult(prompts=[]) + + async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None): + raise NotImplementedError("This fake server doesn't support prompts") + + @property + def name(self) -> str: + return self._server_name + + +@pytest.mark.asyncio +async def test_list_resources(): + """Test listing available resources""" + server = FakeMCPResourceServer() + server.add_resource( + uri="file:///sample.txt", + name="sample.txt", + description="A sample text file", + ) + + result = await server.list_resources() + + assert len(result.resources) == 1 + assert str(result.resources[0].uri) == "file:///sample.txt" + assert result.resources[0].name == "sample.txt" + assert result.resources[0].description == "A sample text file" + + +@pytest.mark.asyncio +async def test_read_resource(): + """Test reading a resource""" + server = FakeMCPResourceServer() + server.add_resource( + uri="file:///sample.txt", + name="sample.txt", + description="A sample text file", + ) + server.set_resource_content("file:///sample.txt", "This is the content of sample.txt") + + result = await server.read_resource("file:///sample.txt") + + assert len(result.contents) == 1 + assert str(result.contents[0].uri) == "file:///sample.txt" + assert result.contents[0].text == "This is the content of sample.txt" + assert result.contents[0].mimeType == "text/plain" + + +@pytest.mark.asyncio +async def test_read_resource_not_found(): + """Test reading a resource that doesn't exist""" + server = FakeMCPResourceServer() + + with pytest.raises(ValueError, match="Resource 'file:///nonexistent.txt' not found"): + await server.read_resource("file:///nonexistent.txt") + + +@pytest.mark.asyncio +async def test_multiple_resources(): + """Test server with multiple resources""" + server = FakeMCPResourceServer() + + # Add multiple resources + server.add_resource( + uri="file:///doc1.txt", + name="doc1.txt", + description="First document", + ) + server.add_resource( + uri="file:///doc2.txt", + name="doc2.txt", + description="Second document", + ) + + server.set_resource_content("file:///doc1.txt", "Content of document 1") + server.set_resource_content("file:///doc2.txt", "Content of document 2") + + # Test listing resources + resources_result = await server.list_resources() + assert len(resources_result.resources) == 2 + + resource_uris = [str(r.uri) for r in resources_result.resources] + assert "file:///doc1.txt" in resource_uris + assert "file:///doc2.txt" in resource_uris + + # Test reading each resource + doc1_result = await server.read_resource("file:///doc1.txt") + assert doc1_result.contents[0].text == "Content of document 1" + + doc2_result = await server.read_resource("file:///doc2.txt") + assert doc2_result.contents[0].text == "Content of document 2" + + +@pytest.mark.asyncio +async def test_resource_with_different_mime_types(): + """Test resources with different MIME types""" + server = FakeMCPResourceServer() + + server.add_resource( + uri="file:///data.json", + name="data.json", + description="JSON data file", + mime_type="application/json", + ) + server.add_resource( + uri="file:///readme.md", + name="readme.md", + description="Markdown file", + mime_type="text/markdown", + ) + + result = await server.list_resources() + + assert len(result.resources) == 2 + json_resource = next(r for r in result.resources if str(r.uri) == "file:///data.json") + assert json_resource.mimeType == "application/json" + + md_resource = next(r for r in result.resources if str(r.uri) == "file:///readme.md") + assert md_resource.mimeType == "text/markdown" + + +@pytest.mark.asyncio +async def test_agent_with_resource_server(): + """Test using an agent with a resource server""" + server = FakeMCPResourceServer() + server.add_resource( + uri="file:///context.txt", + name="context.txt", + description="Context information", + ) + server.set_resource_content( + "file:///context.txt", "Important context: Project is written in Python 3.11" + ) + + # Get context from resource + resource_result = await server.read_resource("file:///context.txt") + context = resource_result.contents[0].text + + # Create agent with resource context + model = FakeModel() + agent = Agent( + name="resource_agent", + instructions=f"You are a helpful assistant. {context}", + model=model, + mcp_servers=[server], + ) + + # Mock model response + model.add_multiple_turn_outputs( + [[get_text_message("Based on the context, I know the project uses Python 3.11.")]] + ) + + # Run the agent + result = await Runner.run(agent, input="What version of Python does the project use?") + + assert "Python 3.11" in result.final_output or "python 3.11" in result.final_output.lower() + # Check instructions (it could be a string or callable) + instructions = agent.instructions if isinstance(agent.instructions, str) else "" + assert "Important context: Project is written in Python 3.11" in instructions + + +@pytest.mark.asyncio +@pytest.mark.parametrize("streaming", [False, True]) +async def test_agent_with_resources_streaming(streaming: bool): + """Test using resources with streaming and non-streaming""" + server = FakeMCPResourceServer() + server.add_resource( + uri="file:///config.txt", + name="config.txt", + description="Configuration", + ) + server.set_resource_content("file:///config.txt", "Server port: 8080") + + # Get configuration from resource + resource_result = await server.read_resource("file:///config.txt") + config = resource_result.contents[0].text + + # Create agent + model = FakeModel() + agent = Agent( + name="streaming_resource_agent", + instructions=f"Configuration: {config}", + model=model, + mcp_servers=[server], + ) + + model.add_multiple_turn_outputs([[get_text_message("The server runs on port 8080.")]]) + + if streaming: + streaming_result = Runner.run_streamed(agent, input="What port does the server run on?") + async for _ in streaming_result.stream_events(): + pass + final_result = streaming_result.final_output + else: + result = await Runner.run(agent, input="What port does the server run on?") + final_result = result.final_output + + assert "8080" in final_result + + +@pytest.mark.asyncio +async def test_resource_cleanup(): + """Test that resource server cleanup works correctly""" + server = FakeMCPResourceServer() + server.add_resource("file:///test.txt", "test.txt", "Test file") + server.set_resource_content("file:///test.txt", "Test content") + + # Test that server works before cleanup + result = await server.read_resource("file:///test.txt") + assert result.contents[0].text == "Test content" + + # Cleanup should not raise any errors + await server.cleanup() + + # Server should still work after cleanup (in this fake implementation) + result = await server.read_resource("file:///test.txt") + assert result.contents[0].text == "Test content" + + +@pytest.mark.asyncio +async def test_empty_resource_list(): + """Test listing resources when none are available""" + server = FakeMCPResourceServer() + + result = await server.list_resources() + + assert len(result.resources) == 0 + + +@pytest.mark.asyncio +async def test_resource_without_description(): + """Test resource without optional description""" + server = FakeMCPResourceServer() + server.add_resource( + uri="file:///minimal.txt", + name="minimal.txt", + ) + + result = await server.list_resources() + + assert len(result.resources) == 1 + assert str(result.resources[0].uri) == "file:///minimal.txt" + assert result.resources[0].name == "minimal.txt" + + +@pytest.mark.asyncio +async def test_resource_uri_formats(): + """Test various URI formats for resources""" + server = FakeMCPResourceServer() + + # Test different URI schemes + server.add_resource("file:///path/to/file.txt", "file.txt", "File URI") + server.add_resource("http://example.com/resource", "web-resource", "HTTP URI") + server.add_resource("custom://namespace/resource", "custom-resource", "Custom URI") + + result = await server.list_resources() + + assert len(result.resources) == 3 + uris = [str(r.uri) for r in result.resources] + assert "file:///path/to/file.txt" in uris + assert "http://example.com/resource" in uris + assert "custom://namespace/resource" in uris