checking system…
Docs / back / src/maf/actions/outbox.py · line 88
Python · 216 lines
  1"""Action outbox — publishes MAF trading decisions as executable actions.
  2
  3Schema
  4------
  5:class:`TradingAction` is the typed wire contract. The downstream engine
  6(trtools2) reads this and decides whether to execute, queue for human review,
  7or just log — based on the ``mode`` field:
  8
  9``auto``    Execute immediately (within risk limits).
 10``semi``    Queue + emit a notification; require human ack before execute.
 11``manual``  Log only, surface in dashboard for human review.
 12
 13Why an explicit mode rather than a global setting? Because the right policy
 14varies per ticker / per arena / per market regime. Encoding it in the action
 15lets the user crank a single arena up to ``auto`` without flipping
 16everything else.
 17
 18Wire shape: XADD on ``maf:actions:out`` (or the configured stream) with one
 19``data`` field holding ``action.model_dump_json()``. Mirrors the envelope
 20pattern used by ``trading_intelligence/stream.py``.
 21"""
 22
 23from __future__ import annotations
 24
 25import logging
 26import os
 27import uuid
 28from datetime import UTC, datetime
 29from typing import Any, Literal
 30
 31from pydantic import BaseModel, ConfigDict, Field
 32
 33logger = logging.getLogger(__name__)
 34
 35
 36DEFAULT_ACTIONS_STREAM = "maf:actions:out"
 37
 38
 39ActionMode = Literal["auto", "semi", "manual"]
 40ActionVerdict = Literal["BUY", "HOLD", "SELL"]
 41
 42
 43def _utcnow() -> datetime:
 44    return datetime.now(UTC)
 45
 46
 47class ActionTarget(BaseModel):
 48    model_config = ConfigDict(frozen=True)
 49
 50    ticker: str
 51    exchange: str = ""
 52    asset_class: str = "equity"
 53    trade_date: str = ""
 54
 55
 56class ActionSizing(BaseModel):
 57    """Recommended sizing for the action. Engine may override under risk."""
 58
 59    model_config = ConfigDict(frozen=True)
 60
 61    confidence: float = Field(ge=0.0, le=1.0, default=0.0)
 62    ensemble_score: float = Field(ge=-1.0, le=1.0, default=0.0)
 63    # Position size hint as fraction of available bookkeeping unit (0..1).
 64    # Default 0.0 means "engine, use your default sizing for this confidence".
 65    size_fraction: float = Field(ge=0.0, le=1.0, default=0.0)
 66    horizon: str = ""  # "intraday" | "swing" | "position" | "long_term"
 67
 68
 69class TradingAction(BaseModel):
 70    """An executable trading action MAF wants the engine to consider."""
 71
 72    model_config = ConfigDict(frozen=True)
 73
 74    schema_version: Literal["1"] = "1"
 75    arena: str
 76    arena_id: str = ""
 77    correlation_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
 78    published_at: datetime = Field(default_factory=_utcnow)
 79    target: ActionTarget
 80    verdict: ActionVerdict
 81    mode: ActionMode = "manual"
 82    sizing: ActionSizing = Field(default_factory=ActionSizing)
 83    reasoning: str = ""
 84    # Free-form metadata: model name, agent count, source-metric digest, etc.
 85    meta: dict[str, Any] = Field(default_factory=dict)
 86
 87
 88class ActionOutbox:
 89    """Async publisher for :class:`TradingAction` to a Redis Stream."""
 90
 91    def __init__(
 92        self,
 93        redis_url: str | None = None,
 94        stream: str = DEFAULT_ACTIONS_STREAM,
 95        maxlen: int = 50_000,
 96    ) -> None:
 97        self.redis_url = redis_url or os.environ.get(
 98            "REDIS_URL", "redis://localhost:6379/0",
 99        )
100        self.stream = stream
101        self.maxlen = maxlen
102        self._redis: Any = None
103
104    async def _get_redis(self) -> Any:
105        if self._redis is None:
106            import redis.asyncio as aioredis
107
108            self._redis = aioredis.from_url(self.redis_url)
109        return self._redis
110
111    async def publish(self, action: TradingAction) -> str:
112        """Publish ``action`` and return the assigned stream id."""
113        try:
114            client = await self._get_redis()
115            kwargs: dict[str, Any] = {}
116            if self.maxlen > 0:
117                kwargs["maxlen"] = self.maxlen
118                kwargs["approximate"] = True
119            raw = await client.xadd(
120                self.stream,
121                {"data": action.model_dump_json()},
122                **kwargs,
123            )
124            if isinstance(raw, bytes):
125                return raw.decode("utf-8")
126            return str(raw)
127        except Exception:
128            logger.exception("ActionOutbox.publish failed for %s", action.target.ticker)
129            return ""
130
131    async def aclose(self) -> None:
132        if self._redis is None:
133            return
134        try:
135            aclose = getattr(self._redis, "aclose", None)
136            if aclose is not None:
137                await aclose()
138            else:
139                close = getattr(self._redis, "close", None)
140                if close is not None:
141                    await close()
142        except Exception:
143            pass
144
145
146# ---------------------------------------------------------------------------
147# Build from arena state
148# ---------------------------------------------------------------------------
149
150
def _bounded_float(value: Any, low: float, high: float, *, field: str) -> float:
    """Coerce ``value`` to a float inside ``[low, high]``, logging any repair.

    Missing/falsy, non-numeric and NaN inputs are treated as 0.0 before
    clamping; out-of-range numbers are clamped to the nearest bound.
    """
    try:
        number = float(value or 0.0)
    except (TypeError, ValueError, OverflowError):
        logger.warning(
            "build_action_from_state: non-numeric %s=%r; using 0.0", field, value,
        )
        number = 0.0
    if number != number:  # NaN is the only float unequal to itself
        logger.warning("build_action_from_state: %s is NaN; using 0.0", field)
        number = 0.0
    bounded = min(high, max(low, number))
    if bounded != number:
        logger.warning(
            "build_action_from_state: %s=%r outside [%s, %s]; clamped to %s",
            field, number, low, high, bounded,
        )
    return bounded


def build_action_from_state(
    state: dict[str, Any],
    *,
    arena: str,
    mode: ActionMode = "manual",
    horizon: str = "",
) -> TradingAction | None:
    """Build a :class:`TradingAction` from a post-run arena state.

    Args:
        state: Arena state. Keys read: ``target`` (with ``ticker``,
            ``exchange``, ``asset_class``, ``trade_date``/``date``),
            ``synthesis_verdict`` (falling back to ``signal``),
            ``synthesis_confidence``, ``synthesis_score``,
            ``synthesis_reasoning``, ``arena_id``, ``agent_signals`` and
            ``reports``.
        arena: Arena name recorded on the action.
        mode: Handling mode recorded on the action.
        horizon: Horizon hint recorded on the sizing.

    Returns:
        The action, or ``None`` if the state has no usable ticker (missing,
        empty or whitespace-only) — the engine has nothing to act on so we
        skip rather than publishing a malformed action.
    """
    raw_target = state.get("target") or {}
    ticker = str(raw_target.get("ticker") or "").strip()
    if not ticker:
        return None

    verdict_raw = (
        state.get("synthesis_verdict")
        or state.get("signal")
        or "HOLD"
    )
    verdict = str(verdict_raw).upper().strip()
    # Anything outside the wire vocabulary degrades to the neutral verdict.
    if verdict not in ("BUY", "HOLD", "SELL"):
        verdict = "HOLD"

    # ActionSizing validates these ranges and raises on violation, so repair
    # the inputs here instead of failing the whole build on a stray value.
    confidence = _bounded_float(
        state.get("synthesis_confidence"), 0.0, 1.0, field="synthesis_confidence",
    )
    ensemble = _bounded_float(
        state.get("synthesis_score"), -1.0, 1.0, field="synthesis_score",
    )
    # Default size_fraction: scale linearly off confidence, capped at 0.25.
    # confidence is already within [0, 1], so the product is within [0, 0.25].
    size_fraction = confidence * 0.25

    return TradingAction(
        arena=arena,
        arena_id=str(state.get("arena_id") or ""),
        target=ActionTarget(
            ticker=ticker,
            exchange=str(raw_target.get("exchange") or ""),
            asset_class=str(raw_target.get("asset_class") or "equity"),
            trade_date=str(raw_target.get("trade_date") or raw_target.get("date") or ""),
        ),
        verdict=verdict,  # type: ignore[arg-type]
        mode=mode,
        sizing=ActionSizing(
            confidence=confidence,
            ensemble_score=ensemble,
            size_fraction=size_fraction,
            horizon=horizon,
        ),
        # Keep the wire payload bounded: only the first 1000 characters.
        reasoning=str(state.get("synthesis_reasoning") or "")[:1000],
        meta={
            "agent_signals": len(state.get("agent_signals") or []),
            "reports": list((state.get("reports") or {}).keys()),
        },
    )
204
205
# Public API of this module: the wire-contract types, the publisher, the
# default stream key and the state-to-action builder.
__all__ = [
    "ActionMode",
    "ActionOutbox",
    "ActionSizing",
    "ActionTarget",
    "ActionVerdict",
    "DEFAULT_ACTIONS_STREAM",
    "TradingAction",
    "build_action_from_state",
]