checking system…
Docs / Crowd Simulation Oracle
How MAF turns mirofish outputs into oracle-style envelopes.

Crowd-Simulation Oracle — Consumer Guide

A downstream bot opts into MAF's crowd_simulation arena by reading a single Redis stream — maf:arena:crowd_simulation:output — and decoding each entry's data field as a CrowdPredictionEnvelope. No MAF source dependency is required at runtime; consumers ship their own copy of the schema (or paste the pydantic definition below).

This page is the wire-contract reference. The code companion is examples/consume_crowd_oracle.py.


What this stream emits

Each Redis stream entry has exactly one field, data, whose value is the JSON-serialised CrowdPredictionEnvelope. Schema (defined in src/maf/arenas/crowd_simulation/stream.py):

Field Type / bounds Notes
schema_version Literal["1"] Branch on this; future v2 may change other fields.
arena Literal["crowd_simulation"] Pinned, lets consumers reject misrouted envelopes.
prediction CrowdPrediction (see below) The synthesised arena output.
decision dict[str, Any] Shape depends on decision_kind.
decision_kind Literal["fomo2_signal", "polymarket_side", "oddsoddy_multiplier"] Selector for decision.
published_at datetime (UTC, ISO-8601) Moment the publisher built the wire payload.
correlation_id str — 32-char uuid4 hex Use this for dedup + cross-system tracing.

CrowdPrediction (from src/maf/arenas/crowd_simulation/schema.py):

Field Type / bounds
outcome Literal["UP", "DOWN", "FLAT", "YES", "NO"]
probability float ∈ [0, 1] — probability the chosen outcome materialises
horizon_hours int > 0
top_drivers list[str] — 1..10 entries
dissent_pct float ∈ [0, 1]
source_type Literal["fomo2_report", "polymarket_bet", "oddsoddy_strategy"]
source_id non-empty str
model_votes list[ModelVote]
generated_at datetime (UTC)

decision shapes per decision_kind:

  • polymarket_side{side: "YES"|"NO"|"SKIP", edge_bps: int, suggested_size_usd: float}
  • fomo2_signal{signal: "BUY"|"SELL"|"HOLD", confidence: float [0,1], horizon_hours: int}
  • oddsoddy_multiplier{multiplier: float [0,2], gate: bool, reason: str}

The 5-line opt-in (Python)

Excluding imports, the minimum opt-in is exactly five lines:

import asyncio, json
import redis.asyncio as aioredis

async def main():
    r = aioredis.from_url("redis://localhost:6379/0")                                  # 1
    while resp := await r.xread({"maf:arena:crowd_simulation:output": "$"}, block=0):  # 2
        env = json.loads(resp[0][1][0][1][b"data"])                                    # 3
        kind, decision = env["decision_kind"], env["decision"]                         # 4
        print(kind, decision)  # ← act on `decision` here                              # 5

asyncio.run(main())

Lines do, in order: connect to Redis, XREAD BLOCK 0 from $ (only new entries), JSON-decode the envelope, dispatch by decision_kind, act on the decision payload.


Three consumer recipes

Each recipe is ≤ 25 lines, idiomatic Python. Drop into your bot, swap print() for whatever your trading loop expects.

Recipe A — fomo2 (signal-based)

# fomo2 consumer: BUY/SELL/HOLD with confidence + horizon.
import asyncio, json
import redis.asyncio as aioredis

STREAM = "maf:arena:crowd_simulation:output"

async def run() -> None:
    r = aioredis.from_url("redis://localhost:6379/0")
    last = "$"
    while True:
        resp = await r.xread({STREAM: last}, block=0)
        for _, entries in resp:
            for sid, fields in entries:
                last = sid
                env = json.loads(fields[b"data"])
                if env.get("schema_version") != "1": continue
                if env["decision_kind"] != "fomo2_signal": continue
                d = env["decision"]
                print(f"{sid.decode()} signal={d['signal']} conf={d['confidence']:.2f} h={d['horizon_hours']}h")
                # your_trader.place_signal(d["signal"], d["confidence"], d["horizon_hours"])

asyncio.run(run())

Recipe B — polymarket (side-based)

# polymarket consumer: YES/NO sides + edge_bps + suggested USD size.
import asyncio, json
import redis.asyncio as aioredis

STREAM = "maf:arena:crowd_simulation:output"

async def run() -> None:
    r = aioredis.from_url("redis://localhost:6379/0")
    last = "$"
    while True:
        resp = await r.xread({STREAM: last}, block=0)
        for _, entries in resp:
            for sid, fields in entries:
                last = sid
                env = json.loads(fields[b"data"])
                if env.get("schema_version") != "1": continue
                if env["decision_kind"] != "polymarket_side": continue
                d, p = env["decision"], env["prediction"]
                if d["side"] == "SKIP" or d["edge_bps"] < 200: continue
                print(f"{sid.decode()} bet {d['side']} ${d['suggested_size_usd']} on {p['source_id'][:10]}…")

asyncio.run(run())

Recipe C — oddsoddy (multiplier-based)

# oddsoddy strategy consumer: gate or scale a position by `multiplier`.
import asyncio, json
import redis.asyncio as aioredis

STREAM = "maf:arena:crowd_simulation:output"

async def run() -> None:
    r = aioredis.from_url("redis://localhost:6379/0")
    last = "$"
    while True:
        resp = await r.xread({STREAM: last}, block=0)
        for _, entries in resp:
            for sid, fields in entries:
                last = sid
                env = json.loads(fields[b"data"])
                if env.get("schema_version") != "1": continue
                if env["decision_kind"] != "oddsoddy_multiplier": continue
                d, p = env["decision"], env["prediction"]
                if d["gate"]: continue                                # crowd vetoed this trade
                print(f"{sid.decode()} mult={d['multiplier']:.2f} for {p['source_id']} ({d['reason'][:60]}…)")

asyncio.run(run())

Filtering & robustness

  • Forward-compat (schema_version): a future bump to "2" may add or rename fields. Always check env["schema_version"] == "1" before trusting the rest of the body, and continue on a mismatch (a WARN log is plenty).
  • Source filtering: read env["prediction"]["source_type"] and skip entries whose source you don't trade against. The example consumer exposes --source-type as a one-shot CLI filter.
  • Dedup via correlation_id: keep a small LRU set of seen correlation_id values. The publisher emits a fresh uuid4 per envelope, but consumers that resume after a crash and re-read XRANGE from a known cursor can see overlap; dedup makes resume idempotent.

Operational notes

  • Redis URL: defaults to redis://localhost:6379/0. Override via REDIS_URL (or --redis-url). The example consumer reads REDIS_URL before falling back to the default.
  • Switching streams for testing: pass --stream test:... (or set the env in your fork). Useful when running an arena demo against a fakeredis instance or a side-Redis.
  • Backfill via XREVRANGE: to seed from the most recent N envelopes before tailing, call await r.xrevrange("maf:arena:crowd_simulation:output", count=N) once, then start XREAD from the last sid seen.
  • Demo CLI: the runnable companion is examples/consume_crowd_oracle.py; it supports --count N (one-shot, newest-first) and --source-type (post-decode filter). The 5-line snippet above is the API floor — the example is what to reach for when you need a real script.