diff --git a/image/app/llm_proxy.py b/image/app/llm_proxy.py new file mode 100644 index 0000000..2b8f908 --- /dev/null +++ b/image/app/llm_proxy.py @@ -0,0 +1,130 @@ +"""OpenAI-compatible chat-completions proxy that forwards to the vLLM +process currently running on Spark 1. + +Lets clients (recap-relay, Open WebUI, etc.) use a single Spark Control +host for everything — same TLS cert, same allowlist, same place to add +rate limiting/observability later — instead of having to also reach +into :8888 directly. + +Endpoints: + POST /v1/chat/completions — OpenAI chat completions (streams when stream=true) + POST /v1/completions — OpenAI legacy completions (also stream-capable) + +The proxy is intentionally dumb: forward the request body, stream the +response back. We don't parse or transform the OpenAI payload — vLLM +already speaks the same shape, and adding any transformation here would +create skew with the official OpenAI clients. +""" +from __future__ import annotations +import json +import logging +from typing import AsyncIterator + +import httpx +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import Response, StreamingResponse + +from .config import Settings + +logger = logging.getLogger("spark-control.llm") + + +# vLLM gets long for big-context completions; cap at 30 min to be safe. +DEFAULT_TIMEOUT = 1800.0 + + +def build_router(settings: Settings) -> APIRouter: + router = APIRouter() + + def _vllm_url(suffix: str) -> str: + return f"http://{settings.spark1_host}:{settings.vllm_port}{suffix}" + + async def _proxy(request: Request, upstream_suffix: str) -> Response: + if not settings.spark1_host: + raise HTTPException(503, "Spark 1 host not configured") + body = await request.body() + # Determine whether the client requested streaming. vLLM returns SSE if + # stream=true; otherwise a single JSON object. We must stream when the + # client asked, otherwise FastAPI would buffer the entire response and + # block until vLLM finishes generating (defeats the point of streaming). + is_stream = False + try: + parsed = json.loads(body) if body else {} + is_stream = bool(parsed.get("stream")) + except Exception: + pass + + # Forward content-type + accept headers; strip hop-by-hop headers. + fwd_headers = { + "Content-Type": request.headers.get("content-type", "application/json"), + } + if (accept := request.headers.get("accept")): + fwd_headers["Accept"] = accept + + url = _vllm_url(upstream_suffix) + + if is_stream: + # Stream the upstream response back chunk-by-chunk. We hold the + # httpx connection open for the lifetime of the stream. + async def passthrough() -> AsyncIterator[bytes]: + async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: + try: + async with client.stream( + "POST", url, content=body, headers=fwd_headers + ) as r: + if r.status_code != 200: + err_body = await r.aread() + logger.warning( + "vllm %s returned %s: %s", + upstream_suffix, r.status_code, err_body[:300] + ) + # Emit a single SSE error event so the client's + # parser doesn't just hang on an empty stream. + yield ( + f"event: error\ndata: " + f"{json.dumps({'status': r.status_code, 'detail': err_body[:500].decode(errors='replace')})}\n\n" + ).encode() + return + async for chunk in r.aiter_raw(): + yield chunk + except httpx.HTTPError as e: + logger.exception("vllm stream failed: %s", e) + yield ( + f"event: error\ndata: " + f"{json.dumps({'detail': f'vllm unreachable: {e}'})}\n\n" + ).encode() + + return StreamingResponse(passthrough(), media_type="text/event-stream") + + # Non-streaming: one POST, return the body verbatim. + try: + async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: + r = await client.post(url, content=body, headers=fwd_headers) + except httpx.HTTPError as e: + raise HTTPException(502, f"vllm unreachable: {e}") + return Response( + content=r.content, + status_code=r.status_code, + media_type=r.headers.get("content-type", "application/json"), + ) + + @router.post("/v1/chat/completions") + async def chat_completions(request: Request) -> Response: + """OpenAI chat-completions, forwarded to the vLLM on Spark 1. + + Request body is passed through unchanged — anything vLLM understands + works here (model, messages, max_tokens, temperature, response_format, + chat_template_kwargs, tools, tool_choice, ...). + + Streaming: set `stream: true` in the request body and we'll stream the + SSE response from vLLM back through this proxy. Default 30-min timeout + per request to accommodate large-context completions. + """ + return await _proxy(request, "/v1/chat/completions") + + @router.post("/v1/completions") + async def completions(request: Request) -> Response: + """OpenAI legacy completions, forwarded to the vLLM on Spark 1.""" + return await _proxy(request, "/v1/completions") + + return router diff --git a/image/app/server.py b/image/app/server.py index ed05be6..b2c26aa 100644 --- a/image/app/server.py +++ b/image/app/server.py @@ -16,6 +16,7 @@ from .audio_proxy import build_router as build_audio_router from .deep_health import DeepHealth from .disk import delete_from_disk, probe_disk from .download import DownloadManager +from .llm_proxy import build_router as build_llm_router from .hardware import HardwareProbe from .health import check_magpie, check_parakeet, check_vllm from .models import load_catalog @@ -64,6 +65,12 @@ app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static") # when Parakeet returns 500, instead of waiting up to 5 min for the periodic probe. app.include_router(build_audio_router(settings, deep_health=deep_health)) +# OpenAI-compatible LLM proxy: /v1/chat/completions, /v1/completions. +# Forwards to whatever vLLM is currently running on Spark 1 (per the LLM swap +# state). Supports SSE streaming when stream=true. Same trusted-host model +# as the audio proxy — clients only need one URL for everything. +app.include_router(build_llm_router(settings)) + @app.get("/", include_in_schema=False) async def index() -> FileResponse: diff --git a/package/startos/versions/v0_1_0.ts b/package/startos/versions/v0_1_0.ts index f93136b..4a02315 100644 --- a/package/startos/versions/v0_1_0.ts +++ b/package/startos/versions/v0_1_0.ts @@ -1,10 +1,10 @@ import { VersionInfo, IMPOSSIBLE } from '@start9labs/start-sdk' export const v0_1_0 = VersionInfo.of({ - version: '0.13.0:2', + version: '0.13.0:3', releaseNotes: { en_US: - 'v0.13.0:2 — per-segment confidence in diarize-chunk. Sortformer outputs per-frame per-speaker sigmoid probabilities (~12.6 fps) that we previously discarded. Now: for each diarization segment, compute mean probability of the assigned speaker across the segment\'s frames → confidence in [0, 1]. Recap Relay (and other consumers) can threshold this to render uncertain segments as "Speaker_0?" with a question mark, or to skip them entirely. Endpoint shape is otherwise unchanged — segments[].confidence is a new field, value may be None on derivation failure. Click Reapply patches on the Speech Models card after install to pick up the updated diarizer.py + main.py.', + 'v0.13.0:3 — chat-completions proxy. Adds POST /v1/chat/completions (and /v1/completions for the legacy endpoint) to Spark Control that forwards to whichever vLLM is currently loaded on Spark 1. Supports SSE streaming when stream=true in the request body. Request body is passed through unchanged — any vLLM-supported field works (model, messages, max_tokens, temperature, response_format, chat_template_kwargs, tools, ...). Closes the last gap that forced clients to know about both Spark Control AND the direct vLLM URL — recap-relay and friends can now use one trusted host for everything (transcribe, diarize, analyze) with one cert and one allowlist. 30-min request timeout to accommodate large-context completions. No parakeet container changes; no Reapply patches needed.', }, migrations: { up: async ({ effects }) => {},