"""SEC EDGAR ingestion (§4.1).

Hits the official data.sec.gov / www.sec.gov APIs directly (free, keyless, full history).
Two hard requirements:
  - a descriptive User-Agent (SEC 403s requests without one) — from config.edgar_user_agent.
  - ≤10 requests/sec aggregate — enforced by a min-interval throttle here.

Supports an explicit date range AND historical shards (filings.files[]), so the §7.1
backtest can reach 2022–2023 filings, not just the most-recent ~1000.
"""
from __future__ import annotations

import hashlib
import sqlite3
import time
from typing import Iterator

import requests

# Parallel column arrays read from a submissions block, in the order they are
# unpacked by EdgarClient._iter_array.
_FILING_COLS = ("accessionNumber", "form", "filingDate", "primaryDocument", "primaryDocDescription")


class EdgarClient:
    """Throttled HTTP client for the SEC EDGAR JSON and archive endpoints.

    All requests go through one ``requests.Session`` carrying the mandatory
    User-Agent, and are spaced at least ``min_interval`` seconds apart.
    """

    BASE_DATA = "https://data.sec.gov"
    BASE_WWW = "https://www.sec.gov"

    def __init__(self, user_agent: str, *, min_interval: float = 0.12) -> None:
        """Create a client.

        Args:
            user_agent: Descriptive User-Agent; must contain an ``@`` (contact email).
            min_interval: Minimum seconds between consecutive requests. The
                default of 0.12 s keeps the rate just over 8 requests/sec,
                under the 10 requests/sec cap.

        Raises:
            ValueError: If ``user_agent`` is empty or contains no ``@``.
        """
        if not user_agent or "@" not in user_agent:
            raise ValueError("EDGAR requires a descriptive User-Agent with contact email (config.edgar_user_agent)")
        self.s = requests.Session()
        self.s.headers.update({"User-Agent": user_agent, "Accept-Encoding": "gzip, deflate"})
        self.min_interval = min_interval
        self._last = 0.0  # time.monotonic() reading taken just before the previous request
        self._tickers: dict[str, int] | None = None  # lazily filled by ticker_map()

    def _throttle(self) -> None:
        """Sleep as needed so requests are at least ``min_interval`` seconds apart."""
        dt = time.monotonic() - self._last
        if dt < self.min_interval:
            time.sleep(self.min_interval - dt)
        self._last = time.monotonic()

    def _get(self, url: str) -> requests.Response:
        """Issue a throttled GET (30 s timeout) and raise on an HTTP error status."""
        self._throttle()
        r = self.s.get(url, timeout=30)
        r.raise_for_status()
        return r

    # ---- ticker → CIK ----
    def ticker_map(self) -> dict[str, int]:
        """Return the upper-cased ticker → CIK mapping, fetched once and then cached."""
        if self._tickers is None:
            data = self._get(f"{self.BASE_WWW}/files/company_tickers.json").json()
            self._tickers = {row["ticker"].upper(): int(row["cik_str"]) for row in data.values()}
        return self._tickers

    def cik_for(self, ticker: str) -> int | None:
        """Return the CIK for ``ticker`` (case-insensitive), or None if unknown."""
        return self.ticker_map().get(ticker.upper())

    # ---- filings ----
    def _iter_array(self, block: dict, forms, since, until) -> Iterator[dict]:
        """Yield filing dicts from a block of parallel column arrays.

        Args:
            block: Mapping holding the ``_FILING_COLS`` arrays.
            forms: Form types to keep; a falsy value keeps every form.
            since: Inclusive lower bound on the filing date string, or falsy.
            until: Inclusive upper bound on the filing date string, or falsy.

        Dates are compared as strings, so bounds must use the same format as
        the ``filingDate`` values.
        """
        # Size the iteration by accessionNumber and pad missing/short columns
        # with "". Zipping the raw columns would truncate to the shortest one,
        # so a single absent optional column (e.g. primaryDocDescription) would
        # silently drop every filing in the block.
        total = len(block.get("accessionNumber") or [])
        columns = []
        for col in _FILING_COLS:
            values = list(block.get(col) or [])
            values.extend([""] * (total - len(values)))
            columns.append(values)

        for acc, form, fdate, pdoc, pdesc in zip(*columns):
            if forms and form not in forms:
                continue
            if since and fdate < since:
                continue
            if until and fdate > until:
                continue
            yield {
                "accession": acc,
                "form": form,
                "filing_date": fdate,
                "primary_document": pdoc,
                "description": pdesc,
            }

    def iter_filings(
        self,
        cik: int,
        *,
        forms: tuple[str, ...] = ("10-K", "10-Q", "8-K"),
        since: str | None = None,
        until: str | None = None,
    ) -> Iterator[dict]:
        """Yield filing descriptors. Pulls the inline 'recent' block AND any historical shards
        whose date window overlaps [since, until] — required to reach the backtest era for
        active filers."""
        sub = self._get(f"{self.BASE_DATA}/submissions/CIK{cik:010d}.json").json()
        recent = sub.get("filings", {}).get("recent", {})
        for f in self._iter_array(recent, forms, since, until):
            yield self._with_url(cik, f)
        for shard in sub.get("filings", {}).get("files", []):
            # shard has filingFrom / filingTo; skip shards entirely outside the window.
            if until and shard.get("filingFrom", "") > until:
                continue
            if since and shard.get("filingTo", "9999") < since:
                continue
            block = self._get(f"{self.BASE_DATA}/submissions/{shard['name']}").json()
            for f in self._iter_array(block, forms, since, until):
                yield self._with_url(cik, f)

    def _with_url(self, cik: int, f: dict) -> dict:
        """Add ``cik`` and the primary-document archive ``url`` to ``f`` in place and return it."""
        acc_nodash = f["accession"].replace("-", "")
        f["cik"] = cik
        f["url"] = f"{self.BASE_WWW}/Archives/edgar/data/{cik}/{acc_nodash}/{f['primary_document']}"
        return f

    def fetch_html(self, filing: dict) -> str:
        """Download ``filing["url"]`` and return the response body as text."""
        return self._get(filing["url"]).text


# Domestic annual/quarterly + foreign-private-issuer equivalents. 20-F (foreign annual, e.g. TSM/IREN),
# 40-F (Canadian annual, e.g. CCJ). 8-K/6-K (current reports) excluded by default — low claim yield.
HIGH_YIELD_FORMS = ("10-K", "10-Q", "20-F", "40-F")


def ingest_filings(
    conn: sqlite3.Connection,
    client: EdgarClient,
    *,
    source_id: str,
    ticker: str,
    since: str | None = None,
    until: str | None = None,
    forms: tuple[str, ...] = HIGH_YIELD_FORMS,
    prompt_version: str = "extract-v0",
) -> tuple[int, int]:
    """Insert filing documents and enqueue 'extract' jobs. Filings are text → no transcription;
    they go straight to extraction (the extract worker fetches + strips the HTML later).

    Default forms cover both domestic (10-K/10-Q) and foreign-private-issuer (20-F/40-F) filers.
    Returns (new_documents, new_jobs). Idempotent on (source_id, accession).

    Raises:
        ValueError: If ``ticker`` has no CIK in the client's ticker map.
    """
    from ..backfill import queue

    cik = client.cik_for(ticker)
    if cik is None:
        raise ValueError(f"No CIK found for ticker {ticker!r}")

    n_docs = n_jobs = 0
    for f in client.iter_filings(cik, forms=forms, since=since, until=until):
        doc_id = f"edgar:{f['accession']}"
        cur = conn.execute(
            """INSERT OR IGNORE INTO documents (doc_id, source_id, kind, external_id, url, title, date)
               VALUES (?,?,?,?,?,?,?)""",
            (doc_id, source_id, "filing", f["accession"], f["url"],
             f"{ticker} {f['form']} {f['filing_date']}", f["filing_date"]),
        )
        # NOTE(review): the document row is committed before the job is enqueued.
        # If enqueue fails in between, a re-run skips this document (rowcount 0)
        # and never enqueues it. Confirm queue.enqueue's transaction behaviour
        # before reordering.
        conn.commit()
        if not cur.rowcount:
            # Row already existed, so the insert was ignored: nothing new to enqueue.
            continue
        n_docs += 1
        # The hash ties the job to both the document and the prompt version.
        h = hashlib.sha256(f"{doc_id}|{prompt_version}".encode()).hexdigest()
        # priority 50: filings are high-info-density (§4.1) → ahead of podcasts (100)
        if queue.enqueue(conn, job_type="extract", target_id=doc_id, input_hash=h,
                         parent_doc_id=doc_id, priority=50) is not None:
            n_jobs += 1
    return n_docs, n_jobs