https://github.com/callbag/callbag
https://github.com/staltz/callbag-basics
https://github.com/staltz/callbag-pseudo-rxjs
// from https://www.npmjs.com/package/callbag-catch-error

// Callbag signal types used throughout: 0 = START (handshake), 1 = DATA, 2 = END.

/**
 * Operator that intercepts error terminations.
 * When the source sends END (2) with a defined payload, `fn` is called with
 * that payload and the signal is NOT forwarded to the sink. Every other
 * signal (START, DATA, END without payload) is passed through unchanged.
 */
const catchError = (fn) => (source) => (start, sink) => {
  if (start !== 0) return;
  source(0, (type, data) => {
    if (type === 2 && typeof data !== 'undefined') fn(data);
    else sink(type, data);
  });
};

/** Generator yielding the integers from `from` to `to`, both inclusive. */
function* range(from, to) {
  let i = from;
  while (i <= to) {
    yield i;
    i++;
  }
}

/**
 * Listenable source that delivers every `name` event fired on `node`.
 * The listener is removed when the sink terminates the relationship (2).
 */
const fromEvent = (node, name) => (start, sink) => {
  if (start !== 0) return;
  const handler = (ev) => sink(1, ev);
  sink(0, (t) => {
    if (t === 2) node.removeEventListener(name, handler);
  });
  node.addEventListener(name, handler);
};

/**
 * Pullable source over an iterable or an iterator.
 * One value is delivered per DATA (1) request from the sink; END (2) is sent
 * once the iterator is exhausted. A termination request from the sink stops
 * all further deliveries.
 */
const fromIter = (iter) => (start, sink) => {
  if (start !== 0) return;
  const iterator =
    typeof Symbol !== 'undefined' && iter[Symbol.iterator]
      ? iter[Symbol.iterator]()
      : iter;
  let inloop = false; // true while drain() is running, to prevent re-entrant recursion
  let got1 = false; // a pull request is pending
  let completed = false; // the sink asked us to stop
  let res;

  // Serve pending pull requests iteratively; a sink that pulls synchronously
  // from inside its DATA handler only sets `got1` and is served by this loop.
  function drain() {
    inloop = true;
    while (got1 && !completed) {
      got1 = false;
      res = iterator.next();
      if (res.done) {
        sink(2);
        break;
      }
      sink(1, res.value);
    }
    inloop = false;
  }

  sink(0, (t) => {
    if (completed) return;
    if (t === 1) {
      got1 = true;
      if (!inloop && !(res && res.done)) drain();
    } else if (t === 2) {
      completed = true;
    }
  });
};

/**
 * Listenable source that emits 0, 1, 2, ... every `period` milliseconds.
 * The timer is cleared when the sink terminates the relationship (2).
 */
const interval = (period) => (start, sink) => {
  if (start !== 0) return;
  let i = 0;
  const id = setInterval(() => {
    sink(1, i++);
  }, period);
  sink(0, (t) => {
    if (t === 2) clearInterval(id);
  });
};

/**
 * Sink that calls `operation` for every delivered value and requests the next
 * value after the handshake and after each delivery (works with pullable
 * sources).
 */
const forEach = (operation) => (source) => {
  let talkback;
  source(0, (t, d) => {
    if (t === 0) talkback = d;
    if (t === 1) operation(d);
    if (t === 1 || t === 0) talkback(1);
  });
};

/**
 * Sink that calls `operation` for every delivered value but never requests
 * data itself, so it only receives values from sources that push.
 */
const loop = (operation) => (source) => {
  let talkback;
  source(0, (t, d) => {
    if (t === 0) talkback = d;
    if (t === 1) operation(d);
  });
};

/**
 * Operator that re-emits every DATA value `duration` milliseconds later.
 * START and END are forwarded immediately, so a termination can overtake
 * values that are still waiting on their timers.
 */
const delay = (duration) => (source) => (type, sink) => {
  if (type !== 0) return;
  source(0, (t, d) => {
    if (t !== 1) {
      sink(t, d);
      return;
    }
    setTimeout(() => sink(1, d), duration);
  });
};

/**
 * Operator that forwards values until `fn(value)` returns a truthy result.
 * The matching value is not forwarded; the source is terminated and the sink
 * receives END (2). END/errors coming from the source and termination
 * requests coming from the sink are forwarded as well.
 */
const until = (fn) => (source) => (type, sink) => {
  if (type !== 0) return;
  let talkback;
  let ended = false; // once set, no signal crosses this operator any more
  source(0, (t, d) => {
    if (ended) return;
    if (t === 0) {
      talkback = d;
      sink(0, (t2) => {
        if (ended) return;
        if (t2 === 1) {
          talkback(1);
        } else if (t2 === 2) {
          ended = true;
          talkback(2);
        }
      });
      return;
    }
    if (t === 1) {
      if (fn(d)) {
        ended = true;
        talkback(2);
        sink(2);
      } else {
        sink(1, d);
      }
      return;
    }
    if (t === 2) {
      ended = true;
      sink(2, d);
    }
  });
};

/**
 * Left-to-right composition: the first argument is the seed value and each
 * following argument is a function applied to the previous result.
 * Returns the final result (undefined when called without arguments).
 */
function pipe(...cbs) {
  let res = cbs[0];
  for (let i = 1, n = cbs.length; i < n; i++) {
    res = cbs[i](res);
  }
  return res;
}

// Usage examples:
//pipe(fromIter(range(10, 60)), until((d) => d > 40), forEach(x => console.log(x)));
//pipe(interval(500), until((d) => d > 5), forEach(x => console.log(x)));
//pipe(interval(20), until((d) => d > 500), loop(x => console.log(x)));
Source: http://blog.krawaller.se/posts/explaining-callbags-via-typescript-definitions/
/** Signal type constants of the callbag protocol. */
export type START = 0;
export type DATA = 1;
export type END = 2;

/** Loose typing: any callbag is a function taking a signal type and an optional payload. */
export type Callbag = (type: START | DATA | END, payload?: any) => void;
/** Loose typing: a function that creates a callbag from arbitrary arguments. */
export type Factory = (...args: Array<any>) => Callbag;
/** Loose typing: a function that, given arguments, returns a callbag-to-callbag transformer. */
export type Operator = (...args: Array<any>) => (source: Callbag) => Callbag;

// The aliases below split the loose `Callbag` type into the distinct roles a
// callbag can play, expressed as intersections of call signatures.

/** Talkback handed to a sink: the sink calls it to request data (1) or to terminate (2). */
type SourceTalkback =
  & ((request: DATA) => void)
  & ((terminate: END) => void);

/** A sink as seen by its source: receives the handshake (0), data (1) and termination (2). */
type SinkTalkback =
  & ((start: START, sourceTalkback: SourceTalkback) => void)
  & ((deliver: DATA, data: any) => void)
  & ((terminate: END, error?: any) => void);

/** A source that has not been greeted yet: called once with START and the sink's talkback. */
type SourceInitiator = (type: START, payload: SinkTalkback) => void;

/** Takes a source; returns a new source (an operator stage) or nothing (a final sink). */
type SinkConnector = (source: SourceInitiator) => SourceInitiator | void;

/** A function that creates a source from arbitrary arguments. */
type SourceFactory = (...args: Array<any>) => SourceInitiator;

// Named `SinkFactory` rather than `Operator`: a second alias called `Operator`
// would collide with the exported loose `Operator` type declared above.
/** A function that creates a sink connector from arbitrary arguments. */
type SinkFactory = (...args: Array<any>) => SinkConnector;
Reaping the rewards
With these typings down we can use them as a very precise vocabulary for explaining the process of using callbags!
Here’s the setup and handshake:
- We call a `SinkConnector`, passing in a `SourceInitiator`: `sinkConnector(sourceInitiator)`
- The `SourceInitiator` is called with a `SinkTalkback`: `sourceInitiator(0, sinkTalkback)`
- The `SinkTalkback` is called with a `SourceTalkback`: `sinkTalkback(0, sourceTalkback)`
And data passing:
- The `SourceTalkback` is called with a request for data (if pullable): `sourceTalkback(1)`
- The `SinkTalkback` is called with data delivery: `sinkTalkback(1, data)`
And termination:
- The `SourceTalkback` can be called to terminate the relationship: `sourceTalkback(2)`
- The `SinkTalkback` can be called to terminate the relationship: `sinkTalkback(2)`
- The `SinkTalkback` can be called to terminate the relationship due to an error: `sinkTalkback(2, error)`
https://egghead.io/articles/comparing-callbags-to-rxjs-for-reactive-programming
https://github.com/staltz/callbag-pseudo-rxjs
113800cookie-checkCallbags