checking system…
Docs / back / src/maf/agents/synthesis.py · line 54
Python · 497 lines
  1"""SynthesisAgent — confidence-weighted ensemble scoring across specialist signals.
  2
  3Implements the "informed simple scoring" approach:
  4  1. Read all AgentSignals from state["agent_signals"]
  5  2. Compute confidence-weighted ensemble score (range -1.0 to +1.0)
  6  3. Apply adaptive weight adjustment for high-VIX environments
  7  4. Make one LLM call: "given these signals and this score, what is your verdict?"
  8  5. Write BUY/HOLD/SELL + full reasoning to state
  9
 10The full evidence chain (per-agent signals, weights used, raw score, LLM reasoning)
 11is stored in state["decisions"]["synthesis"] for complete reviewability in the trail.
 12
 13Scoring formula (Dietterich 2000 ensemble, confidence-weighted):
 14  score_i   = signal_score_i * confidence_i * domain_weight_i
 15  total_w_i = confidence_i * domain_weight_i
 16  final     = Σ(score_i) / Σ(total_w_i)          range: -1.0 .. +1.0
 17  verdict   = BUY if final > 0.15, SELL if final < -0.15, else HOLD
 18"""
 19
 20from __future__ import annotations
 21
 22import json
 23import logging
 24import time
 25from typing import Any
 26
 27from maf.config import AgentConfig
 28from maf.core.agent import AgentContext, BaseAgent
 29from maf.core.state import AgentSignal, ArenaState
 30
 31logger = logging.getLogger(__name__)
 32
 33# Base domain weights — must sum to 1.0
 34BASE_WEIGHTS: dict[str, float] = {
 35    "price": 0.25,      # technical analysis
 36    "sentiment": 0.20,  # news & social sentiment
 37    "onchain": 0.20,    # market microstructure / crypto on-chain
 38    "macro": 0.20,      # macroeconomic conditions
 39    "risk": 0.15,       # quantitative risk metrics
 40}
 41
 42# Numeric encoding for signal directions
 43SIGNAL_SCORES: dict[str, float] = {
 44    "BULLISH": 1.0,
 45    "NEUTRAL": 0.0,
 46    "BEARISH": -1.0,
 47}
 48
 49# VIX thresholds for adaptive weight adjustment
 50_VIX_ELEVATED = 30.0   # risk weight increases above this
 51_VIX_EXTREME = 40.0    # risk weight doubles above this
 52
 53
 54class SynthesisAgent(BaseAgent):
 55    """Synthesize all specialist signals into a final BUY/HOLD/SELL verdict.
 56
 57    Role: judge — called in the sequential synthesis phase after all
 58    parallel specialist analysts have written their AgentSignals.
 59    """
 60
 61    def __init__(self, config: AgentConfig) -> None:
 62        super().__init__(config)
 63
 64    async def run(self, state: ArenaState, ctx: AgentContext) -> ArenaState:
 65        """Run synthesis: score → LLM reasoning → final verdict."""
 66        signals: list[AgentSignal] = state.get("agent_signals") or []
 67        target = state.get("target", {})
 68        ticker = target.get("ticker", "the instrument")
 69
 70        if not signals:
 71            logger.warning("SynthesisAgent: no agent_signals in state — defaulting to HOLD")
 72            state["signal"] = "HOLD"
 73            state["synthesis_verdict"] = "HOLD"
 74            state["synthesis_confidence"] = 0.0
 75            state["synthesis_score"] = 0.0
 76            state["synthesis_reasoning"] = "No specialist signals available."
 77            state.setdefault("decisions", {})["synthesis"] = "HOLD — no signals"
 78            return state
 79
 80        # Detect VIX from risk analyst raw data (if available)
 81        vix = _extract_vix(signals)
 82
 83        # Compute adaptive weights
 84        weights = _compute_adaptive_weights(vix)
 85
 86        # Confidence-weighted ensemble score
 87        score, weight_breakdown = _compute_score(signals, weights)
 88
 89        # Preliminary direction
 90        if score > 0.15:
 91            preliminary = "BUY"
 92        elif score < -0.15:
 93            preliminary = "SELL"
 94        else:
 95            preliminary = "HOLD"
 96
 97        # Build evidence summary for LLM
 98        evidence_lines = [
 99            f"- {s['agent'].upper()} ({s.get('domain', '?')}): "
100            f"{s['signal']} confidence={s['confidence']:.0%}{s.get('summary', '')}"
101            for s in signals
102        ]
103
104        weight_lines = [
105            f"  {domain}: base={BASE_WEIGHTS.get(domain, 0.2):.0%}"
106            + (f" → adaptive={w:.0%} (VIX={vix:.1f})" if vix and domain == "risk" else "")
107            for domain, w in weights.items()
108        ]
109
110        start_time = time.time()
111        system_prompt = self.get_system_prompt() or _DEFAULT_SYNTHESIS_PROMPT
112
113        # Detect whether the configured system prompt is an agnostic-arena
114        # synthesis (event/ad_hoc) — those prompts ship their own output
115        # schema, so we mustn't paste the trading-only ``BUY/HOLD/SELL``
116        # instructions on top. Heuristic: marker strings the agnostic
117        # prompts include verbatim. Falls back to the trading prompt
118        # when neither marker is present.
119        is_event_synthesis = "ideas_shortlist" in system_prompt and "impacted_tickers" in system_prompt
120        is_ad_hoc_synthesis = "alternatives_considered" in system_prompt and "counter_view" in system_prompt
121        is_agnostic = is_event_synthesis or is_ad_hoc_synthesis
122
123        if is_agnostic:
124            # Hand the LLM the FULL specialist payloads + the typed target
125            # so it can faithfully carry agnostic fields (impacted_tickers,
126            # answer, counter_view, etc.) into its synthesis output. The
127            # legacy 1-line evidence summary is insufficient — agnostic
128            # prompts need to read the actual structured signal contents.
129            full_target = json.dumps(state.get("target") or {}, default=str)
130            # Serialise every specialist signal IN FULL — canonical fields
131            # plus the agnostic ``extras`` dict the specialist parser pulled
132            # out of the LLM JSON (impacted_tickers, answer, counter_view,
133            # classification, route, etc.). Without exposing extras the
134            # agnostic prompts have nothing to carry forward.
135            signals_serialised = json.dumps(
136                [
137                    {k: v for k, v in s.items() if k != "elapsed_seconds"}
138                    for s in signals
139                ],
140                default=str, indent=2,
141            )
142            user_prompt = f"""## Synthesis inputs
143
144### Target (typed)
145{full_target}
146
147### Specialist agent signals (full payloads)
148```json
149{signals_serialised}
150```
151
152### Preliminary scoring (for reference; ignore if the system prompt opts out of BUY/HOLD/SELL)
153Weighted ensemble score: **{score:+.3f}**
154Preliminary direction: **{preliminary}**
155
156### Instructions
157Follow the output schema declared in the system prompt above. Use the
158signal vocabulary it specifies (BULLISH | BEARISH | NEUTRAL). Carry
159forward every payload field the upstream specialists shipped —
160`impacted_tickers`, `answer`, `counter_view`, `key_factors`,
161`ideas_shortlist`, `alternatives_considered` — into the matching
162output fields. Don't invent values for fields not in the inputs.
163"""
164        else:
165            user_prompt = f"""## Synthesis Task: {ticker}
166
167### Specialist Agent Signals
168{chr(10).join(evidence_lines)}
169
170### Scoring
171Weighted ensemble score: **{score:+.3f}** (range: -1.0 fully bearish to +1.0 fully bullish)
172Preliminary direction: **{preliminary}**
173Domain weights used:
174{chr(10).join(weight_lines)}
175{f"VIX detected: {vix:.1f} — elevated fear, risk weight increased" if vix and vix > _VIX_ELEVATED else ""}
176
177### Instructions
178Synthesize all signals into a final BUY / HOLD / SELL verdict with confidence.
179
180Rules:
181- Strong agreement across agents (all same direction) → high confidence
182- Mixed signals or score near zero → lean toward HOLD, lower confidence
183- Elevated VIX or bearish risk signal overrides bullish technicals when extreme
184- Your verdict should align with the preliminary direction unless you have a clear reason to deviate
185
186Respond with exactly this JSON on one line, then a separator, then your reasoning:
187{{"verdict":"BUY","confidence":0.72,"reasoning":"2–4 sentence explanation of the key factors."}}
188---REASONING---
189Write a full analysis paragraph explaining the synthesis decision, key risks,
190and what would change the verdict. Reference specific agent signals by name.
191"""
192
193        async def llm_chat(system: str, user: str) -> str:
194            resp = await ctx.llm.chat(system, user)
195            return resp.text
196
197        raw = await llm_chat(system_prompt, user_prompt)
198        elapsed = round(time.time() - start_time, 1)
199
200        verdict_data, reasoning_text = _parse_synthesis_output(raw, preliminary)
201
202        verdict = verdict_data.get("verdict", preliminary)
203        # Agnostic prompts can emit `synthesis_confidence` (often damped)
204        # alongside the legacy `confidence`. Prefer the explicit field
205        # when present so confidence damping is honoured. Same for
206        # `synthesis_reasoning` — agnostic prompts use it as the
207        # user-facing answer; the legacy `reasoning` field is the
208        # short JSON sibling.
209        confidence_raw = verdict_data.get("synthesis_confidence")
210        if confidence_raw is None:
211            confidence_raw = verdict_data.get("confidence", abs(score))
212        confidence = float(confidence_raw)
213        reasoning_raw = verdict_data.get("synthesis_reasoning")
214        if not reasoning_raw:
215            reasoning_raw = verdict_data.get("reasoning", reasoning_text)
216        reasoning = reasoning_raw
217
218        # Agnostic-arena fields — the event/ad_hoc synthesis prompts ship
219        # `ideas_shortlist` (or `impacted_tickers`) and
220        # `alternatives_considered` alongside the verdict. Preserve them
221        # in state so the Prognosis converter
222        # (:func:`maf.actions.prognosis.from_arena_state`) can find
223        # them. Also pass through `synthesis_verdict: null` when the
224        # arena explicitly opts out of BUY/HOLD/SELL (event/ad_hoc).
225        explicit_verdict = verdict_data.get("synthesis_verdict", "__missing__")
226        ideas_shortlist = (
227            verdict_data.get("ideas_shortlist")
228            or verdict_data.get("impacted_tickers")
229            or []
230        )
231        alternatives = verdict_data.get("alternatives_considered") or []
232
233        # Full evidence chain for trail reviewability
234        full_evidence: dict[str, Any] = {
235            "ticker": ticker,
236            "score": round(score, 4),
237            "preliminary": preliminary,
238            "verdict": verdict,
239            "confidence": round(confidence, 3),
240            "weights_used": {k: round(v, 3) for k, v in weights.items()},
241            "vix_detected": vix,
242            "weight_breakdown": weight_breakdown,
243            "per_agent": [
244                {
245                    "agent": s["agent"],
246                    "domain": s.get("domain", ""),
247                    "signal": s["signal"],
248                    "confidence": round(s["confidence"], 3),
249                    "summary": s.get("summary", ""),
250                    "key_factors": s.get("key_factors", []),
251                    "score_contribution": round(
252                        SIGNAL_SCORES.get(s["signal"], 0)
253                        * s["confidence"]
254                        * weights.get(s.get("domain", ""), 0.2),
255                        4,
256                    ),
257                }
258                for s in signals
259            ],
260            "reasoning": reasoning,
261            "full_reasoning_text": reasoning_text,
262            "elapsed_seconds": elapsed,
263        }
264
265        logger.info(
266            "Synthesis %s: %s (score=%.3f, confidence=%.2f, vix=%s, %.1fs)",
267            ticker,
268            verdict,
269            score,
270            confidence,
271            f"{vix:.1f}" if vix else "N/A",
272            elapsed,
273        )
274
275        # Write to state
276        state["signal"] = verdict
277        # Agnostic arenas can emit synthesis_verdict=null (event/ad_hoc
278        # don't issue BUY/HOLD/SELL); honour it when present, otherwise
279        # default to the verdict computed by this agent.
280        if explicit_verdict != "__missing__":
281            state["synthesis_verdict"] = explicit_verdict
282        else:
283            state["synthesis_verdict"] = verdict
284        state["synthesis_score"] = round(score, 4)
285        state["synthesis_confidence"] = round(confidence, 3)
286        state["synthesis_reasoning"] = reasoning
287        if ideas_shortlist:
288            state["ideas_shortlist"] = ideas_shortlist
289        if alternatives:
290            state["alternatives_considered"] = alternatives
291        state.setdefault("decisions", {})["synthesis"] = json.dumps(
292            full_evidence, indent=2
293        )
294        state.setdefault("reports", {})["synthesis"] = (
295            f"## Synthesis: {verdict} ({confidence:.0%} confidence)\n\n"
296            f"Score: {score:+.3f}  |  VIX: {vix or 'N/A'}\n\n"
297            f"{reasoning_text}"
298        )
299
300        return state
301
302
303# ---------------------------------------------------------------------------
304# Scoring helpers
305# ---------------------------------------------------------------------------
306
307
def _compute_adaptive_weights(vix: float | None) -> dict[str, float]:
    """Shift domain weight towards ``risk`` when market fear (VIX) is high.

    Starting from ``BASE_WEIGHTS``:

    * ``vix >= _VIX_EXTREME`` — the risk weight is doubled, capped at 0.35;
    * ``_VIX_ELEVATED <= vix < _VIX_EXTREME`` — the risk weight grows by 50%.

    The amount actually added to ``risk`` is taken in equal parts from price,
    sentiment, onchain and macro (each floored at 0.05). The result is then
    renormalized to sum to 1 and rounded to 4 decimals.

    Args:
        vix: VIX reading, or None when unavailable.

    Returns:
        A new domain -> weight dict. With ``vix=None`` this is an unrounded
        copy of ``BASE_WEIGHTS``.
    """
    weights = dict(BASE_WEIGHTS)
    if vix is None:
        return weights

    old_risk = weights["risk"]
    if vix >= _VIX_EXTREME:
        weights["risk"] = min(old_risk * 2, 0.35)
    elif vix >= _VIX_ELEVATED:
        weights["risk"] = old_risk * 1.5

    # Redistribute only the increase that was actually applied: the 0.35 cap
    # can make it smaller than the nominal doubling.
    extra = weights["risk"] - old_risk
    if extra > 0:
        reduction = extra / 4
        for d in ("price", "sentiment", "onchain", "macro"):
            weights[d] = max(weights[d] - reduction, 0.05)

    # Normalize to sum=1 (the 0.05 floors can leave the total above 1).
    total = sum(weights.values())
    return {k: round(v / total, 4) for k, v in weights.items()}
332
333
def _compute_score(
    signals: list[AgentSignal], weights: dict[str, float]
) -> tuple[float, list[dict[str, Any]]]:
    """Compute the confidence-weighted ensemble score.

    For each signal, ``contribution = direction * confidence * w``:

    * ``direction`` comes from ``SIGNAL_SCORES``. Labels are matched
      case-insensitively and unknown labels count as 0.0.
    * ``w`` is the weight of the signal's domain, or 0.2 when the domain is
      not in ``weights``. The domain is ``signal["domain"]``; when that is
      missing or empty it is derived from the agent name with ``_analyst``
      removed (``"price_analyst"`` -> ``"price"``).

    The score is ``Σ contribution / Σ (confidence * w)``, or 0.0 when the
    denominator is not positive.

    Returns:
        ``(score rounded to 4 decimals, breakdown)``. The breakdown has one
        dict per input signal, in input order.
    """
    weighted_sum = 0.0
    total_weight = 0.0
    breakdown: list[dict[str, Any]] = []

    for s in signals:
        # `or` (not a .get default) so an empty/None domain also falls back.
        domain = s.get("domain") or (s.get("agent") or "").replace("_analyst", "")
        w = weights.get(domain, 0.2)
        direction = SIGNAL_SCORES.get(str(s["signal"]).strip().upper(), 0.0)
        confidence = s["confidence"]

        contrib = direction * confidence * w
        weighted_sum += contrib
        total_weight += confidence * w

        breakdown.append({
            "agent": s["agent"],
            "domain": domain,
            "signal": s["signal"],
            "confidence": confidence,
            "weight": w,
            "contribution": round(contrib, 4),
        })

    score = weighted_sum / total_weight if total_weight > 0 else 0.0
    return round(score, 4), breakdown
366
367
def _extract_vix(signals: list[AgentSignal]) -> float | None:
    """Return the VIX level reported by the risk analyst, if any.

    Signals are scanned in order. Only those with ``domain == "risk"`` or
    ``agent == "risk_analyst"`` are considered, and the first usable
    ``raw_data["vix"]`` wins. A value is usable when it is an ``int`` or
    ``float`` (``bool`` excluded), finite, and strictly positive.

    Args:
        signals: Specialist signals (dict-like).

    Returns:
        The VIX as a float, or None when no risk signal carries a usable one.
    """
    import math
    from collections.abc import Mapping

    for s in signals:
        if s.get("domain") != "risk" and s.get("agent") != "risk_analyst":
            continue
        raw = s.get("raw_data")
        # raw_data may be absent, None, or not a mapping at all.
        if not isinstance(raw, Mapping):
            continue
        vix = raw.get("vix")
        # bool is an int subclass — True must not be read as VIX=1.0.
        if isinstance(vix, bool) or not isinstance(vix, (int, float)):
            continue
        # NaN is truthy, so a plain truthiness check would let it through.
        if math.isfinite(vix) and vix > 0:
            return float(vix)
    return None
377
378
379# ---------------------------------------------------------------------------
380# Output parsing
381# ---------------------------------------------------------------------------
382
383
def _parse_synthesis_output(
    text: str, fallback_verdict: str
) -> tuple[dict[str, Any], str]:
    """Split synthesis LLM output into (verdict_dict, reasoning_text).

    Accepts both:

    * Trading vocab — ``verdict ∈ {BUY, HOLD, SELL}`` (the long-standing
      contract used by market_pulse / trading_intelligence / etc.).
    * Agnostic-arena vocab — ``verdict ∈ {BULLISH, BEARISH, NEUTRAL}``
      with optional ``synthesis_verdict: null`` opt-out plus the
      ``ideas_shortlist`` / ``impacted_tickers`` /
      ``alternatives_considered`` passthrough payloads used by
      event_synthesis + ad_hoc_synthesis.

    ``signal`` is accepted as an alias for ``verdict`` because the agnostic
    prompts emit ``signal``. A candidate is accepted only if it is a JSON
    object whose verdict is a string naming one of the six words above.
    Matching ignores case and surrounding whitespace, and the upper-cased
    word is stored back under ``verdict``.

    Strategy:

    1. If a ``---REASONING---`` (trading) or ``---NARRATIVE---`` (agnostic)
       separator is present (``---REASONING---`` is checked first), the text
       before its first occurrence is parsed as JSON, with Markdown code
       fences stripped. The text after the separator is the reasoning.
    2. Otherwise, or if that JSON is unusable, every JSON object embedded in
       the text is tried left to right. Objects that parse but are rejected
       are skipped whole, so their nested objects are not tried on their
       own. The reasoning is the text after the accepted object, or the
       whole text if nothing follows it.
    3. If nothing is accepted, a fallback dict with ``fallback_verdict`` and
       confidence 0.4 is returned together with the raw text.

    Args:
        text: Raw LLM response.
        fallback_verdict: Verdict to use when no structured verdict is found.

    Returns:
        ``(parsed dict, reasoning text)``. The whole parsed dict is returned
        so callers can read any non-verdict fields the prompt emitted.
    """
    import re

    ok_verdicts = {"BUY", "HOLD", "SELL", "BULLISH", "BEARISH", "NEUTRAL"}
    decoder = json.JSONDecoder()

    def _normalise(data: Any) -> dict[str, Any] | None:
        """Return *data* with a canonical ``verdict``, or None if unusable."""
        # Non-object JSON (lists, numbers, strings) cannot carry a verdict.
        if not isinstance(data, dict):
            return None
        verdict = data["verdict"] if "verdict" in data else data.get("signal")
        # Rejects None/lists/dicts (lists would also be unhashable below).
        if not isinstance(verdict, str):
            return None
        verdict = verdict.strip().upper()
        if verdict not in ok_verdicts:
            return None
        return {**data, "verdict": verdict}

    # Agnostic synthesis prompts use ``---NARRATIVE---`` as the separator;
    # the trading prompt uses ``---REASONING---``. Accept both.
    for sep in ("---REASONING---", "---NARRATIVE---"):
        if sep not in text:
            continue
        head, _, tail = text.partition(sep)
        # Strip an optional ```json fence around the JSON header.
        json_part = re.sub(r"^```[A-Za-z]*\n?", "", head.strip()).rstrip("`").strip()
        try:
            data = _normalise(json.loads(json_part))
        except json.JSONDecodeError:
            data = None
        if data is not None:
            return data, tail.strip()
        # Only the first separator found is tried; fall through to the scan.
        break

    # Try every embedded JSON object. raw_decode handles nesting and
    # braces/quotes inside JSON strings; a stray "{", "}" or quote in the
    # surrounding prose costs at most one failed attempt instead of
    # desynchronising the whole scan.
    pos = text.find("{")
    while pos != -1:
        try:
            obj, length = decoder.raw_decode(text[pos:])
        except json.JSONDecodeError:
            pos = text.find("{", pos + 1)
            continue
        end = pos + length
        data = _normalise(obj)
        if data is not None:
            return data, text[end:].strip() or text
        pos = text.find("{", end)

    return {
        "verdict": fallback_verdict,
        "confidence": 0.4,
        "reasoning": "Could not parse structured verdict — defaulting to preliminary direction.",
    }, text
486
487
# Fallback system prompt used when the agent config supplies none. It contains
# none of the agnostic-arena marker strings, so SynthesisAgent.run treats it
# as a trading (BUY/HOLD/SELL) synthesis. The analyst count is deliberately
# not hard-coded: run() accepts however many signals are present.
_DEFAULT_SYNTHESIS_PROMPT = """You are a senior portfolio analyst synthesizing signals from independent specialist analysts.

Your job is to:
1. Evaluate the weight of evidence across all specialist signals
2. Consider whether any single signal should override the others (e.g. extreme risk/macro)
3. Produce a final BUY / HOLD / SELL verdict with a confidence score
4. Explain your reasoning clearly and concisely

You must follow the required output format exactly."""