diff --git a/image/app/deep_health.py b/image/app/deep_health.py new file mode 100644 index 0000000..6695e04 --- /dev/null +++ b/image/app/deep_health.py @@ -0,0 +1,341 @@ +"""Deep health probes for each service. + +Why this exists: Triton's /health endpoint returns 200 as long as the HTTP +layer is alive and the model is registered. It does NOT verify that the CUDA +context inside the worker process is healthy. We've observed Parakeet getting +its CUDA context wedged after an OOM, where /health stays green but every +real transcription returns 500 cudaErrorUnknown. + +So this module sends *real* but tiny synthetic inference requests: + - Parakeet: 1 second of digital silence (16 kHz mono PCM, in-memory WAV) + - Magpie: short text-to-speech, response audio discarded + - vLLM: 1-token chat completion against whatever model is loaded + +All synthetic payloads are generated on demand into BytesIO, sent over HTTP, +and never touched the filesystem (on either spark-control's side or the +target service's side beyond normal Triton/Riva working memory). + +When a probe fails with a signal that looks like a CUDA wedge, we +automatically issue `docker restart `. Rate-limited to 3 restarts +per service per 30 minutes to avoid restart loops. +""" +from __future__ import annotations +import asyncio +import io +import time +import wave +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Optional + +import httpx + +from .config import Settings +from .connectivity import record_report +from .services import ServiceDef, run_action, services_from_settings + + +# Default 5-minute interval, controllable via env. Sub-minute is silly for a +# heavy synthetic probe; we just want to catch wedges within a reasonable +# window — much faster than the user noticing on their next real call. +DEFAULT_INTERVAL_SEC = 300.0 +PROBE_TIMEOUT_SEC = 20.0 +RESTART_RATE_LIMIT = 3 # max auto-restarts per service +RESTART_RATE_WINDOW_SEC = 1800.0 # within a 30-min window +RESTART_COOLDOWN_SEC = 120.0 # don't restart again within this many seconds of the last one +STARTUP_GRACE_SEC = 60.0 # don't auto-restart for the first minute after this app boots + + +def _silence_wav(seconds: float = 1.0, sample_rate: int = 16000) -> io.BytesIO: + """Return an in-memory WAV file containing `seconds` of digital silence.""" + n_frames = int(seconds * sample_rate) + buf = io.BytesIO() + with wave.open(buf, "wb") as w: + w.setnchannels(1) + w.setsampwidth(2) # int16 + w.setframerate(sample_rate) + w.writeframes(b"\x00\x00" * n_frames) + buf.seek(0) + return buf + + +def _looks_like_wedge(error: str) -> bool: + """Heuristic: does this error string look like a stuck CUDA context that + a container restart would clear? We want to be conservative — only act + on signals we're confident about, otherwise leave the user in charge.""" + err = (error or "").lower() + needles = [ + "cudaerrorunknown", + "cuda error: unknown", + "cuda kernel errors", + "internal server error", + "engine core initialization failed", + "503", # service unavailable from a dependency + "500", # generic 5xx with a body that may not parse + ] + return any(n in err for n in needles) + + +@dataclass +class ProbeResult: + ok: bool + at: str + latency_ms: Optional[int] = None + error: str = "" + note: str = "" + + +@dataclass +class ServiceState: + last: Optional[ProbeResult] = None + last_ok_at: Optional[str] = None + restarts: list[float] = field(default_factory=list) + + +class DeepHealth: + def __init__(self, settings: Settings, interval_sec: float = DEFAULT_INTERVAL_SEC) -> None: + self.settings = settings + self.interval_sec = interval_sec + self.state: dict[str, ServiceState] = { + "parakeet": ServiceState(), + "magpie": ServiceState(), + "vllm": ServiceState(), + } + self._stop = asyncio.Event() + self._boot_at = time.monotonic() + + # ---- probes --------------------------------------------------------- + + async def probe_parakeet(self) -> ProbeResult: + s = self.settings + now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + if not s.parakeet_host: + return ProbeResult(ok=False, at=now_iso, error="not configured") + url = f"http://{s.parakeet_host}:{s.parakeet_port}/v1/audio/transcriptions" + wav = _silence_wav(1.0) + t0 = time.monotonic() + try: + async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c: + r = await c.post( + url, + files={"file": ("probe.wav", wav, "audio/wav")}, + data={"model": "parakeet-tdt-0.6b-v3"}, + ) + latency = round((time.monotonic() - t0) * 1000) + if 200 <= r.status_code < 300: + return ProbeResult(ok=True, at=now_iso, latency_ms=latency) + return ProbeResult( + ok=False, + at=now_iso, + latency_ms=latency, + error=f"HTTP {r.status_code}: {r.text[:240]}", + ) + except Exception as e: + return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}") + + async def probe_magpie(self) -> ProbeResult: + s = self.settings + now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + if not s.magpie_host: + return ProbeResult(ok=False, at=now_iso, error="not configured") + # Magpie /v1/audio/synthesize expects multipart form-data, not JSON. + # The (None, value) tuple in httpx's `files=` produces a non-file form field. + url = f"http://{s.magpie_host}:{s.magpie_port}/v1/audio/synthesize" + form: dict = {"text": (None, "hi"), "language": (None, "en-US")} + t0 = time.monotonic() + try: + async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c: + r = await c.post(url, files=form) + latency = round((time.monotonic() - t0) * 1000) + if 200 <= r.status_code < 300: + return ProbeResult(ok=True, at=now_iso, latency_ms=latency) + # 4xx that aren't 5xx mean server is alive but our payload is off — + # don't classify as wedge. + if 400 <= r.status_code < 500: + return ProbeResult( + ok=True, + at=now_iso, + latency_ms=latency, + note=f"{r.status_code} — server alive (probe payload may need a voice name)", + ) + return ProbeResult( + ok=False, + at=now_iso, + latency_ms=latency, + error=f"HTTP {r.status_code}: {r.text[:240]}", + ) + except Exception as e: + return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}") + + async def probe_vllm(self) -> ProbeResult: + s = self.settings + now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + if not s.spark1_host: + return ProbeResult(ok=False, at=now_iso, error="not configured") + base = f"http://{s.spark1_host}:{s.vllm_port}" + try: + async with httpx.AsyncClient(timeout=5.0) as c: + r = await c.get(f"{base}/v1/models") + r.raise_for_status() + models = r.json().get("data") or [] + if not models: + return ProbeResult(ok=False, at=now_iso, error="no model loaded") + model_id = models[0]["id"] + except Exception as e: + return ProbeResult(ok=False, at=now_iso, error=f"list models: {type(e).__name__}: {e}") + t0 = time.monotonic() + try: + async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c: + r = await c.post( + f"{base}/v1/chat/completions", + json={ + "model": model_id, + "messages": [{"role": "user", "content": "hi"}], + "max_tokens": 1, + "temperature": 0, + }, + ) + latency = round((time.monotonic() - t0) * 1000) + if 200 <= r.status_code < 300: + return ProbeResult(ok=True, at=now_iso, latency_ms=latency) + return ProbeResult( + ok=False, + at=now_iso, + latency_ms=latency, + error=f"HTTP {r.status_code}: {r.text[:240]}", + ) + except Exception as e: + return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}") + + # ---- orchestration -------------------------------------------------- + + PROBES = { + "parakeet": "probe_parakeet", + "magpie": "probe_magpie", + "vllm": "probe_vllm", + } + + async def run_one(self, service: str) -> ProbeResult: + fn = getattr(self, self.PROBES[service]) + result: ProbeResult = await fn() + st = self.state[service] + prev_ok = st.last.ok if st.last else None + st.last = result + if result.ok: + st.last_ok_at = result.at + + # Log to connectivity history: every failure, plus the first success + # after a failure (recovery), plus the first probe ever — but skip + # the "still ok" steady-state to keep the log readable. + if not result.ok: + record_report( + service, + ok=False, + source="deep-health", + detail=result.error[:240], + latency_ms=result.latency_ms, + ) + elif prev_ok is False: + record_report( + service, + ok=True, + source="deep-health", + detail="recovered" + (f" — {result.note}" if result.note else ""), + latency_ms=result.latency_ms, + ) + elif prev_ok is None: + record_report( + service, + ok=True, + source="deep-health", + detail="first probe ok" + (f" — {result.note}" if result.note else ""), + latency_ms=result.latency_ms, + ) + + # Maybe auto-restart + if not result.ok and _looks_like_wedge(result.error): + await self._maybe_restart(service, result.error) + return result + + async def _maybe_restart(self, service: str, error: str) -> None: + # No restarts during the boot grace period. + if time.monotonic() - self._boot_at < STARTUP_GRACE_SEC: + return + st = self.state[service] + now = time.monotonic() + st.restarts = [t for t in st.restarts if now - t < RESTART_RATE_WINDOW_SEC] + if st.restarts and now - st.restarts[-1] < RESTART_COOLDOWN_SEC: + return # already restarted recently, give it time + if len(st.restarts) >= RESTART_RATE_LIMIT: + record_report( + service, + ok=False, + source="deep-health", + detail=f"rate-limited; not auto-restarting (would be #{len(st.restarts)+1} in 30 min)", + ) + return + services = services_from_settings(self.settings) + if service not in services: + return + svc = services[service] + if not svc.host or not svc.user: + return + result = await run_action(self.settings, svc, "restart") + st.restarts.append(now) + ok = result.get("ok", False) + record_report( + service, + ok=False, + source="deep-health", + detail=f"auto-restart triggered (wedge: {error[:120]}); restart {'OK' if ok else 'FAILED'}", + ) + + async def run_all(self) -> dict[str, ProbeResult]: + results = {} + for name in self.PROBES: + results[name] = await self.run_one(name) + return results + + async def run_periodic(self) -> None: + """Long-running loop. Cancel via .stop().""" + # Brief initial wait to let app finish startup + try: + await asyncio.wait_for(self._stop.wait(), timeout=10.0) + return + except asyncio.TimeoutError: + pass + while not self._stop.is_set(): + try: + await self.run_all() + except Exception: + # Never let the loop die; the periodic check is best-effort + pass + try: + await asyncio.wait_for(self._stop.wait(), timeout=self.interval_sec) + return + except asyncio.TimeoutError: + continue + + def stop(self) -> None: + self._stop.set() + + def summary(self) -> dict: + out = {} + for name, st in self.state.items(): + last = st.last + out[name] = { + "last_ok_at": st.last_ok_at, + "last": ( + { + "ok": last.ok, + "at": last.at, + "latency_ms": last.latency_ms, + "error": last.error, + "note": last.note, + } + if last + else None + ), + "auto_restarts_window": len(st.restarts), + } + return out diff --git a/image/app/server.py b/image/app/server.py index 9b6707f..4996261 100644 --- a/image/app/server.py +++ b/image/app/server.py @@ -12,6 +12,7 @@ from typing import Literal from .config import Settings from .connectivity import get_mac, record_report, record_state, summary as connectivity_summary from .custom_services import add_custom_service, delete_custom_service +from .deep_health import DeepHealth from .download import DownloadManager from .hardware import HardwareProbe from .health import check_magpie, check_parakeet, check_vllm @@ -33,9 +34,22 @@ download_manager = DownloadManager(settings) update_manager = UpdateManager(settings) hardware_probe = HardwareProbe(settings) nim_manager = NimManager(settings) +deep_health = DeepHealth(settings) app = FastAPI(title="spark-control", version="0.1.0") + +@app.on_event("startup") +async def _start_deep_health() -> None: + # Fire-and-forget; the loop catches its own exceptions. + asyncio.create_task(deep_health.run_periodic()) + + +@app.on_event("shutdown") +async def _stop_deep_health() -> None: + deep_health.stop() + + _STATIC_DIR = Path(__file__).resolve().parent / "static" app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static") @@ -137,6 +151,27 @@ async def get_connectivity() -> dict: return connectivity_summary() +@app.get("/api/deep-health") +async def get_deep_health() -> dict: + """Last result + auto-restart counters for each service's synthetic probe.""" + return deep_health.summary() + + +@app.post("/api/deep-health/{service}/run") +async def run_deep_health(service: str) -> dict: + """Manually run a single service's deep-health probe right now.""" + if service not in deep_health.PROBES: + raise HTTPException(404, f"unknown service: {service}") + result = await deep_health.run_one(service) + return { + "ok": result.ok, + "at": result.at, + "latency_ms": result.latency_ms, + "error": result.error, + "note": result.note, + } + + class HealthEventBody(BaseModel): service: str # e.g. "parakeet", "magpie", "vllm" ok: bool # true on success, false on failure diff --git a/image/app/static/app.js b/image/app/static/app.js index 10d31fb..b15ca62 100644 --- a/image/app/static/app.js +++ b/image/app/static/app.js @@ -17,6 +17,7 @@ const state = { config: {}, configured: true, timer_handle: null, + deep_health: {}, }; const el = (sel) => document.querySelector(sel); @@ -413,6 +414,35 @@ async function renderServices() { const restartsRow = s.restart_count != null && s.restart_count > 1 ? `
Restarts${s.restart_count}
` : ''; + const dh = state.deep_health?.[name]; + let deepRow = ''; + if (dh && dh.last) { + const last = dh.last; + const when = (last.at || '').slice(11, 19); // HH:MM:SS + const verdict = last.ok + ? `deep check ok` + : `deep check FAILED`; + const lat = last.latency_ms != null ? ` ${last.latency_ms} ms` : ''; + const restarts = dh.auto_restarts_window > 0 + ? ` · ${dh.auto_restarts_window} auto-restart${dh.auto_restarts_window === 1 ? '' : 's'} in 30 min` + : ''; + deepRow = ` +
+ Deep + ${verdict} ${escapeHtml(when)}${lat}${restarts} + +
+ ${last.ok ? '' : `
${escapeHtml((last.error || last.note || '').slice(0, 200))}
`} + `; + } else if (dh) { + deepRow = ` +
+ Deep + no probe yet + +
+ `; + } card.innerHTML = `
${escapeHtml(name)} @@ -423,6 +453,7 @@ async function renderServices() { ${urlRow} ${modelRow} ${restartsRow} + ${deepRow}
@@ -434,6 +465,25 @@ async function renderServices() { for (const btn of grid.querySelectorAll('.btn[data-svc-action]')) { btn.addEventListener('click', () => onServiceAction(btn.dataset.svcAction)); } + for (const btn of grid.querySelectorAll('[data-dh-run]')) { + btn.addEventListener('click', () => onDeepHealthRun(btn.dataset.dhRun, btn)); + } +} + +async function onDeepHealthRun(name, btn) { + btn.disabled = true; + const orig = btn.textContent; + btn.textContent = '…'; + try { + await fetchJSON(`/api/deep-health/${encodeURIComponent(name)}/run`, { method: 'POST' }); + } catch (e) { + console.warn('deep-health run failed', e); + } finally { + try { state.deep_health = await fetchJSON('/api/deep-health'); } catch {} + btn.textContent = orig; + btn.disabled = false; + renderServices(); + } } async function onServiceAction(key) { @@ -668,6 +718,7 @@ async function pollStatus() { // Refresh services state lazily — every 5s poll triggers this too. try { state.services = await fetchJSON('/api/services'); + try { state.deep_health = await fetchJSON('/api/deep-health'); } catch {} renderServices(); } catch {} if (status.current_swap_job && status.current_swap_job !== state.swap_job_id) { diff --git a/image/app/static/style.css b/image/app/static/style.css index b13d37c..e6e30b5 100644 --- a/image/app/static/style.css +++ b/image/app/static/style.css @@ -622,6 +622,19 @@ main { .service-card .row .v.copyable.copied { outline: 1px solid var(--accent); background: rgba(74, 222, 128, 0.05); } .service-card .row .icon-btn { padding: 3px 6px; } .service-card .row .icon-btn svg { width: 12px; height: 12px; } +.service-card .deep-row .deep-v { display: flex; align-items: center; gap: 6px; font-family: inherit; flex-wrap: wrap; } +.service-card .dh-ok { color: var(--accent); } +.service-card .dh-fail { color: var(--error); font-weight: 500; } +.service-card .dh-run-btn { font-family: inherit; } +.service-card .deep-error { + padding: 4px 8px; + background: rgba(239, 68, 68, 0.06); + border-left: 2px solid var(--error); + border-radius: 4px; + font-family: ui-monospace, SFMono-Regular, Menlo, monospace; + font-size: 11px; + word-break: break-word; +} .service-actions { display: flex; diff --git a/package/startos/versions/v0_1_0.ts b/package/startos/versions/v0_1_0.ts index fe170d1..a8015fa 100644 --- a/package/startos/versions/v0_1_0.ts +++ b/package/startos/versions/v0_1_0.ts @@ -1,10 +1,10 @@ import { VersionInfo, IMPOSSIBLE } from '@start9labs/start-sdk' export const v0_1_0 = VersionInfo.of({ - version: '0.7.0:2', + version: '0.8.0:2', releaseNotes: { en_US: - 'v0.7: pre-flight launch validation. New "Test" button on every model card runs vLLM\'s argparse against the proposed launch command inside the running vllm_node container — without starting an engine. Catches unknown flags, bad types, and version-removed flags in about 5 seconds, before disrupting the currently-loaded model. (Runtime-only failures like the Qwen3.6 Mamba block-size assertion still only surface during a real swap, but argparse-stage bugs are now caught up front.)', + 'v0.8: deep health probes. Every 5 minutes, Spark Control sends a tiny synthetic inference request to each service (1 second of silent audio to Parakeet, short text to Magpie, 1-token completion to vLLM). All payloads are generated in-memory and never written to disk. If a probe returns CUDA-error / 5xx signals while the container is still "up" — i.e. the classic Triton-wedge pattern where /health stays green but real inference fails — Spark Control automatically restarts the affected container. Rate-limited to 3 auto-restarts per service per 30 minutes. Each service card now shows the last deep-check timestamp, latency, and an inline "Run now" button. Failures and recoveries are logged into the connectivity history with source=deep-health.', }, migrations: { up: async ({ effects }) => {},