kafka-wire-protocol

0.1.4 • Public • Published

Build Status

kafka-wire-protocol

A pure JS (ES6) implementation of the Kafka wire protocol as described here. Deviations from those docs are described here

npm install --save kafka-wire-protocol

This is not full Kafka client, just an implementation of the base tcp wire protocol. This library focuses on supporting all APIs and all versions described in the wire protocol. The purpose of this library is to create a common building block for JS Kafka clients and as a general Kafka utility. This library does not work with Zookeeper directly.

Library Goals

  • Use pure JS (ES6).
  • Have as few external dependencies as possible
    • Currently only dependencies are a CRC32 library and a library to handle 64 bit integers in JS.
  • Support all versions of Kafka APIs
    • Since there is no Zookeeper support in this library certain operations are not possible in older versions of Kafka that required communication with Zookeeper.
  • Only implement the wire protocol. (Avoid scope creep).
    • Allow these decisions to be made independently of this library
      • Memory management
      • Connection management
      • ...

Examples

Using the protocol directly with your own tcp socket

var net = require('net');
var { Protocol } = require('kafka-wire-protocol');

var protocol = new Protocol({
  clientId: 'my-test-kafka-client'
});

var socket = net.connect({
  host: 'localhost', // assuming your running Kafka locally
  port: 9092 // default Kafka port
}, () => {

  // Attach the protocol.response handler method to the
  // on 'data' event to accumulate and parse API responses
  socket.on('data', protocol.response);
  socket.on('error', () => console.log('error'));

  // Build Metadata request buffer
  var reqBuf = protocol.request(
    'Metadata', // Name of API
    { // data structured as specified by wire protocol docs
      // to be parsed into binary message format
      topics: [
        // Assuming you've created this topic already
        { topic: 'my-test-topic' }
      ]
    },
    Buffer.alloc(2400), // Buffer to parse data into
    0, // Starting offset to use in buffer provided

    // Callback to invoke once entire response is received
    // and parsed
    (err, result) => {
      if(err) throw err;

      // Send metadata response to stdout
      console.log(JSON.stringify(result, null, 2));
    }
  );

  socket.write(reqBuf, 'binary', () => {
    // Kafka brokers seem to need help understanding a
    // message is over
    socket.write("\n\n\n\n", 'utf8');
  });
});

Doing the same thing with the SimpleClient

WARNING SimpleClient is not for Production use

var { SimpleClient } = require('kafka-wire-protocol');

var client = new SimpleClient({
  host: 'localhost', // Assuming Kafka is running locally
  port: 9092, // Default Kafka port
  clientId: 'my-test-kafka-client',
  timeout: 1000,
  acks: 1 // Level of broker persistence guarantee
}, () => { // callback invoked once connection is made
  client.request(
    'Metadata', // API name
    { // Data to be sent
      topics: [
        { topic: 'my-test-topic' }
      ]
    }, (err, result) => {
      if(err) throw err;
      console.log(JSON.stringify(result, null, 2));
    }
  );
});

Deviations from Kafka docs

  • All fields in requests/responses are in camelCase instead of the snake_case seen in the docs. (I'm open to changing this or supporting both)
  • Error code fields are replaced with an error object like this:
{
    "error": {
      "name": "OFFSET_OUT_OF_RANGE",
      "code": 1,
      "retry": false,
      "message": "The requested offset is not within the range of offsets maintained by the server."
    }
}

All api error objects can be found in the lib/protocol/definitions.js file.

Package Sidebar

Install

npm i kafka-wire-protocol

Weekly Downloads

4

Version

0.1.4

License

MIT

Last publish

Collaborators

  • bspates