"""Mirofish-sim cache adapter — reads the result the refresher wrote.

Two lookup modes:

  by ``report_id``   direct cache hit on ``mirofish:sim:{report_id}``.
  by ``symbol``      SCAN the cache, return the *most recent* sim that
                     mentions this symbol in its tickers list.

Most arena agents want the symbol-mode call: "given the target ticker,
do we have a recent crowd-sim?". The report-id mode is for arenas that
already know the specific document (e.g. a follow-up reflection pass).

Staleness
---------
Mirofish sims age slowly (the consensus on yesterday's earnings beat is
still informative today). Default freshness budget: 6 h. Set
``stale_after_seconds`` explicitly to tighten or relax.
"""

from __future__ import annotations

import json
import logging
import os
import time
from datetime import datetime
from typing import Any

from maf.sources.base import BaseSource

logger = logging.getLogger(__name__)


DEFAULT_KEY_TEMPLATE = "mirofish:sim:{report_id}"
DEFAULT_SCAN_PREFIX = "mirofish:sim:*"
DEFAULT_STALE_AFTER_S = 6 * 3600

# Sort key used for envelopes whose ``generated_at`` is missing or unparsable,
# so that any envelope with a real timestamp is preferred over them.
_UNKNOWN_AGE_S = 1e12


class MirofishSimSource(BaseSource):
    """Read a cached crowd-sim envelope produced by :class:`MirofishRefresher`.

    Config keys (all overridable per-call):
        symbol               optional — find the most recent sim mentioning this ticker
        report_id            optional — direct lookup (wins when both are given)
        stale_after_seconds  freshness budget (default 6 h)
        include_full_sim     bool; default False — only return the summary block
        redis_url            optional — falls back to ``$REDIS_URL``, then
                             ``redis://localhost:6379/0``
    """

    adapter_name = "mirofish_sim"

    @classmethod
    def freshness_spec(cls, binding_config: dict[str, Any]) -> dict[str, Any]:
        """Return the freshness descriptor for this adapter.

        ``binding_config`` is accepted for interface compatibility and is
        not consulted; the descriptor is the same for every binding.
        """
        return {
            "type": "scan",
            "scan_pattern": DEFAULT_SCAN_PREFIX,
            "emit_stream": "mirofish:sims:emitted",
        }

    def __init__(self, config: dict[str, Any]) -> None:
        super().__init__(config)
        # Created lazily by ``_get_redis`` on first use.
        self._redis: Any = None

    async def _get_redis(self) -> Any:
        """Return the shared async Redis client, creating it on first use."""
        if self._redis is None:
            # Imported lazily so the module can be imported without redis.
            import redis.asyncio as aioredis

            self._redis = aioredis.from_url(
                self.config.get(
                    "redis_url",
                    os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
                )
            )
        return self._redis

    async def fetch(self, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Look up a cached crowd-sim and describe its freshness.

        Args:
            params: Per-call overrides merged on top of the adapter config
                (see the class docstring for the recognised keys).

        Returns:
            A dict with ``"type": "mirofish_sim"``. One of:

            * an ``error`` result when neither ``report_id`` nor ``symbol``
              is given, or when Redis fails;
            * a ``missing`` result when no matching envelope is cached;
            * the envelope's summary plus ``age_seconds`` / ``stale`` flags
              (and the full ``sim`` when ``include_full_sim`` is set).
        """
        p = {**self.config, **(params or {})}
        report_id = str(p.get("report_id") or "").strip()
        symbol = str(p.get("symbol") or "").upper().strip()
        # A missing, None or 0 budget falls back to the default.
        stale_after = int(p.get("stale_after_seconds") or DEFAULT_STALE_AFTER_S)
        include_full = bool(p.get("include_full_sim", False))

        if not report_id and not symbol:
            return {
                "type": "mirofish_sim",
                "error": "need either 'report_id' or 'symbol'",
                "key_factors": ["no_crowd_sim"],
            }

        client = await self._get_redis()
        if report_id:
            cache_key = DEFAULT_KEY_TEMPLATE.format(report_id=report_id)
            try:
                raw = await client.get(cache_key)
            except Exception as exc:
                return _err(symbol or report_id, f"redis: {exc}")
            envelope = _decode(raw)
        else:
            # Symbol mode — scan keys, keep most-recent envelope mentioning
            # the symbol. A failing SCAN is reported the same way as a
            # failing GET in report-id mode instead of escaping as a raw
            # exception.
            try:
                envelope = await _find_latest_for_symbol(client, symbol)
            except Exception as exc:
                return _err(symbol, f"redis: {exc}")

        if envelope is None:
            return {
                "type": "mirofish_sim",
                "report_id": report_id,
                "symbol": symbol,
                "stale": True,
                "missing": True,
                "key_factors": ["no_crowd_sim"],
                "message": (
                    f"no cached crowd-sim for "
                    f"{'report ' + report_id if report_id else 'symbol ' + symbol}"
                    f" — is mirofish-svc up and is the symbol watched?"
                ),
            }

        age = _age_seconds(envelope.get("generated_at"))
        # An envelope without a usable timestamp is treated as stale.
        is_stale = age is None or age > stale_after
        summary = envelope.get("summary") or {}
        out: dict[str, Any] = {
            "type": "mirofish_sim",
            "report_id": envelope.get("report_id", report_id),
            # In report-id mode without a symbol, fall back to the first ticker.
            "symbol": symbol or (envelope.get("tickers") or [""])[0],
            "tickers": envelope.get("tickers") or [],
            "generated_at": envelope.get("generated_at", ""),
            "age_seconds": age,
            "stale": is_stale,
            "summary": summary,
        }
        if is_stale:
            out["key_factors"] = ["stale_crowd_sim"]
            out["stale_after_seconds"] = stale_after
        if include_full:
            out["sim"] = envelope.get("sim") or {}
        return out


# ── helpers ────────────────────────────────────────────────────────────────


async def _find_latest_for_symbol(
    client: Any, symbol: str,
) -> dict[str, Any] | None:
    """Scan the mirofish:sim:* keyspace; return the most recent matching envelope.

    ``symbol`` is compared against the upper-cased ``tickers`` of each
    envelope. Keys that cannot be read or decoded are skipped. Envelopes
    without a parsable ``generated_at`` are only returned when no timestamped
    envelope matches. Errors raised by the scan itself propagate to the caller.
    """
    best: tuple[float, dict[str, Any]] | None = None
    async for k in client.scan_iter(match=DEFAULT_SCAN_PREFIX, count=200):
        try:
            raw = await client.get(k)
        except Exception:
            # Best-effort: one unreadable key must not abort the whole scan.
            logger.debug("mirofish_sim: GET failed for %r during scan", k, exc_info=True)
            continue
        env = _decode(raw)
        if not env:
            continue
        tickers = [str(t).upper() for t in (env.get("tickers") or [])]
        if symbol not in tickers:
            continue
        age = _age_seconds(env.get("generated_at"))
        # Explicit None check: an age of exactly 0.0 (the freshest possible
        # sim) is falsy and must not be mistaken for "unknown age".
        if age is None:
            age = _UNKNOWN_AGE_S
        if best is None or age < best[0]:
            best = (age, env)
    return best[1] if best else None


def _decode(raw: Any) -> dict[str, Any] | None:
    """Parse a cached value into an envelope dict.

    Returns ``None`` for empty values, invalid JSON, or JSON whose top level
    is not an object.
    """
    if not raw:
        return None
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8", errors="replace")
    try:
        env = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return None
    if not isinstance(env, dict):
        return None
    return env


def _age_seconds(generated_at: Any) -> float | None:
    """Return seconds elapsed since the ISO-8601 ``generated_at`` timestamp.

    Returns ``None`` when the value is empty, unparsable or out of range.
    Timestamps in the future are clamped to an age of ``0.0``.
    """
    if not generated_at:
        return None
    try:
        # A trailing "Z" is rewritten as an explicit UTC offset before parsing.
        dt = datetime.fromisoformat(str(generated_at).replace("Z", "+00:00"))
        # NOTE(review): a timestamp without a UTC offset is interpreted by
        # ``datetime.timestamp()`` as local time — confirm the refresher
        # always writes an offset (or "Z").
        stamp = dt.timestamp()
    except (TypeError, ValueError, OverflowError, OSError):
        return None
    return max(0.0, time.time() - stamp)


def _err(target: str, msg: str) -> dict[str, Any]:
    """Build the error result returned when the cache cannot be read."""
    return {
        "type": "mirofish_sim",
        "target": target,
        "error": msg,
        "stale": True,
        "key_factors": ["no_crowd_sim"],
    }


__all__ = [
    "DEFAULT_KEY_TEMPLATE",
    "DEFAULT_SCAN_PREFIX",
    "DEFAULT_STALE_AFTER_S",
    "MirofishSimSource",
]