Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/mcp_scan/MCPScanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import re
import time
import traceback
from collections import defaultdict
from collections.abc import Callable
Expand Down Expand Up @@ -78,6 +79,7 @@ def __init__(
additional_headers: dict | None = None,
control_servers: list | None = None,
skip_ssl_verify: bool = False,
scan_context: dict | None = None,
**kwargs: Any,
):
logger.info("Initializing MCPScanner")
Expand All @@ -97,6 +99,7 @@ def __init__(
self.control_servers = control_servers
self.verbose = verbose
self.skip_ssl_verify = skip_ssl_verify
self.scan_context = scan_context if scan_context is not None else {}
logger.debug(
"MCPScanner initialized with timeout: %d, checks_per_server: %d", server_timeout, checks_per_server
)
Expand Down Expand Up @@ -299,6 +302,7 @@ async def scan(self) -> list[ScanPathResult]:
result_awaited = await asyncio.gather(*result)

logger.debug("Calling Backend")
time_start = time.perf_counter()
result_verified = await analyze_machine(
result_awaited,
analysis_url=self.analysis_url,
Expand All @@ -309,6 +313,8 @@ async def scan(self) -> list[ScanPathResult]:
verbose=self.verbose,
skip_ssl_verify=self.skip_ssl_verify,
)
self.scan_context["scan_time_seconds"] = time.perf_counter() - time_start

logger.debug("Result verified: %s", result_verified)
logger.debug("Saving storage file")
self.storage_file.save()
Expand Down
27 changes: 18 additions & 9 deletions src/mcp_scan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from mcp_scan.MCPScanner import MCPScanner
from mcp_scan.printer import print_scan_result
from mcp_scan.Storage import Storage
from mcp_scan.upload import upload
from mcp_scan.upload import get_hostname, upload
from mcp_scan.utils import parse_headers
from mcp_scan.version import version_info
from mcp_scan.well_known_clients import WELL_KNOWN_MCP_PATHS, client_shorthands_to_paths
Expand Down Expand Up @@ -712,6 +712,7 @@ def server(on_exit=None):
parser.print_help()
sys.exit(1)


async def evo(args):
"""
Pushes the scan results to the Evo API.
Expand All @@ -721,13 +722,15 @@ async def evo(args):
3. Revokes the client_id
"""

rich.print(f"Go to https://app.snyk.io and select the tenant on the left nav bar. Copy the Tenant ID from the URL and paste it here: ")
rich.print(
"Go to https://app.snyk.io and select the tenant on the left nav bar. Copy the Tenant ID from the URL and paste it here: "
)
tenant_id = input().strip()
rich.print(f"Paste the Authorization token from https://app.snyk.io/account (API Token -> KEY -> click to show): ")
rich.print("Paste the Authorization token from https://app.snyk.io/account (API Token -> KEY -> click to show): ")
token = input().strip()

push_key_url = f"https://api.snyk.io/hidden/tenants/{tenant_id}/mcp-scan/push-key?version=2025-08-28"
push_scan_url = f"https://api.snyk.io/hidden/mcp-scan/push?version=2025-08-28"
push_scan_url = "https://api.snyk.io/hidden/mcp-scan/push?version=2025-08-28"

# create a client_id (shared secret)
client_id = None
Expand All @@ -743,18 +746,18 @@ async def evo(args):
if not client_id:
rich.print(f"[bold red]Unexpected response[/bold red]: {data}")
return
rich.print(f"Client ID created")
rich.print("Client ID created")
except Exception as e:
rich.print(f"[bold red]Error calling Snyk API[/bold red]: {e}")
return

# Update the default scan args
args.control_servers=[
args.control_servers = [
{
"url": push_scan_url,
"identifier": None,
"identifier": get_hostname() or None,
"opt_out": False,
"headers": [f"x-client-id:{client_id}"]
"headers": [f"x-client-id:{client_id}"],
}
]
await run_scan_inspect(mode="scan", args=args)
Expand All @@ -777,7 +780,12 @@ async def evo(args):


async def run_scan_inspect(mode="scan", args=None):
async with MCPScanner(additional_headers=parse_headers(args.verification_H), **vars(args)) as scanner:
# Initialize scan_context dict that can be populated during scanning
scan_context = {"cli_version": version_info}

async with MCPScanner(
additional_headers=parse_headers(args.verification_H), scan_context=scan_context, **vars(args)
) as scanner:
if mode == "scan":
result = await scanner.scan()
elif mode == "inspect":
Expand All @@ -796,6 +804,7 @@ async def run_scan_inspect(mode="scan", args=None):
verbose=getattr(args, "verbose", False),
additional_headers=parse_headers(server_config["headers"]),
skip_ssl_verify=getattr(args, "skip_ssl_verify", False),
scan_context=scan_context,
)
return result

Expand Down
1 change: 1 addition & 0 deletions src/mcp_scan/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,4 @@ class AnalysisServerResponse(BaseModel):
class ScanPathResultsCreate(BaseModel):
scan_path_results: list[ScanPathResult]
scan_user_info: ScanUserInfo
scan_metadata: dict[str, Any] | None = None
8 changes: 7 additions & 1 deletion src/mcp_scan/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ async def upload(
additional_headers: dict | None = None,
max_retries: int = 3,
skip_ssl_verify: bool = False,
scan_context: dict | None = None,
) -> None:
"""
Upload the scan results to the control server with retry logic.
Expand All @@ -75,6 +76,7 @@ async def upload(
additional_headers: Additional HTTP headers to send
max_retries: Maximum number of retry attempts (default: 3)
skip_ssl_verify: Whether to disable SSL certificate verification (default: False)
scan_context: Optional dict containing scan context metadata to include in upload
"""
if not results:
logger.info("No scan results to upload")
Expand All @@ -94,7 +96,11 @@ async def upload(
result.client = get_client_from_path(result.path) or result.client or result.path
results_with_servers.append(result)

payload = ScanPathResultsCreate(scan_path_results=results_with_servers, scan_user_info=user_info)
payload = ScanPathResultsCreate(
scan_path_results=results_with_servers,
scan_user_info=user_info,
scan_metadata=scan_context if scan_context else None,
)

last_exception = None
trace_configs = setup_aiohttp_debug_logging(verbose=verbose)
Expand Down
Loading