// Subscription storage + discovery helpers, keyed by scope. // // Today the subscription feature is operator-only in multi mode (gated in // license-middleware), so the only active scope is "owner". But the storage, // dedup, check-enumeration, and migration here are all scope-parameterized, // so flipping the gate to per-tenant (see docs/per-tenant-subscriptions-plan.md) // is a matter of passing each user's scope instead of "owner" — no storage // rework. Step 4 of that plan (the background processor acting as each // owning user) is the only remaining piece and needs on-device testing. // // Each scope's state lives under scopeDir(scope) = history//: // subscriptions.json — the user's channel/podcast subscriptions // auto-queue.json — discovered videos awaiting approval / processing // skip-list.json — videoIds the user declined (never re-offer) // seen-list.json — videoIds already offered (don't re-surface) import fs from "fs/promises"; import path from "path"; import { getScopeHistoryDir, getHistoryDir, ROOT_SIDECARS } from "./history.js"; // ── Per-scope file paths ───────────────────────────────────────────────── function subsPath(scope) { return path.join(getScopeHistoryDir(scope), "subscriptions.json"); } function skipPath(scope) { return path.join(getScopeHistoryDir(scope), "skip-list.json"); } function seenPath(scope) { return path.join(getScopeHistoryDir(scope), "seen-list.json"); } function autoQueuePath(scope) { return path.join(getScopeHistoryDir(scope), "auto-queue.json"); } async function ensureScopeDir(scope) { await fs.mkdir(getScopeHistoryDir(scope), { recursive: true }).catch(() => {}); } // Serialize read-modify-write on a given file path so two concurrent // handlers can't each load the same snapshot, mutate, and have the second // write clobber the first. Keyed by absolute path → naturally per-scope. const _fileLocks = new Map(); function withFileLock(key, fn) { const prev = _fileLocks.get(key) || Promise.resolve(); const next = prev.then(fn, fn); // run fn whether prev resolved or rejected _fileLocks.set( key, next.catch(() => {}), ); return next; } // ── Subscriptions ──────────────────────────────────────────────────────── export async function loadSubscriptions(scope) { try { return ( JSON.parse(await fs.readFile(subsPath(scope), "utf-8")).subscriptions || [] ); } catch { return []; } } export async function saveSubscriptions(scope, subs) { return withFileLock(subsPath(scope), async () => { await ensureScopeDir(scope); await fs.writeFile( subsPath(scope), JSON.stringify({ subscriptions: subs }, null, 2), ); }); } // ── Skip list (declined videos — never re-add) ─────────────────────────── export async function loadSkipList(scope) { try { return new Set( JSON.parse(await fs.readFile(skipPath(scope), "utf-8")).videoIds || [], ); } catch { return new Set(); } } export async function addToSkipList(scope, videoId) { return withFileLock(skipPath(scope), async () => { const ids = await loadSkipList(scope); ids.add(videoId); await ensureScopeDir(scope); await fs.writeFile(skipPath(scope), JSON.stringify({ videoIds: [...ids] })); }); } // ── Seen list (already offered — don't re-surface) ─────────────────────── export async function loadSeenList(scope) { try { return new Set( JSON.parse(await fs.readFile(seenPath(scope), "utf-8")).videoIds || [], ); } catch { return new Set(); } } export async function addToSeenList(scope, videoIds) { return withFileLock(seenPath(scope), async () => { const seen = await loadSeenList(scope); for (const id of videoIds) seen.add(id); await ensureScopeDir(scope); await fs.writeFile(seenPath(scope), JSON.stringify({ videoIds: [...seen] })); }); } // ── Auto-queue ─────────────────────────────────────────────────────────── // Read-only load. For mutations use mutateAutoQueue so the read-modify-write // is atomic per scope (the old in-memory global array gave this implicitly). export async function loadAutoQueue(scope) { try { return JSON.parse(await fs.readFile(autoQueuePath(scope), "utf-8")).items || []; } catch { return []; } } export async function saveAutoQueue(scope, items) { return withFileLock(autoQueuePath(scope), async () => { await ensureScopeDir(scope); await fs.writeFile( autoQueuePath(scope), JSON.stringify({ items }, null, 2), ); }); } // Atomic read-modify-write. `fn(items)` may mutate `items` in place and/or // return a replacement array. Returns the saved array. Use for every // status change / add / remove so concurrent handlers don't lose updates. export async function mutateAutoQueue(scope, fn) { return withFileLock(autoQueuePath(scope), async () => { let items = []; try { items = JSON.parse(await fs.readFile(autoQueuePath(scope), "utf-8")).items || []; } catch {} const result = await fn(items); const toSave = Array.isArray(result) ? result : items; await ensureScopeDir(scope); await fs.writeFile( autoQueuePath(scope), JSON.stringify({ items: toSave }, null, 2), ); return toSave; }); } // ── Dedup ──────────────────────────────────────────────────────────────── // All videoIds already summarized in a scope's library. // // CRITICAL: summaries live under scopeDir(scope) = history//, NOT the // top-level history dir. Scanning the top level (the historical bug) found // zero processed videos, so the subscription check never deduped against the // library and re-queued already-summarized videos every run. export async function getProcessedVideoIds(scope = "owner") { const ids = new Set(); const dir = getScopeHistoryDir(scope); try { const files = await fs.readdir(dir); for (const file of files.filter( (f) => f.endsWith(".json") && !ROOT_SIDECARS.has(f), )) { try { const raw = await fs.readFile(path.join(dir, file), "utf-8"); const data = JSON.parse(raw); if (data.videoId) ids.add(data.videoId); } catch {} } } catch {} return ids; } // Pure dedup predicate: is this discovered video already accounted for? // Known = in the library (processed), already queued, declined (skip), or // offered before (seen). Used identically by the podcast + YouTube branches. export function isKnownVideo( id, { processedIds, queuedIds, skippedIds, seenIds } = {}, ) { return !!( (processedIds && processedIds.has(id)) || (queuedIds && queuedIds.has(id)) || (skippedIds && skippedIds.has(id)) || (seenIds && seenIds.has(id)) ); } // ── Scope enumeration (for the periodic check loop) ────────────────────── // Scopes that have at least one subscription. Always includes "owner" (the // operator). Behind the operator-only gate this returns just ["owner"]; when // per-tenant subscriptions ship it picks up each tenant scope automatically. export async function listSubscriptionScopes() { const root = getHistoryDir(); const scopes = new Set(["owner"]); try { const entries = await fs.readdir(root, { withFileTypes: true }); for (const e of entries) { if (!e.isDirectory() || e.name === "owner") continue; try { const subs = JSON.parse( await fs.readFile( path.join(root, e.name, "subscriptions.json"), "utf-8", ), ).subscriptions || []; if (subs.length > 0) scopes.add(e.name); } catch {} } } catch {} return [...scopes]; } // Scopes that have a non-empty auto-queue. The background processor walks // these to find approved items across all owners (a scope can have queued // items even after its subscriptions were deleted, so this is a superset of // listSubscriptionScopes for processing purposes). export async function listAutoQueueScopes() { const root = getHistoryDir(); const scopes = new Set(["owner"]); try { const entries = await fs.readdir(root, { withFileTypes: true }); for (const e of entries) { if (!e.isDirectory() || e.name === "owner") continue; try { const items = JSON.parse( await fs.readFile( path.join(root, e.name, "auto-queue.json"), "utf-8", ), ).items || []; if (items.length > 0) scopes.add(e.name); } catch {} } } catch {} return [...scopes]; } // ── Migration: history-root globals → owner scope (one-time, idempotent) ── // The pre-0.2.147 layout kept subscription state at the history root (one // install-wide store). Move it under the operator's own scope so the // storage is uniformly per-scope. Only moves a file if the source exists // and the destination doesn't (never clobbers). const SUB_FILES = [ "subscriptions.json", "auto-queue.json", "skip-list.json", "seen-list.json", ]; export async function migrateGlobalSubscriptionsToOwner() { const root = getHistoryDir(); const ownerDir = getScopeHistoryDir("owner"); let moved = 0; for (const f of SUB_FILES) { const from = path.join(root, f); const to = path.join(ownerDir, f); try { await fs.access(from); } catch { continue; // source missing → nothing to move } try { await fs.access(to); continue; // dest already exists → don't clobber } catch {} try { await fs.mkdir(ownerDir, { recursive: true }); await fs.rename(from, to); moved++; } catch { // rename can fail across devices — fall back to copy + unlink. try { await fs.mkdir(ownerDir, { recursive: true }); await fs.copyFile(from, to); await fs.unlink(from); moved++; } catch {} } } return moved; }