checking system…
Docs / back / src/maf/dashboard/routers/system.py · line 48
Python · 571 lines
  1"""System health + stream freshness endpoints.
  2
  3* ``GET /api/system/status``                — connectivity pills, cached 5s.
  4* ``GET /api/streams/health``               — XLEN + age per known Redis Stream.
  5* ``GET /api/arenas/{name}/freshness``      — per-binding freshness for the
  6                                              Setup tab. Resolves the freshness
  7                                              spec from each adapter's classmethod
  8                                              first, then ``_LEGACY_FRESHNESS_FALLBACK``.
  9
 10The endpoint that lists / samples ``/api/data/sources`` still lives in
 11:mod:`maf.dashboard.api` because it's coupled to the data-tab page; this
 12module owns only the health surface.
 13"""
 14
 15from __future__ import annotations
 16
import importlib
import inspect
import json
import logging
import os
import re
import time
from typing import Any
 21
 22from fastapi import APIRouter, HTTPException
 23
 24from maf.dashboard import state
 25
logger = logging.getLogger(__name__)

# Every endpoint in this module registers on this router.
router = APIRouter()
 29
 30
 31async def _close_redis_client(client: Any) -> None:
 32    """Best-effort close — tolerate redis-py 4.x/5.x/6.x signature drift."""
 33    try:
 34        ac = getattr(client, "aclose", None)
 35        if ac is not None:
 36            await ac()
 37            return
 38        await client.close()
 39    except Exception:
 40        pass
 41
 42
# Single-entry memo for ``system_status``: ``ts`` is the ``time.time()`` at
# which ``payload`` was built; ``payload`` stays None until the first run.
_STATUS_CACHE: dict[str, Any] = {"ts": 0.0, "payload": None}
# Seconds a cached status payload is served before the checks are re-run.
_STATUS_TTL_S = 5.0
 45
 46
 47@router.get("/api/system/status")
 48async def system_status(force: bool = False) -> dict[str, Any]:
 49    """Check connectivity to every external dependency MAF needs.
 50
 51    Frontend renders this as a coloured row of pills. Each entry has
 52    ``name``, ``ok`` (bool), ``detail`` (one-liner), and ``latency_ms`` so
 53    a user can see at a glance whether the engine has everything it needs.
 54
 55    Result is cached for ``_STATUS_TTL_S`` seconds — the status bar polls
 56    every ~5s per tab so without caching we'd hammer Redis/Ollama/mirofish
 57    on every render with N tabs × N users. Pass ``?force=1`` to bypass.
 58    """
 59    import os
 60    import time
 61
 62    now = time.time()
 63    if not force and _STATUS_CACHE["payload"] is not None:
 64        if now - _STATUS_CACHE["ts"] < _STATUS_TTL_S:
 65            return _STATUS_CACHE["payload"]
 66
 67    results: list[dict[str, Any]] = []
 68
 69    # Redis (events stream)
 70    t0 = time.monotonic()
 71    redis_url = (
 72        state.get_maf_app().config.redis_url if state.get_maf_app() else
 73        os.environ.get("REDIS_URL", "redis://localhost:6379/0")
 74    )
 75    try:
 76        import redis.asyncio as aioredis
 77        client = aioredis.from_url(redis_url)
 78        await client.ping()
 79        await _close_redis_client(client)
 80        results.append({
 81            "name": "redis",
 82            "ok": True,
 83            "detail": redis_url,
 84            "latency_ms": round((time.monotonic() - t0) * 1000, 1),
 85        })
 86    except Exception as exc:
 87        results.append({
 88            "name": "redis",
 89            "ok": False,
 90            "detail": f"{type(exc).__name__}: {exc}",
 91            "latency_ms": round((time.monotonic() - t0) * 1000, 1),
 92        })
 93
 94    # Ollama Cloud — single-shot small completion. Cached briefly to avoid
 95    # one ping per dashboard render.
 96    t0 = time.monotonic()
 97    api_key = os.environ.get("OLLAMA_API_KEY", "")
 98    if not api_key:
 99        results.append({
100            "name": "ollama",
101            "ok": False,
102            "detail": "OLLAMA_API_KEY not set",
103            "latency_ms": 0.0,
104        })
105    else:
106        try:
107            import httpx
108            async with httpx.AsyncClient(timeout=10.0) as hc:
109                resp = await hc.get(
110                    "https://ollama.com/v1/models",
111                    headers={"Authorization": f"Bearer {api_key}"},
112                )
113                ok = resp.status_code == 200
114                detail = (
115                    f"{len(resp.json().get('data', []))} models" if ok else
116                    f"HTTP {resp.status_code}"
117                )
118        except Exception as exc:
119            ok = False
120            detail = f"{type(exc).__name__}: {exc}"
121        results.append({
122            "name": "ollama",
123            "ok": ok,
124            "detail": detail,
125            "latency_ms": round((time.monotonic() - t0) * 1000, 1),
126        })
127
128    # trtools2 importability
129    t0 = time.monotonic()
130    try:
131        import importlib
132        importlib.import_module("engine.db.questdb")
133        results.append({
134            "name": "trtools2",
135            "ok": True,
136            "detail": "engine.db.questdb importable",
137            "latency_ms": round((time.monotonic() - t0) * 1000, 1),
138        })
139    except Exception as exc:
140        results.append({
141            "name": "trtools2",
142            "ok": False,
143            "detail": f"{type(exc).__name__}: {exc}",
144            "latency_ms": round((time.monotonic() - t0) * 1000, 1),
145        })
146
147    # fomo2 importability
148    t0 = time.monotonic()
149    try:
150        import importlib
151        importlib.import_module("fomo2")
152        results.append({
153            "name": "fomo2",
154            "ok": True,
155            "detail": "fomo2 package importable",
156            "latency_ms": round((time.monotonic() - t0) * 1000, 1),
157        })
158    except Exception as exc:
159        results.append({
160            "name": "fomo2",
161            "ok": False,
162            "detail": f"{type(exc).__name__}: {exc}",
163            "latency_ms": round((time.monotonic() - t0) * 1000, 1),
164        })
165
166    # MiroFish reachability
167    t0 = time.monotonic()
168    mirofish_base = "http://localhost:5101"
169    if state.get_maf_app():
170        for m in state.get_maf_app().config.modules:
171            if m.name == "mirofish":
172                mirofish_base = m.config.get("base_url", mirofish_base)
173                break
174    try:
175        import httpx
176        async with httpx.AsyncClient(timeout=3.0) as hc:
177            resp = await hc.get(f"{mirofish_base}/health")
178            ok = resp.status_code == 200
179            detail = f"{mirofish_base} {resp.status_code}"
180    except Exception as exc:
181        ok = False
182        detail = f"{type(exc).__name__}: {exc}"
183    results.append({
184        "name": "mirofish",
185        "ok": ok,
186        "detail": detail,
187        "latency_ms": round((time.monotonic() - t0) * 1000, 1),
188    })
189
190    # Refresher heartbeats — read JSON heartbeat keys written by the workers.
191    # Each refresher writes a TTL'd key on every tick; if it's still there and
192    # young, the worker is alive.
193    import json as _json
194    import time as _time
195    refresher_specs = [
196        ("kronos_refresher", "maf:refresher:kronos:heartbeat", "Kronos forecast refresher"),
197        ("mirofish_refresher", "maf:refresher:mirofish:heartbeat", "MiroFish crowd-sim refresher"),
198    ]
199    for pill_name, hb_key, label in refresher_specs:
200        t0 = _time.monotonic()
201        try:
202            import redis.asyncio as aioredis
203            client = aioredis.from_url(redis_url)
204            raw = await client.get(hb_key)
205            await _close_redis_client(client)
206            if raw is None:
207                results.append({
208                    "name": pill_name, "ok": False,
209                    "detail": "not running (no heartbeat — start `python -m maf` for service mode)",
210                    "latency_ms": round((_time.monotonic() - t0) * 1000, 1),
211                })
212                continue
213            hb = _json.loads(raw if isinstance(raw, (bytes, str)) else raw)
214            if isinstance(hb, bytes):
215                hb = _json.loads(hb)
216            age = _time.time() - float(hb.get("ts", 0))
217            cadence = float(hb.get("min_cadence_s", 60))
218            ok = age < cadence * 2.5
219            watched = hb.get("watched")
220            detail = (
221                f"alive · {age:.0f}s ago"
222                + (f" · {watched} watched" if watched is not None else "")
223            )
224            if not ok:
225                detail = f"stale · last heartbeat {age:.0f}s ago"
226            results.append({
227                "name": pill_name, "ok": ok, "detail": detail,
228                "latency_ms": round((_time.monotonic() - t0) * 1000, 1),
229            })
230        except Exception as exc:
231            results.append({
232                "name": pill_name, "ok": False,
233                "detail": f"{type(exc).__name__}: {exc}",
234                "latency_ms": round((_time.monotonic() - t0) * 1000, 1),
235            })
236
237    payload = {"checks": results, "all_ok": all(r["ok"] for r in results), "cached_at": now}
238    _STATUS_CACHE["ts"] = now
239    _STATUS_CACHE["payload"] = payload
240    return payload
241
242
# (attribute on ``config.streams``, display label) for every configured stream
# the health panel reports. Missing or blank attributes are skipped so a
# partial config never tries to XLEN "".
_HEALTH_STREAM_FIELDS: tuple[tuple[str, str], ...] = (
    ("events_stream", "events"),
    ("control_in", "control:in"),
    ("control_out", "control:out"),
    ("actions_out", "actions:out"),
    ("decisions_out", "decisions:out"),
    ("trtools2_bars_1m", "trtools2:bars:1m"),
    ("trtools2_bars_1h", "trtools2:bars:1h"),
    ("trtools2_news", "trtools2:news"),
    ("trtools2_indicators", "trtools2:indicators"),
    ("trtools2_strategy_events", "trtools2:strategy:events"),
    ("fomo2_enriched", "fomo2:enriched"),
    ("fomo2_reports", "fomo2:reports"),
    ("fomo2_requests", "fomo2:requests:in"),
    ("kronos_forecasts_emitted", "kronos:forecasts:emitted"),
    ("mirofish_sims_emitted", "mirofish:sims:emitted"),
)

# Bound Redis round-trips so an unreachable server yields per-stream errors
# quickly instead of hanging the panel on OS-level TCP timeouts.
_STREAMS_REDIS_CONNECT_TIMEOUT_S = 2.0
_STREAMS_REDIS_TIMEOUT_S = 5.0


@router.get("/api/streams/health")
async def streams_health() -> dict[str, Any]:
    """Snapshot of every Redis Stream MAF reads from or writes to.

    Returns each stream's length (XLEN) and the timestamp of the most recent
    entry (parsed from the stream id, which is a millisecond timestamp). The
    frontend uses this to flag stale streams (e.g. trtools2 hasn't published
    a bar in 10 min → likely engine down).

    Targets are the non-blank ``config.streams`` attributes listed in
    ``_HEALTH_STREAM_FIELDS`` plus each arena's ``output_stream``. A failure
    on one stream is reported in that stream's row (``length: None`` plus
    ``error``); if the Redis client can't be created at all the response is
    ``{"streams": [], "error": ...}``.
    """
    app = state.get_maf_app()
    streams_cfg = getattr(app.config, "streams", None) if app else None
    redis_url = (
        app.config.redis_url if app
        else os.environ.get("REDIS_URL", "redis://localhost:6379/0")
    )

    targets: list[tuple[str, str]] = []
    if streams_cfg is not None:
        for attr, label in _HEALTH_STREAM_FIELDS:
            stream = getattr(streams_cfg, attr, "")
            if stream:
                targets.append((label, stream))

    # Also include each arena's own output_stream — the user wants to see
    # decision streams in the same panel.
    if app:
        for arena_name in app.list_arenas():
            arena = app.get_arena(arena_name)
            if arena and arena.config.output_stream:
                targets.append((f"arena:{arena_name}", arena.config.output_stream))

    try:
        import redis.asyncio as aioredis

        client = aioredis.from_url(
            redis_url,
            socket_connect_timeout=_STREAMS_REDIS_CONNECT_TIMEOUT_S,
            socket_timeout=_STREAMS_REDIS_TIMEOUT_S,
        )
    except Exception as exc:
        return {"streams": [], "error": f"{type(exc).__name__}: {exc}"}

    out: list[dict[str, Any]] = []
    now_ms = int(time.time() * 1000)
    try:
        for label, stream in targets:
            try:
                length = int(await client.xlen(stream))
                last = await client.xrevrange(stream, count=1)
                sid = ""
                age_s = None
                if last:
                    raw_id = last[0][0]
                    sid = raw_id.decode() if isinstance(raw_id, bytes) else str(raw_id)
                    # Stream ids are "<ms>-<seq>"; the ms part dates the entry.
                    try:
                        last_ms = int(sid.split("-", 1)[0])
                    except ValueError:
                        last_ms = 0
                    if last_ms:
                        age_s = round(max(0, now_ms - last_ms) / 1000.0, 1)
                out.append({
                    "label": label,
                    "stream": stream,
                    "length": length,
                    "last_id": sid,
                    "age_seconds": age_s,
                })
            except Exception as exc:
                out.append({
                    "label": label,
                    "stream": stream,
                    "length": None,
                    "error": f"{type(exc).__name__}: {exc}",
                })
    finally:
        # Close even if the request is cancelled mid-loop.
        await _close_redis_client(client)

    return {"streams": out, "redis_url": redis_url, "now_ms": now_ms}
335
336
337# ── Per-binding freshness — maps each arena's source bindings to the
338# Redis stream or key they read from and reports age/length so the Setup
339# tab can render "live · 12s" / "stale · 1.6h" badges per source.
340# ─────────────────────────────────────────────────────────────────────
341
342
# Fallback freshness specs for adapters that don't (yet) declare their own
# ``freshness_spec()`` classmethod. ``_freshness_spec_for_adapter`` prefers
# the classmethod when present (so a new adapter's spec lives next to its
# code) and only consults this map as a last resort. Each row is a migration
# candidate — moving it onto the adapter class, returning the same spec, is a
# pure refactor with no behaviour change.
#
# Keys are adapter names. Spec shapes handled by ``arena_freshness``:
#   {"type": "stream", "stream": ...}                XLEN + age of newest entry
#   {"type": "scan", "scan_pattern": ...}            count of matching keys
#   {"type": "request_response", "out_stream": ...}  stream name reported, not probed
#   {"type": "external", "detail": ...}              no Redis probe; detail passed through
# ``arena_freshness`` also understands "key" specs and the ``stream_template``
# / ``emit_stream`` fields, none of which are used in this table.
_LEGACY_FRESHNESS_FALLBACK: dict[str, dict[str, Any]] = {
    "trtools2_signals":        {"type": "stream", "stream": "trtools2:signals"},
    "trtools2_live":           {"type": "stream", "stream": "trtools2:live"},
    "fomo2_stream":            {"type": "stream", "stream": "fomo2:enriched"},
    "fomo2_items":             {"type": "stream", "stream": "fomo2:items"},
    "fomo2_reports":           {"type": "stream", "stream": "fomo2:reports"},
    "fomo2_report":            {"type": "stream", "stream": "fomo2:reports"},
    "fomo2_enrichment":        {"type": "stream", "stream": "fomo2:enriched"},
    "fomo2_request":           {"type": "request_response",
                                "out_stream": "fomo2:requests:out"},
    "fomo2_knowledge":         {"type": "scan", "scan_pattern": "fomo2:knowledge:*"},
    "fomo2_analysis":          {"type": "scan", "scan_pattern": "fomo2:analysis:*"},
    "mirofish_crowd":          {"type": "external",
                                "detail": "Each fetch calls the MiroFish HTTP API directly."},
    "tradingagents":           {"type": "stream", "stream": "trading_intelligence:in"},
    # External HTTP / SQL — no cache
    "questdb":                 {"type": "external", "detail": "Direct SQL against QuestDB"},
    "yahoo_finance":           {"type": "external", "detail": "Yahoo Finance HTTP"},
    "web_search":              {"type": "external", "detail": "Web search HTTP"},
    "sec_filings":             {"type": "external", "detail": "SEC EDGAR HTTP"},
    "newsapi":                 {"type": "external", "detail": "NewsAPI HTTP"},
    "fred_api":                {"type": "external", "detail": "FRED HTTP"},
    "coingecko":               {"type": "external", "detail": "CoinGecko HTTP"},
    "alpaca":                  {"type": "external", "detail": "Alpaca REST/SDK (deprecated — use trtools2_api)"},
    "polymarket_bet":          {"type": "external", "detail": "Polymarket Gamma/CLOB HTTP"},
    "oddsoddy_strategy":       {"type": "external",
                                "detail": "Reads ../oddsoddy/strategies/{name}/"},
    "technical":               {"type": "external", "detail": "Computed on demand"},
}
378
379
380def _resolve_template(template: str, cfg: dict[str, Any]) -> str:
381    out = template
382    for k, v in (cfg or {}).items():
383        out = out.replace("{" + str(k) + "}", str(v))
384    return out
385
386
def _freshness_spec_for_adapter(adapter_name: str, binding_config: dict[str, Any]) -> dict[str, Any]:
    """Resolve a freshness spec for one binding.

    Preference order:
      1. The adapter class's ``freshness_spec(binding_config)`` classmethod —
         this is where new adapters declare their own spec, co-located with
         the fetch logic that produces the data. Used only when it returns a
         dict with a truthy ``type`` other than ``"unknown"``. Adapter classes
         that don't define the method are skipped quietly; one that raises is
         logged at WARNING and skipped.
      2. ``_LEGACY_FRESHNESS_FALLBACK`` — for adapters that haven't migrated
         yet. A copy of the row is returned so callers can't mutate the
         shared table.
      3. ``{"type": "unknown"}`` — honest default, dashboard renders "unknown".

    Args:
        adapter_name: Registry name of the binding's adapter.
        binding_config: The binding's config (``None`` is treated as ``{}``).
    """
    from maf.sources.registry import _ADAPTER_REGISTRY

    cls = _ADAPTER_REGISTRY.get(adapter_name)
    # getattr(None, ...) is None too, so unknown adapters fall straight through.
    declare_spec = getattr(cls, "freshness_spec", None)
    if declare_spec is not None:
        try:
            spec = declare_spec(binding_config or {})
        except Exception as exc:
            logger.warning("%s.freshness_spec raised %s — falling back", adapter_name, exc)
        else:
            if isinstance(spec, dict) and spec.get("type") and spec["type"] != "unknown":
                return spec
    fallback = _LEGACY_FRESHNESS_FALLBACK.get(adapter_name)
    if fallback is not None:
        return dict(fallback)
    return {"type": "unknown"}
411
412
413def _classify_age(age_s: float | None, length: int | None) -> str:
414    if length == 0:
415        return "empty"
416    if age_s is None:
417        return "unknown"
418    if age_s < 300:
419        return "live"
420    if age_s < 3600:
421        return "stale"
422    return "old"
423
424
# Bound Redis round-trips so an unreachable server produces per-binding
# errors quickly rather than hanging the Setup tab on OS-level TCP timeouts.
_FRESHNESS_REDIS_CONNECT_TIMEOUT_S = 2.0
_FRESHNESS_REDIS_TIMEOUT_S = 5.0


async def _latest_entry_age_s(client: Any, stream: str, now_ms: int) -> float | None:
    """Age in seconds (1 dp) of the newest entry on *stream*, from its id.

    Stream ids are ``"<ms>-<seq>"``. Returns None when the stream is empty or
    the millisecond part is zero/unparseable. Redis errors propagate.
    """
    last = await client.xrevrange(stream, count=1)
    if not last:
        return None
    raw_id = last[0][0]
    sid = raw_id.decode() if isinstance(raw_id, bytes) else str(raw_id)
    try:
        last_ms = int(sid.split("-", 1)[0])
    except ValueError:
        return None
    if not last_ms:
        return None
    return round(max(0, now_ms - last_ms) / 1000.0, 1)


async def _emit_stream_age_s(client: Any, emit: str, now_ms: int) -> float | None:
    """Best-effort ``_latest_entry_age_s`` for an optional emit stream."""
    try:
        return await _latest_entry_age_s(client, emit, now_ms)
    except Exception:
        # The emit stream only refines the badge; failing to read it must not
        # turn an otherwise healthy binding into an error.
        logger.debug("freshness: reading emit stream %r failed", emit, exc_info=True)
        return None


async def _count_matching_keys(client: Any, pattern: str, cap: int) -> int:
    """SCAN for keys matching *pattern*, stopping once at least *cap* are seen.

    The result can overshoot *cap* by part of the final SCAN batch; it is a
    presence/size signal, not an exact count.
    """
    cursor = 0
    count = 0
    while True:
        cursor, batch = await client.scan(cursor=cursor, match=pattern, count=100)
        count += len(batch)
        if cursor == 0 or count >= cap:
            return count


async def _binding_freshness(client: Any, binding: Any, now_ms: int) -> dict[str, Any]:
    """Build the freshness entry for one arena source binding.

    Redis failures while probing are captured as ``status: "error"`` plus an
    ``error`` string; spec resolution itself happens outside that guard.
    """
    cfg = binding.config or {}
    spec = _freshness_spec_for_adapter(binding.adapter, cfg)
    source_type = spec.get("type")
    entry: dict[str, Any] = {
        "name": binding.name,
        "adapter": binding.adapter,
        "source_type": spec.get("type", "unknown"),
        "detail": spec.get("detail"),
        "stream": None, "length": None, "age_seconds": None,
        "status": "n/a",
    }
    try:
        if source_type == "stream":
            stream = spec.get("stream") or _resolve_template(
                spec.get("stream_template", ""), cfg,
            )
            entry["stream"] = stream
            entry["length"] = int(await client.xlen(stream))
            entry["age_seconds"] = await _latest_entry_age_s(client, stream, now_ms)
            entry["status"] = _classify_age(entry["age_seconds"], entry["length"])
        elif source_type == "key":
            template = spec.get("key_pattern") or spec.get("key_template", "")
            pattern = _resolve_template(template, cfg)
            # Count how many cache keys match. Remaining TTL is deliberately
            # NOT used as an age proxy — TTL is configured per refresher and
            # unknown here, so the math would mislead. The emit stream (when
            # defined) gives a real "last refreshed" timestamp; without one,
            # only cache presence is reported and age stays unknown.
            count = await _count_matching_keys(client, pattern, cap=200)
            entry["length"] = count
            entry["status"] = "live" if count > 0 else "missing"
            if count > 0:
                entry["detail"] = (
                    f"{count} cached key(s) matching {pattern} "
                    f"(write-age unknown without emit stream)"
                )
            emit = spec.get("emit_stream")
            if emit:
                age = await _emit_stream_age_s(client, emit, now_ms)
                if age is not None:
                    entry["age_seconds"] = age
                    entry["status"] = _classify_age(age, count)
                    entry["emit_stream"] = emit
                    entry["detail"] = None
        elif source_type == "scan":
            count = await _count_matching_keys(client, spec["scan_pattern"], cap=500)
            entry["length"] = count
            entry["status"] = "live" if count > 0 else "missing"
            emit = spec.get("emit_stream")
            if emit:
                age = await _emit_stream_age_s(client, emit, now_ms)
                if age is not None:
                    entry["age_seconds"] = age
                    entry["status"] = _classify_age(age, count)
                    entry["emit_stream"] = emit
        elif source_type == "external":
            entry["status"] = "external"
        elif source_type == "request_response":
            entry["stream"] = spec.get("out_stream")
            entry["status"] = "request_response"
        else:
            entry["status"] = "unknown"
    except Exception as exc:
        entry["status"] = "error"
        entry["error"] = f"{type(exc).__name__}: {exc}"
    return entry


@router.get("/api/arenas/{name}/freshness")
async def arena_freshness(name: str) -> dict[str, Any]:
    """Per-source freshness for an arena's bindings.

    Backs the badges on the Setup tab — "live · 12s ago" / "stale · 1.6h" /
    "empty" / "external API". Each binding's spec comes from
    ``_freshness_spec_for_adapter`` (the adapter's ``freshness_spec``
    classmethod first, then ``_LEGACY_FRESHNESS_FALLBACK``).

    Raises:
        HTTPException: 500 if MAF isn't initialized or the Redis client can't
            be created; 404 if no arena is named *name*.
    """
    app = state.get_maf_app()
    if not app:
        raise HTTPException(500, "MAF not initialized")
    arena = app.get_arena(name)
    if not arena:
        raise HTTPException(404, f"Arena {name!r} not found")

    try:
        import redis.asyncio as aioredis

        client = aioredis.from_url(
            app.config.redis_url,
            socket_connect_timeout=_FRESHNESS_REDIS_CONNECT_TIMEOUT_S,
            socket_timeout=_FRESHNESS_REDIS_TIMEOUT_S,
        )
    except Exception as exc:
        raise HTTPException(500, f"redis: {exc}") from exc

    now_ms = int(time.time() * 1000)
    bindings: list[dict[str, Any]] = []
    try:
        for binding in arena.config.sources:
            bindings.append(await _binding_freshness(client, binding, now_ms))
    finally:
        # Close even if spec resolution raises or the request is cancelled.
        await _close_redis_client(client)
    return {"arena": name, "bindings": bindings, "now_ms": now_ms}