Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c0b35184ba | |||
| 7ecd77f1e5 | |||
| 6bcda6e348 | |||
| 7ae6ab3ba8 | |||
| dd3d1412d4 |
@@ -33,7 +33,7 @@ Subsystem guidance lives in `docs/guides/` and loads when matching files are tou
|
||||
|
||||
- `image/app/` — FastAPI app (`server.py` entry, routers in sibling modules, `static/` dashboard UI).
|
||||
- `package/startos/` — StartOS manifest, interfaces, actions, version + release notes.
|
||||
- `docs/` — `AUDIO_API.md`, `EMBEDDINGS.md`, `REDACTION_GATEWAY.md` (consumer-facing API refs; update with API changes).
|
||||
- `docs/` — `AUDIO_API.md`, `EMBEDDINGS.md`, `REDACTION_GATEWAY.md`, `COORDINATION.md` (consumer-facing API refs; update with API changes).
|
||||
- `README.md` (overview), `HANDOFF.md` (fresh-user install guide), `runbook.md` (ops notes), `known-issues.md`, `ROADMAP.md` (longer-term backlog — items move into "Current state" below when picked up).
|
||||
|
||||
## Conventions
|
||||
@@ -55,12 +55,15 @@ Subsystem guidance lives in `docs/guides/` and loads when matching files are tou
|
||||
|
||||
## Current state
|
||||
|
||||
- **Live service runs v0.22.0:0** (installed and serving); **v0.23.0:0 is built, committed (`e783653`), tagged, and published to Gitea Releases but its live install is PENDING** — see the P3 line below. Working features: swap dashboard; chat / transcribe / diarize(+chunk) / TTS proxies; embeddings + rerank + hybrid search (Qdrant); `/scrub` + `/rehydrate`; label-merge incl. dual-channel; per-Spark SSH-key copy + WireGuard `VPN <ip>` hardware-card badge; configurable vLLM port (Configure Sparks field, blank ⇒ 8888). Local/fine-tuned model support lands live once v0.23.0:0 installs. Spark 2 audio stack healthy. Security hardening (v0.19.0:0 — shellsafe SSH-injection guard, Qdrant path-injection, same-origin CSRF guard) shipped and stable; evidence in `EVALUATION.md`.
|
||||
- **matrix-bridge bot tile (done, v0.21.0:1, verified live):** `bot`-kind service tile — status badge from docker-state only (no HTTP port), plus **Update** / Restart / Stop/Start / **View logs**. Code: `app/matrix_bridge.py` + `/api/matrix-bridge/{update,logs}` (update streams; 25-min cap; fail-loud). Driven directly as `modelo` on Spark 2 (**no `sudo -iu`** — spark2 has no passwordless sudo). User is a blank-default Configure-Sparks field (`matrix_bridge_user`); blank → tile hidden (portable). Host reuses `spark2_host` (`192.168.1.87` = the bot's box `spark-32d0`); container/dir/branch are env-overridable defaults. **Load-bearing ops dep:** Update's `git fetch` runs as `modelo`, which needs `modelo`'s `~/.ssh/config` pinning the Gitea deploy key with `IdentitiesOnly yes` — else the wrong key is offered and Gitea denies (publickey). Optional next, only if the bot dev asks: Docker `HEALTHCHECK` for running-but-disconnected detection (spec §Note).
|
||||
- **Tests:** offline pytest harness in `image/tests/` — `cd image && .venv/bin/python -m pytest` (102 passing). Covers `build_launch_command` (incl. the shell-injection round-trip + local-model bind-mount), the transcript↔diarizer label-merge, the `shellsafe` validators, `matrix_bridge.build_update_command` (+ phase detection), and the configurable-topology layer (`test_topology.py`: `DISABLED_SERVICES` parsing, `vllm_container` override, disabled-service skip in `services_from_settings` + `check_*`, `probe_vllm_endpoint`). Mock-heavy swap/proxy tests deliberately skipped (low ROI). Redaction + live-audio suites remain standalone scripts.
|
||||
- **Live: v0.25.0:0** (installed 2026-06-18, server reports `status: installed`). The OpenClaw/Johnny-5 coexistence epic is fully shipped & live: configurable `VLLM_PORT` (v0.22, blank ⇒ 8888), local/fine-tuned models (v0.23), configurable topology (v0.24 — `VLLM_CONTAINER`, `DISABLED_SERVICES` hide-list, second-Spark `kind: vllm` monitor), coordination layer (v0.25 — swap reservation lock with `423`-enforced manual-swap pause + `?force=true` Release override, `swap_complete`/`swap_failed` webhook, read-only schedule registry; consumer API in `docs/COORDINATION.md`).
|
||||
- **Other live features:** swap dashboard; chat / transcribe / diarize(+chunk) / TTS proxies; embeddings + rerank + hybrid search (Qdrant); `/scrub` + `/rehydrate`; label-merge incl. dual-channel; per-Spark SSH-key copy + WireGuard `VPN <ip>` hardware badge. Security hardening (v0.19 — shellsafe SSH-injection guard, Qdrant path-injection, same-origin CSRF guard) stable (`EVALUATION.md`). Spark 2 audio/embeddings stack healthy.
|
||||
- **matrix-bridge bot tile (v0.21.0:1, live):** `bot`-kind tile (docker-state badge; Update/Restart/Stop-Start/View-logs) for the Matrix bot on Spark 2, driven as `modelo` (no `sudo -iu`; blank `matrix_bridge_user` ⇒ tile hidden; host reuses `spark2_host`). Code: `app/matrix_bridge.py` + `/api/matrix-bridge/{update,logs}`. **Load-bearing:** Update's `git fetch` runs as `modelo` and needs `modelo`'s `~/.ssh/config` pinning the Gitea deploy key with `IdentitiesOnly yes` (else publickey denial). Optional next only if the bot dev asks: Docker `HEALTHCHECK`.
|
||||
- **Tests:** offline pytest harness in `image/tests/` — `cd image && .venv/bin/python -m pytest` (124 passing). Covers `build_launch_command` (incl. the shell-injection round-trip + local-model bind-mount), the transcript↔diarizer label-merge, the `shellsafe` validators, `matrix_bridge.build_update_command` (+ phase detection), the configurable-topology layer (`test_topology.py`), and the coordination layer (`test_coordination.py`: swap-lock lifecycle/expiry/token-auth, schedule-registry CRUD, webhook payload + HMAC signature — `now` is injected into the lock so expiry is tested without sleeping). Mock-heavy swap/proxy/endpoint tests deliberately skipped (low ROI). Redaction + live-audio suites remain standalone scripts.
|
||||
- **Signal Engine "flakiness":** diagnosed as *not* a server bug — transient 1–4s unresponsiveness while the single GPU is busy. Client-side remedy (in-flight cap 2 / ceiling 3 / retry-on-timeout+503) drafted and **forwarded to that dev (owner confirmed 2026-06-15)**. Awaiting whether they want the measured concurrency knee.
|
||||
- **Stance (decided, not built):** no public interface / no API-token auth — LAN + WireGuard/Tailscale split-tunnel only; the CSRF guard covers the browser-driven vector.
|
||||
- **Known limits:** `/health` blips while the GPU is busy (mitigated client-side); dual-channel can miss a quiet local word under loud remote bleed; connectivity log misses sub-5s outages between 5s polls; diarizer caps at 4 speakers; matrix-bridge badge won't visibly flip on a fast `docker restart` (status re-checked only after the command returns).
|
||||
- **Infra gotcha (safety):** passwordless sudo is NOT configured on spark2 — design unprivileged probes for any Spark feature (the badge uses `ip`, not `sudo wg show`). spark2 sits on the `starttunnel` WireGuard subnet (`10.59.211.6/24`, survives reboot). Owner declined SSH-key rotation after the 2026-06-12 history scrub (only the key *name* leaked) — don't re-flag.
|
||||
- **Hosting:** self-hosted Gitea — remote `gitea`, branch `master`, over SSH; push after committing. (Wart: commit `8d839e3` is mislabeled `v0.13.0:4` but contains through v0.18.0:0.)
|
||||
- **Next — committed 2026-06-17: OpenClaw/Johnny-5 coexistence epic (full plan + design stance in `ROADMAP.md` → "Cluster coordination").** Stance: Spark Control = control plane / GPU arbiter, **not** a job runner; business cron jobs live in separate services that *call* its swap API (swaps are already API-driven via `POST /api/swap`). Sequence: (1) **configurable `VLLM_PORT`** — SHIPPED **v0.22.0:0** (Configure-Sparks field, blank ⇒ 8888; + `_env_int` hardening in `config.py` so a blank/bad port no longer crashes startup, killing a P3 tech-debt item). Committed `136a471`, pushed, tagged `v0.22.0`, rebuilt clean, installed, and **published to the self-hosted Gitea Releases** 2026-06-17 (`make release` → `scripts/gitea-release.sh`, takes `GITEA_URL` + a write token). **Distribution model (decided 2026-06-17):** Gitea Releases + a read-only token the adopter's agent uses to pull the latest s9pk (`GET /api/v1/repos/grant/spark-control/releases/latest` → download the `.s9pk` asset → sideload). Note: Gitea returns `browser_download_url` on its `.local` ROOT_URL, which won't resolve off-LAN — a remote adopter pulls via whatever address reaches the Gitea (the WireGuard IP). (2) **local-path/fine-tuned models** — DONE in tree, staged as **v0.23.0:0** (`ModelDef.local_path` + exactly-one-source validator; swap bind-mounts the dir at the same container path via the launch script's `VLLM_SPARK_EXTRA_DOCKER_ARGS` hook, **no `launch-cluster.sh` change**; "+ Add local model" UI form + `local` badge; `validate_local_path`; disk-delete refused for local; 94 tests pass. Reviewer-agent pass done, findings addressed (path validation + chat-template-location guard folded into the `ModelDef` validator so YAML/override entries are checked too; `_merge_overrides` skips a bad entry instead of failing the whole catalog; `VLLM_SPARK_EXTRA_DOCKER_ARGS` contract documented in `runbook.md`). **Committed `e783653`, tagged `v0.23.0`, built clean, published to Gitea Releases — but `make install` to the live Start9 FAILED: `immense-voyage.local` wasn't resolving via mDNS from the Mac (server up at `192.168.1.72`; `start-cli -H <ip>` reaches it but returns UNAUTHORIZED, auth bound to the registered `.local` host). FINISH-HERE: flush mDNS (`sudo dscacheutil -flushcache && sudo killall -HUP mDNSResponder`) or add a hosts entry, then re-run `cd package && make install`** (details in runbook → "Sideload can't reach the server"). (3) **configurable topology** — DONE in tree, staged as **v0.24.0:0** (built clean, not yet committed/installed). Three optional Configure-Sparks knobs: vLLM container name (`VLLM_CONTAINER`, blank ⇒ `vllm_node`, threaded into the swap log-tail + validator exec via `quote_arg`); "services to hide" (`DISABLED_SERVICES` comma list → `Settings.disabled_services` frozenset, skipped by `services_from_settings`, the `check_*` probes, deep-health `run_all`, and connectivity logging — kills the Parakeet-on-8000 collision); second-Spark vLLM monitor via a `kind: vllm` custom service in `services-overrides.yaml` (`probe_vllm_endpoint` shared with `check_vllm`). `/api/endpoints` gained a `disabled` flag; the health-dot hides when disabled. 102 tests pass (+8 in `test_topology.py`). Swap mechanism deliberately NOT generalized to raw `docker run` (that's coordination, item 4). Install pending — same mDNS situation as v0.23.0. Next: (4) coordination layer (swap lock + swap webhook + schedule visibility) — only when our own automation lands. Still-open older threads: audio concurrency sweep (only if the Signal Engine dev wants the knee; needs a quiet window); optional matrix-bridge Docker `HEALTHCHECK` if the bot dev asks; Parakeet long-audio guard deferred (rationale in ROADMAP).
|
||||
- **Design stance (decided):** Spark Control = control plane / GPU arbiter, **not** a job runner; recurring business jobs live in separate services that *call* the swap API (`POST /api/swap`). Full epic history (v0.22→v0.25) is in git log + `ROADMAP.md` → "Cluster coordination".
|
||||
- **Usage note (2026-06-18):** owner's daily driver is the solo **Qwen3.6 35B**; the 235B `cluster` models are dormant. Keeping `launch-cluster.sh` (the `eugr/spark-vllm-docker` community standard, mirrors NVIDIA's `dgx-spark-playbooks` Ray+RoCE design) is still correct even single-node — it supplies the maintained, hardware-tuned vLLM images; raw docker would mean DIY image upkeep for no gain. Spark 2 stays the speech/embeddings box regardless.
|
||||
- **Next steps (all low-priority / externally gated; P2/P3 tech-debt backlog in `ROADMAP.md`):** (1) raw-`docker run` swap generalization — **DEFERRED** (rationale in ROADMAP; revisit only if an adopter wants Spark Control to *drive*, not just monitor, raw-docker swaps — cleanest fix is the adopter adopting `launch-cluster.sh`). (2) audio concurrency knee — only if the Signal Engine dev wants it (needs a quiet window). (3) matrix-bridge Docker `HEALTHCHECK` — only if the bot dev asks. (4) Parakeet long-audio guard — deferred (rationale in ROADMAP).
|
||||
|
||||
+17
-4
@@ -12,10 +12,23 @@ Sequenced:
|
||||
1. **Configurable `VLLM_PORT`** — DONE, v0.22.0:0. Field in Configure Sparks (blank ⇒ 8888); numeric-setting parsing hardened so a blank/bad value falls back instead of crashing startup. Was the immediate "vLLM unreachable" bug for an adopter on port 8000.
|
||||
2. **Local-path / fine-tuned model support** — DONE, v0.23.0:0. Catalog/`ModelDef` gained `local_path` (exactly one of `repo`/`local_path`); swap bind-mounts the dir into the vLLM container at the same path via the launch script's `VLLM_SPARK_EXTRA_DOCKER_ARGS` hook (no `launch-cluster.sh` change); "+ Add local model" form + `local` badge; disk-delete refused for local models; `validate_local_path` boundary check. His merged `ten31-v2` was the motivating case.
|
||||
3. **Configurable topology** — DONE, v0.24.0:0. Three optional Configure-Sparks knobs: vLLM container name (`VLLM_CONTAINER`, blank ⇒ `vllm_node`; threaded through the swap log-tail + pre-flight validator via `quote_arg`); "services to hide" (`DISABLED_SERVICES`, comma list — hidden services show no tile and are skipped by status/deep-health/connectivity probes, killing the Parakeet-on-8000 collision); and a second-Spark vLLM monitor via a `kind: vllm` custom service in `services-overrides.yaml` (read-only tile probed through the shared `probe_vllm_endpoint`). `/api/endpoints` gained a `disabled` flag. Covers report P4/P5/#6. (Generalizing the *swap* mechanism to the adopter's raw `docker run` was deliberately left out — that's coordination, item 4; he swaps via his own crons and uses Spark Control to monitor.)
|
||||
4. **Coordination layer** — build when our own automation actually lands (zero value until something other than the dashboard swaps models):
|
||||
- **Swap lock** with holder + TTL (`POST` / `GET` / `DELETE /api/swap/lock`). An external scheduler acquires it before swapping; the dashboard then refuses manual swaps and shows who holds the GPU and until when. Enforced by the swap path, not advisory.
|
||||
- **Swap-event webhook** (`swap_complete` / `swap_failed`) to a configurable URL, so downstream consumers update their provider config when the running model changes.
|
||||
- **Schedule visibility** — read-only view the dashboard surfaces, *registered by* external schedulers (Spark Control does not own the schedule).
|
||||
4. **Coordination layer** — DONE in tree, staged as **v0.25.0:0** (built/typechecked clean; install pending). All three primitives shipped; `image/app/coordination.py` + `docs/COORDINATION.md`. Brought forward 2026-06-17 on request rather than waiting for our own automation.
|
||||
- **Swap lock** with holder + TTL (`POST` / `GET` / `DELETE /api/swap/lock`). Acquire returns a secret token; the swap endpoint refuses any real swap (`423`) that doesn't present it in `X-Swap-Lock-Token`, so the dashboard's manual swap is paused while a scheduler holds it (with a `?force=true` human override). In-memory + TTL-bounded → resets to unlocked on restart; re-acquire with the token extends. Enforced in `post_swap`, not advisory.
|
||||
- **Swap-event webhook** (`swap_complete` / `swap_failed`) to a configurable URL (Configure-Sparks field), fired from `SwapManager._run` *outside* the swap lock; optional shared secret ⇒ `X-Spark-Signature` HMAC. Fire-and-forget (5 s, no retries); dry runs don't fire.
|
||||
- **Schedule visibility** — `GET/POST/DELETE /api/schedule`; read-only "Scheduled jobs" dashboard panel, registered by external schedulers. Spark Control stores and displays, never executes.
|
||||
- Tests: `image/tests/test_coordination.py` (22 cases — lock lifecycle/expiry/token, the single-read swap gate, schedule CRUD + id validation, webhook payload+signature). Known limit: lock + schedules are in-memory (a restart frees the lock and empties the registry until schedulers re-register) — persist to `/data` only if that bites.
|
||||
|
||||
### Generalizing the swap mechanism to raw `docker run` — DEFERRED (decided 2026-06-18, research-backed; was item 4's last open thread)
|
||||
|
||||
Our swap drives `~/spark-vllm-docker/launch-cluster.sh` over SSH on Spark 1 (`./launch-cluster.sh stop`, then `[VLLM_SPARK_EXTRA_DOCKER_ARGS=…] ./launch-cluster.sh [--solo ]-d exec vllm serve <model> <args>`, then `docker logs -f` until the ready marker). The OpenClaw adopter launches vLLM with a plain `docker run` instead, so the swap button can't drive his cluster — only monitor it. The portability fix would be a configurable "swap backend": keep `launch-cluster.sh` as the default and add a "bring your own command" mode (operator-authored stop/launch templates in `services-overrides.yaml` with quoted `{model}`/`{container}`/`{port}`/`{extra_args}` substitution; ready-detection unchanged; the vLLM-argparse pre-flight disabled for that backend).
|
||||
|
||||
**Why deferred, not built:**
|
||||
- **Raw docker is not an upgrade for *us* — for half our catalog it's impossible.** `launch-cluster.sh` is the `eugr/spark-vllm-docker` community project (de-facto DGX Spark standard; mirrors NVIDIA's own `dgx-spark-playbooks` Ray+RDMA architecture). Its headline job is **multi-node** serving: our 235B `cluster` models (Qwen3-VL 235B, Qwen3 235B) exceed one Spark's 128 GB and *must* shard across both Sparks via Ray over the 200 Gbps ConnectX/RoCE link — plumbing (NCCL/MTU/per-node env) that a single-node `docker run` cannot do. So we keep the helper script; switching our own cluster to raw docker is off the table.
|
||||
- **The feature is therefore portability-only** (for differently-wired adopters), and the one known adopter doesn't need it — he swaps via his own crons and uses Spark Control to watch.
|
||||
- **Untestable on our hardware** — our cluster uses the helper script, so we can't validate a real raw-docker swap without risking the live vLLM.
|
||||
- The one real standing risk is eugr's single-maintainer status; fallback is community forks or migrating to NVIDIA's official `dgx-spark-playbooks` launcher (same design). No reason to switch now.
|
||||
|
||||
**Revisit only if** an adopter explicitly wants Spark Control to *drive* (not just monitor) swaps on a raw-`docker run` cluster. At that point, get their actual working `docker run` command and build the command-template backend to it.
|
||||
|
||||
## Near term
|
||||
- parakeet-asr long-audio memory guard — **deferred 2026-06-15, low priority.** A duration cap on `/v1/audio/diarize`: Sortformer runs the whole file in one pass (`diarizer.py:128-135`) over Spark 2's *shared* 128 GB unified memory (also feeding Kokoro/embeddings/Qdrant), so one giant single file can thrash into swap. **Precautionary — no observed incident**, and the production consumer (Recap Relay) already chunks via `/diarize-chunk` (~5-min, already bounded), so the only exposed path is a consumer POSTing one huge file to the full `/diarize`. When picked up: add a configurable `MAX_DIARIZE_SECONDS` guard in `diarizer.py` right after `duration` is computed (~line 130) → raise → HTTP 413 in `main.py` (mirrors the existing `MAX_UPLOAD_MB` 413); ship via the Reapply-patches action (restarts the live parakeet-asr container → needs go/no-go). Leave transcription out of v1 (upstream/un-patched file; parakeet-TDT handles long audio better). Revisit only if a consumer starts sending long single files.
|
||||
|
||||
@@ -0,0 +1,157 @@
|
||||
# Cluster coordination through Spark Control (v0.25.0)
|
||||
|
||||
Spark Control is the **GPU arbiter, not a job runner.** Your recurring pipelines
|
||||
(model-warming crons, "daily X" generators, batch jobs) live in your own
|
||||
services and *drive Spark Control's swap API*. This page documents the safety
|
||||
layer around that: a **swap reservation lock**, a **swap-event webhook**, and a
|
||||
**read-only schedule registry**.
|
||||
|
||||
If only the dashboard ever swaps models, you don't need any of this — it's for
|
||||
when something automated also swaps.
|
||||
|
||||
All endpoints are on the Spark Control host (same LAN/VPN URL as the LLM, audio,
|
||||
and embeddings proxies). There is no API-token auth by design (LAN + split-tunnel
|
||||
VPN only); a non-browser client passes the same-origin guard automatically.
|
||||
|
||||
---
|
||||
|
||||
## 1. Swap reservation lock
|
||||
|
||||
A short, TTL-bounded reservation of the swap path. While a lock is held, **any
|
||||
real swap that doesn't present the holder's token is refused with `423 Locked`**
|
||||
— including the dashboard's manual swap. The holder *name* is descriptive; the
|
||||
returned **token** is the secret that authorises swaps and the release.
|
||||
|
||||
The lock is in-memory: it resets to *unlocked* if Spark Control restarts (the
|
||||
safe-for-availability default), and the swap engine's own in-progress guard
|
||||
still prevents two swaps running at once.
|
||||
|
||||
### `POST /api/swap/lock` — acquire (or extend)
|
||||
|
||||
```json
|
||||
// request
|
||||
{ "holder": "openclaw-daily-vol", "ttl_seconds": 900, "note": "daily vol run" }
|
||||
|
||||
// 200 response
|
||||
{
|
||||
"held": true,
|
||||
"holder": "openclaw-daily-vol",
|
||||
"acquired_at": "2026-06-17T12:00:00+00:00",
|
||||
"expires_at": "2026-06-17T12:15:00+00:00",
|
||||
"seconds_remaining": 900,
|
||||
"note": "daily vol run",
|
||||
"token": "a1b2c3…" // SECRET — store it; needed to swap and to release
|
||||
}
|
||||
```
|
||||
|
||||
- `ttl_seconds` is optional (default 900) and clamped to `[1, 86400]`.
|
||||
- **`409`** if a *different* holder already holds it (body includes the current
|
||||
`lock` state). To **extend** your own lock, POST again with the same `holder`
|
||||
**and** your `token` — the token is preserved and the window slides forward.
|
||||
|
||||
### `GET /api/swap/lock` — status (no token)
|
||||
|
||||
```json
|
||||
{ "held": true, "holder": "openclaw-daily-vol", "expires_at": "…", "seconds_remaining": 612, "note": "…" }
|
||||
// or
|
||||
{ "held": false }
|
||||
```
|
||||
|
||||
### `DELETE /api/swap/lock` — release
|
||||
|
||||
Send your token in the `X-Swap-Lock-Token` header (or `?token=`):
|
||||
|
||||
```
|
||||
DELETE /api/swap/lock
|
||||
X-Swap-Lock-Token: a1b2c3…
|
||||
```
|
||||
|
||||
- **`403`** if the token doesn't match. The dashboard's human override is
|
||||
`DELETE /api/swap/lock?force=true` (no token).
|
||||
|
||||
### Swapping while you hold the lock
|
||||
|
||||
Pass the token on the swap call; the dashboard (no token) is then blocked:
|
||||
|
||||
```
|
||||
POST /api/swap
|
||||
X-Swap-Lock-Token: a1b2c3…
|
||||
{ "model_key": "gemma-3-27b" }
|
||||
```
|
||||
|
||||
Recommended scheduler flow: **acquire → swap (with token) → poll `/api/swap/{id}`
|
||||
→ release**. Always release in a `finally`; if you crash, the TTL frees it.
|
||||
|
||||
> `POST /api/swap/{key}/validate` (pre-flight) and dry-run swaps are **not**
|
||||
> blocked by the lock — they don't touch the cluster.
|
||||
|
||||
---
|
||||
|
||||
## 2. Swap-event webhook
|
||||
|
||||
Configure a URL in **Configure Sparks → "Swap webhook URL"**. After every real
|
||||
swap, Spark Control POSTs:
|
||||
|
||||
```json
|
||||
{
|
||||
"event": "swap_complete", // or "swap_failed"
|
||||
"job_id": "1a2b3c4d",
|
||||
"model_key": "gemma-3-27b",
|
||||
"state": "ready", // or "failed"
|
||||
"returncode": 0,
|
||||
"started_at": "2026-06-17T12:00:00+00:00",
|
||||
"finished_at": "2026-06-17T12:03:11+00:00",
|
||||
"dry_run": false
|
||||
}
|
||||
```
|
||||
|
||||
Headers: `X-Spark-Event: swap_complete`. If you set a **webhook secret**, the
|
||||
body is signed: `X-Spark-Signature: sha256=<hmac>` (HMAC-SHA256 of the raw body
|
||||
with the shared secret). Verify it like:
|
||||
|
||||
```python
|
||||
import hmac, hashlib
|
||||
expected = "sha256=" + hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
|
||||
assert hmac.compare_digest(expected, request.headers["X-Spark-Signature"])
|
||||
```
|
||||
|
||||
Delivery is best-effort and fire-and-forget (5 s timeout, no retries) — a
|
||||
webhook failure never affects the swap itself. Dry runs don't fire.
|
||||
|
||||
---
|
||||
|
||||
## 3. Schedule registry (read-only display)
|
||||
|
||||
So the dashboard can show *what's scheduled to touch the GPU and when*, your
|
||||
schedulers register their jobs here. **Spark Control only displays these — it
|
||||
never executes them.**
|
||||
|
||||
### `POST /api/schedule` — register / update
|
||||
|
||||
```json
|
||||
// request (pass a stable `id` to update in place on re-register)
|
||||
{ "id": "daily-vol", "name": "Daily Vol", "owner": "openclaw",
|
||||
"cron": "0 6 * * *", "next_run": "2026-06-18T06:00:00Z",
|
||||
"description": "Swaps to the big model, generates the vol report" }
|
||||
|
||||
// response: the stored entry (generates an id if you omit one)
|
||||
```
|
||||
|
||||
`name` is required; `id` (if given) must match `[A-Za-z0-9_.-]` (≤64 chars).
|
||||
|
||||
### `GET /api/schedule` — list
|
||||
|
||||
```json
|
||||
{ "schedules": [ { "id": "daily-vol", "name": "Daily Vol", "owner": "openclaw",
|
||||
"cron": "0 6 * * *", "next_run": "…", "description": "…",
|
||||
"registered_at": "…", "updated_at": "…" } ] }
|
||||
```
|
||||
|
||||
### `DELETE /api/schedule/{id}` — deregister
|
||||
|
||||
```json
|
||||
{ "deleted": true }
|
||||
```
|
||||
|
||||
The registry is in-memory — re-register your schedules on your own startup so
|
||||
they survive a Spark Control restart.
|
||||
@@ -103,6 +103,8 @@ class Settings:
|
||||
bind_port: int
|
||||
open_webui_url: str
|
||||
ngc_api_key: str
|
||||
swap_webhook_url: str
|
||||
swap_webhook_secret: str
|
||||
|
||||
@classmethod
|
||||
def from_env(cls) -> "Settings":
|
||||
@@ -165,6 +167,11 @@ class Settings:
|
||||
bind_port=_env_int("BIND_PORT", 9999),
|
||||
open_webui_url=_env("OPEN_WEBUI_URL", ""),
|
||||
ngc_api_key=_env("NGC_API_KEY", ""),
|
||||
# Coordination layer: fire a swap-lifecycle webhook to this URL so
|
||||
# downstream consumers re-point their model config on a swap. Blank
|
||||
# ⇒ disabled. The optional secret HMAC-signs the body (X-Spark-Signature).
|
||||
swap_webhook_url=_env("SWAP_WEBHOOK_URL", ""),
|
||||
swap_webhook_secret=_env("SWAP_WEBHOOK_SECRET", ""),
|
||||
)
|
||||
|
||||
@property
|
||||
|
||||
@@ -0,0 +1,342 @@
|
||||
"""Cluster-coordination layer: the GPU swap lock, swap-event webhook, and the
|
||||
read-only schedule registry.
|
||||
|
||||
Spark Control is the **control plane / GPU arbiter, not a job runner.** Recurring
|
||||
business pipelines live in separate services that *call* the swap API. These
|
||||
three primitives add the *safety* layer around that:
|
||||
|
||||
- **Swap lock** — a TTL-bounded reservation of the swap path. An external
|
||||
scheduler acquires it before swapping; while held by someone else the
|
||||
dashboard's manual swap is refused (enforced in the swap endpoint, not
|
||||
advisory). Holder name is descriptive; the returned token is the secret that
|
||||
authorises a swap or a release.
|
||||
- **Webhook** — fires `swap_complete` / `swap_failed` to a configurable URL so
|
||||
downstream consumers re-point their provider config when the running model
|
||||
changes. Optionally HMAC-signed.
|
||||
- **Schedule registry** — a read-only view the dashboard surfaces, *registered
|
||||
by* external schedulers. Spark Control stores what it's told; it does not own
|
||||
or execute any schedule.
|
||||
|
||||
All state is in-memory (mirroring the swap/download/NIM job managers). On a
|
||||
restart the lock resets to *unlocked* — the available-by-default failure mode;
|
||||
the swap manager's own in-progress guard still prevents two swaps at once —
|
||||
and schedulers re-register their schedules.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# A lock reserves the GPU for a window; clamp the TTL so a buggy client can
|
||||
# neither pin the cluster forever nor take a zero-length (useless) lock.
|
||||
LOCK_TTL_MIN = 1
|
||||
LOCK_TTL_MAX = 86_400 # 24h
|
||||
LOCK_TTL_DEFAULT = 900 # 15 min
|
||||
|
||||
# Schedule ids are reflected to the dashboard and used as a URL path segment on
|
||||
# delete, so a caller-supplied id is whitelist-checked. Generated ids are hex.
|
||||
_SCHEDULE_ID_RE = re.compile(r"^[A-Za-z0-9_.-]{1,64}$")
|
||||
|
||||
|
||||
def valid_schedule_id(value: str) -> bool:
|
||||
"""Whitelist check for a caller-supplied schedule id (register and delete)."""
|
||||
return bool(_SCHEDULE_ID_RE.match(value or ""))
|
||||
|
||||
|
||||
def _now() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def _iso(dt: datetime) -> str:
|
||||
return dt.isoformat()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------- swap lock ----
|
||||
|
||||
class LockHeld(Exception):
|
||||
"""The lock is held by a different holder. Carries the public lock state so
|
||||
the endpoint can return holder + expiry in the 409 body."""
|
||||
|
||||
def __init__(self, state: dict) -> None:
|
||||
self.state = state
|
||||
super().__init__("swap lock is held by another holder")
|
||||
|
||||
|
||||
@dataclass
|
||||
class LockState:
|
||||
holder: str
|
||||
token: str
|
||||
acquired_at: datetime
|
||||
expires_at: datetime
|
||||
note: str = ""
|
||||
|
||||
def public(self, now: datetime) -> dict:
|
||||
"""Token-free view safe to expose on GET / in error bodies."""
|
||||
return {
|
||||
"held": True,
|
||||
"holder": self.holder,
|
||||
"acquired_at": _iso(self.acquired_at),
|
||||
"expires_at": _iso(self.expires_at),
|
||||
"seconds_remaining": max(0, int((self.expires_at - now).total_seconds())),
|
||||
"note": self.note,
|
||||
}
|
||||
|
||||
|
||||
class SwapLockManager:
|
||||
"""In-memory, TTL-bounded reservation of the GPU swap path.
|
||||
|
||||
`now` is injectable on every method purely so the expiry logic is testable
|
||||
without sleeping; production calls omit it and get wall-clock UTC.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._lock: Optional[LockState] = None
|
||||
|
||||
def _active(self, now: Optional[datetime] = None) -> Optional[LockState]:
|
||||
"""The current lock if one is held and unexpired; lazily clears an
|
||||
expired lock so it never lingers."""
|
||||
now = now or _now()
|
||||
if self._lock is not None and self._lock.expires_at <= now:
|
||||
self._lock = None
|
||||
return self._lock
|
||||
|
||||
def status(self, now: Optional[datetime] = None) -> dict:
|
||||
now = now or _now()
|
||||
active = self._active(now)
|
||||
return active.public(now) if active else {"held": False}
|
||||
|
||||
def acquire(
|
||||
self,
|
||||
holder: str,
|
||||
ttl_seconds: Optional[int] = None,
|
||||
note: str = "",
|
||||
token: Optional[str] = None,
|
||||
*,
|
||||
now: Optional[datetime] = None,
|
||||
) -> LockState:
|
||||
"""Acquire a free lock (new token), or extend one already held by
|
||||
presenting its token. A request without the token is refused even if the
|
||||
holder name matches — the name is descriptive, the token is the secret.
|
||||
"""
|
||||
now = now or _now()
|
||||
holder = (holder or "").strip()
|
||||
if not holder:
|
||||
raise ValueError("holder is required")
|
||||
ttl = ttl_seconds if ttl_seconds is not None else LOCK_TTL_DEFAULT
|
||||
try:
|
||||
ttl = int(ttl)
|
||||
except (TypeError, ValueError):
|
||||
ttl = LOCK_TTL_DEFAULT
|
||||
ttl = max(LOCK_TTL_MIN, min(LOCK_TTL_MAX, ttl))
|
||||
|
||||
active = self._active(now)
|
||||
if active is not None:
|
||||
# Held — only the token-holder may extend/re-acquire.
|
||||
if not (token and hmac.compare_digest(active.token, token)):
|
||||
raise LockHeld(active.public(now))
|
||||
self._lock = LockState(
|
||||
holder=holder or active.holder,
|
||||
token=active.token,
|
||||
acquired_at=active.acquired_at,
|
||||
expires_at=now + timedelta(seconds=ttl),
|
||||
note=note or active.note,
|
||||
)
|
||||
return self._lock
|
||||
|
||||
self._lock = LockState(
|
||||
holder=holder,
|
||||
token=uuid.uuid4().hex,
|
||||
acquired_at=now,
|
||||
expires_at=now + timedelta(seconds=ttl),
|
||||
note=note,
|
||||
)
|
||||
return self._lock
|
||||
|
||||
def verify(self, token: Optional[str], now: Optional[datetime] = None) -> bool:
|
||||
"""True iff `token` matches the currently-active lock."""
|
||||
active = self._active(now)
|
||||
return bool(active and token and hmac.compare_digest(active.token, token))
|
||||
|
||||
def is_blocked_by(self, token: Optional[str], now: Optional[datetime] = None) -> Optional[dict]:
|
||||
"""Single-read swap gate. Returns the public lock state if an active
|
||||
lock blocks a swap carrying this token, else None. Does exactly one
|
||||
`_active()` read so the decision can't straddle a TTL expiry the way a
|
||||
separate status()+verify() pair could (which, at the expiry tick, would
|
||||
spuriously refuse a swap that should now be allowed)."""
|
||||
now = now or _now()
|
||||
active = self._active(now)
|
||||
if active is None:
|
||||
return None
|
||||
if token and hmac.compare_digest(active.token, token):
|
||||
return None
|
||||
return active.public(now)
|
||||
|
||||
def release(
|
||||
self,
|
||||
token: Optional[str] = None,
|
||||
*,
|
||||
force: bool = False,
|
||||
now: Optional[datetime] = None,
|
||||
) -> bool:
|
||||
"""Release the lock. Returns False if nothing was held. Requires the
|
||||
matching token unless `force` (the human override from the dashboard)."""
|
||||
active = self._active(now)
|
||||
if active is None:
|
||||
return False
|
||||
if not force and not self.verify(token, now):
|
||||
raise PermissionError("token does not hold the lock")
|
||||
self._lock = None
|
||||
return True
|
||||
|
||||
|
||||
# ----------------------------------------------------------------- webhook ----
|
||||
|
||||
def build_webhook_payload(
|
||||
*,
|
||||
event: str,
|
||||
job_id: str,
|
||||
model_key: str,
|
||||
state: str,
|
||||
returncode: Optional[int],
|
||||
started_at: Optional[str],
|
||||
finished_at: Optional[str],
|
||||
dry_run: bool,
|
||||
) -> dict:
|
||||
return {
|
||||
"event": event, # swap_complete | swap_failed
|
||||
"job_id": job_id,
|
||||
"model_key": model_key,
|
||||
"state": state,
|
||||
"returncode": returncode,
|
||||
"started_at": started_at,
|
||||
"finished_at": finished_at,
|
||||
"dry_run": dry_run,
|
||||
}
|
||||
|
||||
|
||||
def sign_payload(secret: str, body: bytes) -> str:
|
||||
"""`X-Spark-Signature` value: sha256 HMAC of the exact JSON body the
|
||||
consumer receives, so they can recompute and trust it."""
|
||||
return "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
|
||||
|
||||
|
||||
class WebhookNotifier:
|
||||
"""Fire-and-forget POST of swap-lifecycle events. A webhook failure is
|
||||
logged and swallowed — it must never affect the swap outcome."""
|
||||
|
||||
def __init__(self, url: str, secret: str = "", timeout: float = 5.0) -> None:
|
||||
self.url = (url or "").strip()
|
||||
self.secret = secret or ""
|
||||
self.timeout = timeout
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
return bool(self.url)
|
||||
|
||||
async def fire(self, event: str, payload: dict) -> None:
|
||||
if not self.enabled:
|
||||
return
|
||||
body = json.dumps(payload).encode()
|
||||
headers = {
|
||||
"content-type": "application/json",
|
||||
"user-agent": "spark-control-webhook",
|
||||
"x-spark-event": event,
|
||||
}
|
||||
if self.secret:
|
||||
headers["x-spark-signature"] = sign_payload(self.secret, body)
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
await client.post(self.url, content=body, headers=headers)
|
||||
except Exception as e: # noqa: BLE001 — best-effort, never propagate
|
||||
log.warning("swap webhook to %s failed: %s", self.url, e)
|
||||
|
||||
|
||||
# -------------------------------------------------------- schedule registry ----
|
||||
|
||||
@dataclass
|
||||
class ScheduleEntry:
|
||||
id: str
|
||||
name: str
|
||||
owner: str = ""
|
||||
cron: str = ""
|
||||
next_run: str = ""
|
||||
description: str = ""
|
||||
registered_at: str = ""
|
||||
updated_at: str = ""
|
||||
|
||||
def public(self) -> dict:
|
||||
return {
|
||||
"id": self.id,
|
||||
"name": self.name,
|
||||
"owner": self.owner,
|
||||
"cron": self.cron,
|
||||
"next_run": self.next_run,
|
||||
"description": self.description,
|
||||
"registered_at": self.registered_at,
|
||||
"updated_at": self.updated_at,
|
||||
}
|
||||
|
||||
|
||||
class ScheduleRegistry:
|
||||
"""What external schedulers tell us about their cron jobs. Read-only from the
|
||||
dashboard's side; Spark Control never executes any of it."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._items: dict[str, ScheduleEntry] = {}
|
||||
|
||||
def list(self) -> list[dict]:
|
||||
return [e.public() for e in self._items.values()]
|
||||
|
||||
def register(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
id: Optional[str] = None,
|
||||
owner: str = "",
|
||||
cron: str = "",
|
||||
next_run: str = "",
|
||||
description: str = "",
|
||||
) -> ScheduleEntry:
|
||||
name = (name or "").strip()
|
||||
if not name:
|
||||
raise ValueError("name is required")
|
||||
if id is not None:
|
||||
id = id.strip()
|
||||
if id and not valid_schedule_id(id):
|
||||
raise ValueError("id must match [A-Za-z0-9_.-] (max 64 chars)")
|
||||
ts = _iso(_now())
|
||||
existing = self._items.get(id) if id else None
|
||||
if existing is not None:
|
||||
existing.name = name
|
||||
existing.owner = owner.strip()
|
||||
existing.cron = cron
|
||||
existing.next_run = next_run
|
||||
existing.description = description
|
||||
existing.updated_at = ts
|
||||
return existing
|
||||
sid = id or uuid.uuid4().hex[:8]
|
||||
entry = ScheduleEntry(
|
||||
id=sid,
|
||||
name=name,
|
||||
owner=owner.strip(),
|
||||
cron=cron,
|
||||
next_run=next_run,
|
||||
description=description,
|
||||
registered_at=ts,
|
||||
updated_at=ts,
|
||||
)
|
||||
self._items[sid] = entry
|
||||
return entry
|
||||
|
||||
def delete(self, schedule_id: str) -> bool:
|
||||
return self._items.pop(schedule_id, None) is not None
|
||||
+107
-2
@@ -11,6 +11,7 @@ from typing import Literal
|
||||
|
||||
from .config import Settings
|
||||
from .connectivity import get_mac, record_report, record_state, summary as connectivity_summary
|
||||
from .coordination import LockHeld, ScheduleRegistry, SwapLockManager, WebhookNotifier, valid_schedule_id
|
||||
from .custom_services import add_custom_service, delete_custom_service
|
||||
from .audio_proxy import build_router as build_audio_router
|
||||
from .deep_health import DeepHealth
|
||||
@@ -37,7 +38,12 @@ from .wol import send_local_broadcast, send_via_peer
|
||||
|
||||
settings = Settings.from_env()
|
||||
catalog = load_catalog(settings.models_yaml)
|
||||
swap_manager = SwapManager(settings, catalog)
|
||||
# Coordination layer (GPU arbiter): swap-lifecycle webhook, the swap reservation
|
||||
# lock, and the read-only schedule registry. See coordination.py.
|
||||
swap_webhook = WebhookNotifier(settings.swap_webhook_url, settings.swap_webhook_secret)
|
||||
swap_lock = SwapLockManager()
|
||||
schedule_registry = ScheduleRegistry()
|
||||
swap_manager = SwapManager(settings, catalog, notifier=swap_webhook)
|
||||
download_manager = DownloadManager(settings)
|
||||
update_manager = UpdateManager(settings)
|
||||
hardware_probe = HardwareProbe(settings)
|
||||
@@ -67,6 +73,10 @@ _CSRF_EXEMPT_PREFIXES = (
|
||||
"/api/audio/", # diarize-chunk / label-merge / transcribe-with-speakers
|
||||
"/api/health-event", # health reports posted by consumer apps
|
||||
)
|
||||
# Note: the coordination endpoints (/api/swap/lock, /api/schedule) are
|
||||
# intentionally NOT exempt. External schedulers are non-browser clients (no
|
||||
# Origin header) so they pass the guard already — same as /api/swap — while a
|
||||
# malicious page can't drive them from the operator's browser. Don't add them.
|
||||
|
||||
|
||||
@app.middleware("http")
|
||||
@@ -892,9 +902,21 @@ async def validate_swap(key: str) -> dict:
|
||||
|
||||
|
||||
@app.post("/api/swap")
|
||||
async def post_swap(req: SwapRequest) -> dict:
|
||||
async def post_swap(req: SwapRequest, request: Request) -> dict:
|
||||
if not settings.configured and not req.dry_run:
|
||||
raise HTTPException(503, "spark1 not configured")
|
||||
# Enforce the swap reservation lock (the GPU arbiter). A held lock blocks any
|
||||
# real swap that doesn't present the holder's token in X-Swap-Lock-Token — so
|
||||
# an external scheduler that holds the lock can swap, but the dashboard (no
|
||||
# token) is refused while someone else holds it. Dry runs don't touch the
|
||||
# cluster, so they're exempt.
|
||||
if not req.dry_run:
|
||||
blocked = swap_lock.is_blocked_by(request.headers.get("x-swap-lock-token"))
|
||||
if blocked is not None:
|
||||
raise HTTPException(status_code=423, detail={
|
||||
"error": "the GPU swap path is reserved by another holder",
|
||||
"lock": blocked,
|
||||
})
|
||||
try:
|
||||
job = await swap_manager.trigger(req.model_key, dry_run=req.dry_run)
|
||||
except KeyError:
|
||||
@@ -949,6 +971,89 @@ async def stream_swap(job_id: str):
|
||||
return StreamingResponse(gen(), media_type="text/event-stream")
|
||||
|
||||
|
||||
# ---- Coordination layer: swap lock + schedule registry ----
|
||||
# Endpoints are control-surface, not browser-exempt: an external scheduler is a
|
||||
# non-browser client (no Origin header) so it passes the CSRF guard already, the
|
||||
# same way it calls /api/swap today; the dashboard is same-origin.
|
||||
|
||||
class LockAcquireRequest(BaseModel):
|
||||
holder: str
|
||||
ttl_seconds: int | None = None
|
||||
note: str = ""
|
||||
token: str | None = None # present only to extend an existing hold
|
||||
|
||||
|
||||
@app.post("/api/swap/lock")
|
||||
async def acquire_swap_lock(req: LockAcquireRequest) -> dict:
|
||||
"""Reserve the GPU swap path. Returns a secret token used to swap (header
|
||||
X-Swap-Lock-Token) and to release. 409 if held by another holder."""
|
||||
try:
|
||||
lock = swap_lock.acquire(req.holder, req.ttl_seconds, req.note, token=req.token)
|
||||
except ValueError as e:
|
||||
raise HTTPException(422, str(e))
|
||||
except LockHeld as e:
|
||||
raise HTTPException(status_code=409, detail={
|
||||
"error": "swap lock is held by another holder",
|
||||
"lock": e.state,
|
||||
})
|
||||
return {**swap_lock.status(), "token": lock.token}
|
||||
|
||||
|
||||
@app.get("/api/swap/lock")
|
||||
async def get_swap_lock() -> dict:
|
||||
"""Public, token-free view of the reservation: held? who? until when?"""
|
||||
return swap_lock.status()
|
||||
|
||||
|
||||
@app.delete("/api/swap/lock")
|
||||
async def release_swap_lock(request: Request, force: bool = Query(False)) -> dict:
|
||||
"""Release the reservation. Needs the matching X-Swap-Lock-Token unless
|
||||
?force=true (the human override from the dashboard)."""
|
||||
token = request.headers.get("x-swap-lock-token") or request.query_params.get("token")
|
||||
try:
|
||||
released = swap_lock.release(token, force=force)
|
||||
except PermissionError as e:
|
||||
raise HTTPException(403, str(e))
|
||||
return {"released": released, **swap_lock.status()}
|
||||
|
||||
|
||||
class ScheduleRequest(BaseModel):
|
||||
name: str
|
||||
id: str | None = None
|
||||
owner: str = ""
|
||||
cron: str = ""
|
||||
next_run: str = ""
|
||||
description: str = ""
|
||||
|
||||
|
||||
@app.get("/api/schedule")
|
||||
async def list_schedules() -> dict:
|
||||
return {"schedules": schedule_registry.list()}
|
||||
|
||||
|
||||
@app.post("/api/schedule")
|
||||
async def register_schedule(req: ScheduleRequest) -> dict:
|
||||
"""Register (or update, by id) a schedule an external scheduler owns. Spark
|
||||
Control only stores it for the dashboard — it never executes it."""
|
||||
try:
|
||||
entry = schedule_registry.register(
|
||||
name=req.name, id=req.id, owner=req.owner,
|
||||
cron=req.cron, next_run=req.next_run, description=req.description,
|
||||
)
|
||||
except ValueError as e:
|
||||
raise HTTPException(422, str(e))
|
||||
return entry.public()
|
||||
|
||||
|
||||
@app.delete("/api/schedule/{schedule_id}")
|
||||
async def delete_schedule(schedule_id: str) -> dict:
|
||||
# Whitelist the path segment at the boundary (repo convention), even though
|
||||
# it's only ever a dict key — keeps it from being reflected or logged raw.
|
||||
if not valid_schedule_id(schedule_id):
|
||||
raise HTTPException(422, "invalid schedule id")
|
||||
return {"deleted": schedule_registry.delete(schedule_id)}
|
||||
|
||||
|
||||
class DownloadRequest(BaseModel):
|
||||
repo: str
|
||||
mode: Literal["spark1", "spark2", "cluster"] = "spark1"
|
||||
|
||||
+100
-2
@@ -21,11 +21,19 @@ const state = {
|
||||
deep_health: {},
|
||||
disk_status: {}, // keyed by model key: { on_disk, total_bytes, per_host }
|
||||
disk_status_loaded: false,
|
||||
lock: { held: false }, // GPU swap reservation (coordination layer)
|
||||
schedules: [], // schedules external automation has registered
|
||||
};
|
||||
|
||||
const el = (sel) => document.querySelector(sel);
|
||||
const $$ = (sel) => document.querySelectorAll(sel);
|
||||
|
||||
// ISO timestamp -> local clock string (e.g. "2:45:10 PM"); '' if unparseable.
|
||||
function fmtClock(iso) {
|
||||
const t = Date.parse(iso);
|
||||
return isNaN(t) ? '' : new Date(t).toLocaleTimeString();
|
||||
}
|
||||
|
||||
function escapeHtml(s) {
|
||||
if (s == null) return '';
|
||||
return String(s)
|
||||
@@ -51,6 +59,12 @@ function renderCards() {
|
||||
const root = el('#cards');
|
||||
root.innerHTML = '';
|
||||
const isSwapping = !!state.swap_job_id;
|
||||
// GPU reserved by external automation — manual swaps are refused server-side
|
||||
// (423); reflect that in the buttons so the click never bounces.
|
||||
const locked = !!(state.lock && state.lock.held);
|
||||
const lockTip = locked
|
||||
? `Reserved by ${state.lock.holder || 'automation'}${state.lock.expires_at ? ' until ' + fmtClock(state.lock.expires_at) : ''}`
|
||||
: '';
|
||||
for (const key of Object.keys(state.models)) {
|
||||
const m = state.models[key];
|
||||
const isActive = key === state.current_model_key;
|
||||
@@ -94,7 +108,9 @@ function renderCards() {
|
||||
if (isActive) {
|
||||
primaryBtn = `<button class="btn" disabled>Current</button>`;
|
||||
} else if (isOnDisk) {
|
||||
primaryBtn = `<button class="btn primary" data-swap-key="${key}" ${isSwapping ? 'disabled' : ''}>Switch to this</button>`;
|
||||
const swapBlocked = isSwapping || locked;
|
||||
const tip = locked ? ` title="${escapeHtml(lockTip)}"` : '';
|
||||
primaryBtn = `<button class="btn primary" data-swap-key="${key}"${tip} ${swapBlocked ? 'disabled' : ''}>Switch to this</button>`;
|
||||
} else if (m.local_path) {
|
||||
// A local model can't be "downloaded" — its directory has to exist on the Spark.
|
||||
primaryBtn = `<button class="btn" disabled title="Directory not found on the Spark — create it there, then refresh">Not found on Spark</button>`;
|
||||
@@ -1234,6 +1250,11 @@ function openDiskDeleteDialog(key) {
|
||||
|
||||
async function triggerSwap(modelKey) {
|
||||
if (state.swap_job_id) return;
|
||||
if (state.lock && state.lock.held) {
|
||||
const until = state.lock.expires_at ? ' until ' + fmtClock(state.lock.expires_at) : '';
|
||||
alert(`The GPU swap path is reserved by ${state.lock.holder || 'automation'}${until}. Use "Release" on the reservation banner to override.`);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const r = await fetchJSON('/api/swap', {
|
||||
method: 'POST',
|
||||
@@ -1242,10 +1263,84 @@ async function triggerSwap(modelKey) {
|
||||
});
|
||||
attachToSwap(r.job_id, /*needsBackfill=*/false);
|
||||
} catch (e) {
|
||||
alert('Failed to start swap: ' + e.message);
|
||||
// 423 Locked: a reservation was acquired between our last poll and this click.
|
||||
if (e.message && e.message.startsWith('423')) {
|
||||
alert('The GPU swap path was just reserved by automation. Refreshing…');
|
||||
pollCoordination();
|
||||
} else {
|
||||
alert('Failed to start swap: ' + e.message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---- coordination layer: swap lock + schedule registry ----
|
||||
|
||||
async function pollCoordination() {
|
||||
try {
|
||||
state.lock = await fetchJSON('/api/swap/lock');
|
||||
} catch { state.lock = { held: false }; }
|
||||
try {
|
||||
const r = await fetchJSON('/api/schedule');
|
||||
state.schedules = r.schedules || [];
|
||||
} catch { state.schedules = []; }
|
||||
renderLockBanner();
|
||||
renderSchedules();
|
||||
renderCards(); // reflect lock state on the swap buttons
|
||||
}
|
||||
|
||||
function renderLockBanner() {
|
||||
const banner = el('#lock-banner');
|
||||
if (!banner) return;
|
||||
const lock = state.lock;
|
||||
if (lock && lock.held) {
|
||||
const until = lock.expires_at ? ` until ${fmtClock(lock.expires_at)}` : '';
|
||||
const note = lock.note ? ` — ${escapeHtml(lock.note)}` : '';
|
||||
el('#lock-text').innerHTML =
|
||||
`GPU swap path reserved by <strong>${escapeHtml(lock.holder || 'automation')}</strong>${until}${note}. Manual swaps are paused.`;
|
||||
banner.classList.remove('hidden');
|
||||
} else {
|
||||
banner.classList.add('hidden');
|
||||
}
|
||||
}
|
||||
|
||||
function renderSchedules() {
|
||||
const panel = el('#schedule-panel');
|
||||
const list = el('#schedule-list');
|
||||
if (!panel || !list) return;
|
||||
const items = state.schedules || [];
|
||||
if (!items.length) {
|
||||
panel.classList.add('hidden');
|
||||
list.innerHTML = '';
|
||||
return;
|
||||
}
|
||||
list.innerHTML = items.map((s) => {
|
||||
const meta = [
|
||||
s.cron ? `<code>${escapeHtml(s.cron)}</code>` : '',
|
||||
s.next_run ? `next: ${escapeHtml(s.next_run)}` : '',
|
||||
s.owner ? `by ${escapeHtml(s.owner)}` : '',
|
||||
].filter(Boolean).join(' · ');
|
||||
const desc = s.description ? `<div class="desc">${escapeHtml(s.description)}</div>` : '';
|
||||
return `<div class="schedule-item">
|
||||
<div class="name">${escapeHtml(s.name)}</div>
|
||||
<div class="muted small">${meta}</div>
|
||||
${desc}
|
||||
</div>`;
|
||||
}).join('');
|
||||
panel.classList.remove('hidden');
|
||||
}
|
||||
|
||||
async function releaseLock() {
|
||||
const lock = state.lock || {};
|
||||
const who = lock.holder || 'automation';
|
||||
if (!confirm(`Force-release the GPU reservation held by ${who}? Any job relying on it may then collide with a manual swap.`)) return;
|
||||
try {
|
||||
await fetchJSON('/api/swap/lock?force=true', { method: 'DELETE' });
|
||||
} catch (e) {
|
||||
alert('Failed to release: ' + e.message);
|
||||
}
|
||||
pollCoordination();
|
||||
}
|
||||
|
||||
async function triggerDownloadForKey(modelKey) {
|
||||
const m = state.models[modelKey];
|
||||
if (!m) return;
|
||||
@@ -2102,6 +2197,7 @@ async function init() {
|
||||
});
|
||||
el('#sshkey-close').addEventListener('click', () => el('#sshkey-dialog').close());
|
||||
el('#open-local').addEventListener('click', openLocalModelDialog);
|
||||
el('#lock-release').addEventListener('click', releaseLock);
|
||||
setupCatalogDialog();
|
||||
setupAdvancedDialog();
|
||||
setupLocalModelDialog();
|
||||
@@ -2119,6 +2215,7 @@ async function init() {
|
||||
await loadModels();
|
||||
await pollStatus();
|
||||
await renderServices();
|
||||
pollCoordination();
|
||||
pollHardware();
|
||||
pollUpdates();
|
||||
// Disk-status probe runs after first paint — slow over SSH and not blocking.
|
||||
@@ -2126,6 +2223,7 @@ async function init() {
|
||||
// Speech-model patches panel — slow over SSH, runs after first paint.
|
||||
renderSpeechModels();
|
||||
setInterval(pollStatus, 5000);
|
||||
setInterval(pollCoordination, 5000); // swap lock + schedule registry
|
||||
setInterval(pollHardware, 8000); // every 8s
|
||||
setInterval(pollUpdates, 300000); // every 5 min
|
||||
setInterval(loadDiskStatus, 60000); // every 60s — disk state changes rarely
|
||||
|
||||
@@ -96,6 +96,13 @@
|
||||
</details>
|
||||
</section>
|
||||
|
||||
<section id="lock-banner" class="banner lock-banner hidden">
|
||||
<span class="lock-icon" aria-hidden="true">🔒</span>
|
||||
<span id="lock-text">GPU swap path reserved</span>
|
||||
<span class="spacer"></span>
|
||||
<button id="lock-release" class="btn small-btn">Release</button>
|
||||
</section>
|
||||
|
||||
<nav id="dashboard-tabs" class="dashboard-tabs hidden" role="tablist">
|
||||
<button type="button" class="dashboard-tab" data-tab="llm" role="tab" aria-selected="true">LLM</button>
|
||||
<button type="button" class="dashboard-tab" data-tab="audio" role="tab" aria-selected="false">Audio / Speech</button>
|
||||
@@ -394,6 +401,14 @@
|
||||
<section id="cards" class="cards"></section>
|
||||
</section>
|
||||
|
||||
<section id="schedule-panel" class="schedule-panel hidden">
|
||||
<div class="section-header">
|
||||
<h2 class="section-title">Scheduled jobs</h2>
|
||||
</div>
|
||||
<p class="muted small">Registered by your own automation. Spark Control only displays these — it doesn't run them.</p>
|
||||
<div id="schedule-list" class="schedule-list"></div>
|
||||
</section>
|
||||
|
||||
<section id="update-banner" class="update-banner hidden">
|
||||
<div class="ub-context muted small">
|
||||
Updates to <strong><a href="https://github.com/eugr/spark-vllm-docker" target="_blank" rel="noopener">eugr/spark-vllm-docker</a></strong>
|
||||
|
||||
@@ -74,6 +74,42 @@ main {
|
||||
}
|
||||
.banner em { font-style: normal; background: rgba(245, 158, 11, 0.15); padding: 2px 6px; border-radius: 4px; }
|
||||
|
||||
/* GPU swap reservation (coordination layer) — informational, not a warning. */
|
||||
.lock-banner {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 10px;
|
||||
border-color: var(--info);
|
||||
color: var(--info);
|
||||
}
|
||||
.lock-banner .lock-icon { font-size: 16px; }
|
||||
.lock-banner strong { color: var(--text); }
|
||||
.lock-banner .spacer { flex: 1; }
|
||||
|
||||
/* Scheduled-jobs panel — read-only view of what external automation registered. */
|
||||
.schedule-panel { margin-top: 8px; }
|
||||
.schedule-list {
|
||||
display: grid;
|
||||
grid-template-columns: repeat(auto-fill, minmax(240px, 1fr));
|
||||
gap: 12px;
|
||||
margin-top: 8px;
|
||||
}
|
||||
.schedule-item {
|
||||
background: var(--surface);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: var(--radius);
|
||||
padding: 12px 14px;
|
||||
}
|
||||
.schedule-item .name { font-weight: 600; margin-bottom: 4px; }
|
||||
.schedule-item code {
|
||||
background: var(--surface-2);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: 4px;
|
||||
padding: 1px 5px;
|
||||
font-size: 12px;
|
||||
}
|
||||
.schedule-item .desc { margin-top: 6px; color: var(--muted); font-size: 13px; }
|
||||
|
||||
/* ===== Endpoint panel ===== */
|
||||
|
||||
.endpoint-panel {
|
||||
|
||||
+23
-1
@@ -6,6 +6,7 @@ from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
from .config import Settings
|
||||
from .coordination import WebhookNotifier, build_webhook_payload
|
||||
from .models import Catalog, build_launch_command
|
||||
from .shellsafe import quote_arg
|
||||
from .ssh import ssh_run, ssh_stream, StreamHandle
|
||||
@@ -33,9 +34,15 @@ class SwapJob:
|
||||
|
||||
|
||||
class SwapManager:
|
||||
def __init__(self, settings: Settings, catalog: Catalog) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
settings: Settings,
|
||||
catalog: Catalog,
|
||||
notifier: Optional[WebhookNotifier] = None,
|
||||
) -> None:
|
||||
self.settings = settings
|
||||
self.catalog = catalog
|
||||
self.notifier = notifier
|
||||
self.lock = asyncio.Lock()
|
||||
self.jobs: dict[str, SwapJob] = {}
|
||||
self.current_job_id: Optional[str] = None
|
||||
@@ -78,6 +85,21 @@ class SwapManager:
|
||||
job.finished_at = datetime.now(timezone.utc).isoformat()
|
||||
if self.current_job_id == job.id:
|
||||
self.current_job_id = None
|
||||
# Outside the swap lock (so a webhook POST can't stall a queued swap) and
|
||||
# only for real swaps — a dry run never changes the running model. A
|
||||
# webhook failure is logged inside fire(), never raised.
|
||||
if self.notifier is not None and self.notifier.enabled and not job.dry_run:
|
||||
event = "swap_complete" if job.state == "ready" else "swap_failed"
|
||||
await self.notifier.fire(event, build_webhook_payload(
|
||||
event=event,
|
||||
job_id=job.id,
|
||||
model_key=job.model_key,
|
||||
state=job.state,
|
||||
returncode=job.returncode,
|
||||
started_at=job.started_at,
|
||||
finished_at=job.finished_at,
|
||||
dry_run=job.dry_run,
|
||||
))
|
||||
|
||||
async def _do(self, job: SwapJob) -> None:
|
||||
model = self.catalog.models[job.model_key]
|
||||
|
||||
@@ -0,0 +1,201 @@
|
||||
"""Coordination layer: swap lock lifecycle/expiry, schedule registry CRUD, and
|
||||
the webhook payload+signature. All offline — the lock takes an injectable `now`
|
||||
so expiry is tested without sleeping, and the webhook is exercised only on the
|
||||
disabled (no-network) path plus its pure payload/signature helpers.
|
||||
"""
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from app.coordination import (
|
||||
LOCK_TTL_MAX,
|
||||
LOCK_TTL_MIN,
|
||||
LockHeld,
|
||||
ScheduleRegistry,
|
||||
SwapLockManager,
|
||||
WebhookNotifier,
|
||||
build_webhook_payload,
|
||||
sign_payload,
|
||||
valid_schedule_id,
|
||||
)
|
||||
|
||||
T0 = datetime(2026, 6, 17, 12, 0, 0, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------- swap lock ----
|
||||
|
||||
def test_acquire_free_lock_returns_token_and_status_held():
|
||||
mgr = SwapLockManager()
|
||||
lock = mgr.acquire("openclaw", ttl_seconds=60, note="daily vol", now=T0)
|
||||
assert lock.token
|
||||
st = mgr.status(now=T0)
|
||||
assert st["held"] is True
|
||||
assert st["holder"] == "openclaw"
|
||||
assert st["note"] == "daily vol"
|
||||
assert st["seconds_remaining"] == 60
|
||||
assert "token" not in st # public view never leaks the token
|
||||
|
||||
|
||||
def test_acquire_requires_holder():
|
||||
with pytest.raises(ValueError):
|
||||
SwapLockManager().acquire(" ", now=T0)
|
||||
|
||||
|
||||
def test_acquire_held_by_other_raises_lockheld_with_state():
|
||||
mgr = SwapLockManager()
|
||||
mgr.acquire("openclaw", ttl_seconds=60, now=T0)
|
||||
with pytest.raises(LockHeld) as ei:
|
||||
mgr.acquire("johnny5", ttl_seconds=60, now=T0)
|
||||
assert ei.value.state["holder"] == "openclaw"
|
||||
|
||||
|
||||
def test_reacquire_with_token_extends_and_keeps_token():
|
||||
mgr = SwapLockManager()
|
||||
first = mgr.acquire("openclaw", ttl_seconds=60, now=T0)
|
||||
later = T0 + timedelta(seconds=30)
|
||||
second = mgr.acquire("openclaw", ttl_seconds=60, token=first.token, now=later)
|
||||
assert second.token == first.token
|
||||
# window extended from the later moment, not the original
|
||||
assert mgr.status(now=later)["seconds_remaining"] == 60
|
||||
assert second.acquired_at == first.acquired_at # acquired_at preserved
|
||||
|
||||
|
||||
def test_reacquire_without_token_is_refused_even_for_same_holder_name():
|
||||
# Holder name is descriptive, not a secret — matching it must not grant access.
|
||||
mgr = SwapLockManager()
|
||||
mgr.acquire("openclaw", ttl_seconds=60, now=T0)
|
||||
with pytest.raises(LockHeld):
|
||||
mgr.acquire("openclaw", ttl_seconds=60, now=T0)
|
||||
|
||||
|
||||
def test_ttl_is_clamped():
|
||||
mgr = SwapLockManager()
|
||||
mgr.acquire("a", ttl_seconds=0, now=T0)
|
||||
assert mgr.status(now=T0)["seconds_remaining"] == LOCK_TTL_MIN
|
||||
mgr2 = SwapLockManager()
|
||||
mgr2.acquire("b", ttl_seconds=10**9, now=T0)
|
||||
assert mgr2.status(now=T0)["seconds_remaining"] == LOCK_TTL_MAX
|
||||
|
||||
|
||||
def test_lock_expires_and_clears_lazily():
|
||||
mgr = SwapLockManager()
|
||||
tok = mgr.acquire("openclaw", ttl_seconds=10, now=T0).token
|
||||
after = T0 + timedelta(seconds=11)
|
||||
assert mgr.status(now=after) == {"held": False}
|
||||
assert mgr.verify(tok, now=after) is False
|
||||
# an expired lock is free to re-take by anyone
|
||||
mgr.acquire("johnny5", ttl_seconds=10, now=after)
|
||||
assert mgr.status(now=after)["holder"] == "johnny5"
|
||||
|
||||
|
||||
def test_verify_matches_only_active_token():
|
||||
mgr = SwapLockManager()
|
||||
tok = mgr.acquire("openclaw", ttl_seconds=60, now=T0).token
|
||||
assert mgr.verify(tok, now=T0) is True
|
||||
assert mgr.verify("nope", now=T0) is False
|
||||
assert mgr.verify(None, now=T0) is False
|
||||
|
||||
|
||||
def test_release_requires_token_then_frees():
|
||||
mgr = SwapLockManager()
|
||||
tok = mgr.acquire("openclaw", ttl_seconds=60, now=T0).token
|
||||
with pytest.raises(PermissionError):
|
||||
mgr.release("wrong", now=T0)
|
||||
assert mgr.release(tok, now=T0) is True
|
||||
assert mgr.status(now=T0) == {"held": False}
|
||||
|
||||
|
||||
def test_force_release_skips_token_and_release_of_free_lock_is_false():
|
||||
mgr = SwapLockManager()
|
||||
mgr.acquire("openclaw", ttl_seconds=60, now=T0)
|
||||
assert mgr.release(force=True, now=T0) is True
|
||||
assert mgr.release(force=True, now=T0) is False # nothing held now
|
||||
|
||||
|
||||
def test_is_blocked_by_is_the_swap_gate():
|
||||
# Mirrors the single-read decision the /api/swap endpoint makes.
|
||||
mgr = SwapLockManager()
|
||||
assert mgr.is_blocked_by(None, now=T0) is None # free lock blocks nobody
|
||||
tok = mgr.acquire("openclaw", ttl_seconds=10, now=T0).token
|
||||
blocked = mgr.is_blocked_by(None, now=T0) # no token -> blocked
|
||||
assert blocked is not None and blocked["holder"] == "openclaw"
|
||||
assert mgr.is_blocked_by("wrong", now=T0) is not None # wrong token -> blocked
|
||||
assert mgr.is_blocked_by(tok, now=T0) is None # holder's token -> allowed
|
||||
# At/after expiry the gate is open even without a token (the bug a separate
|
||||
# status()+verify() pair would get wrong).
|
||||
assert mgr.is_blocked_by(None, now=T0 + timedelta(seconds=11)) is None
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- webhook ----
|
||||
|
||||
def test_build_webhook_payload_shape():
|
||||
p = build_webhook_payload(
|
||||
event="swap_complete", job_id="abc123", model_key="gemma",
|
||||
state="ready", returncode=0, started_at="t0", finished_at="t1",
|
||||
dry_run=False,
|
||||
)
|
||||
assert p == {
|
||||
"event": "swap_complete", "job_id": "abc123", "model_key": "gemma",
|
||||
"state": "ready", "returncode": 0, "started_at": "t0",
|
||||
"finished_at": "t1", "dry_run": False,
|
||||
}
|
||||
|
||||
|
||||
def test_sign_payload_is_deterministic_and_prefixed():
|
||||
body = b'{"event":"swap_complete"}'
|
||||
sig = sign_payload("s3cr3t", body)
|
||||
assert sig.startswith("sha256=")
|
||||
assert sig == sign_payload("s3cr3t", body)
|
||||
assert sig != sign_payload("other", body)
|
||||
|
||||
|
||||
def test_disabled_webhook_fire_is_noop():
|
||||
n = WebhookNotifier("", "")
|
||||
assert n.enabled is False
|
||||
# Must not attempt any network call or raise when no URL is configured.
|
||||
assert asyncio.run(n.fire("swap_complete", {"x": 1})) is None
|
||||
|
||||
|
||||
# --------------------------------------------------------- schedule registry ----
|
||||
|
||||
def test_register_and_list_schedule():
|
||||
reg = ScheduleRegistry()
|
||||
e = reg.register(name="Daily Vol", owner="openclaw", cron="0 6 * * *")
|
||||
assert e.id and e.registered_at and e.updated_at
|
||||
listed = reg.list()
|
||||
assert len(listed) == 1 and listed[0]["name"] == "Daily Vol"
|
||||
|
||||
|
||||
def test_register_with_id_updates_in_place():
|
||||
reg = ScheduleRegistry()
|
||||
reg.register(name="Daily Vol", id="dv", owner="openclaw", cron="0 6 * * *")
|
||||
reg.register(name="Daily Vol v2", id="dv", owner="openclaw", cron="0 7 * * *")
|
||||
listed = reg.list()
|
||||
assert len(listed) == 1
|
||||
assert listed[0]["name"] == "Daily Vol v2" and listed[0]["cron"] == "0 7 * * *"
|
||||
|
||||
|
||||
def test_register_requires_name_and_validates_id():
|
||||
reg = ScheduleRegistry()
|
||||
with pytest.raises(ValueError):
|
||||
reg.register(name=" ")
|
||||
with pytest.raises(ValueError):
|
||||
reg.register(name="ok", id="bad id; rm -rf")
|
||||
|
||||
|
||||
def test_delete_schedule():
|
||||
reg = ScheduleRegistry()
|
||||
reg.register(name="Daily Vol", id="dv")
|
||||
assert reg.delete("dv") is True
|
||||
assert reg.delete("dv") is False
|
||||
assert reg.list() == []
|
||||
|
||||
|
||||
def test_valid_schedule_id():
|
||||
assert valid_schedule_id("daily-vol")
|
||||
assert valid_schedule_id("a.b_c-1")
|
||||
assert not valid_schedule_id("")
|
||||
assert not valid_schedule_id("../etc")
|
||||
assert not valid_schedule_id("has space")
|
||||
assert not valid_schedule_id("x" * 65)
|
||||
@@ -173,6 +173,24 @@ const inputSpec = InputSpec.of({
|
||||
placeholder: 'starts with "nvapi-..."',
|
||||
masked: true,
|
||||
}),
|
||||
swap_webhook_url: Value.text({
|
||||
name: 'Swap webhook URL (optional)',
|
||||
description:
|
||||
'If you run automation that needs to know when the loaded model changes, paste a URL here. Spark Control POSTs a small JSON event (swap_complete / swap_failed) to it after every model swap, so the consumer can re-point its config to the new model. Leave blank to disable. Only needed if something other than this dashboard cares about swaps.',
|
||||
required: false,
|
||||
default: null,
|
||||
placeholder: 'e.g. https://my-service.local/spark-swap',
|
||||
masked: false,
|
||||
}),
|
||||
swap_webhook_secret: Value.text({
|
||||
name: 'Swap webhook secret (optional)',
|
||||
description:
|
||||
'Optional shared secret. If set, each webhook is signed with an "X-Spark-Signature: sha256=…" header (HMAC of the body) so the receiver can verify it really came from Spark Control. Leave blank to send the webhook unsigned.',
|
||||
required: false,
|
||||
default: null,
|
||||
placeholder: 'a random string the receiver also knows',
|
||||
masked: true,
|
||||
}),
|
||||
})
|
||||
|
||||
export const configureSparks = sdk.Action.withInput(
|
||||
|
||||
@@ -35,6 +35,11 @@ export const sparkConfigSchema = z.object({
|
||||
open_webui_url: z.string().catch(''),
|
||||
// Optional NGC API key for pulling NIM containers from nvcr.io/nim/...
|
||||
ngc_api_key: z.string().catch(''),
|
||||
// Optional coordination webhook: POSTed on swap_complete/swap_failed so
|
||||
// downstream consumers re-point their model config. Blank => disabled.
|
||||
swap_webhook_url: z.string().catch(''),
|
||||
// Optional shared secret; if set, the webhook body is HMAC-signed.
|
||||
swap_webhook_secret: z.string().catch(''),
|
||||
})
|
||||
|
||||
export type SparkConfig = z.infer<typeof sparkConfigSchema>
|
||||
|
||||
@@ -32,6 +32,8 @@ export const main = sdk.setupMain(async ({ effects }) => {
|
||||
matrix_bridge_user: '',
|
||||
open_webui_url: '',
|
||||
ngc_api_key: '',
|
||||
swap_webhook_url: '',
|
||||
swap_webhook_secret: '',
|
||||
}
|
||||
|
||||
return sdk.Daemons.of(effects).addDaemon('primary', {
|
||||
@@ -75,6 +77,8 @@ export const main = sdk.setupMain(async ({ effects }) => {
|
||||
CONNECTIVITY_LOG: '/data/connectivity.json',
|
||||
OPEN_WEBUI_URL: cfg.open_webui_url,
|
||||
NGC_API_KEY: cfg.ngc_api_key,
|
||||
SWAP_WEBHOOK_URL: cfg.swap_webhook_url,
|
||||
SWAP_WEBHOOK_SECRET: cfg.swap_webhook_secret,
|
||||
BIND_PORT: String(uiPort),
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import { VersionInfo, IMPOSSIBLE } from '@start9labs/start-sdk'
|
||||
|
||||
export const v0_1_0 = VersionInfo.of({
|
||||
version: '0.24.0:0',
|
||||
version: '0.25.0:0',
|
||||
releaseNotes: {
|
||||
en_US:
|
||||
"v0.24.0:0 — configurable cluster topology. Spark Control no longer assumes our exact layout, so a cluster that's wired differently can be monitored without forking. Three new optional settings in Configure Sparks: (1) vLLM container name — defaults to \"vllm_node\"; set it if your swappable vLLM runs under a different container name (the swap log view and pre-flight validator exec into it by name). (2) Services to hide — a comma-separated list of built-in services your cluster doesn't run (parakeet, kokoro, embeddings, qdrant); hidden ones show no tile and are never probed, so e.g. a vLLM sharing Parakeet's default port 8000 no longer gets a confusing Parakeet probe. (3) Monitor a second vLLM — register a vLLM on another Spark as a custom service with kind \"vllm\" (in /data/services-overrides.yaml); it gets a read-only health tile (loaded model + container state + start/stop/restart) alongside the swappable one. API: /api/endpoints now reports a `disabled` flag per service.",
|
||||
"v0.25.0:0 — cluster coordination layer (GPU arbiter). For clusters where automation, not just this dashboard, swaps models. Three additions: (1) Swap reservation lock — an external scheduler can reserve the GPU swap path (POST /api/swap/lock) and gets a secret token; while held, any swap without the token is refused (423), so the dashboard's manual swap is paused and shows who holds the GPU and until when (with a human Release override). The lock is TTL-bounded and self-frees. (2) Swap webhook — set a URL (and optional signing secret) in Configure Sparks; Spark Control POSTs a swap_complete / swap_failed event after each swap so downstream consumers re-point their model config. (3) Schedule registry — your automation can register its cron jobs (POST /api/schedule) for a read-only \"Scheduled jobs\" panel on the dashboard; Spark Control only displays them, it never runs them. New API: /api/swap/lock (GET/POST/DELETE), /api/schedule (GET/POST/DELETE). See docs/COORDINATION.md. Spark Control remains a control plane, not a job runner — business pipelines stay in their own services and call the swap API.",
|
||||
},
|
||||
migrations: {
|
||||
up: async ({ effects }) => {},
|
||||
|
||||
Reference in New Issue
Block a user