7 Commits

Author SHA1 Message Date
Keysat c0b35184ba docs: trim Current state to live status — coordination epic shipped 2026-06-18 08:09:59 -05:00
Keysat 7ecd77f1e5 docs: defer raw-docker swap generalization — multi-node rationale recorded 2026-06-18 07:58:25 -05:00
Keysat 6bcda6e348 docs: v0.25.0:0 installed live — update Current state 2026-06-18 07:11:33 -05:00
Keysat 7ae6ab3ba8 v0.25.0:0 - cluster coordination layer (swap lock + webhook + schedule registry)
GPU-arbiter safety layer for when automation, not just the dashboard, swaps
models:
- swap reservation lock (POST/GET/DELETE /api/swap/lock); 423-enforced in
  post_swap via a single-read gate, TTL-bounded, secret-token auth, human
  force-release override + dashboard banner
- swap webhook (swap_complete/swap_failed) fired outside the swap lock, optional
  HMAC signature, configurable URL+secret
- read-only schedule registry (GET/POST/DELETE /api/schedule) + dashboard panel

New module image/app/coordination.py; docs/COORDINATION.md for consumers; 22
offline tests in test_coordination.py.
2026-06-18 07:07:08 -05:00
Keysat dd3d1412d4 docs: v0.24.0:0 committed/tagged/pushed — Gitea release asset + live install still pending 2026-06-17 23:11:14 -05:00
Keysat 26070eb191 v0.24.0:0 - configurable cluster topology (vllm container name, hide services, second-vllm monitor)
Make the cluster topology configurable so an adopter wired differently
(vLLM on both Sparks, port 8000, different container name, no Parakeet)
can monitor without forking. Covers the OpenClaw report P4/P5/#6.

- VLLM_CONTAINER override (default vllm_node), validated at the boundary
  and quote_arg-quoted into the swap log-tail + pre-flight validator exec.
- DISABLED_SERVICES list: hidden services show no tile and are skipped by
  status/deep-health/connectivity probes (kills the Parakeet-on-8000
  collision).
- kind: vllm custom service monitors a second Spark's vLLM via the shared
  probe_vllm_endpoint; /api/endpoints gains a disabled flag.

Swap mechanism intentionally not generalized to raw docker run (that's
coordination, roadmap item 4).
2026-06-17 23:03:33 -05:00
Keysat 90394f891b docs: v0.23.0 published, live install pending (mDNS); runbook sideload troubleshooting 2026-06-17 22:36:41 -05:00
22 changed files with 1353 additions and 38 deletions
+8 -5
View File
@@ -33,7 +33,7 @@ Subsystem guidance lives in `docs/guides/` and loads when matching files are tou
- `image/app/` — FastAPI app (`server.py` entry, routers in sibling modules, `static/` dashboard UI). - `image/app/` — FastAPI app (`server.py` entry, routers in sibling modules, `static/` dashboard UI).
- `package/startos/` — StartOS manifest, interfaces, actions, version + release notes. - `package/startos/` — StartOS manifest, interfaces, actions, version + release notes.
- `docs/``AUDIO_API.md`, `EMBEDDINGS.md`, `REDACTION_GATEWAY.md` (consumer-facing API refs; update with API changes). - `docs/``AUDIO_API.md`, `EMBEDDINGS.md`, `REDACTION_GATEWAY.md`, `COORDINATION.md` (consumer-facing API refs; update with API changes).
- `README.md` (overview), `HANDOFF.md` (fresh-user install guide), `runbook.md` (ops notes), `known-issues.md`, `ROADMAP.md` (longer-term backlog — items move into "Current state" below when picked up). - `README.md` (overview), `HANDOFF.md` (fresh-user install guide), `runbook.md` (ops notes), `known-issues.md`, `ROADMAP.md` (longer-term backlog — items move into "Current state" below when picked up).
## Conventions ## Conventions
@@ -55,12 +55,15 @@ Subsystem guidance lives in `docs/guides/` and loads when matching files are tou
## Current state ## Current state
- **Working (v0.22.0:0, installed and serving):** swap dashboard; chat / transcribe / diarize(+chunk) / TTS proxies; embeddings + rerank + hybrid search (Qdrant); `/scrub` + `/rehydrate`; label-merge incl. dual-channel; per-Spark SSH-key copy + WireGuard `VPN <ip>` hardware-card badge; configurable vLLM port (Configure Sparks field, blank ⇒ 8888). Spark 2 audio stack healthy. Security hardening (v0.19.0:0 — shellsafe SSH-injection guard, Qdrant path-injection, same-origin CSRF guard) shipped and stable; evidence in `EVALUATION.md`. - **Live: v0.25.0:0** (installed 2026-06-18, server reports `status: installed`). The OpenClaw/Johnny-5 coexistence epic is fully shipped & live: configurable `VLLM_PORT` (v0.22, blank ⇒ 8888), local/fine-tuned models (v0.23), configurable topology (v0.24 — `VLLM_CONTAINER`, `DISABLED_SERVICES` hide-list, second-Spark `kind: vllm` monitor), coordination layer (v0.25 — swap reservation lock with `423`-enforced manual-swap pause + `?force=true` Release override, `swap_complete`/`swap_failed` webhook, read-only schedule registry; consumer API in `docs/COORDINATION.md`).
- **matrix-bridge bot tile (done, v0.21.0:1, verified live):** `bot`-kind service tile — status badge from docker-state only (no HTTP port), plus **Update** / Restart / Stop/Start / **View logs**. Code: `app/matrix_bridge.py` + `/api/matrix-bridge/{update,logs}` (update streams; 25-min cap; fail-loud). Driven directly as `modelo` on Spark 2 (**no `sudo -iu`** — spark2 has no passwordless sudo). User is a blank-default Configure-Sparks field (`matrix_bridge_user`); blank → tile hidden (portable). Host reuses `spark2_host` (`192.168.1.87` = the bot's box `spark-32d0`); container/dir/branch are env-overridable defaults. **Load-bearing ops dep:** Update's `git fetch` runs as `modelo`, which needs `modelo`'s `~/.ssh/config` pinning the Gitea deploy key with `IdentitiesOnly yes` — else the wrong key is offered and Gitea denies (publickey). Optional next, only if the bot dev asks: Docker `HEALTHCHECK` for running-but-disconnected detection (spec §Note). - **Other live features:** swap dashboard; chat / transcribe / diarize(+chunk) / TTS proxies; embeddings + rerank + hybrid search (Qdrant); `/scrub` + `/rehydrate`; label-merge incl. dual-channel; per-Spark SSH-key copy + WireGuard `VPN <ip>` hardware badge. Security hardening (v0.19 — shellsafe SSH-injection guard, Qdrant path-injection, same-origin CSRF guard) stable (`EVALUATION.md`). Spark 2 audio/embeddings stack healthy.
- **Tests:** offline pytest harness in `image/tests/``cd image && .venv/bin/python -m pytest` (70 passing). Covers `build_launch_command` (incl. the shell-injection round-trip), the transcript↔diarizer label-merge, the `shellsafe` validators, and `matrix_bridge.build_update_command` (+ phase detection). Mock-heavy swap/proxy tests deliberately skipped (low ROI). Redaction + live-audio suites remain standalone scripts. - **matrix-bridge bot tile (v0.21.0:1, live):** `bot`-kind tile (docker-state badge; Update/Restart/Stop-Start/View-logs) for the Matrix bot on Spark 2, driven as `modelo` (no `sudo -iu`; blank `matrix_bridge_user` ⇒ tile hidden; host reuses `spark2_host`). Code: `app/matrix_bridge.py` + `/api/matrix-bridge/{update,logs}`. **Load-bearing:** Update's `git fetch` runs as `modelo` and needs `modelo`'s `~/.ssh/config` pinning the Gitea deploy key with `IdentitiesOnly yes` (else publickey denial). Optional next only if the bot dev asks: Docker `HEALTHCHECK`.
- **Tests:** offline pytest harness in `image/tests/``cd image && .venv/bin/python -m pytest` (124 passing). Covers `build_launch_command` (incl. the shell-injection round-trip + local-model bind-mount), the transcript↔diarizer label-merge, the `shellsafe` validators, `matrix_bridge.build_update_command` (+ phase detection), the configurable-topology layer (`test_topology.py`), and the coordination layer (`test_coordination.py`: swap-lock lifecycle/expiry/token-auth, schedule-registry CRUD, webhook payload + HMAC signature — `now` is injected into the lock so expiry is tested without sleeping). Mock-heavy swap/proxy/endpoint tests deliberately skipped (low ROI). Redaction + live-audio suites remain standalone scripts.
- **Signal Engine "flakiness":** diagnosed as *not* a server bug — transient 14s unresponsiveness while the single GPU is busy. Client-side remedy (in-flight cap 2 / ceiling 3 / retry-on-timeout+503) drafted and **forwarded to that dev (owner confirmed 2026-06-15)**. Awaiting whether they want the measured concurrency knee. - **Signal Engine "flakiness":** diagnosed as *not* a server bug — transient 14s unresponsiveness while the single GPU is busy. Client-side remedy (in-flight cap 2 / ceiling 3 / retry-on-timeout+503) drafted and **forwarded to that dev (owner confirmed 2026-06-15)**. Awaiting whether they want the measured concurrency knee.
- **Stance (decided, not built):** no public interface / no API-token auth — LAN + WireGuard/Tailscale split-tunnel only; the CSRF guard covers the browser-driven vector. - **Stance (decided, not built):** no public interface / no API-token auth — LAN + WireGuard/Tailscale split-tunnel only; the CSRF guard covers the browser-driven vector.
- **Known limits:** `/health` blips while the GPU is busy (mitigated client-side); dual-channel can miss a quiet local word under loud remote bleed; connectivity log misses sub-5s outages between 5s polls; diarizer caps at 4 speakers; matrix-bridge badge won't visibly flip on a fast `docker restart` (status re-checked only after the command returns). - **Known limits:** `/health` blips while the GPU is busy (mitigated client-side); dual-channel can miss a quiet local word under loud remote bleed; connectivity log misses sub-5s outages between 5s polls; diarizer caps at 4 speakers; matrix-bridge badge won't visibly flip on a fast `docker restart` (status re-checked only after the command returns).
- **Infra gotcha (safety):** passwordless sudo is NOT configured on spark2 — design unprivileged probes for any Spark feature (the badge uses `ip`, not `sudo wg show`). spark2 sits on the `starttunnel` WireGuard subnet (`10.59.211.6/24`, survives reboot). Owner declined SSH-key rotation after the 2026-06-12 history scrub (only the key *name* leaked) — don't re-flag. - **Infra gotcha (safety):** passwordless sudo is NOT configured on spark2 — design unprivileged probes for any Spark feature (the badge uses `ip`, not `sudo wg show`). spark2 sits on the `starttunnel` WireGuard subnet (`10.59.211.6/24`, survives reboot). Owner declined SSH-key rotation after the 2026-06-12 history scrub (only the key *name* leaked) — don't re-flag.
- **Hosting:** self-hosted Gitea — remote `gitea`, branch `master`, over SSH; push after committing. (Wart: commit `8d839e3` is mislabeled `v0.13.0:4` but contains through v0.18.0:0.) - **Hosting:** self-hosted Gitea — remote `gitea`, branch `master`, over SSH; push after committing. (Wart: commit `8d839e3` is mislabeled `v0.13.0:4` but contains through v0.18.0:0.)
- **Next — committed 2026-06-17: OpenClaw/Johnny-5 coexistence epic (full plan + design stance in `ROADMAP.md` → "Cluster coordination").** Stance: Spark Control = control plane / GPU arbiter, **not** a job runner; business cron jobs live in separate services that *call* its swap API (swaps are already API-driven via `POST /api/swap`). Sequence: (1) **configurable `VLLM_PORT`** — SHIPPED **v0.22.0:0** (Configure-Sparks field, blank ⇒ 8888; + `_env_int` hardening in `config.py` so a blank/bad port no longer crashes startup, killing a P3 tech-debt item). Committed `136a471`, pushed, tagged `v0.22.0`, rebuilt clean, installed, and **published to the self-hosted Gitea Releases** 2026-06-17 (`make release``scripts/gitea-release.sh`, takes `GITEA_URL` + a write token). **Distribution model (decided 2026-06-17):** Gitea Releases + a read-only token the adopter's agent uses to pull the latest s9pk (`GET /api/v1/repos/grant/spark-control/releases/latest` → download the `.s9pk` asset → sideload). Note: Gitea returns `browser_download_url` on its `.local` ROOT_URL, which won't resolve off-LAN — a remote adopter pulls via whatever address reaches the Gitea (the WireGuard IP). (2) **local-path/fine-tuned models** — DONE in tree, staged as **v0.23.0:0** (`ModelDef.local_path` + exactly-one-source validator; swap bind-mounts the dir at the same container path via the launch script's `VLLM_SPARK_EXTRA_DOCKER_ARGS` hook, **no `launch-cluster.sh` change**; "+ Add local model" UI form + `local` badge; `validate_local_path`; disk-delete refused for local; 94 tests pass; verified via TestClient). **Reviewer-agent pass done; findings addressed:** path validation folded into the `ModelDef` validator (so YAML/override-added local models are checked too), a chat-template-must-live-inside-`local_path` guard, `_merge_overrides` skips a bad entry instead of breaking the whole catalog, and the `VLLM_SPARK_EXTRA_DOCKER_ARGS` unquoted-expansion contract is documented in `runbook.md`. **Not yet built/installed/published — awaiting go/no-go.** Next: (3) configurable topology (service→Spark→port map + container names); (4) coordination layer (swap lock + swap webhook + schedule visibility) — only when our own automation lands. Still-open older threads: audio concurrency sweep (only if the Signal Engine dev wants the knee; needs a quiet window); optional matrix-bridge Docker `HEALTHCHECK` if the bot dev asks; Parakeet long-audio guard deferred (rationale in ROADMAP). - **Design stance (decided):** Spark Control = control plane / GPU arbiter, **not** a job runner; recurring business jobs live in separate services that *call* the swap API (`POST /api/swap`). Full epic history (v0.22→v0.25) is in git log + `ROADMAP.md` → "Cluster coordination".
- **Usage note (2026-06-18):** owner's daily driver is the solo **Qwen3.6 35B**; the 235B `cluster` models are dormant. Keeping `launch-cluster.sh` (the `eugr/spark-vllm-docker` community standard, mirrors NVIDIA's `dgx-spark-playbooks` Ray+RoCE design) is still correct even single-node — it supplies the maintained, hardware-tuned vLLM images; raw docker would mean DIY image upkeep for no gain. Spark 2 stays the speech/embeddings box regardless.
- **Next steps (all low-priority / externally gated; P2/P3 tech-debt backlog in `ROADMAP.md`):** (1) raw-`docker run` swap generalization — **DEFERRED** (rationale in ROADMAP; revisit only if an adopter wants Spark Control to *drive*, not just monitor, raw-docker swaps — cleanest fix is the adopter adopting `launch-cluster.sh`). (2) audio concurrency knee — only if the Signal Engine dev wants it (needs a quiet window). (3) matrix-bridge Docker `HEALTHCHECK` — only if the bot dev asks. (4) Parakeet long-audio guard — deferred (rationale in ROADMAP).
+18 -5
View File
@@ -11,11 +11,24 @@ Driven by the one other Spark Control adopter (a colleague running OpenClaw + cr
Sequenced: Sequenced:
1. **Configurable `VLLM_PORT`** — DONE, v0.22.0:0. Field in Configure Sparks (blank ⇒ 8888); numeric-setting parsing hardened so a blank/bad value falls back instead of crashing startup. Was the immediate "vLLM unreachable" bug for an adopter on port 8000. 1. **Configurable `VLLM_PORT`** — DONE, v0.22.0:0. Field in Configure Sparks (blank ⇒ 8888); numeric-setting parsing hardened so a blank/bad value falls back instead of crashing startup. Was the immediate "vLLM unreachable" bug for an adopter on port 8000.
2. **Local-path / fine-tuned model support** — DONE, v0.23.0:0. Catalog/`ModelDef` gained `local_path` (exactly one of `repo`/`local_path`); swap bind-mounts the dir into the vLLM container at the same path via the launch script's `VLLM_SPARK_EXTRA_DOCKER_ARGS` hook (no `launch-cluster.sh` change); "+ Add local model" form + `local` badge; disk-delete refused for local models; `validate_local_path` boundary check. His merged `ten31-v2` was the motivating case. 2. **Local-path / fine-tuned model support** — DONE, v0.23.0:0. Catalog/`ModelDef` gained `local_path` (exactly one of `repo`/`local_path`); swap bind-mounts the dir into the vLLM container at the same path via the launch script's `VLLM_SPARK_EXTRA_DOCKER_ARGS` hook (no `launch-cluster.sh` change); "+ Add local model" form + `local` badge; disk-delete refused for local models; `validate_local_path` boundary check. His merged `ten31-v2` was the motivating case.
3. **Configurable topology**make the service→Spark→port map and container names configurable so the package stops assuming our exact layout. Lets an adopter monitor vLLM on *both* Sparks, use a different container name, and stop the Parakeet probe from hitting a vLLM that shares its port — without forking. (Covers report P4 multi-Spark vLLM, P5 container name, and the Parakeet-port collision #6.) 3. **Configurable topology**DONE, v0.24.0:0. Three optional Configure-Sparks knobs: vLLM container name (`VLLM_CONTAINER`, blank ⇒ `vllm_node`; threaded through the swap log-tail + pre-flight validator via `quote_arg`); "services to hide" (`DISABLED_SERVICES`, comma list — hidden services show no tile and are skipped by status/deep-health/connectivity probes, killing the Parakeet-on-8000 collision); and a second-Spark vLLM monitor via a `kind: vllm` custom service in `services-overrides.yaml` (read-only tile probed through the shared `probe_vllm_endpoint`). `/api/endpoints` gained a `disabled` flag. Covers report P4/P5/#6. (Generalizing the *swap* mechanism to the adopter's raw `docker run` was deliberately left out — that's coordination, item 4; he swaps via his own crons and uses Spark Control to monitor.)
4. **Coordination layer**build when our own automation actually lands (zero value until something other than the dashboard swaps models): 4. **Coordination layer**DONE in tree, staged as **v0.25.0:0** (built/typechecked clean; install pending). All three primitives shipped; `image/app/coordination.py` + `docs/COORDINATION.md`. Brought forward 2026-06-17 on request rather than waiting for our own automation.
- **Swap lock** with holder + TTL (`POST` / `GET` / `DELETE /api/swap/lock`). An external scheduler acquires it before swapping; the dashboard then refuses manual swaps and shows who holds the GPU and until when. Enforced by the swap path, not advisory. - **Swap lock** with holder + TTL (`POST` / `GET` / `DELETE /api/swap/lock`). Acquire returns a secret token; the swap endpoint refuses any real swap (`423`) that doesn't present it in `X-Swap-Lock-Token`, so the dashboard's manual swap is paused while a scheduler holds it (with a `?force=true` human override). In-memory + TTL-bounded → resets to unlocked on restart; re-acquire with the token extends. Enforced in `post_swap`, not advisory.
- **Swap-event webhook** (`swap_complete` / `swap_failed`) to a configurable URL, so downstream consumers update their provider config when the running model changes. - **Swap-event webhook** (`swap_complete` / `swap_failed`) to a configurable URL (Configure-Sparks field), fired from `SwapManager._run` *outside* the swap lock; optional shared secret ⇒ `X-Spark-Signature` HMAC. Fire-and-forget (5 s, no retries); dry runs don't fire.
- **Schedule visibility** — read-only view the dashboard surfaces, *registered by* external schedulers (Spark Control does not own the schedule). - **Schedule visibility** — `GET/POST/DELETE /api/schedule`; read-only "Scheduled jobs" dashboard panel, registered by external schedulers. Spark Control stores and displays, never executes.
- Tests: `image/tests/test_coordination.py` (22 cases — lock lifecycle/expiry/token, the single-read swap gate, schedule CRUD + id validation, webhook payload+signature). Known limit: lock + schedules are in-memory (a restart frees the lock and empties the registry until schedulers re-register) — persist to `/data` only if that bites.
### Generalizing the swap mechanism to raw `docker run` — DEFERRED (decided 2026-06-18, research-backed; was item 4's last open thread)
Our swap drives `~/spark-vllm-docker/launch-cluster.sh` over SSH on Spark 1 (`./launch-cluster.sh stop`, then `[VLLM_SPARK_EXTRA_DOCKER_ARGS=…] ./launch-cluster.sh [--solo ]-d exec vllm serve <model> <args>`, then `docker logs -f` until the ready marker). The OpenClaw adopter launches vLLM with a plain `docker run` instead, so the swap button can't drive his cluster — only monitor it. The portability fix would be a configurable "swap backend": keep `launch-cluster.sh` as the default and add a "bring your own command" mode (operator-authored stop/launch templates in `services-overrides.yaml` with quoted `{model}`/`{container}`/`{port}`/`{extra_args}` substitution; ready-detection unchanged; the vLLM-argparse pre-flight disabled for that backend).
**Why deferred, not built:**
- **Raw docker is not an upgrade for *us* — for half our catalog it's impossible.** `launch-cluster.sh` is the `eugr/spark-vllm-docker` community project (de-facto DGX Spark standard; mirrors NVIDIA's own `dgx-spark-playbooks` Ray+RDMA architecture). Its headline job is **multi-node** serving: our 235B `cluster` models (Qwen3-VL 235B, Qwen3 235B) exceed one Spark's 128 GB and *must* shard across both Sparks via Ray over the 200 Gbps ConnectX/RoCE link — plumbing (NCCL/MTU/per-node env) that a single-node `docker run` cannot do. So we keep the helper script; switching our own cluster to raw docker is off the table.
- **The feature is therefore portability-only** (for differently-wired adopters), and the one known adopter doesn't need it — he swaps via his own crons and uses Spark Control to watch.
- **Untestable on our hardware** — our cluster uses the helper script, so we can't validate a real raw-docker swap without risking the live vLLM.
- The one real standing risk is eugr's single-maintainer status; fallback is community forks or migrating to NVIDIA's official `dgx-spark-playbooks` launcher (same design). No reason to switch now.
**Revisit only if** an adopter explicitly wants Spark Control to *drive* (not just monitor) swaps on a raw-`docker run` cluster. At that point, get their actual working `docker run` command and build the command-template backend to it.
## Near term ## Near term
- parakeet-asr long-audio memory guard — **deferred 2026-06-15, low priority.** A duration cap on `/v1/audio/diarize`: Sortformer runs the whole file in one pass (`diarizer.py:128-135`) over Spark 2's *shared* 128 GB unified memory (also feeding Kokoro/embeddings/Qdrant), so one giant single file can thrash into swap. **Precautionary — no observed incident**, and the production consumer (Recap Relay) already chunks via `/diarize-chunk` (~5-min, already bounded), so the only exposed path is a consumer POSTing one huge file to the full `/diarize`. When picked up: add a configurable `MAX_DIARIZE_SECONDS` guard in `diarizer.py` right after `duration` is computed (~line 130) → raise → HTTP 413 in `main.py` (mirrors the existing `MAX_UPLOAD_MB` 413); ship via the Reapply-patches action (restarts the live parakeet-asr container → needs go/no-go). Leave transcription out of v1 (upstream/un-patched file; parakeet-TDT handles long audio better). Revisit only if a consumer starts sending long single files. - parakeet-asr long-audio memory guard — **deferred 2026-06-15, low priority.** A duration cap on `/v1/audio/diarize`: Sortformer runs the whole file in one pass (`diarizer.py:128-135`) over Spark 2's *shared* 128 GB unified memory (also feeding Kokoro/embeddings/Qdrant), so one giant single file can thrash into swap. **Precautionary — no observed incident**, and the production consumer (Recap Relay) already chunks via `/diarize-chunk` (~5-min, already bounded), so the only exposed path is a consumer POSTing one huge file to the full `/diarize`. When picked up: add a configurable `MAX_DIARIZE_SECONDS` guard in `diarizer.py` right after `duration` is computed (~line 130) → raise → HTTP 413 in `main.py` (mirrors the existing `MAX_UPLOAD_MB` 413); ship via the Reapply-patches action (restarts the live parakeet-asr container → needs go/no-go). Leave transcription out of v1 (upstream/un-patched file; parakeet-TDT handles long audio better). Revisit only if a consumer starts sending long single files.
+157
View File
@@ -0,0 +1,157 @@
# Cluster coordination through Spark Control (v0.25.0)
Spark Control is the **GPU arbiter, not a job runner.** Your recurring pipelines
(model-warming crons, "daily X" generators, batch jobs) live in your own
services and *drive Spark Control's swap API*. This page documents the safety
layer around that: a **swap reservation lock**, a **swap-event webhook**, and a
**read-only schedule registry**.
If only the dashboard ever swaps models, you don't need any of this — it's for
when something automated also swaps.
All endpoints are on the Spark Control host (same LAN/VPN URL as the LLM, audio,
and embeddings proxies). There is no API-token auth by design (LAN + split-tunnel
VPN only); a non-browser client passes the same-origin guard automatically.
---
## 1. Swap reservation lock
A short, TTL-bounded reservation of the swap path. While a lock is held, **any
real swap that doesn't present the holder's token is refused with `423 Locked`**
— including the dashboard's manual swap. The holder *name* is descriptive; the
returned **token** is the secret that authorises swaps and the release.
The lock is in-memory: it resets to *unlocked* if Spark Control restarts (the
safe-for-availability default), and the swap engine's own in-progress guard
still prevents two swaps running at once.
### `POST /api/swap/lock` — acquire (or extend)
```json
// request
{ "holder": "openclaw-daily-vol", "ttl_seconds": 900, "note": "daily vol run" }
// 200 response
{
"held": true,
"holder": "openclaw-daily-vol",
"acquired_at": "2026-06-17T12:00:00+00:00",
"expires_at": "2026-06-17T12:15:00+00:00",
"seconds_remaining": 900,
"note": "daily vol run",
"token": "a1b2c3…" // SECRET — store it; needed to swap and to release
}
```
- `ttl_seconds` is optional (default 900) and clamped to `[1, 86400]`.
- **`409`** if a *different* holder already holds it (body includes the current
`lock` state). To **extend** your own lock, POST again with the same `holder`
**and** your `token` — the token is preserved and the window slides forward.
### `GET /api/swap/lock` — status (no token)
```json
{ "held": true, "holder": "openclaw-daily-vol", "expires_at": "…", "seconds_remaining": 612, "note": "…" }
// or
{ "held": false }
```
### `DELETE /api/swap/lock` — release
Send your token in the `X-Swap-Lock-Token` header (or `?token=`):
```
DELETE /api/swap/lock
X-Swap-Lock-Token: a1b2c3…
```
- **`403`** if the token doesn't match. The dashboard's human override is
`DELETE /api/swap/lock?force=true` (no token).
### Swapping while you hold the lock
Pass the token on the swap call; the dashboard (no token) is then blocked:
```
POST /api/swap
X-Swap-Lock-Token: a1b2c3…
{ "model_key": "gemma-3-27b" }
```
Recommended scheduler flow: **acquire → swap (with token) → poll `/api/swap/{id}`
→ release**. Always release in a `finally`; if you crash, the TTL frees it.
> `POST /api/swap/{key}/validate` (pre-flight) and dry-run swaps are **not**
> blocked by the lock — they don't touch the cluster.
---
## 2. Swap-event webhook
Configure a URL in **Configure Sparks → "Swap webhook URL"**. After every real
swap, Spark Control POSTs:
```json
{
"event": "swap_complete", // or "swap_failed"
"job_id": "1a2b3c4d",
"model_key": "gemma-3-27b",
"state": "ready", // or "failed"
"returncode": 0,
"started_at": "2026-06-17T12:00:00+00:00",
"finished_at": "2026-06-17T12:03:11+00:00",
"dry_run": false
}
```
Headers: `X-Spark-Event: swap_complete`. If you set a **webhook secret**, the
body is signed: `X-Spark-Signature: sha256=<hmac>` (HMAC-SHA256 of the raw body
with the shared secret). Verify it like:
```python
import hmac, hashlib
expected = "sha256=" + hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
assert hmac.compare_digest(expected, request.headers["X-Spark-Signature"])
```
Delivery is best-effort and fire-and-forget (5 s timeout, no retries) — a
webhook failure never affects the swap itself. Dry runs don't fire.
---
## 3. Schedule registry (read-only display)
So the dashboard can show *what's scheduled to touch the GPU and when*, your
schedulers register their jobs here. **Spark Control only displays these — it
never executes them.**
### `POST /api/schedule` — register / update
```json
// request (pass a stable `id` to update in place on re-register)
{ "id": "daily-vol", "name": "Daily Vol", "owner": "openclaw",
"cron": "0 6 * * *", "next_run": "2026-06-18T06:00:00Z",
"description": "Swaps to the big model, generates the vol report" }
// response: the stored entry (generates an id if you omit one)
```
`name` is required; `id` (if given) must match `[A-Za-z0-9_.-]` (≤64 chars).
### `GET /api/schedule` — list
```json
{ "schedules": [ { "id": "daily-vol", "name": "Daily Vol", "owner": "openclaw",
"cron": "0 6 * * *", "next_run": "…", "description": "…",
"registered_at": "…", "updated_at": "…" } ] }
```
### `DELETE /api/schedule/{id}` — deregister
```json
{ "deleted": true }
```
The registry is in-memory — re-register your schedules on your own startup so
they survive a Spark Control restart.
+49
View File
@@ -1,13 +1,44 @@
from __future__ import annotations from __future__ import annotations
import logging
import os import os
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from .shellsafe import validate_container
log = logging.getLogger(__name__)
def _env(name: str, default: str = "") -> str: def _env(name: str, default: str = "") -> str:
return os.environ.get(name, default) return os.environ.get(name, default)
def _env_container(name: str, default: str) -> str:
"""Resolve a container-name env var, validating it at the config boundary.
The value flows into `docker logs`/`docker exec` over SSH, so it's quoted at
the sink — but per the repo's two-layer convention it's also whitelist-checked
here. A malformed optional value falls back to `default` rather than crashing
daemon startup (mirrors `_env_int` for VLLM_PORT)."""
val = os.environ.get(name, "") or default
try:
return validate_container(val)
except ValueError:
log.warning("ignoring invalid %s=%r; using %r", name, val, default)
return default
def _env_set(name: str) -> frozenset[str]:
"""Parse a comma-separated env var into a lowercased frozenset of keys.
Used by DISABLED_SERVICES so an adopter whose cluster doesn't run a given
support service can switch its tile + probes off entirely (rather than have
the probe hit whatever else listens on that port — e.g. a vLLM sharing
Parakeet's default 8000)."""
raw = os.environ.get(name, "")
return frozenset(part.strip().lower() for part in raw.split(",") if part.strip())
def _env_int(name: str, default: int) -> int: def _env_int(name: str, default: int) -> int:
"""Parse an int env var, falling back to `default` when unset, blank, or """Parse an int env var, falling back to `default` when unset, blank, or
malformed. The StartOS Configure panel passes optional numeric fields as an malformed. The StartOS Configure panel passes optional numeric fields as an
@@ -63,6 +94,8 @@ class Settings:
ssh_known_hosts: str ssh_known_hosts: str
models_yaml: str models_yaml: str
vllm_port: int vllm_port: int
vllm_container: str
disabled_services: frozenset[str]
parakeet_port: int parakeet_port: int
kokoro_port: int kokoro_port: int
embed_port: int embed_port: int
@@ -70,6 +103,8 @@ class Settings:
bind_port: int bind_port: int
open_webui_url: str open_webui_url: str
ngc_api_key: str ngc_api_key: str
swap_webhook_url: str
swap_webhook_secret: str
@classmethod @classmethod
def from_env(cls) -> "Settings": def from_env(cls) -> "Settings":
@@ -116,6 +151,15 @@ class Settings:
ssh_known_hosts=_env("SSH_KNOWN_HOSTS"), ssh_known_hosts=_env("SSH_KNOWN_HOSTS"),
models_yaml=_resolve_models_yaml(), models_yaml=_resolve_models_yaml(),
vllm_port=_env_int("VLLM_PORT", 8888), vllm_port=_env_int("VLLM_PORT", 8888),
# Container name for the swappable vLLM on Spark 1. Defaults to the
# bundled launch-cluster.sh container; override if you named yours
# something else (the swap log-tail and pre-flight validator exec
# into it by name).
vllm_container=_env_container("VLLM_CONTAINER", "vllm_node"),
# Built-in support-service keys (parakeet, kokoro, embeddings,
# qdrant) the deployment doesn't run — hidden from the dashboard and
# never probed.
disabled_services=_env_set("DISABLED_SERVICES"),
parakeet_port=_env_int("PARAKEET_PORT", 8000), parakeet_port=_env_int("PARAKEET_PORT", 8000),
kokoro_port=_env_int("KOKORO_PORT", 8880), kokoro_port=_env_int("KOKORO_PORT", 8880),
embed_port=_env_int("EMBED_PORT", 8088), embed_port=_env_int("EMBED_PORT", 8088),
@@ -123,6 +167,11 @@ class Settings:
bind_port=_env_int("BIND_PORT", 9999), bind_port=_env_int("BIND_PORT", 9999),
open_webui_url=_env("OPEN_WEBUI_URL", ""), open_webui_url=_env("OPEN_WEBUI_URL", ""),
ngc_api_key=_env("NGC_API_KEY", ""), ngc_api_key=_env("NGC_API_KEY", ""),
# Coordination layer: fire a swap-lifecycle webhook to this URL so
# downstream consumers re-point their model config on a swap. Blank
# ⇒ disabled. The optional secret HMAC-signs the body (X-Spark-Signature).
swap_webhook_url=_env("SWAP_WEBHOOK_URL", ""),
swap_webhook_secret=_env("SWAP_WEBHOOK_SECRET", ""),
) )
@property @property
+342
View File
@@ -0,0 +1,342 @@
"""Cluster-coordination layer: the GPU swap lock, swap-event webhook, and the
read-only schedule registry.
Spark Control is the **control plane / GPU arbiter, not a job runner.** Recurring
business pipelines live in separate services that *call* the swap API. These
three primitives add the *safety* layer around that:
- **Swap lock** — a TTL-bounded reservation of the swap path. An external
scheduler acquires it before swapping; while held by someone else the
dashboard's manual swap is refused (enforced in the swap endpoint, not
advisory). Holder name is descriptive; the returned token is the secret that
authorises a swap or a release.
- **Webhook** — fires `swap_complete` / `swap_failed` to a configurable URL so
downstream consumers re-point their provider config when the running model
changes. Optionally HMAC-signed.
- **Schedule registry** — a read-only view the dashboard surfaces, *registered
by* external schedulers. Spark Control stores what it's told; it does not own
or execute any schedule.
All state is in-memory (mirroring the swap/download/NIM job managers). On a
restart the lock resets to *unlocked* — the available-by-default failure mode;
the swap manager's own in-progress guard still prevents two swaps at once —
and schedulers re-register their schedules.
"""
from __future__ import annotations
import hashlib
import hmac
import json
import logging
import re
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional
import httpx
log = logging.getLogger(__name__)
# A lock reserves the GPU for a window; clamp the TTL so a buggy client can
# neither pin the cluster forever nor take a zero-length (useless) lock.
LOCK_TTL_MIN = 1
LOCK_TTL_MAX = 86_400 # 24h
LOCK_TTL_DEFAULT = 900 # 15 min
# Schedule ids are reflected to the dashboard and used as a URL path segment on
# delete, so a caller-supplied id is whitelist-checked. Generated ids are hex.
_SCHEDULE_ID_RE = re.compile(r"^[A-Za-z0-9_.-]{1,64}$")
def valid_schedule_id(value: str) -> bool:
"""Whitelist check for a caller-supplied schedule id (register and delete)."""
return bool(_SCHEDULE_ID_RE.match(value or ""))
def _now() -> datetime:
return datetime.now(timezone.utc)
def _iso(dt: datetime) -> str:
return dt.isoformat()
# ---------------------------------------------------------------- swap lock ----
class LockHeld(Exception):
"""The lock is held by a different holder. Carries the public lock state so
the endpoint can return holder + expiry in the 409 body."""
def __init__(self, state: dict) -> None:
self.state = state
super().__init__("swap lock is held by another holder")
@dataclass
class LockState:
holder: str
token: str
acquired_at: datetime
expires_at: datetime
note: str = ""
def public(self, now: datetime) -> dict:
"""Token-free view safe to expose on GET / in error bodies."""
return {
"held": True,
"holder": self.holder,
"acquired_at": _iso(self.acquired_at),
"expires_at": _iso(self.expires_at),
"seconds_remaining": max(0, int((self.expires_at - now).total_seconds())),
"note": self.note,
}
class SwapLockManager:
"""In-memory, TTL-bounded reservation of the GPU swap path.
`now` is injectable on every method purely so the expiry logic is testable
without sleeping; production calls omit it and get wall-clock UTC.
"""
def __init__(self) -> None:
self._lock: Optional[LockState] = None
def _active(self, now: Optional[datetime] = None) -> Optional[LockState]:
"""The current lock if one is held and unexpired; lazily clears an
expired lock so it never lingers."""
now = now or _now()
if self._lock is not None and self._lock.expires_at <= now:
self._lock = None
return self._lock
def status(self, now: Optional[datetime] = None) -> dict:
now = now or _now()
active = self._active(now)
return active.public(now) if active else {"held": False}
def acquire(
self,
holder: str,
ttl_seconds: Optional[int] = None,
note: str = "",
token: Optional[str] = None,
*,
now: Optional[datetime] = None,
) -> LockState:
"""Acquire a free lock (new token), or extend one already held by
presenting its token. A request without the token is refused even if the
holder name matches — the name is descriptive, the token is the secret.
"""
now = now or _now()
holder = (holder or "").strip()
if not holder:
raise ValueError("holder is required")
ttl = ttl_seconds if ttl_seconds is not None else LOCK_TTL_DEFAULT
try:
ttl = int(ttl)
except (TypeError, ValueError):
ttl = LOCK_TTL_DEFAULT
ttl = max(LOCK_TTL_MIN, min(LOCK_TTL_MAX, ttl))
active = self._active(now)
if active is not None:
# Held — only the token-holder may extend/re-acquire.
if not (token and hmac.compare_digest(active.token, token)):
raise LockHeld(active.public(now))
self._lock = LockState(
holder=holder or active.holder,
token=active.token,
acquired_at=active.acquired_at,
expires_at=now + timedelta(seconds=ttl),
note=note or active.note,
)
return self._lock
self._lock = LockState(
holder=holder,
token=uuid.uuid4().hex,
acquired_at=now,
expires_at=now + timedelta(seconds=ttl),
note=note,
)
return self._lock
def verify(self, token: Optional[str], now: Optional[datetime] = None) -> bool:
"""True iff `token` matches the currently-active lock."""
active = self._active(now)
return bool(active and token and hmac.compare_digest(active.token, token))
def is_blocked_by(self, token: Optional[str], now: Optional[datetime] = None) -> Optional[dict]:
"""Single-read swap gate. Returns the public lock state if an active
lock blocks a swap carrying this token, else None. Does exactly one
`_active()` read so the decision can't straddle a TTL expiry the way a
separate status()+verify() pair could (which, at the expiry tick, would
spuriously refuse a swap that should now be allowed)."""
now = now or _now()
active = self._active(now)
if active is None:
return None
if token and hmac.compare_digest(active.token, token):
return None
return active.public(now)
def release(
self,
token: Optional[str] = None,
*,
force: bool = False,
now: Optional[datetime] = None,
) -> bool:
"""Release the lock. Returns False if nothing was held. Requires the
matching token unless `force` (the human override from the dashboard)."""
active = self._active(now)
if active is None:
return False
if not force and not self.verify(token, now):
raise PermissionError("token does not hold the lock")
self._lock = None
return True
# ----------------------------------------------------------------- webhook ----
def build_webhook_payload(
*,
event: str,
job_id: str,
model_key: str,
state: str,
returncode: Optional[int],
started_at: Optional[str],
finished_at: Optional[str],
dry_run: bool,
) -> dict:
return {
"event": event, # swap_complete | swap_failed
"job_id": job_id,
"model_key": model_key,
"state": state,
"returncode": returncode,
"started_at": started_at,
"finished_at": finished_at,
"dry_run": dry_run,
}
def sign_payload(secret: str, body: bytes) -> str:
"""`X-Spark-Signature` value: sha256 HMAC of the exact JSON body the
consumer receives, so they can recompute and trust it."""
return "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
class WebhookNotifier:
"""Fire-and-forget POST of swap-lifecycle events. A webhook failure is
logged and swallowed — it must never affect the swap outcome."""
def __init__(self, url: str, secret: str = "", timeout: float = 5.0) -> None:
self.url = (url or "").strip()
self.secret = secret or ""
self.timeout = timeout
@property
def enabled(self) -> bool:
return bool(self.url)
async def fire(self, event: str, payload: dict) -> None:
if not self.enabled:
return
body = json.dumps(payload).encode()
headers = {
"content-type": "application/json",
"user-agent": "spark-control-webhook",
"x-spark-event": event,
}
if self.secret:
headers["x-spark-signature"] = sign_payload(self.secret, body)
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
await client.post(self.url, content=body, headers=headers)
except Exception as e: # noqa: BLE001 — best-effort, never propagate
log.warning("swap webhook to %s failed: %s", self.url, e)
# -------------------------------------------------------- schedule registry ----
@dataclass
class ScheduleEntry:
id: str
name: str
owner: str = ""
cron: str = ""
next_run: str = ""
description: str = ""
registered_at: str = ""
updated_at: str = ""
def public(self) -> dict:
return {
"id": self.id,
"name": self.name,
"owner": self.owner,
"cron": self.cron,
"next_run": self.next_run,
"description": self.description,
"registered_at": self.registered_at,
"updated_at": self.updated_at,
}
class ScheduleRegistry:
"""What external schedulers tell us about their cron jobs. Read-only from the
dashboard's side; Spark Control never executes any of it."""
def __init__(self) -> None:
self._items: dict[str, ScheduleEntry] = {}
def list(self) -> list[dict]:
return [e.public() for e in self._items.values()]
def register(
self,
*,
name: str,
id: Optional[str] = None,
owner: str = "",
cron: str = "",
next_run: str = "",
description: str = "",
) -> ScheduleEntry:
name = (name or "").strip()
if not name:
raise ValueError("name is required")
if id is not None:
id = id.strip()
if id and not valid_schedule_id(id):
raise ValueError("id must match [A-Za-z0-9_.-] (max 64 chars)")
ts = _iso(_now())
existing = self._items.get(id) if id else None
if existing is not None:
existing.name = name
existing.owner = owner.strip()
existing.cron = cron
existing.next_run = next_run
existing.description = description
existing.updated_at = ts
return existing
sid = id or uuid.uuid4().hex[:8]
entry = ScheduleEntry(
id=sid,
name=name,
owner=owner.strip(),
cron=cron,
next_run=next_run,
description=description,
registered_at=ts,
updated_at=ts,
)
self._items[sid] = entry
return entry
def delete(self, schedule_id: str) -> bool:
return self._items.pop(schedule_id, None) is not None
+11
View File
@@ -10,6 +10,17 @@ Format:
port: 8001 port: 8001
health_path: /health health_path: /health
image: nvcr.io/nim/nvidia/riva-multilingual:latest image: nvcr.io/nim/nvidia/riva-multilingual:latest
A `kind: vllm` entry monitors an additional vLLM on another Spark (read-only —
the swap machinery only drives the primary Spark 1 vLLM). It gets a health tile
probed via /v1/models plus container state and start/stop/restart:
custom:
- key: vllm-spark2
kind: vllm
host: <spark-2-ip>
user: <ssh-user>
container: vllm_node
port: 8000
""" """
from __future__ import annotations from __future__ import annotations
import os import os
+4
View File
@@ -377,6 +377,10 @@ class DeepHealth:
async def run_all(self) -> dict[str, ProbeResult]: async def run_all(self) -> dict[str, ProbeResult]:
results = {} results = {}
for name in self.PROBES: for name in self.PROBES:
# Don't deep-probe a service the deployment switched off — its port
# may be answered by something else (e.g. a vLLM on Parakeet's 8000).
if name in self.settings.disabled_services:
continue
results[name] = await self.run_one(name) results[name] = await self.run_one(name)
return results return results
+34 -9
View File
@@ -6,17 +6,28 @@ from .config import Settings
_TIMEOUT = 3.0 _TIMEOUT = 3.0
async def check_vllm(settings: Settings) -> dict: def _disabled(settings: Settings, key: str) -> dict | None:
base_url = ( """A clean 'disabled' verdict if `key` is in DISABLED_SERVICES, else None.
f"http://{settings.spark1_host}:{settings.vllm_port}/v1"
if settings.spark1_host Lets an adopter who doesn't run a given support service switch its probe off
else None entirely — so the probe never hits whatever else listens on that port, and
) the connectivity log doesn't record it as perpetually down."""
if not settings.spark1_host: if key in settings.disabled_services:
return {"ok": False, "error": "spark1 not configured", "base_url": base_url} return {"ok": False, "disabled": True, "error": "disabled", "base_url": None}
return None
async def probe_vllm_endpoint(host: str, port: int) -> dict:
"""Probe any OpenAI-compatible vLLM at host:port via /v1/models.
Shared by the primary (Spark 1) health check and any extra vLLM registered
as a custom service (kind: vllm) to monitor a second Spark."""
base_url = f"http://{host}:{port}/v1" if host else None
if not host:
return {"ok": False, "error": "vllm host not configured", "base_url": base_url}
try: try:
async with httpx.AsyncClient(timeout=_TIMEOUT) as c: async with httpx.AsyncClient(timeout=_TIMEOUT) as c:
r = await c.get(f"http://{settings.spark1_host}:{settings.vllm_port}/v1/models") r = await c.get(f"http://{host}:{port}/v1/models")
r.raise_for_status() r.raise_for_status()
ids = [m["id"] for m in r.json().get("data", [])] ids = [m["id"] for m in r.json().get("data", [])]
return { return {
@@ -29,7 +40,15 @@ async def check_vllm(settings: Settings) -> dict:
return {"ok": False, "error": str(e), "base_url": base_url} return {"ok": False, "error": str(e), "base_url": base_url}
async def check_vllm(settings: Settings) -> dict:
if not settings.spark1_host:
return {"ok": False, "error": "spark1 not configured", "base_url": None}
return await probe_vllm_endpoint(settings.spark1_host, settings.vllm_port)
async def check_parakeet(settings: Settings) -> dict: async def check_parakeet(settings: Settings) -> dict:
if d := _disabled(settings, "parakeet"):
return d
base_url = ( base_url = (
f"http://{settings.parakeet_host}:{settings.parakeet_port}" f"http://{settings.parakeet_host}:{settings.parakeet_port}"
if settings.parakeet_host if settings.parakeet_host
@@ -47,6 +66,8 @@ async def check_parakeet(settings: Settings) -> dict:
async def check_kokoro(settings: Settings) -> dict: async def check_kokoro(settings: Settings) -> dict:
if d := _disabled(settings, "kokoro"):
return d
base_url = ( base_url = (
f"http://{settings.kokoro_host}:{settings.kokoro_port}" f"http://{settings.kokoro_host}:{settings.kokoro_port}"
if settings.kokoro_host if settings.kokoro_host
@@ -68,6 +89,8 @@ async def check_kokoro(settings: Settings) -> dict:
async def check_embeddings(settings: Settings) -> dict: async def check_embeddings(settings: Settings) -> dict:
if d := _disabled(settings, "embeddings"):
return d
base_url = ( base_url = (
f"http://{settings.embed_host}:{settings.embed_port}" f"http://{settings.embed_host}:{settings.embed_port}"
if settings.embed_host if settings.embed_host
@@ -89,6 +112,8 @@ async def check_embeddings(settings: Settings) -> dict:
async def check_qdrant(settings: Settings) -> dict: async def check_qdrant(settings: Settings) -> dict:
if d := _disabled(settings, "qdrant"):
return d
base_url = ( base_url = (
f"http://{settings.qdrant_host}:{settings.qdrant_port}" f"http://{settings.qdrant_host}:{settings.qdrant_port}"
if settings.qdrant_host if settings.qdrant_host
+127 -10
View File
@@ -11,6 +11,7 @@ from typing import Literal
from .config import Settings from .config import Settings
from .connectivity import get_mac, record_report, record_state, summary as connectivity_summary from .connectivity import get_mac, record_report, record_state, summary as connectivity_summary
from .coordination import LockHeld, ScheduleRegistry, SwapLockManager, WebhookNotifier, valid_schedule_id
from .custom_services import add_custom_service, delete_custom_service from .custom_services import add_custom_service, delete_custom_service
from .audio_proxy import build_router as build_audio_router from .audio_proxy import build_router as build_audio_router
from .deep_health import DeepHealth from .deep_health import DeepHealth
@@ -20,7 +21,7 @@ from .llm_proxy import build_router as build_llm_router
from .embeddings_proxy import build_router as build_embeddings_router from .embeddings_proxy import build_router as build_embeddings_router
from .redaction_gateway import build_router as build_redaction_router, MapStore from .redaction_gateway import build_router as build_redaction_router, MapStore
from .hardware import HardwareProbe from .hardware import HardwareProbe
from .health import check_kokoro, check_parakeet, check_vllm, check_embeddings, check_qdrant from .health import check_kokoro, check_parakeet, check_vllm, check_embeddings, check_qdrant, probe_vllm_endpoint
from .matrix_bridge import MatrixBridgeManager from .matrix_bridge import MatrixBridgeManager
from .models import ModelDef, load_catalog from .models import ModelDef, load_catalog
from .nim import SUGGESTED_NIMS, CATALOG_URL, NimManager from .nim import SUGGESTED_NIMS, CATALOG_URL, NimManager
@@ -37,7 +38,12 @@ from .wol import send_local_broadcast, send_via_peer
settings = Settings.from_env() settings = Settings.from_env()
catalog = load_catalog(settings.models_yaml) catalog = load_catalog(settings.models_yaml)
swap_manager = SwapManager(settings, catalog) # Coordination layer (GPU arbiter): swap-lifecycle webhook, the swap reservation
# lock, and the read-only schedule registry. See coordination.py.
swap_webhook = WebhookNotifier(settings.swap_webhook_url, settings.swap_webhook_secret)
swap_lock = SwapLockManager()
schedule_registry = ScheduleRegistry()
swap_manager = SwapManager(settings, catalog, notifier=swap_webhook)
download_manager = DownloadManager(settings) download_manager = DownloadManager(settings)
update_manager = UpdateManager(settings) update_manager = UpdateManager(settings)
hardware_probe = HardwareProbe(settings) hardware_probe = HardwareProbe(settings)
@@ -67,6 +73,10 @@ _CSRF_EXEMPT_PREFIXES = (
"/api/audio/", # diarize-chunk / label-merge / transcribe-with-speakers "/api/audio/", # diarize-chunk / label-merge / transcribe-with-speakers
"/api/health-event", # health reports posted by consumer apps "/api/health-event", # health reports posted by consumer apps
) )
# Note: the coordination endpoints (/api/swap/lock, /api/schedule) are
# intentionally NOT exempt. External schedulers are non-browser clients (no
# Origin header) so they pass the guard already — same as /api/swap — while a
# malicious page can't drive them from the operator's browser. Don't add them.
@app.middleware("http") @app.middleware("http")
@@ -500,6 +510,10 @@ async def get_services() -> dict:
http = await check_embeddings(settings) http = await check_embeddings(settings)
elif name == "qdrant": elif name == "qdrant":
http = await check_qdrant(settings) http = await check_qdrant(settings)
elif svc.kind == "vllm":
# An extra vLLM monitored on another Spark (registered as a custom
# service). Probe its own host/port, not the primary Spark 1 one.
http = await probe_vllm_endpoint(svc.host, svc.port)
elif svc.kind == "bot": elif svc.kind == "bot":
# No HTTP health endpoint (host networking, no port) — judged purely # No HTTP health endpoint (host networking, no port) — judged purely
# by docker state. http_ready stays None so the badge isn't pinned # by docker state. http_ready stays None so the badge isn't pinned
@@ -521,7 +535,7 @@ async def get_services() -> dict:
# Prefer the check fn's own top-level model key (embeddings reports # Prefer the check fn's own top-level model key (embeddings reports
# it there); fall back to a model field inside detail for services # it there); fall back to a model field inside detail for services
# whose /health embeds it (parakeet). # whose /health embeds it (parakeet).
"model": http.get("model") or ((http.get("detail") or {}).get("model") if isinstance(http.get("detail"), dict) else None), "model": http.get("model") or http.get("current_model") or ((http.get("detail") or {}).get("model") if isinstance(http.get("detail"), dict) else None),
"docker_state": docker.get("state"), "docker_state": docker.get("state"),
"restart_count": docker.get("restart_count"), "restart_count": docker.get("restart_count"),
"started_at": docker.get("started_at"), "started_at": docker.get("started_at"),
@@ -799,17 +813,20 @@ async def get_endpoints() -> dict:
"base_url": vllm.get("base_url"), "base_url": vllm.get("base_url"),
"model": vllm.get("current_model"), "model": vllm.get("current_model"),
"openai_compat": True, "openai_compat": True,
"disabled": bool(vllm.get("disabled")),
}, },
"parakeet": { "parakeet": {
"ready": bool(parakeet.get("ok")), "ready": bool(parakeet.get("ok")),
"base_url": parakeet.get("base_url"), "base_url": parakeet.get("base_url"),
"kind": "stt", "kind": "stt",
"model": (parakeet.get("detail") or {}).get("model") if isinstance(parakeet.get("detail"), dict) else None, "model": (parakeet.get("detail") or {}).get("model") if isinstance(parakeet.get("detail"), dict) else None,
"disabled": bool(parakeet.get("disabled")),
}, },
"kokoro": { "kokoro": {
"ready": bool(kokoro.get("ok")), "ready": bool(kokoro.get("ok")),
"base_url": kokoro.get("base_url"), "base_url": kokoro.get("base_url"),
"kind": "tts", "kind": "tts",
"disabled": bool(kokoro.get("disabled")),
}, },
"embeddings": { "embeddings": {
"ready": bool(embeddings.get("ok")), "ready": bool(embeddings.get("ok")),
@@ -818,12 +835,14 @@ async def get_endpoints() -> dict:
"model": embeddings.get("model"), "model": embeddings.get("model"),
# The proxied OpenAI-compatible endpoints live on Spark Control itself. # The proxied OpenAI-compatible endpoints live on Spark Control itself.
"openai_endpoints": ["/v1/embeddings", "/v1/rerank", "/api/search"], "openai_endpoints": ["/v1/embeddings", "/v1/rerank", "/api/search"],
"disabled": bool(embeddings.get("disabled")),
}, },
"qdrant": { "qdrant": {
"ready": bool(qdrant.get("ok")), "ready": bool(qdrant.get("ok")),
"base_url": qdrant.get("base_url"), "base_url": qdrant.get("base_url"),
"kind": "vectordb", "kind": "vectordb",
"collection": settings.qdrant_collection or None, "collection": settings.qdrant_collection or None,
"disabled": bool(qdrant.get("disabled")),
}, },
} }
@@ -837,12 +856,15 @@ async def get_status() -> dict:
check_embeddings(settings), check_embeddings(settings),
check_qdrant(settings), check_qdrant(settings),
) )
# Feed health into the connectivity log (deduped — only logs on transition) # Feed health into the connectivity log (deduped — only logs on transition).
record_state("vllm", bool(vllm.get("ok"))) # Skip services switched off via DISABLED_SERVICES — they'd otherwise log as
record_state("parakeet", bool(parakeet.get("ok"))) # perpetually down.
record_state("kokoro", bool(kokoro.get("ok"))) for _name, _r in (
record_state("embeddings", bool(embeddings.get("ok"))) ("vllm", vllm), ("parakeet", parakeet), ("kokoro", kokoro),
record_state("qdrant", bool(qdrant.get("ok"))) ("embeddings", embeddings), ("qdrant", qdrant),
):
if not _r.get("disabled"):
record_state(_name, bool(_r.get("ok")))
current_key = _identify_current_model(vllm.get("current_model")) current_key = _identify_current_model(vllm.get("current_model"))
return { return {
"configured": settings.configured, "configured": settings.configured,
@@ -880,9 +902,21 @@ async def validate_swap(key: str) -> dict:
@app.post("/api/swap") @app.post("/api/swap")
async def post_swap(req: SwapRequest) -> dict: async def post_swap(req: SwapRequest, request: Request) -> dict:
if not settings.configured and not req.dry_run: if not settings.configured and not req.dry_run:
raise HTTPException(503, "spark1 not configured") raise HTTPException(503, "spark1 not configured")
# Enforce the swap reservation lock (the GPU arbiter). A held lock blocks any
# real swap that doesn't present the holder's token in X-Swap-Lock-Token — so
# an external scheduler that holds the lock can swap, but the dashboard (no
# token) is refused while someone else holds it. Dry runs don't touch the
# cluster, so they're exempt.
if not req.dry_run:
blocked = swap_lock.is_blocked_by(request.headers.get("x-swap-lock-token"))
if blocked is not None:
raise HTTPException(status_code=423, detail={
"error": "the GPU swap path is reserved by another holder",
"lock": blocked,
})
try: try:
job = await swap_manager.trigger(req.model_key, dry_run=req.dry_run) job = await swap_manager.trigger(req.model_key, dry_run=req.dry_run)
except KeyError: except KeyError:
@@ -937,6 +971,89 @@ async def stream_swap(job_id: str):
return StreamingResponse(gen(), media_type="text/event-stream") return StreamingResponse(gen(), media_type="text/event-stream")
# ---- Coordination layer: swap lock + schedule registry ----
# Endpoints are control-surface, not browser-exempt: an external scheduler is a
# non-browser client (no Origin header) so it passes the CSRF guard already, the
# same way it calls /api/swap today; the dashboard is same-origin.
class LockAcquireRequest(BaseModel):
holder: str
ttl_seconds: int | None = None
note: str = ""
token: str | None = None # present only to extend an existing hold
@app.post("/api/swap/lock")
async def acquire_swap_lock(req: LockAcquireRequest) -> dict:
"""Reserve the GPU swap path. Returns a secret token used to swap (header
X-Swap-Lock-Token) and to release. 409 if held by another holder."""
try:
lock = swap_lock.acquire(req.holder, req.ttl_seconds, req.note, token=req.token)
except ValueError as e:
raise HTTPException(422, str(e))
except LockHeld as e:
raise HTTPException(status_code=409, detail={
"error": "swap lock is held by another holder",
"lock": e.state,
})
return {**swap_lock.status(), "token": lock.token}
@app.get("/api/swap/lock")
async def get_swap_lock() -> dict:
"""Public, token-free view of the reservation: held? who? until when?"""
return swap_lock.status()
@app.delete("/api/swap/lock")
async def release_swap_lock(request: Request, force: bool = Query(False)) -> dict:
"""Release the reservation. Needs the matching X-Swap-Lock-Token unless
?force=true (the human override from the dashboard)."""
token = request.headers.get("x-swap-lock-token") or request.query_params.get("token")
try:
released = swap_lock.release(token, force=force)
except PermissionError as e:
raise HTTPException(403, str(e))
return {"released": released, **swap_lock.status()}
class ScheduleRequest(BaseModel):
name: str
id: str | None = None
owner: str = ""
cron: str = ""
next_run: str = ""
description: str = ""
@app.get("/api/schedule")
async def list_schedules() -> dict:
return {"schedules": schedule_registry.list()}
@app.post("/api/schedule")
async def register_schedule(req: ScheduleRequest) -> dict:
"""Register (or update, by id) a schedule an external scheduler owns. Spark
Control only stores it for the dashboard — it never executes it."""
try:
entry = schedule_registry.register(
name=req.name, id=req.id, owner=req.owner,
cron=req.cron, next_run=req.next_run, description=req.description,
)
except ValueError as e:
raise HTTPException(422, str(e))
return entry.public()
@app.delete("/api/schedule/{schedule_id}")
async def delete_schedule(schedule_id: str) -> dict:
# Whitelist the path segment at the boundary (repo convention), even though
# it's only ever a dict key — keeps it from being reflected or logged raw.
if not valid_schedule_id(schedule_id):
raise HTTPException(422, "invalid schedule id")
return {"deleted": schedule_registry.delete(schedule_id)}
class DownloadRequest(BaseModel): class DownloadRequest(BaseModel):
repo: str repo: str
mode: Literal["spark1", "spark2", "cluster"] = "spark1" mode: Literal["spark1", "spark2", "cluster"] = "spark1"
+13 -2
View File
@@ -5,6 +5,7 @@ machinery. We just run `docker start|stop|restart <container>` via SSH on the
appropriate host. appropriate host.
""" """
from __future__ import annotations from __future__ import annotations
import logging
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Literal, Optional from typing import Literal, Optional
@@ -13,6 +14,8 @@ from .config import Settings
from .shellsafe import quote_arg from .shellsafe import quote_arg
from .ssh import ssh_run from .ssh import ssh_run
log = logging.getLogger(__name__)
# Cache the "unreachable" verdict per (host, user) for a short period so that a # Cache the "unreachable" verdict per (host, user) for a short period so that a
# repeated docker_state call doesn't re-pay the 6 s SSH connect timeout each time. # repeated docker_state call doesn't re-pay the 6 s SSH connect timeout each time.
@@ -103,7 +106,13 @@ def services_from_settings(s: Settings) -> dict[str, ServiceDef]:
} }
for entry in load_custom_services(): for entry in load_custom_services():
key = entry.get("key") key = entry.get("key")
if not key or key in out: if not key:
continue
if key in out:
# A custom entry can't shadow a built-in (parakeet/kokoro/…); warn so
# an adopter who picked a colliding key for, say, a second vLLM sees
# why no tile appeared instead of a silent no-op.
log.warning("custom service %r collides with a built-in name; ignoring", key)
continue continue
out[key] = ServiceDef( out[key] = ServiceDef(
name=key, name=key,
@@ -113,7 +122,9 @@ def services_from_settings(s: Settings) -> dict[str, ServiceDef]:
container=entry.get("container", key), container=entry.get("container", key),
port=int(entry.get("port", 0)), port=int(entry.get("port", 0)),
) )
return out # Drop services the deployment has switched off (DISABLED_SERVICES) so they
# show no tile and are never probed/auto-restarted.
return {k: v for k, v in out.items() if k not in s.disabled_services}
async def docker_state(settings: Settings, svc: ServiceDef) -> dict: async def docker_state(settings: Settings, svc: ServiceDef) -> dict:
+104 -2
View File
@@ -21,11 +21,19 @@ const state = {
deep_health: {}, deep_health: {},
disk_status: {}, // keyed by model key: { on_disk, total_bytes, per_host } disk_status: {}, // keyed by model key: { on_disk, total_bytes, per_host }
disk_status_loaded: false, disk_status_loaded: false,
lock: { held: false }, // GPU swap reservation (coordination layer)
schedules: [], // schedules external automation has registered
}; };
const el = (sel) => document.querySelector(sel); const el = (sel) => document.querySelector(sel);
const $$ = (sel) => document.querySelectorAll(sel); const $$ = (sel) => document.querySelectorAll(sel);
// ISO timestamp -> local clock string (e.g. "2:45:10 PM"); '' if unparseable.
function fmtClock(iso) {
const t = Date.parse(iso);
return isNaN(t) ? '' : new Date(t).toLocaleTimeString();
}
function escapeHtml(s) { function escapeHtml(s) {
if (s == null) return ''; if (s == null) return '';
return String(s) return String(s)
@@ -51,6 +59,12 @@ function renderCards() {
const root = el('#cards'); const root = el('#cards');
root.innerHTML = ''; root.innerHTML = '';
const isSwapping = !!state.swap_job_id; const isSwapping = !!state.swap_job_id;
// GPU reserved by external automation — manual swaps are refused server-side
// (423); reflect that in the buttons so the click never bounces.
const locked = !!(state.lock && state.lock.held);
const lockTip = locked
? `Reserved by ${state.lock.holder || 'automation'}${state.lock.expires_at ? ' until ' + fmtClock(state.lock.expires_at) : ''}`
: '';
for (const key of Object.keys(state.models)) { for (const key of Object.keys(state.models)) {
const m = state.models[key]; const m = state.models[key];
const isActive = key === state.current_model_key; const isActive = key === state.current_model_key;
@@ -94,7 +108,9 @@ function renderCards() {
if (isActive) { if (isActive) {
primaryBtn = `<button class="btn" disabled>Current</button>`; primaryBtn = `<button class="btn" disabled>Current</button>`;
} else if (isOnDisk) { } else if (isOnDisk) {
primaryBtn = `<button class="btn primary" data-swap-key="${key}" ${isSwapping ? 'disabled' : ''}>Switch to this</button>`; const swapBlocked = isSwapping || locked;
const tip = locked ? ` title="${escapeHtml(lockTip)}"` : '';
primaryBtn = `<button class="btn primary" data-swap-key="${key}"${tip} ${swapBlocked ? 'disabled' : ''}>Switch to this</button>`;
} else if (m.local_path) { } else if (m.local_path) {
// A local model can't be "downloaded" — its directory has to exist on the Spark. // A local model can't be "downloaded" — its directory has to exist on the Spark.
primaryBtn = `<button class="btn" disabled title="Directory not found on the Spark — create it there, then refresh">Not found on Spark</button>`; primaryBtn = `<button class="btn" disabled title="Directory not found on the Spark — create it there, then refresh">Not found on Spark</button>`;
@@ -932,6 +948,10 @@ function renderHealth(status) {
function setDot(id, ok, payload) { function setDot(id, ok, payload) {
const item = el(id); const item = el(id);
if (!item) return; if (!item) return;
// A service switched off via DISABLED_SERVICES isn't part of this
// deployment — hide its indicator entirely rather than show it as down.
if (payload && payload.disabled) { item.classList.add('hidden'); return; }
item.classList.remove('hidden');
const dot = item.querySelector('.dot'); const dot = item.querySelector('.dot');
dot.classList.remove('ok', 'bad', 'warn'); dot.classList.remove('ok', 'bad', 'warn');
if (ok === true) dot.classList.add('ok'); if (ok === true) dot.classList.add('ok');
@@ -1230,6 +1250,11 @@ function openDiskDeleteDialog(key) {
async function triggerSwap(modelKey) { async function triggerSwap(modelKey) {
if (state.swap_job_id) return; if (state.swap_job_id) return;
if (state.lock && state.lock.held) {
const until = state.lock.expires_at ? ' until ' + fmtClock(state.lock.expires_at) : '';
alert(`The GPU swap path is reserved by ${state.lock.holder || 'automation'}${until}. Use "Release" on the reservation banner to override.`);
return;
}
try { try {
const r = await fetchJSON('/api/swap', { const r = await fetchJSON('/api/swap', {
method: 'POST', method: 'POST',
@@ -1238,10 +1263,84 @@ async function triggerSwap(modelKey) {
}); });
attachToSwap(r.job_id, /*needsBackfill=*/false); attachToSwap(r.job_id, /*needsBackfill=*/false);
} catch (e) { } catch (e) {
alert('Failed to start swap: ' + e.message); // 423 Locked: a reservation was acquired between our last poll and this click.
if (e.message && e.message.startsWith('423')) {
alert('The GPU swap path was just reserved by automation. Refreshing…');
pollCoordination();
} else {
alert('Failed to start swap: ' + e.message);
}
} }
} }
// ---- coordination layer: swap lock + schedule registry ----
async function pollCoordination() {
try {
state.lock = await fetchJSON('/api/swap/lock');
} catch { state.lock = { held: false }; }
try {
const r = await fetchJSON('/api/schedule');
state.schedules = r.schedules || [];
} catch { state.schedules = []; }
renderLockBanner();
renderSchedules();
renderCards(); // reflect lock state on the swap buttons
}
function renderLockBanner() {
const banner = el('#lock-banner');
if (!banner) return;
const lock = state.lock;
if (lock && lock.held) {
const until = lock.expires_at ? ` until ${fmtClock(lock.expires_at)}` : '';
const note = lock.note ? `${escapeHtml(lock.note)}` : '';
el('#lock-text').innerHTML =
`GPU swap path reserved by <strong>${escapeHtml(lock.holder || 'automation')}</strong>${until}${note}. Manual swaps are paused.`;
banner.classList.remove('hidden');
} else {
banner.classList.add('hidden');
}
}
function renderSchedules() {
const panel = el('#schedule-panel');
const list = el('#schedule-list');
if (!panel || !list) return;
const items = state.schedules || [];
if (!items.length) {
panel.classList.add('hidden');
list.innerHTML = '';
return;
}
list.innerHTML = items.map((s) => {
const meta = [
s.cron ? `<code>${escapeHtml(s.cron)}</code>` : '',
s.next_run ? `next: ${escapeHtml(s.next_run)}` : '',
s.owner ? `by ${escapeHtml(s.owner)}` : '',
].filter(Boolean).join(' · ');
const desc = s.description ? `<div class="desc">${escapeHtml(s.description)}</div>` : '';
return `<div class="schedule-item">
<div class="name">${escapeHtml(s.name)}</div>
<div class="muted small">${meta}</div>
${desc}
</div>`;
}).join('');
panel.classList.remove('hidden');
}
async function releaseLock() {
const lock = state.lock || {};
const who = lock.holder || 'automation';
if (!confirm(`Force-release the GPU reservation held by ${who}? Any job relying on it may then collide with a manual swap.`)) return;
try {
await fetchJSON('/api/swap/lock?force=true', { method: 'DELETE' });
} catch (e) {
alert('Failed to release: ' + e.message);
}
pollCoordination();
}
async function triggerDownloadForKey(modelKey) { async function triggerDownloadForKey(modelKey) {
const m = state.models[modelKey]; const m = state.models[modelKey];
if (!m) return; if (!m) return;
@@ -2098,6 +2197,7 @@ async function init() {
}); });
el('#sshkey-close').addEventListener('click', () => el('#sshkey-dialog').close()); el('#sshkey-close').addEventListener('click', () => el('#sshkey-dialog').close());
el('#open-local').addEventListener('click', openLocalModelDialog); el('#open-local').addEventListener('click', openLocalModelDialog);
el('#lock-release').addEventListener('click', releaseLock);
setupCatalogDialog(); setupCatalogDialog();
setupAdvancedDialog(); setupAdvancedDialog();
setupLocalModelDialog(); setupLocalModelDialog();
@@ -2115,6 +2215,7 @@ async function init() {
await loadModels(); await loadModels();
await pollStatus(); await pollStatus();
await renderServices(); await renderServices();
pollCoordination();
pollHardware(); pollHardware();
pollUpdates(); pollUpdates();
// Disk-status probe runs after first paint — slow over SSH and not blocking. // Disk-status probe runs after first paint — slow over SSH and not blocking.
@@ -2122,6 +2223,7 @@ async function init() {
// Speech-model patches panel — slow over SSH, runs after first paint. // Speech-model patches panel — slow over SSH, runs after first paint.
renderSpeechModels(); renderSpeechModels();
setInterval(pollStatus, 5000); setInterval(pollStatus, 5000);
setInterval(pollCoordination, 5000); // swap lock + schedule registry
setInterval(pollHardware, 8000); // every 8s setInterval(pollHardware, 8000); // every 8s
setInterval(pollUpdates, 300000); // every 5 min setInterval(pollUpdates, 300000); // every 5 min
setInterval(loadDiskStatus, 60000); // every 60s — disk state changes rarely setInterval(loadDiskStatus, 60000); // every 60s — disk state changes rarely
+15
View File
@@ -96,6 +96,13 @@
</details> </details>
</section> </section>
<section id="lock-banner" class="banner lock-banner hidden">
<span class="lock-icon" aria-hidden="true">🔒</span>
<span id="lock-text">GPU swap path reserved</span>
<span class="spacer"></span>
<button id="lock-release" class="btn small-btn">Release</button>
</section>
<nav id="dashboard-tabs" class="dashboard-tabs hidden" role="tablist"> <nav id="dashboard-tabs" class="dashboard-tabs hidden" role="tablist">
<button type="button" class="dashboard-tab" data-tab="llm" role="tab" aria-selected="true">LLM</button> <button type="button" class="dashboard-tab" data-tab="llm" role="tab" aria-selected="true">LLM</button>
<button type="button" class="dashboard-tab" data-tab="audio" role="tab" aria-selected="false">Audio / Speech</button> <button type="button" class="dashboard-tab" data-tab="audio" role="tab" aria-selected="false">Audio / Speech</button>
@@ -394,6 +401,14 @@
<section id="cards" class="cards"></section> <section id="cards" class="cards"></section>
</section> </section>
<section id="schedule-panel" class="schedule-panel hidden">
<div class="section-header">
<h2 class="section-title">Scheduled jobs</h2>
</div>
<p class="muted small">Registered by your own automation. Spark Control only displays these — it doesn't run them.</p>
<div id="schedule-list" class="schedule-list"></div>
</section>
<section id="update-banner" class="update-banner hidden"> <section id="update-banner" class="update-banner hidden">
<div class="ub-context muted small"> <div class="ub-context muted small">
Updates to <strong><a href="https://github.com/eugr/spark-vllm-docker" target="_blank" rel="noopener">eugr/spark-vllm-docker</a></strong> Updates to <strong><a href="https://github.com/eugr/spark-vllm-docker" target="_blank" rel="noopener">eugr/spark-vllm-docker</a></strong>
+36
View File
@@ -74,6 +74,42 @@ main {
} }
.banner em { font-style: normal; background: rgba(245, 158, 11, 0.15); padding: 2px 6px; border-radius: 4px; } .banner em { font-style: normal; background: rgba(245, 158, 11, 0.15); padding: 2px 6px; border-radius: 4px; }
/* GPU swap reservation (coordination layer) — informational, not a warning. */
.lock-banner {
display: flex;
align-items: center;
gap: 10px;
border-color: var(--info);
color: var(--info);
}
.lock-banner .lock-icon { font-size: 16px; }
.lock-banner strong { color: var(--text); }
.lock-banner .spacer { flex: 1; }
/* Scheduled-jobs panel — read-only view of what external automation registered. */
.schedule-panel { margin-top: 8px; }
.schedule-list {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(240px, 1fr));
gap: 12px;
margin-top: 8px;
}
.schedule-item {
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
padding: 12px 14px;
}
.schedule-item .name { font-weight: 600; margin-bottom: 4px; }
.schedule-item code {
background: var(--surface-2);
border: 1px solid var(--border);
border-radius: 4px;
padding: 1px 5px;
font-size: 12px;
}
.schedule-item .desc { margin-top: 6px; color: var(--muted); font-size: 13px; }
/* ===== Endpoint panel ===== */ /* ===== Endpoint panel ===== */
.endpoint-panel { .endpoint-panel {
+25 -2
View File
@@ -6,7 +6,9 @@ from datetime import datetime, timezone
from typing import Optional from typing import Optional
from .config import Settings from .config import Settings
from .coordination import WebhookNotifier, build_webhook_payload
from .models import Catalog, build_launch_command from .models import Catalog, build_launch_command
from .shellsafe import quote_arg
from .ssh import ssh_run, ssh_stream, StreamHandle from .ssh import ssh_run, ssh_stream, StreamHandle
@@ -32,9 +34,15 @@ class SwapJob:
class SwapManager: class SwapManager:
def __init__(self, settings: Settings, catalog: Catalog) -> None: def __init__(
self,
settings: Settings,
catalog: Catalog,
notifier: Optional[WebhookNotifier] = None,
) -> None:
self.settings = settings self.settings = settings
self.catalog = catalog self.catalog = catalog
self.notifier = notifier
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.jobs: dict[str, SwapJob] = {} self.jobs: dict[str, SwapJob] = {}
self.current_job_id: Optional[str] = None self.current_job_id: Optional[str] = None
@@ -77,6 +85,21 @@ class SwapManager:
job.finished_at = datetime.now(timezone.utc).isoformat() job.finished_at = datetime.now(timezone.utc).isoformat()
if self.current_job_id == job.id: if self.current_job_id == job.id:
self.current_job_id = None self.current_job_id = None
# Outside the swap lock (so a webhook POST can't stall a queued swap) and
# only for real swaps — a dry run never changes the running model. A
# webhook failure is logged inside fire(), never raised.
if self.notifier is not None and self.notifier.enabled and not job.dry_run:
event = "swap_complete" if job.state == "ready" else "swap_failed"
await self.notifier.fire(event, build_webhook_payload(
event=event,
job_id=job.id,
model_key=job.model_key,
state=job.state,
returncode=job.returncode,
started_at=job.started_at,
finished_at=job.finished_at,
dry_run=job.dry_run,
))
async def _do(self, job: SwapJob) -> None: async def _do(self, job: SwapJob) -> None:
model = self.catalog.models[job.model_key] model = self.catalog.models[job.model_key]
@@ -112,7 +135,7 @@ class SwapManager:
# Step 3: tail logs until the ready marker (or timeout) # Step 3: tail logs until the ready marker (or timeout)
job.state = "tailing" job.state = "tailing"
tail_cmd = "docker logs -f --tail 50 vllm_node" tail_cmd = f"docker logs -f --tail 50 {quote_arg(s.vllm_container)}"
job.append(f"$ {tail_cmd}") job.append(f"$ {tail_cmd}")
timeout = max(model.expected_ready_seconds * 2, 600) timeout = max(model.expected_ready_seconds * 2, 600)
handle = StreamHandle() handle = StreamHandle()
+2 -1
View File
@@ -22,6 +22,7 @@ from typing import Any
from .config import Settings from .config import Settings
from .models import Catalog, build_launch_command from .models import Catalog, build_launch_command
from .shellsafe import quote_arg
from .ssh import ssh_run from .ssh import ssh_run
@@ -114,7 +115,7 @@ async def validate_launch(key: str, catalog: Catalog, settings: Settings) -> dic
# Pipe the JSON args list to a here-doc Python invocation. The validator # Pipe the JSON args list to a here-doc Python invocation. The validator
# reads from stdin to avoid shell-escaping the args themselves. # reads from stdin to avoid shell-escaping the args themselves.
cmd = ( cmd = (
f"echo '{payload}' | docker exec -i vllm_node python3 -c " f"echo '{payload}' | docker exec -i {quote_arg(settings.vllm_container)} python3 -c "
+ shlex.quote(_VALIDATOR_SCRIPT) + shlex.quote(_VALIDATOR_SCRIPT)
) )
+201
View File
@@ -0,0 +1,201 @@
"""Coordination layer: swap lock lifecycle/expiry, schedule registry CRUD, and
the webhook payload+signature. All offline — the lock takes an injectable `now`
so expiry is tested without sleeping, and the webhook is exercised only on the
disabled (no-network) path plus its pure payload/signature helpers.
"""
import asyncio
from datetime import datetime, timedelta, timezone
import pytest
from app.coordination import (
LOCK_TTL_MAX,
LOCK_TTL_MIN,
LockHeld,
ScheduleRegistry,
SwapLockManager,
WebhookNotifier,
build_webhook_payload,
sign_payload,
valid_schedule_id,
)
T0 = datetime(2026, 6, 17, 12, 0, 0, tzinfo=timezone.utc)
# ----------------------------------------------------------------- swap lock ----
def test_acquire_free_lock_returns_token_and_status_held():
mgr = SwapLockManager()
lock = mgr.acquire("openclaw", ttl_seconds=60, note="daily vol", now=T0)
assert lock.token
st = mgr.status(now=T0)
assert st["held"] is True
assert st["holder"] == "openclaw"
assert st["note"] == "daily vol"
assert st["seconds_remaining"] == 60
assert "token" not in st # public view never leaks the token
def test_acquire_requires_holder():
with pytest.raises(ValueError):
SwapLockManager().acquire(" ", now=T0)
def test_acquire_held_by_other_raises_lockheld_with_state():
mgr = SwapLockManager()
mgr.acquire("openclaw", ttl_seconds=60, now=T0)
with pytest.raises(LockHeld) as ei:
mgr.acquire("johnny5", ttl_seconds=60, now=T0)
assert ei.value.state["holder"] == "openclaw"
def test_reacquire_with_token_extends_and_keeps_token():
mgr = SwapLockManager()
first = mgr.acquire("openclaw", ttl_seconds=60, now=T0)
later = T0 + timedelta(seconds=30)
second = mgr.acquire("openclaw", ttl_seconds=60, token=first.token, now=later)
assert second.token == first.token
# window extended from the later moment, not the original
assert mgr.status(now=later)["seconds_remaining"] == 60
assert second.acquired_at == first.acquired_at # acquired_at preserved
def test_reacquire_without_token_is_refused_even_for_same_holder_name():
# Holder name is descriptive, not a secret — matching it must not grant access.
mgr = SwapLockManager()
mgr.acquire("openclaw", ttl_seconds=60, now=T0)
with pytest.raises(LockHeld):
mgr.acquire("openclaw", ttl_seconds=60, now=T0)
def test_ttl_is_clamped():
mgr = SwapLockManager()
mgr.acquire("a", ttl_seconds=0, now=T0)
assert mgr.status(now=T0)["seconds_remaining"] == LOCK_TTL_MIN
mgr2 = SwapLockManager()
mgr2.acquire("b", ttl_seconds=10**9, now=T0)
assert mgr2.status(now=T0)["seconds_remaining"] == LOCK_TTL_MAX
def test_lock_expires_and_clears_lazily():
mgr = SwapLockManager()
tok = mgr.acquire("openclaw", ttl_seconds=10, now=T0).token
after = T0 + timedelta(seconds=11)
assert mgr.status(now=after) == {"held": False}
assert mgr.verify(tok, now=after) is False
# an expired lock is free to re-take by anyone
mgr.acquire("johnny5", ttl_seconds=10, now=after)
assert mgr.status(now=after)["holder"] == "johnny5"
def test_verify_matches_only_active_token():
mgr = SwapLockManager()
tok = mgr.acquire("openclaw", ttl_seconds=60, now=T0).token
assert mgr.verify(tok, now=T0) is True
assert mgr.verify("nope", now=T0) is False
assert mgr.verify(None, now=T0) is False
def test_release_requires_token_then_frees():
mgr = SwapLockManager()
tok = mgr.acquire("openclaw", ttl_seconds=60, now=T0).token
with pytest.raises(PermissionError):
mgr.release("wrong", now=T0)
assert mgr.release(tok, now=T0) is True
assert mgr.status(now=T0) == {"held": False}
def test_force_release_skips_token_and_release_of_free_lock_is_false():
mgr = SwapLockManager()
mgr.acquire("openclaw", ttl_seconds=60, now=T0)
assert mgr.release(force=True, now=T0) is True
assert mgr.release(force=True, now=T0) is False # nothing held now
def test_is_blocked_by_is_the_swap_gate():
# Mirrors the single-read decision the /api/swap endpoint makes.
mgr = SwapLockManager()
assert mgr.is_blocked_by(None, now=T0) is None # free lock blocks nobody
tok = mgr.acquire("openclaw", ttl_seconds=10, now=T0).token
blocked = mgr.is_blocked_by(None, now=T0) # no token -> blocked
assert blocked is not None and blocked["holder"] == "openclaw"
assert mgr.is_blocked_by("wrong", now=T0) is not None # wrong token -> blocked
assert mgr.is_blocked_by(tok, now=T0) is None # holder's token -> allowed
# At/after expiry the gate is open even without a token (the bug a separate
# status()+verify() pair would get wrong).
assert mgr.is_blocked_by(None, now=T0 + timedelta(seconds=11)) is None
# ------------------------------------------------------------------- webhook ----
def test_build_webhook_payload_shape():
p = build_webhook_payload(
event="swap_complete", job_id="abc123", model_key="gemma",
state="ready", returncode=0, started_at="t0", finished_at="t1",
dry_run=False,
)
assert p == {
"event": "swap_complete", "job_id": "abc123", "model_key": "gemma",
"state": "ready", "returncode": 0, "started_at": "t0",
"finished_at": "t1", "dry_run": False,
}
def test_sign_payload_is_deterministic_and_prefixed():
body = b'{"event":"swap_complete"}'
sig = sign_payload("s3cr3t", body)
assert sig.startswith("sha256=")
assert sig == sign_payload("s3cr3t", body)
assert sig != sign_payload("other", body)
def test_disabled_webhook_fire_is_noop():
n = WebhookNotifier("", "")
assert n.enabled is False
# Must not attempt any network call or raise when no URL is configured.
assert asyncio.run(n.fire("swap_complete", {"x": 1})) is None
# --------------------------------------------------------- schedule registry ----
def test_register_and_list_schedule():
reg = ScheduleRegistry()
e = reg.register(name="Daily Vol", owner="openclaw", cron="0 6 * * *")
assert e.id and e.registered_at and e.updated_at
listed = reg.list()
assert len(listed) == 1 and listed[0]["name"] == "Daily Vol"
def test_register_with_id_updates_in_place():
reg = ScheduleRegistry()
reg.register(name="Daily Vol", id="dv", owner="openclaw", cron="0 6 * * *")
reg.register(name="Daily Vol v2", id="dv", owner="openclaw", cron="0 7 * * *")
listed = reg.list()
assert len(listed) == 1
assert listed[0]["name"] == "Daily Vol v2" and listed[0]["cron"] == "0 7 * * *"
def test_register_requires_name_and_validates_id():
reg = ScheduleRegistry()
with pytest.raises(ValueError):
reg.register(name=" ")
with pytest.raises(ValueError):
reg.register(name="ok", id="bad id; rm -rf")
def test_delete_schedule():
reg = ScheduleRegistry()
reg.register(name="Daily Vol", id="dv")
assert reg.delete("dv") is True
assert reg.delete("dv") is False
assert reg.list() == []
def test_valid_schedule_id():
assert valid_schedule_id("daily-vol")
assert valid_schedule_id("a.b_c-1")
assert not valid_schedule_id("")
assert not valid_schedule_id("../etc")
assert not valid_schedule_id("has space")
assert not valid_schedule_id("x" * 65)
+120
View File
@@ -0,0 +1,120 @@
"""Configurable topology: DISABLED_SERVICES, vLLM container override, and the
extra-vLLM probe. All offline — the disabled checks short-circuit before any
network call, and the probes are exercised only on the not-configured path.
"""
import asyncio
from app.config import Settings
from app.health import (
check_embeddings,
check_kokoro,
check_parakeet,
check_qdrant,
check_vllm,
probe_vllm_endpoint,
)
from app.services import services_from_settings
def _settings(monkeypatch, **env) -> Settings:
# Pin the topology env vars under test; default the rest to blank so a stray
# value in the real environment can't leak into the assertion.
keys = [
"SPARK1_HOST", "SPARK1_USER", "SPARK2_HOST", "SPARK2_USER",
"DISABLED_SERVICES", "VLLM_CONTAINER",
]
for k in keys:
monkeypatch.delenv(k, raising=False)
for k, v in env.items():
monkeypatch.setenv(k, v)
return Settings.from_env()
# ---- DISABLED_SERVICES parsing ----
def test_disabled_services_parsed_lowercased_and_trimmed(monkeypatch):
s = _settings(monkeypatch, DISABLED_SERVICES="parakeet, Kokoro ,,")
assert s.disabled_services == frozenset({"parakeet", "kokoro"})
def test_disabled_services_blank_is_empty(monkeypatch):
assert _settings(monkeypatch).disabled_services == frozenset()
# ---- vLLM container override ----
def test_vllm_container_defaults_to_vllm_node(monkeypatch):
assert _settings(monkeypatch).vllm_container == "vllm_node"
def test_vllm_container_override(monkeypatch):
assert _settings(monkeypatch, VLLM_CONTAINER="vllm-gemma4").vllm_container == "vllm-gemma4"
def test_vllm_container_invalid_falls_back(monkeypatch):
# A malformed value (space / shell metachar) is rejected at the boundary and
# falls back to the default rather than crashing startup or reaching a sink.
assert _settings(monkeypatch, VLLM_CONTAINER="bad name; rm -rf").vllm_container == "vllm_node"
# ---- services map honors the disable list ----
def test_services_from_settings_drops_disabled(monkeypatch):
s = _settings(
monkeypatch,
SPARK1_HOST="10.0.0.1", SPARK1_USER="u",
SPARK2_HOST="10.0.0.2", SPARK2_USER="u",
DISABLED_SERVICES="parakeet,qdrant",
)
svcs = services_from_settings(s)
assert "parakeet" not in svcs and "qdrant" not in svcs
assert "kokoro" in svcs and "embeddings" in svcs
def test_custom_vllm_service_registered(monkeypatch):
from app import custom_services
monkeypatch.setattr(custom_services, "load_custom_services", lambda: [
{"key": "vllm-spark2", "kind": "vllm", "host": "10.0.0.2",
"user": "u", "container": "vllm_node", "port": 8000},
])
s = _settings(monkeypatch, SPARK1_HOST="10.0.0.1", SPARK1_USER="u",
SPARK2_HOST="10.0.0.2", SPARK2_USER="u")
svc = services_from_settings(s)["vllm-spark2"]
assert svc.kind == "vllm" and svc.port == 8000 and svc.container == "vllm_node"
def test_custom_service_colliding_with_builtin_is_ignored(monkeypatch):
# A custom entry can't shadow a built-in key — the built-in wins.
from app import custom_services
monkeypatch.setattr(custom_services, "load_custom_services", lambda: [
{"key": "parakeet", "kind": "vllm", "host": "10.0.0.9", "user": "u", "port": 8000},
])
s = _settings(monkeypatch, SPARK1_HOST="10.0.0.1", SPARK1_USER="u",
SPARK2_HOST="10.0.0.2", SPARK2_USER="u")
assert services_from_settings(s)["parakeet"].kind == "stt"
# ---- disabled health checks short-circuit (no network) ----
def test_disabled_check_returns_disabled_verdict(monkeypatch):
s = _settings(
monkeypatch,
SPARK2_HOST="10.0.0.2", SPARK2_USER="u", # host set, but disable wins
DISABLED_SERVICES="parakeet,kokoro,embeddings,qdrant",
)
for check in (check_parakeet, check_kokoro, check_embeddings, check_qdrant):
r = asyncio.run(check(s))
assert r == {"ok": False, "disabled": True, "error": "disabled", "base_url": None}
# ---- vLLM probe: not-configured path is pure ----
def test_probe_vllm_endpoint_unconfigured(monkeypatch):
r = asyncio.run(probe_vllm_endpoint("", 8000))
assert r["ok"] is False and "not configured" in r["error"]
def test_check_vllm_unconfigured_without_spark1(monkeypatch):
s = _settings(monkeypatch) # no SPARK1_HOST
r = asyncio.run(check_vllm(s))
assert r["ok"] is False and "spark1 not configured" in r["error"]
@@ -49,6 +49,24 @@ const inputSpec = InputSpec.of({
placeholder: 'leave blank for 8888', placeholder: 'leave blank for 8888',
masked: false, masked: false,
}), }),
vllm_container: Value.text({
name: 'vLLM container name (optional)',
description:
'Docker container name for the swappable vLLM on Spark 1. Defaults to "vllm_node" (what the bundled launch-cluster.sh creates). Change this only if you run your vLLM under a different container name — the model-swap log view and the pre-flight validator exec into it by name.',
required: false,
default: null,
placeholder: 'leave blank for vllm_node',
masked: false,
}),
disabled_services: Value.text({
name: 'Services to hide (optional)',
description:
"Comma-separated list of built-in services your cluster doesn't run, so Spark Control hides their tiles and stops probing them. Valid names: parakeet, kokoro, embeddings, qdrant. Example: if you only run vLLM, set this to 'parakeet,kokoro,embeddings,qdrant'. Leave blank to monitor all of them. (Useful when, say, your vLLM shares port 8000 with Parakeet's default — hide Parakeet so its probe doesn't hit vLLM.)",
required: false,
default: null,
placeholder: 'e.g. parakeet,kokoro',
masked: false,
}),
parakeet_host: Value.text({ parakeet_host: Value.text({
name: 'Parakeet host (optional)', name: 'Parakeet host (optional)',
description: description:
@@ -155,6 +173,24 @@ const inputSpec = InputSpec.of({
placeholder: 'starts with "nvapi-..."', placeholder: 'starts with "nvapi-..."',
masked: true, masked: true,
}), }),
swap_webhook_url: Value.text({
name: 'Swap webhook URL (optional)',
description:
'If you run automation that needs to know when the loaded model changes, paste a URL here. Spark Control POSTs a small JSON event (swap_complete / swap_failed) to it after every model swap, so the consumer can re-point its config to the new model. Leave blank to disable. Only needed if something other than this dashboard cares about swaps.',
required: false,
default: null,
placeholder: 'e.g. https://my-service.local/spark-swap',
masked: false,
}),
swap_webhook_secret: Value.text({
name: 'Swap webhook secret (optional)',
description:
'Optional shared secret. If set, each webhook is signed with an "X-Spark-Signature: sha256=…" header (HMAC of the body) so the receiver can verify it really came from Spark Control. Leave blank to send the webhook unsigned.',
required: false,
default: null,
placeholder: 'a random string the receiver also knows',
masked: true,
}),
}) })
export const configureSparks = sdk.Action.withInput( export const configureSparks = sdk.Action.withInput(
@@ -9,6 +9,11 @@ export const sparkConfigSchema = z.object({
spark2_user: z.string().catch(''), spark2_user: z.string().catch(''),
// Optional vLLM port override (Spark 1). Blank => 8888 (launch-cluster.sh default). // Optional vLLM port override (Spark 1). Blank => 8888 (launch-cluster.sh default).
vllm_port: z.string().catch(''), vllm_port: z.string().catch(''),
// Optional vLLM container-name override (Spark 1). Blank => "vllm_node".
vllm_container: z.string().catch(''),
// Optional comma-separated list of built-in services to switch off
// (parakeet, kokoro, embeddings, qdrant). Blank => all enabled.
disabled_services: z.string().catch(''),
// Optional per-service overrides. Blank => use spark2_host / spark2_user. // Optional per-service overrides. Blank => use spark2_host / spark2_user.
parakeet_host: z.string().catch(''), parakeet_host: z.string().catch(''),
parakeet_user: z.string().catch(''), parakeet_user: z.string().catch(''),
@@ -30,6 +35,11 @@ export const sparkConfigSchema = z.object({
open_webui_url: z.string().catch(''), open_webui_url: z.string().catch(''),
// Optional NGC API key for pulling NIM containers from nvcr.io/nim/... // Optional NGC API key for pulling NIM containers from nvcr.io/nim/...
ngc_api_key: z.string().catch(''), ngc_api_key: z.string().catch(''),
// Optional coordination webhook: POSTed on swap_complete/swap_failed so
// downstream consumers re-point their model config. Blank => disabled.
swap_webhook_url: z.string().catch(''),
// Optional shared secret; if set, the webhook body is HMAC-signed.
swap_webhook_secret: z.string().catch(''),
}) })
export type SparkConfig = z.infer<typeof sparkConfigSchema> export type SparkConfig = z.infer<typeof sparkConfigSchema>
+8
View File
@@ -14,6 +14,8 @@ export const main = sdk.setupMain(async ({ effects }) => {
spark2_host: '', spark2_host: '',
spark2_user: '', spark2_user: '',
vllm_port: '', vllm_port: '',
vllm_container: '',
disabled_services: '',
parakeet_host: '', parakeet_host: '',
parakeet_user: '', parakeet_user: '',
parakeet_container: '', parakeet_container: '',
@@ -30,6 +32,8 @@ export const main = sdk.setupMain(async ({ effects }) => {
matrix_bridge_user: '', matrix_bridge_user: '',
open_webui_url: '', open_webui_url: '',
ngc_api_key: '', ngc_api_key: '',
swap_webhook_url: '',
swap_webhook_secret: '',
} }
return sdk.Daemons.of(effects).addDaemon('primary', { return sdk.Daemons.of(effects).addDaemon('primary', {
@@ -52,6 +56,8 @@ export const main = sdk.setupMain(async ({ effects }) => {
SPARK2_HOST: cfg.spark2_host, SPARK2_HOST: cfg.spark2_host,
SPARK2_USER: cfg.spark2_user, SPARK2_USER: cfg.spark2_user,
VLLM_PORT: cfg.vllm_port, VLLM_PORT: cfg.vllm_port,
VLLM_CONTAINER: cfg.vllm_container,
DISABLED_SERVICES: cfg.disabled_services,
PARAKEET_HOST: cfg.parakeet_host, PARAKEET_HOST: cfg.parakeet_host,
PARAKEET_USER: cfg.parakeet_user, PARAKEET_USER: cfg.parakeet_user,
PARAKEET_CONTAINER: cfg.parakeet_container, PARAKEET_CONTAINER: cfg.parakeet_container,
@@ -71,6 +77,8 @@ export const main = sdk.setupMain(async ({ effects }) => {
CONNECTIVITY_LOG: '/data/connectivity.json', CONNECTIVITY_LOG: '/data/connectivity.json',
OPEN_WEBUI_URL: cfg.open_webui_url, OPEN_WEBUI_URL: cfg.open_webui_url,
NGC_API_KEY: cfg.ngc_api_key, NGC_API_KEY: cfg.ngc_api_key,
SWAP_WEBHOOK_URL: cfg.swap_webhook_url,
SWAP_WEBHOOK_SECRET: cfg.swap_webhook_secret,
BIND_PORT: String(uiPort), BIND_PORT: String(uiPort),
}, },
}, },
+2 -2
View File
@@ -1,10 +1,10 @@
import { VersionInfo, IMPOSSIBLE } from '@start9labs/start-sdk' import { VersionInfo, IMPOSSIBLE } from '@start9labs/start-sdk'
export const v0_1_0 = VersionInfo.of({ export const v0_1_0 = VersionInfo.of({
version: '0.23.0:0', version: '0.25.0:0',
releaseNotes: { releaseNotes: {
en_US: en_US:
"v0.23.0:0 — local / fine-tuned model support. You can now add a model that lives as a directory on a Spark (e.g. a LoRA-merged fine-tune), not just a Hugging Face repo. Use the new \"+ Add local model\" button under LLM swap: give it the model's absolute path on the Spark, an optional chat-template path, and the usual launch knobs. On swap, Spark Control bind-mounts that directory into the vLLM container at the same path (via the launch script's existing VLLM_SPARK_EXTRA_DOCKER_ARGS hook — nothing to change on the Spark) and runs `vllm serve <dir>`. Local models show a \"local\" badge and their path instead of a Hugging Face link, and their weights are never offered for dashboard deletion (that directory is your own training output, not a re-downloadable cache). API: POST /api/models now accepts `local_path` (set exactly one of `repo` or `local_path`), validated against a strict path whitelist with no traversal.", "v0.25.0:0 — cluster coordination layer (GPU arbiter). For clusters where automation, not just this dashboard, swaps models. Three additions: (1) Swap reservation lock — an external scheduler can reserve the GPU swap path (POST /api/swap/lock) and gets a secret token; while held, any swap without the token is refused (423), so the dashboard's manual swap is paused and shows who holds the GPU and until when (with a human Release override). The lock is TTL-bounded and self-frees. (2) Swap webhook — set a URL (and optional signing secret) in Configure Sparks; Spark Control POSTs a swap_complete / swap_failed event after each swap so downstream consumers re-point their model config. (3) Schedule registry — your automation can register its cron jobs (POST /api/schedule) for a read-only \"Scheduled jobs\" panel on the dashboard; Spark Control only displays them, it never runs them. New API: /api/swap/lock (GET/POST/DELETE), /api/schedule (GET/POST/DELETE). See docs/COORDINATION.md. Spark Control remains a control plane, not a job runner — business pipelines stay in their own services and call the swap API.",
}, },
migrations: { migrations: {
up: async ({ effects }) => {}, up: async ({ effects }) => {},
+31
View File
@@ -52,6 +52,26 @@ The **Update** button runs `git fetch && git reset --hard origin/<branch> && doc
3. Spark Control's own package key must be authorized for that SSH user (Show Public Key → add to their `authorized_keys`) unless it's the same user Spark Control already uses for that Spark. 3. Spark Control's own package key must be authorized for that SSH user (Show Public Key → add to their `authorized_keys`) unless it's the same user Spark Control already uses for that Spark.
## Configurable topology (v0.24.0+)
For a cluster wired differently from the reference layout, three optional knobs in **Configure Sparks** (no fork needed):
- **vLLM container name** — defaults to `vllm_node`. Set it if your swappable vLLM on Spark 1 runs under a different container name; the swap log-tail and the pre-flight validator `docker exec` into it by name.
- **Services to hide** — comma-separated `parakeet,kokoro,embeddings,qdrant`. Hidden services show no tile and are never probed (status, deep-health, or connectivity log). Use this when a service you don't run would otherwise be probed at a port something else answers — e.g. a vLLM on port 8000 colliding with Parakeet's default.
- **Monitor a second vLLM** — the swap machinery only drives the Spark 1 vLLM, but you can *monitor* a vLLM on another Spark by adding a custom service of `kind: vllm` to `/data/services-overrides.yaml`:
```yaml
custom:
- key: vllm-spark2
kind: vllm
host: <spark-2-ip>
user: <ssh-user>
container: vllm_node
port: 8000
```
It gets a read-only tile: loaded model (via `/v1/models`), container state, and start/stop/restart. (Spark Control's SSH key must be authorized for that user — Show Public Key.)
## Adding a new model ## Adding a new model
1. Add an entry to `image/models.yaml`. Required fields: `display_name`, `repo`, `size_gb`, `mode` (`solo` or `cluster`), `vllm_args`. Optional but recommended: `description` (one paragraph — what the model is, what it's good for, how it differs from others; renders below the meta tags in each card), `capabilities` (tags like `[vision, reasoning, tools]`), `expected_ready_seconds`. 1. Add an entry to `image/models.yaml`. Required fields: `display_name`, `repo`, `size_gb`, `mode` (`solo` or `cluster`), `vllm_args`. Optional but recommended: `description` (one paragraph — what the model is, what it's good for, how it differs from others; renders below the meta tags in each card), `capabilities` (tags like `[vision, reasoning, tools]`), `expected_ready_seconds`.
@@ -81,6 +101,17 @@ cd ~/spark-vllm-docker
docker logs -f vllm_node # wait for "Application startup complete." docker logs -f vllm_node # wait for "Application startup complete."
``` ```
## Sideload (`make install`) can't reach the server
Symptom: `make install` fails with `package.sideload: error sending request for url (https://immense-voyage.local/rpc/v1)`. Cause seen 2026-06-17: `immense-voyage.local` stopped resolving via mDNS from the Mac (`curl https://immense-voyage.local/...` → exit 6, "couldn't resolve host"), even though the server is up — `curl -sk https://<server-ip>/rpc/v1` returns 200.
- **Don't** work around it with `start-cli -H https://<server-ip> package install`: TLS connects but it returns `UNAUTHORIZED`, because start-cli's stored credential is bound to the registered `.local` host, not the IP.
- **Fix:** make the name resolve again, then re-run `make install`:
- `sudo dscacheutil -flushcache && sudo killall -HUP mDNSResponder` (flush mDNS), or
- `echo "<server-ip> immense-voyage.local" | sudo tee -a /etc/hosts` (deterministic; remove later).
Note this only blocks installing to *your own* Start9 — building and publishing the s9pk to Gitea Releases is unaffected (adopters still pull the latest).
## Diagnostics ## Diagnostics
```bash ```bash