"""ControlInbox — consumer for ``maf:control:in`` commands.

Message envelope
----------------
Each stream entry carries a single ``data`` field holding a JSON object:
``{"command": str, "args": dict, "correlation_id"?: str}``. When
``correlation_id`` is missing or empty a random one is generated. Entries
that cannot be decoded are treated as an empty command and NACKed.

Every command produces one ack on the out stream (``data`` field, JSON):
``{"schema_version", "correlation_id", "command", "ok", "result", "error",
"elapsed_s", "ts"}``.

Commands
--------
``run_arena``
    Trigger a single arena run. ``args``: ``{"arena": str, "target"?: dict,
    "action_mode"?: str, "emit_action"?: bool}``. Result includes
    ``{"arena", "arena_id", "signal", "synthesis_verdict",
    "synthesis_confidence", "synthesis_score", "reports", "decisions",
    "agent_signals_count", "elapsed_s"}``.
``configure_arena``
    Patch a live arena's config. ``args``:
    ``{"arena": str, "selected_analysts"?: list[str],
    "max_iterations"?: int}``.
``set_data_source``
    Rebind a source on a live arena. ``args``: ``{"arena": str, "name": str,
    "adapter": str, "config": dict}``.
``reload_config``
    Re-read ``config/default.yaml`` + arena YAMLs. ``args``: ``{}``.
``health``
    Quick connectivity / arena listing. ``args``: ``{}``.
``list_arenas``
    Return arena names. ``args``: ``{}``.

Design
------
*Independent task*: the inbox runs as an asyncio task started by
:meth:`MAFApp.start`. Errors in a single command produce a NACK ack but
never crash the loop.

*Consumer group*: uses ``XREADGROUP`` with a stable group name so multiple
MAF instances behind the same Redis can split traffic. Each command is
XACKed after it completes.

NOTE(review): this module only reads never-delivered entries (``>``) and
does not issue ``XAUTOCLAIM``/``XCLAIM``, so entries left pending by an
instance that crashed mid-command are not re-delivered by this code —
confirm whether reclaiming happens elsewhere.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import time
import uuid
from datetime import UTC, datetime
from typing import Any

from maf.streaming import get_event_bus

logger = logging.getLogger(__name__)

# ``scheme://user:password@host`` — group 1 keeps everything up to the user.
# The password part is greedy up to the last ``@`` of the authority so an
# unescaped ``@`` inside the password is masked too.
_URL_USERINFO_PASSWORD = re.compile(r"(://[^/?#@:]*):[^/?#]*@")
# ``?password=...`` / ``&password=...`` query parameter.
_URL_QUERY_PASSWORD = re.compile(r"([?&]password=)[^&#]*", re.IGNORECASE)


def _utcnow_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(UTC).isoformat()


def _redact_url(url: Any) -> Any:
    """Return *url* with any embedded password replaced by ``***``.

    Masks both the ``user:password@`` userinfo form and a ``password=``
    query parameter. Non-string values are returned unchanged.
    """
    if not isinstance(url, str):
        return url
    masked = _URL_USERINFO_PASSWORD.sub(r"\1:***@", url)
    return _URL_QUERY_PASSWORD.sub(r"\1***", masked)


class ControlInbox:
    """Async control-stream consumer.

    Wires into a :class:`MAFApp` and dispatches commands against it.
    Each allowed command ``<name>`` is implemented by a ``_cmd_<name>``
    coroutine taking ``(args, correlation_id)`` and returning the ack's
    ``result`` payload.
    """

    # Allow-list of commands. Unknown commands → NACK ack with descriptive
    # error rather than silent drop.
    COMMANDS = frozenset({
        "run_arena",
        "configure_arena",
        "set_data_source",
        "reload_config",
        "health",
        "list_arenas",
    })

    def __init__(
        self,
        maf_app: Any,
        in_stream: str | None = None,
        out_stream: str | None = None,
        group: str = "maf-control",
        consumer: str | None = None,
        redis_url: str | None = None,
    ) -> None:
        """Bind the inbox to *maf_app* and resolve stream / Redis settings.

        Explicit arguments win; otherwise values come from
        ``maf_app.config`` (``streams.control_in`` / ``streams.control_out``
        / ``redis_url``), then from built-in defaults. The Redis URL
        additionally falls back to the ``REDIS_URL`` environment variable.
        No connection is opened here.
        """
        self.maf_app = maf_app
        cfg = getattr(maf_app, "config", None)
        streams = getattr(cfg, "streams", None)
        self.in_stream = in_stream or (
            getattr(streams, "control_in", None) if streams else None
        ) or "maf:control:in"
        self.out_stream = out_stream or (
            getattr(streams, "control_out", None) if streams else None
        ) or "maf:control:out"
        self.group = group
        # A random suffix keeps consumer names unique per instance.
        self.consumer = consumer or f"maf-{uuid.uuid4().hex[:8]}"
        self.redis_url = redis_url or (
            getattr(cfg, "redis_url", None)
            or os.environ.get("REDIS_URL", "redis://localhost:6379/0")
        )
        self._redis: Any = None
        self._stop = asyncio.Event()

    async def _get_redis(self) -> Any:
        """Return the Redis client, creating it on first use."""
        if self._redis is None:
            # Imported lazily so the module can be imported without redis.
            import redis.asyncio as aioredis

            self._redis = aioredis.from_url(self.redis_url)
        return self._redis

    async def _ensure_group(self, client: Any) -> None:
        """Create the consumer group if it doesn't exist.

        Best-effort: any failure other than "group already exists" is
        logged and swallowed so the caller can keep retrying reads.
        """
        try:
            await client.xgroup_create(
                self.in_stream, self.group, id="$", mkstream=True,
            )
            logger.info(
                "ControlInbox: created consumer group %s on %s",
                self.group, self.in_stream,
            )
        except Exception as exc:
            # BUSYGROUP — already exists. That's fine.
            if "BUSYGROUP" in str(exc):
                return
            logger.warning("ControlInbox: xgroup_create failed: %s", exc)

    async def run(self) -> None:
        """Consume commands until :meth:`stop` is called.

        Read failures are logged and retried after a one-second pause. The
        stop flag is checked between reads, so shutdown may lag by up to
        one blocking read.
        """
        client = await self._get_redis()
        await self._ensure_group(client)
        logger.info(
            "ControlInbox: listening on %s (group=%s consumer=%s) → acks on %s",
            self.in_stream, self.group, self.consumer, self.out_stream,
        )
        while not self._stop.is_set():
            try:
                resp = await client.xreadgroup(
                    self.group,
                    self.consumer,
                    {self.in_stream: ">"},
                    block=5000,
                    count=10,
                )
            except Exception as exc:
                logger.warning("ControlInbox: xreadgroup failed: %s", exc)
                # The group is missing when creation failed at startup or
                # the stream was deleted; without re-creating it every
                # subsequent read would fail the same way.
                if "NOGROUP" in str(exc):
                    await self._ensure_group(client)
                await asyncio.sleep(1.0)
                continue
            if not resp:
                continue
            for _stream, entries in resp:
                for entry_id, fields in entries:
                    await self._process_entry(client, entry_id, fields)

    async def _process_entry(self, client: Any, entry_id: Any, fields: Any) -> None:
        """Decode, dispatch and XACK one stream entry; never raises.

        The entry is acknowledged even when handling fails unexpectedly so
        a poison message cannot take the consumer task down.
        """
        sid = (
            entry_id.decode("utf-8")
            if isinstance(entry_id, bytes)
            else str(entry_id)
        )
        try:
            await self._handle(client, sid, _decode(fields))
        except Exception:
            logger.exception(
                "ControlInbox: unhandled error processing entry %s", sid,
            )
        try:
            await client.xack(self.in_stream, self.group, entry_id)
        except Exception as exc:
            logger.warning("ControlInbox: xack failed: %s", exc)

    def stop(self) -> None:
        """Ask :meth:`run` to exit after its current iteration."""
        self._stop.set()

    async def aclose(self) -> None:
        """Stop the loop and close the Redis client, if one was created.

        Closing is best-effort: errors are logged at debug level only.
        """
        self._stop.set()
        # Drop the reference first so a closed client is never handed out
        # again by ``_get_redis``.
        client, self._redis = self._redis, None
        if client is None:
            return
        closer = getattr(client, "aclose", None)
        if closer is None:
            # Fallback for clients that only expose ``close``.
            closer = getattr(client, "close", None)
        if closer is None:
            return
        try:
            await closer()
        except Exception as exc:
            logger.debug("ControlInbox: error closing Redis client: %s", exc)

    # ── Dispatch ────────────────────────────────────────────────────────────

    async def _emit_event(
        self, bus: Any, event: str, correlation_id: str, payload: dict[str, Any],
    ) -> None:
        """Publish *event* on the event bus; failures are logged, not raised."""
        if bus is None:
            return
        try:
            await bus.publish(event, correlation_id=correlation_id, payload=payload)
        except Exception:
            logger.exception("ControlInbox: event-bus publish of %s failed", event)

    async def _handle(self, client: Any, stream_id: str, cmd: dict[str, Any]) -> None:
        """Execute one decoded command and publish its ack.

        Emits ``control.command`` before and ``control.ack`` after the
        command on the event bus, and writes the ack envelope to
        ``out_stream``. Handler exceptions become ``ok=False`` acks with
        ``error`` set to ``"<ExceptionType>: <message>"``.
        """
        command = cmd.get("command", "")
        correlation_id = cmd.get("correlation_id") or uuid.uuid4().hex
        args = cmd.get("args") or {}

        # Event-bus traffic is telemetry: it must not stop the command
        # from running or its ack from being written.
        try:
            bus = get_event_bus()
        except Exception:
            logger.exception("ControlInbox: event bus unavailable")
            bus = None

        await self._emit_event(
            bus,
            "control.command",
            correlation_id,
            {"command": command, "args": args, "stream_id": stream_id},
        )

        ok = False
        result: Any = None
        error: str | None = None
        t0 = time.monotonic()
        try:
            if not isinstance(command, str) or command not in self.COMMANDS:
                raise ValueError(
                    f"unknown command: {command!r}. Known: {sorted(self.COMMANDS)}"
                )
            if not isinstance(args, dict):
                raise TypeError(
                    f"'args' must be a JSON object, got {type(args).__name__}"
                )
            handler = getattr(self, f"_cmd_{command}")
            result = await handler(args, correlation_id)
            ok = True
        except Exception as exc:
            error = f"{type(exc).__name__}: {exc}"
            logger.exception("ControlInbox: %s failed", command)

        elapsed_s = round(time.monotonic() - t0, 3)
        ack = {
            "schema_version": "1",
            "correlation_id": correlation_id,
            "command": command,
            "ok": ok,
            "result": result,
            "error": error,
            "elapsed_s": elapsed_s,
            "ts": _utcnow_iso(),
        }
        try:
            # ``default=str`` stringifies anything json cannot encode.
            await client.xadd(
                self.out_stream,
                {"data": json.dumps(ack, default=str)},
                maxlen=10_000,
                approximate=True,
            )
        except Exception as exc:
            logger.warning("ControlInbox: ack publish failed: %s", exc)

        await self._emit_event(
            bus,
            "control.ack",
            correlation_id,
            {"command": command, "ok": ok, "elapsed_s": elapsed_s},
        )

    # ── Command implementations ─────────────────────────────────────────────

    async def _cmd_run_arena(self, args: dict[str, Any], correlation_id: str) -> dict[str, Any]:
        """Run one arena and return a summary of its final state.

        Raises:
            ValueError: ``arena`` is missing or empty.
            KeyError: ``arena`` is not among ``maf_app.list_arenas()``.
        """
        arena = args.get("arena") or ""
        if not arena:
            raise ValueError("run_arena requires 'arena'")
        available = self.maf_app.list_arenas()
        if arena not in available:
            raise KeyError(
                f"arena {arena!r} not found. Available: {available}"
            )

        target = args.get("target") or {}
        # Caller can pin the action mode per-request (auto/semi/manual). The
        # default is conservative because anyone fanning commands through the
        # control plane should explicitly opt in to auto-execution.
        action_mode = args.get("action_mode") or "manual"
        emit_action = bool(args.get("emit_action", arena == "trading_intelligence"))

        arena_obj = self.maf_app.get_arena(arena)
        state = await arena_obj.run(target=target or None, correlation_id=correlation_id)

        # If trading_intelligence ran, also publish the rich envelope.
        # Publishing is best-effort: the run result is returned regardless.
        if arena == "trading_intelligence":
            try:
                output_stream = arena_obj.config.output_stream or ""
                if output_stream:
                    await self.maf_app._publish_trading_intelligence_envelope(
                        state, output_stream,
                    )
            except Exception:
                logger.exception("ControlInbox: post-run envelope publish failed")

        # Structured output — routed by arena.config.target_key. Trading
        # arenas (target_key="ticker") publish a TradingAction; everything
        # else publishes a GenericDecision to maf:decisions:out.
        if emit_action:
            try:
                await self.maf_app._publish_arena_output(
                    arena, state, action_mode=action_mode,
                )
            except Exception:
                logger.exception("ControlInbox: arena output publish failed")

        return {
            "arena": arena,
            "arena_id": state.get("arena_id", ""),
            "signal": state.get("signal", ""),
            "synthesis_verdict": state.get("synthesis_verdict", ""),
            "synthesis_confidence": state.get("synthesis_confidence", 0.0),
            "synthesis_score": state.get("synthesis_score", 0.0),
            "reports": list((state.get("reports") or {}).keys()),
            "decisions": list((state.get("decisions") or {}).keys()),
            "agent_signals_count": len(state.get("agent_signals") or []),
            "elapsed_s": state.get("arena_total_seconds"),
        }

    async def _cmd_configure_arena(
        self, args: dict[str, Any], correlation_id: str,
    ) -> dict[str, Any]:
        """Patch ``selected_analysts`` and/or ``max_iterations`` on an arena.

        All supplied values are validated before anything is written, so a
        bad value cannot leave the arena half-configured.

        Raises:
            ValueError: ``arena`` is empty, ``selected_analysts`` is a bare
                string, or ``max_iterations`` is not an integer.
            TypeError: ``max_iterations`` has a type ``int()`` rejects.
            KeyError: the arena does not exist.
        """
        arena = args.get("arena") or ""
        if not arena:
            raise ValueError("configure_arena requires 'arena'")
        arena_obj = self.maf_app.get_arena(arena)
        if arena_obj is None:
            raise KeyError(f"arena {arena!r} not found")

        # Phase 1: validate / coerce without touching the arena.
        updates: dict[str, Any] = {}
        if "selected_analysts" in args:
            analysts = args["selected_analysts"]
            if isinstance(analysts, (str, bytes)):
                # A bare string would otherwise be iterated per character.
                raise ValueError(
                    "selected_analysts must be a list of analyst names, "
                    "not a string"
                )
            updates["selected_analysts"] = analysts
        if "max_iterations" in args:
            updates["max_iterations"] = int(args["max_iterations"])

        # Phase 2: apply.
        changed: dict[str, Any] = {}
        if "selected_analysts" in updates:
            arena_obj.config.selected_analysts = updates["selected_analysts"]
            # Invalidate cached graph so next run picks up the filter.
            arena_obj._graph = None
            changed["selected_analysts"] = arena_obj.config.selected_analysts
        if "max_iterations" in updates:
            arena_obj.config.max_iterations = updates["max_iterations"]
            changed["max_iterations"] = arena_obj.config.max_iterations
        return {"arena": arena, "changed": changed}

    async def _cmd_set_data_source(
        self, args: dict[str, Any], correlation_id: str,
    ) -> dict[str, Any]:
        """Bind (or rebind) a named data source on a live arena.

        Raises:
            ValueError: ``arena``, ``name`` or ``adapter`` is missing.
            KeyError: the arena does not exist.
        """
        from maf.config import SourceBinding

        arena = args.get("arena") or ""
        name = args.get("name") or ""
        adapter = args.get("adapter") or ""
        cfg = args.get("config") or {}
        if not (arena and name and adapter):
            raise ValueError("set_data_source requires arena, name, adapter")
        arena_obj = self.maf_app.get_arena(arena)
        if arena_obj is None:
            raise KeyError(f"arena {arena!r} not found")

        binding = SourceBinding(name=name, adapter=adapter, config=cfg)
        arena_obj.source_registry.bind(binding)
        # Also update the in-memory config so dashboard reflects the change:
        # replace any binding with the same name, appending the new one last.
        arena_obj.config.sources = [
            b for b in arena_obj.config.sources if b.name != name
        ] + [binding]
        return {"arena": arena, "source": name, "adapter": adapter}

    async def _cmd_reload_config(
        self, args: dict[str, Any], correlation_id: str,
    ) -> dict[str, Any]:
        """Reload configuration and rebuild every arena from it.

        The arena map is swapped only after all arenas were built; if any
        build fails the previous config is restored and the error
        propagates, so the app never runs a new config with old arenas.
        """
        from maf.config import load_config

        new_config = load_config()
        # Rebuild arenas in place so live consumers don't see a flicker.
        arenas_before = self.maf_app.list_arenas()
        old_config = getattr(self.maf_app, "config", None)
        # The new config is installed before building, as before, in case
        # ``_build_arena`` consults ``maf_app.config``.
        self.maf_app.config = new_config
        try:
            new_arenas: dict[str, Any] = {
                cfg.name: self.maf_app._build_arena(cfg)
                for cfg in new_config.arenas
            }
        except Exception:
            self.maf_app.config = old_config
            raise
        self.maf_app._arenas = new_arenas
        return {
            "before": arenas_before,
            "after": self.maf_app.list_arenas(),
        }

    async def _cmd_health(
        self, args: dict[str, Any], correlation_id: str,
    ) -> dict[str, Any]:
        """Report arenas, modules, stream names and the Redis URL.

        The result is written to the ack stream, so any password embedded
        in the Redis URL is masked first.
        """
        return {
            "arenas": self.maf_app.list_arenas(),
            "modules": [m.module_name for m in self.maf_app.modules],
            "redis_url": _redact_url(self.maf_app.config.redis_url),
            "streams": self.maf_app.config.streams.model_dump(),
        }

    async def _cmd_list_arenas(
        self, args: dict[str, Any], correlation_id: str,
    ) -> dict[str, Any]:
        """Return a short descriptor for every arena that resolves."""
        out = []
        for name in self.maf_app.list_arenas():
            arena = self.maf_app.get_arena(name)
            if not arena:
                continue
            arena_cfg = arena.config
            out.append({
                "name": name,
                "description": arena_cfg.description,
                "schedule": arena_cfg.schedule,
                "phases": len(arena_cfg.phases),
                "sources": len(arena_cfg.sources),
            })
        return {"arenas": out}


def _empty_command() -> dict[str, Any]:
    """Return a fresh placeholder for an undecodable control message."""
    return {"command": "", "args": {}, "correlation_id": ""}


def _decode(fields: Any) -> dict[str, Any]:
    """Decode a control message into a dict.

    Reads the ``data`` field (bytes or str key) and parses it as JSON.
    Tolerates a missing field, invalid UTF-8, malformed JSON and non-object
    JSON: all of those yield an empty command, which the dispatcher NACKs
    as unknown.
    """
    raw = None
    if isinstance(fields, dict):
        raw = fields.get(b"data") or fields.get("data")
    if raw is None:
        return _empty_command()
    try:
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        body = json.loads(raw)
    except (ValueError, TypeError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return _empty_command()
    if not isinstance(body, dict):
        return _empty_command()
    return body


__all__ = ["ControlInbox"]