26070eb191
Make the cluster topology configurable so an adopter wired differently (vLLM on both Sparks, port 8000, different container name, no Parakeet) can monitor without forking. Covers the OpenClaw report P4/P5/#6. - VLLM_CONTAINER override (default vllm_node), validated at the boundary and quote_arg-quoted into the swap log-tail + pre-flight validator exec. - DISABLED_SERVICES list: hidden services show no tile and are skipped by status/deep-health/connectivity probes (kills the Parakeet-on-8000 collision). - kind: vllm custom service monitors a second Spark's vLLM via the shared probe_vllm_endpoint; /api/endpoints gains a disabled flag. Swap mechanism intentionally not generalized to raw docker run (that's coordination, roadmap item 4).
175 lines
6.5 KiB
Python
175 lines
6.5 KiB
Python
"""Lifecycle controls for support-service containers (Parakeet, Kokoro, etc.).
|
|
|
|
These are independent always-on containers that don't go through the LLM-swap
|
|
machinery. We just run `docker start|stop|restart <container>` via SSH on the
|
|
appropriate host.
|
|
"""
|
|
from __future__ import annotations
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Literal, Optional
|
|
|
|
from .config import Settings
|
|
from .shellsafe import quote_arg
|
|
from .ssh import ssh_run
|
|
|
|
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
|
|
|
|
|
|
# Negative cache of SSH targets that recently timed out, keyed by (host, user).
# While an entry is younger than _UNREACHABLE_TTL seconds, docker_state answers
# "unreachable" immediately instead of paying the 6 s SSH timeout on each call.
_UNREACHABLE_TTL = 25.0
# (host, user) -> time.monotonic() reading taken when the timeout was observed.
_unreachable_cache: dict[tuple[str, str], float] = {}
|
|
|
|
|
|
def _is_recently_unreachable(host: str, user: str) -> bool:
    """Return True if (host, user) was marked unreachable within the TTL.

    A missing (or falsy) timestamp means the target has no live verdict.
    Expired entries are not removed here; they simply stop matching.
    """
    marked_at = _unreachable_cache.get((host, user))
    if not marked_at:
        return False
    elapsed = time.monotonic() - marked_at
    return elapsed < _UNREACHABLE_TTL
|
|
|
|
|
|
def _mark_unreachable(host: str, user: str) -> None:
    """Record the current monotonic time as the unreachable stamp for (host, user)."""
    target = (host, user)
    _unreachable_cache[target] = time.monotonic()
|
|
|
|
|
|
def _clear_unreachable(host: str, user: str) -> None:
    """Forget any unreachable verdict for (host, user); a no-op if none exists."""
    target = (host, user)
    _unreachable_cache.pop(target, None)
|
|
|
|
|
|
# Closed set of service names expressed as a type.
# NOTE(review): services_from_settings also registers "matrix-bridge" and any
# custom-service keys, none of which appear here — confirm that is intended.
ServiceName = Literal["parakeet", "kokoro", "embeddings", "qdrant"]
# docker sub-commands that run_action may issue against a service container.
ServiceAction = Literal["start", "stop", "restart"]
|
|
|
|
# Service kinds that may be auto-restarted when a wedge probe fails. GPU model
# servers can wedge their CUDA context and a restart recovers them. A vector DB
# (qdrant, kind "vectordb") is deliberately absent: it holds the only copy of
# the index, and a transient or benign probe error (e.g. a 404 on a missing
# collection) must never trigger a restart mid-write or mid-snapshot.
RESTARTABLE_KINDS = {
    "stt",
    "tts",
    "embedding",
}
|
|
|
|
|
|
@dataclass(frozen=True)
class ServiceDef:
    """Immutable description of one support-service container.

    Instances are built by services_from_settings and consumed by
    docker_state / run_action, which SSH to ``user@host`` and operate on
    ``container``.
    """

    # Registry key of the service (e.g. "parakeet", or a custom-service key).
    name: str
    # Service category; this module uses 'stt', 'tts', 'embedding',
    # 'vectordb' and 'bot'. Custom entries supply their own value.
    kind: str
    # Machine that runs the container; empty means "not configured".
    host: str
    # SSH login used on that machine; empty means "not configured".
    user: str
    # Docker container name handed to `docker inspect|start|stop|restart`.
    container: str
    # Service port; the built-in matrix-bridge entry uses 0 because it has
    # no HTTP port to probe.
    port: int
|
|
|
|
|
|
def services_from_settings(s: Settings) -> dict[str, ServiceDef]:
    """Build the registry of support services, keyed by service name.

    The registry starts with the built-in services (parakeet, kokoro,
    embeddings, qdrant, matrix-bridge) populated from ``s``, then adds every
    entry returned by ``load_custom_services()`` that has a usable, unused
    ``key``. Finally, any name listed in ``s.disabled_services`` is removed.

    Custom-entry handling:
      * entries without a ``key`` are skipped silently;
      * a key equal to a built-in name is ignored with a warning (this holds
        even when that built-in is itself disabled, because disabled names
        are filtered only at the end);
      * a key that repeats an earlier custom entry is ignored with a warning
        (first definition wins);
      * a ``port`` that cannot be converted to ``int`` is logged and replaced
        by 0 instead of raising, so one malformed entry cannot take down the
        whole registry.

    Args:
        s: Application settings supplying host/user/container/port for each
            built-in service and the ``disabled_services`` collection.

    Returns:
        Mapping of service name to its ``ServiceDef``, without disabled names.
    """
    # Function-scope import kept from the original; presumably it avoids an
    # import cycle with custom_services — TODO confirm.
    from .custom_services import load_custom_services

    out: dict[str, ServiceDef] = {
        "parakeet": ServiceDef(
            name="parakeet",
            kind="stt",
            host=s.parakeet_host,
            user=s.parakeet_user,
            container=s.parakeet_container,
            port=s.parakeet_port,
        ),
        "kokoro": ServiceDef(
            name="kokoro",
            kind="tts",
            host=s.kokoro_host,
            user=s.kokoro_user,
            container=s.kokoro_container,
            port=s.kokoro_port,
        ),
        "embeddings": ServiceDef(
            name="embeddings",
            kind="embedding",
            host=s.embed_host,
            user=s.embed_user,
            container=s.embed_container,
            port=s.embed_port,
        ),
        "qdrant": ServiceDef(
            name="qdrant",
            kind="vectordb",
            host=s.qdrant_host,
            user=s.qdrant_user,
            container=s.qdrant_container,
            port=s.qdrant_port,
        ),
        # matrix-bridge Matrix bot. No HTTP port to probe (host networking, no
        # health endpoint) — judged purely by docker state. Driven as its own
        # SSH user (modelo, the repo owner) so git/docker run unprivileged.
        "matrix-bridge": ServiceDef(
            name="matrix-bridge",
            kind="bot",
            host=s.matrix_bridge_host,
            user=s.matrix_bridge_user,
            container=s.matrix_bridge_container,
            port=0,
        ),
    }

    # Snapshot the built-in names so a repeated *custom* key can be reported
    # accurately instead of being blamed on a built-in collision.
    builtin_names = frozenset(out)

    for entry in load_custom_services():
        key = entry.get("key")
        if not key:
            continue
        if key in builtin_names:
            # A custom entry can't shadow a built-in (parakeet/kokoro/…); warn so
            # an adopter who picked a colliding key for, say, a second vLLM sees
            # why no tile appeared instead of a silent no-op.
            log.warning("custom service %r collides with a built-in name; ignoring", key)
            continue
        if key in out:
            log.warning("custom service %r is defined more than once; ignoring the duplicate", key)
            continue

        raw_port = entry.get("port", 0)
        try:
            port = int(raw_port)
        except (TypeError, ValueError):
            # Same value a missing port gets; keeps the other services usable.
            log.warning("custom service %r has a non-integer port %r; using 0", key, raw_port)
            port = 0

        out[key] = ServiceDef(
            name=key,
            kind=entry.get("kind", ""),
            host=entry.get("host", ""),
            user=entry.get("user", ""),
            container=entry.get("container", key),
            port=port,
        )

    # Drop services the deployment has switched off (DISABLED_SERVICES) so they
    # show no tile and are never probed/auto-restarted.
    return {k: v for k, v in out.items() if k not in s.disabled_services}
|
|
|
|
|
|
async def docker_state(settings: Settings, svc: ServiceDef) -> dict:
    """Get docker state (running, exited, restarting, etc.) + restart count.

    Runs ``docker inspect`` for ``svc.container`` over SSH as
    ``svc.user@svc.host`` and parses a single ``|``-separated summary line.

    Args:
        settings: Passed through unchanged to ``ssh_run``.
        svc: Service whose ``host``, ``user`` and ``container`` are used.

    Returns:
        A dict whose ``"state"`` is one of:

        * ``"unconfigured"`` — ``svc`` has no host or no user; nothing is run.
        * ``"unreachable"`` — the SSH call timed out now, or a timeout for the
          same (host, user) is still cached; ``"host_unreachable"`` is True.
        * ``"missing"`` — the command exited non-zero, printed only the
          ``NOT_FOUND`` sentinel, or docker reported "no such object";
          ``"raw"`` carries the command output.
        * ``"unknown"`` — the output did not contain the expected fields;
          ``"raw"`` carries the command output.
        * otherwise docker's own ``.State.Status`` value, accompanied by
          ``"started_at"``, ``"restart_count"``, ``"exit_code"`` and
          ``"error"`` (``None`` when docker reports an empty error).
    """
    if not svc.host or not svc.user:
        return {"state": "unconfigured", "restart_count": None, "uptime": None}
    if _is_recently_unreachable(svc.host, svc.user):
        return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}

    # Go-template for `docker inspect --format`. Kept as a plain string so the
    # double braces are literal. State.Error is placed last on purpose: it is
    # free text and is the only field that may itself contain "|".
    inspect_format = (
        "{{.State.Status}}|{{.State.StartedAt}}|{{.RestartCount}}"
        "|{{.State.ExitCode}}|{{.State.Error}}"
    )
    # stderr is merged into stdout, and the sentinel is echoed if docker fails,
    # so a docker failure yields "<docker message>\nNOT_FOUND".
    cmd = (
        f"docker inspect {quote_arg(svc.container)} "
        f"--format '{inspect_format}' "
        "2>&1 || echo 'NOT_FOUND'"
    )
    rc, out, _ = await ssh_run(svc.host, svc.user, cmd, settings, timeout=6)
    out = out.strip()

    # rc 124 / "timeout after" is how a timed-out call is recognised here;
    # presumably that is ssh_run's timeout convention — verify against ssh_run.
    if rc == 124 or "timeout after" in out.lower():
        _mark_unreachable(svc.host, svc.user)
        return {"state": "unreachable", "host_unreachable": True, "restart_count": None, "uptime": None}
    _clear_unreachable(svc.host, svc.user)

    # Named so the and/or precedence of the original condition is explicit.
    no_such_object = "Error" in out and "no such object" in out.lower()
    if rc != 0 or out.startswith("NOT_FOUND") or no_such_object:
        return {"state": "missing", "restart_count": None, "uptime": None, "raw": out}

    # Cap the split at five fields so a "|" inside the trailing State.Error
    # text stays part of the error instead of being cut off.
    parts = out.split("|", 4)
    if len(parts) < 4:
        return {"state": "unknown", "raw": out}
    status, started_at, restart_count, exit_code = parts[:4]
    error = parts[4] if len(parts) > 4 else ""
    return {
        "state": status,
        "started_at": started_at,
        "restart_count": int(restart_count) if restart_count.isdigit() else None,
        "exit_code": int(exit_code) if exit_code.lstrip("-").isdigit() else None,
        "error": error or None,
    }
|
|
|
|
|
|
async def run_action(settings: Settings, svc: ServiceDef, action: ServiceAction) -> dict:
    """Run docker start/stop/restart on the target host.

    Args:
        settings: Passed through unchanged to ``ssh_run``.
        svc: Service whose ``host``, ``user`` and ``container`` are used.
        action: One of ``"start"``, ``"stop"`` or ``"restart"``.

    Returns:
        ``{"ok": False, "error": ...}`` when the service has no host/user or
        ``action`` is not a supported verb (no command is run in either case).
        Otherwise a dict with ``"ok"`` (True when the exit code is 0),
        ``"rc"``, ``"stdout"`` and ``"stderr"`` (both stripped).
    """
    if not svc.host or not svc.user:
        return {"ok": False, "error": "service host not configured"}
    # `action` is interpolated unquoted into a remote shell command line, and
    # the Literal annotation is not enforced at runtime. Refuse anything that
    # is not one of the ServiceAction verbs rather than letting an arbitrary
    # string reach the shell.
    if action not in ("start", "stop", "restart"):
        return {"ok": False, "error": f"unsupported action: {action!r}"}

    cmd = f"docker {action} {quote_arg(svc.container)}"
    rc, out, err = await ssh_run(svc.host, svc.user, cmd, settings, timeout=30)
    return {
        "ok": rc == 0,
        "rc": rc,
        "stdout": out.strip(),
        "stderr": err.strip(),
    }
|