From 84d56c94c9d57c01fdf50b1eadc9b2bc8f77903f Mon Sep 17 00:00:00 2001 From: Keysat Date: Sat, 13 Jun 2026 13:36:04 -0500 Subject: [PATCH] Add Spark Control hardware backend (diarize, queue, discovery) --- server/backends/hardware.js | 1099 +++++++++++++++++++++++++++----- server/hardware-config.js | 92 +++ server/hardware-queue.js | 110 ++++ server/lan-fetch.js | 41 ++ server/spark-control-events.js | 58 ++ server/spark-control.js | 207 ++++++ 6 files changed, 1435 insertions(+), 172 deletions(-) create mode 100644 server/hardware-config.js create mode 100644 server/hardware-queue.js create mode 100644 server/lan-fetch.js create mode 100644 server/spark-control-events.js create mode 100644 server/spark-control.js diff --git a/server/backends/hardware.js b/server/backends/hardware.js index f15b773..79f757b 100644 --- a/server/backends/hardware.js +++ b/server/backends/hardware.js @@ -9,22 +9,47 @@ // transcribeAudio → { text, segments, duration_seconds } // analyzeText → { text } // -// Both endpoints are reached via plain fetch — no SDK dependency keeps -// the relay container slim and the upstream wire format is dead-simple -// for these two well-known shapes. +// Long-audio handling: Parakeet wrappers on operator GPUs OOM on long +// audio (one shot of a 108-min file returned "CUDA error: unknown +// error" on a 24GB card). transcribeAudio chunks audio > 5 minutes +// into 5-minute slices, transcribes each sequentially, and stitches +// the segments with continuous timestamps. Single-pass for short +// audio, no perf hit. 
+ +import fs from "fs/promises"; +import os from "os"; +import path from "path"; +import { + getAudioDurationSeconds, + splitAudioFile, +} from "../audio-meta.js"; +import { + sortAndDedupeTranscript, + mergeShortEntries, +} from "./gemini.js"; +import { lanFetch } from "../lan-fetch.js"; +import { + clusterSpeakers, + assignSpeakersToSegments, +} from "../speaker-clustering.js"; const ANALYZE_MAX_TOKENS = 16000; // Gemma served locally tends to live on the host's LAN, not the public // internet, so generous timeouts. Same scale as Recap's defaults. const DEFAULT_TIMEOUT_MS = 900_000; -// Defaults used only when the route handler doesn't supply explicit -// model names (e.g. a unit test instantiating the backend directly). -// In production the model names come from relay-config.json via -// setParakeetUrl / setGemmaUrl, so the operator can swap models on -// their Ollama deployment without rebuilding the relay. +// Default Parakeet model name — used only when the route handler +// doesn't supply one (e.g. a unit test instantiating the backend +// directly). In production the value comes from Spark Control +// discovery via resolveHardwareConfig(). +// +// NOTE: there is NO default for the analyze (Gemma) model. An empty +// gemmaModel intentionally triggers autodiscovery via /v1/models in +// analyzeText() below. Substituting a hardcoded default here (the +// old "gemma3:27b") defeats autodiscovery and causes "model does +// not exist" errors when the operator's hardware hosts a different +// model name. 
const DEFAULT_TRANSCRIBE_MODEL = "parakeet-tdt-0.6b-v3"; -const DEFAULT_ANALYZE_MODEL = "gemma3:27b"; // Normalize an OpenAI-API-compatible base URL: strip trailing slash // AND strip a trailing `/v1` segment if the operator pasted one, @@ -37,27 +62,402 @@ function normalizeApiBase(url) { return s; } +// 503 retry policy — both Parakeet and Sortformer/TitaNet endpoints +// on Spark Control return 503 + a "retry in ~60s" body when the GPU +// container CUDA-wedges and auto-restarts. The right thing to do is +// wait for the documented retry window (or the Retry-After header if +// supplied) and try again. Honor Retry-After when present, default +// to 60s, clamp to [5, 120], and add ±5s jitter so parallel chunks +// don't all wake up and pile on at the exact same moment (thundering +// herd → instant re-wedge). +const SPARK_503_MAX_ATTEMPTS = 4; +function parse503WaitSec(res) { + const ra = res?.headers?.get?.("retry-after"); + const raNum = parseInt(ra || "60", 10); + const base = Math.min(120, Math.max(5, Number.isFinite(raNum) ? raNum : 60)); + const jitter = Math.round((Math.random() * 10 - 5) * 10) / 10; // ±5s + return Math.max(5, Math.round((base + jitter) * 10) / 10); +} +function sleepMs(ms) { + return new Promise((r) => setTimeout(r, ms)); +} + export function createHardwareBackend({ parakeetBaseURL = "", gemmaBaseURL = "", + // Spark Control base URL (no path) — derived by the caller from + // relay_spark_control_url with the /api/endpoints suffix stripped. + // Used only for diarize-chunk (POST {spark}/api/audio/diarize-chunk). + // Transcribe and analyze go to parakeet/gemma respectively, which + // discovery reports as separate hosts on the operator's LAN. + // Empty = diarization can't run (the route handler should not have + // enabled it). 
+ sparkControlBaseURL = "", parakeetModel = DEFAULT_TRANSCRIBE_MODEL, - gemmaModel = DEFAULT_ANALYZE_MODEL, + gemmaModel = "", timeoutMs = DEFAULT_TIMEOUT_MS, + // Chunking knobs — caller MUST source from relay-config.json. The + // defaults here are for unit-test ergonomics only. Production + // callers always pass these from getConfigSnapshot() so the + // operator's Settings-tab edits flow through. + // + // Parallel chunks: per the spark-control LLM developer, Parakeet can + // handle ~4 concurrent transcribe POSTs on a typical single-GPU + // operator setup. Earlier versions hard-serialized to 1 in flight + // out of caution; we now bound by the operator's chosen concurrency. + txChunkSeconds = 5 * 60, + // Overlap (seconds) between consecutive chunks. Defaults to 0 + // (matches pre-v0.2.77 behavior); production callers pass the + // operator's configured `relay_hardware_tx_chunk_overlap_seconds`. + // When > 0, chunks at the boundary share audio and the stitcher + // drops words/segments in chunk N+1 whose global timestamps fall + // within the prior chunk's tail. See splitAudioFile() for the + // chunking math. + txChunkOverlapSeconds = 0, + txConcurrency = 4, + // Diarization: when true, every chunk gets TWO parallel HTTP calls + // — the existing /v1/audio/transcriptions to Parakeet AND a sibling + // /api/audio/diarize-chunk to Sortformer+TitaNet on the same Spark + // Control host. Diarization output (segments + per-chunk voice + // fingerprints) is collected per chunk and returned alongside the + // transcript on the backend response so the relay's caller can run + // cross-chunk speaker clustering. In Phase 1C this just gathers the + // data; the clustering + transcript-merge step lands in Phase 1D. + // Default false (matches the current transcribe-only behavior). + diarizationEnabled = false, + // Cosine-similarity threshold for cross-chunk speaker clustering, + // as an integer percentage (slider system is int-only). 
70 means + // "merge two chunks' speaker fingerprints into the same global + // speaker if their cosine similarity >= 0.70" — NeMo's recommended + // default for TitaNet embeddings. Range 50-95 enforced inside the + // clustering module; values outside the range are clamped. + clusterThresholdPct = 70, + // Post-cluster suppression knobs (Phase 2). Operator-tunable via + // Settings → Operator hardware. Used only when diarization is on. + // Defaults match speaker-clustering.js's hardcoded fallbacks. + anchorMinSpeakingSec = 30, + smallClusterMaxSpeakingSec = 15, + uncertainMarginPct = 10, + // Optional per-chunk completion callback for the pipelined-analyze + // path. Fires AFTER both transcribe + diarize have resolved for + // each chunk (Promise.allSettled), in the order chunks finish (not + // necessarily monotonic by chunkIndex when concurrency > 1). + // Receives: + // { + // chunkIndex, // 0-based + // totalChunks, // emitted on the first call so the caller + // // knows the count up front + // startSeconds, // global audio offset where this chunk begins + // durationSeconds, // chunk audio length + // overlapBoundarySec, // RELATIVE to chunk start; segments with + // // global start < (startSeconds + overlap- + // // BoundarySec) belong to the prior chunk + // segments, // raw chunk segments, globally-timestamped + // // (start/end already shifted by startSeconds) + // diarOk, // true | false | null (null = not run) + // } + // Caller uses this to feed a chunk buffer that the pipelined-analyze + // workers read from. When this callback is null (the default), + // transcribe behaves as today — caller gets the final stitched + // result only at the end. + onChunkComplete = null, + // Output cap passed to the operator's vLLM/Ollama chat-completion + // endpoint as `max_tokens`. Default 16000 was the historical + // hardcoded value. 
Operators with smaller models that produce + // shorter JSON sections may want to drop this; operators on + // larger models with reasoning preambles may want to bump it. + // Configurable via Settings → `relay_hardware_an_max_tokens`. + anMaxTokens = ANALYZE_MAX_TOKENS, } = {}) { const parakeet = normalizeApiBase(parakeetBaseURL); const gemma = normalizeApiBase(gemmaBaseURL); + // Spark Control URL — used ONLY by postDiarizeChunk. Strip any + // trailing /api/endpoints (the discovery path) and any trailing + // slash so the diarize URL composes cleanly as `${sparkBase}/api/ + // audio/diarize-chunk`. + const sparkBase = (sparkControlBaseURL || "") + .trim() + .replace(/\/$/, "") + .replace(/\/api\/endpoints$/, ""); const transcribeModel = parakeetModel || DEFAULT_TRANSCRIBE_MODEL; - const analyzeModel = gemmaModel || DEFAULT_ANALYZE_MODEL; + // Intentionally NOT coalesced to a default — empty string means + // "autodiscover from /v1/models at request time". See the comment + // at DEFAULT_TRANSCRIBE_MODEL above for why. + const analyzeModel = gemmaModel || ""; + + // Single Parakeet POST. The chunking layer in transcribeAudio calls + // this once per chunk; for short audio it's called once with the + // whole buffer. Returns { segments, duration_seconds } that the + // caller stitches. + // + // Tries the OpenAI standard `/v1/audio/transcriptions` first, falls + // back to bare `/audio/transcriptions` on 404. Retries without + // verbose_json + timestamp_granularities[] if the wrapper rejects + // the rich shape. 
/**
 * Transcribe one audio buffer with Parakeet and return its segments on
 * the global timeline.
 *
 * Request strategy, in order:
 *   1. Path resolution — POST the rich form to `/v1/audio/transcriptions`;
 *      on 404 try `/audio/transcriptions`.
 *   2. 503 retry — while the resolved URL answers 503, wait for
 *      parse503WaitSec(res) seconds and re-POST, up to
 *      SPARK_503_MAX_ATTEMPTS requests in total.
 *   3. Rich-mode fallback — on any other non-OK 4xx/5xx (except 404/503),
 *      re-POST once without `response_format` / `timestamp_granularities[]`.
 *
 * @param {object} args
 * @param {*} args.audioBuffer - Audio bytes; wrapped in a Blob for the multipart body.
 * @param {string} args.mimeType - MIME type recorded on the Blob.
 * @param {number} [args.offsetSeconds=0] - Added to every segment start/end.
 * @returns {Promise<{segments: Array<{start: number, end: number, text: string}>, duration_seconds: number}>}
 * @throws {Error} with `.status` 502 on a network-level failure, or the
 *   upstream HTTP status when the final response is not OK.
 */
async function postOneChunk({ audioBuffer, mimeType, offsetSeconds = 0 }) {
  const pathCandidates = [
    "/v1/audio/transcriptions",
    "/audio/transcriptions",
  ];

  const buildForm = (richMode) => {
    const form = new FormData();
    const blob = new Blob([audioBuffer], { type: mimeType });
    form.append("file", blob, "audio.bin");
    form.append("model", transcribeModel);
    if (richMode) {
      form.append("response_format", "verbose_json");
      form.append("timestamp_granularities[]", "segment");
    }
    return form;
  };

  // Single POST. Network-level failures (no HTTP response at all) are
  // rethrown as a 502 whose message includes the underlying socket
  // cause, which the fetch implementation nests in err.cause.
  const post = async (url, richMode, { prefix = "", suffix = "" } = {}) => {
    try {
      return await lanFetch(url, {
        method: "POST",
        body: buildForm(richMode),
        redirect: "follow",
        signal: AbortSignal.timeout(timeoutMs),
      });
    } catch (err) {
      const cause =
        err?.cause?.message || err?.cause?.code || err?.cause || "";
      const detail = cause
        ? `${err.message} (cause: ${cause})`
        : err?.message || String(err);
      const e = new Error(
        `Parakeet transcribe network error${prefix} at ${url}${suffix}: ${detail}`,
        { cause: err }
      );
      e.status = 502;
      throw e;
    }
  };

  // Resolve which path the wrapper exposes (404 from one → try the
  // other). Done before the 503 loop so retry budget is not spent on a
  // path that does not exist.
  let resolvedUrl = null;
  let res = null;
  for (const p of pathCandidates) {
    const url = `${parakeet}${p}`;
    res = await post(url, true);
    if (res.status !== 404) {
      resolvedUrl = url;
      break;
    }
    console.warn(
      `[hardware] 404 at ${url} — trying next path candidate`
    );
  }
  if (!resolvedUrl) {
    // All path candidates 404'd — that 404 is reported below.
    resolvedUrl = `${parakeet}${pathCandidates[pathCandidates.length - 1]}`;
  }

  // ── 503 retry loop ──
  // `attempt` counts requests already made against resolvedUrl, so the
  // loop issues at most SPARK_503_MAX_ATTEMPTS - 1 additional POSTs.
  let attempt = 1;
  while (res.status === 503 && attempt < SPARK_503_MAX_ATTEMPTS) {
    const waitSec = parse503WaitSec(res);
    console.warn(
      `[hardware/tx] 503 at ${resolvedUrl} (attempt ${attempt}/${SPARK_503_MAX_ATTEMPTS}) — Spark Control reports transient CUDA wedge, waiting ${waitSec}s then retrying`
    );
    await sleepMs(waitSec * 1000);
    res = await post(resolvedUrl, true, { suffix: ` (retry ${attempt})` });
    attempt += 1;
  }

  // ── Rich-mode fallback ──
  // A non-OK answer other than 404/503 may mean the wrapper rejected
  // verbose_json + timestamp_granularities (a parameter problem, not an
  // availability problem), so retry once with the bare form. 503 is
  // excluded because the loop above already spent its retries on it.
  const richRejected =
    !res.ok &&
    res.status !== 503 &&
    res.status !== 404 &&
    res.status >= 400 &&
    res.status < 600;
  if (richRejected) {
    const richBody = await safeBody(res);
    console.warn(
      `[hardware] rich Parakeet request to ${resolvedUrl} returned ${res.status}: ${richBody.slice(0, 200)} — retrying without verbose_json`
    );
    res = await post(resolvedUrl, false, { prefix: " (bare fallback)" });
  }

  if (!res.ok) {
    const body = await safeBody(res);
    const hint =
      res.status === 404
        ? ` (tried ${pathCandidates.join(" and ")} on base ${parakeet} — wrapper may expose the endpoint at a different path; check the Parakeet URL or container logs)`
        : res.status === 503
          ? ` (Spark Control reported transient CUDA wedge ${SPARK_503_MAX_ATTEMPTS} times in a row — operator container may need manual restart)`
          : "";
    const e = new Error(
      `Parakeet transcribe ${res.status} at ${resolvedUrl}: ${body.slice(0, 300)}${hint}`
    );
    e.status = res.status;
    throw e;
  }

  const data = (await res.json()) ?? {};
  const durationSeconds = Number(data.duration) || 0;
  const rawSegments = Array.isArray(data.segments) ? data.segments : [];
  // Coerce before adding: a wrapper that serializes timestamps as
  // strings would otherwise produce string concatenation, not a sum.
  const shifted = rawSegments.map((s) => ({
    start: (Number(s?.start) || 0) + offsetSeconds,
    end: (Number(s?.end) || 0) + offsetSeconds,
    text: (s?.text || "").trim(),
  }));

  // A response with no segment list (e.g. the bare fallback, which does
  // not ask for verbose_json) may still carry the whole transcript in
  // `text`. Returning only `segments` would silently discard it, so
  // expose it as one segment spanning the reported duration.
  // NOTE(review): the synthetic segment starts at the chunk start; a
  // caller that drops segments starting inside an overlap region would
  // drop it for chunks after the first — confirm against the stitcher.
  const plainText = typeof data.text === "string" ? data.text.trim() : "";
  if (shifted.length === 0 && plainText) {
    shifted.push({
      start: offsetSeconds,
      end: offsetSeconds + durationSeconds,
      text: plainText,
    });
  }

  return {
    segments: shifted,
    duration_seconds: durationSeconds,
  };
}

// Parallel diarization call for the same audio chunk. Hits
// POST {spark_control}/api/audio/diarize-chunk (Sortformer for
// segmentation, TitaNet for per-speaker voice fingerprints).
The + // returned `speaker` labels are LOCAL to this chunk — Speaker_0 + // here is not necessarily the same person as Speaker_0 in chunk + // N+1; the relay's cross-chunk clustering step (Phase 1D) + // reconciles them via cosine similarity on the fingerprints. + // + // Retry policy from the SC dev: + // 503 + Retry-After: Parakeet/Sortformer CUDA wedge — container + // is auto-restarting in the background. Wait + retry once. + // 502: container unreachable. Fail. + // 500: real error. Fail. + // + // Segment timestamps are shifted by offsetSeconds so they + // already sit on the global audio timeline when returned — + // matches postOneChunk's behavior so the merge step in Phase 1D + // can align word timestamps and segment timestamps directly + // without re-shifting. + async function postDiarizeChunk({ audioBuffer, mimeType, offsetSeconds }) { + if (!sparkBase) { + const e = new Error( + "Diarization called but sparkControlBaseURL is empty — caller should not have enabled diarization without Service Discovery configured" + ); + e.status = 500; + throw e; + } + const buildForm = () => { + const form = new FormData(); + const blob = new Blob([audioBuffer], { type: mimeType }); + form.append("file", blob, "audio.bin"); + return form; + }; + // Diarize-chunk lives on Spark Control (the same host that serves + // /api/endpoints) — NOT on the Parakeet wrapper. Discovery's + // parakeet.base_url usually points to a separate STT-only service + // that doesn't have this endpoint. Use lanFetch so Spark Control's + // StartOS Local Intermediate CA cert doesn't fail TLS validation. + const url = `${sparkBase}/api/audio/diarize-chunk`; + let res = null; + // 503 retry loop — same policy as the transcribe path. Up to + // SPARK_503_MAX_ATTEMPTS, honoring Retry-After with ±5s jitter + // so parallel chunks don't synchronize their retries and pile + // back onto the freshly-restarted container at the same instant. 
+ for (let attempt = 0; attempt < SPARK_503_MAX_ATTEMPTS; attempt++) { + try { + res = await lanFetch(url, { + method: "POST", + body: buildForm(), + redirect: "follow", + signal: AbortSignal.timeout(timeoutMs), + }); + } catch (err) { + // undici wraps the underlying socket error in err.cause — + // surfacing it makes "fetch failed" diagnosable (ECONNREFUSED + // vs ETIMEDOUT vs TLS handshake vs DNS). + const cause = + err?.cause?.message || err?.cause?.code || err?.cause || ""; + const detail = cause ? `${err.message} (cause: ${cause})` : err?.message || String(err); + const e = new Error(`Diarize network error at ${url}: ${detail}`); + e.status = 502; + throw e; + } + if (res.status === 503 && attempt < SPARK_503_MAX_ATTEMPTS - 1) { + const waitSec = parse503WaitSec(res); + console.warn( + `[hardware/diarize] 503 at ${url} (attempt ${attempt + 1}/${SPARK_503_MAX_ATTEMPTS}) — Spark Control reports transient CUDA wedge, waiting ${waitSec}s then retrying` + ); + await sleepMs(waitSec * 1000); + continue; + } + break; + } + if (!res.ok) { + const body = await safeBody(res); + const hint = + res.status === 503 + ? ` (Spark Control reported transient CUDA wedge ${SPARK_503_MAX_ATTEMPTS} times in a row — operator container may need manual restart)` + : ""; + const e = new Error( + `Diarize ${res.status} at ${url}: ${body.slice(0, 300)}${hint}` + ); + e.status = res.status; + throw e; + } + const data = await res.json(); + const rawSegments = Array.isArray(data.segments) ? data.segments : []; + const shiftedSegments = rawSegments.map((s) => ({ + // Use the same `start` / `end` naming as postOneChunk to keep + // the merge code uniform across segment shapes. + start: (s.start_s || 0) + offsetSeconds, + end: (s.end_s || 0) + offsetSeconds, + speaker_local: s.speaker || "Speaker_unknown", + confidence: + typeof s.confidence === "number" && Number.isFinite(s.confidence) + ? s.confidence + : null, + })); + return { + duration_s: typeof data.duration === "number" ? 
data.duration : 0, + segments: shiftedSegments, + speakers_local: Array.isArray(data.speakers_detected) + ? data.speakers_detected + : Array.isArray(data.speakers_local) + ? data.speakers_local + : [], + fingerprints: + data.fingerprints && typeof data.fingerprints === "object" + ? data.fingerprints + : {}, + models: data.models || null, + }; + } return { hasTranscribe: !!parakeet, hasAnalyze: !!gemma, - // POST /v1/audio/transcriptions with the OpenAI Whisper - // multipart shape. Parakeet wrappers (NeMo + the patched one Recap - // already talks to) honor this format and return segments with - // per-segment timestamps when timestamp_granularities=segment is - // requested. Falls back to a bare request if the rich shape 4xx/5xxs. + // Public API. Receives a full audio buffer; decides whether to + // send it to Parakeet in one shot or to chunk-and-stitch. async transcribeAudio({ audio, mimeType = "application/octet-stream", @@ -65,204 +465,543 @@ export function createHardwareBackend({ }) { if (!parakeet) { const err = new Error( - "operator-hardware transcribe is not configured (relay_parakeet_base_url is empty)" + "operator-hardware transcribe is not configured — Spark Control discovery isn't reporting a ready parakeet endpoint" ); err.status = 503; throw err; } - // Try the rich request first (verbose_json + segment timestamps). - // FormData/Blob globals are available in Node 20+. Wrap the - // received Buffer in a Blob so the multipart body is properly - // chunked instead of falling back to base64. 
- const buildForm = (richMode) => { - const form = new FormData(); - const blob = new Blob([audio], { type: mimeType }); - form.append("file", blob, "audio.bin"); - form.append("model", transcribeModel); - if (richMode) { - form.append("response_format", "verbose_json"); - form.append("timestamp_granularities[]", "segment"); - } - return form; - }; + // Write to temp file so ffprobe + ffmpeg can read it (no + // reliable stdin pathway for either tool on every codec). + // Cleanup happens unconditionally in `finally`. + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "hw-tx-")); + const inputPath = path.join(tmpDir, "input." + extFromMime(mimeType)); + await fs.writeFile(inputPath, audio); - // Path candidates, in order. The OpenAI Whisper standard is - // `/v1/audio/transcriptions`; some self-hosted wrappers (or - // operators who pasted their base URL with a path already - // stripped) expose the endpoint at `/audio/transcriptions` - // instead. We try the standard path first, then fall back on - // 404 only — other status codes (rate-limit, 500) shouldn't - // trigger a different path retry. - const pathCandidates = [ - "/v1/audio/transcriptions", - "/audio/transcriptions", - ]; - let res = null; - let lastUrl = null; - let pathErrSummary = null; - for (const p of pathCandidates) { - const url = `${parakeet}${p}`; - lastUrl = url; - try { - res = await fetch(url, { - method: "POST", - body: buildForm(true), - signal: AbortSignal.timeout(timeoutMs), + try { + const duration = await getAudioDurationSeconds(inputPath); + // Build the chunk list. For audio that fits in a single chunk + // (or where ffprobe couldn't read the duration), we synthesize + // a 1-entry descriptor pointing at the ORIGINAL file — no + // ffmpeg cut needed. 
This routes the short-audio case through + // the SAME worker loop as multi-chunk audio so: + // - diarization still fires (when enabled) + // - onChunkComplete still fires (otherwise pipelined-analyze + // workers wait forever on a callback that never comes — the + // bug that hung Internal Meetings + short YouTube videos) + // - cross-chunk clustering still runs (N=1 cluster, but + // speakers are still labeled on the segments) + // - bracketed-text stitching runs (the single-shot path used + // to return text="" because postOneChunk doesn't emit text) + // For longer audio, splitAudioFile cuts on disk as before. + let chunks; + if (!duration || duration <= txChunkSeconds) { + chunks = [{ + filePath: inputPath, + startSeconds: 0, + durationSeconds: duration || 0, + overlapBoundarySec: 0, + index: 0, + }]; + } else { + chunks = await splitAudioFile({ + inputPath, + outputDir: tmpDir, + chunkSeconds: txChunkSeconds, + overlapSeconds: txChunkOverlapSeconds, }); - } catch (err) { - const e = new Error( - `Parakeet transcribe network error at ${url}: ${err?.message || err}` + } + const effectiveConcurrency = Math.min(chunks.length, Math.max(1, txConcurrency)); + const durLabel = duration ? `${duration.toFixed(0)}s` : "unknown-length"; + if (chunks.length === 1) { + console.log( + `[hardware] ${durLabel} audio fits in a single chunk — sending in one shot (no ffmpeg split)` ); - e.status = 502; + } else { + console.log( + `[hardware] chunking ${durLabel} audio into ${chunks.length} pieces of up to ${txChunkSeconds}s (overlap=${txChunkOverlapSeconds}s), ${effectiveConcurrency} in parallel` + ); + } + if (diarizationEnabled) { + console.log( + `[hardware] diarization ENABLED — each chunk fires 2 parallel calls (transcribe + diarize) against Spark Control` + ); + } + const chunkResults = new Array(chunks.length); + // Per-chunk wall-time tracking. 
Summed by callers into the + // audit row's transcribe_ms_sum so the operator can see total + // backend compute across all Parakeet POSTs (distinct from + // the outer wall-time which captures the parallel-fan-out). + const chunkDurationsMs = new Array(chunks.length).fill(null); + // Per-chunk diarization data. Populated when + // diarizationEnabled is true. Each entry mirrors the chunk's + // index in `chunks` so the clustering step (Phase 1D) can + // correlate per-chunk speaker labels back to chunk positions. + // null = diarization didn't run OR failed for this chunk. + const chunkDiarization = new Array(chunks.length).fill(null); + let nextIdx = 0; + const worker = async () => { + while (true) { + const i = nextIdx++; + if (i >= chunks.length) return; + const chunk = chunks[i]; + const chunkStart = Date.now(); + try { + console.log( + `[hardware] chunk ${i + 1}/${chunks.length} (start=${chunk.startSeconds}s, len=${chunk.durationSeconds.toFixed(0)}s)` + ); + const buf = await fs.readFile(chunk.filePath); + const chunkOffset = offsetSeconds + chunk.startSeconds; + // Diarization-enabled = parallel transcribe + diarize. + // Diarization-disabled = transcribe only. + // We use Promise.allSettled so a diarize failure doesn't + // take down the transcript for the chunk — the transcript + // is the critical output, diarization is additive metadata. 
+ const calls = [ + postOneChunk({ + audioBuffer: buf, + mimeType, + offsetSeconds: chunkOffset, + }), + ]; + if (diarizationEnabled) { + calls.push( + postDiarizeChunk({ + audioBuffer: buf, + mimeType, + offsetSeconds: chunkOffset, + }) + ); + } + const settled = await Promise.allSettled(calls); + const txSettled = settled[0]; + if (txSettled.status === "rejected") { + throw txSettled.reason; + } + chunkDurationsMs[i] = Date.now() - chunkStart; + chunkResults[i] = { + ok: true, + ...txSettled.value, + startSeconds: chunk.startSeconds, + }; + let diarOk = null; // null = not run, true/false = ran + if (diarizationEnabled) { + const diarSettled = settled[1]; + if (diarSettled.status === "fulfilled") { + diarOk = true; + chunkDiarization[i] = { + ok: true, + chunkIndex: i, + startSeconds: chunk.startSeconds, + // Global timestamp BEFORE which segments in this + // chunk are duplicates of the prior chunk's tail + // (overlap zone). Used by clusterSpeakers to + // dedup diar segments — without this the same + // 30s overlap zone gets counted twice toward + // speaker totals on every chunk boundary. + chunkOverlapBoundarySec: + offsetSeconds + (chunk.overlapBoundarySec || 0), + ...diarSettled.value, + }; + } else { + diarOk = false; + console.warn( + `[hardware] diarize chunk ${i + 1}/${chunks.length} failed (transcript intact): ${diarSettled.reason?.message || diarSettled.reason}` + ); + chunkDiarization[i] = { + ok: false, + chunkIndex: i, + startSeconds: chunk.startSeconds, + error: diarSettled.reason, + }; + } + } + // Per-chunk completion log so the operator can see + // parallelism in the timeline. Earlier we logged only + // when each chunk STARTED, which made concurrent + // execution invisible (start lines look sequential + // because each pair of chunks finishes ~simultaneously, + // and only then does the next pair's first start log + // appear). Logging completion makes the in-flight + // overlap obvious. 
+ const dur = ((chunkDurationsMs[i] || 0) / 1000).toFixed(1); + const diarTag = + diarOk === true ? " · diarize ok" : + diarOk === false ? " · diarize FAIL" : ""; + console.log( + `[hardware] chunk ${i + 1}/${chunks.length} done in ${dur}s${diarTag}` + ); + // Fire pipelined-analyze hook (no-op when not in + // pipelined mode). Hands the caller this chunk's + // segments + position so it can advance its time- + // covered cursor and unblock any window analyses + // that were waiting on this range. + if (onChunkComplete) { + try { + onChunkComplete({ + chunkIndex: i, + totalChunks: chunks.length, + startSeconds: chunk.startSeconds, + durationSeconds: chunk.durationSeconds, + overlapBoundarySec: chunk.overlapBoundarySec || 0, + segments: chunkResults[i].segments || [], + diarOk, + }); + } catch (cbErr) { + // Don't kill transcribe if the caller's callback + // throws — pipelined mode is best-effort, the + // final analyze fallback still works. + console.warn( + `[hardware] onChunkComplete callback failed for chunk ${i + 1}: ${cbErr?.message || cbErr}` + ); + } + } + } catch (err) { + chunkDurationsMs[i] = Date.now() - chunkStart; + console.warn( + `[hardware] chunk ${i + 1}/${chunks.length} failed: ${err?.message || err}` + ); + chunkResults[i] = { ok: false, error: err, startSeconds: chunk.startSeconds }; + } + } + }; + const workers = Array.from({ length: effectiveConcurrency }, worker); + await Promise.all(workers); + + const failedChunks = chunkResults.filter((r) => !r || !r.ok); + const succeededChunks = chunkResults.filter((r) => r && r.ok); + if (succeededChunks.length === 0) { + const first = failedChunks[0]?.error; + const e = new Error( + `Parakeet chunked transcribe: all ${chunks.length} chunks failed. First error: ${first?.message || "unknown"}` + ); + e.status = first?.status || 502; throw e; } - if (res.status !== 404) break; - // 404 → try the next path candidate. Capture the body for the - // final error message if all candidates 404. 
- pathErrSummary = await safeBody(res); - console.warn( - `[hardware] 404 at ${url} — trying next path candidate` - ); - } - - // If the wrapper rejects the rich params (4xx other than 404 we - // already exhausted, or 5xx), retry with bare-bones at the - // working URL. - if (!res.ok && res.status >= 400 && res.status < 600 && res.status !== 404) { - const richBody = await safeBody(res); - console.warn( - `[hardware] rich Parakeet request to ${lastUrl} returned ${res.status}: ${richBody.slice(0, 200)} — retrying bare` - ); - try { - res = await fetch(lastUrl, { - method: "POST", - body: buildForm(false), - signal: AbortSignal.timeout(timeoutMs), - }); - } catch (err) { - const e = new Error( - `Parakeet transcribe network error (fallback) at ${lastUrl}: ${err?.message || err}` + // Merge in chunk order so timestamps appear chronologically. + // Failed chunks leave gaps (their segments are simply absent); + // the stitched transcript may have time-gaps the analyzer can + // tolerate. Better than aborting the whole job for one + // transient chunk failure. + // + // Overlap dedup: when consecutive chunks share audio at their + // boundary (txChunkOverlapSeconds > 0), chunk N+1's first + // `overlap` seconds will repeat content from chunk N's tail. + // We drop segments from chunk N+1 whose global start time + // falls before the chunk's overlap boundary. Chunk 0 has no + // prior chunk so its boundary equals its own start time and + // the filter is a no-op. 
+ const allSegments = []; + let totalDurationSec = 0; + let dedupedSegmentsCount = 0; + for (let i = 0; i < chunkResults.length; i++) { + const r = chunkResults[i]; + if (!r || !r.ok) continue; + const globalOverlapBoundary = + offsetSeconds + (chunks[i]?.overlapBoundarySec || 0); + for (const seg of r.segments) { + if (seg.start >= globalOverlapBoundary) { + allSegments.push(seg); + } else { + dedupedSegmentsCount += 1; + } + } + totalDurationSec = Math.max( + totalDurationSec, + (r.startSeconds || 0) + (r.duration_seconds || 0) ); - e.status = 502; - throw e; } + if (dedupedSegmentsCount > 0) { + console.log( + `[hardware] dropped ${dedupedSegmentsCount} duplicate segment(s) in chunk overlap regions` + ); + } + const lines = allSegments.length + ? allSegments.map((s) => `[${formatMmSs(s.start)}] ${s.text}`) + : [`[0:00] `]; + // Apply the same sort + merge passes Gemini's backend uses + // so the user-facing transcript is monotonically ordered and + // grouped into readable thought-sized entries instead of one + // entry per Parakeet segment (which can be 1-3 seconds each). + const stitchedText = mergeShortEntries(sortAndDedupeTranscript(lines.join("\n"))); + // Phase 1C: when diarization was enabled, surface the + // per-chunk diarization data so the caller can run the + // cross-chunk clustering step in Phase 1D. Null/absent when + // diarization was off (current default), preserving the + // existing response shape for callers that don't consume it. + // Each entry: { ok, chunkIndex, startSeconds, duration_s, + // segments, speakers_local, fingerprints, models } on + // success; { ok: false, chunkIndex, startSeconds, error } on + // failure (chunk's transcript is still in `segments`/`text`). + const diarizationOut = diarizationEnabled + ? chunkDiarization + : null; + + // ── Phase 1D: cross-chunk speaker clustering ── + // When diarization ran, cluster the per-chunk fingerprints + // into global speaker IDs (Speaker_A, Speaker_B, ...) 
and + // stamp them onto the merged transcript segments. The raw + // per-chunk diarization data stays in `diarizationOut` for + // debugging; `speakers` is the operator-facing summary; each + // entry in `allSegments` gains `.speaker` + `.speaker_confidence` + // in place. + let speakersOut = null; + if (diarizationEnabled) { + const okCount = chunkDiarization.filter((d) => d && d.ok).length; + const totalFps = chunkDiarization.reduce( + (n, d) => + n + (d && d.ok ? Object.keys(d.fingerprints || {}).length : 0), + 0 + ); + if (totalFps > 0) { + const t0 = Date.now(); + const { globalMap, uncertaintyMap, speakers, clusterCount, thresholdSimilarity } = + clusterSpeakers(chunkDiarization, clusterThresholdPct, { + anchorMinSpeakingSec, + smallClusterMaxSpeakingSec, + uncertainMarginPct, + }); + assignSpeakersToSegments(allSegments, chunkDiarization, globalMap, uncertaintyMap); + const clusterMs = Date.now() - t0; + speakersOut = speakers; + console.log( + `[hardware] diarization: ${okCount}/${chunks.length} chunks succeeded, ${totalFps} fingerprints → ${clusterCount} distinct speaker(s) at ${(thresholdSimilarity * 100).toFixed(0)}% cosine-sim threshold (clustering took ${clusterMs}ms)` + ); + } else { + console.log( + `[hardware] diarization: ${okCount}/${chunks.length} chunks succeeded, 0 fingerprints — clustering skipped` + ); + } + } + return { + text: stitchedText, + segments: allSegments, + duration_seconds: totalDurationSec || duration, + usage: null, + model: transcribeModel, + chunk_count: chunks.length, + chunks_failed: failedChunks.length, + chunk_durations_ms: chunkDurationsMs, + diarization: diarizationOut, + // Map of globalId → summary stats. Null when diarization + // was off OR ran but produced zero fingerprints. See + // server/speaker-clustering.js for the shape. 
+ speakers: speakersOut, + }; + } finally { + try { + await fs.rm(tmpDir, { recursive: true, force: true }); + } catch {} } - - if (!res.ok) { - const body = await safeBody(res); - const hint = - res.status === 404 - ? ` (tried ${pathCandidates.join(" and ")} on base ${parakeet} — wrapper may expose the endpoint at a different path; check the Parakeet URL or container logs)` - : ""; - const e = new Error( - `Parakeet transcribe ${res.status} at ${lastUrl}: ${body.slice(0, 300)}${hint}` - ); - e.status = res.status; - throw e; - } - - const data = await res.json(); - const segments = Array.isArray(data.segments) ? data.segments : []; - - // Offset support: when the relay caller is processing a chunked - // audio file, it asks for transcripts at a non-zero base time. - // Parakeet returns timestamps relative to the chunk; shift them - // up by offsetSeconds so the combined transcript downstream - // lines up with the real video timeline. - const shifted = segments.map((s) => ({ - start: (s.start || 0) + offsetSeconds, - end: (s.end || 0) + offsetSeconds, - text: (s.text || "").trim(), - })); - - // Build the [MM:SS] text format Recap's parseTimestampedTranscript - // already speaks. The route handler will pass this straight back - // to Recap, which parses it on the client side. - const lines = shifted.length - ? shifted.map((s) => `[${formatMmSs(s.start)}] ${s.text}`) - : [`[0:00] ${(data.text || "").trim()}`]; - - return { - text: lines.join("\n"), - segments: shifted, - duration_seconds: data.duration || 0, - usage: null, // hardware backend doesn't expose token counts - model: transcribeModel, - }; }, + // (postOneChunk + helpers live below the returned object so they + // can close over `parakeet` / `transcribeModel` / `timeoutMs`.) + // POST /v1/chat/completions with the OpenAI shape. Ollama's // server, vLLM, llama.cpp's HTTP server, and most other OSS LLM // runners support this wire format — so we don't lock the relay // to one specific Gemma deployment. 
+ // + // Model-name autodiscovery: if `gemmaModel` is empty OR the + // configured model returns "model does not exist", we query + // /v1/models and use the first listed model id. Spark Control's + // discovery normally supplies the model name explicitly so this + // path runs only when discovery's vllm entry is missing the + // `model` field. Result is cached for the process lifetime so we + // don't pay the /v1/models round-trip on every analyze call. async analyzeText({ prompt }) { if (!gemma) { const err = new Error( - "operator-hardware analyze is not configured (relay_gemma_base_url is empty)" + "operator-hardware analyze is not configured — Spark Control discovery isn't reporting a ready vllm endpoint" ); err.status = 503; throw err; } - // Same path-fallback shape as Parakeet transcribe. Standard - // OpenAI-compatible path is /v1/chat/completions; some Ollama - // versions also expose it at /chat/completions without the /v1. - const pathCandidates = ["/v1/chat/completions", "/chat/completions"]; - let res = null; - let lastUrl = null; - for (const p of pathCandidates) { - const url = `${gemma}${p}`; - lastUrl = url; - try { - res = await fetch(url, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - model: analyzeModel, - max_tokens: ANALYZE_MAX_TOKENS, - messages: [{ role: "user", content: prompt }], - stream: false, - }), - signal: AbortSignal.timeout(timeoutMs), - }); - } catch (err) { + // Resolve which model to send. Configured value wins; empty + // string triggers autodiscovery. 
+ let modelToUse = (analyzeModel || "").trim(); + if (!modelToUse) { + modelToUse = await discoverFirstModel(gemma, timeoutMs); + if (!modelToUse) { const e = new Error( - `Gemma analyze network error at ${url}: ${err?.message || err}` + `No analyze model configured and no models discoverable at ${gemma}/v1/models — set Gemma Model Name or check the Gemma endpoint` ); - e.status = 502; + e.status = 503; throw e; } - if (res.status !== 404) break; - console.warn( - `[hardware] 404 at ${url} — trying next path candidate` - ); } - if (!res.ok) { - const body = await safeBody(res); - const hint = - res.status === 404 - ? ` (tried ${pathCandidates.join(" and ")} on base ${gemma} — check the Gemma/Ollama URL)` - : ""; + const result = await callChatCompletions({ + base: gemma, + model: modelToUse, + prompt, + maxTokens: anMaxTokens, + timeoutMs, + }); + if (result.ok) { + return { + text: result.text, + usage: null, + model: modelToUse, + }; + } + + // Detect "model does not exist" (vLLM / Ollama wording) and + // retry once with autodiscovered first model — operator may + // have swapped models on their hardware without updating the + // relay's config field, very common ergonomic miss. 
+ const looksLikeModelMissing = + result.status === 404 && + /model.*(does not exist|not found|unknown)/i.test(result.body || ""); + if (looksLikeModelMissing) { + const discovered = await discoverFirstModel(gemma, timeoutMs); + if (discovered && discovered !== modelToUse) { + console.warn( + `[hardware] configured model "${modelToUse}" not found at ${gemma}; auto-using discovered "${discovered}"` + ); + const retry = await callChatCompletions({ + base: gemma, + model: discovered, + prompt, + maxTokens: anMaxTokens, + timeoutMs, + }); + if (retry.ok) { + return { + text: retry.text, + usage: null, + model: discovered, + }; + } + // Discovered model also failed — fall through to surface + // the FIRST failure (the configured-model one) so the + // operator sees what they configured + what we tried. + } const e = new Error( - `Gemma analyze ${res.status} at ${lastUrl}: ${body.slice(0, 300)}${hint}` + `Gemma analyze: configured model "${modelToUse}" does not exist on ${gemma}. ` + + `Update "Gemma Model Name" in Set Gemma URL — or leave it blank to auto-pick from /v1/models. ` + + `Response: ${(result.body || "").slice(0, 200)}` ); - e.status = res.status; + e.status = 404; throw e; } - const data = await res.json(); - const text = data?.choices?.[0]?.message?.content || ""; - return { - text, - usage: null, - model: analyzeModel, - }; + const hint = + result.status === 404 + ? ` (tried both /v1/chat/completions and /chat/completions on ${gemma} — check the Gemma URL)` + : ""; + const e = new Error( + `Gemma analyze ${result.status} at ${result.lastUrl}: ${(result.body || "").slice(0, 300)}${hint}` + ); + e.status = result.status; + throw e; }, }; } +// Shared helper that tries both /v1/chat/completions and +// /chat/completions, returns the unified result shape. 
+async function callChatCompletions({ base, model, prompt, maxTokens, timeoutMs }) { + const pathCandidates = ["/v1/chat/completions", "/chat/completions"]; + let res = null; + let body = null; + let lastUrl = null; + for (const p of pathCandidates) { + const url = `${base}${p}`; + lastUrl = url; + try { + // lanFetch — analyze goes through Spark Control over HTTPS + // with the StartOS Local CA cert. See lan-fetch.js. + res = await lanFetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + redirect: "follow", + body: JSON.stringify({ + model, + max_tokens: maxTokens, + messages: [{ role: "user", content: prompt }], + stream: false, + // Force JSON-mode output. Recap's analyze prompt already asks + // for a JSON shape; this constrains the model so it can't + // wrap the answer in "Here are the sections I found:" prose, + // which both costs tokens and trips our parser. + response_format: { type: "json_object" }, + // Qwen3.6 ships with a reasoning/thinking mode that's great + // for math but pure latency-noise for structured extraction. + // The chat_template_kwargs field is honored by vLLM's + // tokenizer when applying the chat template; non-Qwen + // backends ignore it. + chat_template_kwargs: { enable_thinking: false }, + }), + signal: AbortSignal.timeout(timeoutMs), + }); + } catch (err) { + const cause = + err?.cause?.message || err?.cause?.code || err?.cause || ""; + const detail = cause ? `${err.message} (cause: ${cause})` : err?.message || String(err); + const e = new Error( + `Gemma analyze network error at ${url}: ${detail}` + ); + e.status = 502; + throw e; + } + body = await safeBody(res); + // Only fall through to the next path candidate when the 404 is a + // PATH-shaped 404 (FastAPI's default `{"detail":"Not Found"}` or + // an empty body). A 404 with a content body — e.g. 
vLLM's + // `{"error":{"message":"The model X does not exist..."}}` — + // means the endpoint exists, the request was rejected for a + // different reason. Falling through would mask the real error + // behind a path-not-found from the alternate path. + const looksLikePath404 = + res.status === 404 && + (!body || + body.length < 60 || + /^\s*\{\s*"detail"\s*:\s*"Not Found"\s*\}\s*$/.test(body)); + if (!looksLikePath404) break; + console.warn(`[hardware] path-404 at ${url} — trying next path candidate`); + } + if (!res.ok) { + return { ok: false, status: res.status, body, lastUrl }; + } + let data; + try { + data = JSON.parse(body); + } catch { + return { ok: false, status: 502, body: "non-JSON response", lastUrl }; + } + const text = data?.choices?.[0]?.message?.content || ""; + return { ok: true, text, lastUrl }; +} + +// Cache of {base → modelId} so we don't pay the /v1/models round-trip +// on every call. Per-process; reset on container restart. +const _discoveredModelCache = new Map(); +async function discoverFirstModel(base, timeoutMs) { + if (_discoveredModelCache.has(base)) { + return _discoveredModelCache.get(base); + } + try { + const res = await lanFetch(`${base}/v1/models`, { + signal: AbortSignal.timeout(timeoutMs), + }); + if (!res.ok) { + console.warn(`[hardware] /v1/models returned ${res.status} at ${base}`); + return null; + } + const data = await res.json(); + const first = Array.isArray(data?.data) ? data.data[0] : null; + const id = first?.id || null; + if (id) _discoveredModelCache.set(base, id); + return id; + } catch (err) { + console.warn(`[hardware] /v1/models discovery failed at ${base}: ${err?.message || err}`); + return null; + } +} + function formatMmSs(seconds) { const s = Math.max(0, Math.floor(seconds)); const h = Math.floor(s / 3600); @@ -280,3 +1019,19 @@ async function safeBody(res) { return ""; } } + +// Map common audio MIME types to a sensible file extension so ffprobe +// + ffmpeg pick the right demuxer. 
Defaults to `mp3` which is the +// fallback Recap normalizes to before sending here. +function extFromMime(mime) { + if (!mime) return "mp3"; + const m = mime.toLowerCase(); + if (m.includes("mp4") || m.includes("m4a")) return "m4a"; + if (m.includes("ogg")) return "ogg"; + if (m.includes("opus")) return "opus"; + if (m.includes("wav")) return "wav"; + if (m.includes("aac")) return "aac"; + if (m.includes("webm")) return "webm"; + if (m.includes("flac")) return "flac"; + return "mp3"; +} diff --git a/server/hardware-config.js b/server/hardware-config.js new file mode 100644 index 0000000..97b80a1 --- /dev/null +++ b/server/hardware-config.js @@ -0,0 +1,92 @@ +// Resolves which (URL, model) the relay should use for each operator- +// hardware pipeline. Spark Control is the SINGLE host the relay +// talks to — it owns transcribe (/v1/audio/transcriptions), +// diarize-chunk (/api/audio/diarize-chunk), AND analyze +// (/v1/chat/completions, OpenAI-compatible) on the same port. The +// discovery JSON Spark Control serves at /api/endpoints lists the +// backing services SC delegates to internally (e.g. a Parakeet +// wrapper, a vLLM instance) along with their ready state + currently- +// loaded model name; the relay reads it ONLY to learn (a) whether +// each service is ready, and (b) which model name to send in the +// upstream request body. The per-service base URLs in the discovery +// JSON are informational — clients hit Spark Control directly, not +// those internal URLs. +// +// Priority order, per pipeline: +// +// 1. Spark Control says ready → use the SC base URL + the model +// name discovery reports for that +// service. +// +// 2. SC says not_ready → return null URL + a blocked_reason +// string. Route handlers surface +// this to the user instead of trying +// a doomed call. +// +// 3. SC unreachable or no URL → null URL; planBackend treats the +// hardware path as not configured. 
+// +// History: +// - pre-v0.2.84: operator-typed override URLs (relay_parakeet_*, +// relay_gemma_*) won over discovery. Removed. +// - pre-v0.2.85: discovery's per-service base_urls were used +// directly, so transcribe went to a different host (the Parakeet +// wrapper) than diarize (Spark Control). That meant the relay +// was talking to two hosts for one logical operation, and the +// transcribe wrapper didn't have the diarize endpoint. Now: SC +// is the single host. The per-service base_urls in discovery +// are informational — used only for the operator's awareness in +// the dashboard's Service Discovery health line. + +import { getSparkServiceState } from "./spark-control.js"; + +export async function resolveHardwareConfig(cfg) { + const sparkUrl = (cfg.relay_spark_control_url || "").trim(); + // Spark Control base URL with the /api/endpoints suffix and any + // trailing slash stripped. Used as the URL for every operator- + // hardware call: transcribe, diarize-chunk, analyze. + const sparkBase = sparkUrl.replace(/\/$/, "").replace(/\/api\/endpoints$/, ""); + + const transcribe = await resolveOne({ sparkUrl, sparkBase, kind: "transcribe" }); + const analyze = await resolveOne({ sparkUrl, sparkBase, kind: "analyze" }); + const tts = await resolveOne({ sparkUrl, sparkBase, kind: "tts" }); + + return { + transcribe, + analyze, + tts, + sparkBase, + }; +} + +async function resolveOne({ sparkUrl, sparkBase, kind }) { + if (!sparkUrl || !sparkBase) { + return { url: null, model: null, source: null }; + } + const state = await getSparkServiceState(sparkUrl, kind); + if (state.status === "ready") { + return { + // ALWAYS Spark Control's base URL — never the delegate's + // base_url. SC owns the wire-facing endpoint; the delegate + // URL is just where SC routes the request internally. + url: sparkBase, + // Model name comes from discovery so we send the right + // `model` field upstream. 
SC tells us what model is currently + // loaded on its parakeet / vllm delegate. + model: state.model, + source: "spark-control", + }; + } + if (state.status === "not_ready") { + return { + url: null, + model: null, + source: "spark-control", + blocked_reason: state.reason, + }; + } + // status === "unknown" → discovery unreachable or that service + // isn't listed. Return null URL; the route handler surfaces the + // error to the user. + return { url: null, model: null, source: null }; +} diff --git a/server/hardware-queue.js b/server/hardware-queue.js new file mode 100644 index 0000000..d8e8948 --- /dev/null +++ b/server/hardware-queue.js @@ -0,0 +1,110 @@ +// Process-global FIFO queue for hardware-backed jobs. +// +// Why this exists: Spark Control + the operator's GPU box can really +// only handle one full pipeline at a time. Pre-queue, two concurrent +// /relay/v1/summarize-url requests would each spin up their own +// async worker — both would fire 2-parallel transcribe chunks + 2 +// parallel diarize calls (and analyze on top), totalling 8+ +// simultaneous calls into Spark Control. The GPU thrashes and +// either both jobs slow down massively or chunks start failing. +// +// The fix is a single-slot semaphore around any code path that +// drives the hardware backend. Gemini paths bypass entirely — the +// Google API handles arbitrary concurrency at its end and we have +// no local bottleneck. Intra-job concurrency (the operator's +// configured 2-parallel chunk worker) stays — the operator's GPU +// can handle 2 simultaneous transcribe POSTs per the SC dev's +// guidance; the queue serializes whole jobs, not individual calls +// within a job. +// +// Use: +// const release = await acquireHardwareSlot({ +// jobId, +// onWait: (info) => { ... emit "queued" SSE event ... }, +// }); +// try { +// // ... do hardware-backed pipeline ... +// } finally { +// release(); +// } +// +// onWait fires AT MOST ONCE, before this caller starts waiting. 
+// `info.position` is 1-indexed (1 = next up after the current +// active job, 2 = one slot back, etc). Callers use this to push +// a "queued" event to the SSE client so the UI can show "Queued +// — N jobs ahead of you". + +let queueHead = Promise.resolve(); +let pendingCount = 0; // includes the currently-active job +let currentJobId = null; +let queueLog = []; // bounded ring buffer for /admin/queue visibility + +const MAX_LOG_ENTRIES = 50; + +function logQueueEvent(event) { + queueLog.push({ at: Date.now(), ...event }); + if (queueLog.length > MAX_LOG_ENTRIES) { + queueLog = queueLog.slice(-MAX_LOG_ENTRIES); + } +} + +export async function acquireHardwareSlot({ jobId, onWait } = {}) { + pendingCount += 1; + const positionInQueue = pendingCount - 1; // 0 = active immediately, 1+ = waiting + const prev = queueHead; + let releaseFn; + const slot = new Promise((resolve) => { + releaseFn = resolve; + }); + queueHead = prev.then(() => slot); + + if (positionInQueue > 0) { + logQueueEvent({ + kind: "enqueued", + jobId: jobId || null, + position: positionInQueue, + activeJobId: currentJobId, + }); + if (typeof onWait === "function") { + try { + onWait({ position: positionInQueue, activeJobId: currentJobId }); + } catch {} + } + console.log( + `[hardware-queue] job ${jobId ? jobId.slice(0, 8) : "(no-id)"} queued at position ${positionInQueue} (active: ${currentJobId ? currentJobId.slice(0, 8) : "none"})` + ); + } + + await prev; + currentJobId = jobId || null; + logQueueEvent({ kind: "started", jobId: jobId || null }); + console.log( + `[hardware-queue] job ${jobId ? jobId.slice(0, 8) : "(no-id)"} started — ${pendingCount - 1} waiting` + ); + + let released = false; + return () => { + if (released) return; + released = true; + const wasActive = currentJobId; + currentJobId = null; + pendingCount -= 1; + logQueueEvent({ kind: "released", jobId: wasActive }); + console.log( + `[hardware-queue] job ${wasActive ? 
wasActive.slice(0, 8) : "(no-id)"} released — ${pendingCount} still in queue` + ); + releaseFn(); + }; +} + +// Read-only snapshot for the operator dashboard. Returns: +// pendingCount: total jobs in the queue (active + waiting) +// currentJobId: the job currently holding the slot, or null +// log: recent queue events (most recent last; bounded to 50) +export function getHardwareQueueStatus() { + return { + pendingCount, + currentJobId, + log: queueLog.slice(), + }; +} diff --git a/server/lan-fetch.js b/server/lan-fetch.js new file mode 100644 index 0000000..fdae2b6 --- /dev/null +++ b/server/lan-fetch.js @@ -0,0 +1,41 @@ +// TLS-tolerant fetch for LAN calls to operator-managed services that +// use StartOS Local Intermediate CA certs (or other self-signed certs) +// the relay container doesn't trust by default. +// +// Scope: Spark Control discovery + diarize-chunk POST. Those are the +// only HTTPS endpoints the relay talks to over the operator's own +// LAN. Public-internet calls (Gemini, Keysat, BTCPay) keep going +// through the normal global fetch with full cert validation — we +// don't pipe those through this helper. +// +// Safety: the URL passed in came from the operator's own config +// (Service Discovery URL field). The relay isn't auto-discovering or +// following untrusted redirects to arbitrary hosts. +// +// Implementation: undici's Agent supports per-dispatcher +// rejectUnauthorized control; Node's built-in fetch globally does +// not. We construct one Agent at module init and pass it as the +// dispatcher on every call. + +import { Agent, fetch as undiciFetch } from "undici"; + +const lanAgent = new Agent({ + connect: { rejectUnauthorized: false }, + // 5s for TCP+TLS handshake — long enough for cold StartOS sockets, + // short enough to fail fast on a bad URL. 
+ connectTimeout: 5000, + // Diarize-chunk and transcribe on the operator's GPU box are slow + // (Sortformer + TitaNet on a 5-min chunk can run 30-120s before + // SC sends back response headers). headersTimeout/bodyTimeout = 0 + // disables undici's own watchdog — we rely entirely on the + // AbortSignal.timeout the caller passes (900s by default for + // hardware backend calls). Without this, lanFetch was killing the + // connection at 10s with a bare "fetch failed" before SC had any + // chance to respond. + headersTimeout: 0, + bodyTimeout: 0, +}); + +export async function lanFetch(url, opts = {}) { + return undiciFetch(url, { ...opts, dispatcher: lanAgent }); +} diff --git a/server/spark-control-events.js b/server/spark-control-events.js new file mode 100644 index 0000000..6140ca8 --- /dev/null +++ b/server/spark-control-events.js @@ -0,0 +1,58 @@ +// Passive error reporting to Spark Control's /api/health-event +// endpoint. When a hardware-backed transcribe or analyze call fails +// (Parakeet 503, vLLM model not found, network timeout to Gemma, +// etc.), the relay fires a small POST so the operator's Spark +// Control dashboard can surface the failure in real time — without +// waiting for its own 5s health-check poll to catch the outage. +// +// Fire-and-forget: the request is kicked off but NOT awaited by the +// caller's error path, so reporting failure to Spark Control never +// adds latency to the user's transcribe-failed response. +// +// Reuses the operator's `relay_spark_control_url` config field +// (already used for /api/endpoints discovery). The relay just swaps +// the path for /api/health-event, keeping the operator config +// surface to one URL. + +import { getConfigSnapshot } from "./config.js"; + +const REPORT_TIMEOUT_MS = 3_000; + +// Fire-and-forget. service ∈ { "parakeet", "vllm", "gemma", ... } +// matching whatever Spark Control's poller knows about. 
Errors are +// swallowed silently — observability hiccups shouldn't bleed into +// the relay's error path. +export function reportHealthEvent({ service, ok = false, error, ms }) { + // Wrap the actual work in setImmediate so the caller's microtask + // queue isn't blocked at all. + setImmediate(async () => { + try { + const cfg = await getConfigSnapshot(); + const base = (cfg.relay_spark_control_url || "").trim(); + if (!base) return; // not configured — silent no-op + // Strip any path the operator may have configured (they + // typically set the /api/endpoints URL) and append health-event. + let origin; + try { + origin = new URL(base).origin; + } catch { + return; + } + const url = `${origin}/api/health-event`; + await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + service, + ok, + source: "recap-relay", + error: typeof error === "string" ? error.slice(0, 280) : null, + ms: typeof ms === "number" ? ms : null, + }), + signal: AbortSignal.timeout(REPORT_TIMEOUT_MS), + }); + } catch { + // Best-effort — swallow. + } + }); +} diff --git a/server/spark-control.js b/server/spark-control.js new file mode 100644 index 0000000..547b26d --- /dev/null +++ b/server/spark-control.js @@ -0,0 +1,207 @@ +// Optional service-discovery client. When the operator points +// relay_spark_control_url at a Spark Control (or compatible) endpoint +// that serves a JSON document describing the local LLM/STT/TTS +// services, the relay uses that to fill in any URL + model fields +// the operator left blank in their per-backend config. +// +// Expected JSON shape (Spark Control's /api/endpoints): +// { +// "vllm": { ready: bool, base_url: "...", model: "...", openai_compat: bool }, +// "parakeet": { ready: bool, base_url: "...", kind: "stt", model: "..." 
}, +// "kokoro": { ready: bool, base_url: "...", kind: "tts" } +// } +// +// Cached for SHORT_CACHE_TTL_MS to keep relay responses snappy while +// still picking up model swaps on the operator's GPU box within a +// minute. Unreachable / failing endpoint falls back to operator-typed +// values silently — no boot-time hard dep. + +import { lanFetch } from "./lan-fetch.js"; + +const SHORT_CACHE_TTL_MS = 60 * 1000; // 60s +const FETCH_TIMEOUT_MS = 3000; + +let cached = { fetched_at: 0, url: null, discovery: null }; + +// Last error from a discovery fetch attempt — surfaced in the +// dashboard so the operator can see when discovery is silently +// failing (the alternative is "operator-typed values silently win" +// which produces confusing "fetch failed" errors downstream when the +// override URL is also broken). Cleared on each successful fetch. +let lastError = { at: 0, message: null }; + +// Fire-and-forget background refresh while serving from cache — +// callers never block on the network. The first call after the cache +// expires returns the stale snapshot but kicks off a refresh in the +// background, so the next call gets the fresh data without ever +// having paid a round-trip in the critical path. +let inflight = null; + +// Operator-visible discovery snapshot used by the admin dashboard. +// Returns: { configured, url, lastFetched, lastError, services } +// - configured: whether the operator set a discovery URL at all +// - url: the configured URL (or null) +// - lastFetched: epoch ms of last successful fetch (or 0) +// - lastError: { at, message } of last failed fetch (cleared on success) +// - services: parsed map of { parakeet: {...}, vllm: {...}, ... } +// when the last fetch succeeded; null otherwise. +// +// `configured` reflects the operator's config; the other fields +// reflect what actually happened on the network. Read-only — never +// triggers a fetch, just reports the cached state. 
The dashboard's +// Settings tab renders this so the operator can spot a silently- +// failing discovery (typo, unreachable host, TLS cert that the +// container doesn't trust, etc.) without grepping container logs. +export function getSparkDiscoveryStatus(configuredUrl) { + const url = (configuredUrl || "").trim(); + return { + configured: !!url, + url: url || null, + lastFetched: cached.url === url ? cached.fetched_at : 0, + lastError: cached.url === url && lastError.at > cached.fetched_at + ? { at: lastError.at, message: lastError.message } + : null, + services: cached.url === url && cached.discovery + ? Object.fromEntries( + Object.entries(cached.discovery).map(([k, v]) => [ + k, + { + ready: !!(v && v.ready), + base_url: v && v.base_url ? String(v.base_url) : null, + model: v && v.model ? String(v.model) : null, + kind: v && v.kind ? String(v.kind) : null, + }, + ]) + ) + : null, + }; +} + +export async function getSparkControlDiscovery(url) { + if (!url) return null; + // If the URL changed (operator updated config), invalidate. + if (cached.url !== url) { + cached = { fetched_at: 0, url, discovery: null }; + } + const fresh = Date.now() - cached.fetched_at < SHORT_CACHE_TTL_MS; + if (fresh && cached.discovery) { + return cached.discovery; + } + // Stale (or never fetched). Trigger a background refresh; return + // whatever we currently have (might be null on cold boot). + if (!inflight) { + inflight = fetchDiscovery(url) + .then((discovery) => { + cached = { fetched_at: Date.now(), url, discovery }; + lastError = { at: 0, message: null }; + }) + .catch((err) => { + const msg = err?.message || String(err); + lastError = { at: Date.now(), message: msg }; + console.warn(`[spark-control] discovery fetch failed for ${url}: ${msg}`); + }) + .finally(() => { + inflight = null; + }); + } + // On cold start, wait briefly for the first fetch so we don't serve + // a request with null discovery if Spark Control is alive. 
+ if (!cached.discovery) { + try { + await Promise.race([ + inflight, + new Promise((r) => setTimeout(r, FETCH_TIMEOUT_MS + 500)), + ]); + } catch {} + } + return cached.discovery; +} + +async function fetchDiscovery(url) { + // lanFetch uses an undici Agent with rejectUnauthorized:false so + // that Spark Control's StartOS Local Intermediate CA cert (the + // standard Start9 LAN setup) doesn't fail TLS validation in the + // relay container. Plain-http LAN URLs work through the same path + // without TLS overhead. Public-internet calls (Gemini, Keysat, + // BTCPay) keep using the global fetch with full cert validation + // — see lan-fetch.js for the scope rationale. + const res = await lanFetch(url, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + redirect: "follow", + }); + if (!res.ok) { + throw new Error(`HTTP ${res.status}`); + } + return await res.json(); +} + +// Given a Spark Control URL and a "kind" hint, return the live +// service state. Three outcomes: +// +// { status: "ready", base_url, model } +// Service is up + healthy. Use these values. +// +// { status: "not_ready", reason } +// Service is listed in discovery but ready=false. A model swap +// might be in progress; or the operator hasn't loaded a model +// yet; or the wrapper crashed. Route handlers should fail fast +// with this message so the user knows what to fix on Spark +// Control instead of seeing a generic 500. +// +// { status: "unknown" } +// Discovery URL not configured, not reachable, or the requested +// service isn't in the response. Caller should fall back to +// operator-typed config. 
+// +// kind: "transcribe" → uses .parakeet (any STT-shaped entry) +// kind: "analyze" → uses .vllm (any OpenAI-compat LLM entry) +// kind: "tts" → uses .kokoro (Kokoro TTS entry; no `model` +// field — voice is chosen per-request by the +// caller, so a ready kokoro with a base_url is +// enough) +export async function getSparkServiceState(url, kind) { + if (!url) return { status: "unknown" }; + let discovery; + try { + discovery = await getSparkControlDiscovery(url); + } catch { + return { status: "unknown" }; + } + if (!discovery) return { status: "unknown" }; + const target = + kind === "transcribe" + ? discovery.parakeet + : kind === "analyze" + ? discovery.vllm + : kind === "tts" + ? discovery.kokoro + : null; + if (!target) return { status: "unknown" }; + if (!target.base_url) return { status: "unknown" }; + if (target.ready === false) { + return { + status: "not_ready", + reason: + kind === "transcribe" + ? "Spark Control reports STT (parakeet) is offline. Check spark-control — a model swap may be in progress, or the wrapper needs attention." + : kind === "tts" + ? "Spark Control reports TTS (kokoro) is offline. Check spark-control — the Kokoro container may be down or restarting." + : "Spark Control reports the LLM (vllm) is offline. Check spark-control — load a model via the dashboard or wait for an in-progress swap to finish.", + }; + } + return { + status: "ready", + base_url: target.base_url, + model: target.model || null, + }; +} + +// Kept for backward compatibility — same signature as before, returns +// null when the service isn't ready. New code should call +// getSparkServiceState directly so it can distinguish "not configured" +// from "configured but not ready". +export async function getDiscoveredEndpoint(url, kind) { + const state = await getSparkServiceState(url, kind); + if (state.status !== "ready") return null; + return { base_url: state.base_url, model: state.model }; +}