1"""Specialised arena output panels — crowd-simulation oracle + mastermind decisions. 2 3Both endpoints tail one Redis Stream, decode each entry via the arena's 4canonical ``decode_envelope`` helper, and shape the result for the UI. 5Unknown ``schema_version`` rows are surfaced as ``skipped`` so the panel 6can show "something arrived but we couldn't read it". 7""" 8 9from __future__ import annotations 10 11import os 12from typing import Any 13 14from fastapi import APIRouter 15 16router = APIRouter() 17 18 19_DEFAULT_ORACLE_STREAM = "maf:arena:crowd_simulation:output" 20_DEFAULT_MASTERMIND_STREAM = "maf:arena:mastermind:output" 21 22 23async def _close_redis(client: Any) -> None: 24 """Best-effort close; tolerate redis-py 4.x/5.x/6.x signature drift.""" 25 try: 26 ac = getattr(client, "aclose", None) 27 if ac is not None: 28 await ac() 29 return 30 await client.close() 31 except Exception: 32 pass 33 34 35@router.get("/api/oracle/envelopes") 36async def list_oracle_envelopes(count: int = 20) -> dict[str, Any]: 37 """Return the latest N envelopes from the crowd_simulation Redis stream.""" 38 from maf.arenas.crowd_simulation.stream import _extract_data_field, decode_envelope 39 40 redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0") 41 stream = os.environ.get("CROWD_ORACLE_STREAM", _DEFAULT_ORACLE_STREAM) 42 count = max(1, min(int(count), 200)) 43 44 try: 45 import redis.asyncio as aioredis 46 except ImportError: 47 return {"envelopes": [], "stream": stream, "error": "redis package not installed"} 48 49 client = aioredis.from_url(redis_url) 50 try: 51 rows = await client.xrevrange(stream, count=count) 52 except Exception as exc: 53 await _close_redis(client) 54 return {"envelopes": [], "stream": stream, "error": f"{type(exc).__name__}: {exc}"} 55 await _close_redis(client) 56 57 envelopes: list[dict[str, Any]] = [] 58 for stream_id_raw, fields in rows: 59 sid = stream_id_raw.decode() if isinstance(stream_id_raw, bytes) else str(stream_id_raw) 60 data_payload = _extract_data_field(fields) 61 env = decode_envelope(data_payload) if data_payload is not None else None 62 if env is None: 63 envelopes.append({ 64 "stream_id": sid, "skipped": True, 65 "reason": "decode_failed_or_unsupported_schema_version", 66 }) 67 continue 68 envelopes.append({ 69 "stream_id": sid, 70 "schema_version": env.schema_version, 71 "decision_kind": env.decision_kind, 72 "outcome": env.prediction.outcome, 73 "probability": env.prediction.probability, 74 "dissent_pct": env.prediction.dissent_pct, 75 "horizon_hours": env.prediction.horizon_hours, 76 "source_type": env.prediction.source_type, 77 "source_id": env.prediction.source_id, 78 "top_drivers": list(env.prediction.top_drivers), 79 "model_votes": [v.model_dump(mode="json") for v in env.prediction.model_votes], 80 "generated_at": env.prediction.generated_at.isoformat(), 81 "decision": env.decision, 82 "published_at": env.published_at.isoformat(), 83 "correlation_id": env.correlation_id, 84 "meta": dict(env.meta), 85 }) 86 87 return {"envelopes": envelopes, "stream": stream, "redis_url": redis_url} 88 89 90@router.get("/api/mastermind/decisions") 91async def list_mastermind_decisions(count: int = 20) -> dict[str, Any]: 92 """Return the latest N Decision envelopes from the mastermind Redis stream.""" 93 from maf.arenas.mastermind.stream import _extract_data_field, decode_envelope 94 95 redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0") 96 stream = os.environ.get("MASTERMIND_STREAM", _DEFAULT_MASTERMIND_STREAM) 97 count = max(1, min(int(count), 200)) 98 99 try: 100 import redis.asyncio as aioredis 101 except ImportError: 102 return { 103 "decisions": [], "stream": stream, "redis_url": redis_url, 104 "error": "redis package not installed", 105 } 106 107 client = aioredis.from_url(redis_url) 108 try: 109 rows = await client.xrevrange(stream, count=count) 110 except Exception as exc: 111 await _close_redis(client) 112 return { 113 "decisions": [], "stream": stream, "redis_url": redis_url, 114 "error": f"{type(exc).__name__}: {exc}", 115 } 116 await _close_redis(client) 117 118 decisions: list[dict[str, Any]] = [] 119 for stream_id_raw, fields in rows: 120 sid = stream_id_raw.decode() if isinstance(stream_id_raw, bytes) else str(stream_id_raw) 121 data_payload = _extract_data_field(fields) 122 env = decode_envelope(data_payload) if data_payload is not None else None 123 if env is None: 124 decisions.append({ 125 "stream_id": sid, "skipped": True, 126 "reason": "decode_failed_or_unsupported_schema_version", 127 }) 128 continue 129 d = env.decision 130 decisions.append({ 131 "stream_id": sid, 132 "schema_version": env.schema_version, 133 "decision_id": d.decision_id, 134 "question": d.question, 135 "recommendation": d.recommendation, 136 "confidence": d.confidence, 137 "dissent_pct": d.dissent_pct, 138 "domain": d.domain, 139 "horizon": d.horizon, 140 "reasoning": d.reasoning, 141 "argument_tree": d.argument_tree.model_dump(mode="json"), 142 "arena_votes": [v.model_dump(mode="json") for v in d.arena_votes], 143 "memory_citations": [m.model_dump(mode="json") for m in d.memory_citations], 144 "graph_citations": [g.model_dump(mode="json") for g in d.graph_citations], 145 "outcome": d.outcome, 146 "lesson": d.lesson, 147 "generated_at": d.generated_at.isoformat(), 148 "published_at": env.published_at.isoformat(), 149 "correlation_id": env.correlation_id, 150 "meta": dict(env.meta), 151 }) 152 153 return { 154 "decisions": decisions, "stream": stream, "redis_url": redis_url, "error": None, 155 }