diff --git a/image/app/download.py b/image/app/download.py
new file mode 100644
index 0000000..b861561
--- /dev/null
+++ b/image/app/download.py
@@ -0,0 +1,204 @@
+"""Drive `./hf-download.sh <repo>` on Spark 1 via SSH and stream progress.
+
+Parses `huggingface-hub` tqdm-style progress lines like:
+
+    Downloading (incomplete total...): 8%|▏ | 2.06G/25.1G [03:20<18:35, 20.6MB/s]
+
+into a structured percent + bytes done / total + ETA payload that the
+front-end can render as a clean progress bar.
+"""
+from __future__ import annotations
+import asyncio
+import re
+import shlex
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from typing import Literal, Optional
+
+from .config import Settings
+from .ssh import ssh_stream, StreamHandle
+
+
+Mode = Literal["solo", "cluster"]
+
+# Output lines kept per job; older ones are trimmed (and counted) beyond this.
+MAX_LOG_LINES = 800
+
+
+# Groups: percent, downloaded, total, elapsed, eta, rate (optional).
+_TQDM_RE = re.compile(
+    r"(\d+(?:\.\d+)?)\s*%\s*\|.*?\|\s*"
+    r"([\d.]+[KMGTP]?B?)\s*/\s*([\d.]+[KMGTP]?B?)\s*"
+    r"\[(\d+:\d+:?\d*)\s*<\s*(\d+:\d+:?\d*),?\s*"
+    r"([\d.]+\s*\w+/s)?",
+    re.IGNORECASE,
+)
+
+# huggingface_hub also emits "Fetching N files: pct%|..."
+_FETCHING_RE = re.compile(
+    r"Fetching\s+(\d+)\s+files:\s+(\d+(?:\.\d+)?)\s*%",
+    re.IGNORECASE,
+)
+
+
+@dataclass
+class DownloadProgress:
+    """Most recent progress values parsed from the download output."""
+
+    percent: float = 0.0
+    downloaded: str = ""
+    total: str = ""
+    elapsed: str = ""
+    eta: str = ""
+    rate: str = ""
+    phase: str = "Starting…"
+
+
+@dataclass
+class DownloadJob:
+    """One download run: identity, state, progress and a bounded log tail."""
+
+    id: str
+    repo: str
+    mode: Mode
+    started_at: str
+    state: str = "starting"  # starting | downloading | copying | done | failed
+    lines: list[str] = field(default_factory=list)
+    progress: DownloadProgress = field(default_factory=DownloadProgress)
+    returncode: Optional[int] = None
+    finished_at: Optional[str] = None
+    # How many lines have been trimmed off the front of `lines`; lets a
+    # reader keep an absolute cursor (`dropped + index`) across trims.
+    dropped: int = 0
+
+    def append(self, line: str) -> None:
+        """Add one output line, keeping only the newest MAX_LOG_LINES."""
+        self.lines.append(line)
+        overflow = len(self.lines) - MAX_LOG_LINES
+        if overflow > 0:
+            del self.lines[:overflow]
+            self.dropped += overflow
+
+
+class DownloadManager:
+    """Starts downloads one at a time and keeps every job by id."""
+
+    def __init__(self, settings: Settings) -> None:
+        self.settings = settings
+        self.lock = asyncio.Lock()
+        self.jobs: dict[str, DownloadJob] = {}
+        self.current_job_id: Optional[str] = None
+        # The event loop holds only weak references to tasks, so keep strong
+        # ones here until each background run has finished.
+        self._tasks: set[asyncio.Task] = set()
+
+    def get(self, job_id: str) -> DownloadJob | None:
+        """Return the job with this id, or None when there is none."""
+        return self.jobs.get(job_id)
+
+    async def trigger(self, repo: str, mode: Mode) -> DownloadJob:
+        """Create a job for `repo` and start it in the background.
+
+        Raises ValueError when `repo` is not in 'org/name' form and
+        RuntimeError when a download is already in progress.
+        """
+        if not repo or "/" not in repo or repo.startswith("-"):
+            raise ValueError("repo must be in 'org/name' form")
+        # The lock is only taken once the task starts running, while
+        # `current_job_id` is set right here, so check both.
+        if self.current_job_id is not None or self.lock.locked():
+            raise RuntimeError("A download is already in progress")
+        job = DownloadJob(
+            id=uuid.uuid4().hex[:8],
+            repo=repo,
+            mode=mode,
+            started_at=datetime.now(timezone.utc).isoformat(),
+        )
+        self.jobs[job.id] = job
+        self.current_job_id = job.id
+        task = asyncio.create_task(self._run(job))
+        self._tasks.add(task)
+        task.add_done_callback(self._tasks.discard)
+        return job
+
+    async def _run(self, job: DownloadJob) -> None:
+        """Run the job under the lock and record how it ended."""
+        async with self.lock:
+            try:
+                await self._do(job)
+                if job.state != "failed":
+                    job.state = "done"
+                    job.returncode = 0
+                    job.progress.percent = 100.0
+                    job.progress.phase = "Done"
+            except Exception as e:
+                # Boundary of the background task: report the error through
+                # the job log, since nothing awaits this task.
+                job.append(f"[error] {type(e).__name__}: {e}")
+                job.state = "failed"
+                if job.returncode is None:
+                    job.returncode = 1
+            finally:
+                job.finished_at = datetime.now(timezone.utc).isoformat()
+                if self.current_job_id == job.id:
+                    self.current_job_id = None
+
+    async def _do(self, job: DownloadJob) -> None:
+        """Run hf-download.sh over SSH and feed each output line to the job."""
+        s = self.settings
+        if not s.spark1_host or not s.spark1_user:
+            raise RuntimeError("spark1 not configured")
+
+        flags = "-c --copy-parallel" if job.mode == "cluster" else ""
+        # `repo` arrives from the HTTP API and the command goes through a
+        # shell (`cd ... && ...`), so quote it to keep it a single argument.
+        repo = shlex.quote(job.repo)
+        cmd = f"cd ~/spark-vllm-docker && ./hf-download.sh {repo} {flags}".strip()
+        job.append(f"$ {cmd}")
+        job.state = "downloading"
+        job.progress.phase = "Connecting to Hugging Face…"
+
+        handle = StreamHandle()
+        async for line in ssh_stream(s.spark1_host, s.spark1_user, cmd, s, handle=handle):
+            job.append(line)
+            self._update_progress(job, line)
+
+        # A missing return code is treated as success.
+        rc = handle.returncode or 0
+        if rc != 0:
+            job.state = "failed"
+            job.returncode = rc
+
+    def _update_progress(self, job: DownloadJob, line: str) -> None:
+        """Update the job's phase and progress fields from one output line."""
+        p = job.progress
+        # Phase transitions from log content
+        if "Copying model" in line or "Parallel copy enabled" in line:
+            job.state = "copying"
+            p.phase = "Copying to peer Sparks…"
+        elif "Download completed" in line:
+            p.phase = "Download complete, finalizing…"
+        elif "Copy complete" in line:
+            p.phase = "Copy complete"
+        elif "Still waiting to acquire lock" in line:
+            p.phase = "Waiting for lock (another download in progress)…"
+
+        # Check the "Fetching N files" pattern first (it could match TQDM_RE otherwise).
+        m2 = _FETCHING_RE.search(line)
+        if m2:
+            p.percent = float(m2.group(2))
+            p.phase = f"Fetching {m2.group(1)} files"
+            return
+
+        m = _TQDM_RE.search(line)
+        if m:
+            p.percent = float(m.group(1))
+            p.downloaded = m.group(2)
+            p.total = m.group(3)
+            p.elapsed = m.group(4)
+            p.eta = m.group(5)
+            p.rate = m.group(6) or p.rate
+            if job.state != "copying":
+                p.phase = "Downloading"
+            return
diff --git a/image/app/server.py b/image/app/server.py
index e838e13..ef73b30 100644
--- a/image/app/server.py
+++ b/image/app/server.py
@@ -7,8 +7,10 @@ from fastapi import FastAPI, HTTPException
 from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
 from fastapi.staticfiles import StaticFiles
 from pydantic import BaseModel
+from typing import Literal
 
 from .config import Settings
+from .download import DownloadManager
 from .health import check_magpie, check_parakeet, check_vllm
 from .models import load_catalog
 from .services import docker_state, run_action, services_from_settings
@@ -19,6 +21,7 @@ from .swap import SwapManager
 settings = Settings.from_env()
 catalog = load_catalog(settings.models_yaml)
 swap_manager = SwapManager(settings, catalog)
+download_manager = DownloadManager(settings)
 
 app = FastAPI(title="spark-control", version="0.1.0")
 
@@ -228,6 +231,93 @@ async def stream_swap(job_id: str):
     return StreamingResponse(gen(), media_type="text/event-stream")
 
 
+class DownloadRequest(BaseModel):
+    """Body of POST /api/download."""
+
+    repo: str
+    mode: Literal["solo", "cluster"] = "solo"
+
+
+@app.post("/api/download")
+async def post_download(req: DownloadRequest) -> dict:
+    """Start a download; 400 for a bad repo, 409 if one is running."""
+    if not settings.configured:
+        raise HTTPException(503, "spark1 not configured")
+    try:
+        job = await download_manager.trigger(req.repo, req.mode)
+    except ValueError as e:
+        raise HTTPException(400, str(e)) from e
+    except RuntimeError as e:
+        raise HTTPException(409, str(e)) from e
+    return {"job_id": job.id, "repo": job.repo, "mode": job.mode, "state": job.state}
+
+
+@app.get("/api/download/{job_id}")
+async def get_download(job_id: str) -> dict:
+    """Return a full snapshot of one job, or 404 if it is unknown."""
+    job = download_manager.get(job_id)
+    if job is None:
+        raise HTTPException(404, "no such job")
+    return _serialize_download(job)
+
+
+def _serialize_download(job) -> dict:
+    """Flatten a job and its progress into a JSON-serializable dict."""
+    return {
+        "id": job.id,
+        "repo": job.repo,
+        "mode": job.mode,
+        "state": job.state,
+        "started_at": job.started_at,
+        "finished_at": job.finished_at,
+        "returncode": job.returncode,
+        "progress": {
+            "percent": job.progress.percent,
+            "downloaded": job.progress.downloaded,
+            "total": job.progress.total,
+            "elapsed": job.progress.elapsed,
+            "eta": job.progress.eta,
+            "rate": job.progress.rate,
+            "phase": job.progress.phase,
+        },
+        "lines": job.lines,
+    }
+
+
+@app.get("/api/download/{job_id}/stream")
+async def stream_download(job_id: str):
+    """Stream a job's log lines and progress as server-sent events."""
+    job = download_manager.get(job_id)
+    if job is None:
+        raise HTTPException(404, "no such job")
+
+    async def gen():
+        # `sent` counts every line the job ever produced, not an index into
+        # `job.lines`: the job trims that list to a bounded tail, so a plain
+        # index would stop advancing once trimming starts.
+        sent = 0
+        last_progress = None
+        while True:
+            dropped = job.dropped
+            tail = list(job.lines)
+            total = dropped + len(tail)
+            if total > sent:
+                for line in tail[max(sent - dropped, 0):]:
+                    yield f"data: {json.dumps({'line': line})}\n\n"
+                sent = total
+            # progress is small; emit on change
+            prog = (job.progress.percent, job.progress.phase, job.progress.downloaded, job.progress.eta, job.progress.rate)
+            if prog != last_progress:
+                yield f"event: progress\ndata: {json.dumps({'state': job.state, **_serialize_download(job)['progress']})}\n\n"
+                last_progress = prog
+            if job.returncode is not None and sent >= job.dropped + len(job.lines):
+                yield f"event: done\ndata: {json.dumps({'state': job.state, 'returncode': job.returncode, 'finished_at': job.finished_at})}\n\n"
+                return
+            await asyncio.sleep(0.5)
+
+    return StreamingResponse(gen(), media_type="text/event-stream")
+
+
 @app.post("/api/test-connection")
 async def test_connection() -> dict:
     """Probe both Sparks with a `hostname` command.
Useful for the StartOS setup flow."""
diff --git a/image/app/static/app.js b/image/app/static/app.js
index bc44c01..8efc98f 100644
--- a/image/app/static/app.js
+++ b/image/app/static/app.js
@@ -496,8 +496,160 @@ function appendLog(line) {
   log.scrollTop = log.scrollHeight;
 }
 
+// ===================== model downloads =====================
+
+// UI state for the download panel; only one job is tracked at a time.
+const dlState = {
+  job_id: null,
+  eventsource: null,
+  started_at: null,
+  timer_handle: null,
+};
+
+// Show the panel with the empty form (progress view hidden).
+function openDownloadForm() {
+  el('#download-panel').classList.remove('hidden');
+  el('#download-form').classList.remove('hidden');
+  el('#download-progress').classList.add('hidden');
+  el('#dl-repo').focus();
+}
+
+// Hide the panel and reset it to the empty form.
+function closeDownloadPanel() {
+  el('#download-panel').classList.add('hidden');
+  el('#download-form').classList.remove('hidden');
+  el('#download-progress').classList.add('hidden');
+  el('#dl-repo').value = '';
+}
+
+// Start (or restart) the elapsed-time display, counting from startedAt (ms).
+function dlTimerStart(startedAt) {
+  dlState.started_at = startedAt;
+  if (dlState.timer_handle) clearInterval(dlState.timer_handle);
+  const tick = () => {
+    if (!dlState.started_at) return;
+    const sec = Math.max(0, Math.floor((Date.now() - dlState.started_at) / 1000));
+    const m = Math.floor(sec / 60);
+    const s = sec % 60;
+    el('#dl-elapsed').textContent = `${m}:${s.toString().padStart(2, '0')}`;
+  };
+  tick();
+  dlState.timer_handle = setInterval(tick, 500);
+}
+
+// Stop the elapsed-time display.
+function dlTimerStop() {
+  if (dlState.timer_handle) { clearInterval(dlState.timer_handle); dlState.timer_handle = null; }
+}
+
+// Validate the form, POST /api/download and attach to the new job.
+async function startDownload() {
+  const repo = el('#dl-repo').value.trim();
+  const mode = document.querySelector('input[name="dl-mode"]:checked').value;
+  if (!repo || !repo.includes('/')) {
+    alert('Enter a HuggingFace repo in the form "org/name", e.g. RedHatAI/Qwen3.6-35B-A3B-NVFP4');
+    return;
+  }
+  try {
+    const r = await fetchJSON('/api/download', {
+      method: 'POST',
+      headers: { 'content-type': 'application/json' },
+      body: JSON.stringify({ repo, mode }),
+    });
+    attachToDownload(r.job_id);
+  } catch (e) {
+    alert('Failed to start download: ' + e.message);
+  }
+}
+
+// Render one progress payload (phase, stats line, bar width, percent).
+function renderDownloadProgress(p) {
+  el('#dl-phase').textContent = p.phase || 'Working…';
+  const statsParts = [];
+  if (p.downloaded && p.total) statsParts.push(`${p.downloaded} / ${p.total}`);
+  if (p.rate) statsParts.push(p.rate);
+  if (p.eta) statsParts.push(`ETA ${p.eta}`);
+  el('#dl-stats').textContent = statsParts.join(' · ');
+  const pct = Math.max(2, Math.min(100, p.percent || 2));
+  el('#dl-progress-fill').style.width = `${pct}%`;
+  el('#dl-phase-detail').textContent = p.percent > 0 ? `${p.percent.toFixed(1)}%` : '';
+}
+
+// Append one line to the raw log and keep it scrolled to the bottom.
+function dlAppendLog(line) {
+  const log = el('#dl-log');
+  log.textContent += line + '\n';
+  log.scrollTop = log.scrollHeight;
+}
+
+// Show the progress view for jobId: snapshot first, then the live stream.
+async function attachToDownload(jobId) {
+  if (dlState.eventsource) { dlState.eventsource.close(); dlState.eventsource = null; }
+  dlState.job_id = jobId;
+  el('#download-form').classList.add('hidden');
+  el('#download-progress').classList.remove('hidden');
+  el('#dl-log').textContent = '';
+  el('#dl-title').textContent = 'Downloading…';
+
+  try {
+    const snap = await fetchJSON(`/api/download/${jobId}`);
+    dlTimerStart(Date.parse(snap.started_at) || Date.now());
+    renderDownloadProgress(snap.progress);
+    if (snap.returncode !== null && snap.returncode !== undefined) {
+      // Finished job: no stream will be opened, so the snapshot is the
+      // only source of log lines.
+      for (const line of snap.lines || []) dlAppendLog(line);
+      handleDownloadDone(snap);
+      return;
+    }
+    // Running job: the stream replays its retained lines from the start,
+    // so backfilling them here as well would print every line twice.
+  } catch (e) {
+    console.warn('download backfill failed', e);
+    dlTimerStart(Date.now());
+  }
+
+  const es = new EventSource(`/api/download/${jobId}/stream`);
+  dlState.eventsource = es;
+  es.onmessage = (ev) => {
+    try {
+      const d = JSON.parse(ev.data);
+      if (d.line !== undefined) dlAppendLog(d.line);
+    } catch {}
+  };
+  es.addEventListener('progress', (ev) => {
+    try { renderDownloadProgress(JSON.parse(ev.data)); } catch {}
+  });
+  es.addEventListener('done', (ev) => {
+    let d = {};
+    try { d = JSON.parse(ev.data); } catch {}
+    handleDownloadDone(d);
+  });
+  // Close instead of auto-reconnecting: a reconnect would replay the log.
+  es.onerror = () => { es.close(); dlState.eventsource = null; };
+}
+
+// Close the stream, stop the timer and show the final state.
+function handleDownloadDone(d) {
+  if (dlState.eventsource) { dlState.eventsource.close(); dlState.eventsource = null; }
+  dlTimerStop();
+  if (d.state === 'failed') {
+    el('#dl-title').textContent = `Failed (rc=${d.returncode})`;
+    el('#dl-phase').textContent = 'Failed';
+  } else {
+    el('#dl-title').textContent = 'Done';
+    el('#dl-phase').textContent = 'Done ✓ — you can now add this model to the catalog and swap to it.';
+    el('#dl-progress-fill').style.width = '100%';
+  }
+  dlState.job_id = null;
+}
+
 async function init() {
   setupCopyButtons();
+  el('#open-download').addEventListener('click', openDownloadForm);
+  el('#dl-cancel').addEventListener('click', closeDownloadPanel);
+  el('#dl-start').addEventListener('click', startDownload);
+  el('#dl-repo').addEventListener('keydown', (e) => { if (e.key === 'Enter') startDownload(); });
   await loadModels();
   await pollStatus();
   await renderServices();
diff --git a/image/app/static/index.html b/image/app/static/index.html
index 7fa3ba4..4f5dc18 100644
--- a/image/app/static/index.html
+++ b/image/app/static/index.html
@@ -69,7 +69,50 @@
-

LLM swap

+
+

LLM swap

+ +
+ + +
diff --git a/image/app/static/style.css b/image/app/static/style.css
index dcecde4..9751856 100644
--- a/image/app/static/style.css
+++ b/image/app/static/style.css
@@ -217,6 +217,70 @@ main {
   word-break: break-word;
 }
 
+/* ===== Section header (title + action button) ===== */
+
+.section-header {
+  display: flex;
+  align-items: center;
+  gap: 12px;
+  margin: 24px 0 12px;
+}
+.section-header .section-title { margin: 0; } /* the header row supplies the margin */
+.section-header .spacer { flex: 1; } /* takes up the free space in the flex row */
+.section-header .small-btn,
+.btn.small-btn {
+  margin-left: auto; /* pushes the button to the right edge of a flex row */
+  padding: 5px 12px;
+  font-size: 12px;
+}
+
+/* ===== Download panel ===== */
+
+.download-panel {
+  background: var(--surface);
+  border: 1px solid var(--info);
+  border-radius: var(--radius);
+  padding: 14px 16px;
+  margin-bottom: 16px;
+}
+.download-form .dl-row {
+  display: flex;
+  align-items: center;
+  gap: 12px;
+  padding: 6px 0;
+  flex-wrap: wrap; /* lets the input drop below the label when space runs out */
+}
+.dl-label {
+  color: var(--muted);
+  font-size: 12px;
+  min-width: 110px;
+  flex-shrink: 0; /* the label keeps its width; only the input flexes */
+  text-transform: uppercase;
+  letter-spacing: 0.05em;
+}
+.dl-row input[type='text'] {
+  flex: 1;
+  background: var(--surface-2);
+  border: 1px solid var(--border);
+  color: var(--text);
+  padding: 7px 10px;
+  border-radius: 6px;
+  font: 13px ui-monospace, SFMono-Regular, "SF Mono", Menlo, monospace;
+  min-width: 200px;
+}
+.dl-row input[type='text']:focus { outline: 1px solid var(--info); border-color: var(--info); }
+.radio { display: inline-flex; align-items: center; gap: 6px; font-size: 13px; color: var(--text); cursor: pointer; }
+.radio input { accent-color: var(--accent); }
+.dl-actions { display: flex; gap: 8px; justify-content: flex-end; margin-top: 10px; }
+.dl-stats {
+  margin-top: 8px;
+  font-family: ui-monospace, SFMono-Regular, "SF Mono", Menlo, monospace; /* same stack as the repo input */
+}
+.dl-header { display: flex; align-items: center; gap: 12px; }
+.dl-header #dl-title { font-weight: 600; color: var(--info); }
+#dl-log-details { margin-top: 12px; }
+#dl-log-details summary { cursor: pointer; padding: 4px 0; }
+
/* ===== Section titles ===== */ .section-title { diff --git a/package/startos/versions/v0_1_0.ts b/package/startos/versions/v0_1_0.ts index b1bbe22..edeb142 100644 --- a/package/startos/versions/v0_1_0.ts +++ b/package/startos/versions/v0_1_0.ts @@ -1,10 +1,10 @@ import { VersionInfo, IMPOSSIBLE } from '@start9labs/start-sdk' export const v0_1_0 = VersionInfo.of({ - version: '0.2.0:0', + version: '0.2.1:0', releaseNotes: { en_US: - 'Always-on services panel: dashboard now has cards for Parakeet and Magpie with Start/Stop/Restart buttons and live container state (uptime, restart count). Configure Sparks adds optional per-service host/container fields so Parakeet or Magpie can live on Spark 1 (or anywhere) instead of being hard-wired to Spark 2.', + 'Model download from the dashboard. Click "+ Download a new model", paste a Hugging Face repo (e.g. RedHatAI/...), choose solo or cluster, and watch %% progress with bytes, rate, and ETA. Drives ./hf-download.sh on Spark 1 over SSH and parses the tqdm output.', }, migrations: { up: async ({ effects }) => {},