"""Redis-stream envelope + publisher for the ``crowd_simulation`` arena.

Closes T-0005 (envelope schema) and T-0026 (Redis publisher + tail consumer)
by combining the two contracts in a single module:

* :class:`CrowdPredictionEnvelope` — a frozen Pydantic v2 model that wraps a
  validated :class:`CrowdPrediction` plus the consumer-specific decision-layer
  payload chosen by ``decision_kind``. The model is the **wire contract**
  shipped to downstream bots; bumping ``schema_version`` is the only
  schema-evolution lever.
* :func:`build_envelope` — pure constructor that picks the right decision
  mapping given a :class:`CrowdPrediction`'s ``source_type``.
* :func:`publish_envelope` — single-Redis-write publisher. Encodes the envelope
  as one ``data`` field whose value is ``envelope.model_dump_json()``. One
  field per stream entry keeps consumers dead simple.
* :func:`read_envelopes` / :func:`follow_envelopes` — small async helpers used
  by ``maf.arenas.crowd_simulation.tail``.

All Redis interaction goes through :mod:`redis.asyncio`; the publisher accepts
any client object exposing an awaitable ``xadd`` (so unit tests can pass a
``fakeredis.aioredis.FakeRedis``).
"""

from __future__ import annotations

import json
import logging
import uuid
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from typing import Any, Literal

from pydantic import BaseModel, ConfigDict, Field, ValidationError

from maf.arenas.crowd_simulation.decision import (
    crowd_to_fomo2_signal,
    crowd_to_oddsoddy_multiplier,
    crowd_to_polymarket_side,
)
from maf.arenas.crowd_simulation.schema import CrowdPrediction

logger = logging.getLogger(__name__)


DEFAULT_STREAM_NAME = "maf:arena:crowd_simulation:output"

DecisionKind = Literal["fomo2_signal", "polymarket_side", "oddsoddy_multiplier"]


def _utcnow() -> datetime:
    """Factory for ``published_at`` — UTC, separate so tests can monkeypatch."""
    return datetime.now(UTC)


def _new_correlation_id() -> str:
    """Factory for ``correlation_id`` — 32-char uuid4 hex."""
    return uuid.uuid4().hex


class CrowdPredictionEnvelope(BaseModel):
    """Versioned envelope published to ``maf:arena:crowd_simulation:output``.

    Fields
    ------
    schema_version:
        Literal ``"1"`` so a downstream consumer can branch on the version
        before the rest of the body is unpacked.
    arena:
        Literal ``"crowd_simulation"`` — pinned so a misrouted envelope from
        another arena can be rejected at decode time.
    prediction:
        The validated :class:`CrowdPrediction` produced by the synthesis phase.
    decision:
        JSON-mode dump of the consumer-specific decision-layer object. The
        shape depends on ``decision_kind``; see :func:`build_envelope`.
    decision_kind:
        Selector for ``decision``: ``fomo2_signal`` | ``polymarket_side`` |
        ``oddsoddy_multiplier``. Chosen by :class:`CrowdPrediction`'s
        ``source_type``.
    published_at:
        UTC timestamp of envelope creation (the moment the publisher built
        the wire payload).
    correlation_id:
        Stable id for downstream tracing / log joining. Defaults to a fresh
        uuid4 hex so it is unique across publishes.
    meta:
        Optional free-form bag of operator-facing metadata: per-phase timings,
        the simulation_requirement ("ask"), source-config snapshot, document
        excerpt, MiroFish offline/live mode, the deep-tier model used, etc.
        Consumers MAY ignore this entirely — the prediction + decision are the
        wire contract; ``meta`` is for dashboards and humans. Backwards-compat:
        envelopes published before ``meta`` existed validate cleanly (default
        is an empty dict).

    Frozen (no strict mode) — same call as :class:`CrowdPrediction` (T-0017):
    field bounds remain enforced via the literal types and nested model, while
    ``model_dump_json`` → ``model_validate_json`` round-trips cleanly.
    """

    model_config = ConfigDict(frozen=True)

    schema_version: Literal["1"] = "1"
    arena: Literal["crowd_simulation"] = "crowd_simulation"
    prediction: CrowdPrediction
    decision: dict[str, Any]
    decision_kind: DecisionKind
    published_at: datetime = Field(default_factory=_utcnow)
    correlation_id: str = Field(default_factory=_new_correlation_id)
    meta: dict[str, Any] = Field(default_factory=dict)


# ---------------------------------------------------------------------------
# Envelope construction
# ---------------------------------------------------------------------------


# ``source_type`` → (``decision_kind``, mapper function from the decision module).
_DECISION_BUILDERS: dict[str, tuple[DecisionKind, Any]] = {
    "fomo2_report": ("fomo2_signal", crowd_to_fomo2_signal),
    "polymarket_bet": ("polymarket_side", crowd_to_polymarket_side),
    "oddsoddy_strategy": ("oddsoddy_multiplier", crowd_to_oddsoddy_multiplier),
}


def build_envelope(
    prediction: CrowdPrediction,
    *,
    correlation_id: str | None = None,
    meta: dict[str, Any] | None = None,
) -> CrowdPredictionEnvelope:
    """Build an envelope around ``prediction``, picking the decision kind.

    The decision-layer mapping is chosen by ``prediction.source_type``:

    * ``fomo2_report`` → ``decision_kind="fomo2_signal"``
    * ``polymarket_bet`` → ``decision_kind="polymarket_side"``
    * ``oddsoddy_strategy`` → ``decision_kind="oddsoddy_multiplier"``

    The mapper is the corresponding pure function in
    :mod:`maf.arenas.crowd_simulation.decision`; we serialise its return
    via ``model_dump(mode="json")`` so the envelope is a pure JSON tree.

    ``meta`` is an optional dict of operator-facing metadata (timings,
    simulation_requirement, document excerpt, etc.). Consumers MAY ignore it.

    ``correlation_id`` and ``meta`` are only forwarded when not ``None`` so
    the model's default factories apply otherwise.

    Raises
    ------
    ValueError
        If ``prediction.source_type`` has no registered decision builder.
    """
    try:
        kind, builder = _DECISION_BUILDERS[prediction.source_type]
    except KeyError as exc:  # pragma: no cover — schema literal forbids this
        raise ValueError(
            f"build_envelope: unknown source_type {prediction.source_type!r}"
        ) from exc

    decision_obj = builder(prediction)
    decision_dict: dict[str, Any] = decision_obj.model_dump(mode="json")

    kwargs: dict[str, Any] = {
        "prediction": prediction,
        "decision": decision_dict,
        "decision_kind": kind,
    }
    if correlation_id is not None:
        kwargs["correlation_id"] = correlation_id
    if meta is not None:
        kwargs["meta"] = meta
    return CrowdPredictionEnvelope(**kwargs)


# ---------------------------------------------------------------------------
# Redis publisher
# ---------------------------------------------------------------------------


def _stream_id_to_str(raw_id: Any) -> str:
    """Normalise a Redis stream id to ``str`` (bytes are decoded as UTF-8)."""
    if isinstance(raw_id, bytes):
        return raw_id.decode("utf-8")
    return str(raw_id)


async def publish_envelope(
    redis_client: Any,
    stream_name: str,
    envelope: CrowdPredictionEnvelope,
) -> str:
    """Publish ``envelope`` to ``stream_name`` as a single ``data`` field.

    The whole envelope body is encoded as ``envelope.model_dump_json()`` and
    written to one Redis stream field named ``data``. Spreading individual
    envelope fields across multiple Redis fields would force every consumer
    to know our schema; one ``data`` blob keeps the wire contract self-
    contained (consumers JSON-decode + ``model_validate_json``).

    Returns the assigned stream id (e.g. ``"1735580000000-0"``) as a string,
    decoded if Redis returned bytes.
    """
    payload = envelope.model_dump_json()
    raw_id = await redis_client.xadd(stream_name, {"data": payload})
    return _stream_id_to_str(raw_id)


# ---------------------------------------------------------------------------
# Decoder helpers (used by the tail CLI + tests)
# ---------------------------------------------------------------------------


def decode_envelope(payload: str | bytes) -> CrowdPredictionEnvelope | None:
    """Decode a stream payload into an envelope, or return ``None`` on a skip.

    Skips (returning ``None`` after a WARN) when:

    * The payload is not valid JSON.
    * The payload's ``schema_version`` is anything other than ``"1"`` — a
      future v2 must not crash today's consumers.
    * The JSON does not validate against :class:`CrowdPredictionEnvelope`.

    A ``bytes`` payload that is not valid UTF-8 raises ``UnicodeDecodeError``
    (the decode happens before any skip handling) so the caller can surface it.
    """
    text = payload.decode("utf-8") if isinstance(payload, bytes) else payload

    # Cheap pre-check on schema_version so we can skip cleanly without
    # raising a ValidationError for a forward-compat field change.
    try:
        head = json.loads(text)
    except (ValueError, TypeError) as exc:
        logger.warning("decode_envelope: payload is not valid JSON: %s", exc)
        return None
    if isinstance(head, dict):
        version = head.get("schema_version")
        # A missing version falls through to full validation (the model
        # supplies the default "1").
        if version is not None and version != "1":
            logger.warning(
                "decode_envelope: unsupported schema_version=%r — skipping envelope",
                version,
            )
            return None

    try:
        return CrowdPredictionEnvelope.model_validate_json(text)
    except ValidationError as exc:
        logger.warning("decode_envelope: validation failed, skipping: %s", exc)
        return None


def _extract_data_field(fields: Any) -> str | None:
    """Pull the ``data`` field out of a Redis xrevrange/xread message body.

    Tolerates both bytes and str keys/values (the ``redis.asyncio`` default
    is bytes; ``decode_responses=True`` clients return str). Returns ``None``
    when ``fields`` is not a dict or has no ``data`` key.
    """
    if not isinstance(fields, dict):
        return None
    for key, val in fields.items():
        if isinstance(key, bytes):
            key_s = key.decode("utf-8", errors="replace")
        else:
            key_s = str(key)
        if key_s != "data":
            continue
        if isinstance(val, bytes):
            return val.decode("utf-8", errors="replace")
        return str(val)
    return None


async def read_envelopes(
    redis_client: Any,
    stream_name: str,
    count: int,
) -> list[tuple[str, CrowdPredictionEnvelope]]:
    """Read the latest ``count`` envelopes from ``stream_name`` (newest first).

    Returns a list of ``(stream_id, envelope)`` pairs. Entries with unknown
    ``schema_version`` or invalid JSON are silently skipped (after a WARN
    inside :func:`decode_envelope`), as are entries without a ``data`` field,
    so the result may hold fewer than ``count`` pairs.
    """
    raw = await redis_client.xrevrange(stream_name, count=count)
    out: list[tuple[str, CrowdPredictionEnvelope]] = []
    for raw_id, fields in raw:
        stream_id = _stream_id_to_str(raw_id)
        payload = _extract_data_field(fields)
        if payload is None:
            logger.warning(
                "read_envelopes: stream entry %s missing 'data' field — skipping",
                stream_id,
            )
            continue
        env = decode_envelope(payload)
        if env is None:
            continue
        out.append((stream_id, env))
    return out


async def _resolve_tail_id(redis_client: Any, stream_name: str) -> str:
    """Return the id of the newest entry in ``stream_name`` (``"0-0"`` if none)."""
    latest = await redis_client.xrevrange(stream_name, count=1)
    if not latest:
        # Empty or missing stream: every entry that appears later is "new".
        return "0-0"
    return _stream_id_to_str(latest[0][0])


async def follow_envelopes(
    redis_client: Any,
    stream_name: str,
    *,
    block_ms: int = 0,
    last_id: str = "$",
) -> AsyncIterator[tuple[str, CrowdPredictionEnvelope]]:
    """Yield ``(stream_id, envelope)`` pairs forever via ``XREAD BLOCK``.

    ``last_id`` defaults to ``"$"`` (only entries arriving after the call).
    ``block_ms`` is forwarded as the ``block`` argument of ``xread``; Redis
    treats ``0`` as "wait indefinitely". The caller controls termination —
    interrupt via ``KeyboardInterrupt`` / cancellation.

    Entries without a ``data`` field, or that :func:`decode_envelope` skips,
    are not yielded but still advance the cursor.
    """
    cursor = last_id
    if cursor == "$" and block_ms > 0:
        # With a finite block timeout XREAD can return empty and be re-issued.
        # Re-sending "$" would drop anything appended between two calls, so
        # pin the cursor to a concrete id once up front. With block_ms == 0
        # the first call waits until data arrives, so "$" is safe as-is.
        cursor = await _resolve_tail_id(redis_client, stream_name)
    while True:
        resp = await redis_client.xread(
            {stream_name: cursor},
            count=100,
            block=block_ms,
        )
        if not resp:
            # Block timeout elapsed with nothing new; poll again.
            continue
        for _stream, entries in resp:
            for raw_id, fields in entries:
                stream_id = _stream_id_to_str(raw_id)
                # Advance before decoding so a bad entry is never re-read.
                cursor = stream_id
                payload = _extract_data_field(fields)
                if payload is None:
                    logger.warning(
                        "follow_envelopes: stream entry %s missing 'data' field "
                        "— skipping",
                        stream_id,
                    )
                    continue
                env = decode_envelope(payload)
                if env is None:
                    continue
                yield stream_id, env


__all__ = [
    "DEFAULT_STREAM_NAME",
    "CrowdPredictionEnvelope",
    "DecisionKind",
    "build_envelope",
    "decode_envelope",
    "follow_envelopes",
    "publish_envelope",
    "read_envelopes",
]