checking system…
Docs / back / src/maf/arenas/mastermind/memory.py · line 1
Python · 357 lines
"""DecisionMemory — typed wrapper around ``HybridMemory`` for ``Decision`` objects.

Layered on top of ``maf.memory.hybrid.HybridMemory`` (BM25 + chroma) and a
small JSON-line append store on disk. The hybrid memory holds a flat
(situation, recommendation) text pair for retrieval; the JSONL store holds
the full ``Decision`` JSON keyed by ``decision_id`` so we can hydrate the
typed envelope on recall.

Why a JSONL sidecar instead of stuffing the whole ``Decision`` JSON into a
ChromaDB metadata field?

- Chroma metadata values must be primitives, so a nested ``argument_tree``
  cannot live there directly, and stringifying it defeats the typing contract.
- The BM25 index carries no metadata at all.
- The JSONL append store is trivially auditable (one decision per line) and
  is the same shape we will tail in the dashboard during iter-11.

ChromaDB graceful degrade
-------------------------
If ``chromadb`` is not importable, ``HybridMemory.add`` already swallows the
exception and BM25 carries the full load. We do **not** require chromadb in
unit tests; tests pass with a temp-dir BM25-only configuration.

Idempotency
-----------
Re-adding the same ``decision_id`` is a no-op. Seen ids are tracked in an
in-memory id→Decision cache bootstrapped from the JSONL store on init.
"""
 29
 30from __future__ import annotations
 31
 32import json
 33import logging
 34from pathlib import Path
 35from typing import Any
 36
 37from maf.arenas.mastermind.schema import Decision
 38from maf.memory.hybrid import HybridMemory
 39
 40logger = logging.getLogger(__name__)
 41
 42# How much of the decision text to use as the "situation" (the indexed/searched
 43# part). Kept around 500 chars per the ticket so BM25 has enough signal but
 44# token budgets stay sane.
 45_SITUATION_MAX_CHARS = 500
 46
 47
 48def _format_lesson_stub(
 49    *,
 50    decision: Decision,
 51    status: str,
 52    ticker: str,
 53    pnl: float | None,
 54    pnl_pct: float | None,
 55) -> str:
 56    """Produce a short pre-reflection lesson string for a fresh outcome.
 57
 58    The ReflectionAgent overwrites this on its next pass with a more
 59    thoughtful "in hindsight" lesson. But this stub already encodes the
 60    most useful pattern — outcome direction vs decision direction — so a
 61    recall in the meantime returns useful evidence.
 62    """
 63    rec = (decision.recommendation or "").upper()
 64    direction_msg = ""
 65    if pnl_pct is not None:
 66        if pnl_pct > 0 and rec.startswith("BUY"):
 67            direction_msg = "decision direction matched outcome"
 68        elif pnl_pct < 0 and rec.startswith("BUY"):
 69            direction_msg = "decision direction opposed outcome"
 70        elif pnl_pct < 0 and rec.startswith("SELL"):
 71            direction_msg = "decision direction matched outcome (short)"
 72        elif pnl_pct > 0 and rec.startswith("SELL"):
 73            direction_msg = "decision direction opposed outcome (short)"
 74    pnl_part = f"pnl={pnl:+.2f}" if pnl is not None else ""
 75    pct_part = f"pnl_pct={pnl_pct*100:+.2f}%" if pnl_pct is not None else ""
 76    summary = ", ".join(p for p in (status, ticker, pnl_part, pct_part) if p)
 77    if direction_msg:
 78        return f"In hindsight: {direction_msg} ({summary}). If we see X again, weight signals similarly."
 79    return f"In hindsight: {summary}. If we see X again, weight signals similarly."[:500]
 80
 81
 82def _build_situation(decision: Decision) -> str:
 83    """Compose the indexable "situation" text from a ``Decision``.
 84
 85    Uses ``question`` + an excerpt of ``reasoning`` so retrieval can match on
 86    either the original framing or the analytical content, and so a recalled
 87    citation summary stays informative on its own.
 88    """
 89    reasoning_excerpt = decision.reasoning[:_SITUATION_MAX_CHARS].rstrip()
 90    return f"Q: {decision.question}\nReasoning: {reasoning_excerpt}".strip()
 91
 92
 93class DecisionMemory:
 94    """Typed memory wrapper that stores and recalls full :class:`Decision`
 95    envelopes.
 96
 97    Storage layers:
 98    - ``HybridMemory`` (BM25 + chroma) for retrieval over question/reasoning.
 99    - JSON-line append file for full ``Decision`` JSON keyed by id.
100
101    The hybrid layer's ``recommendation`` slot doubles as our envelope
102    pointer: we stuff the ``decision_id`` there so a recall result can find
103    the matching JSONL row without having to embed the whole envelope into
104    chroma metadata.
105    """
106
107    def __init__(
108        self,
109        name: str,
110        persist_path: str | None = None,
111        chromadb_path: str = "./data/chromadb",
112    ) -> None:
113        self.name = name
114        base_dir = Path(persist_path) if persist_path else Path(f"./data/decision_memory_{name}")
115        base_dir.mkdir(parents=True, exist_ok=True)
116        self._base_dir = base_dir
117        self._jsonl_path = base_dir / "decisions.jsonl"
118        # The BM25 index sidecar lives alongside the JSONL store.
119        self._bm25_path = base_dir / "bm25.json"
120        self._chromadb_path = chromadb_path
121
122        self._hybrid = HybridMemory(
123            name=name,
124            persist_path=str(self._bm25_path),
125            chromadb_path=chromadb_path,
126        )
127
128        # Bootstrap the seen-id set + in-memory id→Decision cache from disk so
129        # restart is idempotent and ``get(id)`` is O(1).
130        self._cache: dict[str, Decision] = {}
131        self._load_jsonl()
132
133    # ------------------------------------------------------------------
134    # Public API
135    # ------------------------------------------------------------------
136
137    def add(self, decision: Decision) -> None:
138        """Store ``decision``. Re-add of an already-seen id is a no-op."""
139        if decision.decision_id in self._cache:
140            return
141        situation = _build_situation(decision)
142        # We stash the decision_id as the recommendation so recall results
143        # carry an unambiguous pointer back to the JSONL row, regardless of
144        # whether BM25 or chroma served the hit.
145        self._hybrid.add(situation, decision.decision_id)
146        self._append_jsonl(decision)
147        self._cache[decision.decision_id] = decision
148
149    def recall(self, query: str, n: int = 5) -> list[Decision]:
150        """Retrieve up to ``n`` decisions relevant to ``query``.
151
152        Hybrid hits whose ``recommendation`` is not a known ``decision_id``
153        (e.g. a stray entry from before this code shipped) or that fail to
154        deserialize cleanly are skipped with a warning.
155        """
156        return [d for d, _ in self.recall_with_scores(query, n)]
157
158    def recall_with_scores(self, query: str, n: int = 5) -> list[tuple[Decision, float]]:
159        """Same as :meth:`recall` but pairs each Decision with its similarity
160        score in [0, 1]. Results are ordered by descending score.
161
162        BM25 raw scores are non-bounded; we min-max normalise the per-query
163        score vector to [0, 1] so the result can flow straight into a
164        :class:`MemoryCitation.similarity` field without further massaging.
165        """
166        hits = self._hybrid.recall(query, n)
167        if not hits:
168            return []
169
170        raw_scores = [float(h.get("score", 0.0)) for h in hits]
171        max_score = max(raw_scores) if raw_scores else 0.0
172        min_score = min(raw_scores) if raw_scores else 0.0
173        span = max_score - min_score
174
175        out: list[tuple[Decision, float]] = []
176        for hit, raw in zip(hits, raw_scores, strict=True):
177            decision_id = hit.get("recommendation", "")
178            decision = self._cache.get(decision_id)
179            if decision is None:
180                logger.warning(
181                    "DecisionMemory recall: orphan hit (id=%r) — skipping",
182                    decision_id,
183                )
184                continue
185            if span > 0:
186                norm = (raw - min_score) / span
187            else:
188                # All hits tied (common for a 1-element corpus); treat as full match.
189                norm = 1.0 if raw > 0 else 0.0
190            # Clamp defensively in case of float drift.
191            score = max(0.0, min(1.0, norm))
192            out.append((decision, score))
193
194        out.sort(key=lambda pair: pair[1], reverse=True)
195        return out
196
197    def get(self, decision_id: str) -> Decision | None:
198        """Return the stored :class:`Decision` for ``decision_id`` or ``None``."""
199        return self._cache.get(decision_id)
200
201    def find_by_arena_id(self, arena_id: str) -> Decision | None:
202        """Return the stored :class:`Decision` whose ``arena_id`` matches.
203
204        Falls back to ``meta.arena_id`` for legacy decisions written before
205        the dedicated field existed. Returns ``None`` when nothing matches —
206        the caller (typically the execution harvester) treats that as
207        "decision wasn't produced by mastermind; nothing to update".
208        """
209        if not arena_id:
210            return None
211        for d in self._cache.values():
212            if d.arena_id == arena_id:
213                return d
214            if d.meta.get("arena_id") == arena_id:
215                return d
216        return None
217
218    def add_outcome(
219        self,
220        *,
221        arena_id: str,
222        ticker: str = "",
223        status: str = "",
224        pnl: float | None = None,
225        pnl_pct: float | None = None,
226        note: str = "",
227        meta: dict[str, Any] | None = None,
228    ) -> Decision | None:
229        """Persist an execution outcome onto the originating decision.
230
231        This is the contract :class:`maf.outcomes.harvester.ExecutionHarvester`
232        calls into when it sees an ``OutcomeEvent`` on ``maf:outcomes:out``.
233        We correlate by ``arena_id`` (set on every TradingAction, echoed on
234        every OutcomeEvent), build a structured outcome dict, and call
235        :meth:`update_outcome`. A short auto-generated lesson stub gives
236        :class:`ReflectionAgent` something to refine on its next pass.
237
238        Returns the updated Decision, or ``None`` when there's no matching
239        Decision (e.g. the outcome came from a non-mastermind arena).
240        """
241        decision = self.find_by_arena_id(arena_id)
242        if decision is None:
243            logger.debug(
244                "DecisionMemory.add_outcome: no decision for arena_id=%r — skipping",
245                arena_id,
246            )
247            return None
248
249        outcome: dict[str, Any] = {
250            "arena_id": arena_id,
251            "ticker": ticker,
252            "status": status,
253        }
254        if pnl is not None:
255            outcome["pnl"] = float(pnl)
256        if pnl_pct is not None:
257            outcome["pnl_pct"] = float(pnl_pct)
258        if note:
259            outcome["note"] = note
260        if meta:
261            outcome["meta"] = dict(meta)
262
263        # Auto-generated lesson stub. ReflectionAgent's deeper pass can
264        # rewrite this — but storing *something* gives the next decision's
265        # recall pass a hit even before reflection runs.
266        lesson_stub = _format_lesson_stub(
267            decision=decision, status=status, ticker=ticker,
268            pnl=pnl, pnl_pct=pnl_pct,
269        )
270
271        return self.update_outcome(
272            decision_id=decision.decision_id,
273            outcome=outcome,
274            lesson=lesson_stub,
275        )
276
277    def update_outcome(
278        self,
279        decision_id: str,
280        outcome: dict[str, Any],
281        lesson: str | None = None,
282    ) -> Decision:
283        """Backfill ``outcome`` and (optionally) ``lesson`` on a stored decision.
284
285        Returns a *new* :class:`Decision` (the schema is frozen, so this uses
286        ``model_copy(update=...)``). Persists the updated envelope by
287        rewriting the JSONL row and refreshing the cache.
288
289        If a ``lesson`` is provided, it is also added as a *separate* hybrid
290        memory entry under the lesson text so that future recalls find the
291        decision under the lesson's wording too. The new entry shares the
292        same ``decision_id`` (i.e. the recommendation slot) — recall
293        deduplicates on id, so there's no double-counting.
294        """
295        existing = self._cache.get(decision_id)
296        if existing is None:
297            raise KeyError(f"Unknown decision_id: {decision_id!r}")
298
299        update: dict[str, Any] = {"outcome": outcome}
300        if lesson is not None:
301            update["lesson"] = lesson
302        updated = existing.model_copy(update=update)
303
304        self._cache[decision_id] = updated
305        self._rewrite_jsonl()
306
307        if lesson is not None:
308            self._hybrid.add(f"Lesson: {lesson}", decision_id)
309
310        return updated
311
312    # ------------------------------------------------------------------
313    # JSONL persistence helpers
314    # ------------------------------------------------------------------
315
316    def _append_jsonl(self, decision: Decision) -> None:
317        """Append a single decision to the JSONL store."""
318        line = json.dumps(decision.model_dump(mode="json"), separators=(",", ":"))
319        with self._jsonl_path.open("a", encoding="utf-8") as fh:
320            fh.write(line + "\n")
321
322    def _rewrite_jsonl(self) -> None:
323        """Rewrite the JSONL store from the in-memory cache.
324
325        Used when an existing decision is updated (frozen schema means we
326        produce a new envelope; the old line is stale). Acceptable cost for
327        the iter-6 slice — mastermind is not high-throughput; we rewrite at
328        most once per reflection pass.
329        """
330        tmp = self._jsonl_path.with_suffix(".jsonl.tmp")
331        with tmp.open("w", encoding="utf-8") as fh:
332            for decision in self._cache.values():
333                fh.write(json.dumps(decision.model_dump(mode="json"), separators=(",", ":")))
334                fh.write("\n")
335        tmp.replace(self._jsonl_path)
336
337    def _load_jsonl(self) -> None:
338        """Hydrate the in-memory cache from the JSONL store on startup."""
339        if not self._jsonl_path.exists():
340            return
341        with self._jsonl_path.open("r", encoding="utf-8") as fh:
342            for line_no, raw in enumerate(fh, start=1):
343                raw = raw.strip()
344                if not raw:
345                    continue
346                try:
347                    payload = json.loads(raw)
348                    decision = Decision.model_validate(payload)
349                except Exception:
350                    logger.warning(
351                        "DecisionMemory load: skipping malformed line %d in %s",
352                        line_no,
353                        self._jsonl_path,
354                    )
355                    continue
356                self._cache[decision.decision_id] = decision