// Daily Digest — per-episode overview synthesis plus the daily digest
// scan / email scheduler (multi-mode / cloud).
//
// Phase 2 of the Daily Digest feature: turn a saved recap's stored topic
// summaries into a 1–2 paragraph overview via the relay LLM, then cache
// the result back onto the session JSON (`digestOverview`) so it's
// generated at most once per episode. Phase 3 — the daily scan + email
// (runDigestScan, driven by startDigestScheduler) — calls
// getOrCreateEpisodeOverview() for each recap it selects.
//
// Cost ownership (Q1 = operator-absorbed): the synthesis call uses the
// OPERATOR's relay identity — the same credit pool that free signed-in
// users' summaries already draw from (resolveProviderOpts with req=null
// → operator install identity). A retention email shouldn't silently
// drain the recipient's quota for recaps they already made. To bill the
// recipient instead, build the provider with their cloud identity at the
// one marked line below.

import { randomBytes } from "crypto";
import { getProvider, resolveProviderOpts } from "./providers/index.js";
import { patchSession, loadSession, listScopeSessions } from "./history.js";
import { getDb } from "./db.js";
import { sendMail, isSmtpReady } from "./smtp.js";
import { renderDigestEmail } from "./email-template.js";
import { getConfigSnapshot } from "./config.js";

// Operator-internal vocabulary the sibling relay could surface in model
// output (backend / hardware names, LAN hosts). Scrubbed before any
// digest text reaches a cloud user — the same error-boundary rule the
// rest of the app follows. This is a backstop: the synthesis input is
// the recap's own (already user-facing) topic summaries, so a leak here
// is unlikely. Kept conservative to avoid mangling legitimate prose —
// only unambiguous infra tokens and private/LAN hosts, never common
// words or public data.
const OPERATOR_TERMS = [
  /\bspark[\s-]?control\b/gi,
  /\bparakeet\b/gi,
  /\bsortformer\b/gi,
  /\btitanet\b/gi,
  /\bvllm\b/gi,
];
// URLs on a `.local` host, including any port / path that follows.
const LAN_HOST_RE = /\bhttps?:\/\/[^\s)]*\.local\b[^\s)]*/gi;
// URLs whose host is a private / loopback IPv4 address. Removed as a whole
// BEFORE PRIVATE_IP_RE runs — stripping only the address would leave a
// mangled "http://:8000/v1" stub in user-facing text.
const PRIVATE_IP_URL_RE =
  /\bhttps?:\/\/(?:(?:10|127)\.\d{1,3}\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3})\b[^\s)]*/gi;
// Bare private / loopback IPv4 addresses: 10.x, 127.x, 192.168.x and
// 172.16–172.31.x.
const PRIVATE_IP_RE =
  /\b(?:10|127)\.\d{1,3}\.\d{1,3}\.\d{1,3}\b|\b192\.168\.\d{1,3}\.\d{1,3}\b|\b172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}\b/g;

/**
 * Remove operator-internal vocabulary, LAN-host URLs and private IPs from
 * text that is about to reach a cloud user, then tidy the double spaces /
 * orphaned punctuation the removals leave behind.
 * @param {*} text - model output; any falsy value yields "".
 * @returns {string} the scrubbed, trimmed text.
 */
export function scrubOperatorStrings(text) {
  if (!text) return "";
  let out = String(text);
  for (const re of OPERATOR_TERMS) out = out.replace(re, "");
  out = out.replace(LAN_HOST_RE, "");
  out = out.replace(PRIVATE_IP_URL_RE, "");
  out = out.replace(PRIVATE_IP_RE, "");
  // Tidy whitespace / orphaned punctuation the removals may have left.
  return out
    .replace(/[ \t]{2,}/g, " ")
    .replace(/\s+([.,;:])/g, "$1")
    .trim();
}

// Coerce an optional record field to trimmed text ("" for null/undefined).
// Trimming happens BEFORE any "|| fallback", so whitespace-only values fall
// back to the default label, and non-string values don't throw on .trim().
function toTrimmedText(value) {
  if (value == null) return "";
  return String(value).trim();
}

// Build the LLM prompt from a saved recap record. Pure — exported for
// testing. Uses each topic's title + summary (the chunk shape is
// { title, summary, … }); a topic with no summary still contributes its
// title so the overview knows it was covered. Blank titles fall back to
// "Untitled" (episode) / "Topic N" (topic).
export function buildOverviewPrompt(record) {
  const title = toTrimmedText(record?.title) || "Untitled";
  const type =
    record?.type === "podcast"
      ? "podcast episode"
      : record?.type === "youtube"
        ? "video"
        : "recording";
  const topics = Array.isArray(record?.chunks) ? record.chunks : [];
  const topicBlock = topics
    .map((c, i) => {
      const t = toTrimmedText(c?.title) || `Topic ${i + 1}`;
      const s = toTrimmedText(c?.summary);
      return s ? `- ${t}: ${s}` : `- ${t}`;
    })
    .join("\n");
  return [
    `Below are the per-topic summaries of a ${type} titled "${title}".`,
    "",
    "Write a tight 1–2 paragraph overview (about 100–150 words) that captures " +
      "the main throughline and the few most important takeaways, as if briefing " +
      "a busy reader who hasn't seen it. Do not invent anything beyond the " +
      "summaries below, use no headings or bullet points, and write in plain prose.",
    "",
    "Topic summaries:",
    topicBlock,
  ].join("\n");
}

// Operator-identity relay provider for synthesis (operator-absorbed).
// Throws if the relay isn't configured (no install id / base URL) — the
// caller treats that as "skip this episode", not a fatal error.
function buildSynthesisProvider() {
  // req=null → operator install identity. Swap in a per-recipient cloud
  // identity here to bill the user instead of the operator.
  return getProvider("relay", resolveProviderOpts("relay", { req: null }));
}

// Synthesize without any caching: pick a provider, send the overview
// prompt, scrub the reply and hand back the overview text. Rejects when
// the record carries no topics or when nothing usable survives scrubbing.
// `provider` can be injected (tests); otherwise the operator-identity
// relay is built.
export async function synthesizeEpisodeOverview(record, { provider } = {}) {
  const chunkList = Array.isArray(record?.chunks) ? record.chunks : [];
  if (!chunkList.length) throw new Error("no topic summaries to synthesize");

  const relay = provider ? provider : buildSynthesisProvider();
  const reply = await relay.analyzeText({
    prompt: buildOverviewPrompt(record),
    retries: 1,
    // Stable per-episode billing key: a retry within the relay's job
    // window reuses the same credit rather than charging twice.
    jobId: record?.id ? `digest-${record.id}` : undefined,
  });

  const overview = scrubOperatorStrings(reply?.text || "");
  if (!overview) throw new Error("empty synthesis result");
  return overview;
}

// Get-or-generate the cached overview. Returns { overview, cached }. On a
// cache miss it synthesizes and (unless save:false) writes the result
// back onto the session JSON so the next caller is a cache hit. The
// write-back is best-effort — a failed patch just means we re-synthesize
// next time, never a user-visible error.
export async function getOrCreateEpisodeOverview({
  scope,
  id,
  record,
  provider,
  save = true,
}) {
  const cached = readCachedOverview(record);
  if (cached) return { overview: cached, cached: true };

  const overview = await synthesizeEpisodeOverview(record, { provider });
  if (save && scope && id) {
    try {
      await patchSession(scope, id, { digestOverview: overview });
    } catch (err) {
      // Best-effort cache: a failed write only means the next call
      // re-synthesizes, so don't fail the caller — but log it, so a store
      // that keeps rejecting writes doesn't silently re-synthesize forever.
      console.warn(
        `[digest] overview cache write failed for ${scope}/${id}: ${err?.message || err}`,
      );
    }
  }
  return { overview, cached: false };
}

// The trimmed cached overview stored on a session record, or "" when the
// record has none. A non-string value is treated as "no cache" rather than
// throwing on `.trim()`.
function readCachedOverview(record) {
  const value = record?.digestOverview;
  return typeof value === "string" ? value.trim() : "";
}

// ── Daily scan + scheduler (mirrors subscription-reminders.js) ──────────

const SEND_HOUR = 8; // 08:00 server-local — when the daily scan acts
const SCAN_INTERVAL_MS = 60 * 60 * 1000; // tick hourly; act only at SEND_HOUR
const BOOT_DELAY_MS = 2 * 60 * 1000;
// Intended to cap a user at one digest per ~day even if the loop ticks
// more than once inside the send hour or they add content right after a
// send (see the NOTE at the resend gate in runDigestScan).
const MIN_RESEND_MS = 20 * 60 * 60 * 1000;
const MAX_EPISODES = 10; // cap per email; the rest become an overflow count

let scanning = false;
let scheduled = false;

// Which library scope a user's recaps live under. Mirrors
// history.js scopeForRequest: the multi-mode admin keeps the "owner"
// scope; everyone else is scoped by their user id. Pure — exported for
// testing.
export function scopeForUser(user) {
  return user?.is_admin ? "owner" : user?.id;
}

// Pick the recaps created after the watermark, oldest first, capped.
// Pure — exported for testing. Returns { episodes, overflow, total }.
// Sessions whose createdAt doesn't parse to a finite time are ignored.
export function selectDigestEpisodes(sessions, watermarkMs, cap = MAX_EPISODES) {
  const since = typeof watermarkMs === "number" ? watermarkMs : 0;
  // Parse each createdAt once, then filter / sort on the parsed value.
  const fresh = (sessions || [])
    .map((session) => ({ session, t: new Date(session?.createdAt).getTime() }))
    .filter(({ t }) => Number.isFinite(t) && t > since)
    .sort((a, b) => a.t - b.t)
    .map(({ session }) => session);
  return {
    episodes: fresh.slice(0, cap),
    overflow: Math.max(0, fresh.length - cap),
    total: fresh.length,
  };
}

// Mask an address for logs: keeps the first character and the domain.
function maskEmail(email) {
  return String(email).replace(/^(.).*(@.*)$/, "$1***$2");
}

// Mint (and persist) a user's unsubscribe token if they don't have one
// yet. Returns the token. Stable per user — re-enabling reuses it.
function ensureUnsubToken(db, user) {
  if (user.digest_unsub_token) return user.digest_unsub_token;
  const token = randomBytes(32).toString("base64url");
  db.prepare("UPDATE users SET digest_unsub_token = ? WHERE id = ?").run(
    token,
    user.id,
  );
  return token;
}

// Build one user's digest: synthesize an overview per selected episode
// (operator-absorbed, cached). Returns { built, failed } where built are
// the episode payloads ready for the template (each carrying its source
// createdAt) and failed is the createdAt list of episodes that errored in
// a way worth retrying — the caller uses both to set a watermark that
// never skips a retryable failure.
//
// Recaps that can never yield an overview — loadSession returned nothing,
// or there is no cached overview and no topic summaries (synthesis would
// always throw "no topic summaries") — are skipped and NOT reported as
// failed. Otherwise they would pin the watermark below themselves forever
// and the same newer recaps would be re-sent in every future digest.
// NOTE(review): assumes loadSession returning null is permanent (e.g. a
// deleted recap) rather than transient — confirm.
async function buildUserEpisodes(scope, selected) {
  const built = [];
  const failed = [];
  for (const ep of selected) {
    try {
      const record = await loadSession(scope, ep.id);
      if (!record) {
        console.warn(`[digest] skipping ${scope}/${ep.id}: recap not found`);
        continue;
      }
      const hasTopics = Array.isArray(record.chunks) && record.chunks.length > 0;
      if (!hasTopics && !readCachedOverview(record)) {
        console.warn(`[digest] skipping ${scope}/${ep.id}: no topic summaries`);
        continue;
      }
      const { overview } = await getOrCreateEpisodeOverview({
        scope,
        id: ep.id,
        record,
      });
      built.push({
        title: ep.title,
        type: ep.type,
        url: ep.url,
        overview,
        createdAt: ep.createdAt,
      });
    } catch (err) {
      // Thrown errors (I/O, relay) are treated as transient: retry next scan.
      failed.push(ep.createdAt);
      console.warn(
        `[digest] synthesis failed for ${scope}/${ep.id}: ${err?.message || err}`,
      );
    }
  }
  return { built, failed };
}

// The watermark to stamp after a send. Advances to the newest
// successfully-sent recap, but never past the oldest one that FAILED (so
// the next scan retries gaps) and never past un-synthesized overflow
// recaps (their createdAt is newer than any sent one, so they're picked
// up next scan too). Pure — exported for testing. Returns null when
// nothing was sent (caller should not advance). createdAt inputs are ISO
// strings; output is ms epoch.
export function nextDigestWatermark(sentCreatedAts, failedCreatedAts) {
  const toMs = (x) => new Date(x).getTime();
  const sent = (sentCreatedAts || []).map(toMs).filter(Number.isFinite);
  if (sent.length === 0) return null;
  const failed = (failedCreatedAts || []).map(toMs).filter(Number.isFinite);
  const newestSent = Math.max(...sent);
  const oldestFailed = failed.length ? Math.min(...failed) : Infinity;
  return Math.min(newestSent, oldestFailed - 1);
}

// One scan pass. Self-gating, deduped, NEVER throws — returns a small
// summary so the scheduler stays alive. `force` bypasses the send-hour
// gate (used by the operator test trigger), not the per-user resend gate.
export async function runDigestScan({ force = false } = {}) {
  // `force` bypasses the send-hour gate (operator test trigger), NOT the
  // in-progress lock — a forced run alongside the scheduled tick would
  // otherwise double-send to every opted-in user.
  if (scanning) return { skipped: "already_running" };
  scanning = true;
  try {
    const now = Date.now();
    if (!force && new Date(now).getHours() !== SEND_HOUR) {
      return { skipped: "off_hour" };
    }
    if (!isSmtpReady()) return { skipped: "smtp_not_ready" };

    const snap = await getConfigSnapshot();
    const publicUrl = (snap.recap_public_url || "").trim().replace(/\/$/, "");
    if (!publicUrl) return { skipped: "public_url_not_set" };

    const db = getDb();
    const users = db
      .prepare(
        "SELECT id, email, is_admin, last_digest_at, digest_unsub_token FROM users WHERE digest_enabled = 1",
      )
      .all();

    let sent = 0;
    let skipped = 0;
    for (const user of users) {
      try {
        const email = (user.email || "").trim();
        if (!email) {
          skipped++;
          continue;
        }
        // Defensive: a row with no watermark (set via SQL, not the opt-in
        // endpoint) would dump the whole backlog — start the clock now
        // and pick up new recaps next scan instead.
        if (typeof user.last_digest_at !== "number") {
          db.prepare("UPDATE users SET last_digest_at = ? WHERE id = ?").run(
            now,
            user.id,
          );
          skipped++;
          continue;
        }
        // NOTE(review): after a send, last_digest_at is stamped with the
        // newest sent recap's createdAt (nextDigestWatermark below), not
        // the send time, so this gate measures time since that recap was
        // created rather than since the last email — it only blocks a
        // re-send when that recap is younger than MIN_RESEND_MS. TODO
        // confirm whether a separate sent-at timestamp is intended.
        if (now - user.last_digest_at < MIN_RESEND_MS) {
          skipped++;
          continue;
        }
        const scope = scopeForUser(user);
        if (!scope) {
          // No usable id (shouldn't happen for a real row) — skip rather
          // than read an "undefined" scope dir.
          skipped++;
          continue;
        }

        const sessions = await listScopeSessions(scope);
        const { episodes: selected, overflow } = selectDigestEpisodes(
          sessions,
          user.last_digest_at,
          MAX_EPISODES,
        );
        if (selected.length === 0) {
          skipped++; // nothing new — skip the email, leave the watermark
          continue;
        }

        const { built, failed } = await buildUserEpisodes(scope, selected);
        if (built.length === 0) {
          // Synthesis failed for all of them — don't advance the
          // watermark, so the next scan retries the same recaps.
          skipped++;
          continue;
        }

        const token = ensureUnsubToken(db, user);
        const message = renderDigestEmail({
          brandName: "Recaps",
          episodes: built,
          overflowCount: overflow,
          manageUrl: `${publicUrl}/`,
          unsubscribeUrl: `${publicUrl}/api/digest/unsubscribe?token=${encodeURIComponent(token)}`,
        });
        await sendMail({
          to: email,
          subject: message.subject,
          text: message.text,
          html: message.html,
        });

        // Advance the watermark only after a successful send — to the
        // newest sent recap, but never past a failed or deferred one, so
        // the next scan retries gaps instead of skipping them.
        const watermark = nextDigestWatermark(
          built.map((e) => e.createdAt),
          failed,
        );
        db.prepare("UPDATE users SET last_digest_at = ? WHERE id = ?").run(
          watermark ?? now,
          user.id,
        );
        sent++;
        // Count what was actually emailed (`built`), not what was selected.
        const count = built.length;
        console.log(
          `[digest] sent to ${maskEmail(email)} (${count} recap${count === 1 ? "" : "s"}${overflow ? `, +${overflow} more` : ""})`,
        );
      } catch (err) {
        console.warn(
          `[digest] user ${user.id} failed: ${err?.message || err}`,
        );
        skipped++;
      }
    }

    if (sent) {
      console.log(`[digest] scan complete: ${sent} sent, ${skipped} skipped`);
    }
    return { sent, skipped };
  } catch (err) {
    console.warn(`[digest] scan error: ${err?.message || err}`);
    return { skipped: "error", error: err?.message || String(err) };
  } finally {
    scanning = false;
  }
}

// Start the hourly scan loop. Idempotent; self-gates inside the scan, so
// it's safe to call whenever multi mode boots.
export function startDigestScheduler() {
  if (scheduled) return;
  scheduled = true;
  setTimeout(() => {
    runDigestScan().catch(() => {});
  }, BOOT_DELAY_MS);
  setInterval(() => {
    runDigestScan().catch(() => {});
  }, SCAN_INTERVAL_MS);
  console.log("[digest] daily-digest scheduler started");
}

// One-click unsubscribe — a public GET (no session) keyed by the per-user
// token in the email. Mounted in index.js (multi mode) and whitelisted in
// tenant-auth's public paths. Flips digest_enabled off; the in-app toggle
// can turn it back on.
export function setupDigestRoutes(app) { app.get("/api/digest/unsubscribe", (req, res) => { const token = String(req.query?.token || "").trim(); const page = (msg) => `