Files

106 lines
5.5 KiB
Python

"""Two-sided net-corroboration (DESIGN_v2.1 H5 + condition 3) — the instrument for the adversarial cases.
For a derivative, track the INDEPENDENCE-WEIGHTED affirms MINUS denies over time. This is the right
output for Strike/Battery (where the question is "did the engine distinguish real adoption from
narrative, and catch the contradiction?"), not runway:
- STRIKE (reflexivity): a PASS = net stays low/quiet in LIVE mode (own_network dropped) while it
would have fired in TEST mode (own_network kept) → the engine refuses the intra-cluster echo.
- BATTERY (timing): the DEMAND derivative's net rises while the SUPPLY derivative's net stays flat →
"half-confirmed, the load-bearing half isn't moving" = the eroding-conviction signal.
Reuses the §4.6 relevance helper, which already returns direction affirms|contradicts|tangential.
"""
from __future__ import annotations
from .independence import eisc_for
from .llm_helpers import derivative_relevance
from .windows import window_bounds
def classify_corpus(sc, backend, derivative: str, as_of: str, *, top_k: int = 60) -> list[dict]:
    """Retrieve claims about ``derivative`` and LLM-classify each one's direction toward it.

    Runs one reranked search over the "propositions" collection, drops hits that lack a
    claim_id or date or that are dated after ``as_of`` (the as-of filter), then asks
    ``derivative_relevance`` for a direction per surviving claim.

    Args:
        sc: search client; ``sc.search(...)`` is read as a dict whose "data" entry is a list of
            hits, each carrying a "payload" dict. Any other response shape is treated as "no hits".
        backend: LLM backend, handed through to ``derivative_relevance`` untouched.
        derivative: the derivative statement; used as both the search query and the
            classification target.
        as_of: cut-off date. Compared as a string against the first 10 characters of each
            claim date (assumes ISO ``YYYY-MM-DD`` on both sides — TODO confirm).
        top_k: number of hits requested from the search. NOTE(review): the as-of filter is
            applied after retrieval, so early cut-offs can leave far fewer than ``top_k`` claims.

    Returns:
        One dict per claim classified "affirms" or "contradicts", with keys claim_id,
        proposition, date (first 10 characters), source_id and direction. Claims with any other
        or missing direction are dropped. Empty list when no hit survives the filter (the LLM
        helper is not called in that case).
    """
    res = sc.search(derivative, collection="propositions", top_k=top_k, rerank=True)
    # A non-dict response and a null/absent "data" field both mean "no hits".
    hits = (res.get("data") or []) if isinstance(res, dict) else []
    cand = []
    for hit in hits:
        payload = hit.get("payload") if isinstance(hit, dict) else None
        if not isinstance(payload, dict):
            continue
        claim_id = payload.get("claim_id")
        raw_date = payload.get("date")
        # date/datetime payload values are normalised to ISO text so the prefix comparison
        # below applies to them too instead of raising on the slice.
        if hasattr(raw_date, "isoformat"):
            raw_date = raw_date.isoformat()
        # A claim whose date cannot be read as text cannot be proven to be <= as_of: skip it.
        if not claim_id or not raw_date or not isinstance(raw_date, str):
            continue
        day = raw_date[:10]
        if day > as_of:
            continue
        cand.append({"claim_id": claim_id, "proposition": payload.get("proposition", ""),
                     "date": day, "source_id": payload.get("source_id")})
    if not cand:
        return []
    rel = derivative_relevance(
        backend, derivative,
        [{"claim_id": c["claim_id"], "proposition": c["proposition"]} for c in cand],
    ) or {}
    out = []
    for c in cand:
        verdict = rel.get(c["claim_id"])
        # A missing or malformed verdict counts as tangential, i.e. the claim is dropped.
        direction = verdict.get("direction", "tangential") if isinstance(verdict, dict) else "tangential"
        if direction in ("affirms", "contradicts"):
            out.append({**c, "direction": direction})
    return out
# DESIGN_v2 ADOPT #1 (claim-type weighting): a node "resolves" on REALIZED, descriptive disclosure —
# not on forecasts/intent. A source counts toward the net only if it carries a HARD (realized-fact)
# claim on this side; predictive/interpretive claims (forecasts, opinion, 'may consider', 'expects')
# are the exact material that fooled the supply axis on Battery, so they don't qualify a source alone.
_HARD_CLAIM_TYPES = ("descriptive", "reactive")
def _hard_sources(conn, claim_ids: list[str]) -> set:
"""Sources that contributed at least one realized-fact (descriptive/reactive) claim among claim_ids."""
if not claim_ids:
return set()
ph = ",".join("?" * len(claim_ids))
qph = ",".join("?" * len(_HARD_CLAIM_TYPES))
rows = conn.execute(
f"SELECT DISTINCT source_id FROM claims WHERE claim_id IN ({ph}) AND claim_type IN ({qph})",
list(claim_ids) + list(_HARD_CLAIM_TYPES),
).fetchall()
return {r[0] for r in rows}
def net_at(conn, classified: list[dict], as_of: str, *, window_days: int = 90, mode: str = "live",
           require_hard_evidence: bool = True) -> dict:
    """Net independence-weighted corroboration in the trailing window ending at as_of.

    With require_hard_evidence (default), a source only counts on a side if it carries a
    realized-fact claim there — forecasts/intent alone don't qualify it (the announced-vs-deployed /
    opinion-vs-fact guard).

    Args:
        conn: DB connection; used for the hard-evidence lookup, by ``eisc_for``, and for the
            own-network count against the ``sources`` table.
        classified: claim dicts with at least claim_id, date, source_id and direction
            ("affirms" / "contradicts"), as produced by ``classify_corpus``.
        as_of: end date of the window; passed to ``window_bounds`` to obtain (start, end].
        window_days: length of the trailing window, forwarded to ``window_bounds``.
        mode: forwarded to ``eisc_for``; per the module docstring, "live" drops own_network
            sources and "test" keeps them.
        require_hard_evidence: when False, every source in the window counts on its side.

    Returns:
        Dict with as_of, affirms_eisc, denies_eisc and net (each rounded to 2 decimals),
        n_affirm / n_deny (claim counts in the window), hard_affirm_src / hard_deny_src (sources
        that counted on each side), soft_affirm_src_dropped / soft_deny_src_dropped (sources
        excluded on each side), and own_network_affirm_src (counted affirming sources flagged
        own_network=1).
    """
    _, start, end = window_bounds(as_of, n=1, days=window_days)[0]
    # Window is half-open: strictly after start, up to and including end.
    win = [c for c in classified if start < c["date"] <= end]
    aff = [c for c in win if c["direction"] == "affirms"]
    den = [c for c in win if c["direction"] == "contradicts"]
    aff_src_all = {c["source_id"] for c in aff}
    den_src_all = {c["source_id"] for c in den}
    if require_hard_evidence:
        aff_keep = aff_src_all & _hard_sources(conn, [c["claim_id"] for c in aff])
        den_keep = den_src_all & _hard_sources(conn, [c["claim_id"] for c in den])
    else:
        aff_keep, den_keep = aff_src_all, den_src_all
    # Set iteration order varies between processes (string hash randomisation); sort so that
    # eisc_for always receives the sources in the same order and runs are reproducible.
    aff_src = sorted(aff_keep, key=str)
    den_src = sorted(den_keep, key=str)
    aff_e = eisc_for(conn, aff_src, mode=mode)["eisc_adj"] if aff_src else 0.0
    den_e = eisc_for(conn, den_src, mode=mode)["eisc_adj"] if den_src else 0.0
    own = 0
    if aff_src:
        ph = ",".join("?" * len(aff_src))
        own = conn.execute(
            f"SELECT COUNT(*) FROM sources WHERE source_id IN ({ph}) AND COALESCE(own_network,0)=1", aff_src
        ).fetchone()[0]
    return {"as_of": as_of, "affirms_eisc": round(aff_e, 2), "denies_eisc": round(den_e, 2),
            "net": round(aff_e - den_e, 2),
            "n_affirm": len(aff), "n_deny": len(den),
            "hard_affirm_src": len(aff_src), "soft_affirm_src_dropped": len(aff_src_all) - len(aff_src),
            "own_network_affirm_src": own,
            # Deny-side mirror of the affirm diagnostics, so a dropped contradiction is visible.
            "hard_deny_src": len(den_src), "soft_deny_src_dropped": len(den_src_all) - len(den_src)}
def trajectory(conn, sc, backend, derivative: str, as_of_dates: list[str], *,
               window_days: int = 90, mode: str = "live", top_k: int = 60,
               require_hard_evidence: bool = True) -> list[dict]:
    """The net-corroboration curve over as_of_dates.

    Run twice (mode='live' vs 'test') to see what the own_network quarantine removes — the
    reflexivity measurement.

    Args:
        conn: DB connection, forwarded to ``net_at``.
        sc: search client, forwarded to ``classify_corpus``.
        backend: LLM backend, forwarded to ``classify_corpus``.
        derivative: the derivative statement being tracked.
        as_of_dates: evaluation dates; one point is produced per date, in the given order.
        window_days: trailing window length for each point.
        mode: forwarded to ``net_at``.
        top_k: hits requested per retrieval in ``classify_corpus``.
        require_hard_evidence: forwarded to ``net_at``; pass False to also trace the curve
            without the realized-fact guard. Defaults to ``net_at``'s own default.

    Returns:
        One ``net_at`` result dict per entry of ``as_of_dates``; empty list for no dates.
    """
    out = []
    for as_of in as_of_dates:
        # Retrieval + classification is redone per date so each point only sees claims dated
        # on or before its own as_of.
        classified = classify_corpus(sc, backend, derivative, as_of, top_k=top_k)
        out.append(net_at(conn, classified, as_of, window_days=window_days, mode=mode,
                          require_hard_evidence=require_hard_evidence))
    return out