Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions src/mcp_scan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import sys

import aiohttp
import psutil
import rich
from rich.logging import RichHandler
Expand Down Expand Up @@ -572,6 +573,11 @@ def main():
add_server_arguments(proxy_parser)
add_install_arguments(proxy_parser)

# EVO command
evo_parser = subparsers.add_parser("evo", help="Push scan results to Snyk Evo")
# use the same parser as scan
setup_scan_parser(evo_parser)

# Parse arguments (default to 'scan' if no command provided)
if (len(sys.argv) == 1 or sys.argv[1] not in subparsers.choices) and (
not (len(sys.argv) == 2 and sys.argv[1] == "--help")
Expand Down Expand Up @@ -691,12 +697,78 @@ def server(on_exit=None):
from mcp_scan.mcp_server import install_mcp_server

sys.exit(install_mcp_server(args))
elif args.command == "evo":
asyncio.run(evo(args))
sys.exit(0)
else:
# This shouldn't happen due to argparse's handling
rich.print(f"[bold red]Unknown command: {args.command}[/bold red]")
parser.print_help()
sys.exit(1)

async def evo(args):
"""
Pushes the scan results to the Evo API.

1. Creates a client_id (shared secret)
2. Pushes scan results to the Evo API
3. Revokes the client_id
"""

rich.print(f"Go to https://app.snyk.io and select the tenant on the left nav bar. Copy the Tenant ID from the URL and paste it here: ")
tenant_id = input().strip()
rich.print(f"Paste the Authorization token from https://app.snyk.io/account (API Token -> KEY -> click to show): ")
token = input().strip()

push_key_url = f"https://api.snyk.io/hidden/tenants/{tenant_id}/mcp-scan/push-key?version=2025-08-28"
push_scan_url = f"https://api.snyk.io/hidden/mcp-scan/push?version=2025-08-28"

# create a client_id (shared secret)
client_id = None
try:
async with aiohttp.ClientSession() as session:
async with session.post(push_key_url, data="", headers={"Authorization": f"token {token}"}) as resp:
if resp.status not in (200, 201):
text = await resp.text()
rich.print(f"[bold red]Request failed[/bold red]: HTTP {resp.status} - {text}")
return
data = await resp.json()
client_id = data.get("client_id")
if not client_id:
rich.print(f"[bold red]Unexpected response[/bold red]: {data}")
return
rich.print(f"Client ID created")
except Exception as e:
rich.print(f"[bold red]Error calling Snyk API[/bold red]: {e}")
return

# Update the default scan args
args.control_servers=[
{
"url": push_scan_url,
"identifier": None,
"opt_out": False,
"headers": [f"x-client-id:{client_id}"]
}
]
await run_scan_inspect(mode="scan", args=args)

# revoke the created client_id
del_headers = {
"Content-Type": "application/json",
"Authorization": f"token {token}",
"x-client-id": client_id,
}
try:
async with aiohttp.ClientSession() as session:
async with session.delete(push_key_url, headers=del_headers) as del_resp:
if del_resp.status not in (200, 204):
text = await del_resp.text()
rich.print(f"[bold red]Failed to revoke client_id[/bold red]: HTTP {del_resp.status} - {text}")
rich.print("Client ID revoked")
except Exception as e:
rich.print(f"[bold red]Error revoking client_id[/bold red]: {e}")


async def run_scan_inspect(mode="scan", args=None):
async with MCPScanner(additional_headers=parse_headers(args.verification_H), **vars(args)) as scanner:
Expand Down