"""Categorised channel enumeration.

Combines four sources so the dashboard can show *everything* flowing
through MAF on one page:

  1. **Static catalogue** — the well-known streams in ``_STATIC_CATALOG``,
     listed even when nothing has configured or published them.
  2. **Config** — every named stream in :class:`maf.config.StreamsConfig`.
  3. **Arenas** — each arena's ``output_stream`` (when set).
  4. **Live keyspace** — Redis SCAN over the curated prefixes in
     ``_SCAN_PREFIXES`` (``trtools2:*``, ``fomo2:*``, ``kronos:*``,
     ``mirofish:*``, ``maf:arena:*``, …). Picks up *publishers we didn't
     configure*, e.g. a ``trtools2:bars:1m`` that a feed engine wrote but
     MAF has no binding for. That's the "see what's actually there" case.

Each result is a :class:`Channel` with a category tag so the UI can group:

  - ``input`` — upstream producers (trtools2, fomo2)
  - ``derived`` — MAF-side workers (kronos, mirofish, action outbox,
    decision outbox)
  - ``arena_output`` — per-arena envelope streams
  - ``control`` — maf:control:*, maf:events, etc.

A channel can carry a short ``description`` when it comes from the static
catalogue. SCAN-discovered streams have an empty description until a user
labels them.
"""

from __future__ import annotations

import logging
import os
from collections.abc import AsyncIterator
from dataclasses import dataclass, field, replace
from typing import Any

logger = logging.getLogger(__name__)


CATEGORY_INPUT = "input"
CATEGORY_DERIVED = "derived"
CATEGORY_ARENA_OUTPUT = "arena_output"
CATEGORY_CONTROL = "control"


@dataclass(frozen=True)
class Channel:
    """One discovered stream + its metadata."""

    stream: str
    category: str
    label: str = ""
    description: str = ""
    source: str = ""  # "catalog" | "config" | "arena" | "scan"
    length: int | None = None  # filled from XLEN when hydration is enabled
    last_id: str = ""  # most recent entry id, blank when empty


# Static catalogue: known streams with human-friendly labels and what they
# carry. Keys are stream names; values are (category, label, description).
_STATIC_CATALOG: dict[str, tuple[str, str, str]] = {
    # Input — upstream producers
    "trtools2:bars:1m": (
        CATEGORY_INPUT, "Trtools2 — bars 1m",
        "OHLCV bars at 1-minute resolution per symbol."),
    "trtools2:bars:1h": (
        CATEGORY_INPUT, "Trtools2 — bars 1h",
        "OHLCV bars at 1-hour resolution per symbol."),
    "trtools2:news": (
        CATEGORY_INPUT, "Trtools2 — news",
        "Enriched news items with tickers, sentiment, source."),
    "trtools2:indicators": (
        CATEGORY_INPUT, "Trtools2 — indicators",
        "Pre-computed technicals (RSI, MACD, ATR, …) per symbol."),
    "trtools2:strategy:events": (
        CATEGORY_INPUT, "Trtools2 — strategy events",
        "Strategy state transitions: BUY/HOLD/SELL with confidence."),
    "fomo2:enriched": (
        CATEGORY_INPUT, "Fomo2 — enriched items",
        "Feed items after dedup + NLP enrichment (sentiment, tickers, entities)."),
    "fomo2:reports": (
        CATEGORY_INPUT, "Fomo2 — reports",
        "LLM-generated reports — drive mirofish-refresher on high-impact events."),
    "fomo2:requests:in": (
        CATEGORY_INPUT, "Fomo2 — request inbox",
        "MAF agents publish here to demand fresh fomo2 data on-demand."),
    "fomo2:requests:out": (
        CATEGORY_INPUT, "Fomo2 — request replies",
        "Fomo2's replies to MAF on-demand requests."),
    # Derived — MAF-side workers
    "kronos:forecasts:emitted": (
        CATEGORY_DERIVED, "Kronos — forecasts emitted",
        "Compact emit per (symbol, timeframe) when prob_up moves or direction flips."),
    "mirofish:sims:emitted": (
        CATEGORY_DERIVED, "Mirofish — sims emitted",
        "Compact emit when a fresh crowd-sim lands for a watched symbol's report."),
    "maf:actions:out": (
        CATEGORY_DERIVED, "Actions outbox",
        "TradingAction envelopes from trading arenas — consumed by trtools2."),
    "maf:decisions:out": (
        CATEGORY_DERIVED, "Decisions outbox",
        "GenericDecision envelopes from non-quant arenas (research_debate, …)."),
    "maf:executions:out": (
        CATEGORY_DERIVED, "Executions outbox",
        "Engine-side execution decisions (executed/queued/rejected)."),
    "maf:outcomes:out": (
        CATEGORY_DERIVED, "Outcomes outbox",
        "Engine-side final results (fills, closes, pnl) — fed to DecisionMemory."),
    # Control
    "maf:events": (
        CATEGORY_CONTROL, "Lifecycle events",
        "Every arena.start / phase.complete / agent.signal / decision.emit."),
    "maf:control:in": (
        CATEGORY_CONTROL, "Control inbox",
        "Inbound commands: run_arena, configure_arena, set_data_source, …"),
    "maf:control:out": (
        CATEGORY_CONTROL, "Control acks",
        "Acks for control-plane commands, keyed by correlation_id."),
}

# SCAN prefixes — anything matching gets surfaced even if we don't know about it.
_SCAN_PREFIXES: tuple[str, ...] = (
    "trtools2:",
    "fomo2:",
    "kronos:",
    "mirofish:",
    "maf:arena:",
    "maf:events",
    "maf:control:",
    "maf:actions:",
    "maf:decisions:",
    "maf:executions:",
    "maf:outcomes:",
)

# Maximum keys to inspect per discovery call (the counter is shared across
# all prefixes). Stream count in practice is tiny (~30), so a 10k cap is
# conservative-but-fast.
_SCAN_MAX_KEYS = 10_000

# StreamsConfig attributes that hold stream names, in the order they are read.
_CONFIG_STREAM_FIELDS: tuple[str, ...] = (
    "events_stream", "control_in", "control_out", "actions_out",
    "decisions_out",
    "trtools2_bars_1m", "trtools2_bars_1h", "trtools2_news",
    "trtools2_indicators", "trtools2_strategy_events",
    "fomo2_enriched", "fomo2_reports", "fomo2_requests",
    "kronos_forecasts_emitted", "mirofish_sims_emitted",
)

# Display order of categories in the result; unknown categories sort last.
_CATEGORY_ORDER: dict[str, int] = {
    CATEGORY_ARENA_OUTPUT: 0,
    CATEGORY_DERIVED: 1,
    CATEGORY_INPUT: 2,
    CATEGORY_CONTROL: 3,
}


def _decode(raw: Any) -> str:
    """Coerce a Redis reply (``bytes`` or ``str``) to text.

    Undecodable bytes are replaced rather than raised on, so a single
    odd key name cannot abort a whole discovery pass.
    """
    if isinstance(raw, bytes):
        return raw.decode("utf-8", errors="replace")
    return str(raw)


def _categorise(stream: str) -> str:
    """Best-effort category for a stream name.

    The static catalogue wins; otherwise the category is derived from the
    name prefix, and anything unrecognised is treated as ``input``.
    """
    if stream in _STATIC_CATALOG:
        return _STATIC_CATALOG[stream][0]
    if stream.startswith("maf:arena:"):
        return CATEGORY_ARENA_OUTPUT
    if stream.startswith(("maf:control:", "maf:events")):
        return CATEGORY_CONTROL
    if stream.startswith(("maf:actions:", "maf:decisions:",
                          "maf:executions:", "maf:outcomes:",
                          "kronos:", "mirofish:")):
        return CATEGORY_DERIVED
    return CATEGORY_INPUT


async def _is_stream(client: Any, key: str) -> bool:
    """``TYPE key`` → "stream". Used to filter SCAN hits.

    Any error from the client counts as "not a stream".
    """
    try:
        key_type = await client.type(key)
    except Exception:
        return False
    return _decode(key_type) == "stream"


async def _xlen_safe(client: Any, key: str) -> int | None:
    """Return ``XLEN key`` as an int, or ``None`` when the call fails."""
    try:
        return int(await client.xlen(key))
    except Exception:
        return None


async def _last_id(client: Any, key: str) -> str:
    """Return the id of the newest entry in *key*, or ``""`` if empty/failed."""
    try:
        rows = await client.xrevrange(key, count=1)
    except Exception:
        return ""
    if not rows:
        return ""
    # Each row is an (entry_id, fields) pair; only the id is needed.
    return _decode(rows[0][0])


async def _iter_stream_keys(client: Any) -> AsyncIterator[str]:
    """Yield stream-typed keys matching ``_SCAN_PREFIXES``.

    Stops after inspecting ``_SCAN_MAX_KEYS`` keys in total. Errors raised
    by ``scan_iter`` propagate to the consumer.
    """
    inspected = 0
    for prefix in _SCAN_PREFIXES:
        async for raw in client.scan_iter(match=f"{prefix}*", count=200):
            if inspected >= _SCAN_MAX_KEYS:
                return
            inspected += 1
            key = _decode(raw)
            if await _is_stream(client, key):
                yield key


async def _hydrate(
    client: Any,
    channels: dict[str, Channel],
    *,
    include_length: bool,
    include_last_id: bool,
) -> dict[str, Channel]:
    """Return copies of *channels* with ``length`` / ``last_id`` filled in."""
    hydrated: dict[str, Channel] = {}
    for stream, channel in channels.items():
        length = await _xlen_safe(client, stream) if include_length else None
        last = await _last_id(client, stream) if include_last_id else ""
        hydrated[stream] = replace(channel, length=length, last_id=last)
    return hydrated


async def _close_client(client: Any) -> None:
    """Close *client* best-effort, preferring ``aclose()`` when it exists."""
    try:
        closer = getattr(client, "aclose", None) or client.close
        await closer()
    except Exception as exc:
        logger.debug("discover_channels: redis close failed: %s", exc)


async def discover_channels(
    *,
    streams_cfg: Any = None,
    arena_output_streams: list[str] | None = None,
    redis_url: str | None = None,
    include_length: bool = True,
    include_last_id: bool = True,
) -> list[Channel]:
    """Return every channel MAF knows about, categorised and de-duped.

    Sources are consulted in order (static catalogue, config, arenas, live
    scan) and the first source to mention a stream name wins.

    Parameters
    ----------
    streams_cfg:
        A :class:`maf.config.StreamsConfig` (or any object with the same
        attribute names). Optional — when omitted the discovery falls back
        to the static catalogue + redis scan.
    arena_output_streams:
        Each arena's configured ``output_stream``. Surfaces them under
        ``arena_output`` even when the arena has never published.
    redis_url:
        Redis connection URL. When falsy, no live scan or hydration is
        attempted and ``length`` / ``last_id`` keep their defaults.
    include_length / include_last_id:
        Whether to call ``XLEN`` / ``XREVRANGE`` for each channel.
        Disable for cheaper enumeration when the UI doesn't need them.

    Returns
    -------
    list[Channel]
        Sorted arena_output → derived → input → control, then by stream
        name. Redis problems are logged and never raised: the result then
        contains whatever was gathered before the failure.
    """
    seen: dict[str, Channel] = {}

    def _add(stream: str, *, category: str, label: str = "",
             description: str = "", source: str = "") -> None:
        # First writer wins, so earlier (more descriptive) sources keep
        # their label and description.
        if not stream or stream in seen:
            return
        if not category:
            category = _categorise(stream)
        seen[stream] = Channel(
            stream=stream, category=category, label=label,
            description=description, source=source,
        )

    # 1. Static catalogue — every entry, even when not in current config.
    for stream, (cat, label, desc) in _STATIC_CATALOG.items():
        _add(stream, category=cat, label=label, description=desc, source="catalog")

    # 2. StreamsConfig — pulls in user-customised stream names.
    if streams_cfg is not None:
        for field_name in _CONFIG_STREAM_FIELDS:
            stream = getattr(streams_cfg, field_name, "")
            if stream:
                _add(stream, category=_categorise(stream),
                     label=field_name.replace("_", " ").title(),
                     source="config")

    # 3. Arena output streams.
    for arena_stream in arena_output_streams or []:
        _add(arena_stream, category=CATEGORY_ARENA_OUTPUT,
             label=arena_stream.rsplit(":", 1)[-1],
             description="Arena envelope stream.", source="arena")

    # 4. Live scan — surfaces things nobody configured but Redis has.
    if redis_url:
        client = None
        try:
            import redis.asyncio as aioredis
            client = aioredis.from_url(redis_url)
        except Exception as exc:
            logger.warning("discover_channels: redis connect failed: %s", exc)

        if client is not None:
            try:
                # The client connects lazily, so an unreachable server
                # shows up here rather than in from_url().
                async for key in _iter_stream_keys(client):
                    _add(key, category=_categorise(key), source="scan")

                # Hydrate length + last_id for everything we know about now.
                if include_length or include_last_id:
                    seen = await _hydrate(
                        client, seen,
                        include_length=include_length,
                        include_last_id=include_last_id,
                    )
            except Exception as exc:
                # Keep what was gathered so far; skip hydration since the
                # server is evidently unhealthy.
                logger.warning("discover_channels: redis scan failed: %s", exc)
            finally:
                await _close_client(client)

    # Return sorted: arena_output → derived → input → control, then alphabetical.
    return sorted(
        seen.values(),
        key=lambda c: (_CATEGORY_ORDER.get(c.category, 99), c.stream),
    )


__all__ = [
    "CATEGORY_ARENA_OUTPUT",
    "CATEGORY_CONTROL",
    "CATEGORY_DERIVED",
    "CATEGORY_INPUT",
    "Channel",
    "discover_channels",
]