checking system…
Docs / back / src/maf/sources/adapters/mcp_remote.py · line 66
Python · 171 lines
  1"""Generic MCP source adapter — call any HTTP/SSE MCP server as a tool.
  2
  3Wraps the official ``mcp`` SDK's ``streamable_http`` client so MAF agents
  4can pull data from any Model Context Protocol server with one binding.
  5
  6Practical use: pin EODHD's hosted MCP server
  7(``https://mcpv2.eodhd.dev/v1/mcp?apikey=${EODHD_API_KEY}``) to give an
  8equity-research agent access to 77 financial-data tools without writing
  9a per-API adapter for each one.
 10
 11Config keys (overridable per-call via ``params``):
  12    url:        MCP server URL (falls back to $MCP_URL; no built-in default).
 13    tool:       Tool name to call (required; e.g. ``get_fundamentals``).
 14    params:     dict passed verbatim as the tool's arguments. Merged with
 15                runtime ``params`` (runtime wins).
 16    headers:    optional dict of extra HTTP headers (auth tokens, etc.).
 17    timeout_s:  HTTP timeout (default 30).
 18    init_timeout_s: time budget for MCP session init (default 10).
 19
 20Degradation: returns ``{"error": "...", "data": []}`` instead of raising
 21when the URL is missing, the server is unreachable, init fails, or the
 22tool call errors. Agents that bind this adapter keep running with an
 23empty payload rather than crashing the arena.
 24"""
 25
 26from __future__ import annotations
 27
 28import logging
 29import os
 30from typing import Any
 31
 32from maf.sources.base import BaseSource
 33
# Module-level logger; MCPSource.fetch uses it to record failed MCP calls.
logger = logging.getLogger(__name__)


# Fallback for ``timeout_s`` (seconds): passed to the streamable-HTTP client
# and used as the ``asyncio.wait_for`` budget around the tool call.
DEFAULT_TIMEOUT_S = 30.0
# Fallback for ``init_timeout_s`` (seconds): the ``asyncio.wait_for`` budget
# around ``session.initialize()``.
DEFAULT_INIT_TIMEOUT_S = 10.0
 39
 40
 41def _resolve_url(cfg: dict[str, Any]) -> str | None:
 42    """Resolve URL from config, then ``MCP_URL`` env. Expand $VARS first."""
 43    raw = cfg.get("url") or os.environ.get("MCP_URL")
 44    if not raw:
 45        return None
 46    # Allow ${EODHD_API_KEY}-style placeholders inside the URL so configs
 47    # can read secrets from env without inlining them in YAML.
 48    return os.path.expandvars(str(raw))
 49
 50
 51def _summarise_result(result: Any) -> dict[str, Any]:
 52    """Convert MCP CallToolResult to a JSON-safe dict.
 53
 54    The SDK returns a Pydantic model with ``content`` (list of TextContent
 55    or ImageContent), ``isError``, and ``structuredContent`` (optional).
 56    We unpack it into a stable shape MAF agents can read.
 57    """
 58    text_parts: list[str] = []
 59    other_parts: list[dict[str, Any]] = []
 60    for c in getattr(result, "content", []) or []:
 61        ctype = getattr(c, "type", None)
 62        if ctype == "text":
 63            text_parts.append(getattr(c, "text", ""))
 64        else:
 65            other_parts.append({
 66                "type": ctype,
 67                "repr": str(c),
 68            })
 69    structured = getattr(result, "structuredContent", None) or getattr(result, "structured_content", None)
 70    return {
 71        "is_error": bool(getattr(result, "isError", False)),
 72        "text": "\n\n".join(text_parts).strip(),
 73        "structured": structured,
 74        "other": other_parts,
 75    }
 76
 77
 78class MCPSource(BaseSource):
 79    """Call any MCP server via streamable-HTTP.
 80
 81    Bound once per (server, tool) — bind the same server multiple times if
 82    you want multiple tools from it. Each call opens a fresh MCP session
 83    because the SDK's client is stream-scoped; this is fine for the
 84    once-per-arena-run use case but ill-suited for high-throughput
 85    polling.
 86    """
 87
 88    adapter_name = "mcp_remote"
 89
 90    @classmethod
 91    def freshness_spec(cls, binding_config: dict[str, Any]) -> dict[str, Any]:
 92        url = binding_config.get("url") or os.environ.get("MCP_URL") or "(unset)"
 93        tool = binding_config.get("tool") or "(any)"
 94        return {
 95            "type": "external",
 96            "detail": f"MCP {url} · tool={tool}",
 97        }
 98
 99    async def fetch(self, params: dict[str, Any] | None = None) -> dict[str, Any]:
100        cfg: dict[str, Any] = {**self.config, **(params or {})}
101        tool = cfg.get("tool")
102        url = _resolve_url(cfg)
103
104        if not url:
105            return self._err(tool, None, "MCP URL not set — supply 'url' or $MCP_URL")
106        if not tool:
107            return self._err(None, url, "MCP tool name required (config.tool)")
108        # Don't ship the placeholder-resolved URL on every error — it may
109        # contain secrets. We strip query string for the error message only.
110        url_safe = url.split("?", 1)[0]
111
112        # Merge static config.params with runtime params; runtime wins.
113        tool_args: dict[str, Any] = {}
114        for src in (self.config.get("params") or {}, params or {}):
115            if isinstance(src, dict):
116                tool_args.update(src)
117        # Pop control keys that aren't tool arguments.
118        for k in ("url", "tool", "params", "headers", "timeout_s", "init_timeout_s"):
119            tool_args.pop(k, None)
120
121        headers = dict(self.config.get("headers") or {})
122        timeout_s = float(cfg.get("timeout_s") or DEFAULT_TIMEOUT_S)
123        init_timeout_s = float(cfg.get("init_timeout_s") or DEFAULT_INIT_TIMEOUT_S)
124
125        try:
126            import asyncio
127            from mcp.client.session import ClientSession
128            from mcp.client.streamable_http import streamablehttp_client
129        except ImportError as exc:
130            return self._err(tool, url_safe,
131                f"mcp package not installed ({exc}). Run: pip install 'mcp>=1.0'")
132
133        try:
134            async with streamablehttp_client(
135                url=url, headers=headers, timeout=timeout_s,
136            ) as (read_stream, write_stream, _get_session_id):
137                async with ClientSession(read_stream, write_stream) as session:
138                    await asyncio.wait_for(session.initialize(), timeout=init_timeout_s)
139                    result = await asyncio.wait_for(
140                        session.call_tool(tool, arguments=tool_args),
141                        timeout=timeout_s,
142                    )
143            payload = _summarise_result(result)
144        except asyncio.TimeoutError:
145            return self._err(tool, url_safe, f"MCP call timed out after {timeout_s}s")
146        except Exception as exc:
147            logger.exception("MCP call failed: %s %s", url_safe, tool)
148            return self._err(tool, url_safe, f"{type(exc).__name__}: {exc}")
149
150        if payload.get("is_error"):
151            return self._err(tool, url_safe,
152                f"tool returned isError=true: {payload.get('text','')[:200]}")
153
154        return {
155            "type": "mcp_remote",
156            "url": url_safe,
157            "tool": tool,
158            "args": tool_args,
159            "data": payload,
160        }
161
162    @staticmethod
163    def _err(tool: str | None, url: str | None, msg: str) -> dict[str, Any]:
164        return {
165            "type": "mcp_remote",
166            "url": url,
167            "tool": tool,
168            "data": {"text": "", "structured": None, "is_error": True, "other": []},
169            "error": msg,
170        }