Callbag examples

Date: 2018-09-18


const fromIter = require('callbag-from-iter');
const iterate = require('callbag-iterate');
const flatten = require('callbag-flatten');
const pipe = require('callbag-pipe');
const map = require('callbag-map');

// For every character of the outer source, build an inner source that emits
// that character combined with 10, 20 and 30; flatten the inner sources and
// log each value as it arrives.
// NOTE(review): the first stage and the `flatten` stage were missing from the
// snippet as found. 'hi' is inferred from the expected output listed below
// (h10 ... i30) — confirm against the original example.
const source = pipe(
  fromIter('hi'),
  // Each character becomes its own inner source of three values.
  map(char => pipe(
    fromIter([10, 20, 30]),
    map(num => char + num)
  )),
  // Without this stage the data would be the inner sources themselves,
  // not the values they emit.
  flatten,
  iterate(x => console.log(x))
);

// h10
// h20
// h30
// i10
// i20
// i30
// Pipeline over the rows of a CSV file: keep rows whose second column is
// greater than 40, stop after two matches, and keep only the first two columns.
// No sink is attached here, so the result of pipe() is itself a source.
let source = pipe(
    fromIter(["./BaggingInstructionTemplate.csv"]), // start with an array of arguments
    map((fileName : string) => fromCsvFile(fileName, { delimiter: ',' })), // map each argument to a new source
    flatten, // flatten the sources, so the results from the file will become the data
    filter((d: any) => parseInt(d[1], 10) > 40), // filter the data on column value (explicit radix: always parse as decimal)
    take(2), // take just the first 2 rows that passed the filter
    map((r: any) => [r[0], r[1]]), // map the data as we like
    //forEach((x: any) => console.log(x)) // we could log here, but the result of a pipe is a source also!
);

let src2  = merge(source, source); // we could merge data from 2 different sources, or just execute the full pipe above twice

forEach(console.log)(src2); // call forEach with a function to execute; it returns a sink, which can then be called with a source as argument
import Promise from 'bluebird';
var glob = require('glob');
// Promise-returning wrapper around `glob`, created with Promise.promisify
// (here `Promise` is the bluebird import, not the built-in).
// NOTE(review): the `Function` type gives callers no parameter or return-type
// checking; consider a precise signature once the call site is confirmed.
const globAsync : Function = Promise.promisify(glob);

    // NOTE(review): this snippet is incomplete as found — the opening of the
    // enclosing call and its first stage(s) are missing, and the two callback
    // bodies below are never closed. Restore them from the original before use.
    map((x: any) => {
        return fromIter(x);
    map((x: any) => {
        return x;
    map((fileName : string) => fromCsvFile(fileName, { delimiter: ',' })), // map each argument to a new source
    flatten, // flatten the sources, so the results from the file will become the data
    filter((d: any) => parseInt(d[1]) > 40), // filter the data on column value
    take(2), // take just the first 2 rows that passed the filter
    map((r: any) => [r[0], r[1]]), // map the data as we like
    forEach((x: any) => console.log(x))
import { MessageType } from "../enums";

var fs = require('fs');
var parse = require('csv-parse');

/** Options accepted by `fromCsvFile`. */
interface CsvOptions {
    /** Character that separates fields within a row, e.g. `','`. */
    delimiter: string;
}

/**
 * Creates a callbag source that reads `fileName` and emits one DATA message
 * per parsed CSV row.
 *
 * The returned function ignores every message type except START. On START it
 * opens the file, greets the sink with a talkback, and then forwards rows until
 * the stream ends, an error occurs, or the sink sends END through the talkback.
 *
 * NOTE(review): in the snippet as found, `options` and the imported `parse`
 * were never used and nothing was sent to the sink when the stream ended.
 * Piping through csv-parse and signalling END are the evident intent — confirm
 * against the original implementation.
 *
 * @param fileName Path of the CSV file to read.
 * @param options  Parsing options; `delimiter` defaults to `':'`.
 * @returns A callbag source `(start, sink) => void`.
 */
export const fromCsvFile = (fileName: string, options: CsvOptions = { delimiter: ':' }) => (start: number, sink: any) => {
    if (start !== MessageType.START) return;

    // Set once the sink has been terminated (by us or by the sink itself);
    // after that nothing more may be delivered.
    let closed = false;

    // Terminates the sink with an error, at most once.
    const fail = (err: any) => {
        if (!closed) {
            closed = true;
            sink(MessageType.END, err);
        }
    };

    // Errors on the file stream are not forwarded by pipe(), so the file
    // stream needs its own handler (e.g. for a missing file).
    const file = fs.createReadStream(fileName).on('error', fail);

    file
        .pipe(parse({ delimiter: options.delimiter }))
        .on('data', (row: any) => {
            if (!closed) {
                sink(MessageType.DATA, row);
            }
        })
        .on('error', fail)
        .on('end', function () {
            // Normal completion: tell the sink there is no more data.
            if (!closed) {
                closed = true;
                sink(MessageType.END);
            }
        });

    // Greet the sink. Stream events fire asynchronously, so this runs before
    // the first row is delivered.
    sink(MessageType.START, (t: number) => {
        if (t === MessageType.END) {
            closed = true;
            // Stop reading and release the file descriptor.
            file.destroy();
        }
    });
};
12810cookie-checkCallbag examples