diff --git a/image/app/disk.py b/image/app/disk.py new file mode 100644 index 0000000..3ef2df6 --- /dev/null +++ b/image/app/disk.py @@ -0,0 +1,130 @@ +"""On-disk presence + deletion for Hugging Face model caches on the Sparks. + +The HF cache layout for a repo `org/name` is: + + ~/.cache/huggingface/hub/models--org--name/ + +We use `du -sb` to measure size (bytes) and `rm -rf` to free it. All operations +are gated by the server endpoints, which refuse to delete a currently-loaded +model or one tied to an in-flight swap/download. +""" +from __future__ import annotations +import asyncio +import shlex +from dataclasses import dataclass +from typing import Optional + +from .config import Settings +from .ssh import ssh_run + + +def repo_to_cache_dirname(repo: str) -> str: + """Convert 'org/name' to 'models--org--name' (the HF hub cache directory).""" + if "/" not in repo: + raise ValueError(f"repo must be in 'org/name' form: {repo!r}") + return "models--" + repo.replace("/", "--") + + +def _cache_path(repo: str) -> str: + """Full remote path to the model's cache directory.""" + # Use $HOME so it resolves correctly regardless of the SSH user's home. + return f"$HOME/.cache/huggingface/hub/{repo_to_cache_dirname(repo)}" + + +@dataclass +class HostDiskResult: + host: str + on_disk: bool + size_bytes: int = 0 + error: Optional[str] = None + + +@dataclass +class DiskStatus: + repo: str + on_disk: bool # True if present on AT LEAST one host + total_bytes: int # sum across hosts + per_host: list[HostDiskResult] + + +async def probe_host(host: str, user: str, repo: str, settings: Settings) -> HostDiskResult: + """Return whether the model's cache dir exists on this host and its size.""" + if not host or not user: + return HostDiskResult(host=host or "?", on_disk=False, error="host not configured") + path = _cache_path(repo) + # `du -sb` prints bytes; if the dir doesn't exist, `du` returns non-zero. + # We test existence explicitly first so we can report on_disk=False cleanly. + cmd = ( + f"if [ -d {shlex.quote(path)} ]; then " + f"du -sb {shlex.quote(path)} 2>/dev/null | awk '{{print $1}}'; " + f"else echo MISSING; fi" + ) + rc, out, err = await ssh_run(host, user, cmd, settings, timeout=20.0) + if rc != 0: + return HostDiskResult(host=host, on_disk=False, error=(err or out).strip() or f"rc={rc}") + raw = out.strip() + if raw == "MISSING" or raw == "": + return HostDiskResult(host=host, on_disk=False) + try: + size = int(raw.splitlines()[-1]) + except ValueError: + return HostDiskResult(host=host, on_disk=False, error=f"unparsable du output: {raw!r}") + return HostDiskResult(host=host, on_disk=True, size_bytes=size) + + +async def probe_disk(repo: str, mode: str, settings: Settings) -> DiskStatus: + """Probe one model across the relevant Sparks based on its mode (solo|cluster).""" + hosts: list[tuple[str, str]] = [(settings.spark1_host, settings.spark1_user)] + if mode == "cluster" and settings.spark2_host: + hosts.append((settings.spark2_host, settings.spark2_user)) + + results = await asyncio.gather(*(probe_host(h, u, repo, settings) for h, u in hosts)) + on_disk = any(r.on_disk for r in results) + total = sum(r.size_bytes for r in results) + return DiskStatus(repo=repo, on_disk=on_disk, total_bytes=total, per_host=list(results)) + + +async def delete_host(host: str, user: str, repo: str, settings: Settings) -> HostDiskResult: + """Probe + rm -rf on one host. Returns bytes freed (0 if the dir wasn't there).""" + if not host or not user: + return HostDiskResult(host=host or "?", on_disk=False, error="host not configured") + path = _cache_path(repo) + # Safety: hard-code the prefix in the command so a bad `repo` can never escape. + # Compute size first, then remove. If absent, still return success (idempotent). + cmd = ( + f"set -e; " + f"P={shlex.quote(path)}; " + f"if [ -d \"$P\" ]; then " + f" SIZE=$(du -sb \"$P\" 2>/dev/null | awk '{{print $1}}'); " + f" rm -rf -- \"$P\"; " + f" echo FREED $SIZE; " + f"else " + f" echo FREED 0; " + f"fi" + ) + rc, out, err = await ssh_run(host, user, cmd, settings, timeout=120.0) + if rc != 0: + return HostDiskResult(host=host, on_disk=False, error=(err or out).strip() or f"rc={rc}") + # Parse the "FREED N" line + freed = 0 + for line in out.splitlines(): + parts = line.strip().split() + if len(parts) == 2 and parts[0] == "FREED": + try: + freed = int(parts[1]) + except ValueError: + pass + break + return HostDiskResult(host=host, on_disk=False, size_bytes=freed) + + +async def delete_from_disk(repo: str, mode: str, settings: Settings) -> DiskStatus: + """rm -rf the model's cache dir on the relevant Sparks. Idempotent.""" + hosts: list[tuple[str, str]] = [(settings.spark1_host, settings.spark1_user)] + if mode == "cluster" and settings.spark2_host: + hosts.append((settings.spark2_host, settings.spark2_user)) + + results = await asyncio.gather(*(delete_host(h, u, repo, settings) for h, u in hosts)) + total_freed = sum(r.size_bytes for r in results) + # After deletion, on_disk should be False on all hosts. + return DiskStatus(repo=repo, on_disk=False, total_bytes=total_freed, per_host=list(results)) diff --git a/image/app/server.py b/image/app/server.py index 4996261..8ad6e73 100644 --- a/image/app/server.py +++ b/image/app/server.py @@ -13,6 +13,7 @@ from .config import Settings from .connectivity import get_mac, record_report, record_state, summary as connectivity_summary from .custom_services import add_custom_service, delete_custom_service from .deep_health import DeepHealth +from .disk import delete_from_disk, probe_disk from .download import DownloadManager from .hardware import HardwareProbe from .health import check_magpie, check_parakeet, check_vllm @@ -139,6 +140,89 @@ async def del_model(key: str) -> dict: return {"ok": True, "key": key} +@app.get("/api/models/disk-status") +async def get_models_disk_status() -> dict: + """Probe each catalog model's HF cache on the appropriate Spark(s) in parallel. + + Result is keyed by model key: {on_disk, total_bytes, per_host:[{host,on_disk,size_bytes,error?}]}. + Designed to be called once on dashboard load; takes ~1–3s depending on Spark count. + """ + if not settings.configured: + return {"configured": False, "models": {}} + keys = list(catalog.models.keys()) + statuses = await asyncio.gather(*( + probe_disk(catalog.models[k].repo, catalog.models[k].mode, settings) for k in keys + ), return_exceptions=True) + out: dict[str, dict] = {} + for k, s in zip(keys, statuses): + if isinstance(s, Exception): + out[k] = {"on_disk": False, "total_bytes": 0, "per_host": [], "error": str(s)} + continue + out[k] = { + "on_disk": s.on_disk, + "total_bytes": s.total_bytes, + "per_host": [ + {"host": r.host, "on_disk": r.on_disk, "size_bytes": r.size_bytes, **({"error": r.error} if r.error else {})} + for r in s.per_host + ], + } + return {"configured": True, "models": out} + + +@app.delete("/api/models/{key}/disk") +async def del_model_disk(key: str) -> dict: + """Delete a model's weights from the Spark filesystem(s). The catalog entry stays. + + Safety rails: + - Refuses if the model is currently loaded on vLLM. + - Refuses if a swap or download is in flight. + - Idempotent: if the cache dir is already gone on a host, that host reports 0 bytes freed. + """ + if key not in catalog.models: + raise HTTPException(404, f"unknown model: {key}") + m = catalog.models[key] + + # Refuse if currently loaded + try: + vllm = await check_vllm(settings) + except Exception: + vllm = {} + if vllm.get("ok") and vllm.get("current_model") == m.repo: + raise HTTPException( + 409, + f"'{m.display_name}' is the currently loaded model. Switch to a different model first, then try again." + ) + + # Refuse if a swap is in flight + if swap_manager.current_job_id: + raise HTTPException(409, "a model swap is in progress; wait for it to finish") + + # Refuse if a download is in flight for this same repo (a different model's download is fine) + if download_manager.current_job_id: + job = download_manager.get(download_manager.current_job_id) + if job and job.repo == m.repo: + raise HTTPException(409, "this model is currently downloading; cancel or wait for it to finish") + + status = await delete_from_disk(m.repo, m.mode, settings) + # Audit log + record_report( + f"disk:{key}", + ok=True, + source="disk-delete", + detail=f"freed {status.total_bytes} bytes across {len(status.per_host)} host(s)", + ) + return { + "ok": True, + "key": key, + "repo": m.repo, + "bytes_freed": status.total_bytes, + "per_host": [ + {"host": r.host, "size_bytes": r.size_bytes, **({"error": r.error} if r.error else {})} + for r in status.per_host + ], + } + + @app.get("/api/hardware") async def get_hardware() -> dict: """Per-Spark hardware snapshot — RAM, disk, GPU mem + util, CPU load, uptime.""" diff --git a/image/app/static/app.js b/image/app/static/app.js index b15ca62..97bfd17 100644 --- a/image/app/static/app.js +++ b/image/app/static/app.js @@ -18,6 +18,8 @@ const state = { configured: true, timer_handle: null, deep_health: {}, + disk_status: {}, // keyed by model key: { on_disk, total_bytes, per_host } + disk_status_loaded: false, }; const el = (sel) => document.querySelector(sel); @@ -57,12 +59,36 @@ function renderCards() { ? `
${escapeHtml(m.repo)}) from disk.`;
+ const hostsEl = el('#dd-hosts');
+ hostsEl.innerHTML = '';
+ for (const h of (disk.per_host || [])) {
+ if (!h.on_disk) continue;
+ const li = document.createElement('li');
+ li.innerHTML = `${escapeHtml(h.host)} — ${fmtBytesShort(h.size_bytes)}`;
+ hostsEl.appendChild(li);
+ }
+ const errEl = el('#dd-error');
+ errEl.classList.add('hidden');
+ errEl.textContent = '';
+
+ const confirm = el('#dd-confirm');
+ const cancel = el('#dd-cancel');
+ const onCancel = () => dlg.close();
+ const onConfirm = async () => {
+ confirm.disabled = true;
+ cancel.disabled = true;
+ confirm.textContent = 'Deleting…';
+ try {
+ const r = await fetchJSON(`/api/models/${encodeURIComponent(key)}/disk`, { method: 'DELETE' });
+ dlg.close();
+ // Optimistically clear local disk state for this key, then refresh.
+ delete state.disk_status[key];
+ renderCards();
+ // Eagerly re-probe so size is accurate (and shows "not downloaded" pill).
+ loadDiskStatus();
+ const freed = r && typeof r.bytes_freed === 'number' ? fmtBytesShort(r.bytes_freed) : '';
+ console.log(`Deleted ${m.display_name} from disk${freed ? ` — freed ${freed}` : ''}.`);
+ } catch (e) {
+ errEl.textContent = e.message || 'Delete failed';
+ errEl.classList.remove('hidden');
+ } finally {
+ confirm.disabled = false;
+ cancel.disabled = false;
+ confirm.textContent = 'Delete from disk';
+ }
+ };
+ cancel.onclick = onCancel;
+ confirm.onclick = onConfirm;
+ dlg.showModal();
+}
+
async function triggerSwap(modelKey) {
if (state.swap_job_id) return;
try {
@@ -1523,9 +1627,12 @@ async function init() {
await renderServices();
pollHardware();
pollUpdates();
+ // Disk-status probe runs after first paint — slow over SSH and not blocking.
+ loadDiskStatus();
setInterval(pollStatus, 5000);
setInterval(pollHardware, 8000); // every 8s
setInterval(pollUpdates, 300000); // every 5 min
+ setInterval(loadDiskStatus, 60000); // every 60s — disk state changes rarely
}
init();
diff --git a/image/app/static/index.html b/image/app/static/index.html
index f9cf853..bedb0db 100644
--- a/image/app/static/index.html
+++ b/image/app/static/index.html
@@ -188,6 +188,20 @@
+
+