Kafka Queue

A wrapper around Kafka with built-in assumptions that make a keyed-message queuing system a little easier to build. It was built to support an IoT-like system, where devices in the outside world communicate with a cloud service. Each device has a unique deviceId. A process that consumes messages from a given device should always receive all messages with that deviceId, and those messages should arrive in order.
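The ordering guarantee above depends on every message with the same deviceId landing in the same partition. A minimal sketch of that idea follows. It assumes the key alone decides the partition; partitionFor() is a hypothetical helper written for illustration, not the partitioner that kafka-queue or no-kafka actually uses.

```javascript
// Sketch only: a deterministic key-to-partition mapping.
// partitionFor() is a hypothetical helper, not part of kafka-queue or no-kafka.
function partitionFor( key, partitionCount ) {
  let hash = 0;
  for ( const ch of String( key ) ) {
    // Simple 32-bit rolling hash; a production partitioner would use a stronger hash.
    hash = ( hash * 31 + ch.codePointAt( 0 ) ) | 0;
  }
  return Math.abs( hash ) % partitionCount;
}

// Messages from the same device always map to the same partition,
// so a single consumer sees them in the order they were written.
const messages = [
  { deviceId: '1001', seq: 1 },
  { deviceId: '2002', seq: 1 },
  { deviceId: '1001', seq: 2 },
];
for ( const m of messages ) {
  console.log( m.deviceId, '-> partition', partitionFor( m.deviceId, 2 ) );
}
```

Because the mapping is a pure function of the key, both messages from device '1001' are assigned the same partition number, whatever that number turns out to be.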

This wrapper uses the no-kafka library, which automatically retries on connection failures. It was developed and tested using this docker container.

Usage

Producer (enqueue)

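// `config` is described in the Config section; `queueName` names the target queue.
let Q = require( 'kafka-queue' )( config );
Q.producer.connect( function( err ) {
  if ( err ) { console.error( err ); process.exit( 1 ); }
  // The value of the configured keyField (deviceId here) is used as the message key.
  let message = { deviceId: '1001', p1: 'p1', p2: 'p2' };
  Q.producer.send( queueName, message, function( err ) {
    if ( err ) console.error( err );
    process.exit( err ? 1 : 0 );
  });
});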

Consumer (dequeue)

let Q = require( 'kafka-queue' )( config );
Q.consumer.connect( queueName, groupId, function( message, cb ) {
  let handle = message.handle;
  let msg = message.msg;
 
  console.log( JSON.stringify( msg ) );
  // when you know processing is good/done, advance the consumer's position in the queue
  // to the next message.
  cb();
});

Calling the cb() passed to your message handler controls the message commit. Calling it with no arguments commits the message; calling it with an err as the first argument cancels the commit.
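One way to apply this rule consistently is to wrap the processing step so that any thrown error cancels the commit. The sketch below assumes synchronous processing; makeHandler() and processFn are hypothetical names introduced for illustration and are not part of kafka-queue.

```javascript
// Sketch only: makeHandler() and processFn are hypothetical, not part of kafka-queue.
// Returns a handler with the ( message, cb ) shape used by Q.consumer.connect().
function makeHandler( processFn ) {
  return function( message, cb ) {
    let failure = null;
    try {
      processFn( message.msg );   // do the real work on the message payload
    } catch ( err ) {
      failure = err;              // remember the error instead of crashing
    }
    if ( failure ) {
      cb( failure );              // an err as the first argument cancels the commit
    } else {
      cb();                       // no arguments: the commit goes ahead
    }
  };
}

// Usage with the consumer shown above (queueName and groupId as in that example):
// Q.consumer.connect( queueName, groupId, makeHandler( msg => console.log( msg ) ) );
```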

You can explicitly handle commits, although you should not normally have to:

Q.consumer.commit( handle, function( err ) {
  if ( err ) console.error( 'commit error:', err );
});

The groupId argument to connect() is optional. If it is not specified, your config should contain a groupId. If a groupId is passed explicitly, it overrides any value specified in the config.
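The precedence rule can be written down as a small function. This is only an illustration of the rule as described; resolveGroupId() is a hypothetical helper, and throwing when no groupId is available is an assumption about how one might handle that case, not documented library behavior.

```javascript
// Sketch only: resolveGroupId() is hypothetical and mirrors the rule described above.
function resolveGroupId( config, groupId ) {
  // An explicitly passed groupId wins over the config.
  if ( groupId !== undefined && groupId !== null ) return groupId;
  // Otherwise fall back to the config value.
  if ( config && config.groupId ) return config.groupId;
  // Assumption: treat a missing groupId as a configuration error.
  throw new Error( 'groupId must be passed to connect() or set in the config' );
}

console.log( resolveGroupId( { groupId: 'from-config' }, 'explicit' ) );
console.log( resolveGroupId( { groupId: 'from-config' } ) );
```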

Config

This wrapper expects config to look like:

{
  "keyField": "deviceId",
  "connectionString": "192.168.99.103:9092",
  "logger": {
    "logLevel": 1
  }
}

The config is generally the same as documented here. The keyField is required and is the name of the field in the incoming messages (being passed to producer.send()) that contains the device id that you want to use as a key.
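To make the role of keyField concrete, the sketch below looks up the key for an outgoing message from the config. keyOf() is a hypothetical helper for illustration; the wrapper's internal handling, including what it does when the field is missing, may differ.

```javascript
// Sketch only: keyOf() is hypothetical, not part of kafka-queue.
function keyOf( config, message ) {
  if ( !config || typeof config.keyField !== 'string' || config.keyField === '' ) {
    throw new Error( 'config.keyField is required' );
  }
  const key = message[ config.keyField ];
  if ( key === undefined || key === null ) {
    // Assumption: a message without the key field is rejected.
    throw new Error( 'message is missing key field "' + config.keyField + '"' );
  }
  return String( key );
}

const exampleConfig = { keyField: 'deviceId', connectionString: '192.168.99.103:9092' };
console.log( keyOf( exampleConfig, { deviceId: '1001', p1: 'p1', p2: 'p2' } ) );
```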

Example Application

You can run a simple test in this directory. The test environment consists of an "ingest.js" script that emulates three devices sending messages into the system. These messages are sent to the "ingest" queue. A "relayer.js" script reads from the ingest queue and duplicates the incoming messages to a "staging" queue and a "prod" queue. A "pipeline.js" script reads from the staging or prod queue (specify which on the command line) and prints the messages to stdout.

If you create the "ingest" queue with one partition, you can run one instance of relayer.js. If you create the queue with two partitions, you can run two instances of relayer.js, and so forth. The same applies to the other queues. Suppose you create all three queues (ingest, staging, prod) with two partitions each. You can then run two instances of relayer.js and four instances of pipeline.js: two with "staging" as an argument and two with "prod" as an argument. Then run ingest.js and you'll see messages flow through the system, being duplicated into the two stacks and "worked on" in the pipeline scripts.

If you kill one of the instances in a pair, you'll see the other instance begin to take over the processing of the killed instance. If you restart the killed instance, it'll begin to process its own messages again.

You should see that messages with id 'X' will always get sent to a consistent instance of the pipeline.js script, except when that instance dies, in which case 'X' will start getting processed by a remaining instance.
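The relationship between partitions and running instances, including the takeover when an instance dies, can be illustrated with a simple round-robin assignment. This is a simplified model under stated assumptions, not Kafka's actual group rebalance protocol; assignPartitions() and the instance names are hypothetical.

```javascript
// Sketch only: assignPartitions() is hypothetical and models the behavior described
// above with plain round-robin; real consumer groups use their own assignment strategy.
function assignPartitions( partitionCount, instances ) {
  const assignment = new Map( instances.map( name => [ name, [] ] ) );
  if ( instances.length === 0 ) return assignment;
  for ( let p = 0; p < partitionCount; p++ ) {
    // Each partition is owned by exactly one live instance.
    assignment.get( instances[ p % instances.length ] ).push( p );
  }
  return assignment;
}

// Two partitions, two instances: each instance owns one partition.
console.log( assignPartitions( 2, [ 'pipeline-a', 'pipeline-b' ] ) );
// 'pipeline-b' dies: the remaining instance takes over both partitions.
console.log( assignPartitions( 2, [ 'pipeline-a' ] ) );
```

In this model, an instance beyond the partition count ends up with an empty list, which is why the number of useful instances per queue matches the number of partitions.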

Setting up Kafka for this example

Create a docker machine to host zookeeper and kafka. Execute eval $(docker-machine env MACHINE), then execute "sh RUN.sh".

Reference Documentation

Install

npm i kafka-queue

Version

1.0.4

License

ISC

Collaborators

  • aqpeeb