checking system…
Docs / back / src/maf/actions/decisions.py · line 45
Python · 184 lines
  1"""GenericDecision envelope + outbox for non-trading arenas.
  2
  3Why a separate type from TradingAction
  4--------------------------------------
  5TradingAction is shaped for order routers: ``verdict`` is BUY/HOLD/SELL,
  6``mode`` controls execution policy, ``sizing`` is the position size hint.
  7A research-debate arena's "verdict" might be "approve", "reject",
  8"needs-revision"; a policy-review arena's verdict might be a 3-tier severity
  9label. Forcing those into the BUY/HOLD/SELL slot loses information.
 10
 11GenericDecision keeps verdict as a free-string and carries the full opaque
 12target dict, so any arena can publish without distorting its meaning.
 13
 14Routing
 15-------
 16``MAFApp._publish_arena_output`` picks the envelope based on the arena's
 17``target_key`` config:
 18
 19    target_key == "ticker"  → TradingAction → ``maf:actions:out``
 20    otherwise               → GenericDecision → ``maf:decisions:out``
 21
 22A consumer that wants both can XREAD both streams.
 23"""
 24
 25from __future__ import annotations
 26
 27import logging
 28import os
 29import uuid
 30from datetime import UTC, datetime
 31from typing import Any
 32
 33from pydantic import BaseModel, ConfigDict, Field
 34
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


# Redis Stream key that DecisionOutbox publishes to unless the caller
# passes a different ``stream``.
DEFAULT_DECISIONS_STREAM = "maf:decisions:out"
 39
 40
 41def _utcnow() -> datetime:
 42    return datetime.now(UTC)
 43
 44
class GenericDecision(BaseModel):
    """Domain-agnostic deliberation outcome from any non-trading arena.

    The envelope is configured as frozen, so an instance is meant to be
    built once and then serialized as-is. Only ``arena`` and ``verdict``
    are required; every other field has a default.
    """

    # frozen=True: fields are not meant to be reassigned after construction.
    model_config = ConfigDict(frozen=True)

    # Version tag of this envelope layout.
    schema_version: str = "1"
    # Name of the arena that produced the decision (required).
    arena: str
    # Optional identifier of the arena run/instance; empty when not provided.
    arena_id: str = ""
    # Generated per instance as a random UUID4 hex string unless supplied.
    correlation_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
    # Timezone-aware UTC timestamp taken when the instance is created.
    published_at: datetime = Field(default_factory=_utcnow)
    # The arena-specific subject. ``target_key`` (defined on the arena
    # config) names the field within this dict that uniquely identifies
    # the subject — e.g. ``target["question_id"]`` for a research debate.
    target: dict[str, Any] = Field(default_factory=dict)
    target_key: str = ""
    # Free-form verdict — arenas pick the vocabulary that fits.
    # Examples: "approve" / "reject", "ship" / "block", "high" / "medium" /
    # "low" severity, or even a short natural-language sentence.
    verdict: str
    # Validated to lie within [0.0, 1.0]; defaults to 0.0.
    confidence: float = Field(ge=0.0, le=1.0, default=0.0)
    # Free-text explanation accompanying the verdict; empty by default.
    reasoning: str = ""
    # Per-specialist signals the synthesis read, so consumers can audit.
    contributors: list[dict[str, Any]] = Field(default_factory=list)
    # Free-form extra data attached by the publisher; empty by default.
    meta: dict[str, Any] = Field(default_factory=dict)
 69
 70
 71class DecisionOutbox:
 72    """Async publisher for :class:`GenericDecision` to a Redis Stream."""
 73
 74    def __init__(
 75        self,
 76        redis_url: str | None = None,
 77        stream: str = DEFAULT_DECISIONS_STREAM,
 78        maxlen: int = 50_000,
 79    ) -> None:
 80        self.redis_url = redis_url or os.environ.get(
 81            "REDIS_URL", "redis://localhost:6379/0",
 82        )
 83        self.stream = stream
 84        self.maxlen = maxlen
 85        self._redis: Any = None
 86
 87    async def _get_redis(self) -> Any:
 88        if self._redis is None:
 89            import redis.asyncio as aioredis
 90            self._redis = aioredis.from_url(self.redis_url)
 91        return self._redis
 92
 93    async def publish(self, decision: GenericDecision) -> str:
 94        try:
 95            client = await self._get_redis()
 96            kwargs: dict[str, Any] = {}
 97            if self.maxlen > 0:
 98                kwargs["maxlen"] = self.maxlen
 99                kwargs["approximate"] = True
100            raw = await client.xadd(
101                self.stream,
102                {"data": decision.model_dump_json()},
103                **kwargs,
104            )
105            return raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)
106        except Exception:
107            logger.exception(
108                "DecisionOutbox.publish failed for arena=%s target=%s",
109                decision.arena, decision.target,
110            )
111            return ""
112
113    async def aclose(self) -> None:
114        if self._redis is None:
115            return
116        try:
117            ac = getattr(self._redis, "aclose", None)
118            if ac:
119                await ac()
120            else:
121                await self._redis.close()
122        except Exception:
123            pass
124
125
def _coerce_float(value: Any, default: float = 0.0) -> float:
    """Convert *value* to ``float``, returning *default* instead of raising."""
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return default
    # NaN is the only float that compares unequal to itself.
    return default if number != number else number


def _contributor_from_signal(sig: dict[str, Any]) -> dict[str, Any]:
    """Reduce one specialist signal to the audit fields that get published."""
    return {
        "agent": sig.get("agent", ""),
        "domain": sig.get("domain", ""),
        "signal": sig.get("signal", ""),
        "confidence": _coerce_float(sig.get("confidence")),
        # ``or ""`` keeps a None summary from being published as "None".
        "summary": str(sig.get("summary") or "")[:300],
    }


def build_decision_from_state(
    state: dict[str, Any],
    *,
    arena: str,
    target_key: str,
) -> GenericDecision | None:
    """Build a :class:`GenericDecision` from a post-run arena state.

    Args:
        state: Post-run arena state. Keys read: ``target``,
            ``synthesis_verdict``, ``signal``, ``synthesis_confidence``,
            ``synthesis_reasoning``, ``agent_signals``, ``arena_id``,
            ``reports`` and ``synthesis_score``.
        arena: Arena name recorded on the decision.
        target_key: Key inside ``state["target"]`` that identifies the
            subject.

    Returns:
        The decision, or ``None`` when ``state["target"]`` is missing, is
        not a mapping, or has no usable (non-blank) value for
        ``target_key`` — the caller skips publishing rather than emit
        garbage.

    Malformed numeric fields fall back to ``0.0`` instead of raising, and
    the top-level confidence is clamped into ``[0.0, 1.0]`` to satisfy the
    envelope's validation.
    """
    from collections.abc import Mapping

    raw_target = state.get("target")
    if not isinstance(raw_target, Mapping):
        # Covers both a missing target and a malformed (non-mapping) one.
        return None
    target_value = str(raw_target.get(target_key) or "").strip()
    if not target_value:
        return None

    verdict = str(
        state.get("synthesis_verdict")
        or state.get("signal")
        or "UNKNOWN"
    )
    confidence = min(
        1.0, max(0.0, _coerce_float(state.get("synthesis_confidence"))),
    )
    # Reasoning is capped at 2000 characters, contributor summaries at 300.
    reasoning = str(state.get("synthesis_reasoning") or "")[:2000]

    # Non-dict entries in agent_signals are skipped rather than rejected.
    contributors: list[dict[str, Any]] = [
        _contributor_from_signal(sig)
        for sig in state.get("agent_signals") or []
        if isinstance(sig, dict)
    ]

    reports = state.get("reports")
    report_names = list(reports) if isinstance(reports, Mapping) else []

    return GenericDecision(
        arena=arena,
        arena_id=str(state.get("arena_id") or ""),
        target=dict(raw_target),
        target_key=target_key,
        verdict=verdict,
        confidence=confidence,
        reasoning=reasoning,
        contributors=contributors,
        meta={
            "agent_signals_count": len(contributors),
            "reports": report_names,
            "synthesis_score": _coerce_float(state.get("synthesis_score")),
        },
    )
176
177
# Names exported by a star-import of this module; ``_utcnow`` and
# ``logger`` are intentionally left out.
__all__ = [
    "DEFAULT_DECISIONS_STREAM",
    "DecisionOutbox",
    "GenericDecision",
    "build_decision_from_state",
]