checking system…
Docs / back / src/maf/channels/preview.py · line 151
Python · 186 lines
  1"""Stream preview + schema inference.
  2
  3``preview_stream`` reads the most recent N entries and decodes each entry's
  4fields into a JSON-friendly dict (per-field JSON-aware: a string that
  5looks like JSON is parsed). The decoded entries surface in the dashboard
  6as a "what's in this channel?" table.
  7
  8``infer_schema`` walks a list of entries and produces a per-key
  9:class:`SchemaField` describing observed types, sample values, and
 10presence frequency. That's enough for a user to decide whether a channel
 11is what they want before binding it.
 12"""
 13
 14from __future__ import annotations
 15
 16import json
 17import logging
 18from collections import Counter
 19from dataclasses import dataclass, field
 20from typing import Any
 21
 22logger = logging.getLogger(__name__)
 23
 24
 25_MAX_SAMPLE_CHARS = 200    # per sample value
 26
 27
 28@dataclass
 29class PreviewEntry:
 30    """One decoded stream entry."""
 31
 32    stream_id: str
 33    ts_ms: int                   # parsed from the stream id (first segment)
 34    fields: dict[str, Any] = field(default_factory=dict)
 35
 36
 37@dataclass
 38class SchemaField:
 39    """Observed shape of one field across N samples."""
 40
 41    name: str
 42    types: dict[str, int] = field(default_factory=dict)   # type name → count
 43    presence: int = 0                                      # n entries containing it
 44    samples: list[str] = field(default_factory=list)       # short string samples
 45    is_nested: bool = False                                # carries dict/list
 46
 47
 48def _parse_ts_ms(stream_id: str) -> int:
 49    if not stream_id:
 50        return 0
 51    head = stream_id.split("-", 1)[0]
 52    try:
 53        return int(head)
 54    except ValueError:
 55        return 0
 56
 57
 58def _decode_value(v: Any) -> Any:
 59    """Decode one Redis field value, trying JSON before falling back to str."""
 60    if isinstance(v, bytes):
 61        try:
 62            v = v.decode("utf-8")
 63        except UnicodeDecodeError:
 64            return repr(v)
 65    if isinstance(v, str):
 66        s = v.strip()
 67        if s.startswith(("{", "[", '"')) or s in ("true", "false", "null") or _looks_numeric(s):
 68            try:
 69                return json.loads(s)
 70            except (json.JSONDecodeError, ValueError):
 71                return v
 72    return v
 73
 74
 75def _looks_numeric(s: str) -> bool:
 76    if not s:
 77        return False
 78    return s.lstrip("-+").replace(".", "", 1).isdigit()
 79
 80
 81def _decode_entry(entry_id: Any, raw_fields: Any) -> PreviewEntry:
 82    sid = entry_id.decode("utf-8") if isinstance(entry_id, bytes) else str(entry_id)
 83    out: dict[str, Any] = {}
 84    if isinstance(raw_fields, dict):
 85        for k, v in raw_fields.items():
 86            key = k.decode("utf-8") if isinstance(k, bytes) else str(k)
 87            out[key] = _decode_value(v)
 88    # Convention across MAF: many publishers stuff the whole payload under a
 89    # single ``data`` field. Unwrap it if it's the only field and it's a dict.
 90    inner = out.get("data")
 91    if len(out) == 1 and isinstance(inner, dict):
 92        out = inner
 93    return PreviewEntry(stream_id=sid, ts_ms=_parse_ts_ms(sid), fields=out)
 94
 95
 96async def preview_stream(
 97    redis_url: str, stream: str, *, count: int = 25,
 98) -> list[PreviewEntry]:
 99    """Return the most recent ``count`` entries from ``stream``, decoded."""
100    try:
101        import redis.asyncio as aioredis
102        client = aioredis.from_url(redis_url)
103    except Exception as exc:
104        logger.warning("preview_stream connect %s failed: %s", stream, exc)
105        return []
106    try:
107        rows = await client.xrevrange(stream, count=count)
108    except Exception as exc:
109        logger.warning("preview_stream xrevrange %s failed: %s", stream, exc)
110        return []
111    finally:
112        try:
113            ac = getattr(client, "aclose", None)
114            if ac:
115                await ac()
116            else:
117                await client.close()
118        except Exception:
119            pass
120    return [_decode_entry(e_id, fields) for e_id, fields in rows]
121
122
def _typename(v: Any) -> str:
    """Map a decoded value to a JSON-flavoured type label.

    JSON kinds get JSON names (``null``, ``bool``, ``int``, ``float``,
    ``string``, ``object``, ``array``). Anything else reports its Python
    class name.
    """
    if v is None:
        return "null"
    # Order matters: bool is an int subclass, so it must be matched first.
    for py_type, label in (
        (bool, "bool"),
        (int, "int"),
        (float, "float"),
        (str, "string"),
        (dict, "object"),
        (list, "array"),
    ):
        if isinstance(v, py_type):
            return label
    return type(v).__name__
139
140
def _sample(v: Any) -> str:
    """Render ``v`` as a short display string for schema samples.

    Dicts and lists are serialized as compact JSON, with non-JSON leaves
    stringified via ``default=str``. Other values use ``str``. A result longer
    than ``_MAX_SAMPLE_CHARS`` is cut to that many characters and suffixed
    with ``"…"``.
    """
    if isinstance(v, (dict, list)):
        # ensure_ascii=False keeps non-ASCII text readable, matching the str()
        # path used for scalars. It also stops the truncation below from
        # counting (and splitting) \uXXXX escape sequences.
        s = json.dumps(v, default=str, ensure_ascii=False, separators=(",", ":"))
    else:
        s = str(v)
    if len(s) > _MAX_SAMPLE_CHARS:
        s = s[:_MAX_SAMPLE_CHARS] + "…"
    return s
149
150
def infer_schema(entries: list[PreviewEntry]) -> list[SchemaField]:
    """Summarize the fields seen across a batch of decoded entries.

    Every distinct field name gets one :class:`SchemaField` recording:
      - a per-type occurrence count
      - how many entries contained the field
      - at most 3 distinct sample renderings, in first-seen order
      - whether any value was a dict or list

    The result is ordered by presence (highest first), ties broken by name.
    """
    schema: dict[str, SchemaField] = {}
    for entry in entries:
        for name, value in entry.fields.items():
            if name not in schema:
                schema[name] = SchemaField(name=name)
            col = schema[name]
            col.presence += 1

            label = _typename(value)
            col.types[label] = col.types.get(label, 0) + 1
            if label == "object" or label == "array":
                col.is_nested = True

            # Only render a sample while there is still room to keep one.
            if len(col.samples) >= 3:
                continue
            rendered = _sample(value)
            if rendered not in col.samples:
                col.samples.append(rendered)

    def rank(col):
        return -col.presence, col.name

    return sorted(schema.values(), key=rank)
178
179
# Public API: the two result dataclasses plus the two entry points.
__all__ = ["PreviewEntry", "SchemaField", "infer_schema", "preview_stream"]