"""Backfill job queue over the `backfill_jobs` table (§13.4). Model the corpus backfill as a managed GPU-hours queue, not a real-time fan-out. Producers (ingestion) enqueue lightweight job descriptors; a SINGLE worker leases and drains them one at a time so audio never goes parallel (→ 503). Jobs are: - idempotent: UNIQUE(job_type, input_hash); re-enqueue of seen content is a no-op. - crash-safe: leases expire, so a dead worker's job returns to the pool automatically. - prioritized: lower `priority` runs first (backtest corpus + filings jump ahead). This is plain SQLite so the whole queue is `SELECT * FROM backfill_jobs`. """ from __future__ import annotations import sqlite3 from typing import Any, Optional, Sequence LEASE_SECONDS_DEFAULT = 600 def enqueue( conn: sqlite3.Connection, *, job_type: str, target_id: str, input_hash: str, parent_doc_id: str | None = None, priority: int = 100, max_attempts: int = 5, ) -> Optional[int]: """Insert a job. Returns job_id, or None if this (job_type, input_hash) is already queued/done (idempotent skip — §13.4).""" cur = conn.execute( """INSERT OR IGNORE INTO backfill_jobs (job_type, target_id, parent_doc_id, priority, max_attempts, input_hash, state) VALUES (?,?,?,?,?,?, 'pending')""", (job_type, target_id, parent_doc_id, priority, max_attempts, input_hash), ) conn.commit() return cur.lastrowid if cur.rowcount else None def lease_next( conn: sqlite3.Connection, *, worker_id: str, job_types: Sequence[str] | None = None, lease_seconds: int = LEASE_SECONDS_DEFAULT, ) -> Optional[sqlite3.Row]: """Atomically claim the highest-priority eligible job. Eligible = pending, OR a running/leased job whose lease has expired (crash recovery). Increments `attempts`.""" params: list[Any] = [] type_filter = "" if job_types: type_filter = f" AND job_type IN ({','.join('?' * len(job_types))})" params.extend(job_types) row = conn.execute( f"""SELECT job_id FROM backfill_jobs WHERE (state = 'pending' OR (state IN ('leased','running') AND lease_expires_at IS NOT NULL AND lease_expires_at < datetime('now'))) {type_filter} ORDER BY priority ASC, job_id ASC LIMIT 1""", params, ).fetchone() if row is None: return None conn.execute( """UPDATE backfill_jobs SET state='running', lease_owner=?, lease_expires_at=datetime('now', ?), attempts=attempts+1, updated_at=datetime('now') WHERE job_id=?""", (worker_id, f"+{int(lease_seconds)} seconds", row["job_id"]), ) conn.commit() return conn.execute("SELECT * FROM backfill_jobs WHERE job_id=?", (row["job_id"],)).fetchone() def complete(conn: sqlite3.Connection, job_id: int, *, output_ref: str | None = None, gpu_seconds: float | None = None) -> None: conn.execute( """UPDATE backfill_jobs SET state='done', output_ref=?, gpu_seconds=?, error=NULL, updated_at=datetime('now') WHERE job_id=?""", (output_ref, gpu_seconds, job_id), ) conn.commit() def fail(conn: sqlite3.Connection, job_id: int, error: Any) -> str: """Retry (→ pending) if attempts remain, else dead-letter (→ failed). Returns the new state.""" row = conn.execute( "SELECT attempts, max_attempts FROM backfill_jobs WHERE job_id=?", (job_id,) ).fetchone() exhausted = bool(row) and row["attempts"] >= row["max_attempts"] new_state = "failed" if exhausted else "pending" conn.execute( """UPDATE backfill_jobs SET state=?, error=?, lease_owner=NULL, lease_expires_at=NULL, updated_at=datetime('now') WHERE job_id=?""", (new_state, str(error)[:2000], job_id), ) conn.commit() return new_state def skip(conn: sqlite3.Connection, job_id: int, reason: str | None = None) -> None: """Terminal non-error skip (e.g. a chunk that produced zero claims is still 'done', but an intentionally dropped job is 'skipped').""" conn.execute( "UPDATE backfill_jobs SET state='skipped', error=?, updated_at=datetime('now') WHERE job_id=?", (reason, job_id), ) conn.commit() def stats(conn: sqlite3.Connection) -> dict[str, dict[str, int]]: rows = conn.execute( "SELECT job_type, state, COUNT(*) AS n FROM backfill_jobs GROUP BY job_type, state" ).fetchall() out: dict[str, dict[str, int]] = {} for r in rows: out.setdefault(r["job_type"], {})[r["state"]] = r["n"] return out