ee8c2406b8
connectivity.py:
- Generalized 'spark' subject to any string; renamed 'spark' field to 'subject'
- Legacy v0.5 events with the old 'spark' field are migrated transparently on read (kind defaults to 'transition')
- New record_report(subject, ok, source, detail, latency_ms): always appends an event with kind='report'; does NOT mutate the current state (only active polling is authoritative)
- summary() returns events normalized to the new schema
Wiring:
- /api/status now calls record_state for vllm/parakeet/magpie (dedup on no-change)
- /api/services calls record_state for each service after its http check
- Result: dashboard observes service-level transitions automatically with no extra polling
Passive endpoint:
- POST /api/health-event with {service, ok, source?, error?, ms?}
- Useful for external apps (e.g. Open WebUI) to surface sub-poll-interval failures the dashboard would otherwise miss
UI:
- Connectivity dialog groups events by subject (hosts ordered first, then services)
- Per-subject summary shows transition count, down count, report count, failed-report count
- Transitions and reports render inline with distinct styling; reports show source app + error + latency
- Legacy v0.5 events render unchanged
Docs:
- README documents /api/health-event with a curl example
Package: bump to 0.6.0:0
191 lines
5.8 KiB
Python
191 lines
5.8 KiB
Python
"""Track up/down transitions for any subject (Sparks AND services) and cache MACs.

Persisted to /data/connectivity.json (override the location with the
CONNECTIVITY_LOG environment variable). Schema:

  {
    "macs":        { "spark1": "aa:bb:..", "spark2": "11:22:.." },
    "current":     { "spark1": "up", "parakeet": "up", "magpie": "down", ... },
    "last_change": { ... },
    "events": [
      # Active-probe transition (logged when state flips during polling)
      { "subject": "spark2", "at": "...", "kind": "transition",
        "transition": "down" },
      { "subject": "spark2", "at": "...", "kind": "transition",
        "transition": "up", "down_seconds": 4500 },
      # (a "down" transition may likewise carry "up_seconds")

      # Passive report (logged whenever an external app POSTs to
      # /api/health-event regardless of state change)
      { "subject": "parakeet", "at": "...", "kind": "report",
        "ok": false, "source": "open-webui",
        "detail": "Connection refused", "latency_ms": 320 },
    ]
  }

Legacy events from v0.5 with `spark` instead of `subject` and no `kind` field
are read transparently as kind="transition".
"""
|
|
from __future__ import annotations
|
|
import json
|
|
import os
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
# Upper bound on the persisted "events" list: record_state() and
# record_report() keep only the newest MAX_EVENTS entries after appending.
MAX_EVENTS = 200 # rolling window — plenty for showing recent history
|
|
|
|
|
|
def _path() -> str:
    """Return the location of the connectivity log file.

    The ``CONNECTIVITY_LOG`` environment variable wins when it is set;
    otherwise the default ``/data/connectivity.json`` is returned. The
    environment is consulted on every call, so a change takes effect
    immediately.
    """
    default_location = "/data/connectivity.json"
    return os.getenv("CONNECTIVITY_LOG", default_location)
|
|
|
|
|
|
# Module-wide lock taken by load(), record_mac(), record_state() and
# record_report() around their access to the JSON file. It is a
# threading.Lock, so it serializes threads of this process only.
_lock = threading.Lock()
|
|
|
|
|
|
def _read() -> dict:
    """Load the persisted connectivity document from disk.

    Returns:
        The decoded JSON object, or an empty dict when the file does not
        exist, does not contain valid JSON, or holds anything other than a
        JSON object at the top level (``null``, a list, a number, ...).

    Callers are expected to hold ``_lock``; this function does no locking.
    """
    try:
        with open(_path()) as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    # Every caller immediately uses dict methods (setdefault/get) on the
    # result, so a hand-edited or corrupted file whose top level is a list or
    # scalar must be treated like a missing file instead of crashing later.
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def _write(data: dict) -> None:
    """Persist `data` as indented JSON at the configured log path.

    The parent directory is created when missing. The document is first
    written to a sibling ``<path>.tmp`` file and then moved over the real
    file with ``os.replace``, so the destination is swapped in one step
    rather than being rewritten in place.

    Callers are expected to hold ``_lock``; this function does no locking.
    """
    destination = _path()
    staging = destination + ".tmp"
    Path(destination).parent.mkdir(parents=True, exist_ok=True)
    with open(staging, "w") as handle:
        json.dump(data, handle, indent=2)
    os.replace(staging, destination)
|
|
|
|
|
|
def load() -> dict:
    """Read the connectivity document and guarantee its top-level keys.

    Returns:
        The stored document with ``macs``, ``current`` and ``last_change``
        defaulting to empty dicts and ``events`` defaulting to an empty list
        when absent. Nothing is written back to disk.
    """
    with _lock:
        doc = _read()
        # Fresh containers are built on every call, so callers never share
        # a default between documents.
        for key, empty in (
            ("macs", {}),
            ("current", {}),
            ("last_change", {}),
            ("events", []),
        ):
            doc.setdefault(key, empty)
        return doc
|
|
|
|
|
|
def record_mac(subject: str, mac: Optional[str]) -> None:
    """Cache the MAC address observed for `subject`.

    A falsy `mac` (``None`` or empty string) is ignored. The file is only
    rewritten when the stored value actually changes.
    """
    if not mac:
        return
    with _lock:
        doc = _read()
        known = doc.setdefault("macs", {})
        if known.get(subject) == mac:
            # Already cached: skip the disk write.
            return
        known[subject] = mac
        _write(doc)
|
|
|
|
|
|
def record_state(subject: str, reachable: bool) -> Optional[dict]:
    """Update current state for `subject`. If it differs from the last seen
    state, append a transition event.

    `subject` can be a Spark host key (spark1/spark2) or a service name
    (parakeet/magpie/vllm).

    Args:
        subject: Key under which the state is tracked.
        reachable: Result of the active probe; True maps to "up", False to
            "down".

    Returns:
        The event dict if a transition was recorded, else None. The very
        first observation of a subject (no stored state yet) is also
        recorded as an event, without a duration field.

    When both a previous state and its timestamp are stored, the event
    carries ``down_seconds`` (for down -> up) or ``up_seconds`` (for
    up -> down). The duration is best effort: if the stored timestamp cannot
    be interpreted, the transition is still recorded, just without it.
    """
    new_state = "up" if reachable else "down"
    # Read the clock once so the event timestamp and the computed duration
    # refer to the same instant.
    now_dt = datetime.now(timezone.utc)
    now = now_dt.isoformat().replace("+00:00", "Z")
    with _lock:
        d = _read()
        d.setdefault("macs", {})
        d.setdefault("current", {})
        d.setdefault("last_change", {})
        d.setdefault("events", [])
        prev = d["current"].get(subject)
        if prev == new_state:
            # No change: nothing is appended and the file is not rewritten.
            return None
        event: dict = {
            "subject": subject,
            "at": now,
            "kind": "transition",
            "transition": new_state,
        }
        # When we have a previous state and timestamp, compute duration
        last_change = d["last_change"].get(subject)
        if prev and last_change:
            try:
                prev_dt = datetime.fromisoformat(last_change.replace("Z", "+00:00"))
                duration = (now_dt - prev_dt).total_seconds()
            except (ValueError, TypeError, AttributeError):
                # ValueError: unparseable string. TypeError: stored timestamp
                # is offset-naive and cannot be subtracted from an aware one.
                # AttributeError: stored value is not a string. None of these
                # should prevent the state change from being recorded.
                duration = None
            if duration is not None:
                if prev == "down" and new_state == "up":
                    event["down_seconds"] = round(duration)
                elif prev == "up" and new_state == "down":
                    event["up_seconds"] = round(duration)
        d["current"][subject] = new_state
        d["last_change"][subject] = now
        d["events"].append(event)
        if len(d["events"]) > MAX_EVENTS:
            # Keep only the newest MAX_EVENTS entries.
            d["events"] = d["events"][-MAX_EVENTS:]
        _write(d)
        return event
|
|
|
|
|
|
def record_report(
    subject: str,
    *,
    ok: bool,
    source: str = "external",
    detail: str = "",
    latency_ms: Optional[int] = None,
) -> dict:
    """Log a passive health report submitted by an external caller.

    A report is always appended to the event list (kind="report"). It never
    touches ``current`` or ``last_change``; those are only updated by
    record_state().

    Args:
        subject: Host or service the report is about.
        ok: Whether the caller's request succeeded; stored as a bool.
        source: Name of the reporting application; a falsy value is stored
            as "external".
        detail: Optional free-form text; omitted from the event when empty.
        latency_ms: Optional latency; stored as an int, omitted when None.

    Returns:
        The event dict that was appended.
    """
    report: dict = {
        "subject": subject,
        "at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "kind": "report",
        "ok": bool(ok),
        "source": source or "external",
    }
    if detail:
        report["detail"] = detail
    if latency_ms is not None:
        report["latency_ms"] = int(latency_ms)

    with _lock:
        doc = _read()
        history = doc.setdefault("events", [])
        history.append(report)
        if len(history) > MAX_EVENTS:
            # Keep only the newest MAX_EVENTS entries.
            doc["events"] = history[-MAX_EVENTS:]
        _write(doc)
    return report
|
|
|
|
|
|
def get_mac(subject: str) -> Optional[str]:
    """Return the cached MAC address for `subject`, or None if none is stored."""
    known_macs = load().get("macs", {})
    return known_macs.get(subject)
|
|
|
|
|
|
def _normalize_event(e: dict) -> dict:
    """Bring an event dict to the v0.6 schema, in place, and return it.

    A legacy v0.5 event carries its subject under "spark"; that key is
    renamed to "subject" unless a "subject" key is already present. Any event
    without a "kind" is treated as kind="transition".
    """
    is_legacy = "subject" not in e and "spark" in e
    if is_legacy:
        e["subject"] = e.pop("spark")
    e.setdefault("kind", "transition")
    return e
|
|
|
|
|
|
def summary() -> dict:
    """Build the compact payload the UI consumes.

    Returns:
        A dict with the stored ``macs``, ``current`` and ``last_change``
        mappings plus ``events``: the 80 most recent events, each copied and
        normalized to the current schema so the stored dicts are not mutated.
    """
    doc = load()
    normalized = []
    for raw in doc.get("events", []):
        # Copy first: _normalize_event edits its argument in place.
        normalized.append(_normalize_event(dict(raw)))
    payload = {key: doc.get(key, {}) for key in ("macs", "current", "last_change")}
    payload["events"] = normalized[-80:]
    return payload
|