checking system…
Docs / back / src/maf/agents/replan.py · line 102
Python · 436 lines
  1"""ReplanAgent — decides whether the arena should fetch more data and re-run.
  2
  3Where this fits
  4---------------
  5A specialist + synthesis pipeline produces a verdict + confidence. Sometimes
  6the confidence is low because the *picture is mixed* (real disagreement —
  7nothing more data can fix) and sometimes because the *picture is incomplete*
  8(a key source was empty / a regime is ambiguous). ReplanAgent looks at the
  9post-synthesis state and decides which of those two cases applies.
 10
 11If "incomplete", it picks 1–3 additional source bindings from a pool the
 12arena config declares as ``available_sources_for_replan`` and signals the
 13phase graph to loop back to the analysis phase. On the next iteration the
 14specialists see the extra data. The arena caps total iterations via
 15``ArenaConfig.max_iterations`` so a runaway can't loop forever.
 16
 17If "mixed", or the iteration cap was hit, it commits to the existing verdict
 18and lets the graph transition to ``emit``.
 19
 20Output written to state
 21-----------------------
 22* ``state["replan"]`` — diagnostic dict ({should_replan, reason,
 23  additional_sources, iteration, max_iterations})
 24* ``state["_next_phase"]`` — only set when looping back. The :class:`PhaseGraph`
 25  reads this and overrides the configured transition for this hop.
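* ``state["iteration"]`` — incremented when looping back.
* ``state["replan_enabled_sources"]`` — cumulative, sorted list of sources
  enabled by replans so far; extended with the chosen sources when looping back.
* ``state["agent_signals"]`` / ``state["reports"]`` — reset to empty when
  looping back; the prior synthesis is appended to ``state["prior_iterations"]``.
* ``state["decisions"][<agent name>]`` — the decision dict serialized as JSON.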
 27
 28Decision algorithm
 29------------------
 301. If ``iteration >= max_iterations - 1`` → no replan (cap hit).
 312. Detect explicit data gaps:
   * a specialist expected for the analysis phase emitted no signal; or
   * a signal carries a gap-marker key_factor (``no_*``, ``missing_*``,
     ``stale_*``, ``empty_*``) or was flagged ``parse_failed``.
 34   In either case, force replan (still bounded by the iteration cap).
 353. If ``synthesis_confidence >= confidence_floor`` → no replan.
 364. Otherwise: ask the LLM (with a tight JSON-only prompt) to pick from
 37   ``available`` the sources most likely to close the data gap, *and*
 38   to write a one-line reason. If the LLM fails to produce parseable
   JSON, fall back to a deterministic heuristic that takes the first two
   available sources whose names aren't already in the active binding list
   (or the first two overall when every available source is already bound).
 41
 42This last fallback matters: an LLM hiccup on the replan call shouldn't
 43permanently kill the loop — it should still iterate at least once.
 44"""
 45
 46from __future__ import annotations
 47
 48import json
 49import logging
 50from typing import Any
 51
 52from maf.config import AgentConfig
 53from maf.core.agent import AgentContext, BaseAgent
 54from maf.core.state import ArenaState
 55from maf.llm.json_parsing import parse_json_lenient
 56from maf.streaming import get_event_bus
 57
 58logger = logging.getLogger(__name__)
 59
 60
 61# Default thresholds. Arena config can override via agent ``extra`` block:
 62#   extra:
 63#     confidence_floor: 0.55
 64#     loopback_phase: "analysis"
 65#     available_sources: ["news_macro", "fomo2_knowledge", ...]
 66DEFAULT_CONFIDENCE_FLOOR = 0.55
 67DEFAULT_LOOPBACK_PHASE = "analysis"
 68
 69
 70_REPLAN_SYSTEM_PROMPT = """\
 71You are a replan-controller for a multi-agent trading arena. Your job is to
 72decide whether the arena needs additional data before committing to a verdict.
 73
 74You receive:
 75  - the current synthesis verdict + confidence + reasoning
 76  - the list of agent signals already produced (with confidences and key factors)
 77  - the list of sources already consulted this iteration
 78  - the list of additional sources the arena can ENABLE for a re-run
 79
 80Output rules (STRICT JSON, nothing else):
 81
 82  {
 83    "should_replan": <true|false>,
 84    "reason": "<one sentence — why>",
 85    "additional_sources": ["<source_name>", ...]  // pick 1-3, only from the
 86                                                  // available list, only when
 87                                                  // should_replan is true
 88  }
 89
 90Pick `should_replan=true` ONLY when:
 91  - synthesis_confidence is low AND
 92  - the low confidence reflects MISSING DATA (e.g. no macro context, no recent
 93    news, conflicting signals where one signal has very low confidence).
 94
 95Pick `should_replan=false` when:
 96  - confidence is already high enough, OR
 97  - the low confidence reflects GENUINE DISAGREEMENT (specialists have high
 98    confidence but disagree — adding more data is unlikely to resolve it).
 99"""
100
101
class ReplanAgent(BaseAgent):
    """Decide if the arena should re-run the analysis phase with more data.

    Optional settings, read from the agent config's ``extra`` block:

    * ``confidence_floor`` — synthesis confidence at or above which no replan
      is considered (default ``DEFAULT_CONFIDENCE_FLOOR``).
    * ``loopback_phase`` — phase written to ``state["_next_phase"]`` when
      looping back (default ``DEFAULT_LOOPBACK_PHASE``).
    * ``available_sources`` — pool of source names a replan may enable.
    """

    def __init__(self, config: AgentConfig) -> None:
        super().__init__(config)
        extra = self.config.extra or {}
        self.confidence_floor: float = float(
            extra.get("confidence_floor", DEFAULT_CONFIDENCE_FLOOR)
        )
        self.loopback_phase: str = str(
            extra.get("loopback_phase", DEFAULT_LOOPBACK_PHASE)
        )
        # Source pool the agent can choose from. When empty the agent cannot
        # *add* anything: the low-confidence path then never replans, but an
        # explicit data gap still loops back and re-runs with the same inputs.
        # ``or []`` tolerates an explicit ``available_sources: null``.
        self.available_sources: list[str] = list(
            extra.get("available_sources") or []
        )

    async def run(self, state: ArenaState, ctx: AgentContext) -> ArenaState:
        """Decide replan vs. commit, record the decision, loop back if needed.

        Always writes ``state["replan"]`` and
        ``state["decisions"][self.name]`` and publishes a ``decision.emit``
        event. When a replan is decided it also bumps ``state["iteration"]``,
        sets ``state["_next_phase"]``, resets ``agent_signals`` / ``reports``
        and appends the current synthesis to ``state["prior_iterations"]``.
        """
        bus = get_event_bus()
        # ``or {}`` guards against an explicit ``metadata: None``.
        metadata = state.get("metadata") or {}
        arena_name = state.get("arena_name", "")
        arena_id = state.get("arena_id", "")
        correlation_id = metadata.get("correlation_id", "")

        confidence = float(state.get("synthesis_confidence") or 0.0)
        verdict = str(
            state.get("synthesis_verdict") or state.get("signal") or "HOLD"
        )
        iteration = int(state.get("iteration") or 0)
        max_iterations = int(metadata.get("max_iterations", 1))

        # 0-indexed cap: with max_iterations == 3 the allowed iteration ids
        # are 0, 1, 2, so iteration 2 must commit instead of looping again.
        iterations_remaining = max(0, max_iterations - 1 - iteration)

        # Explicit data gaps force a replan regardless of confidence: a
        # missing specialist or a "no_X_data" marker is a *known* incomplete
        # picture — exactly what replan is for.
        missing_specialists = _missing_specialists(state)
        gap_markers = _gap_markers(state)

        decision: dict[str, Any] = {
            "should_replan": False,
            "reason": "",
            "additional_sources": [],
            "iteration": iteration,
            "max_iterations": max_iterations,
            "confidence_floor": self.confidence_floor,
            "missing_specialists": missing_specialists,
            "gap_markers": gap_markers,
        }

        if iterations_remaining <= 0:
            decision["reason"] = (
                f"iteration cap reached ({iteration + 1}/{max_iterations})"
            )
        elif missing_specialists or gap_markers:
            decision.update(await self._decide_data_gap(
                state, ctx, missing_specialists, gap_markers,
                confidence=confidence, verdict=verdict,
            ))
        elif confidence >= self.confidence_floor:
            decision["reason"] = (
                f"confidence {confidence:.2f} ≥ floor {self.confidence_floor:.2f}"
            )
        else:
            decision.update(await self._decide_low_confidence(
                state, ctx, confidence=confidence, verdict=verdict,
            ))

        state["replan"] = decision

        # Honour every positive decision. The forced data-gap path may replan
        # with no new sources (empty pool, or every source already used);
        # re-running still gives a crashed / parse-failed specialist a fresh
        # attempt, and the iteration cap above bounds the loop.
        if decision["should_replan"]:
            self._loop_back(
                state, ctx, decision,
                iteration=iteration, verdict=verdict, confidence=confidence,
            )

        # Trail entry for reviewability.
        state.setdefault("decisions", {})[self.name] = json.dumps(decision)

        await bus.publish(
            "decision.emit",
            arena=arena_name, arena_id=arena_id,
            correlation_id=correlation_id,
            phase=state.get("current_phase", ""),
            payload={
                "agent": self.name,
                "kind": "replan",
                "should_replan": decision["should_replan"],
                "additional_sources": decision["additional_sources"],
                "iteration": decision["iteration"],
                "reason": decision["reason"],
            },
        )
        return state

    async def _decide_data_gap(
        self,
        state: ArenaState,
        ctx: AgentContext,
        missing_specialists: list[str],
        gap_markers: list[str],
        *,
        confidence: float,
        verdict: str,
    ) -> dict[str, Any]:
        """Build the forced-replan decision for a known-incomplete picture.

        The LLM (when there is anything left to pick) only chooses *which*
        sources to add; its ``should_replan`` is ignored because the gap
        already forces a replan.
        """
        candidates = _replan_candidates(state, ctx, self.available_sources)
        picks: list[str] = []
        # Skip the LLM round-trip entirely when there is nothing to choose.
        if candidates:
            llm_pick = await _ask_llm_to_pick(
                ctx, state, candidates, confidence=confidence, verdict=verdict,
            )
            if llm_pick:
                picks = self._whitelist_picks(
                    llm_pick.get("additional_sources"), candidates,
                )
        if not picks:
            picks = candidates[:2]

        gap_reason: list[str] = []
        if missing_specialists:
            gap_reason.append(f"missing={missing_specialists}")
        if gap_markers:
            gap_reason.append(f"markers={gap_markers}")
        return {
            "forced": True,
            "should_replan": True,
            "reason": "data gap: " + "; ".join(gap_reason) + (
                f" → fetch {picks}" if picks else " (no sources to add)"
            ),
            "additional_sources": picks,
        }

    async def _decide_low_confidence(
        self,
        state: ArenaState,
        ctx: AgentContext,
        *,
        confidence: float,
        verdict: str,
    ) -> dict[str, Any]:
        """Ask the LLM whether low confidence is 'mixed' or 'incomplete'.

        Falls back to the first two candidate sources when the LLM call fails
        or when it asks for a replan without naming any usable source.
        """
        # Prefer sources not yet bound or enabled; if everything has been
        # used, offer the whole pool again rather than giving up.
        candidate_pool = (
            _replan_candidates(state, ctx, self.available_sources)
            or list(self.available_sources)
        )
        if not candidate_pool:
            return {"reason": "no additional sources available"}

        llm_pick = await _ask_llm_to_pick(
            ctx, state, candidate_pool, confidence=confidence, verdict=verdict,
        )
        if llm_pick is None:
            # Heuristic fallback — take the first 2 candidate sources.
            fallback = candidate_pool[:2]
            return {
                "should_replan": True,
                "reason": (
                    f"low confidence ({confidence:.2f}) + LLM unavailable; "
                    f"fallback fetch {fallback}"
                ),
                "additional_sources": fallback,
                "llm_fallback": True,
            }

        should_replan = self._coerce_bool(llm_pick.get("should_replan"))
        # Strict whitelist: drop any source name the LLM hallucinated.
        picks = self._whitelist_picks(
            llm_pick.get("additional_sources"), candidate_pool,
        )
        result: dict[str, Any] = {
            "should_replan": should_replan,
            "reason": str(llm_pick.get("reason") or "")[:300],
            # Don't record sources that will not be fetched.
            "additional_sources": picks if should_replan else [],
        }
        if should_replan and not picks:
            # LLM said replan but picked nothing usable — degrade to the
            # heuristic so we don't loop with the same inputs.
            result["additional_sources"] = candidate_pool[:2]
            result["llm_fallback"] = True
        return result

    def _loop_back(
        self,
        state: ArenaState,
        ctx: AgentContext,
        decision: dict[str, Any],
        *,
        iteration: int,
        verdict: str,
        confidence: float,
    ) -> None:
        """Mutate *state* so the phase graph re-runs ``self.loopback_phase``."""
        sources = decision["additional_sources"]
        if sources:
            _enable_replan_sources(ctx, state, sources)
        state["iteration"] = iteration + 1
        state["_next_phase"] = self.loopback_phase
        # Reset transient per-iteration buckets so the next pass writes
        # fresh entries rather than appending to the prior iteration's.
        state["agent_signals"] = []
        state["reports"] = {}
        # Stash a compact copy of this iteration's outcome for the trail.
        state.setdefault("prior_iterations", []).append({
            "iteration": iteration,
            "verdict": verdict,
            "confidence": confidence,
            "score": float(state.get("synthesis_score") or 0.0),
            "reasoning": str(state.get("synthesis_reasoning") or "")[:500],
        })

    @staticmethod
    def _whitelist_picks(raw: Any, allowed: list[str]) -> list[str]:
        """Keep at most 3 distinct LLM-proposed names that appear in *allowed*.

        Non-string entries and unknown names are dropped. A bare string is
        treated as a one-element list instead of being iterated character by
        character.
        """
        if isinstance(raw, str):
            raw = [raw]
        if not isinstance(raw, (list, tuple)):
            return []
        allowed_set = set(allowed)
        picks = [s for s in raw if isinstance(s, str) and s in allowed_set]
        return list(dict.fromkeys(picks))[:3]

    @staticmethod
    def _coerce_bool(value: Any) -> bool:
        """Interpret an LLM-supplied flag; the string ``"false"`` is False."""
        if isinstance(value, str):
            return value.strip().lower() in {"true", "yes", "1"}
        return bool(value)
274
275
276# ---------------------------------------------------------------------------
277# Helpers
278# ---------------------------------------------------------------------------
279
280
281_GAP_MARKER_PREFIXES = ("no_", "missing_", "stale_", "empty_")
282
283
284def _missing_specialists(state: ArenaState) -> list[str]:
285    """Specialists that the YAML declared for the analysis phase but didn't
286    emit a signal this iteration. Indicates a hard failure (exception in the
287    agent's ReAct loop).
288
289    We look up the analysis-phase agent roster via state["metadata"]["phase_agents"]
290    when the arena populates it; otherwise we infer from the trace.
291    """
292    expected = set(state.get("metadata", {}).get("phase_agents", []) or [])
293    if not expected:
294        # Fallback: trace records every specialist that *attempted* to run.
295        trace = state.get("trace") or {}
296        for name, entry in trace.items():
297            if isinstance(entry, dict) and entry.get("role") in (
298                "specialist", "analyst",
299            ):
300                expected.add(str(name))
301    if not expected:
302        return []
303    actual = {
304        str(s.get("agent", ""))
305        for s in (state.get("agent_signals") or []) if isinstance(s, dict)
306    }
307    return sorted(expected - actual)
308
309
310def _gap_markers(state: ArenaState) -> list[str]:
311    """Surface any signal key_factor that looks like a data-gap marker.
312
313    Specialists are encouraged (see vol_regime_analyst.txt) to emit short
314    machine-readable markers like ``no_vix_data``, ``stale_news``,
315    ``missing_macro_context``. We harvest them so the replan controller can
316    deterministically route fresh fetches to the right adapter.
317
318    Additionally: any signal with ``parse_failed=True`` contributes a
319    synthetic ``parse_failed:<agent>`` marker. A parse failure means the LLM
320    didn't emit a usable structured signal — either the model ran out of
321    react steps or the reasoning leaked. Replan should treat that the same
322    as missing data: re-run, this time with potentially more sources and a
323    fresh response budget.
324    """
325    markers: set[str] = set()
326    for sig in state.get("agent_signals") or []:
327        if not isinstance(sig, dict):
328            continue
329        if sig.get("parse_failed"):
330            agent = str(sig.get("agent", "")) or "unknown"
331            markers.add(f"parse_failed:{agent}")
332        for kf in sig.get("key_factors") or []:
333            s = str(kf).strip().lower().replace(" ", "_")
334            if s.startswith(_GAP_MARKER_PREFIXES):
335                markers.add(s)
336            elif s == "parse_failed":
337                # Specialist's own marker (Strategy 3 fallback in _parse_output)
338                # already encodes the same condition — keep one canonical form.
339                markers.add(f"parse_failed:{sig.get('agent', 'unknown')}")
340    return sorted(markers)
341
342
def _replan_candidates(
    state: ArenaState, ctx: AgentContext, available: list[str],
) -> list[str]:
    """Return the entries of *available* that are not yet in use.

    A source is "in use" when ``_active_source_names`` reports it or when it
    appears in ``state["replan_enabled_sources"]``. The order of *available*
    is preserved.
    """
    excluded = set(state.get("replan_enabled_sources") or [])
    excluded |= _active_source_names(state, ctx)
    return [name for name in available if name not in excluded]
351
352
353def _active_source_names(state: ArenaState, ctx: AgentContext) -> set[str]:
354    """Best-effort: collect source names the prior phases actually fetched.
355
356    Reads ``state["source_metrics"]`` first (every fetch records its source
357    name there). Falls back to the agent's own configured sources.
358    """
359    seen: set[str] = set()
360    for row in state.get("source_metrics") or []:
361        name = row.get("source") if isinstance(row, dict) else None
362        if name:
363            seen.add(str(name))
364    if not seen:
365        # Last-ditch: configured sources on this agent
366        for s in ctx.config.sources or []:
367            seen.add(s)
368    return seen
369
370
371def _enable_replan_sources(
372    ctx: AgentContext, state: ArenaState, picks: list[str],
373) -> None:
374    """Mark the chosen sources as 'enabled for next iteration'.
375
376    The actual binding lives on the SourceRegistry already (the arena's YAML
377    declared every potential source up front). All we do here is make sure
378    the specialists in the next iteration see them. We record the picks in
379    state so the next iteration's specialists can opt in by reading
380    ``state["replan_enabled_sources"]``.
381    """
382    enabled = set(state.get("replan_enabled_sources") or [])
383    enabled.update(picks)
384    state["replan_enabled_sources"] = sorted(enabled)
385
386
async def _ask_llm_to_pick(
    ctx: AgentContext,
    state: ArenaState,
    candidate_pool: list[str],
    *,
    confidence: float,
    verdict: str,
) -> dict[str, Any] | None:
    """Ask the LLM which of *candidate_pool* to fetch on a re-run.

    Sends a JSON user prompt summarizing the synthesis, up to 5 key factors
    per agent signal, the sources already consulted and *candidate_pool*.
    Uses ``ctx.config.system_prompt`` when set, else
    ``_REPLAN_SYSTEM_PROMPT``.

    Returns the parsed JSON object, which is untrusted: callers must
    whitelist ``additional_sources`` and coerce ``should_replan``. Returns
    ``None`` — never raises — when the call fails, the reply is empty or not
    text, parsing raises, or the reply is not a JSON object.
    """
    sigs_view = [
        {
            "agent": sig.get("agent", ""),
            "signal": sig.get("signal", ""),
            "confidence": sig.get("confidence", 0.0),
            "key_factors": (sig.get("key_factors") or [])[:5],
        }
        for sig in (state.get("agent_signals") or [])
        if isinstance(sig, dict)
    ]
    user_prompt = json.dumps({
        "synthesis_verdict": verdict,
        "synthesis_confidence": confidence,
        "synthesis_reasoning": str(state.get("synthesis_reasoning") or "")[:500],
        "agent_signals": sigs_view,
        "sources_consulted": sorted(_active_source_names(state, ctx)),
        "available_sources_for_replan": candidate_pool,
    }, default=str)

    system_prompt = ctx.config.system_prompt or _REPLAN_SYSTEM_PROMPT

    try:
        text = await ctx.llm.chat_text(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            max_tokens=400,
            temperature=0.1,
        )
    except Exception as exc:
        # Boundary: any provider/transport failure degrades to the caller's
        # deterministic fallback instead of killing the replan loop.
        # (asyncio cancellation is a BaseException and still propagates.)
        logger.warning("ReplanAgent: LLM call failed: %s", exc)
        return None

    if not isinstance(text, str) or not text.strip():
        logger.warning("ReplanAgent: LLM returned no text; got %r", text)
        return None

    try:
        parsed = parse_json_lenient(text)
    except Exception as exc:
        # Keep the "None on failure" contract even if the parser raises.
        logger.warning(
            "ReplanAgent: JSON parse raised %s; got %r", exc, text[:200],
        )
        return None
    if not isinstance(parsed, dict):
        logger.warning("ReplanAgent: JSON parse failed; got %r", text[:200])
        return None
    return parsed
433
434
# Public API: only the agent class is exported; helpers stay module-private.
__all__ = [
    "ReplanAgent",
]