1"""Redis-stream envelope + publisher for the ``trading_intelligence`` arena. 2 3Mirrors the mastermind / crowd_simulation envelope-publisher pattern. Arenas 4are deliberately independent so a future schema bump on one side cannot 5ripple to the other. 6 7Wire shape 8---------- 9Each XADD writes one Redis field named ``data`` whose value is 10``envelope.model_dump_json()``. Consumers JSON-decode + ``model_validate_json``. 11A single field per entry keeps consumers schema-naive — they don't have to know 12which envelope key maps to which Redis field. 13 14In parallel, every publish also HSETs a lookup hash at 15``maf:arena:trading_intelligence:decisions:{ticker}:{trade_date}`` so a 16consumer like the trtools2 engine can answer "what does MAF currently 17think about NVDA?" with one HGETALL rather than tailing the stream. 18 19Schema versioning 20----------------- 21:class:`TradingIntelligenceEnvelope` pins ``schema_version: Literal["1"]``. 22The :func:`decode_envelope` helper checks this **before** validating the 23rest of the body so a future v2 envelope skips cleanly with a WARN. 24 25Consumer usage (trtools2 engine) 26-------------------------------- 27 28Tail the stream:: 29 30 import redis.asyncio as redis 31 from maf.arenas.trading_intelligence.stream import ( 32 DEFAULT_STREAM_NAME, decode_envelope, 33 ) 34 35 r = redis.from_url("redis://localhost:6379/0") 36 last_id = "$" # start from new entries; use "0" to replay history 37 while True: 38 resp = await r.xread({DEFAULT_STREAM_NAME: last_id}, block=5000, count=10) 39 for _stream, entries in resp: 40 for entry_id, fields in entries: 41 env = decode_envelope(fields[b"data"]) 42 if env is None: 43 continue 44 d = env.decision 45 # apply strategy: e.g. open position if confidence > 0.7 46 last_id = entry_id 47 48Point lookup the current verdict:: 49 50 fields = await r.hgetall(f"maf:arena:trading_intelligence:decisions:NVDA:2026-05-11") 51 decision = MarketMindDecision.model_validate_json(fields[b"json"]) 52""" 53 54from __future__ import annotations 55 56import logging 57import uuid 58from datetime import UTC, datetime 59from typing import Any, Literal 60 61from pydantic import BaseModel, ConfigDict, Field, ValidationError 62 63logger = logging.getLogger(__name__) 64 65 66DEFAULT_STREAM_NAME = "maf:arena:trading_intelligence:output" 67DECISION_HASH_PREFIX = "maf:arena:trading_intelligence:decisions" 68 69 70def _utcnow() -> datetime: 71 """Factory for ``published_at`` (separate so tests can monkey-patch).""" 72 return datetime.now(UTC) 73 74 75def _new_correlation_id() -> str: 76 """Factory for ``correlation_id`` — 32-char uuid4 hex.""" 77 return uuid.uuid4().hex 78 79 80# --------------------------------------------------------------------------- 81# Payload schemas 82# --------------------------------------------------------------------------- 83 84 85Verdict = Literal["BUY", "HOLD", "SELL"] 86Direction = Literal["BULLISH", "BEARISH", "NEUTRAL"] 87 88 89class AgentSignalView(BaseModel): 90 """Per-specialist signal on the wire. 91 92 Mirrors :class:`maf.core.state.AgentSignal` but validated. ``narrative`` 93 is kept off the envelope — the dashboard fetches it directly from 94 ``state["reports"]`` and including it here would 5–10× the payload size. 95 """ 96 97 model_config = ConfigDict(extra="ignore", frozen=True) 98 99 agent: str 100 domain: str 101 signal: Direction 102 confidence: float = Field(ge=0.0, le=1.0) 103 summary: str = "" 104 key_factors: list[str] = Field(default_factory=list) 105 106 107class TargetView(BaseModel): 108 model_config = ConfigDict(extra="ignore", frozen=True) 109 110 ticker: str 111 trade_date: str = "" 112 exchange: str = "" 113 asset_class: str = "equity" 114 115 116class SynthesisView(BaseModel): 117 """The synthesis agent's final verdict on the wire.""" 118 119 model_config = ConfigDict(extra="ignore", frozen=True) 120 121 verdict: Verdict 122 confidence: float = Field(ge=0.0, le=1.0) 123 ensemble_score: float = Field(ge=-1.0, le=1.0) 124 reasoning: str = "" 125 126 127class SourceMetricsSummary(BaseModel): 128 """Aggregated source-fetch stats for the run (no per-call detail).""" 129 130 model_config = ConfigDict(extra="ignore", frozen=True) 131 132 total_calls: int = 0 133 successful: int = 0 134 failed: int = 0 135 total_bytes: int = 0 136 by_source: dict[str, int] = Field(default_factory=dict) # source_name -> call count 137 138 139class MarketMindDecision(BaseModel): 140 """The full per-(ticker, trade_date) decision payload. 141 142 This is the consumer's contract — trtools2 strategy code should be 143 written against this schema, not against the raw arena state. Adding 144 fields is backward-compatible (consumers see ``extra='ignore'`` style 145 behavior in practice via Pydantic v2 defaults); changing or removing 146 fields requires a schema_version bump on the envelope. 147 """ 148 149 model_config = ConfigDict(frozen=True) 150 151 target: TargetView 152 synthesis: SynthesisView 153 agent_signals: list[AgentSignalView] = Field(default_factory=list) 154 source_metrics: SourceMetricsSummary = Field(default_factory=SourceMetricsSummary) 155 phase_timings: list[dict[str, Any]] = Field(default_factory=list) 156 arena_total_seconds: float | None = None 157 158 159class TradingIntelligenceEnvelope(BaseModel): 160 """Versioned envelope published to ``maf:arena:trading_intelligence:output``.""" 161 162 model_config = ConfigDict(frozen=True) 163 164 schema_version: Literal["1"] = "1" 165 arena: Literal["trading_intelligence"] = "trading_intelligence" 166 decision: MarketMindDecision 167 published_at: datetime = Field(default_factory=_utcnow) 168 correlation_id: str = Field(default_factory=_new_correlation_id) 169 arena_id: str = "" 170 meta: dict[str, Any] = Field(default_factory=dict) 171 172 173# --------------------------------------------------------------------------- 174# State → envelope 175# --------------------------------------------------------------------------- 176 177 178def _summarise_source_metrics(metrics: list[dict[str, Any]]) -> SourceMetricsSummary: 179 total = len(metrics) 180 successful = sum(1 for m in metrics if m.get("success") is True) 181 total_bytes = sum(int(m.get("result_bytes", 0) or 0) for m in metrics) 182 by_source: dict[str, int] = {} 183 for m in metrics: 184 name = str(m.get("source", "") or "") 185 if name: 186 by_source[name] = by_source.get(name, 0) + 1 187 return SourceMetricsSummary( 188 total_calls=total, 189 successful=successful, 190 failed=total - successful, 191 total_bytes=total_bytes, 192 by_source=by_source, 193 ) 194 195 196def _coerce_signal(value: Any) -> Direction: 197 s = str(value or "NEUTRAL").upper().strip() 198 if s not in ("BULLISH", "BEARISH", "NEUTRAL"): 199 return "NEUTRAL" 200 return s # type: ignore[return-value] 201 202 203def _coerce_verdict(value: Any) -> Verdict: 204 s = str(value or "HOLD").upper().strip() 205 if s not in ("BUY", "HOLD", "SELL"): 206 return "HOLD" 207 return s # type: ignore[return-value] 208 209 210def build_envelope_from_state( 211 state: dict[str, Any], 212 *, 213 correlation_id: str | None = None, 214 extra_meta: dict[str, Any] | None = None, 215) -> TradingIntelligenceEnvelope: 216 """Build an envelope from a post-run ``trading_intelligence`` state dict. 217 218 Tolerant of partial state — if synthesis didn't run, defaults to HOLD with 219 zero confidence so a consumer doesn't see a malformed envelope. Caller 220 decides whether to publish that or skip. 221 """ 222 raw_target = state.get("target") or {} 223 target = TargetView( 224 ticker=str(raw_target.get("ticker") or ""), 225 trade_date=str(raw_target.get("trade_date") or raw_target.get("date") or ""), 226 exchange=str(raw_target.get("exchange") or ""), 227 asset_class=str(raw_target.get("asset_class") or "equity"), 228 ) 229 230 synthesis = SynthesisView( 231 verdict=_coerce_verdict(state.get("synthesis_verdict") or state.get("signal")), 232 confidence=float(state.get("synthesis_confidence") or 0.0), 233 ensemble_score=float(state.get("synthesis_score") or 0.0), 234 reasoning=str(state.get("synthesis_reasoning") or ""), 235 ) 236 237 raw_signals = state.get("agent_signals") or [] 238 agent_signals: list[AgentSignalView] = [] 239 for sig in raw_signals: 240 if not isinstance(sig, dict): 241 continue 242 try: 243 agent_signals.append(AgentSignalView( 244 agent=str(sig.get("agent") or ""), 245 domain=str(sig.get("domain") or ""), 246 signal=_coerce_signal(sig.get("signal")), 247 confidence=float(sig.get("confidence") or 0.0), 248 summary=str(sig.get("summary") or "")[:300], 249 key_factors=[str(x) for x in (sig.get("key_factors") or [])][:10], 250 )) 251 except (ValidationError, TypeError, ValueError) as exc: 252 logger.warning("Skipping malformed agent signal %r: %s", sig, exc) 253 254 metrics_summary = _summarise_source_metrics(state.get("source_metrics") or []) 255 256 decision = MarketMindDecision( 257 target=target, 258 synthesis=synthesis, 259 agent_signals=agent_signals, 260 source_metrics=metrics_summary, 261 phase_timings=list(state.get("phase_timings") or []), 262 arena_total_seconds=state.get("arena_total_seconds"), 263 ) 264 265 meta: dict[str, Any] = { 266 "agents_count": len(agent_signals), 267 "model_used": state.get("llm_model") or "", 268 } 269 if extra_meta: 270 meta.update(extra_meta) 271 272 return TradingIntelligenceEnvelope( 273 decision=decision, 274 correlation_id=correlation_id or _new_correlation_id(), 275 arena_id=str(state.get("arena_id") or ""), 276 meta=meta, 277 ) 278 279 280# --------------------------------------------------------------------------- 281# Redis publishers 282# --------------------------------------------------------------------------- 283 284 285def _decision_hash_key(ticker: str, trade_date: str) -> str: 286 return f"{DECISION_HASH_PREFIX}:{ticker}:{trade_date}" 287 288 289async def publish_envelope( 290 redis_client: Any, 291 stream_name: str, 292 envelope: TradingIntelligenceEnvelope, 293) -> str: 294 """Publish ``envelope`` to ``stream_name`` as a single ``data`` field. 295 296 Returns the assigned stream id, decoded if Redis returned bytes. 297 """ 298 payload = envelope.model_dump_json() 299 raw_id = await redis_client.xadd(stream_name, {"data": payload}) 300 if isinstance(raw_id, bytes): 301 return raw_id.decode("utf-8") 302 return str(raw_id) 303 304 305async def publish_decision_hash( 306 redis_client: Any, 307 envelope: TradingIntelligenceEnvelope, 308) -> str: 309 """Write the decision to a per-(ticker, trade_date) hash for point-lookup. 310 311 Returns the hash key written. Skips silently when the envelope's target 312 is missing a ticker (degraded run). 313 """ 314 d = envelope.decision 315 if not d.target.ticker: 316 logger.debug("publish_decision_hash: empty ticker — skipping hash write") 317 return "" 318 key = _decision_hash_key(d.target.ticker, d.target.trade_date) 319 await redis_client.hset(key, mapping={ 320 "json": envelope.model_dump_json(), 321 "ticker": d.target.ticker, 322 "trade_date": d.target.trade_date, 323 "verdict": d.synthesis.verdict, 324 "confidence": str(d.synthesis.confidence), 325 "ensemble_score": str(d.synthesis.ensemble_score), 326 "published_at": envelope.published_at.isoformat(), 327 "correlation_id": envelope.correlation_id, 328 "arena_id": envelope.arena_id, 329 }) 330 return key 331 332 333# --------------------------------------------------------------------------- 334# Decoder 335# --------------------------------------------------------------------------- 336 337 338def decode_envelope(payload: str | bytes) -> TradingIntelligenceEnvelope | None: 339 """Decode a stream payload into an envelope, or return ``None`` on skip. 340 341 Skips (returning ``None`` after a WARN) when: 342 343 * ``schema_version`` is anything other than ``"1"`` (future-compat) 344 * The body fails Pydantic validation (malformed entry) 345 """ 346 if isinstance(payload, bytes): 347 payload = payload.decode("utf-8") 348 try: 349 import json 350 head = json.loads(payload) 351 except (json.JSONDecodeError, TypeError, ValueError) as exc: 352 logger.warning("decode_envelope: malformed JSON: %s", exc) 353 return None 354 355 sv = head.get("schema_version") 356 if sv != "1": 357 logger.warning( 358 "decode_envelope: skipping unknown schema_version=%r (this consumer speaks v1)", sv, 359 ) 360 return None 361 362 try: 363 return TradingIntelligenceEnvelope.model_validate(head) 364 except ValidationError as exc: 365 logger.warning("decode_envelope: validation failed: %s", exc) 366 return None 367 368 369__all__ = [ 370 "DECISION_HASH_PREFIX", 371 "DEFAULT_STREAM_NAME", 372 "AgentSignalView", 373 "MarketMindDecision", 374 "SourceMetricsSummary", 375 "SynthesisView", 376 "TargetView", 377 "TradingIntelligenceEnvelope", 378 "build_envelope_from_state", 379 "decode_envelope", 380 "publish_decision_hash", 381 "publish_envelope", 382]