Files

82 lines
3.8 KiB
Python

"""Scoring orchestrator. For Job B / the §7.1 backtest: march as_of dates, score every conviction +
fan-out derivative, gate, log the denominator, promote nodes.
"""
from __future__ import annotations
import logging
from ..extract.backends import from_config as backend_from_config
from . import bar, under_acted
from .asof import Scorer
from .ledger_writer import log_candidate, record_candidate_score
log = logging.getLogger(__name__)
def _nodes_for(conn, as_of, mode, conviction_ids):
nodes = []
where, params = "", []
if conviction_ids:
ph = ",".join("?" * len(conviction_ids))
where = f" WHERE conviction_id IN ({ph})"
params = list(conviction_ids)
for c in conn.execute(
f"SELECT conviction_id, thematic_proposition, conviction_level, current_exposure, is_thesis_breaker "
f"FROM conviction_log{where}", params,
):
nodes.append({"conviction_id": c[0], "node_id": None, "derivative": c[1],
"level": c[2], "exposure": c[3], "breaker": bool(c[4])})
fq = ("SELECT f.node_id, f.parent_conviction_id, f.derivative_proposition, c.conviction_level, "
"c.current_exposure, c.is_thesis_breaker FROM fanout_nodes f "
"JOIN conviction_log c ON c.conviction_id = f.parent_conviction_id")
conds, fparams = [], []
if conviction_ids:
conds.append(f"f.parent_conviction_id IN ({','.join('?' * len(conviction_ids))})")
fparams += list(conviction_ids)
if mode == "forward": # backtest uses the seeded tree as the as-of-2023 hypothesis (no created_at leak)
conds.append("f.created_at <= ?")
fparams.append(as_of)
if conds:
fq += " WHERE " + " AND ".join(conds)
for f in conn.execute(fq, fparams):
nodes.append({"conviction_id": f[1], "node_id": f[0], "derivative": f[2],
"level": f[3], "exposure": f[4], "breaker": bool(f[5])})
return nodes
def run_under_acted(conn, sc, cfg, *, as_of, mode="backtest", conviction_ids=None, window_days=28) -> list[dict]:
backend = backend_from_config(cfg, sc)
out = []
with Scorer(conn, as_of, mode=mode):
for nd in _nodes_for(conn, as_of, mode, conviction_ids):
r = under_acted.score_node(
conn, sc, backend, as_of=as_of, derivative=nd["derivative"],
conviction_id=nd["conviction_id"], node_id=nd["node_id"],
conviction_level=nd["level"], exposure=nd["exposure"], is_breaker=nd["breaker"],
window_days=window_days,
)
ev, pr = bar.evaluate("under_acted", r, conn=conn)
record_candidate_score(conn, r, as_of, ev, pr)
if ev:
log_candidate(conn, scorer="under_acted", as_of=as_of,
ledger_type="under_acted_conviction", proposition=nd["derivative"],
discourse_metric=r["inputs"], origin_conviction_id=nd["conviction_id"],
origin_node_id=nd["node_id"])
if nd["node_id"]:
conn.execute("UPDATE fanout_nodes SET status=? WHERE node_id=?",
("signal" if pr else "corroborated", nd["node_id"]))
conn.commit()
out.append({"node": nd, "result": r, "evidence": ev, "promotion": pr})
return out
def run_backtest(conn, sc, cfg, *, conviction_id, dates, window_days=90) -> list[tuple]:
timeline = []
for as_of in dates:
res = run_under_acted(conn, sc, cfg, as_of=as_of, mode="backtest",
conviction_ids=[conviction_id], window_days=window_days)
timeline.append((as_of, res))
fired = [r for r in res if r["evidence"]]
log.info("as_of %s: %d/%d nodes cleared evidence bar", as_of, len(fired), len(res))
return timeline