// In-memory background-job tracker. Used by /relay/transcribe-url and
// /relay/summarize-url (and any future long-running endpoint) so that the
// request that kicks off the work returns immediately with a job_id, and
// the client polls /relay/jobs/{id} to find out when the job is done.
//
// Rationale: synchronous HTTP responses for multi-minute transcribes
// are fragile. Any intermediate proxy / load balancer / NAT in the
// path may drop the connection after some idle or total timeout (often
// 100s–10min), failing the whole job mid-flight even though the
// relay backend is working fine. Async jobs sidestep all of that:
// the long-running work happens off the request path, and the client
// polls with short, cheap requests until the job is done.
//
// Storage is in-process memory. Restart-survivability is a known
// gap: a relay restart mid-job loses that job's state, and the
// client will keep re-polling until it gives up. This is acceptable for
// v1 at small relay scale; the audit log already captures every
// completed call, so the operator has a paper trail either way.
// Migrate to SQLite if/when restart-resilience becomes important.
//
// Each job is { id, kind, install_id, status, started_at, updated_at,
//   completed_at, progress, result, error, error_internal?, metadata,
//   events }, plus a non-enumerable `subscribers` Set. completed_at,
//   progress, result and error start out null; error_internal exists only
//   on failed jobs.
// status: "queued" | "running" | "complete" | "failed"

import { randomUUID } from "crypto";
import { sanitizeErrorForClient } from "./sanitize-error.js";

// Job registry, keyed by job id. It lives only in process memory, so every
// entry is lost when the relay restarts.
const jobs = new Map();

// How long a job record survives after its last activity (its completion
// time, once it has finished) before pruneExpired() drops it, so the map
// cannot grow without bound. Once a client has polled and seen "complete"
// it stops polling; a 24-hour window still leaves generous slack for slow
// or retrying clients without letting memory pile up.
const RETENTION_MS = 1000 * 60 * 60 * 24;

/**
 * Register a new job in the "queued" state and return its live record.
 *
 * Expired jobs are pruned first, so job creation doubles as housekeeping.
 *
 * @param {object} opts
 * @param {*} opts.kind - Job type label, stored verbatim as `kind`.
 * @param {*} opts.installId - Stored as `install_id` on the record.
 * @param {object} [opts.metadata={}] - Arbitrary caller data, stored by reference.
 * @returns {object} The job record. This is the same object held in the
 *   registry, so later updates (markRunning, setProgress, ...) show through it.
 */
export function createJob({ kind, installId, metadata = {} }) {
  pruneExpired();

  const createdAt = Date.now();
  const job = {
    id: randomUUID(),
    kind,
    install_id: installId,
    status: "queued",
    started_at: createdAt,
    updated_at: createdAt,
    completed_at: null,
    progress: null,
    result: null,
    error: null,
    metadata,
    // Capped event log for jobs that stream incremental results over SSE
    // (e.g. /relay/summarize-url emits transcribe_progress,
    // transcribe_complete, window_complete, done and error events). Each
    // entry is { type, data, ts }; a client that connects late replays
    // this log to catch up on events it missed.
    events: [],
  };

  // Live SSE callbacks. The property is non-enumerable and read-only, so
  // Object.keys / spread / JSON.stringify all skip it and the Set never
  // leaks into snapshotJobs() output or HTTP responses.
  Object.defineProperty(job, "subscribers", {
    value: new Set(),
    enumerable: false,
    writable: false,
  });

  jobs.set(job.id, job);
  return job;
}

// Record an event on a job and fan it out to every live SSE subscriber.
// The /relay/summarize-url background worker uses this to emit per-window
// progress as results stream in from runChunkedAnalysis.
//
// Event shape:
//   { type: "window_complete"|"transcribe_complete"|"done"|"error"|"progress",
//     data: <event payload>,
//     ts:   ms-epoch }
//
// Each subscriber callback is handed only the new event, never the whole
// log; a newly connected subscriber is expected to replay job.events
// itself. A callback that throws is logged and skipped, so it cannot stop
// delivery to the others. Unknown job ids are silently ignored.
export function appendEvent(jobId, type, data) {
  const job = jobs.get(jobId);
  if (!job) {
    return;
  }

  const ts = Date.now();
  const event = { type, data, ts };
  job.events.push(event);
  job.updated_at = ts;

  // Bound the log so a runaway job can't exhaust memory. 1000 entries is
  // far beyond any plausible window count (typically 10-20).
  if (job.events.length > 1000) {
    job.events.shift();
  }

  job.subscribers.forEach((notify) => {
    try {
      notify(event);
    } catch (err) {
      console.warn(`[jobs] subscriber callback failed: ${err?.message || err}`);
    }
  });
}

// Register `callback` to receive every event appended to a job from now on.
// Returns an unsubscribe function that the caller MUST invoke (e.g. when the
// SSE connection closes); otherwise the job keeps a reference to the
// callback closure for as long as the job record itself lives.
// Returns null when no job with that id exists (never created, or already
// pruned).
export function subscribeToJob(jobId, callback) {
  const job = jobs.get(jobId);
  if (!job) return null;

  // `subscribers` is a non-writable property, so capturing the Set here is
  // equivalent to re-reading job.subscribers on unsubscribe.
  const { subscribers } = job;
  subscribers.add(callback);
  return () => {
    subscribers.delete(callback);
  };
}

/**
 * Look up a job by id, pruning expired records first.
 *
 * @param {string} jobId
 * @returns {object|null} The live job record, or null if unknown or expired.
 */
export function getJob(jobId) {
  pruneExpired();
  const job = jobs.get(jobId);
  return job ? job : null;
}

// Move a job to "running" and bump its updated_at. Unknown ids are a no-op.
// NOTE(review): there is no state check, so a late call would also overwrite
// a terminal "complete"/"failed" status.
export function markRunning(jobId) {
  const job = jobs.get(jobId);
  if (job) {
    job.status = "running";
    job.updated_at = Date.now();
  }
}

/**
 * Store a short human-readable progress message on a job and bump updated_at.
 *
 * The message is coerced with String() and capped at 200 UTF-16 code units.
 * A plain `.slice(0, 200)` can cut an astral character (e.g. an emoji
 * straddling position 200) in half and leave an unpaired high surrogate at
 * the end. When that happens the dangling half is dropped as well, so the
 * stored text is always well-formed.
 * Unknown job ids are a no-op.
 *
 * @param {string} jobId
 * @param {*} message - Anything; non-strings are stringified.
 */
export function setProgress(jobId, message) {
  const job = jobs.get(jobId);
  if (!job) return;

  let text = String(message).slice(0, 200);
  // A trailing high surrogate (U+D800–U+DBFF) means its low half was cut off.
  // charCodeAt(-1) on an empty string is NaN, which fails both comparisons.
  const lastUnit = text.charCodeAt(text.length - 1);
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) {
    text = text.slice(0, -1);
  }

  job.progress = text;
  job.updated_at = Date.now();
}

/**
 * Mark a job as successfully finished, store its result, and emit the
 * terminal SSE "done" event.
 *
 * `envelope` is stored on job.result exactly as passed (typically
 * { result: {...inner...}, credit_charged, tier }), so internal consumers
 * that read job.result directly still see the wrapped form.
 *
 * The "done" event instead carries the INNER result as `data.result`. That
 * is envelope.result when the envelope is an object with a `result` key,
 * and the envelope itself otherwise. credit_charged and tier ride along as
 * siblings. SSE consumers can then read `data.result.title` (or
 * `.transcript`, `.analysis`, ...) rather than a confusing
 * `data.result.result.title`, and update a balance display without digging
 * into the result body.
 *
 * Why this matters: Recap-app's SSE handler does `finalResult = data.result`
 * and then reads `finalResult.title`. When the wrapped envelope was sent
 * instead, every title came back undefined and library entries were saved
 * as "Untitled", even though the relay had extracted the real title via
 * yt-dlp. The audit log reads its own local `title` variable and so was
 * unaffected, which made this look like a Recap-side bug. It wasn't.
 *
 * Unknown job ids are a no-op. NOTE(review): there is no guard against
 * completing an already-terminal job; a repeat call re-stamps completed_at
 * and emits another "done" event.
 *
 * @param {string} jobId
 * @param {*} envelope - Result payload, usually the wrapped form above.
 */
export function markComplete(jobId, envelope) {
  const job = jobs.get(jobId);
  if (!job) return;

  const finishedAt = Date.now();
  job.status = "complete";
  job.result = envelope;
  job.completed_at = finishedAt;
  job.updated_at = finishedAt;

  const isWrapped = Boolean(envelope) && typeof envelope === "object" && "result" in envelope;
  appendEvent(jobId, "done", {
    result: isWrapped ? envelope.result : envelope,
    credit_charged: envelope?.credit_charged,
    tier: envelope?.tier,
  });
}

/**
 * Mark a job as failed, record its error, and emit the terminal SSE
 * "error" event.
 *
 * The error is sanitized here, at the source, so every downstream surface
 * that reads job.error (the SSE error event, the per-job GET endpoints,
 * etc.) gets client-safe wording without each call site having to remember
 * to sanitize. The raw operator-internal message stays available on
 * job.error_internal for the admin dashboard and audit log (snapshotJobs
 * exposes both fields).
 *
 * Both strings are capped at 500 UTF-16 code units. A plain `.slice` can
 * split a surrogate pair (e.g. an emoji in an upstream error message) and
 * leave an unpaired half at the end, so a dangling high surrogate is
 * dropped as well.
 *
 * Unknown job ids are a no-op.
 *
 * @param {string} jobId
 * @param {*} errorMessage - Message or Error; falsy values become "unknown error".
 */
export function markFailed(jobId, errorMessage) {
  const job = jobs.get(jobId);
  if (!job) return;

  // Cap `text` to `max` code units without leaving half of a surrogate pair.
  const clip = (text, max) => {
    const cut = text.slice(0, max);
    const lastUnit = cut.charCodeAt(cut.length - 1);
    return lastUnit >= 0xd800 && lastUnit <= 0xdbff ? cut.slice(0, -1) : cut;
  };

  job.status = "failed";
  const raw = clip(String(errorMessage || "unknown error"), 500);
  job.error_internal = raw;
  job.error = clip(sanitizeErrorForClient(raw), 500);
  job.completed_at = Date.now();
  job.updated_at = job.completed_at;

  // Same terminal event for failures: SSE clients close on this and
  // surface the error to the user.
  appendEvent(jobId, "error", { error: job.error });
}

/**
 * Serializable summary of every live job, consumed by the admin dashboard.
 *
 * Expired jobs are pruned first. The result body is reduced to a
 * `has_result` flag. metadata, the event log and subscribers are left out.
 *
 * Both error variants are included. `error_internal` carries the raw
 * operator-facing wording (full backend / spark-control detail) and falls
 * back to `error` when no internal message was recorded. External callers
 * should always read the sanitized `error`.
 *
 * @returns {object[]} One summary object per job, in insertion order.
 */
export function snapshotJobs() {
  pruneExpired();

  const summaries = [];
  for (const job of jobs.values()) {
    const {
      id,
      kind,
      install_id,
      status,
      started_at,
      updated_at,
      completed_at,
      progress,
      result,
      error,
      error_internal,
    } = job;
    summaries.push({
      id,
      kind,
      install_id,
      status,
      started_at,
      updated_at,
      completed_at,
      progress,
      has_result: result != null,
      error,
      error_internal: error_internal || error,
    });
  }
  return summaries;
}

// Evict every job whose reference timestamp is more than RETENTION_MS old.
// The reference is completed_at for a finished job, otherwise updated_at,
// otherwise started_at. Because updated_at is the fallback, a job that is
// still queued/running but has had no activity for RETENTION_MS is evicted
// too. Jobs with no truthy timestamp at all are kept. Deleting the current
// entry while iterating a Map is safe in JavaScript, so this is one pass.
function pruneExpired() {
  const cutoff = Date.now() - RETENTION_MS;
  for (const [id, { completed_at, updated_at, started_at }] of jobs) {
    const lastSeen = completed_at || updated_at || started_at;
    if (!lastSeen) continue;
    if (lastSeen < cutoff) jobs.delete(id);
  }
}