checking system…
Docs / back / src/maf/sources/adapters/kronos_forecast.py · line 76
Python · 198 lines
  1"""Kronos forecast adapter — reads the cached Redis key, never calls torch.
  2
  3What it reads
  4-------------
  5``kronos:forecast:{symbol}:{timeframe}`` (JSON-encoded summary written by the
  6refresher). The shape is::
  7
  8    {
  9      "schema_version": "1",
 10      "symbol":       "SPY",
 11      "timeframe":    "1m",
 12      "model":        "NeoQuasar/Kronos-small",
 13      "generated_at": "2026-05-15T08:22:11+00:00",
 14      "horizon_min":  60,
 15      "summary": {
 16        "direction":         "BULLISH" | "BEARISH" | "NEUTRAL",
 17        "prob_up":           0.62,
 18        "exp_return_pct":    0.31,
 19        "vol_estimate_pct":  0.85,
 20        "sample_count":      5
 21      },
 22      "forecast": [           # optional — full per-bar predictions
 23        {"ts": "...", "open": 1.0, "high": ..., "low": ..., "close": ...},
 24        ...
 25      ]
 26    }
 27
 28Why no HTTP / no torch here
 29---------------------------
 30The MAF process has *zero* torch dep. The sidecar service writes the cache
 31(see :mod:`maf.scheduler.kronos_refresher`); the adapter only reads. Specialists
 32in arenas wire this adapter and get a near-zero-latency forecast.
 33
 34Staleness handling
 35------------------
 36The adapter returns ``{"stale": True, "age_seconds": …}`` and adds a
 37``stale_kronos_forecast`` marker to ``raw_data.key_factors`` when the cached
 38value is older than ``stale_after_seconds`` (default: 5× refresh cadence).
 39:class:`maf.agents.replan.ReplanAgent` already treats ``stale_*`` and
 40``no_*`` markers as a data gap → forces a re-run.
 41"""
 42
 43from __future__ import annotations
 44
 45import json
 46import logging
 47import os
 48import time
 49from datetime import datetime
 50from typing import Any
 51
 52from maf.sources.base import BaseSource
 53
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


# Redis key layout for one cached forecast. ``KronosForecastSource.fetch``
# fills in the upper-cased symbol and the timeframe before reading the key.
DEFAULT_KEY_TEMPLATE = "kronos:forecast:{symbol}:{timeframe}"

# How old the cached forecast can be before it's flagged stale. The refresher
# defaults are 60 s for 1m / 5 min for 1h, so 5× covers worst-case downtime
# without making "no recent refresh" indistinguishable from "service dead".
_STALE_MULTIPLIER = 5
 63
 64
 65class KronosForecastSource(BaseSource):
 66    """Read the cached Kronos forecast for a (symbol, timeframe).
 67
 68    Config keys (all overridable per-call via ``params``):
 69      symbol            target symbol (uppercased)
 70      timeframe         "1m" / "1h" / ...
 71      stale_after_seconds  override the default freshness budget
 72      include_forecast  bool; default False — only return the summary block
 73                         to keep prompts small. Full per-bar series is large.
 74    """
 75
 76    adapter_name = "kronos_forecast"
 77
 78    @classmethod
 79    def freshness_spec(cls, binding_config: dict[str, Any]) -> dict[str, Any]:
 80        tf = binding_config.get("timeframe") or "1m"
 81        return {
 82            "type": "key",
 83            "key_pattern": f"kronos:forecast:*:{tf}",
 84            "emit_stream": "kronos:forecasts:emitted",
 85        }
 86
 87    def __init__(self, config: dict[str, Any]) -> None:
 88        super().__init__(config)
 89        self._redis: Any = None
 90
 91    async def _get_redis(self) -> Any:
 92        if self._redis is None:
 93            import redis.asyncio as aioredis
 94            self._redis = aioredis.from_url(
 95                self.config.get(
 96                    "redis_url",
 97                    os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
 98                )
 99            )
100        return self._redis
101
102    async def fetch(self, params: dict[str, Any] | None = None) -> dict[str, Any]:
103        p = {**self.config, **(params or {})}
104        symbol = str(p.get("symbol") or "").upper().strip()
105        timeframe = str(p.get("timeframe") or "1m").strip()
106        if not symbol:
107            return {
108                "type": "kronos_forecast",
109                "error": "missing required param 'symbol'",
110                "key_factors": ["no_kronos_forecast"],
111            }
112        # Per-timeframe default staleness budget (refresh cadence * multiplier)
113        default_cadence = 60 if timeframe == "1m" else 300
114        stale_after = int(p.get("stale_after_seconds") or default_cadence * _STALE_MULTIPLIER)
115        include_forecast = bool(p.get("include_forecast", False))
116
117        key = DEFAULT_KEY_TEMPLATE.format(symbol=symbol, timeframe=timeframe)
118        try:
119            client = await self._get_redis()
120            raw = await client.get(key)
121        except Exception as exc:
122            return {
123                "type": "kronos_forecast",
124                "symbol": symbol,
125                "timeframe": timeframe,
126                "error": f"redis: {type(exc).__name__}: {exc}",
127                "stale": True,
128                "key_factors": ["no_kronos_forecast"],
129            }
130
131        if not raw:
132            return {
133                "type": "kronos_forecast",
134                "symbol": symbol,
135                "timeframe": timeframe,
136                "stale": True,
137                "missing": True,
138                "key": key,
139                "key_factors": ["no_kronos_forecast"],
140                "message": (
141                    f"no cached forecast for ({symbol}, {timeframe}); "
142                    "is the kronos refresher running and watching this symbol?"
143                ),
144            }
145
146        if isinstance(raw, bytes):
147            raw = raw.decode("utf-8")
148        try:
149            payload = json.loads(raw)
150        except (json.JSONDecodeError, TypeError) as exc:
151            return {
152                "type": "kronos_forecast",
153                "symbol": symbol,
154                "timeframe": timeframe,
155                "error": f"decode: {exc}",
156                "stale": True,
157                "key_factors": ["no_kronos_forecast"],
158            }
159
160        # Compute age. ``generated_at`` is ISO-8601 written by the refresher.
161        age = _age_seconds(payload.get("generated_at"))
162        is_stale = age is None or age > stale_after
163
164        out: dict[str, Any] = {
165            "type": "kronos_forecast",
166            "symbol": payload.get("symbol", symbol),
167            "timeframe": payload.get("timeframe", timeframe),
168            "model": payload.get("model", ""),
169            "horizon_min": payload.get("horizon_min"),
170            "generated_at": payload.get("generated_at", ""),
171            "age_seconds": age,
172            "stale": is_stale,
173            "summary": payload.get("summary") or {},
174        }
175        if is_stale:
176            out["key_factors"] = ["stale_kronos_forecast"]
177            out["stale_after_seconds"] = stale_after
178        if include_forecast:
179            # Cap the size of full forecasts so we don't blow context budgets.
180            forecast = payload.get("forecast") or []
181            out["forecast"] = forecast[:240]
182            out["forecast_truncated"] = len(forecast) > 240
183        return out
184
185
def _age_seconds(generated_at: Any) -> float | None:
    """Compute seconds between ``generated_at`` and now.

    Args:
        generated_at: ISO-8601 timestamp (a trailing ``Z`` is accepted). A
            value without a UTC offset is assumed to be UTC, matching the
            ``+00:00`` form shown in the module's payload example.

    Returns:
        The non-negative age in seconds, or ``None`` when the value is empty
        or cannot be parsed. Timestamps in the future are clamped to ``0.0``.
    """
    if not generated_at:
        return None
    # Function-scope import: the module header only imports ``datetime``.
    from datetime import timezone

    try:
        dt = datetime.fromisoformat(str(generated_at).replace("Z", "+00:00"))
    except (TypeError, ValueError):
        return None
    if dt.tzinfo is None:
        # ``timestamp()`` on a naive datetime uses the host's local zone, which
        # would skew the age by the UTC offset on non-UTC hosts.
        dt = dt.replace(tzinfo=timezone.utc)
    return max(0.0, time.time() - dt.timestamp())
195
196
197__all__ = ["DEFAULT_KEY_TEMPLATE", "KronosForecastSource"]