Files
recap-relay/server/audit-log.js
T

240 lines
8.1 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Per-call audit log for profitability + observability. Each relay
// request (success or failure) appends one line of newline-delimited
// JSON to relay-calls.ndjson in the data directory (/data by default).
// Append-only — the read path streams the file line by line and keeps
// only the entries inside the requested time window in memory for
// aggregation, which is cheap up to 100k+ entries at typical relay
// scale (low-tens-of-thousands of calls per month).
//
// Record shape (no field is required; missing fields just don't
// appear in aggregations):
// {
// ts: ms-epoch when the request landed
// install_id: X-Recap-Install-Id (truncated for log readability)
// tier: "core" | "pro" | "max"
// pipeline: "transcribe" | "analyze"
// backend: "gemini" | "hardware"
// model: e.g. "gemini-3-flash-preview", "parakeet-tdt-0.6b-v3"
// status: "success" | "error" | "refused" (refused = quota)
// credit_charged: 0 | 1
// duration_ms: end-to-end wall time
// input_tokens, output_tokens, thinking_tokens (Gemini only)
// cost_usd: computed from token counts × per-1M-token rates
// job_id: X-Recap-Job-Id (so we can collapse pairs into one)
// error: short error string if status="error"
// }
//
// Rotation isn't built in — for the prototype, operator can rotate
// manually (mv relay-calls.ndjson relay-calls.ndjson.0; restart). Once
// volume warrants, replace this with a daily-rotated logger or move to
// SQLite for indexed time-range queries.
import fs from "fs/promises";
import { createReadStream } from "fs";
import readline from "readline";
import path from "path";
// Mutable module state; initAuditLog() may repoint both at a different
// data directory. The log file name itself never changes.
let dataDir = "/data";
let logPath = `${dataDir}/relay-calls.ndjson`;
// Point the audit log at a data directory and make sure the log file exists.
//
// `dataDir` (optional) replaces the current data directory when truthy;
// the log file is always `relay-calls.ndjson` inside it. Calling with no
// argument keeps the current directory (previously this threw a TypeError
// while destructuring `undefined`). Rejects if the directory or file
// cannot be created.
export async function initAuditLog({ dataDir: dd } = {}) {
  if (dd) dataDir = dd;
  logPath = path.join(dataDir, "relay-calls.ndjson");
  // Ensure the file exists so the streaming read path doesn't trip.
  // A zero-byte append creates the file (0600) when missing but, unlike an
  // access() check followed by writeFile(""), can never truncate a log that
  // already exists or was created by someone else in between.
  await fs.mkdir(dataDir, { recursive: true });
  await fs.appendFile(logPath, "", { mode: 0o600 });
  console.log(`[audit-log] writing to ${logPath}`);
}
// Append one audit record to the log as a single NDJSON line.
// `ts` is stamped with the current time unless `entry` supplies its own
// (the entry is spread after the default, so its fields win).
// Best-effort: serialization or I/O failures are reported on stderr and
// swallowed — losing an audit line must not fail the relay call itself.
export async function recordCall(entry) {
  const stamped = { ts: Date.now(), ...entry };
  try {
    const payload = `${JSON.stringify(stamped)}\n`;
    await fs.appendFile(logPath, payload, { mode: 0o600 });
  } catch (failure) {
    const reason = failure?.message || failure;
    console.error(`[audit-log] append failed: ${reason}`);
  }
}
// Collect audit records whose `ts` lies within [sinceMs, untilMs], both
// ends inclusive. `sinceMs` defaults to 30 days before the call and
// `untilMs` to no upper bound. The log is consumed through readline one
// line at a time, so the raw file is never held as one string — only the
// records that pass the filter are kept. Blank lines, lines that are not
// valid JSON, and records without a numeric `ts` are dropped quietly.
// A missing file yields []; any other read failure is logged and the
// records gathered so far are returned. The result is newest-first.
export async function readEntries({
  sinceMs = Date.now() - 30 * 24 * 3600 * 1000,
  untilMs = Number.POSITIVE_INFINITY,
} = {}) {
  // Parse one line; return the record if it belongs in the result, else
  // null. Any throw (bad JSON, a bare `null` line, odd bounds) means skip.
  const pick = (text) => {
    try {
      const rec = JSON.parse(text);
      const inWindow =
        typeof rec.ts === "number" && rec.ts >= sinceMs && rec.ts <= untilMs;
      return inWindow ? rec : null;
    } catch {
      return null;
    }
  };

  const kept = [];
  try {
    const lineReader = readline.createInterface({
      input: createReadStream(logPath, { encoding: "utf8" }),
      crlfDelay: Infinity,
    });
    for await (const rawLine of lineReader) {
      if (rawLine.trim() === "") continue;
      const rec = pick(rawLine);
      if (rec !== null) kept.push(rec);
    }
  } catch (err) {
    if (err.code !== "ENOENT") {
      console.error(`[audit-log] read failed: ${err?.message || err}`);
    }
  }
  // Most recent first.
  return kept.sort((a, b) => b.ts - a.ts);
}
// Compute multi-dimensional aggregates over a set of entries. The
// dashboard renders all of these — each is a small object array
// suitable for direct tabulation.
//
// `entries` is an array of audit records (record shape documented at the
// top of this file). Missing/falsy numeric fields count as 0; missing
// grouping fields are bucketed under "unknown". Returns an object with
// `summary`, `by_tier`, `by_model`, `by_pipeline`, `by_backend`,
// `by_install` (top 20 by spend), `by_hour_utc` (always 24 rows) and
// `cost_vs_speed`.
export function aggregate(entries) {
  const calls = entries.length;
  const success = entries.filter((e) => e.status === "success").length;
  const errors = entries.filter((e) => e.status === "error").length;
  const refused = entries.filter((e) => e.status === "refused").length;
  const totalCost = sumBy(entries, "cost_usd");
  const totalDuration = sumBy(entries, "duration_ms");
  const totalInputTokens = sumBy(entries, "input_tokens");
  const totalOutputTokens = sumBy(entries, "output_tokens");
  const totalThinkingTokens = sumBy(entries, "thinking_tokens");
  // ── By tier ──
  const byTier = groupBy(entries, (e) => e.tier || "unknown");
  const tierRows = Object.entries(byTier).map(([tier, list]) => ({
    tier,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
    unique_installs: new Set(list.map((e) => e.install_id)).size,
  }));
  // ── By model ──
  const byModel = groupBy(entries, (e) => e.model || "unknown");
  const modelRows = Object.entries(byModel).map(([model, list]) => ({
    model,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    input_tokens: sumBy(list, "input_tokens"),
    output_tokens: sumBy(list, "output_tokens"),
    thinking_tokens: sumBy(list, "thinking_tokens"),
    avg_duration_ms: avgBy(list, "duration_ms"),
    avg_cost_usd: avgBy(list, "cost_usd"),
  }));
  // ── By pipeline ──
  const byPipeline = groupBy(entries, (e) => e.pipeline || "unknown");
  const pipelineRows = Object.entries(byPipeline).map(([pipeline, list]) => ({
    pipeline,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
  }));
  // ── By backend ──
  const byBackend = groupBy(entries, (e) => e.backend || "unknown");
  const backendRows = Object.entries(byBackend).map(([backend, list]) => ({
    backend,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
  }));
  // ── By install (top 20 by spend) ──
  const byInstall = groupBy(entries, (e) => e.install_id || "unknown");
  const installRows = Object.entries(byInstall)
    .map(([install, list]) => ({
      install_id: install,
      // Tier of the group's first entry. With readEntries' newest-first
      // ordering this is the install's most recent tier; for other input
      // orders it is just the first one seen.
      tier_snapshot: list[0]?.tier || "core",
      calls: list.length,
      cost_usd: sumBy(list, "cost_usd"),
      // Distinct summarize jobs (collapse transcribe+analyze pairs).
      summaries: new Set(list.map((e) => e.job_id).filter(Boolean)).size,
      avg_duration_ms: avgBy(list, "duration_ms"),
      // Reduce rather than Math.max(...spread): spreading one argument per
      // entry throws RangeError once a single install has enough entries
      // (on the order of 10^5, depending on stack size).
      last_active_at: list.reduce(
        (latest, e) => Math.max(latest, e.ts || 0),
        -Infinity,
      ),
    }))
    .sort((a, b) => b.cost_usd - a.cost_usd)
    .slice(0, 20);
  // ── By hour-of-day (for traffic-pattern view) ──
  const byHour = groupBy(entries, (e) => new Date(e.ts).getUTCHours());
  const hourRows = Array.from({ length: 24 }, (_, h) => {
    const list = byHour[h] || [];
    return {
      hour_utc: h,
      calls: list.length,
      cost_usd: sumBy(list, "cost_usd"),
    };
  });
  // ── Cost vs speed (per-model averages) ──
  // Same source as modelRows but kept separate so the dashboard can
  // render it as a scatter / table without extra transformation.
  const costSpeedRows = modelRows
    .map((r) => ({
      model: r.model,
      avg_cost_usd: r.avg_cost_usd,
      avg_duration_ms: r.avg_duration_ms,
      calls: r.calls,
    }))
    .sort((a, b) => a.avg_duration_ms - b.avg_duration_ms);
  return {
    summary: {
      calls,
      success,
      errors,
      refused,
      success_rate: calls > 0 ? success / calls : 0,
      total_cost_usd: totalCost,
      total_duration_ms: totalDuration,
      avg_duration_ms: calls > 0 ? totalDuration / calls : 0,
      total_input_tokens: totalInputTokens,
      total_output_tokens: totalOutputTokens,
      total_thinking_tokens: totalThinkingTokens,
    },
    by_tier: tierRows,
    by_model: modelRows,
    by_pipeline: pipelineRows,
    by_backend: backendRows,
    by_install: installRows,
    by_hour_utc: hourRows,
    cost_vs_speed: costSpeedRows,
  };
}
// Bucket `list` by `keyFn(item)`, preserving input order inside each
// bucket. Returns a prototype-less object mapping each (stringified) key
// to the array of items that produced it.
//
// Keys can come from request data (e.g. install_id from a client header).
// With a plain `{}`, keys like "__proto__", "constructor" or "toString"
// resolve to inherited, truthy values, so the bucket is never created and
// `.push` throws — one crafted install id would break aggregation.
// Object.create(null) has no inherited properties, so every key is just data.
function groupBy(list, keyFn) {
  const buckets = Object.create(null);
  for (const item of list) {
    const k = keyFn(item);
    if (!buckets[k]) buckets[k] = [];
    buckets[k].push(item);
  }
  return buckets;
}
// Add up `row[key]` over every row in `list`; a missing or falsy value
// contributes 0. An empty list sums to 0.
function sumBy(list, key) {
  let total = 0;
  for (const row of list) {
    const value = row[key];
    total = total + (value || 0);
  }
  return total;
}
// Arithmetic mean of `row[key]` across `list`, treating missing or falsy
// values as 0 (they still count toward the divisor). Empty list → 0.
function avgBy(list, key) {
  const count = list.length;
  if (count === 0) return 0;
  let total = 0;
  for (const row of list) total += row[key] || 0;
  return total / count;
}