1"""Action outbox — publishes MAF trading decisions as executable actions. 2 3Schema 4------ 5:class:`TradingAction` is the typed wire contract. The downstream engine 6(trtools2) reads this and decides whether to execute, queue for human review, 7or just log — based on the ``mode`` field: 8 9``auto`` Execute immediately (within risk limits). 10``semi`` Queue + emit a notification; require human ack before execute. 11``manual`` Log only, surface in dashboard for human review. 12 13Why an explicit mode rather than a global setting? Because the right policy 14varies per ticker / per arena / per market regime. Encoding it in the action 15lets the user crank a single arena up to ``auto`` without flipping 16everything else. 17 18Wire shape: XADD on ``maf:actions:out`` (or the configured stream) with one 19``data`` field holding ``action.model_dump_json()``. Mirrors the envelope 20pattern used by ``trading_intelligence/stream.py``. 21""" 22 23from __future__ import annotations 24 25import logging 26import os 27import uuid 28from datetime import UTC, datetime 29from typing import Any, Literal 30 31from pydantic import BaseModel, ConfigDict, Field 32 33logger = logging.getLogger(__name__) 34 35 36DEFAULT_ACTIONS_STREAM = "maf:actions:out" 37 38 39ActionMode = Literal["auto", "semi", "manual"] 40ActionVerdict = Literal["BUY", "HOLD", "SELL"] 41 42 43def _utcnow() -> datetime: 44 return datetime.now(UTC) 45 46 47class ActionTarget(BaseModel): 48 model_config = ConfigDict(frozen=True) 49 50 ticker: str 51 exchange: str = "" 52 asset_class: str = "equity" 53 trade_date: str = "" 54 55 56class ActionSizing(BaseModel): 57 """Recommended sizing for the action. Engine may override under risk.""" 58 59 model_config = ConfigDict(frozen=True) 60 61 confidence: float = Field(ge=0.0, le=1.0, default=0.0) 62 ensemble_score: float = Field(ge=-1.0, le=1.0, default=0.0) 63 # Position size hint as fraction of available bookkeeping unit (0..1). 64 # Default 0.0 means "engine, use your default sizing for this confidence". 65 size_fraction: float = Field(ge=0.0, le=1.0, default=0.0) 66 horizon: str = "" # "intraday" | "swing" | "position" | "long_term" 67 68 69class TradingAction(BaseModel): 70 """An executable trading action MAF wants the engine to consider.""" 71 72 model_config = ConfigDict(frozen=True) 73 74 schema_version: Literal["1"] = "1" 75 arena: str 76 arena_id: str = "" 77 correlation_id: str = Field(default_factory=lambda: uuid.uuid4().hex) 78 published_at: datetime = Field(default_factory=_utcnow) 79 target: ActionTarget 80 verdict: ActionVerdict 81 mode: ActionMode = "manual" 82 sizing: ActionSizing = Field(default_factory=ActionSizing) 83 reasoning: str = "" 84 # Free-form metadata: model name, agent count, source-metric digest, etc. 85 meta: dict[str, Any] = Field(default_factory=dict) 86 87 88class ActionOutbox: 89 """Async publisher for :class:`TradingAction` to a Redis Stream.""" 90 91 def __init__( 92 self, 93 redis_url: str | None = None, 94 stream: str = DEFAULT_ACTIONS_STREAM, 95 maxlen: int = 50_000, 96 ) -> None: 97 self.redis_url = redis_url or os.environ.get( 98 "REDIS_URL", "redis://localhost:6379/0", 99 ) 100 self.stream = stream 101 self.maxlen = maxlen 102 self._redis: Any = None 103 104 async def _get_redis(self) -> Any: 105 if self._redis is None: 106 import redis.asyncio as aioredis 107 108 self._redis = aioredis.from_url(self.redis_url) 109 return self._redis 110 111 async def publish(self, action: TradingAction) -> str: 112 """Publish ``action`` and return the assigned stream id.""" 113 try: 114 client = await self._get_redis() 115 kwargs: dict[str, Any] = {} 116 if self.maxlen > 0: 117 kwargs["maxlen"] = self.maxlen 118 kwargs["approximate"] = True 119 raw = await client.xadd( 120 self.stream, 121 {"data": action.model_dump_json()}, 122 **kwargs, 123 ) 124 if isinstance(raw, bytes): 125 return raw.decode("utf-8") 126 return str(raw) 127 except Exception: 128 logger.exception("ActionOutbox.publish failed for %s", action.target.ticker) 129 return "" 130 131 async def aclose(self) -> None: 132 if self._redis is None: 133 return 134 try: 135 aclose = getattr(self._redis, "aclose", None) 136 if aclose is not None: 137 await aclose() 138 else: 139 close = getattr(self._redis, "close", None) 140 if close is not None: 141 await close() 142 except Exception: 143 pass 144 145 146# --------------------------------------------------------------------------- 147# Build from arena state 148# --------------------------------------------------------------------------- 149 150 151def build_action_from_state( 152 state: dict[str, Any], 153 *, 154 arena: str, 155 mode: ActionMode = "manual", 156 horizon: str = "", 157) -> TradingAction | None: 158 """Build a :class:`TradingAction` from a post-run arena state. 159 160 Returns ``None`` if the state has no usable ticker — the engine has 161 nothing to act on so we skip rather than publishing a malformed action. 162 """ 163 raw_target = state.get("target") or {} 164 ticker = str(raw_target.get("ticker") or "") 165 if not ticker: 166 return None 167 verdict_raw = ( 168 state.get("synthesis_verdict") 169 or state.get("signal") 170 or "HOLD" 171 ) 172 verdict = str(verdict_raw).upper().strip() 173 if verdict not in ("BUY", "HOLD", "SELL"): 174 verdict = "HOLD" 175 176 confidence = float(state.get("synthesis_confidence") or 0.0) 177 ensemble = float(state.get("synthesis_score") or 0.0) 178 # Default size_fraction: scale linearly off confidence, capped at 0.25. 179 size_fraction = min(0.25, max(0.0, confidence * 0.25)) 180 181 return TradingAction( 182 arena=arena, 183 arena_id=str(state.get("arena_id") or ""), 184 target=ActionTarget( 185 ticker=ticker, 186 exchange=str(raw_target.get("exchange") or ""), 187 asset_class=str(raw_target.get("asset_class") or "equity"), 188 trade_date=str(raw_target.get("trade_date") or raw_target.get("date") or ""), 189 ), 190 verdict=verdict, # type: ignore[arg-type] 191 mode=mode, 192 sizing=ActionSizing( 193 confidence=confidence, 194 ensemble_score=ensemble, 195 size_fraction=size_fraction, 196 horizon=horizon, 197 ), 198 reasoning=str(state.get("synthesis_reasoning") or "")[:1000], 199 meta={ 200 "agent_signals": len(state.get("agent_signals") or []), 201 "reports": list((state.get("reports") or {}).keys()), 202 }, 203 ) 204 205 206__all__ = [ 207 "ActionMode", 208 "ActionOutbox", 209 "ActionSizing", 210 "ActionTarget", 211 "ActionVerdict", 212 "DEFAULT_ACTIONS_STREAM", 213 "TradingAction", 214 "build_action_from_state", 215]