Files
spark-control/image/app/services.py
T

164 lines
5.9 KiB
Python

"""Lifecycle controls for support-service containers (Parakeet, Kokoro, etc.).
These are independent always-on containers that don't go through the LLM-swap
machinery. We just run `docker start|stop|restart <container>` via SSH on the
appropriate host.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Literal, Optional
from .config import Settings
from .shellsafe import quote_arg
from .ssh import ssh_run
# Negative cache of "unreachable" verdicts, keyed by (host, user). A verdict is
# kept for a short period so that a repeated docker_state call doesn't re-pay
# the 6 s SSH connect timeout each time.
# Seconds an "unreachable" verdict stays valid.
_UNREACHABLE_TTL = 25.0
# (host, user) -> time.monotonic() stamp taken when the pair was marked unreachable.
_unreachable_cache: dict[tuple[str, str], float] = {}
def _is_recently_unreachable(host: str, user: str) -> bool:
    """Return True if (host, user) was marked unreachable within the TTL.

    Args:
        host: Host half of the cache key.
        user: SSH-user half of the cache key.

    Returns:
        True while the stored stamp is younger than ``_UNREACHABLE_TTL``
        seconds; False when no stamp is stored or the stamp has expired.
    """
    marked_at = _unreachable_cache.get((host, user))
    # Compare against None explicitly: time.monotonic() has an undefined
    # reference point, so a stamp of 0.0 is legal and must not read as "absent".
    if marked_at is None:
        return False
    return time.monotonic() - marked_at < _UNREACHABLE_TTL
def _mark_unreachable(host: str, user: str) -> None:
    """Stamp (host, user) as unreachable as of the current monotonic time."""
    cache_key = (host, user)
    _unreachable_cache[cache_key] = time.monotonic()
def _clear_unreachable(host: str, user: str) -> None:
    """Forget any unreachable verdict for (host, user); no-op if none is stored."""
    try:
        del _unreachable_cache[(host, user)]
    except KeyError:
        # Nothing was cached for this pair; that is fine.
        pass
# Names of the statically known services.
# NOTE(review): services_from_settings also registers "matrix-bridge" and any
# custom service keys, none of which appear here — confirm whether this alias
# is meant to cover them.
ServiceName = Literal["parakeet", "kokoro", "embeddings", "qdrant"]
# Lifecycle verbs accepted by run_action; the verb is placed directly into the
# `docker <action> <container>` command line.
ServiceAction = Literal["start", "stop", "restart"]
# Service kinds that are safe to auto-restart on a wedge probe. GPU model
# servers can wedge their CUDA context and recover via restart. A vector DB
# (qdrant) holds the only copy of the index and must NOT be auto-restarted on
# a transient/benign probe error (e.g. a 404 on a missing collection) — a
# restart mid-write/mid-snapshot is exactly what we don't want.
RESTARTABLE_KINDS = {"stt", "tts", "embedding"}
@dataclass(frozen=True)
class ServiceDef:
    """Immutable description of one support-service container.

    Instances are built by ``services_from_settings`` and consumed by
    ``docker_state`` and ``run_action``.
    """

    # Registry key of the service, e.g. "parakeet".
    name: str
    # Service category, e.g. 'stt' | 'tts' | 'embedding' | 'vectordb' | 'bot'.
    kind: str
    # Host the container runs on; commands are sent to it over SSH.
    host: str
    # SSH user used to reach ``host``.
    user: str
    # Docker container name handed to `docker inspect|start|stop|restart`.
    container: str
    # Service port; 0 is used when there is no port to probe.
    port: int
def _coerce_port(raw: object) -> int:
    """Best-effort conversion of a custom-service port value to int (0 if unusable)."""
    try:
        return int(raw)  # type: ignore[call-overload]
    except (TypeError, ValueError):
        return 0


def services_from_settings(s: Settings) -> dict[str, ServiceDef]:
    """Build the registry of controllable services, keyed by service name.

    The built-in services are read from ``s``; entries returned by
    ``load_custom_services()`` are appended afterwards.

    Args:
        s: Application settings providing ``<service>_host``, ``_user``,
            ``_container`` and (where applicable) ``_port`` attributes.

    Returns:
        Mapping of service key to ``ServiceDef``, built-ins first in
        declaration order, then custom services in load order. A custom entry
        is skipped when it has no ``key`` or when its key collides with an
        already-registered service (built-ins therefore always win).
    """
    # Imported lazily, as in the original code (presumably to avoid an import
    # cycle — confirm before hoisting to module level).
    from .custom_services import load_custom_services

    builtin = (
        ServiceDef(
            name="parakeet",
            kind="stt",
            host=s.parakeet_host,
            user=s.parakeet_user,
            container=s.parakeet_container,
            port=s.parakeet_port,
        ),
        ServiceDef(
            name="kokoro",
            kind="tts",
            host=s.kokoro_host,
            user=s.kokoro_user,
            container=s.kokoro_container,
            port=s.kokoro_port,
        ),
        ServiceDef(
            name="embeddings",
            kind="embedding",
            host=s.embed_host,
            user=s.embed_user,
            container=s.embed_container,
            port=s.embed_port,
        ),
        ServiceDef(
            name="qdrant",
            kind="vectordb",
            host=s.qdrant_host,
            user=s.qdrant_user,
            container=s.qdrant_container,
            port=s.qdrant_port,
        ),
        # matrix-bridge Matrix bot. No HTTP port to probe (host networking, no
        # health endpoint) — judged purely by docker state. Driven as its own
        # SSH user (modelo, the repo owner) so git/docker run unprivileged.
        ServiceDef(
            name="matrix-bridge",
            kind="bot",
            host=s.matrix_bridge_host,
            user=s.matrix_bridge_user,
            container=s.matrix_bridge_container,
            port=0,
        ),
    )
    # Keying by svc.name keeps the registry key and ServiceDef.name in lockstep.
    out: dict[str, ServiceDef] = {svc.name: svc for svc in builtin}

    for entry in load_custom_services():
        key = entry.get("key")
        if not key or key in out:
            continue
        out[key] = ServiceDef(
            name=key,
            kind=entry.get("kind", ""),
            host=entry.get("host", ""),
            user=entry.get("user", ""),
            container=entry.get("container", key),
            # A null / non-numeric port in one custom entry must not abort the
            # whole registry; fall back to 0 (the value used for "no port").
            port=_coerce_port(entry.get("port", 0)),
        )
    return out
def _int_or_none(text: str, *, allow_negative: bool = False) -> Optional[int]:
    """Parse a plain ASCII base-10 integer, returning None for anything else."""
    digits = text[1:] if allow_negative and text.startswith("-") else text
    # isascii() guards against Unicode "digits" that isdigit() accepts but
    # int() rejects; the single-prefix strip avoids int("--5") blowing up.
    if digits.isascii() and digits.isdigit():
        return int(text)
    return None


async def docker_state(settings: Settings, svc: ServiceDef) -> dict:
    """Get docker state (running, exited, restarting, etc.) + restart count.

    Runs ``docker inspect`` for ``svc.container`` over SSH on ``svc.host``.

    Args:
        settings: Application settings, forwarded to ``ssh_run``.
        svc: The service whose container should be inspected.

    Returns:
        A dict whose ``"state"`` key is one of:

        * ``"unconfigured"`` – ``svc.host`` or ``svc.user`` is empty.
        * ``"unreachable"`` – the SSH call timed out now, or did so within the
          unreachable-cache TTL (``"host_unreachable": True`` is set).
        * ``"missing"`` – the remote command failed or docker reported that
          the object could not be inspected; ``"raw"`` holds the output.
        * ``"unknown"`` – the output did not have the expected fields;
          ``"raw"`` holds the output.
        * otherwise docker's own ``.State.Status`` value, together with
          ``started_at``, ``restart_count``, ``exit_code`` and ``error``.
    """
    if not svc.host or not svc.user:
        return {"state": "unconfigured", "restart_count": None, "uptime": None}
    if _is_recently_unreachable(svc.host, svc.user):
        return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}

    # stderr is folded into stdout, and a sentinel line is appended whenever
    # `docker inspect` itself exits non-zero.
    cmd = (
        f"docker inspect {quote_arg(svc.container)} "
        f"--format '{{{{.State.Status}}}}|{{{{.State.StartedAt}}}}|{{{{.RestartCount}}}}|{{{{.State.ExitCode}}}}|{{{{.State.Error}}}}' "
        "2>&1 || echo 'NOT_FOUND'"
    )
    rc, out, _ = await ssh_run(svc.host, svc.user, cmd, settings, timeout=6)
    out = out.strip()

    if rc == 124 or "timeout after" in out.lower():
        _mark_unreachable(svc.host, svc.user)
        return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}
    _clear_unreachable(svc.host, svc.user)

    # Because of `2>&1`, docker's error text precedes the sentinel, so the
    # sentinel has to be looked for on the LAST line, not at the start.
    lines = out.splitlines()
    sentinel_hit = bool(lines) and lines[-1].strip() == "NOT_FOUND"
    inspect_failed = (
        rc != 0
        or sentinel_hit
        or out.startswith("NOT_FOUND")
        # Match docker's "no such object" message regardless of capitalisation.
        or "no such object" in out.lower()
    )
    if inspect_failed:
        return {"state": "missing", "restart_count": None, "uptime": None, "raw": out}

    # maxsplit=4 keeps a State.Error that itself contains "|" in one piece.
    parts = out.split("|", 4)
    if len(parts) < 4:
        return {"state": "unknown", "raw": out}
    status, started_at, restart_count, exit_code = parts[:4]
    error = parts[4] if len(parts) > 4 else ""
    return {
        "state": status,
        "started_at": started_at,
        "restart_count": _int_or_none(restart_count),
        "exit_code": _int_or_none(exit_code, allow_negative=True),
        "error": error or None,
    }
# Verbs run_action is willing to hand to docker; mirrors the ServiceAction literal.
_ALLOWED_ACTIONS = ("start", "stop", "restart")


async def run_action(settings: Settings, svc: ServiceDef, action: ServiceAction) -> dict:
    """Run docker start/stop/restart on the target host.

    Args:
        settings: Application settings, forwarded to ``ssh_run``.
        svc: The service whose container should be acted on.
        action: One of ``"start"``, ``"stop"`` or ``"restart"``.

    Returns:
        ``{"ok": False, "error": ...}`` when the service has no host/user
        configured or ``action`` is not an allowed verb (nothing is executed
        in either case); otherwise ``{"ok", "rc", "stdout", "stderr"}``
        describing the remote ``docker`` invocation, with ``ok`` true iff the
        exit code was 0.
    """
    if not svc.host or not svc.user:
        return {"ok": False, "error": "service host not configured"}
    # ServiceAction is only a static type hint. The verb is interpolated into a
    # shell command line unquoted, so enforce the allow-list at runtime.
    if action not in _ALLOWED_ACTIONS:
        return {"ok": False, "error": f"unsupported action: {action!r}"}

    cmd = f"docker {action} {quote_arg(svc.container)}"
    rc, out, err = await ssh_run(svc.host, svc.user, cmd, settings, timeout=30)
    return {
        "ok": rc == 0,
        "rc": rc,
        "stdout": out.strip(),
        "stderr": err.strip(),
    }