1038 lines
44 KiB
JavaScript
1038 lines
44 KiB
JavaScript
// Operator-hardware fallback backend. Forwards transcribe requests to
|
|
// a Parakeet endpoint (or any Whisper-API-compatible server — same wire
|
|
// format) and analyze requests to a Gemma endpoint (or any
|
|
// OpenAI-compatible chat-completions server).
|
|
//
|
|
// Used when a Pro/Max user has exceeded their monthly Gemini cap.
|
|
// Returns the same shape gemini.js produces so route handlers don't
|
|
// need a backend-specific branch downstream:
|
|
// transcribeAudio → { text, segments, duration_seconds }
|
|
// analyzeText → { text }
|
|
//
|
|
// Long-audio handling: Parakeet wrappers on operator GPUs OOM on long
|
|
// audio (one shot of a 108-min file returned "CUDA error: unknown
|
|
// error" on a 24GB card). transcribeAudio splits audio longer than
// txChunkSeconds (default 5 minutes) into chunks, transcribes them
// with up to txConcurrency requests in flight (default 4), and
// stitches the segments with continuous timestamps. Audio that fits
// in one chunk is sent as-is, with no ffmpeg split.
|
|
|
|
import fs from "fs/promises";
|
|
import os from "os";
|
|
import path from "path";
|
|
import {
|
|
getAudioDurationSeconds,
|
|
splitAudioFile,
|
|
} from "../audio-meta.js";
|
|
import {
|
|
sortAndDedupeTranscript,
|
|
mergeShortEntries,
|
|
} from "./gemini.js";
|
|
import { lanFetch } from "../lan-fetch.js";
|
|
import {
|
|
clusterSpeakers,
|
|
assignSpeakersToSegments,
|
|
} from "../speaker-clustering.js";
|
|
|
|
const ANALYZE_MAX_TOKENS = 16000;
|
|
// Gemma served locally tends to live on the host's LAN, not the public
|
|
// internet, so generous timeouts. Same scale as Recap's defaults.
|
|
const DEFAULT_TIMEOUT_MS = 900_000;
|
|
|
|
// Default Parakeet model name — used only when the route handler
|
|
// doesn't supply one (e.g. a unit test instantiating the backend
|
|
// directly). In production the value comes from Spark Control
|
|
// discovery via resolveHardwareConfig().
|
|
//
|
|
// NOTE: there is NO default for the analyze (Gemma) model. An empty
|
|
// gemmaModel intentionally triggers autodiscovery via /v1/models in
|
|
// analyzeText() below. Substituting a hardcoded default here (the
|
|
// old "gemma3:27b") defeats autodiscovery and causes "model does
|
|
// not exist" errors when the operator's hardware hosts a different
|
|
// model name.
|
|
const DEFAULT_TRANSCRIBE_MODEL = "parakeet-tdt-0.6b-v3";
|
|
|
|
// Normalize an OpenAI-API-compatible base URL so callers can always
// append `/v1/...` to the result:
//   - surrounding whitespace is trimmed,
//   - ALL trailing slashes are removed (not just one), and
//   - one trailing `/v1` segment is removed if the operator pasted it.
// Without this, a base URL of `http://192.168.1.87:8000/v1` would
// produce `http://192.168.1.87:8000/v1/v1/audio/transcriptions` → 404,
// and `http://192.168.1.87:8000//` would produce a doubled slash.
//
// Accepts a string or anything with a useful toString() (e.g. a URL
// object). Falsy input (undefined, null, "") returns "".
function normalizeApiBase(url) {
  const stripTrailingSlashes = (s) => s.replace(/\/+$/, "");
  const trimmed = stripTrailingSlashes(String(url || "").trim());
  // Strip slashes again after removing `/v1`: "http://h//v1" would
  // otherwise leave "http://h/" behind.
  return stripTrailingSlashes(trimmed.replace(/\/v1$/, ""));
}
|
|
|
|
// 503 retry policy — both Parakeet and Sortformer/TitaNet endpoints
|
|
// on Spark Control return 503 + a "retry in ~60s" body when the GPU
|
|
// container CUDA-wedges and auto-restarts. The right thing to do is
|
|
// wait for the documented retry window (or the Retry-After header if
|
|
// supplied) and try again. Honor Retry-After when present, default
|
|
// to 60s, clamp to [5, 120], and add ±5s jitter so parallel chunks
|
|
// don't all wake up and pile on at the exact same moment (thundering
|
|
// herd → instant re-wedge).
|
|
const SPARK_503_MAX_ATTEMPTS = 4;
|
|
// Compute how long (in seconds) to wait before retrying after a 503.
//
// Reads the response's Retry-After header, which may be either
//   - delay-seconds (an integer), or
//   - an HTTP-date (e.g. "Wed, 21 Oct 2015 07:28:00 GMT"), in which
//     case the wait is the time remaining until that instant.
// A missing or unparseable header falls back to 60s. The requested
// wait is clamped to [5, 120], then ±5s of jitter (0.1s resolution)
// is added, and the result is floored at 5s. The upper bound after
// jitter is therefore 125s.
//
// `res` may be null/undefined or lack `headers.get`; that is treated
// the same as a missing header.
function parse503WaitSec(res) {
  const header = res?.headers?.get?.("retry-after");
  let requestedSec = Number.parseInt(header || "60", 10);
  // Only try the HTTP-date form when the integer parse failed, and
  // only for values that start with a day name, so arbitrary garbage
  // isn't run through Date.parse's lenient legacy parser.
  if (
    !Number.isFinite(requestedSec) &&
    typeof header === "string" &&
    /^\s*(Mon|Tue|Wed|Thu|Fri|Sat|Sun)/i.test(header)
  ) {
    const retryAtMs = Date.parse(header);
    if (Number.isFinite(retryAtMs)) {
      requestedSec = Math.ceil((retryAtMs - Date.now()) / 1000);
    }
  }
  const base = Math.min(
    120,
    Math.max(5, Number.isFinite(requestedSec) ? requestedSec : 60)
  );
  const jitter = Math.round((Math.random() * 10 - 5) * 10) / 10; // ±5s
  return Math.max(5, Math.round((base + jitter) * 10) / 10);
}
|
|
// Promise-based delay: the returned promise resolves (with no value)
// once `ms` milliseconds have elapsed on a setTimeout timer.
function sleepMs(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
|
|
|
|
export function createHardwareBackend({
|
|
parakeetBaseURL = "",
|
|
gemmaBaseURL = "",
|
|
// Spark Control base URL (no path) — derived by the caller from
|
|
// relay_spark_control_url with the /api/endpoints suffix stripped.
|
|
// Used only for diarize-chunk (POST {spark}/api/audio/diarize-chunk).
|
|
// Transcribe and analyze go to parakeet/gemma respectively, which
|
|
// discovery reports as separate hosts on the operator's LAN.
|
|
// Empty = diarization can't run (the route handler should not have
|
|
// enabled it).
|
|
sparkControlBaseURL = "",
|
|
parakeetModel = DEFAULT_TRANSCRIBE_MODEL,
|
|
gemmaModel = "",
|
|
timeoutMs = DEFAULT_TIMEOUT_MS,
|
|
// Chunking knobs — caller MUST source from relay-config.json. The
|
|
// defaults here are for unit-test ergonomics only. Production
|
|
// callers always pass these from getConfigSnapshot() so the
|
|
// operator's Settings-tab edits flow through.
|
|
//
|
|
// Parallel chunks: per the spark-control LLM developer, Parakeet can
|
|
// handle ~4 concurrent transcribe POSTs on a typical single-GPU
|
|
// operator setup. Earlier versions hard-serialized to 1 in flight
|
|
// out of caution; we now bound by the operator's chosen concurrency.
|
|
txChunkSeconds = 5 * 60,
|
|
// Overlap (seconds) between consecutive chunks. Defaults to 0
|
|
// (matches pre-v0.2.77 behavior); production callers pass the
|
|
// operator's configured `relay_hardware_tx_chunk_overlap_seconds`.
|
|
// When > 0, chunks at the boundary share audio and the stitcher
|
|
// drops words/segments in chunk N+1 whose global timestamps fall
|
|
// within the prior chunk's tail. See splitAudioFile() for the
|
|
// chunking math.
|
|
txChunkOverlapSeconds = 0,
|
|
txConcurrency = 4,
|
|
// Diarization: when true, every chunk gets TWO parallel HTTP calls
|
|
// — the existing /v1/audio/transcriptions to Parakeet AND a sibling
|
|
// /api/audio/diarize-chunk to Sortformer+TitaNet on the same Spark
|
|
// Control host. Diarization output (segments + per-chunk voice
|
|
// fingerprints) is collected per chunk and returned alongside the
|
|
// transcript on the backend response so the relay's caller can run
|
|
// cross-chunk speaker clustering. In Phase 1C this just gathers the
|
|
// data; the clustering + transcript-merge step lands in Phase 1D.
|
|
// Default false (matches the current transcribe-only behavior).
|
|
diarizationEnabled = false,
|
|
// Cosine-similarity threshold for cross-chunk speaker clustering,
|
|
// as an integer percentage (slider system is int-only). 70 means
|
|
// "merge two chunks' speaker fingerprints into the same global
|
|
// speaker if their cosine similarity >= 0.70" — NeMo's recommended
|
|
// default for TitaNet embeddings. Range 50-95 enforced inside the
|
|
// clustering module; values outside the range are clamped.
|
|
clusterThresholdPct = 70,
|
|
// Post-cluster suppression knobs (Phase 2). Operator-tunable via
|
|
// Settings → Operator hardware. Used only when diarization is on.
|
|
// Defaults match speaker-clustering.js's hardcoded fallbacks.
|
|
anchorMinSpeakingSec = 30,
|
|
smallClusterMaxSpeakingSec = 15,
|
|
uncertainMarginPct = 10,
|
|
// Optional per-chunk completion callback for the pipelined-analyze
|
|
// path. Fires AFTER both transcribe + diarize have resolved for
|
|
// each chunk (Promise.allSettled), in the order chunks finish (not
|
|
// necessarily monotonic by chunkIndex when concurrency > 1).
|
|
// Receives:
|
|
// {
|
|
// chunkIndex, // 0-based
|
|
// totalChunks, // emitted on the first call so the caller
|
|
// // knows the count up front
|
|
// startSeconds, // global audio offset where this chunk begins
|
|
// durationSeconds, // chunk audio length
|
|
// overlapBoundarySec, // RELATIVE to chunk start; segments with
|
|
// // global start < (startSeconds + overlap-
|
|
// // BoundarySec) belong to the prior chunk
|
|
// segments, // raw chunk segments, globally-timestamped
|
|
// // (start/end already shifted by startSeconds)
|
|
// diarOk, // true | false | null (null = not run)
|
|
// }
|
|
// Caller uses this to feed a chunk buffer that the pipelined-analyze
|
|
// workers read from. When this callback is null (the default),
|
|
// transcribe behaves as today — caller gets the final stitched
|
|
// result only at the end.
|
|
onChunkComplete = null,
|
|
// Output cap passed to the operator's vLLM/Ollama chat-completion
|
|
// endpoint as `max_tokens`. Default 16000 was the historical
|
|
// hardcoded value. Operators with smaller models that produce
|
|
// shorter JSON sections may want to drop this; operators on
|
|
// larger models with reasoning preambles may want to bump it.
|
|
// Configurable via Settings → `relay_hardware_an_max_tokens`.
|
|
anMaxTokens = ANALYZE_MAX_TOKENS,
|
|
} = {}) {
|
|
const parakeet = normalizeApiBase(parakeetBaseURL);
|
|
const gemma = normalizeApiBase(gemmaBaseURL);
|
|
// Spark Control URL — used ONLY by postDiarizeChunk. Strip any
|
|
// trailing /api/endpoints (the discovery path) and any trailing
|
|
// slash so the diarize URL composes cleanly as `${sparkBase}/api/
|
|
// audio/diarize-chunk`.
|
|
const sparkBase = (sparkControlBaseURL || "")
|
|
.trim()
|
|
.replace(/\/$/, "")
|
|
.replace(/\/api\/endpoints$/, "");
|
|
const transcribeModel = parakeetModel || DEFAULT_TRANSCRIBE_MODEL;
|
|
// Intentionally NOT coalesced to a default — empty string means
|
|
// "autodiscover from /v1/models at request time". See the comment
|
|
// at DEFAULT_TRANSCRIBE_MODEL above for why.
|
|
const analyzeModel = gemmaModel || "";
|
|
|
|
// Single Parakeet POST for one piece of audio. The chunking layer in
// transcribeAudio calls this once per chunk (once in total for audio
// that fits in a single chunk).
//
// Request strategy, in order:
//   1. Path discovery — POST the rich form (verbose_json + segment
//      timestamps) to `/v1/audio/transcriptions`; on 404 try the bare
//      `/audio/transcriptions`. Done first so the 503 retry budget is
//      not spent on a path that doesn't exist.
//   2. 503 retry — while the server answers 503, wait the interval
//      from parse503WaitSec() and re-POST, up to
//      SPARK_503_MAX_ATTEMPTS requests in total.
//   3. Rich-mode fallback — on any other 4xx/5xx except 404, retry
//      once WITHOUT response_format / timestamp_granularities[], for
//      wrappers that reject those parameters. (A parameter problem,
//      not an availability problem, hence separate from step 2.)
//
// Params:
//   audioBuffer   — bytes to upload as the `file` form field
//   mimeType      — MIME type set on the uploaded Blob
//   offsetSeconds — added to every segment start/end so timestamps
//                   sit on the caller's global timeline
//
// Returns { segments: [{ start, end, text }], duration_seconds }.
// When the response has no segments but does have a non-empty
// top-level `text` (e.g. the plain `json` response format a server
// uses once the rich parameters are dropped), a single segment
// spanning the reported duration is synthesized so the chunk's
// transcript is not silently discarded.
//
// Throws an Error carrying `.status`: 502 for network failures and
// for success responses whose body is not JSON; otherwise the
// upstream HTTP status.
async function postOneChunk({ audioBuffer, mimeType, offsetSeconds }) {
  const pathCandidates = [
    "/v1/audio/transcriptions",
    "/audio/transcriptions",
  ];

  // A FormData body can't be re-sent, so build a fresh one per request.
  const buildForm = (richMode) => {
    const form = new FormData();
    form.append("file", new Blob([audioBuffer], { type: mimeType }), "audio.bin");
    form.append("model", transcribeModel);
    if (richMode) {
      form.append("response_format", "verbose_json");
      form.append("timestamp_granularities[]", "segment");
    }
    return form;
  };

  // One POST. Network-level failures are rethrown as status-502 errors
  // with the underlying cause (ECONNREFUSED / ETIMEDOUT / TLS / DNS)
  // surfaced in the message. `where` is the request-specific part of
  // the message (URL plus retry/fallback label).
  const postForm = async (url, richMode, where) => {
    try {
      return await lanFetch(url, {
        method: "POST",
        body: buildForm(richMode),
        redirect: "follow",
        signal: AbortSignal.timeout(timeoutMs),
      });
    } catch (err) {
      const cause = err?.cause?.message || err?.cause?.code || err?.cause || "";
      const detail = cause
        ? `${err.message} (cause: ${cause})`
        : err?.message || String(err);
      const e = new Error(`Parakeet transcribe network error${where}: ${detail}`, { cause: err });
      e.status = 502;
      throw e;
    }
  };

  // Release a response we are about to abandon. An unread body can
  // keep its connection occupied until garbage collection.
  const discardBody = async (response) => {
    try {
      await response?.body?.cancel?.();
    } catch {
      // Best-effort: a locked or already-consumed stream is harmless here.
    }
  };

  // ── Path discovery ──
  let resolvedUrl = null;
  let res = null;
  for (const candidate of pathCandidates) {
    const url = `${parakeet}${candidate}`;
    if (res) await discardBody(res); // previous candidate's 404
    res = await postForm(url, true, ` at ${url}`);
    if (res.status !== 404) {
      resolvedUrl = url;
      break;
    }
    console.warn(
      `[hardware] 404 at ${url} — trying next path candidate`
    );
  }
  if (!resolvedUrl) {
    // Every candidate 404'd; `res` still holds the last 404, which is
    // reported by the !res.ok branch below.
    resolvedUrl = `${parakeet}${pathCandidates[pathCandidates.length - 1]}`;
  }

  // ── 503 retry loop ──
  // `attempt` counts requests already made against resolvedUrl.
  for (
    let attempt = 1;
    res.status === 503 && attempt < SPARK_503_MAX_ATTEMPTS;
    attempt += 1
  ) {
    const waitSec = parse503WaitSec(res);
    console.warn(
      `[hardware/tx] 503 at ${resolvedUrl} (attempt ${attempt}/${SPARK_503_MAX_ATTEMPTS}) — Spark Control reports transient CUDA wedge, waiting ${waitSec}s then retrying`
    );
    await discardBody(res);
    await sleepMs(waitSec * 1000);
    res = await postForm(resolvedUrl, true, ` at ${resolvedUrl} (retry ${attempt})`);
  }

  // ── Rich-mode fallback ──
  // Skipped on 503 (the retry budget is already spent) and on 404
  // (no path matched, so dropping parameters can't help).
  if (
    !res.ok &&
    res.status !== 503 &&
    res.status >= 400 &&
    res.status < 600 &&
    res.status !== 404
  ) {
    const richBody = await safeBody(res);
    console.warn(
      `[hardware] rich Parakeet request to ${resolvedUrl} returned ${res.status}: ${richBody.slice(0, 200)} — retrying without verbose_json`
    );
    res = await postForm(resolvedUrl, false, ` (bare fallback) at ${resolvedUrl}`);
  }

  if (!res.ok) {
    const body = await safeBody(res);
    const hint =
      res.status === 404
        ? ` (tried ${pathCandidates.join(" and ")} on base ${parakeet} — wrapper may expose the endpoint at a different path; check the Parakeet URL or container logs)`
        : res.status === 503
          ? ` (Spark Control reported transient CUDA wedge ${SPARK_503_MAX_ATTEMPTS} times in a row — operator container may need manual restart)`
          : "";
    const e = new Error(
      `Parakeet transcribe ${res.status} at ${resolvedUrl}: ${body.slice(0, 300)}${hint}`
    );
    e.status = res.status;
    throw e;
  }

  let data;
  try {
    data = await res.json();
  } catch (err) {
    // A 2xx with an HTML/plain-text body would otherwise surface as a
    // bare SyntaxError with no status for the caller to report.
    const e = new Error(
      `Parakeet transcribe returned a non-JSON ${res.status} body at ${resolvedUrl}: ${err?.message || err}`,
      { cause: err }
    );
    e.status = 502;
    throw e;
  }
  const payload = data !== null && typeof data === "object" ? data : {};

  // Coerce to a finite number so a string timestamp can't turn the
  // offset addition into string concatenation.
  const toSeconds = (value) => {
    const n = Number(value);
    return Number.isFinite(n) ? n : 0;
  };

  const rawSegments = Array.isArray(payload.segments) ? payload.segments : [];
  const segments = rawSegments.map((s) => ({
    start: toSeconds(s?.start) + offsetSeconds,
    end: toSeconds(s?.end) + offsetSeconds,
    text: String(s?.text || "").trim(),
  }));

  // Text-only response: keep the transcript as one coarse segment.
  const plainText = typeof payload.text === "string" ? payload.text.trim() : "";
  if (segments.length === 0 && plainText) {
    segments.push({
      start: offsetSeconds,
      end: offsetSeconds + toSeconds(payload.duration),
      text: plainText,
    });
  }

  return {
    segments,
    duration_seconds: payload.duration || 0,
  };
}
|
|
|
|
// Diarization call for one audio chunk, issued alongside the
// transcribe POST for the same chunk. Hits
// POST {sparkBase}/api/audio/diarize-chunk (Sortformer for
// segmentation, TitaNet for per-speaker voice fingerprints). The
// returned `speaker` labels are LOCAL to this chunk — Speaker_0 here
// is not necessarily the same person as Speaker_0 in chunk N+1;
// cross-chunk clustering reconciles them via the fingerprints.
//
// Retry policy:
//   503: transient CUDA wedge, container auto-restarting. Wait the
//        interval from parse503WaitSec() (Retry-After + jitter) and
//        retry, up to SPARK_503_MAX_ATTEMPTS requests in total.
//   Network failure: rethrown immediately as a status-502 error.
//   Any other non-2xx: rethrown immediately with the upstream status.
//
// Params:
//   audioBuffer   — bytes to upload as the `file` form field
//   mimeType      — MIME type set on the uploaded Blob
//   offsetSeconds — added to every segment start/end so timestamps
//                   already sit on the global audio timeline,
//                   matching postOneChunk
//
// Returns { duration_s, segments: [{ start, end, speaker_local,
// confidence }], speakers_local, fingerprints, models }.
//
// Throws an Error carrying `.status` (500 when sparkControlBaseURL is
// empty, 502 for network failures / unparseable success bodies,
// otherwise the upstream HTTP status).
async function postDiarizeChunk({ audioBuffer, mimeType, offsetSeconds }) {
  if (!sparkBase) {
    const e = new Error(
      "Diarization called but sparkControlBaseURL is empty — caller should not have enabled diarization without Service Discovery configured"
    );
    e.status = 500;
    throw e;
  }

  // A FormData body can't be re-sent, so build a fresh one per request.
  const buildForm = () => {
    const form = new FormData();
    form.append("file", new Blob([audioBuffer], { type: mimeType }), "audio.bin");
    return form;
  };

  // Release a response we are about to abandon. An unread body can
  // keep its connection occupied until garbage collection.
  const discardBody = async (response) => {
    try {
      await response?.body?.cancel?.();
    } catch {
      // Best-effort: a locked or already-consumed stream is harmless here.
    }
  };

  // Diarize-chunk lives on Spark Control (the host that serves
  // /api/endpoints) — NOT on the Parakeet wrapper.
  const url = `${sparkBase}/api/audio/diarize-chunk`;
  let res = null;
  for (let attempt = 0; attempt < SPARK_503_MAX_ATTEMPTS; attempt++) {
    try {
      res = await lanFetch(url, {
        method: "POST",
        body: buildForm(),
        redirect: "follow",
        signal: AbortSignal.timeout(timeoutMs),
      });
    } catch (err) {
      // undici wraps the underlying socket error in err.cause —
      // surfacing it makes "fetch failed" diagnosable (ECONNREFUSED
      // vs ETIMEDOUT vs TLS handshake vs DNS).
      const cause = err?.cause?.message || err?.cause?.code || err?.cause || "";
      const detail = cause
        ? `${err.message} (cause: ${cause})`
        : err?.message || String(err);
      const e = new Error(`Diarize network error at ${url}: ${detail}`, { cause: err });
      e.status = 502;
      throw e;
    }
    const retryable = res.status === 503 && attempt < SPARK_503_MAX_ATTEMPTS - 1;
    if (!retryable) break;
    const waitSec = parse503WaitSec(res);
    console.warn(
      `[hardware/diarize] 503 at ${url} (attempt ${attempt + 1}/${SPARK_503_MAX_ATTEMPTS}) — Spark Control reports transient CUDA wedge, waiting ${waitSec}s then retrying`
    );
    await discardBody(res);
    await sleepMs(waitSec * 1000);
  }

  if (!res.ok) {
    const body = await safeBody(res);
    const hint =
      res.status === 503
        ? ` (Spark Control reported transient CUDA wedge ${SPARK_503_MAX_ATTEMPTS} times in a row — operator container may need manual restart)`
        : "";
    const e = new Error(
      `Diarize ${res.status} at ${url}: ${body.slice(0, 300)}${hint}`
    );
    e.status = res.status;
    throw e;
  }

  let data;
  try {
    data = await res.json();
  } catch (err) {
    // Give a non-JSON 2xx body a status and a readable message rather
    // than a bare SyntaxError.
    const e = new Error(
      `Diarize returned a non-JSON ${res.status} body at ${url}: ${err?.message || err}`,
      { cause: err }
    );
    e.status = 502;
    throw e;
  }
  const payload = data !== null && typeof data === "object" ? data : {};

  const rawSegments = Array.isArray(payload.segments) ? payload.segments : [];
  // The endpoint's `start_s` / `end_s` are renamed to `start` / `end`
  // so diarization and transcript segments share one shape.
  const segments = rawSegments.map((s) => ({
    start: (s?.start_s || 0) + offsetSeconds,
    end: (s?.end_s || 0) + offsetSeconds,
    speaker_local: s?.speaker || "Speaker_unknown",
    confidence:
      typeof s?.confidence === "number" && Number.isFinite(s.confidence)
        ? s.confidence
        : null,
  }));

  // The speaker list is accepted under either key.
  let speakersLocal = [];
  if (Array.isArray(payload.speakers_detected)) {
    speakersLocal = payload.speakers_detected;
  } else if (Array.isArray(payload.speakers_local)) {
    speakersLocal = payload.speakers_local;
  }

  return {
    duration_s: typeof payload.duration === "number" ? payload.duration : 0,
    segments,
    speakers_local: speakersLocal,
    fingerprints:
      payload.fingerprints && typeof payload.fingerprints === "object"
        ? payload.fingerprints
        : {},
    models: payload.models || null,
  };
}
|
|
|
|
return {
|
|
hasTranscribe: !!parakeet,
|
|
hasAnalyze: !!gemma,
|
|
|
|
// Public API. Receives a full audio buffer; decides whether to
|
|
// send it to Parakeet in one shot or to chunk-and-stitch.
|
|
async transcribeAudio({
|
|
audio,
|
|
mimeType = "application/octet-stream",
|
|
offsetSeconds = 0,
|
|
}) {
|
|
if (!parakeet) {
|
|
const err = new Error(
|
|
"operator-hardware transcribe is not configured — Spark Control discovery isn't reporting a ready parakeet endpoint"
|
|
);
|
|
err.status = 503;
|
|
throw err;
|
|
}
|
|
|
|
// Write to temp file so ffprobe + ffmpeg can read it (no
|
|
// reliable stdin pathway for either tool on every codec).
|
|
// Cleanup happens unconditionally in `finally`.
|
|
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "hw-tx-"));
|
|
const inputPath = path.join(tmpDir, "input." + extFromMime(mimeType));
|
|
await fs.writeFile(inputPath, audio);
|
|
|
|
try {
|
|
const duration = await getAudioDurationSeconds(inputPath);
|
|
// Build the chunk list. For audio that fits in a single chunk
|
|
// (or where ffprobe couldn't read the duration), we synthesize
|
|
// a 1-entry descriptor pointing at the ORIGINAL file — no
|
|
// ffmpeg cut needed. This routes the short-audio case through
|
|
// the SAME worker loop as multi-chunk audio so:
|
|
// - diarization still fires (when enabled)
|
|
// - onChunkComplete still fires (otherwise pipelined-analyze
|
|
// workers wait forever on a callback that never comes — the
|
|
// bug that hung Internal Meetings + short YouTube videos)
|
|
// - cross-chunk clustering still runs (N=1 cluster, but
|
|
// speakers are still labeled on the segments)
|
|
// - bracketed-text stitching runs (the single-shot path used
|
|
// to return text="" because postOneChunk doesn't emit text)
|
|
// For longer audio, splitAudioFile cuts on disk as before.
|
|
let chunks;
|
|
if (!duration || duration <= txChunkSeconds) {
|
|
chunks = [{
|
|
filePath: inputPath,
|
|
startSeconds: 0,
|
|
durationSeconds: duration || 0,
|
|
overlapBoundarySec: 0,
|
|
index: 0,
|
|
}];
|
|
} else {
|
|
chunks = await splitAudioFile({
|
|
inputPath,
|
|
outputDir: tmpDir,
|
|
chunkSeconds: txChunkSeconds,
|
|
overlapSeconds: txChunkOverlapSeconds,
|
|
});
|
|
}
|
|
const effectiveConcurrency = Math.min(chunks.length, Math.max(1, txConcurrency));
|
|
const durLabel = duration ? `${duration.toFixed(0)}s` : "unknown-length";
|
|
if (chunks.length === 1) {
|
|
console.log(
|
|
`[hardware] ${durLabel} audio fits in a single chunk — sending in one shot (no ffmpeg split)`
|
|
);
|
|
} else {
|
|
console.log(
|
|
`[hardware] chunking ${durLabel} audio into ${chunks.length} pieces of up to ${txChunkSeconds}s (overlap=${txChunkOverlapSeconds}s), ${effectiveConcurrency} in parallel`
|
|
);
|
|
}
|
|
if (diarizationEnabled) {
|
|
console.log(
|
|
`[hardware] diarization ENABLED — each chunk fires 2 parallel calls (transcribe + diarize) against Spark Control`
|
|
);
|
|
}
|
|
const chunkResults = new Array(chunks.length);
|
|
// Per-chunk wall-time tracking. Summed by callers into the
|
|
// audit row's transcribe_ms_sum so the operator can see total
|
|
// backend compute across all Parakeet POSTs (distinct from
|
|
// the outer wall-time which captures the parallel-fan-out).
|
|
const chunkDurationsMs = new Array(chunks.length).fill(null);
|
|
// Per-chunk diarization data. Populated when
|
|
// diarizationEnabled is true. Each entry mirrors the chunk's
|
|
// index in `chunks` so the clustering step (Phase 1D) can
|
|
// correlate per-chunk speaker labels back to chunk positions.
|
|
// null = diarization didn't run OR failed for this chunk.
|
|
const chunkDiarization = new Array(chunks.length).fill(null);
|
|
let nextIdx = 0;
|
|
const worker = async () => {
|
|
while (true) {
|
|
const i = nextIdx++;
|
|
if (i >= chunks.length) return;
|
|
const chunk = chunks[i];
|
|
const chunkStart = Date.now();
|
|
try {
|
|
console.log(
|
|
`[hardware] chunk ${i + 1}/${chunks.length} (start=${chunk.startSeconds}s, len=${chunk.durationSeconds.toFixed(0)}s)`
|
|
);
|
|
const buf = await fs.readFile(chunk.filePath);
|
|
const chunkOffset = offsetSeconds + chunk.startSeconds;
|
|
// Diarization-enabled = parallel transcribe + diarize.
|
|
// Diarization-disabled = transcribe only.
|
|
// We use Promise.allSettled so a diarize failure doesn't
|
|
// take down the transcript for the chunk — the transcript
|
|
// is the critical output, diarization is additive metadata.
|
|
const calls = [
|
|
postOneChunk({
|
|
audioBuffer: buf,
|
|
mimeType,
|
|
offsetSeconds: chunkOffset,
|
|
}),
|
|
];
|
|
if (diarizationEnabled) {
|
|
calls.push(
|
|
postDiarizeChunk({
|
|
audioBuffer: buf,
|
|
mimeType,
|
|
offsetSeconds: chunkOffset,
|
|
})
|
|
);
|
|
}
|
|
const settled = await Promise.allSettled(calls);
|
|
const txSettled = settled[0];
|
|
if (txSettled.status === "rejected") {
|
|
throw txSettled.reason;
|
|
}
|
|
chunkDurationsMs[i] = Date.now() - chunkStart;
|
|
chunkResults[i] = {
|
|
ok: true,
|
|
...txSettled.value,
|
|
startSeconds: chunk.startSeconds,
|
|
};
|
|
let diarOk = null; // null = not run, true/false = ran
|
|
if (diarizationEnabled) {
|
|
const diarSettled = settled[1];
|
|
if (diarSettled.status === "fulfilled") {
|
|
diarOk = true;
|
|
chunkDiarization[i] = {
|
|
ok: true,
|
|
chunkIndex: i,
|
|
startSeconds: chunk.startSeconds,
|
|
// Global timestamp BEFORE which segments in this
|
|
// chunk are duplicates of the prior chunk's tail
|
|
// (overlap zone). Used by clusterSpeakers to
|
|
// dedup diar segments — without this the same
|
|
// 30s overlap zone gets counted twice toward
|
|
// speaker totals on every chunk boundary.
|
|
chunkOverlapBoundarySec:
|
|
offsetSeconds + (chunk.overlapBoundarySec || 0),
|
|
...diarSettled.value,
|
|
};
|
|
} else {
|
|
diarOk = false;
|
|
console.warn(
|
|
`[hardware] diarize chunk ${i + 1}/${chunks.length} failed (transcript intact): ${diarSettled.reason?.message || diarSettled.reason}`
|
|
);
|
|
chunkDiarization[i] = {
|
|
ok: false,
|
|
chunkIndex: i,
|
|
startSeconds: chunk.startSeconds,
|
|
error: diarSettled.reason,
|
|
};
|
|
}
|
|
}
|
|
// Per-chunk completion log so the operator can see
|
|
// parallelism in the timeline. Earlier we logged only
|
|
// when each chunk STARTED, which made concurrent
|
|
// execution invisible (start lines look sequential
|
|
// because each pair of chunks finishes ~simultaneously,
|
|
// and only then does the next pair's first start log
|
|
// appear). Logging completion makes the in-flight
|
|
// overlap obvious.
|
|
const dur = ((chunkDurationsMs[i] || 0) / 1000).toFixed(1);
|
|
const diarTag =
|
|
diarOk === true ? " · diarize ok" :
|
|
diarOk === false ? " · diarize FAIL" : "";
|
|
console.log(
|
|
`[hardware] chunk ${i + 1}/${chunks.length} done in ${dur}s${diarTag}`
|
|
);
|
|
// Fire pipelined-analyze hook (no-op when not in
|
|
// pipelined mode). Hands the caller this chunk's
|
|
// segments + position so it can advance its time-
|
|
// covered cursor and unblock any window analyses
|
|
// that were waiting on this range.
|
|
if (onChunkComplete) {
|
|
try {
|
|
onChunkComplete({
|
|
chunkIndex: i,
|
|
totalChunks: chunks.length,
|
|
startSeconds: chunk.startSeconds,
|
|
durationSeconds: chunk.durationSeconds,
|
|
overlapBoundarySec: chunk.overlapBoundarySec || 0,
|
|
segments: chunkResults[i].segments || [],
|
|
diarOk,
|
|
});
|
|
} catch (cbErr) {
|
|
// Don't kill transcribe if the caller's callback
|
|
// throws — pipelined mode is best-effort, the
|
|
// final analyze fallback still works.
|
|
console.warn(
|
|
`[hardware] onChunkComplete callback failed for chunk ${i + 1}: ${cbErr?.message || cbErr}`
|
|
);
|
|
}
|
|
}
|
|
} catch (err) {
|
|
chunkDurationsMs[i] = Date.now() - chunkStart;
|
|
console.warn(
|
|
`[hardware] chunk ${i + 1}/${chunks.length} failed: ${err?.message || err}`
|
|
);
|
|
chunkResults[i] = { ok: false, error: err, startSeconds: chunk.startSeconds };
|
|
}
|
|
}
|
|
};
|
|
const workers = Array.from({ length: effectiveConcurrency }, worker);
|
|
await Promise.all(workers);
|
|
|
|
const failedChunks = chunkResults.filter((r) => !r || !r.ok);
|
|
const succeededChunks = chunkResults.filter((r) => r && r.ok);
|
|
if (succeededChunks.length === 0) {
|
|
const first = failedChunks[0]?.error;
|
|
const e = new Error(
|
|
`Parakeet chunked transcribe: all ${chunks.length} chunks failed. First error: ${first?.message || "unknown"}`
|
|
);
|
|
e.status = first?.status || 502;
|
|
throw e;
|
|
}
|
|
// Merge in chunk order so timestamps appear chronologically.
|
|
// Failed chunks leave gaps (their segments are simply absent);
|
|
// the stitched transcript may have time-gaps the analyzer can
|
|
// tolerate. Better than aborting the whole job for one
|
|
// transient chunk failure.
|
|
//
|
|
// Overlap dedup: when consecutive chunks share audio at their
|
|
// boundary (txChunkOverlapSeconds > 0), chunk N+1's first
|
|
// `overlap` seconds will repeat content from chunk N's tail.
|
|
// We drop segments from chunk N+1 whose global start time
|
|
// falls before the chunk's overlap boundary. Chunk 0 has no
|
|
// prior chunk so its boundary equals its own start time and
|
|
// the filter is a no-op.
|
|
const allSegments = [];
|
|
let totalDurationSec = 0;
|
|
let dedupedSegmentsCount = 0;
|
|
for (let i = 0; i < chunkResults.length; i++) {
|
|
const r = chunkResults[i];
|
|
if (!r || !r.ok) continue;
|
|
const globalOverlapBoundary =
|
|
offsetSeconds + (chunks[i]?.overlapBoundarySec || 0);
|
|
for (const seg of r.segments) {
|
|
if (seg.start >= globalOverlapBoundary) {
|
|
allSegments.push(seg);
|
|
} else {
|
|
dedupedSegmentsCount += 1;
|
|
}
|
|
}
|
|
totalDurationSec = Math.max(
|
|
totalDurationSec,
|
|
(r.startSeconds || 0) + (r.duration_seconds || 0)
|
|
);
|
|
}
|
|
if (dedupedSegmentsCount > 0) {
|
|
console.log(
|
|
`[hardware] dropped ${dedupedSegmentsCount} duplicate segment(s) in chunk overlap regions`
|
|
);
|
|
}
|
|
const lines = allSegments.length
|
|
? allSegments.map((s) => `[${formatMmSs(s.start)}] ${s.text}`)
|
|
: [`[0:00] `];
|
|
// Apply the same sort + merge passes Gemini's backend uses
|
|
// so the user-facing transcript is monotonically ordered and
|
|
// grouped into readable thought-sized entries instead of one
|
|
// entry per Parakeet segment (which can be 1-3 seconds each).
|
|
const stitchedText = mergeShortEntries(sortAndDedupeTranscript(lines.join("\n")));
|
|
// Phase 1C: when diarization was enabled, surface the
|
|
// per-chunk diarization data so the caller can run the
|
|
// cross-chunk clustering step in Phase 1D. Null/absent when
|
|
// diarization was off (current default), preserving the
|
|
// existing response shape for callers that don't consume it.
|
|
// Each entry: { ok, chunkIndex, startSeconds, duration_s,
|
|
// segments, speakers_local, fingerprints, models } on
|
|
// success; { ok: false, chunkIndex, startSeconds, error } on
|
|
// failure (chunk's transcript is still in `segments`/`text`).
|
|
const diarizationOut = diarizationEnabled
|
|
? chunkDiarization
|
|
: null;
|
|
|
|
// ── Phase 1D: cross-chunk speaker clustering ──
|
|
// When diarization ran, cluster the per-chunk fingerprints
|
|
// into global speaker IDs (Speaker_A, Speaker_B, ...) and
|
|
// stamp them onto the merged transcript segments. The raw
|
|
// per-chunk diarization data stays in `diarizationOut` for
|
|
// debugging; `speakers` is the operator-facing summary; each
|
|
// entry in `allSegments` gains `.speaker` + `.speaker_confidence`
|
|
// in place.
|
|
let speakersOut = null;
|
|
if (diarizationEnabled) {
|
|
const okCount = chunkDiarization.filter((d) => d && d.ok).length;
|
|
const totalFps = chunkDiarization.reduce(
|
|
(n, d) =>
|
|
n + (d && d.ok ? Object.keys(d.fingerprints || {}).length : 0),
|
|
0
|
|
);
|
|
if (totalFps > 0) {
|
|
const t0 = Date.now();
|
|
const { globalMap, uncertaintyMap, speakers, clusterCount, thresholdSimilarity } =
|
|
clusterSpeakers(chunkDiarization, clusterThresholdPct, {
|
|
anchorMinSpeakingSec,
|
|
smallClusterMaxSpeakingSec,
|
|
uncertainMarginPct,
|
|
});
|
|
assignSpeakersToSegments(allSegments, chunkDiarization, globalMap, uncertaintyMap);
|
|
const clusterMs = Date.now() - t0;
|
|
speakersOut = speakers;
|
|
console.log(
|
|
`[hardware] diarization: ${okCount}/${chunks.length} chunks succeeded, ${totalFps} fingerprints → ${clusterCount} distinct speaker(s) at ${(thresholdSimilarity * 100).toFixed(0)}% cosine-sim threshold (clustering took ${clusterMs}ms)`
|
|
);
|
|
} else {
|
|
console.log(
|
|
`[hardware] diarization: ${okCount}/${chunks.length} chunks succeeded, 0 fingerprints — clustering skipped`
|
|
);
|
|
}
|
|
}
|
|
return {
|
|
text: stitchedText,
|
|
segments: allSegments,
|
|
duration_seconds: totalDurationSec || duration,
|
|
usage: null,
|
|
model: transcribeModel,
|
|
chunk_count: chunks.length,
|
|
chunks_failed: failedChunks.length,
|
|
chunk_durations_ms: chunkDurationsMs,
|
|
diarization: diarizationOut,
|
|
// Map of globalId → summary stats. Null when diarization
|
|
// was off OR ran but produced zero fingerprints. See
|
|
// server/speaker-clustering.js for the shape.
|
|
speakers: speakersOut,
|
|
};
|
|
} finally {
|
|
try {
|
|
await fs.rm(tmpDir, { recursive: true, force: true });
|
|
} catch {}
|
|
}
|
|
},
|
|
|
|
// (postOneChunk + helpers live below the returned object so they
|
|
// can close over `parakeet` / `transcribeModel` / `timeoutMs`.)
|
|
|
|
// POST <gemma>/v1/chat/completions with the OpenAI shape. Ollama's
|
|
// server, vLLM, llama.cpp's HTTP server, and most other OSS LLM
|
|
// runners support this wire format — so we don't lock the relay
|
|
// to one specific Gemma deployment.
|
|
//
|
|
// Model-name autodiscovery: if `gemmaModel` is empty OR the
|
|
// configured model returns "model does not exist", we query
|
|
// /v1/models and use the first listed model id. Spark Control's
|
|
// discovery normally supplies the model name explicitly so this
|
|
// path runs only when discovery's vllm entry is missing the
|
|
// `model` field. Result is cached for the process lifetime so we
|
|
// don't pay the /v1/models round-trip on every analyze call.
|
|
async analyzeText({ prompt }) {
|
|
if (!gemma) {
|
|
const err = new Error(
|
|
"operator-hardware analyze is not configured — Spark Control discovery isn't reporting a ready vllm endpoint"
|
|
);
|
|
err.status = 503;
|
|
throw err;
|
|
}
|
|
|
|
// Resolve which model to send. Configured value wins; empty
|
|
// string triggers autodiscovery.
|
|
let modelToUse = (analyzeModel || "").trim();
|
|
if (!modelToUse) {
|
|
modelToUse = await discoverFirstModel(gemma, timeoutMs);
|
|
if (!modelToUse) {
|
|
const e = new Error(
|
|
`No analyze model configured and no models discoverable at ${gemma}/v1/models — set Gemma Model Name or check the Gemma endpoint`
|
|
);
|
|
e.status = 503;
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
const result = await callChatCompletions({
|
|
base: gemma,
|
|
model: modelToUse,
|
|
prompt,
|
|
maxTokens: anMaxTokens,
|
|
timeoutMs,
|
|
});
|
|
if (result.ok) {
|
|
return {
|
|
text: result.text,
|
|
usage: null,
|
|
model: modelToUse,
|
|
};
|
|
}
|
|
|
|
// Detect "model does not exist" (vLLM / Ollama wording) and
|
|
// retry once with autodiscovered first model — operator may
|
|
// have swapped models on their hardware without updating the
|
|
// relay's config field, very common ergonomic miss.
|
|
const looksLikeModelMissing =
|
|
result.status === 404 &&
|
|
/model.*(does not exist|not found|unknown)/i.test(result.body || "");
|
|
if (looksLikeModelMissing) {
|
|
const discovered = await discoverFirstModel(gemma, timeoutMs);
|
|
if (discovered && discovered !== modelToUse) {
|
|
console.warn(
|
|
`[hardware] configured model "${modelToUse}" not found at ${gemma}; auto-using discovered "${discovered}"`
|
|
);
|
|
const retry = await callChatCompletions({
|
|
base: gemma,
|
|
model: discovered,
|
|
prompt,
|
|
maxTokens: anMaxTokens,
|
|
timeoutMs,
|
|
});
|
|
if (retry.ok) {
|
|
return {
|
|
text: retry.text,
|
|
usage: null,
|
|
model: discovered,
|
|
};
|
|
}
|
|
// Discovered model also failed — fall through to surface
|
|
// the FIRST failure (the configured-model one) so the
|
|
// operator sees what they configured + what we tried.
|
|
}
|
|
const e = new Error(
|
|
`Gemma analyze: configured model "${modelToUse}" does not exist on ${gemma}. ` +
|
|
`Update "Gemma Model Name" in Set Gemma URL — or leave it blank to auto-pick from /v1/models. ` +
|
|
`Response: ${(result.body || "").slice(0, 200)}`
|
|
);
|
|
e.status = 404;
|
|
throw e;
|
|
}
|
|
|
|
const hint =
|
|
result.status === 404
|
|
? ` (tried both /v1/chat/completions and /chat/completions on ${gemma} — check the Gemma URL)`
|
|
: "";
|
|
const e = new Error(
|
|
`Gemma analyze ${result.status} at ${result.lastUrl}: ${(result.body || "").slice(0, 300)}${hint}`
|
|
);
|
|
e.status = result.status;
|
|
throw e;
|
|
},
|
|
};
|
|
}
|
|
|
|
// Shared helper that tries both /v1/chat/completions and
|
|
// /chat/completions, returns the unified result shape.
|
|
/**
 * POST an OpenAI-shaped chat-completions request to `base`, trying
 * `/v1/chat/completions` first and `/chat/completions` second.
 *
 * @param {object} args
 * @param {string} args.base - Endpoint base URL; the path is appended verbatim.
 * @param {string} args.model - Model id sent in the request body.
 * @param {string} args.prompt - Sent as the single user message.
 * @param {number} args.maxTokens - Forwarded as `max_tokens`.
 * @param {number} args.timeoutMs - Abort timeout applied to each request.
 * @returns {Promise<object>} `{ ok: true, text, lastUrl }` on success, or
 *   `{ ok: false, status, body, lastUrl }` for a non-2xx or non-JSON reply.
 * @throws {Error} `status = 502`, with the original error attached as
 *   `cause`, when `lanFetch` itself rejects.
 */
async function callChatCompletions({ base, model, prompt, maxTokens, timeoutMs }) {
  const pathCandidates = ["/v1/chat/completions", "/chat/completions"];
  let res = null;
  let body = null;
  let lastUrl = null;
  for (const p of pathCandidates) {
    const url = `${base}${p}`;
    lastUrl = url;
    try {
      // lanFetch — analyze goes through Spark Control over HTTPS
      // with the StartOS Local CA cert. See lan-fetch.js.
      res = await lanFetch(url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        redirect: "follow",
        body: JSON.stringify({
          model,
          max_tokens: maxTokens,
          messages: [{ role: "user", content: prompt }],
          stream: false,
          // Force JSON-mode output. Recap's analyze prompt already asks
          // for a JSON shape; this constrains the model so it can't
          // wrap the answer in "Here are the sections I found:" prose,
          // which both costs tokens and trips our parser.
          response_format: { type: "json_object" },
          // Qwen3.6 ships with a reasoning/thinking mode that's great
          // for math but pure latency-noise for structured extraction.
          // The chat_template_kwargs field is honored by vLLM's
          // tokenizer when applying the chat template; non-Qwen
          // backends ignore it.
          chat_template_kwargs: { enable_thinking: false },
        }),
        signal: AbortSignal.timeout(timeoutMs),
      });
    } catch (err) {
      const cause =
        err?.cause?.message || err?.cause?.code || err?.cause || "";
      const detail = cause ? `${err.message} (cause: ${cause})` : err?.message || String(err);
      // Keep the original error reachable via `.cause` so its stack and
      // error code are not lost behind the flattened message.
      const e = new Error(
        `Gemma analyze network error at ${url}: ${detail}`,
        { cause: err }
      );
      e.status = 502;
      throw e;
    }
    body = await safeBody(res);
    // Only fall through to the next path candidate when the 404 is a
    // PATH-shaped 404 (FastAPI's default `{"detail":"Not Found"}` or
    // an empty body). A 404 with a content body — e.g. vLLM's
    // `{"error":{"message":"The model X does not exist..."}}` —
    // means the endpoint exists, the request was rejected for a
    // different reason. Falling through would mask the real error
    // behind a path-not-found from the alternate path.
    const looksLikePath404 =
      res.status === 404 &&
      (!body ||
        body.length < 60 ||
        /^\s*\{\s*"detail"\s*:\s*"Not Found"\s*\}\s*$/.test(body));
    if (!looksLikePath404) break;
    console.warn(`[hardware] path-404 at ${url} — trying next path candidate`);
  }
  // `res`/`body` now hold the last attempt: either the first answer that
  // was not a path-404, or the final candidate's response.
  if (!res.ok) {
    return { ok: false, status: res.status, body, lastUrl };
  }
  let data;
  try {
    data = JSON.parse(body);
  } catch {
    // A 2xx that is not JSON is reported to the caller as a 502.
    return { ok: false, status: 502, body: "non-JSON response", lastUrl };
  }
  const text = data?.choices?.[0]?.message?.content || "";
  return { ok: true, text, lastUrl };
}
|
|
|
|
// Cache of {base → modelId} so we don't pay the /v1/models round-trip
|
|
// on every call. Per-process; reset on container restart.
|
|
const _discoveredModelCache = new Map();

/**
 * Look up the first model id advertised by `${base}/v1/models`.
 *
 * A non-empty id is memoised per `base`; failed or empty lookups are
 * not cached, so a later call tries again.
 *
 * @param {string} base - Endpoint base URL.
 * @param {number} timeoutMs - Abort timeout for the listing request.
 * @returns {Promise<string|null>} The first listed id, or null when the
 *   request fails, returns a non-2xx status, or lists no usable id.
 */
async function discoverFirstModel(base, timeoutMs) {
  if (_discoveredModelCache.has(base)) return _discoveredModelCache.get(base);

  try {
    const response = await lanFetch(`${base}/v1/models`, {
      signal: AbortSignal.timeout(timeoutMs),
    });
    if (response.ok) {
      const payload = await response.json();
      const models = payload?.data;
      const modelId = (Array.isArray(models) && models[0]?.id) || null;
      if (modelId) {
        _discoveredModelCache.set(base, modelId);
      }
      return modelId;
    }
    console.warn(`[hardware] /v1/models returned ${response.status} at ${base}`);
  } catch (err) {
    // Discovery is best-effort: log and report "nothing found".
    console.warn(`[hardware] /v1/models discovery failed at ${base}: ${err?.message || err}`);
  }
  return null;
}
|
|
|
|
/**
 * Format a duration in seconds as `m:ss`, or `h:mm:ss` from one hour up.
 *
 * Fractions are floored and negative values clamp to zero. Non-finite
 * input (NaN, undefined, ±Infinity) is treated as 0 so the result is
 * always a well-formed timestamp rather than "NaN:NaN".
 *
 * @param {number} seconds - Duration in seconds.
 * @returns {string} The formatted timestamp.
 */
function formatMmSs(seconds) {
  const numeric = Number(seconds);
  const total = Number.isFinite(numeric) ? Math.max(0, Math.floor(numeric)) : 0;
  const hours = Math.floor(total / 3600);
  const minutes = Math.floor((total % 3600) / 60);
  const secs = String(total % 60).padStart(2, "0");
  if (hours > 0) {
    return `${hours}:${String(minutes).padStart(2, "0")}:${secs}`;
  }
  return `${minutes}:${secs}`;
}
|
|
|
|
/**
 * Read a response body as text without ever rejecting.
 *
 * @param {{ text: () => Promise<string> }} res - Fetch-style response.
 * @returns {Promise<string>} The body text, or "" if reading it throws.
 */
async function safeBody(res) {
  let bodyText = "";
  try {
    bodyText = await res.text();
  } catch {
    // Deliberate best-effort: an unreadable body is reported as empty.
  }
  return bodyText;
}
|
|
|
|
// Map common audio MIME types to a sensible file extension so ffprobe
|
|
// + ffmpeg pick the right demuxer. Defaults to `mp3` which is the
|
|
// fallback Recap normalizes to before sending here.
|
|
/**
 * Map an audio MIME type to a file extension.
 *
 * Only the `type/subtype` part is matched. Parameters such as
 * `codecs=opus` name the stream inside the container, not the container
 * itself, so `audio/webm;codecs=opus` maps to "webm" rather than "opus".
 *
 * @param {string} [mime] - MIME type, optionally with parameters.
 * @returns {string} Extension without the dot; "mp3" when `mime` is
 *   missing, not a string, or unrecognised.
 */
function extFromMime(mime) {
  if (typeof mime !== "string" || mime === "") return "mp3";

  // Drop everything from the first ";" so parameters cannot match.
  const essence = mime.split(";", 1)[0].trim().toLowerCase();

  // Checked in order; the first needle contained in `essence` wins.
  const rules = [
    [["mp4", "m4a"], "m4a"],
    [["ogg"], "ogg"],
    [["opus"], "opus"],
    [["wav"], "wav"],
    [["aac"], "aac"],
    [["webm"], "webm"],
    [["flac"], "flac"],
  ];
  for (const [needles, ext] of rules) {
    if (needles.some((needle) => essence.includes(needle))) return ext;
  }
  return "mp3";
}
|