// Process-global FIFO queue for hardware-backed jobs.
//
// Why this exists: Spark Control + the operator's GPU box can really
// only handle one full pipeline at a time. Pre-queue, two concurrent
// /relay/v1/summarize-url requests would each spin up their own
// async worker — both would fire 2-parallel transcribe chunks + 2
// parallel diarize calls (and analyze on top), totalling 8+
// simultaneous calls into Spark Control. The GPU thrashes and
// either both jobs slow down massively or chunks start failing.
//
// The fix is a single-slot semaphore around any code path that
// drives the hardware backend. Gemini paths bypass entirely — the
// Google API handles arbitrary concurrency at its end and we have
// no local bottleneck. Intra-job concurrency (the operator's
// configured 2-parallel chunk worker) stays — the operator's GPU
// can handle 2 simultaneous transcribe POSTs per the SC dev's
// guidance; the queue serializes whole jobs, not individual calls
// within a job.
//
// Use:
//   const release = await acquireHardwareSlot({
//     jobId,
//     onWait: (info) => { ... emit "queued" SSE event ... },
//   });
//   try {
//     // ... do hardware-backed pipeline ...
//   } finally {
//     release();
//   }
//
// onWait fires AT MOST ONCE, and only when the caller actually has to
// wait; it is called synchronously, before the wait begins.
// `info.position` is 1-indexed (1 = next up after the current
// active job, 2 = one slot back, etc.). Callers use this to push
// a "queued" event to the SSE client so the UI can show "Queued
// — N jobs ahead of you".
// Tail of the promise chain. Each acquirer chains its own "slot" promise
// onto this, so the next acquirer waits until the previous holder releases.
let queueHead = Promise.resolve();
// Jobs that have called acquireHardwareSlot and not yet released,
// including the one currently holding the slot.
let pendingCount = 0;
// jobId of the job currently holding the slot; null when idle or id-less.
let currentJobId = null;
// Bounded ring buffer of recent queue events for /admin/queue visibility.
const queueLog = [];
const MAX_LOG_ENTRIES = 50;

/**
 * Short, log-friendly form of a job id (its first 8 characters).
 *
 * Coerces with String() so a non-string id (e.g. a number) cannot throw
 * here. That matters: a throw inside acquireHardwareSlot after its slot has
 * been chained onto queueHead would leave that slot unreleasable and wedge
 * every job queued behind it.
 *
 * @param {unknown} id
 * @param {string} [fallback="(no-id)"] - label used when id is falsy
 * @returns {string}
 */
function shortId(id, fallback = "(no-id)") {
  return id ? String(id).slice(0, 8) : fallback;
}

/**
 * Append a timestamped event to the queue log, dropping the oldest entries
 * so that at most MAX_LOG_ENTRIES are retained.
 *
 * @param {object} event - fields merged into the entry after `at`
 */
function logQueueEvent(event) {
  queueLog.push({ at: Date.now(), ...event });
  if (queueLog.length > MAX_LOG_ENTRIES) {
    // Trim in place instead of allocating a fresh array on every push.
    queueLog.splice(0, queueLog.length - MAX_LOG_ENTRIES);
  }
}

/**
 * Wait for exclusive use of the hardware backend (single-slot FIFO).
 *
 * Callers are granted the slot in call order. The resolved value is a
 * release function; it is idempotent and should be called from a `finally`
 * block. A job that never releases blocks every job queued behind it.
 *
 * @param {object} [opts]
 * @param {string|null} [opts.jobId] - identifier used in logs and status.
 * @param {(info: {position: number, activeJobId: string|null}) => void} [opts.onWait]
 *   Called synchronously, at most once, and only if this caller must wait.
 *   `position` is 1-indexed: the number of jobs ahead of this one, counting
 *   the active job. Exceptions it throws are logged and otherwise ignored.
 * @returns {Promise<() => void>} resolves with the release function once
 *   this job holds the slot.
 */
export async function acquireHardwareSlot({ jobId, onWait } = {}) {
  pendingCount += 1;
  // 0 = slot is free, run immediately; N > 0 = N jobs ahead (incl. active).
  const positionInQueue = pendingCount - 1;

  // Chain our slot onto the tail: the next acquirer waits for the previous
  // tail AND our release. Nothing between here and the returned release
  // function may throw, or this slot can never resolve and the queue
  // deadlocks.
  const prev = queueHead;
  let releaseFn;
  const slot = new Promise((resolve) => {
    releaseFn = resolve;
  });
  queueHead = prev.then(() => slot);

  if (positionInQueue > 0) {
    logQueueEvent({
      kind: "enqueued",
      jobId: jobId || null,
      position: positionInQueue,
      activeJobId: currentJobId,
    });
    if (typeof onWait === "function") {
      try {
        onWait({ position: positionInQueue, activeJobId: currentJobId });
      } catch (err) {
        // Best-effort notification: a broken callback must not wedge the
        // queue, but it should not disappear silently either.
        console.warn(`[hardware-queue] onWait for job ${shortId(jobId)} threw:`, err);
      }
    }
    console.log(
      `[hardware-queue] job ${shortId(jobId)} queued at position ${positionInQueue} (active: ${shortId(currentJobId, "none")})`
    );
  }

  // Never rejects: every link in the chain only ever resolves.
  await prev;

  currentJobId = jobId || null;
  logQueueEvent({ kind: "started", jobId: jobId || null });
  console.log(
    `[hardware-queue] job ${shortId(jobId)} started — ${pendingCount - 1} waiting`
  );

  let released = false;
  return () => {
    // Idempotent: a second call (e.g. from both a catch and a finally)
    // must not decrement pendingCount twice.
    if (released) return;
    released = true;
    const wasActive = currentJobId;
    currentJobId = null;
    pendingCount -= 1;
    logQueueEvent({ kind: "released", jobId: wasActive });
    console.log(
      `[hardware-queue] job ${shortId(wasActive)} released — ${pendingCount} still in queue`
    );
    releaseFn();
  };
}

/**
 * Read-only snapshot for the operator dashboard.
 *
 * @returns {{pendingCount: number, currentJobId: string|null, log: object[]}}
 *   pendingCount: total jobs in the queue (active + waiting)
 *   currentJobId: the job currently holding the slot, or null
 *   log: recent queue events (most recent last; bounded to
 *        MAX_LOG_ENTRIES = 50). Entries are copies, so mutating them does
 *        not affect the internal log.
 */
export function getHardwareQueueStatus() {
  return {
    pendingCount,
    currentJobId,
    log: queueLog.map((entry) => ({ ...entry })),
  };
}