Files
ten31-signal-engine/signal_engine/ingest/docs.py
T

160 lines
6.8 KiB
Python

"""Text-document fetcher for the Battery (bitcoin-collateralized lending) corpus and any non-filing,
non-audio source: policy primaries (SEC SABs, OCC/FDIC/Fed), lender/issuer blogs, credit-market data.
Unlike EDGAR (CIK-driven) and the podcast path (audio→transcribe), these are dated HTML pages, PDFs, or
article RSS feeds. We fetch ONCE, extract clean text (HTML via html_to_text, PDF via pypdf), save it, and
point documents.transcript_path at the saved text so the extract worker reads it directly (it already
supports transcript_path) — this also lets PDFs work, which the worker's on-demand html_to_text fetch can't.
A source row must exist first (FK). Lineage/axis live on the source's cluster/notes (set in the seed);
policy sources are axis=context and must NOT feed the supply resolver (weight 0) — enforced downstream.
"""
from __future__ import annotations
import hashlib
import io
import logging
import sqlite3
from pathlib import Path
import requests
from ..backfill import queue
from ..extract.html_text import html_to_text
from .feeds import fetch_feed
# Module-level logger; fetch failures and skipped documents below are reported through it.
log = logging.getLogger(__name__)
# User-Agent sent with every HTTP request from this module when cfg has no `user_agent`
# (identifies the crawler and gives sites a contact address).
DEFAULT_UA = "ten31-signal-engine/1.0 (research; contact ops@ten31.xyz)"
def _pdf_to_text(data: bytes, *, max_chars: int) -> str:
    """Extract plain text from raw PDF bytes, capped at ``max_chars`` characters.

    Pages are read in order. Reading stops as soon as the running character count
    exceeds ``max_chars``, so a huge PDF is not fully parsed. The collected page
    texts are joined with newlines and the result is truncated to ``max_chars``.
    Pages whose extraction yields nothing contribute an empty string.

    pypdf is imported inside the function, so it is only needed when a PDF is
    actually parsed.
    """
    import pypdf

    page_texts: list[str] = []
    chars_seen = 0
    for pdf_page in pypdf.PdfReader(io.BytesIO(data)).pages:
        extracted = pdf_page.extract_text() or ""
        page_texts.append(extracted)
        chars_seen += len(extracted)
        if chars_seen > max_chars:
            # Enough text gathered; the final slice trims the overshoot.
            break
    combined = "\n".join(page_texts)
    return combined[:max_chars]
def fetch_clean_text(url: str, *, method: str = "auto", ua: str = DEFAULT_UA,
                     timeout: int = 90, max_chars: int = 300_000) -> str:
    """Fetch a URL once and return clean text.

    PDF vs HTML is auto-detected from the Content-Type header and the ``%PDF-`` magic
    bytes. ``method="pdf"`` forces the PDF path; any other value uses detection.

    Args:
        url: document URL.
        method: ``"pdf"`` to force PDF extraction, otherwise auto-detect.
        ua: User-Agent header value.
        timeout: requests timeout in seconds.
        max_chars: character cap on the returned text. The PDF path truncates
            explicitly; the HTML path passes the cap to ``html_to_text``.

    Returns:
        The extracted text.

    Raises:
        requests.HTTPError: non-2xx response (via ``raise_for_status``).
        requests.RequestException: network errors and timeouts.
        Errors from PDF/HTML extraction propagate unchanged.
    """
    resp = requests.get(url, headers={"User-Agent": ua}, timeout=timeout)
    resp.raise_for_status()
    ctype = resp.headers.get("Content-Type", "").lower()
    body = resp.content

    is_pdf = method == "pdf" or "application/pdf" in ctype or body.startswith(b"%PDF-")
    if is_pdf:
        return _pdf_to_text(body, max_chars=max_chars)

    if "charset" in ctype:
        # The server declared its encoding; requests' decoding honours it.
        html = resp.text
    else:
        # Without a declared charset, requests falls back to ISO-8859-1 for text/*
        # types (the old RFC 2616 default). That turns UTF-8 pages into mojibake
        # ("â€™"). Prefer UTF-8 when the bytes are valid UTF-8; a leading BOM is
        # dropped. Otherwise keep requests' own decoding.
        try:
            html = body.decode("utf-8-sig")
        except UnicodeDecodeError:
            html = resp.text
    return html_to_text(html, max_chars=max_chars)
_BLOCK_MARKERS = (
"aggressive automated scraping", "request access", "access denied", "are you a robot",
"enable javascript", "captcha", "verify you are human", "rate limit exceeded",
"403 forbidden", "unusual traffic", "checking your browser",
)
def _looks_blocked(text: str) -> bool:
"""Anti-scraping interstitials return 200 + a short access-denied body. Detect so we don't ingest
a block page as if it were the document (a real policy/blog doc is long and has no such markers)."""
low = text[:2500].lower()
return any(m in low for m in _BLOCK_MARKERS)
def _doc_id(source_id: str, url: str) -> str:
return f"doc:{source_id}:{hashlib.sha256(url.encode()).hexdigest()[:12]}"
def ingest_one(conn: sqlite3.Connection, cfg, *, source_id: str, url: str, title: str,
               date: str | None, method: str = "auto", prompt_version: str = "extract-v0",
               min_chars: int = 400) -> str | None:
    """Fetch and store one text document, then enqueue its extraction job.

    Idempotent on (source_id, url): the doc_id is derived from both, and an existing
    ``documents`` row short-circuits before any network call.

    Args:
        conn: open SQLite connection.
        cfg: config object. Reads ``data_dir`` (text is saved under ``<data_dir>/docs``)
            and, optionally, ``user_agent``.
        source_id: id of an existing ``sources`` row (FK).
        url: document URL; also stored as ``external_id``.
        title: display title, truncated to 300 chars; falls back to ``url`` when empty.
        date: publication date, stored as given (may be None).
        method: forwarded to ``fetch_clean_text`` (``"pdf"`` forces PDF parsing).
        prompt_version: mixed into the extract job's input hash.
        min_chars: extracted texts shorter than this are skipped.

    Returns:
        The doc_id if newly ingested, else None. None covers an existing doc, a fetch
        error, text that is too short, or an anti-scraping page; every case except the
        existing doc is logged.

    Raises:
        Database errors from the insert or ``queue.enqueue`` propagate. The connection's
        transaction is rolled back, so the document is retried on a later run (assuming
        ``queue.enqueue`` does not commit on its own — TODO confirm).
    """
    doc_id = _doc_id(source_id, url)
    if conn.execute("SELECT 1 FROM documents WHERE doc_id=?", (doc_id,)).fetchone():
        return None

    ua = getattr(cfg, "user_agent", None) or DEFAULT_UA
    try:
        text = fetch_clean_text(url, method=method, ua=ua)
    # Any fetch/parse failure just skips this document.
    except Exception as e:  # noqa: BLE001
        log.warning("doc fetch failed %s: %s", url, e)
        return None
    if not text or len(text) < min_chars:
        log.warning("doc too short (%d chars), skipping %s", len(text or ""), url)
        return None
    if _looks_blocked(text):
        log.warning("blocked/anti-scrape page detected, skipping %s", url)
        return None

    # ':' is not a valid filename character everywhere, so flatten it for the file name.
    safe = doc_id.replace(":", "_")
    tpath = Path(cfg.data_dir) / "docs" / f"{safe}.txt"
    tpath.parent.mkdir(parents=True, exist_ok=True)
    # Write explicit UTF-8. The locale default can be non-UTF-8 (e.g. cp1252): it would
    # either fail on non-Latin text or disagree with the UTF-8 content_hash below.
    tpath.write_text(text, encoding="utf-8")
    content_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    job_hash = hashlib.sha256(f"{doc_id}|{prompt_version}".encode()).hexdigest()

    # Insert the document and enqueue its extraction in ONE transaction. If the row were
    # committed first and enqueue then failed, the doc would exist with no job, and the
    # idempotency check above would skip it on every later run.
    # NOTE(review): kind is "filing" even for blog/policy docs; presumably the extract
    # worker or a schema constraint expects it — confirm before changing.
    with conn:
        conn.execute(
            """INSERT OR IGNORE INTO documents
            (doc_id, source_id, kind, external_id, url, title, date, transcript_path, content_hash, processed_at)
            VALUES (?,?,?,?,?,?,?,?,?,datetime('now'))""",
            (doc_id, source_id, "filing", url, url, title[:300] if title else url, date,
             str(tpath), content_hash),
        )
        queue.enqueue(conn, job_type="extract", target_id=doc_id, input_hash=job_hash,
                      parent_doc_id=doc_id, priority=50)
    log.info("ingested doc %s (%d chars) for %s", doc_id, len(text), source_id)
    return doc_id
def ingest_manifest(conn: sqlite3.Connection, cfg, path) -> dict:
    """Batch-ingest the docs listed in a YAML manifest.

    Manifest shape: ``{docs: [{source, url, title?, date?, method?}, ...]}``. Each
    entry's ``source`` must already exist in ``sources`` (FK). Entries whose source is
    missing are counted separately and never fetched.

    Args:
        conn: open SQLite connection.
        cfg: config object, passed through to ``ingest_one``.
        path: path to the YAML manifest, read as UTF-8.

    Returns:
        ``{"ingested": n, "skipped": n, "missing_source": n}``. ``skipped`` covers
        duplicates, fetch/too-short/blocked failures, and malformed entries (not a
        mapping, or no ``url``).

    Raises:
        ValueError: the manifest's top level is not a mapping.
    """
    # Imported here, so PyYAML is only needed when a manifest is ingested.
    import yaml

    data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
    if not isinstance(data, dict):
        raise ValueError(
            f"manifest {path} must be a mapping with a 'docs' list, got {type(data).__name__}"
        )
    # A bare `docs:` key parses as None; treat it as an empty list.
    docs = data.get("docs") or []
    ingested = skipped = missing = 0
    for d in docs:
        if not isinstance(d, dict):
            log.warning("manifest entry is not a mapping — skipping %r", d)
            skipped += 1
            continue
        src = d.get("source")
        if not conn.execute("SELECT 1 FROM sources WHERE source_id=?", (src,)).fetchone():
            log.warning("manifest doc references missing source %r — skipping %s", src, d.get("url"))
            missing += 1
            continue
        url = d.get("url")
        if not url:
            # One malformed entry must not abort the whole batch.
            log.warning("manifest doc for source %r has no url — skipping", src)
            skipped += 1
            continue
        doc_id = ingest_one(conn, cfg, source_id=src, url=url, title=d.get("title", url),
                            date=d.get("date"), method=d.get("method", "auto"))
        if doc_id:
            ingested += 1
        else:
            skipped += 1
    return {"ingested": ingested, "skipped": skipped, "missing_source": missing}
def ingest_feed_text(conn: sqlite3.Connection, cfg, *, source_id: str, rss_url: str,
                     since: str | None = None, until: str | None = None, limit: int = 50) -> int:
    """Ingest the ARTICLE bodies behind a text RSS feed (blog/press feed).

    Each entry's ``link`` is fetched and stored via ``ingest_one`` as a dated text
    document. Entries without a link are ignored. Entries whose date falls outside
    [since, until] are skipped; entries with no date are always attempted.

    Args:
        conn: open SQLite connection.
        cfg: config object; ``user_agent`` is read if present.
        source_id: id of an existing ``sources`` row (FK).
        rss_url: feed URL.
        since: inclusive lower bound, ISO-style date string (compared as a string).
        until: inclusive upper bound, ISO-style date string. An entry on the
            ``until`` day is kept even if its date carries a time component.
        limit: stop after this many NEW documents. Duplicates and failures don't count.

    Returns:
        Count of newly-ingested docs.
    """
    from .feeds import _published_iso

    parsed = fetch_feed(rss_url, user_agent=getattr(cfg, "user_agent", None) or DEFAULT_UA)
    n = 0
    for entry in parsed.entries:
        if n >= limit:
            break
        link = entry.get("link")
        if not link:
            continue
        date = _published_iso(entry)
        if date:
            # ISO strings order lexicographically. A longer string sharing the
            # bound as a prefix already compares >= since, so no truncation is needed here.
            if since and date < since:
                continue
            # Truncate to the bound's precision. Otherwise a timestamped entry
            # ("2024-01-31T09:00") compares greater than a date-only bound
            # ("2024-01-31") and its last day would be dropped. This matters if
            # _published_iso returns timestamps.
            if until and date[:len(until)] > until:
                continue
        if ingest_one(conn, cfg, source_id=source_id, url=link,
                      title=entry.get("title", link), date=date):
            n += 1
    return n