wi|la

A streaming microservice framework for Node.js


Overview

wi|la is a streaming microservice framework for Node.js that lets you perform various operations on real-time data, such as transformation, aggregation, and validation.


Getting Started

A wi|la pipeline must consist of a source and a sink.

The source and the sink represent the input stream and the output stream respectively. Currently, Kafka and Node streams are supported.

Data transformation takes place in pipeline flows. There is no restriction on the number of flows that can be added to a pipeline; however, there can be only one source and one sink.
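
The shape described above (one source, any number of flows, one sink) can be sketched in plain JavaScript without wi|la itself. Everything in this sketch, including `runPipeline`, is a hypothetical illustration of the concept and not wi|la's API or implementation.

```javascript
// Hypothetical in-memory model of "one source, N flows, one sink".
// None of these names come from wi|la; they only illustrate the shape.
function runPipeline(source, flows, sink) {
  for (const item of source) {
    // Pass each item through every flow, in order.
    const result = flows.reduce((value, flow) => flow(value), item);
    sink(result);
  }
}

const output = [];
runPipeline(
  [1, 2, 3],                    // source: any iterable
  [(n) => n * 2, (n) => n + 1], // flows: zero or more transformations
  (n) => output.push(n)         // sink: receives the final values
);
// output is now [3, 5, 7]
```

The real framework reads from Kafka or Node streams rather than an array, but the ordering constraint is the same: flows always sit between the single source and the single sink.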

Pipeline

A wi|la pipeline can be created with the following code snippet.

const { Pipeline } = require('@xblockchainlabs/willa');
const pipeline = new Pipeline();

Source

As discussed above, the source represents the input to the pipeline, i.e. it is the starting point of a pipeline. Currently wi|la supports two types of source: source and sourceCommitable.

source can be either a Node Stream or a Kafka Stream, whereas sourceCommitable currently supports only Kafka streams.

To attach a source/sourceCommitable to the pipeline, use one of the following code snippets.

Node Stream

pipeline.source(Stream.consumer(options));

Kafka Stream

pipeline.source(Kafka.consumer(options));

Kafka Stream (commitable)

pipeline.sourceCommitable(Kafka.consumer(options));

options

{
  name: String,
  errorStrategy: String // one of 'reset', 'uncommit', 'report'
}

Note

name is mandatory, whereas errorStrategy defaults to `reset`.
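
The two rules in the note above can be expressed as a small validation step. This is only a sketch: `normalizeSourceOptions` is a hypothetical helper, not something wi|la exports, and the name `'orders'` is made up for the example.

```javascript
// Hypothetical helper, not part of wi|la: applies the rules from the note above.
const ERROR_STRATEGIES = ['reset', 'uncommit', 'report'];

function normalizeSourceOptions(options = {}) {
  if (typeof options.name !== 'string' || options.name.length === 0) {
    throw new Error('options.name is mandatory');
  }
  // errorStrategy falls back to 'reset' when it is omitted.
  const errorStrategy = options.errorStrategy ?? 'reset';
  if (!ERROR_STRATEGIES.includes(errorStrategy)) {
    throw new Error(`unknown errorStrategy: ${errorStrategy}`);
  }
  return { ...options, errorStrategy };
}

const sourceOptions = normalizeSourceOptions({ name: 'orders' });
// sourceOptions is { name: 'orders', errorStrategy: 'reset' }
```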

Flow

A flow performs a logical operation that transforms the incoming data. There is no limit on the number of flows that can be chained in a pipeline; however, a flow cannot be either the first or the last element of a pipeline.

A wi|la flow accepts two types of operation: lambda functions and Batch processes.

Lambda functions

.flow((data, err, next) => {
  // your custom logic
  next(data, err);
})
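
As an illustration, a flow with the (data, err, next) signature shown above might validate and transform a record. The `price` and `quantity` fields and the way errors are forwarded are assumptions made for this sketch; the stub `next` stands in for wi|la so the function can be exercised on its own.

```javascript
// A flow function using the (data, err, next) signature shown above.
// The record fields and the error-forwarding convention are assumptions.
const withTotal = (data, err, next) => {
  if (err) {
    // Forward an upstream error untouched.
    return next(data, err);
  }
  if (typeof data.price !== 'number') {
    return next(data, new Error('price must be a number'));
  }
  // Pass the enriched record on to the next step.
  next({ ...data, total: data.price * data.quantity }, err);
};

// Stub `next` that records what the flow passes downstream.
const calls = [];
const next = (data, err) => calls.push({ data, err });

withTotal({ price: 10, quantity: 3 }, null, next);
withTotal({ price: 'n/a', quantity: 1 }, null, next);
// calls[0].data is { price: 10, quantity: 3, total: 30 }
// calls[1].err is an Error
```

Written this way, the function could be passed to .flow(...) directly, assuming wi|la invokes it with the same three arguments.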

Batch

Batch is used when you want the pipeline to wait for a certain time or a certain number of data inputs before performing your logical operation.

wi|la provides two types of batch.

Reduce

.flow(Batch.reduce(options, reducerFunc, initialValue))

Map

.flow(Batch.map(options, mapFunc))

options

{
  number: Number,
  timeout: Number, // time in ms
  groupBy: String || [String],
  attributes: String || [String]
}
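
To make the difference between the two batch types concrete, here is a plain-JavaScript sketch of count-based batching. It models only the `number` option; `timeout`, `groupBy`, and `attributes` are left out because their exact semantics are not described here. `chunk` is a hypothetical helper, and the reducer and mapper are assumed to behave like `Array.prototype.reduce` and `Array.prototype.map`, which may differ from wi|la's own signatures.

```javascript
// Hypothetical helper: split inputs into batches of `number` items.
function chunk(items, number) {
  const batches = [];
  for (let i = 0; i < items.length; i += number) {
    batches.push(items.slice(i, i + number));
  }
  return batches;
}

const inputs = [1, 2, 3, 4, 5, 6];
const batches = chunk(inputs, 3); // [[1, 2, 3], [4, 5, 6]]

// Reduce-style batch: each batch collapses to one value, starting from 0.
const sums = batches.map((batch) => batch.reduce((acc, n) => acc + n, 0));

// Map-style batch: each batch keeps its size and every item is transformed.
const doubled = batches.map((batch) => batch.map((n) => n * 2));
```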

Sink

As discussed above, the sink represents the output/result of the pipeline, i.e. it is the end point of a pipeline.

A sink can be either a Node Stream or a Kafka Stream.

To attach a sink to the pipeline, use one of the following code snippets.

.sink((data, err, next) => {
  next(data, err);
});

Kafka

.sink(Kafka.producer(options));

Starting the App

Once you have created your pipeline, you need to add it to the app. To do that, use the following code snippet.

const { App, Pipeline } = require('@xblockchainlabs/willa');
const app = App(name, options);
app.add(pipeline);

options

{
  kafka: kafkaconfig
}