checking system…
Docs / back / src/maf/outcomes/harvester.py · line 100
Python · 322 lines
  1"""Tail execution / outcome streams and feed them back into MAF memory.
  2
  3Three streams matter for the loop:
  4
  5  ``maf:actions:out``     — MAF says "consider this trade" (already published)
  6  ``maf:executions:out``  — gate / engine says "this is what I did with it"
  7  ``maf:outcomes:out``    — engine reports final result later
  8                            (fill price, close P&L, expiration)
  9
 10This module subscribes to the latter two. For each event we:
 11
 121. Decode the envelope (Pydantic-validated where possible, tolerant otherwise).
 132. Correlate by ``arena_id`` (set on every TradingAction and echoed on every
 14   downstream envelope) so we can find the originating arena run.
3. Append a JSON record to ``data/executions/<arena_id>.jsonl`` or
   ``data/outcomes/<arena_id>.jsonl`` (one file per arena run) so a sidecar
   process / human review can replay history without Redis.
 174. Where ``DecisionMemory`` is provided (mastermind / report_to_action),
 18   call ``add_outcome`` so the next decision's reflection pass can find this.
 19
 20We deliberately keep the harvester *passive*: it never sends orders, never
 21modifies positions. The engine owns those side effects; MAF only learns
 22from them.
 23"""
 24
 25from __future__ import annotations
 26
 27import asyncio
 28import json
 29import logging
 30import os
 31from datetime import UTC, datetime
 32from pathlib import Path
 33from typing import Any
 34
 35from pydantic import BaseModel, ConfigDict, Field, ValidationError
 36
 37from maf.streaming import get_event_bus
 38
 39logger = logging.getLogger(__name__)
 40
 41
# Default names of the two Redis streams the harvester tails. Either can be
# overridden per instance through the ``ExecutionHarvester`` constructor.
DEFAULT_EXECUTIONS_STREAM = "maf:executions:out"
DEFAULT_OUTCOMES_STREAM = "maf:outcomes:out"
 44
 45
 46def _utcnow_iso() -> str:
 47    return datetime.now(UTC).isoformat()
 48
 49
class ExecutionEvent(BaseModel):
    """Engine-side decision: was this action executed / queued / dropped?

    Mirrors :class:`maf.consumers.action_consumer.ExecutionEnvelope` but
    declares only the fields the harvester acts on. Unknown top-level keys
    in the incoming payload are dropped (``extra="ignore"``); additional
    data survives only if the publisher places it under the ``extras`` key.
    Instances are immutable (``frozen=True``).

    Required fields: ``arena_id``, ``ticker``, ``verdict``, ``gate_action``.
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    schema_version: str = "1"
    # Correlation key used to name the JSONL file and to tag the bus echo.
    arena_id: str
    # Forwarded as ``correlation_id`` when the event is echoed on the bus.
    action_correlation_id: str = ""
    ticker: str
    verdict: str
    mode_final: str = ""
    gate_action: str
    size_fraction: float = 0.0
    confidence: float = 0.0
    reason: str = ""
    consumer: str = ""
    ts: str = ""
    # Free-form publisher data; persisted to JSONL, not echoed on the bus.
    extras: dict[str, Any] = Field(default_factory=dict)
 73
 74
class OutcomeEvent(BaseModel):
    """Engine-side fill / close / expiration. Optional — only published
    when the engine actually executes and later observes a result.

    Schema is intentionally minimal so different engines can publish what
    they have without forcing a single shape. Required: ``arena_id``,
    ``ticker``, ``status``. Everything else is opportunistic.

    Unknown top-level keys are dropped (``extra="ignore"``) and instances
    are immutable (``frozen=True``).
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    schema_version: str = "1"
    # Correlation key used to name the JSONL file and to tag the bus event.
    arena_id: str
    ticker: str
    status: str           # "filled" | "rejected" | "expired" | "closed"
    side: str = ""        # "long" | "short" | ""
    qty: float = 0.0
    # ``None`` means the engine did not report the value.
    fill_price: float | None = None
    pnl: float | None = None
    pnl_pct: float | None = None
    note: str = ""
    ts: str = ""
    # Free-form publisher data; handed to ``add_outcome`` as ``meta``.
    extras: dict[str, Any] = Field(default_factory=dict)
 98
 99
class ExecutionHarvester:
    """Tail the execution + outcome streams and persist what they report.

    Lifecycle: ``await harvester.run()`` blocks; call ``harvester.stop()``
    from another task to exit (the loop checks the flag between reads), or
    ``await harvester.aclose()`` to also release the Redis client.

    Each validated event is appended to a JSONL file keyed by ``arena_id``
    under ``<data_dir>/executions/`` or ``<data_dir>/outcomes/`` (so the
    reflection pass can re-read history) and echoed on the global event
    bus: executions as ``action.emit``, outcomes as ``decision.emit``.
    """

    # XREAD's special id: "only entries added after this call starts".
    _TAIL_ID = "$"
    # Characters replaced before a stream-supplied id becomes a file name.
    _UNSAFE_FILENAME_CHARS = "/\\\0"

    def __init__(
        self,
        *,
        redis_url: str | None = None,
        executions_stream: str = DEFAULT_EXECUTIONS_STREAM,
        outcomes_stream: str = DEFAULT_OUTCOMES_STREAM,
        data_dir: str = "./data",
        decision_memory: Any = None,
        risk_gate: Any = None,
    ) -> None:
        """Configure the harvester; no connection is opened here.

        Args:
            redis_url: Redis connection URL. Falls back to the ``REDIS_URL``
                environment variable, then ``redis://localhost:6379/0``.
            executions_stream: Name of the stream carrying execution events.
            outcomes_stream: Name of the stream carrying outcome events.
            data_dir: Root directory for the JSONL files.
            decision_memory: Optional object; its ``add_outcome`` method is
                called for every outcome when it exists and is callable.
            risk_gate: Optional object; its ``register_exit(ticker)`` is
                called for outcomes with status ``closed`` or ``expired``.
        """
        self.redis_url = redis_url or os.environ.get(
            "REDIS_URL", "redis://localhost:6379/0",
        )
        self.executions_stream = executions_stream
        self.outcomes_stream = outcomes_stream
        self.data_dir = Path(data_dir)
        self.decision_memory = decision_memory
        # Optional: shared risk gate. When provided, we propagate exits to
        # release exposure on close/expiration so the gate stays accurate.
        self.risk_gate = risk_gate
        self._redis: Any = None
        self._stop = asyncio.Event()
        # Last-seen id per stream. Starts at "$" (new entries only) and is
        # pinned to a concrete id by ``run`` before the first read.
        self._last_ids: dict[str, str] = {
            executions_stream: self._TAIL_ID,
            outcomes_stream: self._TAIL_ID,
        }

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return *value* as ``str``, decoding ``bytes`` with the default codec."""
        return value.decode() if isinstance(value, bytes) else str(value)

    @staticmethod
    async def _settle(result: Any) -> None:
        """Await *result* when a hook turned out to be asynchronous."""
        if asyncio.iscoroutine(result) or asyncio.isfuture(result):
            await result

    async def _get_redis(self) -> Any:
        """Return the Redis client, creating it on first use."""
        if self._redis is None:
            # Imported lazily so the module can be loaded without redis.
            import redis.asyncio as aioredis
            self._redis = aioredis.from_url(self.redis_url)
        return self._redis

    async def _pin_start_ids(self, client: Any) -> None:
        """Replace ``"$"`` cursors with the concrete id of the newest entry.

        ``"$"`` is re-evaluated on every ``XREAD`` call, so a stream that
        stays on ``"$"`` loses any entry added while we are *not* blocked in
        ``XREAD`` (e.g. while handling the other stream's batch). Pinning
        the cursor once keeps "new entries only" semantics without the gap.
        Failures are logged and leave the cursor on ``"$"`` for a retry.
        """
        for stream, cursor in list(self._last_ids.items()):
            if cursor != self._TAIL_ID:
                continue
            try:
                newest = await client.xrevrange(stream, count=1)
            except Exception as exc:
                logger.warning(
                    "ExecutionHarvester: could not resolve start id for %s: %s",
                    stream, exc,
                )
                continue
            # Empty / missing stream: everything that arrives later is new.
            self._last_ids[stream] = (
                self._as_text(newest[0][0]) if newest else "0-0"
            )

    async def run(self) -> None:
        """Tail both streams until :meth:`stop` is called.

        Read errors are logged and retried after one second. A failure
        while handling a single entry is logged and does not stop the loop;
        the cursor has already moved past that entry, so it is not retried.
        """
        client = await self._get_redis()
        logger.info(
            "ExecutionHarvester: tailing %s + %s",
            self.executions_stream, self.outcomes_stream,
        )
        while not self._stop.is_set():
            if self._TAIL_ID in self._last_ids.values():
                await self._pin_start_ids(client)
            try:
                resp = await client.xread(
                    self._last_ids,
                    block=5000, count=50,
                )
            except Exception as exc:
                logger.warning("ExecutionHarvester: xread failed: %s", exc)
                await asyncio.sleep(1.0)
                continue
            if not resp:
                continue
            for stream_raw, entries in resp:
                stream = self._as_text(stream_raw)
                for entry_id, fields in entries:
                    sid = self._as_text(entry_id)
                    # Advance first so a crashing entry is skipped, not looped.
                    self._last_ids[stream] = sid
                    try:
                        await self._handle(stream, sid, fields)
                    except Exception:
                        logger.exception(
                            "ExecutionHarvester: handler crashed on %s/%s",
                            stream, sid,
                        )

    def stop(self) -> None:
        """Ask :meth:`run` to return after its current read / batch."""
        self._stop.set()

    async def aclose(self) -> None:
        """Stop the loop and close the Redis client, if one was created.

        Close errors are logged at debug level rather than raised, since
        this is best-effort teardown.
        """
        self._stop.set()
        client, self._redis = self._redis, None
        if client is None:
            return
        try:
            # Newer redis-py exposes ``aclose``; older versions only ``close``.
            closer = getattr(client, "aclose", None) or client.close
            await closer()
        except Exception:
            logger.debug(
                "ExecutionHarvester: error while closing Redis client",
                exc_info=True,
            )

    # ── Handlers ────────────────────────────────────────────────────────────

    async def _handle(self, stream: str, stream_id: str, fields: Any) -> None:
        """Decode one stream entry and route it to the matching recorder.

        The JSON payload is expected under the ``data`` field (bytes or str
        key). Entries without it are ignored; payloads failing validation
        are logged and dropped.
        """
        raw = None
        if isinstance(fields, dict):
            raw = fields.get(b"data") or fields.get("data")
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        if not raw:
            return

        if stream == self.executions_stream:
            try:
                evt = ExecutionEvent.model_validate_json(raw)
            except ValidationError as exc:
                logger.warning("ExecutionHarvester: malformed exec: %s", exc)
                return
            await self._record_execution(evt, stream_id)
        elif stream == self.outcomes_stream:
            try:
                out = OutcomeEvent.model_validate_json(raw)
            except ValidationError as exc:
                logger.warning("ExecutionHarvester: malformed outcome: %s", exc)
                return
            await self._record_outcome(out, stream_id)
        else:
            logger.debug("ExecutionHarvester: unknown stream %s", stream)

    async def _record_execution(self, evt: ExecutionEvent, stream_id: str) -> None:
        """Append the execution to JSONL and echo it on the event bus."""
        line = evt.model_dump(mode="json")
        line["_stream_id"] = stream_id
        line["_kind"] = "execution"
        self._append_jsonl("executions", line)

        bus = get_event_bus()
        # Reuse the ``action.emit`` kind — the live feed treats this as the
        # "closed-loop echo".
        await bus.publish(
            "action.emit",
            arena_id=evt.arena_id,
            correlation_id=evt.action_correlation_id,
            payload={
                "kind": "execution_echo",
                "ticker": evt.ticker,
                "verdict": evt.verdict,
                "gate_action": evt.gate_action,
                "mode_final": evt.mode_final,
                "size_fraction": evt.size_fraction,
                "reason": evt.reason,
                "consumer": evt.consumer,
            },
        )

    async def _record_outcome(self, out: OutcomeEvent, stream_id: str) -> None:
        """Persist the outcome, notify the optional hooks, then publish it.

        Hook failures (risk gate, decision memory) are logged and do not
        prevent the bus publish.
        """
        line = out.model_dump(mode="json")
        line["_stream_id"] = stream_id
        line["_kind"] = "outcome"
        self._append_jsonl("outcomes", line)

        # Release exposure when the engine reports the position closed.
        if self.risk_gate is not None and out.status in ("closed", "expired"):
            try:
                await self._settle(self.risk_gate.register_exit(out.ticker))
            except Exception:
                logger.exception("ExecutionHarvester: risk_gate.register_exit failed")

        # Feed the mastermind's DecisionMemory so the next decision's
        # reflection pass can find this outcome. Defensive: the memory
        # protocol varies per backend, so we call ``add_outcome`` only when
        # the object exposes it, and await the result if it is asynchronous.
        if self.decision_memory is not None:
            add = getattr(self.decision_memory, "add_outcome", None)
            if callable(add):
                try:
                    await self._settle(
                        add(
                            arena_id=out.arena_id,
                            ticker=out.ticker,
                            status=out.status,
                            pnl=out.pnl,
                            pnl_pct=out.pnl_pct,
                            note=out.note,
                            meta=out.extras,
                        )
                    )
                except Exception:
                    logger.exception("ExecutionHarvester: add_outcome failed")

        bus = get_event_bus()
        await bus.publish(
            "decision.emit",
            arena_id=out.arena_id,
            payload={
                "kind": "outcome",
                "ticker": out.ticker,
                "status": out.status,
                "pnl": out.pnl,
                "pnl_pct": out.pnl_pct,
                "side": out.side,
            },
        )

    def _append_jsonl(self, subdir: str, record: dict[str, Any]) -> None:
        """Append a record to ``<data_dir>/<subdir>/<arena_id>.jsonl``.

        Keyed by arena_id so the reflection pass can collect everything for
        a given arena run in one cheap file scan. ``arena_id`` comes from
        stream data, so path separators and NUL are replaced with ``_`` to
        keep the file inside ``<data_dir>/<subdir>``. A missing or empty id
        is filed under ``unknown``. I/O errors are logged, not raised.
        """
        arena_id = str(record.get("arena_id") or "unknown")
        safe_name = "".join(
            "_" if ch in self._UNSAFE_FILENAME_CHARS else ch
            for ch in arena_id
        )
        target_dir = self.data_dir / subdir
        path = target_dir / f"{safe_name}.jsonl"
        try:
            target_dir.mkdir(parents=True, exist_ok=True)
            with path.open("a", encoding="utf-8") as f:
                f.write(json.dumps(record, default=str) + "\n")
        except OSError:
            logger.exception("ExecutionHarvester: write %s failed", path)
313
314
# Public API of this module: the names exported by a star-import.
__all__ = [
    "DEFAULT_EXECUTIONS_STREAM",
    "DEFAULT_OUTCOMES_STREAM",
    "ExecutionEvent",
    "ExecutionHarvester",
    "OutcomeEvent",
]