checking system…
Docs / back / src/maf/core/graph.py · line 29
Python · 83 lines
 1"""Phase graph — wires phases into execution order with transitions.
 2
 3Dynamic transitions
 4-------------------
 5A phase can override the configured transition for one hop by setting
 6``state["_next_phase"]`` before returning. This is how :class:`ReplanAgent`
 7loops back to the analysis phase mid-arena. A hard safety cap on total
 8phase steps prevents a misbehaving replan from looping forever (the
 9arena's ``max_iterations`` puts an *intentional* cap; the safety cap
10catches *unintentional* infinite loops).
11"""
12
13from __future__ import annotations
14
15import logging
16
17from maf.core.phase import Phase
18from maf.core.state import ArenaState
19
20logger = logging.getLogger(__name__)
21
22
23# Hard safety cap on number of phase steps inside one arena.run(). Real
24# loops (replan) should self-terminate well before this. If we hit this,
25# something is misconfigured — log and bail.
26_HARD_PHASE_STEP_CAP = 100
27
28
29class PhaseGraph:
30    """Simple DAG executor for arena phases.
31
32    Phases are linked by transition names. Execution starts at the first phase
33    and follows transitions until reaching "END" or exhausting phases.
34    """
35
36    def __init__(self, phases: list[Phase]) -> None:
37        self.phases = {p.name: p for p in phases}
38        self.order = [p.name for p in phases]
39
40    async def run(self, state: ArenaState) -> ArenaState:
41        """Execute phases in order following transitions."""
42        if not self.order:
43            return state
44
45        current = self.order[0]
46        steps = 0
47
48        while current and current != "END":
49            steps += 1
50            if steps > _HARD_PHASE_STEP_CAP:
51                logger.error(
52                    "PhaseGraph: hard step cap %d hit — terminating run "
53                    "(possible replan loop misconfiguration)", _HARD_PHASE_STEP_CAP,
54                )
55                break
56
57            phase = self.phases.get(current)
58            if not phase:
59                logger.error("Phase %r not found in graph", current)
60                break
61
62            logger.info("Executing phase: %s", current)
63            state = await phase.run(state)
64
65            # Dynamic transition wins for one hop. ReplanAgent uses this to
66            # loop back to "analysis"; otherwise the configured transition
67            # determines the next phase.
68            dyn_next = state.pop("_next_phase", None)
69            if dyn_next:
70                logger.info(
71                    "PhaseGraph: dynamic transition %s%s (overrides %s)",
72                    current, dyn_next, phase.config.transition,
73                )
74                current = dyn_next
75                continue
76
77            transition = phase.config.transition
78            if transition == "END" or not transition:
79                break
80            current = transition
81
82        return state