Node Stream
const { App, Pipelines, Stream, Batch } = require('../');
const pipeline = Pipelines();
pipeline.source(Stream.consumer({ name: 'process' }))
.flow((data, err, next) => {
let num = parseInt(data.num);
Object.assign(data, { num: num + 1 , from : country[Math.floor(Math.random() * country.length)]});
Object.assign(data, { to : country[Math.floor(Math.random() * country.length)]});
next(data, err);
})
.sink((data, err, next) => {
console.log(JSON.stringify(data, null, 3));
next(data, err);
});
const app = App('test');
app.add(pipeline);
for (let i = 0; i < 7; i++) {
app.writeStream('process', { num: i });
}
Node Stream + Batch - Reduce
const { App, Pipelines, Stream, Batch } = require('../');
const pipeline = Pipelines();
pipeline.source(Stream.consumer({ name: 'process' }))
.flow((data, err, next) => {
let num = parseInt(data.num);
Object.assign(data, { num: num + 1 , from : countries[Math.floor(Math.random() * country.length)]});
Object.assign(data, { to : countries[Math.floor(Math.random() * country.length)]});
// throw new Error('Kaka punjabi');
//err = new Error('Kaka punjabi');
next(data, err);
})
.flow(Batch.reduce({ number: 5, timeout: 30000, groupBy: "from", attributes: ["num", "to"]},
(aggtr ,data) => {
let num = parseInt(data.num);
aggtr.number += num;
return aggtr;
}, { number:0}))
.sink((data, err, next) => {
console.log(JSON.stringify(data, null, 3));
next(data, err);
});
const app = App('test');
app.add(pipeline);
for (let i = 0; i < 7; i++) {
app.writeStream('process', { num: i });
}
Node Stream + Batch - Map
const { App, Pipelines, Stream, Batch } = require('../');
const pipeline = Pipelines();
pipeline.source(Stream.consumer({ name: 'process' }))
.flow((data, err, next) => {
let num = parseInt(data.num);
Object.assign(data, { num: num + 1 , from : countries[Math.floor(Math.random() * country.length)]});
Object.assign(data, { to : countries[Math.floor(Math.random() * country.length)]});
next(data, err);
})
.flow(Flowfunc.batch({ number: 5, timeout: 30000 }, (data, err, next) => {
console.log("...",data);
let num = parseInt(data.num);
Object.assign(data, { num: num - 1 });
return data;
}))
.sink((data, err, next) => {
// console.log(JSON.stringify(data, null, 3));
next(data, err);
});
const app = App('test');
app.add(pipeline);
for (let i = 0; i < 7; i++) {
app.writeStream('process', { num: i });
}