Source: https://github.com/staltz/callbag-flatten
const fromIter = require('callbag-from-iter');
const iterate = require('callbag-iterate');
const flatten = require('callbag-flatten');
const pipe = require('callbag-pipe');
const map = require('callbag-map');
const source = pipe(
fromIter('hi'),
map(char => pipe(
fromIter([10, 20, 30]),
map(num => char + num)
)),
flatten,
iterate(x => console.log(x))
);
// h10
// h20
// h30
// i10
// i20
// i30
let source = pipe(
fromIter(["./BaggingInstructionTemplate.csv"]), // start with an array of arguments
map((fileName : string) => fromCsvFile(fileName, { delimiter: ',' })), // map each arguments to a new source
flatten, // flatten the sources, so the results from the file will become the data
filter((d: any) => parseInt(d[1]) > 40), // filter the data on column value
take(2), // take just the first 2 rows that passed the filter
map((r: any) => [r[0], r[1]]), // map the data as we like
//forEach((x: any) => console.log(x)) // we could log here, but the result of a pipe is a source also!
)
let src2 = merge(source, source); // we could merge data from 2 different sources, or just execute the full pipe above twice
forEach(console.log)(src2); // call forEach with a funcion to execute, that returns a sink, that can be called with a source as argument
import Promise from 'bluebird';
var glob = require('glob');
const globAsync : Function = Promise.promisify(glob);
pipe(
fromPromise(globAsync('*.csv')),
map((x: any) => {
return fromIter(x);
}),
flatten,
map((x: any) => {
console.log(x);
return x;
}),
map((fileName : string) => fromCsvFile(fileName, { delimiter: ',' })), // map each arguments to a new source
flatten, // flatten the sources, so the results from the file will become the data
filter((d: any) => parseInt(d[1]) > 40), // filter the data on column value
take(2), // take just the first 2 rows that passed the filter
map((r: any) => [r[0], r[1]]), // map the data as we like
forEach((x: any) => console.log(x))
)
import { MessageType } from "../enums";
var fs = require('fs');
var parse = require('csv-parse');
interface CsvOptions {
delimiter: string
}
export let fromCsvFile = (fileName: string, options: CsvOptions = { delimiter: ':' }) => (start: number, sink: any) => {
if (start !== MessageType.START) return;
let closed = false;
let stream = fs.createReadStream(fileName)
stream
.pipe(parse(options))
.on('data', (row: any) => {
if (!closed) {
sink(MessageType.DATA, row);
}
})
.on('error', (err: any) => {
if (!closed) {
closed = true;
sink(MessageType.END, err);
}
})
.on('end', function () {
closed = true;
sink(MessageType.END);
});
sink(MessageType.START, (t: number) => {
if (t === MessageType.END) {
closed = true;
stream.close();
}
});
};
128100cookie-checkCallbag examples