Files
Keysat 8d839e3714 v0.13.0:4 - redaction gateway, embeddings proxy, expanded audio API
- Add redaction gateway (redaction_gateway.py, redaction/ scrub + tests)
- Add embeddings proxy and spark_embed service (Dockerfile + main.py)
- Expand audio_proxy with speaker-aware handling; deep_health/health/server updates
- Package: configureSparks action + sparkConfig model updates, manifest/main wiring
- Docs: AUDIO_API, EMBEDDINGS, REDACTION_GATEWAY; HANDOFF and runbook/known-issues refresh
2026-06-11 17:45:57 -05:00

131 lines
5.6 KiB
Python

"""OpenAI-compatible chat-completions proxy that forwards to the vLLM
process currently running on Spark 1.
Lets clients (Open WebUI, custom apps, etc.) use a single Spark Control
host for everything — same TLS cert, same allowlist, same place to add
rate limiting/observability later — instead of having to also reach
into <spark1-host>:8888 directly.
Endpoints:
POST /v1/chat/completions — OpenAI chat completions (streams when stream=true)
POST /v1/completions — OpenAI legacy completions (also stream-capable)
The proxy is intentionally dumb: forward the request body, stream the
response back. We don't parse or transform the OpenAI payload — vLLM
already speaks the same shape, and adding any transformation here would
create skew with the official OpenAI clients.
"""
from __future__ import annotations
import json
import logging
from typing import AsyncIterator
import httpx
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import Response, StreamingResponse
from .config import Settings
logger = logging.getLogger("spark-control.llm")
# vLLM gets long for big-context completions; cap at 30 min to be safe.
DEFAULT_TIMEOUT = 1800.0
def build_router(settings: Settings) -> APIRouter:
router = APIRouter()
def _vllm_url(suffix: str) -> str:
return f"http://{settings.spark1_host}:{settings.vllm_port}{suffix}"
async def _proxy(request: Request, upstream_suffix: str) -> Response:
if not settings.spark1_host:
raise HTTPException(503, "Spark 1 host not configured")
body = await request.body()
# Determine whether the client requested streaming. vLLM returns SSE if
# stream=true; otherwise a single JSON object. We must stream when the
# client asked, otherwise FastAPI would buffer the entire response and
# block until vLLM finishes generating (defeats the point of streaming).
is_stream = False
try:
parsed = json.loads(body) if body else {}
is_stream = bool(parsed.get("stream"))
except Exception:
pass
# Forward content-type + accept headers; strip hop-by-hop headers.
fwd_headers = {
"Content-Type": request.headers.get("content-type", "application/json"),
}
if (accept := request.headers.get("accept")):
fwd_headers["Accept"] = accept
url = _vllm_url(upstream_suffix)
if is_stream:
# Stream the upstream response back chunk-by-chunk. We hold the
# httpx connection open for the lifetime of the stream.
async def passthrough() -> AsyncIterator[bytes]:
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
try:
async with client.stream(
"POST", url, content=body, headers=fwd_headers
) as r:
if r.status_code != 200:
err_body = await r.aread()
logger.warning(
"vllm %s returned %s: %s",
upstream_suffix, r.status_code, err_body[:300]
)
# Emit a single SSE error event so the client's
# parser doesn't just hang on an empty stream.
yield (
f"event: error\ndata: "
f"{json.dumps({'status': r.status_code, 'detail': err_body[:500].decode(errors='replace')})}\n\n"
).encode()
return
async for chunk in r.aiter_raw():
yield chunk
except httpx.HTTPError as e:
logger.exception("vllm stream failed: %s", e)
yield (
f"event: error\ndata: "
f"{json.dumps({'detail': f'vllm unreachable: {e}'})}\n\n"
).encode()
return StreamingResponse(passthrough(), media_type="text/event-stream")
# Non-streaming: one POST, return the body verbatim.
try:
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
r = await client.post(url, content=body, headers=fwd_headers)
except httpx.HTTPError as e:
raise HTTPException(502, f"vllm unreachable: {e}")
return Response(
content=r.content,
status_code=r.status_code,
media_type=r.headers.get("content-type", "application/json"),
)
@router.post("/v1/chat/completions")
async def chat_completions(request: Request) -> Response:
"""OpenAI chat-completions, forwarded to the vLLM on Spark 1.
Request body is passed through unchanged — anything vLLM understands
works here (model, messages, max_tokens, temperature, response_format,
chat_template_kwargs, tools, tool_choice, ...).
Streaming: set `stream: true` in the request body and we'll stream the
SSE response from vLLM back through this proxy. Default 30-min timeout
per request to accommodate large-context completions.
"""
return await _proxy(request, "/v1/chat/completions")
@router.post("/v1/completions")
async def completions(request: Request) -> Response:
"""OpenAI legacy completions, forwarded to the vLLM on Spark 1."""
return await _proxy(request, "/v1/completions")
return router