a02f4db850
wol.py:
- build_magic_packet(): standard 6x0xFF + 16x MAC layout
- send_local_broadcast(): broadcasts directly from the container (to ports 9 and 7, for redundancy)
- send_via_peer(): preferred path; SSHes to the OTHER Spark and runs a Python one-liner there so the packet originates on the target's LAN segment (most reliable)
- MAC validation + normalization
connectivity.py:
- /data/connectivity.json persistence (thread-safe, atomic rename)
- Stores per-Spark current state + last_change timestamp + rolling 200-event log
- Records up/down transitions; computes down_seconds / up_seconds durations
- MAC cache populated lazily during hardware probes
hardware.py:
- Probe now reads MAC via /sys/class/net/<default-route-iface>/address
- After each probe, record_state() emits a transition event if state changed
- record_mac() caches the address so WoL works when the Spark next goes down
Endpoints:
- GET /api/connectivity: macs, current state, last_change, events[]
- POST /api/spark/{name}/wake: tries via-peer first, falls back to direct broadcast
UI:
- Unreachable hardware card shows the cached MAC + 'Wake (WoL)' button (only if MAC known)
- New 'Connectivity log' button opens a modal with per-Spark transition history (last 25 each), including duration of each prior up/down period
- pollHardware also pulls /api/connectivity so WoL buttons appear without an extra fetch
Package: bump 0.5.0:0; main.ts sets CONNECTIVITY_LOG=/data/connectivity.json
138 lines
5.6 KiB
Python
138 lines
5.6 KiB
Python
"""Per-Spark hardware snapshots: RAM, disk, GPU memory + utilization, CPU load, uptime.

Each snapshot comes from a single SSH command per Spark that runs `hostname`,
`uptime -p`, `/proc/loadavg`, `nproc`, `free`, `df`, and `nvidia-smi`, and also
reads the MAC address of the default-route interface (kept for Wake-on-LAN).
The command prints `LABEL=value` lines, which `_parse` turns into a dict.
"""
|
|
from __future__ import annotations
|
|
import asyncio
|
|
import time
|
|
from typing import Any
|
|
|
|
from .config import Settings
|
|
from .connectivity import record_mac, record_state
|
|
from .ssh import ssh_run
|
|
|
|
|
|
# Shell script executed on each Spark in one SSH round-trip. Every `echo`
# prints a `LABEL=value` line that `_parse` splits on the first `=`. Most
# commands discard stderr and are embedded in an `echo`, so a missing tool
# typically produces an empty value instead of aborting the whole script.
#
# The default-route interface is taken from the token following `dev` rather
# than a fixed column: `ip route` omits `via <gw>` for point-to-point / VPN
# routes (e.g. `default dev wg0 scope link`), which would shift a fixed `$5`
# onto the wrong word. The MAC is only read when an interface was found, so
# no bogus `/sys/class/net//address` lookup happens without a default route.
_PROBE = r"""
set -e
echo HOSTNAME=$(hostname)
echo UPTIME=$(uptime -p 2>/dev/null || uptime)
echo LOAD=$(awk '{print $1, $2, $3}' /proc/loadavg)
echo CORES=$(nproc 2>/dev/null || echo 0)
echo MEMORY=$(free -b 2>/dev/null | awk '/^Mem:/ {print $2, $3}')
echo DISK=$(df -B1 / 2>/dev/null | awk 'NR==2 {print $2, $3}')
echo GPU=$(nvidia-smi --query-gpu=name,utilization.gpu,temperature.gpu,power.draw,memory.total --format=csv,noheader,nounits 2>/dev/null | head -1)
echo GPU_MEM_USED_MIB=$(nvidia-smi --query-compute-apps=used_gpu_memory --format=csv,noheader,nounits 2>/dev/null | awk '{s+=$1} END {print s+0}')
DEFIF=$(ip route show default 2>/dev/null | awk '{for (i = 1; i < NF; i++) if ($i == "dev") {print $(i+1); exit}}')
MAC=""
if [ -n "$DEFIF" ]; then MAC=$(cat "/sys/class/net/$DEFIF/address" 2>/dev/null || true); fi
echo MAC=$MAC
""".strip()
|
|
|
|
|
|
def _parse_int(s: str | None) -> int | None:
    """Convert *s* to ``int``, or return ``None`` if it is missing or not an integer.

    Probe fields can be empty or non-numeric (nvidia-smi prints ``[N/A]`` for
    unsupported queries); those map to ``None`` instead of raising.
    """
    try:
        return int(s)
    except (TypeError, ValueError):
        return None


def _normalize_mac(raw: str | None) -> str | None:
    """Return *raw* as a lower-case ``aa:bb:cc:dd:ee:ff`` MAC, or ``None`` if unusable.

    Only six colon-separated two-digit hex octets are accepted. Empty reads,
    longer link-layer addresses (e.g. InfiniBand), and the all-zero address
    (reported by loopback-style virtual interfaces) are rejected because none
    of them can serve as a Wake-on-LAN target.
    """
    mac = (raw or "").strip().lower()
    octets = mac.split(":")
    hex_digits = set("0123456789abcdef")
    if len(octets) != 6:
        return None
    if any(len(octet) != 2 or not set(octet) <= hex_digits for octet in octets):
        return None
    if mac == "00:00:00:00:00:00":
        return None
    return mac


def _parse(out: str) -> dict:
    """Turn the ``LABEL=value`` lines printed by the probe script into a snapshot dict.

    Lines without ``=`` are ignored and labels are matched case-insensitively
    (a repeated label keeps its last value). ``hostname``, ``uptime`` and
    ``cores`` are always present (possibly ``None``); the other keys are only
    added when their label was reported. Unparseable numbers become ``None``.
    ``mac`` is only set when it is a usable Ethernet address.
    """
    info: dict[str, str] = {}
    for raw in out.splitlines():
        key, sep, value = raw.partition("=")
        if not sep:
            continue
        info[key.strip().lower()] = value.strip()

    parsed: dict[str, Any] = {
        "hostname": info.get("hostname"),
        "uptime": info.get("uptime"),
        "cores": _parse_int(info.get("cores", "")),
    }

    # Load average -> [1m, 5m, 15m]
    if info.get("load"):
        try:
            parsed["load"] = [float(x) for x in info["load"].split()[:3]]
        except ValueError:
            parsed["load"] = None

    # MEMORY / DISK: "<total> <used>" in bytes (`free -b` / `df -B1 /`).
    mem = info.get("memory", "").split()
    if len(mem) == 2:
        parsed["ram_total_bytes"] = _parse_int(mem[0])
        parsed["ram_used_bytes"] = _parse_int(mem[1])

    disk = info.get("disk", "").split()
    if len(disk) == 2:
        parsed["disk_total_bytes"] = _parse_int(disk[0])
        parsed["disk_used_bytes"] = _parse_int(disk[1])

    # GPU: "name, util_gpu, temp_C, power_W, memory_total_MiB"
    if info.get("gpu"):
        parts = [p.strip() for p in info["gpu"].split(",")]
        if len(parts) >= 5:
            name, util, temp, power, mem_total = parts[:5]
            parsed["gpu_name"] = name
            parsed["gpu_util_pct"] = _parse_int(util)
            parsed["gpu_temp_c"] = _parse_int(temp)
            try:
                parsed["gpu_power_w"] = float(power)
            except ValueError:
                parsed["gpu_power_w"] = None
            # memory.total may be "[N/A]" on unified-memory systems (DGX Spark)
            parsed["gpu_mem_total_mib"] = _parse_int(mem_total)
            parsed["gpu_unified_memory"] = parsed["gpu_mem_total_mib"] is None

    # Sum of per-process compute memory (works even on unified-memory systems)
    if info.get("gpu_mem_used_mib"):
        parsed["gpu_mem_used_mib"] = _parse_int(info["gpu_mem_used_mib"])

    # MAC of the default-route interface, used later for Wake-on-LAN. Invalid
    # values are dropped so they never get cached as a wake target.
    mac = _normalize_mac(info.get("mac"))
    if mac:
        parsed["mac"] = mac

    return parsed
|
|
|
|
|
|
class HardwareProbe:
    """Probe both Sparks over SSH, caching each result briefly to avoid hammering them.

    Successful snapshots are reused for ``ttl_sec`` seconds. Failures are
    reused for the (by default longer) ``fail_ttl_sec``, so an unreachable
    host is re-probed less often.
    """

    def __init__(
        self,
        settings: Settings,
        ttl_sec: float = 4.0,
        fail_ttl_sec: float = 25.0,
        probe_timeout_sec: float = 6,
    ) -> None:
        self.settings = settings
        self.ttl_sec = ttl_sec
        self.fail_ttl_sec = fail_ttl_sec
        # Forwarded to ssh_run as its `timeout` argument.
        self.probe_timeout_sec = probe_timeout_sec
        # key -> (monotonic time the probe finished, result dict)
        self._cache: dict[str, tuple[float, dict]] = {}
        # One lock per key: a second concurrent caller waits for the in-flight
        # probe and then finds a fresh cache entry instead of probing again.
        self._locks: dict[str, asyncio.Lock] = {}

    def _ttl_for(self, value: dict) -> float:
        """Return how long *value* may be served from cache, based on reachability."""
        return self.ttl_sec if value.get("reachable") else self.fail_ttl_sec

    def _lock(self, key: str) -> asyncio.Lock:
        """Return the lock for *key*, creating it on first use."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        return self._locks[key]

    async def fetch(self) -> dict:
        """Probe both Sparks concurrently and return ``{"spark1": ..., "spark2": ...}``."""
        s1, s2 = await asyncio.gather(
            self._one("spark1", self.settings.spark1_host, self.settings.spark1_user),
            self._one("spark2", self.settings.spark2_host, self.settings.spark2_user),
        )
        return {"spark1": s1, "spark2": s2}

    async def _one(self, key: str, host: str, user: str) -> dict:
        """Return the (possibly cached) snapshot for one Spark.

        An unconfigured host returns immediately without SSH. Otherwise a new
        probe runs only when the cached entry has outlived its TTL. The result
        is cached and passed to ``record_state``; on success the MAC (if the
        probe found one) is also passed to ``record_mac``.
        """
        if not host or not user:
            return {"reachable": False, "configured": False}
        async with self._lock(key):
            cached = self._cache.get(key)
            if cached and time.monotonic() - cached[0] < self._ttl_for(cached[1]):
                return cached[1]

            rc, out, err = await ssh_run(
                host, user, _PROBE, self.settings, timeout=self.probe_timeout_sec
            )
            # Stamp the entry when the probe *finishes*. Taking the time before
            # the await would subtract the probe's duration (up to the whole
            # timeout for an unreachable host) from the TTL.
            finished = time.monotonic()

            if rc != 0:
                result = {
                    "reachable": False,
                    "configured": True,
                    "host": host,
                    "error": err.strip() or out.strip() or f"rc={rc}",
                }
                self._cache[key] = (finished, result)
                record_state(key, False)
                return result

            parsed = _parse(out)
            result = {"reachable": True, "configured": True, "host": host, **parsed}
            self._cache[key] = (finished, result)
            record_state(key, True)
            if parsed.get("mac"):
                record_mac(key, parsed["mac"])
            return result
|