Add Spark Control hardware backend (diarize, queue, discovery)

This commit is contained in:
Keysat
2026-06-13 13:36:04 -05:00
parent 705807e286
commit 84d56c94c9
6 changed files with 1435 additions and 172 deletions
File diff suppressed because it is too large Load Diff
+92
View File
@@ -0,0 +1,92 @@
// Resolves which (URL, model) the relay should use for each operator-
// hardware pipeline. Spark Control is the SINGLE host the relay
// talks to — it owns transcribe (/v1/audio/transcriptions),
// diarize-chunk (/api/audio/diarize-chunk), AND analyze
// (/v1/chat/completions, OpenAI-compatible) on the same port. The
// discovery JSON Spark Control serves at /api/endpoints lists the
// backing services SC delegates to internally (e.g. a Parakeet
// wrapper, a vLLM instance) along with their ready state + currently-
// loaded model name; the relay reads it ONLY to learn (a) whether
// each service is ready, and (b) which model name to send in the
// upstream request body. The per-service base URLs in the discovery
// JSON are informational — clients hit Spark Control directly, not
// those internal URLs.
//
// Priority order, per pipeline:
//
// 1. Spark Control says ready → use the SC base URL + the model
// name discovery reports for that
// service.
//
// 2. SC says not_ready → return null URL + a blocked_reason
// string. Route handlers surface
// this to the user instead of trying
// a doomed call.
//
// 3. SC unreachable or no URL → null URL; planBackend treats the
// hardware path as not configured.
//
// History:
// - pre-v0.2.84: operator-typed override URLs (relay_parakeet_*,
// relay_gemma_*) won over discovery. Removed.
// - pre-v0.2.85: discovery's per-service base_urls were used
// directly, so transcribe went to a different host (the Parakeet
// wrapper) than diarize (Spark Control). That meant the relay
// was talking to two hosts for one logical operation, and the
// transcribe wrapper didn't have the diarize endpoint. Now: SC
// is the single host. The per-service base_urls in discovery
// are informational — used only for the operator's awareness in
// the dashboard's Service Discovery health line.
import { getSparkServiceState } from "./spark-control.js";
export async function resolveHardwareConfig(cfg) {
const sparkUrl = (cfg.relay_spark_control_url || "").trim();
// Spark Control base URL with the /api/endpoints suffix and any
// trailing slash stripped. Used as the URL for every operator-
// hardware call: transcribe, diarize-chunk, analyze.
const sparkBase = sparkUrl.replace(/\/$/, "").replace(/\/api\/endpoints$/, "");
const transcribe = await resolveOne({ sparkUrl, sparkBase, kind: "transcribe" });
const analyze = await resolveOne({ sparkUrl, sparkBase, kind: "analyze" });
const tts = await resolveOne({ sparkUrl, sparkBase, kind: "tts" });
return {
transcribe,
analyze,
tts,
sparkBase,
};
}
async function resolveOne({ sparkUrl, sparkBase, kind }) {
if (!sparkUrl || !sparkBase) {
return { url: null, model: null, source: null };
}
const state = await getSparkServiceState(sparkUrl, kind);
if (state.status === "ready") {
return {
// ALWAYS Spark Control's base URL — never the delegate's
// base_url. SC owns the wire-facing endpoint; the delegate
// URL is just where SC routes the request internally.
url: sparkBase,
// Model name comes from discovery so we send the right
// `model` field upstream. SC tells us what model is currently
// loaded on its parakeet / vllm delegate.
model: state.model,
source: "spark-control",
};
}
if (state.status === "not_ready") {
return {
url: null,
model: null,
source: "spark-control",
blocked_reason: state.reason,
};
}
// status === "unknown" → discovery unreachable or that service
// isn't listed. Return null URL; the route handler surfaces the
// error to the user.
return { url: null, model: null, source: null };
}
+110
View File
@@ -0,0 +1,110 @@
// Process-global FIFO queue for hardware-backed jobs.
//
// Why this exists: Spark Control + the operator's GPU box can really
// only handle one full pipeline at a time. Pre-queue, two concurrent
// /relay/v1/summarize-url requests would each spin up their own
// async worker — both would fire 2-parallel transcribe chunks + 2
// parallel diarize calls (and analyze on top), totalling 8+
// simultaneous calls into Spark Control. The GPU thrashes and
// either both jobs slow down massively or chunks start failing.
//
// The fix is a single-slot semaphore around any code path that
// drives the hardware backend. Gemini paths bypass entirely — the
// Google API handles arbitrary concurrency at its end and we have
// no local bottleneck. Intra-job concurrency (the operator's
// configured 2-parallel chunk worker) stays — the operator's GPU
// can handle 2 simultaneous transcribe POSTs per the SC dev's
// guidance; the queue serializes whole jobs, not individual calls
// within a job.
//
// Use:
// const release = await acquireHardwareSlot({
// jobId,
// onWait: (info) => { ... emit "queued" SSE event ... },
// });
// try {
// // ... do hardware-backed pipeline ...
// } finally {
// release();
// }
//
// onWait fires AT MOST ONCE, before this caller starts waiting.
// `info.position` is 1-indexed (1 = next up after the current
// active job, 2 = one slot back, etc). Callers use this to push
// a "queued" event to the SSE client so the UI can show "Queued
// — N jobs ahead of you".
let queueHead = Promise.resolve();
let pendingCount = 0; // includes the currently-active job
let currentJobId = null;
let queueLog = []; // bounded ring buffer for /admin/queue visibility
const MAX_LOG_ENTRIES = 50;
function logQueueEvent(event) {
queueLog.push({ at: Date.now(), ...event });
if (queueLog.length > MAX_LOG_ENTRIES) {
queueLog = queueLog.slice(-MAX_LOG_ENTRIES);
}
}
export async function acquireHardwareSlot({ jobId, onWait } = {}) {
pendingCount += 1;
const positionInQueue = pendingCount - 1; // 0 = active immediately, 1+ = waiting
const prev = queueHead;
let releaseFn;
const slot = new Promise((resolve) => {
releaseFn = resolve;
});
queueHead = prev.then(() => slot);
if (positionInQueue > 0) {
logQueueEvent({
kind: "enqueued",
jobId: jobId || null,
position: positionInQueue,
activeJobId: currentJobId,
});
if (typeof onWait === "function") {
try {
onWait({ position: positionInQueue, activeJobId: currentJobId });
} catch {}
}
console.log(
`[hardware-queue] job ${jobId ? jobId.slice(0, 8) : "(no-id)"} queued at position ${positionInQueue} (active: ${currentJobId ? currentJobId.slice(0, 8) : "none"})`
);
}
await prev;
currentJobId = jobId || null;
logQueueEvent({ kind: "started", jobId: jobId || null });
console.log(
`[hardware-queue] job ${jobId ? jobId.slice(0, 8) : "(no-id)"} started — ${pendingCount - 1} waiting`
);
let released = false;
return () => {
if (released) return;
released = true;
const wasActive = currentJobId;
currentJobId = null;
pendingCount -= 1;
logQueueEvent({ kind: "released", jobId: wasActive });
console.log(
`[hardware-queue] job ${wasActive ? wasActive.slice(0, 8) : "(no-id)"} released — ${pendingCount} still in queue`
);
releaseFn();
};
}
// Read-only snapshot for the operator dashboard. Returns:
// pendingCount: total jobs in the queue (active + waiting)
// currentJobId: the job currently holding the slot, or null
// log: recent queue events (most recent last; bounded to 50)
export function getHardwareQueueStatus() {
return {
pendingCount,
currentJobId,
log: queueLog.slice(),
};
}
+41
View File
@@ -0,0 +1,41 @@
// TLS-tolerant fetch for LAN calls to operator-managed services that
// use StartOS Local Intermediate CA certs (or other self-signed certs)
// the relay container doesn't trust by default.
//
// Scope: Spark Control discovery + diarize-chunk POST. Those are the
// only HTTPS endpoints the relay talks to over the operator's own
// LAN. Public-internet calls (Gemini, Keysat, BTCPay) keep going
// through the normal global fetch with full cert validation — we
// don't pipe those through this helper.
//
// Safety: the URL passed in came from the operator's own config
// (Service Discovery URL field). The relay isn't auto-discovering or
// following untrusted redirects to arbitrary hosts.
//
// Implementation: undici's Agent supports per-dispatcher
// rejectUnauthorized control; Node's built-in fetch globally does
// not. We construct one Agent at module init and pass it as the
// dispatcher on every call.
import { Agent, fetch as undiciFetch } from "undici";
const lanAgent = new Agent({
connect: { rejectUnauthorized: false },
// 5s for TCP+TLS handshake — long enough for cold StartOS sockets,
// short enough to fail fast on a bad URL.
connectTimeout: 5000,
// Diarize-chunk and transcribe on the operator's GPU box are slow
// (Sortformer + TitaNet on a 5-min chunk can run 30-120s before
// SC sends back response headers). headersTimeout/bodyTimeout = 0
// disables undici's own watchdog — we rely entirely on the
// AbortSignal.timeout the caller passes (900s by default for
// hardware backend calls). Without this, lanFetch was killing the
// connection at 10s with a bare "fetch failed" before SC had any
// chance to respond.
headersTimeout: 0,
bodyTimeout: 0,
});
export async function lanFetch(url, opts = {}) {
return undiciFetch(url, { ...opts, dispatcher: lanAgent });
}
+58
View File
@@ -0,0 +1,58 @@
// Passive error reporting to Spark Control's /api/health-event
// endpoint. When a hardware-backed transcribe or analyze call fails
// (Parakeet 503, vLLM model not found, network timeout to Gemma,
// etc.), the relay fires a small POST so the operator's Spark
// Control dashboard can surface the failure in real time — without
// waiting for its own 5s health-check poll to catch the outage.
//
// Fire-and-forget: the request is kicked off but NOT awaited by the
// caller's error path, so reporting failure to Spark Control never
// adds latency to the user's transcribe-failed response.
//
// Reuses the operator's `relay_spark_control_url` config field
// (already used for /api/endpoints discovery). The relay just swaps
// the path for /api/health-event, keeping the operator config
// surface to one URL.
import { getConfigSnapshot } from "./config.js";
const REPORT_TIMEOUT_MS = 3_000;
// Fire-and-forget. service ∈ { "parakeet", "vllm", "gemma", ... }
// matching whatever Spark Control's poller knows about. Errors are
// swallowed silently — observability hiccups shouldn't bleed into
// the relay's error path.
export function reportHealthEvent({ service, ok = false, error, ms }) {
// Wrap the actual work in setImmediate so the caller's microtask
// queue isn't blocked at all.
setImmediate(async () => {
try {
const cfg = await getConfigSnapshot();
const base = (cfg.relay_spark_control_url || "").trim();
if (!base) return; // not configured — silent no-op
// Strip any path the operator may have configured (they
// typically set the /api/endpoints URL) and append health-event.
let origin;
try {
origin = new URL(base).origin;
} catch {
return;
}
const url = `${origin}/api/health-event`;
await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
service,
ok,
source: "recap-relay",
error: typeof error === "string" ? error.slice(0, 280) : null,
ms: typeof ms === "number" ? ms : null,
}),
signal: AbortSignal.timeout(REPORT_TIMEOUT_MS),
});
} catch {
// Best-effort — swallow.
}
});
}
+207
View File
@@ -0,0 +1,207 @@
// Optional service-discovery client. When the operator points
// relay_spark_control_url at a Spark Control (or compatible) endpoint
// that serves a JSON document describing the local LLM/STT/TTS
// services, the relay uses that to fill in any URL + model fields
// the operator left blank in their per-backend config.
//
// Expected JSON shape (Spark Control's /api/endpoints):
// {
// "vllm": { ready: bool, base_url: "...", model: "...", openai_compat: bool },
// "parakeet": { ready: bool, base_url: "...", kind: "stt", model: "..." },
// "kokoro": { ready: bool, base_url: "...", kind: "tts" }
// }
//
// Cached for SHORT_CACHE_TTL_MS to keep relay responses snappy while
// still picking up model swaps on the operator's GPU box within a
// minute. Unreachable / failing endpoint falls back to operator-typed
// values silently — no boot-time hard dep.
import { lanFetch } from "./lan-fetch.js";
const SHORT_CACHE_TTL_MS = 60 * 1000; // 60s
const FETCH_TIMEOUT_MS = 3000;
let cached = { fetched_at: 0, url: null, discovery: null };
// Last error from a discovery fetch attempt — surfaced in the
// dashboard so the operator can see when discovery is silently
// failing (the alternative is "operator-typed values silently win"
// which produces confusing "fetch failed" errors downstream when the
// override URL is also broken). Cleared on each successful fetch.
let lastError = { at: 0, message: null };
// Fire-and-forget background refresh while serving from cache —
// callers never block on the network. The first call after the cache
// expires returns the stale snapshot but kicks off a refresh in the
// background, so the next call gets the fresh data without ever
// having paid a round-trip in the critical path.
let inflight = null;
// Operator-visible discovery snapshot used by the admin dashboard.
// Returns: { configured, url, lastFetched, lastError, services }
// - configured: whether the operator set a discovery URL at all
// - url: the configured URL (or null)
// - lastFetched: epoch ms of last successful fetch (or 0)
// - lastError: { at, message } of last failed fetch (cleared on success)
// - services: parsed map of { parakeet: {...}, vllm: {...}, ... }
// when the last fetch succeeded; null otherwise.
//
// `configured` reflects the operator's config; the other fields
// reflect what actually happened on the network. Read-only — never
// triggers a fetch, just reports the cached state. The dashboard's
// Settings tab renders this so the operator can spot a silently-
// failing discovery (typo, unreachable host, TLS cert that the
// container doesn't trust, etc.) without grepping container logs.
export function getSparkDiscoveryStatus(configuredUrl) {
const url = (configuredUrl || "").trim();
return {
configured: !!url,
url: url || null,
lastFetched: cached.url === url ? cached.fetched_at : 0,
lastError: cached.url === url && lastError.at > cached.fetched_at
? { at: lastError.at, message: lastError.message }
: null,
services: cached.url === url && cached.discovery
? Object.fromEntries(
Object.entries(cached.discovery).map(([k, v]) => [
k,
{
ready: !!(v && v.ready),
base_url: v && v.base_url ? String(v.base_url) : null,
model: v && v.model ? String(v.model) : null,
kind: v && v.kind ? String(v.kind) : null,
},
])
)
: null,
};
}
export async function getSparkControlDiscovery(url) {
if (!url) return null;
// If the URL changed (operator updated config), invalidate.
if (cached.url !== url) {
cached = { fetched_at: 0, url, discovery: null };
}
const fresh = Date.now() - cached.fetched_at < SHORT_CACHE_TTL_MS;
if (fresh && cached.discovery) {
return cached.discovery;
}
// Stale (or never fetched). Trigger a background refresh; return
// whatever we currently have (might be null on cold boot).
if (!inflight) {
inflight = fetchDiscovery(url)
.then((discovery) => {
cached = { fetched_at: Date.now(), url, discovery };
lastError = { at: 0, message: null };
})
.catch((err) => {
const msg = err?.message || String(err);
lastError = { at: Date.now(), message: msg };
console.warn(`[spark-control] discovery fetch failed for ${url}: ${msg}`);
})
.finally(() => {
inflight = null;
});
}
// On cold start, wait briefly for the first fetch so we don't serve
// a request with null discovery if Spark Control is alive.
if (!cached.discovery) {
try {
await Promise.race([
inflight,
new Promise((r) => setTimeout(r, FETCH_TIMEOUT_MS + 500)),
]);
} catch {}
}
return cached.discovery;
}
async function fetchDiscovery(url) {
// lanFetch uses an undici Agent with rejectUnauthorized:false so
// that Spark Control's StartOS Local Intermediate CA cert (the
// standard Start9 LAN setup) doesn't fail TLS validation in the
// relay container. Plain-http LAN URLs work through the same path
// without TLS overhead. Public-internet calls (Gemini, Keysat,
// BTCPay) keep using the global fetch with full cert validation
// — see lan-fetch.js for the scope rationale.
const res = await lanFetch(url, {
signal: AbortSignal.timeout(FETCH_TIMEOUT_MS),
redirect: "follow",
});
if (!res.ok) {
throw new Error(`HTTP ${res.status}`);
}
return await res.json();
}
// Given a Spark Control URL and a "kind" hint, return the live
// service state. Three outcomes:
//
// { status: "ready", base_url, model }
// Service is up + healthy. Use these values.
//
// { status: "not_ready", reason }
// Service is listed in discovery but ready=false. A model swap
// might be in progress; or the operator hasn't loaded a model
// yet; or the wrapper crashed. Route handlers should fail fast
// with this message so the user knows what to fix on Spark
// Control instead of seeing a generic 500.
//
// { status: "unknown" }
// Discovery URL not configured, not reachable, or the requested
// service isn't in the response. Caller should fall back to
// operator-typed config.
//
// kind: "transcribe" → uses .parakeet (any STT-shaped entry)
// kind: "analyze" → uses .vllm (any OpenAI-compat LLM entry)
// kind: "tts" → uses .kokoro (Kokoro TTS entry; no `model`
// field — voice is chosen per-request by the
// caller, so a ready kokoro with a base_url is
// enough)
export async function getSparkServiceState(url, kind) {
if (!url) return { status: "unknown" };
let discovery;
try {
discovery = await getSparkControlDiscovery(url);
} catch {
return { status: "unknown" };
}
if (!discovery) return { status: "unknown" };
const target =
kind === "transcribe"
? discovery.parakeet
: kind === "analyze"
? discovery.vllm
: kind === "tts"
? discovery.kokoro
: null;
if (!target) return { status: "unknown" };
if (!target.base_url) return { status: "unknown" };
if (target.ready === false) {
return {
status: "not_ready",
reason:
kind === "transcribe"
? "Spark Control reports STT (parakeet) is offline. Check spark-control — a model swap may be in progress, or the wrapper needs attention."
: kind === "tts"
? "Spark Control reports TTS (kokoro) is offline. Check spark-control — the Kokoro container may be down or restarting."
: "Spark Control reports the LLM (vllm) is offline. Check spark-control — load a model via the dashboard or wait for an in-progress swap to finish.",
};
}
return {
status: "ready",
base_url: target.base_url,
model: target.model || null,
};
}
// Kept for backward compatibility — same signature as before, returns
// null when the service isn't ready. New code should call
// getSparkServiceState directly so it can distinguish "not configured"
// from "configured but not ready".
export async function getDiscoveredEndpoint(url, kind) {
const state = await getSparkServiceState(url, kind);
if (state.status !== "ready") return null;
return { base_url: state.base_url, model: state.model };
}