8d839e3714
- Add redaction gateway (redaction_gateway.py, redaction/ scrub + tests) - Add embeddings proxy and spark_embed service (Dockerfile + main.py) - Expand audio_proxy with speaker-aware handling; deep_health/health/server updates - Package: configureSparks action + sparkConfig model updates, manifest/main wiring - Docs: AUDIO_API, EMBEDDINGS, REDACTION_GATEWAY; HANDOFF and runbook/known-issues refresh
131 lines
5.6 KiB
Python
131 lines
5.6 KiB
Python
"""OpenAI-compatible chat-completions proxy that forwards to the vLLM
|
|
process currently running on Spark 1.
|
|
|
|
Lets clients (Open WebUI, custom apps, etc.) use a single Spark Control
|
|
host for everything — same TLS cert, same allowlist, same place to add
|
|
rate limiting/observability later — instead of having to also reach
|
|
into <spark1-host>:8888 directly.
|
|
|
|
Endpoints:
|
|
POST /v1/chat/completions — OpenAI chat completions (streams when stream=true)
|
|
POST /v1/completions — OpenAI legacy completions (also stream-capable)
|
|
|
|
The proxy is intentionally dumb: forward the request body, stream the
|
|
response back. We don't parse or transform the OpenAI payload — vLLM
|
|
already speaks the same shape, and adding any transformation here would
|
|
create skew with the official OpenAI clients.
|
|
"""
|
|
from __future__ import annotations
|
|
import json
|
|
import logging
|
|
from typing import AsyncIterator
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, HTTPException, Request
|
|
from fastapi.responses import Response, StreamingResponse
|
|
|
|
from .config import Settings
|
|
|
|
logger = logging.getLogger("spark-control.llm")
|
|
|
|
|
|
# vLLM gets long for big-context completions; cap at 30 min to be safe.
|
|
DEFAULT_TIMEOUT = 1800.0
|
|
|
|
|
|
def build_router(settings: Settings) -> APIRouter:
|
|
router = APIRouter()
|
|
|
|
def _vllm_url(suffix: str) -> str:
|
|
return f"http://{settings.spark1_host}:{settings.vllm_port}{suffix}"
|
|
|
|
async def _proxy(request: Request, upstream_suffix: str) -> Response:
|
|
if not settings.spark1_host:
|
|
raise HTTPException(503, "Spark 1 host not configured")
|
|
body = await request.body()
|
|
# Determine whether the client requested streaming. vLLM returns SSE if
|
|
# stream=true; otherwise a single JSON object. We must stream when the
|
|
# client asked, otherwise FastAPI would buffer the entire response and
|
|
# block until vLLM finishes generating (defeats the point of streaming).
|
|
is_stream = False
|
|
try:
|
|
parsed = json.loads(body) if body else {}
|
|
is_stream = bool(parsed.get("stream"))
|
|
except Exception:
|
|
pass
|
|
|
|
# Forward content-type + accept headers; strip hop-by-hop headers.
|
|
fwd_headers = {
|
|
"Content-Type": request.headers.get("content-type", "application/json"),
|
|
}
|
|
if (accept := request.headers.get("accept")):
|
|
fwd_headers["Accept"] = accept
|
|
|
|
url = _vllm_url(upstream_suffix)
|
|
|
|
if is_stream:
|
|
# Stream the upstream response back chunk-by-chunk. We hold the
|
|
# httpx connection open for the lifetime of the stream.
|
|
async def passthrough() -> AsyncIterator[bytes]:
|
|
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
|
|
try:
|
|
async with client.stream(
|
|
"POST", url, content=body, headers=fwd_headers
|
|
) as r:
|
|
if r.status_code != 200:
|
|
err_body = await r.aread()
|
|
logger.warning(
|
|
"vllm %s returned %s: %s",
|
|
upstream_suffix, r.status_code, err_body[:300]
|
|
)
|
|
# Emit a single SSE error event so the client's
|
|
# parser doesn't just hang on an empty stream.
|
|
yield (
|
|
f"event: error\ndata: "
|
|
f"{json.dumps({'status': r.status_code, 'detail': err_body[:500].decode(errors='replace')})}\n\n"
|
|
).encode()
|
|
return
|
|
async for chunk in r.aiter_raw():
|
|
yield chunk
|
|
except httpx.HTTPError as e:
|
|
logger.exception("vllm stream failed: %s", e)
|
|
yield (
|
|
f"event: error\ndata: "
|
|
f"{json.dumps({'detail': f'vllm unreachable: {e}'})}\n\n"
|
|
).encode()
|
|
|
|
return StreamingResponse(passthrough(), media_type="text/event-stream")
|
|
|
|
# Non-streaming: one POST, return the body verbatim.
|
|
try:
|
|
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
|
|
r = await client.post(url, content=body, headers=fwd_headers)
|
|
except httpx.HTTPError as e:
|
|
raise HTTPException(502, f"vllm unreachable: {e}")
|
|
return Response(
|
|
content=r.content,
|
|
status_code=r.status_code,
|
|
media_type=r.headers.get("content-type", "application/json"),
|
|
)
|
|
|
|
@router.post("/v1/chat/completions")
|
|
async def chat_completions(request: Request) -> Response:
|
|
"""OpenAI chat-completions, forwarded to the vLLM on Spark 1.
|
|
|
|
Request body is passed through unchanged — anything vLLM understands
|
|
works here (model, messages, max_tokens, temperature, response_format,
|
|
chat_template_kwargs, tools, tool_choice, ...).
|
|
|
|
Streaming: set `stream: true` in the request body and we'll stream the
|
|
SSE response from vLLM back through this proxy. Default 30-min timeout
|
|
per request to accommodate large-context completions.
|
|
"""
|
|
return await _proxy(request, "/v1/chat/completions")
|
|
|
|
@router.post("/v1/completions")
|
|
async def completions(request: Request) -> Response:
|
|
"""OpenAI legacy completions, forwarded to the vLLM on Spark 1."""
|
|
return await _proxy(request, "/v1/completions")
|
|
|
|
return router
|