Files

91 lines
3.4 KiB
Python

"""Load the source registry (companies + podcasts, §7.3/§7.4) into SQLite. Idempotent upsert."""
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Any
import yaml
_COLS = ("source_id", "name", "kind", "source_cluster", "role", "rss_url",
"channel_url", "ticker", "cluster_capped_low", "own_network", "backtest_2022_2023", "notes")
def _row(s: dict[str, Any]) -> dict[str, Any]:
return {
"source_id": s["id"],
"name": s["name"],
"kind": s["kind"],
"source_cluster": s.get("cluster"),
"role": s.get("role", "none"),
"rss_url": s.get("rss_url"),
"channel_url": s.get("channel_url"),
"ticker": s.get("ticker"),
"cluster_capped_low": 1 if s.get("cluster_capped_low") else 0,
"own_network": 1 if s.get("own_network") else 0,
"backtest_2022_2023": s.get("backtest_2022_2023"),
"notes": s.get("notes"),
}
def update_feeds(conn: sqlite3.Connection, path: Path) -> int:
    """Apply resolved/verified podcast feed URLs + backtest-reach to existing source rows.

    First makes sure ``sources.backtest_2022_2023`` exists, adding it with ``ALTER TABLE``
    when the table predates the column. Then runs one ``UPDATE`` per entry of the YAML
    file's ``feeds`` list, matched on ``source_id``. An entry whose ``id`` has no row
    changes nothing.

    Args:
        conn: Open SQLite connection that holds the ``sources`` table.
        path: UTF-8 YAML file with a top-level ``feeds`` list. Each entry needs ``id``
            and may carry ``rss_url``, ``youtube_channel_url``, ``backtest_2022_2023``
            and ``note``.

    Returns:
        The number of feed entries read from the file (not the number of rows changed).

    Raises:
        KeyError: if a feed entry has no ``id``.
        sqlite3.OperationalError: e.g. when there are entries but no ``sources`` table.
    """
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk). An empty
    # result means the table does not exist, so the migration is skipped.
    existing = {col[1] for col in conn.execute("PRAGMA table_info(sources)")}
    # Checking explicitly (instead of catching every OperationalError) keeps real failures,
    # such as a locked database, from being silently ignored.
    if existing and "backtest_2022_2023" not in existing:
        conn.execute("ALTER TABLE sources ADD COLUMN backtest_2022_2023 TEXT")
        conn.commit()
    # Read as UTF-8 explicitly: the default is the locale encoding, which garbles
    # non-ASCII text on non-UTF-8 platforms.
    data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
    # `or []` also covers an explicit `feeds: null` in the file.
    rows = data.get("feeds") or []
    for f in rows:
        # NOTE(review): rss_url, channel_url and backtest_2022_2023 are overwritten
        # unconditionally (NULL when the key is absent); only notes is preserved via
        # COALESCE. Confirm the feeds file is meant to be authoritative for those columns.
        conn.execute(
            """UPDATE sources
            SET rss_url=:rss_url, channel_url=:youtube_channel_url,
            backtest_2022_2023=:backtest_2022_2023, notes=COALESCE(:note, notes)
            WHERE source_id=:id""",
            {
                "id": f["id"],
                "rss_url": f.get("rss_url"),
                "youtube_channel_url": f.get("youtube_channel_url"),
                "backtest_2022_2023": f.get("backtest_2022_2023"),
                "note": f.get("note"),
            },
        )
    conn.commit()
    return len(rows)
def load_source_edges(conn: sqlite3.Connection, path: Path) -> int:
    """Seed EISC connectedness edges (priors) idempotently.

    Stores src_a,src_b in sorted order to match the transcribe_worker's convention
    (sorted([a,b]) + ON CONFLICT weight+=1), so real detections accumulate on the same PK
    instead of creating a reversed duplicate. DO NOTHING on conflict means a re-run won't
    inflate weights and won't clobber a stronger auto-detected weight.

    Args:
        conn: Open SQLite connection that holds the ``source_edges`` table.
        path: UTF-8 YAML file with a top-level ``edges`` list. Each entry needs ``a``,
            ``b`` and ``type``, and may carry ``weight`` (default 1.0) and ``evidence``.

    Returns:
        The number of edges actually inserted; edges that already existed count as 0.

    Raises:
        KeyError: if an edge entry lacks ``a``, ``b`` or ``type``.
        TypeError / ValueError: if ``weight`` cannot be converted with ``float()``.
    """
    # Read as UTF-8 explicitly: the default is the locale encoding.
    data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
    # `or []` also covers an explicit `edges: null` in the file.
    rows = data.get("edges") or []
    applied = 0
    for e in rows:
        # Canonical endpoint order, so (a, b) and (b, a) share one primary key.
        a, b = sorted([e["a"], e["b"]])
        cur = conn.execute(
            """INSERT INTO source_edges (src_a, src_b, edge_type, weight, evidence)
            VALUES (?,?,?,?,?)
            ON CONFLICT(src_a, src_b, edge_type) DO NOTHING""",
            (a, b, e["type"], float(e.get("weight", 1.0)), e.get("evidence")),
        )
        # rowcount is 1 for a fresh insert and 0 when DO NOTHING skipped an existing edge.
        applied += cur.rowcount
    conn.commit()
    return applied
def load_sources(conn: sqlite3.Connection, path: Path) -> int:
    """Upsert every registry entry of the YAML file's ``sources`` list into ``sources``.

    New ids are inserted with ``created_at = datetime('now')``. Existing ids have every
    ``_COLS`` column except ``source_id`` overwritten with the file's values, and
    ``created_at`` is left untouched. Re-running with the same file therefore leaves the
    table unchanged.

    Args:
        conn: Open SQLite connection that holds the ``sources`` table.
        path: UTF-8 YAML file with a top-level ``sources`` list. See ``_row`` for the
            per-entry fields.

    Returns:
        The number of entries read from the file.

    Raises:
        KeyError: if an entry lacks ``id``, ``name`` or ``kind``.
        sqlite3.OperationalError: e.g. if the table lacks one of the ``_COLS`` columns.
            NOTE(review): ``update_feeds`` adds ``backtest_2022_2023`` by migration, so
            older schemas may need that step first. Confirm.
    """
    # Read as UTF-8 explicitly: the default is the locale encoding.
    data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
    # `or []` also covers an explicit `sources: null` in the file.
    rows = data.get("sources") or []
    cols = ", ".join(_COLS)
    placeholders = ", ".join(f":{c}" for c in _COLS)
    # `excluded.<col>` refers to the value the conflicting INSERT tried to write.
    updates = ", ".join(f"{c}=excluded.{c}" for c in _COLS if c != "source_id")
    sql = (
        f"INSERT INTO sources ({cols}, created_at) VALUES ({placeholders}, datetime('now')) "
        f"ON CONFLICT(source_id) DO UPDATE SET {updates}"
    )
    # NOTE(review): this also overwrites rss_url/channel_url/backtest_2022_2023/notes that
    # update_feeds may have set earlier. Confirm the loaders always run sources-then-feeds.
    conn.executemany(sql, map(_row, rows))
    conn.commit()
    return len(rows)