1"""GenericDecision envelope + outbox for non-trading arenas. 2 3Why a separate type from TradingAction 4-------------------------------------- 5TradingAction is shaped for order routers: ``verdict`` is BUY/HOLD/SELL, 6``mode`` controls execution policy, ``sizing`` is the position size hint. 7A research-debate arena's "verdict" might be "approve", "reject", 8"needs-revision"; a policy-review arena's verdict might be a 3-tier severity 9label. Forcing those into the BUY/HOLD/SELL slot loses information. 10 11GenericDecision keeps verdict as a free-string and carries the full opaque 12target dict, so any arena can publish without distorting its meaning. 13 14Routing 15------- 16``MAFApp._publish_arena_output`` picks the envelope based on the arena's 17``target_key`` config: 18 19 target_key == "ticker" → TradingAction → ``maf:actions:out`` 20 otherwise → GenericDecision → ``maf:decisions:out`` 21 22A consumer that wants both can XREAD both streams. 23""" 24 25from __future__ import annotations 26 27import logging 28import os 29import uuid 30from datetime import UTC, datetime 31from typing import Any 32 33from pydantic import BaseModel, ConfigDict, Field 34 35logger = logging.getLogger(__name__) 36 37 38DEFAULT_DECISIONS_STREAM = "maf:decisions:out" 39 40 41def _utcnow() -> datetime: 42 return datetime.now(UTC) 43 44 45class GenericDecision(BaseModel): 46 """Domain-agnostic deliberation outcome from any non-trading arena.""" 47 48 model_config = ConfigDict(frozen=True) 49 50 schema_version: str = "1" 51 arena: str 52 arena_id: str = "" 53 correlation_id: str = Field(default_factory=lambda: uuid.uuid4().hex) 54 published_at: datetime = Field(default_factory=_utcnow) 55 # The arena-specific subject. ``target_key`` (defined on the arena 56 # config) names the field within this dict that uniquely identifies 57 # the subject — e.g. ``target["question_id"]`` for a research debate. 58 target: dict[str, Any] = Field(default_factory=dict) 59 target_key: str = "" 60 # Free-form verdict — arenas pick the vocabulary that fits. 61 # Examples: "approve" / "reject", "ship" / "block", "high" / "medium" / 62 # "low" severity, or even a short natural-language sentence. 63 verdict: str 64 confidence: float = Field(ge=0.0, le=1.0, default=0.0) 65 reasoning: str = "" 66 # Per-specialist signals the synthesis read, so consumers can audit. 67 contributors: list[dict[str, Any]] = Field(default_factory=list) 68 meta: dict[str, Any] = Field(default_factory=dict) 69 70 71class DecisionOutbox: 72 """Async publisher for :class:`GenericDecision` to a Redis Stream.""" 73 74 def __init__( 75 self, 76 redis_url: str | None = None, 77 stream: str = DEFAULT_DECISIONS_STREAM, 78 maxlen: int = 50_000, 79 ) -> None: 80 self.redis_url = redis_url or os.environ.get( 81 "REDIS_URL", "redis://localhost:6379/0", 82 ) 83 self.stream = stream 84 self.maxlen = maxlen 85 self._redis: Any = None 86 87 async def _get_redis(self) -> Any: 88 if self._redis is None: 89 import redis.asyncio as aioredis 90 self._redis = aioredis.from_url(self.redis_url) 91 return self._redis 92 93 async def publish(self, decision: GenericDecision) -> str: 94 try: 95 client = await self._get_redis() 96 kwargs: dict[str, Any] = {} 97 if self.maxlen > 0: 98 kwargs["maxlen"] = self.maxlen 99 kwargs["approximate"] = True 100 raw = await client.xadd( 101 self.stream, 102 {"data": decision.model_dump_json()}, 103 **kwargs, 104 ) 105 return raw.decode("utf-8") if isinstance(raw, bytes) else str(raw) 106 except Exception: 107 logger.exception( 108 "DecisionOutbox.publish failed for arena=%s target=%s", 109 decision.arena, decision.target, 110 ) 111 return "" 112 113 async def aclose(self) -> None: 114 if self._redis is None: 115 return 116 try: 117 ac = getattr(self._redis, "aclose", None) 118 if ac: 119 await ac() 120 else: 121 await self._redis.close() 122 except Exception: 123 pass 124 125 126def build_decision_from_state( 127 state: dict[str, Any], 128 *, 129 arena: str, 130 target_key: str, 131) -> GenericDecision | None: 132 """Build a :class:`GenericDecision` from a post-run arena state. 133 134 Returns ``None`` when the state has no usable target value for 135 ``target_key`` — the caller skips publishing rather than emit garbage. 136 """ 137 raw_target = state.get("target") or {} 138 target_value = str(raw_target.get(target_key) or "").strip() 139 if not target_value: 140 return None 141 verdict = str( 142 state.get("synthesis_verdict") 143 or state.get("signal") 144 or "UNKNOWN" 145 ) 146 confidence = float(state.get("synthesis_confidence") or 0.0) 147 reasoning = str(state.get("synthesis_reasoning") or "")[:2000] 148 149 contributors: list[dict[str, Any]] = [] 150 for sig in state.get("agent_signals") or []: 151 if not isinstance(sig, dict): 152 continue 153 contributors.append({ 154 "agent": sig.get("agent", ""), 155 "domain": sig.get("domain", ""), 156 "signal": sig.get("signal", ""), 157 "confidence": float(sig.get("confidence", 0.0) or 0.0), 158 "summary": str(sig.get("summary", ""))[:300], 159 }) 160 161 return GenericDecision( 162 arena=arena, 163 arena_id=str(state.get("arena_id") or ""), 164 target=dict(raw_target), 165 target_key=target_key, 166 verdict=verdict, 167 confidence=confidence, 168 reasoning=reasoning, 169 contributors=contributors, 170 meta={ 171 "agent_signals_count": len(contributors), 172 "reports": list((state.get("reports") or {}).keys()), 173 "synthesis_score": float(state.get("synthesis_score") or 0.0), 174 }, 175 ) 176 177 178__all__ = [ 179 "DEFAULT_DECISIONS_STREAM", 180 "DecisionOutbox", 181 "GenericDecision", 182 "build_decision_from_state", 183]