Files
spark-control/image/app/audio_proxy.py
T
Keysat 197655a62b v0.9.0:2 - audio proxy: turn Parakeet wedge 500 into clean 503 + immediate auto-restart
Parakeet's recurring CUDA wedge (CUBLAS_STATUS_*_ERROR mid-attention)
fires reliably on Open WebUI's WebM/Opus->MP3 audio. Previously the
proxy relayed the upstream 500 verbatim, Open WebUI showed "Server
connection error" with no signal to retry, and recovery took up to
5 minutes (waiting for the next periodic deep-health probe).

Now the proxy:
  1. Detects 500 from /v1/audio/transcriptions
  2. Fires deep_health.run_one("parakeet") as a background asyncio task
     (which contains the same wedge-detect + rate-limited auto-restart
     logic, but runs immediately instead of waiting for the next tick)
  3. Returns 503 with a clear detail message and Retry-After: 60

The client (Open WebUI, Home Assistant, etc.) gets a proper retry
signal; the auto-restart triggers inside seconds; the next attempt
~60s later succeeds. Rate-limiting (3 restarts per 30 min) is
inherited from the deep-health module so this can't cause restart
storms.

server.py: pass deep_health into build_audio_router().
audio_proxy.py: new 503-with-restart branch; signature now accepts
                deep_health as an optional dependency.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 18:07:35 -05:00

213 lines
9.1 KiB
Python

"""OpenAI-compatible audio proxy: lets any OpenAI-shaped client (Open WebUI,
Home Assistant, etc.) talk to Parakeet (STT) and Magpie (TTS) through one URL.
Endpoints exposed on spark-control's port (same as the dashboard):
GET /v1/models — lists STT model + Magpie voices in OpenAI shape
POST /v1/audio/speech — OpenAI TTS → Magpie /v1/audio/synthesize
POST /v1/audio/transcriptions — forward to Parakeet (already OpenAI-compatible)
Both downstream services already speak HTTP on the LAN; this module just adapts
request/response shapes so OpenAI clients don't need a custom integration.
When Parakeet returns a 500 (commonly the recurring CUDA wedge), the proxy
returns a clearer 503 with Retry-After=60, and fires the deep-health probe in
the background — which detects the wedge and triggers a rate-limited container
restart inside seconds. The client's next attempt ~60s later then succeeds.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Optional
import httpx
from fastapi import APIRouter, Form, HTTPException, Request, UploadFile, File
from fastapi.responses import Response, StreamingResponse
from pydantic import BaseModel
from .config import Settings
logger = logging.getLogger("spark-control.audio")
# Magpie voice name encodes its language. Example:
# Magpie-Multilingual.EN-US.Mia -> en-US
# Magpie-Multilingual.ES-US.Diego -> es-US
# Magpie-Multilingual.FR-FR.Pascal -> fr-FR
def _lang_from_voice(voice: str) -> str:
try:
parts = voice.split(".")
# parts = ["Magpie-Multilingual", "EN-US", "Mia"] (or with emotion suffix)
if len(parts) >= 2 and "-" in parts[1]:
lang_part = parts[1] # "EN-US"
primary, region = lang_part.split("-", 1)
return f"{primary.lower()}-{region.upper()}"
except Exception:
pass
return "en-US"
# Default voice: configurable, falls back to a sensible English voice if unset.
DEFAULT_VOICE = "Magpie-Multilingual.EN-US.Mia"
class SpeechRequest(BaseModel):
"""OpenAI /v1/audio/speech request body."""
model: Optional[str] = None # ignored — Magpie has one model
input: str # the text to speak
voice: Optional[str] = None # e.g. "Magpie-Multilingual.EN-US.Mia"
response_format: Optional[str] = "wav" # only "wav" supported today
speed: Optional[float] = 1.0 # ignored by Magpie
# Magpie-specific extensions (clients may pass these through)
language: Optional[str] = None
sample_rate_hz: Optional[int] = 22050
encoding: Optional[str] = "LINEAR_PCM"
def build_router(settings: Settings, deep_health: Any = None) -> APIRouter:
"""Build the audio proxy router.
If `deep_health` is provided, 500s from Parakeet trigger an immediate
background probe (which contains the same wedge-detect → auto-restart
logic as the 5-minute periodic loop, but fires now instead of waiting).
"""
router = APIRouter()
def _parakeet_base() -> str:
return f"http://{settings.parakeet_host}:{settings.parakeet_port}"
def _magpie_base() -> str:
return f"http://{settings.magpie_host}:{settings.magpie_port}"
# ---- /v1/models ----
@router.get("/v1/models")
async def list_models() -> dict:
"""Advertise the STT model + a small voice menu so clients can
populate their voice-picker UIs. Falls back gracefully if Magpie
is offline (returns just the STT entry)."""
data: list[dict] = [
{
"id": "parakeet-tdt-0.6b-v3",
"object": "model",
"owned_by": "nvidia",
"kind": "stt",
},
]
# Try to enumerate voices from Magpie; if unreachable, just skip.
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.get(f"{_magpie_base()}/v1/audio/list_voices")
if r.status_code == 200:
voices_by_locales = r.json()
seen = set()
for _locales, payload in voices_by_locales.items():
for v in payload.get("voices", []):
# Collapse emotion variants — expose only the base voice name.
# "Magpie-Multilingual.EN-US.Mia.Angry" -> "Magpie-Multilingual.EN-US.Mia"
parts = v.split(".")
base = ".".join(parts[:3]) if len(parts) >= 3 else v
if base not in seen:
seen.add(base)
data.append({
"id": base,
"object": "model",
"owned_by": "nvidia",
"kind": "tts",
})
except Exception as e:
logger.warning("magpie voice list unavailable: %s", e)
return {"object": "list", "data": data}
# ---- /v1/audio/speech (TTS) ----
@router.post("/v1/audio/speech")
async def speech(body: SpeechRequest) -> Response:
"""OpenAI-style TTS. Translates to Magpie's multipart synth call.
Returns raw WAV bytes (Content-Type: audio/wav) — browsers and most
clients play these directly.
"""
text = (body.input or "").strip()
if not text:
raise HTTPException(400, "input text is required")
voice = body.voice or DEFAULT_VOICE
language = body.language or _lang_from_voice(voice)
sample_rate = int(body.sample_rate_hz or 22050)
encoding = body.encoding or "LINEAR_PCM"
form = {
"text": text,
"language": language,
"voice": voice,
"sample_rate_hz": str(sample_rate),
"encoding": encoding,
}
try:
async with httpx.AsyncClient(timeout=120.0) as client:
r = await client.post(f"{_magpie_base()}/v1/audio/synthesize", data=form)
except httpx.HTTPError as e:
raise HTTPException(502, f"magpie unreachable: {e}")
if r.status_code != 200:
# Surface Magpie's error message verbatim so clients can debug voice/lang typos.
raise HTTPException(r.status_code, r.text[:500])
# Magpie returns WAV bytes already (Content-Type: audio/wav). Pass through.
media_type = r.headers.get("content-type", "audio/wav")
return Response(content=r.content, media_type=media_type)
# ---- /v1/audio/transcriptions (STT) ----
@router.post("/v1/audio/transcriptions")
async def transcriptions(
file: UploadFile = File(...),
model: Optional[str] = Form(default=None),
language: Optional[str] = Form(default=None),
prompt: Optional[str] = Form(default=None),
response_format: Optional[str] = Form(default="json"),
temperature: Optional[float] = Form(default=None),
) -> Response:
"""Forward to Parakeet's already-OpenAI-compatible endpoint.
We relay rather than redirect so clients only need to know one URL
(spark-control's) — and so any future client-side rewrites of the
request shape (e.g. translating Whisper-format params) happen here.
"""
body = await file.read()
files = {"file": (file.filename or "audio.wav", body, file.content_type or "application/octet-stream")}
data: dict[str, str] = {}
if model: data["model"] = model
if language: data["language"] = language
if prompt: data["prompt"] = prompt
if response_format: data["response_format"] = response_format
if temperature is not None: data["temperature"] = str(temperature)
try:
async with httpx.AsyncClient(timeout=300.0) as client:
r = await client.post(
f"{_parakeet_base()}/v1/audio/transcriptions",
files=files, data=data,
)
except httpx.HTTPError as e:
raise HTTPException(502, f"parakeet unreachable: {e}")
if r.status_code == 500:
# Parakeet 500s are almost always the CUDA wedge (CUBLAS_*_ERROR
# mid-attention). Kick deep-health to detect+restart in the
# background, and return a clean retry signal to the client.
err_snippet = r.text[:400]
logger.warning("parakeet 500 — firing deep-health probe in background. detail=%s", err_snippet)
if deep_health is not None:
try:
asyncio.create_task(deep_health.run_one("parakeet"))
except Exception as e:
logger.error("failed to schedule deep-health probe: %s", e)
raise HTTPException(
status_code=503,
detail="Parakeet returned a transient error (likely CUDA wedge). Auto-restart triggered; retry in ~60s.",
headers={"Retry-After": "60"},
)
if r.status_code != 200:
raise HTTPException(r.status_code, r.text[:500])
return Response(content=r.content, media_type=r.headers.get("content-type", "application/json"))
return router