27699a2469
Dashboard:
- New 'Always-on services' section with cards for Parakeet and Magpie
- Each card: host:port, model loaded, status pill (Healthy/Unhealthy/Starting/Not configured)
- Start, Restart, Stop buttons. Buttons disabled when not applicable for current state
- Restart counter shown when > 1 (would have surfaced the old magpie crash loop)
Backend:
- New /api/services GET: docker container state + http health for each support service
- New POST /api/services/{name}/{action} for start | stop | restart
- services.py module: docker_state, run_action via SSH
- config.py: PARAKEET_HOST/USER/CONTAINER and MAGPIE_* env vars, default to spark2_*
- health.py: use per-service hosts (no longer hard-wired to spark2_host)
Package:
- sparkConfig.yaml.ts: add 6 new optional fields
- configureSparks action: optional 'Parakeet host', 'Parakeet container', 'Magpie host', 'Magpie container' fields; descriptions explain they default to Spark 2 when blank
- Handler normalizes nulls to empty strings before merge
- main.ts: pass new env vars to container
- bump to 0.2.0:0
246 lines · 8.2 KiB · Python
from __future__ import annotations
|
|
import asyncio
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
from .config import Settings
|
|
from .health import check_magpie, check_parakeet, check_vllm
|
|
from .models import load_catalog
|
|
from .services import docker_state, run_action, services_from_settings
|
|
from .ssh import ssh_run
|
|
from .swap import SwapManager
|
|
|
|
|
|
# Process-wide objects, created once at import time from the environment.
settings = Settings.from_env()
catalog = load_catalog(settings.models_yaml)
swap_manager = SwapManager(settings, catalog)

app = FastAPI(title="spark-control", version="0.1.0")

# Dashboard assets are bundled in a "static" directory next to this module.
_STATIC_DIR = Path(__file__).resolve().parent.joinpath("static")
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
|
|
|
|
|
|
@app.get("/", include_in_schema=False)
async def index() -> FileResponse:
    """Serve the dashboard page (static/index.html); hidden from the OpenAPI schema."""
    page = _STATIC_DIR.joinpath("index.html")
    return FileResponse(page)
|
|
|
|
|
|
@app.get("/api/config")
async def get_config() -> dict:
    """Return the configured flag, both Spark hosts and the vLLM port from settings."""
    exposed = ("configured", "spark1_host", "spark2_host", "vllm_port")
    return {field: getattr(settings, field) for field in exposed}
|
|
|
|
|
|
@app.get("/api/models")
async def get_models() -> dict:
    """Return the model catalog: the shared defaults and every model entry, dumped to dicts."""
    defaults = catalog.defaults.model_dump()
    models = {key: entry.model_dump() for key, entry in catalog.models.items()}
    return {"defaults": defaults, "models": models}
|
|
|
|
|
|
@app.get("/api/services")
async def get_services() -> dict:
    """Lifecycle state of always-on support services (Parakeet, Magpie, …).

    Returns a dict keyed by service name. Each entry includes:
      - host / user / port / container / kind: the configured values
      - docker_state: container status as reported by ``docker_state``
        (running | exited | restarting | missing | unconfigured)
      - restart_count / started_at / exit_code / error: further ``docker_state`` fields
      - http_ready: whether the service's HTTP health check reported ``ok``
      - base_url: the URL reported by the health check
      - model: ``detail["model"]`` when the health detail is a dict, else None
      - detail: the raw health-check detail

    All services are probed concurrently. Within a service, the docker query and
    the HTTP health check also run concurrently. A service with no registered
    HTTP health check gets an empty health result (http_ready False) instead of
    being probed with another service's check.
    """
    services = services_from_settings(settings)
    # Dispatch by name so a service without its own check is never probed with
    # some other service's health check.
    health_checks = {"parakeet": check_parakeet, "magpie": check_magpie}

    async def no_health_check(_settings) -> dict:
        return {}

    async def probe(name: str) -> tuple[str, dict]:
        svc = services[name]
        check = health_checks.get(name, no_health_check)
        # The SSH/docker query and the HTTP health probe are independent.
        docker, http = await asyncio.gather(
            docker_state(settings, svc),
            check(settings),
        )
        detail = http.get("detail")
        return name, {
            "host": svc.host,
            "user": svc.user,
            "port": svc.port,
            "container": svc.container,
            "kind": svc.kind,
            "base_url": http.get("base_url"),
            "http_ready": bool(http.get("ok")),
            # Only a dict detail can carry a "model" field.
            "model": detail.get("model") if isinstance(detail, dict) else None,
            "docker_state": docker.get("state"),
            "restart_count": docker.get("restart_count"),
            "started_at": docker.get("started_at"),
            "exit_code": docker.get("exit_code"),
            "error": docker.get("error"),
            "detail": detail,
        }

    results = await asyncio.gather(*(probe(name) for name in services))
    return dict(results)
|
|
|
|
|
|
@app.post("/api/services/{name}/{action}")
async def service_action(name: str, action: str) -> dict:
    """Run ``start`` / ``stop`` / ``restart`` against one configured support service.

    Raises:
        HTTPException: 404 for an unknown service name, 400 for an unknown action,
            500 when ``run_action`` reports failure (detail is its stderr, its
            error, or "action failed", whichever is first non-empty).
    """
    registry = services_from_settings(settings)
    if name not in registry:
        raise HTTPException(404, f"unknown service: {name}")
    if action not in {"start", "stop", "restart"}:
        raise HTTPException(400, f"unknown action: {action}")

    outcome = await run_action(settings, registry[name], action)  # type: ignore[arg-type]
    if outcome["ok"]:
        return {"name": name, "action": action, **outcome}

    message = outcome.get("stderr") or outcome.get("error") or "action failed"
    raise HTTPException(500, message)
|
|
|
|
|
|
@app.get("/api/endpoints")
async def get_endpoints() -> dict:
    """Service-discovery summary. Stable shape; other apps on the LAN can poll this
    to learn the OpenAI-compatible vLLM endpoint, the Parakeet STT endpoint, and the
    Magpie TTS endpoint without needing to know the individual Spark IPs.

    Every entry carries ``ready``, ``base_url`` and ``model``. For vLLM the model
    is the health check's ``current_model``. For Parakeet and Magpie it is
    ``detail["model"]`` when the health detail is a dict, else None.
    """
    vllm, parakeet, magpie = await asyncio.gather(
        check_vllm(settings),
        check_parakeet(settings),
        check_magpie(settings),
    )

    def reported_model(health: dict):
        # The health detail is not guaranteed to be a dict; only a dict can carry "model".
        detail = health.get("detail")
        return detail.get("model") if isinstance(detail, dict) else None

    return {
        "vllm": {
            "ready": bool(vllm.get("ok")),
            "base_url": vllm.get("base_url"),
            "model": vllm.get("current_model"),
            "openai_compat": True,
        },
        "parakeet": {
            "ready": bool(parakeet.get("ok")),
            "base_url": parakeet.get("base_url"),
            "kind": "stt",
            "model": reported_model(parakeet),
        },
        "magpie": {
            "ready": bool(magpie.get("ok")),
            "base_url": magpie.get("base_url"),
            "kind": "tts",
            # Additive key, matching what /api/services reports for magpie.
            "model": reported_model(magpie),
        },
    }
|
|
|
|
|
|
@app.get("/api/status")
async def get_status() -> dict:
    """Combined health of vLLM, Parakeet and Magpie plus swap bookkeeping.

    ``current_model_key`` is the catalog key whose repo matches the model vLLM
    reports (None if none match); ``current_swap_job`` comes from the swap manager.
    """
    health = await asyncio.gather(
        check_vllm(settings),
        check_parakeet(settings),
        check_magpie(settings),
    )
    vllm_health, parakeet_health, magpie_health = health

    model_key = _identify_current_model(vllm_health.get("current_model"))

    status = {"configured": settings.configured}
    status["vllm"] = vllm_health
    status["parakeet"] = parakeet_health
    status["magpie"] = magpie_health
    status["current_model_key"] = model_key
    status["current_swap_job"] = swap_manager.current_job_id
    return status
|
|
|
|
|
|
def _identify_current_model(repo: str | None) -> str | None:
    """Map a repo id back to its catalog key.

    Returns the first key in ``catalog.models`` whose ``repo`` equals *repo*, or
    None when *repo* is empty/None or no entry matches.
    """
    if not repo:
        return None
    matches = (key for key, entry in catalog.models.items() if entry.repo == repo)
    return next(matches, None)
|
|
|
|
|
|
class SwapRequest(BaseModel):
    # Request body for POST /api/swap.
    # presumably a key of catalog.models; an unknown key yields 404 from post_swap
    model_key: str
    # Forwarded to swap_manager.trigger(dry_run=...); a dry run is allowed even
    # when settings.configured is false.
    dry_run: bool = False
|
|
|
|
|
|
@app.post("/api/swap")
async def post_swap(req: SwapRequest) -> dict:
    """Start a model swap job via the swap manager.

    Returns the new job's id, model key and state.

    Raises:
        HTTPException: 503 if Spark 1 is not configured and this is not a dry run;
            404 if ``swap_manager.trigger`` raises KeyError (unknown model key);
            409 with the error text if it raises RuntimeError.
    """
    if not settings.configured and not req.dry_run:
        raise HTTPException(503, "spark1 not configured")
    try:
        job = await swap_manager.trigger(req.model_key, dry_run=req.dry_run)
    except KeyError as exc:
        # Chain explicitly so logs show the original cause (flake8-bugbear B904).
        raise HTTPException(404, f"unknown model: {req.model_key}") from exc
    except RuntimeError as exc:
        raise HTTPException(409, str(exc)) from exc
    return {"job_id": job.id, "model_key": job.model_key, "state": job.state}
|
|
|
|
|
|
@app.get("/api/swap/{job_id}")
async def get_swap(job_id: str) -> dict:
    """Snapshot of one swap job, including all output lines captured so far.

    Raises:
        HTTPException: 404 if ``job_id`` is unknown to the swap manager.
    """
    job = swap_manager.get(job_id)
    if job is None:
        raise HTTPException(404, "no such job")
    exported = (
        "id",
        "model_key",
        "state",
        "started_at",
        "finished_at",
        "returncode",
        "dry_run",
        "lines",
    )
    return {attr: getattr(job, attr) for attr in exported}
|
|
|
|
|
|
@app.get("/api/swap/{job_id}/stream")
async def stream_swap(job_id: str):
    """Stream a swap job's output as Server-Sent Events.

    Each new output line is sent as a ``data:`` event carrying the line and the
    job state at send time. Once the job has a return code and every line has
    been sent, one ``done`` event with state/returncode/finished_at ends the
    stream. The job is polled every 0.4 s.

    Raises:
        HTTPException: 404 if ``job_id`` is unknown.
    """
    job = swap_manager.get(job_id)
    if job is None:
        raise HTTPException(404, "no such job")

    async def events():
        emitted = 0
        while True:
            total = len(job.lines)
            if total > emitted:
                # job.state is re-read per line: the generator suspends at every
                # yield, so the state may change part-way through a batch.
                for text in job.lines[emitted:total]:
                    body = json.dumps({"line": text, "state": job.state})
                    yield "data: " + body + "\n\n"
                emitted = total
            finished = job.returncode is not None
            # Only finish once any lines appended during the batch are sent too.
            if finished and emitted >= len(job.lines):
                summary = json.dumps({
                    "state": job.state,
                    "returncode": job.returncode,
                    "finished_at": job.finished_at,
                })
                yield "event: done\ndata: " + summary + "\n\n"
                return
            await asyncio.sleep(0.4)

    return StreamingResponse(events(), media_type="text/event-stream")
|
|
|
|
|
|
# Prints the remote hostname, then the names of running docker containers.
# Plain string (not an f-string): the double braces are docker's Go-template syntax.
_PROBE_COMMAND = "hostname && docker ps --format '{{.Names}}'"


async def _probe_host(host: str | None, user: str | None) -> dict:
    """Run _PROBE_COMMAND over SSH on *host* and summarize the outcome.

    Returns ``{"ok": False, "error": "not configured"}`` when *host* is empty,
    otherwise ok/rc plus stripped stdout/stderr from ``ssh_run``.
    """
    if not host:
        return {"ok": False, "error": "not configured"}
    rc, out, err = await ssh_run(host, user, _PROBE_COMMAND, settings, timeout=10)
    return {"ok": rc == 0, "rc": rc, "stdout": out.strip(), "stderr": err.strip()}


@app.post("/api/test-connection")
async def test_connection() -> dict:
    """Probe both Sparks with a `hostname` command. Useful for the StartOS setup flow.

    Both hosts are probed concurrently, so the worst case is one 10 s SSH timeout
    rather than two in a row. Returns ``{"spark1": ..., "spark2": ...}``; see
    ``_probe_host`` for the per-host shape.
    """
    spark1, spark2 = await asyncio.gather(
        _probe_host(settings.spark1_host, settings.spark1_user),
        _probe_host(settings.spark2_host, settings.spark2_user),
    )
    return {"spark1": spark1, "spark2": spark2}
|