checking system…
Docs / back / src/maf/consumers/risk_gate.py · line 80
Python · 237 lines
  1"""Risk policy for the MAF→engine boundary.
  2
  3What it does
  4------------
  5Takes a :class:`maf.actions.outbox.TradingAction` and produces a
  6:class:`GateDecision` — accept / reduce / reject — plus a structured
  7``reason``. The policy is pure: stateless aside from per-instance counters
  8the caller is responsible for advancing (so multiple gates can share or
  9diverge state cleanly).
 10
 11Policy knobs
 12------------
 13``per_ticker_size_cap``     Max fraction of bookkeeping unit per ticker.
 14``total_open_exposure_cap`` Max total fraction across all open positions.
 15``mode_floor``              Lowest mode the gate will let through ('manual'
 16                            means everything is reduced to manual). Useful
 17                            for "only let auto-mode trades execute on
 18                            specific tickers" via a whitelist.
 19``min_confidence``          Reject anything below this synthesis confidence.
 20``ticker_whitelist``        If non-empty, only these tickers can execute in
 21                            auto mode (others are demoted to semi).
 22``kill_switch_after_rejects`` After N rejects in a window (consecutive or not),
 23                            demote everything to manual until reset.
 24
 25Decisions
 26---------
 27``GateDecision.action``
 28    "execute" — engine should place the order at the (possibly reduced) size.
 29    "queue"   — engine should park for human ack.
 30    "log"     — engine should record the intent but take no action.
 31    "reject"  — engine should drop the action entirely.
 32
 33``GateDecision.size_fraction``
 34    Final size after gate-side reduction (≤ the original).
 35
 36``GateDecision.reason``
 37    Short human-readable string. Goes into the execution envelope so a
 38    downstream auditor can see *why* the gate did what it did.
 39"""
 40
from __future__ import annotations

import logging
import math
from collections import deque
from dataclasses import dataclass, field
from typing import Any
 47
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


# Alias used to annotate ``GateDecision.action``. It is a plain ``str`` at
# runtime, so the values listed on the right are a convention, not enforced.
GateAction = str  # "execute" | "queue" | "log" | "reject"
 52
 53
@dataclass(frozen=True)
class GateDecision:
    """Immutable outcome of a single :meth:`RiskGate.evaluate` call."""

    # What the engine should do: "execute" | "queue" | "log" | "reject".
    action: GateAction
    # Size after gate-side reduction; 0.0 on the reject, HOLD and kill-switch
    # paths of ``RiskGate.evaluate``.
    size_fraction: float
    # Short human-readable explanation of why the gate decided this way.
    reason: str
    final_mode: str  # echo of effective mode after gate ("auto"|"semi"|"manual")
 60
 61
 62@dataclass
 63class RiskPolicy:
 64    """Tunable policy. All values default to permissive sane numbers."""
 65
 66    per_ticker_size_cap: float = 0.10
 67    total_open_exposure_cap: float = 0.40
 68    min_confidence: float = 0.40
 69    ticker_whitelist: list[str] = field(default_factory=list)
 70    mode_floor: str = "auto"  # most permissive
 71    kill_switch_after_rejects: int = 5
 72    kill_switch_window: int = 20  # last-N rejects considered
 73
 74    def is_whitelisted(self, ticker: str) -> bool:
 75        if not self.ticker_whitelist:
 76            return True
 77        return ticker.upper() in {t.upper() for t in self.ticker_whitelist}
 78
 79
 80class RiskGate:
 81    """Apply a :class:`RiskPolicy` to incoming actions.
 82
 83    Maintains two pieces of state:
 84      * ``open_exposure`` — sum of size_fraction across currently-open
 85        positions, indexed by ticker. ``register_fill`` / ``register_exit``
 86        update this. The gate uses it to enforce the total-exposure cap.
 87      * recent reject history for the kill-switch.
 88
 89    Designed for one process. If you need cross-process state, persist
 90    ``open_exposure`` externally and reconstruct on startup.
 91    """
 92
 93    def __init__(self, policy: RiskPolicy | None = None) -> None:
 94        self.policy = policy or RiskPolicy()
 95        self.open_exposure: dict[str, float] = {}
 96        self._recent_rejects: deque[bool] = deque(maxlen=self.policy.kill_switch_window)
 97
 98    @property
 99    def total_exposure(self) -> float:
100        return sum(self.open_exposure.values())
101
102    def is_killed(self) -> bool:
103        if not self._recent_rejects:
104            return False
105        rejects = sum(1 for r in self._recent_rejects if r)
106        return rejects >= self.policy.kill_switch_after_rejects
107
108    def register_fill(self, ticker: str, size_fraction: float) -> None:
109        """Engine reports a fill; gate updates exposure."""
110        if size_fraction <= 0:
111            return
112        cur = self.open_exposure.get(ticker, 0.0)
113        self.open_exposure[ticker] = cur + size_fraction
114
115    def register_exit(self, ticker: str, size_fraction: float | None = None) -> None:
116        """Engine reports an exit; gate releases exposure."""
117        if ticker not in self.open_exposure:
118            return
119        if size_fraction is None:
120            self.open_exposure.pop(ticker, None)
121            return
122        new = self.open_exposure[ticker] - size_fraction
123        if new <= 1e-6:
124            self.open_exposure.pop(ticker, None)
125        else:
126            self.open_exposure[ticker] = new
127
128    def evaluate(self, action: dict[str, Any]) -> GateDecision:
129        """Apply the policy. Pure function over (policy, gate state, action)."""
130        target = action.get("target") or {}
131        ticker = str(target.get("ticker") or "").upper()
132        sizing = action.get("sizing") or {}
133        confidence = float(sizing.get("confidence") or 0.0)
134        requested = float(sizing.get("size_fraction") or 0.0)
135        mode = str(action.get("mode") or "manual").lower()
136        verdict = str(action.get("verdict") or "HOLD").upper()
137
138        # Mode floor: never let through anything stronger than the configured
139        # floor. ``auto`` means "no demotion". ``semi`` means "auto becomes
140        # semi". ``manual`` means "everything is at most a log entry".
141        if self.policy.mode_floor == "semi" and mode == "auto":
142            mode = "semi"
143        elif self.policy.mode_floor == "manual" and mode in ("auto", "semi"):
144            mode = "manual"
145
146        # Kill switch overrides everything.
147        if self.is_killed():
148            return self._record_reject(GateDecision(
149                action="log",
150                size_fraction=0.0,
151                reason=(
152                    f"kill_switch ({self.policy.kill_switch_after_rejects} rejects "
153                    f"in last {len(self._recent_rejects)})"
154                ),
155                final_mode="manual",
156            ))
157
158        # HOLD verdicts: log only — no order to place.
159        if verdict == "HOLD":
160            return self._record_ok(GateDecision(
161                action="log",
162                size_fraction=0.0,
163                reason="HOLD verdict — no order",
164                final_mode=mode,
165            ))
166
167        # Confidence floor.
168        if confidence < self.policy.min_confidence:
169            return self._record_reject(GateDecision(
170                action="reject",
171                size_fraction=0.0,
172                reason=(
173                    f"confidence {confidence:.2f} < min {self.policy.min_confidence:.2f}"
174                ),
175                final_mode=mode,
176            ))
177
178        # Ticker whitelist (only enforced for auto execution).
179        if mode == "auto" and not self.policy.is_whitelisted(ticker):
180            mode = "semi"  # demote — let human take a look
181
182        # Per-ticker size cap.
183        sized = min(requested, self.policy.per_ticker_size_cap)
184
185        # Total exposure cap — reserve only as much headroom as we have.
186        already_open = self.open_exposure.get(ticker, 0.0)
187        headroom = max(
188            0.0,
189            self.policy.total_open_exposure_cap
190            - (self.total_exposure - already_open),
191        )
192        sized = min(sized, headroom)
193
194        if sized <= 0.0:
195            return self._record_reject(GateDecision(
196                action="reject",
197                size_fraction=0.0,
198                reason=(
199                    f"no headroom (total_open={self.total_exposure:.2f}, "
200                    f"cap={self.policy.total_open_exposure_cap:.2f})"
201                ),
202                final_mode=mode,
203            ))
204
205        gate_action = {
206            "auto":   "execute",
207            "semi":   "queue",
208            "manual": "log",
209        }.get(mode, "log")
210
211        return self._record_ok(GateDecision(
212            action=gate_action,
213            size_fraction=round(sized, 4),
214            reason=(
215                "ok"
216                if sized >= requested else
217                f"size reduced from {requested:.3f} to {sized:.3f} by caps"
218            ),
219            final_mode=mode,
220        ))
221
222    def _record_ok(self, d: GateDecision) -> GateDecision:
223        self._recent_rejects.append(False)
224        return d
225
226    def _record_reject(self, d: GateDecision) -> GateDecision:
227        self._recent_rejects.append(True)
228        return d
229
230
# Names exported by ``from <this module> import *``.
__all__ = [
    "GateAction",
    "GateDecision",
    "RiskGate",
    "RiskPolicy",
]