Files
proof-of-work/proof-of-work/lib/ai/generationRunner.ts
T
Keysat 2b0abad68e
CI / proof-of-work (Next.js app) (push) Waiting to run
CI / start9/0.4 (StartOS package code) (push) Waiting to run
v1.2.0:6 — AI "generate today's workout" from a brain-dump
Add a single-session AI flow alongside program generation: describe a
workout in plain words and get a ready-to-log workout back — exercises
with suggested weights, target reps, and set counts grounded in the
user's recent history. The suggestion can be inline-edited or refined
by sending a follow-up instruction back to the model, then "Use this
workout" pre-fills the normal New Workout form (nothing persists until
the user saves through the regular path).

Why reuse, not fork: the existing program-generation spine (detached
background runner, SSE streaming, lenient-JSON preview, 5 providers,
history context, library name->id mapping) already does the hard parts.
A new AIGeneration.kind discriminant ("program" | "workout", default
"program" via boot-time guarded ALTER) selects the parser and keeps the
ephemeral workout rows out of the program-shaped AI history. Refine is a
fresh generation seeded with the prior suggestion (validated through the
same schema before it re-enters the prompt).

Hand-off is sessionStorage -> /main/workouts/new?from=ai -> AiWorkoutPrefill,
which expands each suggestion into N sets and maps effort by cardio-ness
(Gear for cardio, RPE for strength). EditWorkoutData.id is now optional so
the prefill CREATEs rather than PATCHing a nonexistent id. The AI suggests
each weight in that exercise's effective logging unit (the library JSON
carries a per-exercise unit) so the stored number and unit never diverge.

Built + sideloaded to immense-voyage.local as 1.2.0:6; on-box ALTER and
non-root launch confirmed via start-cli. tsc clean (app + packaging),
251 tests pass, next build + s9pk build succeed.
2026-06-19 10:59:12 -05:00

296 lines
9.1 KiB
TypeScript

/**
* v1.1.0:4 — Background-friendly generation runner.
*
* Splits the work in two:
*
* 1. The HTTP route (api/ai/generate) calls `kickoffGeneration` to
* create the pending AIGeneration row, validate config, and start
* the model stream in the background. It returns immediately with
* the new row id; the runner continues even after the request is
* cancelled (because we use a waitUntil-style pattern: a detached
* promise that owns its own AbortController).
*
* 2. The HTTP route also opens an SSE stream that subscribes to a
* per-generation in-memory event bus, so the live UI sees text
* deltas as they arrive — same UX as before. If the client
* navigates away the stream closes, but the runner keeps writing
* progress to the database; a poll endpoint returns whatever it
* has.
*
* The in-memory bus is a plain Map keyed by generation id. It only
* lives in this Node process; SSE clients only receive deltas from
* a runner started in the SAME process. That's fine because:
* - Single-process Next.js standalone (the StartOS deployment).
* - Cross-process resume goes through the database (poll endpoint
* reads `progressText`).
*
* Lifecycle:
* pending → runner created the row, model stream started
* completed → runner parsed the JSON successfully (parsedProgram set)
* failed → provider error or parse failure (errorMessage set)
* applied → user clicked Apply, Program created (handled in apply route)
*/
import type { PrismaClient } from '@prisma/client';
import { getProvider } from './providers';
import { parseAIProgram } from './programSchema';
import { parseAIWorkout } from './workoutSchema';
/**
* One event published on a generation's in-memory bus. Only `type` is
* required; which optional fields accompany it depends on the event:
* text → delta
* usage → tokensIn, tokensOut
* complete → parsedOk, errorMessage, tokensIn, tokensOut, durationMs
* error → errorMessage
*/
export interface GenerationDelta {
type: 'text' | 'usage' | 'complete' | 'error';
/** For text: the fragment of model output carried by this event. */
delta?: string;
/** For usage / complete: token counts as reported by the provider. */
tokensIn?: number;
tokensOut?: number;
/** For complete: whether the full response parsed successfully. */
parsedOk?: boolean;
/** For complete / error: why the run failed; left unset when a
* `complete` event reports success. */
errorMessage?: string;
/** For complete: elapsed time of the run, in milliseconds. */
durationMs?: number;
}
/** State the in-memory bus keeps for a single generation id. */
interface BusEntry {
/** Callbacks invoked with every delta emitted while they are registered. */
subscribers: Set<(d: GenerationDelta) => void>;
/** Buffered deltas for late-joining subscribers (so a poll-then-subscribe
* client doesn't miss the first few tokens). Bounded — we drop oldest
* if it grows past the limit. */
buffer: GenerationDelta[];
/** True once a terminal event (`complete` or `error`) has been emitted. */
finished: boolean;
}
/** Maximum number of deltas retained per generation for replay. */
const BUFFER_MAX = 5000;

/** Process-local event bus: generation id → subscribers and replay buffer. */
const bus: Map<string, BusEntry> = new Map();
/**
 * Return the bus entry for a generation, registering a fresh one (no
 * subscribers, empty buffer, not finished) the first time an id is seen.
 */
function ensureEntry(id: string): BusEntry {
  const existing = bus.get(id);
  if (existing) return existing;

  const created: BusEntry = {
    subscribers: new Set(),
    buffer: [],
    finished: false,
  };
  bus.set(id, created);
  return created;
}
/**
 * Publish one delta for a generation.
 *
 * The delta is appended to the replay buffer (dropping the oldest item
 * once the buffer exceeds BUFFER_MAX) and handed to every registered
 * subscriber. A `complete` or `error` delta marks the entry finished and
 * schedules its removal from the bus.
 */
function emit(id: string, d: GenerationDelta) {
  const entry = ensureEntry(id);
  const { buffer, subscribers } = entry;

  buffer.push(d);
  if (buffer.length > BUFFER_MAX) buffer.shift();

  subscribers.forEach((notify) => {
    try {
      notify(d);
    } catch {
      /* subscriber teardown handles its own errors */
    }
  });

  const isTerminal = d.type === 'complete' || d.type === 'error';
  if (!isTerminal) return;

  entry.finished = true;
  // Keep the entry for a 60s grace period so a reconnecting client can
  // still replay the tail, then drop it. The timer is unref'd (where
  // supported) so it does not keep the process alive.
  const cleanup = setTimeout(() => bus.delete(id), 60_000);
  cleanup.unref?.();
}
/**
 * Subscribe to deltas for a generation.
 *
 * @param id      Generation id to listen to.
 * @param fn      Callback invoked with each delta.
 * @param replay  When true (the default), the buffered deltas are delivered
 *                to `fn` synchronously before this function returns, so a
 *                late-joining tab sees the stream from the start of the
 *                buffer.
 * @returns An unsubscribe function. If the generation has already finished
 *          it is a no-op, because `fn` is never registered in that case.
 */
export function subscribe(
  id: string,
  fn: (d: GenerationDelta) => void,
  replay = true,
): () => void {
  const entry = ensureEntry(id);
  if (replay) for (const d of entry.buffer) fn(d);
  if (entry.finished) {
    // Already done — the caller has seen every buffered event; nothing
    // more will arrive.
    return () => {};
  }
  entry.subscribers.add(fn);
  return () => {
    entry.subscribers.delete(fn);
    // `ensureEntry` above may have created a placeholder for an id that
    // never emits in this process (expired, unknown, or run elsewhere).
    // Such an entry never reaches the terminal-event cleanup in `emit`, so
    // remove it here once nobody is listening. An entry that has buffered
    // anything, or is finished, is left for the normal cleanup path.
    const isIdlePlaceholder =
      entry.subscribers.size === 0 &&
      entry.buffer.length === 0 &&
      !entry.finished;
    if (isIdlePlaceholder && bus.get(id) === entry) {
      bus.delete(id);
    }
  };
}
/** Everything `kickoffGeneration` needs to create the row and run the model. */
export interface KickoffOpts {
/** Client used to create the AIGeneration row and write updates to it. */
prisma: PrismaClient;
/** Stored on the AIGeneration row. */
userId: string;
/** "program" (multi-week) or "workout" (single day). Selects the
* output parser and is persisted on the row. */
kind: 'program' | 'workout';
/** Stored on the row as-is; may be null. */
templateId: string | null;
/** Stored on the row as-is; may be null. */
templateName: string | null;
/** Stored on the row; not sent to the provider by this module. */
userInput: string;
/** Stored on the row and passed to the provider. */
systemPrompt: string;
/** Stored on the row and passed to the provider. */
userPrompt: string;
/** Provider key handed to `getProvider`; also stored on the row. */
provider: string;
/** Model name passed to the provider; also stored on the row. */
model: string;
/** Passed to the provider only — not part of the row this module creates. */
apiKey: string | null;
/** Passed to the provider only — not part of the row this module creates. */
baseUrl: string | null;
}
/**
 * Create the AIGeneration row and start the model stream in the
 * background. Returns the new row's id; the caller is expected to
 * subscribe via `subscribe(id, fn)` for live deltas (or just rely
 * on database polling).
 *
 * The runner is started as a detached promise and is not awaited here,
 * so this function resolves as soon as the row exists.
 *
 * @param opts Row fields plus the provider configuration for the run.
 * @returns The id of the newly created AIGeneration row.
 * @throws Whatever `prisma.aIGeneration.create` rejects with; failures
 *         inside the background runner never reject this promise.
 */
export async function kickoffGeneration(opts: KickoffOpts): Promise<string> {
  const generation = await opts.prisma.aIGeneration.create({
    data: {
      userId: opts.userId,
      kind: opts.kind,
      templateId: opts.templateId,
      templateName: opts.templateName,
      userInput: opts.userInput,
      systemPrompt: opts.systemPrompt,
      userPrompt: opts.userPrompt,
      provider: opts.provider,
      model: opts.model,
      status: 'pending',
    },
  });

  // Detach: the run must keep going if the originating request is aborted.
  void runGeneration(generation.id, opts).catch(async (e: unknown) => {
    // Last-resort safety net; the runner normally persists its own
    // failures, so reaching this point means the runner itself crashed.
    console.error('[generation runner] uncaught:', e);
    const reason = e instanceof Error ? e.message : String(e);
    const errorMessage = `Runner crashed: ${reason}`;

    // Persist the failure too: clients that only poll the database never
    // see bus events and would otherwise watch a `pending` row forever.
    // The `status: 'pending'` filter keeps this from overwriting a row the
    // runner already moved to a terminal state.
    try {
      await opts.prisma.aIGeneration.updateMany({
        where: { id: generation.id, status: 'pending' },
        data: { status: 'failed', errorMessage },
      });
    } catch (persistErr) {
      console.error(
        '[generation runner] could not persist crash:',
        persistErr,
      );
    }

    emit(generation.id, { type: 'error', errorMessage });
  });

  return generation.id;
}
/** How often we flush `progressText` to the database during streaming.
 * Trade-off: too frequent = SQLite write churn; too slow = poll-only
 * clients see big jumps. 750ms feels right — perceptibly live without
 * hammering the WAL. */
const PROGRESS_FLUSH_MS = 750;

/**
 * Reduce whatever a provider or parser threw/reported to a non-empty
 * message. Falls back to `fallback` for non-Error values without a string
 * `message`, and for blank messages, so a failure can never be mistaken
 * for "no error".
 */
function describeFailure(cause: unknown, fallback: string): string {
  let text = '';
  if (cause instanceof Error) {
    text = cause.message;
  } else if (typeof cause === 'string') {
    text = cause;
  } else if (typeof cause === 'object' && cause !== null && 'message' in cause) {
    const message = (cause as { message: unknown }).message;
    if (typeof message === 'string') text = message;
  }
  return text.trim() === '' ? fallback : text;
}

/**
 * Run one generation to completion: stream the model output, mirror it to
 * the bus and (throttled) to `progressText`, parse the result with the
 * parser selected by `opts.kind`, then write the terminal row state and
 * emit the terminal bus event.
 *
 * Provider and parser failures are recorded on the row as `failed`
 * rather than thrown. The remaining ways this can reject are
 * `getProvider` throwing or the unknown-provider row update failing.
 *
 * @param generationId Id of the AIGeneration row created by the caller.
 * @param opts         Provider configuration, prompts and Prisma client.
 */
async function runGeneration(generationId: string, opts: KickoffOpts) {
  const t0 = Date.now();
  const provider = getProvider(opts.provider);
  if (!provider) {
    await opts.prisma.aIGeneration.update({
      where: { id: generationId },
      data: {
        status: 'failed',
        errorMessage: `Unknown provider: ${opts.provider}`,
        durationMs: Date.now() - t0,
      },
    });
    emit(generationId, {
      type: 'error',
      errorMessage: `Unknown provider: ${opts.provider}`,
    });
    return;
  }

  // Owned by this run and not linked to any request signal. Nothing in
  // this function aborts it.
  const ctrl = new AbortController();
  let raw = '';
  let tokensIn: number | undefined;
  let tokensOut: number | undefined;
  let providerError: string | null = null;

  // Periodic progress flush. `lastFlushAt` starts at 0, so the first text
  // chunk is always written immediately.
  let lastFlushAt = 0;
  const maybeFlush = async (force = false) => {
    const now = Date.now();
    if (!force && now - lastFlushAt < PROGRESS_FLUSH_MS) return;
    lastFlushAt = now;
    try {
      await opts.prisma.aIGeneration.update({
        where: { id: generationId },
        data: { progressText: raw },
      });
    } catch {
      /* writes can fail under contention; we'll catch up next tick */
    }
  };

  try {
    for await (const chunk of provider.generate({
      apiKey: opts.apiKey,
      baseUrl: opts.baseUrl,
      model: opts.model,
      systemPrompt: opts.systemPrompt,
      userPrompt: opts.userPrompt,
      signal: ctrl.signal,
    })) {
      if (chunk.type === 'text') {
        raw += chunk.delta;
        emit(generationId, { type: 'text', delta: chunk.delta });
        await maybeFlush();
      } else if (chunk.type === 'usage') {
        tokensIn = chunk.tokensIn;
        tokensOut = chunk.tokensOut;
        emit(generationId, {
          type: 'usage',
          tokensIn,
          tokensOut,
        });
      } else if (chunk.type === 'error') {
        // Normalised so an empty message still counts as a failure.
        providerError = describeFailure(
          chunk.message,
          'Provider reported an error',
        );
      }
    }
  } catch (e) {
    // A thrown non-Error (or an Error with a blank message) must not leave
    // `providerError` empty — that would let truncated output be parsed
    // and stored as a successful run.
    providerError = describeFailure(e, 'Provider stream failed');
  }

  // Final flush + parse.
  await maybeFlush(true);
  let parsedOk = false;
  let parsedJson: string | null = null;
  let parseErr: string | null = null;
  if (providerError === null && raw) {
    try {
      const r =
        opts.kind === 'workout' ? parseAIWorkout(raw) : parseAIProgram(raw);
      if (r.ok) {
        parsedJson = JSON.stringify('workout' in r ? r.workout : r.program);
        parsedOk = true;
      } else {
        parseErr = r.reason;
      }
    } catch (e) {
      // A parser exception is recorded as a parse failure instead of
      // escaping and leaving the row stuck in `pending`.
      parseErr = `Parser crashed: ${describeFailure(e, 'unknown error')}`;
    }
  }

  // `parsedOk` can only be true when there was no provider error.
  const status = parsedOk ? 'completed' : 'failed';
  const errorMessage =
    providerError ?? (parsedOk ? null : parseErr ?? 'Empty response');
  const durationMs = Date.now() - t0;
  try {
    await opts.prisma.aIGeneration.update({
      where: { id: generationId },
      data: {
        rawResponse: raw || null,
        parsedProgram: parsedJson,
        tokensIn: tokensIn ?? null,
        tokensOut: tokensOut ?? null,
        durationMs,
        status,
        errorMessage,
      },
    });
  } catch (e) {
    console.error('[generation runner] final update failed:', e);
  }
  emit(generationId, {
    type: 'complete',
    parsedOk,
    errorMessage: errorMessage ?? undefined,
    tokensIn,
    tokensOut,
    durationMs,
  });
}