Files
Keysat 26070eb191 v0.24.0:0 - configurable cluster topology (vllm container name, hide services, second-vllm monitor)
Make the cluster topology configurable so an adopter wired differently
(vLLM on both Sparks, port 8000, different container name, no Parakeet)
can monitor without forking. Covers the OpenClaw report P4/P5/#6.

- VLLM_CONTAINER override (default vllm_node), validated at the boundary
  and quote_arg-quoted into the swap log-tail + pre-flight validator exec.
- DISABLED_SERVICES list: hidden services show no tile and are skipped by
  status/deep-health/connectivity probes (kills the Parakeet-on-8000
  collision).
- kind: vllm custom service monitors a second Spark's vLLM via the shared
  probe_vllm_endpoint; /api/endpoints gains a disabled flag.

Swap mechanism intentionally not generalized to raw docker run (that's
coordination, roadmap item 4).
2026-06-17 23:03:33 -05:00

175 lines
6.5 KiB
Python

"""Lifecycle controls for support-service containers (Parakeet, Kokoro, etc.).
These are independent always-on containers that don't go through the LLM-swap
machinery. We just run `docker start|stop|restart <container>` via SSH on the
appropriate host.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import Literal, Optional
from .config import Settings
from .shellsafe import quote_arg
from .ssh import ssh_run
log = logging.getLogger(__name__)
# Negative cache for hosts whose last SSH attempt timed out. The "unreachable"
# verdict is kept per (host, user) for a short period so that a repeated
# docker_state call doesn't re-pay the 6 s SSH timeout each time.
# Lifetime of a cached verdict, in seconds (compared against time.monotonic()).
_UNREACHABLE_TTL = 25.0
# Maps (host, user) -> time.monotonic() reading taken when the timeout was seen.
_unreachable_cache: dict[tuple[str, str], float] = {}
def _is_recently_unreachable(host: str, user: str) -> bool:
    """Return True if (host, user) was marked unreachable within the TTL.

    Args:
        host: Target host of the SSH connection.
        user: SSH login user on that host.

    Returns:
        True when a cached timestamp exists for the pair and it is younger
        than ``_UNREACHABLE_TTL`` seconds; False otherwise.
    """
    marked_at = _unreachable_cache.get((host, user))
    # Test for absence explicitly: time.monotonic() has an undefined reference
    # point, so a stored timestamp of 0.0 is valid and must not read as "no entry".
    if marked_at is None:
        return False
    return time.monotonic() - marked_at < _UNREACHABLE_TTL
def _mark_unreachable(host: str, user: str) -> None:
    """Remember that (host, user) was just found unreachable."""
    observed_at = time.monotonic()
    cache_key = (host, user)
    _unreachable_cache[cache_key] = observed_at
def _clear_unreachable(host: str, user: str) -> None:
    """Drop any cached unreachable verdict for (host, user); no-op if absent."""
    cache_key = (host, user)
    if cache_key in _unreachable_cache:
        del _unreachable_cache[cache_key]
# Closed set of service names, for type checking.
# NOTE(review): services_from_settings also registers "matrix-bridge" and
# arbitrary custom keys that are not listed here — confirm how callers use
# this alias before relying on it as the full set of valid names.
ServiceName = Literal["parakeet", "kokoro", "embeddings", "qdrant"]
# Docker lifecycle verbs accepted by run_action.
ServiceAction = Literal["start", "stop", "restart"]
# Which service kinds are safe to auto-restart on a wedge probe. GPU model
# servers can wedge their CUDA context and recover via restart. A vector DB
# (qdrant) holds the only copy of the index and must NOT be auto-restarted on
# a transient/benign probe error (e.g. a 404 on a missing collection) — a
# restart mid-write/mid-snapshot is exactly what we don't want.
RESTARTABLE_KINDS = {"stt", "tts", "embedding"}
@dataclass(frozen=True)
class ServiceDef:
    """Immutable description of one support-service container and where it runs."""

    # Service key (e.g. "parakeet"); custom services use their configured key.
    name: str
    # Service category; the built-ins use 'stt', 'tts', 'embedding',
    # 'vectordb' and 'bot', custom entries supply their own value.
    kind: str # 'stt' | 'tts' | …
    # Host the docker commands are run on (via SSH).
    host: str
    # SSH login user on that host.
    user: str
    # Docker container name passed to `docker inspect|start|stop|restart`.
    container: str
    # Service port; 0 is used for services with no HTTP port to probe.
    port: int
def services_from_settings(s: Settings) -> dict[str, ServiceDef]:
    """Build the map of support services monitored by this deployment.

    Combines the built-in services (configured through ``s``) with the entries
    returned by ``load_custom_services()``, then removes every key listed in
    ``s.disabled_services``.

    Custom entries without a ``key`` are skipped silently. Entries whose key
    collides with a built-in name, or whose ``port`` cannot be parsed as an
    integer, are skipped with a warning so that one malformed entry cannot
    break the whole service listing.

    Args:
        s: Deployment settings supplying host/user/container/port for each
            built-in service and the ``disabled_services`` collection.

    Returns:
        Mapping of service key to ``ServiceDef``, disabled services excluded.
    """
    # Function-scope import — presumably to avoid an import cycle with
    # custom_services; TODO confirm before hoisting to module level.
    from .custom_services import load_custom_services

    out: dict[str, ServiceDef] = {
        "parakeet": ServiceDef(
            name="parakeet",
            kind="stt",
            host=s.parakeet_host,
            user=s.parakeet_user,
            container=s.parakeet_container,
            port=s.parakeet_port,
        ),
        "kokoro": ServiceDef(
            name="kokoro",
            kind="tts",
            host=s.kokoro_host,
            user=s.kokoro_user,
            container=s.kokoro_container,
            port=s.kokoro_port,
        ),
        "embeddings": ServiceDef(
            name="embeddings",
            kind="embedding",
            host=s.embed_host,
            user=s.embed_user,
            container=s.embed_container,
            port=s.embed_port,
        ),
        "qdrant": ServiceDef(
            name="qdrant",
            kind="vectordb",
            host=s.qdrant_host,
            user=s.qdrant_user,
            container=s.qdrant_container,
            port=s.qdrant_port,
        ),
        # matrix-bridge Matrix bot. No HTTP port to probe (host networking, no
        # health endpoint) — judged purely by docker state. Driven as its own
        # SSH user (modelo, the repo owner) so git/docker run unprivileged.
        "matrix-bridge": ServiceDef(
            name="matrix-bridge",
            kind="bot",
            host=s.matrix_bridge_host,
            user=s.matrix_bridge_user,
            container=s.matrix_bridge_container,
            port=0,
        ),
    }

    for entry in load_custom_services():
        key = entry.get("key")
        if not key:
            continue
        if key in out:
            # A custom entry can't shadow a built-in (parakeet/kokoro/…); warn so
            # an adopter who picked a colliding key for, say, a second vLLM sees
            # why no tile appeared instead of a silent no-op.
            log.warning("custom service %r collides with a built-in name; ignoring", key)
            continue

        raw_port = entry.get("port", 0)
        try:
            # An explicit null port is treated like an omitted one (0).
            port = 0 if raw_port is None else int(raw_port)
        except (TypeError, ValueError):
            # Skip just this entry rather than letting the exception take
            # down every service tile.
            log.warning("custom service %r has an invalid port %r; ignoring", key, raw_port)
            continue

        out[key] = ServiceDef(
            name=key,
            kind=entry.get("kind", ""),
            host=entry.get("host", ""),
            user=entry.get("user", ""),
            container=entry.get("container", key),
            port=port,
        )

    # Drop services the deployment has switched off (DISABLED_SERVICES) so they
    # show no tile and are never probed/auto-restarted.
    return {k: v for k, v in out.items() if k not in s.disabled_services}
async def docker_state(settings: Settings, svc: ServiceDef) -> dict:
    """Get docker state (running, exited, restarting, etc.) + restart count.

    Runs ``docker inspect`` for ``svc.container`` over SSH and parses the
    pipe-separated result.

    Args:
        settings: Settings object forwarded to ``ssh_run``.
        svc: Service whose container is inspected.

    Returns:
        A dict whose ``"state"`` is one of:

        * ``"unconfigured"`` — ``svc.host`` or ``svc.user`` is empty.
        * ``"unreachable"`` — the SSH call timed out now, or did so recently
          (cached verdict); ``"host_unreachable"`` is True.
        * ``"missing"`` — the command failed or the container was not found;
          the raw output is under ``"raw"``.
        * ``"unknown"`` — the output did not have the expected fields.
        * otherwise the docker status string, together with ``"started_at"``,
          ``"restart_count"``, ``"exit_code"`` and ``"error"``.
    """
    if not svc.host or not svc.user:
        return {"state": "unconfigured", "restart_count": None, "uptime": None}
    if _is_recently_unreachable(svc.host, svc.user):
        # Skip the SSH round-trip while the cached verdict is still fresh.
        return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}

    # Go-template fields, pipe-separated; .State.Error is deliberately last so
    # that free-form error text cannot shift the fixed fields before it.
    inspect_format = (
        "{{.State.Status}}|{{.State.StartedAt}}|{{.RestartCount}}"
        "|{{.State.ExitCode}}|{{.State.Error}}"
    )
    cmd = (
        f"docker inspect {quote_arg(svc.container)} "
        f"--format '{inspect_format}' "
        "2>&1 || echo 'NOT_FOUND'"
    )
    rc, out, _ = await ssh_run(svc.host, svc.user, cmd, settings, timeout=6)
    out = out.strip()

    # rc 124 / "timeout after" — presumably how ssh_run reports a timeout;
    # verify against ssh_run.
    if rc == 124 or "timeout after" in out.lower():
        _mark_unreachable(svc.host, svc.user)
        return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}
    _clear_unreachable(svc.host, svc.user)

    no_such_object = "Error" in out and "no such object" in out.lower()
    if rc != 0 or out.startswith("NOT_FOUND") or no_such_object:
        return {"state": "missing", "restart_count": None, "uptime": None, "raw": out}

    # maxsplit=4 keeps any '|' inside the trailing error text intact instead
    # of truncating the error at its first pipe.
    parts = out.split("|", 4)
    if len(parts) < 4:
        return {"state": "unknown", "raw": out}
    status, started_at, restart_count, exit_code = parts[:4]
    error = parts[4] if len(parts) > 4 else ""
    return {
        "state": status,
        "started_at": started_at,
        "restart_count": int(restart_count) if restart_count.isdigit() else None,
        # Exit codes may be negative, hence the lstrip before isdigit().
        "exit_code": int(exit_code) if exit_code.lstrip("-").isdigit() else None,
        "error": error or None,
    }
async def run_action(settings: Settings, svc: ServiceDef, action: ServiceAction) -> dict:
    """Run docker start/stop/restart on the target host.

    Args:
        settings: Settings object forwarded to ``ssh_run``.
        svc: Service whose container is acted on.
        action: One of ``"start"``, ``"stop"`` or ``"restart"``.

    Returns:
        ``{"ok": False, "error": ...}`` when the service has no host/user
        configured or ``action`` is not an allowed verb (nothing is executed
        in either case); otherwise a dict with ``"ok"`` (True when the remote
        command exited 0), ``"rc"``, ``"stdout"`` and ``"stderr"``.
    """
    if not svc.host or not svc.user:
        return {"ok": False, "error": "service host not configured"}

    # `action` is interpolated into a remote shell command and the Literal type
    # hint is not enforced at runtime, so reject anything outside the allowed
    # verbs (kept in sync with ServiceAction) before building the command.
    allowed_actions = ("start", "stop", "restart")
    if action not in allowed_actions:
        return {"ok": False, "error": f"unsupported action: {action!r}"}

    cmd = f"docker {action} {quote_arg(svc.container)}"
    rc, out, err = await ssh_run(svc.host, svc.user, cmd, settings, timeout=30)
    return {
        "ok": rc == 0,
        "rc": rc,
        "stdout": out.strip(),
        "stderr": err.strip(),
    }