1"""Risk policy for the MAF→engine boundary. 2 3What it does 4------------ 5Takes a :class:`maf.actions.outbox.TradingAction` and produces a 6:class:`GateDecision` — accept / reduce / reject — plus a structured 7``reason``. The policy is pure: stateless aside from per-instance counters 8the caller is responsible for advancing (so multiple gates can share or 9diverge state cleanly). 10 11Policy knobs 12------------ 13``per_ticker_size_cap`` Max fraction of bookkeeping unit per ticker. 14``total_open_exposure_cap`` Max total fraction across all open positions. 15``mode_floor`` Lowest mode the gate will let through ('manual' 16 means everything is reduced to manual). Useful 17 for "only let auto-mode trades execute on 18 specific tickers" via a whitelist. 19``min_confidence`` Reject anything below this synthesis confidence. 20``ticker_whitelist`` If non-empty, only these tickers can execute in 21 auto mode (others are demoted to semi). 22``kill_switch_after_rejects`` After N consecutive rejects in a window, 23 demote everything to manual until reset. 24 25Decisions 26--------- 27``GateDecision.action`` 28 "execute" — engine should place the order at the (possibly reduced) size. 29 "queue" — engine should park for human ack. 30 "log" — engine should record the intent but take no action. 31 "reject" — engine should drop the action entirely. 32 33``GateDecision.size_fraction`` 34 Final size after gate-side reduction (≤ the original). 35 36``GateDecision.reason`` 37 Short human-readable string. Goes into the execution envelope so a 38 downstream auditor can see *why* the gate did what it did. 39""" 40 41from __future__ import annotations 42 43import logging 44from collections import deque 45from dataclasses import dataclass, field 46from typing import Any 47 48logger = logging.getLogger(__name__) 49 50 51GateAction = str # "execute" | "queue" | "log" | "reject" 52 53 54@dataclass(frozen=True) 55class GateDecision: 56 action: GateAction 57 size_fraction: float 58 reason: str 59 final_mode: str # echo of effective mode after gate ("auto"|"semi"|"manual") 60 61 62@dataclass 63class RiskPolicy: 64 """Tunable policy. All values default to permissive sane numbers.""" 65 66 per_ticker_size_cap: float = 0.10 67 total_open_exposure_cap: float = 0.40 68 min_confidence: float = 0.40 69 ticker_whitelist: list[str] = field(default_factory=list) 70 mode_floor: str = "auto" # most permissive 71 kill_switch_after_rejects: int = 5 72 kill_switch_window: int = 20 # last-N rejects considered 73 74 def is_whitelisted(self, ticker: str) -> bool: 75 if not self.ticker_whitelist: 76 return True 77 return ticker.upper() in {t.upper() for t in self.ticker_whitelist} 78 79 80class RiskGate: 81 """Apply a :class:`RiskPolicy` to incoming actions. 82 83 Maintains two pieces of state: 84 * ``open_exposure`` — sum of size_fraction across currently-open 85 positions, indexed by ticker. ``register_fill`` / ``register_exit`` 86 update this. The gate uses it to enforce the total-exposure cap. 87 * recent reject history for the kill-switch. 88 89 Designed for one process. If you need cross-process state, persist 90 ``open_exposure`` externally and reconstruct on startup. 91 """ 92 93 def __init__(self, policy: RiskPolicy | None = None) -> None: 94 self.policy = policy or RiskPolicy() 95 self.open_exposure: dict[str, float] = {} 96 self._recent_rejects: deque[bool] = deque(maxlen=self.policy.kill_switch_window) 97 98 @property 99 def total_exposure(self) -> float: 100 return sum(self.open_exposure.values()) 101 102 def is_killed(self) -> bool: 103 if not self._recent_rejects: 104 return False 105 rejects = sum(1 for r in self._recent_rejects if r) 106 return rejects >= self.policy.kill_switch_after_rejects 107 108 def register_fill(self, ticker: str, size_fraction: float) -> None: 109 """Engine reports a fill; gate updates exposure.""" 110 if size_fraction <= 0: 111 return 112 cur = self.open_exposure.get(ticker, 0.0) 113 self.open_exposure[ticker] = cur + size_fraction 114 115 def register_exit(self, ticker: str, size_fraction: float | None = None) -> None: 116 """Engine reports an exit; gate releases exposure.""" 117 if ticker not in self.open_exposure: 118 return 119 if size_fraction is None: 120 self.open_exposure.pop(ticker, None) 121 return 122 new = self.open_exposure[ticker] - size_fraction 123 if new <= 1e-6: 124 self.open_exposure.pop(ticker, None) 125 else: 126 self.open_exposure[ticker] = new 127 128 def evaluate(self, action: dict[str, Any]) -> GateDecision: 129 """Apply the policy. Pure function over (policy, gate state, action).""" 130 target = action.get("target") or {} 131 ticker = str(target.get("ticker") or "").upper() 132 sizing = action.get("sizing") or {} 133 confidence = float(sizing.get("confidence") or 0.0) 134 requested = float(sizing.get("size_fraction") or 0.0) 135 mode = str(action.get("mode") or "manual").lower() 136 verdict = str(action.get("verdict") or "HOLD").upper() 137 138 # Mode floor: never let through anything stronger than the configured 139 # floor. ``auto`` means "no demotion". ``semi`` means "auto becomes 140 # semi". ``manual`` means "everything is at most a log entry". 141 if self.policy.mode_floor == "semi" and mode == "auto": 142 mode = "semi" 143 elif self.policy.mode_floor == "manual" and mode in ("auto", "semi"): 144 mode = "manual" 145 146 # Kill switch overrides everything. 147 if self.is_killed(): 148 return self._record_reject(GateDecision( 149 action="log", 150 size_fraction=0.0, 151 reason=( 152 f"kill_switch ({self.policy.kill_switch_after_rejects} rejects " 153 f"in last {len(self._recent_rejects)})" 154 ), 155 final_mode="manual", 156 )) 157 158 # HOLD verdicts: log only — no order to place. 159 if verdict == "HOLD": 160 return self._record_ok(GateDecision( 161 action="log", 162 size_fraction=0.0, 163 reason="HOLD verdict — no order", 164 final_mode=mode, 165 )) 166 167 # Confidence floor. 168 if confidence < self.policy.min_confidence: 169 return self._record_reject(GateDecision( 170 action="reject", 171 size_fraction=0.0, 172 reason=( 173 f"confidence {confidence:.2f} < min {self.policy.min_confidence:.2f}" 174 ), 175 final_mode=mode, 176 )) 177 178 # Ticker whitelist (only enforced for auto execution). 179 if mode == "auto" and not self.policy.is_whitelisted(ticker): 180 mode = "semi" # demote — let human take a look 181 182 # Per-ticker size cap. 183 sized = min(requested, self.policy.per_ticker_size_cap) 184 185 # Total exposure cap — reserve only as much headroom as we have. 186 already_open = self.open_exposure.get(ticker, 0.0) 187 headroom = max( 188 0.0, 189 self.policy.total_open_exposure_cap 190 - (self.total_exposure - already_open), 191 ) 192 sized = min(sized, headroom) 193 194 if sized <= 0.0: 195 return self._record_reject(GateDecision( 196 action="reject", 197 size_fraction=0.0, 198 reason=( 199 f"no headroom (total_open={self.total_exposure:.2f}, " 200 f"cap={self.policy.total_open_exposure_cap:.2f})" 201 ), 202 final_mode=mode, 203 )) 204 205 gate_action = { 206 "auto": "execute", 207 "semi": "queue", 208 "manual": "log", 209 }.get(mode, "log") 210 211 return self._record_ok(GateDecision( 212 action=gate_action, 213 size_fraction=round(sized, 4), 214 reason=( 215 "ok" 216 if sized >= requested else 217 f"size reduced from {requested:.3f} to {sized:.3f} by caps" 218 ), 219 final_mode=mode, 220 )) 221 222 def _record_ok(self, d: GateDecision) -> GateDecision: 223 self._recent_rejects.append(False) 224 return d 225 226 def _record_reject(self, d: GateDecision) -> GateDecision: 227 self._recent_rejects.append(True) 228 return d 229 230 231__all__ = [ 232 "GateAction", 233 "GateDecision", 234 "RiskGate", 235 "RiskPolicy", 236]