checking system…
Docs / back / src/maf/scheduler/mirofish_refresher.py · line 59
Python · 421 lines
  1"""Mirofish refresher — tails ``fomo2:reports`` and runs crowd simulations.
  2
  3Heavy duty
  4----------
  5A single MiroFish crowd-sim takes 10–30 minutes (the five-stage flow inside
  6the vendored Flask backend: ontology → graph build → simulation prepare →
  7simulation run → report generate). So this refresher is **event-driven and
  8budget-gated**, not on a cadence:
  9
 10  1. Tail ``fomo2:reports`` for new analyses.
 11  2. Skip events that don't reference a watched symbol (zero-cost cold path).
 12  3. Skip events that have already been simulated (``report_id`` dedup).
 13  4. Skip events when the daily sim budget is exhausted.
 14  5. Call :class:`maf.sources.adapters.mirofish.MirofishCrowdSource` —
 15     reusing all of its existing five-stage logic. We don't reimplement
 16     anything here, just orchestrate the trigger.
 17  6. Cache the result at ``mirofish:sim:{report_id}`` with a long TTL
 18     (24 h default — sims age slowly).
 19  7. XADD a compact event to ``mirofish:sims:emitted`` so trigger rules
 20     can react ("re-run trading_intelligence now that a crowd-sim landed").
 21
The budget counter is a Redis key ``mirofish:budget:{YYYYMMDD}`` (UTC day)
that the refresher INCRs just before launching a sim and DECRs again if the
sim fails, so only successful sims count toward the cap. Once the day rolls
over the new key starts at zero. This costs nothing when no symbols are
watched.
 25
 26Failures
 27--------
 28Mirofish failures don't crash the loop. On any non-2xx or timeout we log
 29the exception and move on. We don't retry the same report — the refresher
 30is opportunistic; if a report's first sim fails, the human-review path
 31takes over via the dashboard.
 32"""
 33
 34from __future__ import annotations
 35
 36import asyncio
 37import json
 38import logging
 39import os
 40import time
 41from datetime import UTC, datetime
 42from typing import Any
 43
 44from maf.streaming import get_event_bus
 45from maf.watch.list import KIND_DOCUMENT, KIND_SYMBOL, WatchList
 46
 47logger = logging.getLogger(__name__)
 48
 49
# Redis stream tailed (XREAD) for new fomo2 analyses.
DEFAULT_REPORTS_STREAM = "fomo2:reports"
# Cache-key template for a finished sim; formatted with ``report_id``.
DEFAULT_SIM_KEY = "mirofish:sim:{report_id}"
# Redis stream that receives one compact event (XADD) per cached sim.
DEFAULT_EMIT_STREAM = "mirofish:sims:emitted"
# Per-day budget counter key; ``date`` is the UTC day formatted as YYYYMMDD.
DEFAULT_BUDGET_KEY = "mirofish:budget:{date}"
# Maximum number of sims charged against one day's budget key.
DEFAULT_DAILY_BUDGET = 10
# Expiry, in seconds, applied to the cached sim payload (24 h).
DEFAULT_SIM_TTL_S = 24 * 3600
# Reports whose impact is below this value are skipped without a sim.
DEFAULT_IMPACT_THRESHOLD = 0.6
 57
 58
 59class MirofishRefresher:
 60    """Event-driven crowd-sim launcher for high-impact fomo2 reports."""
 61
 62    def __init__(
 63        self,
 64        *,
 65        mirofish_url: str | None = None,
 66        redis_url: str | None = None,
 67        watch_list: WatchList | None = None,
 68        reports_stream: str = DEFAULT_REPORTS_STREAM,
 69        sim_key_template: str = DEFAULT_SIM_KEY,
 70        emit_stream: str = DEFAULT_EMIT_STREAM,
 71        budget_key_template: str = DEFAULT_BUDGET_KEY,
 72        daily_budget: int = DEFAULT_DAILY_BUDGET,
 73        sim_ttl_s: int = DEFAULT_SIM_TTL_S,
 74        impact_threshold: float = DEFAULT_IMPACT_THRESHOLD,
 75        require_watched_symbol: bool = True,
 76    ) -> None:
 77        self.mirofish_url = (mirofish_url
 78                              or os.environ.get("MIROFISH_URL")
 79                              or "http://localhost:5101")
 80        self.redis_url = redis_url or os.environ.get(
 81            "REDIS_URL", "redis://localhost:6379/0",
 82        )
 83        self.watch = watch_list or WatchList(redis_url=self.redis_url)
 84        self.reports_stream = reports_stream
 85        self.sim_key_template = sim_key_template
 86        self.emit_stream = emit_stream
 87        self.budget_key_template = budget_key_template
 88        self.daily_budget = int(daily_budget)
 89        self.sim_ttl_s = int(sim_ttl_s)
 90        self.impact_threshold = float(impact_threshold)
 91        self.require_watched_symbol = require_watched_symbol
 92
 93        self._redis: Any = None
 94        self._stop = asyncio.Event()
 95        self._cursor: str = "$"
 96
 97    # ── lifecycle ──────────────────────────────────────────────────────────
 98
 99    async def _get_redis(self) -> Any:
100        if self._redis is None:
101            import redis.asyncio as aioredis
102            self._redis = aioredis.from_url(self.redis_url)
103        return self._redis
104
105    def stop(self) -> None:
106        self._stop.set()
107
108    async def aclose(self) -> None:
109        self._stop.set()
110        if self._redis is None:
111            return
112        try:
113            ac = getattr(self._redis, "aclose", None)
114            if ac:
115                await ac()
116            else:
117                await self._redis.close()
118        except Exception:
119            pass
120
121    async def _write_heartbeat(self) -> None:
122        """Heartbeat key consumed by /api/system/status."""
123        try:
124            client = await self._get_redis()
125            payload = json.dumps({
126                "ts": time.time(),
127                "reports_stream": self.reports_stream,
128                "daily_budget": self.daily_budget,
129            })
130            await client.set("maf:refresher:mirofish:heartbeat", payload, ex=300)
131        except Exception as exc:
132            logger.debug("mirofish heartbeat write failed: %s", exc)
133
134    async def run(self) -> None:
135        """Tail the reports stream and process new entries forever."""
136        logger.info(
137            "MirofishRefresher started — mirofish=%s reports=%s budget/day=%d",
138            self.mirofish_url, self.reports_stream, self.daily_budget,
139        )
140        client = await self._get_redis()
141        await self._write_heartbeat()
142        while not self._stop.is_set():
143            try:
144                resp = await client.xread(
145                    {self.reports_stream: self._cursor},
146                    block=5000, count=10,
147                )
148            except Exception as exc:
149                logger.warning("MirofishRefresher xread failed: %s", exc)
150                await asyncio.sleep(2.0)
151                continue
152            # Tick happened (whether or not we got data). Refresh heartbeat.
153            await self._write_heartbeat()
154            if not resp:
155                continue
156            for stream_raw, entries in resp:
157                for entry_id, fields in entries:
158                    sid = (
159                        entry_id.decode() if isinstance(entry_id, bytes)
160                        else str(entry_id)
161                    )
162                    self._cursor = sid
163                    report = _decode(fields)
164                    try:
165                        await self._maybe_sim(report)
166                    except Exception:
167                        logger.exception(
168                            "MirofishRefresher: handler crashed on %s", sid,
169                        )
170
171    # ── core ───────────────────────────────────────────────────────────────
172
173    async def _maybe_sim(self, report: dict[str, Any]) -> None:
174        """One-pass: should we sim this report? if yes, run it; cache; emit."""
175        report_id = _report_id(report)
176        if not report_id:
177            return
178
179        impact = float(report.get("impact") or report.get("impact_score") or 0.0)
180        if impact < self.impact_threshold:
181            return
182
183        tickers = _tickers(report)
184        if self.require_watched_symbol:
185            watched = {e.target_id.upper() for e in
186                        await self.watch.members(kind=KIND_SYMBOL)}
187            if not (set(tickers) & watched):
188                logger.debug(
189                    "mirofish skip %s — no watched ticker in %s",
190                    report_id, tickers,
191                )
192                return
193
194        # Dedup
195        client = await self._get_redis()
196        cache_key = self.sim_key_template.format(report_id=report_id)
197        if await client.exists(cache_key):
198            logger.debug("mirofish skip %s — already simulated", report_id)
199            return
200
201        # Budget
202        if not await self._consume_budget():
203            logger.info(
204                "mirofish skip %s — daily budget %d exhausted",
205                report_id, self.daily_budget,
206            )
207            return
208
209        # Build the document payload the MirofishCrowdSource expects.
210        document = (
211            report.get("markdown")
212            or report.get("body")
213            or report.get("summary")
214            or report.get("content")
215            or ""
216        )
217        if not document or len(document) < 200:
218            logger.debug("mirofish skip %s — no usable document body", report_id)
219            return
220
221        sim_brief = (
222            f"How will retail and institutional investors react to this report "
223            f"about {', '.join(tickers) or 'the topic'} over the next 24 h?"
224        )
225
226        # Delegate to the existing adapter — it owns the 5-stage flow + retries.
227        try:
228            from maf.sources.adapters.mirofish import MirofishCrowdSource
229        except ImportError as exc:
230            logger.warning("mirofish adapter unavailable: %s", exc)
231            return
232
233        adapter = MirofishCrowdSource({
234            "base_url": self.mirofish_url,
235            "request_timeout_s": 60,
236            "total_timeout_s": 1800,
237            "poll_interval_s": 5,
238        })
239        t0 = time.monotonic()
240        try:
241            sim = await adapter.fetch({
242                "document_text": document,
243                "simulation_requirement": sim_brief,
244                "max_rounds": int(report.get("max_rounds") or 3),
245            })
246        except Exception as exc:
247            logger.warning("mirofish sim %s failed: %s", report_id, exc)
248            # Refund the budget — failed sims shouldn't count toward the cap.
249            await self._refund_budget()
250            return
251
252        elapsed = time.monotonic() - t0
253
254        # Cache the sim payload + summary block.
255        envelope = {
256            "schema_version": "1",
257            "report_id": report_id,
258            "tickers": tickers,
259            "generated_at": datetime.now(UTC).isoformat(),
260            "elapsed_seconds": round(elapsed, 1),
261            "sim": sim,
262            "summary": _summarise(sim, tickers),
263        }
264        await client.set(
265            cache_key, json.dumps(envelope, default=str), ex=self.sim_ttl_s,
266        )
267
268        # Emit a compact event.
269        try:
270            await client.xadd(
271                self.emit_stream,
272                {"data": json.dumps({
273                    "schema_version": "1",
274                    "report_id": report_id,
275                    "tickers": tickers,
276                    "summary": envelope["summary"],
277                    "generated_at": envelope["generated_at"],
278                }, default=str)},
279                maxlen=10_000, approximate=True,
280            )
281        except Exception as exc:
282            logger.warning("mirofish emit failed: %s", exc)
283
284        bus = get_event_bus()
285        await bus.publish(
286            "system.status", payload={
287                "kind": "mirofish.sim",
288                "report_id": report_id,
289                "tickers": tickers,
290                "elapsed_s": round(elapsed, 1),
291            },
292        )
293        logger.info(
294            "mirofish sim done: report=%s tickers=%s elapsed=%.1fs",
295            report_id, tickers, elapsed,
296        )
297
298    # ── budget helpers ─────────────────────────────────────────────────────
299
300    def _budget_key(self) -> str:
301        return self.budget_key_template.format(
302            date=datetime.now(UTC).strftime("%Y%m%d"),
303        )
304
305    async def _consume_budget(self) -> bool:
306        """Atomic INCR + check. Returns False when over budget; the budget
307        is *rolled back* (DECR) so a follow-up call can succeed."""
308        client = await self._get_redis()
309        key = self._budget_key()
310        used = int(await client.incr(key))
311        # Set a 36 h TTL on first set so the key auto-expires.
312        if used == 1:
313            await client.expire(key, 36 * 3600)
314        if used > self.daily_budget:
315            await client.decr(key)
316            return False
317        return True
318
319    async def _refund_budget(self) -> None:
320        try:
321            client = await self._get_redis()
322            await client.decr(self._budget_key())
323        except Exception:
324            pass
325
326
327# ── helpers ────────────────────────────────────────────────────────────────
328
329
330def _decode(fields: Any) -> dict[str, Any]:
331    if not isinstance(fields, dict):
332        return {}
333    out: dict[str, Any] = {}
334    for k, v in fields.items():
335        key = k.decode("utf-8") if isinstance(k, bytes) else str(k)
336        if isinstance(v, bytes):
337            try:
338                v = v.decode("utf-8")
339            except UnicodeDecodeError:
340                continue
341        if isinstance(v, str):
342            try:
343                v = json.loads(v)
344            except (json.JSONDecodeError, TypeError):
345                pass
346        out[key] = v
347    # Some fomo2 publishers wrap the whole payload under a single ``data``
348    # field; if so unwrap it so callers see a flat dict.
349    inner = out.get("data")
350    if isinstance(inner, dict) and len(out) == 1:
351        return inner
352    return out
353
354
355def _report_id(report: dict[str, Any]) -> str:
356    """Pull a stable id out of a fomo2 report event."""
357    for k in ("report_id", "id", "filename", "slug"):
358        v = report.get(k)
359        if v:
360            return str(v)
361    # Last resort: hash the (timestamp, headline) pair.
362    return str(hash((
363        str(report.get("timestamp") or report.get("generated_at") or ""),
364        str(report.get("headline") or report.get("title") or "")[:100],
365    )))
366
367
368def _tickers(report: dict[str, Any]) -> list[str]:
369    """Normalise the tickers list. Handles list, comma-string, single string."""
370    raw = report.get("tickers") or report.get("symbols") or report.get("ticker")
371    if raw is None:
372        return []
373    if isinstance(raw, str):
374        if "," in raw:
375            return [t.strip().upper() for t in raw.split(",") if t.strip()]
376        return [raw.strip().upper()] if raw.strip() else []
377    if isinstance(raw, list):
378        return [str(t).strip().upper() for t in raw if str(t).strip()]
379    return []
380
381
382def _summarise(sim: dict[str, Any], tickers: list[str]) -> dict[str, Any]:
383    """Compress the full mirofish sim into a few numbers downstream agents read."""
384    # The MirofishCrowdSource returns the raw report dict from the backend.
385    # Extract the most useful headline metrics; pass through unknown fields
386    # so we don't lose information on future versions.
387    summary: dict[str, Any] = {
388        "tickers": tickers,
389        "outcome": sim.get("outcome") or sim.get("verdict") or "",
390        "probability": _f(sim.get("probability")
391                            or sim.get("consensus_probability")),
392        "dissent_pct": _f(sim.get("dissent_pct")
393                            or sim.get("dissent_percentage")),
394        "top_drivers": list(sim.get("top_drivers") or [])[:5],
395        "horizon_hours": int(sim.get("horizon_hours") or 24),
396        "personas_count": int(sim.get("personas_count")
397                                or sim.get("persona_count")
398                                or 0),
399    }
400    return summary
401
402
403def _f(v: Any) -> float | None:
404    if v is None:
405        return None
406    try:
407        return float(v)
408    except (TypeError, ValueError):
409        return None
410
411
# Public API of this module.  Every DEFAULT_* tunable that serves as a
# ``MirofishRefresher.__init__`` default is exported alongside the class.
__all__ = [
    "DEFAULT_BUDGET_KEY",
    "DEFAULT_DAILY_BUDGET",
    "DEFAULT_EMIT_STREAM",
    "DEFAULT_IMPACT_THRESHOLD",
    "DEFAULT_REPORTS_STREAM",
    "DEFAULT_SIM_KEY",
    "DEFAULT_SIM_TTL_S",
    "MirofishRefresher",
]