checking system…
Docs / back / src/maf/arenas/crowd_simulation/stream.py · line 1
Python · 340 lines
  1"""Redis-stream envelope + publisher for the ``crowd_simulation`` arena.
  2
  3Closes T-0005 (envelope schema) and T-0026 (Redis publisher + tail consumer)
  4by combining the two contracts in a single module:
  5
  6* :class:`CrowdPredictionEnvelope` — a frozen Pydantic v2 model that wraps a
  7  validated :class:`CrowdPrediction` plus the consumer-specific decision-layer
  8  payload chosen by ``decision_kind``. The model is the **wire contract**
  9  shipped to downstream bots; bumping ``schema_version`` is the only
 10  schema-evolution lever.
 11* :func:`build_envelope` — pure constructor that picks the right decision
 12  mapping given a :class:`CrowdPrediction`'s ``source_type``.
 13* :func:`publish_envelope` — single-Redis-write publisher. Encodes the envelope
 14  as one ``data`` field whose value is ``envelope.model_dump_json()``. One
 15  field per stream entry keeps consumers dead simple.
 16* :func:`read_envelopes` / :func:`follow_envelopes` — small async helpers used
 17  by ``maf.arenas.crowd_simulation.tail``.
 18
 19All Redis interaction goes through :mod:`redis.asyncio`; the publisher accepts
 20any client object exposing an awaitable ``xadd`` (so unit tests can pass a
 21``fakeredis.aioredis.FakeRedis``).
 22"""
 23
 24from __future__ import annotations
 25
 26import logging
 27import uuid
 28from collections.abc import AsyncIterator
 29from datetime import UTC, datetime
 30from typing import Any, Literal
 31
 32from pydantic import BaseModel, ConfigDict, Field, ValidationError
 33
 34from maf.arenas.crowd_simulation.decision import (
 35    crowd_to_fomo2_signal,
 36    crowd_to_oddsoddy_multiplier,
 37    crowd_to_polymarket_side,
 38)
 39from maf.arenas.crowd_simulation.schema import CrowdPrediction
 40
# Module-level logger named after this module's import path; the decode and
# read helpers below emit their "skipping" warnings through it.
logger = logging.getLogger(__name__)


# Name of the Redis stream this arena's envelopes are published to. Exported
# via ``__all__``; nothing in this module passes it implicitly — the publisher
# and readers always take ``stream_name`` as an explicit argument.
DEFAULT_STREAM_NAME = "maf:arena:crowd_simulation:output"

# Closed set of selectors for the envelope's ``decision`` payload. These are
# the first elements of the tuples stored in ``_DECISION_BUILDERS``.
DecisionKind = Literal["fomo2_signal", "polymarket_side", "oddsoddy_multiplier"]
 47
 48
 49def _utcnow() -> datetime:
 50    """Factory for ``published_at`` — UTC, separate so tests can monkeypatch."""
 51    return datetime.now(UTC)
 52
 53
 54def _new_correlation_id() -> str:
 55    """Factory for ``correlation_id`` — 32-char uuid4 hex."""
 56    return uuid.uuid4().hex
 57
 58
class CrowdPredictionEnvelope(BaseModel):
    """Versioned envelope published to ``maf:arena:crowd_simulation:output``.

    Fields
    ------
    schema_version:
        Literal ``"1"`` so a downstream consumer can branch on the version
        before the rest of the body is unpacked.
    arena:
        Literal ``"crowd_simulation"`` — pinned so a misrouted envelope from
        another arena can be rejected at decode time.
    prediction:
        The validated :class:`CrowdPrediction` produced by the synthesis phase.
    decision:
        JSON-mode dump of the consumer-specific decision-layer object. The
        shape depends on ``decision_kind``; see :func:`build_envelope`.
    decision_kind:
        Selector for ``decision``: ``fomo2_signal`` | ``polymarket_side`` |
        ``oddsoddy_multiplier``. Chosen by :class:`CrowdPrediction`'s
        ``source_type``.
    published_at:
        UTC timestamp of envelope creation (the moment the publisher built
        the wire payload). Defaults to :func:`_utcnow`.
    correlation_id:
        Stable id for downstream tracing / log joining. Defaults to a fresh
        uuid4 hex so it is unique across publishes.
    meta:
        Optional free-form bag of operator-facing metadata: per-phase timings,
        the simulation_requirement ("ask"), source-config snapshot, document
        excerpt, MiroFish offline/live mode, the deep-tier model used, etc.
        Consumers MAY ignore this entirely — the prediction + decision are the
        wire contract; ``meta`` is for dashboards and humans. Backwards-compat:
        envelopes published before ``meta`` existed validate cleanly (default
        is an empty dict).

    The model is frozen but deliberately not strict — the same choice made
    for :class:`CrowdPrediction` (T-0017): field bounds remain enforced via
    the literal types and nested model, while ``model_dump_json`` →
    ``model_validate_json`` round-trips cleanly.
    """

    # frozen=True rejects attribute reassignment on instances.
    # NOTE(review): per pydantic's documented semantics this is shallow — the
    # contents of the ``decision`` / ``meta`` dicts can still be mutated in place.
    model_config = ConfigDict(frozen=True)

    # Routing/version discriminators: single-value literals with matching
    # defaults, so any other value fails validation.
    schema_version: Literal["1"] = "1"
    arena: Literal["crowd_simulation"] = "crowd_simulation"
    # Payload: the prediction plus the decision dict selected by decision_kind.
    prediction: CrowdPrediction
    decision: dict[str, Any]
    decision_kind: DecisionKind
    # Per-envelope bookkeeping; each default is produced by a factory so every
    # instance gets its own timestamp, id and dict.
    published_at: datetime = Field(default_factory=_utcnow)
    correlation_id: str = Field(default_factory=_new_correlation_id)
    meta: dict[str, Any] = Field(default_factory=dict)
109
110
111# ---------------------------------------------------------------------------
112# Envelope construction
113# ---------------------------------------------------------------------------
114
115
# Dispatch table keyed by ``CrowdPrediction.source_type``. Each value pairs the
# ``decision_kind`` recorded in the envelope with the mapper (imported from
# ``maf.arenas.crowd_simulation.decision``) that ``build_envelope`` calls with
# the prediction to produce the decision object.
_DECISION_BUILDERS: dict[str, tuple[DecisionKind, Any]] = {
    "fomo2_report": ("fomo2_signal", crowd_to_fomo2_signal),
    "polymarket_bet": ("polymarket_side", crowd_to_polymarket_side),
    "oddsoddy_strategy": ("oddsoddy_multiplier", crowd_to_oddsoddy_multiplier),
}
121
122
def build_envelope(
    prediction: CrowdPrediction,
    *,
    correlation_id: str | None = None,
    meta: dict[str, Any] | None = None,
) -> CrowdPredictionEnvelope:
    """Wrap ``prediction`` in an envelope, deriving the decision payload.

    ``prediction.source_type`` selects the decision mapping:

    * ``fomo2_report``      → ``decision_kind="fomo2_signal"``
    * ``polymarket_bet``    → ``decision_kind="polymarket_side"``
    * ``oddsoddy_strategy`` → ``decision_kind="oddsoddy_multiplier"``

    The selected mapper (a function from
    :mod:`maf.arenas.crowd_simulation.decision`) is called with the
    prediction and its result is dumped with ``model_dump(mode="json")`` so
    the ``decision`` field is a plain JSON tree.

    ``correlation_id`` and ``meta`` are forwarded only when they are not
    ``None``; otherwise the envelope's own default factories apply (fresh
    uuid4 hex / empty dict). ``meta`` is operator-facing metadata (timings,
    simulation_requirement, document excerpt, etc.) that consumers MAY ignore.

    Raises ``ValueError`` (chained from the ``KeyError``) when
    ``prediction.source_type`` has no registered mapping.
    """
    try:
        decision_kind, to_decision = _DECISION_BUILDERS[prediction.source_type]
    except KeyError as exc:  # pragma: no cover — schema literal forbids this
        raise ValueError(
            f"build_envelope: unknown source_type {prediction.source_type!r}"
        ) from exc

    # Leave out unset optionals so the model's default factories kick in.
    overrides: dict[str, Any] = {
        name: value
        for name, value in (("correlation_id", correlation_id), ("meta", meta))
        if value is not None
    }
    return CrowdPredictionEnvelope(
        prediction=prediction,
        decision=to_decision(prediction).model_dump(mode="json"),
        decision_kind=decision_kind,
        **overrides,
    )
164
165
166# ---------------------------------------------------------------------------
167# Redis publisher
168# ---------------------------------------------------------------------------
169
170
async def publish_envelope(
    redis_client: Any,
    stream_name: str,
    envelope: CrowdPredictionEnvelope,
    *,
    maxlen: int | None = None,
    approximate: bool = True,
) -> str:
    """Publish ``envelope`` to ``stream_name`` as a single ``data`` field.

    The whole envelope body is encoded as ``envelope.model_dump_json()`` and
    written to one Redis stream field named ``data``. Spreading individual
    envelope fields across multiple Redis fields would force every consumer
    to know our schema; one ``data`` blob keeps the wire contract self-
    contained (consumers JSON-decode + ``model_validate_json``).

    Parameters
    ----------
    redis_client:
        Any object exposing an awaitable ``xadd(name, fields, **kwargs)``.
    stream_name:
        Redis stream key to append to.
    envelope:
        The envelope to serialise and publish.
    maxlen:
        Optional cap on the stream length. ``None`` (the default) issues a
        plain, untrimmed ``XADD`` exactly as before, which lets the stream
        grow without bound. When set, it is forwarded to ``xadd`` as
        ``maxlen=`` so Redis trims old entries on every publish.
    approximate:
        Forwarded as ``approximate=`` alongside ``maxlen`` (``XADD MAXLEN ~``
        when true). Ignored when ``maxlen`` is ``None``.

    Returns the assigned stream id (e.g. ``"1735580000000-0"``) as a string,
    decoded if Redis returned bytes.
    """
    payload = envelope.model_dump_json()

    # Pass trimming kwargs only when requested, so clients (and test doubles)
    # that implement just ``xadd(name, fields)`` keep working unchanged.
    trim_options: dict[str, Any] = {}
    if maxlen is not None:
        trim_options = {"maxlen": maxlen, "approximate": approximate}

    raw_id = await redis_client.xadd(stream_name, {"data": payload}, **trim_options)
    if isinstance(raw_id, bytes):
        return raw_id.decode("utf-8")
    return str(raw_id)
192
193
194# ---------------------------------------------------------------------------
195# Decoder helpers (used by the tail CLI + tests)
196# ---------------------------------------------------------------------------
197
198
def decode_envelope(payload: str | bytes) -> CrowdPredictionEnvelope | None:
    """Turn a raw stream payload into an envelope, or ``None`` to skip it.

    ``bytes`` input is decoded as UTF-8 first. The payload is then skipped
    (``None`` is returned after a WARN log) when any of these holds:

    * It is not parseable as JSON.
    * It is a JSON object whose ``schema_version`` is present and is anything
      other than ``"1"`` — a future v2 must not crash today's consumers.
    * It does not validate against :class:`CrowdPredictionEnvelope`.

    Anything else propagates to the caller; in particular, ``bytes`` that are
    not valid UTF-8 raise ``UnicodeDecodeError`` before any parsing happens.
    """
    import json

    text = payload.decode("utf-8") if isinstance(payload, bytes) else payload

    # Cheap look at schema_version first, so an envelope from a newer schema
    # is skipped with a clear message instead of a noisy ValidationError.
    try:
        parsed = json.loads(text)
    except (ValueError, TypeError) as exc:
        logger.warning("decode_envelope: payload is not valid JSON: %s", exc)
        return None

    declared = parsed.get("schema_version") if isinstance(parsed, dict) else None
    if declared not in (None, "1"):
        logger.warning(
            "decode_envelope: unsupported schema_version=%r — skipping envelope",
            declared,
        )
        return None

    try:
        envelope = CrowdPredictionEnvelope.model_validate_json(text)
    except ValidationError as exc:
        logger.warning("decode_envelope: validation failed, skipping: %s", exc)
        return None
    return envelope
236
237
def _extract_data_field(fields: Any) -> str | None:
    """Return the ``data`` value of a Redis stream message body as text.

    ``fields`` is the per-entry mapping returned by xrevrange/xread. Keys and
    values may be ``bytes`` (the ``redis.asyncio`` default) or ``str``
    (``decode_responses=True`` clients); both are handled. Bytes are decoded
    as UTF-8 with undecodable sequences replaced; anything else goes through
    ``str()``.

    Returns ``None`` when ``fields`` is not a dict or has no ``data`` key.
    """
    if not isinstance(fields, dict):
        return None

    def _as_text(item: Any) -> str:
        # Same coercion for keys and values: lenient UTF-8 for bytes, str() otherwise.
        if isinstance(item, bytes):
            return item.decode("utf-8", errors="replace")
        return str(item)

    for name, value in fields.items():
        if _as_text(name) == "data":
            return _as_text(value)
    return None
257
258
async def read_envelopes(
    redis_client: Any,
    stream_name: str,
    count: int,
) -> list[tuple[str, CrowdPredictionEnvelope]]:
    """Fetch up to ``count`` of the newest envelopes from ``stream_name``.

    Issues a single ``xrevrange(stream_name, count=count)``, so results are
    ordered newest first. Returns ``(stream_id, envelope)`` pairs with the id
    as ``str``. Entries lacking a ``data`` field are skipped with a WARN here;
    entries that :func:`decode_envelope` rejects (bad JSON, unknown
    ``schema_version``, failed validation) are skipped after its own WARN.
    Because of skips the result may hold fewer than ``count`` items.
    """
    entries = await redis_client.xrevrange(stream_name, count=count)
    decoded: list[tuple[str, CrowdPredictionEnvelope]] = []
    for entry_id, body in entries:
        if isinstance(entry_id, bytes):
            stream_id = entry_id.decode("utf-8")
        else:
            stream_id = str(entry_id)

        data = _extract_data_field(body)
        if data is None:
            logger.warning(
                "read_envelopes: stream entry %s missing 'data' field — skipping",
                stream_id,
            )
            continue

        envelope = decode_envelope(data)
        if envelope is not None:
            decoded.append((stream_id, envelope))
    return decoded
286
287
async def follow_envelopes(
    redis_client: Any,
    stream_name: str,
    *,
    block_ms: int = 0,
    last_id: str = "$",
) -> AsyncIterator[tuple[str, CrowdPredictionEnvelope]]:
    """Yield ``(stream_id, envelope)`` pairs forever via ``XREAD BLOCK``.

    Parameters
    ----------
    redis_client:
        Client exposing an awaitable ``xread``; when ``last_id`` is ``"$"``
        and ``block_ms`` is non-zero it must also expose ``xrevrange``.
    stream_name:
        Redis stream key to follow.
    block_ms:
        Passed to ``xread`` as ``block=``. ``0`` (the default) blocks until
        an entry arrives.
    last_id:
        Stream id to resume after. Defaults to ``"$"`` (only entries arriving
        after the call).

    Entries without a ``data`` field are skipped with a WARN; entries that
    :func:`decode_envelope` rejects are skipped after its own WARN. The
    cursor still advances past skipped entries. The caller controls
    termination — interrupt via ``KeyboardInterrupt`` / cancellation.
    """
    cursor = last_id

    # "$" means "newest id at the moment XREAD runs". With a finite block
    # timeout, an empty (timed-out) read would re-send "$" on the next call,
    # and any entry added between the two calls would be lost. Pin "$" to a
    # concrete id once, up front. With block_ms == 0 the first XREAD cannot
    # time out, so the original single-call behaviour is kept as-is.
    if cursor == "$" and block_ms != 0:
        newest = await redis_client.xrevrange(stream_name, count=1)
        if newest:
            newest_id = newest[0][0]
            cursor = (
                newest_id.decode("utf-8")
                if isinstance(newest_id, bytes)
                else str(newest_id)
            )
        else:
            # Empty or missing stream: everything that arrives later is new.
            cursor = "0-0"

    while True:
        resp = await redis_client.xread(
            {stream_name: cursor},
            count=100,
            block=block_ms,
        )
        if not resp:
            # Timed out with nothing new; poll again from the same cursor.
            continue
        for _stream, entries in resp:
            for raw_id, fields in entries:
                stream_id = (
                    raw_id.decode("utf-8") if isinstance(raw_id, bytes) else str(raw_id)
                )
                # Advance before decoding so a bad entry is never re-read.
                cursor = stream_id
                payload = _extract_data_field(fields)
                if payload is None:
                    logger.warning(
                        "follow_envelopes: stream entry %s missing 'data' field "
                        "— skipping",
                        stream_id,
                    )
                    continue
                env = decode_envelope(payload)
                if env is None:
                    continue
                yield stream_id, env
328
329
# Public API of this module. The underscore-prefixed helpers (default
# factories, ``_DECISION_BUILDERS``, ``_extract_data_field``) are left out.
__all__ = [
    "DEFAULT_STREAM_NAME",
    "CrowdPredictionEnvelope",
    "DecisionKind",
    "build_envelope",
    "decode_envelope",
    "follow_envelopes",
    "publish_envelope",
    "read_envelopes",
]