Source: https://github.com/staltz/callbag-flatten
const fromIter = require('callbag-from-iter'); const iterate = require('callbag-iterate'); const flatten = require('callbag-flatten'); const pipe = require('callbag-pipe'); const map = require('callbag-map'); const source = pipe( fromIter('hi'), map(char => pipe( fromIter([10, 20, 30]), map(num => char + num) )), flatten, iterate(x => console.log(x)) ); // h10 // h20 // h30 // i10 // i20 // i30
let source = pipe( fromIter(["./BaggingInstructionTemplate.csv"]), // start with an array of arguments map((fileName : string) => fromCsvFile(fileName, { delimiter: ',' })), // map each arguments to a new source flatten, // flatten the sources, so the results from the file will become the data filter((d: any) => parseInt(d[1]) > 40), // filter the data on column value take(2), // take just the first 2 rows that passed the filter map((r: any) => [r[0], r[1]]), // map the data as we like //forEach((x: any) => console.log(x)) // we could log here, but the result of a pipe is a source also! ) let src2 = merge(source, source); // we could merge data from 2 different sources, or just execute the full pipe above twice forEach(console.log)(src2); // call forEach with a funcion to execute, that returns a sink, that can be called with a source as argument
import Promise from 'bluebird'; var glob = require('glob'); const globAsync : Function = Promise.promisify(glob); pipe( fromPromise(globAsync('*.csv')), map((x: any) => { return fromIter(x); }), flatten, map((x: any) => { console.log(x); return x; }), map((fileName : string) => fromCsvFile(fileName, { delimiter: ',' })), // map each arguments to a new source flatten, // flatten the sources, so the results from the file will become the data filter((d: any) => parseInt(d[1]) > 40), // filter the data on column value take(2), // take just the first 2 rows that passed the filter map((r: any) => [r[0], r[1]]), // map the data as we like forEach((x: any) => console.log(x)) )
import { MessageType } from "../enums"; var fs = require('fs'); var parse = require('csv-parse'); interface CsvOptions { delimiter: string } export let fromCsvFile = (fileName: string, options: CsvOptions = { delimiter: ':' }) => (start: number, sink: any) => { if (start !== MessageType.START) return; let closed = false; let stream = fs.createReadStream(fileName) stream .pipe(parse(options)) .on('data', (row: any) => { if (!closed) { sink(MessageType.DATA, row); } }) .on('error', (err: any) => { if (!closed) { closed = true; sink(MessageType.END, err); } }) .on('end', function () { closed = true; sink(MessageType.END); }); sink(MessageType.START, (t: number) => { if (t === MessageType.END) { closed = true; stream.close(); } }); };
128100cookie-checkCallbag examples