124 lines
4.6 KiB
Python
124 lines
4.6 KiB
Python
"""Backfill job queue over the `backfill_jobs` table (§13.4).
|
|
|
|
Model the corpus backfill as a managed GPU-hours queue, not a real-time fan-out. Producers
|
|
(ingestion) enqueue lightweight job descriptors; a SINGLE worker leases and drains them one at a
|
|
time so audio never goes parallel (→ 503). Jobs are:
|
|
- idempotent: UNIQUE(job_type, input_hash); re-enqueue of seen content is a no-op.
|
|
- crash-safe: leases expire, so a dead worker's job returns to the pool automatically.
|
|
- prioritized: lower `priority` runs first (backtest corpus + filings jump ahead).
|
|
|
|
This is plain SQLite so the whole queue is `SELECT * FROM backfill_jobs`.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
from typing import Any, Optional, Sequence
|
|
|
|
# Default lease duration, in seconds; `lease_next` uses it when the caller passes no
# `lease_seconds`.
LEASE_SECONDS_DEFAULT = 600
|
|
|
|
|
|
def enqueue(
    conn: sqlite3.Connection,
    *,
    job_type: str,
    target_id: str,
    input_hash: str,
    parent_doc_id: str | None = None,
    priority: int = 100,
    max_attempts: int = 5,
) -> Optional[int]:
    """Queue one job in the ``pending`` state and commit.

    The row is written with ``INSERT OR IGNORE``: if it conflicts with a row that is already in
    the table, the insert is dropped silently instead of raising. Idempotency therefore relies on
    the table carrying a uniqueness constraint (the module docstring names
    ``UNIQUE(job_type, input_hash)``; the schema itself is not visible here).

    Args:
        conn: Open connection to the database holding ``backfill_jobs``.
        job_type: Value stored in the ``job_type`` column.
        target_id: Value stored in the ``target_id`` column.
        input_hash: Value stored in the ``input_hash`` column.
        parent_doc_id: Optional value for the ``parent_doc_id`` column.
        priority: Value for the ``priority`` column (``lease_next`` orders ascending).
        max_attempts: Value for the ``max_attempts`` column.

    Returns:
        The id of the inserted row, or ``None`` when the insert was ignored.
    """
    values = (job_type, target_id, parent_doc_id, priority, max_attempts, input_hash)
    cursor = conn.execute(
        """INSERT OR IGNORE INTO backfill_jobs
        (job_type, target_id, parent_doc_id, priority, max_attempts, input_hash, state)
        VALUES (?,?,?,?,?,?, 'pending')""",
        values,
    )
    conn.commit()
    # rowcount is 0 when OR IGNORE swallowed the insert; lastrowid would be stale in that case.
    if not cursor.rowcount:
        return None
    return cursor.lastrowid
|
|
|
|
|
|
def lease_next(
    conn: sqlite3.Connection,
    *,
    worker_id: str,
    job_types: Sequence[str] | None = None,
    lease_seconds: int = LEASE_SECONDS_DEFAULT,
) -> Optional[sqlite3.Row]:
    """Claim the highest-priority eligible job for ``worker_id`` and return its row.

    A job is eligible when it is ``pending``, or when it is ``leased``/``running`` and its
    ``lease_expires_at`` is set and already in the past (crash recovery). Candidates are ordered
    by ``priority`` ascending, then ``job_id`` ascending.

    The claiming UPDATE repeats the eligibility predicate, so the claim is a compare-and-set: if
    another connection took the candidate between our SELECT and our UPDATE, the UPDATE matches
    zero rows and we move on to the next candidate instead of overwriting that lease and
    double-counting ``attempts``.

    NOTE(review): ``max_attempts`` is not consulted here, so a job whose lease keeps expiring is
    re-leased (and ``attempts`` incremented) indefinitely — confirm that is intended.

    Args:
        conn: Open connection; rows are read by column name, so it needs a name-addressable row
            factory such as ``sqlite3.Row``.
        worker_id: Stored in ``lease_owner`` on the claimed row.
        job_types: If non-empty, only jobs whose ``job_type`` is in this sequence are considered;
            ``None`` or an empty sequence applies no filter.
        lease_seconds: Lease duration; coerced with ``int()`` before being handed to SQLite.

    Returns:
        The claimed row as it looks after the update (``state='running'``, ``attempts``
        incremented, lease set), or ``None`` if no job is eligible.
    """
    # Single definition of "eligible", shared by the SELECT and the guarded UPDATE.
    eligible = (
        "(state = 'pending'"
        " OR (state IN ('leased','running')"
        " AND lease_expires_at IS NOT NULL"
        " AND lease_expires_at < datetime('now')))"
    )

    params: list[Any] = []
    type_filter = ""
    if job_types:
        placeholders = ",".join("?" * len(job_types))
        type_filter = f" AND job_type IN ({placeholders})"
        params.extend(job_types)

    lease_modifier = f"+{int(lease_seconds)} seconds"

    while True:
        candidate = conn.execute(
            f"""SELECT job_id FROM backfill_jobs
            WHERE {eligible}
            {type_filter}
            ORDER BY priority ASC, job_id ASC
            LIMIT 1""",
            params,
        ).fetchone()
        if candidate is None:
            return None
        job_id = candidate["job_id"]

        claim = conn.execute(
            f"""UPDATE backfill_jobs
            SET state='running', lease_owner=?, lease_expires_at=datetime('now', ?),
                attempts=attempts+1, updated_at=datetime('now')
            WHERE job_id=? AND {eligible}""",
            (worker_id, lease_modifier, job_id),
        )
        # Commit on both outcomes: on success to publish the lease, on a lost race to end the
        # implicit transaction so the next SELECT sees the other worker's committed claim.
        conn.commit()
        if claim.rowcount:
            return conn.execute(
                "SELECT * FROM backfill_jobs WHERE job_id=?", (job_id,)
            ).fetchone()
        # Lost the race for this candidate; loop and pick the next eligible job.
|
|
|
|
|
|
def complete(
    conn: sqlite3.Connection,
    job_id: int,
    *,
    output_ref: str | None = None,
    gpu_seconds: float | None = None,
) -> None:
    """Mark ``job_id`` as ``done`` and commit.

    Overwrites ``output_ref`` and ``gpu_seconds`` with the given values (``NULL`` when omitted),
    clears ``error`` and refreshes ``updated_at``. The lease columns are left as they were.
    An unknown ``job_id`` updates nothing and raises nothing.

    Args:
        conn: Open connection to the database holding ``backfill_jobs``.
        job_id: Primary key of the job to finish.
        output_ref: Value for the ``output_ref`` column.
        gpu_seconds: Value for the ``gpu_seconds`` column.
    """
    bindings = (output_ref, gpu_seconds, job_id)
    statement = """UPDATE backfill_jobs SET state='done', output_ref=?, gpu_seconds=?, error=NULL,
        updated_at=datetime('now') WHERE job_id=?"""
    conn.execute(statement, bindings)
    conn.commit()
|
|
|
|
|
|
def fail(conn: sqlite3.Connection, job_id: int, error: Any) -> str:
    """Record a failure for ``job_id`` and decide whether it may be retried.

    If the job's ``attempts`` has reached ``max_attempts`` it is dead-lettered (``failed``);
    otherwise it goes back to ``pending``. Either way the lease columns are cleared, ``error`` is
    set to ``str(error)`` truncated to 2000 characters, and the change is committed.

    Args:
        conn: Open connection; rows are read by column name, so it needs a name-addressable row
            factory such as ``sqlite3.Row``.
        job_id: Primary key of the job that failed.
        error: Anything convertible with ``str()``.

    Returns:
        The state written: ``"failed"`` or ``"pending"``. When ``job_id`` matches no row, nothing
        is updated and ``"pending"`` is returned.
    """
    counters = conn.execute(
        "SELECT attempts, max_attempts FROM backfill_jobs WHERE job_id=?", (job_id,)
    ).fetchone()

    if counters and counters["attempts"] >= counters["max_attempts"]:
        outcome = "failed"
    else:
        outcome = "pending"

    message = str(error)[:2000]  # cap what gets stored in the error column
    conn.execute(
        """UPDATE backfill_jobs SET state=?, error=?, lease_owner=NULL, lease_expires_at=NULL,
        updated_at=datetime('now') WHERE job_id=?""",
        (outcome, message, job_id),
    )
    conn.commit()
    return outcome
|
|
|
|
|
|
def skip(conn: sqlite3.Connection, job_id: int, reason: str | None = None) -> None:
    """Move ``job_id`` to the terminal ``skipped`` state and commit.

    Use this for a job that is dropped on purpose; a job that ran but produced nothing (e.g. a
    chunk with zero claims) is still ``done``, not ``skipped``. ``reason`` is stored in the
    ``error`` column (``NULL`` when omitted) and ``updated_at`` is refreshed. No other column is
    touched, and an unknown ``job_id`` updates nothing.

    Args:
        conn: Open connection to the database holding ``backfill_jobs``.
        job_id: Primary key of the job to skip.
        reason: Optional explanation, stored in ``error``.
    """
    statement = (
        "UPDATE backfill_jobs SET state='skipped', error=?, updated_at=datetime('now') WHERE job_id=?"
    )
    conn.execute(statement, (reason, job_id))
    conn.commit()
|
|
|
|
|
|
def stats(conn: sqlite3.Connection) -> dict[str, dict[str, int]]:
    """Count jobs per job type and state.

    Args:
        conn: Open connection; rows are read by column name, so it needs a name-addressable row
            factory such as ``sqlite3.Row``.

    Returns:
        A nested mapping ``{job_type: {state: count}}``. Only (job_type, state) pairs that
        actually occur in ``backfill_jobs`` appear; an empty table yields ``{}``.
    """
    summary: dict[str, dict[str, int]] = {}
    grouped = conn.execute(
        "SELECT job_type, state, COUNT(*) AS n FROM backfill_jobs GROUP BY job_type, state"
    ).fetchall()
    for record in grouped:
        kind = record["job_type"]
        if kind not in summary:
            summary[kind] = {}
        summary[kind][record["state"]] = record["n"]
    return summary
|