"""Podcast ingestion → documents + 'transcribe' jobs (§4.1). RSS path: parse the feed, take episodes in [since, until], register documents pointing at the audio enclosure. YouTube path: enumerate a channel's videos in the date window via yt-dlp (the back-catalog route for the ~9 shows whose RSS is a truncated rolling window — see seeds/podcast_feeds.resolved.yaml). The transcribe worker downloads + processes either kind identically. """ from __future__ import annotations import hashlib import json import logging import sqlite3 import subprocess from ..backfill import queue from ..util import audio_dedup_key from .feeds import episode_records, fetch_feed log = logging.getLogger(__name__) def _enqueue_doc(conn, *, source_id, kind, external_id, url, title, date) -> tuple[int, int]: doc_id = f"pod:{source_id}:{hashlib.sha1(external_id.encode()).hexdigest()[:12]}" dkey = audio_dedup_key(title, date) # Cross-mirror dedup (pre-GPU): if this same episode was already processed (any source/feed), # record the sighting for provenance but DON'T re-transcribe. (external_id UNIQUE already covers # same-feed re-ingest; this covers the same episode via a different feed/YouTube mirror.) dup = conn.execute( "SELECT doc_id FROM documents WHERE dedup_key=? AND processed_at IS NOT NULL LIMIT 1", (dkey,) ).fetchone() cur = conn.execute( """INSERT OR IGNORE INTO documents (doc_id, source_id, kind, external_id, url, title, date, dedup_key) VALUES (?,?,?,?,?,?,?,?)""", (doc_id, source_id, kind, external_id, url, title, date, dkey), ) conn.commit() if not cur.rowcount: return (0, 0) # same (source_id, external_id) already known if dup: conn.execute( "UPDATE documents SET processed_at=datetime('now'), raw_path=? WHERE doc_id=?", (f"dup_of:{dup['doc_id']}", doc_id), ) conn.commit() log.info("skip transcribe for %s — duplicate content of %s", doc_id, dup["doc_id"]) return (1, 0) h = hashlib.sha256(f"{doc_id}|audio-v0".encode()).hexdigest() job = queue.enqueue(conn, job_type="transcribe", target_id=doc_id, input_hash=h, parent_doc_id=doc_id, priority=100) return (1, 1 if job is not None else 0) def ingest_rss(conn: sqlite3.Connection, source: sqlite3.Row, *, since=None, until=None, limit=20): if not source["rss_url"]: raise ValueError(f"{source['source_id']} has no rss_url") recs = episode_records(fetch_feed(source["rss_url"])) n_docs = n_jobs = count = 0 for r in recs: d = r["published"] if since and d and d < since: continue if until and d and d > until: continue if count >= limit: break count += 1 nd, nj = _enqueue_doc(conn, source_id=source["source_id"], kind="podcast", external_id=r["guid"], url=r["audio_url"], title=r["title"], date=d) n_docs += nd n_jobs += nj return n_docs, n_jobs def ingest_youtube(conn: sqlite3.Connection, source: sqlite3.Row, *, since=None, until=None, limit=20, max_scan=800): """Enumerate channel videos in the date window via yt-dlp (NON-flat, so upload_date is populated — flat mode returns NA). Videos come newest-first, so we use --dateafter/--datebefore to select the window and --break-match-filters to STOP scanning once we drop below `since` (avoids walking the entire channel history). The transcribe worker downloads audio on demand.""" if not source["channel_url"]: raise ValueError(f"{source['source_id']} has no channel_url") url = source["channel_url"].rstrip("/") if "/playlist" not in url and not url.endswith("/videos"): url = url + "/videos" cmd = ["yt-dlp", "--no-warnings", "--ignore-errors", "--skip-download", "--print", "%(id)s\t%(upload_date)s\t%(title)s", "--playlist-end", str(max_scan)] if since: s = since.replace("-", "") cmd += ["--dateafter", s, "--break-match-filters", f"upload_date>={s}"] if until: cmd += ["--datebefore", until.replace("-", "")] cmd.append(url) out = subprocess.run(cmd, capture_output=True, text=True, timeout=900) n_docs = n_jobs = count = 0 for line in out.stdout.splitlines(): parts = line.split("\t") if len(parts) < 2 or not parts[0] or parts[1] in ("NA", ""): continue vid, upd = parts[0], parts[1] title = parts[2] if len(parts) > 2 else vid date = f"{upd[:4]}-{upd[4:6]}-{upd[6:8]}" if len(upd) == 8 else None if count >= limit: break count += 1 nd, nj = _enqueue_doc(conn, source_id=source["source_id"], kind="youtube", external_id=vid, url=f"https://www.youtube.com/watch?v={vid}", title=title, date=date) n_docs += nd n_jobs += nj return n_docs, n_jobs