Initial scaffold: image/ FastAPI app, models.yaml, docs

- image/ FastAPI app: /api/status, /api/swap, /api/swap/{id}/stream, /api/test-connection
- models.yaml: 5-model catalog (qwen3-vl, gemma4, qwen36, qwen3-235b-fp8, qwen25-72b)
- README, runbook, known-issues
- Dry-run swap verified against live Spark 1 (gemma4 currently loaded)
This commit is contained in:
Grant
2026-05-12 09:29:13 -05:00
commit ae8efa1754
19 changed files with 1500 additions and 0 deletions
View File
+58
View File
@@ -0,0 +1,58 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
def _env(name: str, default: str = "") -> str:
return os.environ.get(name, default)
def _resolve_models_yaml() -> str:
if env := os.environ.get("MODELS_YAML"):
return env
here = Path(__file__).resolve().parent # app/
candidates = [
here.parent / "models.yaml", # image/models.yaml (Docker)
here.parent.parent / "models.yaml", # <repo>/models.yaml (dev)
Path("/app/models.yaml"), # explicit container path
]
for p in candidates:
if p.exists():
return str(p)
return str(candidates[0]) # let load fail with a clear path
@dataclass(frozen=True)
class Settings:
    """Immutable runtime configuration, normally built from the environment."""

    spark1_host: str       # host probed for vLLM and used as the SSH target for swaps
    spark1_user: str       # SSH login on Spark 1
    spark2_host: str       # host probed for Parakeet and Magpie
    spark2_user: str       # SSH login on Spark 2
    ssh_key_path: str      # passed to `ssh -i` when non-empty
    ssh_known_hosts: str   # passed as UserKnownHostsFile when non-empty
    models_yaml: str       # path of the model catalog file
    vllm_port: int
    parakeet_port: int
    magpie_port: int
    bind_port: int         # presumably the port this app listens on; not referenced in the visible code

    @staticmethod
    def _port_from_env(name: str, default: int) -> int:
        """Read integer variable *name*; unset or blank values yield *default*.

        Raises:
            ValueError: if the variable is set to something that is not an
                integer. The message names the offending variable.
        """
        raw = os.environ.get(name, "").strip()
        if not raw:
            # An exported-but-empty variable should behave like an unset one
            # instead of crashing start-up on int("").
            return default
        try:
            return int(raw)
        except ValueError as exc:
            raise ValueError(f"{name} must be an integer, got {raw!r}") from exc

    @classmethod
    def from_env(cls) -> "Settings":
        """Build the settings from environment variables, applying defaults."""
        return cls(
            spark1_host=_env("SPARK1_HOST"),
            spark1_user=_env("SPARK1_USER", "<spark-user>"),
            spark2_host=_env("SPARK2_HOST"),
            spark2_user=_env("SPARK2_USER", "<spark-user>"),
            ssh_key_path=_env("SSH_KEY_PATH"),
            ssh_known_hosts=_env("SSH_KNOWN_HOSTS"),
            models_yaml=_resolve_models_yaml(),
            vllm_port=cls._port_from_env("VLLM_PORT", 8888),
            parakeet_port=cls._port_from_env("PARAKEET_PORT", 8000),
            magpie_port=cls._port_from_env("MAGPIE_PORT", 9000),
            bind_port=cls._port_from_env("BIND_PORT", 9999),
        )

    @property
    def configured(self) -> bool:
        """True once a Spark 1 host has been provided."""
        return bool(self.spark1_host)
+43
View File
@@ -0,0 +1,43 @@
from __future__ import annotations
import httpx
from .config import Settings
_TIMEOUT = 3.0
async def check_vllm(settings: Settings) -> dict:
    """Probe the vLLM ``/v1/models`` endpoint on Spark 1.

    Returns ``{"ok": True, "current_model": <first id or None>, "all": [ids]}``
    on success, or ``{"ok": False, "error": <message>}`` when Spark 1 is not
    configured or the request / response parsing fails. Ordinary failures are
    reported in the result rather than raised.
    """
    if not settings.spark1_host:
        return {"ok": False, "error": "spark1 not configured"}
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as c:
            r = await c.get(f"http://{settings.spark1_host}:{settings.vllm_port}/v1/models")
            r.raise_for_status()
            ids = [m["id"] for m in r.json().get("data", [])]
            return {"ok": True, "current_model": ids[0] if ids else None, "all": ids}
    except Exception as e:
        # Some exceptions carry no message (str(e) == ""); report the type
        # name then so the caller never receives an empty error.
        return {"ok": False, "error": str(e) or type(e).__name__}
async def check_parakeet(settings: Settings) -> dict:
    """Probe the Parakeet ``/health`` endpoint on Spark 2.

    Returns ``{"ok": True, "detail": <parsed JSON body>}`` on success, or
    ``{"ok": False, "error": <message>}`` when Spark 2 is not configured or the
    request / JSON decoding fails. Ordinary failures are reported in the result
    rather than raised.
    """
    if not settings.spark2_host:
        return {"ok": False, "error": "spark2 not configured"}
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as c:
            r = await c.get(f"http://{settings.spark2_host}:{settings.parakeet_port}/health")
            r.raise_for_status()
            return {"ok": True, "detail": r.json()}
    except Exception as e:
        # Some exceptions carry no message (str(e) == ""); report the type
        # name then so the caller never receives an empty error.
        return {"ok": False, "error": str(e) or type(e).__name__}
async def check_magpie(settings: Settings) -> dict:
    """Probe the Magpie ``/v1/health/ready`` endpoint on Spark 2.

    Returns ``{"ok": True, "detail": ...}`` on success, where ``detail`` is the
    parsed JSON body when the response declares a JSON content type and the raw
    text otherwise. Returns ``{"ok": False, "error": <message>}`` when Spark 2
    is not configured or the request fails. Ordinary failures are reported in
    the result rather than raised.
    """
    if not settings.spark2_host:
        return {"ok": False, "error": "spark2 not configured"}
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as c:
            r = await c.get(f"http://{settings.spark2_host}:{settings.magpie_port}/v1/health/ready")
            r.raise_for_status()
            is_json = r.headers.get("content-type", "").startswith("application/json")
            return {"ok": True, "detail": r.json() if is_json else r.text}
    except Exception as e:
        # Some exceptions carry no message (str(e) == ""); report the type
        # name then so the caller never receives an empty error.
        return {"ok": False, "error": str(e) or type(e).__name__}
+40
View File
@@ -0,0 +1,40 @@
from __future__ import annotations
from typing import Literal
import yaml
from pydantic import BaseModel, Field
class ModelDef(BaseModel):
    """One entry of the model catalog."""

    display_name: str  # human-readable name shown on the front-end cards
    repo: str  # passed to `vllm serve`; also matched against the model id reported by vLLM
    size_gb: float  # shown on the card as "<n> GB"
    mode: Literal["solo", "cluster"]  # "solo" adds the --solo flag to the launch command
    capabilities: list[str] = Field(default_factory=list)  # rendered as tags on the card
    expected_ready_seconds: int = 300  # feeds the readiness timeout used while tailing logs
    vllm_args: list[str] = Field(default_factory=list)  # appended verbatim to the launch command
class Defaults(BaseModel):
    """Launch options applied to every model (emitted as --port / --host)."""

    port: int = 8888
    host: str = "0.0.0.0"
class Catalog(BaseModel):
    """Top-level shape of the catalog file: shared defaults plus models by key."""

    defaults: Defaults = Field(default_factory=Defaults)
    models: dict[str, ModelDef]  # key is the model_key used by the swap API
def load_catalog(path: str) -> Catalog:
    """Read the YAML catalog at *path* and validate it into a `Catalog`.

    The file is always decoded as UTF-8 rather than with the process locale, so
    non-ASCII display names load identically on every host.

    Raises:
        OSError: if the file cannot be opened.
        Exception: parse or validation errors from the YAML / pydantic layers
            propagate unchanged.
    """
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return Catalog.model_validate(data)
def build_launch_command(key: str, model: ModelDef, defaults: Defaults) -> str:
    """Compose the shell command that starts `model` on Spark 1.

    The command is relative to `~/spark-vllm-docker`; the SSH wrapper is
    expected to `cd` there first. `key` is accepted for interface symmetry but
    does not influence the result.
    """
    flags = [f"--port={defaults.port}", f"--host={defaults.host}"]
    flags.extend(model.vllm_args)
    joined = " ".join(flags)
    if model.mode == "solo":
        solo = "--solo "
    else:
        solo = ""
    return f"./launch-cluster.sh {solo}-d exec vllm serve {model.repo} {joined}"
+155
View File
@@ -0,0 +1,155 @@
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from .config import Settings
from .health import check_magpie, check_parakeet, check_vllm
from .models import load_catalog
from .ssh import ssh_run
from .swap import SwapManager
# Module-level singletons shared by every request handler in this module.
# They are created at import time, so a catalog that cannot be loaded aborts start-up.
settings = Settings.from_env()
catalog = load_catalog(settings.models_yaml)
swap_manager = SwapManager(settings, catalog)
app = FastAPI(title="spark-control", version="0.1.0")
# Front-end assets live next to this module and are served under /static.
_STATIC_DIR = Path(__file__).resolve().parent / "static"
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
@app.get("/", include_in_schema=False)
async def index() -> FileResponse:
    """Serve the front-end entry page from the static directory."""
    landing_page = _STATIC_DIR / "index.html"
    return FileResponse(landing_page)
@app.get("/api/config")
async def get_config() -> dict:
    """Expose the non-secret part of the configuration to the front-end."""
    payload: dict = {"configured": settings.configured}
    # The remaining keys mirror the attribute names on `settings` one-to-one.
    for attr in ("spark1_host", "spark2_host", "vllm_port"):
        payload[attr] = getattr(settings, attr)
    return payload
@app.get("/api/models")
async def get_models() -> dict:
    """Return the catalog defaults and every model definition as plain dicts."""
    defaults = catalog.defaults.model_dump()
    models = {}
    for key, definition in catalog.models.items():
        models[key] = definition.model_dump()
    return {"defaults": defaults, "models": models}
@app.get("/api/status")
async def get_status() -> dict:
    """Run the three health probes concurrently and summarise the result.

    Besides the raw probe results the response carries the catalog key of the
    model vLLM currently reports (if it is a known one) and the id of the swap
    job in flight, if any.
    """
    probes = (
        check_vllm(settings),
        check_parakeet(settings),
        check_magpie(settings),
    )
    vllm, parakeet, magpie = await asyncio.gather(*probes)
    return {
        "configured": settings.configured,
        "vllm": vllm,
        "parakeet": parakeet,
        "magpie": magpie,
        "current_model_key": _identify_current_model(vllm.get("current_model")),
        "current_swap_job": swap_manager.current_job_id,
    }
def _identify_current_model(repo: str | None) -> str | None:
if not repo:
return None
for key, m in catalog.models.items():
if m.repo == repo:
return key
return None
class SwapRequest(BaseModel):
    """Request body of POST /api/swap."""

    model_key: str  # catalog key of the model to switch to
    dry_run: bool = False  # when true the swap job only logs its commands and runs nothing over SSH
@app.post("/api/swap")
async def post_swap(req: SwapRequest) -> dict:
    """Start a model swap and return the identifiers of the new job.

    Responds 503 when Spark 1 is not configured (dry runs are exempt), 404 for
    an unknown model key and 409 when the swap manager refuses the request.
    """
    if not (settings.configured or req.dry_run):
        raise HTTPException(503, "spark1 not configured")
    try:
        job = await swap_manager.trigger(req.model_key, dry_run=req.dry_run)
    except KeyError:
        raise HTTPException(404, f"unknown model: {req.model_key}")
    except RuntimeError as busy:
        raise HTTPException(409, str(busy))
    summary = {"job_id": job.id, "model_key": job.model_key, "state": job.state}
    return summary
@app.get("/api/swap/{job_id}")
async def get_swap(job_id: str) -> dict:
    """Return a full snapshot of one swap job, or 404 if the id is unknown."""
    job = swap_manager.get(job_id)
    if job is None:
        raise HTTPException(404, "no such job")
    # Response keys are identical to the attribute names on the job.
    exposed = (
        "id",
        "model_key",
        "state",
        "started_at",
        "finished_at",
        "returncode",
        "dry_run",
        "lines",
    )
    return {name: getattr(job, name) for name in exposed}
@app.get("/api/swap/{job_id}/stream")
async def stream_swap(job_id: str):
    """Stream a swap job's log as server-sent events.

    Every log line becomes a default `message` event carrying the line and the
    job state at send time. Once the job has a return code and all lines have
    been sent, a final `done` event is emitted and the stream ends. Unknown job
    ids yield 404.
    """
    job = swap_manager.get(job_id)
    if job is None:
        raise HTTPException(404, "no such job")

    async def gen():
        # NOTE(review): `cursor` is an index into job.lines. SwapJob.append
        # trims the list from the front once it exceeds MAX_LINES, after which
        # len(job.lines) stops growing and no further lines are emitted here.
        # Fixing this needs an absolute line counter on the job.
        cursor = 0
        while True:
            available = len(job.lines)
            for text in job.lines[cursor:available]:
                payload = json.dumps({"line": text, "state": job.state})
                yield f"data: {payload}\n\n"
            cursor = max(cursor, available)

            finished = job.returncode is not None
            if finished and cursor >= len(job.lines):
                payload = json.dumps({
                    "state": job.state,
                    "returncode": job.returncode,
                    "finished_at": job.finished_at,
                })
                yield f"event: done\ndata: {payload}\n\n"
                return
            await asyncio.sleep(0.4)

    return StreamingResponse(gen(), media_type="text/event-stream")
@app.post("/api/test-connection")
async def test_connection() -> dict:
    """Check SSH reachability of both Sparks, one after the other.

    Each configured host runs `hostname` followed by a listing of Docker
    container names; hosts without a configured address are reported as
    "not configured" without any connection attempt.
    """
    probe = "hostname && docker ps --format '{{.Names}}'"
    targets = (
        ("spark1", settings.spark1_host, settings.spark1_user),
        ("spark2", settings.spark2_host, settings.spark2_user),
    )
    results: dict[str, dict] = {}
    for label, host, user in targets:
        if not host:
            results[label] = {"ok": False, "error": "not configured"}
            continue
        rc, out, err = await ssh_run(host, user, probe, settings, timeout=10)
        results[label] = {"ok": rc == 0, "rc": rc, "stdout": out.strip(), "stderr": err.strip()}
    return results
+91
View File
@@ -0,0 +1,91 @@
"""Async wrappers around the system `ssh` client.
We shell out rather than use Paramiko/asyncssh so that:
- Host key + auth behavior is identical to what a user would see at the shell.
- The same ssh config file (`~/.ssh/config`) and key files work in dev.
- We don't pull in a heavy crypto dependency for the container image.
"""
from __future__ import annotations
import asyncio
from typing import AsyncIterator
from .config import Settings
def _base_args(settings: Settings) -> list[str]:
args = [
"ssh",
"-o", "BatchMode=yes",
"-o", "StrictHostKeyChecking=accept-new",
"-o", "ServerAliveInterval=15",
"-o", "ServerAliveCountMax=4",
]
if settings.ssh_key_path:
args += ["-i", settings.ssh_key_path]
if settings.ssh_known_hosts:
args += ["-o", f"UserKnownHostsFile={settings.ssh_known_hosts}"]
return args
async def ssh_run(
    host: str,
    user: str,
    command: str,
    settings: Settings,
    timeout: float = 30.0,
) -> tuple[int, str, str]:
    """Run a one-shot SSH command and capture its output.

    Returns ``(rc, stdout, stderr)`` with both streams decoded leniently
    (undecodable bytes are replaced). If the command has not finished after
    ``timeout`` seconds the local ssh process is killed and
    ``(124, "", "timeout after <timeout>s")`` is returned.

    The child process is also killed when the awaiting task is cancelled, so
    an abandoned request does not leave an ssh process behind.
    """
    args = _base_args(settings) + [f"{user}@{host}", command]
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        return 124, "", f"timeout after {timeout}s"
    finally:
        # Runs for the timeout path above and for cancellation alike; after a
        # normal communicate() the return code is set and this is a no-op.
        if proc.returncode is None:
            proc.kill()
            await proc.wait()
    assert proc.returncode is not None  # communicate() has reaped the child
    return proc.returncode, stdout_b.decode(errors="replace"), stderr_b.decode(errors="replace")
class StreamHandle:
    """Holds the final returncode once an `ssh_stream()` generator completes."""

    def __init__(self) -> None:
        # Stays None until ssh_stream()'s cleanup has run and stored the exit status.
        self.returncode: int | None = None
async def ssh_stream(
    host: str,
    user: str,
    command: str,
    settings: Settings,
    handle: StreamHandle | None = None,
) -> AsyncIterator[str]:
    """Yield stdout (and merged stderr) lines from a long-running SSH command.

    Lines are decoded leniently and stripped of their trailing line ending.

    Cleanup happens in the `finally` block, which runs when the generator is
    exhausted, closed with `aclose()`, or cancelled. Leaving an `async for`
    early does not close an async generator by itself, so callers that stop
    early should call `aclose()` to terminate the ssh child promptly.

    When `handle` is given, its `returncode` is set to the child's exit status
    once cleanup has finished.
    """
    args = _base_args(settings) + [f"{user}@{host}", command]
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    assert proc.stdout is not None
    reached_eof = False
    try:
        async for raw in proc.stdout:
            yield raw.decode(errors="replace").rstrip("\r\n")
        reached_eof = True
    finally:
        if reached_eof and proc.returncode is None:
            # The output pipe closed on its own, so the child is exiting. Let
            # it finish so its real exit status is recorded instead of the
            # SIGTERM we would otherwise race it with.
            try:
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass
        if proc.returncode is None:
            proc.terminate()
            try:
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()
        if handle is not None:
            handle.returncode = proc.returncode
+195
View File
@@ -0,0 +1,195 @@
// spark-control front-end
// - polls /api/status every 5s for current model + health
// - lists models from /api/models as cards
// - POST /api/swap to start a swap, then opens SSE /api/swap/{id}/stream
// Client-side view state shared by the polling and render functions.
const state = {
  models: {},               // catalog entries keyed by model key, from /api/models
  defaults: {},             // catalog defaults, from /api/models
  current_model_key: null,  // key of the model the server reports as loaded
  swap_job_id: null,        // id of the swap this page is following, if any
  swap_eventsource: null,   // open EventSource for that swap's log stream
  configured: true,         // last "configured" flag seen in /api/status
};
/** Return the first element matching the CSS selector, or null. */
function el(sel) {
  return document.querySelector(sel);
}
/** Return every element matching the CSS selector as a NodeList. */
function $(sel) {
  return document.querySelectorAll(sel);
}
/**
 * Fetch `url` and parse the response body as JSON.
 * Non-2xx responses reject with an Error carrying the status line and the
 * response text (empty if the text cannot be read).
 */
async function fetchJSON(url, opts) {
  const response = await fetch(url, opts);
  if (response.ok) {
    return response.json();
  }
  const body = await response.text().catch(() => "");
  throw new Error(`${response.status} ${response.statusText}: ${body}`);
}
/**
 * Rebuild the model card grid from `state.models`.
 * The card of the currently loaded model is highlighted and its button
 * disabled; every button is disabled while a swap is being followed.
 */
function renderCards() {
  // Catalog text is interpolated into an innerHTML template below, so it is
  // escaped first: a name containing "<", "&" or quotes must render as text
  // and must not break out of the data-key attribute.
  const ESCAPES = { "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;" };
  const esc = (value) => String(value).replace(/[&<>"']/g, (ch) => ESCAPES[ch]);

  const root = el("#cards");
  root.innerHTML = "";
  const isSwapping = !!state.swap_job_id;
  for (const key of Object.keys(state.models)) {
    const m = state.models[key];
    const isActive = key === state.current_model_key;
    const caps = (m.capabilities || [])
      .map((c) => `<span class="tag cap">${esc(c)}</span>`)
      .join("");
    const card = document.createElement("div");
    card.className = "card" + (isActive ? " active" : "");
    card.innerHTML = `
      <div class="name">${esc(m.display_name)}</div>
      <div class="meta">
        <span class="tag mode-${esc(m.mode)}">${esc(m.mode)}</span>
        <span class="tag">${esc(m.size_gb)} GB</span>
        ${caps}
      </div>
      <div class="muted small" style="word-break:break-all">${esc(m.repo)}</div>
      <div class="spacer"></div>
      <button class="btn ${isActive ? "" : "primary"}" data-key="${esc(key)}" ${isActive || isSwapping ? "disabled" : ""}>
        ${isActive ? "Current" : "Switch to this"}
      </button>
    `;
    root.appendChild(card);
  }
  // dataset.key yields the unescaped key again, so triggerSwap gets the real value.
  for (const btn of $(".card .btn")) {
    btn.addEventListener("click", () => triggerSwap(btn.dataset.key));
  }
}
/**
 * Update the top-bar summary: "not configured", "swap in progress",
 * "vLLM unreachable", or the name of the loaded model.
 */
function renderCurrent(status) {
  const c = el("#current");
  if (!status.configured) {
    c.innerHTML = `<span class="muted">not configured</span>`;
    return;
  }
  if (status.current_swap_job) {
    c.innerHTML = `<span class="muted">swap in progress</span>`;
    return;
  }
  const v = status.vllm || {};
  if (!v.ok) {
    c.innerHTML = `<span class="muted">vLLM unreachable</span>`;
    return;
  }
  const key = status.current_model_key;
  const m = key ? state.models[key] : null;
  const label = m ? m.display_name : (v.current_model || "(unknown)");
  // The label can be a model id reported by the remote vLLM server, so it is
  // inserted as text instead of being parsed as HTML.
  const strong = document.createElement("strong");
  strong.textContent = label;
  c.innerHTML = "";
  c.appendChild(strong);
}
/**
 * Colour the three footer health dots (ok / bad / warn) from the probe
 * results, put each raw probe result in the item's tooltip, and refresh the
 * "updated" timestamp.
 */
function renderHealth(status) {
  for (const name of ["vllm", "parakeet", "magpie"]) {
    const item = el(`#h-${name}`);
    if (!item) continue;

    const probe = status[name];
    const ok = probe && probe.ok;
    let cls = "warn"; // anything other than a strict true/false
    if (ok === true) {
      cls = "ok";
    } else if (ok === false) {
      cls = "bad";
    }

    const dot = item.querySelector(".dot");
    dot.classList.remove("ok", "bad", "warn");
    dot.classList.add(cls);
    item.title = JSON.stringify(probe || {}, null, 2);
  }
  el("#updated").textContent = `updated ${new Date().toLocaleTimeString()}`;
}
/** Show the setup banner only while the server reports itself unconfigured. */
function renderBanner(status) {
  const banner = el("#setup-banner");
  const hide = Boolean(status.configured);
  banner.classList.toggle("hidden", hide);
}
/**
 * Fetch /api/status, refresh the banner, top bar, health dots and cards, and
 * keep the swap log stream in sync with the server's current swap job.
 * Failures are logged to the console and otherwise ignored; the next poll
 * simply tries again.
 */
async function pollStatus() {
  try {
    const status = await fetchJSON("/api/status");
    state.current_model_key = status.current_model_key;
    state.configured = status.configured;
    renderBanner(status);
    renderCurrent(status);
    renderHealth(status);

    const serverJob = status.current_swap_job;
    const streamMissing = !state.swap_eventsource;
    if (serverJob && (serverJob !== state.swap_job_id || streamMissing)) {
      // Either a swap we are not following yet, or our stream for the running
      // swap was dropped (its error handler closes it). Re-attach in both
      // cases; the stream replays the job's log from the start.
      attachToSwap(serverJob);
    } else if (!serverJob && state.swap_job_id && streamMissing) {
      // The swap we were following ended while we had no stream; clear local state.
      state.swap_job_id = null;
      el("#swap-panel").classList.add("hidden");
    }
    renderCards();
  } catch (e) {
    console.error("status poll failed", e);
  }
}
/** Load the model catalog once and cache it in `state`. */
async function loadModels() {
  const { defaults, models } = await fetchJSON("/api/models");
  state.models = models || {};
  state.defaults = defaults || {};
}
/**
 * Ask the server to switch to `modelKey` and start following the new job.
 * Does nothing while another swap is being followed; a rejected request is
 * reported with an alert.
 */
async function triggerSwap(modelKey) {
  if (state.swap_job_id) {
    return;
  }
  const request = {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ model_key: modelKey }),
  };
  try {
    const started = await fetchJSON("/api/swap", request);
    attachToSwap(started.job_id);
  } catch (e) {
    alert("Failed to start swap: " + e.message);
  }
}
/**
 * Follow swap job `jobId`: show the swap panel, reset the log, and open the
 * job's server-sent event stream. Any previously open stream is closed first.
 */
function attachToSwap(jobId) {
  if (state.swap_eventsource) {
    state.swap_eventsource.close();
    state.swap_eventsource = null;
  }
  state.swap_job_id = jobId;
  el("#swap-panel").classList.remove("hidden");
  el("#swap-log").textContent = "";
  el("#swap-state").textContent = "starting";

  const es = new EventSource(`/api/swap/${jobId}/stream`);
  state.swap_eventsource = es;

  // Default "message" events carry one log line plus the job state.
  es.onmessage = (ev) => {
    try {
      const d = JSON.parse(ev.data);
      if (d.state) el("#swap-state").textContent = d.state;
      if (d.line) appendLog(d.line);
    } catch {
      // Malformed event payloads are skipped on purpose.
    }
  };

  // The server sends a final "done" event with the outcome.
  es.addEventListener("done", (ev) => {
    try {
      const d = JSON.parse(ev.data);
      el("#swap-state").textContent = d.state + ` (rc=${d.returncode})`;
    } catch {
      // Keep the last shown state if the payload cannot be parsed.
    }
    es.close();
    state.swap_eventsource = null;
    state.swap_job_id = null;
    setTimeout(() => {
      // Leave the final state visible briefly, then hide the panel, but only
      // if no new swap has been attached in the meantime.
      if (!state.swap_job_id) {
        el("#swap-panel").classList.add("hidden");
      }
      pollStatus();
    }, 4000);
    pollStatus();
  });

  es.onerror = () => {
    // Close instead of letting EventSource retry on its own. The job id is
    // kept so the status poll can decide what to do next.
    // NOTE(review): confirm pollStatus re-attaches when the stream is missing.
    es.close();
    state.swap_eventsource = null;
  };
  renderCards();
}
/** Append one line to the swap log and keep it scrolled to the bottom. */
function appendLog(line) {
  const pane = el("#swap-log");
  pane.textContent = pane.textContent + line + "\n";
  pane.scrollTop = pane.scrollHeight;
}
/** Load the catalog, render once, then poll the status every five seconds. */
async function init() {
  const POLL_INTERVAL_MS = 5000;
  await loadModels();
  await pollStatus();
  setInterval(pollStatus, POLL_INTERVAL_MS);
}

init();
+51
View File
@@ -0,0 +1,51 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, viewport-fit=cover">
<meta name="color-scheme" content="dark">
<title>spark-control</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<header class="topbar">
<div class="brand">
<span class="logo-dot"></span>
<span>spark-control</span>
</div>
<div class="current" id="current">
<span class="muted">connecting…</span>
</div>
</header>
<main>
<section id="setup-banner" class="banner hidden">
<strong>Configuration needed.</strong>
<span>Run the <em>Configure Sparks</em> action in StartOS to set hostnames, then run <em>Test Connection</em>.</span>
</section>
<section id="swap-panel" class="swap-panel hidden">
<div class="swap-header">
<span class="spinner"></span>
<span id="swap-title">Swapping…</span>
<span class="spacer"></span>
<span class="muted small" id="swap-state"></span>
</div>
<pre id="swap-log" class="log"></pre>
</section>
<section id="cards" class="cards"></section>
<footer class="footer">
<div class="health">
<span class="health-item" id="h-vllm"><span class="dot"></span> vLLM</span>
<span class="health-item" id="h-parakeet"><span class="dot"></span> Parakeet</span>
<span class="health-item" id="h-magpie"><span class="dot"></span> Magpie</span>
</div>
<div class="muted small" id="updated"></div>
</footer>
</main>
<script src="/static/app.js"></script>
</body>
</html>
+170
View File
@@ -0,0 +1,170 @@
:root {
--bg: #0a0a0d;
--surface: #15151a;
--surface-2: #1c1c22;
--border: #25252c;
--text: #e6e6ea;
--muted: #7e7e8a;
--accent: #4ade80;
--warn: #f59e0b;
--error: #ef4444;
--info: #60a5fa;
--radius: 10px;
}
* { box-sizing: border-box; }
html, body { margin: 0; padding: 0; }
body {
background: var(--bg);
color: var(--text);
font: 15px/1.5 -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
min-height: 100vh;
-webkit-font-smoothing: antialiased;
}
.muted { color: var(--muted); }
.small { font-size: 13px; }
.hidden { display: none !important; }
.spacer { flex: 1; }
.topbar {
position: sticky;
top: 0;
background: rgba(10, 10, 13, 0.85);
backdrop-filter: saturate(160%) blur(10px);
-webkit-backdrop-filter: saturate(160%) blur(10px);
border-bottom: 1px solid var(--border);
display: flex;
align-items: center;
gap: 16px;
padding: 12px 20px;
z-index: 10;
}
.brand { display: flex; align-items: center; gap: 10px; font-weight: 600; }
.logo-dot { width: 10px; height: 10px; border-radius: 50%; background: var(--accent); box-shadow: 0 0 12px var(--accent); }
.current { flex: 1; text-align: right; font-size: 14px; }
.current strong { color: var(--accent); }
main {
max-width: 880px;
margin: 0 auto;
padding: 24px 20px 80px;
}
.banner {
background: var(--surface);
border: 1px solid var(--warn);
color: var(--warn);
padding: 12px 16px;
border-radius: var(--radius);
margin-bottom: 16px;
font-size: 14px;
}
.banner em { font-style: normal; background: rgba(245, 158, 11, 0.15); padding: 2px 6px; border-radius: 4px; }
.swap-panel {
background: var(--surface);
border: 1px solid var(--info);
border-radius: var(--radius);
padding: 14px 16px;
margin-bottom: 20px;
}
.swap-header { display: flex; align-items: center; gap: 10px; }
.swap-header #swap-title { font-weight: 600; color: var(--info); }
.spinner {
width: 14px; height: 14px;
border: 2px solid var(--info);
border-right-color: transparent;
border-radius: 50%;
animation: spin 0.8s linear infinite;
}
@keyframes spin { to { transform: rotate(360deg); } }
.log {
background: #08080b;
border: 1px solid var(--border);
border-radius: 6px;
padding: 10px 12px;
margin: 10px 0 0;
font: 12px/1.55 ui-monospace, SFMono-Regular, "SF Mono", Menlo, monospace;
color: #c7c7d1;
max-height: 280px;
overflow: auto;
white-space: pre-wrap;
word-break: break-word;
}
.cards {
display: grid;
gap: 14px;
grid-template-columns: repeat(auto-fill, minmax(280px, 1fr));
}
.card {
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
padding: 16px;
display: flex;
flex-direction: column;
gap: 12px;
transition: border-color 0.15s, transform 0.15s;
}
.card.active {
border-color: var(--accent);
box-shadow: 0 0 0 1px var(--accent) inset, 0 0 24px rgba(74, 222, 128, 0.08);
}
.card .name { font-weight: 600; font-size: 15px; }
.card .meta { display: flex; flex-wrap: wrap; gap: 6px; font-size: 12px; color: var(--muted); }
.tag {
background: var(--surface-2);
border: 1px solid var(--border);
padding: 2px 8px;
border-radius: 999px;
font-size: 11px;
}
.tag.mode-cluster { color: var(--info); border-color: rgba(96, 165, 250, 0.4); }
.tag.mode-solo { color: var(--accent); border-color: rgba(74, 222, 128, 0.4); }
.tag.cap { color: var(--muted); }
.btn {
appearance: none;
border: 1px solid var(--border);
background: var(--surface-2);
color: var(--text);
padding: 8px 14px;
border-radius: 8px;
cursor: pointer;
font: inherit;
font-weight: 500;
transition: background 0.15s, border-color 0.15s, opacity 0.15s;
}
.btn:hover:not(:disabled) { background: #24242c; border-color: #34343c; }
.btn.primary { background: var(--accent); color: #052e16; border-color: var(--accent); }
.btn.primary:hover:not(:disabled) { background: #6ee19a; }
.btn:disabled { opacity: 0.45; cursor: not-allowed; }
.card.active .btn { background: rgba(74, 222, 128, 0.12); color: var(--accent); border-color: rgba(74, 222, 128, 0.4); }
.footer {
margin-top: 28px;
padding-top: 16px;
border-top: 1px solid var(--border);
display: flex;
align-items: center;
gap: 14px;
flex-wrap: wrap;
}
.health { display: flex; gap: 14px; flex-wrap: wrap; }
.health-item { display: inline-flex; align-items: center; gap: 6px; font-size: 13px; color: var(--muted); }
.dot { width: 9px; height: 9px; border-radius: 50%; background: var(--muted); display: inline-block; }
.dot.ok { background: var(--accent); box-shadow: 0 0 8px rgba(74, 222, 128, 0.7); }
.dot.bad { background: var(--error); box-shadow: 0 0 8px rgba(239, 68, 68, 0.7); }
.dot.warn { background: var(--warn); }
@media (max-width: 640px) {
.topbar { padding: 10px 14px; }
main { padding: 16px 14px 80px; }
.cards { grid-template-columns: 1fr; }
}
+140
View File
@@ -0,0 +1,140 @@
from __future__ import annotations
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from .config import Settings
from .models import Catalog, build_launch_command
from .ssh import ssh_run, ssh_stream, StreamHandle
# Substring searched for in the tailed container log; seeing it marks the swap as ready.
READY_MARKER = "Application startup complete."
# Upper bound on the number of log lines kept per job; older lines are dropped first.
MAX_LINES = 500
@dataclass
class SwapJob:
    """Progress record of a single model swap."""

    id: str
    model_key: str
    started_at: str
    state: str = "starting"  # starting|stopping|launching|tailing|ready|failed
    lines: list[str] = field(default_factory=list)
    returncode: Optional[int] = None
    finished_at: Optional[str] = None
    dry_run: bool = False

    def append(self, line: str) -> None:
        """Add a log line, keeping only the newest MAX_LINES entries.

        NOTE(review): trimming removes entries from the front, so positions in
        `lines` shift; readers that remember an index into the list lose their
        place once the cap is reached.
        """
        self.lines.append(line)
        overflow = len(self.lines) - MAX_LINES
        if overflow > 0:
            del self.lines[:overflow]
class SwapManager:
    """Runs model swaps on Spark 1 one at a time and records their progress.

    A swap stops the running deployment, launches the requested model and then
    tails the container log until the ready marker shows up or a timeout
    expires. Each swap is tracked as a `SwapJob` that can be looked up by id.
    """

    def __init__(self, settings: Settings, catalog: Catalog) -> None:
        self.settings = settings
        self.catalog = catalog
        self.lock = asyncio.Lock()
        self.jobs: dict[str, SwapJob] = {}
        self.current_job_id: Optional[str] = None
        # Strong references to in-flight background tasks. The event loop only
        # keeps weak references, so an otherwise unreferenced task could be
        # garbage-collected before it finishes.
        self._tasks: set[asyncio.Task] = set()

    def get(self, job_id: str) -> SwapJob | None:
        """Return the job with this id, or None if it is unknown."""
        return self.jobs.get(job_id)

    def reload_catalog(self, catalog: Catalog) -> None:
        """Replace the catalog used for subsequent swaps."""
        self.catalog = catalog

    async def trigger(self, model_key: str, *, dry_run: bool = False) -> SwapJob:
        """Start a swap to `model_key` in the background and return its job.

        With `dry_run` the job only logs the commands it would run.

        Raises:
            KeyError: `model_key` is not in the catalog.
            RuntimeError: another swap is still in progress.
        """
        if model_key not in self.catalog.models:
            raise KeyError(model_key)
        # The lock is only taken once the background task starts running;
        # current_job_id closes the gap between create_task() and that moment,
        # so a second request arriving in between is rejected instead of queued.
        if self.lock.locked() or self.current_job_id is not None:
            raise RuntimeError("A swap is already in progress")

        job = SwapJob(
            id=uuid.uuid4().hex[:8],
            model_key=model_key,
            started_at=datetime.now(timezone.utc).isoformat(),
            dry_run=dry_run,
        )
        self.jobs[job.id] = job
        self.current_job_id = job.id
        task = asyncio.create_task(self._run(job))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return job

    async def _run(self, job: SwapJob) -> None:
        """Execute the job under the lock and always record a final outcome."""
        async with self.lock:
            try:
                await self._do(job)
                if job.state != "failed":
                    job.state = "ready"
                    job.returncode = 0
            except Exception as e:
                job.append(f"[error] {type(e).__name__}: {e}")
                job.state = "failed"
                if job.returncode is None:
                    job.returncode = 1
            finally:
                if job.returncode is None:
                    # Only reachable when _do was interrupted by something
                    # that is not an Exception (e.g. task cancellation).
                    # Readers treat a set returncode as "finished", so record one.
                    job.state = "failed"
                    job.returncode = 1
                job.finished_at = datetime.now(timezone.utc).isoformat()
                if self.current_job_id == job.id:
                    self.current_job_id = None

    async def _remote_step(self, job: SwapJob, command: str, *, timeout: float) -> bool:
        """Run `command` on Spark 1 and copy its output into the job log.

        Returns True on exit status 0; otherwise marks the job failed with
        that status and returns False.
        """
        s = self.settings
        rc, out, err = await ssh_run(s.spark1_host, s.spark1_user, command, s, timeout=timeout)
        # Split the streams separately so an unterminated last stdout line is
        # not glued to the first stderr line.
        for line in (*out.splitlines(), *err.splitlines()):
            job.append(line)
        if rc != 0:
            job.returncode = rc
            job.state = "failed"
            return False
        return True

    async def _do(self, job: SwapJob) -> None:
        """Perform the stop / launch / wait-for-ready sequence for `job`."""
        model = self.catalog.models[job.model_key]
        s = self.settings

        # Step 1: stop
        job.state = "stopping"
        stop_cmd = "cd ~/spark-vllm-docker && ./launch-cluster.sh stop"
        job.append(f"$ {stop_cmd}")
        if not job.dry_run:
            if not await self._remote_step(job, stop_cmd, timeout=180):
                return

        # Step 2: launch
        job.state = "launching"
        launch = build_launch_command(job.model_key, model, self.catalog.defaults)
        launch_cmd = f"cd ~/spark-vllm-docker && {launch}"
        job.append(f"$ {launch_cmd}")
        if job.dry_run:
            return
        if not await self._remote_step(job, launch_cmd, timeout=60):
            return

        # Step 3: tail logs until the ready marker (or timeout)
        job.state = "tailing"
        tail_cmd = "docker logs -f --tail 50 vllm_node"
        job.append(f"$ {tail_cmd}")

        timeout = max(model.expected_ready_seconds * 2, 600)
        handle = StreamHandle()
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        async def _tail() -> bool:
            stream = ssh_stream(s.spark1_host, s.spark1_user, tail_cmd, s, handle=handle)
            try:
                async for line in stream:
                    job.append(line)
                    if READY_MARKER in line:
                        return True
                    if loop.time() > deadline:
                        return False
                return False
            finally:
                # Leaving the loop early does not close an async generator;
                # close it explicitly so the ssh child is terminated now.
                await stream.aclose()

        try:
            # The deadline above is only checked when a line arrives; the outer
            # wait_for (with some slack) also bounds a silent log.
            ready = await asyncio.wait_for(_tail(), timeout=timeout + 30)
        except asyncio.TimeoutError:
            ready = False

        if not ready:
            job.append(f"[error] did not see '{READY_MARKER}' within {timeout}s")
            job.state = "failed"
            job.returncode = 124