"""Extraction worker — drains 'extract' jobs from the backfill queue (§4.2, §13.4). Single sequential worker by design: extraction is the heavier serial load on the one LLM GPU. For each job: load the document, get its text (fetch+strip filing HTML, or read a stored transcript), chunk it, run the §4.2 extractor per chunk, persist 0..N claims, complete the job. """ from __future__ import annotations import logging from pathlib import Path import requests from ..backfill import queue from . import claims as claims_mod from .html_text import html_to_text log = logging.getLogger(__name__) def _document_text(doc, *, user_agent: str) -> str: if doc["transcript_path"]: return Path(doc["transcript_path"]).read_text() if doc["kind"] == "filing" and doc["url"]: r = requests.get(doc["url"], headers={"User-Agent": user_agent}, timeout=90) r.raise_for_status() return html_to_text(r.text) raise ValueError(f"no text source for {doc['doc_id']} (kind={doc['kind']}, url={doc['url']})") def run_extract(conn, sc, cfg, *, limit: int = 10, max_chunks_per_doc: int = 999, chunk_chars: int = 12_000, lease_seconds: int = 900, worker_id: str = "extract-1") -> dict: from .backends import from_config as backend_from_config backend = backend_from_config(cfg, sc) log.info("extraction backend: %s", backend.name) claims_mod.register_seed_topics(conn) processed = total_claims = 0 while processed < limit: job = queue.lease_next(conn, worker_id=worker_id, job_types=["extract"], lease_seconds=lease_seconds) if job is None: break processed += 1 doc = conn.execute("SELECT * FROM documents WHERE doc_id=?", (job["target_id"],)).fetchone() if doc is None: queue.skip(conn, job["job_id"], "document missing") continue src = conn.execute("SELECT * FROM sources WHERE source_id=?", (doc["source_id"],)).fetchone() try: text = _document_text(doc, user_agent=cfg.edgar_user_agent) chunks = claims_mod.chunk_text(text, chunk_chars)[:max_chunks_per_doc] doc_claims = 0 for idx, chunk in enumerate(chunks): cl = claims_mod.extract_claims_from_text( backend, chunk, source_name=src["name"] if src else "", source_cluster=src["source_cluster"] if src else None, date=doc["date"], kind=doc["kind"], ) doc_claims += claims_mod.persist_claims(conn, doc=doc, source=src, claims=cl, chunk_idx=idx) conn.execute("UPDATE documents SET processed_at=datetime('now') WHERE doc_id=?", (doc["doc_id"],)) conn.commit() queue.complete(conn, job["job_id"], output_ref=f"{doc_claims} claims / {len(chunks)} chunks") total_claims += doc_claims log.info("extracted %d claims from %s (%d chunks)", doc_claims, doc["doc_id"], len(chunks)) except Exception as e: # noqa: BLE001 state = queue.fail(conn, job["job_id"], e) log.warning("extract failed for %s: %s (→ %s)", job["target_id"], e, state) return {"jobs_processed": processed, "claims_written": total_claims}