From 5deffddb1774eeb46aab647d5822610ff146ce64 Mon Sep 17 00:00:00 2001 From: Keysat Date: Mon, 15 Jun 2026 22:28:12 -0500 Subject: [PATCH] Fix transcript chunker context overflow; full-coverage extraction defaults chunk_text split only on "\n\n", but ASR transcripts have none (speaker turns are joined by a single "\n"), so whole 2-3h episodes (~250K chars) went to the extractor in one call and 400'd on context overflow. Fall through paragraph -> line -> sentence -> word -> hard char-slice so no chunk exceeds the cap regardless of punctuation; guard max_chars < 1. Default extraction to recall-first full coverage (chunk_chars 12K, max_chunks 999) and expose both as run-extract --chunk-chars / --max-chunks. --- ROADMAP.md | 4 +++ signal_engine/cli.py | 8 ++++-- signal_engine/extract/claims.py | 50 +++++++++++++++++++++++++-------- signal_engine/extract/worker.py | 4 +-- 4 files changed, 50 insertions(+), 16 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 6c2f051..342ee42 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -33,6 +33,10 @@ falsification hypotheses (H1–H6) are in `DESIGN_v2.md`. - **Episode-pipelining** in `transcribe_worker` — download/chunk the next episode while transcribing the current one, to close the inter-episode GPU idle gap (the per-chunk 2-in-flight path is already done). - **Corpus-management UI** — add to the corpus over time and see the full corpus selection. +- **Expose pipeline tunables in the UI (with the UI topic).** Extraction chunk size + per-doc chunk cap, + audio chunk length, audio concurrency, etc. are currently hardcoded defaults (now also CLI flags on + `run-extract`: `--chunk-chars`, `--max-chunks`). Surface them in the UI so they're visible/adjustable, + not black-box assumptions we forget about. Tie to the corpus-management UI work. - **Forward live operation** — the only real test: scoring un-pre-selected signals as they arrive, with the dual-evaluation ledger as arbiter. diff --git a/signal_engine/cli.py b/signal_engine/cli.py index 27eefd4..9f96bbe 100644 --- a/signal_engine/cli.py +++ b/signal_engine/cli.py @@ -254,7 +254,8 @@ def cmd_run_extract(args: argparse.Namespace) -> int: conn = db.connect(cfg.db_path) db.init_db(conn) sc = from_config(cfg) - result = run_extract(conn, sc, cfg, limit=args.limit, max_chunks_per_doc=args.max_chunks) + result = run_extract(conn, sc, cfg, limit=args.limit, max_chunks_per_doc=args.max_chunks, + chunk_chars=args.chunk_chars) print(f"extraction: {result['jobs_processed']} jobs, {result['claims_written']} claims written") return 0 @@ -581,7 +582,10 @@ def build_parser() -> argparse.ArgumentParser: re = sub.add_parser("run-extract", help="Drain 'extract' jobs → claims via the local LLM (§4.2)") re.add_argument("--limit", type=int, default=5, help="max jobs to process this run") - re.add_argument("--max-chunks", type=int, default=4, help="max chunks per document") + re.add_argument("--max-chunks", type=int, default=999, + help="max chunks per document (default: full coverage (999))") + re.add_argument("--chunk-chars", type=int, default=12_000, + help="chars per extraction chunk; smaller = better recall, more LLM calls") re.set_defaults(func=cmd_run_extract) sub.add_parser("queue-status", help="Backfill queue counts by type/state").set_defaults(func=cmd_queue_status) diff --git a/signal_engine/extract/claims.py b/signal_engine/extract/claims.py index 1e6fb4b..3351221 100644 --- a/signal_engine/extract/claims.py +++ b/signal_engine/extract/claims.py @@ -30,25 +30,51 @@ def register_seed_topics(conn: sqlite3.Connection) -> None: conn.commit() +# Coarse→fine split boundaries. Transcripts arrive as `Speaker: turn` lines joined by a SINGLE +# newline (ASR output has no blank-line paragraphs), filings as paragraph text — so splitting on +# "\n\n" alone never fires on a transcript and the whole episode would go in one call. "" is the +# per-character hard cap that guarantees termination regardless of punctuation. +_SEPARATORS = ["\n\n", "\n", ". ", " ", ""] + + def chunk_text(text: str, max_chars: int) -> list[str]: - """Split on paragraph boundaries into windows that fit the model context alongside the prompt.""" + """Pack text into windows that each fit the model context alongside the prompt. + + Falls through paragraph → line → sentence → word → hard char-slice, so NO chunk ever exceeds + max_chars however the source is punctuated, while keeping speaker turns intact when they fit. + """ + if max_chars < 1: # else _pack recurses past the last separator → IndexError + raise ValueError(f"max_chars must be >= 1, got {max_chars}") text = text.strip() if not text: return [] + return _pack(text, max_chars, _SEPARATORS) + + +def _pack(text: str, max_chars: int, seps: list[str]) -> list[str]: + """Recursively pack `text` on the coarsest separator in `seps` that keeps chunks within + max_chars, descending to a finer one only for a part that is itself still too big.""" if len(text) <= max_chars: return [text] - chunks: list[str] = [] - cur: list[str] = [] - size = 0 - for para in text.split("\n\n"): - if size + len(para) > max_chars and cur: - chunks.append("\n\n".join(cur)) - cur, size = [], 0 - cur.append(para) - size += len(para) + 2 + sep, rest = seps[0], seps[1:] + parts = list(text) if sep == "" else text.split(sep) + out: list[str] = [] + cur = "" + for p in parts: + candidate = p if not cur else cur + sep + p + if len(candidate) <= max_chars: + cur = candidate + continue + if cur: + out.append(cur) + if len(p) <= max_chars: + cur = p + else: # a single part still too big → split it on the next-finer boundary + out.extend(_pack(p, max_chars, rest)) + cur = "" if cur: - chunks.append("\n\n".join(cur)) - return chunks + out.append(cur) + return out def _parse_claims(content: str) -> list[dict]: diff --git a/signal_engine/extract/worker.py b/signal_engine/extract/worker.py index fa0d0a0..a55aee9 100644 --- a/signal_engine/extract/worker.py +++ b/signal_engine/extract/worker.py @@ -28,8 +28,8 @@ def _document_text(doc, *, user_agent: str) -> str: raise ValueError(f"no text source for {doc['doc_id']} (kind={doc['kind']}, url={doc['url']})") -def run_extract(conn, sc, cfg, *, limit: int = 10, max_chunks_per_doc: int = 4, - chunk_chars: int = 18_000, lease_seconds: int = 900, +def run_extract(conn, sc, cfg, *, limit: int = 10, max_chunks_per_doc: int = 999, + chunk_chars: int = 12_000, lease_seconds: int = 900, worker_id: str = "extract-1") -> dict: from .backends import from_config as backend_from_config backend = backend_from_config(cfg, sc)