Files
recap-relay/server/audit-log.js
T

642 lines
25 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Per-call audit log for profitability + observability. Each relay
// request (success or failure) appends one line of newline-delimited
// JSON to /data/relay-calls.ndjson. Append-only — read paths parse
// the whole file in memory for aggregation, which is cheap up to
// 100k+ entries at typical relay scale (low-tens-of-thousands of
// calls per month).
//
// Record shape (no field is required; missing fields just don't
// appear in aggregations):
// {
// ts: ms-epoch when the request landed
// install_id: X-Recap-Install-Id (truncated for log readability)
// license_fingerprint: stable 16-hex hash of the licenseUuid for
// paid-tier calls; null for anonymous/Core. Added
// in the license-keyed-credits refactor so spend can
// be aggregated by license-pool (since one license
// may span multiple installs). install_id is STILL
// logged on every entry — license_fingerprint is
// additive forensic visibility, not a replacement.
// tier: "core" | "pro" | "max"
// pipeline: "transcribe" | "analyze"
// backend: "gemini" | "hardware"
// model: e.g. "gemini-3-flash-preview", "parakeet-tdt-0.6b-v3"
// status: "success" | "error" | "refused" (refused = quota)
// credit_charged: 0 | 1
// duration_ms: end-to-end wall time
// input_tokens, output_tokens, thinking_tokens (Gemini only)
// cost_usd: computed from token counts × per-1M-token rates
// job_id: X-Recap-Job-Id (so we can collapse pairs into one)
// error: short error string if status="error"
// }
//
// Rotation isn't built in — for the prototype, operator can rotate
// manually (mv relay-calls.ndjson relay-calls.ndjson.0; restart). Once
// volume warrants, replace this with a daily-rotated logger or move to
// SQLite for indexed time-range queries.
import fs from "fs/promises";
import { createReadStream } from "fs";
import readline from "readline";
import path from "path";
let dataDir = "/data";
let logPath = "/data/relay-calls.ndjson";
// Size at which we rotate the live ndjson to a dated archive. Picked
// to roughly match a year of high-volume relay traffic — a typical
// entry is ~400 bytes, so 50MB ≈ 130k entries. Rotation runs once at
// boot; the operator can also rotate manually any time.
const ROTATION_THRESHOLD_BYTES = 50 * 1024 * 1024;
export async function initAuditLog({ dataDir: dd }) {
if (dd) dataDir = dd;
logPath = path.join(dataDir, "relay-calls.ndjson");
await maybeRotateLog();
// Ensure the file exists so the streaming read path doesn't trip.
try {
await fs.access(logPath);
} catch {
await fs.writeFile(logPath, "", { mode: 0o600 });
}
console.log(`[audit-log] writing to ${logPath}`);
}
// Rotate the live ndjson to a dated archive when it grows past the
// threshold. The dashboard's `readEntries` always reads the live file
// only — archived entries fall out of the rolling 30-day window
// naturally and are kept around as raw files for ad-hoc analysis or
// long-term storage / CSV export. If a same-day archive already exists
// (e.g. operator restarts the relay mid-rotation), append a counter.
async function maybeRotateLog() {
let stat;
try {
stat = await fs.stat(logPath);
} catch {
return; // No file yet — nothing to rotate.
}
if (stat.size < ROTATION_THRESHOLD_BYTES) return;
const ymd = new Date().toISOString().slice(0, 10);
let archive = path.join(dataDir, `relay-calls-${ymd}.ndjson`);
let counter = 1;
while (true) {
try {
await fs.access(archive);
// Exists; pick a new name with a counter suffix.
archive = path.join(dataDir, `relay-calls-${ymd}.${counter}.ndjson`);
counter += 1;
if (counter > 99) return; // pathological — give up rotating
} catch {
break; // Free name found.
}
}
try {
await fs.rename(logPath, archive);
await fs.writeFile(logPath, "", { mode: 0o600 });
console.log(
`[audit-log] rotated ${(stat.size / 1024 / 1024).toFixed(1)}MB → ${archive}`
);
} catch (err) {
console.warn(`[audit-log] rotation failed: ${err?.message || err}`);
}
}
// Best-effort append. Errors are logged but never rethrown — losing
// an audit line shouldn't fail the relay call that caused it.
export async function recordCall(entry) {
const record = { ts: Date.now(), ...entry };
try {
await fs.appendFile(logPath, JSON.stringify(record) + "\n", { mode: 0o600 });
} catch (err) {
console.error(`[audit-log] append failed: ${err?.message || err}`);
}
}
// Truncate the entire audit log. Used by the dashboard's "Delete all"
// button for cleanup before going-live or after a string of bad-data
// test runs (relay re-installed mid-run, config tweaks producing
// inconsistent measurements, etc.). Destructive — no undo.
export async function clearAllAuditEntries() {
try {
await fs.writeFile(logPath, "", { mode: 0o600 });
return { ok: true };
} catch (err) {
return { ok: false, error: err?.message || String(err) };
}
}
// Delete audit rows matching specific job_ids. Reads the whole log,
// filters out lines belonging to the target jobs, writes the remainder
// back. O(N) on the file size; fine for any plausible audit log (we
// rotate at 64MB anyway). Returns the count of rows removed.
export async function deleteAuditRowsByJobIds(jobIds) {
if (!Array.isArray(jobIds) || jobIds.length === 0) return { deleted: 0 };
const idSet = new Set(jobIds);
const lines = [];
let deleted = 0;
try {
const stream = createReadStream(logPath, { encoding: "utf8" });
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
for await (const line of rl) {
if (!line.trim()) continue;
try {
const r = JSON.parse(line);
if (r.job_id && idSet.has(r.job_id)) {
deleted += 1;
continue;
}
lines.push(line);
} catch {
// Bad line — preserve it rather than dropping; matches the
// skip-and-continue behavior of readEntries.
lines.push(line);
}
}
} catch (err) {
if (err.code !== "ENOENT") throw err;
}
await fs.writeFile(logPath, lines.join("\n") + (lines.length ? "\n" : ""), {
mode: 0o600,
});
return { deleted };
}
// Read all entries since `sinceMs` (default: 30 days). Streamed
// line-by-line so the whole file doesn't sit in memory at once.
// Returned array is newest-first.
export async function readEntries({
sinceMs = Date.now() - 30 * 24 * 3600 * 1000,
untilMs = Number.POSITIVE_INFINITY,
} = {}) {
const out = [];
try {
const stream = createReadStream(logPath, { encoding: "utf8" });
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
for await (const line of rl) {
if (!line.trim()) continue;
try {
const r = JSON.parse(line);
if (typeof r.ts === "number" && r.ts >= sinceMs && r.ts <= untilMs) {
out.push(r);
}
} catch {
// Bad line — skip silently. Doesn't disrupt the rest of the read.
}
}
} catch (err) {
if (err.code !== "ENOENT") {
console.error(`[audit-log] read failed: ${err?.message || err}`);
}
}
// Newest first by ts.
out.sort((a, b) => b.ts - a.ts);
return out;
}
// Compute multi-dimensional aggregates over a set of entries. The
// dashboard renders all of these — each is a small object array
// suitable for direct tabulation.
export function aggregate(entries) {
const calls = entries.length;
const success = entries.filter((e) => e.status === "success").length;
const errors = entries.filter((e) => e.status === "error").length;
const refused = entries.filter((e) => e.status === "refused").length;
let totalCost = 0;
let totalDuration = 0;
let totalInputTokens = 0;
let totalOutputTokens = 0;
let totalThinkingTokens = 0;
for (const e of entries) {
totalCost += e.cost_usd || 0;
totalDuration += e.duration_ms || 0;
totalInputTokens += e.input_tokens || 0;
totalOutputTokens += e.output_tokens || 0;
totalThinkingTokens += e.thinking_tokens || 0;
}
// ── By tier ──
// unique_users is install-count for Core (no license to dedup against)
// and distinct-license-count for paid tiers (so a Pro license active
// on two installs counts ONCE here, matching the post-refactor credit
// model where they share one monthly pool). Falls back to install_id
// for paid entries that predate the license_fingerprint field.
const byTier = groupBy(entries, (e) => e.tier || "unknown");
const tierRows = Object.entries(byTier).map(([tier, list]) => ({
tier,
calls: list.length,
cost_usd: sumBy(list, "cost_usd"),
avg_duration_ms: avgBy(list, "duration_ms"),
unique_installs:
tier === "core" || tier === "unknown"
? new Set(list.map((e) => e.install_id)).size
: new Set(list.map((e) => e.license_fingerprint || e.install_id)).size,
}));
// ── By model ──
const byModel = groupBy(entries, (e) => e.model || "unknown");
const modelRows = Object.entries(byModel).map(([model, list]) => ({
model,
calls: list.length,
cost_usd: sumBy(list, "cost_usd"),
input_tokens: sumBy(list, "input_tokens"),
output_tokens: sumBy(list, "output_tokens"),
thinking_tokens: sumBy(list, "thinking_tokens"),
avg_duration_ms: avgBy(list, "duration_ms"),
avg_cost_usd: list.length > 0 ? sumBy(list, "cost_usd") / list.length : 0,
}));
// ── By pipeline ──
const byPipeline = groupBy(entries, (e) => e.pipeline || "unknown");
const pipelineRows = Object.entries(byPipeline).map(([pipeline, list]) => ({
pipeline,
calls: list.length,
cost_usd: sumBy(list, "cost_usd"),
avg_duration_ms: avgBy(list, "duration_ms"),
}));
// ── By backend ──
const byBackend = groupBy(entries, (e) => e.backend || "unknown");
const backendRows = Object.entries(byBackend).map(([backend, list]) => ({
backend,
calls: list.length,
cost_usd: sumBy(list, "cost_usd"),
avg_duration_ms: avgBy(list, "duration_ms"),
}));
// ── By install (top 20 by spend) ──
const byInstall = groupBy(entries, (e) => e.install_id || "unknown");
const installRows = Object.entries(byInstall)
.map(([install, list]) => ({
install_id: install,
tier_snapshot: list[0]?.tier || "core",
calls: list.length,
cost_usd: sumBy(list, "cost_usd"),
// Distinct summarize jobs (collapse transcribe+analyze pairs).
summaries: new Set(list.map((e) => e.job_id).filter(Boolean)).size,
avg_duration_ms: avgBy(list, "duration_ms"),
last_active_at: Math.max(...list.map((e) => e.ts || 0)),
}))
.sort((a, b) => b.cost_usd - a.cost_usd)
.slice(0, 20);
// ── By license fingerprint (top 20 by spend, paid tiers only) ──
// One license may span multiple installs (cloud account + self-host),
// and the post-refactor credit ledger aggregates their spend onto a
// single pool. This view mirrors that — operators get a "by paid
// user" rollup that doesn't double-count multi-install Pros, plus an
// install-count column to see distribution per license.
const byLicense = groupBy(
entries.filter((e) => e.license_fingerprint),
(e) => e.license_fingerprint
);
const licenseRows = Object.entries(byLicense)
.map(([fp, list]) => ({
license_fingerprint: fp,
tier_snapshot: list[0]?.tier || "core",
calls: list.length,
cost_usd: sumBy(list, "cost_usd"),
summaries: new Set(list.map((e) => e.job_id).filter(Boolean)).size,
unique_installs: new Set(list.map((e) => e.install_id).filter(Boolean)).size,
avg_duration_ms: avgBy(list, "duration_ms"),
last_active_at: Math.max(...list.map((e) => e.ts || 0)),
}))
.sort((a, b) => b.cost_usd - a.cost_usd)
.slice(0, 20);
// ── By hour-of-day (for traffic-pattern view) ──
const byHour = groupBy(entries, (e) => new Date(e.ts).getUTCHours());
const hourRows = Array.from({ length: 24 }, (_, h) => {
const list = byHour[h] || [];
return {
hour_utc: h,
calls: list.length,
cost_usd: sumBy(list, "cost_usd"),
};
});
// ── Cost vs speed (per-model averages) ──
// Same source as modelRows but kept separate so the dashboard can
// render it as a scatter / table without extra transformation.
const costSpeedRows = modelRows
.map((r) => ({
model: r.model,
avg_cost_usd: r.avg_cost_usd,
avg_duration_ms: r.avg_duration_ms,
calls: r.calls,
}))
.sort((a, b) => a.avg_duration_ms - b.avg_duration_ms);
// ── Per-summary rollup (collapse transcribe + analyze pairs) ──
// Every "summarize" produces 2 audit entries — one transcribe, one
// analyze — sharing a job_id. The dashboard's call-level views show
// them separately, which is useful for backend-vs-pipeline tuning but
// confusing as "how many summaries did I serve". Group by job_id so
// operators see one row per summary with combined cost/duration.
// Entries without a job_id (older relay versions, or balance pings)
// are bucketed into their own "no-jobid" row at the bottom.
const byJob = groupBy(entries, (e) => e.job_id || "__no_jobid__");
const summaryRows = Object.entries(byJob)
.filter(([k]) => k !== "__no_jobid__")
.map(([jobId, list]) => {
const transcribe = list.find((e) => e.pipeline === "transcribe");
const analyze = list.find((e) => e.pipeline === "analyze");
return {
job_id: jobId,
install_id: list[0]?.install_id || null,
tier: list[0]?.tier || null,
started_at: Math.min(...list.map((e) => e.ts || Infinity)),
completed_at: Math.max(...list.map((e) => e.ts || 0)),
transcribe_backend: transcribe?.backend || null,
transcribe_model: transcribe?.model || null,
analyze_backend: analyze?.backend || null,
analyze_model: analyze?.model || null,
total_cost_usd: sumBy(list, "cost_usd"),
total_duration_ms: sumBy(list, "duration_ms"),
status:
list.every((e) => e.status === "success")
? "success"
: list.some((e) => e.status === "error")
? "error"
: "partial",
had_transcribe: !!transcribe,
had_analyze: !!analyze,
};
})
.sort((a, b) => b.completed_at - a.completed_at);
// ── Recent errors (newest 50) ──
// Quick triage view — when something is failing, the operator needs
// to see the offending error strings without scrolling the full
// call log.
// Surface any audit row carrying an error message — that catches
// status="error" (true backend failures) AND status="partial"
// (e.g. transcribe-with-truncated-chunks, which records the
// missing-speech message in the error field). Operators rely on
// this view to triage all degraded behavior, not just outright
// 5xx-class failures, so the broader filter is the right default.
const errorRows = entries
.filter((e) => e.error)
.slice(0, 50)
.map((e) => ({
ts: e.ts,
install_id: e.install_id || null,
tier: e.tier || null,
pipeline: e.pipeline || null,
backend: e.backend || null,
model: e.model || null,
duration_ms: e.duration_ms || 0,
error: (e.error || "").slice(0, 280),
attempts: Array.isArray(e.attempts) ? e.attempts : null,
}));
// ── Per-(pipeline, model) performance + failure tables ──
// Normalizes raw duration_ms by audio_seconds so different models
// can be compared on a backend-agnostic benchmark: how many ms of
// wall-clock time does this model take per minute of audio? Analyze
// calls don't have audio (they consume the transcript text), so we
// report ms-per-1k-input-tokens for those instead.
//
// Failure rate is computed against `attempted` (success + error)
// and excludes `refused` calls — refused requests never reached the
// backend, so they shouldn't count against the model's reliability.
const byPipelineModel = {};
for (const e of entries) {
const pipeline = e.pipeline || "unknown";
const model = e.model || "unknown";
if (model === "unknown" && e.status === "refused") continue; // refused entries often have no model
const key = `${pipeline}::${model}`;
if (!byPipelineModel[key]) {
byPipelineModel[key] = {
pipeline,
model,
calls: 0,
success: 0,
errors: 0,
refused: 0,
partials: 0,
sum_duration_ms: 0,
sum_audio_seconds: 0,
sum_input_tokens: 0,
sum_output_tokens: 0,
error_counts: {}, // { error_signature: count }
};
}
const row = byPipelineModel[key];
row.calls += 1;
if (e.status === "success") row.success += 1;
if (e.status === "error") row.errors += 1;
if (e.status === "refused") row.refused += 1;
if (e.status === "partial") row.partials += 1;
row.sum_duration_ms += e.duration_ms || 0;
if (typeof e.audio_seconds === "number" && e.audio_seconds > 0) {
row.sum_audio_seconds += e.audio_seconds;
}
row.sum_input_tokens += e.input_tokens || 0;
row.sum_output_tokens += e.output_tokens || 0;
// Aggregate the top-error counts off ANY row that has a populated
// error message — not just status="error" rows. Partial (truncated
// transcribe) and refused (out-of-credits, capacity-gated) rows
// also carry useful error strings the operator wants to see in
// the "Top failure modes" table. The old gate `status === "error"`
// hid all truncations because they're recorded as status="partial".
if (e.error) {
const sig = errorSignature(e.error);
row.error_counts[sig] = (row.error_counts[sig] || 0) + 1;
}
}
const perfByModel = Object.values(byPipelineModel).map((r) => {
const attempted = r.success + r.errors;
const successRate = attempted > 0 ? r.success / attempted : null;
const audioMin = r.sum_audio_seconds / 60;
const msPerAudioMin = audioMin > 0 ? r.sum_duration_ms / audioMin : null;
const msPer1kInputTokens =
r.sum_input_tokens > 0
? r.sum_duration_ms / (r.sum_input_tokens / 1000)
: null;
// Top 3 error signatures by frequency for this model.
const topErrors = Object.entries(r.error_counts)
.map(([signature, count]) => ({ signature, count }))
.sort((a, b) => b.count - a.count)
.slice(0, 3);
return {
pipeline: r.pipeline,
model: r.model,
calls: r.calls,
success: r.success,
errors: r.errors,
refused: r.refused,
partials: r.partials,
// "failures" = total signal worth surfacing in failure tables.
// Includes partials so a TX that lost minutes of speech via a
// truncated chunk is counted as a failure mode, not silently
// tucked away under a "success" pipe. The errors-by-model
// dashboard table reads this; the per-call "errors" field stays
// available for stricter computations.
failures: r.errors + r.partials,
success_rate: successRate,
// Speed benchmark fields. Either or both may be null when there
// wasn't enough successful-with-metadata data to compute them.
ms_per_audio_minute: msPerAudioMin,
ms_per_1k_input_tokens: msPer1kInputTokens,
total_audio_minutes: audioMin > 0 ? audioMin : null,
top_errors: topErrors,
};
});
// ── Revenue / margin (requires tier prices supplied by caller) ──
// Distinct paying USERS in the window × the operator's per-tier
// monthly price. For Core (free) we count distinct installs — that's
// still the right grain for free-tier "active users", since Core has
// no license to dedup against. For Pro/Max we count distinct license
// fingerprints so a single Pro license activated on two installs
// (cloud + self-host) counts ONCE toward monthly revenue, matching
// the post-refactor credit model where they share one monthly pool.
// Falls back to install_id for paid entries missing a fingerprint
// (legacy pre-refactor audit rows) so historical ranges stay
// approximately correct rather than dropping to zero.
//
// Strictly an *estimate* — the relay doesn't know if a Pro user
// actually paid this month, just that they touched a request.
// Underestimates churned customers (who paid but didn't call) and
// overestimates trial users (who haven't paid yet). Hooked in by
// the dashboard route, not here, so tests can pass an empty prices
// map and get zero.
const tierActiveInstalls = {
core: new Set(),
pro: new Set(),
max: new Set(),
};
for (const e of entries) {
const t = e.tier || "core";
if (!tierActiveInstalls[t]) continue;
if (t === "core") {
if (e.install_id) tierActiveInstalls.core.add(e.install_id);
} else {
// Paid: prefer fingerprint, fall back to install_id for legacy rows.
const id = e.license_fingerprint || e.install_id;
if (id) tierActiveInstalls[t].add(id);
}
}
return {
summary: {
calls,
success,
errors,
refused,
success_rate: calls > 0 ? success / calls : 0,
total_cost_usd: totalCost,
total_duration_ms: totalDuration,
avg_duration_ms: calls > 0 ? totalDuration / calls : 0,
total_input_tokens: totalInputTokens,
total_output_tokens: totalOutputTokens,
total_thinking_tokens: totalThinkingTokens,
total_summaries: summaryRows.length,
// active_installs_by_tier name retained for dashboard compatibility,
// but the paid-tier counts here are actually DISTINCT LICENSES,
// not distinct installs (see the comment on tierActiveInstalls
// above). Core remains install-based. The dashboard label is
// "Active users by tier" which fits either grain.
active_installs_by_tier: {
core: tierActiveInstalls.core.size,
pro: tierActiveInstalls.pro.size,
max: tierActiveInstalls.max.size,
},
},
by_tier: tierRows,
by_model: modelRows,
by_pipeline: pipelineRows,
by_backend: backendRows,
by_install: installRows,
by_license: licenseRows,
by_hour_utc: hourRows,
cost_vs_speed: costSpeedRows,
by_summary: summaryRows,
errors: errorRows,
perf_by_model: perfByModel,
};
}
// Normalize a raw error string into a stable signature so two
// near-identical messages bucket together. The audit log stores
// truncated raw messages — we want the bucket key to be coarse enough
// that small variations (a different request-id, file name, port
// number, etc.) collapse into a single error class.
//
// Heuristics:
// - Strip ISO timestamps and timestamps with offsets
// - Strip UUIDs / hex blob hashes / long alphanumeric IDs
// - Strip numeric file sizes and ports
// - Strip URLs to their host + path-pattern
// - Trim to first 120 chars after normalization
function errorSignature(raw) {
if (!raw) return "(unknown)";
let s = String(raw);
s = s.replace(/\b\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:?\d{2})?\b/g, "<ts>");
s = s.replace(/\b[0-9a-f]{32,}\b/gi, "<hex>");
s = s.replace(/\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b/gi, "<uuid>");
s = s.replace(/https?:\/\/[^\s)"']+/g, "<url>");
s = s.replace(/:\d{2,5}\b/g, ":<port>");
s = s.replace(/\b\d{4,}\b/g, "<n>");
return s.trim().slice(0, 120);
}
// Derived revenue/margin numbers. Pulled out of aggregate() because it
// needs prices the operator sets in config — keeping the core
// aggregator config-agnostic. Returns:
// {
// monthly_revenue_usd: pro_count * pro_price + max_count * max_price
// + core_count * core_price,
// gemini_cost_usd_in_range: summary.total_cost_usd (passed through),
// margin_usd: revenue - cost, // approximate
// by_tier_revenue: [{ tier, active_installs, price_usd, revenue_usd }],
// }
//
// `active_installs_by_tier` should come from the aggregate summary
// (Set sizes already computed there). `prices` is the {core,pro,max}
// USD-per-month map. `geminiCostInRange` is total_cost_usd from the
// summary.
export function computeRevenue({ activeInstallsByTier, prices, geminiCostInRange }) {
const tiers = ["core", "pro", "max"];
const byTier = tiers.map((tier) => {
const installs = activeInstallsByTier?.[tier] || 0;
const price = Math.max(0, Number(prices?.[tier] ?? 0));
return {
tier,
active_installs: installs,
price_usd: price,
revenue_usd: installs * price,
};
});
const revenue = byTier.reduce((s, r) => s + r.revenue_usd, 0);
return {
monthly_revenue_usd: revenue,
gemini_cost_usd_in_range: geminiCostInRange,
margin_usd: revenue - geminiCostInRange,
by_tier_revenue: byTier,
};
}
function groupBy(list, keyFn) {
const out = {};
for (const item of list) {
const k = keyFn(item);
if (!out[k]) out[k] = [];
out[k].push(item);
}
return out;
}
function sumBy(list, key) {
let s = 0;
for (const item of list) s += item[key] || 0;
return s;
}
function avgBy(list, key) {
if (list.length === 0) return 0;
return sumBy(list, key) / list.length;
}