Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 000c55febe | |||
| 6434b01a95 | |||
| 5827683a09 | |||
| ee8c2406b8 |
@@ -84,6 +84,24 @@ Other services on your LAN can hit `GET /api/endpoints` to learn where the curre
|
||||
|
||||
`base_url` is filled in whenever Configure Sparks has been completed (even if the underlying service isn't currently up). Pair the URL with `ready: true` to safely route traffic.
|
||||
|
||||
## Reporting failures from external apps
|
||||
|
||||
Spark Control polls every 5 s, so a brief blip in Parakeet/Magpie/vLLM availability can slip between polls and never make it into the connectivity log. To capture short failures, an external app (e.g. Open WebUI) can POST whenever a call fails (or succeeds):
|
||||
|
||||
```bash
|
||||
curl -X POST http://<dashboard-url>/api/health-event \
|
||||
-H 'content-type: application/json' \
|
||||
-d '{
|
||||
"service": "parakeet",
|
||||
"ok": false,
|
||||
"source": "open-webui",
|
||||
"error": "HTTP 503",
|
||||
"ms": 420
|
||||
}'
|
||||
```
|
||||
|
||||
Fields: `service` (required), `ok` (required), `source` (optional, free-form), `error` (optional), `ms` (optional latency). Each POST appends a `report` event to the connectivity log alongside the polling-based transition events.
|
||||
|
||||
## Status
|
||||
|
||||
**v0.2.3** — installed and verified on a Start9 server. Five bundled LLMs in the catalog (qwen3-vl, gemma4, qwen36, qwen3-235b-fp8, qwen2.5-72b), plus any custom models added through the UI.
|
||||
|
||||
+87
-23
@@ -1,17 +1,28 @@
|
||||
"""Track Spark up/down transitions and cache discovered MAC addresses.
|
||||
"""Track up/down transitions for any subject (Sparks AND services) and cache MACs.
|
||||
|
||||
Persisted to /data/connectivity.json so history survives package restarts:
|
||||
Persisted to /data/connectivity.json. Schema:
|
||||
|
||||
{
|
||||
"macs": { "spark1": "aa:bb:..", "spark2": "11:22:.." },
|
||||
"current": { "spark1": "up", "spark2": "down" },
|
||||
"last_change": { "spark1": "2026-05-12T15:00:00Z", ... },
|
||||
"current": { "spark1": "up", "parakeet": "up", "magpie": "down", ... },
|
||||
"last_change": { ... },
|
||||
"events": [
|
||||
{ "spark": "spark2", "at": "2026-05-12T17:30:00Z", "transition": "down" },
|
||||
{ "spark": "spark2", "at": "2026-05-12T18:45:00Z", "transition": "up", "down_seconds": 4500 },
|
||||
...
|
||||
# Active-probe transition (logged when state flips during polling)
|
||||
{ "subject": "spark2", "at": "...", "kind": "transition",
|
||||
"transition": "down" },
|
||||
{ "subject": "spark2", "at": "...", "kind": "transition",
|
||||
"transition": "up", "down_seconds": 4500 },
|
||||
|
||||
# Passive report (logged whenever an external app POSTs to
|
||||
# /api/health-event regardless of state change)
|
||||
{ "subject": "parakeet", "at": "...", "kind": "report",
|
||||
"ok": false, "source": "open-webui",
|
||||
"detail": "Connection refused", "latency_ms": 320 },
|
||||
]
|
||||
}
|
||||
|
||||
Legacy events from v0.5 with `spark` instead of `subject` and no `kind` field
|
||||
are read transparently as kind="transition".
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import json
|
||||
@@ -59,21 +70,24 @@ def load() -> dict:
|
||||
return d
|
||||
|
||||
|
||||
def record_mac(spark: str, mac: Optional[str]) -> None:
|
||||
def record_mac(subject: str, mac: Optional[str]) -> None:
|
||||
if not mac:
|
||||
return
|
||||
with _lock:
|
||||
d = _read()
|
||||
d.setdefault("macs", {})
|
||||
if d["macs"].get(spark) != mac:
|
||||
d["macs"][spark] = mac
|
||||
if d["macs"].get(subject) != mac:
|
||||
d["macs"][subject] = mac
|
||||
_write(d)
|
||||
|
||||
|
||||
def record_state(spark: str, reachable: bool) -> Optional[dict]:
|
||||
"""Update current state. If it differs from the last seen state, append an event.
|
||||
def record_state(subject: str, reachable: bool) -> Optional[dict]:
|
||||
"""Update current state for `subject`. If it differs from the last seen
|
||||
state, append a transition event. Returns the event dict if a transition
|
||||
was recorded, else None.
|
||||
|
||||
Returns the event dict if a transition was recorded, else None.
|
||||
`subject` can be a Spark host key (spark1/spark2) or a service name
|
||||
(parakeet/magpie/vllm).
|
||||
"""
|
||||
new_state = "up" if reachable else "down"
|
||||
now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
@@ -83,12 +97,17 @@ def record_state(spark: str, reachable: bool) -> Optional[dict]:
|
||||
d.setdefault("current", {})
|
||||
d.setdefault("last_change", {})
|
||||
d.setdefault("events", [])
|
||||
prev = d["current"].get(spark)
|
||||
prev = d["current"].get(subject)
|
||||
if prev == new_state:
|
||||
return None
|
||||
event: dict = {"spark": spark, "at": now, "transition": new_state}
|
||||
event: dict = {
|
||||
"subject": subject,
|
||||
"at": now,
|
||||
"kind": "transition",
|
||||
"transition": new_state,
|
||||
}
|
||||
# When we have a previous state and timestamp, compute duration
|
||||
last_change = d["last_change"].get(spark)
|
||||
last_change = d["last_change"].get(subject)
|
||||
if prev and last_change:
|
||||
try:
|
||||
prev_dt = datetime.fromisoformat(last_change.replace("Z", "+00:00"))
|
||||
@@ -99,28 +118,73 @@ def record_state(spark: str, reachable: bool) -> Optional[dict]:
|
||||
event["up_seconds"] = round(duration)
|
||||
except ValueError:
|
||||
pass
|
||||
d["current"][spark] = new_state
|
||||
d["last_change"][spark] = now
|
||||
d["current"][subject] = new_state
|
||||
d["last_change"][subject] = now
|
||||
d["events"].append(event)
|
||||
# Keep rolling window
|
||||
if len(d["events"]) > MAX_EVENTS:
|
||||
d["events"] = d["events"][-MAX_EVENTS:]
|
||||
_write(d)
|
||||
return event
|
||||
|
||||
|
||||
def get_mac(spark: str) -> Optional[str]:
|
||||
def record_report(
|
||||
subject: str,
|
||||
*,
|
||||
ok: bool,
|
||||
source: str = "external",
|
||||
detail: str = "",
|
||||
latency_ms: Optional[int] = None,
|
||||
) -> dict:
|
||||
"""Record a passive report from an external caller (e.g. Open WebUI got a
|
||||
503 calling Parakeet). Always appended to the events list; does NOT change
|
||||
the active-probe state (which only the polling probe is authoritative on).
|
||||
"""
|
||||
now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
with _lock:
|
||||
d = _read()
|
||||
d.setdefault("events", [])
|
||||
event: dict = {
|
||||
"subject": subject,
|
||||
"at": now,
|
||||
"kind": "report",
|
||||
"ok": bool(ok),
|
||||
"source": source or "external",
|
||||
}
|
||||
if detail:
|
||||
event["detail"] = detail
|
||||
if latency_ms is not None:
|
||||
event["latency_ms"] = int(latency_ms)
|
||||
d["events"].append(event)
|
||||
if len(d["events"]) > MAX_EVENTS:
|
||||
d["events"] = d["events"][-MAX_EVENTS:]
|
||||
_write(d)
|
||||
return event
|
||||
|
||||
|
||||
def get_mac(subject: str) -> Optional[str]:
|
||||
d = load()
|
||||
return d.get("macs", {}).get(spark)
|
||||
return d.get("macs", {}).get(subject)
|
||||
|
||||
|
||||
def _normalize_event(e: dict) -> dict:
|
||||
"""Promote legacy v0.5 events to the v0.6 shape so the UI sees one schema."""
|
||||
if "subject" in e:
|
||||
e.setdefault("kind", "transition")
|
||||
return e
|
||||
# Legacy: had "spark" + "transition" only
|
||||
if "spark" in e:
|
||||
e["subject"] = e.pop("spark")
|
||||
e.setdefault("kind", "transition")
|
||||
return e
|
||||
|
||||
|
||||
def summary() -> dict:
|
||||
"""Compact summary for the UI: known MACs, current state, recent events."""
|
||||
d = load()
|
||||
events = d.get("events", [])
|
||||
events = [_normalize_event(dict(e)) for e in d.get("events", [])]
|
||||
return {
|
||||
"macs": d.get("macs", {}),
|
||||
"current": d.get("current", {}),
|
||||
"last_change": d.get("last_change", {}),
|
||||
"events": events[-50:],
|
||||
"events": events[-80:],
|
||||
}
|
||||
|
||||
@@ -0,0 +1,341 @@
|
||||
"""Deep health probes for each service.
|
||||
|
||||
Why this exists: Triton's /health endpoint returns 200 as long as the HTTP
|
||||
layer is alive and the model is registered. It does NOT verify that the CUDA
|
||||
context inside the worker process is healthy. We've observed Parakeet getting
|
||||
its CUDA context wedged after an OOM, where /health stays green but every
|
||||
real transcription returns 500 cudaErrorUnknown.
|
||||
|
||||
So this module sends *real* but tiny synthetic inference requests:
|
||||
- Parakeet: 1 second of digital silence (16 kHz mono PCM, in-memory WAV)
|
||||
- Magpie: short text-to-speech, response audio discarded
|
||||
- vLLM: 1-token chat completion against whatever model is loaded
|
||||
|
||||
All synthetic payloads are generated on demand into BytesIO, sent over HTTP,
|
||||
and never touched the filesystem (on either spark-control's side or the
|
||||
target service's side beyond normal Triton/Riva working memory).
|
||||
|
||||
When a probe fails with a signal that looks like a CUDA wedge, we
|
||||
automatically issue `docker restart <container>`. Rate-limited to 3 restarts
|
||||
per service per 30 minutes to avoid restart loops.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import io
|
||||
import time
|
||||
import wave
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
from .config import Settings
|
||||
from .connectivity import record_report
|
||||
from .services import ServiceDef, run_action, services_from_settings
|
||||
|
||||
|
||||
# Default 5-minute interval, controllable via env. Sub-minute is silly for a
|
||||
# heavy synthetic probe; we just want to catch wedges within a reasonable
|
||||
# window — much faster than the user noticing on their next real call.
|
||||
DEFAULT_INTERVAL_SEC = 300.0
|
||||
PROBE_TIMEOUT_SEC = 20.0
|
||||
RESTART_RATE_LIMIT = 3 # max auto-restarts per service
|
||||
RESTART_RATE_WINDOW_SEC = 1800.0 # within a 30-min window
|
||||
RESTART_COOLDOWN_SEC = 120.0 # don't restart again within this many seconds of the last one
|
||||
STARTUP_GRACE_SEC = 60.0 # don't auto-restart for the first minute after this app boots
|
||||
|
||||
|
||||
def _silence_wav(seconds: float = 1.0, sample_rate: int = 16000) -> io.BytesIO:
|
||||
"""Return an in-memory WAV file containing `seconds` of digital silence."""
|
||||
n_frames = int(seconds * sample_rate)
|
||||
buf = io.BytesIO()
|
||||
with wave.open(buf, "wb") as w:
|
||||
w.setnchannels(1)
|
||||
w.setsampwidth(2) # int16
|
||||
w.setframerate(sample_rate)
|
||||
w.writeframes(b"\x00\x00" * n_frames)
|
||||
buf.seek(0)
|
||||
return buf
|
||||
|
||||
|
||||
def _looks_like_wedge(error: str) -> bool:
|
||||
"""Heuristic: does this error string look like a stuck CUDA context that
|
||||
a container restart would clear? We want to be conservative — only act
|
||||
on signals we're confident about, otherwise leave the user in charge."""
|
||||
err = (error or "").lower()
|
||||
needles = [
|
||||
"cudaerrorunknown",
|
||||
"cuda error: unknown",
|
||||
"cuda kernel errors",
|
||||
"internal server error",
|
||||
"engine core initialization failed",
|
||||
"503", # service unavailable from a dependency
|
||||
"500", # generic 5xx with a body that may not parse
|
||||
]
|
||||
return any(n in err for n in needles)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProbeResult:
|
||||
ok: bool
|
||||
at: str
|
||||
latency_ms: Optional[int] = None
|
||||
error: str = ""
|
||||
note: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class ServiceState:
|
||||
last: Optional[ProbeResult] = None
|
||||
last_ok_at: Optional[str] = None
|
||||
restarts: list[float] = field(default_factory=list)
|
||||
|
||||
|
||||
class DeepHealth:
|
||||
def __init__(self, settings: Settings, interval_sec: float = DEFAULT_INTERVAL_SEC) -> None:
|
||||
self.settings = settings
|
||||
self.interval_sec = interval_sec
|
||||
self.state: dict[str, ServiceState] = {
|
||||
"parakeet": ServiceState(),
|
||||
"magpie": ServiceState(),
|
||||
"vllm": ServiceState(),
|
||||
}
|
||||
self._stop = asyncio.Event()
|
||||
self._boot_at = time.monotonic()
|
||||
|
||||
# ---- probes ---------------------------------------------------------
|
||||
|
||||
async def probe_parakeet(self) -> ProbeResult:
|
||||
s = self.settings
|
||||
now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
if not s.parakeet_host:
|
||||
return ProbeResult(ok=False, at=now_iso, error="not configured")
|
||||
url = f"http://{s.parakeet_host}:{s.parakeet_port}/v1/audio/transcriptions"
|
||||
wav = _silence_wav(1.0)
|
||||
t0 = time.monotonic()
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c:
|
||||
r = await c.post(
|
||||
url,
|
||||
files={"file": ("probe.wav", wav, "audio/wav")},
|
||||
data={"model": "parakeet-tdt-0.6b-v3"},
|
||||
)
|
||||
latency = round((time.monotonic() - t0) * 1000)
|
||||
if 200 <= r.status_code < 300:
|
||||
return ProbeResult(ok=True, at=now_iso, latency_ms=latency)
|
||||
return ProbeResult(
|
||||
ok=False,
|
||||
at=now_iso,
|
||||
latency_ms=latency,
|
||||
error=f"HTTP {r.status_code}: {r.text[:240]}",
|
||||
)
|
||||
except Exception as e:
|
||||
return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}")
|
||||
|
||||
async def probe_magpie(self) -> ProbeResult:
|
||||
s = self.settings
|
||||
now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
if not s.magpie_host:
|
||||
return ProbeResult(ok=False, at=now_iso, error="not configured")
|
||||
# Magpie /v1/audio/synthesize expects multipart form-data, not JSON.
|
||||
# The (None, value) tuple in httpx's `files=` produces a non-file form field.
|
||||
url = f"http://{s.magpie_host}:{s.magpie_port}/v1/audio/synthesize"
|
||||
form: dict = {"text": (None, "hi"), "language": (None, "en-US")}
|
||||
t0 = time.monotonic()
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c:
|
||||
r = await c.post(url, files=form)
|
||||
latency = round((time.monotonic() - t0) * 1000)
|
||||
if 200 <= r.status_code < 300:
|
||||
return ProbeResult(ok=True, at=now_iso, latency_ms=latency)
|
||||
# 4xx that aren't 5xx mean server is alive but our payload is off —
|
||||
# don't classify as wedge.
|
||||
if 400 <= r.status_code < 500:
|
||||
return ProbeResult(
|
||||
ok=True,
|
||||
at=now_iso,
|
||||
latency_ms=latency,
|
||||
note=f"{r.status_code} — server alive (probe payload may need a voice name)",
|
||||
)
|
||||
return ProbeResult(
|
||||
ok=False,
|
||||
at=now_iso,
|
||||
latency_ms=latency,
|
||||
error=f"HTTP {r.status_code}: {r.text[:240]}",
|
||||
)
|
||||
except Exception as e:
|
||||
return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}")
|
||||
|
||||
async def probe_vllm(self) -> ProbeResult:
|
||||
s = self.settings
|
||||
now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
if not s.spark1_host:
|
||||
return ProbeResult(ok=False, at=now_iso, error="not configured")
|
||||
base = f"http://{s.spark1_host}:{s.vllm_port}"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0) as c:
|
||||
r = await c.get(f"{base}/v1/models")
|
||||
r.raise_for_status()
|
||||
models = r.json().get("data") or []
|
||||
if not models:
|
||||
return ProbeResult(ok=False, at=now_iso, error="no model loaded")
|
||||
model_id = models[0]["id"]
|
||||
except Exception as e:
|
||||
return ProbeResult(ok=False, at=now_iso, error=f"list models: {type(e).__name__}: {e}")
|
||||
t0 = time.monotonic()
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c:
|
||||
r = await c.post(
|
||||
f"{base}/v1/chat/completions",
|
||||
json={
|
||||
"model": model_id,
|
||||
"messages": [{"role": "user", "content": "hi"}],
|
||||
"max_tokens": 1,
|
||||
"temperature": 0,
|
||||
},
|
||||
)
|
||||
latency = round((time.monotonic() - t0) * 1000)
|
||||
if 200 <= r.status_code < 300:
|
||||
return ProbeResult(ok=True, at=now_iso, latency_ms=latency)
|
||||
return ProbeResult(
|
||||
ok=False,
|
||||
at=now_iso,
|
||||
latency_ms=latency,
|
||||
error=f"HTTP {r.status_code}: {r.text[:240]}",
|
||||
)
|
||||
except Exception as e:
|
||||
return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}")
|
||||
|
||||
# ---- orchestration --------------------------------------------------
|
||||
|
||||
PROBES = {
|
||||
"parakeet": "probe_parakeet",
|
||||
"magpie": "probe_magpie",
|
||||
"vllm": "probe_vllm",
|
||||
}
|
||||
|
||||
async def run_one(self, service: str) -> ProbeResult:
|
||||
fn = getattr(self, self.PROBES[service])
|
||||
result: ProbeResult = await fn()
|
||||
st = self.state[service]
|
||||
prev_ok = st.last.ok if st.last else None
|
||||
st.last = result
|
||||
if result.ok:
|
||||
st.last_ok_at = result.at
|
||||
|
||||
# Log to connectivity history: every failure, plus the first success
|
||||
# after a failure (recovery), plus the first probe ever — but skip
|
||||
# the "still ok" steady-state to keep the log readable.
|
||||
if not result.ok:
|
||||
record_report(
|
||||
service,
|
||||
ok=False,
|
||||
source="deep-health",
|
||||
detail=result.error[:240],
|
||||
latency_ms=result.latency_ms,
|
||||
)
|
||||
elif prev_ok is False:
|
||||
record_report(
|
||||
service,
|
||||
ok=True,
|
||||
source="deep-health",
|
||||
detail="recovered" + (f" — {result.note}" if result.note else ""),
|
||||
latency_ms=result.latency_ms,
|
||||
)
|
||||
elif prev_ok is None:
|
||||
record_report(
|
||||
service,
|
||||
ok=True,
|
||||
source="deep-health",
|
||||
detail="first probe ok" + (f" — {result.note}" if result.note else ""),
|
||||
latency_ms=result.latency_ms,
|
||||
)
|
||||
|
||||
# Maybe auto-restart
|
||||
if not result.ok and _looks_like_wedge(result.error):
|
||||
await self._maybe_restart(service, result.error)
|
||||
return result
|
||||
|
||||
async def _maybe_restart(self, service: str, error: str) -> None:
|
||||
# No restarts during the boot grace period.
|
||||
if time.monotonic() - self._boot_at < STARTUP_GRACE_SEC:
|
||||
return
|
||||
st = self.state[service]
|
||||
now = time.monotonic()
|
||||
st.restarts = [t for t in st.restarts if now - t < RESTART_RATE_WINDOW_SEC]
|
||||
if st.restarts and now - st.restarts[-1] < RESTART_COOLDOWN_SEC:
|
||||
return # already restarted recently, give it time
|
||||
if len(st.restarts) >= RESTART_RATE_LIMIT:
|
||||
record_report(
|
||||
service,
|
||||
ok=False,
|
||||
source="deep-health",
|
||||
detail=f"rate-limited; not auto-restarting (would be #{len(st.restarts)+1} in 30 min)",
|
||||
)
|
||||
return
|
||||
services = services_from_settings(self.settings)
|
||||
if service not in services:
|
||||
return
|
||||
svc = services[service]
|
||||
if not svc.host or not svc.user:
|
||||
return
|
||||
result = await run_action(self.settings, svc, "restart")
|
||||
st.restarts.append(now)
|
||||
ok = result.get("ok", False)
|
||||
record_report(
|
||||
service,
|
||||
ok=False,
|
||||
source="deep-health",
|
||||
detail=f"auto-restart triggered (wedge: {error[:120]}); restart {'OK' if ok else 'FAILED'}",
|
||||
)
|
||||
|
||||
async def run_all(self) -> dict[str, ProbeResult]:
|
||||
results = {}
|
||||
for name in self.PROBES:
|
||||
results[name] = await self.run_one(name)
|
||||
return results
|
||||
|
||||
async def run_periodic(self) -> None:
|
||||
"""Long-running loop. Cancel via .stop()."""
|
||||
# Brief initial wait to let app finish startup
|
||||
try:
|
||||
await asyncio.wait_for(self._stop.wait(), timeout=10.0)
|
||||
return
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
await self.run_all()
|
||||
except Exception:
|
||||
# Never let the loop die; the periodic check is best-effort
|
||||
pass
|
||||
try:
|
||||
await asyncio.wait_for(self._stop.wait(), timeout=self.interval_sec)
|
||||
return
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
def stop(self) -> None:
|
||||
self._stop.set()
|
||||
|
||||
def summary(self) -> dict:
|
||||
out = {}
|
||||
for name, st in self.state.items():
|
||||
last = st.last
|
||||
out[name] = {
|
||||
"last_ok_at": st.last_ok_at,
|
||||
"last": (
|
||||
{
|
||||
"ok": last.ok,
|
||||
"at": last.at,
|
||||
"latency_ms": last.latency_ms,
|
||||
"error": last.error,
|
||||
"note": last.note,
|
||||
}
|
||||
if last
|
||||
else None
|
||||
),
|
||||
"auto_restarts_window": len(st.restarts),
|
||||
}
|
||||
return out
|
||||
+83
-1
@@ -10,8 +10,9 @@ from pydantic import BaseModel
|
||||
from typing import Literal
|
||||
|
||||
from .config import Settings
|
||||
from .connectivity import get_mac, summary as connectivity_summary
|
||||
from .connectivity import get_mac, record_report, record_state, summary as connectivity_summary
|
||||
from .custom_services import add_custom_service, delete_custom_service
|
||||
from .deep_health import DeepHealth
|
||||
from .download import DownloadManager
|
||||
from .hardware import HardwareProbe
|
||||
from .health import check_magpie, check_parakeet, check_vllm
|
||||
@@ -22,6 +23,7 @@ from .services import docker_state, run_action, services_from_settings
|
||||
from .ssh import ssh_run
|
||||
from .swap import SwapManager
|
||||
from .updates import UpdateManager, get_update_status
|
||||
from .validate import validate_launch
|
||||
from .wol import send_local_broadcast, send_via_peer
|
||||
|
||||
|
||||
@@ -32,9 +34,22 @@ download_manager = DownloadManager(settings)
|
||||
update_manager = UpdateManager(settings)
|
||||
hardware_probe = HardwareProbe(settings)
|
||||
nim_manager = NimManager(settings)
|
||||
deep_health = DeepHealth(settings)
|
||||
|
||||
app = FastAPI(title="spark-control", version="0.1.0")
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def _start_deep_health() -> None:
|
||||
# Fire-and-forget; the loop catches its own exceptions.
|
||||
asyncio.create_task(deep_health.run_periodic())
|
||||
|
||||
|
||||
@app.on_event("shutdown")
|
||||
async def _stop_deep_health() -> None:
|
||||
deep_health.stop()
|
||||
|
||||
|
||||
_STATIC_DIR = Path(__file__).resolve().parent / "static"
|
||||
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
|
||||
|
||||
@@ -136,6 +151,58 @@ async def get_connectivity() -> dict:
|
||||
return connectivity_summary()
|
||||
|
||||
|
||||
@app.get("/api/deep-health")
|
||||
async def get_deep_health() -> dict:
|
||||
"""Last result + auto-restart counters for each service's synthetic probe."""
|
||||
return deep_health.summary()
|
||||
|
||||
|
||||
@app.post("/api/deep-health/{service}/run")
|
||||
async def run_deep_health(service: str) -> dict:
|
||||
"""Manually run a single service's deep-health probe right now."""
|
||||
if service not in deep_health.PROBES:
|
||||
raise HTTPException(404, f"unknown service: {service}")
|
||||
result = await deep_health.run_one(service)
|
||||
return {
|
||||
"ok": result.ok,
|
||||
"at": result.at,
|
||||
"latency_ms": result.latency_ms,
|
||||
"error": result.error,
|
||||
"note": result.note,
|
||||
}
|
||||
|
||||
|
||||
class HealthEventBody(BaseModel):
|
||||
service: str # e.g. "parakeet", "magpie", "vllm"
|
||||
ok: bool # true on success, false on failure
|
||||
source: str | None = None # what app reported (e.g. "open-webui")
|
||||
error: str | None = None # optional detail
|
||||
ms: int | None = None # optional latency
|
||||
|
||||
|
||||
@app.post("/api/health-event")
|
||||
async def post_health_event(body: HealthEventBody) -> dict:
|
||||
"""Passive endpoint: any LAN app can POST here when its call to one of our
|
||||
services succeeds or (more usefully) fails. We log the report into the
|
||||
connectivity history so a brief blip that polling misses still surfaces.
|
||||
|
||||
Example:
|
||||
curl -X POST http://<dashboard>/api/health-event \\
|
||||
-H 'content-type: application/json' \\
|
||||
-d '{"service":"parakeet","ok":false,"error":"503","source":"open-webui","ms":420}'
|
||||
"""
|
||||
if not body.service.strip():
|
||||
raise HTTPException(400, "service is required")
|
||||
event = record_report(
|
||||
body.service.strip(),
|
||||
ok=body.ok,
|
||||
source=(body.source or "external").strip(),
|
||||
detail=(body.error or "").strip(),
|
||||
latency_ms=body.ms,
|
||||
)
|
||||
return {"ok": True, "recorded": event}
|
||||
|
||||
|
||||
@app.post("/api/spark/{name}/wake")
|
||||
async def wake_spark(name: str) -> dict:
|
||||
"""Send a Wake-on-LAN magic packet for the named Spark.
|
||||
@@ -216,6 +283,8 @@ async def get_services() -> dict:
|
||||
results = await asyncio.gather(*[one(n) for n in services.keys()])
|
||||
for name, info in results:
|
||||
out[name] = info
|
||||
# Feed http reachability into the connectivity log (transition-only)
|
||||
record_state(name, bool(info.get("http_ready")))
|
||||
return out
|
||||
|
||||
|
||||
@@ -372,6 +441,10 @@ async def get_status() -> dict:
|
||||
check_parakeet(settings),
|
||||
check_magpie(settings),
|
||||
)
|
||||
# Feed health into the connectivity log (deduped — only logs on transition)
|
||||
record_state("vllm", bool(vllm.get("ok")))
|
||||
record_state("parakeet", bool(parakeet.get("ok")))
|
||||
record_state("magpie", bool(magpie.get("ok")))
|
||||
current_key = _identify_current_model(vllm.get("current_model"))
|
||||
return {
|
||||
"configured": settings.configured,
|
||||
@@ -397,6 +470,15 @@ class SwapRequest(BaseModel):
|
||||
dry_run: bool = False
|
||||
|
||||
|
||||
@app.post("/api/swap/{key}/validate")
|
||||
async def validate_swap(key: str) -> dict:
|
||||
"""Pre-flight check: run vLLM's argparse layer against the proposed launch
|
||||
command WITHOUT starting an engine. Cheap (~5 s) and doesn't disturb the
|
||||
currently-loaded model.
|
||||
"""
|
||||
return await validate_launch(key, catalog, settings)
|
||||
|
||||
|
||||
@app.post("/api/swap")
|
||||
async def post_swap(req: SwapRequest) -> dict:
|
||||
if not settings.configured and not req.dry_run:
|
||||
|
||||
+140
-15
@@ -17,6 +17,7 @@ const state = {
|
||||
config: {},
|
||||
configured: true,
|
||||
timer_handle: null,
|
||||
deep_health: {},
|
||||
};
|
||||
|
||||
const el = (sel) => document.querySelector(sel);
|
||||
@@ -73,8 +74,10 @@ function renderCards() {
|
||||
<button class="btn ${isActive ? '' : 'primary'}" data-swap-key="${key}" ${isActive || isSwapping ? 'disabled' : ''}>
|
||||
${isActive ? 'Current' : 'Switch to this'}
|
||||
</button>
|
||||
<button class="btn test-btn" data-test-key="${key}" title="Pre-flight check the launch command without starting the engine">Test</button>
|
||||
<button class="btn adv-btn" data-adv-key="${key}" title="Advanced settings">Advanced</button>
|
||||
</div>
|
||||
<div class="test-result hidden" data-test-result-for="${key}"></div>
|
||||
`;
|
||||
root.appendChild(card);
|
||||
}
|
||||
@@ -84,6 +87,37 @@ function renderCards() {
|
||||
for (const btn of root.querySelectorAll('[data-adv-key]')) {
|
||||
btn.addEventListener('click', () => openAdvanced(btn.dataset.advKey));
|
||||
}
|
||||
for (const btn of root.querySelectorAll('[data-test-key]')) {
|
||||
btn.addEventListener('click', () => testLaunch(btn.dataset.testKey, btn));
|
||||
}
|
||||
}
|
||||
|
||||
async function testLaunch(key, btn) {
|
||||
const resultEl = document.querySelector(`[data-test-result-for="${key}"]`);
|
||||
if (!resultEl) return;
|
||||
const originalText = btn.textContent;
|
||||
btn.disabled = true;
|
||||
btn.textContent = 'Testing…';
|
||||
resultEl.classList.remove('hidden', 'ok', 'fail');
|
||||
resultEl.innerHTML = '<span class="muted small">Checking launch args against vLLM\'s parser…</span>';
|
||||
try {
|
||||
const r = await fetchJSON(`/api/swap/${encodeURIComponent(key)}/validate`, { method: 'POST' });
|
||||
if (r.ok) {
|
||||
resultEl.classList.add('ok');
|
||||
resultEl.innerHTML = `<span class="ok-mark">✓</span> Launch args parse OK. <span class="muted small">(Doesn't guarantee runtime success — only catches argparse-level issues.)</span>`;
|
||||
} else {
|
||||
resultEl.classList.add('fail');
|
||||
const err = escapeHtml(r.error || 'unknown error');
|
||||
const stage = r.stage ? ` <span class="muted small">(${escapeHtml(r.stage)})</span>` : '';
|
||||
resultEl.innerHTML = `<span class="fail-mark">✗</span> Would fail: ${err}${stage}`;
|
||||
}
|
||||
} catch (e) {
|
||||
resultEl.classList.add('fail');
|
||||
resultEl.innerHTML = `<span class="fail-mark">✗</span> Test failed: ${escapeHtml(e.message)}`;
|
||||
} finally {
|
||||
btn.disabled = false;
|
||||
btn.textContent = originalText;
|
||||
}
|
||||
}
|
||||
|
||||
function renderCurrent(status) {
|
||||
@@ -146,28 +180,42 @@ function openConnectivityDialog() {
|
||||
const c = state.connectivity || {};
|
||||
const events = c.events || [];
|
||||
if (events.length === 0) {
|
||||
content.innerHTML = '<div class="muted small">No transitions recorded yet. Once a Spark goes down and comes back, you\'ll see entries here.</div>';
|
||||
content.innerHTML = '<div class="muted small">No events recorded yet. Once a Spark or service goes down and back up (or an external app reports a failure), entries appear here.</div>';
|
||||
dlg.showModal();
|
||||
return;
|
||||
}
|
||||
const bySpark = {};
|
||||
const bySubject = {};
|
||||
for (const e of events) {
|
||||
(bySpark[e.spark] = bySpark[e.spark] || []).push(e);
|
||||
const subj = e.subject || e.spark || 'unknown'; // legacy fallback
|
||||
(bySubject[subj] = bySubject[subj] || []).push(e);
|
||||
}
|
||||
const html = Object.entries(bySpark).map(([spark, evs]) => {
|
||||
const downs = evs.filter(e => e.transition === 'down').length;
|
||||
const mac = c.macs?.[spark];
|
||||
// Sort subjects: hosts first, then services, alphabetical
|
||||
const hostOrder = ['spark1', 'spark2'];
|
||||
const subjects = Object.keys(bySubject).sort((a, b) => {
|
||||
const ia = hostOrder.indexOf(a);
|
||||
const ib = hostOrder.indexOf(b);
|
||||
if (ia >= 0 && ib >= 0) return ia - ib;
|
||||
if (ia >= 0) return -1;
|
||||
if (ib >= 0) return 1;
|
||||
return a.localeCompare(b);
|
||||
});
|
||||
|
||||
const html = subjects.map((subj) => {
|
||||
const evs = bySubject[subj];
|
||||
const transitions = evs.filter(e => (e.kind || 'transition') === 'transition');
|
||||
const reports = evs.filter(e => e.kind === 'report');
|
||||
const downs = transitions.filter(e => e.transition === 'down').length;
|
||||
const failedReports = reports.filter(e => !e.ok).length;
|
||||
const mac = c.macs?.[subj];
|
||||
const summaryParts = [];
|
||||
if (transitions.length) summaryParts.push(`${transitions.length} probe transition${transitions.length===1?'':'s'} (${downs} down)`);
|
||||
if (reports.length) summaryParts.push(`${reports.length} app report${reports.length===1?'':'s'} (${failedReports} failed)`);
|
||||
const isHost = hostOrder.includes(subj);
|
||||
return `
|
||||
<div class="conn-spark">
|
||||
<h4>${escapeHtml(spark)}${mac ? ` <span class="muted small">${escapeHtml(mac)}</span>` : ''}</h4>
|
||||
<div class="conn-summary">${evs.length} transition${evs.length===1?'':'s'} · ${downs} down event${downs===1?'':'s'} in window</div>
|
||||
${evs.slice(-25).reverse().map(e => `
|
||||
<div class="conn-event ${e.transition}">
|
||||
<span class="when">${escapeHtml(e.at.replace('T', ' ').replace('Z', ''))}</span>
|
||||
<span class="what">${e.transition === 'up' ? '↑ came back online' : '↓ dropped offline'}</span>
|
||||
<span class="dur">${e.down_seconds != null ? `was down ${fmtDuration(e.down_seconds)}` : ''}${e.up_seconds != null ? `was up ${fmtDuration(e.up_seconds)}` : ''}</span>
|
||||
</div>
|
||||
`).join('')}
|
||||
<h4>${escapeHtml(subj)}${isHost ? ' <span class="muted small">[host]</span>' : ' <span class="muted small">[service]</span>'}${mac ? ` <span class="muted small">${escapeHtml(mac)}</span>` : ''}</h4>
|
||||
<div class="conn-summary">${summaryParts.join(' · ') || 'no events'}</div>
|
||||
${evs.slice(-30).reverse().map(e => renderConnEvent(e)).join('')}
|
||||
</div>
|
||||
`;
|
||||
}).join('');
|
||||
@@ -175,6 +223,33 @@ function openConnectivityDialog() {
|
||||
dlg.showModal();
|
||||
}
|
||||
|
||||
function renderConnEvent(e) {
|
||||
const when = escapeHtml((e.at || '').replace('T', ' ').replace('Z', ''));
|
||||
const kind = e.kind || 'transition';
|
||||
if (kind === 'report') {
|
||||
const ok = !!e.ok;
|
||||
const source = escapeHtml(e.source || 'external');
|
||||
const detail = e.detail ? ` — ${escapeHtml(e.detail)}` : '';
|
||||
const latency = e.latency_ms != null ? ` (${e.latency_ms} ms)` : '';
|
||||
return `
|
||||
<div class="conn-event ${ok ? 'up' : 'down'} report">
|
||||
<span class="when">${when}</span>
|
||||
<span class="what">${ok ? '◷ report: ok' : '◷ report: failed'} <span class="muted">from</span> ${source}${detail}</span>
|
||||
<span class="dur">${latency}</span>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
const down = e.down_seconds != null ? `was down ${fmtDuration(e.down_seconds)}` : '';
|
||||
const up = e.up_seconds != null ? `was up ${fmtDuration(e.up_seconds)}` : '';
|
||||
return `
|
||||
<div class="conn-event ${e.transition}">
|
||||
<span class="when">${when}</span>
|
||||
<span class="what">${e.transition === 'up' ? '↑ came back online' : '↓ dropped offline'}</span>
|
||||
<span class="dur">${down}${up}</span>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
|
||||
async function wakeSpark(name) {
|
||||
try {
|
||||
const r = await fetchJSON(`/api/spark/${name}/wake`, { method: 'POST' });
|
||||
@@ -339,6 +414,35 @@ async function renderServices() {
|
||||
const restartsRow = s.restart_count != null && s.restart_count > 1
|
||||
? `<div class="row"><span class="k">Restarts</span><span class="v">${s.restart_count}</span></div>`
|
||||
: '';
|
||||
const dh = state.deep_health?.[name];
|
||||
let deepRow = '';
|
||||
if (dh && dh.last) {
|
||||
const last = dh.last;
|
||||
const when = (last.at || '').slice(11, 19); // HH:MM:SS
|
||||
const verdict = last.ok
|
||||
? `<span class="dh-ok">deep check ok</span>`
|
||||
: `<span class="dh-fail">deep check FAILED</span>`;
|
||||
const lat = last.latency_ms != null ? ` <span class="muted">${last.latency_ms} ms</span>` : '';
|
||||
const restarts = dh.auto_restarts_window > 0
|
||||
? ` <span class="muted">· ${dh.auto_restarts_window} auto-restart${dh.auto_restarts_window === 1 ? '' : 's'} in 30 min</span>`
|
||||
: '';
|
||||
deepRow = `
|
||||
<div class="row deep-row">
|
||||
<span class="k">Deep</span>
|
||||
<span class="v deep-v">${verdict} <span class="muted small">${escapeHtml(when)}</span>${lat}${restarts}</span>
|
||||
<button class="icon-btn dh-run-btn" data-dh-run="${escapeHtml(name)}" title="Run deep check now">↻</button>
|
||||
</div>
|
||||
${last.ok ? '' : `<div class="deep-error muted small">${escapeHtml((last.error || last.note || '').slice(0, 200))}</div>`}
|
||||
`;
|
||||
} else if (dh) {
|
||||
deepRow = `
|
||||
<div class="row deep-row">
|
||||
<span class="k">Deep</span>
|
||||
<span class="v muted-v">no probe yet</span>
|
||||
<button class="icon-btn dh-run-btn" data-dh-run="${escapeHtml(name)}" title="Run deep check now">↻</button>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
card.innerHTML = `
|
||||
<div class="head">
|
||||
<span class="name">${escapeHtml(name)}</span>
|
||||
@@ -349,6 +453,7 @@ async function renderServices() {
|
||||
${urlRow}
|
||||
${modelRow}
|
||||
${restartsRow}
|
||||
${deepRow}
|
||||
<div class="service-actions">
|
||||
<button class="btn" data-svc-action="${name}:start" ${disable('start') ? 'disabled' : ''}>Start</button>
|
||||
<button class="btn" data-svc-action="${name}:restart" ${disable('restart') ? 'disabled' : ''}>Restart</button>
|
||||
@@ -360,6 +465,25 @@ async function renderServices() {
|
||||
for (const btn of grid.querySelectorAll('.btn[data-svc-action]')) {
|
||||
btn.addEventListener('click', () => onServiceAction(btn.dataset.svcAction));
|
||||
}
|
||||
for (const btn of grid.querySelectorAll('[data-dh-run]')) {
|
||||
btn.addEventListener('click', () => onDeepHealthRun(btn.dataset.dhRun, btn));
|
||||
}
|
||||
}
|
||||
|
||||
async function onDeepHealthRun(name, btn) {
|
||||
btn.disabled = true;
|
||||
const orig = btn.textContent;
|
||||
btn.textContent = '…';
|
||||
try {
|
||||
await fetchJSON(`/api/deep-health/${encodeURIComponent(name)}/run`, { method: 'POST' });
|
||||
} catch (e) {
|
||||
console.warn('deep-health run failed', e);
|
||||
} finally {
|
||||
try { state.deep_health = await fetchJSON('/api/deep-health'); } catch {}
|
||||
btn.textContent = orig;
|
||||
btn.disabled = false;
|
||||
renderServices();
|
||||
}
|
||||
}
|
||||
|
||||
async function onServiceAction(key) {
|
||||
@@ -594,6 +718,7 @@ async function pollStatus() {
|
||||
// Refresh services state lazily — every 5s poll triggers this too.
|
||||
try {
|
||||
state.services = await fetchJSON('/api/services');
|
||||
try { state.deep_health = await fetchJSON('/api/deep-health'); } catch {}
|
||||
renderServices();
|
||||
} catch {}
|
||||
if (status.current_swap_job && status.current_swap_job !== state.swap_job_id) {
|
||||
|
||||
@@ -411,6 +411,8 @@ main {
|
||||
.conn-event .what { flex: 1; }
|
||||
.conn-event.up .what { color: var(--accent); }
|
||||
.conn-event.down .what { color: var(--error); }
|
||||
.conn-event.report .what { font-style: italic; }
|
||||
.conn-event .muted { color: var(--muted); font-style: normal; }
|
||||
.conn-event .dur { color: var(--muted); }
|
||||
.conn-summary { color: var(--muted); font-size: 11px; padding: 4px 0 10px; }
|
||||
.hw-metric { display: flex; align-items: center; gap: 10px; font-size: 12px; }
|
||||
@@ -620,6 +622,19 @@ main {
|
||||
.service-card .row .v.copyable.copied { outline: 1px solid var(--accent); background: rgba(74, 222, 128, 0.05); }
|
||||
.service-card .row .icon-btn { padding: 3px 6px; }
|
||||
.service-card .row .icon-btn svg { width: 12px; height: 12px; }
|
||||
.service-card .deep-row .deep-v { display: flex; align-items: center; gap: 6px; font-family: inherit; flex-wrap: wrap; }
|
||||
.service-card .dh-ok { color: var(--accent); }
|
||||
.service-card .dh-fail { color: var(--error); font-weight: 500; }
|
||||
.service-card .dh-run-btn { font-family: inherit; }
|
||||
.service-card .deep-error {
|
||||
padding: 4px 8px;
|
||||
background: rgba(239, 68, 68, 0.06);
|
||||
border-left: 2px solid var(--error);
|
||||
border-radius: 4px;
|
||||
font-family: ui-monospace, SFMono-Regular, Menlo, monospace;
|
||||
font-size: 11px;
|
||||
word-break: break-word;
|
||||
}
|
||||
|
||||
.service-actions {
|
||||
display: flex;
|
||||
@@ -699,9 +714,24 @@ main {
|
||||
.card.active .btn { background: rgba(74, 222, 128, 0.12); color: var(--accent); border-color: rgba(74, 222, 128, 0.4); }
|
||||
.card-actions { display: flex; gap: 6px; }
|
||||
.card-actions .btn.primary { flex: 1; }
|
||||
.card .adv-btn { padding: 8px 12px; font-size: 12px; }
|
||||
.card .adv-btn,
|
||||
.card .test-btn { padding: 8px 12px; font-size: 12px; }
|
||||
.card .custom-pill { color: var(--info); border-color: rgba(96, 165, 250, 0.4); }
|
||||
|
||||
.test-result {
|
||||
font-size: 12px;
|
||||
line-height: 1.45;
|
||||
padding: 8px 10px;
|
||||
border-radius: 5px;
|
||||
margin-top: 4px;
|
||||
border: 1px solid var(--border);
|
||||
background: var(--surface-2);
|
||||
}
|
||||
.test-result.ok { border-color: rgba(74, 222, 128, 0.4); background: rgba(74, 222, 128, 0.04); }
|
||||
.test-result.fail { border-color: rgba(239, 68, 68, 0.45); background: rgba(239, 68, 68, 0.06); word-break: break-word; }
|
||||
.test-result .ok-mark { color: var(--accent); font-weight: 600; }
|
||||
.test-result .fail-mark { color: var(--error); font-weight: 600; }
|
||||
|
||||
.footer {
|
||||
margin-top: 28px;
|
||||
padding-top: 16px;
|
||||
|
||||
@@ -0,0 +1,137 @@
|
||||
"""Pre-flight validation of a proposed vLLM launch command.
|
||||
|
||||
Runs vLLM's own argparse layer (EngineArgs) inside the vllm_node container WITHOUT
|
||||
starting the engine. Catches:
|
||||
|
||||
* unknown flag names (typos)
|
||||
* bad types / values that argparse rejects
|
||||
* deprecated flags removed in the installed vLLM version
|
||||
|
||||
Does NOT catch (these surface only during real engine init):
|
||||
* model-architecture-specific constraints (e.g. Qwen3.6 Mamba block_size)
|
||||
* OOM at weight-loading time
|
||||
* Triton / CUDA-kernel compatibility errors
|
||||
|
||||
A pre-flight check that returns "ok" is therefore NOT a guarantee — but a
|
||||
"failed" verdict is a definitive 'don't bother with the real swap'.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import json
|
||||
import shlex
|
||||
from typing import Any
|
||||
|
||||
from .config import Settings
|
||||
from .models import Catalog, build_launch_command
|
||||
from .ssh import ssh_run
|
||||
|
||||
|
||||
# Validates the proposed args against the same combined parser vLLM uses for
|
||||
# `vllm serve` (engine args + server args + frontend args). Returns one JSON
|
||||
# line on stdout: {"ok": true, ...} or {"ok": false, ...}.
|
||||
_VALIDATOR_SCRIPT = r"""
|
||||
import argparse, json, sys
|
||||
|
||||
# Mirror what `vllm serve` does internally: FlexibleArgumentParser (which is
|
||||
# more lenient about dashes vs underscores) wrapped with make_arg_parser
|
||||
# (which adds engine + server + frontend args).
|
||||
parser = None
|
||||
try:
|
||||
# Newer vLLM path
|
||||
from vllm.utils.argparse_utils import FlexibleArgumentParser
|
||||
except Exception:
|
||||
try:
|
||||
# Older fallback
|
||||
from vllm.engine.arg_utils import FlexibleArgumentParser
|
||||
except Exception:
|
||||
FlexibleArgumentParser = argparse.ArgumentParser # type: ignore
|
||||
|
||||
try:
|
||||
from vllm.entrypoints.openai.cli_args import make_arg_parser
|
||||
parser = make_arg_parser(FlexibleArgumentParser(add_help=False))
|
||||
except Exception:
|
||||
pass
|
||||
if parser is None:
|
||||
try:
|
||||
from vllm.engine.arg_utils import EngineArgs
|
||||
parser = FlexibleArgumentParser(add_help=False)
|
||||
EngineArgs.add_cli_args(parser)
|
||||
except Exception as e:
|
||||
print(json.dumps({"ok": False, "stage": "import", "error": f"{type(e).__name__}: {e}"}))
|
||||
sys.exit(0)
|
||||
|
||||
class _ArgError(Exception):
|
||||
pass
|
||||
|
||||
def _err(message):
|
||||
raise _ArgError(message)
|
||||
|
||||
parser.error = _err # capture argparse errors instead of sys.exit(2)
|
||||
|
||||
try:
|
||||
raw = sys.stdin.read()
|
||||
arglist = json.loads(raw)
|
||||
ns = parser.parse_args(arglist)
|
||||
print(json.dumps({"ok": True, "model": getattr(ns, "model", None)}))
|
||||
except _ArgError as e:
|
||||
print(json.dumps({"ok": False, "stage": "parse", "error": str(e)}))
|
||||
except SystemExit as e:
|
||||
print(json.dumps({"ok": False, "stage": "parse", "error": f"argparse exit {e.code}"}))
|
||||
except Exception as e:
|
||||
print(json.dumps({"ok": False, "stage": "parse", "error": f"{type(e).__name__}: {e}"}))
|
||||
"""
|
||||
|
||||
|
||||
def _vllm_arg_list(key: str, model_def, catalog: Catalog) -> list[str]:
|
||||
"""Reconstruct the args list passed to `vllm serve` (without the positional model)."""
|
||||
cmd = build_launch_command(key, model_def, catalog.defaults)
|
||||
# build_launch_command yields:
|
||||
# ./launch-cluster.sh [--solo] -d exec vllm serve <repo> <args...>
|
||||
# We just want the bits after `vllm serve <repo>`.
|
||||
tokens = shlex.split(cmd)
|
||||
if "serve" not in tokens:
|
||||
return []
|
||||
i = tokens.index("serve")
|
||||
after = tokens[i + 1 :] # repo, then args
|
||||
if not after:
|
||||
return []
|
||||
args = after[1:] # drop the repo
|
||||
# EngineArgs expects --model=REPO rather than positional, so prepend it.
|
||||
return [f"--model={after[0]}", *args]
|
||||
|
||||
|
||||
async def validate_launch(key: str, catalog: Catalog, settings: Settings) -> dict:
|
||||
if key not in catalog.models:
|
||||
return {"ok": False, "stage": "lookup", "error": f"unknown model: {key}"}
|
||||
if not settings.spark1_host or not settings.spark1_user:
|
||||
return {"ok": False, "stage": "config", "error": "spark1 not configured"}
|
||||
|
||||
model = catalog.models[key]
|
||||
arg_list = _vllm_arg_list(key, model, catalog)
|
||||
if not arg_list:
|
||||
return {"ok": False, "stage": "build", "error": "failed to build args list"}
|
||||
|
||||
payload = json.dumps(arg_list).replace("'", "'\\''")
|
||||
# Pipe the JSON args list to a here-doc Python invocation. The validator
|
||||
# reads from stdin to avoid shell-escaping the args themselves.
|
||||
cmd = (
|
||||
f"echo '{payload}' | docker exec -i vllm_node python3 -c "
|
||||
+ shlex.quote(_VALIDATOR_SCRIPT)
|
||||
)
|
||||
|
||||
rc, out, err = await ssh_run(settings.spark1_host, settings.spark1_user, cmd, settings, timeout=20)
|
||||
if rc != 0 and not out.strip():
|
||||
return {
|
||||
"ok": False,
|
||||
"stage": "ssh",
|
||||
"error": err.strip() or f"rc={rc}",
|
||||
"cmd_args": arg_list,
|
||||
"launch_cmd": build_launch_command(key, model, catalog.defaults),
|
||||
}
|
||||
last = out.strip().splitlines()[-1] if out.strip() else ""
|
||||
try:
|
||||
result: dict[str, Any] = json.loads(last)
|
||||
except json.JSONDecodeError:
|
||||
result = {"ok": False, "stage": "decode", "error": "validator did not return JSON", "raw": out[-500:]}
|
||||
result["cmd_args"] = arg_list
|
||||
result["launch_cmd"] = build_launch_command(key, model, catalog.defaults)
|
||||
return result
|
||||
@@ -66,6 +66,7 @@ models:
|
||||
vllm_args:
|
||||
- --gpu-memory-utilization=0.85
|
||||
- --max-model-len=65536
|
||||
- --max-num-batched-tokens=16384
|
||||
- --reasoning-parser=qwen3
|
||||
- --moe_backend=flashinfer_cutlass
|
||||
- --load-format=fastsafetensors
|
||||
|
||||
@@ -20,6 +20,10 @@ The trick is the `docker run --rm alpine chown` — it runs as root inside the t
|
||||
|
||||
This flag is Blackwell-specific. If vLLM in the container reports `unrecognized arguments: --moe_backend` or similar, edit `models.yaml` for `qwen36` and drop that flag. The swap UI does NOT auto-fallback in v0.1 — failure surfaces in the log stream.
|
||||
|
||||
## Qwen3.6 Mamba block-size assertion (fixed in v0.6.0:1)
|
||||
|
||||
Qwen3.6 uses a Mamba-attention hybrid that requires `--max-num-batched-tokens >= 2096`. vLLM's default is 2048, which trips `AssertionError: In Mamba cache align mode, block_size (2096) must be <= max_num_batched_tokens (2048)`. Fix: bake `--max-num-batched-tokens=16384` into the bundled qwen36 entry — matches the upstream qwen3.5-35b-a3b-fp8 recipe.
|
||||
|
||||
## Two SSH paths to Spark 1 from the laptop
|
||||
|
||||
`ssh <spark-user>@<spark-1-ip>` does NOT work from the laptop because the NVIDIA Sync ssh_config only has a Host entry for `<spark-1-host>.local`. Always use the `.local` hostname or `<spark-2-ip>`-style entries that ARE matched.
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import { VersionInfo, IMPOSSIBLE } from '@start9labs/start-sdk'
|
||||
|
||||
export const v0_1_0 = VersionInfo.of({
|
||||
version: '0.5.0:0',
|
||||
version: '0.8.0:2',
|
||||
releaseNotes: {
|
||||
en_US:
|
||||
'v0.5: Wake-on-LAN + connectivity history. Each Spark\'s MAC is now auto-discovered during the normal hardware sweep and cached in /data/connectivity.json. Up/down transitions are logged with duration. Unreachable hardware cards get a "Wake (WoL)" button that sends a magic packet (preferring the other Spark as the sender so it originates on the right LAN segment). New "Connectivity log" button in the hardware section shows the recent transitions for each Spark — useful for spotting patterns (e.g. always-at-noon dropouts).',
|
||||
'v0.8: deep health probes. Every 5 minutes, Spark Control sends a tiny synthetic inference request to each service (1 second of silent audio to Parakeet, short text to Magpie, 1-token completion to vLLM). All payloads are generated in-memory and never written to disk. If a probe returns CUDA-error / 5xx signals while the container is still "up" — i.e. the classic Triton-wedge pattern where /health stays green but real inference fails — Spark Control automatically restarts the affected container. Rate-limited to 3 auto-restarts per service per 30 minutes. Each service card now shows the last deep-check timestamp, latency, and an inline "Run now" button. Failures and recoveries are logged into the connectivity history with source=deep-health.',
|
||||
},
|
||||
migrations: {
|
||||
up: async ({ effects }) => {},
|
||||
|
||||
Reference in New Issue
Block a user