Files

164 lines
5.2 KiB
JavaScript

// Per-job output storage. After a transcribe + analyze cycle
// completes, the worker calls saveJobOutput() to persist the
// transcript + analysis JSON to /data/relay-outputs/<job_id>.json.
// The operator dashboard surfaces these as a "View" link per job
// that opens a Recap-style two-pane render in a new tab.
//
// Storage policy:
// - Test-run jobs (source = "admin-test") are ALWAYS saved
// - Real-user jobs are saved only when relay_save_user_outputs
// is true in the operator config (default false for privacy)
//
// Storage format (per file):
//   {
//     job_id: string
//     batch_id: string | null
//     source: "admin-test" | null
//     saved_at: ms-epoch
//     transcript: string ("[MM:SS] line\n[MM:SS] line...")
//     analysis: { sections: [{title, summary, startIndex, endIndex}, ...] }
//     meta: {
//       title, media_url, audio_seconds, audio_bytes,
//       transcribe_backend, transcribe_model,
//       analyze_backend, analyze_model,
//       transcribe_ms, analyze_ms, wall_time_ms,
//       captions_mode
//     }
//   }
//
// Files are simple JSON — no index, no DB. Listing scans the dir;
// deletion just rm's the file. Cheap up to thousands of entries; if
// the operator hits scale, swap in a SQLite index without changing
// the on-disk format.
import fs from "fs/promises";
import path from "path";
// Directory holding one <job_id>.json file per stored job. Starts at
// /data/relay-outputs until initOutputStore() re-points it.
let outputDir = "/data/relay-outputs";

// Point the store at <dataDir>/relay-outputs and try to create that
// directory (recursive, mode 0o700). A failed mkdir is only logged,
// never thrown; the chosen directory is logged either way.
export async function initOutputStore({ dataDir }) {
  const target = path.join(dataDir, "relay-outputs");
  outputDir = target;

  const mkdirOptions = { recursive: true, mode: 0o700 };
  try {
    await fs.mkdir(target, mkdirOptions);
  } catch (mkdirError) {
    const reason = mkdirError?.message || mkdirError;
    console.warn(`[output-store] mkdir failed: ${reason}`);
  }

  console.log(`[output-store] writing to ${outputDir}`);
}
// Build the on-disk path for a job's output file. Every character
// outside [a-zA-Z0-9_-] is stripped from the id, so separators and
// dots can never reach the filesystem even if the upstream ID
// generator changes. Throws "invalid job_id" when nothing is left
// (including falsy ids such as null, undefined, "" or 0).
function pathFor(jobId) {
  const rawId = jobId ? String(jobId) : "";
  const cleanedId = rawId.replace(/[^a-zA-Z0-9_-]/g, "");
  if (cleanedId.length === 0) {
    throw new Error("invalid job_id");
  }
  const fileName = `${cleanedId}.json`;
  return path.join(outputDir, fileName);
}
// Save a job's transcript + analysis to disk. Best-effort: on write
// failure, log and continue — the audit log remains the source of
// truth for whether the job ran.
//
// The JSON is first written to a sibling temp file and then renamed
// over the final path. The final name is therefore never truncated in
// place: a reader sees either the previous complete file or the new
// complete file, and a crash mid-write cannot leave partial JSON under
// <job_id>.json.
//
//   jobId   - job identifier; pathFor() derives the file name from it
//   payload - fields merged into the stored object; payload keys
//             override the generated job_id / saved_at
//
// Resolves to undefined. Failures (invalid id, unserializable payload,
// I/O errors) are logged with console.warn instead of being thrown.
export async function saveJobOutput(jobId, payload) {
  let tempPath = null;
  try {
    const filePath = pathFor(jobId);
    const body = JSON.stringify(
      { job_id: jobId, saved_at: Date.now(), ...payload },
      null,
      2
    );
    // The temp name ends in ".tmp", so directory scans that filter on
    // the ".json" suffix never pick it up.
    const nonce = Math.random().toString(36).slice(2);
    tempPath = `${filePath}.${process.pid}.${Date.now()}.${nonce}.tmp`;
    await fs.writeFile(tempPath, body, { mode: 0o600 });
    await fs.rename(tempPath, filePath);
    tempPath = null;
  } catch (err) {
    if (tempPath !== null) {
      // Don't leave a stray temp file behind; the cleanup itself is
      // best-effort because the original error is what gets reported.
      await fs.unlink(tempPath).catch(() => {});
    }
    console.warn(
      `[output-store] save failed for ${jobId}: ${err?.message || err}`
    );
  }
}
// Load and parse the stored output for one job. Resolves to the parsed
// JSON, or to null when nothing usable is available — the route layer
// should turn that into a 404. A missing file (ENOENT) is silent; any
// other failure (invalid id, unreadable file, malformed JSON) is
// logged before null is returned.
export async function getJobOutput(jobId) {
  let parsed = null;
  try {
    const text = await fs.readFile(pathFor(jobId), "utf8");
    parsed = JSON.parse(text);
  } catch (readError) {
    const isMissing = readError.code === "ENOENT";
    if (!isMissing) {
      console.warn(
        `[output-store] read failed for ${jobId}: ${readError?.message || readError}`
      );
    }
  }
  return parsed;
}
// List the ids of all stored outputs with a single directory read —
// no file bodies are opened. The Jobs table only needs a has_output
// boolean per row, so the file names (minus ".json") are enough.
// Resolves to [] when the directory is missing (silently) or cannot
// be read (after logging a warning).
export async function listJobOutputIds() {
  let entries;
  try {
    entries = await fs.readdir(outputDir);
  } catch (listError) {
    if (listError.code !== "ENOENT") {
      console.warn(
        `[output-store] list failed: ${listError?.message || listError}`
      );
    }
    return [];
  }

  const ids = [];
  for (const name of entries) {
    if (name.endsWith(".json")) {
      ids.push(name.replace(/\.json$/, ""));
    }
  }
  return ids;
}
// Remove the stored output for one job. Resolves to true when a file
// was unlinked and to false when there was no file (ENOENT). Anything
// else — an invalid job_id or another filesystem error — is rethrown
// to the caller.
export async function deleteJobOutput(jobId) {
  let removed = true;
  try {
    const target = pathFor(jobId);
    await fs.unlink(target);
  } catch (unlinkError) {
    if (unlinkError.code !== "ENOENT") {
      throw unlinkError;
    }
    removed = false;
  }
  return removed;
}
// Bulk delete. Takes an options object: { jobIds: [...] } to delete
// specific jobs, or { all: true } to delete every stored output
// (all wins when both are given). Anything else deletes nothing.
//
// Returns { deleted, missing, failed } for caller reporting:
//   deleted - files actually removed
//   missing - requested ids that had no file (always 0 in all-mode)
//   failed  - deletes that raised an error (bad id, permissions, ...)
//
// Real delete errors are logged and counted under `failed` rather
// than being silently folded into `missing`, so the caller's report
// does not claim a file was absent when it is in fact still on disk.
// One failing delete never aborts the rest of the batch.
export async function bulkDeleteOutputs({ jobIds, all } = {}) {
  const counts = { deleted: 0, missing: 0, failed: 0 };

  let ids;
  if (all) {
    ids = await listJobOutputIds();
  } else if (Array.isArray(jobIds)) {
    ids = jobIds;
  } else {
    return counts;
  }

  for (const id of ids) {
    try {
      const removed = await deleteJobOutput(id);
      if (removed) {
        counts.deleted++;
      } else if (!all) {
        // In all-mode the ids come from the directory listing, so a
        // file that vanished in between is not reported as "missing".
        counts.missing++;
      }
    } catch (err) {
      counts.failed++;
      console.warn(
        `[output-store] delete failed for ${id}: ${err?.message || err}`
      );
    }
  }
  return counts;
}
// Aggregate stats for the dashboard "Stored outputs" mini-panel:
// how many .json files the output directory holds and their combined
// size in bytes. A missing directory yields zeros; any other readdir
// failure yields zeros plus an `error` message.
export async function getStoredOutputsSummary() {
  let names;
  try {
    names = await fs.readdir(outputDir);
  } catch (scanError) {
    const empty = { count: 0, total_bytes: 0 };
    if (scanError.code === "ENOENT") return empty;
    return { ...empty, error: scanError?.message };
  }

  let count = 0;
  let totalBytes = 0;
  for (const name of names) {
    if (!name.endsWith(".json")) continue;
    count += 1;
    try {
      const { size } = await fs.stat(path.join(outputDir, name));
      totalBytes += size;
    } catch {
      // Deliberately ignored: a file that cannot be stat'ed still
      // counts as an entry but contributes no bytes.
    }
  }
  return { count, total_bytes: totalBytes };
}