1c4e861783
Triaged from a full independent evaluation (EVALUATION.md). Addresses the three P0/P1 code findings; the proxy/data APIs that downstream apps consume are deliberately untouched. - ssh command injection (P0): new shellsafe.py validates + shlex.quotes every user-supplied value crossing into an SSH command on the Sparks (model repo, vllm args/knobs, NIM image/container/volume/port/env, service names). Boundary validation on POST /api/models and POST /api/nim/install; quoting at every sink in models/download/nim/services. NGC key now quoted too. - qdrant path injection (P1): /api/search validates the collection name against a metacharacter-free whitelist and URL-encodes the path segment. - csrf (P1): csrf_guard middleware enforces same-origin on state-changing control endpoints; /v1/*, /scrub, /rehydrate, /api/search, /api/audio/* and /api/health-event are exempt so external consumers are unaffected. Verified: injection survives only as a single quoted token, vLLM preflight shlex.split round-trip intact, CSRF behaviors covered via TestClient, both offline redaction suites still pass, tsc clean, s9pk rebuilt.
153 lines
5.4 KiB
Python
153 lines
5.4 KiB
Python
"""Lifecycle controls for support-service containers (Parakeet, Kokoro, etc.).
|
|
|
|
These are independent always-on containers that don't go through the LLM-swap
|
|
machinery. We just run `docker start|stop|restart <container>` via SSH on the
|
|
appropriate host.
|
|
"""
|
|
from __future__ import annotations
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Literal, Optional
|
|
|
|
from .config import Settings
|
|
from .shellsafe import quote_arg
|
|
from .ssh import ssh_run
|
|
|
|
|
|
# Cache the "unreachable" verdict per (host, user) for a short period so that a
# repeated docker_state call doesn't re-pay the 6 s SSH connect timeout each time.

# Seconds (measured on the time.monotonic clock) an "unreachable" verdict stays valid.
_UNREACHABLE_TTL = 25.0
# Maps (host, user) -> time.monotonic() reading taken when the pair was marked unreachable.
_unreachable_cache: dict[tuple[str, str], float] = {}
|
|
|
|
|
|
def _is_recently_unreachable(host: str, user: str) -> bool:
    """Report whether (host, user) was marked unreachable within the TTL window.

    Returns False when no timestamp is recorded for the pair, or when the
    recorded timestamp is at least ``_UNREACHABLE_TTL`` seconds old on the
    monotonic clock.
    """
    marked_at = _unreachable_cache.get((host, user))
    if not marked_at:
        return False
    return time.monotonic() - marked_at < _UNREACHABLE_TTL
|
|
|
|
|
|
def _mark_unreachable(host: str, user: str) -> None:
    """Stamp (host, user) as unreachable as of the current monotonic time."""
    cache_key = (host, user)
    _unreachable_cache[cache_key] = time.monotonic()
|
|
|
|
|
|
def _clear_unreachable(host: str, user: str) -> None:
    """Drop any unreachable mark for (host, user); a no-op when none exists."""
    cache_key = (host, user)
    if cache_key in _unreachable_cache:
        del _unreachable_cache[cache_key]
|
|
|
|
|
|
# Keys of the four built-in services registered by services_from_settings.
ServiceName = Literal["parakeet", "kokoro", "embeddings", "qdrant"]
# Docker subcommands that run_action issues against a service container.
ServiceAction = Literal["start", "stop", "restart"]
|
|
|
|
# Which service kinds are safe to auto-restart on a wedge probe. GPU model
# servers can wedge their CUDA context and recover via restart. A vector DB
# (qdrant) holds the only copy of the index and must NOT be auto-restarted on
# a transient/benign probe error (e.g. a 404 on a missing collection) — a
# restart mid-write/mid-snapshot is exactly what we don't want.
# Values are compared against ServiceDef.kind; "vectordb" is deliberately absent.
RESTARTABLE_KINDS = {"stt", "tts", "embedding"}
|
|
|
|
|
|
@dataclass(frozen=True)
class ServiceDef:
    """Immutable description of one support-service container.

    Instances say where the container lives (SSH host and user), what it is
    called in docker, and which port the service is configured with.
    """

    # Registry key of the service (e.g. "parakeet").
    name: str
    # Service category: 'stt' | 'tts' | 'embedding' | 'vectordb' | custom value.
    kind: str
    # Host the container runs on; an empty value means "not configured".
    host: str
    # SSH user for that host; an empty value means "not configured".
    user: str
    # Docker container name passed to `docker inspect|start|stop|restart`.
    container: str
    # Port configured for the service.
    port: int
|
|
|
|
|
|
def services_from_settings(s: Settings) -> dict[str, ServiceDef]:
    """Build the service registry: the four built-ins plus custom services.

    Built-in entries (parakeet, kokoro, embeddings, qdrant) are read from the
    matching ``*_host`` / ``*_user`` / ``*_container`` / ``*_port`` settings.
    Entries returned by ``load_custom_services()`` are appended afterwards.
    A custom entry is skipped when it has no ``key`` or when its key is
    already registered, so built-ins (and earlier custom entries) win.

    Custom entries are treated as loosely-typed data: null or missing text
    fields become ``""``, a null/empty ``container`` falls back to the key,
    and a port that cannot be converted to ``int`` becomes ``0``. One
    malformed entry therefore cannot break the whole registry.

    Returns:
        Mapping of service key to its ``ServiceDef``.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from .custom_services import load_custom_services

    def _coerce_port(raw) -> int:
        # int() raises on None, non-numeric strings, and infinite floats.
        try:
            return int(raw)
        except (TypeError, ValueError, OverflowError):
            return 0

    out: dict[str, ServiceDef] = {
        "parakeet": ServiceDef(
            name="parakeet",
            kind="stt",
            host=s.parakeet_host,
            user=s.parakeet_user,
            container=s.parakeet_container,
            port=s.parakeet_port,
        ),
        "kokoro": ServiceDef(
            name="kokoro",
            kind="tts",
            host=s.kokoro_host,
            user=s.kokoro_user,
            container=s.kokoro_container,
            port=s.kokoro_port,
        ),
        "embeddings": ServiceDef(
            name="embeddings",
            kind="embedding",
            host=s.embed_host,
            user=s.embed_user,
            container=s.embed_container,
            port=s.embed_port,
        ),
        "qdrant": ServiceDef(
            name="qdrant",
            kind="vectordb",
            host=s.qdrant_host,
            user=s.qdrant_user,
            container=s.qdrant_container,
            port=s.qdrant_port,
        ),
    }
    for entry in load_custom_services():
        key = entry.get("key")
        if not key or key in out:
            continue
        out[key] = ServiceDef(
            name=key,
            kind=entry.get("kind") or "",
            host=entry.get("host") or "",
            user=entry.get("user") or "",
            # An explicit null/empty container is treated like a missing one.
            container=entry.get("container") or key,
            port=_coerce_port(entry.get("port", 0)),
        )
    return out
|
|
|
|
|
|
async def docker_state(settings: Settings, svc: ServiceDef) -> dict:
    """Get docker state (running, exited, restarting, etc.) + restart count.

    Runs ``docker inspect`` for ``svc.container`` over SSH with a 6 s timeout
    and parses the pipe-delimited result.

    Args:
        settings: Passed through to ``ssh_run``.
        svc: Service whose container is inspected.

    Returns:
        A dict whose ``"state"`` is one of:

        * ``"unconfigured"`` - ``svc.host`` or ``svc.user`` is empty.
        * ``"unreachable"`` - the SSH call timed out, or the host was marked
          unreachable within the cache TTL (no SSH call is made then).
        * ``"missing"`` - non-zero exit, the ``NOT_FOUND`` sentinel, or a
          "no such object" error; raw output is under ``"raw"``.
        * ``"unknown"`` - output had fewer than four ``|``-separated fields.
        * otherwise the docker status string, together with ``started_at``,
          ``restart_count``, ``exit_code`` and ``error``.
    """
    if not svc.host or not svc.user:
        return {"state": "unconfigured", "restart_count": None, "uptime": None}
    if _is_recently_unreachable(svc.host, svc.user):
        return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}
    cmd = (
        f"docker inspect {quote_arg(svc.container)} "
        f"--format '{{{{.State.Status}}}}|{{{{.State.StartedAt}}}}|{{{{.RestartCount}}}}|{{{{.State.ExitCode}}}}|{{{{.State.Error}}}}' "
        f"2>&1 || echo 'NOT_FOUND'"
    )
    rc, out, _ = await ssh_run(svc.host, svc.user, cmd, settings, timeout=6)
    out = out.strip()
    lowered = out.lower()
    if rc == 124 or "timeout after" in lowered:
        # Remember the verdict so the next call skips the SSH timeout.
        _mark_unreachable(svc.host, svc.user)
        return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}
    _clear_unreachable(svc.host, svc.user)
    # Both halves are matched case-insensitively, and the grouping of the
    # and/or condition is now explicit rather than precedence-dependent.
    no_such_object = "error" in lowered and "no such object" in lowered
    if rc != 0 or out.startswith("NOT_FOUND") or no_such_object:
        return {"state": "missing", "restart_count": None, "uptime": None, "raw": out}
    # Only the first four separators delimit fields; the trailing
    # .State.Error text may itself contain "|" and must stay in one piece.
    parts = out.split("|", 4)
    if len(parts) < 4:
        return {"state": "unknown", "raw": out}
    status, started_at, restart_count, exit_code = parts[0], parts[1], parts[2], parts[3]
    error = parts[4] if len(parts) > 4 else ""
    return {
        "state": status,
        "started_at": started_at,
        "restart_count": int(restart_count) if restart_count.isdigit() else None,
        "exit_code": int(exit_code) if exit_code.lstrip("-").isdigit() else None,
        "error": error or None,
    }
|
|
|
|
|
|
async def run_action(settings: Settings, svc: ServiceDef, action: ServiceAction) -> dict:
    """Run docker start/stop/restart on the target host.

    Args:
        settings: Passed through to ``ssh_run``.
        svc: Service whose container is acted on.
        action: One of ``"start"``, ``"stop"`` or ``"restart"``.

    Returns:
        ``{"ok": False, "error": ...}`` when the service has no host/user
        configured or ``action`` is not an allowed subcommand (no SSH call is
        made in either case). Otherwise ``{"ok", "rc", "stdout", "stderr"}``
        from the remote command, where ``ok`` is ``rc == 0``.
    """
    if not svc.host or not svc.user:
        return {"ok": False, "error": "service host not configured"}
    # `action` is spliced unquoted into a remote shell command and the type
    # hint is not enforced at runtime, so validate it here at the sink.
    # Keep in sync with the ServiceAction literal.
    allowed_actions = ("start", "stop", "restart")
    if not isinstance(action, str) or action not in allowed_actions:
        return {"ok": False, "error": f"unsupported action: {action!r}"}
    cmd = f"docker {action} {quote_arg(svc.container)}"
    rc, out, err = await ssh_run(svc.host, svc.user, cmd, settings, timeout=30)
    return {
        "ok": rc == 0,
        "rc": rc,
        "stdout": out.strip(),
        "stderr": err.strip(),
    }
|