checking system…
Docs / back / src/maf/streaming/bus.py · line 210
Python · 398 lines
  1"""EventBus — best-effort lifecycle event publisher on Redis Streams.
  2
  3Wire shape
  4----------
  5Each event is one Redis Stream entry with a single ``data`` field whose value
  6is a JSON object::
  7
  8    {
  9      "schema_version": "1",
 10      "kind":         "phase.complete",     # see EVENT_KINDS
 11      "arena":        "trading_intelligence",
 12      "arena_id":     "<uuid>",
 13      "phase":        "analysis",
 14      "ts":           "2026-05-14T11:22:33.123456+00:00",
 15      "correlation_id": "<uuid>",
 16      "payload":      { ... arbitrary structured event data ... }
 17    }
 18
 19Consumers JSON-decode and dispatch on ``kind``. Unknown kinds are dropped
 20with a WARN — keeps schema evolution forward-compatible.
 21
 22Design notes
 23------------
 24*Best-effort*: publishing never blocks arena execution. Failures are caught
 25and logged; arenas keep running. We trade durability for the guarantee that
 26"event bus down" never breaks "arenas work".
 27
 28*Lazy redis client*: created on first publish to avoid forcing every test
 29to provide Redis. Tests can use :class:`NullEventBus` for full bypass.
 30
 31*Process-global*: :func:`get_event_bus` / :func:`set_event_bus` give a
 32process-wide singleton so deep code paths (Phase.run, agents) can emit
 33events without threading the bus through every signature.
 34"""
 35
from __future__ import annotations

import asyncio
import json
import logging
import os
import uuid
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from enum import Enum
from typing import Any
 46
 47logger = logging.getLogger(__name__)
 48
 49
 50# Canonical event kinds. New kinds are fine — consumers tolerate unknowns.
 51# Use dotted lower_case names so subscribers can prefix-match.
 52EVENT_KINDS = frozenset({
 53    "arena.start",
 54    "arena.complete",
 55    "arena.error",
 56    "phase.start",
 57    "phase.complete",
 58    "phase.error",
 59    "agent.start",
 60    "agent.complete",
 61    "agent.signal",
 62    "agent.error",
 63    "decision.emit",
 64    "source.fetch",
 65    "source.error",
 66    "llm.call",
 67    "llm.error",
 68    "control.command",
 69    "control.ack",
 70    "action.emit",
 71    "system.status",
 72})
 73
 74
 75def _utcnow_iso() -> str:
 76    return datetime.now(UTC).isoformat()
 77
 78
 79def _safe_payload(value: Any) -> Any:
 80    """Make ``value`` JSON-serialisable, best-effort.
 81
 82    Anything Pydantic-shaped exposes ``model_dump``; anything dataclass-shaped
 83    serialises via vars(); everything else falls back to ``str()``. Used at
 84    the boundary so a stray non-JSON object in an event payload doesn't
 85    poison the whole event publish.
 86    """
 87    if value is None or isinstance(value, (str, int, float, bool)):
 88        return value
 89    if isinstance(value, dict):
 90        return {k: _safe_payload(v) for k, v in value.items()}
 91    if isinstance(value, (list, tuple, set, frozenset)):
 92        return [_safe_payload(v) for v in value]
 93    if hasattr(value, "model_dump"):
 94        try:
 95            return value.model_dump(mode="json")
 96        except Exception:
 97            return str(value)
 98    if hasattr(value, "__dict__"):
 99        try:
100            return {k: _safe_payload(v) for k, v in vars(value).items()
101                    if not k.startswith("_")}
102        except Exception:
103            return str(value)
104    return str(value)
105
106
class EventBus:
    """Publishes lifecycle events to a Redis Stream.

    Parameters
    ----------
    redis_url:
        Redis connection URL. Falls back to ``REDIS_URL`` env, then
        ``redis://localhost:6379/0``.
    stream:
        Stream name. Defaults to ``maf:events``.
    maxlen:
        Approximate cap on the stream (``XADD ... MAXLEN ~``). Keeps the
        stream from growing unbounded on a long-running service. Set to 0
        to disable trimming.
    """

    def __init__(
        self,
        redis_url: str | None = None,
        stream: str = "maf:events",
        maxlen: int = 100_000,
    ) -> None:
        self.redis_url = redis_url or os.environ.get(
            "REDIS_URL", "redis://localhost:6379/0",
        )
        self.stream = stream
        self.maxlen = maxlen
        self._redis: Any = None
        # Track failures so we don't spam logs once Redis goes down.
        self._consecutive_failures = 0
        # Strong references to fire-and-forget tasks from publish_nowait().
        # The event loop keeps only weak references to tasks, so an
        # unreferenced task may be garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task[str]] = set()

    async def _get_redis(self) -> Any:
        """Return the Redis client, creating it lazily on first use."""
        if self._redis is None:
            import redis.asyncio as aioredis

            self._redis = aioredis.from_url(self.redis_url)
        return self._redis

    async def publish(
        self,
        kind: str,
        *,
        arena: str = "",
        arena_id: str = "",
        phase: str = "",
        correlation_id: str = "",
        payload: dict[str, Any] | None = None,
    ) -> str:
        """Publish a single event and return its stream id, or ``''``.

        Best-effort: any ``Exception`` is caught, logged, and turned into a
        ``''`` return, so an event-bus problem never propagates into arena
        code. This covers building or serialising the envelope, connecting,
        and the ``XADD`` itself. Warnings are throttled: the first failure
        of a streak is logged, then every 100th, so a Redis outage doesn't
        fill the log with the same exception.

        Parameters
        ----------
        kind:
            Event kind; see ``EVENT_KINDS``. Unknown kinds are forwarded.
        arena, arena_id, phase:
            Envelope metadata copied verbatim into the event.
        correlation_id:
            Caller-supplied id. A random hex UUID is generated when empty.
        payload:
            Event data, made JSON-safe by ``_safe_payload`` before encoding.
        """
        try:
            if kind not in EVENT_KINDS:
                # Don't reject: forward unknown kinds so new ones work without
                # touching this module. Logged at DEBUG on every occurrence.
                logger.debug("EventBus.publish: unknown kind %r (forwarding)", kind)

            # Built inside the try: _safe_payload / json.dumps can raise on
            # exotic payloads, and that must not escape a best-effort publish.
            body = {
                "schema_version": "1",
                "kind": kind,
                "arena": arena,
                "arena_id": arena_id,
                "phase": phase,
                "ts": _utcnow_iso(),
                "correlation_id": correlation_id or uuid.uuid4().hex,
                "payload": _safe_payload(payload or {}),
            }
            fields = {"data": json.dumps(body, default=str)}

            client = await self._get_redis()
            trim: dict[str, Any] = {}
            if self.maxlen > 0:
                # XADD ... MAXLEN ~ <maxlen>: approximate trimming.
                trim = {"maxlen": self.maxlen, "approximate": True}
            raw_id = await client.xadd(self.stream, fields, **trim)
        except Exception as exc:
            self._consecutive_failures += 1
            if self._consecutive_failures == 1 or self._consecutive_failures % 100 == 0:
                logger.warning(
                    "EventBus publish failed (%s) — stream=%s, consecutive_failures=%d; "
                    "further failures are logged only every 100th until the next success",
                    exc, self.stream, self._consecutive_failures,
                )
            return ""

        self._consecutive_failures = 0
        if isinstance(raw_id, bytes):
            return raw_id.decode("utf-8")
        return str(raw_id)

    def publish_nowait(self, kind: str, **kwargs: Any) -> None:
        """Fire-and-forget: schedule :meth:`publish` if a loop is running.

        Useful from sync code paths (e.g. agent error handlers) that can't
        await. Silently drops the event if no event loop is running in this
        thread. The scheduled task is kept referenced until it completes.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return
        tasks = getattr(self, "_background_tasks", None)
        if tasks is None:
            # Subclasses that bypass __init__ may not have the set yet.
            tasks = self._background_tasks = set()
        task = loop.create_task(self.publish(kind, **kwargs))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

    async def aclose(self) -> None:
        """Close the Redis client if one was created; never raises ``Exception``.

        The client reference is dropped even if closing fails, so a later
        :meth:`publish` lazily opens a fresh client instead of reusing a
        closed one.
        """
        client, self._redis = self._redis, None
        if client is None:
            return
        try:
            # Prefer aclose(); fall back to close() for clients that lack it.
            closer = getattr(client, "aclose", None) or getattr(client, "close", None)
            if closer is not None:
                await closer()
        except Exception as exc:
            logger.debug("EventBus.aclose: error while closing redis client: %s", exc)
225
226
class NullEventBus(EventBus):
    """No-op :class:`EventBus` for tests and offline runs.

    Pick this when Redis is intentionally unavailable: every operation
    returns immediately and nothing ever touches the network.
    """

    def __init__(self) -> None:  # noqa: D401 — overrides parent
        # Deliberately bypass EventBus.__init__, which would consult the
        # REDIS_URL environment variable; this bus never connects anywhere.
        self._consecutive_failures = 0
        self._redis = None
        self.maxlen = 0
        self.stream = ""
        self.redis_url = ""

    async def publish(self, kind: str, **kwargs: Any) -> str:
        """Discard the event; the returned stream id is always ``''``."""
        return ""

    def publish_nowait(self, kind: str, **kwargs: Any) -> None:
        """Discard the event without scheduling anything."""
        return None

    async def aclose(self) -> None:
        """Nothing to release."""
        return None
250
251
252# ---------------------------------------------------------------------------
253# Reader
254# ---------------------------------------------------------------------------
255
256
class EventBusReader:
    """Async iterator over events on a Redis Stream.

    Used by the dashboard WebSocket pump. Yields decoded event dicts.
    Starts from ``last_id`` ("$" for live tail, "0" for replay-from-start).

    A ``"$"`` cursor is pinned to the id of the newest existing entry (or
    ``"0-0"`` for an empty/missing stream) before the first read. XREAD with
    ``$`` only returns entries added during that call, so re-issuing ``$``
    after an empty BLOCK timeout would silently skip entries published
    between calls.
    """

    def __init__(
        self,
        redis_url: str | None = None,
        stream: str = "maf:events",
        last_id: str = "$",
        block_ms: int = 5000,
        count: int = 50,
    ) -> None:
        self.redis_url = redis_url or os.environ.get(
            "REDIS_URL", "redis://localhost:6379/0",
        )
        self.stream = stream
        self.last_id = last_id
        self.block_ms = block_ms
        self.count = count
        self._redis: Any = None

    async def _get_redis(self) -> Any:
        """Return the Redis client, creating it lazily on first use."""
        if self._redis is None:
            import redis.asyncio as aioredis

            self._redis = aioredis.from_url(self.redis_url)
        return self._redis

    @staticmethod
    def _decode_id(raw: Any) -> str:
        """Normalise a stream entry id (``bytes`` or ``str``) to ``str``."""
        return raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)

    async def _pin_tail_cursor(self, client: Any) -> None:
        """Replace a ``"$"`` cursor with the newest existing entry id.

        No-op for any other cursor. Falls back to ``"0-0"`` when the stream
        has no entries, so everything added afterwards is delivered.
        """
        if self.last_id != "$":
            return
        newest = await client.xrevrange(self.stream, count=1)
        self.last_id = self._decode_id(newest[0][0]) if newest else "0-0"

    async def __aiter__(self) -> AsyncIterator[dict[str, Any]]:
        """Yield decoded events forever, retrying after Redis errors.

        Errors from Redis are logged and retried after a 1 s pause;
        ``last_id`` advances past every entry yielded.
        """
        client = await self._get_redis()
        while True:
            try:
                # Retried on every loop until it succeeds (e.g. Redis down at start).
                await self._pin_tail_cursor(client)
                resp = await client.xread(
                    {self.stream: self.last_id},
                    block=self.block_ms,
                    count=self.count,
                )
            except Exception as exc:
                logger.warning("EventBusReader xread failed: %s", exc)
                await asyncio.sleep(1.0)
                continue
            if not resp:
                continue
            for _stream, entries in resp:
                for entry_id, fields in entries:
                    sid = self._decode_id(entry_id)
                    self.last_id = sid
                    yield _decode_event(sid, fields)

    async def aclose(self) -> None:
        """Close the Redis client if one was created; never raises ``Exception``.

        The client reference is dropped even if closing fails, so a later
        iteration lazily opens a fresh client instead of reusing a closed one.
        """
        client, self._redis = self._redis, None
        if client is None:
            return
        try:
            # Prefer aclose(); fall back to close() for clients that lack it.
            closer = getattr(client, "aclose", None) or getattr(client, "close", None)
            if closer is not None:
                await closer()
        except Exception as exc:
            logger.debug("EventBusReader.aclose: error while closing redis client: %s", exc)
326
327
def _decode_event(stream_id: str, fields: Any) -> dict[str, Any]:
    """Decode a stream entry into an event dict.

    Always returns a dict, even on failure (with an ``error`` key). The
    WebSocket pump can then still forward "something happened" rather than
    dropping the entry, and one malformed entry can never raise out of the
    reader loop.

    Parameters
    ----------
    stream_id:
        The entry's stream id; copied into the result as ``stream_id``.
    fields:
        The entry's field mapping. ``b"data"`` and ``"data"`` keys are both
        accepted. Anything that is not a dict is treated as having no data.

    Returns
    -------
    The decoded JSON object with ``stream_id`` added. On failure, a dict
    with ``stream_id`` and one of these errors:

    * ``"no_data_field"`` when there is no ``data`` field.
    * ``"json_decode: ..."`` when the data is not UTF-8, not valid JSON, or
      not a JSON object.
    """
    raw = None
    if isinstance(fields, dict):
        # Explicit None checks: an empty b"" value is present-but-invalid,
        # not missing.
        raw = fields.get(b"data")
        if raw is None:
            raw = fields.get("data")
    if raw is None:
        return {"stream_id": stream_id, "error": "no_data_field"}
    try:
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        body = json.loads(raw)
    except (ValueError, TypeError) as exc:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return {"stream_id": stream_id, "error": f"json_decode: {exc}"}
    if not isinstance(body, dict):
        # Valid JSON but not an object: can't attach stream_id to it.
        return {
            "stream_id": stream_id,
            "error": f"json_decode: expected object, got {type(body).__name__}",
        }
    body["stream_id"] = stream_id
    return body
347
348
349# ---------------------------------------------------------------------------
350# Process-global accessor
351# ---------------------------------------------------------------------------
352
353
# Process-wide bus slot managed by set_event_bus(); None means "not installed".
_BUS: EventBus | None = None


def set_event_bus(bus: EventBus | None) -> None:
    """Make ``bus`` the process-wide :class:`EventBus`.

    Passing ``None`` uninstalls the current bus (useful for test cleanup).
    """
    global _BUS
    _BUS = bus


def get_event_bus() -> EventBus:
    """Return the installed process-wide bus, or a no-op stand-in.

    Deep code paths (phases, agents) call this so they don't need the bus
    threaded through their signatures. :class:`MAFApp` installs the real bus
    at startup. Until a bus is installed, each call returns a fresh
    :class:`NullEventBus`.
    """
    installed = _BUS
    return NullEventBus() if installed is None else installed
376
377
def install_default_bus(
    redis_url: str,
    stream: str = "maf:events",
    maxlen: int = 100_000,
) -> EventBus:
    """Create a new :class:`EventBus` and make it the process default.

    The new bus is returned so the caller keeps a handle to it (for
    example, to call ``aclose`` on it later).
    """
    fresh_bus = EventBus(redis_url=redis_url, stream=stream, maxlen=maxlen)
    set_event_bus(fresh_bus)
    return fresh_bus
387
388
# Public API of this module; helpers such as _safe_payload and _decode_event,
# and the _BUS slot, are intentionally private.
__all__ = [
    "EVENT_KINDS",
    "EventBus", "EventBusReader", "NullEventBus",
    "get_event_bus", "install_default_bus", "set_event_bus",
]