checking system…
Docs / back / scripts/verify_kronos_loop.py · line 1
Python · 257 lines
  1"""Hermetic Kronos end-to-end verification.
  2
  3Proves every piece of the Kronos integration with deterministic checks,
  4in this order:
  5
  6  1. Sidecar /health — the service is reachable and reports its model state.
  7  2. Refresher tick — reads bars from Redis, calls the sidecar, writes cache,
  8     emits to the change stream.
  9  3. KronosForecastSource — adapter reads the cache and returns the exact
 10     shape the kronos_specialist consumes.
 11  4. kronos_specialist with a stub LLM — agent calls the source, parses the
 12     forecast, emits a structured AgentSignal. Skips the live LLM so rate
 13     limits don't gate the test.
 14
 15Run against real Redis and a running kronos-svc::
 16
 17    services/kronos-svc/.venv/bin/python services/kronos-svc/server.py &
 18    python scripts/verify_kronos_loop.py
 19"""
 20
 21from __future__ import annotations
 22
 23import argparse
 24import asyncio
 25import json
 26import os
 27import sys
 28import uuid
 29
 30sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
 31
 32
 33_BANNER = "─" * 78
 34
 35
 36async def _seed_bars(redis_url: str, symbol: str, count: int = 80) -> None:
 37    import redis.asyncio as aioredis
 38    r = aioredis.from_url(redis_url)
 39    await r.delete("trtools2:bars:1m")
 40    price = 451.20
 41    for i in range(count):
 42        # Mix up + down so Kronos has a real signal to work with.
 43        price += 0.05 * (1 if i % 3 == 0 else -0.3)
 44        await r.xadd("trtools2:bars:1m", {
 45            "symbol": symbol,
 46            "ts": f"2026-05-16T09:{i % 60:02d}:00+00:00",
 47            "open": json.dumps(round(price, 4)),
 48            "high": json.dumps(round(price + 0.15, 4)),
 49            "low":  json.dumps(round(price - 0.15, 4)),
 50            "close": json.dumps(round(price + 0.05, 4)),
 51            "volume": json.dumps(2_000_000.0 + i * 100),
 52        })
 53    try:
 54        await r.aclose()
 55    except AttributeError:
 56        await r.close()
 57
 58
 59async def step1_sidecar_health(sidecar_url: str) -> bool:
 60    import httpx
 61    async with httpx.AsyncClient(timeout=5) as c:
 62        try:
 63            r = await c.get(f"{sidecar_url}/health")
 64        except Exception as exc:
 65            print(f"  ✗ sidecar unreachable: {exc}")
 66            return False
 67    if r.status_code != 200:
 68        print(f"  ✗ sidecar /health → HTTP {r.status_code}")
 69        return False
 70    body = r.json()
 71    print(f"  ✓ /health → model={body.get('model')}  loaded={body.get('loaded')}")
 72    return True
 73
 74
 75async def step2_refresher_tick(
 76    sidecar_url: str, redis_url: str, symbol: str,
 77) -> dict | None:
 78    from maf.scheduler.kronos_refresher import KronosRefresher
 79    from maf.watch.list import WatchList
 80    import httpx
 81
 82    wl = WatchList(redis_url=redis_url)
 83    await wl.add(symbol, ttl_seconds=600)
 84    refresher = KronosRefresher(
 85        sidecar_url=sidecar_url,
 86        redis_url=redis_url,
 87        watch_list=wl,
 88        profiles={"1m": {"cadence_s": 60, "lookback_bars": 64,
 89                          "pred_len": 30, "sample_count": 3}},
 90    )
 91    async with httpx.AsyncClient(timeout=120) as http:
 92        await refresher._tick(http)
 93    await refresher.aclose()
 94
 95    import redis.asyncio as aioredis
 96    r = aioredis.from_url(redis_url)
 97    raw = await r.get(f"kronos:forecast:{symbol}:1m")
 98    try:
 99        await r.aclose()
100    except AttributeError:
101        await r.close()
102    if not raw:
103        print(f"  ✗ no cache at kronos:forecast:{symbol}:1m")
104        return None
105    payload = json.loads(raw)
106    s = payload["summary"]
107    print(f"  ✓ cache populated: model={payload['model']}")
108    print(f"     direction={s['direction']}  prob_up={s['prob_up']}  "
109          f"exp_return%={s['exp_return_pct']}  vol%={s['vol_estimate_pct']}")
110    await wl.remove(symbol)
111    await wl.aclose()
112    return payload
113
114
async def step3_adapter_read(redis_url: str, symbol: str) -> dict | None:
    """Fetch the ``1m`` forecast for ``symbol`` through ``KronosForecastSource``.

    Prints one result line. A result whose ``stale`` flag is truthy or absent
    is treated as a failure.

    Returns:
        The adapter's result mapping when it is fresh, otherwise ``None``.
    """
    from maf.sources.adapters.kronos_forecast import KronosForecastSource

    adapter = KronosForecastSource({"redis_url": redis_url})
    result = await adapter.fetch({"symbol": symbol, "timeframe": "1m"})

    # A missing ``stale`` key counts as stale.
    is_stale = result.get("stale", True)
    if not is_stale:
        print(f"  ✓ adapter read: age={result['age_seconds']:.1f}s  "
              f"direction={result['summary']['direction']}  "
              f"prob_up={result['summary']['prob_up']}")
        return result

    print(f"  ✗ adapter reports stale: {result}")
    return None
126
127
async def step4_specialist_agent_signal(
    redis_url: str, symbol: str,
) -> dict | None:
    """Drive the kronos_specialist agent once, with a scripted LLM.

    A ``SpecialistAgent`` is wired to a ``kronos_forecast_1m`` source binding
    and a stub LLM whose replies are fixed, so the agent → AgentSignal path
    runs without a live model.

    Returns:
        The first entry of ``state["agent_signals"]`` after the run, or
        ``None`` when the agent produced no signal.
    """
    from maf.agents.specialist import SpecialistAgent
    from maf.config import AgentConfig, SourceBinding
    from maf.core.agent import AgentContext
    from maf.sources.adapters.kronos_forecast import KronosForecastSource
    # register_adapter is a module-level function, not a method on the registry.
    from maf.sources.registry import SourceRegistry, register_adapter

    class _StubLLM:
        """Scripted LLM: reply depends only on how many times it was called.

        Call 1 answers with a tool-call request for the forecast source;
        every later call answers with the structured signal plus narrative.
        Counting calls (rather than inspecting the prompt) keeps the stub
        independent of prompt wording.
        """

        def __init__(self) -> None:
            self.calls = 0

        def _tool_request(self) -> str:
            return (
                '```json\n'
                '{"tool_calls":[{"name":"fetch_kronos_forecast_1m",'
                f'"params":{{"symbol":"{symbol}","timeframe":"1m"}}}}]}}\n'
                '```'
            )

        def _final_answer(self) -> str:
            # Production prompt prescribes:
            #   prob_up → confidence,  direction → signal
            return (
                '{"signal":"NEUTRAL","confidence":0.27,'
                '"summary":"Kronos prob_up=0.27 over 30m — leans flat with downside skew",'
                '"key_factors":["prob_up=0.27","direction=NEUTRAL","exp_return%=-0.01"]}'
                '\n---NARRATIVE---\n'
                f'Kronos forecast for {symbol} reports direction NEUTRAL with '
                'prob_up 0.27 over a 30-minute horizon. The mapping rule pins '
                'confidence at the prob_up value; key_factors echo the numeric prior.'
            )

        async def chat(self, *args, **kw) -> object:
            self.calls += 1
            text = self._tool_request() if self.calls == 1 else self._final_answer()
            # Minimal response object exposing only a ``text`` attribute.
            return type("R", (), {"text": text})()

        async def chat_text(self, *args, **kw) -> str:
            reply = await self.chat(*args, **kw)
            return reply.text

    cfg = AgentConfig(
        name="kronos_specialist",
        role="specialist",
        sources=["kronos_forecast_1m"],
        llm_tier="quick",
        max_react_steps=2,
    )
    agent = SpecialistAgent(cfg)

    register_adapter("kronos_forecast", KronosForecastSource)
    registry = SourceRegistry()
    binding = SourceBinding(
        name="kronos_forecast_1m",
        adapter="kronos_forecast",
        config={"timeframe": "1m", "redis_url": redis_url},
    )
    registry.bind(binding)

    ctx = AgentContext(config=cfg, sources=registry, llm=_StubLLM())
    state: dict = {"target": {"ticker": symbol}, "arena_id": uuid.uuid4().hex}
    state = await agent.run(state, ctx)

    signals = state.get("agent_signals") or []
    if signals:
        first = signals[0]
        print("  ✓ kronos_specialist emitted AgentSignal:")
        print(f"     agent={first['agent']}  domain={first.get('domain')}  signal={first['signal']}")
        print(f"     confidence={first['confidence']:.2f}  key_factors={first.get('key_factors')}")
        print(f"     summary: {first.get('summary')}")
        print(f"     narrative: {first.get('narrative', '')[:150]}")
        return first

    print("  ✗ no agent_signal emitted")
    return None
209
210
async def main_async(args: argparse.Namespace) -> int:
    """Run the four verification steps in order, stopping at the first failure.

    The Redis URL comes from the ``REDIS_URL`` environment variable (default
    ``redis://localhost:6379/0``); the sidecar URL and symbol come from
    ``args``.

    Returns:
        ``0`` when every step yields a truthy result, ``1`` otherwise.
    """
    redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
    sidecar_url = args.sidecar.rstrip("/")
    symbol = args.symbol.upper()

    for line in (
        _BANNER,
        f" Kronos end-to-end verification  symbol={symbol}  sidecar={sidecar_url}",
        _BANNER,
    ):
        print(line)

    async def _seed_then_tick() -> dict | None:
        # Step 2 needs bars in Redis before the refresher can forecast.
        await _seed_bars(redis_url, symbol)
        return await step2_refresher_tick(sidecar_url, redis_url, symbol)

    # (heading, zero-arg callable returning the step's awaitable). Callables
    # keep each step lazy so nothing starts before its heading is printed.
    stages = (
        ("\n[1] sidecar health",
         lambda: step1_sidecar_health(sidecar_url)),
        ("\n[2] seed 80 synthetic 1m bars + refresher tick",
         _seed_then_tick),
        ("\n[3] adapter read (what specialists see)",
         lambda: step3_adapter_read(redis_url, symbol)),
        ("\n[4] kronos_specialist → AgentSignal (stubbed LLM, deterministic mapping)",
         lambda: step4_specialist_agent_signal(redis_url, symbol)),
    )
    for heading, start in stages:
        print(heading)
        outcome = await start()
        if not outcome:
            return 1

    print("\n" + _BANNER)
    print(" ✓ All 4 steps green. Kronos integration is real and complete:")
    print("   sidecar /forecast → cache → emit stream → adapter → AgentSignal")
    print(_BANNER)
    return 0
245
246
def main() -> None:
    """Parse ``--symbol`` / ``--sidecar`` and exit with the verification status."""
    cli = argparse.ArgumentParser()
    for flag, fallback in (
        ("--symbol", "NVDA"),
        ("--sidecar", "http://127.0.0.1:5102"),
    ):
        cli.add_argument(flag, default=fallback)
    options = cli.parse_args()

    # The coroutine's return value (0 or 1) becomes the process exit code.
    exit_code = asyncio.run(main_async(options))
    sys.exit(exit_code)


if __name__ == "__main__":
    main()