Configurations are stored in configuration files within your application, and can be overridden and extended by environment variables, command line parameters, or external sources. See: http://lorenwest.github.io/node-config/
Create a config folder
Create one file per environment, named after the environment (for example, test.json)
Paste this configuration template :
{
"name":"your-application-name",
"kafka":{
"consumer":{
"config":{
"metadata.broker.list":"localhost:9094",
"group.id":"test.group"
}
},
"producer":{
"config":{
"metadata.broker.list":"localhost:9094"
},
"topicsPrefix":"s3pweb."
}
},
"logger":{
"source":false,
"console":{
"enable":true,
"level":"debug"
},
"file":{
"enable":true,
"level":"info",
"dir":"./logs"
},
"server":{
"enable":true,
"level":"trace",
"url":"0.0.0.0",
"port":"9998",
"type":"elk"
},
"ringBuffer":{
"enable":true,
"size":50
}
}
}
Create instance
// Build a producer and a consumer, handing each the application logger and the
// Prometheus client. No `config` is passed in this form.
var producer = require("..").producer({
  log,
  prom: clientPrometheus
});
var consumer = require("..").consumer({
  log,
  prom: clientPrometheus
});
Alternatively, you can override the config section by passing it in the options:
// Same factories as the default form, but with an explicit `config` object in
// the options. The keys mirror the `config` objects of the configuration
// template (`metadata.broker.list`, plus `group.id` for the consumer).
var producer = require("..").producer({
  log: log,
  prom: clientPrometheus,
  config: { "metadata.broker.list": "localhost:9094" }
});
var consumer = require("..").consumer({
  log: log,
  prom: clientPrometheus,
  config: { "metadata.broker.list": "localhost:9094", "group.id": "test.group" }
});
Example :
To produce some messages
// Prometheus client handed to the producer factory as its `prom` option.
const clientPrometheus = require("prom-client");
// const faker = require('faker')
/**
 * Returns a promise that resolves once `delay` milliseconds have elapsed.
 *
 * @param {number} delay - Time to wait, in milliseconds (passed to setTimeout).
 * @returns {Promise<void>} Resolves with no value when the timer fires.
 */
function later(delay) {
  return new Promise(function (resolve) {
    setTimeout(resolve, delay);
  });
}
/**
 * Producer demo: connects, sends two batches of messages to the same topic ten
 * times in a row, then disconnects.
 *
 * Uses `clientPrometheus` and `later` from the surrounding script. Anything
 * thrown inside the `try` blocks is logged to the console instead of being
 * rethrown.
 */
var t = async () => {
  const log = require("s3pweb-logger").logger;
  try {
    const producer = require("..").producer({ log: log, prom: clientPrometheus });
    await producer.connect();
    console.log("connected");
    console.log("wait a little");
    await later(200);
    // var topicName = faker.random.alphaNumeric(10)
    // NOTE(review): the consumer example subscribes to "s3pweb.test223";
    // presumably the configured `topicsPrefix` is prepended on send - confirm.
    const topicName = "test223";
    for (let index = 0; index < 10; index++) {
      try {
        // First batch pins an explicit partition and key.
        const p1 = producer.sendMessagesAndWaitReport({
          topic: topicName,
          messages: [
            { message: 1.1 },
            { message: 1.2 },
            { message: 1.3 },
            { message: 1.4 },
            { message: 1.1 },
            { message: 1.2 },
            { message: 1.3 },
            { message: 1.4 }
          ],
          partition: 0,
          key: "key1"
        });
        // Second batch supplies neither partition nor key.
        const p2 = producer.sendMessagesAndWaitReport({
          topic: topicName,
          messages: [
            { message: 2.1 },
            { message: 2.2 },
            { message: 2.3 },
            { message: 2.4 },
            { message: 2.1 },
            { message: 2.2 },
            { message: 2.3 },
            { message: 2.4 },
            { message: 2.1 },
            { message: 2.2 },
            { message: 2.3 },
            { message: 2.4 }
          ]
        });
        // Both sends are started before either is awaited, then awaited together.
        await Promise.all([p1, p2]);
      } catch (error) {
        console.log(`Loop ${index} -> error`, error);
        // Back off before the next iteration after a failed send.
        await later(5000);
      }
      await later(750);
    }
    await producer.disconnect();
  } catch (error) {
    console.log(error);
  }
};
t();
To consume some messages
// Prometheus client handed to the consumer factory as its `prom` option.
const clientPrometheus = require("prom-client");
/**
 * Returns a promise that resolves once `delay` milliseconds have elapsed.
 *
 * @param {number} delay - Time to wait, in milliseconds (passed to setTimeout).
 * @returns {Promise<void>} Resolves with no value when the timer fires.
 */
function later(delay) {
  return new Promise(function (resolve) {
    setTimeout(resolve, delay);
  });
}
var t =async()=>{
var log =require("s3pweb-logger").logger;
var consumer =require("..").consumer({ log: log, prom: clientPrometheus });
awaitconsumer.connect(["s3pweb.test223"]);
console.log("wait a little");
awaitlater(200);
var previousPartition =0;
var cpt =0;
while(cpt <200){
var messages =awaitconsumer.listen({number:10, autoCommit:true);
for(let message of messages){
cpt++;
var txt =`${message.partition} - ${message.offset} * `;