Files
spark-control/image/app/hardware.py
T
Grant 1889ab45fb v0.4.0 - NIM installer + dashboard resilience
Hotfix (was v0.3.1):
- services.py: cache 'unreachable' per (host,user) for 25s so a dead Spark doesn't hang every /api/services call behind a 6s SSH timeout
- ssh_run timeout reduced 10 -> 6s for docker_state probes
- hardware probe: shorter SSH timeout (6s), longer cache TTL for failures (25s)
- JS pollStatus retries loadModels() if state.models is empty (recovers from cold-start proxy timeout)
- Unreachable hardware card now includes troubleshooting steps (Spark Control cannot SSH into an unreachable Spark to restart it)

v0.4 NIM installer:
- nim.py module: curated SUGGESTED_NIMS list (Parakeet, Magpie, Riva) + NimManager that runs docker login nvcr.io + docker pull + docker run -d --gpus all -p PORT:PORT -v VOL:/opt/nim/.cache -e NGC_API_KEY -e ... --restart=unless-stopped + chown the volume to uid 1000 + restart. Streams all output via SSE; redacts the API key from log lines.
- custom_services.py: persists installed NIMs to /data/services-overrides.yaml so they appear in the services panel after install
- services.py: merges custom services into the panel
- /api/nim/catalog GET, /api/nim/install POST + GET/SSE
- /api/services/{name} DELETE for custom services
- UI: '+ Install NIM' button next to 'Always-on services'; modal lists curated images each with a 'Pick' button + a custom-image form; installation runs in a second dialog with phase + elapsed timer + collapsible log
- NGC API key field added to Configure Sparks (masked); injected as NGC_API_KEY env var into the container

Package: bump 0.4.0:0; main.ts adds SERVICES_OVERRIDES + NGC_API_KEY env vars
2026-05-12 12:32:29 -05:00

130 lines
5.3 KiB
Python

"""Per-Spark hardware snapshots: RAM, disk, GPU memory + utilization, CPU load, uptime.
Driven by a single SSH command per Spark that runs `hostname`, `uptime -p`,
`nproc`, `free`, `df`, and `nvidia-smi`, reads `/proc/loadavg`, and prints
labeled `KEY=value` lines back. We parse those lines in `_parse`.
"""
from __future__ import annotations
import asyncio
import time
from typing import Any
from .config import Settings
from .ssh import ssh_run
# Shell script sent to each Spark through `ssh_run`. Every line echoes one
# `LABEL=value` pair; `_parse` splits each line on the first "=" and looks the
# value up by its lower-cased label:
#   HOSTNAME          output of `hostname`
#   UPTIME            `uptime -p`, falling back to plain `uptime`
#   LOAD              first three fields of /proc/loadavg
#   CORES             `nproc`, or 0 when that command fails
#   MEMORY            "<total> <used>" in bytes from the `Mem:` row of `free -b`
#   DISK              "<total> <used>" in bytes for `/` from `df -B1`
#   GPU               first row of nvidia-smi's CSV output: name, utilization,
#                     temperature, power draw, total memory
#   GPU_MEM_USED_MIB  sum of `used_gpu_memory` over nvidia-smi's compute apps
#                     (awk prints 0 when there are no rows)
# Most commands discard their stderr (`2>/dev/null`). The trailing `.strip()`
# removes the newline that follows the opening triple quote.
_PROBE = r"""
set -e
echo HOSTNAME=$(hostname)
echo UPTIME=$(uptime -p 2>/dev/null || uptime)
echo LOAD=$(awk '{print $1, $2, $3}' /proc/loadavg)
echo CORES=$(nproc 2>/dev/null || echo 0)
echo MEMORY=$(free -b 2>/dev/null | awk '/^Mem:/ {print $2, $3}')
echo DISK=$(df -B1 / 2>/dev/null | awk 'NR==2 {print $2, $3}')
echo GPU=$(nvidia-smi --query-gpu=name,utilization.gpu,temperature.gpu,power.draw,memory.total --format=csv,noheader,nounits 2>/dev/null | head -1)
echo GPU_MEM_USED_MIB=$(nvidia-smi --query-compute-apps=used_gpu_memory --format=csv,noheader,nounits 2>/dev/null | awk '{s+=$1} END {print s+0}')
""".strip()
def _parse_int(s: str) -> int | None:
try: return int(s)
except (TypeError, ValueError): return None
def _parse(out: str) -> dict:
    """Convert the probe's labeled ``KEY=value`` output into a snapshot dict.

    Lines without an ``=`` are ignored. Labels are matched case-insensitively,
    and when a label appears more than once the last occurrence wins.

    ``hostname``, ``uptime`` and ``cores`` are always present in the result
    (``None`` when missing or unparsable). The remaining keys are added only
    when the matching line is non-empty and has the expected shape:

    * ``load`` -- up to three floats, or ``None`` if any field is not a float.
    * ``ram_total_bytes`` / ``ram_used_bytes`` -- from a two-field MEMORY line.
    * ``disk_total_bytes`` / ``disk_used_bytes`` -- from a two-field DISK line.
    * ``gpu_name``, ``gpu_util_pct``, ``gpu_temp_c``, ``gpu_power_w``,
      ``gpu_mem_total_mib``, ``gpu_unified_memory`` -- from a GPU line with at
      least five comma-separated fields.
    * ``gpu_mem_used_mib`` -- from the GPU_MEM_USED_MIB line.
    """
    # Pass 1: collect raw label -> value text.
    fields: dict[str, str] = {}
    for line in out.splitlines():
        label, sep, value = line.partition("=")
        if sep:
            fields[label.strip().lower()] = value.strip()

    # Pass 2: build the typed snapshot.
    snapshot: dict[str, Any] = {
        "hostname": fields.get("hostname"),
        "uptime": fields.get("uptime"),
        "cores": _parse_int(fields.get("cores", "")),
    }

    # Load average -> [1m, 5m, 15m]
    load_text = fields.get("load")
    if load_text:
        try:
            snapshot["load"] = [float(tok) for tok in load_text.split()[:3]]
        except ValueError:
            snapshot["load"] = None

    # RAM and disk lines share one layout: "<total> <used>", both in bytes.
    byte_pairs = (
        ("memory", "ram_total_bytes", "ram_used_bytes"),
        ("disk", "disk_total_bytes", "disk_used_bytes"),
    )
    for label, total_key, used_key in byte_pairs:
        text = fields.get(label)
        if not text:
            continue
        pair = text.split()
        if len(pair) != 2:
            continue
        snapshot[total_key] = _parse_int(pair[0])
        snapshot[used_key] = _parse_int(pair[1])

    # GPU: "name, util_gpu, temp_C, power_W, memory_total_MiB"
    gpu_text = fields.get("gpu")
    if gpu_text:
        columns = [col.strip() for col in gpu_text.split(",")]
        if len(columns) >= 5:
            gpu_name, util, temp, power, mem_total = columns[:5]
            snapshot["gpu_name"] = gpu_name
            snapshot["gpu_util_pct"] = _parse_int(util)
            snapshot["gpu_temp_c"] = _parse_int(temp)
            try:
                watts = float(power)
            except ValueError:
                watts = None
            snapshot["gpu_power_w"] = watts
            # memory.total may be "[N/A]" on unified-memory systems (DGX Spark);
            # an unparsable total is what flags the GPU as unified-memory.
            total_mib = _parse_int(mem_total)
            snapshot["gpu_mem_total_mib"] = total_mib
            snapshot["gpu_unified_memory"] = total_mib is None

    # Per-process compute memory, summed by the probe script (reported even on
    # unified-memory systems).
    used_text = fields.get("gpu_mem_used_mib")
    if used_text:
        snapshot["gpu_mem_used_mib"] = _parse_int(used_text)

    return snapshot
class HardwareProbe:
    """Probes both Sparks over SSH, caching results briefly to avoid hammering them.

    A reachable snapshot is reused for ``ttl_sec`` seconds and a failed probe
    for ``fail_ttl_sec`` seconds, so an unreachable Spark does not make every
    dashboard poll wait for the SSH timeout. Probes of the same Spark are
    serialised by a per-key ``asyncio.Lock``; callers queued behind a probe in
    progress reuse its result once it completes.
    """

    def __init__(self, settings: Settings, ttl_sec: float = 4.0, fail_ttl_sec: float = 25.0) -> None:
        """Store the settings and cache lifetimes.

        Args:
            settings: Supplies ``spark1_host``/``spark1_user`` and
                ``spark2_host``/``spark2_user``; also passed through to ``ssh_run``.
            ttl_sec: Seconds a reachable result stays cached.
            fail_ttl_sec: Seconds an unreachable result stays cached.
        """
        self.settings = settings
        self.ttl_sec = ttl_sec
        self.fail_ttl_sec = fail_ttl_sec
        # key -> (time.monotonic() when the probe finished, result dict)
        self._cache: dict[str, tuple[float, dict]] = {}
        # key -> lock serialising probes for that key
        self._locks: dict[str, asyncio.Lock] = {}

    def _ttl_for(self, value: dict) -> float:
        """Return the cache lifetime for *value*: ``ttl_sec`` if reachable, else ``fail_ttl_sec``."""
        return self.ttl_sec if value.get("reachable") else self.fail_ttl_sec

    def _lock(self, key: str) -> asyncio.Lock:
        """Return the lock for *key*, creating it on first use."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        return self._locks[key]

    async def fetch(self) -> dict:
        """Probe both Sparks concurrently and return ``{"spark1": ..., "spark2": ...}``."""
        cfg = self.settings
        s1, s2 = await asyncio.gather(
            self._one("spark1", cfg.spark1_host, cfg.spark1_user),
            self._one("spark2", cfg.spark2_host, cfg.spark2_user),
        )
        return {"spark1": s1, "spark2": s2}

    async def _one(self, key: str, host: str, user: str) -> dict:
        """Return the snapshot for one Spark, served from cache while still fresh.

        Returns ``{"reachable": False, "configured": False}`` without probing
        when *host* or *user* is empty. Otherwise returns the cached result if
        it is younger than its TTL, or runs a new probe and caches the outcome.
        """
        if not host or not user:
            return {"reachable": False, "configured": False}
        async with self._lock(key):
            cached = self._cache.get(key)
            if cached is not None:
                stamped_at, value = cached
                if time.monotonic() - stamped_at < self._ttl_for(value):
                    return value
            result = await self._probe_host(host, user)
            # Stamp the entry when the probe *finishes*. A probe can take up to
            # the SSH timeout; stamping it with the pre-probe time would store
            # an entry already aged by that much, so a slow success would never
            # be served from cache and the failure TTL would be cut short.
            self._cache[key] = (time.monotonic(), result)
            return result

    async def _probe_host(self, host: str, user: str) -> dict:
        """Run the probe script on *host* over SSH and build the result dict (no caching)."""
        rc, out, err = await ssh_run(host, user, _PROBE, self.settings, timeout=6)
        if rc != 0:
            # Report the most specific failure text available.
            return {
                "reachable": False,
                "configured": True,
                "host": host,
                "error": err.strip() or out.strip() or f"rc={rc}",
            }
        return {"reachable": True, "configured": True, "host": host, **_parse(out)}