initial relay scaffold

This commit is contained in:
local
2026-05-11 20:03:27 -05:00
commit b9d86fa303
58 changed files with 7609 additions and 0 deletions
+127
View File
@@ -0,0 +1,127 @@
// Admin dashboard auth. Mirrors Recap's server/admin-auth.js shape so
// the patterns are familiar. Single-user auth (operator only) — verify
// scrypt(password, salt) against the stored hash, mint a signed
// session cookie on success. Cookie is HMAC-signed with a per-install
// session secret so attackers can't forge sessions even with read
// access to the relay's /admin endpoints.
import { scryptSync, timingSafeEqual, createHmac } from "crypto";
import { getConfigSnapshot } from "./config.js";
const SCRYPT_KEYLEN = 64;
const SESSION_COOKIE = "recap-relay-admin";
const SESSION_TTL_MS = 24 * 60 * 60 * 1000; // 24h
// Path prefix that requires admin auth. Public /relay/* paths are
// authenticated per-call by the route handlers, not the cookie.
const ADMIN_PREFIX = "/admin";
function constantTimeEqual(a, b) {
if (typeof a !== "string" || typeof b !== "string") return false;
if (a.length !== b.length) return false;
try {
return timingSafeEqual(Buffer.from(a), Buffer.from(b));
} catch {
return false;
}
}
function signSessionToken(payload, secret) {
const body = JSON.stringify(payload);
const sig = createHmac("sha256", secret).update(body).digest("hex");
return `${Buffer.from(body).toString("base64url")}.${sig}`;
}
function verifySessionToken(token, secret) {
if (!token) return null;
const parts = token.split(".");
if (parts.length !== 2) return null;
const [b64, sig] = parts;
let body;
try {
body = Buffer.from(b64, "base64url").toString("utf8");
} catch {
return null;
}
const expectedSig = createHmac("sha256", secret).update(body).digest("hex");
if (!constantTimeEqual(sig, expectedSig)) return null;
let payload;
try {
payload = JSON.parse(body);
} catch {
return null;
}
if (typeof payload?.exp !== "number" || Date.now() > payload.exp) return null;
return payload;
}
export function setupAdminAuthMiddleware(app) {
app.use(async (req, res, next) => {
if (!req.path.startsWith(ADMIN_PREFIX)) return next();
// /admin/login is reachable without auth.
if (req.path === "/admin/login" || req.path === "/admin/status") return next();
const cfg = await getConfigSnapshot();
if (!cfg.relay_admin_password_hash) {
// No password set — admin endpoints are disabled entirely.
return res.status(401).json({ error: "admin_disabled" });
}
const token = req.cookies?.[SESSION_COOKIE];
const payload = verifySessionToken(token, cfg.relay_admin_session_secret);
if (!payload) return res.status(401).json({ error: "unauthorized" });
req.adminUser = payload.user;
next();
});
}
export function setupAdminAuthRoutes(app) {
app.get("/admin/status", async (_req, res) => {
const cfg = await getConfigSnapshot();
res.json({
enabled: !!cfg.relay_admin_password_hash,
username: cfg.relay_admin_username || "admin",
});
});
app.post("/admin/login", async (req, res) => {
const cfg = await getConfigSnapshot();
if (!cfg.relay_admin_password_hash) {
return res.status(400).json({ error: "admin_disabled" });
}
const { username, password } = req.body || {};
if (
!username ||
!password ||
typeof username !== "string" ||
typeof password !== "string"
) {
return res.status(400).json({ error: "missing_credentials" });
}
if (username.trim() !== (cfg.relay_admin_username || "admin")) {
return res.status(401).json({ error: "invalid_credentials" });
}
const hash = scryptSync(password, cfg.relay_admin_password_salt, SCRYPT_KEYLEN).toString("hex");
if (!constantTimeEqual(hash, cfg.relay_admin_password_hash)) {
return res.status(401).json({ error: "invalid_credentials" });
}
const token = signSessionToken(
{ user: username, exp: Date.now() + SESSION_TTL_MS },
cfg.relay_admin_session_secret
);
res.cookie(SESSION_COOKIE, token, {
httpOnly: true,
sameSite: "lax",
// secure: true would be ideal but the relay runs behind
// StartTunnel which terminates TLS — the cookie travels over
// plain HTTP inside the tunnel. Leave secure false so the
// cookie sticks; the tunnel itself provides the encryption.
secure: false,
maxAge: SESSION_TTL_MS,
});
res.json({ ok: true, username });
});
app.post("/admin/logout", (_req, res) => {
res.clearCookie(SESSION_COOKIE);
res.json({ ok: true });
});
}
+176
View File
@@ -0,0 +1,176 @@
// Gemini backend forwarder. Receives a transcribe or analyze request
// from a route handler, calls the corresponding Gemini API, and
// returns a normalized result the route can wrap in the standard
// envelope.
//
// v0.1 implements:
// - transcribeAudio({ audio: Buffer, mimeType, title?, channel?,
// description?, chapters?, offsetSeconds? }) → { text, segments,
// duration_seconds }
// - analyzeText({ prompt }) → { text }
//
// Both go through @google/genai with similar prompts to Recap's
// gemini.js provider, so output shapes line up with what Recap's
// orchestration layer expects.
import { GoogleGenAI } from "@google/genai";
import fs from "fs/promises";
import os from "os";
import path from "path";
const TRANSCRIPTION_MODEL = "gemini-3-flash-preview";
const ANALYSIS_MODEL = "gemini-3.1-pro-preview";
const EMPTY_RETRIES = 3;
const TRANSCRIPTION_SAFETY = [
{ category: "HARM_CATEGORY_HARASSMENT", threshold: "BLOCK_NONE" },
{ category: "HARM_CATEGORY_HATE_SPEECH", threshold: "BLOCK_NONE" },
{ category: "HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold: "BLOCK_NONE" },
{ category: "HARM_CATEGORY_DANGEROUS_CONTENT", threshold: "BLOCK_NONE" },
];
export function createGeminiBackend({ apiKey, timeoutMs = 900_000 } = {}) {
if (!apiKey) {
throw new Error("createGeminiBackend: apiKey is required");
}
const ai = new GoogleGenAI({
apiKey,
httpOptions: { timeout: timeoutMs, headersTimeout: timeoutMs },
});
async function transcribeAudio({
audio,
mimeType,
title = "",
channel = "",
description = "",
chapters = [],
offsetSeconds = 0,
}) {
// The Files API requires a path on disk; write to a temp file.
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "relay-tx-"));
const tmpPath = path.join(tmpDir, "audio.bin");
await fs.writeFile(tmpPath, audio);
try {
const uploaded = await ai.files.upload({
file: tmpPath,
config: { mimeType },
});
let f = uploaded;
const pStart = Date.now();
while (f.state === "PROCESSING") {
if (Date.now() - pStart > 5 * 60 * 1000) {
throw new Error("Gemini file processing exceeded 5 min");
}
await new Promise((r) => setTimeout(r, 3000));
f = await ai.files.get({ name: f.name });
}
if (f.state === "FAILED") {
throw new Error("Gemini failed to process audio file");
}
const prompt = buildTranscriptionPrompt({ title, channel, description, chapters });
let result;
for (let attempt = 0; attempt < EMPTY_RETRIES; attempt++) {
result = await ai.models.generateContent({
model: TRANSCRIPTION_MODEL,
config: {
thinkingConfig: { thinkingLevel: "minimal" },
safetySettings: TRANSCRIPTION_SAFETY,
},
contents: [
{
role: "user",
parts: [
{ fileData: { fileUri: f.uri, mimeType } },
{ text: prompt },
],
},
],
});
if (safeText(result)) break;
}
// Best-effort cleanup of the uploaded File API artifact.
try { await ai.files.delete({ name: f.name }); } catch {}
const text = safeText(result) || "";
return {
text,
// Gemini returns a single timestamped blob — segments are
// parsed client-side by the orchestration layer. We could
// pre-parse here but Recap already has parseTimestampedTranscript
// that handles this exact shape.
segments: [],
duration_seconds: 0,
};
} finally {
try { await fs.rm(tmpDir, { recursive: true, force: true }); } catch {}
}
}
async function analyzeText({ prompt }) {
const result = await ai.models.generateContent({
model: ANALYSIS_MODEL,
contents: [
{
role: "user",
parts: [{ text: prompt }],
},
],
});
return {
text: safeText(result) || "",
};
}
return { transcribeAudio, analyzeText };
}
function safeText(r) {
try {
if (r?.text) return r.text;
} catch {}
try {
const parts = r?.candidates?.[0]?.content?.parts;
if (parts) return parts.map((p) => p.text || "").join("");
} catch {}
return "";
}
function buildTranscriptionPrompt({ title, channel, description, chapters } = {}) {
let ctx = "";
if (title) ctx += `Video title: "${title}"\n`;
if (channel) ctx += `Channel: ${channel}\n`;
if (description) {
const d = description.length > 1500 ? description.slice(0, 1500) + "…" : description;
ctx += `Video description (use to identify speakers by name):\n${d}\n`;
}
if (Array.isArray(chapters) && chapters.length > 0) {
const lines = chapters
.slice(0, 30)
.map((c) => {
const start = typeof c.start_time === "number" ? c.start_time : 0;
const mm = Math.floor(start / 60);
const ss = Math.floor(start % 60).toString().padStart(2, "0");
return ` [${mm}:${ss}] ${c.title || ""}`;
})
.join("\n");
ctx += `Chapter markers:\n${lines}\n`;
}
if (ctx) ctx += "\n";
return `${ctx}Transcribe this audio completely and verbatim. Include timestamps at regular intervals (every 15-30 seconds or at natural pauses).
Format each line as:
[MM:SS] The spoken text here...
Rules:
- Transcribe EVERY word spoken, do not skip or summarize anything.
- Use [MM:SS] or [H:MM:SS] timestamp format at the start of each line.
- Start a new timestamped line every 15-30 seconds or at natural speech pauses.
- Include filler words (um, uh, you know) for accuracy.
- Speaker identification: FIRST consult the metadata above — descriptions and chapter titles usually name the host(s) and guest(s) explicitly. Format as: [MM:SS] Name: text. Only fall back to "Host"/"Guest" if no names appear.
Return ONLY the timestamped transcript, nothing else.`;
}
+55
View File
@@ -0,0 +1,55 @@
// Operator-hardware fallback backend. Forwards transcribe requests to
// the operator's Parakeet (or any Whisper-API-compatible) endpoint and
// analyze requests to their Gemma (or any OpenAI-API-compatible) endpoint.
//
// v0.1 is a stub — the endpoints are wired up, but no operator has
// pointed a real Parakeet/Gemma at the relay yet. Returns a 503
// "hardware fallback not yet wired" so the credits.js routing logic
// still applies but users get a clear message instead of a silent
// failure.
export function createHardwareBackend({
parakeetBaseURL = "",
gemmaBaseURL = "",
} = {}) {
const hasParakeet = !!parakeetBaseURL;
const hasGemma = !!gemmaBaseURL;
return {
hasTranscribe: hasParakeet,
hasAnalyze: hasGemma,
async transcribeAudio() {
if (!hasParakeet) {
const err = new Error(
"operator-hardware transcribe path is not configured (relay_parakeet_base_url is empty)"
);
err.status = 503;
throw err;
}
// TODO v0.2: POST audio to parakeetBaseURL using the OpenAI
// audio-transcriptions wire format Recap already speaks. Return
// { text, segments, duration_seconds } in the same shape as
// gemini.js's transcribeAudio.
const err = new Error("operator-hardware transcribe path not yet implemented in relay v0.1");
err.status = 503;
throw err;
},
async analyzeText() {
if (!hasGemma) {
const err = new Error(
"operator-hardware analyze path is not configured (relay_gemma_base_url is empty)"
);
err.status = 503;
throw err;
}
// TODO v0.2: POST prompt to gemmaBaseURL using either /api/generate
// (Ollama native) or /v1/chat/completions (OpenAI-compatible).
// Return { text } matching gemini.js's analyzeText.
const err = new Error("operator-hardware analyze path not yet implemented in relay v0.1");
err.status = 503;
throw err;
},
};
}
+98
View File
@@ -0,0 +1,98 @@
// Live-reloading config layer. Mirrors Recap's config.js pattern: read
// /data/config/relay-config.json on every access (filesystem watcher
// pulls in StartOS-action changes without a daemon restart), parse,
// and expose typed accessors.
//
// All defaults match the schema in startos/file-models/config.json.ts.
import fs from "fs/promises";
import path from "path";
let dataDir = "/data";
let cached = { mtimeMs: 0, snapshot: defaultConfig() };
function defaultConfig() {
return {
relay_gemini_api_key: "",
relay_parakeet_base_url: "",
relay_gemma_base_url: "",
relay_keysat_base_url: "https://keysat.xyz",
relay_admin_username: "",
relay_admin_password_hash: "",
relay_admin_password_salt: "",
relay_admin_session_secret: "",
relay_tier_quotas_json: JSON.stringify({
core: { lifetime: 5, monthly: null, geminiCapMonthly: null },
pro: { lifetime: null, monthly: 50, geminiCapMonthly: 25 },
max: { lifetime: null, monthly: null, geminiCapMonthly: 50 },
}),
};
}
function configPath() {
return path.join(dataDir, "config", "relay-config.json");
}
export async function initConfig({ dataDir: dd }) {
if (dd) dataDir = dd;
await fs.mkdir(path.dirname(configPath()), { recursive: true }).catch(() => {});
// Prime the cache so the first request doesn't pay for a file-read.
await getConfigSnapshot();
}
// Reads the on-disk config and merges with defaults. Cheap — single
// stat + read per call, but the result is cached until the file mtime
// changes so repeat callers within one request don't re-read.
export async function getConfigSnapshot() {
const p = configPath();
let stat;
try {
stat = await fs.stat(p);
} catch {
return cached.snapshot;
}
if (stat.mtimeMs === cached.mtimeMs) return cached.snapshot;
try {
const raw = await fs.readFile(p, "utf8");
const parsed = JSON.parse(raw);
cached = {
mtimeMs: stat.mtimeMs,
snapshot: { ...defaultConfig(), ...parsed },
};
} catch (err) {
console.warn(`[config] failed to parse ${p}: ${err?.message}`);
}
return cached.snapshot;
}
// Parsed view of relay_tier_quotas_json, with safe fallbacks if the
// blob is missing or malformed.
export async function getTierQuotas() {
const cfg = await getConfigSnapshot();
try {
const parsed = JSON.parse(cfg.relay_tier_quotas_json);
return {
core: {
lifetime: parsed?.core?.lifetime ?? 5,
monthly: parsed?.core?.monthly ?? null,
geminiCapMonthly: parsed?.core?.geminiCapMonthly ?? null,
},
pro: {
lifetime: parsed?.pro?.lifetime ?? null,
monthly: parsed?.pro?.monthly ?? 50,
geminiCapMonthly: parsed?.pro?.geminiCapMonthly ?? 25,
},
max: {
lifetime: parsed?.max?.lifetime ?? null,
monthly: parsed?.max?.monthly ?? null,
geminiCapMonthly: parsed?.max?.geminiCapMonthly ?? 50,
},
};
} catch {
return {
core: { lifetime: 5, monthly: null, geminiCapMonthly: null },
pro: { lifetime: null, monthly: 50, geminiCapMonthly: 25 },
max: { lifetime: null, monthly: null, geminiCapMonthly: 50 },
};
}
}
+179
View File
@@ -0,0 +1,179 @@
// Credit ledger keyed by install-id. JSON-file backed (single file at
// /data/credits.json). Write throughput is low — at most one mutation
// per relay request — so a plain JSON file with mutex-style serial
// writes is plenty. Swap to SQLite if a single relay starts seeing
// dozens of req/sec sustained.
//
// Per-install row shape:
// {
// install_id: "uuid",
// tier_snapshot: "core" | "pro" | "max", // last-seen tier
// lifetime_consumed: number, // for Core lifetime cap
// month: "YYYY-MM", // calendar-month key
// monthly_consumed: number, // total this month
// monthly_gemini_consumed: number, // Gemini-only this month
// last_active_at: ISO-8601 string,
// }
import fs from "fs/promises";
import path from "path";
let dataDir = "/data";
let ledgerPath = "/data/credits.json";
let ledger = { rows: {} };
let writing = null; // serializes concurrent writes
export async function initCredits({ dataDir: dd }) {
if (dd) dataDir = dd;
ledgerPath = path.join(dataDir, "credits.json");
await fs.mkdir(dataDir, { recursive: true }).catch(() => {});
try {
const raw = await fs.readFile(ledgerPath, "utf8");
ledger = JSON.parse(raw) || { rows: {} };
if (!ledger.rows) ledger.rows = {};
} catch (err) {
if (err.code !== "ENOENT") {
console.warn(`[credits] failed to read ledger: ${err.message} — starting empty`);
}
ledger = { rows: {} };
}
console.log(`[credits] loaded ${Object.keys(ledger.rows).length} install rows from ${ledgerPath}`);
}
function currentMonthKey() {
const d = new Date();
return `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, "0")}`;
}
// Lazily rolls over the per-install monthly counters when the calendar
// month changes. Lifetime counter is left untouched (Core lifetime
// credits never reset).
function ensureCurrentMonth(row) {
const m = currentMonthKey();
if (row.month !== m) {
row.month = m;
row.monthly_consumed = 0;
row.monthly_gemini_consumed = 0;
}
return row;
}
function blankRow(installId) {
return {
install_id: installId,
tier_snapshot: "core",
lifetime_consumed: 0,
month: currentMonthKey(),
monthly_consumed: 0,
monthly_gemini_consumed: 0,
last_active_at: new Date().toISOString(),
};
}
async function persist() {
// Coalesce concurrent writes — multiple in-flight mutations resolve
// against the same persisted snapshot in fifo order.
if (writing) await writing;
writing = (async () => {
const tmp = ledgerPath + ".tmp";
await fs.writeFile(tmp, JSON.stringify(ledger), { mode: 0o600 });
await fs.rename(tmp, ledgerPath);
})();
try {
await writing;
} finally {
writing = null;
}
}
// Returns the row for an install, creating + persisting a blank one
// if this is the first time we've seen it.
export async function getOrCreateRow(installId) {
if (!installId) throw new Error("getOrCreateRow: installId required");
let row = ledger.rows[installId];
if (!row) {
row = blankRow(installId);
ledger.rows[installId] = row;
await persist();
}
return ensureCurrentMonth(row);
}
// Compute the remaining balance for a row against its tier's quota.
// Returns:
// { remaining: number | null, capped: "lifetime" | "monthly" | "none", gemini_remaining: number | null }
// `null` for remaining means "unlimited" (Max tier total).
export function computeRemaining(row, quota) {
const tier = row.tier_snapshot;
const tierQuota = quota[tier] || quota.core;
if (tierQuota.lifetime != null) {
const remaining = Math.max(0, tierQuota.lifetime - (row.lifetime_consumed || 0));
return {
remaining,
capped: "lifetime",
gemini_remaining: null, // lifetime tier doesn't split Gemini/hardware
};
}
let remaining;
if (tierQuota.monthly == null) {
remaining = null; // unlimited
} else {
remaining = Math.max(0, tierQuota.monthly - (row.monthly_consumed || 0));
}
const geminiRemaining =
tierQuota.geminiCapMonthly == null
? null
: Math.max(0, tierQuota.geminiCapMonthly - (row.monthly_gemini_consumed || 0));
return {
remaining,
capped: "monthly",
gemini_remaining: geminiRemaining,
};
}
// Decide what backend a request should go to and whether it can be
// served at all. Returns { allowed, backend: "gemini"|"hardware",
// reason }. Does NOT debit — that's a separate commit step after the
// backend call succeeds.
export function planBackend(row, quota, { hasHardware }) {
const balance = computeRemaining(row, quota);
// Out of credits entirely?
if (balance.remaining === 0) {
return { allowed: false, backend: null, reason: "out_of_credits" };
}
// Pick backend: Gemini if there's room under the Gemini cap; else
// fall back to hardware if configured; else 402.
if (balance.gemini_remaining === null || balance.gemini_remaining > 0) {
return { allowed: true, backend: "gemini", reason: null };
}
if (hasHardware) {
return { allowed: true, backend: "hardware", reason: null };
}
return { allowed: false, backend: null, reason: "gemini_cap_exceeded_no_hardware" };
}
// Debit one credit on a successful call. Persists immediately.
export async function commitCredit(installId, { backend, tier }) {
const row = await getOrCreateRow(installId);
row.tier_snapshot = tier;
if (tier === "core") {
row.lifetime_consumed = (row.lifetime_consumed || 0) + 1;
} else {
row.monthly_consumed = (row.monthly_consumed || 0) + 1;
if (backend === "gemini") {
row.monthly_gemini_consumed = (row.monthly_gemini_consumed || 0) + 1;
}
}
row.last_active_at = new Date().toISOString();
await persist();
}
// For the admin dashboard.
export function snapshotAll() {
return Object.values(ledger.rows).map((r) => ({ ...r }));
}
+75
View File
@@ -0,0 +1,75 @@
// Recap Relay — operator-side credit-metered proxy in front of Gemini
// (and optionally a co-located Parakeet+Gemma setup).
//
// Two public endpoints:
// POST /relay/transcribe — audio → text (Gemini File API)
// POST /relay/analyze — text → topic sections JSON (Gemini Pro)
// Plus admin endpoints under /admin/* gated by an HTTP session cookie.
import express from "express";
import cors from "cors";
import cookieParser from "cookie-parser";
import path from "path";
import { fileURLToPath } from "url";
import { initConfig } from "./config.js";
import { initCredits } from "./credits.js";
import {
setupAdminAuthMiddleware,
setupAdminAuthRoutes,
} from "./admin-auth.js";
import { transcribeRouter } from "./routes/transcribe.js";
import { analyzeRouter } from "./routes/analyze.js";
import { healthRouter } from "./routes/health.js";
import { adminRouter } from "./routes/admin.js";
const __dirname = path.dirname(fileURLToPath(import.meta.url));
// DATA_DIR is /data on StartOS, project root in dev.
const DATA_DIR = process.env.DATA_DIR || path.join(__dirname, "..");
const PORT = parseInt(process.env.PORT || "3002", 10);
await initConfig({ dataDir: DATA_DIR });
await initCredits({ dataDir: DATA_DIR });
const app = express();
app.use(cors());
app.use(cookieParser());
// Admin auth must run BEFORE the admin routes register so the cookie
// check applies to /admin/usage, /admin/config, etc. /admin/login and
// /admin/status are explicitly exempted inside the middleware.
setupAdminAuthMiddleware(app);
setupAdminAuthRoutes(app);
// Public relay endpoints. No app-level auth — each route handler
// authenticates per-call via headers (X-Recap-Install-Id required,
// Authorization optional).
app.use("/relay", healthRouter());
app.use("/relay", transcribeRouter());
app.use("/relay", analyzeRouter());
// Admin dashboard endpoints (cookie-gated).
app.use("/admin", adminRouter({ dataDir: DATA_DIR }));
// Static admin UI (v0.2 will flesh out public/admin.html). For v0.1
// the dashboard is JSON-only; serve any static assets dropped into
// public/ but don't error if the directory is empty.
app.use(express.static(path.join(__dirname, "..", "public")));
// Root: redirect to /admin/ for operator convenience, or show a tiny
// placeholder for Recap clients that hit the root by mistake.
app.get("/", (_req, res) => {
res.type("text/plain").send(
"Recap Relay\n" +
"===========\n" +
"Public endpoints: POST /relay/transcribe, POST /relay/analyze, GET /relay/health\n" +
"Operator dashboard: /admin/\n"
);
});
const HOSTNAME = process.env.HOSTNAME || "0.0.0.0";
app.listen(PORT, HOSTNAME, () => {
console.log(`[relay] listening on http://${HOSTNAME}:${PORT}`);
console.log(`[relay] data directory: ${DATA_DIR}`);
});
+68
View File
@@ -0,0 +1,68 @@
// Job-id deduplication. Recap mints a UUID per summarize job (the
// transcribe + analyze pair) and sends it in X-Recap-Job-Id on every
// relay call. The first call with a given (install_id, job_id) tuple
// reserves a credit; subsequent calls with the same tuple are free
// until the job_id expires (1 hour).
//
// Stored in-memory only — not persisted across restarts because (a)
// a restart breaks all in-flight Recap streams anyway and (b) the
// worst-case outcome of a "lost reservation" is the user being
// charged for a single retry, which is acceptable.
const JOB_TTL_MS = 60 * 60 * 1000; // 1 hour
// Map<install_id|job_id, { backend, tier, charged_at, refunded }>
const jobs = new Map();
function key(installId, jobId) {
return `${installId}|${jobId}`;
}
// On a new request: returns { charged: true } if this is the first call
// for the job (caller must commit a credit), or { charged: false,
// backend, tier } if it's a retry/follow-up.
export function lookupJob(installId, jobId) {
if (!installId || !jobId) return null;
pruneExpired();
const k = key(installId, jobId);
const existing = jobs.get(k);
if (existing && !existing.refunded) return existing;
return null;
}
// Mark a job as having been charged. Idempotent — second call for the
// same (install_id, job_id) is a no-op.
export function markJobCharged(installId, jobId, { backend, tier }) {
if (!installId || !jobId) return;
pruneExpired();
const k = key(installId, jobId);
if (jobs.has(k) && !jobs.get(k).refunded) return;
jobs.set(k, {
backend,
tier,
charged_at: Date.now(),
refunded: false,
});
}
// Refund a previously charged credit for a failed job. Future calls
// with the same job_id will be treated as new (since the reservation
// is no longer valid).
export function refundJob(installId, jobId) {
if (!installId || !jobId) return;
const k = key(installId, jobId);
const existing = jobs.get(k);
if (existing) existing.refunded = true;
}
function pruneExpired() {
const cutoff = Date.now() - JOB_TTL_MS;
for (const [k, v] of jobs) {
if (v.charged_at < cutoff) jobs.delete(k);
}
}
export function snapshotJobs() {
pruneExpired();
return Array.from(jobs.entries()).map(([k, v]) => ({ key: k, ...v }));
}
+183
View File
@@ -0,0 +1,183 @@
// Cached license validation against the Keysat license server. Two
// layers:
// 1. Offline signature verification using the vendored
// @keysat/licensing-client. Fast and works without network.
// 2. Cached online check against keysat.xyz (or wherever
// relay_keysat_base_url points) — confirms the key hasn't been
// revoked since the last sync. Cached per license-key for
// KEYSAT_CACHE_TTL_MS to avoid hammering keysat on hot paths.
//
// Returns a normalized object with tier resolved from entitlements:
// { state: "licensed" | "invalid" | "anonymous",
// tier: "core" | "pro" | "max",
// licenseUuid: string | null,
// entitlements: string[],
// reason: string | null }
import { getConfigSnapshot } from "./config.js";
const KEYSAT_CACHE_TTL_MS = 60 * 60 * 1000; // 1 hour
// Map<licenseKey, { result, validated_at }>
const cache = new Map();
// Dynamically import the licensing client so it doesn't block boot
// if vendor/keysat-licensing-client is missing in dev environments.
let verifierLoaded = false;
let verifier = null;
async function loadVerifier() {
if (verifierLoaded) return verifier;
verifierLoaded = true;
try {
const mod = await import("@keysat/licensing-client");
// Same pattern Recap uses: build a verifier with the embedded
// public key. Recap's license.js shows the exact call; copy it
// here. For v0.1 we only need offline verification — if the
// vendor module signature differs across Recap versions we can
// tweak this.
if (mod?.createVerifier) {
verifier = mod.createVerifier();
}
} catch (err) {
console.warn(
`[keysat] failed to load @keysat/licensing-client (${err.message}) — will treat all licenses as anonymous`
);
}
return verifier;
}
// Resolve a tier from an entitlements set. Bundles aren't supported
// yet — explicit entitlement names only. relay_max wins over relay_pro
// if somehow both are present.
function tierFromEntitlements(entitlements) {
if (entitlements.has("relay_max")) return "max";
if (entitlements.has("relay_pro")) return "pro";
return "core";
}
// Public entry: takes the raw `Authorization: Bearer <key>` value (or
// null) and returns a resolved license. Anonymous = no header = Core
// tier.
export async function resolveLicense(rawAuth) {
if (!rawAuth) {
return {
state: "anonymous",
tier: "core",
licenseUuid: null,
entitlements: [],
reason: null,
};
}
const key = stripBearer(rawAuth);
if (!key) {
return {
state: "invalid",
tier: "core",
licenseUuid: null,
entitlements: [],
reason: "malformed_auth_header",
};
}
// Cache hit (still fresh)?
const cached = cache.get(key);
if (cached && Date.now() - cached.validated_at < KEYSAT_CACHE_TTL_MS) {
return cached.result;
}
// Offline verify first — establishes the entitlements + license id.
const v = await loadVerifier();
let offline = null;
if (v) {
try {
offline = v.verify(key);
} catch (err) {
const result = {
state: "invalid",
tier: "core",
licenseUuid: null,
entitlements: [],
reason: `verify_failed: ${err.message}`,
};
cache.set(key, { result, validated_at: Date.now() });
return result;
}
}
// If offline verify worked, use its payload as the source of truth
// for entitlements + license id. Then hit keysat for revocation
// status.
const entitlements = new Set(offline?.payload?.entitlements || []);
const licenseUuid = offline?.payload?.licenseUuid || null;
let online = await onlineCheck(key);
if (online && online.revoked) {
const result = {
state: "invalid",
tier: "core",
licenseUuid,
entitlements: [],
reason: online.reason || "revoked",
};
cache.set(key, { result, validated_at: Date.now() });
return result;
}
const tier = tierFromEntitlements(entitlements);
const result = {
state: "licensed",
tier,
licenseUuid,
entitlements: [...entitlements],
reason: null,
};
cache.set(key, { result, validated_at: Date.now() });
return result;
}
function stripBearer(raw) {
const m = (raw || "").trim().match(/^Bearer\s+(.+)$/i);
if (m) return m[1].trim();
// Accept a bare key without the Bearer prefix for tolerance.
return raw.trim();
}
// Best-effort online check. Returns null on network error (cache the
// offline-verified result with a short TTL so we don't pound keysat
// while it's down) or { revoked: boolean, reason?: string }.
async function onlineCheck(licenseKey) {
try {
const cfg = await getConfigSnapshot();
const base = (cfg.relay_keysat_base_url || "").replace(/\/$/, "");
if (!base) return null;
// POST /validate is the standard Keysat shape (per the licensing
// client docs). If the actual endpoint differs we'll wire it up
// once we point the relay at a real Keysat server.
const res = await fetch(`${base}/validate`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ license_key: licenseKey }),
signal: AbortSignal.timeout(5000),
});
if (!res.ok) {
console.warn(`[keysat] online check ${base}/validate returned ${res.status}`);
return null;
}
const data = await res.json();
return {
revoked: !!data?.revoked || data?.status === "revoked",
reason: data?.reason || null,
};
} catch (err) {
console.warn(`[keysat] online check failed: ${err?.message}`);
return null;
}
}
export function snapshotCache() {
return Array.from(cache.entries()).map(([k, v]) => ({
license_prefix: k.slice(0, 12) + "…",
tier: v.result?.tier,
state: v.result?.state,
validated_at: new Date(v.validated_at).toISOString(),
}));
}
+14
View File
@@ -0,0 +1,14 @@
{
"name": "recap-relay-server",
"version": "0.1.0",
"type": "module",
"private": true,
"dependencies": {
"@google/genai": "^1.0.0",
"@keysat/licensing-client": "file:../vendor/keysat-licensing-client",
"cors": "^2.8.5",
"cookie-parser": "^1.4.6",
"express": "^4.21.0",
"multer": "^1.4.5-lts.1"
}
}
+102
View File
@@ -0,0 +1,102 @@
// /admin/* — operator dashboard endpoints. All require the admin
// session cookie (enforced by admin-auth middleware).
//
// v0.1 endpoints (JSON only; v0.2 will add an HTML dashboard):
// GET /admin/usage — all install rows + last-month aggregates
// GET /admin/config — current operator config (sans password hash)
// POST /admin/quotas — adjust tier quotas live (mirror of StartOS
// action but reachable from the dashboard)
import express from "express";
import { getConfigSnapshot } from "../config.js";
import { snapshotAll } from "../credits.js";
import { snapshotCache } from "../keysat-client.js";
import { snapshotJobs } from "../job-credits.js";
import fs from "fs/promises";
import path from "path";
export function adminRouter({ dataDir }) {
const router = express.Router();
router.get("/usage", async (_req, res) => {
const rows = snapshotAll();
res.json({
installs: rows.length,
rows,
});
});
router.get("/config", async (_req, res) => {
const cfg = await getConfigSnapshot();
// Strip secrets before exposing to the dashboard.
const safe = {
keysat_base_url: cfg.relay_keysat_base_url,
parakeet_base_url: cfg.relay_parakeet_base_url,
gemma_base_url: cfg.relay_gemma_base_url,
gemini_configured: !!cfg.relay_gemini_api_key,
admin_username: cfg.relay_admin_username,
tier_quotas: tryParse(cfg.relay_tier_quotas_json),
};
res.json(safe);
});
router.get("/license-cache", async (_req, res) => {
res.json({ entries: snapshotCache() });
});
router.get("/jobs", async (_req, res) => {
res.json({ entries: snapshotJobs() });
});
// Adjust the live quotas blob. Same shape the StartOS action writes
// to relay_tier_quotas_json — kept here so the dashboard can tune
// quotas without round-tripping the StartOS UI.
router.post("/quotas", express.json(), async (req, res) => {
const incoming = req.body || {};
const normalized = {
core: {
lifetime: numOrNull(incoming?.core?.lifetime, 5),
monthly: numOrNull(incoming?.core?.monthly, null),
geminiCapMonthly: numOrNull(incoming?.core?.geminiCapMonthly, null),
},
pro: {
lifetime: numOrNull(incoming?.pro?.lifetime, null),
monthly: numOrNull(incoming?.pro?.monthly, 50),
geminiCapMonthly: numOrNull(incoming?.pro?.geminiCapMonthly, 25),
},
max: {
lifetime: numOrNull(incoming?.max?.lifetime, null),
monthly: numOrNull(incoming?.max?.monthly, null),
geminiCapMonthly: numOrNull(incoming?.max?.geminiCapMonthly, 50),
},
};
// Write directly into relay-config.json — the live-reloader picks
// it up on the next read.
const configPath = path.join(dataDir, "config", "relay-config.json");
let existing = {};
try {
existing = JSON.parse(await fs.readFile(configPath, "utf8"));
} catch {}
existing.relay_tier_quotas_json = JSON.stringify(normalized);
await fs.mkdir(path.dirname(configPath), { recursive: true });
await fs.writeFile(configPath, JSON.stringify(existing), { mode: 0o600 });
res.json({ ok: true, quotas: normalized });
});
return router;
}
function numOrNull(v, fallback) {
if (v === null) return null;
const n = Number(v);
if (Number.isFinite(n)) return n;
return fallback;
}
function tryParse(s) {
try {
return JSON.parse(s);
} catch {
return null;
}
}
+114
View File
@@ -0,0 +1,114 @@
// POST /relay/analyze — forwards an analysis prompt to the chosen
// backend and returns the standard envelope.
//
// Request body (application/json):
// { prompt: string }
//
// Headers: same as /relay/transcribe (X-Recap-Install-Id required,
// X-Recap-Job-Id optional, Authorization optional Bearer license).
//
// Same charge-once-per-job semantics: a Recap summarize job pairs
// transcribe + analyze with the same X-Recap-Job-Id. The first call
// (whichever endpoint) charges 1 credit; the second is free.
import express from "express";
import { resolveLicense } from "../keysat-client.js";
import { getOrCreateRow, planBackend, commitCredit } from "../credits.js";
import { lookupJob, markJobCharged, refundJob } from "../job-credits.js";
import { getConfigSnapshot, getTierQuotas } from "../config.js";
import { createGeminiBackend } from "../backends/gemini.js";
import { createHardwareBackend } from "../backends/hardware.js";
import { envelope, errorEnvelope } from "./envelope.js";
export function analyzeRouter() {
const router = express.Router();
router.post("/analyze", express.json({ limit: "10mb" }), async (req, res) => {
const installId = req.header("X-Recap-Install-Id");
const jobId = req.header("X-Recap-Job-Id") || null;
const auth = req.header("Authorization");
if (!installId) {
const e = await errorEnvelope({
error: "missing X-Recap-Install-Id header",
statusHint: 400,
});
return res.status(400).json(e.body);
}
const prompt = req.body?.prompt;
if (!prompt || typeof prompt !== "string") {
const e = await errorEnvelope({
error: "missing or non-string body.prompt",
installId,
statusHint: 400,
});
return res.status(400).json(e.body);
}
const license = await resolveLicense(auth);
const tier = license.tier;
const row = await getOrCreateRow(installId);
row.tier_snapshot = tier;
let reusedJob = false;
let chosenBackend = null;
const existingJob = lookupJob(installId, jobId);
if (existingJob) {
reusedJob = true;
chosenBackend = existingJob.backend;
} else {
const cfg = await getConfigSnapshot();
const hasHardware = !!cfg.relay_gemma_base_url;
const quota = await getTierQuotas();
const plan = planBackend(row, quota, { hasHardware });
if (!plan.allowed) {
const e = await errorEnvelope({
error: plan.reason,
installId,
tier,
statusHint: 402,
});
return res.status(402).json(e.body);
}
chosenBackend = plan.backend;
}
const cfg = await getConfigSnapshot();
let result;
try {
if (chosenBackend === "gemini") {
const backend = createGeminiBackend({ apiKey: cfg.relay_gemini_api_key });
result = await backend.analyzeText({ prompt });
} else {
const backend = createHardwareBackend({
parakeetBaseURL: cfg.relay_parakeet_base_url,
gemmaBaseURL: cfg.relay_gemma_base_url,
});
result = await backend.analyzeText({ prompt });
}
} catch (err) {
if (reusedJob) refundJob(installId, jobId);
console.error(`[relay/analyze] backend error: ${err?.message}`);
const e = await errorEnvelope({
error: err?.message || "backend_error",
installId,
tier,
statusHint: err?.status || 502,
});
return res.status(e.statusHint).json(e.body);
}
let creditCharged = 0;
if (!reusedJob) {
await commitCredit(installId, { backend: chosenBackend, tier });
markJobCharged(installId, jobId, { backend: chosenBackend, tier });
creditCharged = 1;
}
const body = await envelope({ result, installId, tier, creditCharged });
res.json(body);
});
return router;
}
+58
View File
@@ -0,0 +1,58 @@
// Standard response envelope. Every /relay/* response (success and
// error both) goes through this so Recap clients can keep their
// credit-balance display accurate regardless of what happened.
//
// Shape: { result, credits_remaining, tier, credit_charged }
import { getOrCreateRow, computeRemaining } from "../credits.js";
import { getTierQuotas } from "../config.js";
// Build the envelope around a result object.
export async function envelope({
result = null,
installId,
tier,
creditCharged = 0,
}) {
const quota = await getTierQuotas();
const row = await getOrCreateRow(installId);
// tier_snapshot on the row was just updated by commitCredit; if no
// credit was committed (free reuse via job_id) it still reflects
// the last-known tier for this install, which is fine.
const balance = computeRemaining(row, quota);
return {
result,
credits_remaining: balance.remaining, // null = unlimited (Max)
tier,
credit_charged: creditCharged,
};
}
// Same shape but for error responses. The error reason goes in `error`
// alongside `result: null`. Clients should still update their balance
// display from `credits_remaining` so failed calls (which were
// refunded) reflect the unchanged balance.
export async function errorEnvelope({
error,
installId,
tier = "core",
statusHint = 500,
}) {
let creditsRemaining = null;
try {
const quota = await getTierQuotas();
const row = await getOrCreateRow(installId || "unknown");
const balance = computeRemaining(row, quota);
creditsRemaining = balance.remaining;
} catch {}
return {
statusHint,
body: {
result: null,
error: typeof error === "string" ? error : error?.message || "unknown_error",
credits_remaining: creditsRemaining,
tier,
credit_charged: 0,
},
};
}
+27
View File
@@ -0,0 +1,27 @@
// GET /relay/health — public liveness check. No auth, no credit
// accounting. Returns a minimal status object so monitoring + Recap's
// /api/relay/status can verify the relay is reachable.
import express from "express";
import { getConfigSnapshot } from "../config.js";
export function healthRouter() {
const router = express.Router();
router.get("/health", async (_req, res) => {
const cfg = await getConfigSnapshot();
res.json({
ok: true,
service: "recap-relay",
version: "0.1.0",
backends: {
gemini: !!cfg.relay_gemini_api_key,
parakeet: !!cfg.relay_parakeet_base_url,
gemma: !!cfg.relay_gemma_base_url,
},
admin_enabled: !!cfg.relay_admin_password_hash,
});
});
return router;
}
+157
View File
@@ -0,0 +1,157 @@
// POST /relay/transcribe — forwards an audio payload to the chosen
// backend (Gemini first, operator hardware as overflow) and returns
// the standard envelope.
//
// Request shape: multipart/form-data
// audio: binary audio file (required)
// mime_type: string (default application/octet-stream)
// title: string (optional, used by Gemini prompt)
// channel: string (optional)
// description: string (optional)
// chapters: JSON-stringified array (optional)
// offset_seconds: number string (optional, for chunked audio)
//
// Headers:
// X-Recap-Install-Id (required)
// X-Recap-Job-Id (optional but expected — pairs with /analyze)
// Authorization (optional Bearer LIC1-... for licensed tiers)
//
// Response (standard envelope):
// {
// result: { text: "[MM:SS] ...", segments: [], duration_seconds: 0 },
// credits_remaining, tier, credit_charged
// }
import express from "express";
import multer from "multer";
import { resolveLicense } from "../keysat-client.js";
import { getOrCreateRow, planBackend, commitCredit } from "../credits.js";
import { lookupJob, markJobCharged, refundJob } from "../job-credits.js";
import { getConfigSnapshot, getTierQuotas } from "../config.js";
import { createGeminiBackend } from "../backends/gemini.js";
import { createHardwareBackend } from "../backends/hardware.js";
import { envelope, errorEnvelope } from "./envelope.js";
const upload = multer({
storage: multer.memoryStorage(),
limits: { fileSize: 200 * 1024 * 1024 }, // 200 MB per request
});
export function transcribeRouter() {
const router = express.Router();
router.post("/transcribe", upload.single("audio"), async (req, res) => {
const installId = req.header("X-Recap-Install-Id");
const jobId = req.header("X-Recap-Job-Id") || null;
const auth = req.header("Authorization");
if (!installId) {
const e = await errorEnvelope({
error: "missing X-Recap-Install-Id header",
statusHint: 400,
});
return res.status(400).json(e.body);
}
if (!req.file) {
const e = await errorEnvelope({ error: "missing audio file", installId, statusHint: 400 });
return res.status(400).json(e.body);
}
const license = await resolveLicense(auth);
const tier = license.tier;
// Persist tier on the row so the admin dashboard reflects the
// most recently seen tier for this install.
const row = await getOrCreateRow(installId);
row.tier_snapshot = tier;
// Job-id dedup. If we've already charged this job, skip the
// credit check entirely — the user is paying once for the whole
// summarize job.
let reusedJob = false;
let chosenBackend = null;
const existingJob = lookupJob(installId, jobId);
if (existingJob) {
reusedJob = true;
chosenBackend = existingJob.backend;
} else {
const cfg = await getConfigSnapshot();
const hasHardware = !!cfg.relay_parakeet_base_url;
const quota = await getTierQuotas();
const plan = planBackend(row, quota, { hasHardware });
if (!plan.allowed) {
const e = await errorEnvelope({
error: plan.reason,
installId,
tier,
statusHint: 402,
});
return res.status(402).json(e.body);
}
chosenBackend = plan.backend;
}
// Build the backend client based on chosenBackend.
const cfg = await getConfigSnapshot();
let result;
try {
if (chosenBackend === "gemini") {
const backend = createGeminiBackend({ apiKey: cfg.relay_gemini_api_key });
result = await backend.transcribeAudio({
audio: req.file.buffer,
mimeType: req.body?.mime_type || req.file.mimetype || "application/octet-stream",
title: req.body?.title || "",
channel: req.body?.channel || "",
description: req.body?.description || "",
chapters: parseChaptersField(req.body?.chapters),
offsetSeconds: Number(req.body?.offset_seconds) || 0,
});
} else {
const backend = createHardwareBackend({
parakeetBaseURL: cfg.relay_parakeet_base_url,
gemmaBaseURL: cfg.relay_gemma_base_url,
});
result = await backend.transcribeAudio({
audio: req.file.buffer,
mimeType: req.body?.mime_type || req.file.mimetype || "application/octet-stream",
offsetSeconds: Number(req.body?.offset_seconds) || 0,
});
}
} catch (err) {
// If we'd charged this job already (rare — most refundable
// failures happen on the FIRST call), refund.
if (reusedJob) refundJob(installId, jobId);
console.error(`[relay/transcribe] backend error: ${err?.message}`);
const e = await errorEnvelope({
error: err?.message || "backend_error",
installId,
tier,
statusHint: err?.status || 502,
});
return res.status(e.statusHint).json(e.body);
}
// Commit the credit on success (unless this was a job-id reuse).
let creditCharged = 0;
if (!reusedJob) {
await commitCredit(installId, { backend: chosenBackend, tier });
markJobCharged(installId, jobId, { backend: chosenBackend, tier });
creditCharged = 1;
}
const body = await envelope({ result, installId, tier, creditCharged });
res.json(body);
});
return router;
}
function parseChaptersField(raw) {
if (!raw) return [];
try {
const arr = JSON.parse(raw);
return Array.isArray(arr) ? arr : [];
} catch {
return [];
}
}