"""ReplanAgent: decides whether the arena should fetch more data and re-run.

Where this fits
---------------
A specialist + synthesis pipeline produces a verdict + confidence. Sometimes
the confidence is low because the *picture is mixed* (real disagreement that
more data cannot fix) and sometimes because the *picture is incomplete*
(a key source was empty / a regime is ambiguous). ReplanAgent looks at the
post-synthesis state and decides which of those two cases applies.

If "incomplete", it picks 1–3 additional source bindings from a pool the
arena config declares as ``available_sources_for_replan`` and signals the
phase graph to loop back to the analysis phase. On the next iteration the
specialists see the extra data. The arena caps total iterations via
``ArenaConfig.max_iterations`` so a runaway can't loop forever.

If "mixed", or the iteration cap was hit, it commits to the existing verdict
and lets the graph transition to ``emit``.

Output written to state
-----------------------
* ``state["replan"]``: diagnostic dict ({should_replan, reason,
  additional_sources, iteration, max_iterations})
* ``state["_next_phase"]``: only set when looping back. The :class:`PhaseGraph`
  reads this and overrides the configured transition for this hop.
* ``state["iteration"]``: incremented when looping back.

Decision algorithm
------------------
1. If ``iteration >= max_iterations - 1`` → no replan (cap hit).
2. Detect explicit data gaps:
   * a specialist failed (it is on the analysis-phase roster but emitted no
     signal); or
   * any signal flagged a gap-marker key_factor (e.g. ``no_<...>_data``) or
     failed to parse.
   In either case, force replan (still bounded by the iteration cap).
3. If ``synthesis_confidence >= confidence_floor`` → no replan.
4. Otherwise: ask the LLM (with a tight JSON-only prompt) to pick from
   ``available`` the sources most likely to close the data gap, *and*
   to write a one-line reason.
   If the LLM call fails or returns unparseable JSON, fall back to a
   deterministic heuristic that takes the first two candidate sources
   (available sources whose names aren't already in the active binding list).

This last fallback matters: an LLM hiccup on the replan call shouldn't
permanently kill the loop; it should still iterate at least once.
"""

from __future__ import annotations

import json
import logging
from typing import Any

from maf.config import AgentConfig
from maf.core.agent import AgentContext, BaseAgent
from maf.core.state import ArenaState
from maf.llm.json_parsing import parse_json_lenient
from maf.streaming import get_event_bus

logger = logging.getLogger(__name__)


# Default thresholds. Arena config can override via agent ``extra`` block:
#   extra:
#     confidence_floor: 0.55
#     loopback_phase: "analysis"
#     available_sources: ["news_macro", "fomo2_knowledge", ...]
DEFAULT_CONFIDENCE_FLOOR = 0.55
DEFAULT_LOOPBACK_PHASE = "analysis"


_REPLAN_SYSTEM_PROMPT = """\
You are a replan-controller for a multi-agent trading arena. Your job is to
decide whether the arena needs additional data before committing to a verdict.

You receive:
  - the current synthesis verdict + confidence + reasoning
  - the list of agent signals already produced (with confidences and key factors)
  - the list of sources already consulted this iteration
  - the list of additional sources the arena can ENABLE for a re-run

Output rules (STRICT JSON, nothing else):

  {
    "should_replan": <true|false>,
    "reason": "<one sentence: why>",
    "additional_sources": ["<source_name>", ...]   // pick 1-3, only from the
                                                   // available list, only when
                                                   // should_replan is true
  }

Pick `should_replan=true` ONLY when:
  - synthesis_confidence is low AND
  - the low confidence reflects MISSING DATA (e.g. no macro context, no recent
    news, conflicting signals where one signal has very low confidence).

Pick `should_replan=false` when:
  - confidence is already high enough, OR
  - the low confidence reflects GENUINE DISAGREEMENT (specialists have high
    confidence but disagree, so adding more data is unlikely to resolve it).
"""


class ReplanAgent(BaseAgent):
    """Decide if the arena should re-run with more data."""

    def __init__(self, config: AgentConfig) -> None:
        super().__init__(config)
        extra = self.config.extra or {}
        self.confidence_floor: float = float(
            extra.get("confidence_floor", DEFAULT_CONFIDENCE_FLOOR)
        )
        self.loopback_phase: str = str(
            extra.get("loopback_phase", DEFAULT_LOOPBACK_PHASE)
        )
        # Source pool the agent can choose from. Arena config wires this. If
        # it is empty, the agent still records its diagnosis in
        # state["replan"] but never loops back, because run() only loops when
        # at least one additional source was picked.
        self.available_sources: list[str] = list(
            extra.get("available_sources", [])
        )

    async def run(self, state: ArenaState, ctx: AgentContext) -> ArenaState:
        bus = get_event_bus()
        arena_name = state.get("arena_name", "")
        arena_id = state.get("arena_id", "")
        correlation_id = state.get("metadata", {}).get("correlation_id", "")

        confidence = float(state.get("synthesis_confidence") or 0.0)
        verdict = state.get("synthesis_verdict") or state.get("signal") or "HOLD"
        iteration = int(state.get("iteration") or 0)
        max_iterations = int(state.get("metadata", {}).get("max_iterations", 1))

        # Cap: never loop past max_iterations - 1 (we're 0-indexed; if cap is
        # 3, allowed iteration ids are 0, 1, 2 → on iter==2 stop).
        iterations_remaining = max(0, max_iterations - 1 - iteration)

        decision: dict[str, Any] = {
            "should_replan": False,
            "reason": "",
            "additional_sources": [],
            "iteration": iteration,
            "max_iterations": max_iterations,
            "confidence_floor": self.confidence_floor,
        }

        # Detect explicit data gaps.
        # These force replan regardless of the confidence float because a
        # missing specialist or a "no_X_data" marker is a *known* incomplete
        # picture, which is exactly what replan is for.
        missing_specialists = _missing_specialists(state)
        gap_markers = _gap_markers(state)
        has_data_gap = bool(missing_specialists or gap_markers)
        decision["missing_specialists"] = missing_specialists
        decision["gap_markers"] = gap_markers

        if iterations_remaining <= 0:
            decision["reason"] = (
                f"iteration cap reached ({iteration + 1}/{max_iterations})"
            )
        elif has_data_gap:
            # Forced replan path: ask the LLM to pick sources when a pool is
            # configured. If it returns nothing usable, fall back to the
            # first two unused candidates so the loop doesn't stall.
            decision["forced"] = True
            candidates = _replan_candidates(state, ctx, self.available_sources)
            llm_pick = None
            if self.available_sources:
                llm_pick = await _ask_llm_to_pick(
                    ctx, state, candidates,
                    confidence=confidence, verdict=str(verdict),
                )
            picks = []
            if llm_pick:
                picks = [
                    s for s in (llm_pick.get("additional_sources") or [])
                    if s in self.available_sources
                ][:3]
            if not picks:
                picks = candidates[:2]
            gap_reason = []
            if missing_specialists:
                gap_reason.append(f"missing={missing_specialists}")
            if gap_markers:
                gap_reason.append(f"markers={gap_markers}")
            decision.update({
                "should_replan": True,
                "reason": "data gap: " + "; ".join(gap_reason) + (
                    f" → fetch {picks}" if picks else " (no sources to add)"
                ),
                "additional_sources": picks,
            })
        elif confidence >= self.confidence_floor:
            decision["reason"] = (
                f"confidence {confidence:.2f} ≥ floor {self.confidence_floor:.2f}"
            )
        else:
            # Decide via LLM, falling back to deterministic heuristic.
            already_bound = _active_source_names(state, ctx)
            candidate_pool = [
                s for s in self.available_sources if s not in already_bound
            ] or list(self.available_sources)
            if not candidate_pool:
                decision["reason"] = "no additional sources available"
            else:
                llm_pick = await _ask_llm_to_pick(
                    ctx, state, candidate_pool,
                    confidence=confidence, verdict=str(verdict),
                )
                if llm_pick is None:
                    # Heuristic fallback: take the first 2 unused sources.
                    fallback = candidate_pool[:2]
                    decision.update({
                        "should_replan": True,
                        "reason": (
                            f"low confidence ({confidence:.2f}) + LLM unavailable; "
                            f"fallback fetch {fallback}"
                        ),
                        "additional_sources": fallback,
                        "llm_fallback": True,
                    })
                else:
                    decision["should_replan"] = bool(llm_pick.get("should_replan"))
                    decision["reason"] = str(llm_pick.get("reason", ""))[:300]
                    # Strict whitelist: drop any source name the LLM hallucinated.
                    picks = [
                        s for s in (llm_pick.get("additional_sources") or [])
                        if isinstance(s, str) and s in candidate_pool
                    ][:3]
                    decision["additional_sources"] = picks
                    if decision["should_replan"] and not picks:
                        # LLM said replan but picked nothing usable; degrade to
                        # heuristic so we don't loop with the same inputs.
                        decision["additional_sources"] = candidate_pool[:2]
                        decision["llm_fallback"] = True

        # Persist
        state["replan"] = decision

        # If we're looping back, mutate the source registry + state.
        if decision["should_replan"] and decision["additional_sources"]:
            _enable_replan_sources(
                ctx, state, decision["additional_sources"],
            )
            state["iteration"] = iteration + 1
            state["_next_phase"] = self.loopback_phase
            # Reset transient per-iteration buckets so the next pass writes
            # fresh entries rather than appending to the prior iteration's.
            state["agent_signals"] = []
            state["reports"] = {}
            # Stash a copy of the prior iteration's outputs for the trail.
            state.setdefault("prior_iterations", []).append({
                "iteration": iteration,
                "verdict": str(verdict),
                "confidence": confidence,
                "score": float(state.get("synthesis_score") or 0.0),
                "reasoning": str(state.get("synthesis_reasoning") or "")[:500],
            })

        # Trail entry for reviewability
        state.setdefault("decisions", {})[self.name] = json.dumps(decision)

        await bus.publish(
            "decision.emit",
            arena=arena_name, arena_id=arena_id,
            correlation_id=correlation_id,
            phase=state.get("current_phase", ""),
            payload={
                "agent": self.name,
                "kind": "replan",
                "should_replan": decision["should_replan"],
                "additional_sources": decision["additional_sources"],
                "iteration": decision["iteration"],
                "reason": decision["reason"],
            },
        )
        return state


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


_GAP_MARKER_PREFIXES = ("no_", "missing_", "stale_", "empty_")


def _missing_specialists(state: ArenaState) -> list[str]:
    """Specialists that the YAML declared for the analysis phase but that
    didn't emit a signal this iteration. Indicates a hard failure (exception
    in the agent's ReAct loop).

    We look up the analysis-phase agent roster via state["metadata"]["phase_agents"]
    when the arena populates it; otherwise we infer from the trace.
    """
    expected = set(state.get("metadata", {}).get("phase_agents", []) or [])
    if not expected:
        # Fallback: trace records every specialist that *attempted* to run.
        trace = state.get("trace") or {}
        for name, entry in trace.items():
            if isinstance(entry, dict) and entry.get("role") in (
                "specialist", "analyst",
            ):
                expected.add(str(name))
    if not expected:
        return []
    actual = {
        str(s.get("agent", ""))
        for s in (state.get("agent_signals") or []) if isinstance(s, dict)
    }
    return sorted(expected - actual)


def _gap_markers(state: ArenaState) -> list[str]:
    """Surface any signal key_factor that looks like a data-gap marker.

    Specialists are encouraged (see vol_regime_analyst.txt) to emit short
    machine-readable markers like ``no_vix_data``, ``stale_news``,
    ``missing_macro_context``. We harvest them so the replan controller can
    deterministically route fresh fetches to the right adapter.

    Additionally: any signal with ``parse_failed=True`` contributes a
    synthetic ``parse_failed:<agent>`` marker. A parse failure means the LLM
    didn't emit a usable structured signal: either the model ran out of
    react steps or the reasoning leaked. Replan should treat that the same
    as missing data: re-run, this time with potentially more sources and a
    fresh response budget.
    """
    markers: set[str] = set()
    for sig in state.get("agent_signals") or []:
        if not isinstance(sig, dict):
            continue
        if sig.get("parse_failed"):
            agent = str(sig.get("agent", "")) or "unknown"
            markers.add(f"parse_failed:{agent}")
        for kf in sig.get("key_factors") or []:
            s = str(kf).strip().lower().replace(" ", "_")
            if s.startswith(_GAP_MARKER_PREFIXES):
                markers.add(s)
            elif s == "parse_failed":
                # Specialist's own marker (Strategy 3 fallback in _parse_output)
                # already encodes the same condition; keep one canonical form.
                # Normalise the agent name the same way as the
                # ``parse_failed=True`` branch so both yield one marker.
                markers.add(
                    f"parse_failed:{str(sig.get('agent', '')) or 'unknown'}"
                )
    return sorted(markers)


def _replan_candidates(
    state: ArenaState, ctx: AgentContext, available: list[str],
) -> list[str]:
    """Subset of ``available`` that isn't already in this iteration's bindings."""
    used = _active_source_names(state, ctx) | set(
        state.get("replan_enabled_sources") or []
    )
    return [s for s in available if s not in used]


def _active_source_names(state: ArenaState, ctx: AgentContext) -> set[str]:
    """Best-effort: collect source names the prior phases actually fetched.

    Reads ``state["source_metrics"]`` first (every fetch records its source
    name there). Falls back to the agent's own configured sources.
    """
    seen: set[str] = set()
    for row in state.get("source_metrics") or []:
        name = row.get("source") if isinstance(row, dict) else None
        if name:
            seen.add(str(name))
    if not seen:
        # Last-ditch: configured sources on this agent
        for s in ctx.config.sources or []:
            seen.add(s)
    return seen


def _enable_replan_sources(
    ctx: AgentContext, state: ArenaState, picks: list[str],
) -> None:
    """Mark the chosen sources as 'enabled for next iteration'.

    The actual binding lives on the SourceRegistry already (the arena's YAML
    declared every potential source up front). All we do here is make sure
    the specialists in the next iteration see them. We record the picks in
    state so the next iteration's specialists can opt in by reading
    ``state["replan_enabled_sources"]``.
    """
    enabled = set(state.get("replan_enabled_sources") or [])
    enabled.update(picks)
    state["replan_enabled_sources"] = sorted(enabled)


async def _ask_llm_to_pick(
    ctx: AgentContext,
    state: ArenaState,
    candidate_pool: list[str],
    *,
    confidence: float,
    verdict: str,
) -> dict[str, Any] | None:
    """Call the LLM to pick which sources to re-fetch.
    Returns None on failure."""
    signals = state.get("agent_signals") or []
    sigs_view = [
        {
            "agent": s.get("agent", ""),
            "signal": s.get("signal", ""),
            "confidence": s.get("confidence", 0.0),
            "key_factors": (s.get("key_factors") or [])[:5],
        }
        for s in signals if isinstance(s, dict)
    ]
    user_prompt = json.dumps({
        "synthesis_verdict": verdict,
        "synthesis_confidence": confidence,
        "synthesis_reasoning": str(state.get("synthesis_reasoning") or "")[:500],
        "agent_signals": sigs_view,
        "sources_consulted": sorted(_active_source_names(state, ctx)),
        "available_sources_for_replan": candidate_pool,
    }, default=str)

    system_prompt = ctx.config.system_prompt or _REPLAN_SYSTEM_PROMPT

    try:
        text = await ctx.llm.chat_text(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            max_tokens=400,
            temperature=0.1,
        )
    except Exception as exc:
        logger.warning("ReplanAgent: LLM call failed: %s", exc)
        return None

    parsed = parse_json_lenient(text)
    if not isinstance(parsed, dict):
        logger.warning("ReplanAgent: JSON parse failed; got %r", text[:200])
        return None
    return parsed


__all__ = ["ReplanAgent"]