checking system…
Docs / back / src/maf/agents/specialist.py · line 73
Python · 445 lines
  1"""SpecialistAgent — data-gathering agent that emits a structured AgentSignal.
  2
  3Like AnalystAgent but with a two-part output:
  4  1. Structured signal block  (BULLISH/BEARISH/NEUTRAL + confidence + key_factors)
  5  2. Full narrative section   (stored in state["reports"][agent_name])
  6
  7The signal block is parsed from a JSON section in the LLM response and
  8accumulated into state["agent_signals"] for the synthesis phase.
  9
 10Prompt contract
 11---------------
Unless a specialist's configured system prompt declares its own output schema,
the agent class automatically appends an output-format instruction equivalent
to the following (so individual prompts stay clean):
 14
 15  OUTPUT FORMAT
 16  After your analysis, respond with exactly this JSON block (no markdown fence),
 17  then a separator line, then your full narrative:
 18
 19  {"signal":"BULLISH","confidence":0.75,"summary":"…≤25 words…","key_factors":["…","…"]}
 20  ---NARRATIVE---
 21  <your full analysis here>
 22
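The agent class parses the JSON. If parsing fails it asks the LLM once more for
the JSON block alone, and if that also fails it falls back to NEUTRAL with
``parse_failed`` set.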
 24"""
 25
 26from __future__ import annotations
 27
 28import json
 29import logging
 30import re
 31import time
 32from typing import Any
 33
 34from maf.config import AgentConfig
 35from maf.core.agent import AgentContext, BaseAgent
 36from maf.core.react import Tool, react_loop
 37from maf.core.state import AgentSignal, ArenaState
 38
 39logger = logging.getLogger(__name__)
 40
 41# Domain names derived from agent name (price_analyst → price, etc.)
 42_DOMAIN_FROM_NAME: dict[str, str] = {
 43    "price_analyst": "price",
 44    "sentiment_analyst": "sentiment",
 45    "onchain_analyst": "onchain",
 46    "macro_analyst": "macro",
 47    "risk_analyst": "risk",
 48}
 49
# Generic output contract appended to a specialist's configured system prompt
# by SpecialistAgent.run (skipped when the prompt declares its own schema).
# The two leading newlines separate it from the configured prompt text. The
# "---NARRATIVE---" separator spelled out here is the one _parse_output splits
# on, so the two must stay in sync.
_OUTPUT_FORMAT_INSTRUCTION = """

## Required Output Format

After completing your analysis, respond with exactly this JSON object on a single line
(no markdown fences), followed immediately by the separator, then your full
narrative analysis:

{"signal":"BULLISH","confidence":0.70,"summary":"One sentence under 25 words.","key_factors":["factor 1","factor 2","factor 3"]}
---NARRATIVE---
Write your complete analysis here. Cover all data gathered, interpretation, and
key risks or opportunities. Minimum 3 paragraphs.

Rules:
- signal: BULLISH | BEARISH | NEUTRAL (your assessment of the instrument)
- confidence: 0.0–1.0 (how certain you are given available data; be conservative)
- summary: ≤ 25 words, one sentence
- key_factors: 2–5 short bullet strings (the decisive data points)
- Extra keys are allowed when your system prompt declares them — they're
  preserved verbatim in the AgentSignal.extras dict for downstream agents.
- Do NOT use markdown fences around the JSON line
- The separator line must be exactly: ---NARRATIVE---
"""
 73
 74
 75# Markers — when the configured system prompt declares its own output
 76# schema (the agnostic specialists), skip the generic instruction
 77# entirely. Match strings any of these prompts include verbatim.
 78_AGNOSTIC_PROMPT_MARKERS = (
 79    "impacted_tickers",          # event_impact_analyst
 80    "answer_kind",               # general_analyst
 81    "intent_signal",             # intent_classifier
 82)
 83
 84
 85def _needs_generic_output_instruction(system_prompt: str) -> bool:
 86    """Return True when the configured prompt does NOT declare its own
 87    output schema (i.e. we should append the generic BULLISH/BEARISH/NEUTRAL
 88    formatting instruction). Agnostic specialists declare their own
 89    extras and the generic instruction would tell the LLM to drop them.
 90    """
 91    return not any(m in system_prompt for m in _AGNOSTIC_PROMPT_MARKERS)
 92
 93
 94class SpecialistAgent(BaseAgent):
 95    """Specialist analyst that produces a structured AgentSignal + narrative.
 96
 97    Inherits the full ReAct tool-calling loop from the same pattern as
 98    AnalystAgent.  After the loop, the LLM's final response is parsed for
 99    the structured JSON block.  The narrative section is stored in
100    ``state["reports"]`` for the trail and dashboard.
101    """
102
103    def __init__(self, config: AgentConfig) -> None:
104        super().__init__(config)
105
106    async def run(self, state: ArenaState, ctx: AgentContext) -> ArenaState:
107        """Run the specialist's ReAct loop, emit AgentSignal + narrative."""
108        configured = self.get_system_prompt()
109        if _needs_generic_output_instruction(configured):
110            system_prompt = configured + _OUTPUT_FORMAT_INSTRUCTION
111        else:
112            # Agnostic specialist — the configured prompt owns its
113            # output schema, including the extras keys. Don't paste
114            # the generic "extras forbidden" instruction on top.
115            system_prompt = configured
116        target = state.get("target", {})
117
118        tool_trace: list[dict[str, Any]] = []
119        tools = await self._build_tools(ctx, tool_trace, state)
120        user_prompt = self._build_user_prompt(target, state)
121
122        # Inject past memories if available
123        if ctx.memory:
124            situation = json.dumps(target, default=str)
125            memories_text = ctx.memory.recall_as_text(situation, n=2)
126            if memories_text:
127                user_prompt += (
128                    "\n\n## Lessons from Past Similar Situations\n" + memories_text
129                )
130
131        start_time = time.time()
132
133        async def llm_chat(system: str, user: str) -> str:
134            resp = await ctx.llm.chat(system, user)
135            return resp.text
136
137        raw_output, steps = await react_loop(
138            system_prompt=system_prompt,
139            user_prompt=user_prompt,
140            tools=tools,
141            llm_chat=llm_chat,
142            max_steps=self.config.max_react_steps,
143        )
144
145        elapsed = round(time.time() - start_time, 1)
146
147        # Parse structured signal + narrative.
148        signal_data, narrative, parse_failed = _parse_output(raw_output)
149
150        # One-shot retry: if we didn't get a parseable signal, ask the LLM
151        # for *just* the JSON block. Most parse failures are reasoning-leak
152        # cases where the model spent its budget thinking and never emitted
153        # the structured block — a single targeted call usually recovers.
154        if parse_failed:
155            retry_prompt = (
156                "Your prior response did not include the required JSON signal "
157                "block. Without re-doing the analysis, output ONLY the JSON "
158                "object on one line — no markdown, no narrative, no preamble. "
159                "Required keys: signal (BULLISH|BEARISH|NEUTRAL), confidence "
160                "(0..1), summary (≤25 words), key_factors (2-5 short strings). "
161                f"Your prior response was:\n\n{raw_output[-2000:]}"
162            )
163            try:
164                retry_resp = await ctx.llm.chat_text(
165                    system_prompt=system_prompt,
166                    user_prompt=retry_prompt,
167                    max_tokens=512,
168                    temperature=0.0,
169                )
170                signal_data, _retry_narr, parse_failed = _parse_output(
171                    retry_resp, allow_empty_narrative=True,
172                )
173                # If retry parsed a signal but produced no narrative, keep
174                # the original narrative (the analysis we already paid for).
175                if not _retry_narr:
176                    pass  # narrative already populated
177            except Exception as exc:
178                logger.warning("Specialist %s: retry call failed: %s", self.name, exc)
179
180        domain = _DOMAIN_FROM_NAME.get(self.name, self.name.replace("_analyst", ""))
181
182        # Pull anything the LLM emitted that isn't a canonical AgentSignal
183        # field into ``extras``. The agnostic specialist prompts
184        # (event_impact_analyst, general_analyst, intent_classifier) ship
185        # custom payloads here that the synthesis layer reads to compose
186        # the Prognosis envelope.
187        _CANONICAL_KEYS = {
188            "signal", "confidence", "summary", "key_factors",
189            "narrative", "raw_data",
190        }
191        extras: dict[str, Any] = {
192            k: v for k, v in signal_data.items()
193            if k not in _CANONICAL_KEYS
194        }
195
196        agent_signal = AgentSignal(
197            agent=self.name,
198            domain=domain,
199            signal=signal_data.get("signal", "NEUTRAL"),
200            confidence=float(signal_data.get("confidence", 0.5)),
201            summary=signal_data.get("summary", ""),
202            key_factors=signal_data.get("key_factors", []),
203            narrative=narrative,
204            raw_data=signal_data.get("raw_data", {}),
205            parse_failed=parse_failed,
206            extras=extras,
207        )
208
209        logger.info(
210            "Specialist %s: %s (confidence=%.2f, %.1fs, %d ReAct steps)",
211            self.name,
212            agent_signal["signal"],
213            agent_signal["confidence"],
214            elapsed,
215            len(steps),
216        )
217
218        # Write to state — append to agent_signals list (safe for parallel execution)
219        signals = state.get("agent_signals") or []
220        signals.append(agent_signal)
221        state["agent_signals"] = signals
222
223        # Also write narrative to reports for trail/dashboard backward compat
224        state.setdefault("reports", {})[self.name] = narrative or raw_output
225
226        # Execution trace
227        state.setdefault("trace", {})[self.name] = {
228            "agent": self.name,
229            "role": self.role,
230            "domain": domain,
231            "signal": agent_signal["signal"],
232            "confidence": agent_signal["confidence"],
233            "react_steps": len(steps),
234            "elapsed_seconds": elapsed,
235            "sources_available": self.config.sources,
236            "tool_calls": tool_trace,
237        }
238
239        return state
240
241    async def _build_tools(
242        self,
243        ctx: AgentContext,
244        trace: list[dict[str, Any]],
245        state: ArenaState,
246    ) -> list[Tool]:
247        """Build Tool objects with monitoring pass-through to SourceRegistry.
248
249        Sources come from two pools:
250          1. ``self.config.sources`` — the primary set the YAML pins on this
251             specialist.
252          2. ``state["replan_enabled_sources"]`` — sources the ReplanAgent
253             enabled for the *current* iteration. We expose them as tools
254             so the LLM can opt-in on the re-run.
255
256        Deduped by name so a source declared in both places only gets one tool.
257        """
258        tools: list[Tool] = []
259        seen: set[str] = set()
260
261        def _add(source_name: str) -> None:
262            if source_name in seen:
263                return
264            if not ctx.sources.has(source_name):
265                return
266            seen.add(source_name)
267
268            async def _fetch(
269                _name: str = source_name,
270                _trace: list[dict[str, Any]] = trace,
271                **params: Any,
272            ) -> dict[str, Any]:
273                t0 = time.time()
274                result = await ctx.sources.fetch(
275                    _name, params or None, agent=self.name, state=state
276                )
277                elapsed = round(time.time() - t0, 2)
278                _trace.append({
279                    "tool": f"fetch_{_name}",
280                    "source": _name,
281                    "params": params or {},
282                    "elapsed_seconds": elapsed,
283                    "result_type": result.get("type", "unknown"),
284                    "result_keys": list(result.keys()),
285                    "has_error": "error" in result,
286                    "error": result.get("error", ""),
287                })
288                return result
289
290            tools.append(Tool(
291                name=f"fetch_{source_name}",
292                description=f"Fetch data from {source_name}",
293                fn=_fetch,
294            ))
295
296        for source_name in self.config.sources:
297            _add(source_name)
298        # Replan-enabled sources are shared across every specialist this
299        # iteration, so all of them get the same extra tools.
300        for source_name in state.get("replan_enabled_sources") or []:
301            _add(source_name)
302
303        return tools
304
305    def _build_user_prompt(self, target: dict[str, Any], state: ArenaState) -> str:
306        """Build the analysis request prompt with target context."""
307        parts = ["## Analysis Request"]
308        if target:
309            try:
310                from maf.arenas.trading_intelligence.instrument import (
311                    build_instrument_context,
312                )
313                parts.append(build_instrument_context(target))
314            except ImportError:
315                parts.append(f"Target: {json.dumps(target, default=str)}")
316
317        parts.append(
318            "\nGather data using your available tools, then produce your analysis "
319            "following the required output format exactly. Be specific about the "
320            "numbers and data you found."
321        )
322        return "\n".join(parts)
323
324
325# ---------------------------------------------------------------------------
326# Output parsing
327# ---------------------------------------------------------------------------
328
329
330def _parse_output(
331    text: str, *, allow_empty_narrative: bool = False,
332) -> tuple[dict[str, Any], str, bool]:
333    """Split LLM response into ``(signal_dict, narrative_text, parse_failed)``.
334
335    Tries three strategies:
336    1. Look for ``---NARRATIVE---`` separator.
337    2. Find a JSON object containing ``"signal"`` key anywhere in the text.
338    3. Fall back to NEUTRAL with full text as narrative.
339
340    ``allow_empty_narrative`` is used by the retry path where we only asked
341    for the JSON block — an empty narrative is expected, not a failure.
342    """
343    # Strategy 1: explicit separator
344    if "---NARRATIVE---" in text:
345        parts = text.split("---NARRATIVE---", 1)
346        json_part = parts[0].strip()
347        narrative = parts[1].strip() if len(parts) > 1 else text
348
349        # Remove any accidental code fences around the JSON
350        json_part = re.sub(r"^```[a-z]*\n?", "", json_part).rstrip("`").strip()
351        try:
352            data = json.loads(json_part)
353            if not isinstance(data, dict):
354                # Non-dict JSON (lists, strings, null) would crash
355                # _validate_signal — fall through to strategy 1b/2.
356                raise ValueError("not a JSON object")
357            _validate_signal(data)
358            return data, narrative, False
359        except (json.JSONDecodeError, ValueError, TypeError):
360            pass  # fall through to strategy 1b/2
361
362    # Strategy 1b: maybe the whole text is just the JSON object (the
363    # agnostic prompts ask for exactly that). Try parsing as-is before
364    # the regex scanner. This is what lets impacted_tickers/route/etc.
365    # with nested objects parse cleanly.
366    try:
367        stripped = re.sub(r"^```[a-z]*\n?", "", text.strip()).rstrip("`").strip()
368        data = json.loads(stripped)
369        if not isinstance(data, dict):
370            # Reject non-object JSON (lists, strings, numbers, null)
371            # before _validate_signal accidentally tries ``.get()`` on
372            # them. Falls through to strategy 2.
373            raise ValueError("not a JSON object")
374        _validate_signal(data)
375        # No separator — caller decides how to source narrative.
376        narrative_fallback = "" if allow_empty_narrative else text
377        return data, narrative_fallback, False
378    except (json.JSONDecodeError, ValueError, TypeError):
379        pass
380
381    # Strategy 2: scan for JSON with "signal" key — uses a balanced-bracket
382    # walk so nested objects/arrays (impacted_tickers, route, classification,
383    # etc.) parse correctly.
384    def _balanced_json_objects(s: str) -> list[str]:
385        out: list[str] = []
386        depth = 0
387        start = -1
388        in_str = False
389        esc = False
390        for i, ch in enumerate(s):
391            if in_str:
392                if esc:
393                    esc = False
394                elif ch == "\\":
395                    esc = True
396                elif ch == '"':
397                    in_str = False
398                continue
399            if ch == '"':
400                in_str = True
401                continue
402            if ch == "{":
403                if depth == 0:
404                    start = i
405                depth += 1
406            elif ch == "}":
407                depth -= 1
408                if depth == 0 and start >= 0:
409                    out.append(s[start:i + 1])
410                    start = -1
411        return out
412
413    for m in _balanced_json_objects(text):
414        if '"signal"' not in m:
415            continue
416        try:
417            data = json.loads(m)
418            if not isinstance(data, dict):
419                continue  # same non-dict guard as strategy 1b
420            _validate_signal(data)
421            narrative = text[text.find(m) + len(m):].strip()
422            if not narrative and not allow_empty_narrative:
423                narrative = text
424            return data, narrative, False
425        except (json.JSONDecodeError, ValueError, TypeError):
426            continue
427
428    # Strategy 3: fallback — flag for ReplanAgent to act on.
429    logger.warning("SpecialistAgent: could not parse structured signal from output")
430    return {
431        "signal": "NEUTRAL",
432        "confidence": 0.3,
433        "summary": "Could not parse structured output — see narrative",
434        "key_factors": ["parse_failed"],
435    }, text, True
436
437
438def _validate_signal(data: dict[str, Any]) -> None:
439    """Raise ValueError if the parsed dict is missing required fields."""
440    if data.get("signal") not in ("BULLISH", "BEARISH", "NEUTRAL"):
441        raise ValueError(f"Invalid signal: {data.get('signal')}")
442    conf = data.get("confidence")
443    if conf is None or not (0.0 <= float(conf) <= 1.0):
444        raise ValueError(f"Invalid confidence: {conf}")