Files
recap/server/daily-digest.js
Keysat b4fa5d7be8 Add opt-in Daily Digest (daily email of last 24h of library recaps)
Multi-mode, off by default. Each new recap is synthesized into a 1-2
paragraph overview via the relay (operator-absorbed) and cached onto the
session JSON; a daily 08:00 scan emails opted-in users their fresh
recaps, deduped by a per-user watermark that never skips a failed or
over-cap recap. One-click tokenized unsubscribe; settings-modal toggle;
admin test trigger. Bumps to 0.2.158.
2026-06-15 19:50:48 -05:00

427 lines
16 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Daily Digest — per-episode overview synthesis (multi-mode / cloud).
//
// Phase 2 of the Daily Digest feature: turn a saved recap's stored topic
// summaries into a 1-2 paragraph overview via the relay LLM, then cache
// the result back onto the session JSON (`digestOverview`) so it's
// generated at most once per episode. The daily scan + email (phase 3,
// further down in this file) calls getOrCreateEpisodeOverview().
//
// Cost ownership (Q1 = operator-absorbed): the synthesis call uses the
// OPERATOR's relay identity — the same credit pool that free signed-in
// users' summaries already draw from (resolveProviderOpts with req=null
// → operator install identity). A retention email shouldn't silently
// drain the recipient's quota for recaps they already made. To bill the
// recipient instead, build the provider with their cloud identity at the
// one marked line below.
import { randomBytes } from "crypto";
import { getProvider, resolveProviderOpts } from "./providers/index.js";
import { patchSession, loadSession, listScopeSessions } from "./history.js";
import { getDb } from "./db.js";
import { sendMail, isSmtpReady } from "./smtp.js";
import { renderDigestEmail } from "./email-template.js";
import { getConfigSnapshot } from "./config.js";
// Operator-internal vocabulary the sibling relay could surface in model
// output (backend / hardware names, LAN hosts). Scrubbed before any
// digest text reaches a cloud user: the same error-boundary rule the
// rest of the app follows. This is a backstop: the synthesis input is
// the recap's own (already user-facing) topic summaries, so a leak here
// is unlikely. Kept conservative to avoid mangling legitimate prose:
// only unambiguous infra tokens and private/LAN hosts, never common
// words or public data.
const OPERATOR_TERMS = [
  /\bspark[\s-]?control\b/gi,
  /\bparakeet\b/gi,
  /\bsortformer\b/gi,
  /\btitanet\b/gi,
  /\bvllm\b/gi,
];
// Any http(s) URL whose host ends in `.local` (mDNS / LAN names).
const LAN_HOST_RE = /\bhttps?:\/\/[^\s)]*\.local\b[^\s)]*/gi;
// Dotted-quad addresses in 10/8, 127/8, 192.168/16 and 172.16/12. Shared
// by the bare-IP and the URL pattern so the two can never drift apart.
const PRIVATE_IP_PATTERN = String.raw`(?:(?:10|127)\.\d{1,3}\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3})`;
const PRIVATE_IP_RE = new RegExp(String.raw`\b${PRIVATE_IP_PATTERN}\b`, "g");
// A whole http(s) URL pointing at a private address, including its port
// and path. Without this, scrubbing only the IP would turn
// "http://192.168.1.5:8080/api" into "http://:8080/api", which still
// leaks the port/path and leaves junk in the prose.
const PRIVATE_URL_RE = new RegExp(
  String.raw`\bhttps?://${PRIVATE_IP_PATTERN}\b(?::\d+)?(?:[/?#][^\s)]*)?`,
  "gi",
);

// Remove operator-internal tokens from model output.
//
// text: anything; falsy input yields "". Non-strings are stringified.
// Returns the scrubbed text with the whitespace / orphaned punctuation
// left behind by the removals tidied up, and the ends trimmed.
export function scrubOperatorStrings(text) {
  if (!text) return "";
  let out = String(text);
  for (const re of OPERATOR_TERMS) out = out.replace(re, "");
  out = out.replace(LAN_HOST_RE, "");
  // URLs first, so a private-IP URL disappears as a unit; the bare-IP
  // pass then catches addresses that appear outside of a URL.
  out = out.replace(PRIVATE_URL_RE, "");
  out = out.replace(PRIVATE_IP_RE, "");
  // Tidy whitespace / orphaned punctuation the removals may have left.
  return out
    .replace(/[ \t]{2,}/g, " ")
    .replace(/\s+([.,;:])/g, "$1")
    .trim();
}
// Build the LLM prompt from a saved recap record. Pure; exported for
// testing.
//
// record: the saved recap. Reads `title`, `type` ("podcast" / "youtube" /
//   anything else) and `chunks` (array of { title, summary, ... }). A
//   missing or non-array `chunks` is treated as no topics.
// Returns the prompt string. Each topic contributes "- title: summary";
// a topic with no summary still contributes its title so the overview
// knows it was covered. Missing or blank titles fall back to "Untitled"
// (recap) and "Topic N" (topic).
export function buildOverviewPrompt(record) {
  const title = String(record?.title ?? "").trim() || "Untitled";
  const type =
    record?.type === "podcast"
      ? "podcast episode"
      : record?.type === "youtube"
        ? "video"
        : "recording";
  const topics = Array.isArray(record?.chunks) ? record.chunks : [];
  const topicBlock = topics
    .map((chunk, i) => {
      const topicTitle = String(chunk?.title ?? "").trim() || `Topic ${i + 1}`;
      const summary = String(chunk?.summary ?? "").trim();
      return summary ? `- ${topicTitle}: ${summary}` : `- ${topicTitle}`;
    })
    .join("\n");
  // The ranges are spelled with ASCII hyphens on purpose: a dropped dash
  // turns "1-2 paragraph" into "12 paragraph" and "100-150" into "100150",
  // which asks the model for something very different.
  const instruction = [
    "Write a tight 1-2 paragraph overview (about 100-150 words) that captures",
    "the main throughline and the few most important takeaways, as if briefing",
    "a busy reader who hasn't seen it. Do not invent anything beyond the",
    "summaries below, use no headings or bullet points, and write in plain prose.",
  ].join(" ");
  return [
    `Below are the per-topic summaries of a ${type} titled "${title}".`,
    "",
    instruction,
    "",
    "Topic summaries:",
    topicBlock,
  ].join("\n");
}
// Relay provider used for digest synthesis, built with the options that
// resolveProviderOpts returns for `req: null`. Per the file's cost notes
// that is the operator identity, so the operator absorbs the cost; pass a
// per-recipient identity here instead to bill the user.
// Anything thrown while resolving options or constructing the provider
// propagates; callers treat that as "skip this episode", not as fatal.
function buildSynthesisProvider() {
  return getProvider("relay", resolveProviderOpts("relay", { req: null }));
}
// Uncached synthesis: ask the relay for an overview of `record`, scrub
// operator strings from the reply, and return the resulting text.
//
// record: saved recap; must carry a non-empty `chunks` array.
// provider: optional injected provider (tests); when absent the
//   operator-identity relay from buildSynthesisProvider() is used.
// Throws Error("no topic summaries to synthesize") when there are no
// topics and Error("empty synthesis result") when nothing usable comes
// back. Provider errors propagate unchanged.
export async function synthesizeEpisodeOverview(record, { provider } = {}) {
  const hasTopics = Array.isArray(record?.chunks) && record.chunks.length > 0;
  if (!hasTopics) {
    throw new Error("no topic summaries to synthesize");
  }

  const relay = provider || buildSynthesisProvider();
  const request = {
    prompt: buildOverviewPrompt(record),
    retries: 1,
    // One stable key per episode, so a retry inside the relay's job
    // window reuses the same credit instead of being charged again.
    jobId: record?.id ? `digest-${record.id}` : undefined,
  };
  const reply = await relay.analyzeText(request);

  const overview = scrubOperatorStrings(reply?.text || "");
  if (!overview) {
    throw new Error("empty synthesis result");
  }
  return overview;
}
// Get-or-generate the cached overview.
//
// scope, id: where the session lives; both are needed for the write-back.
// record:    the loaded session; a non-blank string `digestOverview` on
//            it is a cache hit.
// provider:  optional provider forwarded to synthesizeEpisodeOverview.
// save:      set false to skip the write-back (default true).
// Returns { overview, cached }. On a cache miss it synthesizes and
// (unless save:false) writes the result back onto the session JSON so the
// next caller is a cache hit. Synthesis errors propagate to the caller.
// The write-back is best-effort: a failed patch never becomes a
// user-visible error, it only means we re-synthesize next time.
export async function getOrCreateEpisodeOverview({
  scope,
  id,
  record,
  provider,
  save = true,
}) {
  const stored = record?.digestOverview;
  const cached = typeof stored === "string" ? stored.trim() : "";
  if (cached) return { overview: cached, cached: true };
  const overview = await synthesizeEpisodeOverview(record, { provider });
  if (save && scope && id) {
    try {
      await patchSession(scope, id, { digestOverview: overview });
    } catch (err) {
      // Still best-effort, but leave a trace: a write-back that keeps
      // failing means every scan pays for the same synthesis again.
      console.warn(
        `[digest] overview cache write failed for ${scope}/${id}: ${err?.message || err}`,
      );
    }
  }
  return { overview, cached: false };
}
// ── Daily scan + scheduler (mirrors subscription-reminders.js) ──────────
// Hour of the day (as returned by Date#getHours, i.e. server-local time)
// in which an unforced scan is allowed to act.
const SEND_HOUR = 8; // 08:00 server-local — when the daily scan acts
const SCAN_INTERVAL_MS = 60 * 60 * 1000; // tick hourly; act only at SEND_HOUR
// Delay before the one-off scan that startDigestScheduler queues at boot.
const BOOT_DELAY_MS = 2 * 60 * 1000;
// A user gets at most one digest per ~day even if the loop ticks more
// than once inside the send hour or they add content right after a send.
// NOTE(review): runDigestScan compares this against `last_digest_at`,
// which it stamps with the newest sent recap's createdAt rather than the
// send time, so the gap is measured from that recap — confirm intended.
const MIN_RESEND_MS = 20 * 60 * 60 * 1000;
const MAX_EPISODES = 10; // cap per email; the rest become an overflow count
// In-progress lock for runDigestScan: set while a scan runs so an
// overlapping call returns early instead of scanning concurrently.
let scanning = false;
// Set once by startDigestScheduler so repeated calls do not add timers.
let scheduled = false;
// Resolve the library scope that holds a user's recaps. Mirrors
// history.js scopeForRequest: an admin row maps to the shared "owner"
// scope, every other user is scoped by their own id. Pure; exported for
// testing. A missing user (or one without an id) yields undefined.
export function scopeForUser(user) {
  if (user?.is_admin) {
    return "owner";
  }
  return user?.id;
}
// Choose which recaps go into a digest: those created strictly after the
// watermark, ordered oldest first, limited to `cap`. Pure; exported for
// testing.
//
// sessions:    array of session summaries carrying `createdAt`; a falsy
//              value is treated as an empty list. Entries whose
//              createdAt does not parse to a finite time are ignored.
// watermarkMs: ms-epoch cutoff; any non-number is treated as 0.
// cap:         maximum number of episodes to return.
// Returns { episodes, overflow, total } where overflow is how many fresh
// recaps did not fit under the cap and total is the fresh count.
export function selectDigestEpisodes(sessions, watermarkMs, cap = MAX_EPISODES) {
  const cutoff = typeof watermarkMs === "number" ? watermarkMs : 0;

  // Parse each timestamp once, then filter and order on the parsed value.
  const fresh = (sessions || [])
    .map((session) => ({
      session,
      createdMs: new Date(session?.createdAt).getTime(),
    }))
    .filter(({ createdMs }) => Number.isFinite(createdMs) && createdMs > cutoff)
    .sort((left, right) => left.createdMs - right.createdMs)
    .map(({ session }) => session);

  const total = fresh.length;
  return {
    episodes: fresh.slice(0, cap),
    overflow: Math.max(0, total - cap),
    total,
  };
}
// Mask an address for log lines: keep the first character and everything
// from the last "@" on, replacing what lies between with "***". Input
// that does not match (e.g. no "@") comes back stringified but unmasked.
function maskEmail(email) {
  const address = String(email);
  return address.replace(
    /^(.).*(@.*)$/,
    (_whole, firstChar, domainPart) => `${firstChar}***${domainPart}`,
  );
}
// Return the user's unsubscribe token, minting and persisting one when
// the row has none yet. An existing token is returned as-is without
// touching the database, so it stays stable once issued.
//
// db:   handle exposing prepare(sql).run(...params).
// user: row with `id` and `digest_unsub_token`.
// Returns the token string (32 random bytes, base64url-encoded, when new).
function ensureUnsubToken(db, user) {
  const existing = user.digest_unsub_token;
  if (existing) {
    return existing;
  }

  const minted = randomBytes(32).toString("base64url");
  const persist = db.prepare(
    "UPDATE users SET digest_unsub_token = ? WHERE id = ?",
  );
  persist.run(minted, user.id);
  return minted;
}
// Assemble one user's digest payload. For every selected episode the
// session is loaded and its overview fetched (cached, or synthesized on
// a miss) one at a time, in order.
//
// scope:    library scope the sessions live under.
// selected: session summaries (id, title, type, url, createdAt).
// Returns { built, failed }: `built` holds the template-ready episodes,
// each carrying its source createdAt; `failed` lists the createdAt of
// every episode whose session was missing or whose overview errored. The
// caller uses both to set a watermark that never skips a failure. A
// per-episode error is logged and recorded, never thrown.
async function buildUserEpisodes(scope, selected) {
  const outcome = { built: [], failed: [] };

  for (const episode of selected) {
    try {
      const record = await loadSession(scope, episode.id);
      if (record) {
        const result = await getOrCreateEpisodeOverview({
          scope,
          id: episode.id,
          record,
        });
        outcome.built.push({
          title: episode.title,
          type: episode.type,
          url: episode.url,
          overview: result.overview,
          createdAt: episode.createdAt,
        });
      } else {
        // Session is listed but could not be loaded: counts as a
        // failure so it is retried, but there is no error to log.
        outcome.failed.push(episode.createdAt);
      }
    } catch (err) {
      outcome.failed.push(episode.createdAt);
      console.warn(
        `[digest] synthesis failed for ${scope}/${episode.id}: ${err?.message || err}`,
      );
    }
  }

  return outcome;
}
// Compute the watermark to stamp after a send. It moves up to the newest
// recap that was actually sent, but stops 1 ms short of the oldest recap
// that FAILED, so the next scan picks the failure up again. Overflow
// recaps that were never attempted are newer than anything sent, so they
// stay above the watermark as well. Pure; exported for testing.
//
// sentCreatedAts / failedCreatedAts: lists of createdAt values (ISO
//   strings); falsy lists and unparseable entries are ignored.
// Returns the watermark in ms epoch, or null when nothing was sent (the
// caller should then leave the stored watermark alone).
export function nextDigestWatermark(sentCreatedAts, failedCreatedAts) {
  const toFiniteMs = (values) =>
    (values || [])
      .map((value) => new Date(value).getTime())
      .filter((ms) => Number.isFinite(ms));

  const sentMs = toFiniteMs(sentCreatedAts);
  if (sentMs.length === 0) {
    return null;
  }

  const newestSent = sentMs.reduce((hi, ms) => Math.max(hi, ms), -Infinity);
  // With no failures this stays Infinity and never limits the result.
  const oldestFailed = toFiniteMs(failedCreatedAts).reduce(
    (lo, ms) => Math.min(lo, ms),
    Infinity,
  );
  return Math.min(newestSent, oldestFailed - 1);
}
// One scan pass over every opted-in user. Self-gating, deduped, and it
// NEVER throws: every outcome is reported through the returned summary
// so the scheduler stays alive.
//
// force: bypass the send-hour gate (used by the operator test trigger).
//   It does NOT bypass the in-progress lock or the per-user resend gate.
// Returns { sent, skipped } for a completed pass, or { skipped: reason }
// with reason "already_running" | "off_hour" | "smtp_not_ready" |
// "public_url_not_set" | "error" (plus `error` text) when the pass did
// not run or aborted.
export async function runDigestScan({ force = false } = {}) {
  // `force` bypasses the send-hour gate (operator test trigger), NOT the
  // in-progress lock: a forced run alongside the scheduled tick would
  // otherwise double-send to every opted-in user.
  if (scanning) return { skipped: "already_running" };
  scanning = true;
  try {
    const now = Date.now();
    if (!force && new Date(now).getHours() !== SEND_HOUR) {
      return { skipped: "off_hour" };
    }
    if (!isSmtpReady()) return { skipped: "smtp_not_ready" };
    const snap = await getConfigSnapshot();
    // Drop one trailing slash so the URLs built below never contain "//".
    const publicUrl = (snap.recap_public_url || "").trim().replace(/\/$/, "");
    if (!publicUrl) return { skipped: "public_url_not_set" };
    const db = getDb();
    const users = db
      .prepare(
        "SELECT id, email, is_admin, last_digest_at, digest_unsub_token FROM users WHERE digest_enabled = 1",
      )
      .all();
    // Prepared once per scan and reused for every user.
    const setWatermark = db.prepare(
      "UPDATE users SET last_digest_at = ? WHERE id = ?",
    );
    let sent = 0;
    let skipped = 0;
    for (const user of users) {
      // One user's failure must not stop the others.
      try {
        const email = (user.email || "").trim();
        if (!email) {
          skipped++;
          continue;
        }
        // Defensive: a row with no watermark (set via SQL, not the opt-in
        // endpoint) would dump the whole backlog. Start the clock now and
        // pick up new recaps next scan instead.
        if (typeof user.last_digest_at !== "number") {
          setWatermark.run(now, user.id);
          skipped++;
          continue;
        }
        if (now - user.last_digest_at < MIN_RESEND_MS) {
          skipped++;
          continue;
        }
        const scope = scopeForUser(user);
        if (!scope) {
          // No usable id (shouldn't happen for a real row): skip rather
          // than read an "undefined" scope dir.
          skipped++;
          continue;
        }
        const sessions = await listScopeSessions(scope);
        const { episodes: selected, overflow } = selectDigestEpisodes(
          sessions,
          user.last_digest_at,
          MAX_EPISODES,
        );
        if (selected.length === 0) {
          skipped++; // nothing new: skip the email, leave the watermark
          continue;
        }
        const { built, failed } = await buildUserEpisodes(scope, selected);
        if (built.length === 0) {
          // Synthesis failed for all of them: don't advance the
          // watermark, so the next scan retries the same recaps.
          skipped++;
          continue;
        }
        const token = ensureUnsubToken(db, user);
        const message = renderDigestEmail({
          brandName: "Recaps",
          episodes: built,
          overflowCount: overflow,
          manageUrl: `${publicUrl}/`,
          unsubscribeUrl: `${publicUrl}/api/digest/unsubscribe?token=${encodeURIComponent(token)}`,
        });
        await sendMail({
          to: email,
          subject: message.subject,
          text: message.text,
          html: message.html,
        });
        // Advance the watermark only after a successful send: to the
        // newest sent recap, but never past a failed or deferred one, so
        // the next scan retries gaps instead of skipping them.
        const watermark = nextDigestWatermark(
          built.map((e) => e.createdAt),
          failed,
        );
        setWatermark.run(watermark ?? now, user.id);
        sent++;
        // Report what actually went out. This must only use names that
        // exist here: a throw at this point would land in the catch below
        // and count an already-delivered digest as skipped too.
        const sentCount = built.length;
        console.log(
          `[digest] sent to ${maskEmail(email)} (${sentCount} recap${sentCount === 1 ? "" : "s"}${overflow ? `, +${overflow} more` : ""})`,
        );
      } catch (err) {
        console.warn(
          `[digest] user ${user.id} failed: ${err?.message || err}`,
        );
        skipped++;
      }
    }
    if (sent) {
      console.log(`[digest] scan complete: ${sent} sent, ${skipped} skipped`);
    }
    return { sent, skipped };
  } catch (err) {
    console.warn(`[digest] scan error: ${err?.message || err}`);
    return { skipped: "error", error: err?.message || String(err) };
  } finally {
    scanning = false;
  }
}
// Kick off the digest loop: one scan shortly after boot (BOOT_DELAY_MS)
// and then one every SCAN_INTERVAL_MS. Calling it again is a no-op. The
// scan decides for itself whether to act, so it is safe to start this
// whenever multi mode boots.
export function startDigestScheduler() {
  if (scheduled) return;
  scheduled = true;

  // Shared tick: a rejection is dropped so a timer callback can never
  // surface an unhandled rejection.
  const tick = () => {
    runDigestScan().catch(() => {});
  };
  setTimeout(tick, BOOT_DELAY_MS);
  setInterval(tick, SCAN_INTERVAL_MS);

  console.log("[digest] daily-digest scheduler started");
}
// Register the one-click unsubscribe endpoint: a public GET (no session)
// keyed by the per-user token carried in the email link. Mounted in
// index.js (multi mode) and whitelisted in tenant-auth's public paths.
// A matching token flips digest_enabled off; the in-app toggle can turn
// it back on.
//
// Responses are small standalone HTML pages: 400 for a missing token,
// 404 when no row matches, 500 when the update throws, and the default
// status with a confirmation on success.
export function setupDigestRoutes(app) {
  // Wrap a message in the minimal centered card used for every reply.
  const renderPage = (msg) =>
    `<!doctype html><html><body style="margin:0;padding:48px 16px;background:#fafafa;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;text-align:center;color:#333;"><div style="max-width:420px;margin:0 auto;background:#fff;border-radius:8px;padding:32px;font-size:15px;line-height:1.6;">${msg}</div></body></html>`;

  const handleUnsubscribe = (req, res) => {
    const token = String(req.query?.token || "").trim();
    if (!token) {
      return res.status(400).send(renderPage("Invalid unsubscribe link."));
    }

    let outcome;
    try {
      outcome = getDb()
        .prepare(
          "UPDATE users SET digest_enabled = 0 WHERE digest_unsub_token = ?",
        )
        .run(token);
    } catch (err) {
      console.error("[digest] unsubscribe failed:", err);
      return res
        .status(500)
        .send(renderPage("Something went wrong. Please try again later."));
    }

    // No row carried this token: unknown or no longer valid.
    if (outcome.changes === 0) {
      return res
        .status(404)
        .send(renderPage("This unsubscribe link is no longer valid."));
    }
    return res.send(
      renderPage(
        "You've been unsubscribed from the daily digest. You can turn it back on anytime in Settings.",
      ),
    );
  };

  app.get("/api/digest/unsubscribe", handleUnsubscribe);
}