"""Track Spark up/down transitions and cache discovered MAC addresses. Persisted to /data/connectivity.json so history survives package restarts: { "macs": { "spark1": "aa:bb:..", "spark2": "11:22:.." }, "current": { "spark1": "up", "spark2": "down" }, "last_change": { "spark1": "2026-05-12T15:00:00Z", ... }, "events": [ { "spark": "spark2", "at": "2026-05-12T17:30:00Z", "transition": "down" }, { "spark": "spark2", "at": "2026-05-12T18:45:00Z", "transition": "up", "down_seconds": 4500 }, ... ] } """ from __future__ import annotations import json import os import threading from datetime import datetime, timezone from pathlib import Path from typing import Optional MAX_EVENTS = 200 # rolling window — plenty for showing recent history def _path() -> str: return os.environ.get("CONNECTIVITY_LOG", "/data/connectivity.json") _lock = threading.Lock() def _read() -> dict: try: with open(_path()) as f: return json.load(f) or {} except (FileNotFoundError, json.JSONDecodeError): return {} def _write(data: dict) -> None: p = _path() Path(p).parent.mkdir(parents=True, exist_ok=True) tmp = p + ".tmp" with open(tmp, "w") as f: json.dump(data, f, indent=2, sort_keys=False) os.replace(tmp, p) def load() -> dict: with _lock: d = _read() d.setdefault("macs", {}) d.setdefault("current", {}) d.setdefault("last_change", {}) d.setdefault("events", []) return d def record_mac(spark: str, mac: Optional[str]) -> None: if not mac: return with _lock: d = _read() d.setdefault("macs", {}) if d["macs"].get(spark) != mac: d["macs"][spark] = mac _write(d) def record_state(spark: str, reachable: bool) -> Optional[dict]: """Update current state. If it differs from the last seen state, append an event. Returns the event dict if a transition was recorded, else None. """ new_state = "up" if reachable else "down" now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") with _lock: d = _read() d.setdefault("macs", {}) d.setdefault("current", {}) d.setdefault("last_change", {}) d.setdefault("events", []) prev = d["current"].get(spark) if prev == new_state: return None event: dict = {"spark": spark, "at": now, "transition": new_state} # When we have a previous state and timestamp, compute duration last_change = d["last_change"].get(spark) if prev and last_change: try: prev_dt = datetime.fromisoformat(last_change.replace("Z", "+00:00")) duration = (datetime.now(timezone.utc) - prev_dt).total_seconds() if prev == "down" and new_state == "up": event["down_seconds"] = round(duration) if prev == "up" and new_state == "down": event["up_seconds"] = round(duration) except ValueError: pass d["current"][spark] = new_state d["last_change"][spark] = now d["events"].append(event) # Keep rolling window if len(d["events"]) > MAX_EVENTS: d["events"] = d["events"][-MAX_EVENTS:] _write(d) return event def get_mac(spark: str) -> Optional[str]: d = load() return d.get("macs", {}).get(spark) def summary() -> dict: """Compact summary for the UI: known MACs, current state, recent events.""" d = load() events = d.get("events", []) return { "macs": d.get("macs", {}), "current": d.get("current", {}), "last_change": d.get("last_change", {}), "events": events[-50:], }