checking system…
Docs / back / src/maf/dashboard/routers/channels.py · line 64
Python · 95 lines
 1"""Channels browser endpoints.
 2
 3* ``GET /api/channels`` — categorised list of every Redis Stream MAF knows about.
 4* ``GET /api/channels/preview`` — decoded recent entries + inferred schema.
 5* ``GET /channels`` — HTML page.
 6
 7Self-contained: depends only on ``maf.channels`` (discover/preview/infer)
 8and the shared :mod:`maf.dashboard.state` for the runtime singletons.
 9"""
10
11from __future__ import annotations
12
13from pathlib import Path
14from typing import Any
15
16from fastapi import APIRouter, Request
17from fastapi.responses import HTMLResponse
18from fastapi.templating import Jinja2Templates
19
20from maf.dashboard import state
21
22router = APIRouter()
23_TEMPLATES_DIR = Path(__file__).resolve().parents[1] / "templates"
24templates = Jinja2Templates(directory=str(_TEMPLATES_DIR))
25
26
27@router.get("/api/channels")
28async def api_channels() -> dict[str, Any]:
29    """Categorised list of every channel MAF knows about, with length + last_id."""
30    from maf.channels import discover_channels
31
32    maf_app = state.get_maf_app()
33    streams_cfg = getattr(maf_app.config, "streams", None) if maf_app else None
34    arena_streams: list[str] = []
35    if maf_app:
36        for name in maf_app.list_arenas():
37            arena = maf_app.get_arena(name)
38            if arena and arena.config.output_stream:
39                arena_streams.append(arena.config.output_stream)
40
41    channels = await discover_channels(
42        streams_cfg=streams_cfg,
43        arena_output_streams=arena_streams,
44        redis_url=state.get_redis_url(),
45    )
46    return {
47        "channels": [
48            {
49                "stream": c.stream,
50                "category": c.category,
51                "label": c.label,
52                "description": c.description,
53                "source": c.source,
54                "length": c.length,
55                "last_id": c.last_id,
56            }
57            for c in channels
58        ],
59        "categories": ["arena_output", "derived", "input", "control"],
60    }
61
62
63@router.get("/api/channels/preview")
64async def api_channel_preview(stream: str, count: int = 25) -> dict[str, Any]:
65    """Decoded recent entries + inferred schema for one stream."""
66    from maf.channels import infer_schema, preview_stream
67
68    count = max(1, min(int(count), 200))
69    entries = await preview_stream(state.get_redis_url(), stream, count=count)
70    schema = infer_schema(entries)
71    return {
72        "stream": stream,
73        "count": len(entries),
74        "entries": [
75            {"stream_id": e.stream_id, "ts_ms": e.ts_ms, "fields": e.fields}
76            for e in entries
77        ],
78        "schema": [
79            {
80                "name": f.name, "types": f.types, "presence": f.presence,
81                "samples": f.samples, "is_nested": f.is_nested,
82            }
83            for f in schema
84        ],
85    }
86
87
88@router.get("/channels", response_class=HTMLResponse)
89async def page_channels(request: Request) -> HTMLResponse:
90    """Channels browser — pick a stream, see schema + recent entries."""
91    return templates.TemplateResponse("channels.html", {
92        "request": request,
93        "active_nav": "channels",
94    })