https://github.com/callbag/callbag
https://github.com/staltz/callbag-basics
https://github.com/staltz/callbag-pseudo-rxjs
//from https://www.npmjs.com/package/callbag-catch-error
const catchError = fn => source => (start, sink) => {
if (start !== 0) return;
source(0, (type, data) => {
type === 2 && typeof data !== 'undefined' ? fn(data) : sink(type, data);
});
};
function* range(from, to) {
let i = from;
while (i <= to) {
yield i;
i++;
}
}
const fromEvent = (node, name) => (start, sink) => {
if (start !== 0) return;
const handler = ev => sink(1, ev);
sink(0, t => {
if (t === 2) node.removeEventListener(name, handler);
});
node.addEventListener(name, handler);
};
const fromIter = iter => (start, sink) => {
if (start !== 0) return;
const iterator =
typeof Symbol !== 'undefined' && iter[Symbol.iterator] ?
iter[Symbol.iterator]() :
iter;
let inloop = false;
let got1 = false;
let res;
function loop() {
inloop = true;
while (got1) {
got1 = false;
res = iterator.next();
if (res.done) sink(2);
else sink(1, res.value);
}
inloop = false;
}
sink(0, t => {
if (t === 1) {
got1 = true;
if (!inloop && !(res && res.done)) loop();
}
});
};
const interval = (period) => {
return (start, sink) => {
if (start !== 0) return;
let i = 0;
const id = setInterval(() => {
sink(1, i++);
}, period);
sink(0, t => {
if (t === 2) {
clearInterval(id);
}
});
};
};
const forEach = (operation) => {
return (source) => {
let talkback;
source(0, (t, d) => {
if (t === 0) talkback = d;
if (t === 1) operation(d);
if (t === 1 || t === 0) talkback(1);
});
};
};
const loop = (operation) => {
return (source) => {
let talkback;
source(0, (t, d) => {
if (t === 0) talkback = d;
if (t === 1) operation(d);
});
};
};
const delay = (duration) => source => {
return (type, sink) => {
if (type !== 0) return
source(0, (t, d) => {
if (t !== 1) return sink(t, d)
let id = setTimeout(() => {
clearTimeout(id)
sink(1, d)
}, duration)
})
}
};
const until = (fn) => {
return (source) => {
return (type, sink) => {
if (type !== 0) return;
let talkback;
source(0, (t, d) => {
if (t === 0)
{
talkback = d;
sink(0, (t2) => {
if (t2 === 1) talkback(1);
});
}
if (t === 1) {
if (fn(d)) {
talkback(2);
} else {
sink(1, d);
}
}
});
}
}
}
function pipe(...cbs) {
let res = cbs[0];
for (let i = 1, n = cbs.length; i < n; i++) {
let f = cbs[i];
res = f(res);
}
return res;
};
//pipe(fromIter(range(10, 60)), until((d) => d > 40), forEach(x => console.log(x)));
//pipe(interval(500), until((d) => d > 5), forEach(x => console.log(x)));
//pipe(interval(20), until((d) => d > 500), loop(x => console.log(x)));
Source: http://blog.krawaller.se/posts/explaining-callbags-via-typescript-definitions/
export type START = 0; export type DATA = 1; export type END = 2; export type Callbag = (type: START | DATA | END, payload?: any) => void; export type Factory = (...args: Array<any>) => Callbag; export type Operator = (...args: Array<any>) => (source: Callbag) => Callbag; type SourceTalkback = & ((request: DATA) => void) & ((terminate: END) => void); type SinkTalkback = & ((start: START, sourceTalkback: SourceTalkback) => void) & ((deliver: DATA, data: any) => void) & ((terminate: END, error?: any) => void); type SourceInitiator = (type: START, payload: SinkTalkback) => void; type SinkConnector = (source: SourceInitiator) => SourceInitiator | void; type SourceFactory = (...args: Array<any>) => SourceInitiator; type Operator = (...args: Array<any>) => SinkConnector;

Reaping the rewards
With these typings down we can use them as a very precise vocabulary for explaining the process of using callbags!
Here’s the setup and handshake:
- We call a
SinkConnectorpassing in aSourceInitiator:sinkConnector(sourceInitiator) - The
SourceInitiatoris called with aSinkTalkback:sourceInitiator(0,sinkTalkback) - The
SinkTalkbackis called with aSourceTalkback:sinkTalkback(0,sourceTalkback)
And data passing:
- The
SourceTalkbackis called with a request for data (if pullable):sourceTalkback(1) - The
SinkTalkbackis called with data delivery:sinkTalkback(1,data)
And termination:
- The
SourceTalkbackcan be called to terminate the relationship:sourceTalkback(2) - The
SinkTalkbackcan be called to terminate the relationship:sinkTalkback(2) - The
SinkTalkbackcan be called to terminate the relationship due to an error:sinkTalkback(2,error)
https://egghead.io/articles/comparing-callbags-to-rxjs-for-reactive-programming
https://github.com/staltz/callbag-pseudo-rxjs

113800cookie-checkCallbags