Add opt-in Daily Digest (daily email of last 24h of library recaps)
Multi-mode, off by default. Each new recap is synthesized into a 1-2 paragraph overview via the relay (operator-absorbed) and cached onto the session JSON; a daily 08:00 scan emails opted-in users their fresh recaps, deduped by a per-user watermark that never skips a failed or over-cap recap. One-click tokenized unsubscribe; settings-modal toggle; admin test trigger. Bumps to 0.2.158.
This commit is contained in:
@@ -0,0 +1,426 @@
|
||||
// Daily Digest — per-episode overview synthesis (multi-mode / cloud).
|
||||
//
|
||||
// Phase 2 of the Daily Digest feature: turn a saved recap's stored topic
|
||||
// summaries into a 1–2 paragraph overview via the relay LLM, then cache
|
||||
// the result back onto the session JSON (`digestOverview`) so it's
|
||||
// generated at most once per episode. The daily scan + email (phase 3)
|
||||
// will call getOrCreateEpisodeOverview(); no scheduler lives here yet.
|
||||
//
|
||||
// Cost ownership (Q1 = operator-absorbed): the synthesis call uses the
|
||||
// OPERATOR's relay identity — the same credit pool that free signed-in
|
||||
// users' summaries already draw from (resolveProviderOpts with req=null
|
||||
// → operator install identity). A retention email shouldn't silently
|
||||
// drain the recipient's quota for recaps they already made. To bill the
|
||||
// recipient instead, build the provider with their cloud identity at the
|
||||
// one marked line below.
|
||||
|
||||
import { randomBytes } from "crypto";
|
||||
import { getProvider, resolveProviderOpts } from "./providers/index.js";
|
||||
import { patchSession, loadSession, listScopeSessions } from "./history.js";
|
||||
import { getDb } from "./db.js";
|
||||
import { sendMail, isSmtpReady } from "./smtp.js";
|
||||
import { renderDigestEmail } from "./email-template.js";
|
||||
import { getConfigSnapshot } from "./config.js";
|
||||
|
||||
// Operator-internal vocabulary the sibling relay could surface in model
|
||||
// output (backend / hardware names, LAN hosts). Scrubbed before any
|
||||
// digest text reaches a cloud user — the same error-boundary rule the
|
||||
// rest of the app follows. This is a backstop: the synthesis input is
|
||||
// the recap's own (already user-facing) topic summaries, so a leak here
|
||||
// is unlikely. Kept conservative to avoid mangling legitimate prose —
|
||||
// only unambiguous infra tokens and private/LAN hosts, never common
|
||||
// words or public data.
|
||||
const OPERATOR_TERMS = [
|
||||
/\bspark[\s-]?control\b/gi,
|
||||
/\bparakeet\b/gi,
|
||||
/\bsortformer\b/gi,
|
||||
/\btitanet\b/gi,
|
||||
/\bvllm\b/gi,
|
||||
];
|
||||
const LAN_HOST_RE = /\bhttps?:\/\/[^\s)]*\.local\b[^\s)]*/gi;
|
||||
const PRIVATE_IP_RE =
|
||||
/\b(?:10|127)\.\d{1,3}\.\d{1,3}\.\d{1,3}\b|\b192\.168\.\d{1,3}\.\d{1,3}\b|\b172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}\b/g;
|
||||
|
||||
export function scrubOperatorStrings(text) {
|
||||
if (!text) return "";
|
||||
let out = String(text);
|
||||
for (const re of OPERATOR_TERMS) out = out.replace(re, "");
|
||||
out = out.replace(LAN_HOST_RE, "");
|
||||
out = out.replace(PRIVATE_IP_RE, "");
|
||||
// Tidy whitespace / orphaned punctuation the removals may have left.
|
||||
return out
|
||||
.replace(/[ \t]{2,}/g, " ")
|
||||
.replace(/\s+([.,;:])/g, "$1")
|
||||
.trim();
|
||||
}
|
||||
|
||||
// Build the LLM prompt from a saved recap record. Pure — exported for
|
||||
// testing. Uses each topic's title + summary (the chunk shape is
|
||||
// { title, summary, … }); a topic with no summary still contributes its
|
||||
// title so the overview knows it was covered.
|
||||
export function buildOverviewPrompt(record) {
|
||||
const title = (record?.title || "Untitled").trim();
|
||||
const type =
|
||||
record?.type === "podcast"
|
||||
? "podcast episode"
|
||||
: record?.type === "youtube"
|
||||
? "video"
|
||||
: "recording";
|
||||
const topics = Array.isArray(record?.chunks) ? record.chunks : [];
|
||||
const topicBlock = topics
|
||||
.map((c, i) => {
|
||||
const t = (c?.title || `Topic ${i + 1}`).trim();
|
||||
const s = (c?.summary || "").trim();
|
||||
return s ? `- ${t}: ${s}` : `- ${t}`;
|
||||
})
|
||||
.join("\n");
|
||||
return [
|
||||
`Below are the per-topic summaries of a ${type} titled "${title}".`,
|
||||
"",
|
||||
"Write a tight 1–2 paragraph overview (about 100–150 words) that captures " +
|
||||
"the main throughline and the few most important takeaways, as if briefing " +
|
||||
"a busy reader who hasn't seen it. Do not invent anything beyond the " +
|
||||
"summaries below, use no headings or bullet points, and write in plain prose.",
|
||||
"",
|
||||
"Topic summaries:",
|
||||
topicBlock,
|
||||
].join("\n");
|
||||
}
|
||||
|
||||
// Operator-identity relay provider for synthesis (operator-absorbed).
|
||||
// Throws if the relay isn't configured (no install id / base URL) — the
|
||||
// caller treats that as "skip this episode", not a fatal error.
|
||||
function buildSynthesisProvider() {
|
||||
// req=null → operator install identity. Swap in a per-recipient cloud
|
||||
// identity here to bill the user instead of the operator.
|
||||
const opts = resolveProviderOpts("relay", { req: null });
|
||||
return getProvider("relay", opts);
|
||||
}
|
||||
|
||||
// Synthesize (no cache) — call the relay, scrub, return the overview
|
||||
// text. Throws on no-topics or an empty model result. `provider` is
|
||||
// injectable for testing; defaults to the operator-identity relay.
|
||||
export async function synthesizeEpisodeOverview(record, { provider } = {}) {
|
||||
const topics = Array.isArray(record?.chunks) ? record.chunks : [];
|
||||
if (topics.length === 0) {
|
||||
throw new Error("no topic summaries to synthesize");
|
||||
}
|
||||
const p = provider || buildSynthesisProvider();
|
||||
const prompt = buildOverviewPrompt(record);
|
||||
const result = await p.analyzeText({
|
||||
prompt,
|
||||
retries: 1,
|
||||
// Stable per-episode billing key: a retry within the relay's job
|
||||
// window reuses the same credit rather than charging twice.
|
||||
jobId: record?.id ? `digest-${record.id}` : undefined,
|
||||
});
|
||||
const text = scrubOperatorStrings(result?.text || "");
|
||||
if (!text) throw new Error("empty synthesis result");
|
||||
return text;
|
||||
}
|
||||
|
||||
// Get-or-generate the cached overview. Returns { overview, cached }. On a
|
||||
// cache miss it synthesizes and (unless save:false) writes the result
|
||||
// back onto the session JSON so the next caller is a cache hit. The
|
||||
// write-back is best-effort — a failed patch just means we re-synthesize
|
||||
// next time, never a user-visible error.
|
||||
export async function getOrCreateEpisodeOverview({
|
||||
scope,
|
||||
id,
|
||||
record,
|
||||
provider,
|
||||
save = true,
|
||||
}) {
|
||||
const cached = (record?.digestOverview || "").trim();
|
||||
if (cached) return { overview: cached, cached: true };
|
||||
const overview = await synthesizeEpisodeOverview(record, { provider });
|
||||
if (save && scope && id) {
|
||||
try {
|
||||
await patchSession(scope, id, { digestOverview: overview });
|
||||
} catch {
|
||||
// best-effort cache; ignore
|
||||
}
|
||||
}
|
||||
return { overview, cached: false };
|
||||
}
|
||||
|
||||
// ── Daily scan + scheduler (mirrors subscription-reminders.js) ──────────
|
||||
|
||||
const SEND_HOUR = 8; // 08:00 server-local — when the daily scan acts
|
||||
const SCAN_INTERVAL_MS = 60 * 60 * 1000; // tick hourly; act only at SEND_HOUR
|
||||
const BOOT_DELAY_MS = 2 * 60 * 1000;
|
||||
// A user gets at most one digest per ~day even if the loop ticks more
|
||||
// than once inside the send hour or they add content right after a send.
|
||||
const MIN_RESEND_MS = 20 * 60 * 60 * 1000;
|
||||
const MAX_EPISODES = 10; // cap per email; the rest become an overflow count
|
||||
|
||||
let scanning = false;
|
||||
let scheduled = false;
|
||||
|
||||
// Which library scope a user's recaps live under. Mirrors
|
||||
// history.js scopeForRequest: the multi-mode admin keeps the "owner"
|
||||
// scope; everyone else is scoped by their user id. Pure — exported for
|
||||
// testing.
|
||||
export function scopeForUser(user) {
|
||||
return user?.is_admin ? "owner" : user?.id;
|
||||
}
|
||||
|
||||
// Pick the recaps created after the watermark, oldest first, capped.
|
||||
// Pure — exported for testing. Returns { episodes, overflow, total }.
|
||||
export function selectDigestEpisodes(sessions, watermarkMs, cap = MAX_EPISODES) {
|
||||
const since = typeof watermarkMs === "number" ? watermarkMs : 0;
|
||||
const fresh = (sessions || [])
|
||||
.filter((s) => {
|
||||
const t = new Date(s?.createdAt).getTime();
|
||||
return Number.isFinite(t) && t > since;
|
||||
})
|
||||
.sort((a, b) => new Date(a.createdAt) - new Date(b.createdAt));
|
||||
return {
|
||||
episodes: fresh.slice(0, cap),
|
||||
overflow: Math.max(0, fresh.length - cap),
|
||||
total: fresh.length,
|
||||
};
|
||||
}
|
||||
|
||||
function maskEmail(email) {
|
||||
return String(email).replace(/^(.).*(@.*)$/, "$1***$2");
|
||||
}
|
||||
|
||||
// Mint (and persist) a user's unsubscribe token if they don't have one
|
||||
// yet. Returns the token. Stable per user — re-enabling reuses it.
|
||||
function ensureUnsubToken(db, user) {
|
||||
if (user.digest_unsub_token) return user.digest_unsub_token;
|
||||
const token = randomBytes(32).toString("base64url");
|
||||
db.prepare("UPDATE users SET digest_unsub_token = ? WHERE id = ?").run(
|
||||
token,
|
||||
user.id,
|
||||
);
|
||||
return token;
|
||||
}
|
||||
|
||||
// Build one user's digest: synthesize an overview per selected episode
|
||||
// (operator-absorbed, cached). Returns { built, failed } where built are
|
||||
// the episode payloads ready for the template (each carrying its source
|
||||
// createdAt) and failed is the createdAt list of episodes that errored —
|
||||
// the caller uses both to set a watermark that never skips a failure.
|
||||
async function buildUserEpisodes(scope, selected) {
|
||||
const built = [];
|
||||
const failed = [];
|
||||
for (const ep of selected) {
|
||||
try {
|
||||
const record = await loadSession(scope, ep.id);
|
||||
if (!record) {
|
||||
failed.push(ep.createdAt);
|
||||
continue;
|
||||
}
|
||||
const { overview } = await getOrCreateEpisodeOverview({
|
||||
scope,
|
||||
id: ep.id,
|
||||
record,
|
||||
});
|
||||
built.push({
|
||||
title: ep.title,
|
||||
type: ep.type,
|
||||
url: ep.url,
|
||||
overview,
|
||||
createdAt: ep.createdAt,
|
||||
});
|
||||
} catch (err) {
|
||||
failed.push(ep.createdAt);
|
||||
console.warn(
|
||||
`[digest] synthesis failed for ${scope}/${ep.id}: ${err?.message || err}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
return { built, failed };
|
||||
}
|
||||
|
||||
// The watermark to stamp after a send. Advances to the newest
|
||||
// successfully-sent recap, but never past the oldest one that FAILED (so
|
||||
// the next scan retries gaps) and never past un-synthesized overflow
|
||||
// recaps (their createdAt is newer than any sent one, so they're picked
|
||||
// up next scan too). Pure — exported for testing. Returns null when
|
||||
// nothing was sent (caller should not advance). createdAt inputs are ISO
|
||||
// strings; output is ms epoch.
|
||||
export function nextDigestWatermark(sentCreatedAts, failedCreatedAts) {
|
||||
const toMs = (x) => new Date(x).getTime();
|
||||
const sent = (sentCreatedAts || []).map(toMs).filter(Number.isFinite);
|
||||
if (sent.length === 0) return null;
|
||||
const failed = (failedCreatedAts || []).map(toMs).filter(Number.isFinite);
|
||||
const newestSent = Math.max(...sent);
|
||||
const oldestFailed = failed.length ? Math.min(...failed) : Infinity;
|
||||
return Math.min(newestSent, oldestFailed - 1);
|
||||
}
|
||||
|
||||
// One scan pass. Self-gating, deduped, NEVER throws — returns a small
|
||||
// summary so the scheduler stays alive. `force` bypasses the send-hour
|
||||
// gate (used by the operator test trigger), not the per-user resend gate.
|
||||
export async function runDigestScan({ force = false } = {}) {
|
||||
// `force` bypasses the send-hour gate (operator test trigger), NOT the
|
||||
// in-progress lock — a forced run alongside the scheduled tick would
|
||||
// otherwise double-send to every opted-in user.
|
||||
if (scanning) return { skipped: "already_running" };
|
||||
scanning = true;
|
||||
try {
|
||||
const now = Date.now();
|
||||
if (!force && new Date(now).getHours() !== SEND_HOUR) {
|
||||
return { skipped: "off_hour" };
|
||||
}
|
||||
if (!isSmtpReady()) return { skipped: "smtp_not_ready" };
|
||||
const snap = await getConfigSnapshot();
|
||||
const publicUrl = (snap.recap_public_url || "").trim().replace(/\/$/, "");
|
||||
if (!publicUrl) return { skipped: "public_url_not_set" };
|
||||
|
||||
const db = getDb();
|
||||
const users = db
|
||||
.prepare(
|
||||
"SELECT id, email, is_admin, last_digest_at, digest_unsub_token FROM users WHERE digest_enabled = 1",
|
||||
)
|
||||
.all();
|
||||
|
||||
let sent = 0;
|
||||
let skipped = 0;
|
||||
for (const user of users) {
|
||||
try {
|
||||
const email = (user.email || "").trim();
|
||||
if (!email) {
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
// Defensive: a row with no watermark (set via SQL, not the opt-in
|
||||
// endpoint) would dump the whole backlog — start the clock now
|
||||
// and pick up new recaps next scan instead.
|
||||
if (typeof user.last_digest_at !== "number") {
|
||||
db.prepare("UPDATE users SET last_digest_at = ? WHERE id = ?").run(
|
||||
now,
|
||||
user.id,
|
||||
);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
if (now - user.last_digest_at < MIN_RESEND_MS) {
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
const scope = scopeForUser(user);
|
||||
if (!scope) {
|
||||
// No usable id (shouldn't happen for a real row) — skip rather
|
||||
// than read an "undefined" scope dir.
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
const sessions = await listScopeSessions(scope);
|
||||
const { episodes: selected, overflow } = selectDigestEpisodes(
|
||||
sessions,
|
||||
user.last_digest_at,
|
||||
MAX_EPISODES,
|
||||
);
|
||||
if (selected.length === 0) {
|
||||
skipped++; // nothing new — skip the email, leave the watermark
|
||||
continue;
|
||||
}
|
||||
const { built, failed } = await buildUserEpisodes(scope, selected);
|
||||
if (built.length === 0) {
|
||||
// Synthesis failed for all of them — don't advance the
|
||||
// watermark, so the next scan retries the same recaps.
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
const token = ensureUnsubToken(db, user);
|
||||
const message = renderDigestEmail({
|
||||
brandName: "Recaps",
|
||||
episodes: built,
|
||||
overflowCount: overflow,
|
||||
manageUrl: `${publicUrl}/`,
|
||||
unsubscribeUrl: `${publicUrl}/api/digest/unsubscribe?token=${encodeURIComponent(token)}`,
|
||||
});
|
||||
await sendMail({
|
||||
to: email,
|
||||
subject: message.subject,
|
||||
text: message.text,
|
||||
html: message.html,
|
||||
});
|
||||
// Advance the watermark only after a successful send — to the
|
||||
// newest sent recap, but never past a failed or deferred one, so
|
||||
// the next scan retries gaps instead of skipping them.
|
||||
const watermark = nextDigestWatermark(
|
||||
built.map((e) => e.createdAt),
|
||||
failed,
|
||||
);
|
||||
db.prepare("UPDATE users SET last_digest_at = ? WHERE id = ?").run(
|
||||
watermark ?? now,
|
||||
user.id,
|
||||
);
|
||||
sent++;
|
||||
console.log(
|
||||
`[digest] sent to ${maskEmail(email)} (${episodes.length} recap${episodes.length === 1 ? "" : "s"}${overflow ? `, +${overflow} more` : ""})`,
|
||||
);
|
||||
} catch (err) {
|
||||
console.warn(
|
||||
`[digest] user ${user.id} failed: ${err?.message || err}`,
|
||||
);
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
if (sent) {
|
||||
console.log(`[digest] scan complete: ${sent} sent, ${skipped} skipped`);
|
||||
}
|
||||
return { sent, skipped };
|
||||
} catch (err) {
|
||||
console.warn(`[digest] scan error: ${err?.message || err}`);
|
||||
return { skipped: "error", error: err?.message || String(err) };
|
||||
} finally {
|
||||
scanning = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Start the hourly scan loop. Idempotent; self-gates inside the scan, so
|
||||
// it's safe to call whenever multi mode boots.
|
||||
export function startDigestScheduler() {
|
||||
if (scheduled) return;
|
||||
scheduled = true;
|
||||
setTimeout(() => {
|
||||
runDigestScan().catch(() => {});
|
||||
}, BOOT_DELAY_MS);
|
||||
setInterval(() => {
|
||||
runDigestScan().catch(() => {});
|
||||
}, SCAN_INTERVAL_MS);
|
||||
console.log("[digest] daily-digest scheduler started");
|
||||
}
|
||||
|
||||
// One-click unsubscribe — a public GET (no session) keyed by the per-user
|
||||
// token in the email. Mounted in index.js (multi mode) and whitelisted in
|
||||
// tenant-auth's public paths. Flips digest_enabled off; the in-app toggle
|
||||
// can turn it back on.
|
||||
export function setupDigestRoutes(app) {
|
||||
app.get("/api/digest/unsubscribe", (req, res) => {
|
||||
const token = String(req.query?.token || "").trim();
|
||||
const page = (msg) =>
|
||||
`<!doctype html><html><body style="margin:0;padding:48px 16px;background:#fafafa;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;text-align:center;color:#333;"><div style="max-width:420px;margin:0 auto;background:#fff;border-radius:8px;padding:32px;font-size:15px;line-height:1.6;">${msg}</div></body></html>`;
|
||||
if (!token) {
|
||||
return res.status(400).send(page("Invalid unsubscribe link."));
|
||||
}
|
||||
try {
|
||||
const r = getDb()
|
||||
.prepare(
|
||||
"UPDATE users SET digest_enabled = 0 WHERE digest_unsub_token = ?",
|
||||
)
|
||||
.run(token);
|
||||
if (r.changes === 0) {
|
||||
return res
|
||||
.status(404)
|
||||
.send(page("This unsubscribe link is no longer valid."));
|
||||
}
|
||||
return res.send(
|
||||
page(
|
||||
"You've been unsubscribed from the daily digest. You can turn it back on anytime in Settings.",
|
||||
),
|
||||
);
|
||||
} catch (err) {
|
||||
console.error("[digest] unsubscribe failed:", err);
|
||||
return res
|
||||
.status(500)
|
||||
.send(page("Something went wrong. Please try again later."));
|
||||
}
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user