"""EventBus — best-effort lifecycle event publisher on Redis Streams.

Wire shape
----------
Each event is one Redis Stream entry with a single ``data`` field whose value
is a JSON object::

    {
      "schema_version": "1",
      "kind": "phase.complete",            # see EVENT_KINDS
      "arena": "trading_intelligence",
      "arena_id": "<uuid>",
      "phase": "analysis",
      "ts": "2026-05-14T11:22:33.123456+00:00",
      "correlation_id": "<uuid>",          # caller-supplied, else uuid4().hex
      "payload": { ... arbitrary structured event data ... }
    }

Publishers may use kinds outside ``EVENT_KINDS``: ``EventBus.publish`` logs
them at DEBUG and forwards them unchanged. Consumers are expected to
JSON-decode the ``data`` field and dispatch on ``kind``, ignoring kinds they
do not recognise so the schema can evolve forward-compatibly (consumer-side
handling lives outside this module). :class:`EventBusReader` performs the
decoding on the read side and adds the entry's ``stream_id`` to each event.

Design notes
------------
*Best-effort*: errors raised while talking to Redis inside
:meth:`EventBus.publish` are caught and logged (throttled per failure
streak), and ``publish`` returns ``''`` instead of raising. We trade
durability for the guarantee that "event bus down" never breaks "arenas
work". Awaiting ``publish`` still waits for the Redis round trip;
:meth:`EventBus.publish_nowait` instead schedules the publish on the running
event loop, and drops the event when no loop is running.

*Lazy redis client*: ``redis.asyncio`` is imported and the client created on
first use, so importing this module (e.g. in tests) does not require Redis.
Tests can use :class:`NullEventBus` for full bypass.

*Process-global*: :func:`get_event_bus` / :func:`set_event_bus` give a
process-wide singleton so deep code paths (Phase.run, agents) can emit
events without threading the bus through every signature. When nothing has
been installed, :func:`get_event_bus` returns a :class:`NullEventBus`.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import uuid
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from typing import Any

logger = logging.getLogger(__name__)


# Canonical event kinds. New kinds are fine — consumers tolerate unknowns.
# Use dotted lower_case names so subscribers can prefix-match.
52EVENT_KINDS = frozenset({ 53 "arena.start", 54 "arena.complete", 55 "arena.error", 56 "phase.start", 57 "phase.complete", 58 "phase.error", 59 "agent.start", 60 "agent.complete", 61 "agent.signal", 62 "agent.error", 63 "decision.emit", 64 "source.fetch", 65 "source.error", 66 "llm.call", 67 "llm.error", 68 "control.command", 69 "control.ack", 70 "action.emit", 71 "system.status", 72}) 73 74 75def _utcnow_iso() -> str: 76 return datetime.now(UTC).isoformat() 77 78 79def _safe_payload(value: Any) -> Any: 80 """Make ``value`` JSON-serialisable, best-effort. 81 82 Anything Pydantic-shaped exposes ``model_dump``; anything dataclass-shaped 83 serialises via vars(); everything else falls back to ``str()``. Used at 84 the boundary so a stray non-JSON object in an event payload doesn't 85 poison the whole event publish. 86 """ 87 if value is None or isinstance(value, (str, int, float, bool)): 88 return value 89 if isinstance(value, dict): 90 return {k: _safe_payload(v) for k, v in value.items()} 91 if isinstance(value, (list, tuple, set, frozenset)): 92 return [_safe_payload(v) for v in value] 93 if hasattr(value, "model_dump"): 94 try: 95 return value.model_dump(mode="json") 96 except Exception: 97 return str(value) 98 if hasattr(value, "__dict__"): 99 try: 100 return {k: _safe_payload(v) for k, v in vars(value).items() 101 if not k.startswith("_")} 102 except Exception: 103 return str(value) 104 return str(value) 105 106 107class EventBus: 108 """Publishes lifecycle events to a Redis Stream. 109 110 Parameters 111 ---------- 112 redis_url: 113 Redis connection URL. Falls back to ``REDIS_URL`` env, then 114 ``redis://localhost:6379/0``. 115 stream: 116 Stream name. Defaults to ``maf:events``. 117 maxlen: 118 Approximate cap on the stream (``XADD ... MAXLEN ~``). Keeps the 119 stream from growing unbounded on a long-running service. Set to 0 120 to disable trimming. 
121 """ 122 123 def __init__( 124 self, 125 redis_url: str | None = None, 126 stream: str = "maf:events", 127 maxlen: int = 100_000, 128 ) -> None: 129 self.redis_url = redis_url or os.environ.get( 130 "REDIS_URL", "redis://localhost:6379/0", 131 ) 132 self.stream = stream 133 self.maxlen = maxlen 134 self._redis: Any = None 135 # Track failures so we don't spam logs once Redis goes down. 136 self._consecutive_failures = 0 137 138 async def _get_redis(self) -> Any: 139 if self._redis is None: 140 import redis.asyncio as aioredis 141 142 self._redis = aioredis.from_url(self.redis_url) 143 return self._redis 144 145 async def publish( 146 self, 147 kind: str, 148 *, 149 arena: str = "", 150 arena_id: str = "", 151 phase: str = "", 152 correlation_id: str = "", 153 payload: dict[str, Any] | None = None, 154 ) -> str: 155 """Publish a single event. Returns the assigned stream id (or ''). 156 157 Failures swallowed and logged once-per-streak so a transient Redis 158 outage doesn't fill the log with the same exception. 159 """ 160 if kind not in EVENT_KINDS: 161 # Don't reject — log once and pass through so new kinds work. 
162 logger.debug("EventBus.publish: unknown kind %r (forwarding)", kind) 163 164 body = { 165 "schema_version": "1", 166 "kind": kind, 167 "arena": arena, 168 "arena_id": arena_id, 169 "phase": phase, 170 "ts": _utcnow_iso(), 171 "correlation_id": correlation_id or uuid.uuid4().hex, 172 "payload": _safe_payload(payload or {}), 173 } 174 175 try: 176 client = await self._get_redis() 177 kwargs: dict[str, Any] = {} 178 if self.maxlen > 0: 179 kwargs["maxlen"] = self.maxlen 180 kwargs["approximate"] = True 181 raw_id = await client.xadd( 182 self.stream, 183 {"data": json.dumps(body, default=str)}, 184 **kwargs, 185 ) 186 self._consecutive_failures = 0 187 if isinstance(raw_id, bytes): 188 return raw_id.decode("utf-8") 189 return str(raw_id) 190 except Exception as exc: 191 self._consecutive_failures += 1 192 if self._consecutive_failures <= 1 or self._consecutive_failures % 100 == 0: 193 logger.warning( 194 "EventBus publish failed (%s) — stream=%s, suppressing further " 195 "warnings until next success", 196 exc, self.stream, 197 ) 198 return "" 199 200 def publish_nowait(self, kind: str, **kwargs: Any) -> None: 201 """Fire-and-forget — schedules :meth:`publish` if a loop is running. 202 203 Useful from sync code paths (e.g. agent error handlers) that can't 204 await. Silently drops if no event loop is available. 205 """ 206 try: 207 loop = asyncio.get_running_loop() 208 except RuntimeError: 209 return 210 loop.create_task(self.publish(kind, **kwargs)) 211 212 async def aclose(self) -> None: 213 if self._redis is None: 214 return 215 try: 216 aclose = getattr(self._redis, "aclose", None) 217 if aclose is not None: 218 await aclose() 219 else: 220 close = getattr(self._redis, "close", None) 221 if close is not None: 222 await close() 223 except Exception: 224 pass 225 226 227class NullEventBus(EventBus): 228 """No-op event bus for tests and offline runs. 229 230 Use this when Redis is intentionally unavailable. 
``publish`` returns '' 231 immediately without touching the network. 232 """ 233 234 def __init__(self) -> None: # noqa: D401 — overrides parent 235 # Skip parent __init__ — we never touch redis. 236 self.redis_url = "" 237 self.stream = "" 238 self.maxlen = 0 239 self._redis = None 240 self._consecutive_failures = 0 241 242 async def publish(self, kind: str, **kwargs: Any) -> str: 243 return "" 244 245 def publish_nowait(self, kind: str, **kwargs: Any) -> None: 246 return None 247 248 async def aclose(self) -> None: 249 return None 250 251 252# --------------------------------------------------------------------------- 253# Reader 254# --------------------------------------------------------------------------- 255 256 257class EventBusReader: 258 """Async iterator over events on a Redis Stream. 259 260 Used by the dashboard WebSocket pump. Yields decoded event dicts. 261 Starts from ``last_id`` ("$" for live tail, "0" for replay-from-start). 262 """ 263 264 def __init__( 265 self, 266 redis_url: str | None = None, 267 stream: str = "maf:events", 268 last_id: str = "$", 269 block_ms: int = 5000, 270 count: int = 50, 271 ) -> None: 272 self.redis_url = redis_url or os.environ.get( 273 "REDIS_URL", "redis://localhost:6379/0", 274 ) 275 self.stream = stream 276 self.last_id = last_id 277 self.block_ms = block_ms 278 self.count = count 279 self._redis: Any = None 280 281 async def _get_redis(self) -> Any: 282 if self._redis is None: 283 import redis.asyncio as aioredis 284 285 self._redis = aioredis.from_url(self.redis_url) 286 return self._redis 287 288 async def __aiter__(self) -> AsyncIterator[dict[str, Any]]: 289 client = await self._get_redis() 290 while True: 291 try: 292 resp = await client.xread( 293 {self.stream: self.last_id}, 294 block=self.block_ms, 295 count=self.count, 296 ) 297 except Exception as exc: 298 logger.warning("EventBusReader xread failed: %s", exc) 299 await asyncio.sleep(1.0) 300 continue 301 if not resp: 302 continue 303 for _stream, 
entries in resp: 304 for entry_id, fields in entries: 305 sid = ( 306 entry_id.decode("utf-8") 307 if isinstance(entry_id, bytes) 308 else str(entry_id) 309 ) 310 self.last_id = sid 311 yield _decode_event(sid, fields) 312 313 async def aclose(self) -> None: 314 if self._redis is None: 315 return 316 try: 317 aclose = getattr(self._redis, "aclose", None) 318 if aclose is not None: 319 await aclose() 320 else: 321 close = getattr(self._redis, "close", None) 322 if close is not None: 323 await close() 324 except Exception: 325 pass 326 327 328def _decode_event(stream_id: str, fields: Any) -> dict[str, Any]: 329 """Decode a stream entry into an event dict. 330 331 Returns a dict even on failure (with ``error`` key) so the WebSocket 332 pump can still forward "something happened" rather than dropping. 333 """ 334 raw = None 335 if isinstance(fields, dict): 336 raw = fields.get(b"data") or fields.get("data") 337 if raw is None: 338 return {"stream_id": stream_id, "error": "no_data_field"} 339 if isinstance(raw, bytes): 340 raw = raw.decode("utf-8") 341 try: 342 body = json.loads(raw) 343 except (json.JSONDecodeError, TypeError) as exc: 344 return {"stream_id": stream_id, "error": f"json_decode: {exc}"} 345 body["stream_id"] = stream_id 346 return body 347 348 349# --------------------------------------------------------------------------- 350# Process-global accessor 351# --------------------------------------------------------------------------- 352 353 354_BUS: EventBus | None = None 355 356 357def set_event_bus(bus: EventBus | None) -> None: 358 """Install a process-wide :class:`EventBus`. 359 360 Pass ``None`` to clear (test cleanup). 361 """ 362 global _BUS 363 _BUS = bus 364 365 366def get_event_bus() -> EventBus: 367 """Return the process-wide :class:`EventBus`, defaulting to NullEventBus. 368 369 Deep code paths (phases, agents) call this so they don't need the bus 370 threaded through their signatures. :class:`MAFApp` installs the real bus 371 at startup. 
372 """ 373 if _BUS is None: 374 return NullEventBus() 375 return _BUS 376 377 378def install_default_bus( 379 redis_url: str, 380 stream: str = "maf:events", 381 maxlen: int = 100_000, 382) -> EventBus: 383 """Install and return a fresh :class:`EventBus` as the process default.""" 384 bus = EventBus(redis_url=redis_url, stream=stream, maxlen=maxlen) 385 set_event_bus(bus) 386 return bus 387 388 389__all__ = [ 390 "EVENT_KINDS", 391 "EventBus", 392 "EventBusReader", 393 "NullEventBus", 394 "get_event_bus", 395 "install_default_bus", 396 "set_event_bus", 397]