Files
spark-control/image/app/disk.py
T
Keysat 9ff7ee9c1e v0.8.1:0 - delete model weights from disk via card trash icon
Each model card now shows whether its weights are present on disk
(with GB size) or not yet downloaded. When present and the model
isn't currently loaded, a trash icon appears; clicking it pops a
confirmation showing exactly how many GB will be freed and on
which Spark(s), then runs rm -rf on the HF cache directory via SSH.

Cluster-mode models are removed from both Sparks; solo-mode from
Spark 1 only. Safety rails: refuses to delete the currently-loaded
model, refuses during an in-flight swap or download, and the
catalog entry stays intact so it can be re-downloaded anytime.

Backend:
  - new image/app/disk.py: probe_disk + delete_from_disk over SSH
  - GET  /api/models/disk-status — parallel probe across all catalog models
  - DELETE /api/models/{key}/disk — guarded rm -rf, logs to connectivity events

Frontend:
  - on-disk / not-downloaded pills on every card
  - trash icon-btn in card-actions row (hidden when not on disk)
  - confirmation dialog showing per-host bytes-to-free
  - disk-status re-checked every 60s

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 17:07:20 -05:00

131 lines
5.2 KiB
Python

"""On-disk presence + deletion for Hugging Face model caches on the Sparks.
The HF cache layout for a repo `org/name` is:
~/.cache/huggingface/hub/models--org--name/
We use `du -sb` to measure size (bytes) and `rm -rf` to free it. All operations
are gated by the server endpoints, which refuse to delete a currently-loaded
model or one tied to an in-flight swap/download.
"""
from __future__ import annotations
import asyncio
import shlex
from dataclasses import dataclass
from typing import Optional
from .config import Settings
from .ssh import ssh_run
def repo_to_cache_dirname(repo: str) -> str:
"""Convert 'org/name' to 'models--org--name' (the HF hub cache directory)."""
if "/" not in repo:
raise ValueError(f"repo must be in 'org/name' form: {repo!r}")
return "models--" + repo.replace("/", "--")
def _cache_path(repo: str) -> str:
"""Full remote path to the model's cache directory."""
# Use $HOME so it resolves correctly regardless of the SSH user's home.
return f"$HOME/.cache/huggingface/hub/{repo_to_cache_dirname(repo)}"
@dataclass
class HostDiskResult:
host: str
on_disk: bool
size_bytes: int = 0
error: Optional[str] = None
@dataclass
class DiskStatus:
repo: str
on_disk: bool # True if present on AT LEAST one host
total_bytes: int # sum across hosts
per_host: list[HostDiskResult]
async def probe_host(host: str, user: str, repo: str, settings: Settings) -> HostDiskResult:
"""Return whether the model's cache dir exists on this host and its size."""
if not host or not user:
return HostDiskResult(host=host or "?", on_disk=False, error="host not configured")
path = _cache_path(repo)
# `du -sb` prints bytes; if the dir doesn't exist, `du` returns non-zero.
# We test existence explicitly first so we can report on_disk=False cleanly.
cmd = (
f"if [ -d {shlex.quote(path)} ]; then "
f"du -sb {shlex.quote(path)} 2>/dev/null | awk '{{print $1}}'; "
f"else echo MISSING; fi"
)
rc, out, err = await ssh_run(host, user, cmd, settings, timeout=20.0)
if rc != 0:
return HostDiskResult(host=host, on_disk=False, error=(err or out).strip() or f"rc={rc}")
raw = out.strip()
if raw == "MISSING" or raw == "":
return HostDiskResult(host=host, on_disk=False)
try:
size = int(raw.splitlines()[-1])
except ValueError:
return HostDiskResult(host=host, on_disk=False, error=f"unparsable du output: {raw!r}")
return HostDiskResult(host=host, on_disk=True, size_bytes=size)
async def probe_disk(repo: str, mode: str, settings: Settings) -> DiskStatus:
"""Probe one model across the relevant Sparks based on its mode (solo|cluster)."""
hosts: list[tuple[str, str]] = [(settings.spark1_host, settings.spark1_user)]
if mode == "cluster" and settings.spark2_host:
hosts.append((settings.spark2_host, settings.spark2_user))
results = await asyncio.gather(*(probe_host(h, u, repo, settings) for h, u in hosts))
on_disk = any(r.on_disk for r in results)
total = sum(r.size_bytes for r in results)
return DiskStatus(repo=repo, on_disk=on_disk, total_bytes=total, per_host=list(results))
async def delete_host(host: str, user: str, repo: str, settings: Settings) -> HostDiskResult:
"""Probe + rm -rf on one host. Returns bytes freed (0 if the dir wasn't there)."""
if not host or not user:
return HostDiskResult(host=host or "?", on_disk=False, error="host not configured")
path = _cache_path(repo)
# Safety: hard-code the prefix in the command so a bad `repo` can never escape.
# Compute size first, then remove. If absent, still return success (idempotent).
cmd = (
f"set -e; "
f"P={shlex.quote(path)}; "
f"if [ -d \"$P\" ]; then "
f" SIZE=$(du -sb \"$P\" 2>/dev/null | awk '{{print $1}}'); "
f" rm -rf -- \"$P\"; "
f" echo FREED $SIZE; "
f"else "
f" echo FREED 0; "
f"fi"
)
rc, out, err = await ssh_run(host, user, cmd, settings, timeout=120.0)
if rc != 0:
return HostDiskResult(host=host, on_disk=False, error=(err or out).strip() or f"rc={rc}")
# Parse the "FREED N" line
freed = 0
for line in out.splitlines():
parts = line.strip().split()
if len(parts) == 2 and parts[0] == "FREED":
try:
freed = int(parts[1])
except ValueError:
pass
break
return HostDiskResult(host=host, on_disk=False, size_bytes=freed)
async def delete_from_disk(repo: str, mode: str, settings: Settings) -> DiskStatus:
"""rm -rf the model's cache dir on the relevant Sparks. Idempotent."""
hosts: list[tuple[str, str]] = [(settings.spark1_host, settings.spark1_user)]
if mode == "cluster" and settings.spark2_host:
hosts.append((settings.spark2_host, settings.spark2_user))
results = await asyncio.gather(*(delete_host(h, u, repo, settings) for h, u in hosts))
total_freed = sum(r.size_bytes for r in results)
# After deletion, on_disk should be False on all hosts.
return DiskStatus(repo=repo, on_disk=False, total_bytes=total_freed, per_host=list(results))