A streaming microservice framework for Node.js
Overview
A streaming microservice framework for Node.js that lets you perform various operations on real-time data, such as transformation, aggregation, and validation.
Getting Started
A typical wi|la pipeline must consist of a source and a sink.
The source and sink represent the input stream and the output stream respectively. Currently Kafka and Node streams are supported.
Data transformation takes place in pipeline flows. There is no restriction on the number of flows that can be added to a pipeline; however, there can be only one source and one sink.
Pipeline
A wi|la pipeline can be created with the following code snippet.
const { Pipeline } = require('@xblockchainlabs/willa');
const pipeline = new Pipeline();
Source
As discussed above, the source represents the input to the pipeline, i.e. the source is the starting point of a pipeline. Currently wi|la supports two types of source: source and sourceCommitable.
source can be either a Node Stream or a Kafka Stream, whereas sourceCommitable currently supports only Kafka streams.
To attach a source/sourceCommitable to the pipeline, use the following code snippets.
Node Stream
pipeline.source(Stream.consumer(options));
Kafka Stream
pipeline.source(Kafka.consumer(options));
Kafka Stream (committable)
pipeline.sourceCommitable(Kafka.consumer(options));
options
{
name: String,
errorStrategy: String ['reset', 'uncommit', 'report']
}
Note
name is mandatory, whereas errorStrategy defaults to `reset`.
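As an illustration, a consumer options object might look like this (the name value is a placeholder, not a required convention):

```javascript
// Example source options; the name value is illustrative.
const sourceOptions = {
  name: 'orders-consumer',  // mandatory: identifies this source
  errorStrategy: 'report'   // optional: defaults to 'reset'
};

// pipeline.sourceCommitable(Kafka.consumer(sourceOptions));
```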
Flow
A flow performs a logical operation to transform the incoming data. There is no limitation on the number of flows that can be chained in a pipeline; however, a flow cannot be the first or the last element of a pipeline.
A typical wi|la flow accepts two types of operation: lambda functions and batch processes.
Lambda functions
.flow((data, err, next) => {
// your custom logic
next(data, err);
})
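For example, a flow handler that enriches each record with a Celsius temperature could look like this (the { tempF } record shape is purely illustrative, not part of the framework):

```javascript
// Hypothetical flow handler: converts a Fahrenheit reading to Celsius.
// The { tempF } record shape is an assumption for illustration only.
const toCelsius = (data, err, next) => {
  if (err) return next(data, err);         // propagate errors unchanged
  const tempC = (data.tempF - 32) * 5 / 9; // transform the record
  next({ ...data, tempC }, null);          // forward the enriched record
};

// Attaching it to a pipeline:
// pipeline.flow(toCelsius);
```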
Batch
Batch is used when you want the pipeline to wait for a certain time or number of data inputs before performing your logical operation.
wi|la provides two types of batch.
Reduce
.flow(Batch.reduce(options, reducerFunc, initialValue))
Map
.flow(Batch.map(options, mapFunc))
options
{
number: Number,
timeout: Number (time in ms),
groupBy: String || [String],
attributes: String || [String]
}
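As an example, here is a reducer that sums an `amount` field across a batch. The (accumulator, record) signature is assumed to mirror Array.prototype.reduce, so its semantics can be illustrated on a plain array:

```javascript
// Hypothetical reducer: sums the `amount` field of every record in a batch.
// The (acc, record) signature is assumed to follow Array.prototype.reduce.
const sumAmounts = (acc, record) => acc + record.amount;

// Semantics illustrated on a plain array standing in for one batch:
const batch = [{ amount: 5 }, { amount: 7 }, { amount: 3 }];
const total = batch.reduce(sumAmounts, 0); // 15

// Wiring it into a pipeline (sketch; option values are placeholders):
// .flow(Batch.reduce({ number: 100, timeout: 5000 }, sumAmounts, 0))
```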
Sink
As discussed above, the sink represents the output/result of the pipeline, i.e. the sink is the end point of a pipeline.
A sink can be either a Node Stream or a Kafka Stream.
To attach a sink to the pipeline, use the following code snippets.
.sink((data, err, next) => {
next(data, err);
});
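For instance, a hypothetical sink handler that collects records into an array can be handy for local testing (this helper is illustrative, not part of the API):

```javascript
// Hypothetical custom sink: collects error-free records into an array.
const collected = [];
const collectSink = (data, err, next) => {
  if (!err) collected.push(data); // keep only records without errors
  next(data, err);
};

// pipeline.sink(collectSink);
```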
Kafka
.sink(Kafka.producer(options));
Starting the App
once we have created your pipeline you need to add it to the app
. to do that use following code snipet.
const { App , Pipeline } = require('@xblockchainlabs/willa');
const app = App(name, options);
app.add(pipeline);
options
{
kafka: kafkaconfig
}
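The shape of kafkaconfig depends on your Kafka client setup; the field names below (brokers, clientId) are assumptions for illustration only, not the library's documented schema:

```javascript
// Illustrative app options; the kafka field names are assumptions,
// not the library's documented configuration schema.
const options = {
  kafka: {
    brokers: ['localhost:9092'], // assumed: list of broker addresses
    clientId: 'my-willa-app'     // assumed: client identifier
  }
};

// Wiring it together (sketch):
// const app = App('my-willa-app', options);
// app.add(pipeline);
```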