9 Commits

Author SHA1 Message Date
Keysat 9ff7ee9c1e v0.8.1:0 - delete model weights from disk via card trash icon
Each model card now shows whether its weights are present on disk
(with GB size) or not yet downloaded. When present and the model
isn't currently loaded, a trash icon appears; clicking it pops a
confirmation showing exactly how many GB will be freed and on
which Spark(s), then runs rm -rf on the HF cache directory via SSH.

Cluster-mode models are removed from both Sparks; solo-mode from
Spark 1 only. Safety rails: refuses to delete the currently-loaded
model, refuses during an in-flight swap or download, and the
catalog entry stays intact so it can be re-downloaded anytime.

Backend:
  - new image/app/disk.py: probe_disk + delete_from_disk over SSH
  - GET  /api/models/disk-status — parallel probe across all catalog models
  - DELETE /api/models/{key}/disk — guarded rm -rf, logs to connectivity events

Frontend:
  - on-disk / not-downloaded pills on every card
  - trash icon-btn in card-actions row (hidden when not on disk)
  - confirmation dialog showing per-host bytes-to-free
  - disk-status re-checked every 60s

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:07:20 -05:00
Grant 1602b3b3b4 v0.8.0:4 - vLLM deep-health: 'no model loaded' is idle, not a wedge
Previously a ConnectError on /v1/models classified vLLM as failing, which would feed into the wedge auto-restart heuristic. But when no model is loaded (the normal idle state between swaps, or after a failed swap leaves the vllm_node container up with no process serving), nothing is listening on 8888 — that's by design, not a wedge.

The vLLM probe now does a two-step check:
  1. GET /v1/models. ConnectError or empty list -> ok=true with note='no model currently loaded (idle)'. No auto-restart triggered (it wouldn't help anyway — restarting vllm_node kills any loaded model and doesn't load a new one).
  2. If a model is loaded, POST 1-token chat completion. A 5xx here is a genuine wedge worth restarting for.

Result: deep-health correctly reports 'no model loaded' as informational rather than flagging it as a failure. Auto-restart for vLLM only fires when a model is actually loaded AND inference fails — the right semantics.
2026-05-12 14:50:00 -05:00
Grant 8ac455f5f5 v0.8.0:3 - add --max-num-batched-tokens=16384 to vision models (gemma4, qwen3-vl)
After the recent eugr/spark-vllm-docker update, vLLM became stricter about multimodal token budgets:

  ValueError: Chunked MM input disabled but max_tokens_per_mm_item (2496) is
  larger than max_num_batched_tokens (2048). Please increase max_num_batched_tokens.

Each image input produces 2496 tokens, but vLLM's default --max-num-batched-tokens of 2048 is just under. Same class of bug as the Qwen3.6 Mamba block-size assertion we fixed in 0.6.0:1, surfacing on different models.

Fix: bake --max-num-batched-tokens=16384 into every multimodal model entry. Now applied to:
  - qwen36 (already had it for the Mamba constraint; works for multimodal too since Qwen3.6 has vision)
  - gemma4 (crashed today on engine init)
  - qwen3-vl (would crash with the same error if anyone tried it)

The pre-flight Test button validates argparse but the 2048<2496 check happens at runtime engine init, so it's not caught by Test — only by actually trying to load. This is exactly the kind of bug v0.7's Test catches the *syntax* of but not the *semantics*; runtime errors like this still surface only on real swap. Known limitation documented in v0.7 release notes.
2026-05-12 14:47:32 -05:00
Grant 000c55febe v0.8.0 - Deep health probes + auto-restart on CUDA wedge
deep_health.py:
- Synthetic probes per service, all payloads generated in-memory (BytesIO), never written to disk:
  - Parakeet: 1s of digital silence via in-memory WAV → POST /v1/audio/transcriptions
  - Magpie:   short 'hi' text → POST /v1/audio/synthesize (multipart form-data, real TTS API endpoint discovered via openapi.json)
  - vLLM:     1-token completion against currently-loaded model
- Background loop runs every 5 minutes (configurable). Best-effort: exceptions in the loop never kill it.
- Auto-restart on wedge-pattern errors (cudaErrorUnknown / CUFFT_INTERNAL_ERROR / 500 / Engine core init failed): docker restart of the affected container.
  - Rate-limited: max 3 restarts per service per 30 min.
  - Cooldown: 120 s between consecutive restarts on the same service.
  - 60 s startup grace before any auto-restart can fire after the app boots.
- Probe failures + recoveries logged via record_report(source='deep-health') into the connectivity history alongside the polling-based transitions.

API:
- GET /api/deep-health: per-service last result + auto-restart counters
- POST /api/deep-health/{service}/run: manual trigger now

UI:
- Service cards show 'Deep check ok/FAILED <time> <latency>' inline, plus a ↻ button to run-now
- Auto-restart count in 30-min window surfaced on the card when > 0
- Inline error excerpt shown for failed probes

Bug fix: server.py app startup hook was placed before the FastAPI app object was constructed (would crash on import). Moved after.
2026-05-12 14:41:01 -05:00
Grant 6434b01a95 v0.7.0 - Pre-flight launch validation (Test button on every model card)
validate.py:
- Builds the same args list a real swap would pass to 'vllm serve'
- SSHes into Spark 1 and runs vLLM's own argparse layer inside the running vllm_node container, WITHOUT initializing the engine
- Uses FlexibleArgumentParser (from vllm.utils.argparse_utils, with fallback to engine.arg_utils) + make_arg_parser — the exact same parser the 'vllm serve' CLI uses. Earlier attempt with bare argparse.ArgumentParser was too strict (rejected '--moe_backend' with underscore that the real CLI accepts via FlexibleArgumentParser's normalization)
- Returns structured {ok, stage, error, cmd_args, launch_cmd} so the UI can surface the exact failure cause

Endpoint: POST /api/swap/{key}/validate. Cheap (~5s), no engine init, no disruption to the currently-loaded model.

Frontend: 'Test' button on every model card, inline result below the action row (green check or red detailed error). Result stays visible until the user reloads or clicks Test again.

Catches: typos in flag names, deprecated/removed flags after a vLLM upgrade, type mismatches. Does NOT catch runtime-only failures (Mamba block-size assertion, OOM at load, kernel-compat). Ok=true is necessary-but-not-sufficient; ok=false is definitive 'don't bother running it'.
2026-05-12 13:37:37 -05:00
Grant 5827683a09 v0.6.0:1 - fix Qwen3.6 Mamba block-size assertion at launch
vLLM trips on launching Qwen3.6-35B-A3B-NVFP4 with:
  AssertionError: In Mamba cache align mode, block_size (2096) must be
  <= max_num_batched_tokens (2048).

Qwen3.6 uses a Mamba-attention hybrid. The default --max-num-batched-tokens of 2048 is just under the model's required block_size of 2096. The upstream sibling recipe (qwen3.5-35b-a3b-fp8.yaml) sets it to 16384; use the same value.

Earlier qwen36 swaps in this session worked because vLLM hadn't reached the Mamba-validation code path on that prior path (different attention backend pick or auto-retry). Whatever the reason, the explicit flag avoids the dance.

Also documented in known-issues.md.
2026-05-12 13:22:24 -05:00
Grant ee8c2406b8 v0.6.0 - Service-level connectivity tracking + passive failure-report endpoint
connectivity.py:
- Generalized 'spark' subject to any string; renamed 'spark' field to 'subject'
- Legacy v0.5 events with the old 'spark' field are migrated transparently on read (kind defaults to 'transition')
- New record_report(subject, ok, source, detail, latency_ms): always appends an event with kind='report'; does NOT mutate the current state (only active polling is authoritative)
- summary() returns events normalized to the new schema

Wiring:
- /api/status now calls record_state for vllm/parakeet/magpie (dedup on no-change)
- /api/services calls record_state for each service after its http check
- Result: dashboard observes service-level transitions automatically with no extra polling

Passive endpoint:
- POST /api/health-event with {service, ok, source?, error?, ms?}
- Useful for external apps (e.g. Open WebUI) to surface sub-poll-interval failures the dashboard would otherwise miss

UI:
- Connectivity dialog groups events by subject (hosts ordered first, then services)
- Per-subject summary shows transition count, down count, report count, failed-report count
- Transitions and reports render inline with distinct styling; reports show source app + error + latency
- Legacy v0.5 events render unchanged

Docs:
- README documents /api/health-event with a curl example

Package: bump to 0.6.0:0
2026-05-12 13:19:27 -05:00
Grant a02f4db850 v0.5.0 - Wake-on-LAN + connectivity history
wol.py:
- build_magic_packet(): standard 6x0xFF + 16x MAC layout
- send_local_broadcast(): direct from container (ports 9 + 7 for safety)
- send_via_peer(): preferred path; SSHes to the OTHER Spark and runs a Python one-liner there so the packet originates on the target's LAN segment (most reliable)
- MAC validation + normalization

connectivity.py:
- /data/connectivity.json persistence (thread-safe, atomic rename)
- Stores per-Spark current state + last_change timestamp + rolling 200-event log
- Records up/down transitions; computes down_seconds / up_seconds durations
- MAC cache populated lazily during hardware probes

hardware.py:
- Probe now reads MAC via /sys/class/net/<default-route-iface>/address
- After each probe, record_state() emits a transition event if state changed
- record_mac() caches the address so WoL works when the Spark next goes down

Endpoints:
- GET /api/connectivity: macs, current state, last_change, events[]
- POST /api/spark/{name}/wake: tries via-peer first, falls back to direct broadcast

UI:
- Unreachable hardware card shows the cached MAC + 'Wake (WoL)' button (only if MAC known)
- New 'Connectivity log' button opens a modal with per-Spark transition history (last 25 each), including duration of each prior up/down period
- pollHardware also pulls /api/connectivity so WoL buttons appear without an extra fetch

Package: bump 0.5.0:0; main.ts sets CONNECTIVITY_LOG=/data/connectivity.json
2026-05-12 12:51:49 -05:00
Grant 1889ab45fb v0.4.0 - NIM installer + dashboard resilience
Hotfix (was v0.3.1):
- services.py: cache 'unreachable' per (host,user) for 25s so a dead Spark doesn't hang every /api/services call behind 6s ssh timeout
- ssh_run timeout reduced 10 -> 6s for docker_state probes
- hardware probe: shorter SSH timeout (6s), longer cache TTL for failures (25s)
- JS pollStatus retries loadModels() if state.models is empty (recovers from cold-start proxy timeout)
- Unreachable hardware card now includes troubleshooting steps (Spark Control cannot SSH into an unreachable Spark to restart it)

v0.4 NIM installer:
- nim.py module: curated SUGGESTED_NIMS list (Parakeet, Magpie, Riva) + NimManager that runs docker login nvcr.io + docker pull + docker run -d --gpus all -p PORT:PORT -v VOL:/opt/nim/.cache -e NGC_API_KEY -e ... --restart=unless-stopped + chown the volume to uid 1000 + restart. Streams all output via SSE; redacts the API key from log lines.
- custom_services.py: persists installed NIMs to /data/services-overrides.yaml so they appear in the services panel after install
- services.py: merges custom services into the panel
- /api/nim/catalog GET, /api/nim/install POST + GET/SSE
- /api/services/{name} DELETE for custom services
- UI: '+ Install NIM' button next to 'Always-on services'; modal lists curated images each with a 'Pick' button + a custom-image form; installation runs in a second dialog with phase + elapsed timer + collapsible log
- NGC API key field added to Configure Sparks (masked); injected as NGC_API_KEY env var into the container

Package: bump 0.4.0:0; main.ts adds SERVICES_OVERRIDES + NGC_API_KEY env vars
2026-05-12 12:32:29 -05:00
21 changed files with 2240 additions and 12 deletions
+18
View File
@@ -84,6 +84,24 @@ Other services on your LAN can hit `GET /api/endpoints` to learn where the curre
`base_url` is filled in whenever Configure Sparks has been completed (even if the underlying service isn't currently up). Pair the URL with `ready: true` to safely route traffic.
## Reporting failures from external apps
Spark Control polls every 5 s, so a brief blip in Parakeet/Magpie/vLLM availability can slip between polls and never make it into the connectivity log. To capture short failures, an external app (e.g. Open WebUI) can POST whenever a call fails (or succeeds):
```bash
curl -X POST http://<dashboard-url>/api/health-event \
-H 'content-type: application/json' \
-d '{
"service": "parakeet",
"ok": false,
"source": "open-webui",
"error": "HTTP 503",
"ms": 420
}'
```
Fields: `service` (required), `ok` (required), `source` (optional, free-form), `error` (optional), `ms` (optional latency). Each POST appends a `report` event to the connectivity log alongside the polling-based transition events.
## Status
**v0.2.3** — installed and verified on a Start9 server. Five bundled LLMs in the catalog (qwen3-vl, gemma4, qwen36, qwen3-235b-fp8, qwen2.5-72b), plus any custom models added through the UI.
+2
View File
@@ -43,6 +43,7 @@ class Settings:
magpie_port: int
bind_port: int
open_webui_url: str
ngc_api_key: str
@classmethod
def from_env(cls) -> "Settings":
@@ -68,6 +69,7 @@ class Settings:
magpie_port=int(_env("MAGPIE_PORT", "9000")),
bind_port=int(_env("BIND_PORT", "9999")),
open_webui_url=_env("OPEN_WEBUI_URL", ""),
ngc_api_key=_env("NGC_API_KEY", ""),
)
@property
+190
View File
@@ -0,0 +1,190 @@
"""Track up/down transitions for any subject (Sparks AND services) and cache MACs.
Persisted to /data/connectivity.json. Schema:
{
"macs": { "spark1": "aa:bb:..", "spark2": "11:22:.." },
"current": { "spark1": "up", "parakeet": "up", "magpie": "down", ... },
"last_change": { ... },
"events": [
# Active-probe transition (logged when state flips during polling)
{ "subject": "spark2", "at": "...", "kind": "transition",
"transition": "down" },
{ "subject": "spark2", "at": "...", "kind": "transition",
"transition": "up", "down_seconds": 4500 },
# Passive report (logged whenever an external app POSTs to
# /api/health-event regardless of state change)
{ "subject": "parakeet", "at": "...", "kind": "report",
"ok": false, "source": "open-webui",
"detail": "Connection refused", "latency_ms": 320 },
]
}
Legacy events from v0.5 with `spark` instead of `subject` and no `kind` field
are read transparently as kind="transition".
"""
from __future__ import annotations
import json
import os
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
MAX_EVENTS = 200 # rolling window — plenty for showing recent history
def _path() -> str:
return os.environ.get("CONNECTIVITY_LOG", "/data/connectivity.json")
_lock = threading.Lock()
def _read() -> dict:
try:
with open(_path()) as f:
return json.load(f) or {}
except (FileNotFoundError, json.JSONDecodeError):
return {}
def _write(data: dict) -> None:
p = _path()
Path(p).parent.mkdir(parents=True, exist_ok=True)
tmp = p + ".tmp"
with open(tmp, "w") as f:
json.dump(data, f, indent=2, sort_keys=False)
os.replace(tmp, p)
def load() -> dict:
with _lock:
d = _read()
d.setdefault("macs", {})
d.setdefault("current", {})
d.setdefault("last_change", {})
d.setdefault("events", [])
return d
def record_mac(subject: str, mac: Optional[str]) -> None:
if not mac:
return
with _lock:
d = _read()
d.setdefault("macs", {})
if d["macs"].get(subject) != mac:
d["macs"][subject] = mac
_write(d)
def record_state(subject: str, reachable: bool) -> Optional[dict]:
"""Update current state for `subject`. If it differs from the last seen
state, append a transition event. Returns the event dict if a transition
was recorded, else None.
`subject` can be a Spark host key (spark1/spark2) or a service name
(parakeet/magpie/vllm).
"""
new_state = "up" if reachable else "down"
now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
with _lock:
d = _read()
d.setdefault("macs", {})
d.setdefault("current", {})
d.setdefault("last_change", {})
d.setdefault("events", [])
prev = d["current"].get(subject)
if prev == new_state:
return None
event: dict = {
"subject": subject,
"at": now,
"kind": "transition",
"transition": new_state,
}
# When we have a previous state and timestamp, compute duration
last_change = d["last_change"].get(subject)
if prev and last_change:
try:
prev_dt = datetime.fromisoformat(last_change.replace("Z", "+00:00"))
duration = (datetime.now(timezone.utc) - prev_dt).total_seconds()
if prev == "down" and new_state == "up":
event["down_seconds"] = round(duration)
if prev == "up" and new_state == "down":
event["up_seconds"] = round(duration)
except ValueError:
pass
d["current"][subject] = new_state
d["last_change"][subject] = now
d["events"].append(event)
if len(d["events"]) > MAX_EVENTS:
d["events"] = d["events"][-MAX_EVENTS:]
_write(d)
return event
def record_report(
subject: str,
*,
ok: bool,
source: str = "external",
detail: str = "",
latency_ms: Optional[int] = None,
) -> dict:
"""Record a passive report from an external caller (e.g. Open WebUI got a
503 calling Parakeet). Always appended to the events list; does NOT change
the active-probe state (which only the polling probe is authoritative on).
"""
now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
with _lock:
d = _read()
d.setdefault("events", [])
event: dict = {
"subject": subject,
"at": now,
"kind": "report",
"ok": bool(ok),
"source": source or "external",
}
if detail:
event["detail"] = detail
if latency_ms is not None:
event["latency_ms"] = int(latency_ms)
d["events"].append(event)
if len(d["events"]) > MAX_EVENTS:
d["events"] = d["events"][-MAX_EVENTS:]
_write(d)
return event
def get_mac(subject: str) -> Optional[str]:
d = load()
return d.get("macs", {}).get(subject)
def _normalize_event(e: dict) -> dict:
"""Promote legacy v0.5 events to the v0.6 shape so the UI sees one schema."""
if "subject" in e:
e.setdefault("kind", "transition")
return e
# Legacy: had "spark" + "transition" only
if "spark" in e:
e["subject"] = e.pop("spark")
e.setdefault("kind", "transition")
return e
def summary() -> dict:
"""Compact summary for the UI: known MACs, current state, recent events."""
d = load()
events = [_normalize_event(dict(e)) for e in d.get("events", [])]
return {
"macs": d.get("macs", {}),
"current": d.get("current", {}),
"last_change": d.get("last_change", {}),
"events": events[-80:],
}
+59
View File
@@ -0,0 +1,59 @@
"""User-installed services persist in /data/services-overrides.yaml.
Format:
custom:
- key: my-riva
kind: stt
host: <spark-2-ip>
user: <spark-user>
container: riva-asr
port: 8001
health_path: /health
image: nvcr.io/nim/nvidia/riva-multilingual:latest
"""
from __future__ import annotations
import os
from pathlib import Path
import yaml
def _path() -> str:
return os.environ.get("SERVICES_OVERRIDES", "/data/services-overrides.yaml")
def load_custom_services() -> list[dict]:
try:
with open(_path()) as f:
data = yaml.safe_load(f) or {}
except FileNotFoundError:
return []
return data.get("custom") or []
def add_custom_service(entry: dict) -> None:
p = _path()
Path(p).parent.mkdir(parents=True, exist_ok=True)
data: dict = {}
try:
with open(p) as f:
data = yaml.safe_load(f) or {}
except FileNotFoundError:
pass
custom = data.get("custom") or []
custom = [c for c in custom if c.get("key") != entry["key"]]
custom.append(entry)
data["custom"] = custom
with open(p, "w") as f:
yaml.safe_dump(data, f, sort_keys=False)
def delete_custom_service(key: str) -> None:
p = _path()
try:
with open(p) as f:
data = yaml.safe_load(f) or {}
except FileNotFoundError:
return
data["custom"] = [c for c in (data.get("custom") or []) if c.get("key") != key]
with open(p, "w") as f:
yaml.safe_dump(data, f, sort_keys=False)
+363
View File
@@ -0,0 +1,363 @@
"""Deep health probes for each service.
Why this exists: Triton's /health endpoint returns 200 as long as the HTTP
layer is alive and the model is registered. It does NOT verify that the CUDA
context inside the worker process is healthy. We've observed Parakeet getting
its CUDA context wedged after an OOM, where /health stays green but every
real transcription returns 500 cudaErrorUnknown.
So this module sends *real* but tiny synthetic inference requests:
- Parakeet: 1 second of digital silence (16 kHz mono PCM, in-memory WAV)
- Magpie: short text-to-speech, response audio discarded
- vLLM: 1-token chat completion against whatever model is loaded
All synthetic payloads are generated on demand into BytesIO, sent over HTTP,
and never touched the filesystem (on either spark-control's side or the
target service's side beyond normal Triton/Riva working memory).
When a probe fails with a signal that looks like a CUDA wedge, we
automatically issue `docker restart <container>`. Rate-limited to 3 restarts
per service per 30 minutes to avoid restart loops.
"""
from __future__ import annotations
import asyncio
import io
import time
import wave
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import httpx
from .config import Settings
from .connectivity import record_report
from .services import ServiceDef, run_action, services_from_settings
# Default 5-minute interval, controllable via env. Sub-minute is silly for a
# heavy synthetic probe; we just want to catch wedges within a reasonable
# window — much faster than the user noticing on their next real call.
DEFAULT_INTERVAL_SEC = 300.0
PROBE_TIMEOUT_SEC = 20.0
RESTART_RATE_LIMIT = 3 # max auto-restarts per service
RESTART_RATE_WINDOW_SEC = 1800.0 # within a 30-min window
RESTART_COOLDOWN_SEC = 120.0 # don't restart again within this many seconds of the last one
STARTUP_GRACE_SEC = 60.0 # don't auto-restart for the first minute after this app boots
def _silence_wav(seconds: float = 1.0, sample_rate: int = 16000) -> io.BytesIO:
"""Return an in-memory WAV file containing `seconds` of digital silence."""
n_frames = int(seconds * sample_rate)
buf = io.BytesIO()
with wave.open(buf, "wb") as w:
w.setnchannels(1)
w.setsampwidth(2) # int16
w.setframerate(sample_rate)
w.writeframes(b"\x00\x00" * n_frames)
buf.seek(0)
return buf
def _looks_like_wedge(error: str) -> bool:
"""Heuristic: does this error string look like a stuck CUDA context that
a container restart would clear? We want to be conservative — only act
on signals we're confident about, otherwise leave the user in charge."""
err = (error or "").lower()
needles = [
"cudaerrorunknown",
"cuda error: unknown",
"cuda kernel errors",
"internal server error",
"engine core initialization failed",
"503", # service unavailable from a dependency
"500", # generic 5xx with a body that may not parse
]
return any(n in err for n in needles)
@dataclass
class ProbeResult:
ok: bool
at: str
latency_ms: Optional[int] = None
error: str = ""
note: str = ""
@dataclass
class ServiceState:
last: Optional[ProbeResult] = None
last_ok_at: Optional[str] = None
restarts: list[float] = field(default_factory=list)
class DeepHealth:
def __init__(self, settings: Settings, interval_sec: float = DEFAULT_INTERVAL_SEC) -> None:
self.settings = settings
self.interval_sec = interval_sec
self.state: dict[str, ServiceState] = {
"parakeet": ServiceState(),
"magpie": ServiceState(),
"vllm": ServiceState(),
}
self._stop = asyncio.Event()
self._boot_at = time.monotonic()
# ---- probes ---------------------------------------------------------
async def probe_parakeet(self) -> ProbeResult:
s = self.settings
now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
if not s.parakeet_host:
return ProbeResult(ok=False, at=now_iso, error="not configured")
url = f"http://{s.parakeet_host}:{s.parakeet_port}/v1/audio/transcriptions"
wav = _silence_wav(1.0)
t0 = time.monotonic()
try:
async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c:
r = await c.post(
url,
files={"file": ("probe.wav", wav, "audio/wav")},
data={"model": "parakeet-tdt-0.6b-v3"},
)
latency = round((time.monotonic() - t0) * 1000)
if 200 <= r.status_code < 300:
return ProbeResult(ok=True, at=now_iso, latency_ms=latency)
return ProbeResult(
ok=False,
at=now_iso,
latency_ms=latency,
error=f"HTTP {r.status_code}: {r.text[:240]}",
)
except Exception as e:
return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}")
async def probe_magpie(self) -> ProbeResult:
s = self.settings
now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
if not s.magpie_host:
return ProbeResult(ok=False, at=now_iso, error="not configured")
# Magpie /v1/audio/synthesize expects multipart form-data, not JSON.
# The (None, value) tuple in httpx's `files=` produces a non-file form field.
url = f"http://{s.magpie_host}:{s.magpie_port}/v1/audio/synthesize"
form: dict = {"text": (None, "hi"), "language": (None, "en-US")}
t0 = time.monotonic()
try:
async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c:
r = await c.post(url, files=form)
latency = round((time.monotonic() - t0) * 1000)
if 200 <= r.status_code < 300:
return ProbeResult(ok=True, at=now_iso, latency_ms=latency)
# 4xx that aren't 5xx mean server is alive but our payload is off —
# don't classify as wedge.
if 400 <= r.status_code < 500:
return ProbeResult(
ok=True,
at=now_iso,
latency_ms=latency,
note=f"{r.status_code} — server alive (probe payload may need a voice name)",
)
return ProbeResult(
ok=False,
at=now_iso,
latency_ms=latency,
error=f"HTTP {r.status_code}: {r.text[:240]}",
)
except Exception as e:
return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}")
async def probe_vllm(self) -> ProbeResult:
s = self.settings
now_iso = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
if not s.spark1_host:
return ProbeResult(ok=False, at=now_iso, error="not configured")
base = f"http://{s.spark1_host}:{s.vllm_port}"
# Step 1: is there a model loaded?
try:
async with httpx.AsyncClient(timeout=5.0) as c:
r = await c.get(f"{base}/v1/models")
if 200 <= r.status_code < 300:
models = r.json().get("data") or []
else:
# 5xx on /v1/models suggests something wedged after a model loaded
return ProbeResult(
ok=False,
at=now_iso,
error=f"list_models HTTP {r.status_code}: {r.text[:240]}",
)
except Exception:
# Connection refused / timeout: usually means no vLLM process listening
# (the vllm_node container is alive but no `vllm serve` is running yet).
# That's an idle state, not a wedge — don't trigger auto-restart.
return ProbeResult(
ok=True,
at=now_iso,
note="no model currently loaded (idle)",
)
if not models:
return ProbeResult(
ok=True,
at=now_iso,
note="no model currently loaded (idle)",
)
model_id = models[0]["id"]
# Step 2: model is loaded; verify it can actually complete a 1-token request.
t0 = time.monotonic()
try:
async with httpx.AsyncClient(timeout=PROBE_TIMEOUT_SEC) as c:
r = await c.post(
f"{base}/v1/chat/completions",
json={
"model": model_id,
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 1,
"temperature": 0,
},
)
latency = round((time.monotonic() - t0) * 1000)
if 200 <= r.status_code < 300:
return ProbeResult(ok=True, at=now_iso, latency_ms=latency, note=f"model={model_id}")
return ProbeResult(
ok=False,
at=now_iso,
latency_ms=latency,
error=f"HTTP {r.status_code}: {r.text[:240]}",
)
except Exception as e:
return ProbeResult(ok=False, at=now_iso, error=f"{type(e).__name__}: {e}")
# ---- orchestration --------------------------------------------------
PROBES = {
"parakeet": "probe_parakeet",
"magpie": "probe_magpie",
"vllm": "probe_vllm",
}
async def run_one(self, service: str) -> ProbeResult:
fn = getattr(self, self.PROBES[service])
result: ProbeResult = await fn()
st = self.state[service]
prev_ok = st.last.ok if st.last else None
st.last = result
if result.ok:
st.last_ok_at = result.at
# Log to connectivity history: every failure, plus the first success
# after a failure (recovery), plus the first probe ever — but skip
# the "still ok" steady-state to keep the log readable.
if not result.ok:
record_report(
service,
ok=False,
source="deep-health",
detail=result.error[:240],
latency_ms=result.latency_ms,
)
elif prev_ok is False:
record_report(
service,
ok=True,
source="deep-health",
detail="recovered" + (f"{result.note}" if result.note else ""),
latency_ms=result.latency_ms,
)
elif prev_ok is None:
record_report(
service,
ok=True,
source="deep-health",
detail="first probe ok" + (f"{result.note}" if result.note else ""),
latency_ms=result.latency_ms,
)
# Maybe auto-restart
if not result.ok and _looks_like_wedge(result.error):
await self._maybe_restart(service, result.error)
return result
async def _maybe_restart(self, service: str, error: str) -> None:
# No restarts during the boot grace period.
if time.monotonic() - self._boot_at < STARTUP_GRACE_SEC:
return
st = self.state[service]
now = time.monotonic()
st.restarts = [t for t in st.restarts if now - t < RESTART_RATE_WINDOW_SEC]
if st.restarts and now - st.restarts[-1] < RESTART_COOLDOWN_SEC:
return # already restarted recently, give it time
if len(st.restarts) >= RESTART_RATE_LIMIT:
record_report(
service,
ok=False,
source="deep-health",
detail=f"rate-limited; not auto-restarting (would be #{len(st.restarts)+1} in 30 min)",
)
return
services = services_from_settings(self.settings)
if service not in services:
return
svc = services[service]
if not svc.host or not svc.user:
return
result = await run_action(self.settings, svc, "restart")
st.restarts.append(now)
ok = result.get("ok", False)
record_report(
service,
ok=False,
source="deep-health",
detail=f"auto-restart triggered (wedge: {error[:120]}); restart {'OK' if ok else 'FAILED'}",
)
async def run_all(self) -> dict[str, ProbeResult]:
results = {}
for name in self.PROBES:
results[name] = await self.run_one(name)
return results
async def run_periodic(self) -> None:
"""Long-running loop. Cancel via .stop()."""
# Brief initial wait to let app finish startup
try:
await asyncio.wait_for(self._stop.wait(), timeout=10.0)
return
except asyncio.TimeoutError:
pass
while not self._stop.is_set():
try:
await self.run_all()
except Exception:
# Never let the loop die; the periodic check is best-effort
pass
try:
await asyncio.wait_for(self._stop.wait(), timeout=self.interval_sec)
return
except asyncio.TimeoutError:
continue
def stop(self) -> None:
self._stop.set()
def summary(self) -> dict:
out = {}
for name, st in self.state.items():
last = st.last
out[name] = {
"last_ok_at": st.last_ok_at,
"last": (
{
"ok": last.ok,
"at": last.at,
"latency_ms": last.latency_ms,
"error": last.error,
"note": last.note,
}
if last
else None
),
"auto_restarts_window": len(st.restarts),
}
return out
+130
View File
@@ -0,0 +1,130 @@
"""On-disk presence + deletion for Hugging Face model caches on the Sparks.
The HF cache layout for a repo `org/name` is:
~/.cache/huggingface/hub/models--org--name/
We use `du -sb` to measure size (bytes) and `rm -rf` to free it. All operations
are gated by the server endpoints, which refuse to delete a currently-loaded
model or one tied to an in-flight swap/download.
"""
from __future__ import annotations
import asyncio
import shlex
from dataclasses import dataclass
from typing import Optional
from .config import Settings
from .ssh import ssh_run
def repo_to_cache_dirname(repo: str) -> str:
"""Convert 'org/name' to 'models--org--name' (the HF hub cache directory)."""
if "/" not in repo:
raise ValueError(f"repo must be in 'org/name' form: {repo!r}")
return "models--" + repo.replace("/", "--")
def _cache_path(repo: str) -> str:
"""Full remote path to the model's cache directory."""
# Use $HOME so it resolves correctly regardless of the SSH user's home.
return f"$HOME/.cache/huggingface/hub/{repo_to_cache_dirname(repo)}"
@dataclass
class HostDiskResult:
host: str
on_disk: bool
size_bytes: int = 0
error: Optional[str] = None
@dataclass
class DiskStatus:
repo: str
on_disk: bool # True if present on AT LEAST one host
total_bytes: int # sum across hosts
per_host: list[HostDiskResult]
async def probe_host(host: str, user: str, repo: str, settings: Settings) -> HostDiskResult:
"""Return whether the model's cache dir exists on this host and its size."""
if not host or not user:
return HostDiskResult(host=host or "?", on_disk=False, error="host not configured")
path = _cache_path(repo)
# `du -sb` prints bytes; if the dir doesn't exist, `du` returns non-zero.
# We test existence explicitly first so we can report on_disk=False cleanly.
cmd = (
f"if [ -d {shlex.quote(path)} ]; then "
f"du -sb {shlex.quote(path)} 2>/dev/null | awk '{{print $1}}'; "
f"else echo MISSING; fi"
)
rc, out, err = await ssh_run(host, user, cmd, settings, timeout=20.0)
if rc != 0:
return HostDiskResult(host=host, on_disk=False, error=(err or out).strip() or f"rc={rc}")
raw = out.strip()
if raw == "MISSING" or raw == "":
return HostDiskResult(host=host, on_disk=False)
try:
size = int(raw.splitlines()[-1])
except ValueError:
return HostDiskResult(host=host, on_disk=False, error=f"unparsable du output: {raw!r}")
return HostDiskResult(host=host, on_disk=True, size_bytes=size)
async def probe_disk(repo: str, mode: str, settings: Settings) -> DiskStatus:
"""Probe one model across the relevant Sparks based on its mode (solo|cluster)."""
hosts: list[tuple[str, str]] = [(settings.spark1_host, settings.spark1_user)]
if mode == "cluster" and settings.spark2_host:
hosts.append((settings.spark2_host, settings.spark2_user))
results = await asyncio.gather(*(probe_host(h, u, repo, settings) for h, u in hosts))
on_disk = any(r.on_disk for r in results)
total = sum(r.size_bytes for r in results)
return DiskStatus(repo=repo, on_disk=on_disk, total_bytes=total, per_host=list(results))
async def delete_host(host: str, user: str, repo: str, settings: Settings) -> HostDiskResult:
"""Probe + rm -rf on one host. Returns bytes freed (0 if the dir wasn't there)."""
if not host or not user:
return HostDiskResult(host=host or "?", on_disk=False, error="host not configured")
path = _cache_path(repo)
# Safety: hard-code the prefix in the command so a bad `repo` can never escape.
# Compute size first, then remove. If absent, still return success (idempotent).
cmd = (
f"set -e; "
f"P={shlex.quote(path)}; "
f"if [ -d \"$P\" ]; then "
f" SIZE=$(du -sb \"$P\" 2>/dev/null | awk '{{print $1}}'); "
f" rm -rf -- \"$P\"; "
f" echo FREED $SIZE; "
f"else "
f" echo FREED 0; "
f"fi"
)
rc, out, err = await ssh_run(host, user, cmd, settings, timeout=120.0)
if rc != 0:
return HostDiskResult(host=host, on_disk=False, error=(err or out).strip() or f"rc={rc}")
# Parse the "FREED N" line
freed = 0
for line in out.splitlines():
parts = line.strip().split()
if len(parts) == 2 and parts[0] == "FREED":
try:
freed = int(parts[1])
except ValueError:
pass
break
return HostDiskResult(host=host, on_disk=False, size_bytes=freed)
async def delete_from_disk(repo: str, mode: str, settings: Settings) -> DiskStatus:
"""rm -rf the model's cache dir on the relevant Sparks. Idempotent."""
hosts: list[tuple[str, str]] = [(settings.spark1_host, settings.spark1_user)]
if mode == "cluster" and settings.spark2_host:
hosts.append((settings.spark2_host, settings.spark2_user))
results = await asyncio.gather(*(delete_host(h, u, repo, settings) for h, u in hosts))
total_freed = sum(r.size_bytes for r in results)
# After deletion, on_disk should be False on all hosts.
return DiskStatus(repo=repo, on_disk=False, total_bytes=total_freed, per_host=list(results))
+23 -5
View File
@@ -10,6 +10,7 @@ import time
from typing import Any
from .config import Settings
from .connectivity import record_mac, record_state
from .ssh import ssh_run
@@ -23,6 +24,8 @@ echo MEMORY=$(free -b 2>/dev/null | awk '/^Mem:/ {print $2, $3}')
echo DISK=$(df -B1 / 2>/dev/null | awk 'NR==2 {print $2, $3}')
echo GPU=$(nvidia-smi --query-gpu=name,utilization.gpu,temperature.gpu,power.draw,memory.total --format=csv,noheader,nounits 2>/dev/null | head -1)
echo GPU_MEM_USED_MIB=$(nvidia-smi --query-compute-apps=used_gpu_memory --format=csv,noheader,nounits 2>/dev/null | awk '{s+=$1} END {print s+0}')
DEFIF=$(ip route show default 2>/dev/null | awk '{print $5; exit}')
echo MAC=$(cat /sys/class/net/$DEFIF/address 2>/dev/null)
""".strip()
@@ -78,18 +81,25 @@ def _parse(out: str) -> dict:
# Sum per-process compute memory (works even on unified-memory systems)
if info.get("gpu_mem_used_mib"):
parsed["gpu_mem_used_mib"] = _parse_int(info["gpu_mem_used_mib"])
# MAC address on the default-route interface (for Wake-on-LAN)
if info.get("mac"):
parsed["mac"] = info["mac"].lower()
return parsed
class HardwareProbe:
"""Caches results briefly to avoid hammering the Sparks."""
def __init__(self, settings: Settings, ttl_sec: float = 4.0) -> None:
def __init__(self, settings: Settings, ttl_sec: float = 4.0, fail_ttl_sec: float = 25.0) -> None:
self.settings = settings
self.ttl_sec = ttl_sec
self.fail_ttl_sec = fail_ttl_sec
self._cache: dict[str, tuple[float, dict]] = {}
self._locks: dict[str, asyncio.Lock] = {}
def _ttl_for(self, value: dict) -> float:
return self.ttl_sec if value.get("reachable") else self.fail_ttl_sec
def _lock(self, key: str) -> asyncio.Lock:
if key not in self._locks:
self._locks[key] = asyncio.Lock()
@@ -108,12 +118,20 @@ class HardwareProbe:
async with self._lock(key):
now = time.monotonic()
cached = self._cache.get(key)
if cached and (now - cached[0] < self.ttl_sec):
if cached and (now - cached[0] < self._ttl_for(cached[1])):
return cached[1]
rc, out, err = await ssh_run(host, user, _PROBE, self.settings, timeout=12)
# Use a shorter timeout for the connect phase; if a previous probe
# marked this host unreachable, return the cached failure immediately.
rc, out, err = await ssh_run(host, user, _PROBE, self.settings, timeout=6)
if rc != 0:
result = {"reachable": False, "configured": True, "host": host, "error": err.strip() or out.strip() or f"rc={rc}"}
else:
result = {"reachable": True, "configured": True, "host": host, **_parse(out)}
self._cache[key] = (now, result)
record_state(key, False)
return result
parsed = _parse(out)
result = {"reachable": True, "configured": True, "host": host, **parsed}
self._cache[key] = (now, result)
record_state(key, True)
if parsed.get("mac"):
record_mac(key, parsed["mac"])
return result
+202
View File
@@ -0,0 +1,202 @@
"""NVIDIA NIM container install / lifecycle.
Two pieces:
* A small curated catalog of NIM images (so users don't have to copy/paste
huge nvcr.io URLs).
* An installer that SSHes into the target Spark, runs `docker pull` then
`docker run -d --gpus all -p PORT:PORT -v VOLUME:/opt/nim/.cache
-e NGC_API_KEY=... IMAGE` and streams output.
Custom services also persist via `overrides.add_custom_service()` so the
Services panel can show them.
"""
from __future__ import annotations
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from .config import Settings
from .ssh import ssh_stream, StreamHandle
# Curated list. These are the most useful NIM containers for a dual-Spark
# audio-and-LLM setup. Browse the full catalog at
# https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia
CATALOG_URL = "https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia/containers"
SUGGESTED_NIMS: list[dict] = [
{
"key": "parakeet-tdt-0.6b-v3",
"name": "Parakeet TDT 0.6B v3",
"image": "nvcr.io/nim/nvidia/parakeet-tdt-0-6b-v3:latest",
"default_container": "parakeet-asr",
"default_port": 8000,
"kind": "stt",
"description": "Streaming speech-to-text (English). Used by Open WebUI for voice input. ~1 GB.",
"homepage": "https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia/containers/parakeet-tdt-0-6b-v3",
},
{
"key": "magpie-tts-multilingual",
"name": "Magpie TTS Multilingual",
"image": "nvcr.io/nim/nvidia/magpie-tts-multilingual:latest",
"default_container": "magpie-tts",
"default_port": 9000,
"kind": "tts",
"description": "Multilingual text-to-speech. Counterpart to Parakeet for 'read aloud'. ~3 GB.",
"homepage": "https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia/containers/magpie-tts-multilingual",
},
{
"key": "riva-multilingual",
"name": "Riva Multilingual ASR",
"image": "nvcr.io/nim/nvidia/riva-multilingual:latest",
"default_container": "riva-asr",
"default_port": 8001,
"kind": "stt",
"description": "NVIDIA Riva speech-recognition multi-language model. Larger and more accurate than Parakeet.",
"homepage": "https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia",
},
]
@dataclass
class NimInstallJob:
id: str
image: str
container: str
port: int
host: str
user: str
volume: Optional[str]
started_at: str
state: str = "starting" # starting | pulling | running | done | failed
phase: str = "Starting…"
lines: list[str] = field(default_factory=list)
returncode: Optional[int] = None
finished_at: Optional[str] = None
def append(self, line: str) -> None:
self.lines.append(line)
if len(self.lines) > 1000:
del self.lines[: len(self.lines) - 1000]
class NimManager:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.lock = asyncio.Lock()
self.jobs: dict[str, NimInstallJob] = {}
self.current_job_id: Optional[str] = None
def get(self, job_id: str) -> NimInstallJob | None:
return self.jobs.get(job_id)
async def trigger(
self,
*,
image: str,
container: str,
port: int,
host: str,
user: str,
volume: str | None = None,
extra_env: dict[str, str] | None = None,
) -> NimInstallJob:
if self.lock.locked():
raise RuntimeError("Another NIM install is already in progress")
if not host or not user:
raise RuntimeError("target host not configured")
if not self.settings.ngc_api_key:
raise RuntimeError(
"NGC_API_KEY is not set. Open Configure Sparks in StartOS and paste your NGC personal API key (free at https://ngc.nvidia.com/setup/personal-key)."
)
job = NimInstallJob(
id=uuid.uuid4().hex[:8],
image=image,
container=container,
port=port,
host=host,
user=user,
volume=volume or f"{container}-cache",
started_at=datetime.now(timezone.utc).isoformat(),
)
self.jobs[job.id] = job
self.current_job_id = job.id
asyncio.create_task(self._run(job, extra_env or {}))
return job
async def _run(self, job: NimInstallJob, extra_env: dict[str, str]) -> None:
async with self.lock:
try:
await self._do(job, extra_env)
if job.state != "failed":
job.state = "done"
job.returncode = 0
job.phase = "Done"
except Exception as e:
job.append(f"[error] {type(e).__name__}: {e}")
job.state = "failed"
if job.returncode is None:
job.returncode = 1
finally:
job.finished_at = datetime.now(timezone.utc).isoformat()
if self.current_job_id == job.id:
self.current_job_id = None
async def _do(self, job: NimInstallJob, extra_env: dict[str, str]) -> None:
# Build the bash one-liner. We use docker login non-interactively with the NGC API key.
env_parts = [f'-e NGC_API_KEY=$NGC_API_KEY']
for k, v in extra_env.items():
env_parts.append(f"-e {k}={v}")
env_str = " ".join(env_parts)
cmd = (
f"set -e; "
f"export NGC_API_KEY='{self.settings.ngc_api_key}'; "
f"echo '=== docker login nvcr.io ==='; "
f"echo \"$NGC_API_KEY\" | docker login nvcr.io -u '$oauthtoken' --password-stdin; "
f"echo '=== docker pull {job.image} (this can be 1-10 GB) ==='; "
f"docker pull {job.image}; "
f"echo '=== remove any prior container with the same name ==='; "
f"docker rm -f {job.container} 2>/dev/null || true; "
f"echo '=== docker run -d --gpus all -p {job.port}:{job.port} -v {job.volume}:/opt/nim/.cache {env_str} --name {job.container} --restart unless-stopped {job.image} ==='; "
f"docker run -d --gpus all "
f"-p {job.port}:{job.port} "
f"-v {job.volume}:/opt/nim/.cache "
f"{env_str} "
f"--name {job.container} "
f"--restart unless-stopped "
f"{job.image}; "
f"echo '=== ensuring cache volume is writable by uid 1000 (riva-server) ==='; "
f"docker run --rm -v {job.volume}:/cache alpine chown -R 1000:1000 /cache && "
f"docker restart {job.container}; "
f"echo '=== install complete; container is starting up and will download its model on first boot ==='"
)
job.append(f"$ <install command for {job.image} on {job.host}>")
job.state = "pulling"
job.phase = "Pulling image from nvcr.io (this can take a few minutes)…"
handle = StreamHandle()
async for line in ssh_stream(job.host, job.user, cmd, self.settings, handle=handle):
# Don't log lines containing the api key
if self.settings.ngc_api_key and self.settings.ngc_api_key in line:
continue
job.append(line)
if "docker pull" in line:
job.phase = "Pulling image from nvcr.io…"
elif "Login Succeeded" in line:
job.phase = "Logged in to NGC; pulling image…"
elif "Pull complete" in line:
job.phase = "Pulling layers…"
elif "Status: Downloaded newer image" in line or "Image is up to date" in line:
job.phase = "Image ready; starting container…"
elif "docker run -d" in line:
job.state = "running"
job.phase = "Container starting; downloading model on first boot…"
rc = handle.returncode or 0
if rc != 0:
job.state = "failed"
job.returncode = rc
+317
View File
@@ -10,15 +10,22 @@ from pydantic import BaseModel
from typing import Literal
from .config import Settings
from .connectivity import get_mac, record_report, record_state, summary as connectivity_summary
from .custom_services import add_custom_service, delete_custom_service
from .deep_health import DeepHealth
from .disk import delete_from_disk, probe_disk
from .download import DownloadManager
from .hardware import HardwareProbe
from .health import check_magpie, check_parakeet, check_vllm
from .models import load_catalog
from .nim import SUGGESTED_NIMS, CATALOG_URL, NimManager
from .overrides import add_custom, delete_custom, extract_knobs_from_args, load_overrides, set_knobs
from .services import docker_state, run_action, services_from_settings
from .ssh import ssh_run
from .swap import SwapManager
from .updates import UpdateManager, get_update_status
from .validate import validate_launch
from .wol import send_local_broadcast, send_via_peer
settings = Settings.from_env()
@@ -27,9 +34,23 @@ swap_manager = SwapManager(settings, catalog)
download_manager = DownloadManager(settings)
update_manager = UpdateManager(settings)
hardware_probe = HardwareProbe(settings)
nim_manager = NimManager(settings)
deep_health = DeepHealth(settings)
app = FastAPI(title="spark-control", version="0.1.0")
@app.on_event("startup")
async def _start_deep_health() -> None:
# Fire-and-forget; the loop catches its own exceptions.
asyncio.create_task(deep_health.run_periodic())
@app.on_event("shutdown")
async def _stop_deep_health() -> None:
deep_health.stop()
_STATIC_DIR = Path(__file__).resolve().parent / "static"
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
@@ -119,12 +140,191 @@ async def del_model(key: str) -> dict:
return {"ok": True, "key": key}
@app.get("/api/models/disk-status")
async def get_models_disk_status() -> dict:
"""Probe each catalog model's HF cache on the appropriate Spark(s) in parallel.
Result is keyed by model key: {on_disk, total_bytes, per_host:[{host,on_disk,size_bytes,error?}]}.
Designed to be called once on dashboard load; takes ~13s depending on Spark count.
"""
if not settings.configured:
return {"configured": False, "models": {}}
keys = list(catalog.models.keys())
statuses = await asyncio.gather(*(
probe_disk(catalog.models[k].repo, catalog.models[k].mode, settings) for k in keys
), return_exceptions=True)
out: dict[str, dict] = {}
for k, s in zip(keys, statuses):
if isinstance(s, Exception):
out[k] = {"on_disk": False, "total_bytes": 0, "per_host": [], "error": str(s)}
continue
out[k] = {
"on_disk": s.on_disk,
"total_bytes": s.total_bytes,
"per_host": [
{"host": r.host, "on_disk": r.on_disk, "size_bytes": r.size_bytes, **({"error": r.error} if r.error else {})}
for r in s.per_host
],
}
return {"configured": True, "models": out}
@app.delete("/api/models/{key}/disk")
async def del_model_disk(key: str) -> dict:
"""Delete a model's weights from the Spark filesystem(s). The catalog entry stays.
Safety rails:
- Refuses if the model is currently loaded on vLLM.
- Refuses if a swap or download is in flight.
- Idempotent: if the cache dir is already gone on a host, that host reports 0 bytes freed.
"""
if key not in catalog.models:
raise HTTPException(404, f"unknown model: {key}")
m = catalog.models[key]
# Refuse if currently loaded
try:
vllm = await check_vllm(settings)
except Exception:
vllm = {}
if vllm.get("ok") and vllm.get("current_model") == m.repo:
raise HTTPException(
409,
f"'{m.display_name}' is the currently loaded model. Switch to a different model first, then try again."
)
# Refuse if a swap is in flight
if swap_manager.current_job_id:
raise HTTPException(409, "a model swap is in progress; wait for it to finish")
# Refuse if a download is in flight for this same repo (a different model's download is fine)
if download_manager.current_job_id:
job = download_manager.get(download_manager.current_job_id)
if job and job.repo == m.repo:
raise HTTPException(409, "this model is currently downloading; cancel or wait for it to finish")
status = await delete_from_disk(m.repo, m.mode, settings)
# Audit log
record_report(
f"disk:{key}",
ok=True,
source="disk-delete",
detail=f"freed {status.total_bytes} bytes across {len(status.per_host)} host(s)",
)
return {
"ok": True,
"key": key,
"repo": m.repo,
"bytes_freed": status.total_bytes,
"per_host": [
{"host": r.host, "size_bytes": r.size_bytes, **({"error": r.error} if r.error else {})}
for r in status.per_host
],
}
@app.get("/api/hardware")
async def get_hardware() -> dict:
"""Per-Spark hardware snapshot — RAM, disk, GPU mem + util, CPU load, uptime."""
return await hardware_probe.fetch()
@app.get("/api/connectivity")
async def get_connectivity() -> dict:
"""Up/down transition log per Spark + cached MACs."""
return connectivity_summary()
@app.get("/api/deep-health")
async def get_deep_health() -> dict:
"""Last result + auto-restart counters for each service's synthetic probe."""
return deep_health.summary()
@app.post("/api/deep-health/{service}/run")
async def run_deep_health(service: str) -> dict:
"""Manually run a single service's deep-health probe right now."""
if service not in deep_health.PROBES:
raise HTTPException(404, f"unknown service: {service}")
result = await deep_health.run_one(service)
return {
"ok": result.ok,
"at": result.at,
"latency_ms": result.latency_ms,
"error": result.error,
"note": result.note,
}
class HealthEventBody(BaseModel):
service: str # e.g. "parakeet", "magpie", "vllm"
ok: bool # true on success, false on failure
source: str | None = None # what app reported (e.g. "open-webui")
error: str | None = None # optional detail
ms: int | None = None # optional latency
@app.post("/api/health-event")
async def post_health_event(body: HealthEventBody) -> dict:
"""Passive endpoint: any LAN app can POST here when its call to one of our
services succeeds or (more usefully) fails. We log the report into the
connectivity history so a brief blip that polling misses still surfaces.
Example:
curl -X POST http://<dashboard>/api/health-event \\
-H 'content-type: application/json' \\
-d '{"service":"parakeet","ok":false,"error":"503","source":"open-webui","ms":420}'
"""
if not body.service.strip():
raise HTTPException(400, "service is required")
event = record_report(
body.service.strip(),
ok=body.ok,
source=(body.source or "external").strip(),
detail=(body.error or "").strip(),
latency_ms=body.ms,
)
return {"ok": True, "recorded": event}
@app.post("/api/spark/{name}/wake")
async def wake_spark(name: str) -> dict:
"""Send a Wake-on-LAN magic packet for the named Spark.
Tries the OTHER Spark (if reachable) first because the packet has to
originate on the target's LAN segment to be reliable. Falls back to a
direct UDP broadcast from this container.
"""
if name not in ("spark1", "spark2"):
raise HTTPException(404, f"unknown spark: {name}")
mac = get_mac(name)
if not mac:
raise HTTPException(400, f"MAC for {name} not yet known; bring it up once so we can probe it, then this will work next time it sleeps")
# Find the peer's connectivity to decide the path.
other = "spark2" if name == "spark1" else "spark1"
other_host = settings.spark1_host if other == "spark1" else settings.spark2_host
other_user = settings.spark1_user if other == "spark1" else settings.spark2_user
delivered_via = None
via_peer_ok = False
via_peer_err = ""
if other_host and other_user:
via_peer_ok, via_peer_err = await send_via_peer(other_host, other_user, mac, settings)
if via_peer_ok:
delivered_via = other
if not via_peer_ok:
# Fall back to direct from this container
try:
send_local_broadcast(mac)
delivered_via = "container"
except Exception as e:
raise HTTPException(500, f"WoL failed: peer={via_peer_err!r} container={e!r}")
return {"ok": True, "spark": name, "mac": mac, "delivered_via": delivered_via}
@app.get("/api/services")
async def get_services() -> dict:
"""Lifecycle state of always-on support services (Parakeet, Magpie, …).
@@ -167,9 +367,113 @@ async def get_services() -> dict:
results = await asyncio.gather(*[one(n) for n in services.keys()])
for name, info in results:
out[name] = info
# Feed http reachability into the connectivity log (transition-only)
record_state(name, bool(info.get("http_ready")))
return out
@app.get("/api/nim/catalog")
async def get_nim_catalog() -> dict:
return {
"catalog_url": CATALOG_URL,
"ngc_key_configured": bool(settings.ngc_api_key),
"suggested": SUGGESTED_NIMS,
}
class NimInstallBody(BaseModel):
image: str
container: str
port: int
host: Literal["spark1", "spark2"] = "spark2"
kind: str = ""
register: bool = True # write to custom services overrides after install
@app.post("/api/nim/install")
async def post_nim_install(body: NimInstallBody) -> dict:
target_host = settings.spark1_host if body.host == "spark1" else settings.spark2_host
target_user = settings.spark1_user if body.host == "spark1" else settings.spark2_user
try:
job = await nim_manager.trigger(
image=body.image,
container=body.container,
port=body.port,
host=target_host,
user=target_user,
)
except RuntimeError as e:
raise HTTPException(409 if "in progress" in str(e) else 400, str(e))
if body.register:
# Persist in custom services so the panel shows it after install.
add_custom_service({
"key": body.container,
"kind": body.kind or "nim",
"host": target_host,
"user": target_user,
"container": body.container,
"port": body.port,
"image": body.image,
})
return {"job_id": job.id, "image": job.image, "container": job.container, "state": job.state}
@app.get("/api/nim/install/{job_id}")
async def get_nim_install(job_id: str) -> dict:
job = nim_manager.get(job_id)
if job is None:
raise HTTPException(404, "no such job")
return {
"id": job.id,
"image": job.image,
"container": job.container,
"port": job.port,
"host": job.host,
"state": job.state,
"phase": job.phase,
"started_at": job.started_at,
"finished_at": job.finished_at,
"returncode": job.returncode,
"lines": job.lines,
}
@app.get("/api/nim/install/{job_id}/stream")
async def stream_nim_install(job_id: str):
job = nim_manager.get(job_id)
if job is None:
raise HTTPException(404, "no such job")
async def gen():
sent = 0
last_phase = None
while True:
n = len(job.lines)
if n > sent:
for line in job.lines[sent:n]:
yield f"data: {json.dumps({'line': line})}\n\n"
sent = n
if job.phase != last_phase:
yield f"event: phase\ndata: {json.dumps({'state': job.state, 'phase': job.phase})}\n\n"
last_phase = job.phase
if job.returncode is not None and sent >= len(job.lines):
yield f"event: done\ndata: {json.dumps({'state': job.state, 'returncode': job.returncode})}\n\n"
return
await asyncio.sleep(0.5)
return StreamingResponse(gen(), media_type="text/event-stream")
@app.delete("/api/services/{name}")
async def del_service(name: str) -> dict:
# Only allow deleting custom services (not the bundled parakeet/magpie keys)
if name in ("parakeet", "magpie"):
raise HTTPException(400, "built-in service; cannot delete (use Configure Sparks to point at a different host)")
delete_custom_service(name)
return {"ok": True, "name": name}
@app.post("/api/services/{name}/{action}")
async def service_action(name: str, action: str) -> dict:
services = services_from_settings(settings)
@@ -221,6 +525,10 @@ async def get_status() -> dict:
check_parakeet(settings),
check_magpie(settings),
)
# Feed health into the connectivity log (deduped — only logs on transition)
record_state("vllm", bool(vllm.get("ok")))
record_state("parakeet", bool(parakeet.get("ok")))
record_state("magpie", bool(magpie.get("ok")))
current_key = _identify_current_model(vllm.get("current_model"))
return {
"configured": settings.configured,
@@ -246,6 +554,15 @@ class SwapRequest(BaseModel):
dry_run: bool = False
@app.post("/api/swap/{key}/validate")
async def validate_swap(key: str) -> dict:
"""Pre-flight check: run vLLM's argparse layer against the proposed launch
command WITHOUT starting an engine. Cheap (~5 s) and doesn't disturb the
currently-loaded model.
"""
return await validate_launch(key, catalog, settings)
@app.post("/api/swap")
async def post_swap(req: SwapRequest) -> dict:
if not settings.configured and not req.dry_run:
+42 -2
View File
@@ -5,6 +5,7 @@ machinery. We just run `docker start|stop|restart <container>` via SSH on the
appropriate host.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Literal, Optional
@@ -12,6 +13,25 @@ from .config import Settings
from .ssh import ssh_run
# Cache the "unreachable" verdict per (host, user) for a short period so that a
# repeated docker_state call doesn't re-pay the 6 s SSH connect timeout each time.
_UNREACHABLE_TTL = 25.0
_unreachable_cache: dict[tuple[str, str], float] = {}
def _is_recently_unreachable(host: str, user: str) -> bool:
ts = _unreachable_cache.get((host, user))
return bool(ts and time.monotonic() - ts < _UNREACHABLE_TTL)
def _mark_unreachable(host: str, user: str) -> None:
_unreachable_cache[(host, user)] = time.monotonic()
def _clear_unreachable(host: str, user: str) -> None:
_unreachable_cache.pop((host, user), None)
ServiceName = Literal["parakeet", "magpie"]
ServiceAction = Literal["start", "stop", "restart"]
@@ -27,7 +47,8 @@ class ServiceDef:
def services_from_settings(s: Settings) -> dict[str, ServiceDef]:
return {
from .custom_services import load_custom_services
out: dict[str, ServiceDef] = {
"parakeet": ServiceDef(
name="parakeet",
kind="stt",
@@ -45,19 +66,38 @@ def services_from_settings(s: Settings) -> dict[str, ServiceDef]:
port=s.magpie_port,
),
}
for entry in load_custom_services():
key = entry.get("key")
if not key or key in out:
continue
out[key] = ServiceDef(
name=key,
kind=entry.get("kind", ""),
host=entry.get("host", ""),
user=entry.get("user", ""),
container=entry.get("container", key),
port=int(entry.get("port", 0)),
)
return out
async def docker_state(settings: Settings, svc: ServiceDef) -> dict:
"""Get docker state (running, exited, restarting, etc.) + restart count."""
if not svc.host or not svc.user:
return {"state": "unconfigured", "restart_count": None, "uptime": None}
if _is_recently_unreachable(svc.host, svc.user):
return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}
cmd = (
f"docker inspect {svc.container} "
f"--format '{{{{.State.Status}}}}|{{{{.State.StartedAt}}}}|{{{{.RestartCount}}}}|{{{{.State.ExitCode}}}}|{{{{.State.Error}}}}' "
f"2>&1 || echo 'NOT_FOUND'"
)
rc, out, _ = await ssh_run(svc.host, svc.user, cmd, settings, timeout=10)
rc, out, _ = await ssh_run(svc.host, svc.user, cmd, settings, timeout=6)
out = out.strip()
if rc == 124 or "timeout after" in out.lower():
_mark_unreachable(svc.host, svc.user)
return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}
_clear_unreachable(svc.host, svc.user)
if rc != 0 or out.startswith("NOT_FOUND") or "Error" in out and "no such object" in out.lower():
return {"state": "missing", "restart_count": None, "uptime": None, "raw": out}
parts = out.split("|")
+465
View File
@@ -17,6 +17,9 @@ const state = {
config: {},
configured: true,
timer_handle: null,
deep_health: {},
disk_status: {}, // keyed by model key: { on_disk, total_bytes, per_host }
disk_status_loaded: false,
};
const el = (sel) => document.querySelector(sel);
@@ -56,12 +59,36 @@ function renderCards() {
? `<div class="desc">${escapeHtml(m.description)}</div>`
: '';
const customPill = m.custom ? `<span class="tag custom-pill">custom</span>` : '';
// Disk-presence pill + trash button. Until /api/models/disk-status comes back,
// we don't know — render a neutral placeholder.
const disk = state.disk_status[key];
let diskPill = '';
if (state.disk_status_loaded) {
if (disk && disk.on_disk) {
const gb = (disk.total_bytes / 1e9);
diskPill = `<span class="tag on-disk" title="Weights present on disk">on disk · ${gb.toFixed(1)} GB</span>`;
} else {
diskPill = `<span class="tag not-on-disk" title="Weights not downloaded">not downloaded</span>`;
}
}
// Trash button — hidden if not on disk; disabled (with tooltip) if currently loaded.
let trashBtn = '';
if (state.disk_status_loaded && disk && disk.on_disk) {
const disabled = isActive || isSwapping;
const tip = isActive
? 'Currently loaded — switch to another model first'
: isSwapping
? 'A swap is in progress'
: 'Delete weights from disk';
trashBtn = `<button class="icon-btn danger" data-disk-del-key="${key}" title="${escapeHtml(tip)}" aria-label="Delete from disk" ${disabled ? 'disabled' : ''}>${trashIcon}</button>`;
}
card.innerHTML = `
<div class="name">${escapeHtml(m.display_name)}</div>
<div class="meta">
<span class="tag mode-${m.mode}">${m.mode}</span>
<span class="tag">${m.size_gb} GB</span>
${customPill}
${diskPill}
${(m.capabilities || []).map(c => `<span class="tag cap">${escapeHtml(c)}</span>`).join('')}
</div>
${desc}
@@ -73,8 +100,11 @@ function renderCards() {
<button class="btn ${isActive ? '' : 'primary'}" data-swap-key="${key}" ${isActive || isSwapping ? 'disabled' : ''}>
${isActive ? 'Current' : 'Switch to this'}
</button>
<button class="btn test-btn" data-test-key="${key}" title="Pre-flight check the launch command without starting the engine">Test</button>
<button class="btn adv-btn" data-adv-key="${key}" title="Advanced settings">Advanced</button>
${trashBtn}
</div>
<div class="test-result hidden" data-test-result-for="${key}"></div>
`;
root.appendChild(card);
}
@@ -84,6 +114,42 @@ function renderCards() {
for (const btn of root.querySelectorAll('[data-adv-key]')) {
btn.addEventListener('click', () => openAdvanced(btn.dataset.advKey));
}
for (const btn of root.querySelectorAll('[data-test-key]')) {
btn.addEventListener('click', () => testLaunch(btn.dataset.testKey, btn));
}
for (const btn of root.querySelectorAll('[data-disk-del-key]')) {
btn.addEventListener('click', () => openDiskDeleteDialog(btn.dataset.diskDelKey));
}
}
const trashIcon = '<svg viewBox="0 0 24 24" width="14" height="14" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><polyline points="3 6 5 6 21 6"></polyline><path d="M19 6l-1 14a2 2 0 0 1-2 2H8a2 2 0 0 1-2-2L5 6"></path><path d="M10 11v6"></path><path d="M14 11v6"></path><path d="M9 6V4a2 2 0 0 1 2-2h2a2 2 0 0 1 2 2v2"></path></svg>';
async function testLaunch(key, btn) {
const resultEl = document.querySelector(`[data-test-result-for="${key}"]`);
if (!resultEl) return;
const originalText = btn.textContent;
btn.disabled = true;
btn.textContent = 'Testing…';
resultEl.classList.remove('hidden', 'ok', 'fail');
resultEl.innerHTML = '<span class="muted small">Checking launch args against vLLM\'s parser…</span>';
try {
const r = await fetchJSON(`/api/swap/${encodeURIComponent(key)}/validate`, { method: 'POST' });
if (r.ok) {
resultEl.classList.add('ok');
resultEl.innerHTML = `<span class="ok-mark">✓</span> Launch args parse OK. <span class="muted small">(Doesn't guarantee runtime success — only catches argparse-level issues.)</span>`;
} else {
resultEl.classList.add('fail');
const err = escapeHtml(r.error || 'unknown error');
const stage = r.stage ? ` <span class="muted small">(${escapeHtml(r.stage)})</span>` : '';
resultEl.innerHTML = `<span class="fail-mark">✗</span> Would fail: ${err}${stage}`;
}
} catch (e) {
resultEl.classList.add('fail');
resultEl.innerHTML = `<span class="fail-mark">✗</span> Test failed: ${escapeHtml(e.message)}`;
} finally {
btn.disabled = false;
btn.textContent = originalText;
}
}
function renderCurrent(status) {
@@ -121,10 +187,110 @@ function bar(usedPct, warn) {
async function pollHardware() {
try {
state.hardware = await fetchJSON('/api/hardware');
try { state.connectivity = await fetchJSON('/api/connectivity'); } catch {}
renderHardware();
} catch (e) { console.warn('hardware poll failed', e); }
}
function fmtDuration(sec) {
if (sec == null) return '';
if (sec < 60) return `${Math.round(sec)}s`;
if (sec < 3600) return `${Math.round(sec / 60)}m`;
if (sec < 86400) {
const h = Math.floor(sec / 3600);
const m = Math.round((sec % 3600) / 60);
return m ? `${h}h ${m}m` : `${h}h`;
}
const d = Math.floor(sec / 86400);
const h = Math.round((sec % 86400) / 3600);
return h ? `${d}d ${h}h` : `${d}d`;
}
function openConnectivityDialog() {
const dlg = el('#connectivity-dialog');
const content = el('#connectivity-content');
const c = state.connectivity || {};
const events = c.events || [];
if (events.length === 0) {
content.innerHTML = '<div class="muted small">No events recorded yet. Once a Spark or service goes down and back up (or an external app reports a failure), entries appear here.</div>';
dlg.showModal();
return;
}
const bySubject = {};
for (const e of events) {
const subj = e.subject || e.spark || 'unknown'; // legacy fallback
(bySubject[subj] = bySubject[subj] || []).push(e);
}
// Sort subjects: hosts first, then services, alphabetical
const hostOrder = ['spark1', 'spark2'];
const subjects = Object.keys(bySubject).sort((a, b) => {
const ia = hostOrder.indexOf(a);
const ib = hostOrder.indexOf(b);
if (ia >= 0 && ib >= 0) return ia - ib;
if (ia >= 0) return -1;
if (ib >= 0) return 1;
return a.localeCompare(b);
});
const html = subjects.map((subj) => {
const evs = bySubject[subj];
const transitions = evs.filter(e => (e.kind || 'transition') === 'transition');
const reports = evs.filter(e => e.kind === 'report');
const downs = transitions.filter(e => e.transition === 'down').length;
const failedReports = reports.filter(e => !e.ok).length;
const mac = c.macs?.[subj];
const summaryParts = [];
if (transitions.length) summaryParts.push(`${transitions.length} probe transition${transitions.length===1?'':'s'} (${downs} down)`);
if (reports.length) summaryParts.push(`${reports.length} app report${reports.length===1?'':'s'} (${failedReports} failed)`);
const isHost = hostOrder.includes(subj);
return `
<div class="conn-spark">
<h4>${escapeHtml(subj)}${isHost ? ' <span class="muted small">[host]</span>' : ' <span class="muted small">[service]</span>'}${mac ? ` <span class="muted small">${escapeHtml(mac)}</span>` : ''}</h4>
<div class="conn-summary">${summaryParts.join(' · ') || 'no events'}</div>
${evs.slice(-30).reverse().map(e => renderConnEvent(e)).join('')}
</div>
`;
}).join('');
content.innerHTML = html;
dlg.showModal();
}
function renderConnEvent(e) {
const when = escapeHtml((e.at || '').replace('T', ' ').replace('Z', ''));
const kind = e.kind || 'transition';
if (kind === 'report') {
const ok = !!e.ok;
const source = escapeHtml(e.source || 'external');
const detail = e.detail ? `${escapeHtml(e.detail)}` : '';
const latency = e.latency_ms != null ? ` (${e.latency_ms} ms)` : '';
return `
<div class="conn-event ${ok ? 'up' : 'down'} report">
<span class="when">${when}</span>
<span class="what">${ok ? '◷ report: ok' : '◷ report: failed'} <span class="muted">from</span> ${source}${detail}</span>
<span class="dur">${latency}</span>
</div>
`;
}
const down = e.down_seconds != null ? `was down ${fmtDuration(e.down_seconds)}` : '';
const up = e.up_seconds != null ? `was up ${fmtDuration(e.up_seconds)}` : '';
return `
<div class="conn-event ${e.transition}">
<span class="when">${when}</span>
<span class="what">${e.transition === 'up' ? '↑ came back online' : '↓ dropped offline'}</span>
<span class="dur">${down}${up}</span>
</div>
`;
}
async function wakeSpark(name) {
try {
const r = await fetchJSON(`/api/spark/${name}/wake`, { method: 'POST' });
alert(`Wake-on-LAN sent to ${name} (MAC ${r.mac}, via ${r.delivered_via}). Give it ~30 seconds to wake; the card will go green when it comes back.`);
} catch (e) {
alert(`Wake failed: ${e.message}`);
}
}
function renderHardware() {
const panel = el('#hardware-panel');
const grid = el('#hardware-grid');
@@ -138,12 +304,30 @@ function renderHardware() {
const card = document.createElement('div');
if (!s.reachable) {
card.className = 'hw-card unreachable';
const mac = state.connectivity?.macs?.[key];
const wolRow = mac
? `<div class="wol-row">
<span class="mac-display">${escapeHtml(mac)}</span>
<span class="spacer"></span>
<button class="btn" data-wake="${escapeHtml(key)}">Wake (WoL)</button>
</div>`
: `<div class="muted small">MAC not yet known — once it's been up once with this dashboard installed, "Wake" will appear here.</div>`;
card.innerHTML = `
<div class="head">
<span class="name">${escapeHtml(key)}</span>
<span class="meta">unreachable</span>
</div>
<div class="muted small">${escapeHtml(s.host || '')}${escapeHtml(s.error || 'no response')}</div>
${wolRow}
<div class="muted small" style="line-height:1.5">
If Wake-on-LAN doesn't bring it back, manual steps:
<ol style="margin: 6px 0 0 18px; padding: 0;">
<li>Verify it's powered on (check the front LED).</li>
<li>Ping it from another LAN device.</li>
<li>Power-cycle it physically.</li>
<li>If it boots, this card will go green again automatically.</li>
</ol>
</div>
`;
grid.appendChild(card);
continue;
@@ -262,6 +446,35 @@ async function renderServices() {
const restartsRow = s.restart_count != null && s.restart_count > 1
? `<div class="row"><span class="k">Restarts</span><span class="v">${s.restart_count}</span></div>`
: '';
const dh = state.deep_health?.[name];
let deepRow = '';
if (dh && dh.last) {
const last = dh.last;
const when = (last.at || '').slice(11, 19); // HH:MM:SS
const verdict = last.ok
? `<span class="dh-ok">deep check ok</span>`
: `<span class="dh-fail">deep check FAILED</span>`;
const lat = last.latency_ms != null ? ` <span class="muted">${last.latency_ms} ms</span>` : '';
const restarts = dh.auto_restarts_window > 0
? ` <span class="muted">· ${dh.auto_restarts_window} auto-restart${dh.auto_restarts_window === 1 ? '' : 's'} in 30 min</span>`
: '';
deepRow = `
<div class="row deep-row">
<span class="k">Deep</span>
<span class="v deep-v">${verdict} <span class="muted small">${escapeHtml(when)}</span>${lat}${restarts}</span>
<button class="icon-btn dh-run-btn" data-dh-run="${escapeHtml(name)}" title="Run deep check now">↻</button>
</div>
${last.ok ? '' : `<div class="deep-error muted small">${escapeHtml((last.error || last.note || '').slice(0, 200))}</div>`}
`;
} else if (dh) {
deepRow = `
<div class="row deep-row">
<span class="k">Deep</span>
<span class="v muted-v">no probe yet</span>
<button class="icon-btn dh-run-btn" data-dh-run="${escapeHtml(name)}" title="Run deep check now">↻</button>
</div>
`;
}
card.innerHTML = `
<div class="head">
<span class="name">${escapeHtml(name)}</span>
@@ -272,6 +485,7 @@ async function renderServices() {
${urlRow}
${modelRow}
${restartsRow}
${deepRow}
<div class="service-actions">
<button class="btn" data-svc-action="${name}:start" ${disable('start') ? 'disabled' : ''}>Start</button>
<button class="btn" data-svc-action="${name}:restart" ${disable('restart') ? 'disabled' : ''}>Restart</button>
@@ -283,6 +497,25 @@ async function renderServices() {
for (const btn of grid.querySelectorAll('.btn[data-svc-action]')) {
btn.addEventListener('click', () => onServiceAction(btn.dataset.svcAction));
}
for (const btn of grid.querySelectorAll('[data-dh-run]')) {
btn.addEventListener('click', () => onDeepHealthRun(btn.dataset.dhRun, btn));
}
}
async function onDeepHealthRun(name, btn) {
btn.disabled = true;
const orig = btn.textContent;
btn.textContent = '…';
try {
await fetchJSON(`/api/deep-health/${encodeURIComponent(name)}/run`, { method: 'POST' });
} catch (e) {
console.warn('deep-health run failed', e);
} finally {
try { state.deep_health = await fetchJSON('/api/deep-health'); } catch {}
btn.textContent = orig;
btn.disabled = false;
renderServices();
}
}
async function onServiceAction(key) {
@@ -510,9 +743,14 @@ async function pollStatus() {
renderCurrent(status);
renderEndpoint(status);
renderHealth(status);
// If models hasn't loaded yet (init may have hit a transient proxy timeout), retry.
if (!state.models || Object.keys(state.models).length === 0) {
try { await loadModels(); } catch {}
}
// Refresh services state lazily — every 5s poll triggers this too.
try {
state.services = await fetchJSON('/api/services');
try { state.deep_health = await fetchJSON('/api/deep-health'); } catch {}
renderServices();
} catch {}
if (status.current_swap_job && status.current_swap_job !== state.swap_job_id) {
@@ -533,6 +771,78 @@ async function loadModels() {
state.models = data.models || {};
}
async function loadDiskStatus() {
// Probes each catalog model's HF cache over SSH; takes a beat. Best-effort.
try {
const r = await fetchJSON('/api/models/disk-status');
if (r && r.models) {
state.disk_status = r.models;
state.disk_status_loaded = true;
renderCards();
}
} catch (e) {
// Silent — pills just won't render. Don't block dashboard.
console.warn('disk-status probe failed:', e.message);
}
}
function fmtBytesShort(n) {
if (!Number.isFinite(n) || n <= 0) return '0 B';
if (n >= 1e9) return `${(n / 1e9).toFixed(1)} GB`;
if (n >= 1e6) return `${(n / 1e6).toFixed(1)} MB`;
if (n >= 1e3) return `${(n / 1e3).toFixed(1)} KB`;
return `${n} B`;
}
function openDiskDeleteDialog(key) {
const m = state.models[key];
const disk = state.disk_status[key];
if (!m || !disk || !disk.on_disk) return;
const dlg = el('#disk-delete-dialog');
el('#dd-summary').innerHTML = `Free <strong>${fmtBytesShort(disk.total_bytes)}</strong> by removing <strong>${escapeHtml(m.display_name)}</strong> (<code>${escapeHtml(m.repo)}</code>) from disk.`;
const hostsEl = el('#dd-hosts');
hostsEl.innerHTML = '';
for (const h of (disk.per_host || [])) {
if (!h.on_disk) continue;
const li = document.createElement('li');
li.innerHTML = `<code>${escapeHtml(h.host)}</code> — ${fmtBytesShort(h.size_bytes)}`;
hostsEl.appendChild(li);
}
const errEl = el('#dd-error');
errEl.classList.add('hidden');
errEl.textContent = '';
const confirm = el('#dd-confirm');
const cancel = el('#dd-cancel');
const onCancel = () => dlg.close();
const onConfirm = async () => {
confirm.disabled = true;
cancel.disabled = true;
confirm.textContent = 'Deleting…';
try {
const r = await fetchJSON(`/api/models/${encodeURIComponent(key)}/disk`, { method: 'DELETE' });
dlg.close();
// Optimistically clear local disk state for this key, then refresh.
delete state.disk_status[key];
renderCards();
// Eagerly re-probe so size is accurate (and shows "not downloaded" pill).
loadDiskStatus();
const freed = r && typeof r.bytes_freed === 'number' ? fmtBytesShort(r.bytes_freed) : '';
console.log(`Deleted ${m.display_name} from disk${freed ? ` — freed ${freed}` : ''}.`);
} catch (e) {
errEl.textContent = e.message || 'Delete failed';
errEl.classList.remove('hidden');
} finally {
confirm.disabled = false;
cancel.disabled = false;
confirm.textContent = 'Delete from disk';
}
};
cancel.onclick = onCancel;
confirm.onclick = onConfirm;
dlg.showModal();
}
async function triggerSwap(modelKey) {
if (state.swap_job_id) return;
try {
@@ -953,6 +1263,147 @@ function setupAdvancedDialog() {
el('#adv-gmu').addEventListener('input', (e) => { el('#adv-gmu-out').value = parseFloat(e.target.value).toFixed(2); });
}
// ===================== NIM installer =====================
const nimState = {
catalog: null,
job_id: null,
eventsource: null,
timer: null,
started_at: null,
};
async function loadNimCatalog() {
try {
nimState.catalog = await fetchJSON('/api/nim/catalog');
el('#nim-catalog-link').href = nimState.catalog.catalog_url;
const warn = el('#nim-key-warn');
if (!nimState.catalog.ngc_key_configured) {
warn.classList.add('nim-key-warn');
warn.innerHTML = '⚠️ NGC API key not set. Open <strong>Configure Sparks</strong> in StartOS and paste your NGC personal API key, otherwise installs will fail. <a href="https://ngc.nvidia.com/setup/personal-key" target="_blank" rel="noopener">Get a key</a>';
} else {
warn.classList.remove('nim-key-warn');
warn.textContent = '';
}
const grid = el('#nim-suggested');
grid.innerHTML = '';
for (const s of nimState.catalog.suggested || []) {
const card = document.createElement('div');
card.className = 'nim-card';
card.innerHTML = `
<div class="info">
<div class="name">${escapeHtml(s.name)} <span class="muted small">· ${escapeHtml(s.kind || 'nim')}</span></div>
<div class="desc">${escapeHtml(s.description || '')}</div>
<div class="img">${escapeHtml(s.image)}</div>
<div class="links">${s.homepage ? `<a href="${escapeHtml(s.homepage)}" target="_blank" rel="noopener">View on NGC ↗</a>` : ''}</div>
</div>
<button type="button" class="btn primary nim-pick" data-image="${escapeHtml(s.image)}" data-container="${escapeHtml(s.default_container)}" data-port="${s.default_port}" data-kind="${escapeHtml(s.kind)}">Pick</button>
`;
grid.appendChild(card);
}
grid.querySelectorAll('.nim-pick').forEach(btn => {
btn.addEventListener('click', () => {
el('#nim-image').value = btn.dataset.image;
el('#nim-container').value = btn.dataset.container;
el('#nim-port').value = btn.dataset.port;
el('#nim-kind').value = btn.dataset.kind || 'nim';
});
});
} catch (e) { console.warn('nim catalog failed', e); }
}
function openNimDialog() {
loadNimCatalog();
el('#nim-dialog').showModal();
}
async function submitNim(e) {
e.preventDefault();
const body = {
image: el('#nim-image').value.trim(),
container: el('#nim-container').value.trim(),
port: parseInt(el('#nim-port').value, 10),
host: el('#nim-host').value,
kind: el('#nim-kind').value,
};
if (!body.image || !body.container || !body.port) {
alert('Image, container name, and port are required.');
return;
}
try {
const r = await fetchJSON('/api/nim/install', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(body),
});
el('#nim-dialog').close();
attachNimProgress(r.job_id);
} catch (e) {
alert('Install failed: ' + e.message);
}
}
function nimTimerStart(at) {
nimState.started_at = at;
if (nimState.timer) clearInterval(nimState.timer);
const tick = () => {
if (!nimState.started_at) return;
const sec = Math.max(0, Math.floor((Date.now() - nimState.started_at) / 1000));
const m = Math.floor(sec / 60);
const s = sec % 60;
el('#nim-prog-elapsed').textContent = `${m}:${s.toString().padStart(2, '0')}`;
};
tick();
nimState.timer = setInterval(tick, 500);
}
async function attachNimProgress(jobId) {
nimState.job_id = jobId;
el('#nim-prog-log').textContent = '';
el('#nim-prog-title').textContent = 'Installing…';
el('#nim-progress-dialog').showModal();
try {
const snap = await fetchJSON(`/api/nim/install/${jobId}`);
nimTimerStart(Date.parse(snap.started_at));
el('#nim-prog-phase').textContent = snap.phase || 'Working…';
el('#nim-prog-log').textContent = (snap.lines || []).join('\n');
if (snap.returncode !== null) { onNimDone(snap); return; }
} catch { nimTimerStart(Date.now()); }
const es = new EventSource(`/api/nim/install/${jobId}/stream`);
nimState.eventsource = es;
es.onmessage = ev => {
try {
const d = JSON.parse(ev.data);
if (d.line !== undefined) {
const log = el('#nim-prog-log');
log.textContent += d.line + '\n';
log.scrollTop = log.scrollHeight;
}
} catch {}
};
es.addEventListener('phase', ev => {
try { el('#nim-prog-phase').textContent = JSON.parse(ev.data).phase; } catch {}
});
es.addEventListener('done', ev => {
let d = {}; try { d = JSON.parse(ev.data); } catch {}
onNimDone(d);
});
es.onerror = () => { es.close(); nimState.eventsource = null; };
}
function onNimDone(d) {
if (nimState.eventsource) { nimState.eventsource.close(); nimState.eventsource = null; }
if (nimState.timer) { clearInterval(nimState.timer); nimState.timer = null; }
if (d.state === 'failed') {
el('#nim-prog-title').textContent = `Failed (rc=${d.returncode})`;
el('#nim-prog-phase').textContent = 'Failed';
} else {
el('#nim-prog-title').textContent = 'Installed';
el('#nim-prog-phase').textContent = 'Done ✓ — service will appear when the container reports healthy.';
}
pollStatus();
}
// ===================== Explain context (LLM commit summary) =====================
let explainEventSource = null;
@@ -1149,6 +1600,17 @@ async function init() {
el('#ub-apply').addEventListener('click', applyUpdate);
el('#ub-explain').addEventListener('click', explainContext);
el('#dl-repo').addEventListener('input', updateDlHfLink);
el('#open-nim').addEventListener('click', openNimDialog);
el('#nim-cancel').addEventListener('click', () => el('#nim-dialog').close());
el('#nim-form').addEventListener('submit', submitNim);
el('#nim-prog-close').addEventListener('click', () => el('#nim-progress-dialog').close());
el('#open-connectivity').addEventListener('click', openConnectivityDialog);
el('#connectivity-close').addEventListener('click', () => el('#connectivity-dialog').close());
// Wake-on-LAN buttons live on unreachable hardware cards; delegate.
el('#hardware-grid').addEventListener('click', (e) => {
const btn = e.target.closest('[data-wake]');
if (btn) wakeSpark(btn.dataset.wake);
});
setupCatalogDialog();
setupAdvancedDialog();
// Open WebUI link from /api/config
@@ -1165,9 +1627,12 @@ async function init() {
await renderServices();
pollHardware();
pollUpdates();
// Disk-status probe runs after first paint — slow over SSH and not blocking.
loadDiskStatus();
setInterval(pollStatus, 5000);
setInterval(pollHardware, 8000); // every 8s
setInterval(pollUpdates, 300000); // every 5 min
setInterval(loadDiskStatus, 60000); // every 60s — disk state changes rarely
}
init();
+88 -2
View File
@@ -26,8 +26,22 @@
</section>
<section id="hardware-panel" class="hardware-panel hidden">
<h2 class="section-title">Spark hardware</h2>
<div class="section-header">
<h2 class="section-title">Spark hardware</h2>
<button id="open-connectivity" class="btn small-btn">Connectivity log</button>
</div>
<div id="hardware-grid" class="hardware-grid"></div>
<dialog id="connectivity-dialog" class="modal">
<form method="dialog" class="modal-form">
<h3>Spark connectivity history</h3>
<p class="muted small">Most recent up/down transitions per Spark. Tracked since this dashboard was installed.</p>
<div id="connectivity-content" class="connectivity-content"></div>
<div class="modal-actions">
<button type="button" id="connectivity-close" class="btn">Close</button>
</div>
</form>
</dialog>
</section>
<section id="endpoint-panel" class="endpoint-panel hidden">
@@ -76,8 +90,66 @@
</section>
<section id="services-panel" class="services hidden">
<h2 class="section-title">Always-on services</h2>
<div class="section-header">
<h2 class="section-title">Always-on services</h2>
<button id="open-nim" class="btn small-btn">+ Install NIM</button>
</div>
<div id="services-grid" class="services-grid"></div>
<dialog id="nim-dialog" class="modal">
<form method="dialog" class="modal-form" id="nim-form">
<h3>Install a NVIDIA NIM container</h3>
<p class="muted small" id="nim-key-warn"></p>
<p class="muted small">Pick a curated container below or paste any image from <a href="#" id="nim-catalog-link" target="_blank" rel="noopener">the NGC NIM catalog</a>. Spark Control will <code>docker pull</code> and <code>docker run</code> it on the target Spark.</p>
<div id="nim-suggested" class="nim-grid"></div>
<fieldset class="modal-fieldset">
<legend>Custom image</legend>
<label class="modal-row"><span>Image (nvcr.io/...)</span><input type="text" id="nim-image" placeholder="nvcr.io/nim/nvidia/<name>:latest"></label>
<label class="modal-row"><span>Container name</span><input type="text" id="nim-container" placeholder="my-service"></label>
<label class="modal-row"><span>Port</span><input type="number" id="nim-port" min="1" max="65535"></label>
<label class="modal-row"><span>Kind</span>
<select id="nim-kind">
<option value="nim">NIM (other)</option>
<option value="stt">STT (speech-to-text)</option>
<option value="tts">TTS (text-to-speech)</option>
<option value="vision">Vision</option>
<option value="embedding">Embedding</option>
</select>
</label>
<label class="modal-row"><span>Target Spark</span>
<select id="nim-host">
<option value="spark2">Spark 2 (default for support services)</option>
<option value="spark1">Spark 1 (head node)</option>
</select>
</label>
</fieldset>
<div class="modal-actions">
<button type="button" id="nim-cancel" class="btn">Cancel</button>
<button type="submit" class="btn primary" id="nim-start">Install</button>
</div>
</form>
</dialog>
<dialog id="nim-progress-dialog" class="modal">
<form method="dialog" class="modal-form">
<h3 id="nim-prog-title">Installing…</h3>
<div class="phase-row">
<div class="phase" id="nim-prog-phase">Starting…</div>
<span class="spacer"></span>
<span class="timer" id="nim-prog-elapsed">0:00</span>
</div>
<details open>
<summary class="muted small">Log</summary>
<pre id="nim-prog-log" class="log"></pre>
</details>
<div class="modal-actions">
<button type="button" id="nim-prog-close" class="btn">Close</button>
</div>
</form>
</dialog>
</section>
<section id="models-section">
@@ -116,6 +188,20 @@
</form>
</dialog>
<dialog id="disk-delete-dialog" class="modal">
<form method="dialog" class="modal-form">
<h3>Delete model weights from disk?</h3>
<p id="dd-summary" class="muted small"></p>
<ul class="muted small dd-hosts" id="dd-hosts"></ul>
<p class="muted small">This is reversible — you can re-download from the catalog at any time. The catalog entry stays intact.</p>
<p id="dd-error" class="muted small dd-error hidden"></p>
<div class="modal-actions">
<button type="button" id="dd-cancel" class="btn">Cancel</button>
<button type="button" id="dd-confirm" class="btn danger">Delete from disk</button>
</div>
</form>
</dialog>
<dialog id="advanced-dialog" class="modal">
<form method="dialog" class="modal-form" id="advanced-form">
<h3 id="adv-title">Advanced settings</h3>
+107 -1
View File
@@ -376,6 +376,45 @@ main {
.hw-card .head .meta { color: var(--muted); font-size: 12px; margin-left: auto; }
.hw-card.unreachable { border-color: rgba(239, 68, 68, 0.4); }
.hw-card.unreachable .name { color: var(--error); }
.hw-card.unreachable ol { color: var(--muted); }
.hw-card .wol-row {
margin-top: 8px;
display: flex;
align-items: center;
gap: 8px;
font-size: 12px;
color: var(--muted);
}
.hw-card .wol-row .btn { padding: 5px 10px; font-size: 12px; }
.hw-card .mac-display { font-family: ui-monospace, SFMono-Regular, Menlo, monospace; }
.connectivity-content {
max-height: 360px;
overflow-y: auto;
border: 1px solid var(--border);
border-radius: 6px;
padding: 10px;
background: var(--surface-2);
}
.conn-spark { margin-bottom: 16px; }
.conn-spark h4 { font-size: 13px; margin: 0 0 8px; color: var(--text); }
.conn-event {
font-size: 12px;
display: flex;
gap: 10px;
padding: 4px 0;
border-bottom: 1px solid rgba(255,255,255,0.04);
font-family: ui-monospace, SFMono-Regular, Menlo, monospace;
}
.conn-event:last-child { border-bottom: 0; }
.conn-event .when { color: var(--muted); flex-shrink: 0; }
.conn-event .what { flex: 1; }
.conn-event.up .what { color: var(--accent); }
.conn-event.down .what { color: var(--error); }
.conn-event.report .what { font-style: italic; }
.conn-event .muted { color: var(--muted); font-style: normal; }
.conn-event .dur { color: var(--muted); }
.conn-summary { color: var(--muted); font-size: 11px; padding: 4px 0 10px; }
.hw-metric { display: flex; align-items: center; gap: 10px; font-size: 12px; }
.hw-metric .label { color: var(--muted); width: 56px; flex-shrink: 0; text-transform: uppercase; letter-spacing: 0.05em; font-size: 11px; }
.hw-metric .bar { flex: 1; height: 8px; background: var(--surface-2); border-radius: 4px; overflow: hidden; position: relative; }
@@ -477,6 +516,37 @@ main {
#dl-log-details { margin-top: 12px; }
#dl-log-details summary { cursor: pointer; padding: 4px 0; }
/* ===== NIM install dialog ===== */
.modal#nim-dialog,
.modal#nim-progress-dialog { max-width: 640px; }
.nim-grid {
display: grid;
gap: 8px;
grid-template-columns: 1fr;
max-height: 240px;
overflow-y: auto;
margin-bottom: 4px;
}
.nim-card {
background: var(--surface-2);
border: 1px solid var(--border);
border-radius: 6px;
padding: 10px 12px;
display: flex;
gap: 10px;
align-items: flex-start;
}
.nim-card .info { flex: 1; }
.nim-card .name { font-weight: 600; font-size: 13px; }
.nim-card .desc { color: var(--muted); font-size: 12px; margin-top: 4px; }
.nim-card .img { font-family: ui-monospace, SFMono-Regular, Menlo, monospace; color: #6b6b75; font-size: 11px; margin-top: 4px; word-break: break-all; }
.nim-card .btn { padding: 6px 12px; font-size: 12px; flex-shrink: 0; }
.nim-card .links { font-size: 11px; margin-top: 4px; }
.nim-card .links a { color: var(--info); text-decoration: none; }
.nim-card .links a:hover { text-decoration: underline; }
.nim-key-warn { color: var(--warn); }
/* ===== Section titles ===== */
.section-title {
@@ -552,6 +622,19 @@ main {
.service-card .row .v.copyable.copied { outline: 1px solid var(--accent); background: rgba(74, 222, 128, 0.05); }
.service-card .row .icon-btn { padding: 3px 6px; }
.service-card .row .icon-btn svg { width: 12px; height: 12px; }
.service-card .deep-row .deep-v { display: flex; align-items: center; gap: 6px; font-family: inherit; flex-wrap: wrap; }
.service-card .dh-ok { color: var(--accent); }
.service-card .dh-fail { color: var(--error); font-weight: 500; }
.service-card .dh-run-btn { font-family: inherit; }
.service-card .deep-error {
padding: 4px 8px;
background: rgba(239, 68, 68, 0.06);
border-left: 2px solid var(--error);
border-radius: 4px;
font-family: ui-monospace, SFMono-Regular, Menlo, monospace;
font-size: 11px;
word-break: break-word;
}
.service-actions {
display: flex;
@@ -631,8 +714,31 @@ main {
.card.active .btn { background: rgba(74, 222, 128, 0.12); color: var(--accent); border-color: rgba(74, 222, 128, 0.4); }
.card-actions { display: flex; gap: 6px; }
.card-actions .btn.primary { flex: 1; }
.card .adv-btn { padding: 8px 12px; font-size: 12px; }
.card .adv-btn,
.card .test-btn { padding: 8px 12px; font-size: 12px; }
.card .custom-pill { color: var(--info); border-color: rgba(96, 165, 250, 0.4); }
.tag.on-disk { color: var(--accent); border-color: rgba(74, 222, 128, 0.4); }
.tag.not-on-disk { color: var(--muted); border-color: var(--border); opacity: 0.7; }
.card-actions .icon-btn.danger { color: var(--error); border-color: rgba(239, 68, 68, 0.3); margin-left: auto; }
.card-actions .icon-btn.danger:hover:not(:disabled) { background: rgba(239, 68, 68, 0.08); border-color: var(--error); color: var(--error); }
.card-actions .icon-btn.danger:disabled { opacity: 0.35; cursor: not-allowed; }
.dd-hosts { padding-left: 18px; margin: 4px 0 8px; }
.dd-hosts code { background: var(--surface-2); padding: 1px 5px; border-radius: 4px; }
.dd-error { color: var(--error); }
.test-result {
font-size: 12px;
line-height: 1.45;
padding: 8px 10px;
border-radius: 5px;
margin-top: 4px;
border: 1px solid var(--border);
background: var(--surface-2);
}
.test-result.ok { border-color: rgba(74, 222, 128, 0.4); background: rgba(74, 222, 128, 0.04); }
.test-result.fail { border-color: rgba(239, 68, 68, 0.45); background: rgba(239, 68, 68, 0.06); word-break: break-word; }
.test-result .ok-mark { color: var(--accent); font-weight: 600; }
.test-result .fail-mark { color: var(--error); font-weight: 600; }
.footer {
margin-top: 28px;
+137
View File
@@ -0,0 +1,137 @@
"""Pre-flight validation of a proposed vLLM launch command.
Runs vLLM's own argparse layer (EngineArgs) inside the vllm_node container WITHOUT
starting the engine. Catches:
* unknown flag names (typos)
* bad types / values that argparse rejects
* deprecated flags removed in the installed vLLM version
Does NOT catch (these surface only during real engine init):
* model-architecture-specific constraints (e.g. Qwen3.6 Mamba block_size)
* OOM at weight-loading time
* Triton / CUDA-kernel compatibility errors
A pre-flight check that returns "ok" is therefore NOT a guarantee — but a
"failed" verdict is a definitive 'don't bother with the real swap'.
"""
from __future__ import annotations
import json
import shlex
from typing import Any
from .config import Settings
from .models import Catalog, build_launch_command
from .ssh import ssh_run
# Validates the proposed args against the same combined parser vLLM uses for
# `vllm serve` (engine args + server args + frontend args). Returns one JSON
# line on stdout: {"ok": true, ...} or {"ok": false, ...}.
_VALIDATOR_SCRIPT = r"""
import argparse, json, sys
# Mirror what `vllm serve` does internally: FlexibleArgumentParser (which is
# more lenient about dashes vs underscores) wrapped with make_arg_parser
# (which adds engine + server + frontend args).
parser = None
try:
# Newer vLLM path
from vllm.utils.argparse_utils import FlexibleArgumentParser
except Exception:
try:
# Older fallback
from vllm.engine.arg_utils import FlexibleArgumentParser
except Exception:
FlexibleArgumentParser = argparse.ArgumentParser # type: ignore
try:
from vllm.entrypoints.openai.cli_args import make_arg_parser
parser = make_arg_parser(FlexibleArgumentParser(add_help=False))
except Exception:
pass
if parser is None:
try:
from vllm.engine.arg_utils import EngineArgs
parser = FlexibleArgumentParser(add_help=False)
EngineArgs.add_cli_args(parser)
except Exception as e:
print(json.dumps({"ok": False, "stage": "import", "error": f"{type(e).__name__}: {e}"}))
sys.exit(0)
class _ArgError(Exception):
pass
def _err(message):
raise _ArgError(message)
parser.error = _err # capture argparse errors instead of sys.exit(2)
try:
raw = sys.stdin.read()
arglist = json.loads(raw)
ns = parser.parse_args(arglist)
print(json.dumps({"ok": True, "model": getattr(ns, "model", None)}))
except _ArgError as e:
print(json.dumps({"ok": False, "stage": "parse", "error": str(e)}))
except SystemExit as e:
print(json.dumps({"ok": False, "stage": "parse", "error": f"argparse exit {e.code}"}))
except Exception as e:
print(json.dumps({"ok": False, "stage": "parse", "error": f"{type(e).__name__}: {e}"}))
"""
def _vllm_arg_list(key: str, model_def, catalog: Catalog) -> list[str]:
"""Reconstruct the args list passed to `vllm serve` (without the positional model)."""
cmd = build_launch_command(key, model_def, catalog.defaults)
# build_launch_command yields:
# ./launch-cluster.sh [--solo] -d exec vllm serve <repo> <args...>
# We just want the bits after `vllm serve <repo>`.
tokens = shlex.split(cmd)
if "serve" not in tokens:
return []
i = tokens.index("serve")
after = tokens[i + 1 :] # repo, then args
if not after:
return []
args = after[1:] # drop the repo
# EngineArgs expects --model=REPO rather than positional, so prepend it.
return [f"--model={after[0]}", *args]
async def validate_launch(key: str, catalog: Catalog, settings: Settings) -> dict:
if key not in catalog.models:
return {"ok": False, "stage": "lookup", "error": f"unknown model: {key}"}
if not settings.spark1_host or not settings.spark1_user:
return {"ok": False, "stage": "config", "error": "spark1 not configured"}
model = catalog.models[key]
arg_list = _vllm_arg_list(key, model, catalog)
if not arg_list:
return {"ok": False, "stage": "build", "error": "failed to build args list"}
payload = json.dumps(arg_list).replace("'", "'\\''")
# Pipe the JSON args list to a here-doc Python invocation. The validator
# reads from stdin to avoid shell-escaping the args themselves.
cmd = (
f"echo '{payload}' | docker exec -i vllm_node python3 -c "
+ shlex.quote(_VALIDATOR_SCRIPT)
)
rc, out, err = await ssh_run(settings.spark1_host, settings.spark1_user, cmd, settings, timeout=20)
if rc != 0 and not out.strip():
return {
"ok": False,
"stage": "ssh",
"error": err.strip() or f"rc={rc}",
"cmd_args": arg_list,
"launch_cmd": build_launch_command(key, model, catalog.defaults),
}
last = out.strip().splitlines()[-1] if out.strip() else ""
try:
result: dict[str, Any] = json.loads(last)
except json.JSONDecodeError:
result = {"ok": False, "stage": "decode", "error": "validator did not return JSON", "raw": out[-500:]}
result["cmd_args"] = arg_list
result["launch_cmd"] = build_launch_command(key, model, catalog.defaults)
return result
+69
View File
@@ -0,0 +1,69 @@
"""Wake-on-LAN.
Two delivery paths, tried in order:
1. SSH into the other Spark and have IT broadcast — most reliable because the
packet originates from the same LAN subnet as the sleeping Spark.
2. Direct UDP broadcast from this container. May or may not work depending
on the StartOS container's network namespace.
The DGX Spark's NIC must have WoL enabled in firmware/OS for either path to
actually wake the box; this module just delivers the magic packet correctly.
"""
from __future__ import annotations
import asyncio
import re
import socket
from .config import Settings
from .ssh import ssh_run
_MAC_RE = re.compile(r"^[0-9a-fA-F]{2}([:-]?[0-9a-fA-F]{2}){5}$")
def normalize_mac(mac: str) -> str:
mac = mac.strip().lower()
if not _MAC_RE.match(mac):
raise ValueError(f"invalid MAC address: {mac!r}")
return mac.replace("-", ":")
def build_magic_packet(mac: str) -> bytes:
mac_bytes = bytes.fromhex(normalize_mac(mac).replace(":", ""))
return b"\xff" * 6 + mac_bytes * 16
def send_local_broadcast(mac: str, broadcast: str = "255.255.255.255", port: int = 9) -> None:
"""Send from THIS container. May not reach the LAN in some topologies."""
pkt = build_magic_packet(mac)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.sendto(pkt, (broadcast, port))
# Also send to port 7 (alternate WoL convention) for safety
s.sendto(pkt, (broadcast, 7))
finally:
s.close()
async def send_via_peer(host: str, user: str, mac: str, settings: Settings) -> tuple[bool, str]:
"""Use a different (reachable) Spark to send the WoL packet to its peer.
Uses Python 3 (always present on the Sparks for vLLM) to avoid depending on
wakeonlan / etherwake being installed.
"""
normalized = normalize_mac(mac)
mac_hex = normalized.replace(":", "")
py = (
"python3 -c \""
"import socket; "
f"m=bytes.fromhex('{mac_hex}'); "
"s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM); "
"s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1); "
"s.sendto(b'\\xff'*6 + m*16, ('255.255.255.255', 9)); "
"s.sendto(b'\\xff'*6 + m*16, ('255.255.255.255', 7)); "
"print('sent')\""
)
rc, out, err = await ssh_run(host, user, py, settings, timeout=8)
return rc == 0 and "sent" in out, (err.strip() or out.strip() or f"rc={rc}")
+3
View File
@@ -30,6 +30,7 @@ models:
- -tp=2
- --distributed-executor-backend=ray
- --max-model-len=32768
- --max-num-batched-tokens=16384
gemma4:
display_name: "Gemma 4 31B"
@@ -45,6 +46,7 @@ models:
vllm_args:
- --gpu-memory-utilization=0.8
- --max-model-len=32768
- --max-num-batched-tokens=16384
- --reasoning-parser=gemma4
- --tool-call-parser=gemma4
- --enable-auto-tool-choice
@@ -66,6 +68,7 @@ models:
vllm_args:
- --gpu-memory-utilization=0.85
- --max-model-len=65536
- --max-num-batched-tokens=16384
- --reasoning-parser=qwen3
- --moe_backend=flashinfer_cutlass
- --load-format=fastsafetensors
+8
View File
@@ -20,6 +20,14 @@ The trick is the `docker run --rm alpine chown` — it runs as root inside the t
This flag is Blackwell-specific. If vLLM in the container reports `unrecognized arguments: --moe_backend` or similar, edit `models.yaml` for `qwen36` and drop that flag. The swap UI does NOT auto-fallback in v0.1 — failure surfaces in the log stream.
## Qwen3.6 Mamba block-size assertion (fixed in v0.6.0:1)
Qwen3.6 uses a Mamba-attention hybrid that requires `--max-num-batched-tokens >= 2096`. vLLM's default is 2048, which trips `AssertionError: In Mamba cache align mode, block_size (2096) must be <= max_num_batched_tokens (2048)`. Fix: bake `--max-num-batched-tokens=16384` into the bundled qwen36 entry — matches the upstream qwen3.5-35b-a3b-fp8 recipe.
## Multimodal token budget for vision models (fixed in v0.8.0:1)
After the eugr/spark-vllm-docker update, vLLM became stricter about multimodal token budgets. Vision-capable models like Gemma 4 31B and Qwen3-VL crash at engine init with `ValueError: Chunked MM input disabled but max_tokens_per_mm_item (2496) is larger than max_num_batched_tokens (2048)`. Fix: bake `--max-num-batched-tokens=16384` into every model that has the `vision` capability. Now applied to qwen3-vl, gemma4, and qwen36 (which was already set for the Mamba issue).
## Two SSH paths to Spark 1 from the laptop
`ssh <spark-user>@<spark-1-ip>` does NOT work from the laptop because the NVIDIA Sync ssh_config only has a Host entry for `<spark-1-host>.local`. Always use the `.local` hostname or `<spark-2-ip>`-style entries that ARE matched.
@@ -85,6 +85,15 @@ const inputSpec = InputSpec.of({
placeholder: 'e.g. https://open-webui.yourserver.local',
masked: false,
}),
ngc_api_key: Value.text({
name: 'NGC API key (optional)',
description:
'NVIDIA NGC personal API key — needed to install NIM containers (Parakeet, Magpie, etc.) from nvcr.io. Get one free at https://ngc.nvidia.com/setup/personal-key. Stored only on this Start9 server; passed to docker as the NGC_API_KEY env var when installing NIM services.',
required: false,
default: null,
placeholder: 'starts with "nvapi-..."',
masked: true,
}),
})
export const configureSparks = sdk.Action.withInput(
@@ -16,6 +16,8 @@ export const sparkConfigSchema = z.object({
magpie_container: z.string().catch(''),
// Optional Open WebUI deep-link
open_webui_url: z.string().catch(''),
// Optional NGC API key for pulling NIM containers from nvcr.io/nim/...
ngc_api_key: z.string().catch(''),
})
export type SparkConfig = z.infer<typeof sparkConfigSchema>
+4
View File
@@ -20,6 +20,7 @@ export const main = sdk.setupMain(async ({ effects }) => {
magpie_user: '',
magpie_container: '',
open_webui_url: '',
ngc_api_key: '',
}
return sdk.Daemons.of(effects).addDaemon('primary', {
@@ -48,7 +49,10 @@ export const main = sdk.setupMain(async ({ effects }) => {
MAGPIE_USER: cfg.magpie_user,
MAGPIE_CONTAINER: cfg.magpie_container,
MODELS_OVERRIDES: '/data/models-overrides.yaml',
SERVICES_OVERRIDES: '/data/services-overrides.yaml',
CONNECTIVITY_LOG: '/data/connectivity.json',
OPEN_WEBUI_URL: cfg.open_webui_url,
NGC_API_KEY: cfg.ngc_api_key,
BIND_PORT: String(uiPort),
},
},
+2 -2
View File
@@ -1,10 +1,10 @@
import { VersionInfo, IMPOSSIBLE } from '@start9labs/start-sdk'
export const v0_1_0 = VersionInfo.of({
version: '0.3.0:1',
version: '0.8.1:0',
releaseNotes: {
en_US:
'v0.3: Spark hardware dashboard (RAM, disk, GPU memory + utilization, CPU load, uptime per Spark). Per-model Advanced settings now show plain-English hints tied to your actual GPU memory (e.g. "0.85 GPU util leaves ~18 GB free"). "Explain context" button on the update banner asks the loaded LLM to summarize pending commits in plain English. Optional Open WebUI URL in Configure Sparks shows a one-click "Open chat" button in the top bar. Downloads can now target Spark 1, Spark 2, or both. Each model card links out to its Hugging Face page.',
'v0.8.1: model weights can now be deleted from disk directly from the dashboard. Each model card shows whether the weights are present (with on-disk GB size) or not yet downloaded. When present and the model is NOT currently loaded, a small trash icon appears on the card; clicking it pops a confirmation showing how many GB will be freed and on which Spark(s), then runs `rm -rf` on the Hugging Face cache directory via SSH. Cluster-mode models are deleted from both Sparks; solo-mode from Spark 1 only. Safety rails: refuses to delete the currently-loaded model, refuses during an in-flight swap or download, and the catalog entry stays — you can always re-download. Disk status is probed once on dashboard load and re-checked every 60s.',
},
migrations: {
up: async ({ effects }) => {},