1"""Arena state management — typed dicts for structured agent communication.""" 2 3from __future__ import annotations 4 5from typing import Any, TypedDict 6 7 8class DebateMessage(TypedDict): 9 agent: str 10 content: str 11 round: int 12 13 14class AgentSignal(TypedDict, total=False): 15 """Structured output from a specialist agent. 16 17 Every specialist agent produces one of these in addition to a full 18 narrative report. The synthesis agent reads all AgentSignals and 19 computes the confidence-weighted ensemble score. 20 """ 21 22 agent: str # agent name, e.g. "price_analyst" 23 domain: str # "price" | "sentiment" | "onchain" | "macro" | "risk" 24 signal: str # "BULLISH" | "BEARISH" | "NEUTRAL" 25 confidence: float # 0.0 – 1.0, agent's self-assessed certainty 26 summary: str # ≤ 25-word one-liner 27 key_factors: list # 2–5 bullet points (short strings) 28 narrative: str # full analysis text (narrative report) 29 raw_data: dict # data points used (for reviewability) 30 31 # True when the structured-output JSON couldn't be parsed from the LLM's 32 # final response (max_react_steps exhausted, reasoning leak, malformed 33 # JSON). The synthesis layer weights these signals down; ReplanAgent 34 # treats parse failures as a data gap and forces a re-run. 35 parse_failed: bool 36 37 # Specialist-defined payload fields outside the canonical schema. The 38 # agnostic prompts (event_impact_analyst → impacted_tickers + net_impact, 39 # general_analyst → answer + counter_view + answer_kind, intent_classifier 40 # → classification + route + alternates) ship custom fields here that the 41 # synthesis layer reads to compose the Prognosis envelope. 42 extras: dict 43 44 45class DebateState(TypedDict, total=False): 46 history: list[DebateMessage] 47 per_agent: dict[str, list[str]] # agent_name -> list of their arguments 48 latest_per_agent: dict[str, str] # agent_name -> most recent argument 49 judge_decision: str 50 count: int 51 52 53class ArenaState(TypedDict, total=False): 54 """Base state shared across all agents in an arena. 55 56 Agents write to structured fields (reports, debates, decisions) rather than 57 appending to a shared message history — this avoids the "telephone effect" 58 described in the TradingAgents paper. 59 """ 60 61 arena_id: str 62 arena_name: str 63 iteration: int 64 current_phase: str 65 66 # Target context (e.g. ticker + date for trading arenas) 67 target: dict[str, Any] 68 69 # Analyst reports: agent_name -> report text 70 reports: dict[str, str] 71 72 # Debate states: debate_name -> DebateState 73 debates: dict[str, DebateState] 74 75 # Decisions from judges/executors 76 decisions: dict[str, str] 77 78 # Final output 79 signal: str # extracted clean signal (e.g. BUY/SELL/HOLD) 80 output: dict[str, Any] 81 82 # Structured specialist signals (populated by SpecialistAgent runs) 83 agent_signals: list[AgentSignal] 84 85 # Synthesis scoring (populated by SynthesisAgent) 86 synthesis_score: float # -1.0 (fully bearish) .. +1.0 (fully bullish) 87 synthesis_verdict: str # BUY | HOLD | SELL 88 synthesis_confidence: float # 0.0 – 1.0 89 synthesis_reasoning: str # LLM narrative 90 91 # Source-level monitoring — every fetch call recorded here 92 source_metrics: list[dict[str, Any]] 93 94 # Execution trace: agent_name -> list of tool call records 95 trace: dict[str, list[dict[str, Any]]] 96 97 # Arbitrary metadata 98 metadata: dict[str, Any] 99 100 101def create_initial_state( 102 arena_id: str, 103 arena_name: str, 104 target: dict[str, Any] | None = None, 105) -> ArenaState: 106 """Create a fresh arena state.""" 107 return ArenaState( 108 arena_id=arena_id, 109 arena_name=arena_name, 110 iteration=0, 111 current_phase="", 112 target=target or {}, 113 reports={}, 114 debates={}, 115 decisions={}, 116 signal="", 117 output={}, 118 agent_signals=[], 119 synthesis_score=0.0, 120 synthesis_verdict="", 121 synthesis_confidence=0.0, 122 synthesis_reasoning="", 123 source_metrics=[], 124 trace={}, 125 metadata={}, 126 )