"""Tail execution / outcome streams and feed them back into MAF memory.

Three streams matter for the loop:

  ``maf:actions:out``    — MAF says "consider this trade" (already published)
  ``maf:executions:out`` — gate / engine says "this is what I did with it"
  ``maf:outcomes:out``   — engine reports final result later
                            (fill price, close P&L, expiration)

This module subscribes to the latter two. For each event we:

1. Decode the envelope (Pydantic-validated; malformed events are logged
   and skipped).
2. Correlate by ``arena_id`` (set on every TradingAction and echoed on every
   downstream envelope) so we can find the originating arena run.
3. Append a JSON record to ``data/executions/<arena_id>.jsonl`` or
   ``data/outcomes/<arena_id>.jsonl`` so a sidecar process / human review
   can replay history without Redis.
4. Where ``DecisionMemory`` is provided (mastermind / report_to_action),
   call ``add_outcome`` so the next decision's reflection pass can find this.

We deliberately keep the harvester *passive*: it never sends orders, never
modifies positions. The engine owns those side effects; MAF only learns
from them.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import re
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from pydantic import BaseModel, ConfigDict, Field, ValidationError

from maf.streaming import get_event_bus

logger = logging.getLogger(__name__)


DEFAULT_EXECUTIONS_STREAM = "maf:executions:out"
DEFAULT_OUTCOMES_STREAM = "maf:outcomes:out"

# ``arena_id`` arrives from the stream payload and ends up in a filename, so
# anything outside this conservative set (path separators, NUL, ...) is
# replaced before it touches the filesystem.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9._-]")


def _utcnow_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(UTC).isoformat()


def _to_text(value: Any) -> str:
    """Return *value* as ``str``, decoding ``bytes`` with the default codec."""
    return value.decode() if isinstance(value, bytes) else str(value)


def _safe_stem(value: str) -> str:
    """Return *value* with filename-unsafe characters replaced by ``_``.

    Falls back to ``"unknown"`` when *value* is empty.
    """
    return _UNSAFE_FILENAME_CHARS.sub("_", value) or "unknown"


class ExecutionEvent(BaseModel):
    """Engine-side decision: was this action executed / queued / dropped?

    Mirrors :class:`maf.consumers.action_consumer.ExecutionEnvelope` but
    only the fields the harvester acts on. Unknown top-level keys are
    dropped (``extra="ignore"``); ``extras`` is populated only when the
    publisher sends an explicit ``extras`` mapping.
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    schema_version: str = "1"
    arena_id: str
    action_correlation_id: str = ""
    ticker: str
    verdict: str
    mode_final: str = ""
    gate_action: str
    size_fraction: float = 0.0
    confidence: float = 0.0
    reason: str = ""
    consumer: str = ""
    ts: str = ""
    extras: dict[str, Any] = Field(default_factory=dict)


class OutcomeEvent(BaseModel):
    """Engine-side fill / close / expiration. Optional — only published
    when the engine actually executes and later observes a result.

    Schema is intentionally minimal so different engines can publish what
    they have without forcing a single shape. Required: ``arena_id``,
    ``ticker``, ``status``. Everything else is opportunistic.
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    schema_version: str = "1"
    arena_id: str
    ticker: str
    status: str  # "filled" | "rejected" | "expired" | "closed"
    side: str = ""  # "long" | "short" | ""
    qty: float = 0.0
    fill_price: float | None = None
    pnl: float | None = None
    pnl_pct: float | None = None
    note: str = ""
    ts: str = ""
    extras: dict[str, Any] = Field(default_factory=dict)


class ExecutionHarvester:
    """Tails the execution + outcome streams.

    Lifecycle: ``await harvester.run()`` blocks; call ``harvester.stop()``
    from another task to exit. Each event is appended to a JSONL file
    keyed by arena id (so the reflection pass can re-read history) and
    published back to the global event bus so the dashboard's live feed
    sees the closed loop: executions as ``action.emit`` echoes, outcomes
    as ``decision.emit`` events.
    """

    def __init__(
        self,
        *,
        redis_url: str | None = None,
        executions_stream: str = DEFAULT_EXECUTIONS_STREAM,
        outcomes_stream: str = DEFAULT_OUTCOMES_STREAM,
        data_dir: str = "./data",
        decision_memory: Any = None,
        risk_gate: Any = None,
    ) -> None:
        """Configure the harvester; no connection is opened here.

        Args:
            redis_url: Redis connection URL. Falls back to the ``REDIS_URL``
                environment variable, then ``redis://localhost:6379/0``.
            executions_stream: Stream carrying :class:`ExecutionEvent` JSON.
            outcomes_stream: Stream carrying :class:`OutcomeEvent` JSON.
            data_dir: Root directory for the JSONL replay files.
            decision_memory: Optional object; its ``add_outcome`` is called
                for every outcome when it exposes one.
            risk_gate: Optional object; its ``register_exit(ticker)`` is
                called when an outcome is ``closed`` or ``expired``.
        """
        self.redis_url = redis_url or os.environ.get(
            "REDIS_URL", "redis://localhost:6379/0",
        )
        self.executions_stream = executions_stream
        self.outcomes_stream = outcomes_stream
        self.data_dir = Path(data_dir)
        self.decision_memory = decision_memory
        # Optional: shared risk gate. When provided, we propagate exits to
        # release exposure on close/expiration so the gate stays accurate.
        self.risk_gate = risk_gate
        self._redis: Any = None
        self._stop = asyncio.Event()
        # Read cursor per stream. ``$`` means "only entries newer than now";
        # run() pins it to a concrete id before the first read.
        self._last_ids: dict[str, str] = {
            executions_stream: "$",
            outcomes_stream: "$",
        }

    async def _get_redis(self) -> Any:
        """Return the Redis client, creating it on first use."""
        if self._redis is None:
            # Imported lazily so the module can be imported without redis.
            import redis.asyncio as aioredis
            self._redis = aioredis.from_url(self.redis_url)
        return self._redis

    async def _pin_start_ids(self, client: Any) -> None:
        """Replace every ``$`` cursor with a concrete stream id.

        ``$`` is re-evaluated on each XREAD call, so a stream that has not
        delivered anything yet would lose entries added between two calls
        (for instance while events from the other stream are being
        handled). Pinning to the newest existing id — or ``0-0`` when the
        stream is empty or missing — keeps "start from now" semantics
        without that gap. On failure the cursor is left at ``$`` and the
        caller retries on its next iteration.
        """
        for stream, cursor in list(self._last_ids.items()):
            if cursor != "$":
                continue
            try:
                newest = await client.xrevrange(stream, count=1)
                pinned = _to_text(newest[0][0]) if newest else "0-0"
            except Exception:
                logger.debug(
                    "ExecutionHarvester: could not pin start id for %s",
                    stream, exc_info=True,
                )
                continue
            self._last_ids[stream] = pinned

    async def run(self) -> None:
        """Tail both streams until :meth:`stop` is called."""
        client = await self._get_redis()
        logger.info(
            "ExecutionHarvester: tailing %s + %s",
            self.executions_stream, self.outcomes_stream,
        )
        while not self._stop.is_set():
            if "$" in self._last_ids.values():
                await self._pin_start_ids(client)
            try:
                resp = await client.xread(
                    self._last_ids,
                    block=5000, count=50,
                )
            except Exception as exc:
                logger.warning("ExecutionHarvester: xread failed: %s", exc)
                await asyncio.sleep(1.0)
                continue
            if not resp:
                # Block timeout with nothing new; loop to re-check the stop flag.
                continue
            for stream_key, entries in resp:
                stream = _to_text(stream_key)
                for entry_id, fields in entries:
                    sid = _to_text(entry_id)
                    # Advance the cursor before handling so an entry whose
                    # handler crashes is not read again.
                    self._last_ids[stream] = sid
                    try:
                        await self._handle(stream, sid, fields)
                    except Exception:
                        logger.exception(
                            "ExecutionHarvester: handler crashed on %s/%s",
                            stream, sid,
                        )

    def stop(self) -> None:
        """Ask :meth:`run` to exit; it returns once its loop re-checks the flag."""
        self._stop.set()

    async def aclose(self) -> None:
        """Stop the loop and close the Redis client (best effort)."""
        self._stop.set()
        if self._redis is None:
            return
        try:
            # Prefer ``aclose`` when the client exposes it, else ``close``.
            ac = getattr(self._redis, "aclose", None)
            if ac:
                await ac()
            else:
                await self._redis.close()
        except Exception:
            # Shutdown must not raise, but leave a trace for debugging.
            logger.debug(
                "ExecutionHarvester: closing Redis client failed",
                exc_info=True,
            )

    # ── Handlers ────────────────────────────────────────────────────────────

    async def _handle(self, stream: str, stream_id: str, fields: Any) -> None:
        """Validate one stream entry and dispatch it by originating stream.

        The JSON payload is read from the entry's ``data`` field (bytes or
        str key). Entries without a payload are ignored; payloads that fail
        decoding or validation are logged and skipped.
        """
        raw = None
        if isinstance(fields, dict):
            raw = fields.get(b"data") or fields.get("data")
        if isinstance(raw, bytes):
            try:
                raw = raw.decode("utf-8")
            except UnicodeDecodeError as exc:
                logger.warning(
                    "ExecutionHarvester: undecodable payload on %s/%s: %s",
                    stream, stream_id, exc,
                )
                return
        if not raw:
            return

        if stream == self.executions_stream:
            try:
                evt = ExecutionEvent.model_validate_json(raw)
            except ValidationError as exc:
                logger.warning("ExecutionHarvester: malformed exec: %s", exc)
                return
            await self._record_execution(evt, stream_id)
        elif stream == self.outcomes_stream:
            try:
                out = OutcomeEvent.model_validate_json(raw)
            except ValidationError as exc:
                logger.warning("ExecutionHarvester: malformed outcome: %s", exc)
                return
            await self._record_outcome(out, stream_id)
        else:
            logger.debug("ExecutionHarvester: unknown stream %s", stream)

    async def _record_execution(self, evt: ExecutionEvent, stream_id: str) -> None:
        """Append to JSONL + echo to event bus."""
        line = evt.model_dump(mode="json")
        line["_stream_id"] = stream_id
        line["_kind"] = "execution"
        self._append_jsonl("executions", line)

        bus = get_event_bus()
        await bus.publish(
            "action.emit",  # reuse the action.emit kind — live feed treats this as the "closed-loop echo"
            arena_id=evt.arena_id,
            correlation_id=evt.action_correlation_id,
            payload={
                "kind": "execution_echo",
                "ticker": evt.ticker,
                "verdict": evt.verdict,
                "gate_action": evt.gate_action,
                "mode_final": evt.mode_final,
                "size_fraction": evt.size_fraction,
                "reason": evt.reason,
                "consumer": evt.consumer,
            },
        )

    async def _record_outcome(self, out: OutcomeEvent, stream_id: str) -> None:
        """Persist outcome and (when memory provided) feed reflection."""
        line = out.model_dump(mode="json")
        line["_stream_id"] = stream_id
        line["_kind"] = "outcome"
        self._append_jsonl("outcomes", line)

        # Release exposure when the engine reports the position closed.
        if self.risk_gate is not None and out.status in ("closed", "expired"):
            try:
                self.risk_gate.register_exit(out.ticker)
            except Exception:
                logger.exception("ExecutionHarvester: risk_gate.register_exit failed")

        # Feed the mastermind's DecisionMemory so the next decision's
        # reflection pass can find this outcome. Defensive: the memory
        # protocol varies per backend, so we call ``add_outcome`` only when
        # the object exposes it.
        if self.decision_memory is not None:
            add = getattr(self.decision_memory, "add_outcome", None)
            if callable(add):
                try:
                    add(
                        arena_id=out.arena_id,
                        ticker=out.ticker,
                        status=out.status,
                        pnl=out.pnl,
                        pnl_pct=out.pnl_pct,
                        note=out.note,
                        meta=out.extras,
                    )
                except Exception:
                    logger.exception("ExecutionHarvester: add_outcome failed")

        bus = get_event_bus()
        await bus.publish(
            "decision.emit",
            arena_id=out.arena_id,
            payload={
                "kind": "outcome",
                "ticker": out.ticker,
                "status": out.status,
                "pnl": out.pnl,
                "pnl_pct": out.pnl_pct,
                "side": out.side,
            },
        )

    def _append_jsonl(self, subdir: str, record: dict[str, Any]) -> None:
        """Append a record to ``data/<subdir>/<arena_id>.jsonl``.

        Keyed by arena_id so the reflection pass can collect everything for
        a given arena run in one cheap file scan. The arena_id is sanitised
        before use as a filename, so distinct ids that differ only in unsafe
        characters share a file; each record still carries its original
        ``arena_id``. Filesystem errors are logged, never raised, so a
        failed write does not block the event-bus echo that follows.
        """
        arena_id = _safe_stem(str(record.get("arena_id") or "unknown"))
        d = self.data_dir / subdir
        path = d / f"{arena_id}.jsonl"
        try:
            d.mkdir(parents=True, exist_ok=True)
            with path.open("a", encoding="utf-8") as f:
                f.write(json.dumps(record, default=str) + "\n")
        except OSError:
            logger.exception("ExecutionHarvester: write %s failed", path)


__all__ = [
    "DEFAULT_EXECUTIONS_STREAM",
    "DEFAULT_OUTCOMES_STREAM",
    "ExecutionEvent",
    "ExecutionHarvester",
    "OutcomeEvent",
]