"""Stream preview + schema inference.

``preview_stream`` reads the most recent N entries and decodes each entry's
fields into a JSON-friendly dict (per-field JSON-aware: a string that
looks like JSON is parsed). The decoded entries surface in the dashboard
as a "what's in this channel?" table.

``infer_schema`` walks a list of entries and produces a per-key
:class:`SchemaField` describing observed types, sample values, and
presence frequency. That's enough for a user to decide whether a channel
is what they want before binding it.
"""

from __future__ import annotations

import json
import logging
from collections import Counter
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)


_MAX_SAMPLE_CHARS = 200  # per sample value
_MAX_SAMPLES = 3  # distinct sample values kept per field


@dataclass
class PreviewEntry:
    """One decoded stream entry."""

    stream_id: str
    ts_ms: int  # parsed from the stream id (first segment)
    fields: dict[str, Any] = field(default_factory=dict)


@dataclass
class SchemaField:
    """Observed shape of one field across N samples."""

    name: str
    types: dict[str, int] = field(default_factory=dict)  # type name → count
    presence: int = 0  # n entries containing it
    samples: list[str] = field(default_factory=list)  # short string samples
    is_nested: bool = False  # carries dict/list


def _parse_ts_ms(stream_id: str) -> int:
    """Return the integer before the first ``-`` of ``stream_id``.

    Returns 0 when the id is empty or that leading segment is not an integer.
    """
    if not stream_id:
        return 0
    head = stream_id.split("-", 1)[0]
    try:
        return int(head)
    except ValueError:
        return 0


def _decode_text(raw: Any) -> str:
    """Return ``raw`` as text without ever raising on undecodable bytes.

    Bytes are decoded as UTF-8; bytes that are not valid UTF-8 fall back to
    their ``repr`` (the same fallback ``_decode_value`` uses), and any other
    object goes through ``str``.
    """
    if isinstance(raw, bytes):
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return repr(raw)
    return str(raw)


def _decode_value(v: Any) -> Any:
    """Decode one Redis field value, trying JSON before falling back to str.

    Bytes are decoded as UTF-8 (undecodable bytes return their ``repr``).
    A string that looks like JSON -- it starts with ``{``, ``[`` or ``"``,
    is one of ``true`` / ``false`` / ``null``, or looks numeric -- is parsed
    with ``json.loads``. If parsing fails the original, unstripped string is
    returned. Values that are neither bytes nor str pass through unchanged.
    """
    if isinstance(v, bytes):
        try:
            v = v.decode("utf-8")
        except UnicodeDecodeError:
            return repr(v)
    if isinstance(v, str):
        s = v.strip()
        looks_like_json = (
            s.startswith(("{", "[", '"'))
            or s in ("true", "false", "null")
            or _looks_numeric(s)
        )
        if looks_like_json:
            try:
                return json.loads(s)
            except (ValueError, RecursionError):
                # ValueError covers json.JSONDecodeError (its subclass).
                # RecursionError is raised for pathologically nested input;
                # one hostile field must not take down the whole preview.
                return v
    return v


def _looks_numeric(s: str) -> bool:
    """Cheap pre-filter: leading ``+``/``-`` signs, at most one dot, digits.

    Deliberately loose (``+5`` and ``007`` pass); ``json.loads`` is the real
    validator, and the caller keeps the original string when it rejects them.
    """
    if not s:
        return False
    return s.lstrip("-+").replace(".", "", 1).isdigit()


def _decode_entry(entry_id: Any, raw_fields: Any) -> PreviewEntry:
    """Turn one raw ``(id, fields)`` stream row into a :class:`PreviewEntry`.

    Ids and field names are converted with ``_decode_text`` so a field name
    that is not valid UTF-8 cannot raise; values go through
    ``_decode_value``. ``raw_fields`` that is not a dict yields an entry with
    no fields.
    """
    sid = _decode_text(entry_id)
    out: dict[str, Any] = {}
    if isinstance(raw_fields, dict):
        for k, v in raw_fields.items():
            out[_decode_text(k)] = _decode_value(v)
    # Convention across MAF: many publishers stuff the whole payload under a
    # single ``data`` field. Unwrap it if it's the only field and it's a dict.
    inner = out.get("data")
    if len(out) == 1 and isinstance(inner, dict):
        out = inner
    return PreviewEntry(stream_id=sid, ts_ms=_parse_ts_ms(sid), fields=out)


async def preview_stream(
    redis_url: str, stream: str, *, count: int = 25,
) -> list[PreviewEntry]:
    """Return the most recent ``count`` entries from ``stream``, decoded.

    Best-effort: if the redis client cannot be imported or created, or the
    ``XREVRANGE`` call fails, a warning is logged and an empty list is
    returned instead of raising. The client is always closed afterwards.
    """
    try:
        # Imported lazily so a missing/broken redis install is handled here
        # like any other connection failure.
        import redis.asyncio as aioredis
        client = aioredis.from_url(redis_url)
    except Exception as exc:
        logger.warning("preview_stream connect %s failed: %s", stream, exc)
        return []
    try:
        rows = await client.xrevrange(stream, count=count)
    except Exception as exc:
        logger.warning("preview_stream xrevrange %s failed: %s", stream, exc)
        return []
    finally:
        try:
            # Prefer ``aclose()`` when the client has it; otherwise fall back
            # to ``close()``.
            close = getattr(client, "aclose", None)
            if close:
                await close()
            else:
                await client.close()
        except Exception as exc:
            # Closing is best-effort, but leave a trace instead of hiding it.
            logger.debug("preview_stream close %s failed: %s", stream, exc)
    return [_decode_entry(e_id, fields) for e_id, fields in rows]


def _typename(v: Any) -> str:
    """Map a decoded value to a JSON-style type name.

    Values outside the JSON types fall back to their Python class name.
    """
    if v is None:
        return "null"
    if isinstance(v, bool):
        return "bool"  # check before int (bool is an int subclass)
    if isinstance(v, int):
        return "int"
    if isinstance(v, float):
        return "float"
    if isinstance(v, str):
        return "string"
    if isinstance(v, dict):
        return "object"
    if isinstance(v, list):
        return "array"
    return type(v).__name__


def _sample(v: Any) -> str:
    """Render ``v`` as a short display string.

    Dicts and lists become compact JSON, everything else uses ``str``.
    Results longer than ``_MAX_SAMPLE_CHARS`` are cut and suffixed with "…".
    """
    if isinstance(v, (dict, list)):
        s = json.dumps(v, default=str, separators=(",", ":"))
    else:
        s = str(v)
    if len(s) > _MAX_SAMPLE_CHARS:
        s = s[:_MAX_SAMPLE_CHARS] + "…"
    return s


def infer_schema(entries: list[PreviewEntry]) -> list[SchemaField]:
    """Build a per-field schema from a sample of entries.

    For each key observed across the sample we report:
      - which types we saw (counter)
      - presence count (how many entries had this key)
      - up to ``_MAX_SAMPLES`` (3) short, distinct sample values
      - whether the field ever carries nested dict / list values

    The result is sorted by presence (highest first), then by field name.
    """
    by_key: dict[str, SchemaField] = {}
    for entry in entries:
        for k, v in entry.fields.items():
            sf = by_key.setdefault(k, SchemaField(name=k))
            sf.presence += 1
            tname = _typename(v)
            sf.types[tname] = sf.types.get(tname, 0) + 1
            if tname in ("object", "array"):
                sf.is_nested = True
            if len(sf.samples) < _MAX_SAMPLES:
                sample = _sample(v)
                if sample not in sf.samples:
                    sf.samples.append(sample)
    # Most-frequent first, then alphabetical.
    return sorted(
        by_key.values(),
        key=lambda f: (-f.presence, f.name),
    )


__all__ = [
    "PreviewEntry",
    "SchemaField",
    "infer_schema",
    "preview_stream",
]