Crowd-Simulation Oracle — Consumer Guide
A downstream bot opts into MAF's crowd_simulation arena by reading a single
Redis stream — maf:arena:crowd_simulation:output — and decoding each entry's
data field as a CrowdPredictionEnvelope. No MAF source dependency is
required at runtime; consumers ship their own copy of the schema (the field
tables below are enough to re-declare the pydantic models).
This page is the wire-contract reference. The code companion is
examples/consume_crowd_oracle.py.
What this stream emits
Each Redis stream entry has exactly one field, data, whose value is the
JSON-serialised CrowdPredictionEnvelope. Schema (defined in
src/maf/arenas/crowd_simulation/stream.py):
| Field | Type / bounds | Notes |
|---|---|---|
| `schema_version` | `Literal["1"]` | Branch on this; a future v2 may change other fields. |
| `arena` | `Literal["crowd_simulation"]` | Pinned; lets consumers reject misrouted envelopes. |
| `prediction` | `CrowdPrediction` (see below) | The synthesised arena output. |
| `decision` | `dict[str, Any]` | Shape depends on `decision_kind`. |
| `decision_kind` | `Literal["fomo2_signal", "polymarket_side", "oddsoddy_multiplier"]` | Selector for `decision`. |
| `published_at` | `datetime` (UTC, ISO-8601) | Moment the publisher built the wire payload. |
| `correlation_id` | `str`, 32-char uuid4 hex | Use this for dedup and cross-system tracing. |
CrowdPrediction (from src/maf/arenas/crowd_simulation/schema.py):
| Field | Type / bounds |
|---|---|
| `outcome` | `Literal["UP", "DOWN", "FLAT", "YES", "NO"]` |
| `probability` | `float` ∈ [0, 1]; probability that the chosen outcome materialises |
| `horizon_hours` | `int` > 0 |
| `top_drivers` | `list[str]`, 1..10 entries |
| `dissent_pct` | `float` ∈ [0, 1] |
| `source_type` | `Literal["fomo2_report", "polymarket_bet", "oddsoddy_strategy"]` |
| `source_id` | non-empty `str` |
| `model_votes` | `list[ModelVote]` |
| `generated_at` | `datetime` (UTC) |
decision shapes per decision_kind:
- `polymarket_side` → `{side: "YES"|"NO"|"SKIP", edge_bps: int, suggested_size_usd: float}`
- `fomo2_signal` → `{signal: "BUY"|"SELL"|"HOLD", confidence: float [0,1], horizon_hours: int}`
- `oddsoddy_multiplier` → `{multiplier: float [0,2], gate: bool, reason: str}`
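The bounds in the tables above can also be enforced by hand when a consumer prefers not to depend on pydantic. The following is a minimal stdlib-only sketch, not the MAF schema itself: `envelope_problems` and the constant names are hypothetical, the allowed values are copied from the tables, and `model_votes` is left unchecked because the `ModelVote` shape is not described on this page.

```python
from typing import Any

# Allowed values, copied from the field tables on this page.
DECISION_KINDS = {"fomo2_signal", "polymarket_side", "oddsoddy_multiplier"}
OUTCOMES = {"UP", "DOWN", "FLAT", "YES", "NO"}
SOURCE_TYPES = {"fomo2_report", "polymarket_bet", "oddsoddy_strategy"}

# Keys each `decision` payload is expected to carry, per decision_kind.
DECISION_KEYS = {
    "fomo2_signal": {"signal", "confidence", "horizon_hours"},
    "polymarket_side": {"side", "edge_bps", "suggested_size_usd"},
    "oddsoddy_multiplier": {"multiplier", "gate", "reason"},
}


def _is_unit_float(value: Any) -> bool:
    """True for a real number in [0, 1]; bools are rejected explicitly."""
    return (
        isinstance(value, (int, float))
        and not isinstance(value, bool)
        and 0 <= value <= 1
    )


def envelope_problems(env: dict[str, Any]) -> list[str]:
    """Return human-readable problems; an empty list means the envelope passed."""
    problems: list[str] = []
    if env.get("schema_version") != "1":
        problems.append("unsupported schema_version")
    if env.get("arena") != "crowd_simulation":
        problems.append("wrong arena")

    kind = env.get("decision_kind")
    if kind not in DECISION_KINDS:
        problems.append("unknown decision_kind")
    else:
        decision = env.get("decision")
        present = set(decision) if isinstance(decision, dict) else set()
        missing = DECISION_KEYS[kind] - present
        if missing:
            problems.append(f"decision missing {sorted(missing)}")

    pred = env.get("prediction")
    if not isinstance(pred, dict):
        problems.append("prediction missing")
        return problems
    if pred.get("outcome") not in OUTCOMES:
        problems.append("bad outcome")
    if pred.get("source_type") not in SOURCE_TYPES:
        problems.append("bad source_type")
    for name in ("probability", "dissent_pct"):
        if not _is_unit_float(pred.get(name)):
            problems.append(f"{name} outside [0, 1]")
    horizon = pred.get("horizon_hours")
    if not isinstance(horizon, int) or isinstance(horizon, bool) or horizon <= 0:
        problems.append("horizon_hours must be a positive int")
    drivers = pred.get("top_drivers")
    if not isinstance(drivers, list) or not 1 <= len(drivers) <= 10:
        problems.append("top_drivers must have 1..10 entries")
    source_id = pred.get("source_id")
    if not isinstance(source_id, str) or not source_id:
        problems.append("source_id must be a non-empty str")
    return problems
```

In a consumer loop, a guard such as `if envelope_problems(env): continue` could stand in for the bare `schema_version` check when stricter validation is wanted.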
The 5-line opt-in (Python)
Excluding imports, the minimum opt-in is exactly five lines:
import asyncio, json
import redis.asyncio as aioredis

async def main():
    r = aioredis.from_url("redis://localhost:6379/0")  # 1
    while resp := await r.xread({"maf:arena:crowd_simulation:output": "$"}, block=0):  # 2
        env = json.loads(resp[0][1][0][1][b"data"])  # 3
        kind, decision = env["decision_kind"], env["decision"]  # 4
        print(kind, decision)  # ← act on `decision` here  # 5

asyncio.run(main())
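In order, the numbered lines connect to Redis, XREAD BLOCK 0 from $ (only new
entries), JSON-decode the envelope, dispatch by decision_kind, and act on the
decision payload. Because every iteration asks for $ again, an entry that
arrives between two XREAD calls is skipped; the recipes below avoid this by
resuming from the last entry ID they saw.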
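The nested index `resp[0][1][0][1][b"data"]` is easier to follow once the XREAD reply is unpacked step by step. The sketch below assumes the reply shape redis-py returns by default (RESP2, no `decode_responses`): a list of `[stream_name, entries]` pairs, where each entry is an `(entry_id, fields)` tuple with bytes keys. `iter_envelopes` is a hypothetical helper name, not part of MAF or redis-py.

```python
import json
from typing import Any, Iterator


def iter_envelopes(resp: Any) -> Iterator[tuple[bytes, dict[str, Any]]]:
    """Yield (entry_id, decoded_envelope) for every entry in an XREAD reply."""
    # An empty reply (None or []) yields nothing.
    for _stream_name, entries in resp or []:
        for entry_id, fields in entries:
            # Each entry carries a single field, `data`, holding the JSON envelope.
            yield entry_id, json.loads(fields[b"data"])


# A hand-built reply with the same shape as a one-entry XREAD result.
sample = [
    [
        b"maf:arena:crowd_simulation:output",
        [(b"1700000000000-0", {b"data": b'{"decision_kind": "fomo2_signal", "decision": {}}'})],
    ]
]
for entry_id, env in iter_envelopes(sample):
    print(entry_id.decode(), env["decision_kind"])
```

Unlike the fixed index, this walks every entry of every stream in the reply, so nothing is dropped when a single XREAD returns more than one entry.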
Three consumer recipes
Each recipe is at most 25 lines of idiomatic Python. Drop one into your bot and
swap print() for whatever your trading loop expects.
Recipe A — fomo2 (signal-based)
# fomo2 consumer: BUY/SELL/HOLD with confidence + horizon.
import asyncio, json
import redis.asyncio as aioredis

STREAM = "maf:arena:crowd_simulation:output"

async def run() -> None:
    r = aioredis.from_url("redis://localhost:6379/0")
    last = "$"  # start with new entries only, then resume from the last ID seen
    while True:
        resp = await r.xread({STREAM: last}, block=0)
        for _, entries in resp:
            for sid, fields in entries:
                last = sid
                env = json.loads(fields[b"data"])
                if env.get("schema_version") != "1": continue
                if env["decision_kind"] != "fomo2_signal": continue
                d = env["decision"]
                print(f"{sid.decode()} signal={d['signal']} conf={d['confidence']:.2f} h={d['horizon_hours']}h")
                # your_trader.place_signal(d["signal"], d["confidence"], d["horizon_hours"])

asyncio.run(run())
Recipe B — polymarket (side-based)
# polymarket consumer: YES/NO sides + edge_bps + suggested USD size.
import asyncio, json
import redis.asyncio as aioredis

STREAM = "maf:arena:crowd_simulation:output"

async def run() -> None:
    r = aioredis.from_url("redis://localhost:6379/0")
    last = "$"  # start with new entries only, then resume from the last ID seen
    while True:
        resp = await r.xread({STREAM: last}, block=0)
        for _, entries in resp:
            for sid, fields in entries:
                last = sid
                env = json.loads(fields[b"data"])
                if env.get("schema_version") != "1": continue
                if env["decision_kind"] != "polymarket_side": continue
                d, p = env["decision"], env["prediction"]
                if d["side"] == "SKIP" or d["edge_bps"] < 200: continue  # skip thin edges
                print(f"{sid.decode()} bet {d['side']} ${d['suggested_size_usd']} on {p['source_id'][:10]}…")

asyncio.run(run())
Recipe C — oddsoddy (multiplier-based)
# oddsoddy strategy consumer: gate or scale a position by `multiplier`.
import asyncio, json
import redis.asyncio as aioredis

STREAM = "maf:arena:crowd_simulation:output"

async def run() -> None:
    r = aioredis.from_url("redis://localhost:6379/0")
    last = "$"  # start with new entries only, then resume from the last ID seen
    while True:
        resp = await r.xread({STREAM: last}, block=0)
        for _, entries in resp:
            for sid, fields in entries:
                last = sid
                env = json.loads(fields[b"data"])
                if env.get("schema_version") != "1": continue
                if env["decision_kind"] != "oddsoddy_multiplier": continue
                d, p = env["decision"], env["prediction"]
                if d["gate"]: continue  # crowd vetoed this trade
                print(f"{sid.decode()} mult={d['multiplier']:.2f} for {p['source_id']} ({d['reason'][:60]}…)")

asyncio.run(run())
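A bot that trades on more than one decision kind may prefer a single loop with a handler table over three separate consumers. The sketch below shows only the dispatch step, reusing the filters from the three recipes; `dispatch`, `HANDLERS`, and the `on_*` functions are hypothetical names, and the returned strings stand in for whatever the trading loop would actually do.

```python
from typing import Any, Callable, Optional

# A handler receives (decision, prediction) and returns an action string,
# or None when the envelope should be skipped.
Handler = Callable[[dict[str, Any], dict[str, Any]], Optional[str]]


def on_fomo2(d: dict[str, Any], p: dict[str, Any]) -> Optional[str]:
    return f"signal={d['signal']} conf={d['confidence']:.2f} h={d['horizon_hours']}h"


def on_polymarket(d: dict[str, Any], p: dict[str, Any]) -> Optional[str]:
    # Same filter as Recipe B: ignore SKIP and edges under 200 bps.
    if d["side"] == "SKIP" or d["edge_bps"] < 200:
        return None
    return f"bet {d['side']} ${d['suggested_size_usd']} on {p['source_id']}"


def on_oddsoddy(d: dict[str, Any], p: dict[str, Any]) -> Optional[str]:
    # Same filter as Recipe C: a set gate means the trade is skipped.
    if d["gate"]:
        return None
    return f"mult={d['multiplier']:.2f} for {p['source_id']}"


HANDLERS: dict[str, Handler] = {
    "fomo2_signal": on_fomo2,
    "polymarket_side": on_polymarket,
    "oddsoddy_multiplier": on_oddsoddy,
}


def dispatch(env: dict[str, Any]) -> Optional[str]:
    """Route one decoded envelope to its handler; None means 'nothing to do'."""
    if env.get("schema_version") != "1":
        return None
    handler = HANDLERS.get(env.get("decision_kind"))
    if handler is None:
        return None
    return handler(env["decision"], env["prediction"])
```

Inside any of the recipe loops, `action = dispatch(env)` would replace the per-kind `if` checks, and a new decision kind only needs one more entry in `HANDLERS`.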
Filtering & robustness
- Forward-compat (`schema_version`): a future bump to `"2"` may add or rename fields. Always check `env["schema_version"] == "1"` before trusting the rest of the body, and `continue` on a mismatch (a WARN log is plenty).
- Source filtering: read `env["prediction"]["source_type"]` and skip entries whose source you don't trade against. The example consumer exposes `--source-type` as a one-shot CLI filter.
- Dedup via `correlation_id`: keep a small LRU set of seen `correlation_id` values. The publisher emits a fresh `uuid4` per envelope, but consumers that resume after a crash and re-read `XRANGE` from a known cursor can see overlap; dedup makes resume idempotent.
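The three checks above combine naturally into one guard. The following is a minimal sketch under assumptions: `SeenIds` and `should_process` are hypothetical names, the LRU is built on `collections.OrderedDict`, and the default capacity of 10,000 is an arbitrary illustration rather than a recommended value.

```python
from collections import OrderedDict
from typing import Any, Collection


class SeenIds:
    """Bounded LRU set of correlation_id values."""

    def __init__(self, maxsize: int = 10_000) -> None:
        self._ids: "OrderedDict[str, None]" = OrderedDict()
        self._maxsize = maxsize

    def first_time(self, correlation_id: str) -> bool:
        """Return True the first time an id is seen, False on a repeat."""
        if correlation_id in self._ids:
            self._ids.move_to_end(correlation_id)  # refresh recency
            return False
        self._ids[correlation_id] = None
        if len(self._ids) > self._maxsize:
            self._ids.popitem(last=False)  # evict the least recently seen id
        return True


def should_process(
    env: dict[str, Any], seen: SeenIds, wanted_sources: Collection[str]
) -> bool:
    """Apply the schema_version, source_type, and dedup checks in that order."""
    if env.get("schema_version") != "1":
        return False
    if env["prediction"]["source_type"] not in wanted_sources:
        return False
    return seen.first_time(env["correlation_id"])
```

Running the dedup check last keeps ids of filtered-out envelopes from occupying slots in the LRU.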
Operational notes
- Redis URL: defaults to `redis://localhost:6379/0`. Override via `REDIS_URL` (or `--redis-url`). The example consumer reads `REDIS_URL` before falling back to the default.
- Switching streams for testing: pass `--stream test:...` (or set the env in your fork). Useful when running an arena demo against a fakeredis instance or a side-Redis.
- Backfill via `XREVRANGE`: to seed from the most recent N envelopes before tailing, call `await r.xrevrange("maf:arena:crowd_simulation:output", count=N)` once, then start `XREAD` from the newest `sid` returned. `XREVRANGE` returns entries newest-first, so that is the first element of the result.
- Demo CLI: the runnable companion is `examples/consume_crowd_oracle.py`; it supports `--count N` (one-shot, newest-first) and `--source-type` (post-decode filter). The 5-line snippet above is the API floor; the example is what to reach for when you need a real script.
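The backfill-then-tail pattern and the `REDIS_URL` fallback described above can be sketched as follows. This is an illustration under assumptions, not the code in `examples/consume_crowd_oracle.py`: `backfill_then_tail` and `handle` are hypothetical names, and `r` is expected to be a `redis.asyncio` client (or any object with compatible `xrevrange` and `xread` coroutines).

```python
import asyncio
import json
import os
from typing import Any, Callable

STREAM = "maf:arena:crowd_simulation:output"
# Environment override first, documented default second.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")


async def backfill_then_tail(
    r: Any,
    handle: Callable[[bytes, dict[str, Any]], None],
    n: int = 50,
    stream: str = STREAM,
) -> None:
    """Replay the newest n envelopes oldest-first, then tail the stream forever."""
    recent = await r.xrevrange(stream, count=n)  # newest-first
    # Resume after the newest backfilled ID. If the stream was empty, read
    # from the beginning so nothing published in the meantime is missed.
    last = recent[0][0] if recent else "0-0"
    for sid, fields in reversed(recent):  # hand entries over in chronological order
        handle(sid, json.loads(fields[b"data"]))
    while True:
        resp = await r.xread({stream: last}, block=0)
        for _, entries in resp:
            for sid, fields in entries:
                last = sid
                handle(sid, json.loads(fields[b"data"]))


# Usage (requires redis-py and a reachable server):
#   import redis.asyncio as aioredis
#   r = aioredis.from_url(REDIS_URL)
#   asyncio.run(backfill_then_tail(r, lambda sid, env: print(sid, env["decision_kind"])))
```

Starting `XREAD` from the newest backfilled ID rather than `$` closes the gap between the two calls: anything published after the `XREVRANGE` snapshot has a higher ID and is delivered by the first `XREAD`.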