Node Stream

const { App, Pipelines, Stream, Batch } = require('../');
// Sample values used to fill the `from` / `to` fields of each record.
// The snippet previously referenced an undeclared `country` list; it is
// declared here so the example runs on its own.
const countries = ['India', 'Germany', 'Brazil', 'Japan', 'Canada'];

/** Picks one entry of `countries` at random. */
const pickCountry = () => countries[Math.floor(Math.random() * countries.length)];

// Pipeline: read records from the 'process' stream, enrich them, print them.
const pipeline = Pipelines();

pipeline.source(Stream.consumer({ name: 'process' }))
  .flow((data, err, next) => {
    // Bump `num` by one and tag the record with random `from` / `to` values.
    const num = Number.parseInt(data.num, 10);
    Object.assign(data, { num: num + 1, from: pickCountry(), to: pickCountry() });
    next(data, err);
  })
  .sink((data, err, next) => {
    // Pretty-print the enriched record, then hand it on unchanged.
    console.log(JSON.stringify(data, null, 3));
    next(data, err);
  });

// Create an app named 'test', attach the pipeline to it, and write seven
// records ({ num: 0 } through { num: 6 }) to the 'process' stream.
const app = App('test');
app.add(pipeline);

let seq = 0;
while (seq < 7) {
  app.writeStream('process', { num: seq });
  seq += 1;
}

Node Stream + Batch - Reduce

const { App, Pipelines, Stream, Batch } = require('../');
// Sample values used to fill the `from` / `to` fields of each record.
// The snippet previously indexed `countries` but bounded the index with an
// unrelated, undeclared `country`; a single declared list is used for both.
const countries = ['India', 'Germany', 'Brazil', 'Japan', 'Canada'];

/** Picks one entry of `countries` at random. */
const pickCountry = () => countries[Math.floor(Math.random() * countries.length)];

// Pipeline: read records from the 'process' stream, enrich them, reduce them
// through Batch.reduce, and print whatever the reduce step emits.
const pipeline = Pipelines();

pipeline.source(Stream.consumer({ name: 'process' }))
  .flow((data, err, next) => {
    // Bump `num` by one and tag the record with random `from` / `to` values.
    const num = Number.parseInt(data.num, 10);
    Object.assign(data, { num: num + 1, from: pickCountry(), to: pickCountry() });
    next(data, err);
  })
  // The reducer adds each record's `num` to the accumulator's `number`,
  // starting from the seed { number: 0 }.
  // NOTE(review): the option meanings (batch of 5, 30000 timeout, grouping by
  // `from`, carrying `num` / `to`) are presumed from their names; confirm
  // against the Batch.reduce implementation.
  .flow(Batch.reduce(
    { number: 5, timeout: 30000, groupBy: 'from', attributes: ['num', 'to'] },
    (aggtr, data) => {
      aggtr.number += Number.parseInt(data.num, 10);
      return aggtr;
    },
    { number: 0 }
  ))
  .sink((data, err, next) => {
    // Pretty-print what reached the sink, then hand it on unchanged.
    console.log(JSON.stringify(data, null, 3));
    next(data, err);
  });

// Create an app named 'test', attach the pipeline to it, and write seven
// records ({ num: 0 } through { num: 6 }) to the 'process' stream.
const app = App('test');
app.add(pipeline);

Array.from({ length: 7 }, (_, num) => ({ num }))
  .forEach((record) => {
    app.writeStream('process', record);
  });

Node Stream + Batch - Map

const { App, Pipelines, Stream, Batch } = require('../');
// Sample values used to fill the `from` / `to` fields of each record.
// The snippet previously indexed `countries` but bounded the index with an
// unrelated, undeclared `country`; a single declared list is used for both.
const countries = ['India', 'Germany', 'Brazil', 'Japan', 'Canada'];

/** Picks one entry of `countries` at random. */
const pickCountry = () => countries[Math.floor(Math.random() * countries.length)];

// Pipeline: read records from the 'process' stream, enrich them, pass them
// through a batch-map step, and forward the result from the sink.
const pipeline = Pipelines();

pipeline.source(Stream.consumer({ name: 'process' }))
  .flow((data, err, next) => {
    // Bump `num` by one and tag the record with random `from` / `to` values.
    const num = Number.parseInt(data.num, 10);
    Object.assign(data, { num: num + 1, from: pickCountry(), to: pickCountry() });
    next(data, err);
  })
  // NOTE(review): the snippet originally called `Flowfunc.batch`, but
  // `Flowfunc` is never imported. `Batch.map` is assumed from the imported
  // `Batch` helper and this example's "Batch - Map" title; confirm the
  // method name against the library's exports.
  .flow(Batch.map({ number: 5, timeout: 30000 }, (data, err, next) => {
    // Log the record, undo the earlier increment, and return the record.
    console.log("...", data);
    const num = Number.parseInt(data.num, 10);
    Object.assign(data, { num: num - 1 });
    return data;
  }))
  .sink((data, err, next) => {
    // The sink only forwards what it receives.
    next(data, err);
  });

// Create an app named 'test', attach the pipeline to it, and write seven
// records ({ num: 0 } through { num: 6 }) to the 'process' stream.
const app = App('test');
app.add(pipeline);

for (const num of [0, 1, 2, 3, 4, 5, 6]) {
  app.writeStream('process', { num });
}