// Operator-hardware fallback backend. Forwards transcribe requests to // a Parakeet endpoint (or any Whisper-API-compatible server — same wire // format) and analyze requests to a Gemma endpoint (or any // OpenAI-compatible chat-completions server). // // Used when a Pro/Max user has exceeded their monthly Gemini cap. // Returns the same shape gemini.js produces so route handlers don't // need a backend-specific branch downstream: // transcribeAudio → { text, segments, duration_seconds } // analyzeText → { text } // // Long-audio handling: Parakeet wrappers on operator GPUs OOM on long // audio (one shot of a 108-min file returned "CUDA error: unknown // error" on a 24GB card). transcribeAudio chunks audio > 5 minutes // into 5-minute slices, transcribes each sequentially, and stitches // the segments with continuous timestamps. Single-pass for short // audio, no perf hit. import fs from "fs/promises"; import os from "os"; import path from "path"; import { getAudioDurationSeconds, splitAudioFile, } from "../audio-meta.js"; import { sortAndDedupeTranscript, mergeShortEntries, } from "./gemini.js"; import { lanFetch } from "../lan-fetch.js"; import { clusterSpeakers, assignSpeakersToSegments, } from "../speaker-clustering.js"; const ANALYZE_MAX_TOKENS = 16000; // Gemma served locally tends to live on the host's LAN, not the public // internet, so generous timeouts. Same scale as Recap's defaults. const DEFAULT_TIMEOUT_MS = 900_000; // Default Parakeet model name — used only when the route handler // doesn't supply one (e.g. a unit test instantiating the backend // directly). In production the value comes from Spark Control // discovery via resolveHardwareConfig(). // // NOTE: there is NO default for the analyze (Gemma) model. An empty // gemmaModel intentionally triggers autodiscovery via /v1/models in // analyzeText() below. 
Substituting a hardcoded default here (the // old "gemma3:27b") defeats autodiscovery and causes "model does // not exist" errors when the operator's hardware hosts a different // model name. const DEFAULT_TRANSCRIBE_MODEL = "parakeet-tdt-0.6b-v3"; // Normalize an OpenAI-API-compatible base URL: strip trailing slash // AND strip a trailing `/v1` segment if the operator pasted one, // because we always append `/v1/...` below. Without this, a base URL // of `http://192.168.1.87:8000/v1` would produce // `http://192.168.1.87:8000/v1/v1/audio/transcriptions` → 404. function normalizeApiBase(url) { let s = (url || "").trim().replace(/\/$/, ""); s = s.replace(/\/v1$/, ""); return s; } // 503 retry policy — both Parakeet and Sortformer/TitaNet endpoints // on Spark Control return 503 + a "retry in ~60s" body when the GPU // container CUDA-wedges and auto-restarts. The right thing to do is // wait for the documented retry window (or the Retry-After header if // supplied) and try again. Honor Retry-After when present, default // to 60s, clamp to [5, 120], and add ±5s jitter so parallel chunks // don't all wake up and pile on at the exact same moment (thundering // herd → instant re-wedge). const SPARK_503_MAX_ATTEMPTS = 4; function parse503WaitSec(res) { const ra = res?.headers?.get?.("retry-after"); const raNum = parseInt(ra || "60", 10); const base = Math.min(120, Math.max(5, Number.isFinite(raNum) ? raNum : 60)); const jitter = Math.round((Math.random() * 10 - 5) * 10) / 10; // ±5s return Math.max(5, Math.round((base + jitter) * 10) / 10); } function sleepMs(ms) { return new Promise((r) => setTimeout(r, ms)); } export function createHardwareBackend({ parakeetBaseURL = "", gemmaBaseURL = "", // Spark Control base URL (no path) — derived by the caller from // relay_spark_control_url with the /api/endpoints suffix stripped. // Used only for diarize-chunk (POST {spark}/api/audio/diarize-chunk). 
// Transcribe and analyze go to parakeet/gemma respectively, which // discovery reports as separate hosts on the operator's LAN. // Empty = diarization can't run (the route handler should not have // enabled it). sparkControlBaseURL = "", parakeetModel = DEFAULT_TRANSCRIBE_MODEL, gemmaModel = "", timeoutMs = DEFAULT_TIMEOUT_MS, // Chunking knobs — caller MUST source from relay-config.json. The // defaults here are for unit-test ergonomics only. Production // callers always pass these from getConfigSnapshot() so the // operator's Settings-tab edits flow through. // // Parallel chunks: per the spark-control LLM developer, Parakeet can // handle ~4 concurrent transcribe POSTs on a typical single-GPU // operator setup. Earlier versions hard-serialized to 1 in flight // out of caution; we now bound by the operator's chosen concurrency. txChunkSeconds = 5 * 60, // Overlap (seconds) between consecutive chunks. Defaults to 0 // (matches pre-v0.2.77 behavior); production callers pass the // operator's configured `relay_hardware_tx_chunk_overlap_seconds`. // When > 0, chunks at the boundary share audio and the stitcher // drops words/segments in chunk N+1 whose global timestamps fall // within the prior chunk's tail. See splitAudioFile() for the // chunking math. txChunkOverlapSeconds = 0, txConcurrency = 4, // Diarization: when true, every chunk gets TWO parallel HTTP calls // — the existing /v1/audio/transcriptions to Parakeet AND a sibling // /api/audio/diarize-chunk to Sortformer+TitaNet on the same Spark // Control host. Diarization output (segments + per-chunk voice // fingerprints) is collected per chunk and returned alongside the // transcript on the backend response so the relay's caller can run // cross-chunk speaker clustering. In Phase 1C this just gathers the // data; the clustering + transcript-merge step lands in Phase 1D. // Default false (matches the current transcribe-only behavior). 
diarizationEnabled = false, // Cosine-similarity threshold for cross-chunk speaker clustering, // as an integer percentage (slider system is int-only). 70 means // "merge two chunks' speaker fingerprints into the same global // speaker if their cosine similarity >= 0.70" — NeMo's recommended // default for TitaNet embeddings. Range 50-95 enforced inside the // clustering module; values outside the range are clamped. clusterThresholdPct = 70, // Post-cluster suppression knobs (Phase 2). Operator-tunable via // Settings → Operator hardware. Used only when diarization is on. // Defaults match speaker-clustering.js's hardcoded fallbacks. anchorMinSpeakingSec = 30, smallClusterMaxSpeakingSec = 15, uncertainMarginPct = 10, // Optional per-chunk completion callback for the pipelined-analyze // path. Fires AFTER both transcribe + diarize have resolved for // each chunk (Promise.allSettled), in the order chunks finish (not // necessarily monotonic by chunkIndex when concurrency > 1). // Receives: // { // chunkIndex, // 0-based // totalChunks, // emitted on the first call so the caller // // knows the count up front // startSeconds, // global audio offset where this chunk begins // durationSeconds, // chunk audio length // overlapBoundarySec, // RELATIVE to chunk start; segments with // // global start < (startSeconds + overlap- // // BoundarySec) belong to the prior chunk // segments, // raw chunk segments, globally-timestamped // // (start/end already shifted by startSeconds) // diarOk, // true | false | null (null = not run) // } // Caller uses this to feed a chunk buffer that the pipelined-analyze // workers read from. When this callback is null (the default), // transcribe behaves as today — caller gets the final stitched // result only at the end. onChunkComplete = null, // Output cap passed to the operator's vLLM/Ollama chat-completion // endpoint as `max_tokens`. Default 16000 was the historical // hardcoded value. 
Operators with smaller models that produce // shorter JSON sections may want to drop this; operators on // larger models with reasoning preambles may want to bump it. // Configurable via Settings → `relay_hardware_an_max_tokens`. anMaxTokens = ANALYZE_MAX_TOKENS, } = {}) { const parakeet = normalizeApiBase(parakeetBaseURL); const gemma = normalizeApiBase(gemmaBaseURL); // Spark Control URL — used ONLY by postDiarizeChunk. Strip any // trailing /api/endpoints (the discovery path) and any trailing // slash so the diarize URL composes cleanly as `${sparkBase}/api/ // audio/diarize-chunk`. const sparkBase = (sparkControlBaseURL || "") .trim() .replace(/\/$/, "") .replace(/\/api\/endpoints$/, ""); const transcribeModel = parakeetModel || DEFAULT_TRANSCRIBE_MODEL; // Intentionally NOT coalesced to a default — empty string means // "autodiscover from /v1/models at request time". See the comment // at DEFAULT_TRANSCRIBE_MODEL above for why. const analyzeModel = gemmaModel || ""; // Single Parakeet POST. The chunking layer in transcribeAudio calls // this once per chunk; for short audio it's called once with the // whole buffer. Returns { segments, duration_seconds } that the // caller stitches. // // Tries the OpenAI standard `/v1/audio/transcriptions` first, falls // back to bare `/audio/transcriptions` on 404. Retries without // verbose_json + timestamp_granularities[] if the wrapper rejects // the rich shape. async function postOneChunk({ audioBuffer, mimeType, offsetSeconds }) { const buildForm = (richMode) => { const form = new FormData(); const blob = new Blob([audioBuffer], { type: mimeType }); form.append("file", blob, "audio.bin"); form.append("model", transcribeModel); if (richMode) { form.append("response_format", "verbose_json"); form.append("timestamp_granularities[]", "segment"); } return form; }; const pathCandidates = [ "/v1/audio/transcriptions", "/audio/transcriptions", ]; // Resolve which path the wrapper exposes (404 from one → try the // other). 
Done before the 503 loop so we don't waste retry // budget on a path that doesn't exist. let resolvedUrl = null; let res = null; for (const p of pathCandidates) { const url = `${parakeet}${p}`; try { res = await lanFetch(url, { method: "POST", body: buildForm(true), redirect: "follow", signal: AbortSignal.timeout(timeoutMs), }); } catch (err) { const cause = err?.cause?.message || err?.cause?.code || err?.cause || ""; const detail = cause ? `${err.message} (cause: ${cause})` : err?.message || String(err); const e = new Error(`Parakeet transcribe network error at ${url}: ${detail}`); e.status = 502; throw e; } if (res.status !== 404) { resolvedUrl = url; break; } console.warn( `[hardware] 404 at ${url} — trying next path candidate` ); } if (!resolvedUrl) { // All path candidates 404'd — return that 404 below. resolvedUrl = `${parakeet}${pathCandidates[pathCandidates.length - 1]}`; } // ── 503 retry loop ── // Spark Control returns 503 + "retry in ~60s" body when the // Parakeet container is CUDA-wedged and auto-restarting. We wait // (honoring Retry-After) and retry up to SPARK_503_MAX_ATTEMPTS // total. Without this loop, the very first chunks in a multi- // chunk job all hit a single wedge simultaneously and each chunk // permanently fails after one round-trip. let attempt = 1; while (res.status === 503 && attempt < SPARK_503_MAX_ATTEMPTS) { const waitSec = parse503WaitSec(res); console.warn( `[hardware/tx] 503 at ${resolvedUrl} (attempt ${attempt}/${SPARK_503_MAX_ATTEMPTS}) — Spark Control reports transient CUDA wedge, waiting ${waitSec}s then retrying` ); await sleepMs(waitSec * 1000); try { res = await lanFetch(resolvedUrl, { method: "POST", body: buildForm(true), redirect: "follow", signal: AbortSignal.timeout(timeoutMs), }); } catch (err) { const cause = err?.cause?.message || err?.cause?.code || err?.cause || ""; const detail = cause ? 
`${err.message} (cause: ${cause})` : err?.message || String(err); const e = new Error(`Parakeet transcribe network error at ${resolvedUrl} (retry ${attempt}): ${detail}`); e.status = 502; throw e; } attempt += 1; } // ── Rich-mode fallback ── // If the wrapper rejected verbose_json + timestamp_granularities // (some non-OpenAI Parakeet servers do — this is a PARAMETER // issue, not a server-availability issue), retry once without // those fields. Distinct from the 503 retry above — different // failure mode, different fix. We skip this on 503 because the // 503 loop has already exhausted retries. if ( !res.ok && res.status !== 503 && res.status >= 400 && res.status < 600 && res.status !== 404 ) { const richBody = await safeBody(res); console.warn( `[hardware] rich Parakeet request to ${resolvedUrl} returned ${res.status}: ${richBody.slice(0, 200)} — retrying without verbose_json` ); try { res = await lanFetch(resolvedUrl, { method: "POST", body: buildForm(false), redirect: "follow", signal: AbortSignal.timeout(timeoutMs), }); } catch (err) { const e = new Error( `Parakeet transcribe network error (bare fallback) at ${resolvedUrl}: ${err?.message || err}` ); e.status = 502; throw e; } } if (!res.ok) { const body = await safeBody(res); const hint = res.status === 404 ? ` (tried ${pathCandidates.join(" and ")} on base ${parakeet} — wrapper may expose the endpoint at a different path; check the Parakeet URL or container logs)` : res.status === 503 ? ` (Spark Control reported transient CUDA wedge ${SPARK_503_MAX_ATTEMPTS} times in a row — operator container may need manual restart)` : ""; const e = new Error( `Parakeet transcribe ${res.status} at ${resolvedUrl}: ${body.slice(0, 300)}${hint}` ); e.status = res.status; throw e; } const data = await res.json(); const segments = Array.isArray(data.segments) ? 
data.segments : []; const shifted = segments.map((s) => ({ start: (s.start || 0) + offsetSeconds, end: (s.end || 0) + offsetSeconds, text: (s.text || "").trim(), })); return { segments: shifted, duration_seconds: data.duration || 0, }; } // Parallel diarization call for the same audio chunk. Hits // POST {spark_control}/api/audio/diarize-chunk (Sortformer for // segmentation, TitaNet for per-speaker voice fingerprints). The // returned `speaker` labels are LOCAL to this chunk — Speaker_0 // here is not necessarily the same person as Speaker_0 in chunk // N+1; the relay's cross-chunk clustering step (Phase 1D) // reconciles them via cosine similarity on the fingerprints. // // Retry policy from the SC dev: // 503 + Retry-After: Parakeet/Sortformer CUDA wedge — container // is auto-restarting in the background. Wait + retry once. // 502: container unreachable. Fail. // 500: real error. Fail. // // Segment timestamps are shifted by offsetSeconds so they // already sit on the global audio timeline when returned — // matches postOneChunk's behavior so the merge step in Phase 1D // can align word timestamps and segment timestamps directly // without re-shifting. async function postDiarizeChunk({ audioBuffer, mimeType, offsetSeconds }) { if (!sparkBase) { const e = new Error( "Diarization called but sparkControlBaseURL is empty — caller should not have enabled diarization without Service Discovery configured" ); e.status = 500; throw e; } const buildForm = () => { const form = new FormData(); const blob = new Blob([audioBuffer], { type: mimeType }); form.append("file", blob, "audio.bin"); return form; }; // Diarize-chunk lives on Spark Control (the same host that serves // /api/endpoints) — NOT on the Parakeet wrapper. Discovery's // parakeet.base_url usually points to a separate STT-only service // that doesn't have this endpoint. Use lanFetch so Spark Control's // StartOS Local Intermediate CA cert doesn't fail TLS validation. 
const url = `${sparkBase}/api/audio/diarize-chunk`; let res = null; // 503 retry loop — same policy as the transcribe path. Up to // SPARK_503_MAX_ATTEMPTS, honoring Retry-After with ±5s jitter // so parallel chunks don't synchronize their retries and pile // back onto the freshly-restarted container at the same instant. for (let attempt = 0; attempt < SPARK_503_MAX_ATTEMPTS; attempt++) { try { res = await lanFetch(url, { method: "POST", body: buildForm(), redirect: "follow", signal: AbortSignal.timeout(timeoutMs), }); } catch (err) { // undici wraps the underlying socket error in err.cause — // surfacing it makes "fetch failed" diagnosable (ECONNREFUSED // vs ETIMEDOUT vs TLS handshake vs DNS). const cause = err?.cause?.message || err?.cause?.code || err?.cause || ""; const detail = cause ? `${err.message} (cause: ${cause})` : err?.message || String(err); const e = new Error(`Diarize network error at ${url}: ${detail}`); e.status = 502; throw e; } if (res.status === 503 && attempt < SPARK_503_MAX_ATTEMPTS - 1) { const waitSec = parse503WaitSec(res); console.warn( `[hardware/diarize] 503 at ${url} (attempt ${attempt + 1}/${SPARK_503_MAX_ATTEMPTS}) — Spark Control reports transient CUDA wedge, waiting ${waitSec}s then retrying` ); await sleepMs(waitSec * 1000); continue; } break; } if (!res.ok) { const body = await safeBody(res); const hint = res.status === 503 ? ` (Spark Control reported transient CUDA wedge ${SPARK_503_MAX_ATTEMPTS} times in a row — operator container may need manual restart)` : ""; const e = new Error( `Diarize ${res.status} at ${url}: ${body.slice(0, 300)}${hint}` ); e.status = res.status; throw e; } const data = await res.json(); const rawSegments = Array.isArray(data.segments) ? data.segments : []; const shiftedSegments = rawSegments.map((s) => ({ // Use the same `start` / `end` naming as postOneChunk to keep // the merge code uniform across segment shapes. 
start: (s.start_s || 0) + offsetSeconds, end: (s.end_s || 0) + offsetSeconds, speaker_local: s.speaker || "Speaker_unknown", confidence: typeof s.confidence === "number" && Number.isFinite(s.confidence) ? s.confidence : null, })); return { duration_s: typeof data.duration === "number" ? data.duration : 0, segments: shiftedSegments, speakers_local: Array.isArray(data.speakers_detected) ? data.speakers_detected : Array.isArray(data.speakers_local) ? data.speakers_local : [], fingerprints: data.fingerprints && typeof data.fingerprints === "object" ? data.fingerprints : {}, models: data.models || null, }; } return { hasTranscribe: !!parakeet, hasAnalyze: !!gemma, // Public API. Receives a full audio buffer; decides whether to // send it to Parakeet in one shot or to chunk-and-stitch. async transcribeAudio({ audio, mimeType = "application/octet-stream", offsetSeconds = 0, }) { if (!parakeet) { const err = new Error( "operator-hardware transcribe is not configured — Spark Control discovery isn't reporting a ready parakeet endpoint" ); err.status = 503; throw err; } // Write to temp file so ffprobe + ffmpeg can read it (no // reliable stdin pathway for either tool on every codec). // Cleanup happens unconditionally in `finally`. const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "hw-tx-")); const inputPath = path.join(tmpDir, "input." + extFromMime(mimeType)); await fs.writeFile(inputPath, audio); try { const duration = await getAudioDurationSeconds(inputPath); // Build the chunk list. For audio that fits in a single chunk // (or where ffprobe couldn't read the duration), we synthesize // a 1-entry descriptor pointing at the ORIGINAL file — no // ffmpeg cut needed. 
This routes the short-audio case through // the SAME worker loop as multi-chunk audio so: // - diarization still fires (when enabled) // - onChunkComplete still fires (otherwise pipelined-analyze // workers wait forever on a callback that never comes — the // bug that hung Internal Meetings + short YouTube videos) // - cross-chunk clustering still runs (N=1 cluster, but // speakers are still labeled on the segments) // - bracketed-text stitching runs (the single-shot path used // to return text="" because postOneChunk doesn't emit text) // For longer audio, splitAudioFile cuts on disk as before. let chunks; if (!duration || duration <= txChunkSeconds) { chunks = [{ filePath: inputPath, startSeconds: 0, durationSeconds: duration || 0, overlapBoundarySec: 0, index: 0, }]; } else { chunks = await splitAudioFile({ inputPath, outputDir: tmpDir, chunkSeconds: txChunkSeconds, overlapSeconds: txChunkOverlapSeconds, }); } const effectiveConcurrency = Math.min(chunks.length, Math.max(1, txConcurrency)); const durLabel = duration ? `${duration.toFixed(0)}s` : "unknown-length"; if (chunks.length === 1) { console.log( `[hardware] ${durLabel} audio fits in a single chunk — sending in one shot (no ffmpeg split)` ); } else { console.log( `[hardware] chunking ${durLabel} audio into ${chunks.length} pieces of up to ${txChunkSeconds}s (overlap=${txChunkOverlapSeconds}s), ${effectiveConcurrency} in parallel` ); } if (diarizationEnabled) { console.log( `[hardware] diarization ENABLED — each chunk fires 2 parallel calls (transcribe + diarize) against Spark Control` ); } const chunkResults = new Array(chunks.length); // Per-chunk wall-time tracking. Summed by callers into the // audit row's transcribe_ms_sum so the operator can see total // backend compute across all Parakeet POSTs (distinct from // the outer wall-time which captures the parallel-fan-out). const chunkDurationsMs = new Array(chunks.length).fill(null); // Per-chunk diarization data. 
Populated when // diarizationEnabled is true. Each entry mirrors the chunk's // index in `chunks` so the clustering step (Phase 1D) can // correlate per-chunk speaker labels back to chunk positions. // null = diarization didn't run OR failed for this chunk. const chunkDiarization = new Array(chunks.length).fill(null); let nextIdx = 0; const worker = async () => { while (true) { const i = nextIdx++; if (i >= chunks.length) return; const chunk = chunks[i]; const chunkStart = Date.now(); try { console.log( `[hardware] chunk ${i + 1}/${chunks.length} (start=${chunk.startSeconds}s, len=${chunk.durationSeconds.toFixed(0)}s)` ); const buf = await fs.readFile(chunk.filePath); const chunkOffset = offsetSeconds + chunk.startSeconds; // Diarization-enabled = parallel transcribe + diarize. // Diarization-disabled = transcribe only. // We use Promise.allSettled so a diarize failure doesn't // take down the transcript for the chunk — the transcript // is the critical output, diarization is additive metadata. const calls = [ postOneChunk({ audioBuffer: buf, mimeType, offsetSeconds: chunkOffset, }), ]; if (diarizationEnabled) { calls.push( postDiarizeChunk({ audioBuffer: buf, mimeType, offsetSeconds: chunkOffset, }) ); } const settled = await Promise.allSettled(calls); const txSettled = settled[0]; if (txSettled.status === "rejected") { throw txSettled.reason; } chunkDurationsMs[i] = Date.now() - chunkStart; chunkResults[i] = { ok: true, ...txSettled.value, startSeconds: chunk.startSeconds, }; let diarOk = null; // null = not run, true/false = ran if (diarizationEnabled) { const diarSettled = settled[1]; if (diarSettled.status === "fulfilled") { diarOk = true; chunkDiarization[i] = { ok: true, chunkIndex: i, startSeconds: chunk.startSeconds, // Global timestamp BEFORE which segments in this // chunk are duplicates of the prior chunk's tail // (overlap zone). 
Used by clusterSpeakers to // dedup diar segments — without this the same // 30s overlap zone gets counted twice toward // speaker totals on every chunk boundary. chunkOverlapBoundarySec: offsetSeconds + (chunk.overlapBoundarySec || 0), ...diarSettled.value, }; } else { diarOk = false; console.warn( `[hardware] diarize chunk ${i + 1}/${chunks.length} failed (transcript intact): ${diarSettled.reason?.message || diarSettled.reason}` ); chunkDiarization[i] = { ok: false, chunkIndex: i, startSeconds: chunk.startSeconds, error: diarSettled.reason, }; } } // Per-chunk completion log so the operator can see // parallelism in the timeline. Earlier we logged only // when each chunk STARTED, which made concurrent // execution invisible (start lines look sequential // because each pair of chunks finishes ~simultaneously, // and only then does the next pair's first start log // appear). Logging completion makes the in-flight // overlap obvious. const dur = ((chunkDurationsMs[i] || 0) / 1000).toFixed(1); const diarTag = diarOk === true ? " · diarize ok" : diarOk === false ? " · diarize FAIL" : ""; console.log( `[hardware] chunk ${i + 1}/${chunks.length} done in ${dur}s${diarTag}` ); // Fire pipelined-analyze hook (no-op when not in // pipelined mode). Hands the caller this chunk's // segments + position so it can advance its time- // covered cursor and unblock any window analyses // that were waiting on this range. if (onChunkComplete) { try { onChunkComplete({ chunkIndex: i, totalChunks: chunks.length, startSeconds: chunk.startSeconds, durationSeconds: chunk.durationSeconds, overlapBoundarySec: chunk.overlapBoundarySec || 0, segments: chunkResults[i].segments || [], diarOk, }); } catch (cbErr) { // Don't kill transcribe if the caller's callback // throws — pipelined mode is best-effort, the // final analyze fallback still works. 
console.warn( `[hardware] onChunkComplete callback failed for chunk ${i + 1}: ${cbErr?.message || cbErr}` ); } } } catch (err) { chunkDurationsMs[i] = Date.now() - chunkStart; console.warn( `[hardware] chunk ${i + 1}/${chunks.length} failed: ${err?.message || err}` ); chunkResults[i] = { ok: false, error: err, startSeconds: chunk.startSeconds }; } } }; const workers = Array.from({ length: effectiveConcurrency }, worker); await Promise.all(workers); const failedChunks = chunkResults.filter((r) => !r || !r.ok); const succeededChunks = chunkResults.filter((r) => r && r.ok); if (succeededChunks.length === 0) { const first = failedChunks[0]?.error; const e = new Error( `Parakeet chunked transcribe: all ${chunks.length} chunks failed. First error: ${first?.message || "unknown"}` ); e.status = first?.status || 502; throw e; } // Merge in chunk order so timestamps appear chronologically. // Failed chunks leave gaps (their segments are simply absent); // the stitched transcript may have time-gaps the analyzer can // tolerate. Better than aborting the whole job for one // transient chunk failure. // // Overlap dedup: when consecutive chunks share audio at their // boundary (txChunkOverlapSeconds > 0), chunk N+1's first // `overlap` seconds will repeat content from chunk N's tail. // We drop segments from chunk N+1 whose global start time // falls before the chunk's overlap boundary. Chunk 0 has no // prior chunk so its boundary equals its own start time and // the filter is a no-op. 
const allSegments = []; let totalDurationSec = 0; let dedupedSegmentsCount = 0; for (let i = 0; i < chunkResults.length; i++) { const r = chunkResults[i]; if (!r || !r.ok) continue; const globalOverlapBoundary = offsetSeconds + (chunks[i]?.overlapBoundarySec || 0); for (const seg of r.segments) { if (seg.start >= globalOverlapBoundary) { allSegments.push(seg); } else { dedupedSegmentsCount += 1; } } totalDurationSec = Math.max( totalDurationSec, (r.startSeconds || 0) + (r.duration_seconds || 0) ); } if (dedupedSegmentsCount > 0) { console.log( `[hardware] dropped ${dedupedSegmentsCount} duplicate segment(s) in chunk overlap regions` ); } const lines = allSegments.length ? allSegments.map((s) => `[${formatMmSs(s.start)}] ${s.text}`) : [`[0:00] `]; // Apply the same sort + merge passes Gemini's backend uses // so the user-facing transcript is monotonically ordered and // grouped into readable thought-sized entries instead of one // entry per Parakeet segment (which can be 1-3 seconds each). const stitchedText = mergeShortEntries(sortAndDedupeTranscript(lines.join("\n"))); // Phase 1C: when diarization was enabled, surface the // per-chunk diarization data so the caller can run the // cross-chunk clustering step in Phase 1D. Null/absent when // diarization was off (current default), preserving the // existing response shape for callers that don't consume it. // Each entry: { ok, chunkIndex, startSeconds, duration_s, // segments, speakers_local, fingerprints, models } on // success; { ok: false, chunkIndex, startSeconds, error } on // failure (chunk's transcript is still in `segments`/`text`). const diarizationOut = diarizationEnabled ? chunkDiarization : null; // ── Phase 1D: cross-chunk speaker clustering ── // When diarization ran, cluster the per-chunk fingerprints // into global speaker IDs (Speaker_A, Speaker_B, ...) and // stamp them onto the merged transcript segments. 
The raw // per-chunk diarization data stays in `diarizationOut` for // debugging; `speakers` is the operator-facing summary; each // entry in `allSegments` gains `.speaker` + `.speaker_confidence` // in place. let speakersOut = null; if (diarizationEnabled) { const okCount = chunkDiarization.filter((d) => d && d.ok).length; const totalFps = chunkDiarization.reduce( (n, d) => n + (d && d.ok ? Object.keys(d.fingerprints || {}).length : 0), 0 ); if (totalFps > 0) { const t0 = Date.now(); const { globalMap, uncertaintyMap, speakers, clusterCount, thresholdSimilarity } = clusterSpeakers(chunkDiarization, clusterThresholdPct, { anchorMinSpeakingSec, smallClusterMaxSpeakingSec, uncertainMarginPct, }); assignSpeakersToSegments(allSegments, chunkDiarization, globalMap, uncertaintyMap); const clusterMs = Date.now() - t0; speakersOut = speakers; console.log( `[hardware] diarization: ${okCount}/${chunks.length} chunks succeeded, ${totalFps} fingerprints → ${clusterCount} distinct speaker(s) at ${(thresholdSimilarity * 100).toFixed(0)}% cosine-sim threshold (clustering took ${clusterMs}ms)` ); } else { console.log( `[hardware] diarization: ${okCount}/${chunks.length} chunks succeeded, 0 fingerprints — clustering skipped` ); } } return { text: stitchedText, segments: allSegments, duration_seconds: totalDurationSec || duration, usage: null, model: transcribeModel, chunk_count: chunks.length, chunks_failed: failedChunks.length, chunk_durations_ms: chunkDurationsMs, diarization: diarizationOut, // Map of globalId → summary stats. Null when diarization // was off OR ran but produced zero fingerprints. See // server/speaker-clustering.js for the shape. speakers: speakersOut, }; } finally { try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {} } }, // (postOneChunk + helpers live below the returned object so they // can close over `parakeet` / `transcribeModel` / `timeoutMs`.) // POST /v1/chat/completions with the OpenAI shape. 
Ollama's // server, vLLM, llama.cpp's HTTP server, and most other OSS LLM // runners support this wire format — so we don't lock the relay // to one specific Gemma deployment. // // Model-name autodiscovery: if `gemmaModel` is empty OR the // configured model returns "model does not exist", we query // /v1/models and use the first listed model id. Spark Control's // discovery normally supplies the model name explicitly so this // path runs only when discovery's vllm entry is missing the // `model` field. Result is cached for the process lifetime so we // don't pay the /v1/models round-trip on every analyze call. async analyzeText({ prompt }) { if (!gemma) { const err = new Error( "operator-hardware analyze is not configured — Spark Control discovery isn't reporting a ready vllm endpoint" ); err.status = 503; throw err; } // Resolve which model to send. Configured value wins; empty // string triggers autodiscovery. let modelToUse = (analyzeModel || "").trim(); if (!modelToUse) { modelToUse = await discoverFirstModel(gemma, timeoutMs); if (!modelToUse) { const e = new Error( `No analyze model configured and no models discoverable at ${gemma}/v1/models — set Gemma Model Name or check the Gemma endpoint` ); e.status = 503; throw e; } } const result = await callChatCompletions({ base: gemma, model: modelToUse, prompt, maxTokens: anMaxTokens, timeoutMs, }); if (result.ok) { return { text: result.text, usage: null, model: modelToUse, }; } // Detect "model does not exist" (vLLM / Ollama wording) and // retry once with autodiscovered first model — operator may // have swapped models on their hardware without updating the // relay's config field, very common ergonomic miss. 
const looksLikeModelMissing = result.status === 404 && /model.*(does not exist|not found|unknown)/i.test(result.body || ""); if (looksLikeModelMissing) { const discovered = await discoverFirstModel(gemma, timeoutMs); if (discovered && discovered !== modelToUse) { console.warn( `[hardware] configured model "${modelToUse}" not found at ${gemma}; auto-using discovered "${discovered}"` ); const retry = await callChatCompletions({ base: gemma, model: discovered, prompt, maxTokens: anMaxTokens, timeoutMs, }); if (retry.ok) { return { text: retry.text, usage: null, model: discovered, }; } // Discovered model also failed — fall through to surface // the FIRST failure (the configured-model one) so the // operator sees what they configured + what we tried. } const e = new Error( `Gemma analyze: configured model "${modelToUse}" does not exist on ${gemma}. ` + `Update "Gemma Model Name" in Set Gemma URL — or leave it blank to auto-pick from /v1/models. ` + `Response: ${(result.body || "").slice(0, 200)}` ); e.status = 404; throw e; } const hint = result.status === 404 ? ` (tried both /v1/chat/completions and /chat/completions on ${gemma} — check the Gemma URL)` : ""; const e = new Error( `Gemma analyze ${result.status} at ${result.lastUrl}: ${(result.body || "").slice(0, 300)}${hint}` ); e.status = result.status; throw e; }, }; } // Shared helper that tries both /v1/chat/completions and // /chat/completions, returns the unified result shape. async function callChatCompletions({ base, model, prompt, maxTokens, timeoutMs }) { const pathCandidates = ["/v1/chat/completions", "/chat/completions"]; let res = null; let body = null; let lastUrl = null; for (const p of pathCandidates) { const url = `${base}${p}`; lastUrl = url; try { // lanFetch — analyze goes through Spark Control over HTTPS // with the StartOS Local CA cert. See lan-fetch.js. 
res = await lanFetch(url, { method: "POST", headers: { "Content-Type": "application/json" }, redirect: "follow", body: JSON.stringify({ model, max_tokens: maxTokens, messages: [{ role: "user", content: prompt }], stream: false, // Force JSON-mode output. Recap's analyze prompt already asks // for a JSON shape; this constrains the model so it can't // wrap the answer in "Here are the sections I found:" prose, // which both costs tokens and trips our parser. response_format: { type: "json_object" }, // Qwen3.6 ships with a reasoning/thinking mode that's great // for math but pure latency-noise for structured extraction. // The chat_template_kwargs field is honored by vLLM's // tokenizer when applying the chat template; non-Qwen // backends ignore it. chat_template_kwargs: { enable_thinking: false }, }), signal: AbortSignal.timeout(timeoutMs), }); } catch (err) { const cause = err?.cause?.message || err?.cause?.code || err?.cause || ""; const detail = cause ? `${err.message} (cause: ${cause})` : err?.message || String(err); const e = new Error( `Gemma analyze network error at ${url}: ${detail}` ); e.status = 502; throw e; } body = await safeBody(res); // Only fall through to the next path candidate when the 404 is a // PATH-shaped 404 (FastAPI's default `{"detail":"Not Found"}` or // an empty body). A 404 with a content body — e.g. vLLM's // `{"error":{"message":"The model X does not exist..."}}` — // means the endpoint exists, the request was rejected for a // different reason. Falling through would mask the real error // behind a path-not-found from the alternate path. 
const looksLikePath404 = res.status === 404 && (!body || body.length < 60 || /^\s*\{\s*"detail"\s*:\s*"Not Found"\s*\}\s*$/.test(body)); if (!looksLikePath404) break; console.warn(`[hardware] path-404 at ${url} — trying next path candidate`); } if (!res.ok) { return { ok: false, status: res.status, body, lastUrl }; } let data; try { data = JSON.parse(body); } catch { return { ok: false, status: 502, body: "non-JSON response", lastUrl }; } const text = data?.choices?.[0]?.message?.content || ""; return { ok: true, text, lastUrl }; } // Cache of {base → modelId} so we don't pay the /v1/models round-trip // on every call. Per-process; reset on container restart. const _discoveredModelCache = new Map(); async function discoverFirstModel(base, timeoutMs) { if (_discoveredModelCache.has(base)) { return _discoveredModelCache.get(base); } try { const res = await lanFetch(`${base}/v1/models`, { signal: AbortSignal.timeout(timeoutMs), }); if (!res.ok) { console.warn(`[hardware] /v1/models returned ${res.status} at ${base}`); return null; } const data = await res.json(); const first = Array.isArray(data?.data) ? data.data[0] : null; const id = first?.id || null; if (id) _discoveredModelCache.set(base, id); return id; } catch (err) { console.warn(`[hardware] /v1/models discovery failed at ${base}: ${err?.message || err}`); return null; } } function formatMmSs(seconds) { const s = Math.max(0, Math.floor(seconds)); const h = Math.floor(s / 3600); const m = Math.floor((s % 3600) / 60); const sec = s % 60; if (h > 0) return `${h}:${String(m).padStart(2, "0")}:${String(sec).padStart(2, "0")}`; return `${m}:${String(sec).padStart(2, "0")}`; } async function safeBody(res) { try { return await res.text(); } catch { return ""; } } // Map common audio MIME types to a sensible file extension so ffprobe // + ffmpeg pick the right demuxer. Defaults to `mp3` which is the // fallback Recap normalizes to before sending here. 
/**
 * Choose a file extension for an audio MIME type.
 *
 * The media type itself (the part before any `;`) is matched first, so
 * a codec parameter cannot override the container: the common browser
 * recording type `audio/webm;codecs=opus` is a WebM file, not a bare
 * `.opus` stream. If the media type is not recognized, the full string
 * (parameters included) is matched as a fallback.
 *
 * @param {string} [mime] MIME type, optionally with parameters.
 * @returns {string} Extension without the dot; `"mp3"` when `mime` is
 *   missing, not a string, or unrecognized.
 */
function extFromMime(mime) {
  if (typeof mime !== "string" || mime === "") return "mp3";

  // Ordered: the first rule with a matching needle wins.
  const rules = [
    [["mp4", "m4a"], "m4a"],
    [["ogg"], "ogg"],
    [["opus"], "opus"],
    [["wav"], "wav"],
    [["aac"], "aac"],
    [["webm"], "webm"],
    [["flac"], "flac"],
  ];
  const lookup = (text) =>
    rules.find(([needles]) => needles.some((needle) => text.includes(needle)))?.[1];

  const full = mime.toLowerCase();
  const mediaType = full.split(";", 1)[0];
  return lookup(mediaType) ?? lookup(full) ?? "mp3";
}