1"""Kronos forecast adapter — reads the cached Redis key, never calls torch. 2 3What it reads 4------------- 5``kronos:forecast:{symbol}:{timeframe}`` (JSON-encoded summary written by the 6refresher). The shape is:: 7 8 { 9 "schema_version": "1", 10 "symbol": "SPY", 11 "timeframe": "1m", 12 "model": "NeoQuasar/Kronos-small", 13 "generated_at": "2026-05-15T08:22:11+00:00", 14 "horizon_min": 60, 15 "summary": { 16 "direction": "BULLISH" | "BEARISH" | "NEUTRAL", 17 "prob_up": 0.62, 18 "exp_return_pct": 0.31, 19 "vol_estimate_pct": 0.85, 20 "sample_count": 5 21 }, 22 "forecast": [ # optional — full per-bar predictions 23 {"ts": "...", "open": 1.0, "high": ..., "low": ..., "close": ...}, 24 ... 25 ] 26 } 27 28Why no HTTP / no torch here 29--------------------------- 30The MAF process has *zero* torch dep. The sidecar service writes the cache 31(see :mod:`maf.scheduler.kronos_refresher`); the adapter only reads. Specialists 32in arenas wire this adapter and get a near-zero-latency forecast. 33 34Staleness handling 35------------------ 36The adapter returns ``{"stale": True, "age_seconds": …}`` and adds a 37``stale_kronos_forecast`` marker to ``raw_data.key_factors`` when the cached 38value is older than ``stale_after_seconds`` (default: 5× refresh cadence). 39:class:`maf.agents.replan.ReplanAgent` already treats ``stale_*`` and 40``no_*`` markers as a data gap → forces a re-run. 41""" 42 43from __future__ import annotations 44 45import json 46import logging 47import os 48import time 49from datetime import datetime 50from typing import Any 51 52from maf.sources.base import BaseSource 53 54logger = logging.getLogger(__name__) 55 56 57DEFAULT_KEY_TEMPLATE = "kronos:forecast:{symbol}:{timeframe}" 58 59# How old the cached forecast can be before it's flagged stale. The refresher 60# defaults are 60 s for 1m / 5 min for 1h, so 5× covers worst-case downtime 61# without making "no recent refresh" indistinguishable from "service dead". 
_STALE_MULTIPLIER = 5

# Hard cap on the number of per-bar predictions returned when
# ``include_forecast`` is set, so a full series cannot blow context budgets.
_MAX_FORECAST_BARS = 240


def _unavailable(symbol: str, timeframe: str, error: str) -> dict[str, Any]:
    """Build the error-shaped result returned when no usable forecast was read.

    The ``no_kronos_forecast`` marker in ``key_factors`` is what lets
    downstream consumers recognise the data gap.
    """
    return {
        "type": "kronos_forecast",
        "symbol": symbol,
        "timeframe": timeframe,
        "error": error,
        "stale": True,
        "key_factors": ["no_kronos_forecast"],
    }


class KronosForecastSource(BaseSource):
    """Read the cached Kronos forecast for a (symbol, timeframe).

    Config keys (all overridable per-call via ``params``):
        symbol               target symbol (uppercased)
        timeframe            "1m" / "1h" / ...
        stale_after_seconds  override the default freshness budget
        include_forecast     bool; default False — only return the summary block
                             to keep prompts small. Full per-bar series is large.
        redis_url            Redis connection URL; falls back to the
                             ``REDIS_URL`` environment variable, then to
                             ``redis://localhost:6379/0``.
    """

    adapter_name = "kronos_forecast"

    @classmethod
    def freshness_spec(cls, binding_config: dict[str, Any]) -> dict[str, Any]:
        """Describe which cache keys signal fresh data for this binding.

        Args:
            binding_config: binding configuration; only ``timeframe`` is read
                (defaults to ``"1m"`` when missing or empty).

        Returns:
            A spec dict with the key glob matching every symbol for that
            timeframe and the name of the emit stream.
        """
        tf = binding_config.get("timeframe") or "1m"
        return {
            "type": "key",
            # Built from the shared template so the glob can never drift from
            # the key that ``fetch`` actually reads.
            "key_pattern": DEFAULT_KEY_TEMPLATE.format(symbol="*", timeframe=tf),
            "emit_stream": "kronos:forecasts:emitted",
        }

    def __init__(self, config: dict[str, Any]) -> None:
        """Store the config via the base class; the Redis client is lazy."""
        super().__init__(config)
        self._redis: Any = None

    async def _get_redis(self) -> Any:
        """Return the Redis client, creating it on first use.

        ``redis.asyncio`` is imported here rather than at module level, so the
        import only happens when a fetch is actually attempted.
        """
        if self._redis is None:
            import redis.asyncio as aioredis

            self._redis = aioredis.from_url(
                self.config.get(
                    "redis_url",
                    os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
                )
            )
        return self._redis

    async def fetch(self, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Read and validate the cached forecast for one (symbol, timeframe).

        Args:
            params: per-call overrides merged on top of the adapter config.

        Returns:
            A dict with ``type == "kronos_forecast"``. On success it carries
            the summary block plus ``age_seconds`` and ``stale``; a stale value
            additionally gets ``key_factors == ["stale_kronos_forecast"]``.
            Every failure mode (missing symbol, Redis error, missing key,
            undecodable or non-object payload) returns a dict with
            ``key_factors == ["no_kronos_forecast"]`` instead of raising.
        """
        p = {**self.config, **(params or {})}
        symbol = str(p.get("symbol") or "").upper().strip()
        timeframe = str(p.get("timeframe") or "1m").strip()
        if not symbol:
            return {
                "type": "kronos_forecast",
                "error": "missing required param 'symbol'",
                "key_factors": ["no_kronos_forecast"],
            }

        # Per-timeframe default staleness budget (refresh cadence * multiplier).
        default_cadence = 60 if timeframe == "1m" else 300
        default_budget = default_cadence * _STALE_MULTIPLIER
        try:
            # A falsy override (None, 0, "") deliberately falls back to the
            # default budget.
            stale_after = int(p.get("stale_after_seconds") or default_budget)
        except (TypeError, ValueError, OverflowError):
            # A malformed override must not turn a cache read into a crash.
            logger.warning(
                "invalid stale_after_seconds=%r; using default %ss",
                p.get("stale_after_seconds"),
                default_budget,
            )
            stale_after = default_budget
        include_forecast = bool(p.get("include_forecast", False))

        key = DEFAULT_KEY_TEMPLATE.format(symbol=symbol, timeframe=timeframe)
        try:
            client = await self._get_redis()
            raw = await client.get(key)
        except Exception as exc:  # boundary: any client/transport failure
            logger.warning(
                "kronos forecast read failed for %s: %s: %s",
                key,
                type(exc).__name__,
                exc,
            )
            return _unavailable(
                symbol, timeframe, f"redis: {type(exc).__name__}: {exc}"
            )

        if not raw:
            return {
                "type": "kronos_forecast",
                "symbol": symbol,
                "timeframe": timeframe,
                "stale": True,
                "missing": True,
                "key": key,
                "key_factors": ["no_kronos_forecast"],
                "message": (
                    f"no cached forecast for ({symbol}, {timeframe}); "
                    "is the kronos refresher running and watching this symbol?"
                ),
            }

        try:
            # Decoding lives inside the ``try``: UnicodeDecodeError and
            # JSONDecodeError are both ValueError subclasses.
            if isinstance(raw, bytes):
                raw = raw.decode("utf-8")
            payload = json.loads(raw)
        except (TypeError, ValueError) as exc:
            return _unavailable(symbol, timeframe, f"decode: {exc}")
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (null, list, string, number)
            # cannot be a forecast document.
            return _unavailable(
                symbol,
                timeframe,
                f"decode: expected a JSON object, got {type(payload).__name__}",
            )

        # Compute age. ``generated_at`` is ISO-8601 written by the refresher.
        # An unparseable or missing timestamp counts as stale.
        age = _age_seconds(payload.get("generated_at"))
        is_stale = age is None or age > stale_after

        out: dict[str, Any] = {
            "type": "kronos_forecast",
            "symbol": payload.get("symbol", symbol),
            "timeframe": payload.get("timeframe", timeframe),
            "model": payload.get("model", ""),
            "horizon_min": payload.get("horizon_min"),
            "generated_at": payload.get("generated_at", ""),
            "age_seconds": age,
            "stale": is_stale,
            "summary": payload.get("summary") or {},
        }
        if is_stale:
            out["key_factors"] = ["stale_kronos_forecast"]
            out["stale_after_seconds"] = stale_after
        if include_forecast:
            # Cap the size of full forecasts so we don't blow context budgets.
            forecast = payload.get("forecast")
            if not isinstance(forecast, list):
                forecast = []
            out["forecast"] = forecast[:_MAX_FORECAST_BARS]
            out["forecast_truncated"] = len(forecast) > _MAX_FORECAST_BARS
        return out


def _age_seconds(generated_at: Any) -> float | None:
    """Return seconds elapsed since ``generated_at``, or None if unparseable.

    Args:
        generated_at: ISO-8601 timestamp (a trailing ``Z`` is accepted).

    Returns:
        Non-negative age in seconds; timestamps in the future clamp to 0.0.
        None when the value is empty or cannot be parsed.
    """
    from datetime import timezone

    if not generated_at:
        return None
    try:
        dt = datetime.fromisoformat(str(generated_at).replace("Z", "+00:00"))
        if dt.tzinfo is None:
            # ``timestamp()`` would read a naive value in the host's local
            # timezone; the documented cache schema is UTC, so pin it.
            dt = dt.replace(tzinfo=timezone.utc)
        stamp = dt.timestamp()
    except (TypeError, ValueError, OverflowError):
        return None
    return max(0.0, time.time() - stamp)


__all__ = ["DEFAULT_KEY_TEMPLATE", "KronosForecastSource"]