1"""SpecialistAgent — data-gathering agent that emits a structured AgentSignal. 2 3Like AnalystAgent but with a two-part output: 4 1. Structured signal block (BULLISH/BEARISH/NEUTRAL + confidence + key_factors) 5 2. Full narrative section (stored in state["reports"][agent_name]) 6 7The signal block is parsed from a JSON section in the LLM response and 8accumulated into state["agent_signals"] for the synthesis phase. 9 10Prompt contract 11--------------- 12The system prompt for each specialist must end with the following instruction 13(the agent class injects it automatically so individual prompts stay clean): 14 15 OUTPUT FORMAT 16 After your analysis, respond with exactly this JSON block (no markdown fence), 17 then a separator line, then your full narrative: 18 19 {"signal":"BULLISH","confidence":0.75,"summary":"…≤25 words…","key_factors":["…","…"]} 20 ---NARRATIVE--- 21 <your full analysis here> 22 23The agent class parses the JSON, falls back to NEUTRAL if parsing fails. 24""" 25 26from __future__ import annotations 27 28import json 29import logging 30import re 31import time 32from typing import Any 33 34from maf.config import AgentConfig 35from maf.core.agent import AgentContext, BaseAgent 36from maf.core.react import Tool, react_loop 37from maf.core.state import AgentSignal, ArenaState 38 39logger = logging.getLogger(__name__) 40 41# Domain names derived from agent name (price_analyst → price, etc.) 42_DOMAIN_FROM_NAME: dict[str, str] = { 43 "price_analyst": "price", 44 "sentiment_analyst": "sentiment", 45 "onchain_analyst": "onchain", 46 "macro_analyst": "macro", 47 "risk_analyst": "risk", 48} 49 50_OUTPUT_FORMAT_INSTRUCTION = """ 51 52## Required Output Format 53 54After completing your analysis, respond with exactly this JSON object on a single line 55(no markdown fences), followed immediately by the separator, then your full 56narrative analysis: 57 58{"signal":"BULLISH","confidence":0.70,"summary":"One sentence under 25 words.","key_factors":["factor 1","factor 2","factor 3"]} 59---NARRATIVE--- 60Write your complete analysis here. Cover all data gathered, interpretation, and 61key risks or opportunities. Minimum 3 paragraphs. 62 63Rules: 64- signal: BULLISH | BEARISH | NEUTRAL (your assessment of the instrument) 65- confidence: 0.0–1.0 (how certain you are given available data; be conservative) 66- summary: ≤ 25 words, one sentence 67- key_factors: 2–5 short bullet strings (the decisive data points) 68- Extra keys are allowed when your system prompt declares them — they're 69 preserved verbatim in the AgentSignal.extras dict for downstream agents. 70- Do NOT use markdown fences around the JSON line 71- The separator line must be exactly: ---NARRATIVE--- 72""" 73 74 75# Markers — when the configured system prompt declares its own output 76# schema (the agnostic specialists), skip the generic instruction 77# entirely. Match strings any of these prompts include verbatim. 78_AGNOSTIC_PROMPT_MARKERS = ( 79 "impacted_tickers", # event_impact_analyst 80 "answer_kind", # general_analyst 81 "intent_signal", # intent_classifier 82) 83 84 85def _needs_generic_output_instruction(system_prompt: str) -> bool: 86 """Return True when the configured prompt does NOT declare its own 87 output schema (i.e. we should append the generic BULLISH/BEARISH/NEUTRAL 88 formatting instruction). Agnostic specialists declare their own 89 extras and the generic instruction would tell the LLM to drop them. 90 """ 91 return not any(m in system_prompt for m in _AGNOSTIC_PROMPT_MARKERS) 92 93 94class SpecialistAgent(BaseAgent): 95 """Specialist analyst that produces a structured AgentSignal + narrative. 96 97 Inherits the full ReAct tool-calling loop from the same pattern as 98 AnalystAgent. After the loop, the LLM's final response is parsed for 99 the structured JSON block. The narrative section is stored in 100 ``state["reports"]`` for the trail and dashboard. 101 """ 102 103 def __init__(self, config: AgentConfig) -> None: 104 super().__init__(config) 105 106 async def run(self, state: ArenaState, ctx: AgentContext) -> ArenaState: 107 """Run the specialist's ReAct loop, emit AgentSignal + narrative.""" 108 configured = self.get_system_prompt() 109 if _needs_generic_output_instruction(configured): 110 system_prompt = configured + _OUTPUT_FORMAT_INSTRUCTION 111 else: 112 # Agnostic specialist — the configured prompt owns its 113 # output schema, including the extras keys. Don't paste 114 # the generic "extras forbidden" instruction on top. 115 system_prompt = configured 116 target = state.get("target", {}) 117 118 tool_trace: list[dict[str, Any]] = [] 119 tools = await self._build_tools(ctx, tool_trace, state) 120 user_prompt = self._build_user_prompt(target, state) 121 122 # Inject past memories if available 123 if ctx.memory: 124 situation = json.dumps(target, default=str) 125 memories_text = ctx.memory.recall_as_text(situation, n=2) 126 if memories_text: 127 user_prompt += ( 128 "\n\n## Lessons from Past Similar Situations\n" + memories_text 129 ) 130 131 start_time = time.time() 132 133 async def llm_chat(system: str, user: str) -> str: 134 resp = await ctx.llm.chat(system, user) 135 return resp.text 136 137 raw_output, steps = await react_loop( 138 system_prompt=system_prompt, 139 user_prompt=user_prompt, 140 tools=tools, 141 llm_chat=llm_chat, 142 max_steps=self.config.max_react_steps, 143 ) 144 145 elapsed = round(time.time() - start_time, 1) 146 147 # Parse structured signal + narrative. 148 signal_data, narrative, parse_failed = _parse_output(raw_output) 149 150 # One-shot retry: if we didn't get a parseable signal, ask the LLM 151 # for *just* the JSON block. Most parse failures are reasoning-leak 152 # cases where the model spent its budget thinking and never emitted 153 # the structured block — a single targeted call usually recovers. 154 if parse_failed: 155 retry_prompt = ( 156 "Your prior response did not include the required JSON signal " 157 "block. Without re-doing the analysis, output ONLY the JSON " 158 "object on one line — no markdown, no narrative, no preamble. " 159 "Required keys: signal (BULLISH|BEARISH|NEUTRAL), confidence " 160 "(0..1), summary (≤25 words), key_factors (2-5 short strings). " 161 f"Your prior response was:\n\n{raw_output[-2000:]}" 162 ) 163 try: 164 retry_resp = await ctx.llm.chat_text( 165 system_prompt=system_prompt, 166 user_prompt=retry_prompt, 167 max_tokens=512, 168 temperature=0.0, 169 ) 170 signal_data, _retry_narr, parse_failed = _parse_output( 171 retry_resp, allow_empty_narrative=True, 172 ) 173 # If retry parsed a signal but produced no narrative, keep 174 # the original narrative (the analysis we already paid for). 175 if not _retry_narr: 176 pass # narrative already populated 177 except Exception as exc: 178 logger.warning("Specialist %s: retry call failed: %s", self.name, exc) 179 180 domain = _DOMAIN_FROM_NAME.get(self.name, self.name.replace("_analyst", "")) 181 182 # Pull anything the LLM emitted that isn't a canonical AgentSignal 183 # field into ``extras``. The agnostic specialist prompts 184 # (event_impact_analyst, general_analyst, intent_classifier) ship 185 # custom payloads here that the synthesis layer reads to compose 186 # the Prognosis envelope. 187 _CANONICAL_KEYS = { 188 "signal", "confidence", "summary", "key_factors", 189 "narrative", "raw_data", 190 } 191 extras: dict[str, Any] = { 192 k: v for k, v in signal_data.items() 193 if k not in _CANONICAL_KEYS 194 } 195 196 agent_signal = AgentSignal( 197 agent=self.name, 198 domain=domain, 199 signal=signal_data.get("signal", "NEUTRAL"), 200 confidence=float(signal_data.get("confidence", 0.5)), 201 summary=signal_data.get("summary", ""), 202 key_factors=signal_data.get("key_factors", []), 203 narrative=narrative, 204 raw_data=signal_data.get("raw_data", {}), 205 parse_failed=parse_failed, 206 extras=extras, 207 ) 208 209 logger.info( 210 "Specialist %s: %s (confidence=%.2f, %.1fs, %d ReAct steps)", 211 self.name, 212 agent_signal["signal"], 213 agent_signal["confidence"], 214 elapsed, 215 len(steps), 216 ) 217 218 # Write to state — append to agent_signals list (safe for parallel execution) 219 signals = state.get("agent_signals") or [] 220 signals.append(agent_signal) 221 state["agent_signals"] = signals 222 223 # Also write narrative to reports for trail/dashboard backward compat 224 state.setdefault("reports", {})[self.name] = narrative or raw_output 225 226 # Execution trace 227 state.setdefault("trace", {})[self.name] = { 228 "agent": self.name, 229 "role": self.role, 230 "domain": domain, 231 "signal": agent_signal["signal"], 232 "confidence": agent_signal["confidence"], 233 "react_steps": len(steps), 234 "elapsed_seconds": elapsed, 235 "sources_available": self.config.sources, 236 "tool_calls": tool_trace, 237 } 238 239 return state 240 241 async def _build_tools( 242 self, 243 ctx: AgentContext, 244 trace: list[dict[str, Any]], 245 state: ArenaState, 246 ) -> list[Tool]: 247 """Build Tool objects with monitoring pass-through to SourceRegistry. 248 249 Sources come from two pools: 250 1. ``self.config.sources`` — the primary set the YAML pins on this 251 specialist. 252 2. ``state["replan_enabled_sources"]`` — sources the ReplanAgent 253 enabled for the *current* iteration. We expose them as tools 254 so the LLM can opt-in on the re-run. 255 256 Deduped by name so a source declared in both places only gets one tool. 257 """ 258 tools: list[Tool] = [] 259 seen: set[str] = set() 260 261 def _add(source_name: str) -> None: 262 if source_name in seen: 263 return 264 if not ctx.sources.has(source_name): 265 return 266 seen.add(source_name) 267 268 async def _fetch( 269 _name: str = source_name, 270 _trace: list[dict[str, Any]] = trace, 271 **params: Any, 272 ) -> dict[str, Any]: 273 t0 = time.time() 274 result = await ctx.sources.fetch( 275 _name, params or None, agent=self.name, state=state 276 ) 277 elapsed = round(time.time() - t0, 2) 278 _trace.append({ 279 "tool": f"fetch_{_name}", 280 "source": _name, 281 "params": params or {}, 282 "elapsed_seconds": elapsed, 283 "result_type": result.get("type", "unknown"), 284 "result_keys": list(result.keys()), 285 "has_error": "error" in result, 286 "error": result.get("error", ""), 287 }) 288 return result 289 290 tools.append(Tool( 291 name=f"fetch_{source_name}", 292 description=f"Fetch data from {source_name}", 293 fn=_fetch, 294 )) 295 296 for source_name in self.config.sources: 297 _add(source_name) 298 # Replan-enabled sources are shared across every specialist this 299 # iteration, so all of them get the same extra tools. 300 for source_name in state.get("replan_enabled_sources") or []: 301 _add(source_name) 302 303 return tools 304 305 def _build_user_prompt(self, target: dict[str, Any], state: ArenaState) -> str: 306 """Build the analysis request prompt with target context.""" 307 parts = ["## Analysis Request"] 308 if target: 309 try: 310 from maf.arenas.trading_intelligence.instrument import ( 311 build_instrument_context, 312 ) 313 parts.append(build_instrument_context(target)) 314 except ImportError: 315 parts.append(f"Target: {json.dumps(target, default=str)}") 316 317 parts.append( 318 "\nGather data using your available tools, then produce your analysis " 319 "following the required output format exactly. Be specific about the " 320 "numbers and data you found." 321 ) 322 return "\n".join(parts) 323 324 325# --------------------------------------------------------------------------- 326# Output parsing 327# --------------------------------------------------------------------------- 328 329 330def _parse_output( 331 text: str, *, allow_empty_narrative: bool = False, 332) -> tuple[dict[str, Any], str, bool]: 333 """Split LLM response into ``(signal_dict, narrative_text, parse_failed)``. 334 335 Tries three strategies: 336 1. Look for ``---NARRATIVE---`` separator. 337 2. Find a JSON object containing ``"signal"`` key anywhere in the text. 338 3. Fall back to NEUTRAL with full text as narrative. 339 340 ``allow_empty_narrative`` is used by the retry path where we only asked 341 for the JSON block — an empty narrative is expected, not a failure. 342 """ 343 # Strategy 1: explicit separator 344 if "---NARRATIVE---" in text: 345 parts = text.split("---NARRATIVE---", 1) 346 json_part = parts[0].strip() 347 narrative = parts[1].strip() if len(parts) > 1 else text 348 349 # Remove any accidental code fences around the JSON 350 json_part = re.sub(r"^```[a-z]*\n?", "", json_part).rstrip("`").strip() 351 try: 352 data = json.loads(json_part) 353 if not isinstance(data, dict): 354 # Non-dict JSON (lists, strings, null) would crash 355 # _validate_signal — fall through to strategy 1b/2. 356 raise ValueError("not a JSON object") 357 _validate_signal(data) 358 return data, narrative, False 359 except (json.JSONDecodeError, ValueError, TypeError): 360 pass # fall through to strategy 1b/2 361 362 # Strategy 1b: maybe the whole text is just the JSON object (the 363 # agnostic prompts ask for exactly that). Try parsing as-is before 364 # the regex scanner. This is what lets impacted_tickers/route/etc. 365 # with nested objects parse cleanly. 366 try: 367 stripped = re.sub(r"^```[a-z]*\n?", "", text.strip()).rstrip("`").strip() 368 data = json.loads(stripped) 369 if not isinstance(data, dict): 370 # Reject non-object JSON (lists, strings, numbers, null) 371 # before _validate_signal accidentally tries ``.get()`` on 372 # them. Falls through to strategy 2. 373 raise ValueError("not a JSON object") 374 _validate_signal(data) 375 # No separator — caller decides how to source narrative. 376 narrative_fallback = "" if allow_empty_narrative else text 377 return data, narrative_fallback, False 378 except (json.JSONDecodeError, ValueError, TypeError): 379 pass 380 381 # Strategy 2: scan for JSON with "signal" key — uses a balanced-bracket 382 # walk so nested objects/arrays (impacted_tickers, route, classification, 383 # etc.) parse correctly. 384 def _balanced_json_objects(s: str) -> list[str]: 385 out: list[str] = [] 386 depth = 0 387 start = -1 388 in_str = False 389 esc = False 390 for i, ch in enumerate(s): 391 if in_str: 392 if esc: 393 esc = False 394 elif ch == "\\": 395 esc = True 396 elif ch == '"': 397 in_str = False 398 continue 399 if ch == '"': 400 in_str = True 401 continue 402 if ch == "{": 403 if depth == 0: 404 start = i 405 depth += 1 406 elif ch == "}": 407 depth -= 1 408 if depth == 0 and start >= 0: 409 out.append(s[start:i + 1]) 410 start = -1 411 return out 412 413 for m in _balanced_json_objects(text): 414 if '"signal"' not in m: 415 continue 416 try: 417 data = json.loads(m) 418 if not isinstance(data, dict): 419 continue # same non-dict guard as strategy 1b 420 _validate_signal(data) 421 narrative = text[text.find(m) + len(m):].strip() 422 if not narrative and not allow_empty_narrative: 423 narrative = text 424 return data, narrative, False 425 except (json.JSONDecodeError, ValueError, TypeError): 426 continue 427 428 # Strategy 3: fallback — flag for ReplanAgent to act on. 429 logger.warning("SpecialistAgent: could not parse structured signal from output") 430 return { 431 "signal": "NEUTRAL", 432 "confidence": 0.3, 433 "summary": "Could not parse structured output — see narrative", 434 "key_factors": ["parse_failed"], 435 }, text, True 436 437 438def _validate_signal(data: dict[str, Any]) -> None: 439 """Raise ValueError if the parsed dict is missing required fields.""" 440 if data.get("signal") not in ("BULLISH", "BEARISH", "NEUTRAL"): 441 raise ValueError(f"Invalid signal: {data.get('signal')}") 442 conf = data.get("confidence") 443 if conf is None or not (0.0 <= float(conf) <= 1.0): 444 raise ValueError(f"Invalid confidence: {conf}")