Callbag source for source (source as trigger for another one)

Date: 2018-09-17
import { MessageType } from "../enums";

/**
 * Builds a callbag source in which every DATA message from `sourceA` acts as
 * a trigger that starts a new subscription to `sourceB`; the DATA emitted by
 * those subscriptions is what the sink receives.
 *
 * Inner subscriptions are merged: a new trigger does not cancel the ones that
 * are still running. When the sink terminates, every inner subscription that
 * is still open is terminated, followed by the trigger source.
 *
 * Note: END messages (completion or error) coming from `sourceA` or
 * `sourceB` are not forwarded to the sink.
 *
 * @param sourceA Callbag source whose DATA messages are used purely as triggers.
 * @param sourceB Callbag source that is (re)started on every trigger.
 * @returns A callbag source delivering the DATA messages of `sourceB`.
 */
export const sourceForSource = (sourceA: Function, sourceB: Function) => {
    return (start: number, sink: any) => {
        if (start !== MessageType.START) return;

        let talkbackA: Function | undefined;
        // Every inner subscription that is still open. Tracking only the most
        // recent one would leak the earlier ones when the sink terminates.
        const activeTalkbacksB = new Set<Function>();
        // Set once the sink has sent END; nothing is forwarded or started afterwards.
        let ended = false;

        sourceA(MessageType.START, (t: number, data: any) => {
            if (t === MessageType.START) {
                talkbackA = data;
                // The sink may already be gone if this greeting arrives late.
                if (ended) data(MessageType.END);
                return;
            }
            if (t !== MessageType.DATA || ended) return;

            let talkbackB: Function | undefined;
            sourceB(MessageType.START, (t2: number, data2: any) => {
                if (t2 === MessageType.START) {
                    if (ended) {
                        // Greeted after the sink terminated: close immediately.
                        data2(MessageType.END);
                        return;
                    }
                    talkbackB = data2;
                    activeTalkbacksB.add(data2);
                } else if (t2 === MessageType.DATA) {
                    if (!ended) sink(t2, data2);
                } else if (t2 === MessageType.END) {
                    // This inner subscription finished by itself; stop tracking it.
                    if (talkbackB) activeTalkbacksB.delete(talkbackB);
                }
            });
        });

        sink(MessageType.START, (t: number) => {
            if (t !== MessageType.END || ended) return;
            ended = true;
            // Iterate over a snapshot so re-entrant END messages cannot disturb the loop.
            const pending = Array.from(activeTalkbacksB);
            activeTalkbacksB.clear();
            for (const talkbackB of pending) {
                talkbackB(MessageType.END);
            }
            if (talkbackA) {
                talkbackA(MessageType.END);
            }
        });
    };
}
// Usage example: every DATA message from the interval source triggers a new
// subscription to the CSV source (interval(1000) is presumably a one-second
// timer — confirm against its implementation).
const csvRowsPerTick = sourceForSource(
    interval(1000),
    fromCsvFile("./BaggingInstructionTemplate.csv", { delimiter: ',' })
);

pipe(
    csvRowsPerTick,
    // Keep rows whose second column parses to an integer greater than 40.
    filter((row: any) => parseInt(row[1]) > 40),
    take(8),
    // Reduce each row to its first two columns.
    map((row: any) => [row[0], row[1]]),
    forEach((entry: any) => console.log(entry))
)

 

Callbag source for source (source as trigger for another one)