"""Async HTTP health probes for the vLLM, Parakeet and Magpie endpoints.

Each ``check_*`` coroutine reads a host/port pair from ``Settings``, issues a
single GET request and returns a plain ``dict``. Failures are reported in the
returned dict (``"ok": False`` plus an ``"error"`` string) rather than raised.
"""

from __future__ import annotations

import httpx

from .config import Settings

# Passed as ``timeout=`` to every httpx.AsyncClient created in this module.
_TIMEOUT = 3.0


def _describe_error(exc: BaseException) -> str:
    """Return a non-empty, human-readable description of *exc*.

    ``str(exc)`` is an empty string for exceptions raised without a message,
    which would leave callers with ``"error": ""``. Fall back to the exception
    class name in that case so the failure is still identifiable.
    """
    return str(exc) or type(exc).__name__


def _failure(error: str, base_url: str | None) -> dict:
    """Build the failure payload shared by all probes."""
    return {"ok": False, "error": error, "base_url": base_url}


async def check_vllm(settings: Settings) -> dict:
    """Probe the vLLM endpoint by listing its models.

    Reads ``settings.spark1_host`` and ``settings.vllm_port`` and requests
    ``GET http://<host>:<port>/v1/models``.

    Returns:
        On success: ``{"ok": True, "current_model": <first id or None>,
        "all": [<ids>], "base_url": "http://<host>:<port>/v1"}``, where the
        ids are the ``"id"`` fields of the entries under the response's
        ``"data"`` key.
        On failure: ``{"ok": False, "error": <text>, "base_url": <url|None>}``;
        ``base_url`` is ``None`` when no host is configured.
    """
    host = settings.spark1_host
    if not host:
        return _failure("spark1 not configured", None)

    base_url = f"http://{host}:{settings.vllm_port}/v1"
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            # Derive the probe URL from base_url so the address reported to
            # the caller is always the one that was actually contacted.
            response = await client.get(f"{base_url}/models")
            response.raise_for_status()
            model_ids = [m["id"] for m in response.json().get("data", [])]
            return {
                "ok": True,
                "current_model": model_ids[0] if model_ids else None,
                "all": model_ids,
                "base_url": base_url,
            }
    except Exception as exc:
        # Deliberately broad: a health probe reports any failure (network,
        # HTTP status, malformed payload) instead of propagating it.
        return _failure(_describe_error(exc), base_url)


async def check_parakeet(settings: Settings) -> dict:
    """Probe the Parakeet endpoint via its ``/health`` route.

    Reads ``settings.parakeet_host`` and ``settings.parakeet_port`` and
    requests ``GET http://<host>:<port>/health``. The response body must be
    JSON; a body that cannot be decoded is reported as a failure.

    Returns:
        On success: ``{"ok": True, "detail": <decoded JSON>,
        "base_url": "http://<host>:<port>"}``.
        On failure: ``{"ok": False, "error": <text>, "base_url": <url|None>}``;
        ``base_url`` is ``None`` when no host is configured.
    """
    host = settings.parakeet_host
    if not host:
        return _failure("parakeet host not configured", None)

    base_url = f"http://{host}:{settings.parakeet_port}"
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.get(f"{base_url}/health")
            response.raise_for_status()
            return {"ok": True, "detail": response.json(), "base_url": base_url}
    except Exception as exc:
        # Deliberately broad: report every failure in the returned dict.
        return _failure(_describe_error(exc), base_url)


async def check_magpie(settings: Settings) -> dict:
    """Probe the Magpie endpoint via its ``/v1/health/ready`` route.

    Reads ``settings.magpie_host`` and ``settings.magpie_port`` and requests
    ``GET http://<host>:<port>/v1/health/ready``.

    Returns:
        On success: ``{"ok": True, "detail": <body>,
        "base_url": "http://<host>:<port>"}``, where ``detail`` is the decoded
        JSON when the response's content-type starts with
        ``application/json`` and the raw response text otherwise.
        On failure: ``{"ok": False, "error": <text>, "base_url": <url|None>}``;
        ``base_url`` is ``None`` when no host is configured.
    """
    host = settings.magpie_host
    if not host:
        return _failure("magpie host not configured", None)

    base_url = f"http://{host}:{settings.magpie_port}"
    try:
        async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
            response = await client.get(f"{base_url}/v1/health/ready")
            response.raise_for_status()
            content_type = response.headers.get("content-type", "")
            if content_type.startswith("application/json"):
                detail = response.json()
            else:
                # Non-JSON readiness responses are passed through as text.
                detail = response.text
            return {"ok": True, "detail": detail, "base_url": base_url}
    except Exception as exc:
        # Deliberately broad: report every failure in the returned dict.
        return _failure(_describe_error(exc), base_url)