240 lines
8.1 KiB
JavaScript
240 lines
8.1 KiB
JavaScript
// Per-call audit log for profitability + observability. Each relay
|
||
// request (success or failure) appends one line of newline-delimited
|
||
// JSON to /data/relay-calls.ndjson. Append-only — read paths stream
// the whole file line by line and keep the matching entries in memory
// for aggregation, which is cheap up to 100k+ entries at typical relay
// scale (low tens of thousands of calls per month).
|
||
//
|
||
// Record shape (no field is required; missing fields just don't
|
||
// appear in aggregations):
|
||
// {
|
||
// ts: ms-epoch when the request landed
|
||
// install_id: X-Recap-Install-Id (truncated for log readability)
|
||
// tier: "core" | "pro" | "max"
|
||
// pipeline: "transcribe" | "analyze"
|
||
// backend: "gemini" | "hardware"
|
||
// model: e.g. "gemini-3-flash-preview", "parakeet-tdt-0.6b-v3"
|
||
// status: "success" | "error" | "refused" (refused = quota)
|
||
// credit_charged: 0 | 1
|
||
// duration_ms: end-to-end wall time
|
||
// input_tokens, output_tokens, thinking_tokens (Gemini only)
|
||
// cost_usd: computed from token counts × per-1M-token rates
|
||
// job_id: X-Recap-Job-Id (so we can collapse pairs into one)
|
||
// error: short error string if status="error"
|
||
// }
|
||
//
|
||
// Rotation isn't built in — for the prototype, operator can rotate
|
||
// manually (mv relay-calls.ndjson relay-calls.ndjson.0; restart). Once
|
||
// volume warrants, replace this with a daily-rotated logger or move to
|
||
// SQLite for indexed time-range queries.
|
||
|
||
import fs from "fs/promises";
|
||
import { createReadStream } from "fs";
|
||
import readline from "readline";
|
||
import path from "path";
|
||
|
||
let dataDir = "/data";
|
||
let logPath = "/data/relay-calls.ndjson";
|
||
|
||
export async function initAuditLog({ dataDir: dd }) {
|
||
if (dd) dataDir = dd;
|
||
logPath = path.join(dataDir, "relay-calls.ndjson");
|
||
// Ensure the file exists so the streaming read path doesn't trip.
|
||
try {
|
||
await fs.access(logPath);
|
||
} catch {
|
||
await fs.writeFile(logPath, "", { mode: 0o600 });
|
||
}
|
||
console.log(`[audit-log] writing to ${logPath}`);
|
||
}
|
||
|
||
// Best-effort append. Errors are logged but never rethrown — losing
|
||
// an audit line shouldn't fail the relay call that caused it.
|
||
export async function recordCall(entry) {
|
||
const record = { ts: Date.now(), ...entry };
|
||
try {
|
||
await fs.appendFile(logPath, JSON.stringify(record) + "\n", { mode: 0o600 });
|
||
} catch (err) {
|
||
console.error(`[audit-log] append failed: ${err?.message || err}`);
|
||
}
|
||
}
|
||
|
||
// Read all entries since `sinceMs` (default: 30 days). Streamed
|
||
// line-by-line so the whole file doesn't sit in memory at once.
|
||
// Returned array is newest-first.
|
||
export async function readEntries({
|
||
sinceMs = Date.now() - 30 * 24 * 3600 * 1000,
|
||
untilMs = Number.POSITIVE_INFINITY,
|
||
} = {}) {
|
||
const out = [];
|
||
try {
|
||
const stream = createReadStream(logPath, { encoding: "utf8" });
|
||
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
|
||
for await (const line of rl) {
|
||
if (!line.trim()) continue;
|
||
try {
|
||
const r = JSON.parse(line);
|
||
if (typeof r.ts === "number" && r.ts >= sinceMs && r.ts <= untilMs) {
|
||
out.push(r);
|
||
}
|
||
} catch {
|
||
// Bad line — skip silently. Doesn't disrupt the rest of the read.
|
||
}
|
||
}
|
||
} catch (err) {
|
||
if (err.code !== "ENOENT") {
|
||
console.error(`[audit-log] read failed: ${err?.message || err}`);
|
||
}
|
||
}
|
||
// Newest first by ts.
|
||
out.sort((a, b) => b.ts - a.ts);
|
||
return out;
|
||
}
|
||
|
||
// Compute multi-dimensional aggregates over a set of entries. The
|
||
// dashboard renders all of these — each is a small object array
|
||
// suitable for direct tabulation.
|
||
export function aggregate(entries) {
|
||
const calls = entries.length;
|
||
const success = entries.filter((e) => e.status === "success").length;
|
||
const errors = entries.filter((e) => e.status === "error").length;
|
||
const refused = entries.filter((e) => e.status === "refused").length;
|
||
|
||
let totalCost = 0;
|
||
let totalDuration = 0;
|
||
let totalInputTokens = 0;
|
||
let totalOutputTokens = 0;
|
||
let totalThinkingTokens = 0;
|
||
for (const e of entries) {
|
||
totalCost += e.cost_usd || 0;
|
||
totalDuration += e.duration_ms || 0;
|
||
totalInputTokens += e.input_tokens || 0;
|
||
totalOutputTokens += e.output_tokens || 0;
|
||
totalThinkingTokens += e.thinking_tokens || 0;
|
||
}
|
||
|
||
// ── By tier ──
|
||
const byTier = groupBy(entries, (e) => e.tier || "unknown");
|
||
const tierRows = Object.entries(byTier).map(([tier, list]) => ({
|
||
tier,
|
||
calls: list.length,
|
||
cost_usd: sumBy(list, "cost_usd"),
|
||
avg_duration_ms: avgBy(list, "duration_ms"),
|
||
unique_installs: new Set(list.map((e) => e.install_id)).size,
|
||
}));
|
||
|
||
// ── By model ──
|
||
const byModel = groupBy(entries, (e) => e.model || "unknown");
|
||
const modelRows = Object.entries(byModel).map(([model, list]) => ({
|
||
model,
|
||
calls: list.length,
|
||
cost_usd: sumBy(list, "cost_usd"),
|
||
input_tokens: sumBy(list, "input_tokens"),
|
||
output_tokens: sumBy(list, "output_tokens"),
|
||
thinking_tokens: sumBy(list, "thinking_tokens"),
|
||
avg_duration_ms: avgBy(list, "duration_ms"),
|
||
avg_cost_usd: list.length > 0 ? sumBy(list, "cost_usd") / list.length : 0,
|
||
}));
|
||
|
||
// ── By pipeline ──
|
||
const byPipeline = groupBy(entries, (e) => e.pipeline || "unknown");
|
||
const pipelineRows = Object.entries(byPipeline).map(([pipeline, list]) => ({
|
||
pipeline,
|
||
calls: list.length,
|
||
cost_usd: sumBy(list, "cost_usd"),
|
||
avg_duration_ms: avgBy(list, "duration_ms"),
|
||
}));
|
||
|
||
// ── By backend ──
|
||
const byBackend = groupBy(entries, (e) => e.backend || "unknown");
|
||
const backendRows = Object.entries(byBackend).map(([backend, list]) => ({
|
||
backend,
|
||
calls: list.length,
|
||
cost_usd: sumBy(list, "cost_usd"),
|
||
avg_duration_ms: avgBy(list, "duration_ms"),
|
||
}));
|
||
|
||
// ── By install (top 20 by spend) ──
|
||
const byInstall = groupBy(entries, (e) => e.install_id || "unknown");
|
||
const installRows = Object.entries(byInstall)
|
||
.map(([install, list]) => ({
|
||
install_id: install,
|
||
tier_snapshot: list[0]?.tier || "core",
|
||
calls: list.length,
|
||
cost_usd: sumBy(list, "cost_usd"),
|
||
// Distinct summarize jobs (collapse transcribe+analyze pairs).
|
||
summaries: new Set(list.map((e) => e.job_id).filter(Boolean)).size,
|
||
avg_duration_ms: avgBy(list, "duration_ms"),
|
||
last_active_at: Math.max(...list.map((e) => e.ts || 0)),
|
||
}))
|
||
.sort((a, b) => b.cost_usd - a.cost_usd)
|
||
.slice(0, 20);
|
||
|
||
// ── By hour-of-day (for traffic-pattern view) ──
|
||
const byHour = groupBy(entries, (e) => new Date(e.ts).getUTCHours());
|
||
const hourRows = Array.from({ length: 24 }, (_, h) => {
|
||
const list = byHour[h] || [];
|
||
return {
|
||
hour_utc: h,
|
||
calls: list.length,
|
||
cost_usd: sumBy(list, "cost_usd"),
|
||
};
|
||
});
|
||
|
||
// ── Cost vs speed (per-model averages) ──
|
||
// Same source as modelRows but kept separate so the dashboard can
|
||
// render it as a scatter / table without extra transformation.
|
||
const costSpeedRows = modelRows
|
||
.map((r) => ({
|
||
model: r.model,
|
||
avg_cost_usd: r.avg_cost_usd,
|
||
avg_duration_ms: r.avg_duration_ms,
|
||
calls: r.calls,
|
||
}))
|
||
.sort((a, b) => a.avg_duration_ms - b.avg_duration_ms);
|
||
|
||
return {
|
||
summary: {
|
||
calls,
|
||
success,
|
||
errors,
|
||
refused,
|
||
success_rate: calls > 0 ? success / calls : 0,
|
||
total_cost_usd: totalCost,
|
||
total_duration_ms: totalDuration,
|
||
avg_duration_ms: calls > 0 ? totalDuration / calls : 0,
|
||
total_input_tokens: totalInputTokens,
|
||
total_output_tokens: totalOutputTokens,
|
||
total_thinking_tokens: totalThinkingTokens,
|
||
},
|
||
by_tier: tierRows,
|
||
by_model: modelRows,
|
||
by_pipeline: pipelineRows,
|
||
by_backend: backendRows,
|
||
by_install: installRows,
|
||
by_hour_utc: hourRows,
|
||
cost_vs_speed: costSpeedRows,
|
||
};
|
||
}
|
||
|
||
function groupBy(list, keyFn) {
|
||
const out = {};
|
||
for (const item of list) {
|
||
const k = keyFn(item);
|
||
if (!out[k]) out[k] = [];
|
||
out[k].push(item);
|
||
}
|
||
return out;
|
||
}
|
||
|
||
function sumBy(list, key) {
|
||
let s = 0;
|
||
for (const item of list) s += item[key] || 0;
|
||
return s;
|
||
}
|
||
|
||
function avgBy(list, key) {
|
||
if (list.length === 0) return 0;
|
||
return sumBy(list, key) / list.length;
|
||
}
|