checking system…
Docs / back / src/maf/llm/openrouter_rankings.py · line 532
Python · 634 lines
  1"""OpenRouter rankings scraper + Ollama Cloud cross-reference.
  2
  3The user asked to feed `openrouter.ai/rankings` data into the smart picker so
  4each task profile (e.g. ``programming``, ``finance``) picks the best-ranked
  5Ollama Cloud model that exists in OpenRouter's category leaderboard.
  6
  7Wire shape
  8----------
  9OpenRouter's ranking pages are Next.js SSR. The leaderboard data is embedded
 10in the streamed RSC payload as ``"rankingData":[{model_permaslug, ...}, ...]``.
 11We pull the page HTML, extract that payload, aggregate token counts across
 12the dated entries, and produce a sorted list of ``(permaslug, total_tokens)``.
 13
 14We then map each OpenRouter permaslug (e.g. ``deepseek/deepseek-v3.2``) to
 15the corresponding Ollama Cloud model id (e.g. ``deepseek-v3.2``) by family
 16+ size + version matching. The Ollama Cloud catalog comes from
 17``GET https://ollama.com/v1/models``.
 18
 19Public API
 20----------
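:func:`fetch_category_ranking(category)` — async, returns
``(list[RankedModel], list[BenchmarkScore])``: the usage leaderboard (one
entry per model, sorted descending by tokens) plus the benchmark scores
embedded on the same page. ``RankedModel.ollama_id`` is non-empty once
:func:`attach_ollama_matches` has found an Ollama match.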
 24
 25:class:`RankingsCache` — TTL-backed cache so repeated picker calls don't hit
 26the network. Default TTL: 12h (rankings change slowly).
 27
:func:`best_ollama_model_for(ranked)` — given a ranking whose Ollama matches
have already been attached, return the ``ollama_id`` of the highest-ranked
model that has a match (``None`` when there is none).
 31
 32Failure modes
 33-------------
 34*Network errors / parse failures* return an empty list — the picker falls
 35back to its static PROFILE_MAP. We don't want this feature to take the
 36arena offline.
 37"""
 38
 39from __future__ import annotations
 40
 41import asyncio
 42import json
 43import logging
 44import re
 45import time
 46from dataclasses import dataclass, field
 47from typing import Any
 48
 49import httpx
 50
 51logger = logging.getLogger(__name__)
 52
 53
 54# Categories that OpenRouter's rankings page exposes. Verified live; safe to
 55# extend if OpenRouter ships new ones.
 56KNOWN_CATEGORIES: tuple[str, ...] = (
 57    "programming",
 58    "roleplay",
 59    "marketing",
 60    "technology",
 61    "science",
 62    "translation",
 63    "finance",
 64    "trivia",
 65    "academia",
 66    "legal",
 67    "health",
 68    "education",
 69    "writing",
 70)
 71
 72
 73# Map MAF task profiles → OpenRouter category. New profiles can fall back to
 74# DEFAULT_CATEGORY when a one-to-one match doesn't exist.
 75PROFILE_TO_CATEGORY: dict[str, str] = {
 76    "narrative":      "writing",
 77    "synthesis":      "finance",
 78    "judge":          "finance",
 79    "debate":         "roleplay",
 80    "json_strict":    "programming",
 81    "classification": "trivia",
 82    "quick":          "trivia",
 83    "signal":         "finance",
 84    "long_context":   "programming",
 85    "coding":         "programming",
 86    "research":       "science",
 87    "trillion":       "technology",
 88}
 89DEFAULT_CATEGORY = "technology"
 90
 91
 92_RANKINGS_BASE = "https://openrouter.ai/rankings"
 93_OLLAMA_MODELS_URL = "https://ollama.com/v1/models"
 94
 95_HTTP_TIMEOUT_S = 15.0
 96
 97
 98# ---------------------------------------------------------------------------
 99# Data classes
100# ---------------------------------------------------------------------------
101
102
@dataclass(frozen=True)
class RankedModel:
    """A single leaderboard row for one category, plus Ollama match info.

    The parser fills the first five fields; ``ollama_*`` and ``score_*`` keep
    their defaults until ``attach_ollama_matches`` builds an enriched copy.
    """

    # Position in the leaderboard.
    rank: int
    # Full OpenRouter slug, e.g. "deepseek/deepseek-v3.2-20251201".
    permaslug: str
    # Part before the slash, e.g. "deepseek".
    author: str
    # Last path segment with the date stamp removed, e.g. "deepseek-v3.2".
    short_id: str
    # Prompt + completion tokens summed over all rows for this model.
    total_tokens: int
    # Matching Ollama Cloud id, e.g. "deepseek-v3.2"; "" when nothing matched.
    ollama_id: str = ""
    # One of "exact", "prefix", "family", or "" when nothing matched.
    ollama_match_quality: str = ""
    # Artificial Analysis benchmark scores; 0.0 for models that are not on
    # the benchmark.
    score_intelligence: float = 0.0
    score_coding: float = 0.0
    score_agentic: float = 0.0
119
120
@dataclass(frozen=True)
class BenchmarkScore:
    """Merged Artificial Analysis benchmark scores for one model."""

    # OpenRouter slug the scores were published under.
    permaslug: str
    # Model name as reported in the benchmark data ("" when absent).
    aa_name: str
    # One score per benchmark dimension; 0.0 when the dimension is missing.
    intelligence: float
    coding: float
    agentic: float
130
131
132@dataclass
133class _CacheEntry:
134    ranking: list[RankedModel]
135    fetched_at: float
136
137
138# ---------------------------------------------------------------------------
139# Scraper
140# ---------------------------------------------------------------------------
141
142
143_PUSH_RE = re.compile(r'self\.__next_f\.push\(\[1,"((?:\\.|[^"\\])*)"\]\)')
144_RANKING_DATA_RE = re.compile(r'"rankingData":\s*(\[.+?\])\s*[,\}]')
145_BENCHMARK_KEYS = ("intelligence", "coding", "agentic")
146
147
148def _extract_balanced_array(s: str, start: int) -> tuple[int, str]:
149    """Return (end_index, slice) for the JSON array starting at ``s[start]``.
150
151    ``start`` must point at ``[``. Returns ``(start, "")`` if balancing fails.
152    """
153    if start >= len(s) or s[start] != "[":
154        return start, ""
155    depth = 0
156    in_str = False
157    esc = False
158    for j in range(start, len(s)):
159        c = s[j]
160        if esc:
161            esc = False
162            continue
163        if c == "\\":
164            esc = True
165            continue
166        if c == '"':
167            in_str = not in_str
168            continue
169        if in_str:
170            continue
171        if c == "[":
172            depth += 1
173        elif c == "]":
174            depth -= 1
175            if depth == 0:
176                return j + 1, s[start:j + 1]
177    return start, ""
178
179
def _as_score(row: dict[str, Any]) -> float:
    """Return ``row["score"]`` as a float, or 0.0 when missing or malformed."""
    try:
        return float(row.get("score") or 0.0)
    except (TypeError, ValueError):
        return 0.0


def parse_benchmark_scores(html: str) -> list[BenchmarkScore]:
    """Pull the AA benchmark scores embedded on the rankings pages.

    OpenRouter embeds three benchmark dimensions (``intelligence``,
    ``coding``, ``agentic``) per model. Rows are merged by ``permaslug`` so
    each model gets one :class:`BenchmarkScore`; a dimension a model lacks
    is reported as ``0.0``.

    Args:
        html: Raw HTML of a rankings page.

    Returns:
        One score per permaslug, sorted by permaslug so the result is
        deterministic. Empty when no benchmark data is found.
    """
    per_dim: dict[str, dict[str, Any]] = {}
    for raw in _PUSH_RE.findall(html):
        # Cheap pre-filter: skip chunks that never mention a dimension key.
        if not any(key in raw for key in _BENCHMARK_KEYS):
            continue
        # The chunk is the body of a JS string literal. Decoding it as a JSON
        # string keeps non-ASCII text intact and joins surrogate pairs, which
        # ``encode().decode("unicode_escape")`` does not (it reads the UTF-8
        # bytes as Latin-1). The old decoder stays as a fallback for escapes
        # JSON rejects, such as ``\x41``.
        try:
            text = json.loads(f'"{raw}"', strict=False)
        except json.JSONDecodeError:
            try:
                text = raw.encode().decode("unicode_escape")
            except UnicodeDecodeError:
                continue
        for key in _BENCHMARK_KEYS:
            m = re.search(rf'"{key}":\s*\[', text)
            if not m:
                continue
            # ``m.end() - 1`` is the index of the opening ``[``.
            _end, payload = _extract_balanced_array(text, m.end() - 1)
            if not payload:
                continue
            try:
                arr = json.loads(payload)
            except json.JSONDecodeError:
                continue
            if not isinstance(arr, list):
                continue
            rows = {
                r["permaslug"]: r
                for r in arr
                if isinstance(r, dict)
                and isinstance(r.get("permaslug"), str)
                and r["permaslug"]
            }
            # Merge instead of overwrite, so a later chunk that merely
            # mentions the key cannot wipe rows collected earlier.
            if rows:
                per_dim.setdefault(key, {}).update(rows)
    if not per_dim:
        return []

    all_slugs: set[str] = set()
    for rows in per_dim.values():
        all_slugs.update(rows)

    out: list[BenchmarkScore] = []
    # Sorted: set order varies between runs, and callers keep the first
    # score they see per short id.
    for slug in sorted(all_slugs):
        intel = per_dim.get("intelligence", {}).get(slug, {})
        codin = per_dim.get("coding", {}).get(slug, {})
        agent = per_dim.get("agentic", {}).get(slug, {})
        name = intel.get("aa_name") or codin.get("aa_name") or agent.get("aa_name") or ""
        out.append(BenchmarkScore(
            permaslug=slug,
            aa_name=str(name),
            intelligence=_as_score(intel),
            coding=_as_score(codin),
            agentic=_as_score(agent),
        ))
    return out
237
238
# Locates the opening ``[`` of the embedded ``"rankingData"`` array.
_RANKING_START_RE = re.compile(r'"rankingData":\s*\[')


def _token_count(value: Any) -> int:
    """Return ``value`` as an int, or 0 when it is missing or malformed."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def parse_ranking_html(html: str) -> list[RankedModel]:
    """Aggregate per-day rows in the page into a sorted leaderboard.

    Tolerant to: variant suffixes (``:free``), missing or malformed token
    counts, rows without a usable slug, and escape-encoded JSON in the
    Next.js push.

    Args:
        html: Raw HTML of a rankings page.

    Returns:
        One :class:`RankedModel` per base model, ranked from 1 by descending
        prompt + completion tokens. ``[]`` when no rankingData is found, in
        which case upstream falls back to PROFILE_MAP.
    """
    rows: list[dict[str, Any]] = []
    for raw in _PUSH_RE.findall(html):
        if "rankingData" not in raw:
            continue
        # Decode the JS string body as a JSON string: unlike
        # ``encode().decode("unicode_escape")`` this keeps non-ASCII text
        # intact. The old decoder stays as a fallback for escapes JSON
        # rejects.
        try:
            text = json.loads(f'"{raw}"', strict=False)
        except json.JSONDecodeError:
            try:
                text = raw.encode().decode("unicode_escape")
            except UnicodeDecodeError:
                continue
        payload = ""
        start = _RANKING_START_RE.search(text)
        if start:
            # Bracket matching copes with nested arrays and with ``]`` inside
            # strings, where the non-greedy regex cuts the array short.
            _end, payload = _extract_balanced_array(text, start.end() - 1)
        if not payload:
            legacy = _RANKING_DATA_RE.search(text)
            if not legacy:
                continue
            payload = legacy.group(1)
        try:
            arr = json.loads(payload)
        except json.JSONDecodeError:
            continue
        if isinstance(arr, list):
            rows.extend(r for r in arr if isinstance(r, dict))

    if not rows:
        return []

    agg: dict[str, int] = {}
    for r in rows:
        slug = r.get("model_permaslug") or r.get("variant_permaslug") or ""
        if not isinstance(slug, str):
            continue
        # Strip ``:free`` / ``:beta`` variant suffix so different variants of
        # the same base model aggregate together.
        slug = slug.split(":", 1)[0]
        if not slug:
            continue
        tokens = _token_count(r.get("total_prompt_tokens")) + _token_count(
            r.get("total_completion_tokens")
        )
        agg[slug] = agg.get(slug, 0) + tokens

    out: list[RankedModel] = []
    # Stable sort: models with equal totals keep first-seen order.
    for i, (slug, tokens) in enumerate(
        sorted(agg.items(), key=lambda kv: -kv[1]), start=1,
    ):
        author, short_id = _split_permaslug(slug)
        out.append(RankedModel(
            rank=i,
            permaslug=slug,
            author=author,
            short_id=short_id,
            total_tokens=tokens,
        ))
    return out
293
294
295def _split_permaslug(slug: str) -> tuple[str, str]:
296    """``deepseek/deepseek-v3.2-20251201`` → ("deepseek", "deepseek-v3.2")."""
297    if "/" in slug:
298        author, rest = slug.split("/", 1)
299    else:
300        author, rest = "", slug
301    # Trim trailing ``-YYYYMMDD`` date stamp if present.
302    rest = re.sub(r"-?\d{8}$", "", rest)
303    # Trim trailing model-size-suffix dates like -20260512 already covered above.
304    return author, rest
305
306
async def _fetch_html(url: str, client: httpx.AsyncClient) -> str | None:
    """GET ``url`` with ``client`` and return the response body as text.

    Any exception (transport error, non-success status, ...) is logged as a
    warning and turned into ``None``; nothing is raised to the caller.
    """
    request_headers = {"User-Agent": "Mozilla/5.0 (MAF)"}
    try:
        response = await client.get(url, headers=request_headers)
        response.raise_for_status()
        body = response.text
    except Exception as exc:
        logger.warning("openrouter_rankings: fetch %s failed: %s", url, exc)
        return None
    return body
315
316
async def fetch_category_ranking(
    category: str, *, client: httpx.AsyncClient | None = None,
) -> tuple[list[RankedModel], list[BenchmarkScore]]:
    """Fetch one rankings page and parse both tables out of it.

    Returns ``(usage_ranking, benchmark_scores)``, both parsed from the same
    HTML. Ollama matches are *not* attached here; that is left to the caller
    so one Ollama catalog fetch can serve many categories.

    ``([], [])`` is returned for a category missing from
    ``KNOWN_CATEGORIES`` and when the page could not be fetched. A ``client``
    passed in is used as-is and left open; without one, a temporary client
    is created and closed again.
    """
    if category not in KNOWN_CATEGORIES:
        logger.debug("openrouter_rankings: unknown category %r", category)
        return [], []

    page_url = f"{_RANKINGS_BASE}/{category}"
    if client is not None:
        html = await _fetch_html(page_url, client)
    else:
        session = httpx.AsyncClient(timeout=_HTTP_TIMEOUT_S, follow_redirects=True)
        try:
            html = await _fetch_html(page_url, session)
        finally:
            await session.aclose()

    if not html:
        return [], []
    return parse_ranking_html(html), parse_benchmark_scores(html)
342
343
344# ---------------------------------------------------------------------------
345# Ollama catalog + matching
346# ---------------------------------------------------------------------------
347
348
async def fetch_ollama_catalog(api_key: str, *, client: httpx.AsyncClient | None = None) -> list[str]:
    """Return the list of model ids the Ollama Cloud account can call.

    Args:
        api_key: Bearer token for the catalog endpoint. An empty key
            short-circuits to ``[]`` without any network access.
        client: Optional shared HTTP client; it is left open. Without one, a
            temporary client is created and closed here.

    Returns:
        The ``id`` of every entry under the payload's ``data`` list. ``[]``
        when the request fails or the payload does not have that shape;
        this function is best-effort and logs instead of raising.
    """
    if not api_key:
        return []
    own_client = client is None
    if own_client:
        client = httpx.AsyncClient(timeout=_HTTP_TIMEOUT_S)
    try:
        try:
            resp = await client.get(  # type: ignore[union-attr]
                _OLLAMA_MODELS_URL,
                headers={"Authorization": f"Bearer {api_key}"},
            )
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            logger.warning("ollama catalog fetch failed: %s", exc)
            return []
    finally:
        if own_client:
            await client.aclose()  # type: ignore[union-attr]

    # Check the shape before walking it: a JSON body that is not
    # ``{"data": [...]}`` would otherwise raise AttributeError out of a
    # function that promises to degrade to an empty list.
    models = data.get("data") if isinstance(data, dict) else None
    if not isinstance(models, list):
        logger.warning("ollama catalog fetch failed: unexpected payload shape")
        return []
    return [
        str(m["id"])
        for m in models
        if isinstance(m, dict) and m.get("id")
    ]
375
376
377_PUNCT_RE = re.compile(r"[-:_\.]+")
378
379
380def _normalize(s: str) -> str:
381    """Strip punctuation + lowercase for loose comparison.
382
383    ``gpt-oss:120b`` and ``gpt-oss-120b`` both normalize to ``gptoss120b``.
384    """
385    return _PUNCT_RE.sub("", s.lower()).strip()
386
387
def match_to_ollama(
    permaslug: str, *, catalog: list[str],
) -> tuple[str, str]:
    """Best-effort match: (ollama_id, quality).

    Quality scale (decreasing):
      ``exact``  — short id matches a catalog id exactly (after punctuation strip)
      ``prefix`` — short id is a prefix of a catalog id (handles size variants)
      ``family`` — author/family name matches catalog id loosely
      ``""``    — no match

    Each tier is tried against the *whole* catalog before a weaker tier is
    considered, so a loose hit on an early catalog entry can never shadow a
    stronger hit further down the list. Within a tier the first catalog
    entry that qualifies wins.

    Args:
        permaslug: OpenRouter slug, e.g. ``deepseek/deepseek-v3.2-20251201``.
        catalog: Ollama Cloud model ids to match against.

    Returns:
        ``(ollama_id, quality)``; ``("", "")`` when nothing matches or the
        catalog is empty.
    """
    if not catalog:
        return "", ""
    author, short_id = _split_permaslug(permaslug)
    short_id_l = short_id.lower()
    short_norm = _normalize(short_id)
    # Ids that normalise to nothing can only produce bogus matches.
    candidates = [(orig, norm) for orig, norm in
                  ((c, _normalize(c)) for c in catalog) if norm]

    if short_norm:
        # Tier 1: exact (punctuation-insensitive) match on short id.
        for orig, norm in candidates:
            if norm == short_norm:
                return orig, "exact"

        # Tier 2a: full short id is a prefix of the catalog id or vice versa,
        # e.g. "gpt-oss" vs "gpt-oss:20b".
        for orig, norm in candidates:
            if norm.startswith(short_norm) or short_norm.startswith(norm):
                return orig, "prefix"

        # Tier 2b: same test with the trailing version number removed. Kept
        # in its own pass because it is weaker: "glm-4.6" reduces to "glm",
        # which also prefixes "glm-4.5:355b".
        base = _normalize(re.sub(r"-?\d+(\.\d+)*$", "", short_id_l))
        if base:
            for orig, norm in candidates:
                if norm.startswith(base) or base.startswith(norm):
                    return orig, "prefix"

    # Tier 3, family-style match: the catalog id starts with the author tag
    # (or, without an author, the first hyphen-separated part of the short
    # id). Tags shorter than 4 characters are too ambiguous to trust.
    author_l = (author or short_id_l.split("-", 1)[0]).lower()
    family_tag = _normalize(author_l)
    if family_tag and len(author_l) >= 4:
        for orig, norm in candidates:
            if norm.startswith(family_tag):
                return orig, "family"

    return "", ""
431
432
def attach_ollama_matches(
    ranked: list[RankedModel],
    catalog: list[str],
    *,
    benchmarks: list[BenchmarkScore] | None = None,
) -> list[RankedModel]:
    """Build enriched copies of ``ranked`` with Ollama ids and scores filled in.

    Every row is matched against ``catalog`` via ``match_to_ollama``.
    Benchmark scores are looked up by lowercase short id; when several
    benchmark rows share a short id the first one in ``benchmarks`` is used,
    and rows without a benchmark get ``0.0`` scores. ``ranked`` itself is
    not modified.
    """
    scores_by_short: dict[str, BenchmarkScore] = {}
    for score in benchmarks or ():
        short = _split_permaslug(score.permaslug)[1]
        if short.lower() not in scores_by_short:
            scores_by_short[short.lower()] = score

    enriched: list[RankedModel] = []
    for row in ranked:
        ollama_id, quality = match_to_ollama(row.permaslug, catalog=catalog)
        score = scores_by_short.get(row.short_id.lower())
        if score:
            intelligence, coding, agentic = score.intelligence, score.coding, score.agentic
        else:
            intelligence = coding = agentic = 0.0
        enriched.append(RankedModel(
            rank=row.rank,
            permaslug=row.permaslug,
            author=row.author,
            short_id=row.short_id,
            total_tokens=row.total_tokens,
            ollama_id=ollama_id,
            ollama_match_quality=quality,
            score_intelligence=intelligence,
            score_coding=coding,
            score_agentic=agentic,
        ))
    return enriched
463
464
def best_ollama_model_for(ranked: list[RankedModel]) -> str | None:
    """Return the ``ollama_id`` of the first row that has one, else ``None``.

    With ``ranked`` in leaderboard order this is the most-used model that is
    available on Ollama.
    """
    return next((row.ollama_id for row in ranked if row.ollama_id), None)
471
472
# Task profile -> benchmark dimension whose score should drive the pick.
# A profile that is absent from this map (quick, narrative, signal,
# classification, ...) is not benchmark-bound and is chosen by popularity
# through ``best_ollama_model_for``.
PROFILE_TO_BENCHMARK: dict[str, str] = {
    "coding": "coding",
    "long_context": "coding",
    "json_strict": "intelligence",
    "synthesis": "intelligence",
    "judge": "intelligence",
    "research": "intelligence",
    "debate": "agentic",
}
486
487
def best_ollama_by_score(
    ranked: list[RankedModel], dimension: str,
) -> str | None:
    """Pick the Ollama-matched model that scores highest on ``dimension``.

    ``dimension`` must be "intelligence", "coding" or "agentic"; any other
    value yields ``None``. ``None`` is also returned when no row has an
    ``ollama_id`` or when the best score is not above zero, so the caller
    can fall back to popularity ranking. Ties go to the earliest row.
    """
    if dimension not in ("intelligence", "coding", "agentic"):
        return None
    score_attr = f"score_{dimension}"

    candidates = [row for row in ranked if row.ollama_id]
    if not candidates:
        return None

    # ``max`` keeps the first of equal maxima, matching leaderboard order.
    best = max(candidates, key=lambda row: getattr(row, score_attr, 0.0))
    if getattr(best, score_attr, 0.0) <= 0.0:
        return None
    return best.ollama_id
507
508
def best_ollama_for_profile(
    ranked: list[RankedModel], profile: str,
) -> tuple[str | None, str]:
    """Choose a model for ``profile`` and report which signal decided it.

    Returns ``(model_id, reason)``. ``reason`` is the benchmark dimension
    ("intelligence" / "coding" / "agentic") when the profile maps to one in
    ``PROFILE_TO_BENCHMARK`` and a scored model was found, "popularity" when
    the usage ranking was used instead, and "" (with ``model_id`` ``None``)
    when no row has an Ollama match.
    """
    dimension = PROFILE_TO_BENCHMARK.get(profile)
    if dimension:
        by_score = best_ollama_by_score(ranked, dimension)
        if by_score:
            return by_score, dimension

    by_usage = best_ollama_model_for(ranked)
    if by_usage:
        return by_usage, "popularity"
    return by_usage, ""
525
526
527# ---------------------------------------------------------------------------
528# Cache
529# ---------------------------------------------------------------------------
530
531
532class RankingsCache:
533    """TTL cache for per-category rankings (matched against Ollama).
534
535    Keyed by category. Designed for one process. Concurrent fetches for the
536    same key coalesce so a burst of picker calls only triggers one HTTP fetch.
537    """
538
539    def __init__(self, ttl_seconds: float = 12 * 3600) -> None:
540        self.ttl = ttl_seconds
541        self._entries: dict[str, _CacheEntry] = {}
542        self._inflight: dict[str, asyncio.Future[list[RankedModel]]] = {}
543        self._catalog: list[str] = []
544        self._catalog_fetched_at: float = 0.0
545        self._catalog_lock = asyncio.Lock()
546
547    def is_fresh(self, category: str) -> bool:
548        entry = self._entries.get(category)
549        return bool(entry and (time.monotonic() - entry.fetched_at) < self.ttl)
550
551    def peek(self, category: str) -> list[RankedModel] | None:
552        entry = self._entries.get(category)
553        if entry is None:
554            return None
555        return list(entry.ranking)
556
557    async def _refresh_catalog(self, api_key: str) -> None:
558        async with self._catalog_lock:
559            if self._catalog and (time.monotonic() - self._catalog_fetched_at) < self.ttl:
560                return
561            self._catalog = await fetch_ollama_catalog(api_key)
562            self._catalog_fetched_at = time.monotonic()
563
564    async def get(
565        self,
566        category: str,
567        *,
568        ollama_api_key: str = "",
569        force: bool = False,
570    ) -> list[RankedModel]:
571        """Return the cached / freshly-fetched leaderboard for ``category``."""
572        if not force and self.is_fresh(category):
573            return list(self._entries[category].ranking)
574
575        # Coalesce concurrent fetches.
576        in_flight = self._inflight.get(category)
577        if in_flight is not None and not in_flight.done():
578            return await in_flight
579
580        fut: asyncio.Future[list[RankedModel]] = asyncio.get_event_loop().create_future()
581        self._inflight[category] = fut
582
583        try:
584            ranking, benchmarks = await fetch_category_ranking(category)
585            if ollama_api_key:
586                await self._refresh_catalog(ollama_api_key)
587            ranking_with_match = attach_ollama_matches(
588                ranking, self._catalog, benchmarks=benchmarks,
589            )
590            self._entries[category] = _CacheEntry(
591                ranking=ranking_with_match,
592                fetched_at=time.monotonic(),
593            )
594            fut.set_result(ranking_with_match)
595            return ranking_with_match
596        except Exception as exc:
597            logger.warning("RankingsCache: %s fetch failed: %s", category, exc)
598            fut.set_result([])
599            return []
600        finally:
601            self._inflight.pop(category, None)
602
603
604# ---------------------------------------------------------------------------
605# Process-global accessor
606# ---------------------------------------------------------------------------
607
608
# Process-wide cache instance; created on first use by ``get_cache``.
_GLOBAL_CACHE: RankingsCache | None = None


def get_cache() -> RankingsCache:
    """Return the shared :class:`RankingsCache`, creating it on first call."""
    global _GLOBAL_CACHE
    cache = _GLOBAL_CACHE
    if cache is None:
        cache = _GLOBAL_CACHE = RankingsCache()
    return cache
618
619
# Public API. Includes the benchmark types and pickers: callers receive
# ``BenchmarkScore`` objects from ``fetch_category_ranking`` and need the
# score-based pickers, so they belong here with the popularity-based ones.
__all__ = [
    "BenchmarkScore",
    "DEFAULT_CATEGORY",
    "KNOWN_CATEGORIES",
    "PROFILE_TO_BENCHMARK",
    "PROFILE_TO_CATEGORY",
    "RankedModel",
    "RankingsCache",
    "attach_ollama_matches",
    "best_ollama_by_score",
    "best_ollama_for_profile",
    "best_ollama_model_for",
    "fetch_category_ranking",
    "fetch_ollama_catalog",
    "get_cache",
    "match_to_ollama",
    "parse_benchmark_scores",
    "parse_ranking_html",
]