91 lines
3.4 KiB
Python
91 lines
3.4 KiB
Python
"""Load the source registry (companies + podcasts, §7.3/§7.4) into SQLite. Idempotent upsert."""
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
_COLS = ("source_id", "name", "kind", "source_cluster", "role", "rss_url",
|
|
"channel_url", "ticker", "cluster_capped_low", "own_network", "backtest_2022_2023", "notes")
|
|
|
|
|
|
def _row(s: dict[str, Any]) -> dict[str, Any]:
|
|
return {
|
|
"source_id": s["id"],
|
|
"name": s["name"],
|
|
"kind": s["kind"],
|
|
"source_cluster": s.get("cluster"),
|
|
"role": s.get("role", "none"),
|
|
"rss_url": s.get("rss_url"),
|
|
"channel_url": s.get("channel_url"),
|
|
"ticker": s.get("ticker"),
|
|
"cluster_capped_low": 1 if s.get("cluster_capped_low") else 0,
|
|
"own_network": 1 if s.get("own_network") else 0,
|
|
"backtest_2022_2023": s.get("backtest_2022_2023"),
|
|
"notes": s.get("notes"),
|
|
}
|
|
|
|
|
|
def update_feeds(conn: sqlite3.Connection, path: Path) -> int:
    """Apply resolved/verified podcast feed URLs and backtest reach to existing source rows.

    Reads the ``feeds`` list from the YAML file at *path*. For each entry, the ``sources``
    row whose ``source_id`` equals the entry's ``id`` gets ``rss_url``, ``channel_url``
    (taken from ``youtube_channel_url``) and ``backtest_2022_2023`` overwritten. Keys
    missing from the entry become NULL. ``notes`` is only replaced when the entry carries
    a non-null ``note``. Entries whose id matches no row are skipped by the UPDATE.

    Adds the ``backtest_2022_2023`` column first when an existing ``sources`` table
    lacks it.

    Returns the number of feed entries read, not the number of rows changed.
    Raises ``KeyError`` if a feed entry has no ``id``.
    """
    # Explicit column check instead of swallowing every OperationalError from ALTER
    # (which would also hide e.g. a locked database). PRAGMA returns no rows for a
    # missing table; in that case skip the migration and let the UPDATE report it.
    existing = {info[1] for info in conn.execute("PRAGMA table_info(sources)")}
    if existing and "backtest_2022_2023" not in existing:
        conn.execute("ALTER TABLE sources ADD COLUMN backtest_2022_2023 TEXT")
        conn.commit()
    data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
    # A `feeds:` key with no items parses as None; treat it as an empty list.
    rows = data.get("feeds") or []
    for f in rows:
        conn.execute(
            """UPDATE sources
               SET rss_url=:rss_url, channel_url=:youtube_channel_url,
                   backtest_2022_2023=:backtest_2022_2023, notes=COALESCE(:note, notes)
               WHERE source_id=:id""",
            {
                "id": f["id"],
                "rss_url": f.get("rss_url"),
                "youtube_channel_url": f.get("youtube_channel_url"),
                "backtest_2022_2023": f.get("backtest_2022_2023"),
                "note": f.get("note"),
            },
        )
    conn.commit()
    return len(rows)
|
|
|
|
|
|
def load_source_edges(conn: sqlite3.Connection, path: Path) -> int:
    """Seed EISC connectedness edges (priors) idempotently.

    Stores src_a, src_b in sorted order to match the transcribe_worker's convention
    (sorted([a, b]) + ON CONFLICT weight+=1), so real detections accumulate on the same
    PK instead of creating a reversed duplicate. DO NOTHING on conflict means a re-run
    won't inflate, and won't clobber a stronger auto-detected weight.

    Reads the ``edges`` list from the YAML file at *path*. A missing or null ``weight``
    defaults to 1.0.

    Returns the number of edges actually inserted; pairs already present count as 0.
    Raises ``KeyError`` if an entry lacks ``a``, ``b`` or ``type``.
    """
    data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
    # An `edges:` key with no items parses as None; treat it as an empty list.
    rows = data.get("edges") or []
    applied = 0
    for e in rows:
        # Canonical endpoint order so A-B and B-A share one primary key.
        a, b = sorted([e["a"], e["b"]])
        # `weight: null` means "use the default", rather than crashing in float(None).
        weight = e.get("weight")
        cur = conn.execute(
            """INSERT INTO source_edges (src_a, src_b, edge_type, weight, evidence)
               VALUES (?,?,?,?,?)
               ON CONFLICT(src_a, src_b, edge_type) DO NOTHING""",
            (a, b, e["type"], 1.0 if weight is None else float(weight), e.get("evidence")),
        )
        # A DO NOTHING conflict changes no rows, so rowcount counts only new edges.
        applied += cur.rowcount
    conn.commit()
    return applied
|
|
|
|
|
|
def load_sources(conn: sqlite3.Connection, path: Path) -> int:
    """Upsert every entry of the YAML ``sources`` list at *path* into the ``sources`` table.

    New ids are inserted with ``created_at`` set to ``datetime('now')``. Existing ids have
    every other column in ``_COLS`` overwritten while ``created_at`` is left untouched, so
    re-running the load is idempotent.

    This writes ``backtest_2022_2023``, a column this module otherwise only adds in
    update_feeds. It is therefore ensured here too, so a database lacking it does not fail
    when this load runs first.

    Returns the number of source entries read.
    Raises ``KeyError`` (via _row) if an entry lacks ``id``, ``name`` or ``kind``.
    """
    # PRAGMA returns no rows for a missing table; skip the migration then and let the
    # INSERT report the missing table as before.
    existing = {info[1] for info in conn.execute("PRAGMA table_info(sources)")}
    if existing and "backtest_2022_2023" not in existing:
        conn.execute("ALTER TABLE sources ADD COLUMN backtest_2022_2023 TEXT")
        conn.commit()
    data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
    # A `sources:` key with no items parses as None; treat it as an empty list.
    rows = data.get("sources") or []
    # Column names come from the module constant _COLS (never from input), so building
    # the statement text with f-strings is safe; values are bound as named parameters.
    cols = ", ".join(_COLS)
    placeholders = ", ".join(f":{c}" for c in _COLS)
    updates = ", ".join(f"{c}=excluded.{c}" for c in _COLS if c != "source_id")
    sql = (
        f"INSERT INTO sources ({cols}, created_at) VALUES ({placeholders}, datetime('now')) "
        f"ON CONFLICT(source_id) DO UPDATE SET {updates}"
    )
    conn.executemany(sql, (_row(s) for s in rows))
    conn.commit()
    return len(rows)
|