"""Check for and apply updates to the upstream `eugr/spark-vllm-docker` checkout on Spark 1. We don't auto-update — only display what's available and let the user explicitly apply when they're ready. """ from __future__ import annotations import asyncio import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Optional from .config import Settings from .ssh import ssh_run, ssh_stream, StreamHandle async def get_update_status(settings: Settings) -> dict: """Return upstream-vs-local commit info. Runs `git fetch` then `git log`. No mutation other than refreshing remote refs (fetch). Safe to call frequently. """ if not settings.spark1_host or not settings.spark1_user: return {"ok": False, "error": "spark1 not configured"} script = ( "cd ~/spark-vllm-docker && " "branch=$(git rev-parse --abbrev-ref HEAD) && " "git fetch --quiet 2>&1 || true; " "ahead_behind=$(git rev-list --left-right --count HEAD...origin/$branch 2>/dev/null || echo '0\t0'); " "behind=$(echo \"$ahead_behind\" | awk '{print $2}'); " "ahead=$(echo \"$ahead_behind\" | awk '{print $1}'); " "current_hash=$(git rev-parse --short HEAD); " "current_msg=$(git log -1 --pretty=format:%s); " "dirty=$(git status --porcelain | wc -l); " "echo BRANCH=\"$branch\"; " "echo BEHIND=\"$behind\"; " "echo AHEAD=\"$ahead\"; " "echo DIRTY=\"$dirty\"; " "echo CURRENT=\"$current_hash $current_msg\"; " "echo ---LOG---; " "git log HEAD..origin/$branch --pretty=format:'%h %s (%ar)' 2>/dev/null | head -30" ) rc, out, err = await ssh_run( settings.spark1_host, settings.spark1_user, script, settings, timeout=30, ) if rc != 0: return {"ok": False, "error": err.strip() or out.strip() or f"rc={rc}"} info: dict = {"ok": True, "log": []} in_log = False for line in out.splitlines(): if line == "---LOG---": in_log = True continue if in_log: if line.strip(): info["log"].append(line) continue if "=" in line: k, v = line.split("=", 1) key = k.lower().strip() val = v.strip() if key in ("behind", "ahead", "dirty"): info[key] = int(val) if val.isdigit() else 0 else: info[key] = val return info @dataclass class UpdateJob: id: str mode: str # 'cluster' | 'solo' started_at: str state: str = "starting" lines: list[str] = field(default_factory=list) returncode: Optional[int] = None finished_at: Optional[str] = None phase: str = "Starting…" def append(self, line: str) -> None: self.lines.append(line) if len(self.lines) > 1000: del self.lines[: len(self.lines) - 1000] class UpdateManager: def __init__(self, settings: Settings) -> None: self.settings = settings self.lock = asyncio.Lock() self.jobs: dict[str, UpdateJob] = {} self.current_job_id: Optional[str] = None def get(self, job_id: str) -> UpdateJob | None: return self.jobs.get(job_id) async def trigger(self, mode: str = "cluster") -> UpdateJob: if mode not in ("cluster", "solo"): raise ValueError("mode must be 'cluster' or 'solo'") if self.lock.locked(): raise RuntimeError("An update is already in progress") job = UpdateJob( id=uuid.uuid4().hex[:8], mode=mode, started_at=datetime.now(timezone.utc).isoformat(), ) self.jobs[job.id] = job self.current_job_id = job.id asyncio.create_task(self._run(job)) return job async def _run(self, job: UpdateJob) -> None: async with self.lock: try: await self._do(job) if job.state != "failed": job.state = "done" job.returncode = 0 job.phase = "Done" except Exception as e: job.append(f"[error] {type(e).__name__}: {e}") job.state = "failed" if job.returncode is None: job.returncode = 1 finally: job.finished_at = datetime.now(timezone.utc).isoformat() if self.current_job_id == job.id: self.current_job_id = None async def _do(self, job: UpdateJob) -> None: s = self.settings if not s.spark1_host or not s.spark1_user: raise RuntimeError("spark1 not configured") flag = "-c" if job.mode == "cluster" else "" cmd = ( f"cd ~/spark-vllm-docker && " f"echo '=== git pull ===' && git pull --ff-only && " f"echo '=== build-and-copy ===' && ./build-and-copy.sh {flag}" ).strip() job.append(f"$ {cmd}") job.state = "running" job.phase = "Pulling latest changes…" handle = StreamHandle() async for line in ssh_stream(s.spark1_host, s.spark1_user, cmd, s, handle=handle): job.append(line) if "=== build-and-copy ===" in line: job.phase = "Building the vLLM container…" elif "Copy" in line and "complete" in line.lower(): job.phase = "Copying to peer Sparks…" elif "Already up to date" in line: job.phase = "No changes to pull; rebuilding…" rc = handle.returncode or 0 if rc != 0: job.state = "failed" job.returncode = rc