Files

146 lines
6.1 KiB
Python

"""Per-Spark hardware snapshots: RAM, disk, GPU memory + utilization, CPU load, uptime.
Each Spark is probed with a single SSH command that runs `free`, `df`,
`nvidia-smi`, and `uptime -p`, reads `/proc/loadavg`, and prints labeled
`KEY=value` lines back (plus the default-route MAC and the first WireGuard
interface). We parse those labels in `_parse`.
"""
from __future__ import annotations
import asyncio
import time
from typing import Any
from .config import Settings
from .connectivity import record_mac, record_state
from .ssh import ssh_run
# Shell script sent to each Spark through `ssh_run`. Every `echo KEY=...` line
# it prints is one labeled field for `_parse`, which splits each line on the
# first "=" and lower-cases the key.
# Most commands redirect stderr to /dev/null so a failing tool does not add
# error text to the output.
# DEFIF is the interface named in the default route (its MAC is reported);
# WGIF is the first link of type wireguard, if any.
_PROBE = r"""
set -e
echo HOSTNAME=$(hostname)
echo UPTIME=$(uptime -p 2>/dev/null || uptime)
echo LOAD=$(awk '{print $1, $2, $3}' /proc/loadavg)
echo CORES=$(nproc 2>/dev/null || echo 0)
echo MEMORY=$(free -b 2>/dev/null | awk '/^Mem:/ {print $2, $3}')
echo DISK=$(df -B1 / 2>/dev/null | awk 'NR==2 {print $2, $3}')
echo GPU=$(nvidia-smi --query-gpu=name,utilization.gpu,temperature.gpu,power.draw,memory.total --format=csv,noheader,nounits 2>/dev/null | head -1)
echo GPU_MEM_USED_MIB=$(nvidia-smi --query-compute-apps=used_gpu_memory --format=csv,noheader,nounits 2>/dev/null | awk '{s+=$1} END {print s+0}')
DEFIF=$(ip route show default 2>/dev/null | awk '{print $5; exit}')
echo MAC=$(cat /sys/class/net/$DEFIF/address 2>/dev/null)
WGIF=$(ip -o link show type wireguard 2>/dev/null | awk -F': ' 'NR==1 {print $2}')
echo WG_IFACE=$WGIF
echo WG_ADDR=$(ip -o -4 addr show "$WGIF" 2>/dev/null | awk 'NR==1 {print $4}')
""".strip()
def _parse_int(s: str) -> int | None:
try: return int(s)
except (TypeError, ValueError): return None
def _parse(out: str) -> dict:
    """Turn the probe's labeled ``KEY=value`` output into a snapshot dict.

    Lines without ``=`` are ignored; keys are lower-cased and both sides are
    stripped. ``hostname``, ``uptime``, ``cores``, ``wg_iface`` and ``wg_addr``
    are always present in the result (possibly ``None``); the load, RAM, disk,
    GPU and MAC keys appear only when the matching probe line carried a usable
    value.

    Args:
        out: Raw stdout of the probe script.

    Returns:
        Dict of parsed fields. Numeric fields that fail to parse are ``None``.
    """
    fields: dict[str, Any] = {}
    for line in out.splitlines():
        label, sep, value = line.partition("=")
        if sep:
            fields[label.strip().lower()] = value.strip()

    snapshot: dict[str, Any] = {
        "hostname": fields.get("hostname"),
        "uptime": fields.get("uptime"),
        "cores": _parse_int(fields.get("cores", "")),
    }

    # Load average -> [1m, 5m, 15m]; a malformed number voids the whole list.
    load_text = fields.get("load")
    if load_text:
        try:
            snapshot["load"] = [float(tok) for tok in load_text.split()[:3]]
        except ValueError:
            snapshot["load"] = None

    # RAM and disk share one shape: "<total> <used>", both in bytes.
    for source, total_key, used_key in (
        ("memory", "ram_total_bytes", "ram_used_bytes"),
        ("disk", "disk_total_bytes", "disk_used_bytes"),
    ):
        pair = (fields.get(source) or "").split()
        if len(pair) == 2:
            total, used = pair
            snapshot[total_key] = _parse_int(total)
            snapshot[used_key] = _parse_int(used)

    # GPU: "name, util_gpu, temp_C, power_W, memory_total_MiB"
    gpu_text = fields.get("gpu")
    if gpu_text:
        cols = [col.strip() for col in gpu_text.split(",")]
        if len(cols) >= 5:
            name, util, temp, power, mem_total = cols[:5]
            snapshot["gpu_name"] = name
            snapshot["gpu_util_pct"] = _parse_int(util)
            snapshot["gpu_temp_c"] = _parse_int(temp)
            try:
                snapshot["gpu_power_w"] = float(power)
            except ValueError:
                snapshot["gpu_power_w"] = None
            # memory.total may be "[N/A]" on unified-memory systems (DGX Spark)
            total_mib = _parse_int(mem_total)
            snapshot["gpu_mem_total_mib"] = total_mib
            snapshot["gpu_unified_memory"] = total_mib is None

    # Sum of per-process compute memory (works even on unified-memory systems)
    used_mib = fields.get("gpu_mem_used_mib")
    if used_mib:
        snapshot["gpu_mem_used_mib"] = _parse_int(used_mib)

    # MAC address of the default-route interface (for Wake-on-LAN)
    mac = fields.get("mac")
    if mac:
        snapshot["mac"] = mac.lower()

    # WireGuard tunnel membership: name + address of the first wg interface,
    # if any. An empty value is normalised to None (no badge).
    for key in ("wg_iface", "wg_addr"):
        snapshot[key] = fields.get(key) or None

    return snapshot
class HardwareProbe:
    """Caches results briefly to avoid hammering the Sparks.

    Each Spark key has its own cache slot and its own ``asyncio.Lock``. The
    lock is held for the whole probe, so concurrent callers for one Spark wait
    for the in-flight SSH command and are then served from the cache instead
    of opening another connection.
    """

    def __init__(
        self,
        settings: Settings,
        ttl_sec: float = 4.0,
        fail_ttl_sec: float = 25.0,
    ) -> None:
        """Store the settings and the two cache lifetimes.

        Args:
            settings: Source of ``spark{1,2}_host`` / ``spark{1,2}_user``; also
                passed through to ``ssh_run``.
            ttl_sec: Seconds a reachable result is served from cache.
            fail_ttl_sec: Seconds an unreachable result is served from cache.
        """
        self.settings = settings
        self.ttl_sec = ttl_sec
        self.fail_ttl_sec = fail_ttl_sec
        # key -> (monotonic timestamp of probe completion, result dict)
        self._cache: dict[str, tuple[float, dict]] = {}
        self._locks: dict[str, asyncio.Lock] = {}

    def _ttl_for(self, value: dict) -> float:
        """Return the cache lifetime that applies to a stored result."""
        return self.ttl_sec if value.get("reachable") else self.fail_ttl_sec

    def _lock(self, key: str) -> asyncio.Lock:
        """Return the lock for *key*, creating it on first use."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        return self._locks[key]

    async def fetch(self) -> dict:
        """Probe both Sparks concurrently and return ``{"spark1": ..., "spark2": ...}``."""
        s1, s2 = await asyncio.gather(
            self._one("spark1", self.settings.spark1_host, self.settings.spark1_user),
            self._one("spark2", self.settings.spark2_host, self.settings.spark2_user),
        )
        return {"spark1": s1, "spark2": s2}

    async def _one(self, key: str, host: str, user: str) -> dict:
        """Return the snapshot for one Spark, from cache when still fresh.

        Args:
            key: Cache / bookkeeping key (``"spark1"`` or ``"spark2"``).
            host: SSH host; an empty value means the Spark is not configured.
            user: SSH user; an empty value means the Spark is not configured.

        Returns:
            ``{"reachable": False, "configured": False}`` when host or user is
            missing (never cached); otherwise a dict with ``reachable``,
            ``configured`` and ``host``, plus either ``error`` (non-zero exit)
            or the fields produced by ``_parse``.
        """
        if not host or not user:
            return {"reachable": False, "configured": False}

        async with self._lock(key):
            cached = self._cache.get(key)
            if cached and (time.monotonic() - cached[0] < self._ttl_for(cached[1])):
                # Fresh entry, success or failure: answer without touching SSH.
                return cached[1]

            rc, out, err = await ssh_run(host, user, _PROBE, self.settings, timeout=6)
            reachable = rc == 0
            if reachable:
                parsed = _parse(out)
                result = {"reachable": True, "configured": True, "host": host, **parsed}
            else:
                parsed = {}
                result = {
                    "reachable": False,
                    "configured": True,
                    "host": host,
                    "error": err.strip() or out.strip() or f"rc={rc}",
                }

            # Stamp the entry when the probe *finishes*. Using a timestamp
            # taken before the await would let SSH latency eat into the TTL: a
            # probe slower than the TTL would be stored already expired, and
            # every caller queued on the lock would re-probe in turn.
            self._cache[key] = (time.monotonic(), result)
            record_state(key, reachable)
            if parsed.get("mac"):
                record_mac(key, parsed["mac"])
            return result