// Job-id deduplication. Recap mints a UUID per summarize job (the
// transcribe + analyze pair) and sends it in X-Recap-Job-Id on every
// relay call. The first call with a given (creditKey, job_id) tuple
// reserves a credit; subsequent calls with the same tuple are free
// until the job_id expires (1 hour).
//
// Keyed by creditKey (`lic:<license fingerprint>` for paid tiers, an
// `inst:`-prefixed key otherwise) — the SAME key the credits.js ledger
// uses — so a transcribe that landed on the cloud account's install can
// be recognized + refunded by a follow-up analyze landing from the
// self-hosted install of the same license. The credit-key plumbing is
// what unifies them; the job-id is the dedup grain.
//
// Persisted to disk at /data/jobs.json so the refund logic survives
// container restarts. The earlier in-memory-only version had a bug:
// when transcribe charged a credit (marking the job in memory), the
// relay restarted, and then analyze tried to refund the failed call,
// `lookupJob` returned null (memory wiped) and refundJob did nothing.
// Credits stuck on the ledger. Disk persistence fixes that — a
// restart-and-resume on the operator side never loses refund state.
import fs from "fs/promises";
import path from "path";
import { refundCredit, getCreditKey } from "./credits.js";

// How long a job reservation stays valid after it was charged.
const JOB_TTL_MS = 60 * 60 * 1000; // 1 hour

// Separator between the credit-key and the job id inside a map key.
const KEY_SEPARATOR = "|";

// Map of "<creditKey>|<jobId>" -> reservation record
//   { backend, tier, install_id, license_fingerprint, charged_at, refunded }
// install_id + license_fingerprint are stored alongside the credit-key
// so refundCredit can route the refund to the SAME ledger row that
// commitCredit charged — getOrCreateRow needs license context to
// resolve a `lic:` row.
const jobs = new Map();

// Storage location; initJobCredits may repoint both at boot.
let dataDir = "/data";
let jobsPath = "/data/jobs.json";

// In-flight disk write (a promise) or null when idle; serializes
// concurrent writes.
let writing = null;

// Builds the composite map key for one (creditKey, jobId) pair.
function key(creditKey, jobId) {
  return `${creditKey}${KEY_SEPARATOR}${jobId}`;
}

// Boot-time load. Called from server/index.js before any route hits.
// If the file is missing or corrupt, start empty — same effective
// state as a fresh install. Expired entries are pruned during load.
// Loads persisted reservations from `<dataDir>/jobs.json` into `jobs`.
//
// Params:
//   dataDir — optional directory override; when given it replaces the
//             module-level data directory before the path is resolved.
// Resolves with no value. Never rejects for read/parse problems: a
// missing file is silent, anything else is logged as a warning and the
// map is left as it was.
export async function initJobCredits({ dataDir: dd } = {}) {
  if (dd) dataDir = dd;
  jobsPath = path.join(dataDir, "jobs.json");
  try {
    const raw = await fs.readFile(jobsPath, "utf8");
    const parsed = JSON.parse(raw);
    if (!parsed || !Array.isArray(parsed.entries)) {
      // Valid JSON but not our format: say so instead of silently
      // starting empty, so an operator can tell why state is gone.
      console.warn(`[job-credits] ignoring ${jobsPath}: no "entries" array`);
      return;
    }
    const cutoff = Date.now() - JOB_TTL_MS;
    for (const entry of parsed.entries) {
      if (
        entry &&
        typeof entry.key === "string" &&
        typeof entry.charged_at === "number" &&
        entry.charged_at >= cutoff
      ) {
        // The map key lives in `key` on disk; everything else is the record.
        const { key: k, ...rest } = entry;
        jobs.set(k, rest);
      }
    }
    console.log(`[job-credits] loaded ${jobs.size} jobs from ${jobsPath}`);
  } catch (err) {
    // ENOENT just means nothing was persisted yet.
    if (err?.code !== "ENOENT") {
      console.warn(`[job-credits] failed to read ${jobsPath}: ${err?.message || err}`);
    }
  }
}

// Writes the current (pruned) contents of `jobs` to jobsPath via a
// temp file + rename.
//
// Writes are strictly serialized: each call chains its write behind
// whatever write is currently in flight, so two writers never touch
// the shared `.tmp` path at the same time. A failed earlier write does
// not cancel later ones — each caller only sees the outcome of its own
// write. The snapshot is taken when the write actually starts, so a
// queued write picks up the latest in-memory state.
//
// Rejects if this call's own writeFile/rename fails.
async function persist() {
  const writeSnapshot = async () => {
    pruneExpired();
    const entries = Array.from(jobs, ([k, v]) => ({ key: k, ...v }));
    const tmp = `${jobsPath}.tmp`;
    await fs.writeFile(tmp, JSON.stringify({ entries }), { mode: 0o600 });
    await fs.rename(tmp, jobsPath);
  };

  // Idle: start right away. Busy: run after the in-flight write
  // settles, whether it succeeded or failed.
  const current = writing ? writing.then(writeSnapshot, writeSnapshot) : writeSnapshot();
  writing = current;
  try {
    await current;
  } finally {
    // Only the tail of the chain may mark the writer idle; otherwise an
    // earlier finisher would hide a write that is still running.
    if (writing === current) writing = null;
  }
}

// On a new request: returns the existing reservation (caller must NOT
// double-charge) or null (caller should commit a credit). The caller
// passes the same (installId, license) pair it would pass to credits.js
// — we resolve to the credit-key internally so the dedup grain matches
// the ledger key.
//
// Params:
//   installId, license — forwarded to getCreditKey when no creditKey is given.
//   creditKey          — optional explicit key; wins over the derived one.
//   jobId              — job id; a falsy value always yields null.
// Returns the stored record, or null when there is none, it has
// expired, or it was already refunded.
export function lookupJob({ installId, license, creditKey = null, jobId }) {
  if (!jobId) return null;
  pruneExpired();
  const ck = creditKey || getCreditKey({ installId, license });
  const existing = jobs.get(key(ck, jobId));
  return existing && !existing.refunded ? existing : null;
}

// Mark a job as having been charged. Idempotent — second call for the
// same (creditKey, job_id) is a no-op.
// Stores enough context on the reservation that a later refundJob can
// reconstruct the same credit-key (and therefore find the same ledger
// row) without the caller needing to repeat the license.
//
// Params:
//   installId, license — forwarded to getCreditKey when no creditKey is given.
//   creditKey          — optional explicit key; wins over the derived one.
//   jobId              — job id; a falsy value makes the call a no-op.
//   backend, tier      — copied onto the record as-is.
// Resolves with no value. A persist failure is logged, not thrown.
export async function markJobCharged({ installId, license, creditKey = null, jobId, backend, tier }) {
  if (!jobId) {
    return;
  }
  pruneExpired();

  const resolvedKey = creditKey || getCreditKey({ installId, license });
  const mapKey = key(resolvedKey, jobId);

  // A live (non-refunded) reservation already covers this job.
  const prior = jobs.get(mapKey);
  if (prior && !prior.refunded) {
    return;
  }

  // Pull the license fingerprint off the credit-key so refund time
  // doesn't need to recompute it (we'd no longer have the license
  // object in scope on a restart-resume refund).
  const licensePrefix = "lic:";
  let licenseFingerprint = null;
  if (resolvedKey.startsWith(licensePrefix)) {
    licenseFingerprint = resolvedKey.slice(licensePrefix.length);
  }

  const record = {
    backend,
    tier,
    install_id: installId || null,
    license_fingerprint: licenseFingerprint,
    charged_at: Date.now(),
    refunded: false,
  };
  jobs.set(mapKey, record);

  try {
    await persist();
  } catch (err) {
    console.error(`[job-credits] persist failed after mark: ${err?.message || err}`);
  }
}

// Refund a previously charged credit for a failed job. Returns the
// credit to the persistent ledger AND marks the job record as refunded
// (in memory, then persisted) so a subsequent same-job_id call is
// treated as new.
//
// Idempotent: refunding an already-refunded job is a no-op, so call
// sites can fire-and-forget on every error path without needing to
// track whether they were the FIRST error path to refund.
//
// The refund routes back to the SAME row that was charged because the
// credit-key the reservation is filed under is handed to refundCredit
// explicitly, together with the install_id stored at mark time.
// Params:
//   installId, license — forwarded to getCreditKey when no creditKey is given.
//   creditKey          — optional explicit key; wins over the derived one.
//   jobId              — job id; a falsy value makes the call a no-op.
// Resolves with no value. Failures of refundCredit and of persist are
// each logged, not thrown.
export async function refundJob({ installId, license, creditKey = null, jobId }) {
  if (!jobId) {
    return;
  }

  const resolvedKey = creditKey || getCreditKey({ installId, license });
  const reservation = jobs.get(key(resolvedKey, jobId));
  if (!reservation || reservation.refunded) {
    return;
  }

  // Flip the flag before the first await so an overlapping call for the
  // same job sees it and backs off.
  // NOTE(review): the flag is not rolled back if refundCredit throws, so
  // a failed refund is logged but a later call will not retry it —
  // confirm this is intended.
  reservation.refunded = true;

  const { install_id: chargedInstallId, backend, tier } = reservation;
  try {
    // Route the refund back to the SAME row that was charged. The
    // credit-key was computed at mark time from the same (installId,
    // license) the caller is passing now, so `resolvedKey` equals the
    // mark-time key whenever the caller is consistent. It is passed as
    // an explicit creditKey override so refundCredit doesn't need to
    // re-derive it from a license object the caller might not have
    // (e.g. some error paths refund without re-resolving the license).
    await refundCredit({
      installId: chargedInstallId || installId || null,
      creditKey: resolvedKey,
      backend,
      tier,
    });
  } catch (err) {
    console.error(
      `[job-credits] refundCredit failed for ${resolvedKey}|${jobId}: ${err?.message || err}`
    );
  }

  try {
    await persist();
  } catch (err) {
    console.error(`[job-credits] persist failed after refund: ${err?.message || err}`);
  }
}

// Drops every reservation whose charged_at is older than JOB_TTL_MS.
function pruneExpired() {
  const oldestAllowed = Date.now() - JOB_TTL_MS;
  jobs.forEach((record, mapKey) => {
    if (record.charged_at < oldestAllowed) {
      jobs.delete(mapKey);
    }
  });
}

// Returns the live reservations as an array of `{ key, ...record }`
// objects (shallow copies), after pruning expired ones.
export function snapshotJobs() {
  pruneExpired();
  const rows = [];
  for (const [mapKey, record] of jobs) {
    rows.push({ key: mapKey, ...record });
  }
  return rows;
}