"""Track up/down transitions for any subject (Sparks AND services) and cache MACs.

State is persisted as JSON to the path in the ``CONNECTIVITY_LOG`` environment
variable (default ``/data/connectivity.json``). Schema::

    {
      "macs":        { "spark1": "aa:bb:..", "spark2": "11:22:.." },
      "current":     { "spark1": "up", "parakeet": "up", "kokoro": "up", ... },
      "last_change": { ... },
      "events": [
        # Active-probe transition (logged when state flips during polling).
        # "down_seconds" is added on down->up, "up_seconds" on up->down.
        { "subject": "spark2", "at": "...", "kind": "transition",
          "transition": "down" },
        { "subject": "spark2", "at": "...", "kind": "transition",
          "transition": "up", "down_seconds": 4500 },
        # Passive report (logged whenever an external app POSTs to
        # /api/health-event, regardless of state change)
        { "subject": "parakeet", "at": "...", "kind": "report", "ok": false,
          "source": "open-webui", "detail": "Connection refused",
          "latency_ms": 320 },
      ]
    }

Legacy events from v0.5 that use ``spark`` instead of ``subject`` and have no
``kind`` field are read transparently as ``kind="transition"``.
"""
from __future__ import annotations

import contextlib
import json
import os
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

MAX_EVENTS = 200  # rolling window — plenty for showing recent history


def _path() -> str:
    """Return the JSON file path (``CONNECTIVITY_LOG`` env var, else the default)."""
    return os.environ.get("CONNECTIVITY_LOG", "/data/connectivity.json")


# Guards every read-modify-write of the JSON file. This is a thread lock, so it
# serializes callers within one process only.
_lock = threading.Lock()


def _read() -> dict:
    """Return the persisted document, or ``{}`` if it is missing or unusable.

    A missing file, undecodable bytes, malformed JSON, or a top-level JSON
    value that is not an object all yield ``{}``, so callers can always
    ``setdefault`` on the result.
    """
    try:
        with open(_path(), encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, UnicodeDecodeError, json.JSONDecodeError):
        return {}
    # ``null``, a list, a string, ... would break every dict-based caller.
    return data if isinstance(data, dict) else {}


def _write(data: dict) -> None:
    """Replace the JSON file with *data* via a temp file + ``os.replace``.

    The document is written to a sibling ``.tmp`` file and then renamed over
    the target, so the target is never left half-written. If writing fails,
    the temp file is removed and the original exception propagates.
    """
    p = _path()
    Path(p).parent.mkdir(parents=True, exist_ok=True)
    tmp = p + ".tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, sort_keys=False)
        os.replace(tmp, p)
    except BaseException:
        # Best-effort cleanup only; the real error is what the caller needs.
        with contextlib.suppress(OSError):
            os.unlink(tmp)
        raise


def _with_defaults(d: dict) -> dict:
    """Ensure the four top-level schema keys exist on *d* (in place); return it."""
    d.setdefault("macs", {})
    d.setdefault("current", {})
    d.setdefault("last_change", {})
    d.setdefault("events", [])
    return d


def _format_ts(dt: datetime) -> str:
    """Render an aware UTC datetime as ISO-8601 with a trailing ``Z``."""
    return dt.isoformat().replace("+00:00", "Z")


def _parse_ts(value: object) -> Optional[datetime]:
    """Parse a stored timestamp into an aware datetime, or return None.

    ``Z`` is rewritten to ``+00:00`` because ``datetime.fromisoformat`` only
    accepts ``Z`` from Python 3.11 on. Naive results (no UTC offset, e.g. data
    not written by this module) are rejected, because subtracting a naive
    datetime from an aware one raises ``TypeError``.
    """
    if not isinstance(value, str) or not value:
        return None
    try:
        dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    return dt if dt.tzinfo is not None else None


def _append_event(d: dict, event: dict) -> None:
    """Append *event* to ``d["events"]``, keeping only the newest MAX_EVENTS."""
    events = d["events"]
    events.append(event)
    if len(events) > MAX_EVENTS:
        d["events"] = events[-MAX_EVENTS:]


def load() -> dict:
    """Return the full persisted document with all four schema keys present."""
    with _lock:
        return _with_defaults(_read())


def record_mac(subject: str, mac: Optional[str]) -> None:
    """Cache *mac* for *subject*; the file is rewritten only when it changes.

    Empty or None MACs are ignored, so an existing entry is never overwritten
    with an empty value.
    """
    if not mac:
        return
    with _lock:
        d = _read()
        macs = d.setdefault("macs", {})
        if macs.get(subject) != mac:
            macs[subject] = mac
            _write(d)


def record_state(subject: str, reachable: bool) -> Optional[dict]:
    """Update current state for `subject` and log a transition if it changed.

    `subject` can be a Spark host key (spark1/spark2) or a service name
    (parakeet/kokoro/vllm).

    Returns the transition event dict if the state flipped (including the first
    observation of a subject), else None. When the previous state and a parsable
    aware `last_change` timestamp exist, the event also carries `down_seconds`
    (down -> up) or `up_seconds` (up -> down), rounded to whole seconds.
    """
    new_state = "up" if reachable else "down"
    # One clock read, so "at", last_change and the duration all agree.
    now_dt = datetime.now(timezone.utc)
    now = _format_ts(now_dt)
    with _lock:
        d = _with_defaults(_read())
        prev = d["current"].get(subject)
        if prev == new_state:
            return None

        event: dict = {
            "subject": subject,
            "at": now,
            "kind": "transition",
            "transition": new_state,
        }
        # Duration is best-effort: skipped when there is no usable prior timestamp.
        prev_dt = _parse_ts(d["last_change"].get(subject))
        if prev and prev_dt is not None:
            duration = round((now_dt - prev_dt).total_seconds())
            if prev == "down" and new_state == "up":
                event["down_seconds"] = duration
            elif prev == "up" and new_state == "down":
                event["up_seconds"] = duration

        d["current"][subject] = new_state
        d["last_change"][subject] = now
        _append_event(d, event)
        _write(d)
        return event


def record_report(
    subject: str,
    *,
    ok: bool,
    source: str = "external",
    detail: str = "",
    latency_ms: Optional[int] = None,
) -> dict:
    """Record a passive report from an external caller (e.g. Open WebUI got
    a 503 calling Parakeet).

    Always appended to the events list; does NOT change the active-probe state
    (which only the polling probe is authoritative on). An empty `source` is
    stored as "external"; `detail` is stored only when non-empty and
    `latency_ms` only when not None (coerced with ``int``).

    Returns the event dict that was appended.
    """
    event: dict = {
        "subject": subject,
        "at": _format_ts(datetime.now(timezone.utc)),
        "kind": "report",
        "ok": bool(ok),
        "source": source or "external",
    }
    if detail:
        event["detail"] = detail
    if latency_ms is not None:
        event["latency_ms"] = int(latency_ms)
    with _lock:
        d = _read()
        d.setdefault("events", [])
        _append_event(d, event)
        _write(d)
    return event


def get_mac(subject: str) -> Optional[str]:
    """Return the cached MAC for *subject*, or None if unknown."""
    return load()["macs"].get(subject)


def _normalize_event(e: dict) -> dict:
    """Promote legacy v0.5 events to the v0.6 shape so the UI sees one schema.

    Mutates and returns *e*: a legacy ``spark`` key is renamed to ``subject``
    (only when ``subject`` is absent) and a missing ``kind`` becomes
    ``"transition"``.
    """
    if "subject" not in e and "spark" in e:
        e["subject"] = e.pop("spark")
    e.setdefault("kind", "transition")
    return e


def summary(limit: int = 80) -> dict:
    """Compact summary for the UI: known MACs, current state, recent events.

    Args:
        limit: maximum number of most-recent events to include (default 80).
            ``limit <= 0`` returns no events.

    Events are returned as normalized copies; the stored list is not modified.
    """
    d = load()
    # Guard: a slice of [-0:] would return every event, not none.
    recent = d["events"][-limit:] if limit > 0 else []
    return {
        "macs": d["macs"],
        "current": d["current"],
        "last_change": d["last_change"],
        "events": [_normalize_event(dict(e)) for e in recent],
    }