checking system…
Docs / back / src/maf/arenas/trading_intelligence/stream.py · line 1
Python · 383 lines
  1"""Redis-stream envelope + publisher for the ``trading_intelligence`` arena.
  2
  3Mirrors the mastermind / crowd_simulation envelope-publisher pattern. Arenas
  4are deliberately independent so a future schema bump on one side cannot
  5ripple to the other.
  6
  7Wire shape
  8----------
  9Each XADD writes one Redis field named ``data`` whose value is
 10``envelope.model_dump_json()``. Consumers JSON-decode + ``model_validate_json``.
 11A single field per entry keeps consumers schema-naive — they don't have to know
 12which envelope key maps to which Redis field.
 13
 14In parallel, every publish also HSETs a lookup hash at
 15``maf:arena:trading_intelligence:decisions:{ticker}:{trade_date}`` so a
 16consumer like the trtools2 engine can answer "what does MAF currently
 17think about NVDA?" with one HGETALL rather than tailing the stream.
 18
 19Schema versioning
 20-----------------
 21:class:`TradingIntelligenceEnvelope` pins ``schema_version: Literal["1"]``.
 22The :func:`decode_envelope` helper checks this **before** validating the
 23rest of the body so a future v2 envelope skips cleanly with a WARN.
 24
 25Consumer usage (trtools2 engine)
 26--------------------------------
 27
 28Tail the stream::
 29
 30    import redis.asyncio as redis
 31    from maf.arenas.trading_intelligence.stream import (
 32        DEFAULT_STREAM_NAME, decode_envelope,
 33    )
 34
 35    r = redis.from_url("redis://localhost:6379/0")
 36    last_id = "$"  # start from new entries; use "0" to replay history
 37    while True:
 38        resp = await r.xread({DEFAULT_STREAM_NAME: last_id}, block=5000, count=10)
 39        for _stream, entries in resp:
 40            for entry_id, fields in entries:
 41                last_id = entry_id  # advance first so skipped entries are not re-read
 42                env = decode_envelope(fields[b"data"])
 43                if env is None:
 44                    continue
 45                d = env.decision
 46                # apply strategy: e.g. open position if confidence > 0.7
 47
 48Point lookup the current verdict::
 49
 50    fields = await r.hgetall("maf:arena:trading_intelligence:decisions:NVDA:2026-05-11")
 51    decision = TradingIntelligenceEnvelope.model_validate_json(fields[b"json"]).decision
 52"""
 53
 54from __future__ import annotations
 55
 56import logging
 57import uuid
 58from datetime import UTC, datetime
 59from typing import Any, Literal
 60
 61from pydantic import BaseModel, ConfigDict, Field, ValidationError
 62
 63logger = logging.getLogger(__name__)
 64
 65
 66DEFAULT_STREAM_NAME = "maf:arena:trading_intelligence:output"
 67DECISION_HASH_PREFIX = "maf:arena:trading_intelligence:decisions"
 68
 69
 70def _utcnow() -> datetime:
 71    """Factory for ``published_at`` (separate so tests can monkey-patch)."""
 72    return datetime.now(UTC)
 73
 74
 75def _new_correlation_id() -> str:
 76    """Factory for ``correlation_id`` — 32-char uuid4 hex."""
 77    return uuid.uuid4().hex
 78
 79
 80# ---------------------------------------------------------------------------
 81# Payload schemas
 82# ---------------------------------------------------------------------------
 83
 84
 85Verdict = Literal["BUY", "HOLD", "SELL"]
 86Direction = Literal["BULLISH", "BEARISH", "NEUTRAL"]
 87
 88
 89class AgentSignalView(BaseModel):
 90    """Per-specialist signal on the wire.
 91
 92    Mirrors :class:`maf.core.state.AgentSignal` but validated. ``narrative``
 93    is kept off the envelope — the dashboard fetches it directly from
 94    ``state["reports"]`` and including it here would 5–10× the payload size.
 95    """
 96
 97    model_config = ConfigDict(extra="ignore", frozen=True)
 98
 99    agent: str
100    domain: str
101    signal: Direction
102    confidence: float = Field(ge=0.0, le=1.0)
103    summary: str = ""
104    key_factors: list[str] = Field(default_factory=list)
105
106
class TargetView(BaseModel):
    """The instrument a decision refers to.

    Only ``ticker`` is required. ``trade_date`` and ``exchange`` default to
    empty strings and ``asset_class`` defaults to ``"equity"``.
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    ticker: str
    # Stored verbatim as a string; together with ``ticker`` it forms the
    # decision-hash key.
    trade_date: str = Field(default="")
    exchange: str = Field(default="")
    asset_class: str = Field(default="equity")
114
115
class SynthesisView(BaseModel):
    """Final verdict from the synthesis agent, as carried on the wire.

    ``confidence`` must lie in ``[0, 1]`` and ``ensemble_score`` in
    ``[-1, 1]``; values outside those ranges fail validation.
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    verdict: Verdict
    confidence: float = Field(ge=0.0, le=1.0)
    ensemble_score: float = Field(ge=-1.0, le=1.0)
    reasoning: str = Field(default="")
125
126
class SourceMetricsSummary(BaseModel):
    """Run-level totals for source fetches; per-call detail is not carried."""

    model_config = ConfigDict(extra="ignore", frozen=True)

    total_calls: int = Field(default=0)
    successful: int = Field(default=0)
    failed: int = Field(default=0)
    total_bytes: int = Field(default=0)
    # source name -> number of calls attributed to that source
    by_source: dict[str, int] = Field(default_factory=dict)
137
138
class MarketMindDecision(BaseModel):
    """Complete decision payload for one ``(ticker, trade_date)``.

    This is the consumer-facing contract: trtools2 strategy code should target
    this schema, not the raw arena state. Unknown keys are ignored during
    validation (``extra="ignore"``, stated explicitly below and also the
    Pydantic v2 default), so adding fields is backward-compatible. Changing or
    removing a field requires a ``schema_version`` bump on the envelope.
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    target: TargetView
    synthesis: SynthesisView
    agent_signals: list[AgentSignalView] = Field(default_factory=list)
    source_metrics: SourceMetricsSummary = Field(default_factory=SourceMetricsSummary)
    # Free-form per-phase timing records; only "list of dicts" is enforced.
    phase_timings: list[dict[str, Any]] = Field(default_factory=list)
    # Presumably the total arena run time in seconds; None when not supplied.
    arena_total_seconds: float | None = None
157
158
class TradingIntelligenceEnvelope(BaseModel):
    """Versioned envelope published to ``maf:arena:trading_intelligence:output``.

    ``schema_version`` is pinned to ``"1"``; :func:`decode_envelope` checks it
    before validating the rest of the body, so a future v2 entry is skipped
    rather than mis-parsed. ``published_at`` defaults to the current UTC time
    (via ``_utcnow``) and ``correlation_id`` to a fresh uuid4 hex (via
    ``_new_correlation_id``).
    """

    model_config = ConfigDict(frozen=True)

    schema_version: Literal["1"] = "1"
    arena: Literal["trading_intelligence"] = "trading_intelligence"
    decision: MarketMindDecision
    # The lambdas look the module-level factories up at call time. Passing the
    # functions directly would capture the originals at class-definition time,
    # so monkey-patching ``_utcnow`` / ``_new_correlation_id`` in tests would
    # silently have no effect on envelope defaults.
    published_at: datetime = Field(default_factory=lambda: _utcnow())
    correlation_id: str = Field(default_factory=lambda: _new_correlation_id())
    arena_id: str = ""
    meta: dict[str, Any] = Field(default_factory=dict)
171
172
173# ---------------------------------------------------------------------------
174# State → envelope
175# ---------------------------------------------------------------------------
176
177
def _summarise_source_metrics(metrics: list[dict[str, Any]]) -> SourceMetricsSummary:
    """Collapse per-call source-fetch records into a :class:`SourceMetricsSummary`.

    Each record is read with ``.get`` for three keys:

    * ``success``: only a literal ``True`` counts as successful. Every other
      counted record (missing, ``False``, truthy non-bool) counts as failed.
    * ``result_bytes``: summed into ``total_bytes``; missing or falsy counts as 0.
    * ``source``: per-name call counts; empty names are not bucketed.

    This is best-effort so one bad record cannot abort envelope construction.
    Entries that are not dicts are skipped and excluded from ``total_calls``.
    A ``result_bytes`` value that ``int()`` rejects contributes 0. Both cases
    log a WARN. The input is iterated once, so any iterable works.
    """
    total = 0
    successful = 0
    total_bytes = 0
    by_source: dict[str, int] = {}
    for m in metrics:
        if not isinstance(m, dict):
            logger.warning("Skipping malformed source metric %r", m)
            continue
        total += 1
        if m.get("success") is True:
            successful += 1
        raw_bytes = m.get("result_bytes", 0)
        try:
            total_bytes += int(raw_bytes or 0)
        except (TypeError, ValueError, OverflowError):
            # e.g. "n/a", a dict, NaN or inf — count it as zero bytes.
            logger.warning(
                "Ignoring non-integer result_bytes %r for source %r",
                raw_bytes, m.get("source"),
            )
        name = str(m.get("source", "") or "")
        if name:
            by_source[name] = by_source.get(name, 0) + 1
    return SourceMetricsSummary(
        total_calls=total,
        successful=successful,
        failed=total - successful,
        total_bytes=total_bytes,
        by_source=by_source,
    )
194
195
def _coerce_signal(value: Any) -> Direction:
    """Map an arbitrary agent signal value onto the ``Direction`` literal.

    Falsy input is treated as ``"NEUTRAL"``. Anything else is stringified,
    upper-cased and stripped; a result other than BULLISH / BEARISH / NEUTRAL
    also falls back to ``"NEUTRAL"``.
    """
    normalised = str(value or "NEUTRAL").upper().strip()
    return normalised if normalised in {"BULLISH", "BEARISH", "NEUTRAL"} else "NEUTRAL"  # type: ignore[return-value]
201
202
def _coerce_verdict(value: Any) -> Verdict:
    """Map an arbitrary verdict value onto the ``Verdict`` literal.

    The value is stringified, upper-cased and stripped; falsy input and any
    unrecognised result collapse to the conservative ``"HOLD"``.
    """
    candidate = str(value or "HOLD").upper().strip()
    if candidate in {"BUY", "SELL"}:
        return candidate  # type: ignore[return-value]
    return "HOLD"
208
209
def build_envelope_from_state(
    state: dict[str, Any],
    *,
    correlation_id: str | None = None,
    extra_meta: dict[str, Any] | None = None,
) -> TradingIntelligenceEnvelope:
    """Build an envelope from a post-run ``trading_intelligence`` state dict.

    Tolerant of partial state: if synthesis didn't run, the verdict defaults
    to HOLD with zero confidence and zero ensemble score, so a consumer does
    not see a malformed envelope. The caller decides whether to publish that
    or skip it.

    Args:
        state: Arena state. Keys read:

            * ``target``: mapping with ``ticker``, ``trade_date`` or ``date``,
              ``exchange`` and ``asset_class``.
            * ``synthesis_verdict`` or ``signal``, ``synthesis_confidence``,
              ``synthesis_score`` and ``synthesis_reasoning``.
            * ``agent_signals``, ``source_metrics`` and ``phase_timings``.
            * ``arena_total_seconds``, ``llm_model`` and ``arena_id``.
        correlation_id: Id stamped on the envelope; a fresh uuid4 hex is used
            when omitted or empty.
        extra_meta: Merged over the default ``meta`` keys (``agents_count``,
            ``model_used``); caller-supplied values win.

    Returns:
        The assembled :class:`TradingIntelligenceEnvelope`.

    Raises:
        pydantic.ValidationError: if a synthesis or run-level value fails
            model validation, e.g. a confidence outside ``[0, 1]``. Individual
            malformed agent signals are logged and skipped instead.
        TypeError, ValueError: if a synthesis confidence/score cannot be
            converted with ``float``.
    """
    from collections.abc import Mapping

    raw_target = state.get("target") or {}
    if not isinstance(raw_target, Mapping):
        # A non-mapping target would crash on ``.get``; degrade to an empty
        # target (empty ticker) instead of aborting the whole envelope.
        logger.warning("Ignoring non-mapping target %r", raw_target)
        raw_target = {}
    target = TargetView(
        ticker=str(raw_target.get("ticker") or ""),
        trade_date=str(raw_target.get("trade_date") or raw_target.get("date") or ""),
        exchange=str(raw_target.get("exchange") or ""),
        asset_class=str(raw_target.get("asset_class") or "equity"),
    )

    synthesis = SynthesisView(
        verdict=_coerce_verdict(state.get("synthesis_verdict") or state.get("signal")),
        confidence=float(state.get("synthesis_confidence") or 0.0),
        ensemble_score=float(state.get("synthesis_score") or 0.0),
        reasoning=str(state.get("synthesis_reasoning") or ""),
    )

    raw_signals = state.get("agent_signals") or []
    agent_signals: list[AgentSignalView] = []
    for sig in raw_signals:
        if not isinstance(sig, dict):
            continue
        raw_factors = sig.get("key_factors") or []
        if isinstance(raw_factors, str):
            # A bare string would otherwise be iterated character by character
            # and published as single-letter "factors".
            raw_factors = [raw_factors]
        try:
            agent_signals.append(AgentSignalView(
                agent=str(sig.get("agent") or ""),
                domain=str(sig.get("domain") or ""),
                signal=_coerce_signal(sig.get("signal")),
                confidence=float(sig.get("confidence") or 0.0),
                summary=str(sig.get("summary") or "")[:300],
                key_factors=[str(x) for x in raw_factors][:10],
            ))
        except (ValidationError, TypeError, ValueError) as exc:
            logger.warning("Skipping malformed agent signal %r: %s", sig, exc)

    metrics_summary = _summarise_source_metrics(state.get("source_metrics") or [])

    decision = MarketMindDecision(
        target=target,
        synthesis=synthesis,
        agent_signals=agent_signals,
        source_metrics=metrics_summary,
        phase_timings=list(state.get("phase_timings") or []),
        arena_total_seconds=state.get("arena_total_seconds"),
    )

    meta: dict[str, Any] = {
        "agents_count": len(agent_signals),
        "model_used": state.get("llm_model") or "",
    }
    if extra_meta:
        meta.update(extra_meta)

    return TradingIntelligenceEnvelope(
        decision=decision,
        correlation_id=correlation_id or _new_correlation_id(),
        arena_id=str(state.get("arena_id") or ""),
        meta=meta,
    )
278
279
280# ---------------------------------------------------------------------------
281# Redis publishers
282# ---------------------------------------------------------------------------
283
284
def _decision_hash_key(ticker: str, trade_date: str) -> str:
    """Return the Redis key of the point-lookup hash for ``(ticker, trade_date)``.

    Shape: ``<DECISION_HASH_PREFIX>:<ticker>:<trade_date>``.
    """
    return "{}:{}:{}".format(DECISION_HASH_PREFIX, ticker, trade_date)
287
288
async def publish_envelope(
    redis_client: Any,
    stream_name: str,
    envelope: TradingIntelligenceEnvelope,
    *,
    maxlen: int | None = None,
    approximate: bool = True,
) -> str:
    """XADD ``envelope`` to ``stream_name`` as a single ``data`` field.

    Args:
        redis_client: Async Redis client exposing ``xadd`` (e.g. a
            ``redis.asyncio`` client).
        stream_name: Target stream key.
        envelope: Serialised with ``model_dump_json()`` into the ``data`` field.
        maxlen: Optional cap on stream length. With the default ``None`` the
            stream is never trimmed and grows with every publish. When set, it
            is forwarded to ``XADD ... MAXLEN``.
        approximate: Only used together with ``maxlen``. ``True`` requests the
            cheaper ``MAXLEN ~`` trimming, which may keep slightly more entries.

    Returns:
        The stream id Redis assigned, decoded to ``str`` if it arrived as bytes.
    """
    payload = envelope.model_dump_json()
    if maxlen is None:
        # Exactly the original call shape, so clients/fakes without trimming
        # kwargs keep working.
        raw_id = await redis_client.xadd(stream_name, {"data": payload})
    else:
        raw_id = await redis_client.xadd(
            stream_name, {"data": payload}, maxlen=maxlen, approximate=approximate,
        )
    if isinstance(raw_id, bytes):
        return raw_id.decode("utf-8")
    return str(raw_id)
303
304
async def publish_decision_hash(
    redis_client: Any,
    envelope: TradingIntelligenceEnvelope,
) -> str:
    """HSET the envelope into its per-(ticker, trade_date) point-lookup hash.

    The hash holds the full envelope JSON under ``json`` plus a handful of
    flattened scalar fields (ticker, trade_date, verdict, confidence,
    ensemble_score, published_at, correlation_id, arena_id) stored as strings.

    Returns:
        The hash key written, or ``""`` when the target has no ticker
        (degraded run), in which case nothing is written.
    """
    decision = envelope.decision
    target = decision.target
    synthesis = decision.synthesis
    if target.ticker:
        key = _decision_hash_key(target.ticker, target.trade_date)
        fields = {
            "json": envelope.model_dump_json(),
            "ticker": target.ticker,
            "trade_date": target.trade_date,
            "verdict": synthesis.verdict,
            "confidence": str(synthesis.confidence),
            "ensemble_score": str(synthesis.ensemble_score),
            "published_at": envelope.published_at.isoformat(),
            "correlation_id": envelope.correlation_id,
            "arena_id": envelope.arena_id,
        }
        await redis_client.hset(key, mapping=fields)
        return key
    logger.debug("publish_decision_hash: empty ticker — skipping hash write")
    return ""
331
332
333# ---------------------------------------------------------------------------
334# Decoder
335# ---------------------------------------------------------------------------
336
337
def decode_envelope(payload: str | bytes) -> TradingIntelligenceEnvelope | None:
    """Decode a stream payload into an envelope, or return ``None`` on skip.

    Args:
        payload: The ``data`` field of a stream entry, or the ``json`` field
            of a decision hash, as ``str`` or UTF-8 ``bytes``.

    Returns:
        The validated envelope, or ``None`` (after a WARN) when:

        * the bytes are not valid UTF-8 or the text is not valid JSON,
        * the JSON is not an object (e.g. a list or a bare scalar),
        * ``schema_version`` is anything other than ``"1"`` (future-compat),
        * the body fails Pydantic validation (malformed entry).

    Bad input is reported through the ``None`` return rather than an
    exception, so a consumer loop can simply ``continue``.
    """
    import json

    try:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError
        # subclasses; TypeError covers a non-str/bytes payload such as None.
        text = payload.decode("utf-8") if isinstance(payload, (bytes, bytearray)) else payload
        head = json.loads(text)
    except (TypeError, ValueError) as exc:
        logger.warning("decode_envelope: malformed JSON: %s", exc)
        return None

    if not isinstance(head, dict):
        # Valid JSON but not an object: ``head.get`` would raise AttributeError.
        logger.warning(
            "decode_envelope: expected a JSON object, got %s", type(head).__name__,
        )
        return None

    # Check the version before full validation so a v2 body is skipped
    # cleanly instead of surfacing as a confusing validation error.
    sv = head.get("schema_version")
    if sv != "1":
        logger.warning(
            "decode_envelope: skipping unknown schema_version=%r (this consumer speaks v1)", sv,
        )
        return None

    try:
        return TradingIntelligenceEnvelope.model_validate(head)
    except ValidationError as exc:
        logger.warning("decode_envelope: validation failed: %s", exc)
        return None
367
368
__all__ = [
    # Redis key names
    "DECISION_HASH_PREFIX",
    "DEFAULT_STREAM_NAME",
    # Wire schemas
    "AgentSignalView",
    "MarketMindDecision",
    "SourceMetricsSummary",
    "SynthesisView",
    "TargetView",
    "TradingIntelligenceEnvelope",
    # Build / decode / publish helpers
    "build_envelope_from_state",
    "decode_envelope",
    "publish_decision_hash",
    "publish_envelope",
]