// Per-call audit log for profitability + observability. Each relay
// request (success or failure) appends one line of newline-delimited
// JSON to /data/relay-calls.ndjson. Append-only — read paths stream the
// live file line by line and keep the matching entries in memory for
// aggregation, which is cheap up to 100k+ entries at typical relay
// scale (low-tens-of-thousands of calls per month).
//
// Record shape (no field is required; missing fields just don't
// appear in aggregations):
//   {
//     ts:             ms-epoch when the request landed
//     install_id:     X-Recap-Install-Id (truncated for log readability)
//     license_fingerprint: stable 16-hex hash of the licenseUuid for
//                     paid-tier calls; null for anonymous/Core. Added
//                     in the license-keyed-credits refactor so spend can
//                     be aggregated by license-pool (since one license
//                     may span multiple installs). install_id is STILL
//                     logged on every entry — license_fingerprint is
//                     additive forensic visibility, not a replacement.
//     tier:           "core" | "pro" | "max"
//     pipeline:       "transcribe" | "analyze"
//     backend:        "gemini" | "hardware"
//     model:          e.g. "gemini-3-flash-preview", "parakeet-tdt-0.6b-v3"
//     status:         "success" | "error" | "refused" (refused = quota)
//     credit_charged: 0 | 1
//     duration_ms:    end-to-end wall time
//     input_tokens, output_tokens, thinking_tokens (Gemini only)
//     cost_usd:       computed from token counts × per-1M-token rates
//     job_id:         X-Recap-Job-Id (so we can collapse pairs into one)
//     error:          short error string if status="error"
//   }
//
// Rotation: initAuditLog() moves the live file to a dated archive at
// boot once it has grown past ROTATION_THRESHOLD_BYTES; nothing rotates
// while the relay is running. The operator can also rotate manually
// (mv relay-calls.ndjson relay-calls.ndjson.0; restart). Once
// volume warrants, replace this with a daily-rotated logger or move to
// SQLite for indexed time-range queries.

import fs from "fs/promises";
import { createReadStream } from "fs";
import readline from "readline";
import path from "path";

// Module state: directory holding the log and the live log path. Both
// are overwritten by initAuditLog() when a dataDir is supplied.
let dataDir = "/data";
let logPath = "/data/relay-calls.ndjson";

// Size at which we rotate the live ndjson to a dated archive. Picked
// to roughly match a year of high-volume relay traffic — a typical
// entry is ~400 bytes, so 50MB ≈ 130k entries. Rotation runs once at
// boot; the operator can also rotate manually any time.
const ROTATION_THRESHOLD_BYTES = 50 * 1024 * 1024;

// Highest numeric suffix tried when same-day archive names are taken.
const MAX_ARCHIVE_SUFFIX = 99;

// Initialize the audit log: resolve the log path, rotate an oversized
// live file, and make sure the live file exists.
//
//   dataDir  optional directory override; when omitted (or when the
//            function is called with no argument) the current module
//            default is kept.
//
// Rejects if the live file cannot be created.
export async function initAuditLog({ dataDir: dd } = {}) {
  if (dd) dataDir = dd;
  logPath = path.join(dataDir, "relay-calls.ndjson");
  await maybeRotateLog();
  // Ensure the file exists so the streaming read path doesn't trip.
  // Appending an empty string creates the file when missing and never
  // truncates one that is already there.
  await fs.appendFile(logPath, "", { mode: 0o600 });
  console.log(`[audit-log] writing to ${logPath}`);
}

// Return the first archive path for `ymd` that does not exist yet:
// relay-calls-<ymd>.ndjson, then relay-calls-<ymd>.1.ndjson, and so on.
// Returns null when every candidate up to MAX_ARCHIVE_SUFFIX is taken.
async function findFreeArchivePath(ymd) {
  for (let suffix = 0; suffix <= MAX_ARCHIVE_SUFFIX; suffix += 1) {
    const name =
      suffix === 0
        ? `relay-calls-${ymd}.ndjson`
        : `relay-calls-${ymd}.${suffix}.ndjson`;
    const candidate = path.join(dataDir, name);
    try {
      await fs.access(candidate);
    } catch {
      return candidate; // Not accessible — treat as a free name.
    }
  }
  return null;
}

// Rotate the live ndjson to a dated archive when it grows past the
// threshold. The dashboard's `readEntries` always reads the live file
// only — archived entries fall out of the rolling 30-day window
// naturally and are kept around as raw files for ad-hoc analysis or
// long-term storage / CSV export. If a same-day archive already exists
// (e.g. operator restarts the relay mid-rotation), append a counter.
// Failures are logged and swallowed so a rotation problem never blocks
// startup.
async function maybeRotateLog() {
  let stat;
  try {
    stat = await fs.stat(logPath);
  } catch {
    return; // No file yet — nothing to rotate.
  }
  if (stat.size < ROTATION_THRESHOLD_BYTES) return;

  const ymd = new Date().toISOString().slice(0, 10);
  const archive = await findFreeArchivePath(ymd);
  if (!archive) {
    // Pathological — give up rotating, but say so.
    console.warn(`[audit-log] rotation skipped: no free archive name for ${ymd}`);
    return;
  }

  try {
    await fs.rename(logPath, archive);
    // Recreate the live file without truncating anything that may have
    // been appended since the rename.
    await fs.appendFile(logPath, "", { mode: 0o600 });
    console.log(
      `[audit-log] rotated ${(stat.size / 1024 / 1024).toFixed(1)}MB → ${archive}`
    );
  } catch (err) {
    console.warn(`[audit-log] rotation failed: ${err?.message || err}`);
  }
}

// Best-effort append.
// Errors are logged but never rethrown — losing
// an audit line shouldn't fail the relay call that caused it.
//
// `entry` is spread into the record after a default `ts`, so a caller
// that supplies its own `ts` overrides the current time.
export async function recordCall(entry) {
  const record = { ts: Date.now(), ...entry };
  try {
    await fs.appendFile(logPath, JSON.stringify(record) + "\n", { mode: 0o600 });
  } catch (err) {
    console.error(`[audit-log] append failed: ${err?.message || err}`);
  }
}

// Truncate the entire audit log. Used by the dashboard's "Delete all"
// button for cleanup before going-live or after a string of bad-data
// test runs (relay re-installed mid-run, config tweaks producing
// inconsistent measurements, etc.). Destructive — no undo.
// Resolves to { ok: true } or { ok: false, error } — never rejects.
export async function clearAllAuditEntries() {
  try {
    await fs.writeFile(logPath, "", { mode: 0o600 });
    return { ok: true };
  } catch (err) {
    return { ok: false, error: err?.message || String(err) };
  }
}

// Delete audit rows matching specific job_ids. Reads the whole log,
// filters out lines belonging to the target jobs, writes the remainder
// back. O(N) on the file size; fine for any plausible audit log (the
// live file is rotated at boot once it passes ROTATION_THRESHOLD_BYTES,
// i.e. 50MB). Returns { deleted } — the count of rows removed.
//
// The log is only rewritten when at least one row matched, and the
// rewrite goes through a temp file + rename so a crash mid-write cannot
// leave a half-written log behind. Lines appended by recordCall()
// between the read and the rename are still lost — callers that need
// stronger guarantees must serialize deletes against appends.
//
// Rejects on read errors other than ENOENT and on write/rename errors.
export async function deleteAuditRowsByJobIds(jobIds) {
  if (!Array.isArray(jobIds) || jobIds.length === 0) return { deleted: 0 };

  const idSet = new Set(jobIds);
  const kept = [];
  let deleted = 0;

  try {
    const stream = createReadStream(logPath, { encoding: "utf8" });
    const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
    for await (const line of rl) {
      if (!line.trim()) continue;
      let row = null;
      try {
        row = JSON.parse(line);
      } catch {
        // Bad line — preserve it rather than dropping; matches the
        // skip-and-continue behavior of readEntries.
      }
      if (row && row.job_id && idSet.has(row.job_id)) {
        deleted += 1;
        continue;
      }
      kept.push(line);
    }
  } catch (err) {
    if (err.code !== "ENOENT") throw err;
  }

  // Nothing matched (or no log yet): leave the file untouched.
  if (deleted === 0) return { deleted };

  const tmpPath = `${logPath}.tmp`;
  try {
    await fs.writeFile(tmpPath, kept.map((line) => `${line}\n`).join(""), {
      mode: 0o600,
    });
    await fs.rename(tmpPath, logPath);
  } catch (err) {
    // Don't leave the temp file lying around next to the log.
    await fs.unlink(tmpPath).catch(() => {});
    throw err;
  }
  return { deleted };
}

// Read all entries since `sinceMs` (default: 30 days).
// Streamed line-by-line so the whole file doesn't sit in memory at
// once. Only rows whose numeric `ts` falls inside [sinceMs, untilMs]
// are kept. Returned array is newest-first. Read failures are logged
// (except ENOENT) and yield whatever was collected so far.
export async function readEntries({
  sinceMs = Date.now() - 30 * 24 * 3600 * 1000,
  untilMs = Number.POSITIVE_INFINITY,
} = {}) {
  const out = [];
  try {
    const stream = createReadStream(logPath, { encoding: "utf8" });
    const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
    for await (const line of rl) {
      if (!line.trim()) continue;
      try {
        const r = JSON.parse(line);
        if (typeof r.ts === "number" && r.ts >= sinceMs && r.ts <= untilMs) {
          out.push(r);
        }
      } catch {
        // Bad line — skip silently. Doesn't disrupt the rest of the read.
      }
    }
  } catch (err) {
    if (err.code !== "ENOENT") {
      console.error(`[audit-log] read failed: ${err?.message || err}`);
    }
  }
  // Newest first by ts.
  out.sort((a, b) => b.ts - a.ts);
  return out;
}

// Largest `ts` in the list (missing ts counts as 0). A plain loop
// instead of Math.max(...list) because spreading a very large group
// into call arguments throws a RangeError.
function latestTs(list) {
  let max = -Infinity;
  for (const e of list) {
    const ts = e.ts || 0;
    if (ts > max) max = ts;
  }
  return max;
}

// Smallest `ts` in the list (missing ts counts as Infinity). Loop form
// for the same reason as latestTs.
function earliestTs(list) {
  let min = Infinity;
  for (const e of list) {
    const ts = e.ts || Infinity;
    if (ts < min) min = ts;
  }
  return min;
}

// Compute multi-dimensional aggregates over a set of entries. The
// dashboard renders all of these — each is a small object array
// suitable for direct tabulation.
export function aggregate(entries) {
  const calls = entries.length;
  const success = entries.filter((e) => e.status === "success").length;
  const errors = entries.filter((e) => e.status === "error").length;
  const refused = entries.filter((e) => e.status === "refused").length;

  let totalCost = 0;
  let totalDuration = 0;
  let totalInputTokens = 0;
  let totalOutputTokens = 0;
  let totalThinkingTokens = 0;
  for (const e of entries) {
    totalCost += e.cost_usd || 0;
    totalDuration += e.duration_ms || 0;
    totalInputTokens += e.input_tokens || 0;
    totalOutputTokens += e.output_tokens || 0;
    totalThinkingTokens += e.thinking_tokens || 0;
  }

  // ── By tier ──
  // unique_installs is install-count for Core (no license to dedup
  // against) and distinct-license-count for paid tiers (so a Pro license
  // active on two installs counts ONCE here, matching the post-refactor
  // credit model where they share one monthly pool). Falls back to
  // install_id for paid entries that predate the license_fingerprint
  // field.
  const byTier = groupBy(entries, (e) => e.tier || "unknown");
  const tierRows = Object.entries(byTier).map(([tier, list]) => ({
    tier,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
    unique_installs:
      tier === "core" || tier === "unknown"
        ? new Set(list.map((e) => e.install_id)).size
        : new Set(list.map((e) => e.license_fingerprint || e.install_id)).size,
  }));

  // ── By model ──
  const byModel = groupBy(entries, (e) => e.model || "unknown");
  const modelRows = Object.entries(byModel).map(([model, list]) => ({
    model,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    input_tokens: sumBy(list, "input_tokens"),
    output_tokens: sumBy(list, "output_tokens"),
    thinking_tokens: sumBy(list, "thinking_tokens"),
    avg_duration_ms: avgBy(list, "duration_ms"),
    avg_cost_usd: list.length > 0 ? sumBy(list, "cost_usd") / list.length : 0,
  }));

  // ── By pipeline ──
  const byPipeline = groupBy(entries, (e) => e.pipeline || "unknown");
  const pipelineRows = Object.entries(byPipeline).map(([pipeline, list]) => ({
    pipeline,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
  }));

  // ── By backend ──
  const byBackend = groupBy(entries, (e) => e.backend || "unknown");
  const backendRows = Object.entries(byBackend).map(([backend, list]) => ({
    backend,
    calls: list.length,
    cost_usd: sumBy(list, "cost_usd"),
    avg_duration_ms: avgBy(list, "duration_ms"),
  }));

  // ── By install (top 20 by spend) ──
  const byInstall = groupBy(entries, (e) => e.install_id || "unknown");
  const installRows = Object.entries(byInstall)
    .map(([install, list]) => ({
      install_id: install,
      tier_snapshot: list[0]?.tier || "core",
      calls: list.length,
      cost_usd: sumBy(list, "cost_usd"),
      // Distinct summarize jobs (collapse transcribe+analyze pairs).
      summaries: new Set(list.map((e) => e.job_id).filter(Boolean)).size,
      avg_duration_ms: avgBy(list, "duration_ms"),
      last_active_at: latestTs(list),
    }))
    .sort((a, b) => b.cost_usd - a.cost_usd)
    .slice(0, 20);

  // ── By license fingerprint (top 20 by spend, paid tiers only) ──
  // One license may span multiple installs (cloud account + self-host),
  // and the post-refactor credit ledger aggregates their spend onto a
  // single pool. This view mirrors that — operators get a "by paid
  // user" rollup that doesn't double-count multi-install Pros, plus an
  // install-count column to see distribution per license.
  const byLicense = groupBy(
    entries.filter((e) => e.license_fingerprint),
    (e) => e.license_fingerprint
  );
  const licenseRows = Object.entries(byLicense)
    .map(([fp, list]) => ({
      license_fingerprint: fp,
      tier_snapshot: list[0]?.tier || "core",
      calls: list.length,
      cost_usd: sumBy(list, "cost_usd"),
      summaries: new Set(list.map((e) => e.job_id).filter(Boolean)).size,
      unique_installs: new Set(list.map((e) => e.install_id).filter(Boolean)).size,
      avg_duration_ms: avgBy(list, "duration_ms"),
      last_active_at: latestTs(list),
    }))
    .sort((a, b) => b.cost_usd - a.cost_usd)
    .slice(0, 20);

  // ── By hour-of-day (for traffic-pattern view) ──
  const byHour = groupBy(entries, (e) => new Date(e.ts).getUTCHours());
  const hourRows = Array.from({ length: 24 }, (_, h) => {
    const list = byHour[h] || [];
    return {
      hour_utc: h,
      calls: list.length,
      cost_usd: sumBy(list, "cost_usd"),
    };
  });

  // ── Cost vs speed (per-model averages) ──
  // Same source as modelRows but kept separate so the dashboard can
  // render it as a scatter / table without extra transformation.
  const costSpeedRows = modelRows
    .map((r) => ({
      model: r.model,
      avg_cost_usd: r.avg_cost_usd,
      avg_duration_ms: r.avg_duration_ms,
      calls: r.calls,
    }))
    .sort((a, b) => a.avg_duration_ms - b.avg_duration_ms);

  // ── Per-summary rollup (collapse transcribe + analyze pairs) ──
  // Every "summarize" produces 2 audit entries — one transcribe, one
  // analyze — sharing a job_id. The dashboard's call-level views show
  // them separately, which is useful for backend-vs-pipeline tuning but
  // confusing as "how many summaries did I serve". Group by job_id so
  // operators see one row per summary with combined cost/duration.
  // Entries without a job_id (older relay versions, or balance pings)
  // are grouped under a sentinel key and left out of this view.
  const byJob = groupBy(entries, (e) => e.job_id || "__no_jobid__");
  const summaryRows = Object.entries(byJob)
    .filter(([k]) => k !== "__no_jobid__")
    .map(([jobId, list]) => {
      const transcribe = list.find((e) => e.pipeline === "transcribe");
      const analyze = list.find((e) => e.pipeline === "analyze");
      return {
        job_id: jobId,
        install_id: list[0]?.install_id || null,
        tier: list[0]?.tier || null,
        started_at: earliestTs(list),
        completed_at: latestTs(list),
        transcribe_backend: transcribe?.backend || null,
        transcribe_model: transcribe?.model || null,
        analyze_backend: analyze?.backend || null,
        analyze_model: analyze?.model || null,
        total_cost_usd: sumBy(list, "cost_usd"),
        total_duration_ms: sumBy(list, "duration_ms"),
        status: list.every((e) => e.status === "success")
          ? "success"
          : list.some((e) => e.status === "error")
            ? "error"
            : "partial",
        had_transcribe: !!transcribe,
        had_analyze: !!analyze,
      };
    })
    .sort((a, b) => b.completed_at - a.completed_at);

  // ── Recent errors (first 50 in input order) ──
  // Quick triage view — when something is failing, the operator needs
  // to see the offending error strings without scrolling the full
  // call log. With newest-first input (as readEntries returns) these
  // are the newest 50.
  // Surface any audit row carrying an error message — that catches
  // status="error" (true backend failures) AND status="partial"
  // (e.g. transcribe-with-truncated-chunks, which records the
  // missing-speech message in the error field). Operators rely on
  // this view to triage all degraded behavior, not just outright
  // 5xx-class failures, so the broader filter is the right default.
  const errorRows = entries
    .filter((e) => e.error)
    .slice(0, 50)
    .map((e) => ({
      ts: e.ts,
      install_id: e.install_id || null,
      tier: e.tier || null,
      pipeline: e.pipeline || null,
      backend: e.backend || null,
      model: e.model || null,
      duration_ms: e.duration_ms || 0,
      // String() so a non-string error payload can't throw on .slice.
      error: String(e.error).slice(0, 280),
      attempts: Array.isArray(e.attempts) ? e.attempts : null,
    }));

  // ── Per-(pipeline, model) performance + failure tables ──
  // Normalizes raw duration_ms by audio_seconds so different models
  // can be compared on a backend-agnostic benchmark: how many ms of
  // wall-clock time does this model take per minute of audio? Analyze
  // calls don't have audio (they consume the transcript text), so we
  // report ms-per-1k-input-tokens for those instead.
  //
  // Failure rate is computed against `attempted` (success + error)
  // and excludes `refused` calls — refused requests never reached the
  // backend, so they shouldn't count against the model's reliability.
  const byPipelineModel = {};
  for (const e of entries) {
    const pipeline = e.pipeline || "unknown";
    const model = e.model || "unknown";
    if (model === "unknown" && e.status === "refused") continue; // refused entries often have no model
    const key = `${pipeline}::${model}`;
    if (!byPipelineModel[key]) {
      byPipelineModel[key] = {
        pipeline,
        model,
        calls: 0,
        success: 0,
        errors: 0,
        refused: 0,
        partials: 0,
        sum_duration_ms: 0,
        sum_audio_seconds: 0,
        sum_input_tokens: 0,
        sum_output_tokens: 0,
        // { error_signature: count } — prototype-free so a signature
        // like "constructor" is counted as an ordinary key.
        error_counts: Object.create(null),
      };
    }
    const row = byPipelineModel[key];
    row.calls += 1;
    if (e.status === "success") row.success += 1;
    if (e.status === "error") row.errors += 1;
    if (e.status === "refused") row.refused += 1;
    if (e.status === "partial") row.partials += 1;
    row.sum_duration_ms += e.duration_ms || 0;
    if (typeof e.audio_seconds === "number" && e.audio_seconds > 0) {
      row.sum_audio_seconds += e.audio_seconds;
    }
    row.sum_input_tokens += e.input_tokens || 0;
    row.sum_output_tokens += e.output_tokens || 0;
    // Aggregate the top-error counts off ANY row that has a populated
    // error message — not just status="error" rows. Partial (truncated
    // transcribe) and refused (out-of-credits, capacity-gated) rows
    // also carry useful error strings the operator wants to see in
    // the "Top failure modes" table. The old gate `status === "error"`
    // hid all truncations because they're recorded as status="partial".
    if (e.error) {
      const sig = errorSignature(e.error);
      row.error_counts[sig] = (row.error_counts[sig] || 0) + 1;
    }
  }
  const perfByModel = Object.values(byPipelineModel).map((r) => {
    const attempted = r.success + r.errors;
    const successRate = attempted > 0 ? r.success / attempted : null;
    const audioMin = r.sum_audio_seconds / 60;
    const msPerAudioMin = audioMin > 0 ? r.sum_duration_ms / audioMin : null;
    const msPer1kInputTokens =
      r.sum_input_tokens > 0
        ? r.sum_duration_ms / (r.sum_input_tokens / 1000)
        : null;
    // Top 3 error signatures by frequency for this model.
    const topErrors = Object.entries(r.error_counts)
      .map(([signature, count]) => ({ signature, count }))
      .sort((a, b) => b.count - a.count)
      .slice(0, 3);
    return {
      pipeline: r.pipeline,
      model: r.model,
      calls: r.calls,
      success: r.success,
      errors: r.errors,
      refused: r.refused,
      partials: r.partials,
      // "failures" = total signal worth surfacing in failure tables.
      // Includes partials so a TX that lost minutes of speech via a
      // truncated chunk is counted as a failure mode, not silently
      // tucked away under a "success" pipe. The errors-by-model
      // dashboard table reads this; the per-call "errors" field stays
      // available for stricter computations.
      failures: r.errors + r.partials,
      success_rate: successRate,
      // Speed benchmark fields. Either or both may be null when there
      // wasn't enough successful-with-metadata data to compute them.
      ms_per_audio_minute: msPerAudioMin,
      ms_per_1k_input_tokens: msPer1kInputTokens,
      total_audio_minutes: audioMin > 0 ? audioMin : null,
      top_errors: topErrors,
    };
  });

  // ── Revenue / margin (requires tier prices supplied by caller) ──
  // Distinct paying USERS in the window × the operator's per-tier
  // monthly price. For Core (free) we count distinct installs — that's
  // still the right grain for free-tier "active users", since Core has
  // no license to dedup against. For Pro/Max we count distinct license
  // fingerprints so a single Pro license activated on two installs
  // (cloud + self-host) counts ONCE toward monthly revenue, matching
  // the post-refactor credit model where they share one monthly pool.
  // Falls back to install_id for paid entries missing a fingerprint
  // (legacy pre-refactor audit rows) so historical ranges stay
  // approximately correct rather than dropping to zero.
  //
  // Strictly an *estimate* — the relay doesn't know if a Pro user
  // actually paid this month, just that they touched a request.
  // Underestimates churned customers (who paid but didn't call) and
  // overestimates trial users (who haven't paid yet). Hooked in by
  // the dashboard route, not here, so tests can pass an empty prices
  // map and get zero.
  const tierActiveInstalls = {
    core: new Set(),
    pro: new Set(),
    max: new Set(),
  };
  for (const e of entries) {
    const t = e.tier || "core";
    // Own-property check: an unexpected tier string such as
    // "constructor" must be skipped, not resolved via the prototype.
    if (!Object.prototype.hasOwnProperty.call(tierActiveInstalls, t)) continue;
    if (t === "core") {
      if (e.install_id) tierActiveInstalls.core.add(e.install_id);
    } else {
      // Paid: prefer fingerprint, fall back to install_id for legacy rows.
      const id = e.license_fingerprint || e.install_id;
      if (id) tierActiveInstalls[t].add(id);
    }
  }

  return {
    summary: {
      calls,
      success,
      errors,
      refused,
      success_rate: calls > 0 ? success / calls : 0,
      total_cost_usd: totalCost,
      total_duration_ms: totalDuration,
      avg_duration_ms: calls > 0 ? totalDuration / calls : 0,
      total_input_tokens: totalInputTokens,
      total_output_tokens: totalOutputTokens,
      total_thinking_tokens: totalThinkingTokens,
      total_summaries: summaryRows.length,
      // active_installs_by_tier name retained for dashboard compatibility,
      // but the paid-tier counts here are actually DISTINCT LICENSES,
      // not distinct installs (see the comment on tierActiveInstalls
      // above). Core remains install-based. The dashboard label is
      // "Active users by tier" which fits either grain.
      active_installs_by_tier: {
        core: tierActiveInstalls.core.size,
        pro: tierActiveInstalls.pro.size,
        max: tierActiveInstalls.max.size,
      },
    },
    by_tier: tierRows,
    by_model: modelRows,
    by_pipeline: pipelineRows,
    by_backend: backendRows,
    by_install: installRows,
    by_license: licenseRows,
    by_hour_utc: hourRows,
    cost_vs_speed: costSpeedRows,
    by_summary: summaryRows,
    errors: errorRows,
    perf_by_model: perfByModel,
  };
}

// Normalize a raw error string into a stable signature so two
// near-identical messages bucket together. The audit log stores
// truncated raw messages — we want the bucket key to be coarse enough
// that small variations (a different request-id, file name, port
// number, etc.) collapse into a single error class.
//
// Heuristics:
//   - Strip ISO timestamps and timestamps with offsets
//   - Strip UUIDs and hex blobs of 32+ characters
//   - Strip port-like ":NNNN" suffixes and digit runs of 4+ (sizes, ids)
//   - Strip URLs entirely
//   - Trim to first 120 chars after normalization
//
// Returns "(unknown)" for an empty input and also when normalization
// strips the whole message (e.g. the error was only a URL), so the
// dashboard never shows a blank bucket key.
function errorSignature(raw) {
  if (!raw) return "(unknown)";
  let s = String(raw);
  s = s.replace(
    /\b\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:?\d{2})?\b/g,
    ""
  );
  s = s.replace(/\b[0-9a-f]{32,}\b/gi, "");
  s = s.replace(
    /\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b/gi,
    ""
  );
  s = s.replace(/https?:\/\/[^\s)"']+/g, "");
  s = s.replace(/:\d{2,5}\b/g, ":");
  s = s.replace(/\b\d{4,}\b/g, "");
  return s.trim().slice(0, 120) || "(unknown)";
}

// Derived revenue/margin numbers. Pulled out of aggregate() because it
// needs prices the operator sets in config — keeping the core
// aggregator config-agnostic. Returns:
//   {
//     monthly_revenue_usd: pro_count * pro_price + max_count * max_price
//                          + core_count * core_price,
//     gemini_cost_usd_in_range: geminiCostInRange (passed through),
//     margin_usd: revenue - cost,  // approximate
//     by_tier_revenue: [{ tier, active_installs, price_usd, revenue_usd }],
//   }
//
// `activeInstallsByTier` should be the aggregate summary's
// `active_installs_by_tier` (Set sizes already computed there).
// `prices` is the {core,pro,max} USD-per-month map; a missing,
// negative, or non-numeric price counts as 0 so one bad config value
// can't turn the totals into NaN. `geminiCostInRange` is
// total_cost_usd from the summary.
export function computeRevenue({ activeInstallsByTier, prices, geminiCostInRange }) {
  const tiers = ["core", "pro", "max"];
  const byTier = tiers.map((tier) => {
    const installs = activeInstallsByTier?.[tier] || 0;
    const rawPrice = Number(prices?.[tier] ?? 0);
    const price = Number.isFinite(rawPrice) ? Math.max(0, rawPrice) : 0;
    return {
      tier,
      active_installs: installs,
      price_usd: price,
      revenue_usd: installs * price,
    };
  });
  const revenue = byTier.reduce((s, r) => s + r.revenue_usd, 0);
  return {
    monthly_revenue_usd: revenue,
    gemini_cost_usd_in_range: geminiCostInRange,
    margin_usd: revenue - geminiCostInRange,
    by_tier_revenue: byTier,
  };
}

// Bucket `list` items by `keyFn(item)` (keys are coerced to strings).
// The result has no prototype, so keys taken from request data such as
// "constructor" or "__proto__" are stored as ordinary buckets instead
// of colliding with inherited Object members. Object.entries() and
// plain indexing work on it as usual.
function groupBy(list, keyFn) {
  const out = Object.create(null);
  for (const item of list) {
    const k = keyFn(item);
    if (!out[k]) out[k] = [];
    out[k].push(item);
  }
  return out;
}

// Sum of item[key] across the list; missing/falsy values count as 0.
function sumBy(list, key) {
  let s = 0;
  for (const item of list) s += item[key] || 0;
  return s;
}

// Mean of item[key] across the list (0 for an empty list). Items
// without the key still count toward the denominator.
function avgBy(list, key) {
  if (list.length === 0) return 0;
  return sumBy(list, key) / list.length;
}