base2020/src/utilities/Callbag.ts

217 lines
8.3 KiB
TypeScript

/**
* The MIT License (MIT)
*
* Copyright (c) 2018 André Staltz(staltz.com)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import { Callbag, DATA, END, Sink, Source, START } from "callbag";
// vendoring callbag utilities for now until the typings improve upstream
/**
 * Creates a source that emits an incrementing counter (0, 1, 2, ...) on every
 * `setInterval` tick.
 *
 * A timer is started for each sink that subscribes. The sink is greeted with a
 * talkback that clears that timer when it receives END (2); every other
 * talkback signal is ignored.
 *
 * @param period Delay handed to `setInterval`.
 * @returns A callbag source of numbers; calls with anything but START (0) are no-ops.
 */
export function interval(period: number): Callbag<never, number> {
  return (start: START | DATA | END, data?: Sink<number> | number) => {
    if (start !== 0) return;
    const listener = data as Sink<number>;
    let counter = 0;
    const emitNext = () => {
      // Advance the counter before calling out, so a throwing listener
      // cannot make the same value be emitted twice.
      const current = counter;
      counter += 1;
      listener(1, current);
    };
    const timer = setInterval(emitNext, period);
    // The timer is already running when the greeting is delivered.
    listener(0, (signal: START | DATA | END) => {
      if (signal !== 2) return;
      clearInterval(timer);
    });
  };
}
/**
 * Creates a subject: a callbag that multicasts whatever is pushed into it.
 *
 * - Called with START (0) and a sink, it registers the sink and greets it with
 *   a talkback. Sending END (2) to that talkback unregisters the sink; other
 *   talkback signals are ignored.
 * - Called with any other type, it forwards `(type, data)` to every registered
 *   sink. The subject keeps no "finished" state: sinks stay registered after
 *   an END has been broadcast.
 *
 * @returns A callbag usable both as a source and as a sink.
 */
export function makeSubject<T>(): Callbag<T, T> {
  const subscribers: Sink<T>[] = [];
  return (type: START | DATA | END, data?: Sink<T> | T) => {
    if (type !== 0) {
      // Iterate over a snapshot so (un)subscriptions made while broadcasting do
      // not disturb the loop, but re-check membership so a sink removed during
      // this broadcast does not receive the signal.
      for (const subscriber of subscribers.slice()) {
        if (subscribers.indexOf(subscriber) !== -1) {
          subscriber(type as 1 & 2, data);
        }
      }
      return;
    }
    const subscriber = data as Sink<T>;
    subscribers.push(subscriber);
    subscriber(0, (signal: START | DATA | END) => {
      if (signal !== 2) return;
      const position = subscribers.indexOf(subscriber);
      if (position !== -1) subscribers.splice(position, 1);
    });
  };
}
/**
 * Merges several sources into a single source.
 *
 * All sources are subscribed to when the sink subscribes. The sink is greeted
 * once, when the first source greets. DATA from any source is forwarded as it
 * arrives. An END carrying a truthy payload (an error) from any source ends
 * the other sources and is forwarded to the sink. The merged source completes
 * once every source has completed.
 *
 * Modified from upstream callbag-merge to never send the sink's talkbacks up
 * the chain, to avoid confusing subject sources by mistake. As a consequence
 * the sources are NOT told when the sink terminates. Instead the sink's END is
 * remembered, and everything the sources deliver afterwards is dropped here.
 *
 * @param sources Sources to merge.
 * @returns A source emitting the interleaved values of all `sources`.
 */
export function merge<T>(...sources: Callbag<never, T>[]): Source<T> {
  return (start: START | DATA | END, data?: Callbag<T, void> | void) => {
    if (start !== 0) return;
    const sink = data as Callbag<T, void>;
    const n = sources.length;
    const sourceTalkbacks: (Sink<never> | undefined)[] = new Array(n);
    let startCount = 0;
    let endCount = 0;
    // Set once the sink has terminated (its own END talkback) or has been
    // terminated by an error from one of the sources.
    let ended = false;
    const talkback = (t: START | DATA | END) => {
      // Deliberately not forwarded to the sources (see the function comment).
      if (t === 2) ended = true;
    };
    for (let i = 0; i < n; i++) {
      // The sink may terminate synchronously while being greeted; do not
      // subscribe to the remaining sources in that case.
      if (ended) return;
      sources[i](0, (t: START | DATA | END, d?: Callbag<never, T> | T) => {
        if (t === 0) {
          sourceTalkbacks[i] = d as Callbag<never, T>;
          if (++startCount === 1) sink(0, talkback);
        } else if (ended) {
          // The sink is finished: since sources are never told to stop, they
          // may still emit; nothing may reach the sink any more.
          return;
        } else if (t === 2 && d) {
          // Error: stop every other source, then pass the error on.
          ended = true;
          for (let j = 0; j < n; j++) {
            if (j !== i) sourceTalkbacks[j]?.(2);
          }
          sink(2, d);
        } else if (t === 2) {
          sourceTalkbacks[i] = void 0;
          if (++endCount === n) sink(2);
        } else {
          sink(t as 0 & 1 & 2, d);
        }
      });
    }
  };
}
/**
 * Creates a source that, for each sink that subscribes, greets the sink,
 * emits the single value returned by `f`, and then completes.
 *
 * `f` is called once per subscription, after the greeting. If the sink sends
 * END (2) through the talkback during the greeting, `f` is not called and
 * nothing is emitted. If the sink sends END while receiving the value, the
 * completion signal is skipped.
 *
 * @param f Producer of the value to emit; exceptions it throws propagate to
 *          whoever subscribed.
 * @returns A callbag source; calls with anything but START (0) are no-ops.
 */
export function lazy<T, TB>(f: () => T): Callbag<TB, T> {
  return (start: START | DATA | END, data?: Callbag<T, TB> | TB) => {
    if (start !== 0) return;
    const sink = data as Callbag<T, TB>;
    let unsubed = false;
    sink(0, (type: START | DATA | END) => {
      if (type === 2) unsubed = true;
    });
    // The sink may have unsubscribed synchronously while being greeted; in
    // that case do not run `f` (it may have side effects) or emit anything.
    if (unsubed) return;
    sink(1, f());
    if (!unsubed) sink(2);
  };
}
/**
 * Flattens a source of sources, relaying only the most recently emitted inner
 * source (callbag-flatten, but pulling sources has been disabled in favor of
 * passing talkback signals to the current source).
 *
 * - Each inner source emitted by `source` replaces the previous one: the
 *   previous inner is sent END (2) and the new one is subscribed to.
 * - DATA from the current inner source is forwarded to the sink.
 * - DATA (1) talkbacks from the sink are passed to the current inner source's
 *   talkback, if there is one; they are never sent to the outer source.
 * - An END (2) talkback from the sink ends both the current inner source and
 *   the outer source.
 * - An error from the outer source ends the current inner source and is
 *   forwarded; an error from the current inner source ends the outer source
 *   and is forwarded.
 * - The result completes when the outer source has completed and no inner
 *   source is active (an inner counts as active once it has greeted and until
 *   it completes or is replaced).
 *
 * Callbacks from an inner source that has been replaced or ended are ignored,
 * so an inner source that keeps emitting after being sent END cannot leak
 * values into the sink or disturb the state kept for the current inner source.
 *
 * @param source Source whose values are themselves sources.
 * @returns A source relaying the values of the latest inner source.
 */
export function flattenLatest<T, TB>(source: Callbag<TB, Callbag<TB, T>>): Callbag<TB, T> {
  return (start: START | DATA | END, data?: Callbag<T, TB> | TB) => {
    if (start !== 0) return;
    const sink = data as Callbag<T, TB>;
    let outerEnded = false;
    let outerTalkback: Callbag<TB, T>;
    let innerTalkback: Callbag<TB, T> | undefined;
    // Bumped whenever the current inner source is dropped. Every inner
    // subscription captures the value current at subscription time; a
    // mismatch marks its callbacks as stale.
    let generation = 0;
    // Invalidates the current inner source and tells it to stop.
    function dropInner() {
      generation++;
      innerTalkback?.(2);
      innerTalkback = void 0;
    }
    function talkback(t: START | DATA | END, d?: Callbag<T, TB> | TB) {
      if (t === 1) {
        // Talkback data goes to the current inner source only.
        innerTalkback?.(1, d as TB);
      }
      if (t === 2) {
        dropInner();
        outerTalkback(2);
      }
    }
    source(0, (T: START | DATA | END, D?: Callbag<TB, Callbag<TB, T>> | Callbag<TB, T>) => {
      if (T === 0) {
        outerTalkback = D as Callbag<TB, Callbag<TB, T>>;
        sink(0, talkback);
      } else if (T === 1) {
        const innerSource = D as Callbag<TB, T>;
        dropInner();
        const subscribedAt = generation;
        innerSource(0, (t: START | DATA | END, d?: Callbag<TB, T> | T) => {
          if (subscribedAt !== generation) {
            // Stale inner source. If it only greets now it has not been told
            // to stop yet, so do that; everything else is discarded.
            if (t === 0) (d as Callbag<TB, T>)(2);
            return;
          }
          if (t === 0) {
            innerTalkback = d as Callbag<TB, T>;
          } else if (t === 1) {
            sink(1, d as T);
          } else if (t === 2 && d) {
            // Inner error: this inner source is finished, so ignore anything
            // it might still send, end the outer source and pass the error on.
            generation++;
            outerTalkback(2);
            sink(2, d);
          } else if (t === 2) {
            if (outerEnded) {
              sink(2);
            } else {
              // Wait for the next inner source; the outer source is not pulled.
              innerTalkback = void 0;
            }
          }
        });
      } else if (T === 2 && D) {
        dropInner();
        sink(2, D);
      } else if (T === 2) {
        // Complete right away unless an inner source is still active.
        if (!innerTalkback) sink(2);
        else outerEnded = true;
      }
    });
  };
}
/**
 * Creates a source that defers building the real source until subscription.
 *
 * Composes `lazy` (which calls `factory` when a sink subscribes and emits the
 * source it returns) with `flattenLatest` (which relays that emitted source).
 *
 * @param factory Called to produce the source to relay.
 * @returns A source relaying the values of the source built by `factory`.
 */
export function defer<T, TB>(factory: () => Callbag<TB, T>): Callbag<TB, T> {
  const sourceOfSource = lazy<Callbag<TB, T>, TB>(factory);
  return flattenLatest<T, TB>(sourceOfSource);
}
/**
 * Creates an operator that transforms every DATA value of a source with `f`.
 *
 * START and END signals from the source, including the source's talkback and
 * any END payload, are handed to the sink untouched. Because the sink is
 * given the source's own talkback, sink talkbacks go straight to the source.
 *
 * @param f Transformation applied to each value delivered with DATA (1).
 * @returns A function taking a source and returning the mapped source.
 */
export function map<I, O, TB>(f: (input: I) => O): (source: Callbag<TB, I>) => Callbag<TB, O> {
  return upstream => {
    return (start: START | DATA | END, sink?: Callbag<O, TB> | TB) => {
      if (start !== 0) return;
      const downstream = sink as Callbag<O, TB>;
      upstream(0, (signal: START | DATA | END, payload?: Callbag<TB, I> | I) => {
        if (signal === 1) {
          downstream(1, f(payload as I));
        } else {
          // Greeting (with talkback) and termination pass through as they are.
          downstream(signal as 0 & 1 & 2, payload);
        }
      });
    };
  };
}
/**
 * Creates an operator that intercepts DATA (1) talkbacks a sink sends towards
 * a source and hands their payload to `talkbackHandler` instead of the source.
 *
 * Only talkback data is intercepted: any other talkback signal (such as END)
 * is forwarded to the source's talkback unchanged, and everything the source
 * sends downstream reaches the sink unchanged.
 *
 * @param talkbackHandler Receives the payload of each intercepted DATA talkback.
 * @returns A function taking a source and returning the wrapped source.
 */
export function catchTalkback<T, TB>(talkbackHandler: (data: TB) => void): (source: Source<T>) => Callbag<TB, T> {
  return (source: Source<T>) => (type: START | DATA | END, data?: Callbag<T, TB> | TB) => {
    if (type !== 0) return;
    const downstream = data as Callbag<T, TB>;
    let upstreamTalkback: Callbag<never, unknown> | undefined;
    source(0, (signal: START | DATA | END, payload?: Callbag<never, unknown> | T) => {
      if (signal !== 0) {
        // Data and termination from the source pass straight through.
        downstream(signal as 1 & 2, payload);
        return;
      }
      upstreamTalkback = payload as Callbag<never, unknown>;
      // Greet the sink with a talkback that diverts data to the handler.
      downstream(0, (request: START | DATA | END, requestData?: Callbag<T, TB> | TB) => {
        if (request !== 1) {
          upstreamTalkback?.(request as 0 & 1 & 2, requestData);
          return;
        }
        talkbackHandler(requestData as TB);
      });
    });
  };
}