Files
ten31-signal-engine/signal_engine/extract/worker.py
T

70 lines
3.2 KiB
Python

"""Extraction worker — drains 'extract' jobs from the backfill queue (§4.2, §13.4).
Single sequential worker by design: extraction is the heavier serial load on the one LLM GPU.
For each job: load the document, get its text (fetch+strip filing HTML, or read a stored transcript),
chunk it, run the §4.2 extractor per chunk, persist 0..N claims, complete the job.
"""
from __future__ import annotations
import logging
from pathlib import Path
import requests
from ..backfill import queue
from . import claims as claims_mod
from .html_text import html_to_text
log = logging.getLogger(__name__)
def _document_text(doc, *, user_agent: str) -> str:
if doc["transcript_path"]:
return Path(doc["transcript_path"]).read_text()
if doc["kind"] == "filing" and doc["url"]:
r = requests.get(doc["url"], headers={"User-Agent": user_agent}, timeout=90)
r.raise_for_status()
return html_to_text(r.text)
raise ValueError(f"no text source for {doc['doc_id']} (kind={doc['kind']}, url={doc['url']})")
def run_extract(conn, sc, cfg, *, limit: int = 10, max_chunks_per_doc: int = 4,
chunk_chars: int = 18_000, lease_seconds: int = 900,
worker_id: str = "extract-1") -> dict:
from .backends import from_config as backend_from_config
backend = backend_from_config(cfg, sc)
log.info("extraction backend: %s", backend.name)
claims_mod.register_seed_topics(conn)
processed = total_claims = 0
while processed < limit:
job = queue.lease_next(conn, worker_id=worker_id, job_types=["extract"], lease_seconds=lease_seconds)
if job is None:
break
processed += 1
doc = conn.execute("SELECT * FROM documents WHERE doc_id=?", (job["target_id"],)).fetchone()
if doc is None:
queue.skip(conn, job["job_id"], "document missing")
continue
src = conn.execute("SELECT * FROM sources WHERE source_id=?", (doc["source_id"],)).fetchone()
try:
text = _document_text(doc, user_agent=cfg.edgar_user_agent)
chunks = claims_mod.chunk_text(text, chunk_chars)[:max_chunks_per_doc]
doc_claims = 0
for idx, chunk in enumerate(chunks):
cl = claims_mod.extract_claims_from_text(
backend, chunk,
source_name=src["name"] if src else "",
source_cluster=src["source_cluster"] if src else None,
date=doc["date"], kind=doc["kind"],
)
doc_claims += claims_mod.persist_claims(conn, doc=doc, source=src, claims=cl, chunk_idx=idx)
conn.execute("UPDATE documents SET processed_at=datetime('now') WHERE doc_id=?", (doc["doc_id"],))
conn.commit()
queue.complete(conn, job["job_id"], output_ref=f"{doc_claims} claims / {len(chunks)} chunks")
total_claims += doc_claims
log.info("extracted %d claims from %s (%d chunks)", doc_claims, doc["doc_id"], len(chunks))
except Exception as e: # noqa: BLE001
state = queue.fail(conn, job["job_id"], e)
log.warning("extract failed for %s: %s (→ %s)", job["target_id"], e, state)
return {"jobs_processed": processed, "claims_written": total_claims}