diff --git a/image/app/config.py b/image/app/config.py index f5cc57e..6a6137c 100644 --- a/image/app/config.py +++ b/image/app/config.py @@ -42,6 +42,7 @@ class Settings: parakeet_port: int magpie_port: int bind_port: int + open_webui_url: str @classmethod def from_env(cls) -> "Settings": @@ -66,6 +67,7 @@ class Settings: parakeet_port=int(_env("PARAKEET_PORT", "8000")), magpie_port=int(_env("MAGPIE_PORT", "9000")), bind_port=int(_env("BIND_PORT", "9999")), + open_webui_url=_env("OPEN_WEBUI_URL", ""), ) @property diff --git a/image/app/download.py b/image/app/download.py index b861561..f2a00e0 100644 --- a/image/app/download.py +++ b/image/app/download.py @@ -19,7 +19,7 @@ from .config import Settings from .ssh import ssh_stream, StreamHandle -Mode = Literal["solo", "cluster"] +Mode = Literal["spark1", "spark2", "cluster"] _TQDM_RE = re.compile( @@ -113,17 +113,26 @@ class DownloadManager: async def _do(self, job: DownloadJob) -> None: s = self.settings - if not s.spark1_host or not s.spark1_user: - raise RuntimeError("spark1 not configured") + # Pick the SSH target and hf-download flags from the mode. + if job.mode == "spark2": + target_host, target_user = s.spark2_host, s.spark2_user + flags = "" + elif job.mode == "cluster": + target_host, target_user = s.spark1_host, s.spark1_user + flags = "-c --copy-parallel" + else: # spark1 + target_host, target_user = s.spark1_host, s.spark1_user + flags = "" + if not target_host or not target_user: + raise RuntimeError(f"{job.mode} host not configured") - flags = "-c --copy-parallel" if job.mode == "cluster" else "" cmd = f"cd ~/spark-vllm-docker && ./hf-download.sh {job.repo} {flags}".strip() job.append(f"$ {cmd}") job.state = "downloading" job.progress.phase = "Connecting to Hugging Face…" handle = StreamHandle() - async for line in ssh_stream(s.spark1_host, s.spark1_user, cmd, s, handle=handle): + async for line in ssh_stream(target_host, target_user, cmd, s, handle=handle): job.append(line) self._update_progress(job, line) diff --git a/image/app/hardware.py b/image/app/hardware.py new file mode 100644 index 0000000..84a6266 --- /dev/null +++ b/image/app/hardware.py @@ -0,0 +1,118 @@ +"""Per-Spark hardware snapshots: RAM, disk, GPU memory + utilization, CPU load, uptime. + +Drives via a single SSH command per Spark that runs `free`, `df`, `nvidia-smi`, +`/proc/loadavg`, and `uptime -p` and prints labeled lines back. We parse those +labels in `_parse`. +""" +from __future__ import annotations +import asyncio +import time +from typing import Any + +from .config import Settings +from .ssh import ssh_run + + +_PROBE = r""" +set -e +echo HOSTNAME=$(hostname) +echo UPTIME=$(uptime -p 2>/dev/null || uptime) +echo LOAD=$(awk '{print $1, $2, $3}' /proc/loadavg) +echo CORES=$(nproc 2>/dev/null || echo 0) +echo MEMORY=$(free -b 2>/dev/null | awk '/^Mem:/ {print $2, $3}') +echo DISK=$(df -B1 / 2>/dev/null | awk 'NR==2 {print $2, $3}') +echo GPU=$(nvidia-smi --query-gpu=name,utilization.gpu,temperature.gpu,power.draw,memory.total --format=csv,noheader,nounits 2>/dev/null | head -1) +echo GPU_MEM_USED_MIB=$(nvidia-smi --query-compute-apps=used_gpu_memory --format=csv,noheader,nounits 2>/dev/null | awk '{s+=$1} END {print s+0}') +""".strip() + + +def _parse_int(s: str) -> int | None: + try: return int(s) + except (TypeError, ValueError): return None + + +def _parse(out: str) -> dict: + info: dict[str, Any] = {} + for raw in out.splitlines(): + if "=" not in raw: + continue + k, v = raw.split("=", 1) + info[k.strip().lower()] = v.strip() + parsed: dict[str, Any] = {} + parsed["hostname"] = info.get("hostname") + parsed["uptime"] = info.get("uptime") + parsed["cores"] = _parse_int(info.get("cores", "")) + # Load average -> (1m, 5m, 15m) + if info.get("load"): + loads = info["load"].split() + try: + parsed["load"] = [float(x) for x in loads[:3]] + except ValueError: + parsed["load"] = None + # Memory: total used in bytes + if info.get("memory"): + mem = info["memory"].split() + if len(mem) == 2: + tot, used = _parse_int(mem[0]), _parse_int(mem[1]) + parsed["ram_total_bytes"] = tot + parsed["ram_used_bytes"] = used + # Disk: total used in bytes + if info.get("disk"): + dk = info["disk"].split() + if len(dk) == 2: + parsed["disk_total_bytes"] = _parse_int(dk[0]) + parsed["disk_used_bytes"] = _parse_int(dk[1]) + # GPU: "name, util_gpu, temp_C, power_W, memory_total_MiB" + if info.get("gpu"): + parts = [p.strip() for p in info["gpu"].split(",")] + if len(parts) >= 5: + name, ug, temp, power, mt = parts[0], parts[1], parts[2], parts[3], parts[4] + parsed["gpu_name"] = name + parsed["gpu_util_pct"] = _parse_int(ug) + parsed["gpu_temp_c"] = _parse_int(temp) + try: parsed["gpu_power_w"] = float(power) + except ValueError: parsed["gpu_power_w"] = None + # memory.total may be "[N/A]" on unified-memory systems (DGX Spark) + parsed["gpu_mem_total_mib"] = _parse_int(mt) + parsed["gpu_unified_memory"] = parsed["gpu_mem_total_mib"] is None + # Sum per-process compute memory (works even on unified-memory systems) + if info.get("gpu_mem_used_mib"): + parsed["gpu_mem_used_mib"] = _parse_int(info["gpu_mem_used_mib"]) + return parsed + + +class HardwareProbe: + """Caches results briefly to avoid hammering the Sparks.""" + + def __init__(self, settings: Settings, ttl_sec: float = 4.0) -> None: + self.settings = settings + self.ttl_sec = ttl_sec + self._cache: dict[str, tuple[float, dict]] = {} + self._locks: dict[str, asyncio.Lock] = {} + + def _lock(self, key: str) -> asyncio.Lock: + if key not in self._locks: + self._locks[key] = asyncio.Lock() + return self._locks[key] + + async def fetch(self) -> dict: + return { + "spark1": await self._one("spark1", self.settings.spark1_host, self.settings.spark1_user), + "spark2": await self._one("spark2", self.settings.spark2_host, self.settings.spark2_user), + } + + async def _one(self, key: str, host: str, user: str) -> dict: + if not host or not user: + return {"reachable": False, "configured": False} + async with self._lock(key): + now = time.monotonic() + cached = self._cache.get(key) + if cached and (now - cached[0] < self.ttl_sec): + return cached[1] + rc, out, err = await ssh_run(host, user, _PROBE, self.settings, timeout=8) + if rc != 0: + result = {"reachable": False, "configured": True, "host": host, "error": err.strip() or out.strip() or f"rc={rc}"} + else: + result = {"reachable": True, "configured": True, "host": host, **_parse(out)} + self._cache[key] = (now, result) + return result diff --git a/image/app/server.py b/image/app/server.py index d73b5e5..d5adaca 100644 --- a/image/app/server.py +++ b/image/app/server.py @@ -11,6 +11,7 @@ from typing import Literal from .config import Settings from .download import DownloadManager +from .hardware import HardwareProbe from .health import check_magpie, check_parakeet, check_vllm from .models import load_catalog from .overrides import add_custom, delete_custom, extract_knobs_from_args, load_overrides, set_knobs @@ -25,6 +26,7 @@ catalog = load_catalog(settings.models_yaml) swap_manager = SwapManager(settings, catalog) download_manager = DownloadManager(settings) update_manager = UpdateManager(settings) +hardware_probe = HardwareProbe(settings) app = FastAPI(title="spark-control", version="0.1.0") @@ -44,6 +46,7 @@ async def get_config() -> dict: "spark1_host": settings.spark1_host, "spark2_host": settings.spark2_host, "vllm_port": settings.vllm_port, + "open_webui_url": settings.open_webui_url or None, } @@ -116,6 +119,12 @@ async def del_model(key: str) -> dict: return {"ok": True, "key": key} +@app.get("/api/hardware") +async def get_hardware() -> dict: + """Per-Spark hardware snapshot — RAM, disk, GPU mem + util, CPU load, uptime.""" + return await hardware_probe.fetch() + + @app.get("/api/services") async def get_services() -> dict: """Lifecycle state of always-on support services (Parakeet, Magpie, …). @@ -297,7 +306,7 @@ async def stream_swap(job_id: str): class DownloadRequest(BaseModel): repo: str - mode: Literal["solo", "cluster"] = "solo" + mode: Literal["spark1", "spark2", "cluster"] = "spark1" @app.post("/api/download") @@ -376,6 +385,81 @@ async def get_updates() -> dict: return await get_update_status(settings) +@app.get("/api/explain-updates") +async def explain_updates(): + """Stream a layman's explanation of the pending commits from the currently-loaded vLLM model.""" + import httpx + info = await get_update_status(settings) + if not info.get("ok"): + async def err_gen(): + yield f"event: done\ndata: {json.dumps({'error': info.get('error', 'unknown')})}\n\n" + return StreamingResponse(err_gen(), media_type="text/event-stream") + + vllm = await check_vllm(settings) + if not vllm.get("ok") or not vllm.get("current_model"): + async def err_gen(): + yield f"event: done\ndata: {json.dumps({'error': 'no vLLM model loaded — swap to a model first'})}\n\n" + return StreamingResponse(err_gen(), media_type="text/event-stream") + + commits = "\n".join(info.get("log", [])) + if not commits.strip(): + async def empty_gen(): + yield f"event: done\ndata: {json.dumps({'error': 'no pending commits'})}\n\n" + return StreamingResponse(empty_gen(), media_type="text/event-stream") + + prompt = ( + "You are reviewing pending git commits to `eugr/spark-vllm-docker`, an upstream community project that " + "orchestrates vLLM on dual NVIDIA DGX Spark hardware (Blackwell GPUs, cluster via Ray, recipes per model). " + "The reader has a setup running models like Qwen3.6-35B-A3B-NVFP4 (daily driver, solo), Qwen3-VL 235B (cluster), " + "and Gemma 4 31B. The reader is technically literate but is NOT a vLLM expert.\n\n" + "For the commit list below: give a short overall verdict (Apply / Optional / Skip and why), then a brief " + "bullet per commit grouping similar ones. Call out anything that would break a working setup or that " + "requires re-downloading models. Avoid jargon. ~250 words max.\n\n" + f"Pending commits:\n{commits}" + ) + + async def gen(): + try: + async with httpx.AsyncClient(timeout=httpx.Timeout(300.0, connect=5.0)) as c: + async with c.stream( + "POST", + f"{vllm['base_url']}/chat/completions", + json={ + "model": vllm["current_model"], + "stream": True, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 600, + "temperature": 0.4, + }, + ) as r: + r.raise_for_status() + async for line in r.aiter_lines(): + if not line.startswith("data: "): + continue + data = line[6:].strip() + if data == "[DONE]": + break + try: + chunk = json.loads(data) + choices = chunk.get("choices") or [] + if not choices: + continue + delta = choices[0].get("delta") or {} + text = delta.get("content") + reasoning = delta.get("reasoning") + if text: + yield f"data: {json.dumps({'content': text})}\n\n" + elif reasoning: + yield f"data: {json.dumps({'reasoning': reasoning})}\n\n" + except json.JSONDecodeError: + continue + except Exception as e: + yield f"data: {json.dumps({'error': f'{type(e).__name__}: {e}'})}\n\n" + yield f"event: done\ndata: {json.dumps({'ok': True})}\n\n" + + return StreamingResponse(gen(), media_type="text/event-stream") + + class UpdateRequest(BaseModel): mode: Literal["solo", "cluster"] = "cluster" diff --git a/image/app/static/app.js b/image/app/static/app.js index ca32807..00de929 100644 --- a/image/app/static/app.js +++ b/image/app/static/app.js @@ -13,6 +13,8 @@ const state = { swap_progress: 0, // 0–1 services: {}, service_action_in_flight: null, // e.g. "parakeet:restart" + hardware: {}, + config: {}, configured: true, timer_handle: null, }; @@ -63,7 +65,9 @@ function renderCards() { ${(m.capabilities || []).map(c => `${escapeHtml(c)}`).join('')} ${desc} -
${escapeHtml(m.repo)}
+
+ ${escapeHtml(m.repo)} +
@@ -178,6 +193,7 @@
Checking for updates… +
@@ -185,6 +201,10 @@ Pending commits

       
+