This package has been deprecated

Author message:

Package no longer supported. Use at your own risk.

node-rdkafka-commit-manager

2.0.2 • Public • Published

A simple helper for controlling when an offset is ready to be committed via node-rdkafka.

Goals

This package is intended to help you implement at-least-once processing without making a network call for every message you process.

During frequent use, the commit manager will commit offsets only as often as a configurable commit interval.

During infrequent use, the commit manager will always immediately commit if it has seen a period of inactivity exceeding the configurable commit interval.
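The two behaviors above can be modeled with a small throttle. This is a hypothetical sketch for illustration only, not the package's source: `createCommitThrottle`, `offer`, and `flush` are assumed names, and the sketch tracks a single partition's offsets. A fuller version would also need a timer to flush a deferred offset when no further messages arrive.

```typescript
// Hypothetical model of interval-based commit throttling (not the package's code).
type Throttle = {
  // Record a finished offset; returns the offset to commit now, or null if deferred.
  offer(offset: number): number | null;
  // Return the deferred offset (if any) so the caller can commit it, e.g. from a timer.
  flush(): number | null;
};

function createCommitThrottle(
  commitIntervalMs: number,
  now: () => number = Date.now, // injectable clock, an assumption to make the sketch testable
): Throttle {
  let lastCommitAt = Number.NEGATIVE_INFINITY;
  let pending: number | null = null;

  // Hand out the highest pending offset and remember when that happened.
  const take = (): number | null => {
    const offset = pending;
    pending = null;
    if (offset !== null) {
      lastCommitAt = now();
    }
    return offset;
  };

  return {
    offer(offset) {
      pending = pending === null ? offset : Math.max(pending, offset);
      // Commit immediately if at least one full interval has passed since the last commit.
      return now() - lastCommitAt >= commitIntervalMs ? take() : null;
    },
    flush: take,
  };
}
```

With a 5000 ms interval, the first offer after a quiet period is returned for commit right away, while offers arriving within the interval are held back and collapsed into the highest offset seen.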

useCommitManager

The commit manager is exposed via a hook function. You don't need to be using React in order to use it.

const { readyToCommit, onRebalance } = useCommitManager(consumer, commitIntervalMs);

Arguments

  • consumer: this must be a KafkaConsumer object from node-rdkafka. It must be configured with auto-commit turned off.
  • commitIntervalMs: the number of milliseconds the commit manager should wait between commits.
    • optional: defaults to 5000

Returned Functions

  • readyToCommit(data)
    • data: This must be an object containing the properties "topic", "partition", and "offset" from the Kafka message.
    • Call this whenever you are finished with a Kafka message.
  • onRebalance()
    • Call this whenever your KafkaConsumer has its partitions revoked.
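As a rough illustration of the data argument described above, the shape can be written down as a TypeScript interface. The names `CommitData` and `toCommitData` are hypothetical and are not exported by this package; the helper only shows how to reduce a larger message object to the three required properties.

```typescript
// Hypothetical type describing what readyToCommit needs (name is an assumption).
interface CommitData {
  topic: string;
  partition: number;
  offset: number;
}

// Hypothetical helper: copy only the three required properties from a message-like object.
function toCommitData(message: CommitData): CommitData {
  const { topic, partition, offset } = message;
  return { topic, partition, offset };
}
```

Passing the original message object directly also works, since it already carries these properties. A helper like this is only useful if you want to avoid holding on to the message payload.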

Usage Example

This package can be used from JavaScript or TypeScript.

The examples below illustrate a few key points about using this commit manager:

  1. Disable the auto-commit feature on your KafkaConsumer.

    • Otherwise, the commit manager will be competing with node-rdkafka's auto-commit behavior.
  2. Use non-flowing mode on your KafkaConsumer.

    • Otherwise, node-rdkafka will provide batches of messages, which may be handled out of order.
    • Since non-flowing mode only allows for one message at a time, it will significantly slow the rate at which a single consumer can process messages, so additional horizontal scaling is necessary to compensate.
  3. Implement a rebalance callback function which calls the commit manager's onRebalance function any time your KafkaConsumer's partitions are revoked.

    • Otherwise, the commit manager may later try to commit offsets for partitions to which it is no longer assigned.

    • The rebalance callback in the examples below covers the minimum responsibilities of the function. See the node-rdkafka and/or librdkafka documentation for more details.

  4. Call the commit manager's readyToCommit function for each Kafka message you process.

    • Only call readyToCommit when you have finished processing the message.
    • Since the data objects provided by node-rdkafka's KafkaConsumer already have all of the necessary properties, you can just use those if you want to.
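Point 4 matters most when message handling is asynchronous. The sketch below shows one way to make sure readyToCommit is only called after the handler has finished successfully. `processThenCommit` and `handler` are hypothetical names introduced here for illustration, and `readyToCommit` is passed in as a parameter so the sketch does not depend on a live consumer.

```typescript
// Hypothetical wrapper: run the handler first, then report the message as finished.
async function processThenCommit<M>(
  message: M,
  handler: (message: M) => Promise<void>,
  readyToCommit: (message: M) => void,
): Promise<boolean> {
  try {
    await handler(message);
  } catch (err) {
    // Processing failed, so the message is not reported as ready to commit.
    console.error(`Processing failed: ${err}`);
    return false;
  }
  // Only reached when the handler resolved without throwing.
  readyToCommit(message);
  return true;
}
```

What to do after a failure (retry, stop consuming, route to a dead-letter topic) is left to the application. Keep in mind that a Kafka committed offset covers every earlier offset on the same partition, so continuing on to later messages after a failure would eventually commit past the failed one.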

TypeScript

import { useCommitManager } from "node-rdkafka-commit-manager";
import { CODES, KafkaConsumer } from "node-rdkafka";
 
const consumeNonFlowing = (consumer: KafkaConsumer, consumeTimeout: number) => {
  // Request one message now, then one more every consumeTimeout milliseconds.
  consumer.consume(1);
  return setInterval(function() {
    consumer.consume(1);
  }, consumeTimeout);
};
 
const stopConsuming = (consumeInterval?: NodeJS.Timeout) => {
  if (consumeInterval) {
    clearInterval(consumeInterval);
  }
};
 
const rebalanceCallback = (err: any, assignments: any) => {
  if (err.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    consumer.assign(assignments);
  } else if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    consumer.unassign();
    onRebalance();
  } else {
    console.error(`Kafka rebalance error : ${err}`);
  }
};
 
const consumer = new KafkaConsumer(
  {
    "enable.auto.commit": false,
    rebalance_cb: rebalanceCallback,
    // <Your global config here (ex: authentication, consumer group, etc.)>
  },
  {
    // <Your topic config here>
  }
);
 
const { readyToCommit, onRebalance } = useCommitManager(consumer);
const CONSUME_TIMEOUT_MS = 1000;
let consumeInterval: NodeJS.Timeout | undefined;
consumer
  .on("ready", function() {
    consumer.subscribe(["sample.test.topic"]);
    consumer.setDefaultConsumeTimeout(CONSUME_TIMEOUT_MS);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .on("data", function(data: any) {
    stopConsuming(consumeInterval);
    // <Process the Kafka message here.>
    readyToCommit(data);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .connect();

JavaScript

const { useCommitManager } = require("node-rdkafka-commit-manager");
const { CODES, KafkaConsumer } = require("node-rdkafka");
 
const consumeNonFlowing = (consumer, consumeTimeout) => {
  // Request one message now, then one more every consumeTimeout milliseconds.
  consumer.consume(1);
  return setInterval(function() {
    consumer.consume(1);
  }, consumeTimeout);
};
 
const stopConsuming = (consumeInterval) => {
  if (consumeInterval) {
    clearInterval(consumeInterval);
  }
};
 
const rebalanceCallback = (err, assignments) => {
  if (err.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    consumer.assign(assignments);
  } else if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    consumer.unassign();
    onRebalance();
  } else {
    console.error(`Kafka rebalance error : ${err}`);
  }
};
 
const consumer = new KafkaConsumer(
  {
    "enable.auto.commit": false,
    rebalance_cb: rebalanceCallback,
    // <Your global config here (ex: authentication, consumer group, etc.)>
  },
  {
    // <Your topic config here>
  }
);
 
const { readyToCommit, onRebalance } = useCommitManager(consumer);
const CONSUME_TIMEOUT_MS = 1000;
let consumeInterval;
consumer
  .on("ready", function() {
    consumer.subscribe(["sample.test.topic"]);
    consumer.setDefaultConsumeTimeout(CONSUME_TIMEOUT_MS);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .on("data", function(data) {
    stopConsuming(consumeInterval);
    // <Process the Kafka message here.>
    readyToCommit(data);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .connect();

Install

npm i node-rdkafka-commit-manager

Version

2.0.2

License

MIT

Unpacked Size

12.2 kB

Total Files

6

Collaborators

  • hwansolo
  • pwilliams-henselphelps