1"""Generic MCP source adapter — call any HTTP/SSE MCP server as a tool. 2 3Wraps the official ``mcp`` SDK's ``streamable_http`` client so MAF agents 4can pull data from any Model Context Protocol server with one binding. 5 6Practical use: pin EODHD's hosted MCP server 7(``https://mcpv2.eodhd.dev/v1/mcp?apikey=${EODHD_API_KEY}``) to give an 8equity-research agent access to 77 financial-data tools without writing 9a per-API adapter for each one. 10 11Config keys (overridable per-call via ``params``): 12 url: MCP server URL (default $MCP_URL, no default fallback). 13 tool: Tool name to call (required; e.g. ``get_fundamentals``). 14 params: dict passed verbatim as the tool's arguments. Merged with 15 runtime ``params`` (runtime wins). 16 headers: optional dict of extra HTTP headers (auth tokens, etc.). 17 timeout_s: HTTP timeout (default 30). 18 init_timeout_s: time budget for MCP session init (default 10). 19 20Degradation: returns ``{"error": "...", "data": []}`` instead of raising 21when the URL is missing, the server is unreachable, init fails, or the 22tool call errors. Agents that bind this adapter keep running with an 23empty payload rather than crashing the arena. 24""" 25 26from __future__ import annotations 27 28import logging 29import os 30from typing import Any 31 32from maf.sources.base import BaseSource 33 34logger = logging.getLogger(__name__) 35 36 37DEFAULT_TIMEOUT_S = 30.0 38DEFAULT_INIT_TIMEOUT_S = 10.0 39 40 41def _resolve_url(cfg: dict[str, Any]) -> str | None: 42 """Resolve URL from config, then ``MCP_URL`` env. Expand $VARS first.""" 43 raw = cfg.get("url") or os.environ.get("MCP_URL") 44 if not raw: 45 return None 46 # Allow ${EODHD_API_KEY}-style placeholders inside the URL so configs 47 # can read secrets from env without inlining them in YAML. 48 return os.path.expandvars(str(raw)) 49 50 51def _summarise_result(result: Any) -> dict[str, Any]: 52 """Convert MCP CallToolResult to a JSON-safe dict. 53 54 The SDK returns a Pydantic model with ``content`` (list of TextContent 55 or ImageContent), ``isError``, and ``structuredContent`` (optional). 56 We unpack it into a stable shape MAF agents can read. 57 """ 58 text_parts: list[str] = [] 59 other_parts: list[dict[str, Any]] = [] 60 for c in getattr(result, "content", []) or []: 61 ctype = getattr(c, "type", None) 62 if ctype == "text": 63 text_parts.append(getattr(c, "text", "")) 64 else: 65 other_parts.append({ 66 "type": ctype, 67 "repr": str(c), 68 }) 69 structured = getattr(result, "structuredContent", None) or getattr(result, "structured_content", None) 70 return { 71 "is_error": bool(getattr(result, "isError", False)), 72 "text": "\n\n".join(text_parts).strip(), 73 "structured": structured, 74 "other": other_parts, 75 } 76 77 78class MCPSource(BaseSource): 79 """Call any MCP server via streamable-HTTP. 80 81 Bound once per (server, tool) — bind the same server multiple times if 82 you want multiple tools from it. Each call opens a fresh MCP session 83 because the SDK's client is stream-scoped; this is fine for the 84 once-per-arena-run use case but ill-suited for high-throughput 85 polling. 86 """ 87 88 adapter_name = "mcp_remote" 89 90 @classmethod 91 def freshness_spec(cls, binding_config: dict[str, Any]) -> dict[str, Any]: 92 url = binding_config.get("url") or os.environ.get("MCP_URL") or "(unset)" 93 tool = binding_config.get("tool") or "(any)" 94 return { 95 "type": "external", 96 "detail": f"MCP {url} · tool={tool}", 97 } 98 99 async def fetch(self, params: dict[str, Any] | None = None) -> dict[str, Any]: 100 cfg: dict[str, Any] = {**self.config, **(params or {})} 101 tool = cfg.get("tool") 102 url = _resolve_url(cfg) 103 104 if not url: 105 return self._err(tool, None, "MCP URL not set — supply 'url' or $MCP_URL") 106 if not tool: 107 return self._err(None, url, "MCP tool name required (config.tool)") 108 # Don't ship the placeholder-resolved URL on every error — it may 109 # contain secrets. We strip query string for the error message only. 110 url_safe = url.split("?", 1)[0] 111 112 # Merge static config.params with runtime params; runtime wins. 113 tool_args: dict[str, Any] = {} 114 for src in (self.config.get("params") or {}, params or {}): 115 if isinstance(src, dict): 116 tool_args.update(src) 117 # Pop control keys that aren't tool arguments. 118 for k in ("url", "tool", "params", "headers", "timeout_s", "init_timeout_s"): 119 tool_args.pop(k, None) 120 121 headers = dict(self.config.get("headers") or {}) 122 timeout_s = float(cfg.get("timeout_s") or DEFAULT_TIMEOUT_S) 123 init_timeout_s = float(cfg.get("init_timeout_s") or DEFAULT_INIT_TIMEOUT_S) 124 125 try: 126 import asyncio 127 from mcp.client.session import ClientSession 128 from mcp.client.streamable_http import streamablehttp_client 129 except ImportError as exc: 130 return self._err(tool, url_safe, 131 f"mcp package not installed ({exc}). Run: pip install 'mcp>=1.0'") 132 133 try: 134 async with streamablehttp_client( 135 url=url, headers=headers, timeout=timeout_s, 136 ) as (read_stream, write_stream, _get_session_id): 137 async with ClientSession(read_stream, write_stream) as session: 138 await asyncio.wait_for(session.initialize(), timeout=init_timeout_s) 139 result = await asyncio.wait_for( 140 session.call_tool(tool, arguments=tool_args), 141 timeout=timeout_s, 142 ) 143 payload = _summarise_result(result) 144 except asyncio.TimeoutError: 145 return self._err(tool, url_safe, f"MCP call timed out after {timeout_s}s") 146 except Exception as exc: 147 logger.exception("MCP call failed: %s %s", url_safe, tool) 148 return self._err(tool, url_safe, f"{type(exc).__name__}: {exc}") 149 150 if payload.get("is_error"): 151 return self._err(tool, url_safe, 152 f"tool returned isError=true: {payload.get('text','')[:200]}") 153 154 return { 155 "type": "mcp_remote", 156 "url": url_safe, 157 "tool": tool, 158 "args": tool_args, 159 "data": payload, 160 } 161 162 @staticmethod 163 def _err(tool: str | None, url: str | None, msg: str) -> dict[str, Any]: 164 return { 165 "type": "mcp_remote", 166 "url": url, 167 "tool": tool, 168 "data": {"text": "", "structured": None, "is_error": True, "other": []}, 169 "error": msg, 170 }