checking system…
Docs / back / src/maf/channels/discovery.py · line 164
Python · 289 lines
"""Categorised channel enumeration.

Combines four sources of truth so the dashboard can show *everything*
flowing through MAF on one page:

  1. **Catalogue** — a static table of well-known streams with
     human-friendly labels and descriptions.
  2. **Config** — every named stream in :class:`maf.config.StreamsConfig`.
  3. **Arenas** — each arena's ``output_stream`` (when set).
  4. **Live keyspace** — Redis SCAN over a curated set of prefixes
     (``trtools2:*``, ``fomo2:*``, ``kronos:*``, ``mirofish:*``,
     ``maf:arena:*``, plus the ``maf:`` control / lifecycle / outbox
     streams). Picks up *publishers we didn't configure*, e.g. a
     ``trtools2:bars:5m`` that a feed engine wrote but MAF has no binding
     for. That's the "see what's actually there" case.

When several sources mention the same stream, the first one (in the order
above) wins.

Each result is a :class:`Channel` with a category tag so the UI can group:

  - ``input``         — upstream producers (trtools2, fomo2); also the
                         fallback for SCAN hits with an unrecognised prefix
  - ``derived``       — MAF-side workers (kronos, mirofish) and the action,
                         decision, execution and outcome outboxes
  - ``arena_output``  — per-arena envelope streams
  - ``control``       — maf:control:*, maf:events

Catalogue channels carry a short ``description``; configured arena streams
get a generic one. SCAN-discovered and config-only streams have an empty
description.
"""
 26
 27from __future__ import annotations
 28
 29import logging
 30import os
 31from dataclasses import dataclass, field
 32from typing import Any
 33
 34logger = logging.getLogger(__name__)
 35
 36
 37CATEGORY_INPUT = "input"
 38CATEGORY_DERIVED = "derived"
 39CATEGORY_ARENA_OUTPUT = "arena_output"
 40CATEGORY_CONTROL = "control"
 41
 42
 43@dataclass(frozen=True)
 44class Channel:
 45    """One discovered stream + its metadata."""
 46
 47    stream: str
 48    category: str
 49    label: str = ""
 50    description: str = ""
 51    source: str = ""             # "config" | "arena" | "scan"
 52    length: int | None = None    # filled by xlen() when present
 53    last_id: str = ""            # most recent entry id, blank when empty
 54
 55
# Static catalogue: known streams with human-friendly labels and what they
# carry. Keys are stream names; values are (category, label, description).
# discover_channels() adds these entries first and keeps the first entry it
# sees for a stream, so catalogue labels/descriptions take precedence over
# config and SCAN hits. _categorise() also consults this table before
# falling back to prefix rules.
_STATIC_CATALOG: dict[str, tuple[str, str, str]] = {
    # Input — upstream producers
    "trtools2:bars:1m":           (CATEGORY_INPUT, "Trtools2 — bars 1m",
                                    "OHLCV bars at 1-minute resolution per symbol."),
    "trtools2:bars:1h":           (CATEGORY_INPUT, "Trtools2 — bars 1h",
                                    "OHLCV bars at 1-hour resolution per symbol."),
    "trtools2:news":              (CATEGORY_INPUT, "Trtools2 — news",
                                    "Enriched news items with tickers, sentiment, source."),
    "trtools2:indicators":        (CATEGORY_INPUT, "Trtools2 — indicators",
                                    "Pre-computed technicals (RSI, MACD, ATR, …) per symbol."),
    "trtools2:strategy:events":   (CATEGORY_INPUT, "Trtools2 — strategy events",
                                    "Strategy state transitions: BUY/HOLD/SELL with confidence."),
    "fomo2:enriched":             (CATEGORY_INPUT, "Fomo2 — enriched items",
                                    "Feed items after dedup + NLP enrichment (sentiment, tickers, entities)."),
    "fomo2:reports":              (CATEGORY_INPUT, "Fomo2 — reports",
                                    "LLM-generated reports — drive mirofish-refresher on high-impact events."),
    # NOTE(review): per its description MAF *publishes* to this inbox, yet it
    # is tagged input — presumably grouped with fomo2 by owner; confirm.
    "fomo2:requests:in":          (CATEGORY_INPUT, "Fomo2 — request inbox",
                                    "MAF agents publish here to demand fresh fomo2 data on-demand."),
    "fomo2:requests:out":         (CATEGORY_INPUT, "Fomo2 — request replies",
                                    "Fomo2's replies to MAF on-demand requests."),
    # Derived — MAF-side workers
    "kronos:forecasts:emitted":   (CATEGORY_DERIVED, "Kronos — forecasts emitted",
                                    "Compact emit per (symbol, timeframe) when prob_up moves or direction flips."),
    "mirofish:sims:emitted":      (CATEGORY_DERIVED, "Mirofish — sims emitted",
                                    "Compact emit when a fresh crowd-sim lands for a watched symbol's report."),
    "maf:actions:out":            (CATEGORY_DERIVED, "Actions outbox",
                                    "TradingAction envelopes from trading arenas — consumed by trtools2."),
    "maf:decisions:out":          (CATEGORY_DERIVED, "Decisions outbox",
                                    "GenericDecision envelopes from non-quant arenas (research_debate, …)."),
    "maf:executions:out":         (CATEGORY_DERIVED, "Executions outbox",
                                    "Engine-side execution decisions (executed/queued/rejected)."),
    "maf:outcomes:out":           (CATEGORY_DERIVED, "Outcomes outbox",
                                    "Engine-side final results (fills, closes, pnl) — fed to DecisionMemory."),
    # Control
    "maf:events":                 (CATEGORY_CONTROL, "Lifecycle events",
                                    "Every arena.start / phase.complete / agent.signal / decision.emit."),
    "maf:control:in":             (CATEGORY_CONTROL, "Control inbox",
                                    "Inbound commands: run_arena, configure_arena, set_data_source, …"),
    "maf:control:out":            (CATEGORY_CONTROL, "Control acks",
                                    "Acks for control-plane commands, keyed by correlation_id."),
}
 99
# SCAN prefixes: any key matching ``<prefix>*`` whose TYPE is ``stream`` is
# surfaced, even if nothing else in this module knows about it. ``maf:events``
# deliberately has no trailing colon so the bare ``maf:events`` key matches.
_SCAN_PREFIXES: tuple[str, ...] = (
    "trtools2:",
    "fomo2:",
    "kronos:",
    "mirofish:",
    "maf:arena:",
    "maf:events",
    "maf:control:",
    "maf:actions:",
    "maf:decisions:",
    "maf:executions:",
    "maf:outcomes:",
)

# Cap on the TOTAL number of keys inspected by one discovery run, summed
# across every prefix above (not per prefix and not per SCAN page). Stream
# count in practice is tiny (~30), so a 10k cap is conservative-but-fast.
_SCAN_MAX_KEYS = 10_000
118
119
def _categorise(stream: str) -> str:
    """Pick a category for a stream, e.g. one found only by SCAN.

    The static catalogue wins when it knows the stream; otherwise the first
    matching prefix rule decides, and anything unrecognised is treated as an
    upstream input.
    """
    known = _STATIC_CATALOG.get(stream)
    if known is not None:
        return known[0]

    # Checked in order; the first rule whose prefixes match wins.
    prefix_rules = (
        (("maf:arena:",), CATEGORY_ARENA_OUTPUT),
        (("maf:control:", "maf:events"), CATEGORY_CONTROL),
        (("maf:actions:", "maf:decisions:", "maf:executions:",
          "maf:outcomes:", "kronos:", "mirofish:"), CATEGORY_DERIVED),
    )
    for prefixes, category in prefix_rules:
        if stream.startswith(prefixes):
            return category
    return CATEGORY_INPUT
133
134
async def _is_stream(client: Any, key: str) -> bool:
    """Report whether Redis says ``TYPE key`` is ``stream``.

    Used to filter SCAN hits. Any error from the TYPE call counts as "not a
    stream", so a single bad key never aborts discovery.
    """
    try:
        kind = await client.type(key)
    except Exception:
        return False
    text = kind.decode("utf-8", errors="replace") if isinstance(kind, bytes) else kind
    return str(text) == "stream"
144
145
async def _xlen_safe(client: Any, key: str) -> int | None:
    """Return ``XLEN key`` as an int, or None if the call or conversion fails."""
    try:
        size = await client.xlen(key)
        return int(size)
    except Exception:
        return None
151
152
async def _last_id(client: Any, key: str) -> str:
    """Return the newest entry id of ``key`` via ``XREVRANGE ... COUNT 1``.

    Yields "" when the call fails or returns no rows.
    """
    try:
        newest = await client.xrevrange(key, count=1)
    except Exception:
        return ""
    if newest:
        entry_id = newest[0][0]
        if isinstance(entry_id, bytes):
            return entry_id.decode("utf-8")
        return str(entry_id)
    return ""
162
163
# StreamsConfig attribute names probed by discover_channels(). Missing or
# empty attributes are ignored, so any object exposing a subset works.
# NOTE(review): the catalogue lists maf:executions:out / maf:outcomes:out but
# no matching attribute is probed here — confirm whether StreamsConfig has them.
_STREAMS_CFG_FIELDS: tuple[str, ...] = (
    "events_stream", "control_in", "control_out", "actions_out",
    "decisions_out",
    "trtools2_bars_1m", "trtools2_bars_1h", "trtools2_news",
    "trtools2_indicators", "trtools2_strategy_events",
    "fomo2_enriched", "fomo2_reports", "fomo2_requests",
    "kronos_forecasts_emitted", "mirofish_sims_emitted",
)


async def _scan_stream_keys(
    client: Any, *, known: dict[str, Channel],
) -> tuple[list[str], bool]:
    """SCAN every prefix in ``_SCAN_PREFIXES``; return ``(new_stream_keys, ok)``.

    At most ``_SCAN_MAX_KEYS`` keys are inspected in total, summed across all
    prefixes. Keys already in ``known`` still count toward the cap but skip the
    ``TYPE`` round trip, since the caller would not replace them anyway.
    ``ok`` is False when a Redis error aborted the scan; keys collected before
    the failure are still returned.
    """
    found: list[str] = []
    checked: set[str] = set()
    scanned = 0
    try:
        for prefix in _SCAN_PREFIXES:
            async for raw in client.scan_iter(match=f"{prefix}*", count=200):
                # Check before incrementing so exactly _SCAN_MAX_KEYS keys
                # are inspected (the old `>` allowed one extra).
                if scanned >= _SCAN_MAX_KEYS:
                    break
                scanned += 1
                # errors="replace": an undecodable key name must not abort
                # the scan; TYPE on the mangled name simply won't be "stream".
                key = (raw.decode("utf-8", errors="replace")
                       if isinstance(raw, bytes) else str(raw))
                # SCAN may return the same key more than once.
                if key in known or key in checked:
                    continue
                checked.add(key)
                if await _is_stream(client, key):
                    found.append(key)
            if scanned >= _SCAN_MAX_KEYS:
                break
    except Exception as exc:
        # redis-py's from_url() only builds a connection pool, so an
        # unreachable server first fails here; degrade instead of raising.
        logger.warning("discover_channels: redis scan failed: %s", exc)
        return found, False
    return found, True


async def _hydrate_channels(
    client: Any,
    channels: dict[str, Channel],
    *,
    include_length: bool,
    include_last_id: bool,
) -> dict[str, Channel]:
    """Return a copy of ``channels`` with ``length`` / ``last_id`` filled in."""
    from dataclasses import replace

    hydrated: dict[str, Channel] = {}
    for stream, ch in channels.items():
        length = await _xlen_safe(client, stream) if include_length else None
        last = await _last_id(client, stream) if include_last_id else ""
        hydrated[stream] = replace(ch, length=length, last_id=last)
    return hydrated


async def _close_client(client: Any) -> None:
    """Close ``client`` best-effort, preferring ``aclose()`` over ``close()``."""
    try:
        closer = getattr(client, "aclose", None) or client.close
        await closer()
    except Exception as exc:
        # Cleanup only — never let a close failure mask the discovery result.
        logger.debug("discover_channels: redis close failed: %s", exc)


async def discover_channels(
    *,
    streams_cfg: Any = None,
    arena_output_streams: list[str] | None = None,
    redis_url: str | None = None,
    include_length: bool = True,
    include_last_id: bool = True,
) -> list[Channel]:
    """Return every channel MAF knows about, categorised and de-duped.

    Sources are merged in this order — static catalogue, ``streams_cfg``,
    ``arena_output_streams``, then a live Redis SCAN — and the first source
    to mention a stream wins; later duplicates are ignored.

    Parameters
    ----------
    streams_cfg:
        A :class:`maf.config.StreamsConfig` (or any object with the same
        attribute names). Optional — when omitted the discovery falls back
        to the static catalogue + redis scan.
    arena_output_streams:
        Each arena's configured ``output_stream``. Surfaces them under
        ``arena_output`` even when the arena has never published.
    redis_url:
        Redis URL to SCAN and hydrate from. When falsy no Redis calls are
        made and ``length`` / ``last_id`` keep their defaults.
    include_length / include_last_id:
        Whether to call ``XLEN`` / ``XREVRANGE`` for each channel.
        Disable for cheaper enumeration when the UI doesn't need them.

    Returns
    -------
    list[Channel]
        Sorted by category (arena_output → derived → input → control,
        unknown categories last), then by stream name. Redis failures are
        logged and degrade to the channels found so far; hydration is
        skipped when the scan fails, since Redis is then likely unreachable.
    """
    seen: dict[str, Channel] = {}

    def _add(stream: str, *, category: str, label: str = "",
             description: str = "", source: str = "") -> None:
        # First writer wins: later sources never overwrite an entry.
        if not stream or stream in seen:
            return
        if not category:
            category = _categorise(stream)
        seen[stream] = Channel(
            stream=stream, category=category, label=label,
            description=description, source=source,
        )

    # 1. Static catalogue — every entry, even when not in current config.
    for stream, (cat, label, desc) in _STATIC_CATALOG.items():
        _add(stream, category=cat, label=label, description=desc, source="catalog")

    # 2. StreamsConfig — pulls in user-customised stream names.
    if streams_cfg is not None:
        for field_name in _STREAMS_CFG_FIELDS:
            stream = getattr(streams_cfg, field_name, "")
            if stream:
                _add(stream, category=_categorise(stream),
                     label=field_name.replace("_", " ").title(),
                     source="config")

    # 3. Arena output streams.
    for s in arena_output_streams or []:
        _add(s, category=CATEGORY_ARENA_OUTPUT, label=s.rsplit(":", 1)[-1],
             description="Arena envelope stream.", source="arena")

    # 4. Live scan — surfaces things nobody configured but Redis has.
    if redis_url:
        client = None
        try:
            import redis.asyncio as aioredis
            client = aioredis.from_url(redis_url)
        except Exception as exc:
            logger.warning("discover_channels: redis connect failed: %s", exc)
        if client is not None:
            try:
                keys, scan_ok = await _scan_stream_keys(client, known=seen)
                for key in keys:
                    _add(key, category=_categorise(key), source="scan")
                # Hydrate length + last_id for everything we know about now.
                if scan_ok and (include_length or include_last_id):
                    seen = await _hydrate_channels(
                        client, seen,
                        include_length=include_length,
                        include_last_id=include_last_id,
                    )
            finally:
                await _close_client(client)

    # Every path (including Redis failures) returns the same ordering.
    cat_order = {
        CATEGORY_ARENA_OUTPUT: 0, CATEGORY_DERIVED: 1,
        CATEGORY_INPUT: 2, CATEGORY_CONTROL: 3,
    }
    return sorted(
        seen.values(),
        key=lambda c: (cat_order.get(c.category, 99), c.stream),
    )
279
280
# Public API: the category tags, the Channel record and the discovery entry point.
__all__ = [
    "CATEGORY_ARENA_OUTPUT", "CATEGORY_CONTROL",
    "CATEGORY_DERIVED", "CATEGORY_INPUT",
    "Channel", "discover_channels",
]