1"""Channels browser endpoints. 2 3* ``GET /api/channels`` — categorised list of every Redis Stream MAF knows about. 4* ``GET /api/channels/preview`` — decoded recent entries + inferred schema. 5* ``GET /channels`` — HTML page. 6 7Self-contained: depends only on ``maf.channels`` (discover/preview/infer) 8and the shared :mod:`maf.dashboard.state` for the runtime singletons. 9""" 10 11from __future__ import annotations 12 13from pathlib import Path 14from typing import Any 15 16from fastapi import APIRouter, Request 17from fastapi.responses import HTMLResponse 18from fastapi.templating import Jinja2Templates 19 20from maf.dashboard import state 21 22router = APIRouter() 23_TEMPLATES_DIR = Path(__file__).resolve().parents[1] / "templates" 24templates = Jinja2Templates(directory=str(_TEMPLATES_DIR)) 25 26 27@router.get("/api/channels") 28async def api_channels() -> dict[str, Any]: 29 """Categorised list of every channel MAF knows about, with length + last_id.""" 30 from maf.channels import discover_channels 31 32 maf_app = state.get_maf_app() 33 streams_cfg = getattr(maf_app.config, "streams", None) if maf_app else None 34 arena_streams: list[str] = [] 35 if maf_app: 36 for name in maf_app.list_arenas(): 37 arena = maf_app.get_arena(name) 38 if arena and arena.config.output_stream: 39 arena_streams.append(arena.config.output_stream) 40 41 channels = await discover_channels( 42 streams_cfg=streams_cfg, 43 arena_output_streams=arena_streams, 44 redis_url=state.get_redis_url(), 45 ) 46 return { 47 "channels": [ 48 { 49 "stream": c.stream, 50 "category": c.category, 51 "label": c.label, 52 "description": c.description, 53 "source": c.source, 54 "length": c.length, 55 "last_id": c.last_id, 56 } 57 for c in channels 58 ], 59 "categories": ["arena_output", "derived", "input", "control"], 60 } 61 62 63@router.get("/api/channels/preview") 64async def api_channel_preview(stream: str, count: int = 25) -> dict[str, Any]: 65 """Decoded recent entries + inferred schema for one stream.""" 66 from maf.channels import infer_schema, preview_stream 67 68 count = max(1, min(int(count), 200)) 69 entries = await preview_stream(state.get_redis_url(), stream, count=count) 70 schema = infer_schema(entries) 71 return { 72 "stream": stream, 73 "count": len(entries), 74 "entries": [ 75 {"stream_id": e.stream_id, "ts_ms": e.ts_ms, "fields": e.fields} 76 for e in entries 77 ], 78 "schema": [ 79 { 80 "name": f.name, "types": f.types, "presence": f.presence, 81 "samples": f.samples, "is_nested": f.is_nested, 82 } 83 for f in schema 84 ], 85 } 86 87 88@router.get("/channels", response_class=HTMLResponse) 89async def page_channels(request: Request) -> HTMLResponse: 90 """Channels browser — pick a stream, see schema + recent entries.""" 91 return templates.TemplateResponse("channels.html", { 92 "request": request, 93 "active_nav": "channels", 94 })