Files
ten31-signal-engine/signal_engine/ingest/podcasts.py
T

112 lines
4.9 KiB
Python

"""Podcast ingestion → documents + 'transcribe' jobs (§4.1).
RSS path: parse the feed, take episodes in [since, until], register documents pointing at the audio
enclosure. YouTube path: enumerate a channel's videos in the date window via yt-dlp (the back-catalog
route for the ~9 shows whose RSS is a truncated rolling window — see seeds/podcast_feeds.resolved.yaml).
The transcribe worker downloads + processes either kind identically.
"""
from __future__ import annotations
import hashlib
import json
import logging
import sqlite3
import subprocess
from ..backfill import queue
from ..util import audio_dedup_key
from .feeds import episode_records, fetch_feed
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
def _enqueue_doc(conn, *, source_id, kind, external_id, url, title, date) -> tuple[int, int]:
    """Register one episode/video in `documents` and queue a 'transcribe' job for it.

    The document id is ``pod:<source_id>:<first 12 hex chars of SHA-1(external_id)>``.

    Args:
        conn: open SQLite connection (any row factory; rows are read positionally).
        source_id: id of the feed/channel the item was seen on.
        kind: value stored in ``documents.kind`` (callers pass "podcast" or "youtube").
        external_id: feed GUID or video id; must be a non-empty ``str``.
        url: audio enclosure URL or watch URL.
        title, date: stored on the document and fed to ``audio_dedup_key``.

    Returns:
        ``(docs_added, jobs_added)``:
        ``(0, 0)`` if the INSERT was ignored (document already known),
        ``(1, 0)`` if the row was added but marked as a duplicate of an already
        processed document, otherwise ``(1, 1)`` — or ``(1, 0)`` when
        ``queue.enqueue`` returns ``None``.
    """
    doc_id = f"pod:{source_id}:{hashlib.sha1(external_id.encode()).hexdigest()[:12]}"
    dkey = audio_dedup_key(title, date)
    # Cross-mirror dedup (pre-GPU): if this same episode was already processed (any source/feed),
    # record the sighting for provenance but DON'T re-transcribe. (external_id UNIQUE already covers
    # same-feed re-ingest; this covers the same episode via a different feed/YouTube mirror.)
    # The lookup runs BEFORE the insert so the new row can never match itself.
    dup = conn.execute(
        "SELECT doc_id FROM documents WHERE dedup_key=? AND processed_at IS NOT NULL LIMIT 1", (dkey,)
    ).fetchone()
    # Positional access works for both sqlite3.Row and plain tuple rows.
    dup_doc_id = dup[0] if dup is not None else None

    cur = conn.execute(
        """INSERT OR IGNORE INTO documents (doc_id, source_id, kind, external_id, url, title, date, dedup_key)
        VALUES (?,?,?,?,?,?,?,?)""",
        (doc_id, source_id, kind, external_id, url, title, date, dkey),
    )
    inserted = bool(cur.rowcount)
    if inserted and dup is not None:
        conn.execute(
            "UPDATE documents SET processed_at=datetime('now'), raw_path=? WHERE doc_id=?",
            (f"dup_of:{dup_doc_id}", doc_id),
        )
    # Single commit: the new row and (when applicable) its duplicate marker land atomically, so a
    # crash can no longer leave a duplicate row that is neither marked processed nor queued.
    # Committing on the "ignored" path too closes the implicit transaction the INSERT opened.
    conn.commit()

    if not inserted:
        return (0, 0)  # same (source_id, external_id) already known
    if dup is not None:
        log.info("skip transcribe for %s — duplicate content of %s", doc_id, dup_doc_id)
        return (1, 0)

    # The input hash ties the job to this document and the "audio-v0" tag.
    h = hashlib.sha256(f"{doc_id}|audio-v0".encode()).hexdigest()
    job = queue.enqueue(conn, job_type="transcribe", target_id=doc_id, input_hash=h,
                        parent_doc_id=doc_id, priority=100)
    return (1, 1 if job is not None else 0)
def ingest_rss(conn: sqlite3.Connection, source: sqlite3.Row, *, since=None, until=None, limit=20):
    """Register episodes from a source's RSS feed and queue them for transcription.

    Args:
        conn: open SQLite connection, passed through to ``_enqueue_doc``.
        source: row (or mapping) providing ``source_id`` and ``rss_url``.
        since, until: optional inclusive bounds compared directly against each record's
            ``published`` value (so they must be the same comparable type — presumably
            ISO date strings; confirm against ``episode_records``). Records with no
            ``published`` value are NOT filtered out by the window.
        limit: maximum number of in-window episodes to register; ``None`` means no cap.

    Returns:
        ``(n_docs, n_jobs)`` — documents newly inserted and transcribe jobs newly queued.

    Raises:
        ValueError: if the source has no ``rss_url``.
    """
    if not source["rss_url"]:
        raise ValueError(f"{source['source_id']} has no rss_url")
    recs = episode_records(fetch_feed(source["rss_url"]))
    n_docs = n_jobs = count = 0
    for r in recs:
        d = r["published"]
        if since and d and d < since:
            continue
        if until and d and d > until:
            continue
        if limit is not None and count >= limit:
            break
        if not r["guid"] or not r["audio_url"]:
            # Without a GUID the document id cannot be derived, and without an enclosure
            # there is nothing to transcribe. Skip the item rather than abort the whole
            # feed or queue a job that has no audio to fetch; it does not count toward `limit`.
            log.warning("skipping feed item without guid/audio_url in %s: %r",
                        source["source_id"], r["title"])
            continue
        count += 1
        nd, nj = _enqueue_doc(conn, source_id=source["source_id"], kind="podcast",
                              external_id=r["guid"], url=r["audio_url"], title=r["title"], date=d)
        n_docs += nd
        n_jobs += nj
    return n_docs, n_jobs
def _parse_ytdlp_line(line):
    """Parse one ``id<TAB>upload_date<TAB>title`` line printed by yt-dlp.

    Returns ``(video_id, iso_date_or_None, title)``, or ``None`` when the line has no
    video id or no usable upload date ("NA" or empty).
    """
    # maxsplit=2: a tab inside the title stays part of the title instead of truncating it.
    parts = line.split("\t", 2)
    if len(parts) < 2 or not parts[0] or parts[1] in ("NA", ""):
        return None
    vid, upd = parts[0], parts[1]
    title = parts[2] if len(parts) > 2 else vid
    # yt-dlp prints upload_date as YYYYMMDD; anything else is stored as "no date".
    date = f"{upd[:4]}-{upd[4:6]}-{upd[6:8]}" if len(upd) == 8 else None
    return vid, date, title


def ingest_youtube(conn: sqlite3.Connection, source: sqlite3.Row, *, since=None, until=None,
                   limit=20, max_scan=800):
    """Enumerate channel videos in the date window via yt-dlp (NON-flat, so upload_date is populated —
    flat mode returns NA). Videos come newest-first, so we use --dateafter/--datebefore to select the
    window and --break-match-filters to STOP scanning once we drop below `since` (avoids walking the
    entire channel history). The transcribe worker downloads audio on demand.

    Args:
        conn: open SQLite connection, passed through to ``_enqueue_doc``.
        source: row (or mapping) providing ``source_id`` and ``channel_url``. "/videos" is
            appended unless the URL is a playlist or already ends in "/videos".
        since, until: optional ``YYYY-MM-DD`` strings; dashes are stripped for yt-dlp.
        limit: maximum number of videos to register; ``None`` means no cap.
        max_scan: upper bound on playlist entries yt-dlp inspects (``--playlist-end``).

    Returns:
        ``(n_docs, n_jobs)`` — documents newly inserted and transcribe jobs newly queued.

    Raises:
        ValueError: if the source has no ``channel_url``.
        subprocess.TimeoutExpired: if yt-dlp runs longer than 900 seconds.
        FileNotFoundError: if the ``yt-dlp`` executable is not on PATH.
    """
    if not source["channel_url"]:
        raise ValueError(f"{source['source_id']} has no channel_url")
    url = source["channel_url"].rstrip("/")
    if "/playlist" not in url and not url.endswith("/videos"):
        url = url + "/videos"
    cmd = ["yt-dlp", "--no-warnings", "--ignore-errors", "--skip-download",
           "--print", "%(id)s\t%(upload_date)s\t%(title)s", "--playlist-end", str(max_scan)]
    if since:
        s = since.replace("-", "")
        cmd += ["--dateafter", s, "--break-match-filters", f"upload_date>={s}"]
    if until:
        cmd += ["--datebefore", until.replace("-", "")]
    cmd.append(url)
    out = subprocess.run(cmd, capture_output=True, text=True, timeout=900)

    stdout = out.stdout or ""
    stderr = (out.stderr or "").strip()
    if stderr and not stdout.strip():
        # The exit status alone is not treated as fatal (the run uses --ignore-errors), but a run
        # that printed nothing and complained on stderr should not look like "no new videos".
        log.warning("yt-dlp returned no videos for %s (exit %s): %s",
                    source["source_id"], out.returncode, stderr[-500:])

    n_docs = n_jobs = count = 0
    # Split on "\n" only: str.splitlines() would also break on characters such as U+2028 or
    # U+0085 that can legitimately occur inside a video title.
    for line in stdout.split("\n"):
        parsed = _parse_ytdlp_line(line)
        if parsed is None:
            continue
        if limit is not None and count >= limit:
            break
        count += 1
        vid, date, title = parsed
        nd, nj = _enqueue_doc(conn, source_id=source["source_id"], kind="youtube",
                              external_id=vid, url=f"https://www.youtube.com/watch?v={vid}",
                              title=title, date=date)
        n_docs += nd
        n_jobs += nj
    return n_docs, n_jobs