"""Per-Spark hardware snapshots: RAM, disk, GPU memory + utilization, CPU load, uptime.

Drives via a single SSH command per Spark that runs `free`, `df`, `nvidia-smi`,
`/proc/loadavg`, and `uptime -p` and prints labeled lines back. We parse those
labels in `_parse`.
"""
from __future__ import annotations

import asyncio
import time
from typing import Any

from .config import Settings
from .connectivity import record_mac, record_state
from .ssh import ssh_run

# Shell script executed on each Spark. Every `echo` emits one `LABEL=value`
# line; `_parse` lower-cases the label and interprets the value. Each data
# source redirects stderr to /dev/null, so a missing tool yields an empty value
# rather than noise in the output.
_PROBE = r"""
set -e
echo HOSTNAME=$(hostname)
echo UPTIME=$(uptime -p 2>/dev/null || uptime)
echo LOAD=$(awk '{print $1, $2, $3}' /proc/loadavg)
echo CORES=$(nproc 2>/dev/null || echo 0)
echo MEMORY=$(free -b 2>/dev/null | awk '/^Mem:/ {print $2, $3}')
echo DISK=$(df -B1 / 2>/dev/null | awk 'NR==2 {print $2, $3}')
echo GPU=$(nvidia-smi --query-gpu=name,utilization.gpu,temperature.gpu,power.draw,memory.total --format=csv,noheader,nounits 2>/dev/null | head -1)
echo GPU_MEM_USED_MIB=$(nvidia-smi --query-compute-apps=used_gpu_memory --format=csv,noheader,nounits 2>/dev/null | awk '{s+=$1} END {print s+0}')
DEFIF=$(ip route show default 2>/dev/null | awk '{print $5; exit}')
echo MAC=$(cat /sys/class/net/$DEFIF/address 2>/dev/null)
WGIF=$(ip -o link show type wireguard 2>/dev/null | awk -F': ' 'NR==1 {print $2}')
echo WG_IFACE=$WGIF
echo WG_ADDR=$(ip -o -4 addr show "$WGIF" 2>/dev/null | awk 'NR==1 {print $4}')
""".strip()


def _parse_int(s: str) -> int | None:
    """Return ``int(s)``, or ``None`` when *s* is not a valid integer."""
    try:
        return int(s)
    except (TypeError, ValueError):
        return None


def _parse(out: str) -> dict:
    """Turn the ``LABEL=value`` lines printed by ``_PROBE`` into a snapshot dict.

    Lines without ``=`` are ignored. Labels are lower-cased and values are
    stripped. ``hostname``, ``uptime``, ``cores``, ``wg_iface`` and ``wg_addr``
    are always present in the result (possibly ``None``); the remaining keys
    appear only when the corresponding label carried a non-empty value of the
    expected shape.
    """
    info: dict[str, Any] = {}
    for raw in out.splitlines():
        if "=" not in raw:
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        k, v = raw.split("=", 1)
        info[k.strip().lower()] = v.strip()

    parsed: dict[str, Any] = {}
    parsed["hostname"] = info.get("hostname")
    parsed["uptime"] = info.get("uptime")
    parsed["cores"] = _parse_int(info.get("cores", ""))

    # Load average -> (1m, 5m, 15m)
    if info.get("load"):
        loads = info["load"].split()
        try:
            parsed["load"] = [float(x) for x in loads[:3]]
        except ValueError:
            parsed["load"] = None

    # Memory: total used in bytes
    if info.get("memory"):
        mem = info["memory"].split()
        if len(mem) == 2:
            tot, used = _parse_int(mem[0]), _parse_int(mem[1])
            parsed["ram_total_bytes"] = tot
            parsed["ram_used_bytes"] = used

    # Disk: total used in bytes
    if info.get("disk"):
        dk = info["disk"].split()
        if len(dk) == 2:
            parsed["disk_total_bytes"] = _parse_int(dk[0])
            parsed["disk_used_bytes"] = _parse_int(dk[1])

    # GPU: "name, util_gpu, temp_C, power_W, memory_total_MiB"
    if info.get("gpu"):
        parts = [p.strip() for p in info["gpu"].split(",")]
        if len(parts) >= 5:
            name, ug, temp, power, mt = parts[0], parts[1], parts[2], parts[3], parts[4]
            parsed["gpu_name"] = name
            parsed["gpu_util_pct"] = _parse_int(ug)
            parsed["gpu_temp_c"] = _parse_int(temp)
            try:
                parsed["gpu_power_w"] = float(power)
            except ValueError:
                parsed["gpu_power_w"] = None
            # memory.total may be "[N/A]" on unified-memory systems (DGX Spark)
            parsed["gpu_mem_total_mib"] = _parse_int(mt)
            parsed["gpu_unified_memory"] = parsed["gpu_mem_total_mib"] is None

    # Sum per-process compute memory (works even on unified-memory systems)
    if info.get("gpu_mem_used_mib"):
        parsed["gpu_mem_used_mib"] = _parse_int(info["gpu_mem_used_mib"])

    # MAC address on the default-route interface (for Wake-on-LAN)
    if info.get("mac"):
        parsed["mac"] = info["mac"].lower()

    # WireGuard tunnel membership: name + address of the first wg interface, if
    # any. Read-only and unprivileged (`ip` needs no root), so it never depends
    # on sudo and never breaks the probe — absence just yields no badge.
    parsed["wg_iface"] = info.get("wg_iface") or None
    parsed["wg_addr"] = info.get("wg_addr") or None

    return parsed


class HardwareProbe:
    """Caches results briefly to avoid hammering the Sparks.

    Successful snapshots are reused for ``ttl_sec`` seconds and failed ones for
    ``fail_ttl_sec`` seconds. Probes for the same key are serialized by a
    per-key ``asyncio.Lock``, so concurrent callers share one SSH round-trip
    instead of each starting their own.
    """

    def __init__(self, settings: Settings, ttl_sec: float = 4.0, fail_ttl_sec: float = 25.0) -> None:
        self.settings = settings
        self.ttl_sec = ttl_sec
        self.fail_ttl_sec = fail_ttl_sec
        # key -> (monotonic timestamp at which the result was obtained, result)
        self._cache: dict[str, tuple[float, dict]] = {}
        # key -> lock guarding that key's cache entry and in-flight probe
        self._locks: dict[str, asyncio.Lock] = {}

    def _ttl_for(self, value: dict) -> float:
        """Return the cache lifetime for *value*, chosen by its ``reachable`` flag."""
        return self.ttl_sec if value.get("reachable") else self.fail_ttl_sec

    def _lock(self, key: str) -> asyncio.Lock:
        """Return the lock for *key*, creating it on first use."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        return self._locks[key]

    async def fetch(self) -> dict:
        """Probe both Sparks concurrently and return ``{"spark1": ..., "spark2": ...}``."""
        s1, s2 = await asyncio.gather(
            self._one("spark1", self.settings.spark1_host, self.settings.spark1_user),
            self._one("spark2", self.settings.spark2_host, self.settings.spark2_user),
        )
        return {"spark1": s1, "spark2": s2}

    async def _one(self, key: str, host: str, user: str) -> dict:
        """Return a snapshot for one Spark, served from cache while still fresh.

        Returns ``{"reachable": False, "configured": False}`` without probing
        when *host* or *user* is empty. Otherwise the result always carries
        ``reachable``, ``configured`` and ``host``; a failed probe adds
        ``error``, a successful one adds the fields produced by ``_parse``.
        Each fresh probe also reports the outcome through ``record_state`` and,
        when a MAC address was found, ``record_mac``.
        """
        if not host or not user:
            return {"reachable": False, "configured": False}

        async with self._lock(key):
            cached = self._cache.get(key)
            if cached and (time.monotonic() - cached[0] < self._ttl_for(cached[1])):
                # Fresh enough. This also covers cached failures, so a host that
                # was just found unreachable is not re-probed on every call.
                return cached[1]

            rc, out, err = await ssh_run(host, user, _PROBE, self.settings, timeout=6)

            # Stamp the entry only after the probe has finished, so the TTL
            # measures the age of the result. Stamping before the await would
            # count the probe's own duration against the TTL; a slow probe
            # could then be stored already expired (the timeout passed above
            # exceeds the default ttl_sec) and waiting callers would re-probe.
            stamp = time.monotonic()

            if rc != 0:
                result = {
                    "reachable": False,
                    "configured": True,
                    "host": host,
                    "error": err.strip() or out.strip() or f"rc={rc}",
                }
                self._cache[key] = (stamp, result)
                record_state(key, False)
                return result

            parsed = _parse(out)
            result = {"reachable": True, "configured": True, "host": host, **parsed}
            self._cache[key] = (stamp, result)
            record_state(key, True)
            if parsed.get("mac"):
                record_mac(key, parsed["mac"])
            return result