// Per-call audit log for profitability + observability. Each relay
// request (success or failure) appends one line of newline-delimited
// JSON to /data/relay-calls.ndjson. Append-only — read paths stream the
// whole file and keep the matching entries in memory for aggregation,
// which is cheap up to 100k+ entries at typical relay scale
// (low-tens-of-thousands of calls per month).
//
// Record shape (no field is required; missing fields just don't
// appear in aggregations):
//   {
//     ts:             ms-epoch when the request landed
//     install_id:     X-Recap-Install-Id (truncated for log readability)
//     tier:           "core" | "pro" | "max"
//     pipeline:       "transcribe" | "analyze"
//     backend:        "gemini" | "hardware"
//     model:          e.g. "gemini-3-flash-preview", "parakeet-tdt-0.6b-v3"
//     status:         "success" | "error" | "refused" (refused = quota)
//     credit_charged: 0 | 1
//     duration_ms:    end-to-end wall time
//     input_tokens, output_tokens, thinking_tokens (Gemini only)
//     cost_usd:       computed from token counts × per-1M-token rates
//     job_id:         X-Recap-Job-Id (so we can collapse pairs into one)
//     error:          short error string if status="error"
//   }
//
// Rotation isn't built in — for the prototype, operator can rotate
// manually (mv relay-calls.ndjson relay-calls.ndjson.0; restart). Once
// volume warrants, replace this with a daily-rotated logger or move to
// SQLite for indexed time-range queries.

import fs from "fs/promises";
import { createReadStream } from "fs";
import readline from "readline";
import path from "path";

let dataDir = "/data";
let logPath = "/data/relay-calls.ndjson";

// Point the audit log at `dataDir` (default: /data) and make sure the
// log file exists so the streaming read path doesn't trip.
//
// The options object is optional; calling with no argument keeps the
// current directory. Rejects if the file cannot be created (for
// example, the directory does not exist).
export async function initAuditLog({ dataDir: dd } = {}) {
  if (dd) dataDir = dd;
  logPath = path.join(dataDir, "relay-calls.ndjson");

  // Open in append mode: creates the file when missing and never
  // truncates an existing one, so there is no window in which lines
  // written by someone else between "check" and "create" can be lost.
  const handle = await fs.open(logPath, "a", 0o600);
  await handle.close();

  console.log(`[audit-log] writing to ${logPath}`);
}

// Best-effort append. Errors are logged but never rethrown — losing
// an audit line shouldn't fail the relay call that caused it.
//
// `entry.ts` is honoured only when it is a finite number; anything
// else (missing, undefined, a string, NaN) is replaced with the
// current time, because readEntries() ignores records whose `ts` is
// not a number.
export async function recordCall(entry) {
  const now = Date.now();
  // `ts` is declared first so it stays the leading key in the log line.
  const record = { ts: now, ...entry };
  if (!Number.isFinite(record.ts)) record.ts = now;

  try {
    await fs.appendFile(logPath, JSON.stringify(record) + "\n", { mode: 0o600 });
  } catch (err) {
    console.error(`[audit-log] append failed: ${err?.message || err}`);
  }
}

// Read all entries with sinceMs <= ts <= untilMs (default: the last
// 30 days, no upper bound). Streamed line-by-line so the raw file
// doesn't sit in memory at once; only matching records are kept.
// Returned array is newest-first.
//
// Never rejects on I/O problems: a missing file yields [], any other
// read error is logged and whatever was parsed so far is returned.
export async function readEntries({
  sinceMs = Date.now() - 30 * 24 * 3600 * 1000,
  untilMs = Number.POSITIVE_INFINITY,
} = {}) {
  const out = [];
  try {
    const stream = createReadStream(logPath, { encoding: "utf8" });
    const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
    for await (const line of rl) {
      if (!line.trim()) continue;
      try {
        const r = JSON.parse(line);
        if (typeof r.ts === "number" && r.ts >= sinceMs && r.ts <= untilMs) {
          out.push(r);
        }
      } catch {
        // Bad line (malformed JSON or a non-object value) — skip
        // silently. Doesn't disrupt the rest of the read.
      }
    }
  } catch (err) {
    if (err?.code !== "ENOENT") {
      console.error(`[audit-log] read failed: ${err?.message || err}`);
    }
  }
  // Newest first by ts.
  out.sort((a, b) => b.ts - a.ts);
  return out;
}

// Compute multi-dimensional aggregates over a set of entries. The
// dashboard renders all of these — each is a small object array
// suitable for direct tabulation.
// `entries` is an array of audit records (any field may be missing).
// Returns { summary, by_tier, by_model, by_pipeline, by_backend,
// by_install, by_hour_utc, cost_vs_speed }. Non-numeric or missing
// metric fields count as 0; missing grouping fields land in "unknown".
export function aggregate(entries) {
  const calls = entries.length;

  // Single pass for the per-status counters.
  let success = 0;
  let errors = 0;
  let refused = 0;
  for (const e of entries) {
    if (e.status === "success") success += 1;
    else if (e.status === "error") errors += 1;
    else if (e.status === "refused") refused += 1;
  }

  const totalCost = sumBy(entries, "cost_usd");
  const totalDuration = sumBy(entries, "duration_ms");
  const totalInputTokens = sumBy(entries, "input_tokens");
  const totalOutputTokens = sumBy(entries, "output_tokens");
  const totalThinkingTokens = sumBy(entries, "thinking_tokens");

  // ── By tier ──
  const byTier = groupBy(entries, (e) => e.tier || "unknown");
  const tierRows = Object.entries(byTier).map(([tier, list]) => ({
    tier,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
    unique_installs: new Set(list.map((e) => e.install_id)).size,
  }));

  // ── By model ──
  const byModel = groupBy(entries, (e) => e.model || "unknown");
  const modelRows = Object.entries(byModel).map(([model, list]) => ({
    model,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    input_tokens: sumBy(list, "input_tokens"),
    output_tokens: sumBy(list, "output_tokens"),
    thinking_tokens: sumBy(list, "thinking_tokens"),
    avg_duration_ms: avgBy(list, "duration_ms"),
    avg_cost_usd: avgBy(list, "cost_usd"),
  }));

  // ── By pipeline ──
  const byPipeline = groupBy(entries, (e) => e.pipeline || "unknown");
  const pipelineRows = Object.entries(byPipeline).map(([pipeline, list]) => ({
    pipeline,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
  }));

  // ── By backend ──
  const byBackend = groupBy(entries, (e) => e.backend || "unknown");
  const backendRows = Object.entries(byBackend).map(([backend, list]) => ({
    backend,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
  }));

  // ── By install (top 20 by spend) ──
  const byInstall = groupBy(entries, (e) => e.install_id || "unknown");
  const installRows = Object.entries(byInstall)
    .map(([install, list]) => ({
      install_id: install,
      // First record of the group in input order.
      tier_snapshot: list[0]?.tier || "core",
      calls: list.length,
      cost_usd: sumBy(list, "cost_usd"),
      // Distinct summarize jobs (collapse transcribe+analyze pairs).
      summaries: new Set(list.map((e) => e.job_id).filter(Boolean)).size,
      avg_duration_ms: avgBy(list, "duration_ms"),
      // Folded rather than Math.max(...spread): spreading a group with
      // hundreds of thousands of records overflows the call stack.
      last_active_at: list.reduce(
        (latest, e) => Math.max(latest, e.ts || 0),
        Number.NEGATIVE_INFINITY,
      ),
    }))
    .sort((a, b) => b.cost_usd - a.cost_usd)
    .slice(0, 20);

  // ── By hour-of-day (for traffic-pattern view) ──
  // Records without a usable ts group under "NaN" and are left out of
  // the 24 hourly rows.
  const byHour = groupBy(entries, (e) => new Date(e.ts).getUTCHours());
  const hourRows = Array.from({ length: 24 }, (_, h) => {
    const list = byHour[h] || [];
    return {
      hour_utc: h,
      calls: list.length,
      cost_usd: sumBy(list, "cost_usd"),
    };
  });

  // ── Cost vs speed (per-model averages) ──
  // Same source as modelRows but kept separate so the dashboard can
  // render it as a scatter / table without extra transformation.
  const costSpeedRows = modelRows
    .map((r) => ({
      model: r.model,
      avg_cost_usd: r.avg_cost_usd,
      avg_duration_ms: r.avg_duration_ms,
      calls: r.calls,
    }))
    .sort((a, b) => a.avg_duration_ms - b.avg_duration_ms);

  return {
    summary: {
      calls,
      success,
      errors,
      refused,
      success_rate: calls > 0 ? success / calls : 0,
      total_cost_usd: totalCost,
      total_duration_ms: totalDuration,
      avg_duration_ms: calls > 0 ? totalDuration / calls : 0,
      total_input_tokens: totalInputTokens,
      total_output_tokens: totalOutputTokens,
      total_thinking_tokens: totalThinkingTokens,
    },
    by_tier: tierRows,
    by_model: modelRows,
    by_pipeline: pipelineRows,
    by_backend: backendRows,
    by_install: installRows,
    by_hour_utc: hourRows,
    cost_vs_speed: costSpeedRows,
  };
}

// Bucket `list` by keyFn(item). The result has no prototype, so keys
// taken from logged data ("constructor", "__proto__", "toString", …)
// are ordinary buckets instead of colliding with inherited properties.
function groupBy(list, keyFn) {
  const out = Object.create(null);
  for (const item of list) {
    const k = keyFn(item);
    if (!out[k]) out[k] = [];
    out[k].push(item);
  }
  return out;
}

// Sum item[key] over `list`, counting only finite numbers; missing,
// null, NaN and non-numeric values contribute 0.
function sumBy(list, key) {
  let s = 0;
  for (const item of list) {
    const v = item[key];
    if (Number.isFinite(v)) s += v;
  }
  return s;
}

// Mean of item[key] over ALL items in `list` (missing values count as
// 0 in the numerator); 0 for an empty list.
function avgBy(list, key) {
  if (list.length === 0) return 0;
  return sumBy(list, key) / list.length;
}