Callbags

Date: 2018-06-18

https://github.com/callbag/callbag

https://github.com/staltz/callbag-basics

https://github.com/staltz/callbag-pseudo-rxjs

//from https://www.npmjs.com/package/callbag-catch-error
const catchError = fn => source => (start, sink) => {
  if (start !== 0) return;
  source(0, (type, data) => {
    type === 2 && typeof data !== 'undefined' ? fn(data) : sink(type, data);
  });
};

// Generator producing `from`, `from + 1`, ... up to and including `to`.
// Yields nothing when `from` is already greater than `to`.
function* range(from, to) {
  for (let current = from; current <= to; current++) {
    yield current;
  }
}

const fromEvent = (node, name) => (start, sink) => {
  if (start !== 0) return;
  const handler = ev => sink(1, ev);
  sink(0, t => {
    if (t === 2) node.removeEventListener(name, handler);
  });
  node.addEventListener(name, handler);
};

const fromIter = iter => (start, sink) => {
    if (start !== 0) return;
    const iterator =
        typeof Symbol !== 'undefined' && iter[Symbol.iterator] ?
        iter[Symbol.iterator]() :
        iter;
    let inloop = false;
    let got1 = false;
    let res;

    function loop() {
        inloop = true;
        while (got1) {
            got1 = false;
            res = iterator.next();
            if (res.done) sink(2);
            else sink(1, res.value);
        }
        inloop = false;
    }
    sink(0, t => {
        if (t === 1) {
            got1 = true;
            if (!inloop && !(res && res.done)) loop();
        }
    });
};

const interval = (period) => {
    return (start, sink) => {
        if (start !== 0) return;
        let i = 0;
        const id = setInterval(() => {
            sink(1, i++);
        }, period);
        sink(0, t => {
            if (t === 2) {
                clearInterval(id);
            }
        });
    };
};

const forEach = (operation) => {
    return (source) => {
        let talkback;
        source(0, (t, d) => {
            if (t === 0) talkback = d;
            if (t === 1) operation(d);
            if (t === 1 || t === 0) talkback(1);
        });
    };
};

const loop = (operation) => {
    return (source) => {
        let talkback;
        source(0, (t, d) => {
            if (t === 0) talkback = d;
            if (t === 1) operation(d);
        });
    };
};

const delay = (duration) => source => {
    return (type, sink) => {
        if (type !== 0) return
        source(0, (t, d) => {
            if (t !== 1) return sink(t, d)
            let id = setTimeout(() => {
                clearTimeout(id)
                sink(1, d)
            }, duration)
        })
    }
};

const until = (fn) => {
    return (source) => {
        return (type, sink) => {
            if (type !== 0) return;
            let talkback;
            source(0, (t, d) => {
                if (t === 0)
                {
                    talkback = d;
                    sink(0, (t2) => {
                        if (t2 === 1) talkback(1);
                    });
                }
                if (t === 1) {
                    if (fn(d)) {
                        talkback(2);
                    } else {
                        sink(1, d);
                    }
                }
            });
        }
    }
}

function pipe(...cbs) {
    let res = cbs[0];
    for (let i = 1, n = cbs.length; i < n; i++) {
        let f = cbs[i];
        res = f(res);
    }
    return res;
};

//pipe(fromIter(range(10, 60)),  until((d) => d > 40), forEach(x => console.log(x)));
//pipe(interval(500), until((d) => d > 5), forEach(x => console.log(x)));
//pipe(interval(20), until((d) => d > 500), loop(x => console.log(x)));

Source: http://blog.krawaller.se/posts/explaining-callbags-via-typescript-definitions/

// Signal codes passed as the first argument of every callbag call:
// 0 opens the relationship, 1 carries or requests data, 2 terminates it.
export type START = 0;
export type DATA = 1;
export type END = 2;

// Loose typings: any callbag is a function taking a signal code plus an
// optional, untyped payload. These do not distinguish sources from sinks.
export type Callbag = (type: START | DATA | END, payload?: any) => void;
// A function that builds a callbag from arbitrary arguments.
export type Factory = (...args: Array<any>) => Callbag;
// A function that, given arbitrary arguments, returns a callbag-to-callbag transformer.
export type Operator = (...args: Array<any>) => (source: Callbag) => Callbag;

// What a sink may say to a source: request data (1) or terminate (2).
type SourceTalkback = 
  & ((request: DATA) => void)
  & ((terminate: END) => void);

// What a source may say to a sink: hand over its talkback (0), deliver a
// value (1), or terminate (2), optionally with an error.
type SinkTalkback = 
  & ((start: START, sourceTalkback: SourceTalkback) => void)
  & ((deliver: DATA, data: any) => void)
  & ((terminate: END, error?: any) => void);

// A source as seen from outside: called once with 0 and the sink's talkback.
type SourceInitiator = (type: START, payload: SinkTalkback) => void;

// Takes a source. Returns a new source, or nothing (the `void` case).
type SinkConnector = (source: SourceInitiator) => SourceInitiator | void;

// Builds a source from arbitrary arguments.
type SourceFactory = (...args: Array<any>) => SourceInitiator;

// Builds a SinkConnector from arbitrary arguments.
// NOTE(review): an exported `Operator` alias already appears earlier in this
// file with the loose typings; the two would clash if compiled together.
type Operator = (...args: Array<any>) => SinkConnector;

Reaping the rewards

With these typings down we can use them as a very precise vocabulary for explaining the process of using callbags!

Here’s the setup and handshake:

  1. We call a SinkConnector, passing in a SourceInitiator: sinkConnector(sourceInitiator)
  2. The SourceInitiator is called with a SinkTalkback: sourceInitiator(0, sinkTalkback)
  3. The SinkTalkback is called with a SourceTalkback: sinkTalkback(0, sourceTalkback)

And data passing:

  1. The SourceTalkback is called with a request for data (if pullable): sourceTalkback(1)
  2. The SinkTalkback is called with data delivery: sinkTalkback(1,data)

And termination:

  • The SourceTalkback can be called to terminate the relationship: sourceTalkback(2)
  • The SinkTalkback can be called to terminate the relationship: sinkTalkback(2)
  • The SinkTalkback can be called to terminate the relationship due to an error: sinkTalkback(2,error)

https://egghead.io/articles/comparing-callbags-to-rxjs-for-reactive-programming

https://github.com/staltz/callbag-pseudo-rxjs

11380cookie-checkCallbags