"""Pilot CLI. Subcommands map to the build order in handoff §11. Currently implemented (foundation): init-db, seed-convictions, spark-status, db-tables. Later stages (ingest, extract, score, judge, eval-ui) are added as they're built. """ from __future__ import annotations import argparse import logging import sys from pathlib import Path from .config import load_config from .store import db from .store.seed import load_convictions, load_fanout from .store.sources import load_source_edges, load_sources, update_feeds DEFAULT_CONVICTION_SEED = Path("seeds/conviction_log.seed.yaml") DEFAULT_SOURCES_SEED = Path("seeds/sources.seed.yaml") DEFAULT_FEEDS_SEED = Path("seeds/podcast_feeds.resolved.yaml") def _setup_logging(level: str) -> None: logging.basicConfig(level=getattr(logging, level.upper(), logging.INFO), format="%(asctime)s %(levelname)s %(name)s: %(message)s") def cmd_init_db(args: argparse.Namespace) -> int: cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) print(f"Initialized DB at {cfg.db_path}") print("Tables/views:", ", ".join(db.table_names(conn))) return 0 def cmd_seed_convictions(args: argparse.Namespace) -> int: cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) # ensure schema exists path = Path(args.file) n = load_convictions(conn, path) print(f"Upserted {n} convictions from {path}") breakers = conn.execute( "SELECT conviction_id, thematic_proposition FROM conviction_log WHERE is_thesis_breaker = 1" ).fetchall() if breakers: print("Thesis-breakers loaded (engine must surface these AGAINST the thesis, §5.7):") for b in breakers: print(f" {b['conviction_id']}: {b['thematic_proposition'][:80]}...") return 0 def cmd_seed_sources(args: argparse.Namespace) -> int: cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) n = load_sources(conn, Path(args.file)) by_kind = conn.execute( "SELECT kind, COUNT(*) n FROM sources GROUP BY kind ORDER BY kind" ).fetchall() print(f"Upserted {n} sources from {args.file}") for r in by_kind: print(f" {r['kind']}: {r['n']}") return 0 def cmd_seed_edges(args: argparse.Namespace) -> int: cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) n = load_source_edges(conn, Path(args.file)) total = conn.execute("SELECT COUNT(*) FROM source_edges").fetchone()[0] print(f"Inserted {n} new edges from {args.file} ({total} edges total)") return 0 def cmd_load_feeds(args: argparse.Namespace) -> int: cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) n = update_feeds(conn, Path(args.file)) print(f"updated {n} podcast feeds") rows = conn.execute( "SELECT backtest_2022_2023, COUNT(*) c FROM sources WHERE kind='podcast' " "GROUP BY backtest_2022_2023 ORDER BY c DESC" ).fetchall() print("backtest 2022-2023 reach:") for r in rows: print(f" {r['backtest_2022_2023'] or 'unset'}: {r['c']}") return 0 def cmd_ingest_edgar(args: argparse.Namespace) -> int: from .ingest.edgar import EdgarClient, ingest_filings cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) client = EdgarClient(cfg.edgar_user_agent) forms = tuple(f.strip() for f in args.forms.split(",")) if args.forms else ("10-K", "10-Q", "8-K") # resolve source_id from ticker (create a lightweight source row if not seeded) row = conn.execute("SELECT source_id FROM sources WHERE upper(ticker)=upper(?)", (args.ticker,)).fetchone() if row: source_id = row["source_id"] else: source_id = f"co-{args.ticker.lower()}" conn.execute( "INSERT OR IGNORE INTO sources (source_id, name, kind, ticker) VALUES (?,?,?,?)", (source_id, args.ticker, "filing", args.ticker.upper()), ) conn.commit() n_docs, n_jobs = ingest_filings(conn, client, source_id=source_id, ticker=args.ticker, since=args.since, until=args.until, forms=forms) print(f"{args.ticker}: +{n_docs} filing documents, +{n_jobs} extract jobs queued " f"(forms={','.join(forms)}, since={args.since}, until={args.until})") return 0 def _resolve_source_id(conn, ticker: str, kind: str = "filing") -> str: row = conn.execute("SELECT source_id FROM sources WHERE upper(ticker)=upper(?)", (ticker,)).fetchone() if row: return row["source_id"] source_id = f"co-{ticker.lower()}" conn.execute("INSERT OR IGNORE INTO sources (source_id, name, kind, ticker) VALUES (?,?,?,?)", (source_id, ticker.upper(), kind, ticker.upper())) conn.commit() return source_id def cmd_ingest_doc(args: argparse.Namespace) -> int: from .ingest.docs import ingest_one cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) doc_id = ingest_one(conn, cfg, source_id=args.source, url=args.url, title=args.title or args.url, date=args.date, method=args.method) print(f"ingested: {doc_id}" if doc_id else "no new doc (duplicate / too short / fetch failed)") return 0 def cmd_ingest_feed_text(args: argparse.Namespace) -> int: from .ingest.docs import ingest_feed_text cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) n = ingest_feed_text(conn, cfg, source_id=args.source, rss_url=args.url, since=args.since, until=args.until, limit=args.limit) print(f"ingested {n} article docs from feed for {args.source}") return 0 def cmd_ingest_doc_manifest(args: argparse.Namespace) -> int: from .ingest.docs import ingest_manifest cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) r = ingest_manifest(conn, cfg, Path(args.file)) print(f"manifest: ingested={r['ingested']} skipped={r['skipped']} missing_source={r['missing_source']}") return 0 def cmd_ingest_earnings(args: argparse.Namespace) -> int: from .ingest.earnings import FMPClient, ingest_for_ticker cfg = load_config() if not cfg.fmp_api_key: print("FMP_API_KEY not set", file=sys.stderr) return 1 conn = db.connect(cfg.db_path) db.init_db(conn) fmp = FMPClient(cfg.fmp_api_key) source_id = _resolve_source_id(conn, args.ticker) n_docs, n_jobs = ingest_for_ticker(conn, fmp, source_id=source_id, symbol=args.ticker.upper(), data_dir=cfg.data_dir, since=args.since, until=args.until, limit=args.limit) print(f"{args.ticker}: +{n_docs} earnings transcripts, +{n_jobs} extract jobs (since={args.since}, until={args.until})") return 0 def cmd_embed_claims(args: argparse.Namespace) -> int: from .spark import from_config from .embedstore.qdrant_store import get_client, ensure_collection, upsert_pending from .embedstore.embedder import SparseEmbedder cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) sc = from_config(cfg) client = get_client(args.qdrant_url) created = ensure_collection(client) print(f"collection {'created' if created else 'exists'}") sparse = SparseEmbedder() if not args.no_sparse else None n = upsert_pending(conn, sc, client, sparse) print(f"embedded + upserted {n} propositions (sparse={'on' if sparse and sparse.available else 'off'})") return 0 def cmd_search(args: argparse.Namespace) -> int: from .spark import from_config cfg = load_config() sc = from_config(cfg) res = sc.search(args.query, collection="propositions", top_k=args.top_k, rerank=not args.no_rerank) hits = res.get("results") or res.get("hits") or res print(json.dumps(hits, indent=2)[:2500]) return 0 def cmd_ingest_podcast(args: argparse.Namespace) -> int: from .ingest.podcasts import ingest_rss, ingest_youtube cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) src = conn.execute("SELECT * FROM sources WHERE source_id=?", (args.source,)).fetchone() if not src: print(f"unknown source {args.source}", file=sys.stderr) return 1 via = args.via if via == "auto": via = "youtube" if (src["backtest_2022_2023"] == "youtube_only" and args.since) else "rss" fn = ingest_youtube if via == "youtube" else ingest_rss n_docs, n_jobs = fn(conn, src, since=args.since, until=args.until, limit=args.limit) print(f"{src['name']} via {via}: +{n_docs} episodes, +{n_jobs} transcribe jobs") return 0 def cmd_run_transcribe(args: argparse.Namespace) -> int: from .spark import from_config from .ingest.transcribe_worker import run_transcribe cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) sc = from_config(cfg) result = run_transcribe(conn, sc, cfg, limit=args.limit, max_chunks=args.max_chunks) print(f"transcription: {result['jobs_processed']} jobs processed") return 0 def cmd_run_transcribe_gemini(args: argparse.Namespace) -> int: from .ingest.gemini_transcribe import run_transcribe_gemini cfg = load_config() conn = db.connect(cfg.db_path) r = run_transcribe_gemini(conn, cfg, limit=args.limit, concurrency=args.concurrency) tok_in, tok_out = r["prompt_tokens"], r["output_tokens"] # Gemini 2.5 Flash list price: ~$0.30/1M text-in, audio-in ~$1.00/1M, $2.50/1M out. Audio dominates in. est = tok_in / 1_000_000 * 1.00 + tok_out / 1_000_000 * 2.50 print(f"gemini transcribe: done={r['done']} failed={r['failed']} | " f"tokens in={tok_in:,} out={tok_out:,} | ~${est:.2f} this run (≈${est/max(r['done'],1):.3f}/ep)") return 0 def cmd_run_extract(args: argparse.Namespace) -> int: from .spark import from_config from .extract.worker import run_extract cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) sc = from_config(cfg) result = run_extract(conn, sc, cfg, limit=args.limit, max_chunks_per_doc=args.max_chunks, chunk_chars=args.chunk_chars) print(f"extraction: {result['jobs_processed']} jobs, {result['claims_written']} claims written") return 0 def cmd_queue_status(args: argparse.Namespace) -> int: from .backfill import queue cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) s = queue.stats(conn) if not s: print("queue empty") return 0 for job_type, states in sorted(s.items()): parts = ", ".join(f"{st}={n}" for st, n in sorted(states.items())) print(f" {job_type}: {parts}") return 0 def cmd_feed_peek(args: argparse.Namespace) -> int: from .ingest.feeds import fetch_feed, episode_records parsed = fetch_feed(args.url) status = getattr(parsed, "status", None) recs = episode_records(parsed) print(f"status={status} bozo={getattr(parsed, 'bozo', None)} episodes_with_audio={len(recs)}") for r in recs[: args.limit]: print(f" [{r['published']}] {str(r['title'])[:70]}") if recs: print(f"oldest in feed: {recs[-1]['published']} newest: {recs[0]['published']}") return 0 def cmd_serve(args: argparse.Namespace) -> int: import uvicorn from .ui.app import create_app cfg = load_config() port = args.port or cfg.ui_port print(f"serving corpus UI on http://0.0.0.0:{port}") uvicorn.run(create_app(), host="0.0.0.0", port=port) return 0 def cmd_seed_fanout(args: argparse.Namespace) -> int: cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) n = load_fanout(conn, Path(args.file)) print(f"seeded {n} fan-out derivative nodes") return 0 def cmd_backtest(args: argparse.Namespace) -> int: from .spark import from_config from .signals.run import run_backtest from datetime import datetime, timedelta cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) sc = from_config(cfg) # monthly as_of march start = datetime.strptime(args.start, "%Y-%m-%d") end = datetime.strptime(args.end, "%Y-%m-%d") dates, d = [], start while d <= end: dates.append(d.strftime("%Y-%m-%d")) d = d + timedelta(days=args.step_days) print(f"§7.1 backtest: conviction={args.conviction}, as_of march {args.start}→{args.end} ({len(dates)} points)") timeline = run_backtest(conn, sc, cfg, conviction_id=args.conviction, dates=dates, window_days=args.window_days) # report: per-node first-clear date + score trajectory; highlight the headline derivative print("\n=== node trajectories (score by as_of; ★=cleared evidence bar) ===") nodes = {} for as_of, res in timeline: for r in res: key = r["node"]["node_id"] or r["node"]["conviction_id"] nodes.setdefault(key, []).append((as_of, r["result"]["score"], r["evidence"], r["promotion"], r["result"]["inputs"])) for key, traj in sorted(nodes.items()): first = next((t for t in traj if t[2]), None) peak = max(traj, key=lambda t: t[1]) mark = f"first-cleared {first[0]}" if first else "never cleared" print(f" {key:28} peak={peak[1]:.2f} {mark}") head = nodes.get(args.headline) if head: print(f"\n=== HEADLINE derivative: {args.headline} ===") for as_of, score, ev, pr, inp in head: star = "★" if ev else ("·" if score > 0 else " ") print(f" {as_of} {star} score={score:.2f} corrob={inp.get('corroboration',0)} " f"n_conf={inp.get('n_confirmed',0)} eisc={inp.get('eisc_corrob',0)} " f"a={inp.get('a_corrob',0)} k_eff={inp.get('k_eff0',0)}") firstclear = next((t for t in head if t[2]), None) print(f"\n VERDICT: headline power-infra derivative " f"{'SURFACED at ' + firstclear[0] if firstclear else 'did NOT surface'} " f"(bar = under_acted ≥ {0.3})") return 0 def cmd_two_sided(args: argparse.Namespace) -> int: """Two-sided net-corroboration trajectory (DESIGN_v2.1 H5) for the adversarial cases. BATTERY: demand-net should rise while supply-net stays flat. STRIKE: net stays quiet in live, fires in test.""" from .spark import from_config as spark_from_config from .extract.backends import from_config as backend_from_config from .signals.two_sided import trajectory cfg = load_config() conn = db.connect(cfg.db_path) sc = spark_from_config(cfg) backend = backend_from_config(cfg, sc) nodes = conn.execute( "SELECT node_id, derivative_proposition FROM fanout_nodes WHERE parent_conviction_id=? ORDER BY node_id", (args.conviction,), ).fetchall() dates = [d.strip() for d in args.dates.split(",")] filt = [s for s in args.nodes.split(",") if s] if args.nodes else [] for r in nodes: if filt and not any(k.lower() in r["node_id"].lower() for k in filt): continue for mode in [m.strip() for m in args.modes.split(",")]: traj = trajectory(conn, sc, backend, r["derivative_proposition"], dates, window_days=args.window_days, mode=mode) print(f"\n### {r['node_id']} [mode={mode}, window={args.window_days}d] ###") for pt in traj: print(f" {pt['as_of']}: net={pt['net']:+.2f} " f"affirm(eisc={pt['affirms_eisc']}, hard_src={pt.get('hard_affirm_src','?')}, " f"n_claims={pt['n_affirm']}, soft_dropped={pt.get('soft_affirm_src_dropped','?')}) " f"deny(eisc={pt['denies_eisc']}, n={pt['n_deny']}) " f"own_net={pt['own_network_affirm_src']}") return 0 def cmd_confusion(args: argparse.Namespace) -> int: from .signals.confusion import run_confusion cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) out = run_confusion(conn, cfg, args.spec) classify = out["classify"] print("=== PRE-REGISTERED confusion matrix (DESIGN_v2 §1) — precision AND recall; RUNWAY = frac of move still ahead at signal ===") print(f"{'derivative':26} {'reprice?':8} {'peak%':>6} {'whisper':>9} {'run_wh':>6} {'cleared':>9} {'run_cl':>6} cl/wh") for r in out["rows"]: cl, wh = classify(r, "cleared"), classify(r, "whisper") miss = f" (no px:{','.join(r['missing'])})" if r["missing"] else "" print(f"{r['node']:26} {('REAL' if r['confirmed'] else 'no'):8} {str(r['peak_pct']):>6} " f"{str(r['whisper_date'] or '-'):>9} {str(r['runway_whisper'] if r['runway_whisper'] is not None else '-'):>6} " f"{str(r['cleared_date'] or '-'):>9} {str(r['runway_cleared'] if r['runway_cleared'] is not None else '-'):>6} " f"{cl}/{wh}{miss}") for level in ("cleared", "whisper"): c, p, rec = out[level] print(f"\n{level.upper()} level: TP={c['TP']} FP={c['FP']} FN={c['FN']} TN={c['TN']} | " f"precision={p if p is None else round(p,2)} recall={rec if rec is None else round(rec,2)}") print("\nlead_* = days the repricing came AFTER the signal (positive = engine was early).") print("The cleared→whisper delta = what the independence floor cost in lead time / recall.") return 0 def cmd_provenance(args: argparse.Namespace) -> int: """The processing log — what's been ingested/processed, so we never reprocess silently.""" cfg = load_config() conn = db.connect(cfg.db_path) db.init_db(conn) print("processed documents (the durable log):") for r in conn.execute( "SELECT kind, COUNT(*) total, SUM(CASE WHEN processed_at IS NOT NULL THEN 1 ELSE 0 END) proc " "FROM documents GROUP BY kind ORDER BY kind" ): print(f" {r['kind']:14} {r['proc']}/{r['total']} processed") print("dedup model: (1) UNIQUE(source_id, external_id) = robust pre-GPU guard; " "(2) dedup_key = cross-mirror (title+date); content_hash = audit only.") dups = conn.execute( "SELECT dedup_key, COUNT(*) c FROM documents WHERE dedup_key IS NOT NULL " "GROUP BY dedup_key HAVING c > 1" ).fetchall() print(f"cross-mirror dedup_key groups (same episode via >1 feed): {len(dups)}") miss = conn.execute("SELECT COUNT(*) FROM documents WHERE dedup_key IS NULL").fetchone()[0] if miss: print(f" ({miss} docs missing dedup_key — run `provenance --backfill-hashes`)") if args.backfill_hashes: import hashlib import os from .util import audio_dedup_key ndk = nch = 0 for r in conn.execute("SELECT doc_id, kind, title, date, external_id, transcript_path, dedup_key, content_hash FROM documents"): updates: dict = {} if not r["dedup_key"]: updates["dedup_key"] = (audio_dedup_key(r["title"], r["date"]) if r["kind"] in ("podcast", "youtube") else r["external_id"]) ndk += 1 if not r["content_hash"] and r["transcript_path"] and os.path.exists(r["transcript_path"]): updates["content_hash"] = hashlib.sha256(open(r["transcript_path"], "rb").read()).hexdigest() nch += 1 if updates: sets = ", ".join(f"{k}=?" for k in updates) conn.execute(f"UPDATE documents SET {sets} WHERE doc_id=?", (*updates.values(), r["doc_id"])) conn.commit() print(f"backfilled {ndk} dedup_keys, {nch} content hashes (audit)") return 0 def cmd_db_tables(args: argparse.Namespace) -> int: cfg = load_config() conn = db.connect(cfg.db_path) for t in db.table_names(conn): print(t) return 0 def cmd_spark_status(args: argparse.Namespace) -> int: from .spark import from_config cfg = load_config() sc = from_config(cfg) try: print("status:", sc.status()) print("endpoints:", sc.endpoints()) return 0 except Exception as e: # noqa: BLE001 — health probe; surface, don't crash print(f"Spark Control unreachable at {cfg.spark_control_url}: {e}", file=sys.stderr) return 1 def build_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser(prog="signal_engine", description="Ten31 Signal Engine (pilot)") sub = p.add_subparsers(dest="command", required=True) sub.add_parser("init-db", help="Create the SQLite schema").set_defaults(func=cmd_init_db) sp = sub.add_parser("seed-convictions", help="Load the conviction log (§3.1)") sp.add_argument("--file", default=str(DEFAULT_CONVICTION_SEED)) sp.set_defaults(func=cmd_seed_convictions) ss = sub.add_parser("seed-sources", help="Load the source registry (§7.3/§7.4)") ss.add_argument("--file", default=str(DEFAULT_SOURCES_SEED)) ss.set_defaults(func=cmd_seed_sources) sde = sub.add_parser("seed-edges", help="Seed EISC connectedness edges (priors) idempotently") sde.add_argument("--file", default="seeds/source_edges.bitcoin.seed.yaml") sde.set_defaults(func=cmd_seed_edges) lf = sub.add_parser("load-feeds", help="Apply resolved/verified podcast feed URLs + backtest reach") lf.add_argument("--file", default=str(DEFAULT_FEEDS_SEED)) lf.set_defaults(func=cmd_load_feeds) sf = sub.add_parser("seed-fanout", help="Load the hand-written fan-out tree (§7.1 backtest)") sf.add_argument("--file", default="seeds/fanout.K2023.seed.yaml") sf.set_defaults(func=cmd_seed_fanout) bt = sub.add_parser("backtest", help="Run the §7.1 under-acted-conviction backtest (as-of march)") bt.add_argument("--conviction", default="K2023") bt.add_argument("--start", default="2023-01-01") bt.add_argument("--end", default="2024-06-01") bt.add_argument("--step-days", type=int, default=30) bt.add_argument("--window-days", type=int, default=90, help="~quarterly for filings/earnings cadence") bt.add_argument("--headline", default="K2023-picks-and-shovels") bt.set_defaults(func=cmd_backtest) ie = sub.add_parser("ingest-edgar", help="Fetch SEC filings for a ticker → documents + extract jobs") ie.add_argument("--ticker", required=True) ie.add_argument("--since", help="ISO date lower bound, e.g. 2022-01-01") ie.add_argument("--until", help="ISO date upper bound, e.g. 2023-12-31") ie.add_argument("--forms", help="comma list, default 10-K,10-Q,8-K") ie.set_defaults(func=cmd_ingest_edgar) idoc = sub.add_parser("ingest-doc", help="Fetch one text doc (HTML/PDF) → document + extract job (Battery corpus)") idoc.add_argument("--source", required=True, help="source_id (must exist)") idoc.add_argument("--url", required=True) idoc.add_argument("--title") idoc.add_argument("--date", help="ISO date of the document") idoc.add_argument("--method", choices=["auto", "html", "pdf"], default="auto") idoc.set_defaults(func=cmd_ingest_doc) idm = sub.add_parser("ingest-doc-manifest", help="Batch-ingest a YAML doc manifest (Battery corpus)") idm.add_argument("--file", default="seeds/battery_docs.manifest.yaml") idm.set_defaults(func=cmd_ingest_doc_manifest) ift = sub.add_parser("ingest-feed-text", help="Ingest article bodies behind a text RSS feed (blog/press)") ift.add_argument("--source", required=True) ift.add_argument("--url", required=True, help="RSS feed URL") ift.add_argument("--since") ift.add_argument("--until") ift.add_argument("--limit", type=int, default=50) ift.set_defaults(func=cmd_ingest_feed_text) ge = sub.add_parser("ingest-earnings", help="Fetch FMP earnings transcripts → documents + extract jobs") ge.add_argument("--ticker", required=True) ge.add_argument("--since", help="ISO date lower bound (uses transcript date)") ge.add_argument("--until", help="ISO date upper bound") ge.add_argument("--limit", type=int, default=8) ge.set_defaults(func=cmd_ingest_earnings) ts = sub.add_parser("two-sided", help="Two-sided net-corroboration trajectory (Strike/Battery adversarial cases)") ts.add_argument("--conviction", default="BATTERY2022") ts.add_argument("--nodes", default="", help="comma substrings to filter fan-out nodes, e.g. demand,supply") ts.add_argument("--dates", default="2022-12-31,2023-06-30,2023-12-31,2024-06-30,2024-12-31") ts.add_argument("--modes", default="live", help="comma list: live,test") ts.add_argument("--window-days", type=int, default=365) ts.set_defaults(func=cmd_two_sided) ec = sub.add_parser("embed-claims", help="Embed pending propositions → Qdrant hybrid collection (§4.3)") ec.add_argument("--qdrant-url", default="http://192.168.1.87:6333") ec.add_argument("--no-sparse", action="store_true", help="dense-only (skip BM25)") ec.set_defaults(func=cmd_embed_claims) se = sub.add_parser("search", help="Hybrid search the proposition store via the gateway") se.add_argument("--query", required=True) se.add_argument("--top-k", type=int, default=8) se.add_argument("--no-rerank", action="store_true") se.set_defaults(func=cmd_search) ip = sub.add_parser("ingest-podcast", help="Register podcast episodes → transcribe jobs (RSS or YouTube)") ip.add_argument("--source", required=True, help="source_id, e.g. pod-dwarkesh") ip.add_argument("--via", choices=["auto", "rss", "youtube"], default="auto") ip.add_argument("--since") ip.add_argument("--until") ip.add_argument("--limit", type=int, default=20) ip.set_defaults(func=cmd_ingest_podcast) rt = sub.add_parser("run-transcribe", help="Drain 'transcribe' jobs → speaker-attributed transcripts + voiceprints") rt.add_argument("--limit", type=int, default=5) rt.add_argument("--max-chunks", type=int, default=999) rt.set_defaults(func=cmd_run_transcribe) rtg = sub.add_parser("run-transcribe-gemini", help="One-time backfill: drain 'transcribe' jobs via Gemini (off the Spark GPU)") rtg.add_argument("--limit", type=int, default=5) rtg.add_argument("--concurrency", type=int, default=4) rtg.set_defaults(func=cmd_run_transcribe_gemini) re = sub.add_parser("run-extract", help="Drain 'extract' jobs → claims via the local LLM (§4.2)") re.add_argument("--limit", type=int, default=5, help="max jobs to process this run") re.add_argument("--max-chunks", type=int, default=999, help="max chunks per document (default: full coverage (999))") re.add_argument("--chunk-chars", type=int, default=12_000, help="chars per extraction chunk; smaller = better recall, more LLM calls") re.set_defaults(func=cmd_run_extract) sub.add_parser("queue-status", help="Backfill queue counts by type/state").set_defaults(func=cmd_queue_status) fp = sub.add_parser("feed-peek", help="Parse an RSS feed and show episode coverage") fp.add_argument("--url", required=True) fp.add_argument("--limit", type=int, default=5) fp.set_defaults(func=cmd_feed_peek) sv = sub.add_parser("serve", help="Run the corpus-management web UI (FastAPI)") sv.add_argument("--port", type=int, default=None) sv.set_defaults(func=cmd_serve) cm = sub.add_parser("confusion-matrix", help="Pre-registered precision/recall on the §7.1 derivatives (resolver)") cm.add_argument("--spec", default="seeds/resolution.K2023.yaml") cm.set_defaults(func=cmd_confusion) pv = sub.add_parser("provenance", help="Processing log: what's ingested/processed (dedup-safe)") pv.add_argument("--backfill-hashes", action="store_true", help="compute content_hash for older transcripts") pv.set_defaults(func=cmd_provenance) sub.add_parser("db-tables", help="List tables/views").set_defaults(func=cmd_db_tables) sub.add_parser("spark-status", help="Probe Spark Control health").set_defaults(func=cmd_spark_status) return p def main(argv: list[str] | None = None) -> int: args = build_parser().parse_args(argv) cfg = load_config() _setup_logging(cfg.log_level) return args.func(args) if __name__ == "__main__": raise SystemExit(main())