138 lines
4.7 KiB
JavaScript
138 lines
4.7 KiB
JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.Watchable = void 0;
|
|
const AbortedError_1 = require("./AbortedError");
|
|
const deepEqual_1 = require("./deepEqual");
|
|
const Drop_1 = require("./Drop");
|
|
/**
 * Base class for a value that can be read once, read as a "const" (re-running
 * the calling context when it changes), or watched over time.
 *
 * This class calls `this.fetch(callback?)` and reads `this.label`, neither of
 * which is defined here — presumably both are supplied by subclasses.
 */
class Watchable {
    /**
     * @param effects  Context object. This class reads `isInContext` and
     *   `constRetry` from it and calls `onLeaveContext(fn)` on it.
     * @param [options]
     * @param [options.map]  Applied to every raw fetched value (default: identity).
     * @param [options.eq]   Equality check used to suppress unchanged mapped
     *   values (default: `deepEqual`).
     */
    constructor(effects, options) {
        this.effects = effects;
        this.mapFn = options?.map ?? ((a) => a);
        this.eqFn = options?.eq ?? ((a, b) => (0, deepEqual_1.deepEqual)(a, b));
    }

    /**
     * Produce a stream of raw values. Default implementation uses fetch() with
     * effects callback in a loop. Override for custom subscription mechanisms
     * (e.g. fs.watch).
     *
     * @param abort  AbortSignal; the loop stops once it is aborted or the
     *   effects object reports it is no longer in context.
     */
    async *produce(abort) {
        // Holds the resolver of whichever wait is currently pending so that
        // leaving the context or aborting can wake the loop up.
        const resolveCell = { resolve: () => { } };
        this.effects.onLeaveContext(() => {
            resolveCell.resolve();
        });
        // An abort signal fires at most once, so let the listener release itself.
        abort.addEventListener('abort', () => resolveCell.resolve(), { once: true });
        while (this.effects.isInContext && !abort.aborted) {
            let callback = () => { };
            const waitForNext = new Promise((resolve) => {
                callback = resolve;
                resolveCell.resolve = resolve;
            });
            // fetch() receives a callback that signals "value changed"; it
            // resolves waitForNext and lets the loop fetch again.
            yield await this.fetch(() => callback());
            await waitForNext;
        }
    }

    /**
     * Lifecycle hook called when const() registers a subscription.
     * Return a cleanup function to be called when the subscription ends.
     * Override for side effects like FileHelper's consts tracking.
     */
    onConstRegistered(_value) { }

    /**
     * Internal generator that maps raw values and deduplicates using eq.
     *
     * @param abort  AbortSignal; the generator returns as soon as it observes
     *   the signal aborted after receiving a raw value.
     */
    async *watchGen(abort) {
        // Wrapped in an object so that a falsy mapped value (0, '', null, ...)
        // is still recognised as "a previous value exists".
        let prev = null;
        for await (const raw of this.produce(abort)) {
            if (abort.aborted) {
                return;
            }
            const mapped = this.mapFn(raw);
            if (!prev || !this.eqFn(prev.value, mapped)) {
                prev = { value: mapped };
                yield mapped;
            }
        }
    }

    /**
     * Returns the value. Reruns the context from which it has been called if the underlying value changes
     */
    async const() {
        const abort = new AbortController();
        const gen = this.watchGen(abort.signal);
        const res = await gen.next();
        const value = res.value;
        if (this.effects.constRetry) {
            const constRetry = this.effects.constRetry;
            const cleanup = this.onConstRegistered(value);
            // Deliberately not awaited: wait in the background for the next
            // distinct value, then tear down and trigger the retry.
            gen.next().then((a) => {
                abort.abort();
                cleanup?.();
                // done means the stream ended without a new value: no retry.
                if (!a.done) {
                    constRetry();
                }
            }, () => {
                abort.abort();
                cleanup?.();
            });
        }
        else {
            abort.abort();
        }
        return value;
    }

    /**
     * Returns the value. Does nothing if the value changes
     */
    async once() {
        return this.mapFn(await this.fetch());
    }

    /**
     * Watches the value. Returns an async iterator that yields whenever the value changes
     *
     * Once the underlying stream ends, the iterator throws AbortedError.
     *
     * @param [abort]  Optional AbortSignal that stops the watch. A signal that
     *   is already aborted stops the watch immediately.
     */
    watch(abort) {
        const ctrl = new AbortController();
        if (abort?.aborted) {
            // An already-aborted signal never dispatches 'abort' again, so
            // a listener alone would miss it.
            ctrl.abort();
        }
        else {
            abort?.addEventListener('abort', () => ctrl.abort(), { once: true });
        }
        return Drop_1.DropGenerator.of((async function* (gen) {
            yield* gen;
            throw new AbortedError_1.AbortedError();
        })(this.watchGen(ctrl.signal)), () => ctrl.abort());
    }

    /**
     * Watches the value. Takes a custom callback function to run whenever the value changes
     *
     * @param callback  Called as `callback(value)` for each distinct value.
     *   Returning an object with a truthy `cancel` stops the watch; returning
     *   nothing keeps it running. If the watch itself fails, the callback is
     *   invoked as `callback(undefined, error)`.
     */
    onChange(callback) {
        (async () => {
            const ctrl = new AbortController();
            for await (const value of this.watchGen(ctrl.signal)) {
                try {
                    const res = await callback(value);
                    // Optional chaining: a callback that returns nothing is
                    // treated as "keep watching" rather than as an error.
                    if (res?.cancel) {
                        ctrl.abort();
                        break;
                    }
                }
                catch (e) {
                    console.error(`callback function threw an error @ ${this.label}.onChange`, e);
                }
            }
        })()
            .catch((e) => callback(undefined, e))
            .catch((e) => console.error(`callback function threw an error @ ${this.label}.onChange`, e));
    }

    /**
     * Watches the value. Returns when the predicate is true
     *
     * Rejects with AbortedError if the stream ends before the predicate matches.
     *
     * @param pred  Called with each distinct mapped value.
     */
    waitFor(pred) {
        const ctrl = new AbortController();
        return Drop_1.DropPromise.of(Promise.resolve().then(async () => {
            for await (const next of this.watchGen(ctrl.signal)) {
                if (pred(next)) {
                    return next;
                }
            }
            throw new AbortedError_1.AbortedError();
        }), () => ctrl.abort());
    }
}
|
|
exports.Watchable = Watchable;
|
|
//# sourceMappingURL=Watchable.js.map
|