Files
spark-control/image/app/nim.py
T
Keysat 8d839e3714 v0.13.0:4 - redaction gateway, embeddings proxy, expanded audio API
- Add redaction gateway (redaction_gateway.py, redaction/ scrub + tests)
- Add embeddings proxy and spark_embed service (Dockerfile + main.py)
- Expand audio_proxy with speaker-aware handling; deep_health/health/server updates
- Package: configureSparks action + sparkConfig model updates, manifest/main wiring
- Docs: AUDIO_API, EMBEDDINGS, REDACTION_GATEWAY; HANDOFF and runbook/known-issues refresh
2026-06-11 17:45:57 -05:00

193 lines
7.5 KiB
Python

"""NVIDIA NIM container install / lifecycle.

Two pieces:

* A small curated catalog of NIM images (so users don't have to copy/paste
  huge nvcr.io URLs).
* An installer that SSHes into the target Spark, logs in to nvcr.io, runs
  `docker pull`, then
  `docker run -d --gpus all -p PORT:PORT -v VOLUME:/opt/nim/.cache
  -e NGC_API_KEY=... IMAGE`, and streams the output into the install job's
  log.

Custom services also persist via `overrides.add_custom_service()` so the
Services panel can show them.
"""
from __future__ import annotations

import asyncio
import shlex
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

from .config import Settings
from .ssh import ssh_stream, StreamHandle

# Curated list. These are the most useful NIM containers for a dual-Spark
# audio-and-LLM setup. Browse the full catalog at
# https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia
CATALOG_URL = "https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia/containers"

# Each entry is a plain dict with these string keys:
#   key                - short identifier for the catalog entry
#   name               - human-readable display name
#   image              - full nvcr.io image reference (tagged :latest)
#   default_container  - presumably the suggested docker container name
#                        (verify against the caller that consumes it)
#   default_port       - presumably the suggested port to publish (int)
#   kind               - service category; both entries use "stt"
#   description        - one-line summary shown to the user
#   homepage           - NGC catalog page for the image
SUGGESTED_NIMS: list[dict] = [
    {
        "key": "parakeet-tdt-0.6b-v3",
        "name": "Parakeet TDT 0.6B v3",
        "image": "nvcr.io/nim/nvidia/parakeet-tdt-0-6b-v3:latest",
        "default_container": "parakeet-asr",
        "default_port": 8000,
        "kind": "stt",
        "description": "Streaming speech-to-text (English). Used by Open WebUI for voice input. ~1 GB.",
        "homepage": "https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia/containers/parakeet-tdt-0-6b-v3",
    },
    {
        "key": "riva-multilingual",
        "name": "Riva Multilingual ASR",
        "image": "nvcr.io/nim/nvidia/riva-multilingual:latest",
        "default_container": "riva-asr",
        "default_port": 8001,
        "kind": "stt",
        "description": "NVIDIA Riva speech-recognition multi-language model. Larger and more accurate than Parakeet.",
        # NOTE(review): this points at the generic catalog root rather than an
        # image-specific page — confirm whether that is intentional.
        "homepage": "https://catalog.ngc.nvidia.com/orgs/nim/teams/nvidia",
    },
]


@dataclass
class NimInstallJob:
    """Progress record for a single NIM container install.

    The first eight fields describe what is being installed and where; the
    remaining fields are mutated while the install runs so that callers
    holding the job can observe its progress.
    """

    # --- what / where -----------------------------------------------------
    id: str
    image: str
    container: str
    port: int
    host: str
    user: str
    volume: Optional[str]
    started_at: str

    # --- live progress ----------------------------------------------------
    state: str = "starting"  # starting | pulling | running | done | failed
    phase: str = "Starting…"  # human-readable description of the current step
    lines: list[str] = field(default_factory=list)  # rolling output log
    returncode: Optional[int] = None
    finished_at: Optional[str] = None

    def append(self, line: str) -> None:
        """Add ``line`` to the log, keeping only the newest 1000 entries."""
        log = self.lines
        log.append(line)
        excess = len(log) - 1000
        if excess > 0:
            # Drop the oldest entries in place so the list object is reused.
            del log[:excess]


class NimManager:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.lock = asyncio.Lock()
self.jobs: dict[str, NimInstallJob] = {}
self.current_job_id: Optional[str] = None
def get(self, job_id: str) -> NimInstallJob | None:
return self.jobs.get(job_id)
async def trigger(
self,
*,
image: str,
container: str,
port: int,
host: str,
user: str,
volume: str | None = None,
extra_env: dict[str, str] | None = None,
) -> NimInstallJob:
if self.lock.locked():
raise RuntimeError("Another NIM install is already in progress")
if not host or not user:
raise RuntimeError("target host not configured")
if not self.settings.ngc_api_key:
raise RuntimeError(
"NGC_API_KEY is not set. Open Configure Sparks in StartOS and paste your NGC personal API key (free at https://ngc.nvidia.com/setup/personal-key)."
)
job = NimInstallJob(
id=uuid.uuid4().hex[:8],
image=image,
container=container,
port=port,
host=host,
user=user,
volume=volume or f"{container}-cache",
started_at=datetime.now(timezone.utc).isoformat(),
)
self.jobs[job.id] = job
self.current_job_id = job.id
asyncio.create_task(self._run(job, extra_env or {}))
return job
async def _run(self, job: NimInstallJob, extra_env: dict[str, str]) -> None:
async with self.lock:
try:
await self._do(job, extra_env)
if job.state != "failed":
job.state = "done"
job.returncode = 0
job.phase = "Done"
except Exception as e:
job.append(f"[error] {type(e).__name__}: {e}")
job.state = "failed"
if job.returncode is None:
job.returncode = 1
finally:
job.finished_at = datetime.now(timezone.utc).isoformat()
if self.current_job_id == job.id:
self.current_job_id = None
async def _do(self, job: NimInstallJob, extra_env: dict[str, str]) -> None:
# Build the bash one-liner. We use docker login non-interactively with the NGC API key.
env_parts = [f'-e NGC_API_KEY=$NGC_API_KEY']
for k, v in extra_env.items():
env_parts.append(f"-e {k}={v}")
env_str = " ".join(env_parts)
cmd = (
f"set -e; "
f"export NGC_API_KEY='{self.settings.ngc_api_key}'; "
f"echo '=== docker login nvcr.io ==='; "
f"echo \"$NGC_API_KEY\" | docker login nvcr.io -u '$oauthtoken' --password-stdin; "
f"echo '=== docker pull {job.image} (this can be 1-10 GB) ==='; "
f"docker pull {job.image}; "
f"echo '=== remove any prior container with the same name ==='; "
f"docker rm -f {job.container} 2>/dev/null || true; "
f"echo '=== docker run -d --gpus all -p {job.port}:{job.port} -v {job.volume}:/opt/nim/.cache {env_str} --name {job.container} --restart unless-stopped {job.image} ==='; "
f"docker run -d --gpus all "
f"-p {job.port}:{job.port} "
f"-v {job.volume}:/opt/nim/.cache "
f"{env_str} "
f"--name {job.container} "
f"--restart unless-stopped "
f"{job.image}; "
f"echo '=== ensuring cache volume is writable by uid 1000 (riva-server) ==='; "
f"docker run --rm -v {job.volume}:/cache alpine chown -R 1000:1000 /cache && "
f"docker restart {job.container}; "
f"echo '=== install complete; container is starting up and will download its model on first boot ==='"
)
job.append(f"$ <install command for {job.image} on {job.host}>")
job.state = "pulling"
job.phase = "Pulling image from nvcr.io (this can take a few minutes)…"
handle = StreamHandle()
async for line in ssh_stream(job.host, job.user, cmd, self.settings, handle=handle):
# Don't log lines containing the api key
if self.settings.ngc_api_key and self.settings.ngc_api_key in line:
continue
job.append(line)
if "docker pull" in line:
job.phase = "Pulling image from nvcr.io…"
elif "Login Succeeded" in line:
job.phase = "Logged in to NGC; pulling image…"
elif "Pull complete" in line:
job.phase = "Pulling layers…"
elif "Status: Downloaded newer image" in line or "Image is up to date" in line:
job.phase = "Image ready; starting container…"
elif "docker run -d" in line:
job.state = "running"
job.phase = "Container starting; downloading model on first boot…"
rc = handle.returncode or 0
if rc != 0:
job.state = "failed"
job.returncode = rc