1"""SynthesisAgent — confidence-weighted ensemble scoring across specialist signals. 2 3Implements the "informed simple scoring" approach: 4 1. Read all AgentSignals from state["agent_signals"] 5 2. Compute confidence-weighted ensemble score (range -1.0 to +1.0) 6 3. Apply adaptive weight adjustment for high-VIX environments 7 4. Make one LLM call: "given these signals and this score, what is your verdict?" 8 5. Write BUY/HOLD/SELL + full reasoning to state 9 10The full evidence chain (per-agent signals, weights used, raw score, LLM reasoning) 11is stored in state["decisions"]["synthesis"] for complete reviewability in the trail. 12 13Scoring formula (Dietterich 2000 ensemble, confidence-weighted): 14 score_i = signal_score_i * confidence_i * domain_weight_i 15 total_w_i = confidence_i * domain_weight_i 16 final = Σ(score_i) / Σ(total_w_i) range: -1.0 .. +1.0 17 verdict = BUY if final > 0.15, SELL if final < -0.15, else HOLD 18""" 19 20from __future__ import annotations 21 22import json 23import logging 24import time 25from typing import Any 26 27from maf.config import AgentConfig 28from maf.core.agent import AgentContext, BaseAgent 29from maf.core.state import AgentSignal, ArenaState 30 31logger = logging.getLogger(__name__) 32 33# Base domain weights — must sum to 1.0 34BASE_WEIGHTS: dict[str, float] = { 35 "price": 0.25, # technical analysis 36 "sentiment": 0.20, # news & social sentiment 37 "onchain": 0.20, # market microstructure / crypto on-chain 38 "macro": 0.20, # macroeconomic conditions 39 "risk": 0.15, # quantitative risk metrics 40} 41 42# Numeric encoding for signal directions 43SIGNAL_SCORES: dict[str, float] = { 44 "BULLISH": 1.0, 45 "NEUTRAL": 0.0, 46 "BEARISH": -1.0, 47} 48 49# VIX thresholds for adaptive weight adjustment 50_VIX_ELEVATED = 30.0 # risk weight increases above this 51_VIX_EXTREME = 40.0 # risk weight doubles above this 52 53 54class SynthesisAgent(BaseAgent): 55 """Synthesize all specialist signals into a final BUY/HOLD/SELL verdict. 56 57 Role: judge — called in the sequential synthesis phase after all 58 parallel specialist analysts have written their AgentSignals. 59 """ 60 61 def __init__(self, config: AgentConfig) -> None: 62 super().__init__(config) 63 64 async def run(self, state: ArenaState, ctx: AgentContext) -> ArenaState: 65 """Run synthesis: score → LLM reasoning → final verdict.""" 66 signals: list[AgentSignal] = state.get("agent_signals") or [] 67 target = state.get("target", {}) 68 ticker = target.get("ticker", "the instrument") 69 70 if not signals: 71 logger.warning("SynthesisAgent: no agent_signals in state — defaulting to HOLD") 72 state["signal"] = "HOLD" 73 state["synthesis_verdict"] = "HOLD" 74 state["synthesis_confidence"] = 0.0 75 state["synthesis_score"] = 0.0 76 state["synthesis_reasoning"] = "No specialist signals available." 77 state.setdefault("decisions", {})["synthesis"] = "HOLD — no signals" 78 return state 79 80 # Detect VIX from risk analyst raw data (if available) 81 vix = _extract_vix(signals) 82 83 # Compute adaptive weights 84 weights = _compute_adaptive_weights(vix) 85 86 # Confidence-weighted ensemble score 87 score, weight_breakdown = _compute_score(signals, weights) 88 89 # Preliminary direction 90 if score > 0.15: 91 preliminary = "BUY" 92 elif score < -0.15: 93 preliminary = "SELL" 94 else: 95 preliminary = "HOLD" 96 97 # Build evidence summary for LLM 98 evidence_lines = [ 99 f"- {s['agent'].upper()} ({s.get('domain', '?')}): " 100 f"{s['signal']} confidence={s['confidence']:.0%} — {s.get('summary', '')}" 101 for s in signals 102 ] 103 104 weight_lines = [ 105 f" {domain}: base={BASE_WEIGHTS.get(domain, 0.2):.0%}" 106 + (f" → adaptive={w:.0%} (VIX={vix:.1f})" if vix and domain == "risk" else "") 107 for domain, w in weights.items() 108 ] 109 110 start_time = time.time() 111 system_prompt = self.get_system_prompt() or _DEFAULT_SYNTHESIS_PROMPT 112 113 # Detect whether the configured system prompt is an agnostic-arena 114 # synthesis (event/ad_hoc) — those prompts ship their own output 115 # schema, so we mustn't paste the trading-only ``BUY/HOLD/SELL`` 116 # instructions on top. Heuristic: marker strings the agnostic 117 # prompts include verbatim. Falls back to the trading prompt 118 # when neither marker is present. 119 is_event_synthesis = "ideas_shortlist" in system_prompt and "impacted_tickers" in system_prompt 120 is_ad_hoc_synthesis = "alternatives_considered" in system_prompt and "counter_view" in system_prompt 121 is_agnostic = is_event_synthesis or is_ad_hoc_synthesis 122 123 if is_agnostic: 124 # Hand the LLM the FULL specialist payloads + the typed target 125 # so it can faithfully carry agnostic fields (impacted_tickers, 126 # answer, counter_view, etc.) into its synthesis output. The 127 # legacy 1-line evidence summary is insufficient — agnostic 128 # prompts need to read the actual structured signal contents. 129 full_target = json.dumps(state.get("target") or {}, default=str) 130 # Serialise every specialist signal IN FULL — canonical fields 131 # plus the agnostic ``extras`` dict the specialist parser pulled 132 # out of the LLM JSON (impacted_tickers, answer, counter_view, 133 # classification, route, etc.). Without exposing extras the 134 # agnostic prompts have nothing to carry forward. 135 signals_serialised = json.dumps( 136 [ 137 {k: v for k, v in s.items() if k != "elapsed_seconds"} 138 for s in signals 139 ], 140 default=str, indent=2, 141 ) 142 user_prompt = f"""## Synthesis inputs 143 144### Target (typed) 145{full_target} 146 147### Specialist agent signals (full payloads) 148```json 149{signals_serialised} 150``` 151 152### Preliminary scoring (for reference; ignore if the system prompt opts out of BUY/HOLD/SELL) 153Weighted ensemble score: **{score:+.3f}** 154Preliminary direction: **{preliminary}** 155 156### Instructions 157Follow the output schema declared in the system prompt above. Use the 158signal vocabulary it specifies (BULLISH | BEARISH | NEUTRAL). Carry 159forward every payload field the upstream specialists shipped — 160`impacted_tickers`, `answer`, `counter_view`, `key_factors`, 161`ideas_shortlist`, `alternatives_considered` — into the matching 162output fields. Don't invent values for fields not in the inputs. 163""" 164 else: 165 user_prompt = f"""## Synthesis Task: {ticker} 166 167### Specialist Agent Signals 168{chr(10).join(evidence_lines)} 169 170### Scoring 171Weighted ensemble score: **{score:+.3f}** (range: -1.0 fully bearish to +1.0 fully bullish) 172Preliminary direction: **{preliminary}** 173Domain weights used: 174{chr(10).join(weight_lines)} 175{f"VIX detected: {vix:.1f} — elevated fear, risk weight increased" if vix and vix > _VIX_ELEVATED else ""} 176 177### Instructions 178Synthesize all signals into a final BUY / HOLD / SELL verdict with confidence. 179 180Rules: 181- Strong agreement across agents (all same direction) → high confidence 182- Mixed signals or score near zero → lean toward HOLD, lower confidence 183- Elevated VIX or bearish risk signal overrides bullish technicals when extreme 184- Your verdict should align with the preliminary direction unless you have a clear reason to deviate 185 186Respond with exactly this JSON on one line, then a separator, then your reasoning: 187{{"verdict":"BUY","confidence":0.72,"reasoning":"2–4 sentence explanation of the key factors."}} 188---REASONING--- 189Write a full analysis paragraph explaining the synthesis decision, key risks, 190and what would change the verdict. Reference specific agent signals by name. 191""" 192 193 async def llm_chat(system: str, user: str) -> str: 194 resp = await ctx.llm.chat(system, user) 195 return resp.text 196 197 raw = await llm_chat(system_prompt, user_prompt) 198 elapsed = round(time.time() - start_time, 1) 199 200 verdict_data, reasoning_text = _parse_synthesis_output(raw, preliminary) 201 202 verdict = verdict_data.get("verdict", preliminary) 203 # Agnostic prompts can emit `synthesis_confidence` (often damped) 204 # alongside the legacy `confidence`. Prefer the explicit field 205 # when present so confidence damping is honoured. Same for 206 # `synthesis_reasoning` — agnostic prompts use it as the 207 # user-facing answer; the legacy `reasoning` field is the 208 # short JSON sibling. 209 confidence_raw = verdict_data.get("synthesis_confidence") 210 if confidence_raw is None: 211 confidence_raw = verdict_data.get("confidence", abs(score)) 212 confidence = float(confidence_raw) 213 reasoning_raw = verdict_data.get("synthesis_reasoning") 214 if not reasoning_raw: 215 reasoning_raw = verdict_data.get("reasoning", reasoning_text) 216 reasoning = reasoning_raw 217 218 # Agnostic-arena fields — the event/ad_hoc synthesis prompts ship 219 # `ideas_shortlist` (or `impacted_tickers`) and 220 # `alternatives_considered` alongside the verdict. Preserve them 221 # in state so the Prognosis converter 222 # (:func:`maf.actions.prognosis.from_arena_state`) can find 223 # them. Also pass through `synthesis_verdict: null` when the 224 # arena explicitly opts out of BUY/HOLD/SELL (event/ad_hoc). 225 explicit_verdict = verdict_data.get("synthesis_verdict", "__missing__") 226 ideas_shortlist = ( 227 verdict_data.get("ideas_shortlist") 228 or verdict_data.get("impacted_tickers") 229 or [] 230 ) 231 alternatives = verdict_data.get("alternatives_considered") or [] 232 233 # Full evidence chain for trail reviewability 234 full_evidence: dict[str, Any] = { 235 "ticker": ticker, 236 "score": round(score, 4), 237 "preliminary": preliminary, 238 "verdict": verdict, 239 "confidence": round(confidence, 3), 240 "weights_used": {k: round(v, 3) for k, v in weights.items()}, 241 "vix_detected": vix, 242 "weight_breakdown": weight_breakdown, 243 "per_agent": [ 244 { 245 "agent": s["agent"], 246 "domain": s.get("domain", ""), 247 "signal": s["signal"], 248 "confidence": round(s["confidence"], 3), 249 "summary": s.get("summary", ""), 250 "key_factors": s.get("key_factors", []), 251 "score_contribution": round( 252 SIGNAL_SCORES.get(s["signal"], 0) 253 * s["confidence"] 254 * weights.get(s.get("domain", ""), 0.2), 255 4, 256 ), 257 } 258 for s in signals 259 ], 260 "reasoning": reasoning, 261 "full_reasoning_text": reasoning_text, 262 "elapsed_seconds": elapsed, 263 } 264 265 logger.info( 266 "Synthesis %s: %s (score=%.3f, confidence=%.2f, vix=%s, %.1fs)", 267 ticker, 268 verdict, 269 score, 270 confidence, 271 f"{vix:.1f}" if vix else "N/A", 272 elapsed, 273 ) 274 275 # Write to state 276 state["signal"] = verdict 277 # Agnostic arenas can emit synthesis_verdict=null (event/ad_hoc 278 # don't issue BUY/HOLD/SELL); honour it when present, otherwise 279 # default to the verdict computed by this agent. 280 if explicit_verdict != "__missing__": 281 state["synthesis_verdict"] = explicit_verdict 282 else: 283 state["synthesis_verdict"] = verdict 284 state["synthesis_score"] = round(score, 4) 285 state["synthesis_confidence"] = round(confidence, 3) 286 state["synthesis_reasoning"] = reasoning 287 if ideas_shortlist: 288 state["ideas_shortlist"] = ideas_shortlist 289 if alternatives: 290 state["alternatives_considered"] = alternatives 291 state.setdefault("decisions", {})["synthesis"] = json.dumps( 292 full_evidence, indent=2 293 ) 294 state.setdefault("reports", {})["synthesis"] = ( 295 f"## Synthesis: {verdict} ({confidence:.0%} confidence)\n\n" 296 f"Score: {score:+.3f} | VIX: {vix or 'N/A'}\n\n" 297 f"{reasoning_text}" 298 ) 299 300 return state 301 302 303# --------------------------------------------------------------------------- 304# Scoring helpers 305# --------------------------------------------------------------------------- 306 307 308def _compute_adaptive_weights(vix: float | None) -> dict[str, float]: 309 """Adjust domain weights based on market fear (VIX).""" 310 weights = dict(BASE_WEIGHTS) 311 if vix is None: 312 return weights 313 314 if vix >= _VIX_EXTREME: 315 # Double risk weight, reduce price/sentiment proportionally 316 extra = weights["risk"] # add another 15% 317 weights["risk"] = min(weights["risk"] * 2, 0.35) 318 reduction = extra / 4 # spread reduction across other 4 domains 319 for d in ("price", "sentiment", "onchain", "macro"): 320 weights[d] = max(weights[d] - reduction, 0.05) 321 elif vix >= _VIX_ELEVATED: 322 # Increase risk weight by 50%, small reductions elsewhere 323 extra = weights["risk"] * 0.5 324 weights["risk"] += extra 325 reduction = extra / 4 326 for d in ("price", "sentiment", "onchain", "macro"): 327 weights[d] = max(weights[d] - reduction, 0.05) 328 329 # Normalize to sum=1 330 total = sum(weights.values()) 331 return {k: round(v / total, 4) for k, v in weights.items()} 332 333 334def _compute_score( 335 signals: list[AgentSignal], weights: dict[str, float] 336) -> tuple[float, list[dict[str, Any]]]: 337 """Compute confidence-weighted ensemble score. 338 339 Returns (normalized_score, per-signal breakdown list). 340 """ 341 weighted_sum = 0.0 342 total_weight = 0.0 343 breakdown: list[dict[str, Any]] = [] 344 345 for s in signals: 346 domain = s.get("domain", s.get("agent", "").replace("_analyst", "")) 347 w = weights.get(domain, 0.2) 348 direction = SIGNAL_SCORES.get(s["signal"], 0.0) 349 confidence = s["confidence"] 350 351 contrib = direction * confidence * w 352 weighted_sum += contrib 353 total_weight += confidence * w 354 355 breakdown.append({ 356 "agent": s["agent"], 357 "domain": domain, 358 "signal": s["signal"], 359 "confidence": confidence, 360 "weight": w, 361 "contribution": round(contrib, 4), 362 }) 363 364 score = weighted_sum / total_weight if total_weight > 0 else 0.0 365 return round(score, 4), breakdown 366 367 368def _extract_vix(signals: list[AgentSignal]) -> float | None: 369 """Try to read VIX from the risk analyst's raw_data.""" 370 for s in signals: 371 if s.get("domain") == "risk" or s.get("agent") == "risk_analyst": 372 raw = s.get("raw_data") or {} 373 vix = raw.get("vix") 374 if vix and isinstance(vix, (int, float)): 375 return float(vix) 376 return None 377 378 379# --------------------------------------------------------------------------- 380# Output parsing 381# --------------------------------------------------------------------------- 382 383 384def _parse_synthesis_output( 385 text: str, fallback_verdict: str 386) -> tuple[dict[str, Any], str]: 387 """Split synthesis LLM output into (verdict_dict, reasoning_text). 388 389 Accepts both: 390 391 * Trading vocab — ``verdict ∈ {BUY, HOLD, SELL}`` (the long-standing 392 contract used by market_pulse / trading_intelligence / etc.). 393 * Agnostic-arena vocab — ``verdict ∈ {BULLISH, BEARISH, NEUTRAL}`` 394 with optional ``synthesis_verdict: null`` opt-out plus the 395 ``ideas_shortlist`` / ``impacted_tickers`` / 396 ``alternatives_considered`` passthrough payloads used by 397 event_synthesis + ad_hoc_synthesis. 398 399 Returns the whole parsed dict so callers can read any non-verdict 400 fields the prompt emitted. 401 """ 402 import re 403 404 _OK_VERDICTS = {"BUY", "HOLD", "SELL", "BULLISH", "BEARISH", "NEUTRAL"} 405 406 def _normalise(data: dict[str, Any]) -> dict[str, Any]: 407 """Treat ``signal`` as an alias for ``verdict``. 408 409 Agnostic synthesis prompts (event_synthesis.md, ad_hoc_synthesis.md) 410 emit ``signal`` instead of ``verdict``. We accept both so the 411 parser doesn't reject a structurally-valid output just because 412 the key spelling differs. 413 """ 414 if "verdict" not in data and "signal" in data: 415 data = {**data, "verdict": data["signal"]} 416 return data 417 418 def _balanced_json_objects(s: str) -> list[str]: 419 """Extract every balanced ``{...}`` block from ``s``. 420 421 Naive bracket counter — handles nested objects/arrays (which the 422 prior non-greedy regex couldn't). Skips brackets inside strings. 423 """ 424 out: list[str] = [] 425 depth = 0 426 start = -1 427 in_str = False 428 esc = False 429 for i, ch in enumerate(s): 430 if in_str: 431 if esc: 432 esc = False 433 elif ch == "\\": 434 esc = True 435 elif ch == '"': 436 in_str = False 437 continue 438 if ch == '"': 439 in_str = True 440 continue 441 if ch == "{": 442 if depth == 0: 443 start = i 444 depth += 1 445 elif ch == "}": 446 depth -= 1 447 if depth == 0 and start >= 0: 448 out.append(s[start:i + 1]) 449 start = -1 450 return out 451 452 def _accept(data: dict[str, Any]) -> bool: 453 return data.get("verdict") in _OK_VERDICTS 454 455 # Agnostic synthesis prompts use ``---NARRATIVE---`` as the separator; 456 # the trading prompt uses ``---REASONING---``. Accept both. 457 for sep in ("---REASONING---", "---NARRATIVE---"): 458 if sep in text: 459 parts = text.split(sep, 1) 460 json_part = parts[0].strip() 461 reasoning = parts[1].strip() if len(parts) > 1 else text 462 json_part = re.sub(r"^```[a-z]*\n?", "", json_part).rstrip("`").strip() 463 try: 464 data = _normalise(json.loads(json_part)) 465 if _accept(data): 466 return data, reasoning 467 except json.JSONDecodeError: 468 pass 469 break 470 471 # Scan every balanced JSON object (nested-safe) for one with verdict/signal. 472 for m in _balanced_json_objects(text): 473 try: 474 data = _normalise(json.loads(m)) 475 except json.JSONDecodeError: 476 continue 477 if _accept(data): 478 reasoning = text[text.find(m) + len(m):].strip() or text 479 return data, reasoning 480 481 return { 482 "verdict": fallback_verdict, 483 "confidence": 0.4, 484 "reasoning": "Could not parse structured verdict — defaulting to preliminary direction.", 485 }, text 486 487 488_DEFAULT_SYNTHESIS_PROMPT = """You are a senior portfolio analyst synthesizing signals from five independent specialist analysts. 489 490Your job is to: 4911. Evaluate the weight of evidence across all five signals 4922. Consider whether any single signal should override the others (e.g. extreme risk/macro) 4933. Produce a final BUY / HOLD / SELL verdict with a confidence score 4944. Explain your reasoning clearly and concisely 495 496You must follow the required output format exactly."""