87 lines
3.9 KiB
Python
87 lines
3.9 KiB
Python
"""Pre-registered confusion matrix on the §7.1 derivatives (DESIGN_v2 §1.3).
|
||
|
||
Measures PRECISION and RECALL, not recall alone. Uses the engine's already-stored candidate_scores
|
||
(cleared_date + whisper_date) × the pre-registered external repricing (resolution.K2023.yaml). Reports
|
||
the matrix at BOTH the cleared level (what the engine fired) and the whisper level (what it saw before
|
||
the independence floor) — the delta is the empirical answer to the gate debate.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from datetime import datetime
|
||
|
||
import yaml
|
||
|
||
from .external import basket_index, fetch_eod, resolve_reprice, runway_at_signal
|
||
|
||
|
||
def _engine_dates(conn) -> dict[str, dict]:
|
||
"""For each under_acted node: earliest cleared as_of and earliest whisper as_of (n_conf>=4, a>0)."""
|
||
rows = conn.execute(
|
||
"SELECT node_id, conviction_id, as_of, cleared_evidence_bar ev, inputs_json "
|
||
"FROM candidate_scores WHERE scorer='under_acted'"
|
||
).fetchall()
|
||
out: dict[str, dict] = {}
|
||
for r in rows:
|
||
k = r["node_id"] or r["conviction_id"]
|
||
i = json.loads(r["inputs_json"])
|
||
d = out.setdefault(k, {"cleared": None, "whisper": None})
|
||
if r["ev"] and (d["cleared"] is None or r["as_of"] < d["cleared"]):
|
||
d["cleared"] = r["as_of"]
|
||
if i.get("n_confirmed", 0) >= 4 and i.get("a_corrob", 0) > 0:
|
||
if d["whisper"] is None or r["as_of"] < d["whisper"]:
|
||
d["whisper"] = r["as_of"]
|
||
return out
|
||
|
||
|
||
def _lead_days(repricing_date: str, signal_date: str | None) -> int | None:
|
||
if not signal_date or not repricing_date:
|
||
return None
|
||
return (datetime.strptime(repricing_date, "%Y-%m-%d") - datetime.strptime(signal_date, "%Y-%m-%d")).days
|
||
|
||
|
||
def run_confusion(conn, cfg, spec_path: str) -> dict:
|
||
spec = yaml.safe_load(open(spec_path))
|
||
w, rule = spec["window"], spec["rule"]
|
||
engine = _engine_dates(conn)
|
||
price_cache: dict[str, list] = {}
|
||
|
||
rows = []
|
||
for node, basket in spec["baskets"].items():
|
||
prices = {}
|
||
for sym in basket:
|
||
if sym not in price_cache:
|
||
price_cache[sym] = fetch_eod(cfg.fmp_api_key, sym, w["start"], w["end"])
|
||
prices[sym] = price_cache[sym]
|
||
missing = [s for s in basket if not prices[s]]
|
||
idx = basket_index(prices)
|
||
res = resolve_reprice(idx, threshold_pct=rule["threshold_pct"], hold_pct=rule["hold_pct"],
|
||
hold_days=rule["hold_days"])
|
||
ed = engine.get(node, {"cleared": None, "whisper": None})
|
||
rows.append({
|
||
"node": node, "basket": basket, "missing": missing,
|
||
"confirmed": res["confirmed"], "repricing_date": res["repricing_date"], "peak_pct": res["peak_pct"],
|
||
"cleared_date": ed["cleared"], "whisper_date": ed["whisper"],
|
||
"lead_cleared": _lead_days(res["repricing_date"], ed["cleared"]) if res["confirmed"] else None,
|
||
"lead_whisper": _lead_days(res["repricing_date"], ed["whisper"]) if res["confirmed"] else None,
|
||
# DESIGN_v2.1 Correction A: runway = fraction of the durable move still ahead at signal
|
||
"runway_cleared": runway_at_signal(idx, ed["cleared"]) if res["confirmed"] else None,
|
||
"runway_whisper": runway_at_signal(idx, ed["whisper"]) if res["confirmed"] else None,
|
||
})
|
||
|
||
def classify(r, level):
|
||
fired = bool(r[f"{level}_date"])
|
||
real = r["confirmed"]
|
||
return "TP" if (fired and real) else "FP" if (fired and not real) else "FN" if real else "TN"
|
||
|
||
def matrix(level):
|
||
c = {"TP": 0, "FP": 0, "FN": 0, "TN": 0}
|
||
for r in rows:
|
||
c[classify(r, level)] += 1
|
||
p = c["TP"] / (c["TP"] + c["FP"]) if (c["TP"] + c["FP"]) else None
|
||
rec = c["TP"] / (c["TP"] + c["FN"]) if (c["TP"] + c["FN"]) else None
|
||
return c, p, rec
|
||
|
||
return {"rows": rows, "cleared": matrix("cleared"), "whisper": matrix("whisper"),
|
||
"classify": classify}
|