Files
Keysat 7ae6ab3ba8 v0.25.0:0 - cluster coordination layer (swap lock + webhook + schedule registry)
GPU-arbiter safety layer for when automation, not just the dashboard, swaps
models:
- swap reservation lock (POST/GET/DELETE /api/swap/lock); 423-enforced in
  post_swap via a single-read gate, TTL-bounded, secret-token auth, human
  force-release override + dashboard banner
- swap webhook (swap_complete/swap_failed) fired outside the swap lock, optional
  HMAC signature, configurable URL+secret
- read-only schedule registry (GET/POST/DELETE /api/schedule) + dashboard panel

New module image/app/coordination.py; docs/COORDINATION.md for consumers; 22
offline tests in test_coordination.py.
2026-06-18 07:07:08 -05:00

180 lines
7.4 KiB
Python

from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from .shellsafe import validate_container
log = logging.getLogger(__name__)
def _env(name: str, default: str = "") -> str:
return os.environ.get(name, default)
def _env_container(name: str, default: str) -> str:
"""Resolve a container-name env var, validating it at the config boundary.
The value flows into `docker logs`/`docker exec` over SSH, so it's quoted at
the sink — but per the repo's two-layer convention it's also whitelist-checked
here. A malformed optional value falls back to `default` rather than crashing
daemon startup (mirrors `_env_int` for VLLM_PORT)."""
val = os.environ.get(name, "") or default
try:
return validate_container(val)
except ValueError:
log.warning("ignoring invalid %s=%r; using %r", name, val, default)
return default
def _env_set(name: str) -> frozenset[str]:
"""Parse a comma-separated env var into a lowercased frozenset of keys.
Used by DISABLED_SERVICES so an adopter whose cluster doesn't run a given
support service can switch its tile + probes off entirely (rather than have
the probe hit whatever else listens on that port — e.g. a vLLM sharing
Parakeet's default 8000)."""
raw = os.environ.get(name, "")
return frozenset(part.strip().lower() for part in raw.split(",") if part.strip())
def _env_int(name: str, default: int) -> int:
"""Parse an int env var, falling back to `default` when unset, blank, or
malformed. The StartOS Configure panel passes optional numeric fields as an
empty string when left blank, so a bare int("") would crash daemon startup."""
try:
return int(os.environ.get(name, "") or default)
except (TypeError, ValueError):
return default
def _resolve_models_yaml() -> str:
if env := os.environ.get("MODELS_YAML"):
return env
here = Path(__file__).resolve().parent # app/
candidates = [
here.parent / "models.yaml", # image/models.yaml (Docker)
here.parent.parent / "models.yaml", # <repo>/models.yaml (dev)
Path("/app/models.yaml"), # explicit container path
]
for p in candidates:
if p.exists():
return str(p)
return str(candidates[0]) # let load fail with a clear path
@dataclass(frozen=True)
class Settings:
spark1_host: str
spark1_user: str
spark2_host: str
spark2_user: str
parakeet_host: str
parakeet_user: str
parakeet_container: str
kokoro_host: str
kokoro_user: str
kokoro_container: str
embed_host: str
embed_user: str
embed_container: str
qdrant_host: str
qdrant_user: str
qdrant_container: str
qdrant_collection: str
matrix_bridge_host: str
matrix_bridge_user: str
matrix_bridge_container: str
matrix_bridge_dir: str
matrix_bridge_branch: str
redaction_map_db: str
redaction_map_ttl: int
ssh_key_path: str
ssh_known_hosts: str
models_yaml: str
vllm_port: int
vllm_container: str
disabled_services: frozenset[str]
parakeet_port: int
kokoro_port: int
embed_port: int
qdrant_port: int
bind_port: int
open_webui_url: str
ngc_api_key: str
swap_webhook_url: str
swap_webhook_secret: str
@classmethod
def from_env(cls) -> "Settings":
spark2_host = _env("SPARK2_HOST")
spark2_user = _env("SPARK2_USER")
# Parakeet (STT) and Kokoro (TTS) default to Spark 2 unless overridden.
return cls(
spark1_host=_env("SPARK1_HOST"),
spark1_user=_env("SPARK1_USER"),
spark2_host=spark2_host,
spark2_user=spark2_user,
parakeet_host=_env("PARAKEET_HOST") or spark2_host,
parakeet_user=_env("PARAKEET_USER") or spark2_user,
parakeet_container=_env("PARAKEET_CONTAINER") or "parakeet-asr",
kokoro_host=_env("KOKORO_HOST") or spark2_host,
kokoro_user=_env("KOKORO_USER") or spark2_user,
kokoro_container=_env("KOKORO_CONTAINER") or "kokoro-tts",
# Embeddings (spark-embed: bge-m3 dense + reranker) and Qdrant
# (vector storage) default to Spark 2 unless overridden.
embed_host=_env("EMBED_HOST") or spark2_host,
embed_user=_env("EMBED_USER") or spark2_user,
embed_container=_env("EMBED_CONTAINER") or "spark-embed",
qdrant_host=_env("QDRANT_HOST") or spark2_host,
qdrant_user=_env("QDRANT_USER") or spark2_user,
qdrant_container=_env("QDRANT_CONTAINER") or "qdrant",
qdrant_collection=_env("QDRANT_COLLECTION", ""),
# matrix-bridge bot container, driven as its own SSH user (the owner
# of the ~/matrix-bridge git clone) so git/docker run unprivileged.
# The user is BLANK by default and set via the "Configure Sparks"
# action; leaving it blank reports the service as unconfigured, which
# hides the tile. That keeps the shared package portable — a
# deployment without the bot never shows a stray tile or a hardcoded
# username. Host defaults to Spark 2 (same box); container/dir/branch
# are sensible defaults. All are env-overridable.
matrix_bridge_host=_env("MATRIX_BRIDGE_HOST") or spark2_host,
matrix_bridge_user=_env("MATRIX_BRIDGE_USER"),
matrix_bridge_container=_env("MATRIX_BRIDGE_CONTAINER") or "matrix-bridge",
matrix_bridge_dir=_env("MATRIX_BRIDGE_DIR") or "~/matrix-bridge",
matrix_bridge_branch=_env("MATRIX_BRIDGE_BRANCH") or "master",
# Redaction gateway pseudonym-map store (server-held de-anon key).
redaction_map_db=_env("REDACTION_MAP_DB", "/data/redaction_maps.db"),
redaction_map_ttl=_env_int("REDACTION_MAP_TTL", 7200),
ssh_key_path=_env("SSH_KEY_PATH"),
ssh_known_hosts=_env("SSH_KNOWN_HOSTS"),
models_yaml=_resolve_models_yaml(),
vllm_port=_env_int("VLLM_PORT", 8888),
# Container name for the swappable vLLM on Spark 1. Defaults to the
# bundled launch-cluster.sh container; override if you named yours
# something else (the swap log-tail and pre-flight validator exec
# into it by name).
vllm_container=_env_container("VLLM_CONTAINER", "vllm_node"),
# Built-in support-service keys (parakeet, kokoro, embeddings,
# qdrant) the deployment doesn't run — hidden from the dashboard and
# never probed.
disabled_services=_env_set("DISABLED_SERVICES"),
parakeet_port=_env_int("PARAKEET_PORT", 8000),
kokoro_port=_env_int("KOKORO_PORT", 8880),
embed_port=_env_int("EMBED_PORT", 8088),
qdrant_port=_env_int("QDRANT_PORT", 6333),
bind_port=_env_int("BIND_PORT", 9999),
open_webui_url=_env("OPEN_WEBUI_URL", ""),
ngc_api_key=_env("NGC_API_KEY", ""),
# Coordination layer: fire a swap-lifecycle webhook to this URL so
# downstream consumers re-point their model config on a swap. Blank
# ⇒ disabled. The optional secret HMAC-signs the body (X-Spark-Signature).
swap_webhook_url=_env("SWAP_WEBHOOK_URL", ""),
swap_webhook_secret=_env("SWAP_WEBHOOK_SECRET", ""),
)
@property
def configured(self) -> bool:
return bool(self.spark1_host)