From 70946d450ae7e1b5c9ddc8ae9e045669dd4c4130 Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Sat, 2 May 2020 13:53:43 -0400 Subject: [PATCH] vendor some callbag utilities for better typings --- src/Utilities/Callbag.ts | 191 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 src/Utilities/Callbag.ts diff --git a/src/Utilities/Callbag.ts b/src/Utilities/Callbag.ts new file mode 100644 index 0000000..61cc2c3 --- /dev/null +++ b/src/Utilities/Callbag.ts @@ -0,0 +1,191 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2018 André Staltz(staltz.com) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import { Callbag, DATA, END, Sink, Source, START } from "callbag"; + +// vendoring callbag utilities for now until the typings improve upstream + +export function interval(period: number): Callbag { + return (start: START | DATA | END, data?: Sink | number) => { + if (start !== 0) return; + const sink = data as Sink; + let i = 0; + const id = setInterval(() => { + sink(1, i++); + }, period); + sink(0, (type: START | DATA | END) => { + if (type === 2) { + clearInterval(id); + } + }); + }; +} + +export function makeSubject(): Callbag { + const sinks: Sink[] = []; + return (type: START | DATA | END, data?: Sink | T) => { + if (type === 0) { + const sink = data as Sink; + sinks.push(sink); + sink(0, (t: START | DATA | END) => { + if (t === 2) { + const i = sinks.indexOf(sink); + if (i > -1) sinks.splice(i, 1); + } + }); + } else { + const zinkz = sinks.slice(0); + for (let i = 0, n = zinkz.length, sink; i < n; i++) { + sink = zinkz[i]; + if (sinks.indexOf(sink) > -1) { + sink(type as 1 & 2, data); + } + } + } + }; +} + +// modified to never send talkbacks up the chain to avoid confusing subject sources by mistake +export function merge(...sources: Callbag[]): Source { + return (start: START | DATA | END, data?: Callbag | void) => { + if (start !== 0) return; + const sink = data as Callbag; + const n = sources.length; + const sourceTalkbacks: (Sink | undefined)[] = new Array(n); + let startCount = 0; + let endCount = 0; + let ended = false; + const talkback = (t: START | DATA | END, d?: Callbag | void) => { + if (t === 2) ended = true; + //for (let i = 0; i < n; i++) sourceTalkbacks[i]?.(t as 0 & 1 & 2, d); + }; + for (let i = 0; i < n; i++) { + if (ended) return; + sources[i](0, (t: START | DATA | END, d?: Callbag | T) => { + if (t === 0) { + sourceTalkbacks[i] = d as Callbag; + if (++startCount === 1) sink(0, talkback); + } else if (t === 2 && d) { + ended = true; + for (let j = 0; j < n; j++) { + if (j !== i) sourceTalkbacks[j]?.(2); + } + sink(2, d); + } else if (t === 2) { + sourceTalkbacks[i] = void 0; + if (++endCount === n) sink(2); + } else { + sink(t as 0 & 1 & 2, d); + } + }); + } + }; +} + +export function lazy(f: () => T): Callbag { + return (start: START | DATA | END, data?: Callbag | TB) => { + if (start === 0) { + const sink = data as Callbag; + let unsubed = false; + sink(0, (type: START | DATA | END) => { + if (type === 2) unsubed = true; + }); + sink(1, f()); + if (!unsubed) sink(2); + } + }; +} + +// callbag-flatten, but pulling sources has been disabled in favor of passing talkback signals to the current source +export function flattenLatest(source: Callbag>): Callbag { + return (start: START | DATA | END, data?: Callbag | TB) => { + if (start !== 0) return; + const sink = data as Callbag; + let outerEnded = false; + let outerTalkback: Callbag; + let innerTalkback: Callbag | undefined; + function talkback(t: START | DATA | END, d?: Callbag | TB) { + if (t === 1) { + //(innerTalkback || outerTalkback)(1, d as TB); + innerTalkback?.(1, d as TB); + } + if (t === 2) { + innerTalkback?.(2); + innerTalkback = void 0; + outerTalkback(2); + } + } + source(0, (T: START | DATA | END, D?: Callbag> | Callbag) => { + if (T === 0) { + outerTalkback = D as Callbag>; + sink(0, talkback); + } else if (T === 1) { + const innerSource = D as Callbag; + innerTalkback?.(2); + innerTalkback = void 0; + innerSource(0, (t: START | DATA | END, d?: Callbag | T) => { + if (t === 0) { + innerTalkback = d as Callbag; + //innerTalkback(1); + } else if (t === 1) { + sink(1, d as T); + } + else if (t === 2 && d) { + outerTalkback(2); + sink(2, d); + } else if (t === 2) { + if (outerEnded) { + sink(2); + } + else { + innerTalkback = void 0; + //outerTalkback(1); + } + } + }); + } else if (T === 2 && D) { + innerTalkback?.(2); + innerTalkback = void 0; + sink(2, D); + } else if (T === 2) { + if (!innerTalkback) sink(2); + else outerEnded = true; + } + }); + }; +} + +export function defer(factory: () => Callbag): Callbag { + return flattenLatest(lazy(factory)); +}; + +export function map(f: (input: I) => O): (source: Callbag) => Callbag { + return source => (start: START | DATA | END, sink?: Callbag | TB) => { + if (start !== 0) return; + source(0, (t: START | DATA | END, d?: Callbag | I) => { + (sink as Callbag)(t as 0 & 1 & 2, t === 1 ? f(d as I) : d) + }); + }; +} +