1"""MAF Application — wraps StreamMachine App with arena lifecycle management.""" 2 3from __future__ import annotations 4 5import asyncio 6import logging 7import os 8from pathlib import Path 9from typing import Any 10 11from maf.config import AppConfig, ArenaConfig, ModuleConfig, load_config 12from maf.core.arena import Arena 13from maf.core.state import ArenaState 14from maf.llm.factory import LLMFactory 15from maf.memory.store import MemoryStore 16from maf.modules.base import DataModule 17from maf.sources.registry import SourceRegistry 18from maf.actions.decisions import ( 19 DecisionOutbox, 20 build_decision_from_state, 21) 22from maf.actions.outbox import ActionOutbox, build_action_from_state 23from maf.streaming.bus import EventBus, install_default_bus 24 25logger = logging.getLogger(__name__) 26 27 28# --------------------------------------------------------------------------- 29# Module registry — maps module names to classes 30# --------------------------------------------------------------------------- 31 32_MODULE_REGISTRY: dict[str, type[DataModule]] = {} 33 34 35def register_module(name: str, cls: type[DataModule]) -> None: 36 """Register a data module class.""" 37 _MODULE_REGISTRY[name] = cls 38 39 40def _register_defaults() -> None: 41 """Register built-in agent roles and data modules.""" 42 from maf.core.arena import register_role 43 44 # Agent roles 45 from maf.agents.analyst import AnalystAgent 46 from maf.agents.debater import DebaterAgent 47 from maf.agents.judge import JudgeAgent 48 from maf.agents.executor import ExecutorAgent 49 from maf.agents.replan import ReplanAgent 50 from maf.agents.watcher import WatcherAgent 51 from maf.agents.specialist import SpecialistAgent 52 from maf.agents.synthesis import SynthesisAgent 53 54 register_role("analyst", AnalystAgent) 55 register_role("specialist", SpecialistAgent) 56 register_role("debater", DebaterAgent) 57 register_role("judge", JudgeAgent) 58 register_role("executor", ExecutorAgent) 59 register_role("watcher", WatcherAgent) 60 # synthesis role maps to the judge slot — it reads all agent_signals 61 register_role("synthesis", SynthesisAgent) 62 # replan: confidence-gated re-run controller. See agents/replan.py. 63 register_role("replan", ReplanAgent) 64 65 # Data modules (each registers a family of adapters) 66 from maf.modules.fomo2_module import Fomo2Module 67 from maf.modules.kronos_module import KronosModule 68 from maf.modules.mirofish_module import MirofishModule 69 from maf.modules.oddsoddy_module import OddsoddyModule 70 from maf.modules.polymarket_module import PolymarketModule 71 from maf.modules.tradingagents_module import TradingAgentsModule 72 from maf.modules.trtools2_module import Trtools2Module 73 from maf.modules.web_module import WebModule 74 75 register_module("fomo2", Fomo2Module) 76 register_module("trtools2", Trtools2Module) 77 register_module("kronos", KronosModule) 78 register_module("web", WebModule) 79 register_module("mirofish", MirofishModule) 80 register_module("oddsoddy", OddsoddyModule) 81 register_module("polymarket", PolymarketModule) 82 register_module("tradingagents", TradingAgentsModule) 83 84 85def _init_modules(config: AppConfig) -> list[DataModule]: 86 """Initialize and register all data modules. 87 88 If no modules are explicitly configured, all built-in modules are 89 loaded with default config. This ensures backward compatibility. 90 """ 91 modules: list[DataModule] = [] 92 93 if config.modules: 94 # Explicit module configuration 95 for mod_cfg in config.modules: 96 if not mod_cfg.enabled: 97 logger.info("Module %r disabled, skipping", mod_cfg.name) 98 continue 99 cls = _MODULE_REGISTRY.get(mod_cfg.name) 100 if cls is None: 101 logger.warning("Unknown module %r, skipping", mod_cfg.name) 102 continue 103 # Inject redis_url from app config if not set in module config 104 mod_config = dict(mod_cfg.config) 105 if "redis_url" not in mod_config: 106 mod_config["redis_url"] = config.redis_url 107 module = cls(config=mod_config) 108 module.register() 109 modules.append(module) 110 else: 111 # Default: load all registered modules 112 for name, cls in _MODULE_REGISTRY.items(): 113 module = cls(config={"redis_url": config.redis_url}) 114 module.register() 115 modules.append(module) 116 117 logger.info( 118 "Loaded %d data modules: %s", 119 len(modules), [m.module_name for m in modules], 120 ) 121 return modules 122 123 124class MAFApp: 125 """Multi-Agent Framework application. 126 127 Manages arena lifecycle: loads configs, creates arenas, schedules runs 128 via StreamMachine timers, and monitors execution. 129 """ 130 131 def __init__(self, config: AppConfig | None = None) -> None: 132 _register_defaults() 133 134 self.config = config or load_config() 135 136 # Install the process-wide EventBus before anything else touches the 137 # arena layer — so every arena run, even ones triggered directly from 138 # code, emits lifecycle events to the realtime stream. 139 self._event_bus: EventBus = install_default_bus( 140 redis_url=self.config.redis_url, 141 stream=self.config.streams.events_stream, 142 ) 143 # Action outbox — built lazily on first use to avoid forcing redis on 144 # tests that only import MAFApp. 145 self._action_outbox: ActionOutbox | None = None 146 # Generic-decision outbox for non-trading arenas (target_key != "ticker"). 147 self._decision_outbox: DecisionOutbox | None = None 148 # Shared DecisionMemory — used by mastermind arena runs *and* by the 149 # ExecutionHarvester so outcome events can update the originating 150 # decision. Built lazily so tests don't pay the chromadb import cost. 151 self._decision_memory: Any = None 152 153 self._modules = _init_modules(self.config) 154 self._arenas: dict[str, Arena] = {} 155 self._sm_app: Any = None 156 self._control_task: asyncio.Task[Any] | None = None 157 158 # Build arenas from config 159 for arena_cfg in self.config.arenas: 160 self._arenas[arena_cfg.name] = self._build_arena(arena_cfg) 161 162 def _build_arena(self, cfg: ArenaConfig) -> Arena: 163 """Create an Arena with its dependencies.""" 164 # LLM factory 165 llm_factory = LLMFactory(cfg.llm) 166 167 # Source registry 168 sources = SourceRegistry() 169 sources.bind_all(cfg.sources) 170 171 # Memory store 172 memory = MemoryStore(cfg.memory, data_dir=self.config.data_dir) 173 174 return Arena( 175 config=cfg, 176 llm_factory=llm_factory, 177 source_registry=sources, 178 memory_store=memory, 179 ) 180 181 @property 182 def modules(self) -> list[DataModule]: 183 """Get loaded data modules.""" 184 return self._modules 185 186 def get_module(self, name: str) -> DataModule | None: 187 """Get a loaded module by name.""" 188 for m in self._modules: 189 if m.module_name == name: 190 return m 191 return None 192 193 def get_arena(self, name: str) -> Arena | None: 194 """Get an arena by name.""" 195 return self._arenas.get(name) 196 197 def list_arenas(self) -> list[str]: 198 """List all registered arena names.""" 199 return list(self._arenas) 200 201 @property 202 def decision_memory(self) -> Any: 203 """Lazy-built shared :class:`DecisionMemory`. 204 205 Sized to be safe-to-call from anywhere — first call pays the chromadb 206 import; subsequent calls return the same instance so writes from one 207 path (mastermind arena run) are visible to readers in another 208 (ExecutionHarvester.add_outcome). 209 """ 210 if self._decision_memory is None: 211 try: 212 from maf.arenas.mastermind.memory import DecisionMemory 213 except ImportError: 214 logger.warning("MAFApp.decision_memory: mastermind not importable") 215 return None 216 self._decision_memory = DecisionMemory( 217 name="mastermind", 218 persist_path=f"{self.config.data_dir}/decision_memory_mastermind", 219 ) 220 return self._decision_memory 221 222 async def run_arena( 223 self, 224 name: str, 225 target: dict[str, Any] | None = None, 226 *, 227 correlation_id: str | None = None, 228 emit_action: bool = False, 229 action_mode: str = "manual", 230 ) -> ArenaState: 231 """Run a specific arena and return the result. 232 233 ``emit_action`` set true publishes a :class:`TradingAction` to the 234 action outbox after the run completes. Useful for the control plane 235 and scheduled timer paths so a downstream engine sees a clean, 236 typed action payload. 237 """ 238 arena = self._arenas.get(name) 239 if not arena: 240 raise KeyError(f"Arena {name!r} not found. Available: {self.list_arenas()}") 241 state = await arena.run(target=target, correlation_id=correlation_id) 242 243 if emit_action: 244 await self._publish_arena_output(name, state, action_mode=action_mode) 245 return state 246 247 async def _publish_arena_output( 248 self, 249 arena_name: str, 250 state: ArenaState, 251 *, 252 action_mode: str = "manual", 253 ) -> None: 254 """Dispatch to the right outbox based on the arena's ``target_key``. 255 256 Trading arenas (target_key="ticker") publish a TradingAction to 257 ``maf:actions:out``; any other arena publishes a GenericDecision to 258 ``maf:decisions:out``. Backwards compatible: the existing 259 :meth:`_publish_action` keeps working and is still the path used 260 by trading arenas. 261 262 Additionally, every arena run emits a unified 263 :class:`maf.actions.prognosis.Prognosis` envelope to that arena's 264 ``maf:arena:{name}:output`` stream. The Prognosis carries the 265 agnostic-arena fields (``ideas``, ``alternatives_considered``, 266 typed target, key_factors) that the GenericDecision/TradingAction 267 wire shapes don't have room for — the dashboard and external 268 consumers read this single shape across all arenas. 269 """ 270 arena = self._arenas.get(arena_name) 271 target_key = (arena.config.target_key if arena else "ticker") or "ticker" 272 273 # Publish to the legacy outboxes (backwards compatibility). 274 if target_key == "ticker": 275 await self._publish_action( 276 arena_name, state, action_mode=action_mode, 277 ) 278 else: 279 await self._publish_decision( 280 arena_name, state, target_key=target_key, 281 ) 282 283 # Always emit Prognosis to the arena's dedicated output stream. 284 await self._publish_prognosis(arena_name, state) 285 286 async def _publish_prognosis( 287 self, 288 arena_name: str, 289 state: ArenaState, 290 ) -> None: 291 """Build + publish a Prognosis envelope. Logs + swallows errors. 292 293 Prognosis is the unified shape every arena emits — verdict 294 (BUY/SELL/HOLD/None) plus a free-form statement, key factors, 295 ideas shortlist, alternatives considered, data quality. 296 297 Published to ``<arena.output_stream>:prognosis`` (a dedicated 298 Prognosis-only stream) so the legacy ``output_stream`` shape 299 is preserved unchanged for existing consumers. New consumers 300 subscribe to the ``:prognosis`` suffix to get the agnostic 301 envelope. 302 """ 303 try: 304 from maf.actions.prognosis import from_arena_state 305 306 arena = self._arenas.get(arena_name) 307 base_stream = ( 308 arena.config.output_stream if arena else None 309 ) or f"maf:arena:{arena_name}:output" 310 # Distinct stream — does not collide with the legacy thin 311 # payload some scheduled arenas write to base_stream. 312 stream = f"{base_stream}:prognosis" 313 314 prognosis = from_arena_state(arena_name, dict(state)) 315 payload = prognosis.model_dump_json() 316 317 import redis.asyncio as aioredis 318 client = aioredis.from_url(self.config.redis_url) 319 try: 320 stream_id = await client.xadd( 321 stream, {"data": payload}, maxlen=50_000, approximate=True, 322 ) 323 finally: 324 ac = getattr(client, "aclose", None) 325 if ac is not None: 326 await ac() 327 else: 328 await client.close() 329 330 logger.debug( 331 "Prognosis published: arena=%s stream=%s verdict=%s ideas=%d alts=%d", 332 arena_name, stream, prognosis.verdict, 333 len(prognosis.ideas), len(prognosis.alternatives_considered), 334 ) 335 await self._event_bus.publish( 336 "decision.emit", 337 arena=arena_name, 338 arena_id=prognosis.arena_id, 339 correlation_id=prognosis.correlation_id, 340 payload={ 341 "kind": "prognosis", 342 "stream": stream, 343 "verdict": prognosis.verdict, 344 "statement": prognosis.statement[:200], 345 "ideas_count": len(prognosis.ideas), 346 "alternatives_count": len(prognosis.alternatives_considered), 347 "stream_id": stream_id.decode() if isinstance(stream_id, bytes) else str(stream_id), 348 }, 349 ) 350 except Exception: 351 logger.exception( 352 "Prognosis publish failed (arena=%s) — falling back to " 353 "legacy envelopes only", arena_name, 354 ) 355 356 async def _publish_decision( 357 self, 358 arena_name: str, 359 state: ArenaState, 360 *, 361 target_key: str, 362 ) -> None: 363 """Build + publish a GenericDecision. Logs+swallows errors.""" 364 try: 365 decision = build_decision_from_state( 366 dict(state), arena=arena_name, target_key=target_key, 367 ) 368 if decision is None: 369 logger.debug( 370 "Decision outbox: skipping (no %r in target for arena=%s)", 371 target_key, arena_name, 372 ) 373 return 374 if self._decision_outbox is None: 375 self._decision_outbox = DecisionOutbox( 376 redis_url=self.config.redis_url, 377 ) 378 stream_id = await self._decision_outbox.publish(decision) 379 logger.info( 380 "Decision published: arena=%s target_key=%s target=%s " 381 "verdict=%s stream_id=%s", 382 arena_name, target_key, decision.target, 383 decision.verdict, stream_id, 384 ) 385 await self._event_bus.publish( 386 "decision.emit", 387 arena=arena_name, 388 arena_id=decision.arena_id, 389 correlation_id=decision.correlation_id, 390 payload={ 391 "kind": "generic_decision", 392 "target_key": target_key, 393 "target": decision.target, 394 "verdict": decision.verdict, 395 "confidence": decision.confidence, 396 "stream_id": stream_id, 397 }, 398 ) 399 except Exception: 400 logger.exception( 401 "Decision publish failed (arena=%s)", arena_name, 402 ) 403 404 async def _publish_action( 405 self, 406 arena_name: str, 407 state: ArenaState, 408 *, 409 action_mode: str = "manual", 410 ) -> None: 411 """Build + publish a TradingAction. Logs+swallows errors.""" 412 try: 413 action = build_action_from_state( 414 dict(state), arena=arena_name, mode=action_mode, # type: ignore[arg-type] 415 ) 416 if action is None: 417 logger.debug( 418 "Action outbox: skipping (no ticker in state for arena=%s)", 419 arena_name, 420 ) 421 return 422 if self._action_outbox is None: 423 self._action_outbox = ActionOutbox( 424 redis_url=self.config.redis_url, 425 stream=self.config.streams.actions_out, 426 ) 427 stream_id = await self._action_outbox.publish(action) 428 logger.info( 429 "Action published: ticker=%s verdict=%s mode=%s stream_id=%s", 430 action.target.ticker, action.verdict, action.mode, stream_id, 431 ) 432 await self._event_bus.publish( 433 "action.emit", 434 arena=arena_name, 435 arena_id=action.arena_id, 436 correlation_id=action.correlation_id, 437 payload={ 438 "ticker": action.target.ticker, 439 "verdict": action.verdict, 440 "mode": action.mode, 441 "confidence": action.sizing.confidence, 442 "size_fraction": action.sizing.size_fraction, 443 "stream_id": stream_id, 444 }, 445 ) 446 except Exception: 447 logger.exception("Action publish failed (arena=%s)", arena_name) 448 449 def start(self) -> None: 450 """Start the MAF application with StreamMachine integration. 451 452 Registers all scheduled arenas as StreamMachine timers, starts the 453 inbound control plane consumer on ``maf:control:in``, and starts the 454 event loop. 455 """ 456 try: 457 from streammachine import App 458 from streammachine.models import TimerConfig 459 except ImportError: 460 logger.error("streammachine is required to run MAF as a service") 461 raise 462 463 self._sm_app = App(name=self.config.name, to_scan=False) 464 465 # Inbound control plane — register as a StreamMachine timer that fires 466 # the consumer loop once on startup. ``ControlInbox.run`` is 467 # long-running; we kick it off as a fire-and-forget asyncio task. 468 from maf.control.inbox import ControlInbox 469 470 inbox = ControlInbox(self) 471 472 async def _start_control_inbox() -> None: 473 if self._control_task is None or self._control_task.done(): 474 self._control_task = asyncio.create_task(inbox.run()) 475 logger.info("ControlInbox task scheduled") 476 477 # StreamMachine doesn't expose a clean "on_startup" hook in our 478 # version, so we register a one-shot timer with a tiny interval and 479 # let it exit fast. Re-arming is idempotent thanks to the 480 # ``done()`` check above. 481 self._sm_app.registry.add(TimerConfig( 482 interval=15, 483 obj_name="maf_control_inbox", 484 mod=__name__, 485 fn=_start_control_inbox, 486 )) 487 488 # ── Phase 2 workers ─────────────────────────────────────────────── 489 # Kronos refresher, Mirofish refresher, and TriggerDispatcher all 490 # follow the same idempotent-launch pattern as ControlInbox. They 491 # cost zero when the watch list is empty / no triggers fire. 492 493 from maf.scheduler.kronos_refresher import KronosRefresher 494 from maf.scheduler.mirofish_refresher import MirofishRefresher 495 from maf.triggers.dispatcher import TriggerDispatcher 496 from maf.triggers.dispatcher import register_arena_triggers 497 498 self._kronos_task: asyncio.Task[Any] | None = None 499 self._mirofish_task: asyncio.Task[Any] | None = None 500 self._dispatcher_task: asyncio.Task[Any] | None = None 501 502 kronos_module = self.get_module("kronos") 503 kronos_sidecar = ( 504 kronos_module.config.get("sidecar_url") 505 if kronos_module else None 506 ) 507 kronos = KronosRefresher( 508 sidecar_url=kronos_sidecar, 509 redis_url=self.config.redis_url, 510 ) 511 mirofish_module = self.get_module("mirofish") 512 mirofish_url = ( 513 mirofish_module.config.get("base_url") 514 if mirofish_module else None 515 ) 516 mirofish_enabled = bool(mirofish_module) 517 mirofish = MirofishRefresher( 518 mirofish_url=mirofish_url, 519 redis_url=self.config.redis_url, 520 ) if mirofish_enabled else None 521 522 all_rules: list[Any] = [] 523 for arena_name, arena in self._arenas.items(): 524 block = [t.model_dump() for t in (arena.config.triggers or [])] 525 all_rules.extend(register_arena_triggers(arena_name, block)) 526 dispatcher = TriggerDispatcher( 527 all_rules, 528 redis_url=self.config.redis_url, 529 cost_cap_eur_per_hour=self.config.max_cost_per_hour_eur, 530 ) if all_rules else None 531 532 async def _start_workers() -> None: 533 if self._kronos_task is None or self._kronos_task.done(): 534 self._kronos_task = asyncio.create_task(kronos.run()) 535 logger.info("KronosRefresher task scheduled") 536 if mirofish and ( 537 self._mirofish_task is None or self._mirofish_task.done() 538 ): 539 self._mirofish_task = asyncio.create_task(mirofish.run()) 540 logger.info("MirofishRefresher task scheduled") 541 if dispatcher and ( 542 self._dispatcher_task is None or self._dispatcher_task.done() 543 ): 544 self._dispatcher_task = asyncio.create_task(dispatcher.run()) 545 logger.info( 546 "TriggerDispatcher task scheduled (%d rules)", 547 len(all_rules), 548 ) 549 550 self._sm_app.registry.add(TimerConfig( 551 interval=15, 552 obj_name="maf_phase2_workers", 553 mod=__name__, 554 fn=_start_workers, 555 )) 556 557 # Register scheduled arenas as timers 558 for arena_name, arena in self._arenas.items(): 559 schedule = arena.config.schedule 560 if not schedule: 561 continue 562 563 # Create timer handler for this arena 564 async def _timer_handler( 565 _name: str = arena_name, 566 ) -> None: 567 try: 568 state = await self.run_arena(_name) 569 signal = state.get("signal", "") 570 logger.info("Arena %s completed: signal=%s", _name, signal) 571 572 # Publish output to Redis stream if configured. 573 # 574 # Per-arena dispatch: trading_intelligence publishes a 575 # rich :class:`TradingIntelligenceEnvelope` (full agent 576 # signals + synthesis + source-metrics summary + per- 577 # (ticker, date) lookup hash) so downstream services 578 # like the trtools2 engine can build real strategies 579 # against a typed contract. Other arenas keep the 580 # legacy thin {arena, signal, target} payload until 581 # they grow their own envelope (crowd_simulation and 582 # mastermind already have theirs and publish from 583 # inside their own emit phases). 584 output_stream = self._arenas[_name].config.output_stream 585 if not output_stream or not self._sm_app: 586 return 587 588 # trading_intelligence still emits its rich envelope 589 # on top of the standard publish path. 590 if _name == "trading_intelligence": 591 await self._publish_trading_intelligence_envelope( 592 state, output_stream, 593 ) 594 else: 595 # Preserve the legacy thin payload on the arena's 596 # output_stream so existing consumers of scheduled 597 # arena output keep working. The Prognosis envelope 598 # rides on a separate :prognosis suffix stream — see 599 # _publish_prognosis. 600 await self._sm_app.send(output_stream, { 601 "arena": _name, 602 "signal": signal, 603 "target": str(state.get("target", {})), 604 }) 605 606 # Every scheduled arena ALSO flows through the unified 607 # publish path: TradingAction OR GenericDecision on 608 # the legacy outboxes (back-compat), PLUS a Prognosis 609 # envelope on a distinct stream — so the new ideas / 610 # alternatives_considered fields reach consumers on 611 # both the ad-hoc /run path and the cron path. 612 sched_mode = os.environ.get( 613 "MAF_SCHEDULED_ACTION_MODE", "manual", 614 ) 615 await self._publish_arena_output( 616 _name, state, action_mode=sched_mode, 617 ) 618 except Exception: 619 logger.exception("Arena %s failed", _name) 620 621 # Parse schedule — could be cron or interval in seconds 622 interval = self._parse_schedule(schedule) 623 self._sm_app.registry.add(TimerConfig( 624 interval=interval, 625 obj_name=f"arena_{arena_name}", 626 mod=__name__, 627 fn=_timer_handler, 628 )) 629 logger.info("Registered arena %r with interval %ds", arena_name, interval) 630 631 logger.info("MAF starting with %d arenas", len(self._arenas)) 632 self._sm_app.start() 633 634 async def _publish_trading_intelligence_envelope( 635 self, 636 state: Any, 637 output_stream: str, 638 ) -> None: 639 """Build + publish a rich envelope for the trading_intelligence arena. 640 641 Lazy imports keep the rest of MAFApp free of trading_intelligence 642 coupling — other arenas keep working when this module fails to 643 import (e.g. during partial test bootstrap). 644 645 Publishes via two raw-redis paths (not StreamMachine.send): 646 1. ``XADD <output_stream> data <envelope.model_dump_json()>`` 647 2. ``HSET maf:arena:trading_intelligence:decisions:{ticker}:{date}`` 648 with the same JSON + flat lookup fields, so trtools2 can answer 649 "what does MAF think about NVDA right now?" without tailing. 650 651 Failures are logged but do not propagate — the arena run itself 652 already succeeded; the publish is best-effort. 653 """ 654 try: 655 from maf.arenas.trading_intelligence.stream import ( 656 build_envelope_from_state, 657 publish_decision_hash, 658 publish_envelope, 659 ) 660 except ImportError: 661 logger.exception("trading_intelligence stream module unavailable — skipping envelope publish") 662 return 663 664 try: 665 envelope = build_envelope_from_state(state) 666 except Exception: 667 logger.exception("trading_intelligence: envelope build failed") 668 return 669 670 # Skip publish if degraded run with no ticker — nothing meaningful 671 # to write, and it would create stream pollution. 672 if not envelope.decision.target.ticker: 673 logger.warning( 674 "trading_intelligence: skipping publish (empty ticker, verdict=%s)", 675 envelope.decision.synthesis.verdict, 676 ) 677 return 678 679 import redis.asyncio as aioredis 680 681 redis_url = self.config.redis_url 682 redis_client = aioredis.from_url(redis_url) 683 try: 684 stream_id = await publish_envelope(redis_client, output_stream, envelope) 685 hash_key = await publish_decision_hash(redis_client, envelope) 686 logger.info( 687 "trading_intelligence: published envelope stream=%s id=%s hash=%s " 688 "ticker=%s verdict=%s confidence=%.2f", 689 output_stream, stream_id, hash_key, 690 envelope.decision.target.ticker, 691 envelope.decision.synthesis.verdict, 692 envelope.decision.synthesis.confidence, 693 ) 694 except Exception: 695 logger.exception( 696 "trading_intelligence: envelope publish failed (stream=%s)", output_stream, 697 ) 698 finally: 699 try: 700 await redis_client.aclose() 701 except AttributeError: 702 # older redis-py used .close() 703 await redis_client.close() 704 705 @staticmethod 706 def _parse_schedule(schedule: str) -> int: 707 """Parse a schedule string to interval in seconds.""" 708 s = schedule.strip() 709 # Simple interval: "60", "300", "3600" 710 try: 711 return int(s) 712 except ValueError: 713 pass 714 # Interval with unit: "5m", "1h", "30s" 715 if s.endswith("s"): 716 return int(s[:-1]) 717 if s.endswith("m"): 718 return int(s[:-1]) * 60 719 if s.endswith("h"): 720 return int(s[:-1]) * 3600 721 # Cron expression — default to 1 hour for now 722 # TODO: integrate croniter for proper cron scheduling 723 logger.warning("Cron scheduling not yet implemented, defaulting to 3600s: %s", s) 724 return 3600