checking system…
Docs / back / src/maf/control/inbox.py · line 70
Python · 391 lines
  1"""ControlInbox — consumer for ``maf:control:in`` commands.
  2
  3Commands
  4--------
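``run_arena``
    Trigger a single arena run. ``args``: ``{"arena": str, "target"?: dict,
    "action_mode"?: str, "emit_action"?: bool}``. The optional
    ``correlation_id`` is read from the top level of the message, next to
    ``command`` and ``args``. Result includes ``{"arena", "arena_id",
    "signal", "synthesis_verdict", "synthesis_confidence",
    "synthesis_score", "reports", "decisions", "agent_signals_count",
    "elapsed_s"}``.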
``configure_arena``
    Patch a live arena's configuration. ``args``: ``{"arena": str,
    "selected_analysts"?: list[str], "max_iterations"?: int}``. Only the
    keys that are present are changed.
 12``set_data_source``
 13    Rebind a source on a live arena. ``args``: ``{"arena": str, "name": str,
 14    "adapter": str, "config": dict}``.
 15``reload_config``
 16    Re-read ``config/default.yaml`` + arena YAMLs. ``args``: ``{}``.
 17``health``
 18    Quick connectivity / arena listing. ``args``: ``{}``.
 19``list_arenas``
 20    Return arena names. ``args``: ``{}``.
 21
 22Design
 23------
 24*Independent task*: the inbox runs as an asyncio task started by
 25:meth:`MAFApp.start`. Errors in a single command produce a NACK ack but
 26never crash the loop.
 27
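*Consumer group*: uses ``XREADGROUP`` with a stable group name so multiple
MAF instances behind the same Redis can split traffic. Each command is
XACKed after it has been handled, so if MAF crashes mid-command the entry
stays unacknowledged in the group's pending list. Note that the read loop
only asks for new entries (``>``) and consumer names are random per
instance by default, so such an entry is not redelivered automatically —
another consumer has to claim it explicitly (e.g. with ``XAUTOCLAIM``).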
 32"""
 33
 34from __future__ import annotations
 35
 36import asyncio
 37import json
 38import logging
 39import os
 40import time
 41import uuid
 42from datetime import UTC, datetime
 43from typing import Any
 44
 45from maf.streaming import get_event_bus
 46
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
 48
 49
 50def _utcnow_iso() -> str:
 51    return datetime.now(UTC).isoformat()
 52
 53
 54class ControlInbox:
 55    """Async control-stream consumer.
 56
 57    Wires into a :class:`MAFApp` and dispatches commands against it.
 58    """
 59
 60    # Allow-list of commands. Unknown commands → NACK ack with descriptive
 61    # error rather than silent drop.
 62    COMMANDS = frozenset({
 63        "run_arena",
 64        "configure_arena",
 65        "set_data_source",
 66        "reload_config",
 67        "health",
 68        "list_arenas",
 69    })
 70
 71    def __init__(
 72        self,
 73        maf_app: Any,
 74        in_stream: str | None = None,
 75        out_stream: str | None = None,
 76        group: str = "maf-control",
 77        consumer: str | None = None,
 78        redis_url: str | None = None,
 79    ) -> None:
 80        self.maf_app = maf_app
 81        cfg = getattr(maf_app, "config", None)
 82        streams = getattr(cfg, "streams", None)
 83        self.in_stream = in_stream or (
 84            getattr(streams, "control_in", None) if streams else None
 85        ) or "maf:control:in"
 86        self.out_stream = out_stream or (
 87            getattr(streams, "control_out", None) if streams else None
 88        ) or "maf:control:out"
 89        self.group = group
 90        self.consumer = consumer or f"maf-{uuid.uuid4().hex[:8]}"
 91        self.redis_url = redis_url or (
 92            getattr(cfg, "redis_url", None)
 93            or os.environ.get("REDIS_URL", "redis://localhost:6379/0")
 94        )
 95        self._redis: Any = None
 96        self._stop = asyncio.Event()
 97
 98    async def _get_redis(self) -> Any:
 99        if self._redis is None:
100            import redis.asyncio as aioredis
101
102            self._redis = aioredis.from_url(self.redis_url)
103        return self._redis
104
105    async def _ensure_group(self, client: Any) -> None:
106        """Create the consumer group if it doesn't exist."""
107        try:
108            await client.xgroup_create(
109                self.in_stream, self.group, id="$", mkstream=True,
110            )
111            logger.info(
112                "ControlInbox: created consumer group %s on %s",
113                self.group, self.in_stream,
114            )
115        except Exception as exc:
116            # BUSYGROUP — already exists. That's fine.
117            if "BUSYGROUP" in str(exc):
118                return
119            logger.warning("ControlInbox: xgroup_create failed: %s", exc)
120
121    async def run(self) -> None:
122        """Consume commands until :meth:`stop` is called."""
123        client = await self._get_redis()
124        await self._ensure_group(client)
125        logger.info(
126            "ControlInbox: listening on %s (group=%s consumer=%s) → acks on %s",
127            self.in_stream, self.group, self.consumer, self.out_stream,
128        )
129        while not self._stop.is_set():
130            try:
131                resp = await client.xreadgroup(
132                    self.group,
133                    self.consumer,
134                    {self.in_stream: ">"},
135                    block=5000,
136                    count=10,
137                )
138            except Exception as exc:
139                logger.warning("ControlInbox: xreadgroup failed: %s", exc)
140                await asyncio.sleep(1.0)
141                continue
142            if not resp:
143                continue
144            for _stream, entries in resp:
145                for entry_id, fields in entries:
146                    sid = (
147                        entry_id.decode("utf-8")
148                        if isinstance(entry_id, bytes)
149                        else str(entry_id)
150                    )
151                    cmd = _decode(fields)
152                    await self._handle(client, sid, cmd)
153                    try:
154                        await client.xack(self.in_stream, self.group, entry_id)
155                    except Exception as exc:
156                        logger.warning("ControlInbox: xack failed: %s", exc)
157
158    def stop(self) -> None:
159        self._stop.set()
160
161    async def aclose(self) -> None:
162        self._stop.set()
163        if self._redis is None:
164            return
165        try:
166            aclose = getattr(self._redis, "aclose", None)
167            if aclose is not None:
168                await aclose()
169            else:
170                close = getattr(self._redis, "close", None)
171                if close is not None:
172                    await close()
173        except Exception:
174            pass
175
176    # ── Dispatch ────────────────────────────────────────────────────────────
177
178    async def _handle(self, client: Any, stream_id: str, cmd: dict[str, Any]) -> None:
179        bus = get_event_bus()
180        command = cmd.get("command", "")
181        correlation_id = cmd.get("correlation_id") or uuid.uuid4().hex
182        args = cmd.get("args") or {}
183
184        await bus.publish(
185            "control.command",
186            correlation_id=correlation_id,
187            payload={"command": command, "args": args, "stream_id": stream_id},
188        )
189
190        ok = False
191        result: Any = None
192        error: str | None = None
193        t0 = time.monotonic()
194        try:
195            if command not in self.COMMANDS:
196                raise ValueError(
197                    f"unknown command: {command!r}. Known: {sorted(self.COMMANDS)}"
198                )
199            handler = getattr(self, f"_cmd_{command}")
200            result = await handler(args, correlation_id)
201            ok = True
202        except Exception as exc:
203            error = f"{type(exc).__name__}: {exc}"
204            logger.exception("ControlInbox: %s failed", command)
205
206        elapsed_s = round(time.monotonic() - t0, 3)
207        ack = {
208            "schema_version": "1",
209            "correlation_id": correlation_id,
210            "command": command,
211            "ok": ok,
212            "result": result,
213            "error": error,
214            "elapsed_s": elapsed_s,
215            "ts": _utcnow_iso(),
216        }
217        try:
218            await client.xadd(
219                self.out_stream,
220                {"data": json.dumps(ack, default=str)},
221                maxlen=10_000,
222                approximate=True,
223            )
224        except Exception as exc:
225            logger.warning("ControlInbox: ack publish failed: %s", exc)
226
227        await bus.publish(
228            "control.ack",
229            correlation_id=correlation_id,
230            payload={"command": command, "ok": ok, "elapsed_s": elapsed_s},
231        )
232
233    # ── Command implementations ─────────────────────────────────────────────
234
235    async def _cmd_run_arena(self, args: dict[str, Any], correlation_id: str) -> dict[str, Any]:
236        arena = args.get("arena") or ""
237        target = args.get("target") or {}
238        # Caller can pin the action mode per-request (auto/semi/manual). The
239        # default is conservative because anyone fanning commands through the
240        # control plane should explicitly opt in to auto-execution.
241        action_mode = args.get("action_mode") or "manual"
242        emit_action = bool(args.get("emit_action", arena == "trading_intelligence"))
243        if not arena:
244            raise ValueError("run_arena requires 'arena'")
245        if arena not in self.maf_app.list_arenas():
246            raise KeyError(
247                f"arena {arena!r} not found. Available: {self.maf_app.list_arenas()}"
248            )
249        arena_obj = self.maf_app.get_arena(arena)
250        state = await arena_obj.run(target=target or None, correlation_id=correlation_id)
251
252        # If trading_intelligence ran, also publish the rich envelope.
253        try:
254            if arena == "trading_intelligence":
255                output_stream = arena_obj.config.output_stream or ""
256                if output_stream:
257                    await self.maf_app._publish_trading_intelligence_envelope(
258                        state, output_stream,
259                    )
260        except Exception:
261            logger.exception("ControlInbox: post-run envelope publish failed")
262
263        # Structured output — routed by arena.config.target_key. Trading
264        # arenas (target_key="ticker") publish a TradingAction; everything
265        # else publishes a GenericDecision to maf:decisions:out.
266        try:
267            if emit_action:
268                await self.maf_app._publish_arena_output(
269                    arena, state, action_mode=action_mode,
270                )
271        except Exception:
272            logger.exception("ControlInbox: arena output publish failed")
273
274        return {
275            "arena": arena,
276            "arena_id": state.get("arena_id", ""),
277            "signal": state.get("signal", ""),
278            "synthesis_verdict": state.get("synthesis_verdict", ""),
279            "synthesis_confidence": state.get("synthesis_confidence", 0.0),
280            "synthesis_score": state.get("synthesis_score", 0.0),
281            "reports": list((state.get("reports") or {}).keys()),
282            "decisions": list((state.get("decisions") or {}).keys()),
283            "agent_signals_count": len(state.get("agent_signals") or []),
284            "elapsed_s": state.get("arena_total_seconds"),
285        }
286
287    async def _cmd_configure_arena(
288        self, args: dict[str, Any], correlation_id: str,
289    ) -> dict[str, Any]:
290        arena = args.get("arena") or ""
291        arena_obj = self.maf_app.get_arena(arena)
292        if arena_obj is None:
293            raise KeyError(f"arena {arena!r} not found")
294        changed: dict[str, Any] = {}
295        if "selected_analysts" in args:
296            arena_obj.config.selected_analysts = args["selected_analysts"]
297            # Invalidate cached graph so next run picks up the filter.
298            arena_obj._graph = None
299            changed["selected_analysts"] = arena_obj.config.selected_analysts
300        if "max_iterations" in args:
301            arena_obj.config.max_iterations = int(args["max_iterations"])
302            changed["max_iterations"] = arena_obj.config.max_iterations
303        return {"arena": arena, "changed": changed}
304
305    async def _cmd_set_data_source(
306        self, args: dict[str, Any], correlation_id: str,
307    ) -> dict[str, Any]:
308        from maf.config import SourceBinding
309
310        arena = args.get("arena") or ""
311        name = args.get("name") or ""
312        adapter = args.get("adapter") or ""
313        cfg = args.get("config") or {}
314        if not arena or not name or not adapter:
315            raise ValueError("set_data_source requires arena, name, adapter")
316        arena_obj = self.maf_app.get_arena(arena)
317        if arena_obj is None:
318            raise KeyError(f"arena {arena!r} not found")
319        binding = SourceBinding(name=name, adapter=adapter, config=cfg)
320        arena_obj.source_registry.bind(binding)
321        # Also update the in-memory config so dashboard reflects the change.
322        existing = [b for b in arena_obj.config.sources if b.name != name]
323        existing.append(binding)
324        arena_obj.config.sources = existing
325        return {"arena": arena, "source": name, "adapter": adapter}
326
327    async def _cmd_reload_config(
328        self, args: dict[str, Any], correlation_id: str,
329    ) -> dict[str, Any]:
330        from maf.config import load_config
331
332        new_config = load_config()
333        # Rebuild arenas in place so live consumers don't see a flicker.
334        arenas_before = self.maf_app.list_arenas()
335        self.maf_app.config = new_config
336        new_arenas: dict[str, Any] = {}
337        for cfg in new_config.arenas:
338            new_arenas[cfg.name] = self.maf_app._build_arena(cfg)
339        self.maf_app._arenas = new_arenas
340        return {
341            "before": arenas_before,
342            "after": self.maf_app.list_arenas(),
343        }
344
345    async def _cmd_health(
346        self, args: dict[str, Any], correlation_id: str,
347    ) -> dict[str, Any]:
348        return {
349            "arenas": self.maf_app.list_arenas(),
350            "modules": [m.module_name for m in self.maf_app.modules],
351            "redis_url": self.maf_app.config.redis_url,
352            "streams": self.maf_app.config.streams.model_dump(),
353        }
354
355    async def _cmd_list_arenas(
356        self, args: dict[str, Any], correlation_id: str,
357    ) -> dict[str, Any]:
358        out = []
359        for name in self.maf_app.list_arenas():
360            arena = self.maf_app.get_arena(name)
361            if arena:
362                out.append({
363                    "name": name,
364                    "description": arena.config.description,
365                    "schedule": arena.config.schedule,
366                    "phases": len(arena.config.phases),
367                    "sources": len(arena.config.sources),
368                })
369        return {"arenas": out}
370
371
def _decode(fields: Any) -> dict[str, Any]:
    """Decode a control message into a dict. Tolerates malformed payloads.

    The JSON body is read from the ``data`` field (bytes or str key). Anything
    that cannot be turned into a JSON object — a missing field, invalid UTF-8,
    invalid JSON, or a JSON value that is not an object — yields an empty
    command (``{"command": "", "args": {}, "correlation_id": ""}``) instead
    of raising.
    """
    blank: dict[str, Any] = {"command": "", "args": {}, "correlation_id": ""}
    if not isinstance(fields, dict):
        return blank
    raw = fields.get(b"data") or fields.get("data")
    if raw is None:
        return blank
    try:
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        body = json.loads(raw)
    except (ValueError, TypeError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return blank
    return body if isinstance(body, dict) else blank
388
389
# Public API of this module: only the ControlInbox class is listed.
__all__ = ["ControlInbox"]