checking system…
Docs / back / src/maf/dashboard/routers/specialised.py · line 38
Python · 156 lines
  1"""Specialised arena output panels — crowd-simulation oracle + mastermind decisions.
  2
  3Both endpoints tail one Redis Stream, decode each entry via the arena's
  4canonical ``decode_envelope`` helper, and shape the result for the UI.
  5Unknown ``schema_version`` rows are surfaced as ``skipped`` so the panel
  6can show "something arrived but we couldn't read it".
  7"""
  8
  9from __future__ import annotations
 10
 11import os
 12from typing import Any
 13
 14from fastapi import APIRouter
 15
 16router = APIRouter()
 17
 18
 19_DEFAULT_ORACLE_STREAM = "maf:arena:crowd_simulation:output"
 20_DEFAULT_MASTERMIND_STREAM = "maf:arena:mastermind:output"
 21
 22
 23async def _close_redis(client: Any) -> None:
 24    """Best-effort close; tolerate redis-py 4.x/5.x/6.x signature drift."""
 25    try:
 26        ac = getattr(client, "aclose", None)
 27        if ac is not None:
 28            await ac()
 29            return
 30        await client.close()
 31    except Exception:
 32        pass
 33
 34
 35@router.get("/api/oracle/envelopes")
 36async def list_oracle_envelopes(count: int = 20) -> dict[str, Any]:
 37    """Return the latest N envelopes from the crowd_simulation Redis stream."""
 38    from maf.arenas.crowd_simulation.stream import _extract_data_field, decode_envelope
 39
 40    redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
 41    stream = os.environ.get("CROWD_ORACLE_STREAM", _DEFAULT_ORACLE_STREAM)
 42    count = max(1, min(int(count), 200))
 43
 44    try:
 45        import redis.asyncio as aioredis
 46    except ImportError:
 47        return {"envelopes": [], "stream": stream, "error": "redis package not installed"}
 48
 49    client = aioredis.from_url(redis_url)
 50    try:
 51        rows = await client.xrevrange(stream, count=count)
 52    except Exception as exc:
 53        await _close_redis(client)
 54        return {"envelopes": [], "stream": stream, "error": f"{type(exc).__name__}: {exc}"}
 55    await _close_redis(client)
 56
 57    envelopes: list[dict[str, Any]] = []
 58    for stream_id_raw, fields in rows:
 59        sid = stream_id_raw.decode() if isinstance(stream_id_raw, bytes) else str(stream_id_raw)
 60        data_payload = _extract_data_field(fields)
 61        env = decode_envelope(data_payload) if data_payload is not None else None
 62        if env is None:
 63            envelopes.append({
 64                "stream_id": sid, "skipped": True,
 65                "reason": "decode_failed_or_unsupported_schema_version",
 66            })
 67            continue
 68        envelopes.append({
 69            "stream_id": sid,
 70            "schema_version": env.schema_version,
 71            "decision_kind": env.decision_kind,
 72            "outcome": env.prediction.outcome,
 73            "probability": env.prediction.probability,
 74            "dissent_pct": env.prediction.dissent_pct,
 75            "horizon_hours": env.prediction.horizon_hours,
 76            "source_type": env.prediction.source_type,
 77            "source_id": env.prediction.source_id,
 78            "top_drivers": list(env.prediction.top_drivers),
 79            "model_votes": [v.model_dump(mode="json") for v in env.prediction.model_votes],
 80            "generated_at": env.prediction.generated_at.isoformat(),
 81            "decision": env.decision,
 82            "published_at": env.published_at.isoformat(),
 83            "correlation_id": env.correlation_id,
 84            "meta": dict(env.meta),
 85        })
 86
 87    return {"envelopes": envelopes, "stream": stream, "redis_url": redis_url}
 88
 89
 90@router.get("/api/mastermind/decisions")
 91async def list_mastermind_decisions(count: int = 20) -> dict[str, Any]:
 92    """Return the latest N Decision envelopes from the mastermind Redis stream."""
 93    from maf.arenas.mastermind.stream import _extract_data_field, decode_envelope
 94
 95    redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
 96    stream = os.environ.get("MASTERMIND_STREAM", _DEFAULT_MASTERMIND_STREAM)
 97    count = max(1, min(int(count), 200))
 98
 99    try:
100        import redis.asyncio as aioredis
101    except ImportError:
102        return {
103            "decisions": [], "stream": stream, "redis_url": redis_url,
104            "error": "redis package not installed",
105        }
106
107    client = aioredis.from_url(redis_url)
108    try:
109        rows = await client.xrevrange(stream, count=count)
110    except Exception as exc:
111        await _close_redis(client)
112        return {
113            "decisions": [], "stream": stream, "redis_url": redis_url,
114            "error": f"{type(exc).__name__}: {exc}",
115        }
116    await _close_redis(client)
117
118    decisions: list[dict[str, Any]] = []
119    for stream_id_raw, fields in rows:
120        sid = stream_id_raw.decode() if isinstance(stream_id_raw, bytes) else str(stream_id_raw)
121        data_payload = _extract_data_field(fields)
122        env = decode_envelope(data_payload) if data_payload is not None else None
123        if env is None:
124            decisions.append({
125                "stream_id": sid, "skipped": True,
126                "reason": "decode_failed_or_unsupported_schema_version",
127            })
128            continue
129        d = env.decision
130        decisions.append({
131            "stream_id": sid,
132            "schema_version": env.schema_version,
133            "decision_id": d.decision_id,
134            "question": d.question,
135            "recommendation": d.recommendation,
136            "confidence": d.confidence,
137            "dissent_pct": d.dissent_pct,
138            "domain": d.domain,
139            "horizon": d.horizon,
140            "reasoning": d.reasoning,
141            "argument_tree": d.argument_tree.model_dump(mode="json"),
142            "arena_votes": [v.model_dump(mode="json") for v in d.arena_votes],
143            "memory_citations": [m.model_dump(mode="json") for m in d.memory_citations],
144            "graph_citations": [g.model_dump(mode="json") for g in d.graph_citations],
145            "outcome": d.outcome,
146            "lesson": d.lesson,
147            "generated_at": d.generated_at.isoformat(),
148            "published_at": env.published_at.isoformat(),
149            "correlation_id": env.correlation_id,
150            "meta": dict(env.meta),
151        })
152
153    return {
154        "decisions": decisions, "stream": stream, "redis_url": redis_url, "error": None,
155    }