"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.Watchable = void 0; const AbortedError_1 = require("./AbortedError"); const deepEqual_1 = require("./deepEqual"); const Drop_1 = require("./Drop"); class Watchable { constructor(effects, options) { this.effects = effects; this.mapFn = options?.map ?? ((a) => a); this.eqFn = options?.eq ?? ((a, b) => (0, deepEqual_1.deepEqual)(a, b)); } /** * Produce a stream of raw values. Default implementation uses fetch() with * effects callback in a loop. Override for custom subscription mechanisms * (e.g. fs.watch). */ async *produce(abort) { const resolveCell = { resolve: () => { } }; this.effects.onLeaveContext(() => { resolveCell.resolve(); }); abort.addEventListener('abort', () => resolveCell.resolve()); while (this.effects.isInContext && !abort.aborted) { let callback = () => { }; const waitForNext = new Promise((resolve) => { callback = resolve; resolveCell.resolve = resolve; }); yield await this.fetch(() => callback()); await waitForNext; } } /** * Lifecycle hook called when const() registers a subscription. * Return a cleanup function to be called when the subscription ends. * Override for side effects like FileHelper's consts tracking. */ onConstRegistered(_value) { } /** * Internal generator that maps raw values and deduplicates using eq. */ async *watchGen(abort) { let prev = null; for await (const raw of this.produce(abort)) { if (abort.aborted) return; const mapped = this.mapFn(raw); if (!prev || !this.eqFn(prev.value, mapped)) { prev = { value: mapped }; yield mapped; } } } /** * Returns the value. Reruns the context from which it has been called if the underlying value changes */ async const() { const abort = new AbortController(); const gen = this.watchGen(abort.signal); const res = await gen.next(); const value = res.value; if (this.effects.constRetry) { const constRetry = this.effects.constRetry; const cleanup = this.onConstRegistered(value); gen.next().then((a) => { abort.abort(); cleanup?.(); if (!a.done) { constRetry(); } }, () => { abort.abort(); cleanup?.(); }); } else { abort.abort(); } return value; } /** * Returns the value. Does nothing if the value changes */ async once() { return this.mapFn(await this.fetch()); } /** * Watches the value. Returns an async iterator that yields whenever the value changes */ watch(abort) { const ctrl = new AbortController(); abort?.addEventListener('abort', () => ctrl.abort()); return Drop_1.DropGenerator.of((async function* (gen) { yield* gen; throw new AbortedError_1.AbortedError(); })(this.watchGen(ctrl.signal)), () => ctrl.abort()); } /** * Watches the value. Takes a custom callback function to run whenever the value changes */ onChange(callback) { ; (async () => { const ctrl = new AbortController(); for await (const value of this.watchGen(ctrl.signal)) { try { const res = await callback(value); if (res.cancel) { ctrl.abort(); break; } } catch (e) { console.error(`callback function threw an error @ ${this.label}.onChange`, e); } } })() .catch((e) => callback(undefined, e)) .catch((e) => console.error(`callback function threw an error @ ${this.label}.onChange`, e)); } /** * Watches the value. Returns when the predicate is true */ waitFor(pred) { const ctrl = new AbortController(); return Drop_1.DropPromise.of(Promise.resolve().then(async () => { for await (const next of this.watchGen(ctrl.signal)) { if (pred(next)) { return next; } } throw new AbortedError_1.AbortedError(); }), () => ctrl.abort()); } } exports.Watchable = Watchable; //# sourceMappingURL=Watchable.js.map