Files

124 lines
4.6 KiB
Python

"""Backfill job queue over the `backfill_jobs` table (§13.4).
Model the corpus backfill as a managed GPU-hours queue, not a real-time fan-out. Producers
(ingestion) enqueue lightweight job descriptors; a SINGLE worker leases and drains them one at a
time so audio never goes parallel (→ 503). Jobs are:
- idempotent: UNIQUE(job_type, input_hash); re-enqueue of seen content is a no-op.
- crash-safe: leases expire, so a dead worker's job returns to the pool automatically.
- prioritized: lower `priority` runs first (backtest corpus + filings jump ahead).
This is plain SQLite so the whole queue is `SELECT * FROM backfill_jobs`.
"""
from __future__ import annotations
import sqlite3
from typing import Any, Optional, Sequence
# Default lease duration, in seconds. lease_next() uses it when the caller does not pass
# `lease_seconds`; once this much time has passed, a leased/running job becomes claimable again.
LEASE_SECONDS_DEFAULT = 600
def enqueue(
    conn: sqlite3.Connection,
    *,
    job_type: str,
    target_id: str,
    input_hash: str,
    parent_doc_id: str | None = None,
    priority: int = 100,
    max_attempts: int = 5,
) -> Optional[int]:
    """Queue one job descriptor in the 'pending' state and commit.

    The statement is ``INSERT OR IGNORE``, so a row that violates a constraint on
    `backfill_jobs` (per the module docstring: UNIQUE(job_type, input_hash)) is dropped
    silently instead of raising.

    Args:
        conn: Open SQLite connection holding the `backfill_jobs` table.
        job_type: Kind of work; part of the idempotency key.
        target_id: Identifier of the thing the job operates on.
        input_hash: Content hash; part of the idempotency key.
        parent_doc_id: Optional parent document reference, stored as-is.
        priority: Stored priority value (lease_next orders ascending).
        max_attempts: Stored retry budget (consulted by fail()).

    Returns:
        The rowid of the newly inserted job, or None when the insert was ignored.
    """
    values = (job_type, target_id, parent_doc_id, priority, max_attempts, input_hash)
    cursor = conn.execute(
        """INSERT OR IGNORE INTO backfill_jobs
        (job_type, target_id, parent_doc_id, priority, max_attempts, input_hash, state)
        VALUES (?,?,?,?,?,?, 'pending')""",
        values,
    )
    conn.commit()
    # rowcount is 0 when OR IGNORE swallowed the insert; lastrowid is meaningless then.
    if not cursor.rowcount:
        return None
    return cursor.lastrowid
def lease_next(
    conn: sqlite3.Connection,
    *,
    worker_id: str,
    job_types: Sequence[str] | None = None,
    lease_seconds: int = LEASE_SECONDS_DEFAULT,
) -> Optional[sqlite3.Row]:
    """Claim the highest-priority eligible job for `worker_id`.

    A job is eligible when it is 'pending', or when it is 'leased'/'running' and its
    `lease_expires_at` is set and already in the past (crash recovery). Candidates are
    ordered by `priority` ascending, then `job_id` ascending.

    The claim is a compare-and-swap: the UPDATE repeats the eligibility predicate, so if
    another connection took the candidate between the SELECT and the UPDATE, zero rows
    change and the next candidate is tried. A plain ``WHERE job_id=?`` would silently
    overwrite the other claimer's lease and hand the same job to two workers.

    NOTE: `max_attempts` is not consulted here; only fail() dead-letters a job.

    Args:
        conn: Open SQLite connection holding the `backfill_jobs` table.
        worker_id: Value written to `lease_owner` on the claimed row.
        job_types: Optional restriction to these job types. None or an empty sequence
            means no restriction.
        lease_seconds: Lease duration; coerced with int() before being used as a
            datetime() modifier.

    Returns:
        The claimed row as read back after the update (state 'running', `attempts`
        incremented, new `lease_expires_at`), or None when nothing is eligible.
    """
    # Shared by the SELECT and the guarded UPDATE so both agree on what "eligible" means.
    eligible = (
        "(state = 'pending'"
        " OR (state IN ('leased','running')"
        " AND lease_expires_at IS NOT NULL"
        " AND lease_expires_at < datetime('now')))"
    )

    select_params: list[Any] = []
    type_filter = ""
    if job_types:
        placeholders = ",".join("?" * len(job_types))
        type_filter = f" AND job_type IN ({placeholders})"
        select_params.extend(job_types)

    select_sql = f"""SELECT job_id FROM backfill_jobs
        WHERE {eligible}{type_filter}
        ORDER BY priority ASC, job_id ASC
        LIMIT 1"""
    claim_sql = f"""UPDATE backfill_jobs
        SET state='running', lease_owner=?, lease_expires_at=datetime('now', ?),
            attempts=attempts+1, updated_at=datetime('now')
        WHERE job_id=? AND {eligible}"""
    lease_modifier = f"+{int(lease_seconds)} seconds"

    while True:
        candidate = conn.execute(select_sql, select_params).fetchone()
        if candidate is None:
            return None
        # Positional access works whether or not a sqlite3.Row factory is installed.
        job_id = candidate[0]

        claimed = conn.execute(claim_sql, (worker_id, lease_modifier, job_id))
        conn.commit()
        if claimed.rowcount:
            return conn.execute(
                "SELECT * FROM backfill_jobs WHERE job_id=?", (job_id,)
            ).fetchone()
        # Lost the race: the row is no longer eligible, so the next SELECT skips it.
def complete(
    conn: sqlite3.Connection,
    job_id: int,
    *,
    output_ref: str | None = None,
    gpu_seconds: float | None = None,
) -> None:
    """Mark a job as 'done' and commit.

    Stores `output_ref` and `gpu_seconds` exactly as given (None becomes NULL), clears
    `error`, and refreshes `updated_at`. Lease columns are left untouched. An unknown
    `job_id` updates nothing and raises nothing.
    """
    sql = (
        """UPDATE backfill_jobs SET state='done', output_ref=?, gpu_seconds=?, error=NULL,
        updated_at=datetime('now') WHERE job_id=?"""
    )
    bindings = (output_ref, gpu_seconds, job_id)
    conn.execute(sql, bindings)
    conn.commit()
def fail(conn: sqlite3.Connection, job_id: int, error: Any) -> str:
    """Record a failure and decide whether the job is retried or dead-lettered.

    The job goes to 'failed' when its `attempts` has reached `max_attempts`; otherwise
    it goes back to 'pending'. Either way the lease columns are cleared, `error` is set
    to ``str(error)`` truncated to 2000 characters, and the change is committed.

    An unknown `job_id` produces no row to inspect; the function then reports 'pending'
    and the UPDATE matches nothing.

    Returns:
        The state that was written: 'failed' or 'pending'.
    """
    counters = conn.execute(
        "SELECT attempts, max_attempts FROM backfill_jobs WHERE job_id=?", (job_id,)
    ).fetchone()

    if counters and counters["attempts"] >= counters["max_attempts"]:
        outcome = "failed"
    else:
        outcome = "pending"

    message = str(error)[:2000]
    conn.execute(
        """UPDATE backfill_jobs SET state=?, error=?, lease_owner=NULL, lease_expires_at=NULL,
        updated_at=datetime('now') WHERE job_id=?""",
        (outcome, message, job_id),
    )
    conn.commit()
    return outcome
def skip(conn: sqlite3.Connection, job_id: int, reason: str | None = None) -> None:
    """Move a job to the 'skipped' state and commit.

    Intended for jobs that are dropped on purpose rather than failed. `reason` is
    written to the `error` column (NULL when omitted) and `updated_at` is refreshed.
    No other column is modified.
    """
    statement = (
        "UPDATE backfill_jobs SET state='skipped', error=?, updated_at=datetime('now') WHERE job_id=?"
    )
    conn.execute(statement, (reason, job_id))
    conn.commit()
def stats(conn: sqlite3.Connection) -> dict[str, dict[str, int]]:
    """Count jobs per (job_type, state).

    Returns:
        A nested mapping ``{job_type: {state: count}}``. Only combinations that occur
        in the table appear; an empty table yields an empty dict. Rows are read by
        column name, so the connection must use a name-indexable row factory
        (e.g. ``sqlite3.Row``).
    """
    grouped = conn.execute(
        "SELECT job_type, state, COUNT(*) AS n FROM backfill_jobs GROUP BY job_type, state"
    ).fetchall()

    summary: dict[str, dict[str, int]] = {}
    for record in grouped:
        kind = record["job_type"]
        per_state = summary.get(kind)
        if per_state is None:
            per_state = {}
            summary[kind] = per_state
        per_state[record["state"]] = record["n"]
    return summary