"""Hermetic Kronos end-to-end verification.

Proves every piece of the Kronos integration with deterministic checks,
in this order:

  1. Sidecar /health — the service is reachable and reports its model.
  2. Refresher tick — reads bars from Redis, calls the sidecar, writes cache,
     emits to the change stream.
  3. KronosForecastSource — adapter reads the cache and returns the exact
     shape the kronos_specialist consumes.
  4. kronos_specialist with a stub LLM — agent calls the source, parses the
     forecast, emits a structured AgentSignal. Skips the live LLM so rate
     limits don't gate the test.

Run against real Redis and a running kronos-svc::

    services/kronos-svc/.venv/bin/python services/kronos-svc/server.py &
    python scripts/verify_kronos_loop.py
"""

from __future__ import annotations

import argparse
import asyncio
import json
import os
import sys
import uuid
from datetime import datetime, timedelta, timezone

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))


_BANNER = "─" * 78

# Redis stream the synthetic 1m bars are written to (it is wiped before seeding).
_BARS_STREAM = "trtools2:bars:1m"

# Timestamp of the first synthetic bar; every following bar is one minute later.
_FIRST_BAR_TS = datetime(2026, 5, 16, 9, 0, tzinfo=timezone.utc)


async def _close_redis(client) -> None:
    """Close a redis.asyncio client via ``aclose()``, falling back to ``close()``.

    The fallback covers client objects that do not expose ``aclose``.
    """
    try:
        await client.aclose()
    except AttributeError:
        await client.close()


def _synthetic_bars(symbol: str, count: int = 80) -> list[dict[str, str]]:
    """Build ``count`` synthetic 1-minute bars as Redis stream field dicts.

    Timestamps start at ``_FIRST_BAR_TS`` and advance by exactly one minute per
    bar, so they stay unique and strictly increasing for any ``count`` (the
    minute no longer wraps around inside a fixed hour).

    Args:
        symbol: Ticker stored in every bar's ``symbol`` field.
        count: Number of bars to generate.

    Returns:
        One dict per bar with the keys ``symbol``, ``ts``, ``open``, ``high``,
        ``low``, ``close`` and ``volume``; numeric values are JSON-encoded.
    """
    price = 451.20
    bars: list[dict[str, str]] = []
    for i in range(count):
        # Mix up + down so Kronos has a real signal to work with.
        price += 0.05 * (1 if i % 3 == 0 else -0.3)
        bars.append({
            "symbol": symbol,
            "ts": (_FIRST_BAR_TS + timedelta(minutes=i)).isoformat(),
            "open": json.dumps(round(price, 4)),
            "high": json.dumps(round(price + 0.15, 4)),
            "low": json.dumps(round(price - 0.15, 4)),
            "close": json.dumps(round(price + 0.05, 4)),
            "volume": json.dumps(2_000_000.0 + i * 100),
        })
    return bars


async def _seed_bars(redis_url: str, symbol: str, count: int = 80) -> None:
    """Replace the 1m bar stream in Redis with ``count`` synthetic bars.

    Deletes the whole ``trtools2:bars:1m`` stream first, so only point this at
    a Redis instance whose bar stream may be discarded.
    """
    import redis.asyncio as aioredis

    r = aioredis.from_url(redis_url)
    try:
        await r.delete(_BARS_STREAM)
        for fields in _synthetic_bars(symbol, count):
            await r.xadd(_BARS_STREAM, fields)
    finally:
        # Release the connection even when a Redis command fails.
        await _close_redis(r)


async def step1_sidecar_health(sidecar_url: str) -> bool:
    """GET ``{sidecar_url}/health`` and report whether the sidecar answered.

    Returns:
        True when the endpoint responds with HTTP 200 and a JSON object;
        False (after printing the reason) otherwise.
    """
    import httpx

    async with httpx.AsyncClient(timeout=5) as c:
        try:
            r = await c.get(f"{sidecar_url}/health")
        except Exception as exc:
            # Any failure to reach the sidecar is a failed check, not a crash.
            print(f" ✗ sidecar unreachable: {exc}")
            return False
        if r.status_code != 200:
            print(f" ✗ sidecar /health → HTTP {r.status_code}")
            return False
        try:
            body = r.json()
        except ValueError as exc:
            print(f" ✗ sidecar /health returned a non-JSON body: {exc}")
            return False
        if not isinstance(body, dict):
            print(f" ✗ sidecar /health returned unexpected JSON: {body!r}")
            return False
        print(f" ✓ /health → model={body.get('model')} loaded={body.get('loaded')}")
        return True


async def _tick_once(sidecar_url: str, redis_url: str, watch_list) -> None:
    """Run a single KronosRefresher tick and always close the refresher."""
    from maf.scheduler.kronos_refresher import KronosRefresher
    import httpx

    refresher = KronosRefresher(
        sidecar_url=sidecar_url,
        redis_url=redis_url,
        watch_list=watch_list,
        profiles={"1m": {"cadence_s": 60, "lookback_bars": 64,
                         "pred_len": 30, "sample_count": 3}},
    )
    try:
        async with httpx.AsyncClient(timeout=120) as http:
            # Calls the refresher's private tick directly so exactly one
            # refresh cycle runs.
            await refresher._tick(http)
    finally:
        await refresher.aclose()


async def _read_cached_forecast(redis_url: str, symbol: str):
    """Return the raw value stored at ``kronos:forecast:{symbol}:1m`` (or None)."""
    import redis.asyncio as aioredis

    r = aioredis.from_url(redis_url)
    try:
        return await r.get(f"kronos:forecast:{symbol}:1m")
    finally:
        await _close_redis(r)


async def step2_refresher_tick(
    sidecar_url: str, redis_url: str, symbol: str,
) -> dict | None:
    """Put ``symbol`` on the watch list, run one refresher tick, read the cache.

    The symbol is removed from the watch list and the watch list is closed on
    every path, including a cache miss or an exception during the tick.

    Returns:
        The decoded cache payload, or None when no cache entry was written.
    """
    from maf.watch.list import WatchList

    wl = WatchList(redis_url=redis_url)
    try:
        await wl.add(symbol, ttl_seconds=600)
        try:
            await _tick_once(sidecar_url, redis_url, wl)
            raw = await _read_cached_forecast(redis_url, symbol)
        finally:
            await wl.remove(symbol)
    finally:
        await wl.aclose()

    if not raw:
        print(f" ✗ no cache at kronos:forecast:{symbol}:1m")
        return None
    payload = json.loads(raw)
    s = payload["summary"]
    print(f" ✓ cache populated: model={payload['model']}")
    print(f" direction={s['direction']} prob_up={s['prob_up']} "
          f"exp_return%={s['exp_return_pct']} vol%={s['vol_estimate_pct']}")
    return payload


async def step3_adapter_read(redis_url: str, symbol: str) -> dict | None:
    """Fetch the 1m forecast through KronosForecastSource.

    Returns:
        The adapter's result dict, or None when it is flagged stale (a missing
        ``stale`` key is treated as stale).
    """
    from maf.sources.adapters.kronos_forecast import KronosForecastSource

    src = KronosForecastSource({"redis_url": redis_url})
    out = await src.fetch({"symbol": symbol, "timeframe": "1m"})
    if out.get("stale", True):
        print(f" ✗ adapter reports stale: {out}")
        return None
    print(f" ✓ adapter read: age={out['age_seconds']:.1f}s "
          f"direction={out['summary']['direction']} "
          f"prob_up={out['summary']['prob_up']}")
    return out


async def step4_specialist_agent_signal(
    redis_url: str, symbol: str,
) -> dict | None:
    """Run only the kronos_specialist with a stubbed LLM.

    The stub emits exactly the shape the production prompt asks for, so we
    verify the agent → AgentSignal pipeline without needing the live LLM.

    Returns:
        The first entry of ``state["agent_signals"]`` after the agent run, or
        None when the agent emitted no signal.
    """
    from maf.agents.specialist import SpecialistAgent
    from maf.config import AgentConfig, SourceBinding
    from maf.core.agent import AgentContext
    from maf.sources.adapters.kronos_forecast import KronosForecastSource
    # register_adapter is a module-level function, not a method on the registry.
    from maf.sources.registry import SourceRegistry, register_adapter

    # The agent calls .chat() multiple times during a ReAct loop:
    #   turn 1: model picks a tool → return tool_calls JSON
    #   turn 2: model reads the result → return the structured signal
    # The stub counts turns rather than parsing the prompt, which is robust.
    class _StubLLM:
        def __init__(self) -> None:
            self.turn = 0

        async def chat(self, *args, **kw) -> object:
            self.turn += 1
            if self.turn == 1:
                text = (
                    '```json\n'
                    '{"tool_calls":[{"name":"fetch_kronos_forecast_1m",'
                    f'"params":{{"symbol":"{symbol}","timeframe":"1m"}}}}]}}\n'
                    '```'
                )
            else:
                # Production prompt prescribes:
                #   prob_up → confidence, direction → signal
                text = (
                    '{"signal":"NEUTRAL","confidence":0.27,'
                    '"summary":"Kronos prob_up=0.27 over 30m — leans flat with downside skew",'
                    '"key_factors":["prob_up=0.27","direction=NEUTRAL","exp_return%=-0.01"]}'
                    '\n---NARRATIVE---\n'
                    f'Kronos forecast for {symbol} reports direction NEUTRAL with '
                    'prob_up 0.27 over a 30-minute horizon. The mapping rule pins '
                    'confidence at the prob_up value; key_factors echo the numeric prior.'
                )
            # Minimal response object exposing only a ``text`` attribute.
            return type("R", (), {"text": text})()

        async def chat_text(self, *args, **kw) -> str:
            r = await self.chat(*args, **kw)
            return r.text

    cfg = AgentConfig(
        name="kronos_specialist",
        role="specialist",
        sources=["kronos_forecast_1m"],
        llm_tier="quick",
        max_react_steps=2,
    )
    agent = SpecialistAgent(cfg)
    register_adapter("kronos_forecast", KronosForecastSource)
    registry = SourceRegistry()
    registry.bind(SourceBinding(
        name="kronos_forecast_1m",
        adapter="kronos_forecast",
        config={"timeframe": "1m", "redis_url": redis_url},
    ))
    ctx = AgentContext(config=cfg, sources=registry, llm=_StubLLM())
    state: dict = {"target": {"ticker": symbol}, "arena_id": uuid.uuid4().hex}
    state = await agent.run(state, ctx)
    sigs = state.get("agent_signals") or []
    if not sigs:
        print(" ✗ no agent_signal emitted")
        return None
    s = sigs[0]
    print(" ✓ kronos_specialist emitted AgentSignal:")
    print(f" agent={s['agent']} domain={s.get('domain')} signal={s['signal']}")
    print(f" confidence={s['confidence']:.2f} key_factors={s.get('key_factors')}")
    print(f" summary: {s.get('summary')}")
    # ``or ''`` keeps the slice safe when the narrative is present but None.
    print(f" narrative: {(s.get('narrative') or '')[:150]}")
    return s


async def main_async(args: argparse.Namespace) -> int:
    """Run the four verification steps in order, stopping at the first failure.

    The Redis URL comes from the ``REDIS_URL`` environment variable (default
    ``redis://localhost:6379/0``).

    Returns:
        0 when every step passes, 1 as soon as one step fails.
    """
    redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
    sidecar_url = args.sidecar.rstrip("/")
    symbol = args.symbol.upper()

    print(_BANNER)
    print(f" Kronos end-to-end verification symbol={symbol} sidecar={sidecar_url}")
    print(_BANNER)

    print("\n[1] sidecar health")
    if not await step1_sidecar_health(sidecar_url):
        return 1

    print("\n[2] seed 80 synthetic 1m bars + refresher tick")
    await _seed_bars(redis_url, symbol)
    payload = await step2_refresher_tick(sidecar_url, redis_url, symbol)
    if not payload:
        return 1

    print("\n[3] adapter read (what specialists see)")
    out = await step3_adapter_read(redis_url, symbol)
    if not out:
        return 1

    print("\n[4] kronos_specialist → AgentSignal (stubbed LLM, deterministic mapping)")
    sig = await step4_specialist_agent_signal(redis_url, symbol)
    if not sig:
        return 1

    print("\n" + _BANNER)
    print(" ✓ All 4 steps green. Kronos integration is real and complete:")
    print(" sidecar /forecast → cache → emit stream → adapter → AgentSignal")
    print(_BANNER)
    return 0


def main() -> None:
    """Parse ``--symbol`` / ``--sidecar`` and exit with main_async's status code."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--symbol", default="NVDA")
    parser.add_argument("--sidecar", default="http://127.0.0.1:5102")
    args = parser.parse_args()
    sys.exit(asyncio.run(main_async(args)))


if __name__ == "__main__":
    main()