b4fa5d7be8
Multi-mode, off by default. Each new recap is synthesized into a 1-2 paragraph overview via the relay (operator-absorbed) and cached onto the session JSON; a daily 08:00 scan emails opted-in users their fresh recaps, deduped by a per-user watermark that never skips a failed or over-cap recap. One-click tokenized unsubscribe; settings-modal toggle; admin test trigger. Bumps to 0.2.158.
427 lines
16 KiB
JavaScript
427 lines
16 KiB
JavaScript
// Daily Digest — per-episode overview synthesis (multi-mode / cloud).
|
||
//
|
||
// Phase 2 of the Daily Digest feature: turn a saved recap's stored topic
|
||
// summaries into a 1–2 paragraph overview via the relay LLM, then cache
|
||
// the result back onto the session JSON (`digestOverview`) so it's
|
||
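// generated at most once per episode. The daily scan + email (phase 3)
// now lives further down in this file: runDigestScan() reaches
// getOrCreateEpisodeOverview() through buildUserEpisodes(), and
// startDigestScheduler() drives the hourly tick.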
|
||
//
|
||
// Cost ownership (Q1 = operator-absorbed): the synthesis call uses the
|
||
// OPERATOR's relay identity — the same credit pool that free signed-in
|
||
// users' summaries already draw from (resolveProviderOpts with req=null
|
||
// → operator install identity). A retention email shouldn't silently
|
||
// drain the recipient's quota for recaps they already made. To bill the
|
||
// recipient instead, build the provider with their cloud identity at the
|
||
// one marked line below.
|
||
|
||
import { randomBytes } from "crypto";
|
||
import { getProvider, resolveProviderOpts } from "./providers/index.js";
|
||
import { patchSession, loadSession, listScopeSessions } from "./history.js";
|
||
import { getDb } from "./db.js";
|
||
import { sendMail, isSmtpReady } from "./smtp.js";
|
||
import { renderDigestEmail } from "./email-template.js";
|
||
import { getConfigSnapshot } from "./config.js";
|
||
|
||
// Infra vocabulary that must not reach digest text shown to a user.
// Per this file's notes the relay could surface backend / hardware names
// or LAN hosts in model output; these patterns are the backstop. The list
// is deliberately narrow (specific product tokens, `.local` URLs and
// private IPv4 ranges) so ordinary prose and public addresses survive.
const OPERATOR_TERMS = [
  /\bspark[\s-]?control\b/gi,
  /\bparakeet\b/gi,
  /\bsortformer\b/gi,
  /\btitanet\b/gi,
  /\bvllm\b/gi,
];

// http(s) URLs whose host part contains ".local".
const LAN_HOST_RE = /\bhttps?:\/\/[^\s)]*\.local\b[^\s)]*/gi;

// 10.x.x.x, 127.x.x.x, 192.168.x.x and 172.16–31.x.x addresses.
const PRIVATE_IP_RE =
  /\b(?:10|127)\.\d{1,3}\.\d{1,3}\.\d{1,3}\b|\b192\.168\.\d{1,3}\.\d{1,3}\b|\b172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}\b/g;

// Strip operator-internal tokens from `text` and tidy what is left.
//
// text: anything; falsy input yields "", everything else is stringified.
// Returns the scrubbed, trimmed string.
export function scrubOperatorStrings(text) {
  if (!text) {
    return "";
  }

  // Same removal order as always: product terms, then LAN URLs, then IPs.
  const removalPasses = [...OPERATOR_TERMS, LAN_HOST_RE, PRIVATE_IP_RE];
  const stripped = removalPasses.reduce(
    (acc, pattern) => acc.replace(pattern, ""),
    String(text),
  );

  // Removals can leave runs of spaces and punctuation that now floats
  // after whitespace; collapse both before trimming.
  const collapsed = stripped.replace(/[ \t]{2,}/g, " ");
  const tightened = collapsed.replace(/\s+([.,;:])/g, "$1");
  return tightened.trim();
}
|
||
|
||
// Build the LLM prompt for one saved recap. Pure — exported for testing.
//
// record: a recap object; the fields read are `title`, `type` and
//   `chunks` (an array of { title, summary, … }). A missing / non-array
//   `chunks` is treated as no topics.
// Returns the prompt string: an intro line naming the recap, the writing
//   instructions, then one "- title: summary" line per topic. A topic
//   without a summary still contributes "- title" so the overview knows
//   it was covered.
//
// Titles and summaries are normalised BEFORE the fallback is applied, so
// a whitespace-only title becomes "Untitled" / "Topic N" instead of an
// empty string, and a truthy non-string value is stringified rather than
// throwing on `.trim()`.
export function buildOverviewPrompt(record) {
  // Falsy values collapse to "" so the `||` fallbacks below still fire.
  const clean = (value) => (value ? String(value).trim() : "");

  const title = clean(record?.title) || "Untitled";
  const type =
    record?.type === "podcast"
      ? "podcast episode"
      : record?.type === "youtube"
        ? "video"
        : "recording";

  const topics = Array.isArray(record?.chunks) ? record.chunks : [];
  const topicBlock = topics
    .map((c, i) => {
      const t = clean(c?.title) || `Topic ${i + 1}`;
      const s = clean(c?.summary);
      return s ? `- ${t}: ${s}` : `- ${t}`;
    })
    .join("\n");

  return [
    `Below are the per-topic summaries of a ${type} titled "${title}".`,
    "",
    "Write a tight 1–2 paragraph overview (about 100–150 words) that captures " +
      "the main throughline and the few most important takeaways, as if briefing " +
      "a busy reader who hasn't seen it. Do not invent anything beyond the " +
      "summaries below, use no headings or bullet points, and write in plain prose.",
    "",
    "Topic summaries:",
    topicBlock,
  ].join("\n");
}
|
||
|
||
// Build the relay provider used for digest synthesis.
//
// The options are resolved with `req: null`; per this file's notes that
// selects the operator's install identity, so synthesis is billed to the
// operator rather than the recipient. To bill the recipient instead, pass
// a per-recipient cloud identity in place of `req: null` on the
// resolveProviderOpts call below.
//
// Any error thrown by resolveProviderOpts / getProvider propagates; the
// digest scan treats that as "skip this episode".
function buildSynthesisProvider() {
  return getProvider("relay", resolveProviderOpts("relay", { req: null }));
}
|
||
|
||
// Synthesize an overview for one recap, bypassing any cache.
//
// record: the saved recap; needs a non-empty `chunks` array.
// options.provider: optional object exposing `analyzeText()`; injectable
//   for tests. Defaults to buildSynthesisProvider().
// Returns the scrubbed overview text.
// Throws when the recap has no topics, when the scrubbed model output is
//   empty, or when the provider call itself rejects.
export async function synthesizeEpisodeOverview(record, { provider } = {}) {
  const hasTopics = Array.isArray(record?.chunks) && record.chunks.length > 0;
  if (!hasTopics) {
    throw new Error("no topic summaries to synthesize");
  }

  // Resolve the provider before building the prompt (same order as ever,
  // so a mis-configured relay surfaces first).
  const relay = provider || buildSynthesisProvider();
  const request = {
    prompt: buildOverviewPrompt(record),
    retries: 1,
    // Stable per-episode key; per this file's notes the relay uses it so
    // a retry inside its job window is not charged twice.
    jobId: record?.id ? `digest-${record.id}` : undefined,
  };

  const result = await relay.analyzeText(request);
  const overview = scrubOperatorStrings(result?.text || "");
  if (!overview) {
    throw new Error("empty synthesis result");
  }
  return overview;
}
|
||
|
||
// Get-or-generate the cached overview for a recap.
//
// scope, id: identify the session to write the cache back to; the
//   write-back is skipped if either is falsy.
// record:   the loaded recap; a non-blank `digestOverview` is a cache hit.
// provider: optional synthesis provider, forwarded to
//   synthesizeEpisodeOverview() (for tests).
// save:     set false to synthesize without writing back (default true).
//
// Returns { overview, cached } — `cached` is true only on a cache hit.
// Throws whatever synthesizeEpisodeOverview() throws on a miss.
//
// The write-back stays best-effort: a failed patch never fails the call,
// it only means the next caller re-synthesizes. It is logged, though,
// because a persistently failing write means the overview is regenerated
// on every scan with nothing in the logs to explain it.
export async function getOrCreateEpisodeOverview({
  scope,
  id,
  record,
  provider,
  save = true,
}) {
  const cached = (record?.digestOverview || "").trim();
  if (cached) return { overview: cached, cached: true };

  const overview = await synthesizeEpisodeOverview(record, { provider });

  if (save && scope && id) {
    try {
      await patchSession(scope, id, { digestOverview: overview });
    } catch (err) {
      // Best-effort cache: report and carry on with the fresh overview.
      console.warn(
        `[digest] overview cache write failed for ${scope}/${id}: ${err?.message || err}`,
      );
    }
  }
  return { overview, cached: false };
}
|
||
|
||
// ── Daily scan + scheduler (mirrors subscription-reminders.js) ──────────
|
||
|
||
const MS_PER_MINUTE = 60 * 1000;
const MS_PER_HOUR = 60 * MS_PER_MINUTE;

// Hour of day (server-local, 0–23) at which the daily scan acts.
const SEND_HOUR = 8;
// The loop ticks hourly; the scan itself only acts during SEND_HOUR.
const SCAN_INTERVAL_MS = MS_PER_HOUR;
// Delay between scheduler start and the first scan attempt.
const BOOT_DELAY_MS = 2 * MS_PER_MINUTE;
// Minimum gap between two digests for one user, so a second tick inside
// the send hour (or content added right after a send) cannot re-send.
const MIN_RESEND_MS = 20 * MS_PER_HOUR;
// Per-email cap; anything beyond it is reported as an overflow count.
const MAX_EPISODES = 10;

// In-process guards: a scan is currently running / the scheduler has
// already been started.
let scanning = false;
let scheduled = false;
|
||
|
||
// Resolve the library scope a user's recaps are stored under. Pure —
// exported for testing.
//
// user: a users row (or null/undefined); reads `is_admin` and `id`.
// Returns "owner" for an admin row, otherwise the row's `id` (undefined
//   when there is no user or no id). Per this file's notes this mirrors
//   scopeForRequest in history.js.
export function scopeForUser(user) {
  if (user?.is_admin) {
    return "owner";
  }
  return user?.id;
}
|
||
|
||
// Pick the recaps created strictly after the watermark, oldest first,
// capped. Pure — exported for testing.
//
// sessions:    array of session summaries carrying `createdAt`; anything
//   that is not an array is treated as empty. Entries whose `createdAt`
//   does not parse to a finite timestamp are ignored.
// watermarkMs: ms-epoch watermark; a non-number is treated as 0.
// cap:         maximum episodes to return (default MAX_EPISODES). It is
//   normalised to a non-negative whole number: a negative or NaN cap
//   selects nothing and a fractional cap is truncated, so `episodes` and
//   `overflow` always add up to `total`. Infinity means "no cap".
// Returns { episodes, overflow, total }.
export function selectDigestEpisodes(sessions, watermarkMs, cap = MAX_EPISODES) {
  const since = typeof watermarkMs === "number" ? watermarkMs : 0;

  // Without this, slice(0, -1) would silently drop only the newest item
  // and `overflow` could come out fractional or NaN.
  const requested = Number(cap);
  const limit = Number.isNaN(requested)
    ? 0
    : Math.max(0, Math.trunc(requested));

  // Parse each createdAt once up front instead of on every comparison.
  const fresh = (Array.isArray(sessions) ? sessions : [])
    .map((session) => ({
      session,
      at: new Date(session?.createdAt).getTime(),
    }))
    .filter(({ at }) => Number.isFinite(at) && at > since)
    .sort((a, b) => a.at - b.at)
    .map(({ session }) => session);

  return {
    episodes: fresh.slice(0, limit),
    overflow: Math.max(0, fresh.length - limit),
    total: fresh.length,
  };
}
|
||
|
||
// Mask an email address for log output: keep the first character and the
// "@domain" part, replace the rest of the local part with "***"
// (e.g. "alice@example.com" -> "a***@example.com").
//
// Anything that does not look like "x…@…" (no "@", an empty local part,
// embedded newlines) is masked entirely as "***" rather than being echoed
// verbatim into the log.
function maskEmail(email) {
  const match = /^(.).*(@.*)$/.exec(String(email));
  return match ? `${match[1]}***${match[2]}` : "***";
}
|
||
|
||
// Return the user's unsubscribe token, minting and persisting one first
// if the row does not carry one yet.
//
// db:   database handle exposing prepare(sql).run(...params).
// user: users row; reads `digest_unsub_token` and `id`.
// Returns the existing token untouched, or a fresh 32-random-byte
//   base64url token after writing it to users.digest_unsub_token.
//   An existing token is never rotated here.
function ensureUnsubToken(db, user) {
  const existing = user.digest_unsub_token;
  if (existing) {
    return existing;
  }

  const fresh = randomBytes(32).toString("base64url");
  const statement = db.prepare(
    "UPDATE users SET digest_unsub_token = ? WHERE id = ?",
  );
  statement.run(fresh, user.id);
  return fresh;
}
|
||
|
||
// Build the episode payloads for one user's digest.
//
// scope:    the user's library scope (passed to loadSession and on to
//   getOrCreateEpisodeOverview).
// selected: session summaries to include; reads `id`, `title`, `type`,
//   `url` and `createdAt` from each.
// Returns { built, failed }:
//   built  — template-ready payloads { title, type, url, overview,
//            createdAt }, in input order;
//   failed — the `createdAt` of every episode whose session could not be
//            loaded or whose overview threw.
// The caller uses both lists to choose a watermark that never skips a
// failure. Episodes are processed one at a time, in order.
async function buildUserEpisodes(scope, selected) {
  const built = [];
  const failed = [];

  for (const episode of selected) {
    try {
      const record = await loadSession(scope, episode.id);
      if (record) {
        const { overview } = await getOrCreateEpisodeOverview({
          scope,
          id: episode.id,
          record,
        });
        built.push({
          title: episode.title,
          type: episode.type,
          url: episode.url,
          overview,
          createdAt: episode.createdAt,
        });
      } else {
        // Session vanished or is unreadable: count it as a failure so the
        // watermark does not move past it.
        failed.push(episode.createdAt);
      }
    } catch (err) {
      failed.push(episode.createdAt);
      console.warn(
        `[digest] synthesis failed for ${scope}/${episode.id}: ${err?.message || err}`,
      );
    }
  }

  return { built, failed };
}
|
||
|
||
// Compute the watermark to stamp after a send. Pure — exported for
// testing.
//
// sentCreatedAts:   createdAt values (anything `new Date()` accepts,
//   normally ISO strings) of the recaps that went out.
// failedCreatedAts: createdAt values of the recaps that errored.
// Values that do not parse to a finite timestamp are ignored.
//
// Returns a ms-epoch number: the newest sent timestamp, clamped to one
//   millisecond before the oldest failed one so the next scan (which
//   selects strictly-newer recaps) retries the failure. Returns null when
//   nothing was sent — the caller should not advance.
//
// Min/max are folded with reduce rather than spread into Math.max/min,
// so the helper has no argument-count limit on large inputs.
export function nextDigestWatermark(sentCreatedAts, failedCreatedAts) {
  const toFiniteMs = (values) =>
    (values || [])
      .map((value) => new Date(value).getTime())
      .filter((ms) => Number.isFinite(ms));

  const sent = toFiniteMs(sentCreatedAts);
  if (sent.length === 0) return null;

  const failed = toFiniteMs(failedCreatedAts);
  const newestSent = sent.reduce((a, b) => Math.max(a, b), -Infinity);
  // Stays Infinity when nothing failed, so the clamp below is a no-op.
  const oldestFailed = failed.reduce((a, b) => Math.min(a, b), Infinity);

  return Math.min(newestSent, oldestFailed - 1);
}
|
||
|
||
// One scan pass over every opted-in user. Self-gating, deduped, and it
// NEVER throws — it always resolves to a small summary so the scheduler
// stays alive.
//
// options.force: bypass the send-hour gate (operator test trigger). It
//   does NOT bypass the in-progress lock (a forced run alongside the
//   scheduled tick would otherwise double-send) nor the per-user resend
//   gate.
//
// Resolves to one of:
//   { skipped: "already_running" | "off_hour" | "smtp_not_ready" |
//              "public_url_not_set" }
//   { sent, skipped }                    — per-user counts for a real pass
//   { skipped: "error", error }          — an unexpected failure
//
// Per user: skip if there is no email, no watermark (the clock is started
// instead), a send within MIN_RESEND_MS, no scope, or nothing new; then
// build the overviews, send the mail, and only after a successful send
// advance the watermark.
export async function runDigestScan({ force = false } = {}) {
  if (scanning) return { skipped: "already_running" };
  scanning = true;
  try {
    const now = Date.now();
    if (!force && new Date(now).getHours() !== SEND_HOUR) {
      return { skipped: "off_hour" };
    }
    if (!isSmtpReady()) return { skipped: "smtp_not_ready" };
    const snap = await getConfigSnapshot();
    // Drop one trailing slash so the URLs built below don't double it.
    const publicUrl = (snap.recap_public_url || "").trim().replace(/\/$/, "");
    if (!publicUrl) return { skipped: "public_url_not_set" };

    const db = getDb();
    const users = db
      .prepare(
        "SELECT id, email, is_admin, last_digest_at, digest_unsub_token FROM users WHERE digest_enabled = 1",
      )
      .all();

    let sent = 0;
    let skipped = 0;
    for (const user of users) {
      try {
        const email = (user.email || "").trim();
        if (!email) {
          skipped++;
          continue;
        }
        // Defensive: a row with no watermark (set via SQL, not the opt-in
        // endpoint) would dump the whole backlog — start the clock now
        // and pick up new recaps next scan instead.
        if (typeof user.last_digest_at !== "number") {
          db.prepare("UPDATE users SET last_digest_at = ? WHERE id = ?").run(
            now,
            user.id,
          );
          skipped++;
          continue;
        }
        if (now - user.last_digest_at < MIN_RESEND_MS) {
          skipped++;
          continue;
        }
        const scope = scopeForUser(user);
        if (!scope) {
          // No usable id (shouldn't happen for a real row) — skip rather
          // than read an "undefined" scope dir.
          skipped++;
          continue;
        }
        const sessions = await listScopeSessions(scope);
        const { episodes: selected, overflow } = selectDigestEpisodes(
          sessions,
          user.last_digest_at,
          MAX_EPISODES,
        );
        if (selected.length === 0) {
          skipped++; // nothing new — skip the email, leave the watermark
          continue;
        }
        const { built, failed } = await buildUserEpisodes(scope, selected);
        if (built.length === 0) {
          // Synthesis failed for all of them — don't advance the
          // watermark, so the next scan retries the same recaps.
          skipped++;
          continue;
        }
        const token = ensureUnsubToken(db, user);
        const message = renderDigestEmail({
          brandName: "Recaps",
          episodes: built,
          overflowCount: overflow,
          manageUrl: `${publicUrl}/`,
          unsubscribeUrl: `${publicUrl}/api/digest/unsubscribe?token=${encodeURIComponent(token)}`,
        });
        await sendMail({
          to: email,
          subject: message.subject,
          text: message.text,
          html: message.html,
        });
        // Advance the watermark only after a successful send — to the
        // newest sent recap, but never past a failed or deferred one, so
        // the next scan retries gaps instead of skipping them.
        const watermark = nextDigestWatermark(
          built.map((e) => e.createdAt),
          failed,
        );
        db.prepare("UPDATE users SET last_digest_at = ? WHERE id = ?").run(
          watermark ?? now,
          user.id,
        );
        sent++;
        // Report what actually went into the email. This line used to read
        // `episodes.length`, but that binding is renamed to `selected`
        // above, so it threw a ReferenceError AFTER the send: every
        // successful delivery was also logged as a failure and counted in
        // both `sent` and `skipped`.
        console.log(
          `[digest] sent to ${maskEmail(email)} (${built.length} recap${built.length === 1 ? "" : "s"}${overflow ? `, +${overflow} more` : ""})`,
        );
      } catch (err) {
        // One user's failure must not stop the rest of the scan.
        console.warn(
          `[digest] user ${user.id} failed: ${err?.message || err}`,
        );
        skipped++;
      }
    }
    if (sent) {
      console.log(`[digest] scan complete: ${sent} sent, ${skipped} skipped`);
    }
    return { sent, skipped };
  } catch (err) {
    console.warn(`[digest] scan error: ${err?.message || err}`);
    return { skipped: "error", error: err?.message || String(err) };
  } finally {
    // Always release the lock, including on the early returns above.
    scanning = false;
  }
}
|
||
|
||
// Start the digest scan loop: one attempt BOOT_DELAY_MS after the call,
// then one every SCAN_INTERVAL_MS. Idempotent — a second call is a no-op.
// The scan gates itself (send hour, SMTP, public URL), so this is safe to
// call whenever multi mode boots.
export function startDigestScheduler() {
  if (scheduled) {
    return;
  }
  scheduled = true;

  // runDigestScan resolves even on failure; the catch is a last-resort
  // guard so a rejection can never surface as unhandled.
  const tick = () => {
    runDigestScan().catch(() => {});
  };

  setTimeout(tick, BOOT_DELAY_MS);
  setInterval(tick, SCAN_INTERVAL_MS);
  console.log("[digest] daily-digest scheduler started");
}
|
||
|
||
// Register the one-click unsubscribe route on `app`.
//
// GET /api/digest/unsubscribe?token=… — no session required; the request
// is authorised only by the per-user token from the email. Per this
// file's notes it is mounted from index.js in multi mode and whitelisted
// in tenant-auth's public paths.
//
// Responses (all a small standalone HTML page):
//   400 — token missing or blank
//   404 — no user row carries that token
//   200 — digest_enabled flipped to 0 for the matching row
//   500 — the database call threw (logged with console.error)
export function setupDigestRoutes(app) {
  // Minimal self-contained page; `msg` is always one of the fixed strings
  // below, never request data.
  const renderPage = (msg) =>
    `<!doctype html><html><body style="margin:0;padding:48px 16px;background:#fafafa;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;text-align:center;color:#333;"><div style="max-width:420px;margin:0 auto;background:#fff;border-radius:8px;padding:32px;font-size:15px;line-height:1.6;">${msg}</div></body></html>`;

  app.get("/api/digest/unsubscribe", (req, res) => {
    const token = String(req.query?.token || "").trim();
    const reply = (status, msg) => res.status(status).send(renderPage(msg));

    if (token === "") {
      return reply(400, "Invalid unsubscribe link.");
    }

    try {
      const outcome = getDb()
        .prepare(
          "UPDATE users SET digest_enabled = 0 WHERE digest_unsub_token = ?",
        )
        .run(token);

      if (outcome.changes === 0) {
        return reply(404, "This unsubscribe link is no longer valid.");
      }

      // Success keeps the framework's default status.
      return res.send(
        renderPage(
          "You've been unsubscribed from the daily digest. You can turn it back on anytime in Settings.",
        ),
      );
    } catch (err) {
      console.error("[digest] unsubscribe failed:", err);
      return reply(500, "Something went wrong. Please try again later.");
    }
  });
}
|