checking system…
Docs / back / src/maf/app.py · line 222
Python · 725 lines
  1"""MAF Application — wraps StreamMachine App with arena lifecycle management."""
  2
  3from __future__ import annotations
  4
  5import asyncio
  6import logging
  7import os
  8from pathlib import Path
  9from typing import Any
 10
 11from maf.config import AppConfig, ArenaConfig, ModuleConfig, load_config
 12from maf.core.arena import Arena
 13from maf.core.state import ArenaState
 14from maf.llm.factory import LLMFactory
 15from maf.memory.store import MemoryStore
 16from maf.modules.base import DataModule
 17from maf.sources.registry import SourceRegistry
 18from maf.actions.decisions import (
 19    DecisionOutbox,
 20    build_decision_from_state,
 21)
 22from maf.actions.outbox import ActionOutbox, build_action_from_state
 23from maf.streaming.bus import EventBus, install_default_bus
 24
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Module registry — maps module names to classes
# ---------------------------------------------------------------------------

# Filled by register_module(); _init_modules() looks classes up here by name
# and, when no modules are configured, instantiates every entry.
_MODULE_REGISTRY: dict[str, type[DataModule]] = {}
 33
 34
 35def register_module(name: str, cls: type[DataModule]) -> None:
 36    """Register a data module class."""
 37    _MODULE_REGISTRY[name] = cls
 38
 39
 40def _register_defaults() -> None:
 41    """Register built-in agent roles and data modules."""
 42    from maf.core.arena import register_role
 43
 44    # Agent roles
 45    from maf.agents.analyst import AnalystAgent
 46    from maf.agents.debater import DebaterAgent
 47    from maf.agents.judge import JudgeAgent
 48    from maf.agents.executor import ExecutorAgent
 49    from maf.agents.replan import ReplanAgent
 50    from maf.agents.watcher import WatcherAgent
 51    from maf.agents.specialist import SpecialistAgent
 52    from maf.agents.synthesis import SynthesisAgent
 53
 54    register_role("analyst", AnalystAgent)
 55    register_role("specialist", SpecialistAgent)
 56    register_role("debater", DebaterAgent)
 57    register_role("judge", JudgeAgent)
 58    register_role("executor", ExecutorAgent)
 59    register_role("watcher", WatcherAgent)
 60    # synthesis role maps to the judge slot — it reads all agent_signals
 61    register_role("synthesis", SynthesisAgent)
 62    # replan: confidence-gated re-run controller. See agents/replan.py.
 63    register_role("replan", ReplanAgent)
 64
 65    # Data modules (each registers a family of adapters)
 66    from maf.modules.fomo2_module import Fomo2Module
 67    from maf.modules.kronos_module import KronosModule
 68    from maf.modules.mirofish_module import MirofishModule
 69    from maf.modules.oddsoddy_module import OddsoddyModule
 70    from maf.modules.polymarket_module import PolymarketModule
 71    from maf.modules.tradingagents_module import TradingAgentsModule
 72    from maf.modules.trtools2_module import Trtools2Module
 73    from maf.modules.web_module import WebModule
 74
 75    register_module("fomo2", Fomo2Module)
 76    register_module("trtools2", Trtools2Module)
 77    register_module("kronos", KronosModule)
 78    register_module("web", WebModule)
 79    register_module("mirofish", MirofishModule)
 80    register_module("oddsoddy", OddsoddyModule)
 81    register_module("polymarket", PolymarketModule)
 82    register_module("tradingagents", TradingAgentsModule)
 83
 84
 85def _init_modules(config: AppConfig) -> list[DataModule]:
 86    """Initialize and register all data modules.
 87
 88    If no modules are explicitly configured, all built-in modules are
 89    loaded with default config. This ensures backward compatibility.
 90    """
 91    modules: list[DataModule] = []
 92
 93    if config.modules:
 94        # Explicit module configuration
 95        for mod_cfg in config.modules:
 96            if not mod_cfg.enabled:
 97                logger.info("Module %r disabled, skipping", mod_cfg.name)
 98                continue
 99            cls = _MODULE_REGISTRY.get(mod_cfg.name)
100            if cls is None:
101                logger.warning("Unknown module %r, skipping", mod_cfg.name)
102                continue
103            # Inject redis_url from app config if not set in module config
104            mod_config = dict(mod_cfg.config)
105            if "redis_url" not in mod_config:
106                mod_config["redis_url"] = config.redis_url
107            module = cls(config=mod_config)
108            module.register()
109            modules.append(module)
110    else:
111        # Default: load all registered modules
112        for name, cls in _MODULE_REGISTRY.items():
113            module = cls(config={"redis_url": config.redis_url})
114            module.register()
115            modules.append(module)
116
117    logger.info(
118        "Loaded %d data modules: %s",
119        len(modules), [m.module_name for m in modules],
120    )
121    return modules
122
123
124class MAFApp:
125    """Multi-Agent Framework application.
126
127    Manages arena lifecycle: loads configs, creates arenas, schedules runs
128    via StreamMachine timers, and monitors execution.
129    """
130
131    def __init__(self, config: AppConfig | None = None) -> None:
132        _register_defaults()
133
134        self.config = config or load_config()
135
136        # Install the process-wide EventBus before anything else touches the
137        # arena layer — so every arena run, even ones triggered directly from
138        # code, emits lifecycle events to the realtime stream.
139        self._event_bus: EventBus = install_default_bus(
140            redis_url=self.config.redis_url,
141            stream=self.config.streams.events_stream,
142        )
143        # Action outbox — built lazily on first use to avoid forcing redis on
144        # tests that only import MAFApp.
145        self._action_outbox: ActionOutbox | None = None
146        # Generic-decision outbox for non-trading arenas (target_key != "ticker").
147        self._decision_outbox: DecisionOutbox | None = None
148        # Shared DecisionMemory — used by mastermind arena runs *and* by the
149        # ExecutionHarvester so outcome events can update the originating
150        # decision. Built lazily so tests don't pay the chromadb import cost.
151        self._decision_memory: Any = None
152
153        self._modules = _init_modules(self.config)
154        self._arenas: dict[str, Arena] = {}
155        self._sm_app: Any = None
156        self._control_task: asyncio.Task[Any] | None = None
157
158        # Build arenas from config
159        for arena_cfg in self.config.arenas:
160            self._arenas[arena_cfg.name] = self._build_arena(arena_cfg)
161
162    def _build_arena(self, cfg: ArenaConfig) -> Arena:
163        """Create an Arena with its dependencies."""
164        # LLM factory
165        llm_factory = LLMFactory(cfg.llm)
166
167        # Source registry
168        sources = SourceRegistry()
169        sources.bind_all(cfg.sources)
170
171        # Memory store
172        memory = MemoryStore(cfg.memory, data_dir=self.config.data_dir)
173
174        return Arena(
175            config=cfg,
176            llm_factory=llm_factory,
177            source_registry=sources,
178            memory_store=memory,
179        )
180
181    @property
182    def modules(self) -> list[DataModule]:
183        """Get loaded data modules."""
184        return self._modules
185
186    def get_module(self, name: str) -> DataModule | None:
187        """Get a loaded module by name."""
188        for m in self._modules:
189            if m.module_name == name:
190                return m
191        return None
192
193    def get_arena(self, name: str) -> Arena | None:
194        """Get an arena by name."""
195        return self._arenas.get(name)
196
197    def list_arenas(self) -> list[str]:
198        """List all registered arena names."""
199        return list(self._arenas)
200
201    @property
202    def decision_memory(self) -> Any:
203        """Lazy-built shared :class:`DecisionMemory`.
204
205        Sized to be safe-to-call from anywhere — first call pays the chromadb
206        import; subsequent calls return the same instance so writes from one
207        path (mastermind arena run) are visible to readers in another
208        (ExecutionHarvester.add_outcome).
209        """
210        if self._decision_memory is None:
211            try:
212                from maf.arenas.mastermind.memory import DecisionMemory
213            except ImportError:
214                logger.warning("MAFApp.decision_memory: mastermind not importable")
215                return None
216            self._decision_memory = DecisionMemory(
217                name="mastermind",
218                persist_path=f"{self.config.data_dir}/decision_memory_mastermind",
219            )
220        return self._decision_memory
221
222    async def run_arena(
223        self,
224        name: str,
225        target: dict[str, Any] | None = None,
226        *,
227        correlation_id: str | None = None,
228        emit_action: bool = False,
229        action_mode: str = "manual",
230    ) -> ArenaState:
231        """Run a specific arena and return the result.
232
233        ``emit_action`` set true publishes a :class:`TradingAction` to the
234        action outbox after the run completes. Useful for the control plane
235        and scheduled timer paths so a downstream engine sees a clean,
236        typed action payload.
237        """
238        arena = self._arenas.get(name)
239        if not arena:
240            raise KeyError(f"Arena {name!r} not found. Available: {self.list_arenas()}")
241        state = await arena.run(target=target, correlation_id=correlation_id)
242
243        if emit_action:
244            await self._publish_arena_output(name, state, action_mode=action_mode)
245        return state
246
247    async def _publish_arena_output(
248        self,
249        arena_name: str,
250        state: ArenaState,
251        *,
252        action_mode: str = "manual",
253    ) -> None:
254        """Dispatch to the right outbox based on the arena's ``target_key``.
255
256        Trading arenas (target_key="ticker") publish a TradingAction to
257        ``maf:actions:out``; any other arena publishes a GenericDecision to
258        ``maf:decisions:out``. Backwards compatible: the existing
259        :meth:`_publish_action` keeps working and is still the path used
260        by trading arenas.
261
262        Additionally, every arena run emits a unified
263        :class:`maf.actions.prognosis.Prognosis` envelope to that arena's
264        ``maf:arena:{name}:output`` stream. The Prognosis carries the
265        agnostic-arena fields (``ideas``, ``alternatives_considered``,
266        typed target, key_factors) that the GenericDecision/TradingAction
267        wire shapes don't have room for — the dashboard and external
268        consumers read this single shape across all arenas.
269        """
270        arena = self._arenas.get(arena_name)
271        target_key = (arena.config.target_key if arena else "ticker") or "ticker"
272
273        # Publish to the legacy outboxes (backwards compatibility).
274        if target_key == "ticker":
275            await self._publish_action(
276                arena_name, state, action_mode=action_mode,
277            )
278        else:
279            await self._publish_decision(
280                arena_name, state, target_key=target_key,
281            )
282
283        # Always emit Prognosis to the arena's dedicated output stream.
284        await self._publish_prognosis(arena_name, state)
285
286    async def _publish_prognosis(
287        self,
288        arena_name: str,
289        state: ArenaState,
290    ) -> None:
291        """Build + publish a Prognosis envelope. Logs + swallows errors.
292
293        Prognosis is the unified shape every arena emits — verdict
294        (BUY/SELL/HOLD/None) plus a free-form statement, key factors,
295        ideas shortlist, alternatives considered, data quality.
296
297        Published to ``<arena.output_stream>:prognosis`` (a dedicated
298        Prognosis-only stream) so the legacy ``output_stream`` shape
299        is preserved unchanged for existing consumers. New consumers
300        subscribe to the ``:prognosis`` suffix to get the agnostic
301        envelope.
302        """
303        try:
304            from maf.actions.prognosis import from_arena_state
305
306            arena = self._arenas.get(arena_name)
307            base_stream = (
308                arena.config.output_stream if arena else None
309            ) or f"maf:arena:{arena_name}:output"
310            # Distinct stream — does not collide with the legacy thin
311            # payload some scheduled arenas write to base_stream.
312            stream = f"{base_stream}:prognosis"
313
314            prognosis = from_arena_state(arena_name, dict(state))
315            payload = prognosis.model_dump_json()
316
317            import redis.asyncio as aioredis
318            client = aioredis.from_url(self.config.redis_url)
319            try:
320                stream_id = await client.xadd(
321                    stream, {"data": payload}, maxlen=50_000, approximate=True,
322                )
323            finally:
324                ac = getattr(client, "aclose", None)
325                if ac is not None:
326                    await ac()
327                else:
328                    await client.close()
329
330            logger.debug(
331                "Prognosis published: arena=%s stream=%s verdict=%s ideas=%d alts=%d",
332                arena_name, stream, prognosis.verdict,
333                len(prognosis.ideas), len(prognosis.alternatives_considered),
334            )
335            await self._event_bus.publish(
336                "decision.emit",
337                arena=arena_name,
338                arena_id=prognosis.arena_id,
339                correlation_id=prognosis.correlation_id,
340                payload={
341                    "kind": "prognosis",
342                    "stream": stream,
343                    "verdict": prognosis.verdict,
344                    "statement": prognosis.statement[:200],
345                    "ideas_count": len(prognosis.ideas),
346                    "alternatives_count": len(prognosis.alternatives_considered),
347                    "stream_id": stream_id.decode() if isinstance(stream_id, bytes) else str(stream_id),
348                },
349            )
350        except Exception:
351            logger.exception(
352                "Prognosis publish failed (arena=%s) — falling back to "
353                "legacy envelopes only", arena_name,
354            )
355
356    async def _publish_decision(
357        self,
358        arena_name: str,
359        state: ArenaState,
360        *,
361        target_key: str,
362    ) -> None:
363        """Build + publish a GenericDecision. Logs+swallows errors."""
364        try:
365            decision = build_decision_from_state(
366                dict(state), arena=arena_name, target_key=target_key,
367            )
368            if decision is None:
369                logger.debug(
370                    "Decision outbox: skipping (no %r in target for arena=%s)",
371                    target_key, arena_name,
372                )
373                return
374            if self._decision_outbox is None:
375                self._decision_outbox = DecisionOutbox(
376                    redis_url=self.config.redis_url,
377                )
378            stream_id = await self._decision_outbox.publish(decision)
379            logger.info(
380                "Decision published: arena=%s target_key=%s target=%s "
381                "verdict=%s stream_id=%s",
382                arena_name, target_key, decision.target,
383                decision.verdict, stream_id,
384            )
385            await self._event_bus.publish(
386                "decision.emit",
387                arena=arena_name,
388                arena_id=decision.arena_id,
389                correlation_id=decision.correlation_id,
390                payload={
391                    "kind": "generic_decision",
392                    "target_key": target_key,
393                    "target": decision.target,
394                    "verdict": decision.verdict,
395                    "confidence": decision.confidence,
396                    "stream_id": stream_id,
397                },
398            )
399        except Exception:
400            logger.exception(
401                "Decision publish failed (arena=%s)", arena_name,
402            )
403
404    async def _publish_action(
405        self,
406        arena_name: str,
407        state: ArenaState,
408        *,
409        action_mode: str = "manual",
410    ) -> None:
411        """Build + publish a TradingAction. Logs+swallows errors."""
412        try:
413            action = build_action_from_state(
414                dict(state), arena=arena_name, mode=action_mode,  # type: ignore[arg-type]
415            )
416            if action is None:
417                logger.debug(
418                    "Action outbox: skipping (no ticker in state for arena=%s)",
419                    arena_name,
420                )
421                return
422            if self._action_outbox is None:
423                self._action_outbox = ActionOutbox(
424                    redis_url=self.config.redis_url,
425                    stream=self.config.streams.actions_out,
426                )
427            stream_id = await self._action_outbox.publish(action)
428            logger.info(
429                "Action published: ticker=%s verdict=%s mode=%s stream_id=%s",
430                action.target.ticker, action.verdict, action.mode, stream_id,
431            )
432            await self._event_bus.publish(
433                "action.emit",
434                arena=arena_name,
435                arena_id=action.arena_id,
436                correlation_id=action.correlation_id,
437                payload={
438                    "ticker": action.target.ticker,
439                    "verdict": action.verdict,
440                    "mode": action.mode,
441                    "confidence": action.sizing.confidence,
442                    "size_fraction": action.sizing.size_fraction,
443                    "stream_id": stream_id,
444                },
445            )
446        except Exception:
447            logger.exception("Action publish failed (arena=%s)", arena_name)
448
449    def start(self) -> None:
450        """Start the MAF application with StreamMachine integration.
451
452        Registers all scheduled arenas as StreamMachine timers, starts the
453        inbound control plane consumer on ``maf:control:in``, and starts the
454        event loop.
455        """
456        try:
457            from streammachine import App
458            from streammachine.models import TimerConfig
459        except ImportError:
460            logger.error("streammachine is required to run MAF as a service")
461            raise
462
463        self._sm_app = App(name=self.config.name, to_scan=False)
464
465        # Inbound control plane — register as a StreamMachine timer that fires
466        # the consumer loop once on startup. ``ControlInbox.run`` is
467        # long-running; we kick it off as a fire-and-forget asyncio task.
468        from maf.control.inbox import ControlInbox
469
470        inbox = ControlInbox(self)
471
472        async def _start_control_inbox() -> None:
473            if self._control_task is None or self._control_task.done():
474                self._control_task = asyncio.create_task(inbox.run())
475                logger.info("ControlInbox task scheduled")
476
477        # StreamMachine doesn't expose a clean "on_startup" hook in our
478        # version, so we register a one-shot timer with a tiny interval and
479        # let it exit fast. Re-arming is idempotent thanks to the
480        # ``done()`` check above.
481        self._sm_app.registry.add(TimerConfig(
482            interval=15,
483            obj_name="maf_control_inbox",
484            mod=__name__,
485            fn=_start_control_inbox,
486        ))
487
488        # ── Phase 2 workers ───────────────────────────────────────────────
489        # Kronos refresher, Mirofish refresher, and TriggerDispatcher all
490        # follow the same idempotent-launch pattern as ControlInbox. They
491        # cost zero when the watch list is empty / no triggers fire.
492
493        from maf.scheduler.kronos_refresher import KronosRefresher
494        from maf.scheduler.mirofish_refresher import MirofishRefresher
495        from maf.triggers.dispatcher import TriggerDispatcher
496        from maf.triggers.dispatcher import register_arena_triggers
497
498        self._kronos_task: asyncio.Task[Any] | None = None
499        self._mirofish_task: asyncio.Task[Any] | None = None
500        self._dispatcher_task: asyncio.Task[Any] | None = None
501
502        kronos_module = self.get_module("kronos")
503        kronos_sidecar = (
504            kronos_module.config.get("sidecar_url")
505            if kronos_module else None
506        )
507        kronos = KronosRefresher(
508            sidecar_url=kronos_sidecar,
509            redis_url=self.config.redis_url,
510        )
511        mirofish_module = self.get_module("mirofish")
512        mirofish_url = (
513            mirofish_module.config.get("base_url")
514            if mirofish_module else None
515        )
516        mirofish_enabled = bool(mirofish_module)
517        mirofish = MirofishRefresher(
518            mirofish_url=mirofish_url,
519            redis_url=self.config.redis_url,
520        ) if mirofish_enabled else None
521
522        all_rules: list[Any] = []
523        for arena_name, arena in self._arenas.items():
524            block = [t.model_dump() for t in (arena.config.triggers or [])]
525            all_rules.extend(register_arena_triggers(arena_name, block))
526        dispatcher = TriggerDispatcher(
527            all_rules,
528            redis_url=self.config.redis_url,
529            cost_cap_eur_per_hour=self.config.max_cost_per_hour_eur,
530        ) if all_rules else None
531
532        async def _start_workers() -> None:
533            if self._kronos_task is None or self._kronos_task.done():
534                self._kronos_task = asyncio.create_task(kronos.run())
535                logger.info("KronosRefresher task scheduled")
536            if mirofish and (
537                self._mirofish_task is None or self._mirofish_task.done()
538            ):
539                self._mirofish_task = asyncio.create_task(mirofish.run())
540                logger.info("MirofishRefresher task scheduled")
541            if dispatcher and (
542                self._dispatcher_task is None or self._dispatcher_task.done()
543            ):
544                self._dispatcher_task = asyncio.create_task(dispatcher.run())
545                logger.info(
546                    "TriggerDispatcher task scheduled (%d rules)",
547                    len(all_rules),
548                )
549
550        self._sm_app.registry.add(TimerConfig(
551            interval=15,
552            obj_name="maf_phase2_workers",
553            mod=__name__,
554            fn=_start_workers,
555        ))
556
557        # Register scheduled arenas as timers
558        for arena_name, arena in self._arenas.items():
559            schedule = arena.config.schedule
560            if not schedule:
561                continue
562
563            # Create timer handler for this arena
564            async def _timer_handler(
565                _name: str = arena_name,
566            ) -> None:
567                try:
568                    state = await self.run_arena(_name)
569                    signal = state.get("signal", "")
570                    logger.info("Arena %s completed: signal=%s", _name, signal)
571
572                    # Publish output to Redis stream if configured.
573                    #
574                    # Per-arena dispatch: trading_intelligence publishes a
575                    # rich :class:`TradingIntelligenceEnvelope` (full agent
576                    # signals + synthesis + source-metrics summary + per-
577                    # (ticker, date) lookup hash) so downstream services
578                    # like the trtools2 engine can build real strategies
579                    # against a typed contract. Other arenas keep the
580                    # legacy thin {arena, signal, target} payload until
581                    # they grow their own envelope (crowd_simulation and
582                    # mastermind already have theirs and publish from
583                    # inside their own emit phases).
584                    output_stream = self._arenas[_name].config.output_stream
585                    if not output_stream or not self._sm_app:
586                        return
587
588                    # trading_intelligence still emits its rich envelope
589                    # on top of the standard publish path.
590                    if _name == "trading_intelligence":
591                        await self._publish_trading_intelligence_envelope(
592                            state, output_stream,
593                        )
594                    else:
595                        # Preserve the legacy thin payload on the arena's
596                        # output_stream so existing consumers of scheduled
597                        # arena output keep working. The Prognosis envelope
598                        # rides on a separate :prognosis suffix stream — see
599                        # _publish_prognosis.
600                        await self._sm_app.send(output_stream, {
601                            "arena": _name,
602                            "signal": signal,
603                            "target": str(state.get("target", {})),
604                        })
605
606                    # Every scheduled arena ALSO flows through the unified
607                    # publish path: TradingAction OR GenericDecision on
608                    # the legacy outboxes (back-compat), PLUS a Prognosis
609                    # envelope on a distinct stream — so the new ideas /
610                    # alternatives_considered fields reach consumers on
611                    # both the ad-hoc /run path and the cron path.
612                    sched_mode = os.environ.get(
613                        "MAF_SCHEDULED_ACTION_MODE", "manual",
614                    )
615                    await self._publish_arena_output(
616                        _name, state, action_mode=sched_mode,
617                    )
618                except Exception:
619                    logger.exception("Arena %s failed", _name)
620
621            # Parse schedule — could be cron or interval in seconds
622            interval = self._parse_schedule(schedule)
623            self._sm_app.registry.add(TimerConfig(
624                interval=interval,
625                obj_name=f"arena_{arena_name}",
626                mod=__name__,
627                fn=_timer_handler,
628            ))
629            logger.info("Registered arena %r with interval %ds", arena_name, interval)
630
631        logger.info("MAF starting with %d arenas", len(self._arenas))
632        self._sm_app.start()
633
634    async def _publish_trading_intelligence_envelope(
635        self,
636        state: Any,
637        output_stream: str,
638    ) -> None:
639        """Build + publish a rich envelope for the trading_intelligence arena.
640
641        Lazy imports keep the rest of MAFApp free of trading_intelligence
642        coupling — other arenas keep working when this module fails to
643        import (e.g. during partial test bootstrap).
644
645        Publishes via two raw-redis paths (not StreamMachine.send):
646          1. ``XADD <output_stream> data <envelope.model_dump_json()>``
647          2. ``HSET maf:arena:trading_intelligence:decisions:{ticker}:{date}``
648             with the same JSON + flat lookup fields, so trtools2 can answer
649             "what does MAF think about NVDA right now?" without tailing.
650
651        Failures are logged but do not propagate — the arena run itself
652        already succeeded; the publish is best-effort.
653        """
654        try:
655            from maf.arenas.trading_intelligence.stream import (
656                build_envelope_from_state,
657                publish_decision_hash,
658                publish_envelope,
659            )
660        except ImportError:
661            logger.exception("trading_intelligence stream module unavailable — skipping envelope publish")
662            return
663
664        try:
665            envelope = build_envelope_from_state(state)
666        except Exception:
667            logger.exception("trading_intelligence: envelope build failed")
668            return
669
670        # Skip publish if degraded run with no ticker — nothing meaningful
671        # to write, and it would create stream pollution.
672        if not envelope.decision.target.ticker:
673            logger.warning(
674                "trading_intelligence: skipping publish (empty ticker, verdict=%s)",
675                envelope.decision.synthesis.verdict,
676            )
677            return
678
679        import redis.asyncio as aioredis
680
681        redis_url = self.config.redis_url
682        redis_client = aioredis.from_url(redis_url)
683        try:
684            stream_id = await publish_envelope(redis_client, output_stream, envelope)
685            hash_key = await publish_decision_hash(redis_client, envelope)
686            logger.info(
687                "trading_intelligence: published envelope stream=%s id=%s hash=%s "
688                "ticker=%s verdict=%s confidence=%.2f",
689                output_stream, stream_id, hash_key,
690                envelope.decision.target.ticker,
691                envelope.decision.synthesis.verdict,
692                envelope.decision.synthesis.confidence,
693            )
694        except Exception:
695            logger.exception(
696                "trading_intelligence: envelope publish failed (stream=%s)", output_stream,
697            )
698        finally:
699            try:
700                await redis_client.aclose()
701            except AttributeError:
702                # older redis-py used .close()
703                await redis_client.close()
704
705    @staticmethod
706    def _parse_schedule(schedule: str) -> int:
707        """Parse a schedule string to interval in seconds."""
708        s = schedule.strip()
709        # Simple interval: "60", "300", "3600"
710        try:
711            return int(s)
712        except ValueError:
713            pass
714        # Interval with unit: "5m", "1h", "30s"
715        if s.endswith("s"):
716            return int(s[:-1])
717        if s.endswith("m"):
718            return int(s[:-1]) * 60
719        if s.endswith("h"):
720            return int(s[:-1]) * 3600
721        # Cron expression — default to 1 hour for now
722        # TODO: integrate croniter for proper cron scheduling
723        logger.warning("Cron scheduling not yet implemented, defaulting to 3600s: %s", s)
724        return 3600