Files
Keysat 5deffddb17 Fix transcript chunker context overflow; full-coverage extraction defaults
chunk_text split only on "\n\n", but ASR transcripts have none (speaker turns are joined by a single "\n"), so whole 2-3h episodes (~250K chars) went to the extractor in one call and 400'd on context overflow. Fall through paragraph -> line -> sentence -> word -> hard char-slice so no chunk exceeds the cap regardless of punctuation; guard max_chars < 1.

Default extraction to recall-first full coverage (chunk_chars 12K, max_chunks 999) and expose both as run-extract --chunk-chars / --max-chunks.
2026-06-15 22:28:12 -05:00

70 lines
3.2 KiB
Python

"""Extraction worker — drains 'extract' jobs from the backfill queue (§4.2, §13.4).
Single sequential worker by design: extraction is the heavier serial load on the one LLM GPU.
For each job: load the document, get its text (fetch+strip filing HTML, or read a stored transcript),
chunk it, run the §4.2 extractor per chunk, persist 0..N claims, complete the job.
"""
from __future__ import annotations
import logging
from pathlib import Path
import requests
from ..backfill import queue
from . import claims as claims_mod
from .html_text import html_to_text
log = logging.getLogger(__name__)
def _document_text(doc, *, user_agent: str) -> str:
if doc["transcript_path"]:
return Path(doc["transcript_path"]).read_text()
if doc["kind"] == "filing" and doc["url"]:
r = requests.get(doc["url"], headers={"User-Agent": user_agent}, timeout=90)
r.raise_for_status()
return html_to_text(r.text)
raise ValueError(f"no text source for {doc['doc_id']} (kind={doc['kind']}, url={doc['url']})")
def run_extract(conn, sc, cfg, *, limit: int = 10, max_chunks_per_doc: int = 999,
chunk_chars: int = 12_000, lease_seconds: int = 900,
worker_id: str = "extract-1") -> dict:
from .backends import from_config as backend_from_config
backend = backend_from_config(cfg, sc)
log.info("extraction backend: %s", backend.name)
claims_mod.register_seed_topics(conn)
processed = total_claims = 0
while processed < limit:
job = queue.lease_next(conn, worker_id=worker_id, job_types=["extract"], lease_seconds=lease_seconds)
if job is None:
break
processed += 1
doc = conn.execute("SELECT * FROM documents WHERE doc_id=?", (job["target_id"],)).fetchone()
if doc is None:
queue.skip(conn, job["job_id"], "document missing")
continue
src = conn.execute("SELECT * FROM sources WHERE source_id=?", (doc["source_id"],)).fetchone()
try:
text = _document_text(doc, user_agent=cfg.edgar_user_agent)
chunks = claims_mod.chunk_text(text, chunk_chars)[:max_chunks_per_doc]
doc_claims = 0
for idx, chunk in enumerate(chunks):
cl = claims_mod.extract_claims_from_text(
backend, chunk,
source_name=src["name"] if src else "",
source_cluster=src["source_cluster"] if src else None,
date=doc["date"], kind=doc["kind"],
)
doc_claims += claims_mod.persist_claims(conn, doc=doc, source=src, claims=cl, chunk_idx=idx)
conn.execute("UPDATE documents SET processed_at=datetime('now') WHERE doc_id=?", (doc["doc_id"],))
conn.commit()
queue.complete(conn, job["job_id"], output_ref=f"{doc_claims} claims / {len(chunks)} chunks")
total_claims += doc_claims
log.info("extracted %d claims from %s (%d chunks)", doc_claims, doc["doc_id"], len(chunks))
except Exception as e: # noqa: BLE001
state = queue.fail(conn, job["job_id"], e)
log.warning("extract failed for %s: %s (→ %s)", job["target_id"], e, state)
return {"jobs_processed": processed, "claims_written": total_claims}