Files

197 lines
7.4 KiB
JavaScript

// Job-id deduplication. Recap mints a UUID per summarize job (the
// transcribe + analyze pair) and sends it in X-Recap-Job-Id on every
// relay call. The first call with a given (creditKey, job_id) tuple
// reserves a credit; subsequent calls with the same tuple are free
// until the job_id expires (1 hour).
//
// Keyed by creditKey (`lic:<fp>` for paid tiers, `inst:<installId>`
// otherwise) — the SAME key the credits.js ledger uses — so a
// transcribe that landed on the cloud account's install can be
// recognized + refunded by a follow-up analyze landing from the
// self-hosted install of the same license. The credit-key plumbing
// is what unifies them; the job-id is the dedup grain.
//
// Persisted to disk at /data/jobs.json so the refund logic survives
// container restarts. The earlier in-memory-only version had a bug:
// when transcribe charged a credit (marking the job in memory), the
// relay restarted, and then analyze tried to refund the failed call,
// `lookupJob` returned null (memory wiped) and refundJob did
// nothing. Credits stayed stuck on the ledger. Disk persistence fixes
// that — refund state now survives an operator-side restart-and-resume.
import fs from "fs/promises";
import path from "path";
import { refundCredit, getCreditKey } from "./credits.js";
// How long a job reservation is kept, measured from its charged_at
// timestamp; older entries are dropped by pruneExpired and skipped when
// loading from disk.
const JOB_TTL_MS = 60 * 60 * 1000; // 1 hour
// Map<creditKey|job_id, { backend, tier, install_id, license_fingerprint, charged_at, refunded }>
// install_id + license_fingerprint are stored alongside the credit-key
// so refundCredit can route the refund to the SAME ledger row that
// commitCredit charged — getOrCreateRow needs license context to
// resolve a `lic:<fp>` row.
// The map key is the string built by key(creditKey, jobId).
const jobs = new Map();
// Directory holding the persisted job file; initJobCredits may override it.
let dataDir = "/data";
// Full path of the persisted job file; recomputed from dataDir by
// initJobCredits.
let jobsPath = "/data/jobs.json";
// Promise of the disk write that persist() is currently tracking, or
// null when no write is outstanding.
let writing = null; // serializes concurrent writes
// Build the string key used in the `jobs` map: the credit-key and the
// job id joined by a single "|".
function key(creditKey, jobId) {
  return "".concat(creditKey, "|", jobId);
}
// Load persisted job reservations from <dataDir>/jobs.json into the
// in-memory map. Intended to run once at boot, before any route is
// served.
//
// `dataDir` (optional) overrides the directory the job file lives in;
// the file path is recomputed from it either way.
//
// Only entries with a string `key`, a numeric `charged_at`, and a
// `charged_at` still inside the TTL window are loaded; everything else
// is skipped. A missing file is silently treated as "no jobs"; any other
// read/parse failure is logged as a warning and the map is left as it
// was. This function never throws for those cases.
export async function initJobCredits({ dataDir: dd } = {}) {
  if (dd) dataDir = dd;
  jobsPath = path.join(dataDir, "jobs.json");
  try {
    const parsed = JSON.parse(await fs.readFile(jobsPath, "utf8"));
    // Anything that isn't `{ entries: [...] }` is ignored without a log line.
    if (!parsed || !Array.isArray(parsed.entries)) return;

    const oldestAllowed = Date.now() - JOB_TTL_MS;
    const isLoadable = (entry) =>
      Boolean(entry) &&
      typeof entry.key === "string" &&
      typeof entry.charged_at === "number" &&
      entry.charged_at >= oldestAllowed;

    parsed.entries.filter(isLoadable).forEach(({ key: mapKey, ...record }) => {
      jobs.set(mapKey, record);
    });
    console.log(`[job-credits] loaded ${jobs.size} jobs from ${jobsPath}`);
  } catch (err) {
    // No file yet is the normal first-boot state, so it is not worth a warning.
    if (err.code === "ENOENT") return;
    console.warn(`[job-credits] failed to read ${jobsPath}: ${err?.message || err}`);
  }
}
// Write the current job map to disk at jobsPath, one write at a time.
//
// Each call is chained onto the write queued by the previous call, so
// writes run strictly in call order and never overlap on the shared
// ".tmp" file — even when three or more calls are outstanding at once.
// A write prunes expired jobs and serializes the in-memory map as it
// stands when that write actually starts, so a queued write always
// reflects the latest state.
//
// Returns a promise that resolves once THIS call's write has been
// renamed into place, and rejects with the underlying fs error if this
// call's own write fails. A failure of an earlier queued write does not
// prevent this call from writing.
async function persist() {
  const previous = writing;
  const current = (async () => {
    if (previous) {
      try {
        await previous;
      } catch {
        // The earlier write's failure was already delivered to its own
        // caller; this write must still go ahead with the newer state.
      }
    }
    pruneExpired();
    const entries = Array.from(jobs, ([k, v]) => ({ key: k, ...v }));
    // Write to a sibling temp file, then rename it over the real file,
    // so jobsPath is replaced in one step instead of being rewritten in
    // place.
    const tmp = jobsPath + ".tmp";
    await fs.writeFile(tmp, JSON.stringify({ entries }), { mode: 0o600 });
    await fs.rename(tmp, jobsPath);
  })();
  // Publish this write as the new tail of the queue before yielding, so
  // the next caller chains onto it rather than onto `previous`.
  writing = current;
  try {
    await current;
  } finally {
    // Only the tail may reset the queue; a later call may already have
    // replaced `writing` with its own write.
    if (writing === current) writing = null;
  }
}
// Look up the live reservation for a job on an incoming request.
//
// Returns the stored reservation object when this (credit-key, jobId)
// pair has already been charged and not refunded — the caller must NOT
// charge again — or null when the caller should commit a credit (no
// jobId given, no reservation, reservation expired, or reservation
// already refunded).
//
// The credit-key is the explicit `creditKey` if one is passed, otherwise
// it is resolved with getCreditKey from the same (installId, license)
// pair the caller would hand to credits.js, so the dedup grain matches
// the ledger key.
export function lookupJob({ installId, license, creditKey = null, jobId }) {
  if (!jobId) return null;
  pruneExpired();
  const resolvedKey = creditKey || getCreditKey({ installId, license });
  const reservation = jobs.get(key(resolvedKey, jobId));
  return reservation && !reservation.refunded ? reservation : null;
}
// Record that a credit has been charged for a job.
//
// Does nothing when `jobId` is missing, or when a non-refunded
// reservation already exists for the same (credit-key, jobId) — so
// repeated calls are harmless. A reservation that was refunded is
// replaced by a fresh one.
//
// The reservation keeps the backend, tier, install id, the license
// fingerprint (taken from a `lic:`-prefixed credit-key, otherwise null),
// the charge time and a `refunded` flag. The map is then persisted;
// a persist failure is logged and does not reject this call.
export async function markJobCharged({ installId, license, creditKey = null, jobId, backend, tier }) {
  if (!jobId) return;
  pruneExpired();
  const resolvedKey = creditKey || getCreditKey({ installId, license });
  const mapKey = key(resolvedKey, jobId);
  const current = jobs.get(mapKey);
  if (current && !current.refunded) return;

  // Keep the fingerprint on the reservation itself so it is still
  // available later without the license object.
  const licensePrefix = "lic:";
  let fingerprint = null;
  if (resolvedKey.startsWith(licensePrefix)) {
    fingerprint = resolvedKey.slice(licensePrefix.length);
  }

  const reservation = {
    backend,
    tier,
    install_id: installId || null,
    license_fingerprint: fingerprint,
    charged_at: Date.now(),
    refunded: false,
  };
  jobs.set(mapKey, reservation);

  try {
    await persist();
  } catch (err) {
    console.error(`[job-credits] persist failed after mark: ${err?.message || err}`);
  }
}
// Refund the credit that was charged for a job which then failed.
//
// Flags the job's reservation as refunded, hands the credit back through
// refundCredit, and persists the job map, so a later call with the same
// job id is treated as a new job.
//
// Safe to call from every error path: a missing jobId, an unknown job,
// or a job that is already refunded all return without doing anything.
// The reservation is flagged BEFORE refundCredit is awaited, so a second
// refundJob for the same job issued in the meantime also returns early.
//
// The refund is addressed with the credit-key resolved for this call
// (the explicit `creditKey`, or getCreditKey on the caller's installId +
// license). That is the same key the reservation was found under, so it
// matches the key used at mark time. Backend and tier come from the
// reservation; the install id is the stored one, falling back to the
// caller's.
//
// Failures of refundCredit and of the persist step are logged and never
// reject this call.
export async function refundJob({ installId, license, creditKey = null, jobId }) {
  if (!jobId) return;
  const resolvedKey = creditKey || getCreditKey({ installId, license });
  const reservation = jobs.get(key(resolvedKey, jobId));
  if (!reservation || reservation.refunded) return;

  reservation.refunded = true;

  // Passing the credit-key explicitly means refundCredit does not have
  // to re-derive it from a license object, which some error paths no
  // longer have at hand.
  const refundRequest = {
    installId: reservation.install_id || installId || null,
    creditKey: resolvedKey,
    backend: reservation.backend,
    tier: reservation.tier,
  };
  try {
    await refundCredit(refundRequest);
  } catch (err) {
    console.error(
      `[job-credits] refundCredit failed for ${resolvedKey}|${jobId}: ${err?.message || err}`
    );
  }

  try {
    await persist();
  } catch (err) {
    console.error(`[job-credits] persist failed after refund: ${err?.message || err}`);
  }
}
// Drop every job whose charged_at is older than the TTL window ending
// now. Entries without a comparable charged_at are left in place.
function pruneExpired() {
  const oldestAllowed = Date.now() - JOB_TTL_MS;
  jobs.forEach((record, mapKey) => {
    if (record.charged_at < oldestAllowed) {
      jobs.delete(mapKey);
    }
  });
}
// Return the live (non-expired) jobs as an array of plain objects, each
// being the stored record plus a `key` field holding its map key.
// Expired jobs are pruned first. The returned objects are shallow
// copies, not the stored records themselves.
export function snapshotJobs() {
  pruneExpired();
  const rows = [];
  for (const [mapKey, record] of jobs) {
    rows.push({ key: mapKey, ...record });
  }
  return rows;
}