checking system…
Docs / back / src/maf/sources/adapters/mirofish_sim.py · line 49
Python · 203 lines
  1"""Mirofish-sim cache adapter — reads the result the refresher wrote.
  2
  3Two lookup modes:
  4
  5  by ``report_id``    direct cache hit on ``mirofish:sim:{report_id}``.
  6  by ``symbol``       SCAN the cache, return the *most recent* sim that
  7                       mentions this symbol in its tickers list.
  8
  9Most arena agents want the symbol-mode call: "given the target ticker,
 10do we have a recent crowd-sim?". The report-id mode is for arenas that
 11already know the specific document (e.g. a follow-up reflection pass).
 12
 13Staleness
 14---------
 15Mirofish sims age slowly (the consensus on yesterday's earnings beat is
 16still informative today). Default freshness budget: 6 h. Set ``stale_after_seconds``
 17explicitly to tighten or relax.
 18"""
 19
 20from __future__ import annotations
 21
 22import json
 23import logging
 24import os
 25import time
 26from datetime import datetime
 27from typing import Any
 28
 29from maf.sources.base import BaseSource
 30
 31logger = logging.getLogger(__name__)
 32
 33
 34DEFAULT_KEY_TEMPLATE = "mirofish:sim:{report_id}"
 35DEFAULT_SCAN_PREFIX = "mirofish:sim:*"
 36DEFAULT_STALE_AFTER_S = 6 * 3600
 37
 38
 39class MirofishSimSource(BaseSource):
 40    """Read a cached crowd-sim envelope produced by :class:`MirofishRefresher`.
 41
 42    Config keys (all overridable per-call):
 43      symbol             optional — find the most recent sim mentioning this ticker
 44      report_id          optional — direct lookup
 45      stale_after_seconds  freshness budget (default 6 h)
 46      include_full_sim   bool; default False — only return the summary block
 47    """
 48
 49    adapter_name = "mirofish_sim"
 50
 51    @classmethod
 52    def freshness_spec(cls, binding_config: dict[str, Any]) -> dict[str, Any]:
 53        return {
 54            "type": "scan",
 55            "scan_pattern": "mirofish:sim:*",
 56            "emit_stream": "mirofish:sims:emitted",
 57        }
 58
 59    def __init__(self, config: dict[str, Any]) -> None:
 60        super().__init__(config)
 61        self._redis: Any = None
 62
 63    async def _get_redis(self) -> Any:
 64        if self._redis is None:
 65            import redis.asyncio as aioredis
 66            self._redis = aioredis.from_url(
 67                self.config.get(
 68                    "redis_url",
 69                    os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
 70                )
 71            )
 72        return self._redis
 73
 74    async def fetch(self, params: dict[str, Any] | None = None) -> dict[str, Any]:
 75        p = {**self.config, **(params or {})}
 76        report_id = str(p.get("report_id") or "").strip()
 77        symbol = str(p.get("symbol") or "").upper().strip()
 78        stale_after = int(p.get("stale_after_seconds") or DEFAULT_STALE_AFTER_S)
 79        include_full = bool(p.get("include_full_sim", False))
 80
 81        if not report_id and not symbol:
 82            return {
 83                "type": "mirofish_sim",
 84                "error": "need either 'report_id' or 'symbol'",
 85                "key_factors": ["no_crowd_sim"],
 86            }
 87
 88        client = await self._get_redis()
 89        envelope = None
 90        cache_key = ""
 91        if report_id:
 92            cache_key = DEFAULT_KEY_TEMPLATE.format(report_id=report_id)
 93            try:
 94                raw = await client.get(cache_key)
 95            except Exception as exc:
 96                return _err(symbol or report_id, f"redis: {exc}")
 97            envelope = _decode(raw)
 98        else:
 99            # Symbol mode — scan keys, keep most-recent envelope mentioning the symbol.
100            envelope = await _find_latest_for_symbol(client, symbol)
101
102        if envelope is None:
103            return {
104                "type": "mirofish_sim",
105                "report_id": report_id,
106                "symbol": symbol,
107                "stale": True,
108                "missing": True,
109                "key_factors": ["no_crowd_sim"],
110                "message": (
111                    f"no cached crowd-sim for "
112                    f"{'report ' + report_id if report_id else 'symbol ' + symbol}"
113                    f" — is mirofish-svc up and is the symbol watched?"
114                ),
115            }
116
117        age = _age_seconds(envelope.get("generated_at"))
118        is_stale = age is None or age > stale_after
119        summary = envelope.get("summary") or {}
120        out: dict[str, Any] = {
121            "type": "mirofish_sim",
122            "report_id": envelope.get("report_id", report_id),
123            "symbol": symbol or (envelope.get("tickers") or [""])[0],
124            "tickers": envelope.get("tickers") or [],
125            "generated_at": envelope.get("generated_at", ""),
126            "age_seconds": age,
127            "stale": is_stale,
128            "summary": summary,
129        }
130        if is_stale:
131            out["key_factors"] = ["stale_crowd_sim"]
132            out["stale_after_seconds"] = stale_after
133        if include_full:
134            out["sim"] = envelope.get("sim") or {}
135        return out
136
137
138# ── helpers ────────────────────────────────────────────────────────────────
139
140
async def _find_latest_for_symbol(
    client: Any, symbol: str,
) -> dict[str, Any] | None:
    """Return the freshest cached envelope whose ``tickers`` include *symbol*.

    Walks every key matching ``DEFAULT_SCAN_PREFIX`` (a full SCAN with one GET
    per key). Tickers are upper-cased before comparison, so *symbol* should be
    upper-case already. Keys that fail to read or decode are skipped.
    Envelopes whose ``generated_at`` is missing or unparseable rank behind any
    dated one; on ties the first envelope seen wins.

    Errors raised by ``scan_iter`` itself propagate to the caller.
    """
    best: dict[str, Any] | None = None
    best_age = float("inf")
    async for key in client.scan_iter(match=DEFAULT_SCAN_PREFIX, count=200):
        try:
            raw = await client.get(key)
        except Exception as exc:
            # Best-effort: one unreadable key must not hide the others.
            logger.debug("mirofish_sim: GET %r failed during scan: %s", key, exc)
            continue
        env = _decode(raw)
        if not env:
            continue
        tickers = {str(t).upper() for t in (env.get("tickers") or [])}
        if symbol not in tickers:
            continue
        age = _age_seconds(env.get("generated_at"))
        # Compare with None explicitly: an age of 0.0 (just written, or a
        # clamped future timestamp) is falsy but is the *freshest* possible,
        # not "undated".
        rank = age if age is not None else float("inf")
        if best is None or rank < best_age:
            best, best_age = env, rank
    return best
161
162
163def _decode(raw: Any) -> dict[str, Any] | None:
164    if not raw:
165        return None
166    if isinstance(raw, bytes):
167        raw = raw.decode("utf-8", errors="replace")
168    try:
169        env = json.loads(raw)
170    except (json.JSONDecodeError, TypeError):
171        return None
172    if not isinstance(env, dict):
173        return None
174    return env
175
176
177def _age_seconds(generated_at: Any) -> float | None:
178    if not generated_at:
179        return None
180    try:
181        dt = datetime.fromisoformat(str(generated_at).replace("Z", "+00:00"))
182    except (TypeError, ValueError):
183        return None
184    return max(0.0, time.time() - dt.timestamp())
185
186
187def _err(target: str, msg: str) -> dict[str, Any]:
188    return {
189        "type": "mirofish_sim",
190        "target": target,
191        "error": msg,
192        "stale": True,
193        "key_factors": ["no_crowd_sim"],
194    }
195
196
# Public surface of this adapter module: the cache-key constants and the
# source class. The private helpers (_decode, _age_seconds, ...) are excluded.
__all__ = [
    "DEFAULT_KEY_TEMPLATE",
    "DEFAULT_SCAN_PREFIX",
    "DEFAULT_STALE_AFTER_S",
    "MirofishSimSource",
]