1"""System health + stream freshness endpoints. 2 3* ``GET /api/system/status`` — connectivity pills, cached 5s. 4* ``GET /api/streams/health`` — XLEN + age per known Redis Stream. 5* ``GET /api/arenas/{name}/freshness`` — per-binding freshness for the 6 Setup tab. Resolves the freshness 7 spec from each adapter's classmethod 8 first, then ``_LEGACY_FRESHNESS_FALLBACK``. 9 10The endpoint that lists / samples ``/api/data/sources`` still lives in 11:mod:`maf.dashboard.api` because it's coupled to the data-tab page; this 12module owns only the health surface. 13""" 14 15from __future__ import annotations 16 17import logging 18import os 19import time 20from typing import Any 21 22from fastapi import APIRouter, HTTPException 23 24from maf.dashboard import state 25 26logger = logging.getLogger(__name__) 27 28router = APIRouter() 29 30 31async def _close_redis_client(client: Any) -> None: 32 """Best-effort close — tolerate redis-py 4.x/5.x/6.x signature drift.""" 33 try: 34 ac = getattr(client, "aclose", None) 35 if ac is not None: 36 await ac() 37 return 38 await client.close() 39 except Exception: 40 pass 41 42 43_STATUS_CACHE: dict[str, Any] = {"ts": 0.0, "payload": None} 44_STATUS_TTL_S = 5.0 45 46 47@router.get("/api/system/status") 48async def system_status(force: bool = False) -> dict[str, Any]: 49 """Check connectivity to every external dependency MAF needs. 50 51 Frontend renders this as a coloured row of pills. Each entry has 52 ``name``, ``ok`` (bool), ``detail`` (one-liner), and ``latency_ms`` so 53 a user can see at a glance whether the engine has everything it needs. 54 55 Result is cached for ``_STATUS_TTL_S`` seconds — the status bar polls 56 every ~5s per tab so without caching we'd hammer Redis/Ollama/mirofish 57 on every render with N tabs × N users. Pass ``?force=1`` to bypass. 58 """ 59 import os 60 import time 61 62 now = time.time() 63 if not force and _STATUS_CACHE["payload"] is not None: 64 if now - _STATUS_CACHE["ts"] < _STATUS_TTL_S: 65 return _STATUS_CACHE["payload"] 66 67 results: list[dict[str, Any]] = [] 68 69 # Redis (events stream) 70 t0 = time.monotonic() 71 redis_url = ( 72 state.get_maf_app().config.redis_url if state.get_maf_app() else 73 os.environ.get("REDIS_URL", "redis://localhost:6379/0") 74 ) 75 try: 76 import redis.asyncio as aioredis 77 client = aioredis.from_url(redis_url) 78 await client.ping() 79 await _close_redis_client(client) 80 results.append({ 81 "name": "redis", 82 "ok": True, 83 "detail": redis_url, 84 "latency_ms": round((time.monotonic() - t0) * 1000, 1), 85 }) 86 except Exception as exc: 87 results.append({ 88 "name": "redis", 89 "ok": False, 90 "detail": f"{type(exc).__name__}: {exc}", 91 "latency_ms": round((time.monotonic() - t0) * 1000, 1), 92 }) 93 94 # Ollama Cloud — single-shot small completion. Cached briefly to avoid 95 # one ping per dashboard render. 96 t0 = time.monotonic() 97 api_key = os.environ.get("OLLAMA_API_KEY", "") 98 if not api_key: 99 results.append({ 100 "name": "ollama", 101 "ok": False, 102 "detail": "OLLAMA_API_KEY not set", 103 "latency_ms": 0.0, 104 }) 105 else: 106 try: 107 import httpx 108 async with httpx.AsyncClient(timeout=10.0) as hc: 109 resp = await hc.get( 110 "https://ollama.com/v1/models", 111 headers={"Authorization": f"Bearer {api_key}"}, 112 ) 113 ok = resp.status_code == 200 114 detail = ( 115 f"{len(resp.json().get('data', []))} models" if ok else 116 f"HTTP {resp.status_code}" 117 ) 118 except Exception as exc: 119 ok = False 120 detail = f"{type(exc).__name__}: {exc}" 121 results.append({ 122 "name": "ollama", 123 "ok": ok, 124 "detail": detail, 125 "latency_ms": round((time.monotonic() - t0) * 1000, 1), 126 }) 127 128 # trtools2 importability 129 t0 = time.monotonic() 130 try: 131 import importlib 132 importlib.import_module("engine.db.questdb") 133 results.append({ 134 "name": "trtools2", 135 "ok": True, 136 "detail": "engine.db.questdb importable", 137 "latency_ms": round((time.monotonic() - t0) * 1000, 1), 138 }) 139 except Exception as exc: 140 results.append({ 141 "name": "trtools2", 142 "ok": False, 143 "detail": f"{type(exc).__name__}: {exc}", 144 "latency_ms": round((time.monotonic() - t0) * 1000, 1), 145 }) 146 147 # fomo2 importability 148 t0 = time.monotonic() 149 try: 150 import importlib 151 importlib.import_module("fomo2") 152 results.append({ 153 "name": "fomo2", 154 "ok": True, 155 "detail": "fomo2 package importable", 156 "latency_ms": round((time.monotonic() - t0) * 1000, 1), 157 }) 158 except Exception as exc: 159 results.append({ 160 "name": "fomo2", 161 "ok": False, 162 "detail": f"{type(exc).__name__}: {exc}", 163 "latency_ms": round((time.monotonic() - t0) * 1000, 1), 164 }) 165 166 # MiroFish reachability 167 t0 = time.monotonic() 168 mirofish_base = "http://localhost:5101" 169 if state.get_maf_app(): 170 for m in state.get_maf_app().config.modules: 171 if m.name == "mirofish": 172 mirofish_base = m.config.get("base_url", mirofish_base) 173 break 174 try: 175 import httpx 176 async with httpx.AsyncClient(timeout=3.0) as hc: 177 resp = await hc.get(f"{mirofish_base}/health") 178 ok = resp.status_code == 200 179 detail = f"{mirofish_base} {resp.status_code}" 180 except Exception as exc: 181 ok = False 182 detail = f"{type(exc).__name__}: {exc}" 183 results.append({ 184 "name": "mirofish", 185 "ok": ok, 186 "detail": detail, 187 "latency_ms": round((time.monotonic() - t0) * 1000, 1), 188 }) 189 190 # Refresher heartbeats — read JSON heartbeat keys written by the workers. 191 # Each refresher writes a TTL'd key on every tick; if it's still there and 192 # young, the worker is alive. 193 import json as _json 194 import time as _time 195 refresher_specs = [ 196 ("kronos_refresher", "maf:refresher:kronos:heartbeat", "Kronos forecast refresher"), 197 ("mirofish_refresher", "maf:refresher:mirofish:heartbeat", "MiroFish crowd-sim refresher"), 198 ] 199 for pill_name, hb_key, label in refresher_specs: 200 t0 = _time.monotonic() 201 try: 202 import redis.asyncio as aioredis 203 client = aioredis.from_url(redis_url) 204 raw = await client.get(hb_key) 205 await _close_redis_client(client) 206 if raw is None: 207 results.append({ 208 "name": pill_name, "ok": False, 209 "detail": "not running (no heartbeat — start `python -m maf` for service mode)", 210 "latency_ms": round((_time.monotonic() - t0) * 1000, 1), 211 }) 212 continue 213 hb = _json.loads(raw if isinstance(raw, (bytes, str)) else raw) 214 if isinstance(hb, bytes): 215 hb = _json.loads(hb) 216 age = _time.time() - float(hb.get("ts", 0)) 217 cadence = float(hb.get("min_cadence_s", 60)) 218 ok = age < cadence * 2.5 219 watched = hb.get("watched") 220 detail = ( 221 f"alive · {age:.0f}s ago" 222 + (f" · {watched} watched" if watched is not None else "") 223 ) 224 if not ok: 225 detail = f"stale · last heartbeat {age:.0f}s ago" 226 results.append({ 227 "name": pill_name, "ok": ok, "detail": detail, 228 "latency_ms": round((_time.monotonic() - t0) * 1000, 1), 229 }) 230 except Exception as exc: 231 results.append({ 232 "name": pill_name, "ok": False, 233 "detail": f"{type(exc).__name__}: {exc}", 234 "latency_ms": round((_time.monotonic() - t0) * 1000, 1), 235 }) 236 237 payload = {"checks": results, "all_ok": all(r["ok"] for r in results), "cached_at": now} 238 _STATUS_CACHE["ts"] = now 239 _STATUS_CACHE["payload"] = payload 240 return payload 241 242 243@router.get("/api/streams/health") 244async def streams_health() -> dict[str, Any]: 245 """Snapshot of every Redis Stream MAF reads from or writes to. 246 247 Returns each stream's length (XLEN) and the timestamp of the most recent 248 entry (parsed from the stream id, which is a millisecond timestamp). The 249 frontend uses this to flag stale streams (e.g. trtools2 hasn't published 250 a bar in 10 min → likely engine down). 251 """ 252 import os 253 import time 254 255 streams_cfg = getattr(state.get_maf_app().config, "streams", None) if state.get_maf_app() else None 256 redis_url = ( 257 state.get_maf_app().config.redis_url if state.get_maf_app() else 258 os.environ.get("REDIS_URL", "redis://localhost:6379/0") 259 ) 260 261 # Build the list of streams to inspect. Skip blanks so a partial config 262 # doesn't try to XLEN "". 263 targets: list[tuple[str, str]] = [] 264 if streams_cfg is not None: 265 for field, label in ( 266 ("events_stream", "events"), 267 ("control_in", "control:in"), 268 ("control_out", "control:out"), 269 ("actions_out", "actions:out"), 270 ("decisions_out", "decisions:out"), 271 ("trtools2_bars_1m", "trtools2:bars:1m"), 272 ("trtools2_bars_1h", "trtools2:bars:1h"), 273 ("trtools2_news", "trtools2:news"), 274 ("trtools2_indicators", "trtools2:indicators"), 275 ("trtools2_strategy_events", "trtools2:strategy:events"), 276 ("fomo2_enriched", "fomo2:enriched"), 277 ("fomo2_reports", "fomo2:reports"), 278 ("fomo2_requests", "fomo2:requests:in"), 279 ("kronos_forecasts_emitted", "kronos:forecasts:emitted"), 280 ("mirofish_sims_emitted", "mirofish:sims:emitted"), 281 ): 282 value = getattr(streams_cfg, field, "") 283 if value: 284 targets.append((label, value)) 285 286 # Also include each arena's own output_stream — the user wants to see 287 # decision streams in the same panel. 288 if state.get_maf_app(): 289 for name in state.get_maf_app().list_arenas(): 290 arena = state.get_maf_app().get_arena(name) 291 if arena and arena.config.output_stream: 292 targets.append((f"arena:{name}", arena.config.output_stream)) 293 294 try: 295 import redis.asyncio as aioredis 296 client = aioredis.from_url(redis_url) 297 except Exception as exc: 298 return {"streams": [], "error": f"{type(exc).__name__}: {exc}"} 299 300 out: list[dict[str, Any]] = [] 301 now_ms = int(time.time() * 1000) 302 for label, stream in targets: 303 try: 304 length = int(await client.xlen(stream)) 305 last = await client.xrevrange(stream, count=1) 306 if last: 307 raw_id = last[0][0] 308 sid = raw_id.decode() if isinstance(raw_id, bytes) else str(raw_id) 309 ts_part = sid.split("-", 1)[0] 310 try: 311 last_ms = int(ts_part) 312 except ValueError: 313 last_ms = 0 314 age_s = round(max(0, now_ms - last_ms) / 1000.0, 1) if last_ms else None 315 else: 316 sid = "" 317 age_s = None 318 out.append({ 319 "label": label, 320 "stream": stream, 321 "length": length, 322 "last_id": sid, 323 "age_seconds": age_s, 324 }) 325 except Exception as exc: 326 out.append({ 327 "label": label, 328 "stream": stream, 329 "length": None, 330 "error": f"{type(exc).__name__}: {exc}", 331 }) 332 await _close_redis_client(client) 333 334 return {"streams": out, "redis_url": redis_url, "now_ms": now_ms} 335 336 337# ── Per-binding freshness — maps each arena's source bindings to the 338# Redis stream or key they read from and reports age/length so the Setup 339# tab can render "live · 12s" / "stale · 1.6h" badges per source. 340# ───────────────────────────────────────────────────────────────────── 341 342 343# Fallback freshness specs for adapters that haven't migrated to declaring 344# their own ``freshness_spec()`` classmethod yet. The endpoint prefers the 345# classmethod when present (so a new adapter's spec lives next to its 346# code), and only consults this map as a last resort. Each entry here is 347# a candidate for migration — moving a row out of this map onto the 348# adapter class is a pure refactor with no behaviour change. 349_LEGACY_FRESHNESS_FALLBACK: dict[str, dict[str, Any]] = { 350 "trtools2_signals": {"type": "stream", "stream": "trtools2:signals"}, 351 "trtools2_live": {"type": "stream", "stream": "trtools2:live"}, 352 "fomo2_stream": {"type": "stream", "stream": "fomo2:enriched"}, 353 "fomo2_items": {"type": "stream", "stream": "fomo2:items"}, 354 "fomo2_reports": {"type": "stream", "stream": "fomo2:reports"}, 355 "fomo2_report": {"type": "stream", "stream": "fomo2:reports"}, 356 "fomo2_enrichment": {"type": "stream", "stream": "fomo2:enriched"}, 357 "fomo2_request": {"type": "request_response", 358 "out_stream": "fomo2:requests:out"}, 359 "fomo2_knowledge": {"type": "scan", "scan_pattern": "fomo2:knowledge:*"}, 360 "fomo2_analysis": {"type": "scan", "scan_pattern": "fomo2:analysis:*"}, 361 "mirofish_crowd": {"type": "external", 362 "detail": "Each fetch calls the MiroFish HTTP API directly."}, 363 "tradingagents": {"type": "stream", "stream": "trading_intelligence:in"}, 364 # External HTTP / SQL — no cache 365 "questdb": {"type": "external", "detail": "Direct SQL against QuestDB"}, 366 "yahoo_finance": {"type": "external", "detail": "Yahoo Finance HTTP"}, 367 "web_search": {"type": "external", "detail": "Web search HTTP"}, 368 "sec_filings": {"type": "external", "detail": "SEC EDGAR HTTP"}, 369 "newsapi": {"type": "external", "detail": "NewsAPI HTTP"}, 370 "fred_api": {"type": "external", "detail": "FRED HTTP"}, 371 "coingecko": {"type": "external", "detail": "CoinGecko HTTP"}, 372 "alpaca": {"type": "external", "detail": "Alpaca REST/SDK (deprecated — use trtools2_api)"}, 373 "polymarket_bet": {"type": "external", "detail": "Polymarket Gamma/CLOB HTTP"}, 374 "oddsoddy_strategy": {"type": "external", 375 "detail": "Reads ../oddsoddy/strategies/{name}/"}, 376 "technical": {"type": "external", "detail": "Computed on demand"}, 377} 378 379 380def _resolve_template(template: str, cfg: dict[str, Any]) -> str: 381 out = template 382 for k, v in (cfg or {}).items(): 383 out = out.replace("{" + str(k) + "}", str(v)) 384 return out 385 386 387def _freshness_spec_for_adapter(adapter_name: str, binding_config: dict[str, Any]) -> dict[str, Any]: 388 """Resolve a freshness spec for one binding. 389 390 Preference order: 391 1. The adapter class's ``freshness_spec(binding_config)`` classmethod — 392 this is where new adapters declare their own spec, co-located with 393 the fetch logic that produces the data. 394 2. The legacy fallback map above — for adapters that haven't migrated 395 yet. Removing rows from this map should be a no-op as long as the 396 corresponding adapter class declares ``freshness_spec``. 397 3. ``{"type": "unknown"}`` — honest default, dashboard renders "unknown". 398 """ 399 from maf.sources.registry import _ADAPTER_REGISTRY 400 cls = _ADAPTER_REGISTRY.get(adapter_name) 401 if cls is not None: 402 try: 403 spec = cls.freshness_spec(binding_config or {}) 404 if isinstance(spec, dict) and spec.get("type") and spec["type"] != "unknown": 405 return spec 406 except Exception as exc: 407 logger.warning("%s.freshness_spec raised %s — falling back", adapter_name, exc) 408 if adapter_name in _LEGACY_FRESHNESS_FALLBACK: 409 return _LEGACY_FRESHNESS_FALLBACK[adapter_name] 410 return {"type": "unknown"} 411 412 413def _classify_age(age_s: float | None, length: int | None) -> str: 414 if length == 0: 415 return "empty" 416 if age_s is None: 417 return "unknown" 418 if age_s < 300: 419 return "live" 420 if age_s < 3600: 421 return "stale" 422 return "old" 423 424 425@router.get("/api/arenas/{name}/freshness") 426async def arena_freshness(name: str) -> dict[str, Any]: 427 """Per-source freshness for an arena's bindings. 428 429 Backs the badges on the Setup tab — "live · 12s ago" / "stale · 1.6h" / 430 "empty" / "external API". The map of adapter→stream/key lives at 431 ``_ADAPTER_FRESHNESS_MAP`` near this handler. 432 """ 433 import os 434 import time as _time 435 436 if not state.get_maf_app(): 437 raise HTTPException(500, "MAF not initialized") 438 arena = state.get_maf_app().get_arena(name) 439 if not arena: 440 raise HTTPException(404, f"Arena {name!r} not found") 441 442 redis_url = ( 443 state.get_maf_app().config.redis_url if state.get_maf_app() else 444 os.environ.get("REDIS_URL", "redis://localhost:6379/0") 445 ) 446 try: 447 import redis.asyncio as aioredis 448 client = aioredis.from_url(redis_url) 449 except Exception as exc: 450 raise HTTPException(500, f"redis: {exc}") 451 452 now_ms = int(_time.time() * 1000) 453 bindings: list[dict[str, Any]] = [] 454 for b in arena.config.sources: 455 spec = _freshness_spec_for_adapter(b.adapter, b.config or {}) 456 entry: dict[str, Any] = { 457 "name": b.name, 458 "adapter": b.adapter, 459 "source_type": spec.get("type", "unknown"), 460 "detail": spec.get("detail"), 461 "stream": None, "length": None, "age_seconds": None, 462 "status": "n/a", 463 } 464 st = spec.get("type") 465 try: 466 if st == "stream": 467 stream = spec.get("stream") or _resolve_template( 468 spec.get("stream_template", ""), b.config or {}, 469 ) 470 entry["stream"] = stream 471 entry["length"] = int(await client.xlen(stream)) 472 last = await client.xrevrange(stream, count=1) 473 if last: 474 raw_id = last[0][0] 475 sid = raw_id.decode() if isinstance(raw_id, bytes) else str(raw_id) 476 try: 477 last_ms = int(sid.split("-", 1)[0]) 478 except ValueError: 479 last_ms = 0 480 entry["age_seconds"] = round(max(0, now_ms - last_ms) / 1000.0, 1) if last_ms else None 481 entry["status"] = _classify_age(entry["age_seconds"], entry["length"]) 482 elif st == "key": 483 template = spec.get("key_pattern") or spec.get("key_template", "") 484 pattern = _resolve_template(template, b.config or {}) 485 # Count how many cache keys exist matching the pattern. 486 # We deliberately do NOT use the remaining TTL as an age 487 # proxy — TTL is configured per refresher and unknown 488 # here, so the math is misleading. Instead, the emit 489 # stream (when defined) gives us a real "last refreshed" 490 # timestamp. If no emit stream, report cache presence 491 # only — age is honestly unknown. 492 cursor = 0 493 count = 0 494 while True: 495 cursor, batch = await client.scan(cursor=cursor, match=pattern, count=100) 496 count += len(batch) 497 if cursor == 0 or count >= 200: 498 break 499 entry["length"] = count 500 entry["age_seconds"] = None 501 entry["status"] = "live" if count > 0 else "missing" 502 if count > 0: 503 entry["detail"] = ( 504 f"{count} cached key(s) matching {pattern} " 505 f"(write-age unknown without emit stream)" 506 ) 507 # Cross-reference the emit stream for a real "last refresh" signal. 508 emit = spec.get("emit_stream") 509 if emit: 510 try: 511 last = await client.xrevrange(emit, count=1) 512 if last: 513 sid = last[0][0] 514 sid = sid.decode() if isinstance(sid, bytes) else str(sid) 515 try: 516 last_ms = int(sid.split("-", 1)[0]) 517 except ValueError: 518 last_ms = 0 519 if last_ms: 520 age = round(max(0, now_ms - last_ms) / 1000.0, 1) 521 entry["age_seconds"] = age 522 entry["status"] = _classify_age(age, count) 523 entry["emit_stream"] = emit 524 entry["detail"] = None 525 except Exception: 526 pass 527 elif st == "scan": 528 pattern = spec["scan_pattern"] 529 count = 0 530 cursor = 0 531 while True: 532 cursor, batch = await client.scan(cursor=cursor, match=pattern, count=100) 533 count += len(batch) 534 if cursor == 0 or count >= 500: 535 break 536 entry["length"] = count 537 entry["status"] = "live" if count > 0 else "missing" 538 # Cross-reference an emit stream if defined 539 emit = spec.get("emit_stream") 540 if emit: 541 try: 542 last = await client.xrevrange(emit, count=1) 543 if last: 544 sid = last[0][0] 545 sid = sid.decode() if isinstance(sid, bytes) else str(sid) 546 try: 547 last_ms = int(sid.split("-", 1)[0]) 548 except ValueError: 549 last_ms = 0 550 if last_ms: 551 age = round(max(0, now_ms - last_ms) / 1000.0, 1) 552 entry["age_seconds"] = age 553 entry["status"] = _classify_age(age, count) 554 entry["emit_stream"] = emit 555 except Exception: 556 pass 557 elif st == "external": 558 entry["status"] = "external" 559 elif st == "request_response": 560 entry["stream"] = spec.get("out_stream") 561 entry["status"] = "request_response" 562 else: 563 entry["status"] = "unknown" 564 except Exception as exc: 565 entry["status"] = "error" 566 entry["error"] = f"{type(exc).__name__}: {exc}" 567 bindings.append(entry) 568 569 await _close_redis_client(client) 570 return {"arena": name, "bindings": bindings, "now_ms": now_ms}