"""Mirofish refresher — tails ``fomo2:reports`` and runs crowd simulations.

Heavy duty
----------
A single MiroFish crowd-sim takes 10–30 minutes (the five-stage flow inside
the vendored Flask backend: ontology → graph build → simulation prepare →
simulation run → report generate). So this refresher is **event-driven and
budget-gated**, not on a cadence:

  1. Tail ``fomo2:reports`` for new analyses.
  2. Skip events below the impact threshold or that don't reference a
     watched symbol (zero-cost cold path).
  3. Skip events that have already been simulated (``report_id`` dedup).
  4. Skip events that carry no usable document body.
  5. Skip events when the daily sim budget is exhausted.
  6. Call :class:`maf.sources.adapters.mirofish.MirofishCrowdSource` —
     reusing all of its existing five-stage logic. We don't reimplement
     anything here, just orchestrate the trigger.
  7. Cache the result at ``mirofish:sim:{report_id}`` with a long TTL
     (24 h default — sims age slowly).
  8. XADD a compact event to ``mirofish:sims:emitted`` so trigger rules
     can react ("re-run trading_intelligence now that a crowd-sim landed").

The budget counter is a Redis key ``mirofish:budget:{YYYYMMDD}``. The
refresher INCRs it to *reserve* a slot right before launching a sim and
DECRs the same key again if the sim fails, so only successful sims count
toward the cap. Once the day rolls over the new key starts at zero. This
costs nothing when no symbols are watched.

Failures
--------
Mirofish failures don't crash the loop. On any non-2xx or timeout we log
the exception and move on. We don't retry the same report — the refresher
is opportunistic; if a report's first sim fails, the human-review path
takes over via the dashboard.
"""

from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import os
import time
from datetime import UTC, datetime
from typing import Any

from maf.streaming import get_event_bus
from maf.watch.list import KIND_DOCUMENT, KIND_SYMBOL, WatchList

logger = logging.getLogger(__name__)


DEFAULT_REPORTS_STREAM = "fomo2:reports"
DEFAULT_SIM_KEY = "mirofish:sim:{report_id}"
DEFAULT_EMIT_STREAM = "mirofish:sims:emitted"
DEFAULT_BUDGET_KEY = "mirofish:budget:{date}"
DEFAULT_DAILY_BUDGET = 10
DEFAULT_SIM_TTL_S = 24 * 3600
DEFAULT_IMPACT_THRESHOLD = 0.6

# Reports whose body is shorter than this are not worth a 10–30 minute sim.
_MIN_DOCUMENT_CHARS = 200


class MirofishRefresher:
    """Event-driven crowd-sim launcher for high-impact fomo2 reports."""

    def __init__(
        self,
        *,
        mirofish_url: str | None = None,
        redis_url: str | None = None,
        watch_list: WatchList | None = None,
        reports_stream: str = DEFAULT_REPORTS_STREAM,
        sim_key_template: str = DEFAULT_SIM_KEY,
        emit_stream: str = DEFAULT_EMIT_STREAM,
        budget_key_template: str = DEFAULT_BUDGET_KEY,
        daily_budget: int = DEFAULT_DAILY_BUDGET,
        sim_ttl_s: int = DEFAULT_SIM_TTL_S,
        impact_threshold: float = DEFAULT_IMPACT_THRESHOLD,
        require_watched_symbol: bool = True,
    ) -> None:
        """Store configuration; no network I/O happens here.

        ``mirofish_url`` falls back to ``$MIROFISH_URL`` and then to
        ``http://localhost:5101``; ``redis_url`` falls back to ``$REDIS_URL``
        and then to ``redis://localhost:6379/0``. The Redis client itself is
        created lazily by :meth:`_get_redis`.
        """
        self.mirofish_url = (mirofish_url
                             or os.environ.get("MIROFISH_URL")
                             or "http://localhost:5101")
        self.redis_url = redis_url or os.environ.get(
            "REDIS_URL", "redis://localhost:6379/0",
        )
        self.watch = watch_list or WatchList(redis_url=self.redis_url)
        self.reports_stream = reports_stream
        self.sim_key_template = sim_key_template
        self.emit_stream = emit_stream
        self.budget_key_template = budget_key_template
        self.daily_budget = int(daily_budget)
        self.sim_ttl_s = int(sim_ttl_s)
        self.impact_threshold = float(impact_threshold)
        self.require_watched_symbol = require_watched_symbol

        self._redis: Any = None
        self._stop = asyncio.Event()
        # "$" means "only entries newer than now"; run() pins it to a
        # concrete stream id before the first read.
        self._cursor: str = "$"

    # ── lifecycle ──────────────────────────────────────────────────────────

    async def _get_redis(self) -> Any:
        """Return the shared async Redis client, creating it on first use."""
        if self._redis is None:
            import redis.asyncio as aioredis
            self._redis = aioredis.from_url(self.redis_url)
        return self._redis

    def stop(self) -> None:
        """Ask :meth:`run` to exit after the current tick."""
        self._stop.set()

    async def aclose(self) -> None:
        """Signal stop and close the Redis client (best effort)."""
        self._stop.set()
        client = self._redis
        if client is None:
            return
        # Drop the reference first so a later _get_redis() builds a fresh
        # client instead of handing out a closed one.
        self._redis = None
        try:
            # Newer redis-py exposes aclose(); older versions only close().
            ac = getattr(client, "aclose", None)
            if ac:
                await ac()
            else:
                await client.close()
        except Exception as exc:
            # Closing is best effort, but leave a trace for debugging.
            logger.debug("mirofish redis close failed: %s", exc)

    async def _write_heartbeat(self) -> None:
        """Heartbeat key consumed by /api/system/status."""
        try:
            client = await self._get_redis()
            payload = json.dumps({
                "ts": time.time(),
                "reports_stream": self.reports_stream,
                "daily_budget": self.daily_budget,
            })
            await client.set("maf:refresher:mirofish:heartbeat", payload, ex=300)
        except Exception as exc:
            logger.debug("mirofish heartbeat write failed: %s", exc)

    async def _pin_start_cursor(self, client: Any) -> None:
        """Replace the initial ``$`` cursor with a concrete stream id.

        XREAD with ``$`` only returns entries added while that particular
        call is blocking, so re-sending ``$`` after an empty read silently
        drops anything published between two calls. Pinning the cursor to
        the current last id (or ``0-0`` when the stream is empty) keeps the
        "new entries only" semantics without that gap.
        """
        if self._cursor != "$":
            return
        try:
            latest = await client.xrevrange(self.reports_stream, count=1)
        except Exception as exc:
            # Keep "$" — the pre-existing behaviour — if the lookup fails.
            logger.debug("mirofish cursor lookup failed: %s", exc)
            return
        if latest:
            last_id = latest[0][0]
            self._cursor = (
                last_id.decode() if isinstance(last_id, bytes)
                else str(last_id)
            )
        else:
            self._cursor = "0-0"

    async def run(self) -> None:
        """Tail the reports stream and process new entries until stopped."""
        logger.info(
            "MirofishRefresher started — mirofish=%s reports=%s budget/day=%d",
            self.mirofish_url, self.reports_stream, self.daily_budget,
        )
        client = await self._get_redis()
        await self._write_heartbeat()
        await self._pin_start_cursor(client)
        while not self._stop.is_set():
            try:
                resp = await client.xread(
                    {self.reports_stream: self._cursor},
                    block=5000, count=10,
                )
            except Exception as exc:
                logger.warning("MirofishRefresher xread failed: %s", exc)
                await asyncio.sleep(2.0)
                continue
            # Tick happened (whether or not we got data). Refresh heartbeat.
            # NOTE(review): the heartbeat key has a 300 s TTL but is only
            # refreshed between ticks, so it can lapse while a long sim is
            # in flight — confirm whether the status endpoint tolerates that.
            await self._write_heartbeat()
            if not resp:
                continue
            for _stream_raw, entries in resp:
                for entry_id, fields in entries:
                    if self._stop.is_set():
                        # Don't start another potentially 30-minute sim
                        # once a stop has been requested.
                        return
                    sid = (
                        entry_id.decode() if isinstance(entry_id, bytes)
                        else str(entry_id)
                    )
                    self._cursor = sid
                    report = _decode(fields)
                    try:
                        await self._maybe_sim(report)
                    except Exception:
                        logger.exception(
                            "MirofishRefresher: handler crashed on %s", sid,
                        )

    # ── core ───────────────────────────────────────────────────────────────

    async def _maybe_sim(self, report: dict[str, Any]) -> None:
        """One-pass: should we sim this report? if yes, run it; cache; emit.

        All cheap rejections (impact, watch list, dedup, document body,
        adapter availability) run *before* a budget slot is reserved, so a
        skipped report never consumes budget.
        """
        report_id = _report_id(report)
        if not report_id:
            return

        # _f() tolerates non-numeric impact values instead of raising.
        impact = _f(report.get("impact") or report.get("impact_score")) or 0.0
        if impact < self.impact_threshold:
            return

        tickers = _tickers(report)
        if self.require_watched_symbol:
            watched = {e.target_id.upper() for e in
                       await self.watch.members(kind=KIND_SYMBOL)}
            if not (set(tickers) & watched):
                logger.debug(
                    "mirofish skip %s — no watched ticker in %s",
                    report_id, tickers,
                )
                return

        # Dedup
        client = await self._get_redis()
        cache_key = self.sim_key_template.format(report_id=report_id)
        if await client.exists(cache_key):
            logger.debug("mirofish skip %s — already simulated", report_id)
            return

        # Build the document payload the MirofishCrowdSource expects.
        document = (
            report.get("markdown")
            or report.get("body")
            or report.get("summary")
            or report.get("content")
            or ""
        )
        # _decode() JSON-parses field values, so the body may have turned
        # into a non-string; such a payload is not a usable document.
        if not isinstance(document, str) or len(document) < _MIN_DOCUMENT_CHARS:
            logger.debug("mirofish skip %s — no usable document body", report_id)
            return

        sim_brief = (
            f"How will retail and institutional investors react to this report "
            f"about {', '.join(tickers) or 'the topic'} over the next 24 h?"
        )
        max_rounds = _int(report.get("max_rounds"), 3)

        # Delegate to the existing adapter — it owns the 5-stage flow + retries.
        try:
            from maf.sources.adapters.mirofish import MirofishCrowdSource
        except ImportError as exc:
            logger.warning("mirofish adapter unavailable: %s", exc)
            return

        adapter = MirofishCrowdSource({
            "base_url": self.mirofish_url,
            "request_timeout_s": 60,
            "total_timeout_s": 1800,
            "poll_interval_s": 5,
        })

        # Budget — reserved last, and against a key captured *now*, so a
        # sim that crosses UTC midnight refunds the day it was charged to.
        budget_key = self._budget_key()
        if not await self._consume_budget(budget_key):
            logger.info(
                "mirofish skip %s — daily budget %d exhausted",
                report_id, self.daily_budget,
            )
            return

        t0 = time.monotonic()
        try:
            sim = await adapter.fetch({
                "document_text": document,
                "simulation_requirement": sim_brief,
                "max_rounds": max_rounds,
            })
        except Exception as exc:
            logger.warning("mirofish sim %s failed: %s", report_id, exc)
            # Refund the budget — failed sims shouldn't count toward the cap.
            await self._refund_budget(budget_key)
            return

        elapsed = time.monotonic() - t0

        # Cache the sim payload + summary block.
        envelope = {
            "schema_version": "1",
            "report_id": report_id,
            "tickers": tickers,
            "generated_at": datetime.now(UTC).isoformat(),
            "elapsed_seconds": round(elapsed, 1),
            "sim": sim,
            "summary": _summarise(sim, tickers),
        }
        await client.set(
            cache_key, json.dumps(envelope, default=str), ex=self.sim_ttl_s,
        )

        # Emit a compact event.
        try:
            await client.xadd(
                self.emit_stream,
                {"data": json.dumps({
                    "schema_version": "1",
                    "report_id": report_id,
                    "tickers": tickers,
                    "summary": envelope["summary"],
                    "generated_at": envelope["generated_at"],
                }, default=str)},
                maxlen=10_000, approximate=True,
            )
        except Exception as exc:
            logger.warning("mirofish emit failed: %s", exc)

        # The status ping is informational; the sim is already cached, so a
        # bus failure is logged rather than reported as a handler crash.
        try:
            bus = get_event_bus()
            await bus.publish(
                "system.status", payload={
                    "kind": "mirofish.sim",
                    "report_id": report_id,
                    "tickers": tickers,
                    "elapsed_s": round(elapsed, 1),
                },
            )
        except Exception as exc:
            logger.warning("mirofish status publish failed: %s", exc)
        logger.info(
            "mirofish sim done: report=%s tickers=%s elapsed=%.1fs",
            report_id, tickers, elapsed,
        )

    # ── budget helpers ─────────────────────────────────────────────────────

    def _budget_key(self) -> str:
        """Return today's (UTC) budget counter key."""
        return self.budget_key_template.format(
            date=datetime.now(UTC).strftime("%Y%m%d"),
        )

    async def _consume_budget(self, key: str | None = None) -> bool:
        """Atomic INCR + check. Returns False when over budget; the budget
        is *rolled back* (DECR) so a follow-up call can succeed.

        ``key`` lets the caller pin the counter key so that a later
        :meth:`_refund_budget` hits the same day; it defaults to today's key.
        """
        client = await self._get_redis()
        key = key or self._budget_key()
        used = int(await client.incr(key))
        # Set a 36 h TTL on first set so the key auto-expires.
        if used == 1:
            await client.expire(key, 36 * 3600)
        if used > self.daily_budget:
            await client.decr(key)
            return False
        return True

    async def _refund_budget(self, key: str | None = None) -> None:
        """Give back one budget slot (best effort, never raises).

        Pass the ``key`` that was charged; recomputing it here would refund
        the wrong day when a sim straddles UTC midnight, and DECR on a
        not-yet-existing key would create it at -1 without a TTL.
        """
        try:
            client = await self._get_redis()
            await client.decr(key or self._budget_key())
        except Exception as exc:
            logger.debug("mirofish budget refund failed: %s", exc)


# ── helpers ────────────────────────────────────────────────────────────────


def _decode(fields: Any) -> dict[str, Any]:
    """Turn a raw stream entry (bytes keys/values) into a plain dict.

    Values are UTF-8 decoded and, where they parse, JSON-decoded. Fields
    whose bytes are not valid UTF-8 are dropped. Non-dict input yields ``{}``.
    """
    if not isinstance(fields, dict):
        return {}
    out: dict[str, Any] = {}
    for k, v in fields.items():
        key = k.decode("utf-8") if isinstance(k, bytes) else str(k)
        if isinstance(v, bytes):
            try:
                v = v.decode("utf-8")
            except UnicodeDecodeError:
                continue
        if isinstance(v, str):
            try:
                v = json.loads(v)
            except (json.JSONDecodeError, TypeError):
                pass
        out[key] = v
    # Some fomo2 publishers wrap the whole payload under a single ``data``
    # field; if so unwrap it so callers see a flat dict.
    inner = out.get("data")
    if isinstance(inner, dict) and len(out) == 1:
        return inner
    return out


def _report_id(report: dict[str, Any]) -> str:
    """Pull a stable id out of a fomo2 report event.

    Returns ``""`` when the event carries nothing to identify it by, so the
    caller can skip it instead of deduplicating unrelated reports together.
    """
    for k in ("report_id", "id", "filename", "slug"):
        v = report.get(k)
        if v:
            return str(v)
    # Last resort: digest the (timestamp, headline) pair. The built-in
    # hash() is salted per process for strings, so it would produce a
    # different id after every restart and defeat the Redis dedup key.
    stamp = str(report.get("timestamp") or report.get("generated_at") or "")
    title = str(report.get("headline") or report.get("title") or "")[:100]
    if not stamp and not title:
        return ""
    digest = hashlib.sha256(f"{stamp}\x00{title}".encode("utf-8"))
    return digest.hexdigest()[:32]


def _tickers(report: dict[str, Any]) -> list[str]:
    """Normalise the tickers list. Handles list, comma-string, single string."""
    raw = report.get("tickers") or report.get("symbols") or report.get("ticker")
    if raw is None:
        return []
    if isinstance(raw, str):
        if "," in raw:
            return [t.strip().upper() for t in raw.split(",") if t.strip()]
        return [raw.strip().upper()] if raw.strip() else []
    if isinstance(raw, (list, tuple)):
        # Skip null entries rather than emitting a bogus "NONE" ticker.
        return [
            str(t).strip().upper()
            for t in raw
            if t is not None and str(t).strip()
        ]
    return []


def _first_present(mapping: dict[str, Any], *keys: str) -> Any:
    """Return the first value among ``keys`` that is not ``None``."""
    for key in keys:
        value = mapping.get(key)
        if value is not None:
            return value
    return None


def _int(v: Any, default: int) -> int:
    """Coerce ``v`` to int; fall back to ``default`` when falsy or unparsable."""
    if not v:
        return default
    try:
        return int(v)
    except (TypeError, ValueError, OverflowError):
        return default


def _summarise(sim: dict[str, Any], tickers: list[str]) -> dict[str, Any]:
    """Compress the full mirofish sim into a few numbers downstream agents read."""
    # The MirofishCrowdSource returns the raw report dict from the backend.
    # Extract the most useful headline metrics, tolerating missing or
    # oddly-typed fields so a successful sim is never lost to a bad value.
    if not isinstance(sim, dict):
        sim = {}

    drivers = sim.get("top_drivers") or []
    if isinstance(drivers, (str, bytes)):
        # A bare string is one driver, not a sequence of characters.
        drivers = [drivers]
    else:
        try:
            drivers = list(drivers)
        except TypeError:
            drivers = []

    summary: dict[str, Any] = {
        "tickers": tickers,
        "outcome": sim.get("outcome") or sim.get("verdict") or "",
        # ``is not None`` selection: 0.0 is a legitimate probability /
        # dissent value and must not be treated as "missing".
        "probability": _f(
            _first_present(sim, "probability", "consensus_probability"),
        ),
        "dissent_pct": _f(
            _first_present(sim, "dissent_pct", "dissent_percentage"),
        ),
        "top_drivers": drivers[:5],
        "horizon_hours": _int(sim.get("horizon_hours"), 24),
        "personas_count": _int(
            sim.get("personas_count") or sim.get("persona_count"), 0,
        ),
    }
    return summary


def _f(v: Any) -> float | None:
    """Coerce ``v`` to float, or return ``None`` when it is missing/unparsable."""
    if v is None:
        return None
    try:
        return float(v)
    except (TypeError, ValueError):
        return None


__all__ = [
    "DEFAULT_BUDGET_KEY",
    "DEFAULT_DAILY_BUDGET",
    "DEFAULT_EMIT_STREAM",
    "DEFAULT_IMPACT_THRESHOLD",
    "DEFAULT_REPORTS_STREAM",
    "DEFAULT_SIM_KEY",
    "MirofishRefresher",
]