1"""DecisionMemory — typed wrapper around ``HybridMemory`` for ``Decision`` objects. 2 3Layered on top of ``maf.memory.hybrid.HybridMemory`` (BM25 + chroma) and a 4small JSON-line append store on disk. The hybrid memory holds a flat 5(situation, recommendation) text pair for retrieval; the JSONL store holds 6the full ``Decision`` JSON keyed by ``decision_id`` so we can hydrate the 7typed envelope on recall. 8 9Why a JSONL sidecar instead of stuffing the whole ``Decision`` JSON into a 10ChromaDB metadata field? 11 12- Chroma metadata values must be primitives, so a nested ``argument_tree`` 13 cannot live there directly. Stringifying defeats the typing contract. 14- BM25 is BM25; it doesn't carry metadata at all. 15- The JSONL append-store is trivially auditable (one decision per line) and 16 is the same shape we will tail in the dashboard during iter-11. 17 18ChromaDB graceful degrade 19------------------------- 20If ``chromadb`` is not importable, ``HybridMemory.add`` already swallows the 21exception and BM25 carries the full load. We do **not** require chromadb in 22unit tests; tests pass with a temp-dir BM25-only configuration. 23 24Idempotency 25----------- 26Re-adding the same ``decision_id`` is a no-op. We track seen ids in an 27in-memory set bootstrapped from the JSONL store on init. 28""" 29 30from __future__ import annotations 31 32import json 33import logging 34from pathlib import Path 35from typing import Any 36 37from maf.arenas.mastermind.schema import Decision 38from maf.memory.hybrid import HybridMemory 39 40logger = logging.getLogger(__name__) 41 42# How much of the decision text to use as the "situation" (the indexed/searched 43# part). Kept around 500 chars per the ticket so BM25 has enough signal but 44# token budgets stay sane. 45_SITUATION_MAX_CHARS = 500 46 47 48def _format_lesson_stub( 49 *, 50 decision: Decision, 51 status: str, 52 ticker: str, 53 pnl: float | None, 54 pnl_pct: float | None, 55) -> str: 56 """Produce a short pre-reflection lesson string for a fresh outcome. 57 58 The ReflectionAgent overwrites this on its next pass with a more 59 thoughtful "in hindsight" lesson. But this stub already encodes the 60 most useful pattern — outcome direction vs decision direction — so a 61 recall in the meantime returns useful evidence. 62 """ 63 rec = (decision.recommendation or "").upper() 64 direction_msg = "" 65 if pnl_pct is not None: 66 if pnl_pct > 0 and rec.startswith("BUY"): 67 direction_msg = "decision direction matched outcome" 68 elif pnl_pct < 0 and rec.startswith("BUY"): 69 direction_msg = "decision direction opposed outcome" 70 elif pnl_pct < 0 and rec.startswith("SELL"): 71 direction_msg = "decision direction matched outcome (short)" 72 elif pnl_pct > 0 and rec.startswith("SELL"): 73 direction_msg = "decision direction opposed outcome (short)" 74 pnl_part = f"pnl={pnl:+.2f}" if pnl is not None else "" 75 pct_part = f"pnl_pct={pnl_pct*100:+.2f}%" if pnl_pct is not None else "" 76 summary = ", ".join(p for p in (status, ticker, pnl_part, pct_part) if p) 77 if direction_msg: 78 return f"In hindsight: {direction_msg} ({summary}). If we see X again, weight signals similarly." 79 return f"In hindsight: {summary}. If we see X again, weight signals similarly."[:500] 80 81 82def _build_situation(decision: Decision) -> str: 83 """Compose the indexable "situation" text from a ``Decision``. 84 85 Uses ``question`` + an excerpt of ``reasoning`` so retrieval can match on 86 either the original framing or the analytical content, and so a recalled 87 citation summary stays informative on its own. 88 """ 89 reasoning_excerpt = decision.reasoning[:_SITUATION_MAX_CHARS].rstrip() 90 return f"Q: {decision.question}\nReasoning: {reasoning_excerpt}".strip() 91 92 93class DecisionMemory: 94 """Typed memory wrapper that stores and recalls full :class:`Decision` 95 envelopes. 96 97 Storage layers: 98 - ``HybridMemory`` (BM25 + chroma) for retrieval over question/reasoning. 99 - JSON-line append file for full ``Decision`` JSON keyed by id. 100 101 The hybrid layer's ``recommendation`` slot doubles as our envelope 102 pointer: we stuff the ``decision_id`` there so a recall result can find 103 the matching JSONL row without having to embed the whole envelope into 104 chroma metadata. 105 """ 106 107 def __init__( 108 self, 109 name: str, 110 persist_path: str | None = None, 111 chromadb_path: str = "./data/chromadb", 112 ) -> None: 113 self.name = name 114 base_dir = Path(persist_path) if persist_path else Path(f"./data/decision_memory_{name}") 115 base_dir.mkdir(parents=True, exist_ok=True) 116 self._base_dir = base_dir 117 self._jsonl_path = base_dir / "decisions.jsonl" 118 # The BM25 index sidecar lives alongside the JSONL store. 119 self._bm25_path = base_dir / "bm25.json" 120 self._chromadb_path = chromadb_path 121 122 self._hybrid = HybridMemory( 123 name=name, 124 persist_path=str(self._bm25_path), 125 chromadb_path=chromadb_path, 126 ) 127 128 # Bootstrap the seen-id set + in-memory id→Decision cache from disk so 129 # restart is idempotent and ``get(id)`` is O(1). 130 self._cache: dict[str, Decision] = {} 131 self._load_jsonl() 132 133 # ------------------------------------------------------------------ 134 # Public API 135 # ------------------------------------------------------------------ 136 137 def add(self, decision: Decision) -> None: 138 """Store ``decision``. Re-add of an already-seen id is a no-op.""" 139 if decision.decision_id in self._cache: 140 return 141 situation = _build_situation(decision) 142 # We stash the decision_id as the recommendation so recall results 143 # carry an unambiguous pointer back to the JSONL row, regardless of 144 # whether BM25 or chroma served the hit. 145 self._hybrid.add(situation, decision.decision_id) 146 self._append_jsonl(decision) 147 self._cache[decision.decision_id] = decision 148 149 def recall(self, query: str, n: int = 5) -> list[Decision]: 150 """Retrieve up to ``n`` decisions relevant to ``query``. 151 152 Hybrid hits whose ``recommendation`` is not a known ``decision_id`` 153 (e.g. a stray entry from before this code shipped) or that fail to 154 deserialize cleanly are skipped with a warning. 155 """ 156 return [d for d, _ in self.recall_with_scores(query, n)] 157 158 def recall_with_scores(self, query: str, n: int = 5) -> list[tuple[Decision, float]]: 159 """Same as :meth:`recall` but pairs each Decision with its similarity 160 score in [0, 1]. Results are ordered by descending score. 161 162 BM25 raw scores are non-bounded; we min-max normalise the per-query 163 score vector to [0, 1] so the result can flow straight into a 164 :class:`MemoryCitation.similarity` field without further massaging. 165 """ 166 hits = self._hybrid.recall(query, n) 167 if not hits: 168 return [] 169 170 raw_scores = [float(h.get("score", 0.0)) for h in hits] 171 max_score = max(raw_scores) if raw_scores else 0.0 172 min_score = min(raw_scores) if raw_scores else 0.0 173 span = max_score - min_score 174 175 out: list[tuple[Decision, float]] = [] 176 for hit, raw in zip(hits, raw_scores, strict=True): 177 decision_id = hit.get("recommendation", "") 178 decision = self._cache.get(decision_id) 179 if decision is None: 180 logger.warning( 181 "DecisionMemory recall: orphan hit (id=%r) — skipping", 182 decision_id, 183 ) 184 continue 185 if span > 0: 186 norm = (raw - min_score) / span 187 else: 188 # All hits tied (common for a 1-element corpus); treat as full match. 189 norm = 1.0 if raw > 0 else 0.0 190 # Clamp defensively in case of float drift. 191 score = max(0.0, min(1.0, norm)) 192 out.append((decision, score)) 193 194 out.sort(key=lambda pair: pair[1], reverse=True) 195 return out 196 197 def get(self, decision_id: str) -> Decision | None: 198 """Return the stored :class:`Decision` for ``decision_id`` or ``None``.""" 199 return self._cache.get(decision_id) 200 201 def find_by_arena_id(self, arena_id: str) -> Decision | None: 202 """Return the stored :class:`Decision` whose ``arena_id`` matches. 203 204 Falls back to ``meta.arena_id`` for legacy decisions written before 205 the dedicated field existed. Returns ``None`` when nothing matches — 206 the caller (typically the execution harvester) treats that as 207 "decision wasn't produced by mastermind; nothing to update". 208 """ 209 if not arena_id: 210 return None 211 for d in self._cache.values(): 212 if d.arena_id == arena_id: 213 return d 214 if d.meta.get("arena_id") == arena_id: 215 return d 216 return None 217 218 def add_outcome( 219 self, 220 *, 221 arena_id: str, 222 ticker: str = "", 223 status: str = "", 224 pnl: float | None = None, 225 pnl_pct: float | None = None, 226 note: str = "", 227 meta: dict[str, Any] | None = None, 228 ) -> Decision | None: 229 """Persist an execution outcome onto the originating decision. 230 231 This is the contract :class:`maf.outcomes.harvester.ExecutionHarvester` 232 calls into when it sees an ``OutcomeEvent`` on ``maf:outcomes:out``. 233 We correlate by ``arena_id`` (set on every TradingAction, echoed on 234 every OutcomeEvent), build a structured outcome dict, and call 235 :meth:`update_outcome`. A short auto-generated lesson stub gives 236 :class:`ReflectionAgent` something to refine on its next pass. 237 238 Returns the updated Decision, or ``None`` when there's no matching 239 Decision (e.g. the outcome came from a non-mastermind arena). 240 """ 241 decision = self.find_by_arena_id(arena_id) 242 if decision is None: 243 logger.debug( 244 "DecisionMemory.add_outcome: no decision for arena_id=%r — skipping", 245 arena_id, 246 ) 247 return None 248 249 outcome: dict[str, Any] = { 250 "arena_id": arena_id, 251 "ticker": ticker, 252 "status": status, 253 } 254 if pnl is not None: 255 outcome["pnl"] = float(pnl) 256 if pnl_pct is not None: 257 outcome["pnl_pct"] = float(pnl_pct) 258 if note: 259 outcome["note"] = note 260 if meta: 261 outcome["meta"] = dict(meta) 262 263 # Auto-generated lesson stub. ReflectionAgent's deeper pass can 264 # rewrite this — but storing *something* gives the next decision's 265 # recall pass a hit even before reflection runs. 266 lesson_stub = _format_lesson_stub( 267 decision=decision, status=status, ticker=ticker, 268 pnl=pnl, pnl_pct=pnl_pct, 269 ) 270 271 return self.update_outcome( 272 decision_id=decision.decision_id, 273 outcome=outcome, 274 lesson=lesson_stub, 275 ) 276 277 def update_outcome( 278 self, 279 decision_id: str, 280 outcome: dict[str, Any], 281 lesson: str | None = None, 282 ) -> Decision: 283 """Backfill ``outcome`` and (optionally) ``lesson`` on a stored decision. 284 285 Returns a *new* :class:`Decision` (the schema is frozen, so this uses 286 ``model_copy(update=...)``). Persists the updated envelope by 287 rewriting the JSONL row and refreshing the cache. 288 289 If a ``lesson`` is provided, it is also added as a *separate* hybrid 290 memory entry under the lesson text so that future recalls find the 291 decision under the lesson's wording too. The new entry shares the 292 same ``decision_id`` (i.e. the recommendation slot) — recall 293 deduplicates on id, so there's no double-counting. 294 """ 295 existing = self._cache.get(decision_id) 296 if existing is None: 297 raise KeyError(f"Unknown decision_id: {decision_id!r}") 298 299 update: dict[str, Any] = {"outcome": outcome} 300 if lesson is not None: 301 update["lesson"] = lesson 302 updated = existing.model_copy(update=update) 303 304 self._cache[decision_id] = updated 305 self._rewrite_jsonl() 306 307 if lesson is not None: 308 self._hybrid.add(f"Lesson: {lesson}", decision_id) 309 310 return updated 311 312 # ------------------------------------------------------------------ 313 # JSONL persistence helpers 314 # ------------------------------------------------------------------ 315 316 def _append_jsonl(self, decision: Decision) -> None: 317 """Append a single decision to the JSONL store.""" 318 line = json.dumps(decision.model_dump(mode="json"), separators=(",", ":")) 319 with self._jsonl_path.open("a", encoding="utf-8") as fh: 320 fh.write(line + "\n") 321 322 def _rewrite_jsonl(self) -> None: 323 """Rewrite the JSONL store from the in-memory cache. 324 325 Used when an existing decision is updated (frozen schema means we 326 produce a new envelope; the old line is stale). Acceptable cost for 327 the iter-6 slice — mastermind is not high-throughput; we rewrite at 328 most once per reflection pass. 329 """ 330 tmp = self._jsonl_path.with_suffix(".jsonl.tmp") 331 with tmp.open("w", encoding="utf-8") as fh: 332 for decision in self._cache.values(): 333 fh.write(json.dumps(decision.model_dump(mode="json"), separators=(",", ":"))) 334 fh.write("\n") 335 tmp.replace(self._jsonl_path) 336 337 def _load_jsonl(self) -> None: 338 """Hydrate the in-memory cache from the JSONL store on startup.""" 339 if not self._jsonl_path.exists(): 340 return 341 with self._jsonl_path.open("r", encoding="utf-8") as fh: 342 for line_no, raw in enumerate(fh, start=1): 343 raw = raw.strip() 344 if not raw: 345 continue 346 try: 347 payload = json.loads(raw) 348 decision = Decision.model_validate(payload) 349 except Exception: 350 logger.warning( 351 "DecisionMemory load: skipping malformed line %d in %s", 352 line_no, 353 self._jsonl_path, 354 ) 355 continue 356 self._cache[decision.decision_id] = decision