1"""Phase graph — wires phases into execution order with transitions. 2 3Dynamic transitions 4------------------- 5A phase can override the configured transition for one hop by setting 6``state["_next_phase"]`` before returning. This is how :class:`ReplanAgent` 7loops back to the analysis phase mid-arena. A hard safety cap on total 8phase steps prevents a misbehaving replan from looping forever (the 9arena's ``max_iterations`` puts an *intentional* cap; the safety cap 10catches *unintentional* infinite loops). 11""" 12 13from __future__ import annotations 14 15import logging 16 17from maf.core.phase import Phase 18from maf.core.state import ArenaState 19 20logger = logging.getLogger(__name__) 21 22 23# Hard safety cap on number of phase steps inside one arena.run(). Real 24# loops (replan) should self-terminate well before this. If we hit this, 25# something is misconfigured — log and bail. 26_HARD_PHASE_STEP_CAP = 100 27 28 29class PhaseGraph: 30 """Simple DAG executor for arena phases. 31 32 Phases are linked by transition names. Execution starts at the first phase 33 and follows transitions until reaching "END" or exhausting phases. 34 """ 35 36 def __init__(self, phases: list[Phase]) -> None: 37 self.phases = {p.name: p for p in phases} 38 self.order = [p.name for p in phases] 39 40 async def run(self, state: ArenaState) -> ArenaState: 41 """Execute phases in order following transitions.""" 42 if not self.order: 43 return state 44 45 current = self.order[0] 46 steps = 0 47 48 while current and current != "END": 49 steps += 1 50 if steps > _HARD_PHASE_STEP_CAP: 51 logger.error( 52 "PhaseGraph: hard step cap %d hit — terminating run " 53 "(possible replan loop misconfiguration)", _HARD_PHASE_STEP_CAP, 54 ) 55 break 56 57 phase = self.phases.get(current) 58 if not phase: 59 logger.error("Phase %r not found in graph", current) 60 break 61 62 logger.info("Executing phase: %s", current) 63 state = await phase.run(state) 64 65 # Dynamic transition wins for one hop. ReplanAgent uses this to 66 # loop back to "analysis"; otherwise the configured transition 67 # determines the next phase. 68 dyn_next = state.pop("_next_phase", None) 69 if dyn_next: 70 logger.info( 71 "PhaseGraph: dynamic transition %s → %s (overrides %s)", 72 current, dyn_next, phase.config.transition, 73 ) 74 current = dyn_next 75 continue 76 77 transition = phase.config.transition 78 if transition == "END" or not transition: 79 break 80 current = transition 81 82 return state