pc-seneca-kafka-transport

1.1.12 • Public • Published

seneca-kafka-transport

This plugin lets you send messages to a Kafka topic and listen for messages from Kafka topics.

Communication is async by design so no response is ever expected.

Under the hood it uses https://github.com/oleksiyk/kafka (no-kafka) to communicate with Kafka.

Sending messages

Messages are sent as a JSON string to the Kafka topic specified in the client.

Only the actual message is sent, with no Seneca metadata!
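
As a rough illustration of what "no Seneca metadata" could mean in practice, the sketch below serialises a message after dropping metadata fields. The helper name `toKafkaPayload` is hypothetical, and the rule that metadata lives in properties whose names end with `$` is an assumption based on the usual Seneca convention, not something taken from this plugin's source.

```javascript
// Hypothetical helper: drop Seneca metadata before serialising.
// Assumption: metadata lives in properties whose names end with "$".
function toKafkaPayload(msg) {
  const clean = {};
  for (const [key, value] of Object.entries(msg)) {
    if (!key.endsWith('$')) {
      clean[key] = value;
    }
  }
  return JSON.stringify(clean);
}

const payload = toKafkaPayload({
  cmd: 'register',
  type: 'user',
  name: 'alice',
  meta$: { id: 'abc/123' }
});

console.log(payload);
// prints {"cmd":"register","type":"user","name":"alice"}
```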

Receiving messages

We define the Kafka topic we are interested in and a groupId, so Kafka can keep track of our service's offset for that topic.

We also define a pin to indicate the message pattern we are interested in.

This is because we do not expect a 1:1 mapping between Kafka topics and Seneca patterns.

Every time a message is received, the plugin always acknowledges receipt to Kafka first (at-most-once delivery), which advances the offset of our groupId. Then, if the message content matches the pinned pattern, the plugin simply calls seneca.act with the message.
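
The acknowledge-first flow described above can be sketched as follows. This is a simplified, synchronous illustration, not the plugin's actual code: `handleKafkaMessage` and the three callbacks `commitOffset`, `matchesPin` and `act` are hypothetical names standing in for the Kafka commit, the pin check and seneca.act.

```javascript
// Sketch of the receive flow: acknowledge first, then act on a match.
// `commitOffset`, `matchesPin` and `act` are hypothetical callbacks.
function handleKafkaMessage(rawValue, deps) {
  // 1. Always acknowledge, whether or not the message is of interest.
  deps.commitOffset();

  // 2. Only messages that match the pin reach Seneca.
  const msg = JSON.parse(rawValue);
  if (!deps.matchesPin(msg)) {
    return false;
  }
  deps.act(msg);
  return true;
}

// Stubs that record the order of calls.
const calls = [];
const deps = {
  commitOffset: () => { calls.push('commit'); },
  matchesPin: (msg) => msg.cmd === 'register' && msg.type === 'user',
  act: (msg) => { calls.push('act:' + msg.name); }
};

const handled = handleKafkaMessage('{"cmd":"register","type":"user","name":"alice"}', deps);
const skipped = handleKafkaMessage('{"cmd":"delete","type":"user","name":"bob"}', deps);
// calls is now ['commit', 'act:alice', 'commit']
```

Because the commit happens before the handler runs, a message whose handler fails is not redelivered, which is the trade-off of acknowledging first.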

If maxTime

Usage

This is the most basic usage. It expects Kafka to be running on localhost on its default port, 9092.

Consumer

require('seneca')()
  .use('pc-seneca-kafka-transport')
  .add('cmd:register,type:user', (msg, reply) => {
    // Trigger business logic for new user registration
    reply();
  })
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user'
  });

A consumer opens a connection to the specified topic and consumes any message that matches the pin.

Producer

const client = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .client({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user'
  });

A producer sends any message that matches the pin to the configured Kafka topic.

Global kafka Options

By default this plugin tries to connect to a Kafka broker on localhost:9092. Under the hood it uses https://github.com/oleksiyk/kafka, so please refer to that project for details.

To set global kafka options you can pass an options object to the .use declaration that contains the following properties:

{
  global:{},
  producer:{},
  consumer:{}
}

global can contain any property that is common to both producer and consumer!

require('seneca')()
  .use('pc-seneca-kafka-transport', {
    global: {
      connectionString: 'kafka-host1:9092,kafka-host2:9092'
    }
  });

Transport Options

The Kafka producer requires the topic to connect to and the pin pattern of the messages to send to this topic.

You can pass any options supported by the no-kafka library.

The producer object is merged with the global Kafka options provided in the .use call.

const client = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .client({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user',
    producer: {
      // any producer options supported by no-kafka
    }
  });
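
The merge of global and transport-level options mentioned above might look roughly like this. It is only a sketch: `mergeKafkaOptions` is a hypothetical helper, the `clientId` value is an example, and the precedence shown (transport-level options override global ones on conflict) is an assumption that this README does not confirm.

```javascript
// Hypothetical helper illustrating a shallow merge of option objects.
// Assumption: transport-level options win over global ones on conflict.
function mergeKafkaOptions(globalOptions, transportOptions) {
  return Object.assign({}, globalOptions, transportOptions);
}

const producerOptions = mergeKafkaOptions(
  { connectionString: 'kafka-host1:9092,kafka-host2:9092' }, // from .use(...)
  { clientId: 'user-producer' }                              // from .client({ producer })
);

console.log(producerOptions);
```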

The Kafka consumer requires the topic to connect to and the pin pattern of the messages it is listening for.

You can pass any options supported by the no-kafka library.

The consumer object is merged with the global Kafka options provided in the .use call.

If discardIfOlderThan is set, then whenever the Kafka listener receives a message that has a timestamp property older than the configured value, the message is discarded.

If discardIfOlderThan is not set, or the message does not have a timestamp property, all messages are processed normally.

discardIfOlderThan uses https://www.npmjs.com/package/timestring so it supports any keywords supported by timestring.

const server = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:user',
    kafkaTopic: 'user',
    consumer: {
      groupId: 'userManager'
    },
    discardIfOlderThan: '5 minutes'
  });
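
The discard rule described above can be pictured with the following sketch. `shouldDiscard` is a hypothetical function, and `maxAgeMs` stands in for the already-parsed option value (the plugin itself uses timestring for parsing; '5 minutes' corresponds to 300000 ms). It also assumes the timestamp is something `new Date()` accepts, such as epoch milliseconds.

```javascript
// Hypothetical check mirroring the discardIfOlderThan rule.
// Assumption: msg.timestamp is epoch milliseconds or an ISO date string.
function shouldDiscard(msg, maxAgeMs, now = Date.now()) {
  if (maxAgeMs === undefined || msg.timestamp === undefined) {
    return false; // option unset or no timestamp: process normally
  }
  const ageMs = now - new Date(msg.timestamp).getTime();
  return ageMs > maxAgeMs;
}

const now = 1700000000000;            // fixed clock for the example
const fiveMinutes = 5 * 60 * 1000;    // '5 minutes' expressed in ms

const stale = shouldDiscard({ timestamp: now - 10 * 60 * 1000 }, fiveMinutes, now);
const fresh = shouldDiscard({ timestamp: now - 60 * 1000 }, fiveMinutes, now);
const unstamped = shouldDiscard({ cmd: 'register' }, fiveMinutes, now);
// stale is true, fresh is false, unstamped is false
```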

It's important to understand that every message received from the specified topic is always acknowledged to Kafka, and only a message whose pattern matches the specified pin triggers the corresponding seneca.act call.

This means that you can't have multiple listen calls with different pins on the same topic, as they will interfere with each other and you can lose messages.

Instead, define a wildcard pin that handles all the messages on the topic and calls the appropriate act to process each message.

If a wildcard is not enough, you can simply pass an array of pins. In this case, use the property pins instead of pin.
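
For instance, an options object for .listen that uses pins might look like the sketch below. The two pin values are illustrative only; the other properties follow the examples in this README.

```javascript
// Options object intended for .listen(); the pin values are illustrative.
const listenOptions = {
  type: 'kafka',
  pins: ['cmd:register,type:user', 'cmd:delete,type:user'],
  kafkaTopic: 'user',
  consumer: {
    groupId: 'userManager'
  }
};
```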

Another option, if you have multiple listen calls, is to make sure each one defines its own groupId.

Note also that if you intend to run multiple instances of the same consumer service for high availability, you need to set the groupId (the same one on every instance) so the instances do not consume the same message multiple times.

const server = require('seneca')()
  .use('pc-seneca-kafka-transport')
  .listen({
    type: 'kafka',
    pin: 'cmd:register,type:*',
    kafkaTopic: 'user',
    consumer: {
      groupId: 'userManager'
    }
  });
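
To make the wildcard behaviour concrete, here is a deliberately simplified pin matcher. It is not Seneca's real pattern engine: `parsePin` and `matchesPin` are hypothetical helpers, and the rule that `*` matches any defined value is an assumption made for this sketch.

```javascript
// Simplified pin matcher, NOT Seneca's real pattern engine.
function parsePin(pin) {
  return pin.split(',').map((pair) => {
    const [key, value] = pair.split(':').map((part) => part.trim());
    return { key, value };
  });
}

// Assumption: "*" matches any defined value; other values must be equal.
function matchesPin(pin, msg) {
  return parsePin(pin).every(({ key, value }) =>
    value === '*' ? msg[key] !== undefined : String(msg[key]) === value
  );
}

const pin = 'cmd:register,type:*';
const userMatch = matchesPin(pin, { cmd: 'register', type: 'user' });
const adminMatch = matchesPin(pin, { cmd: 'register', type: 'admin' });
const otherCmd = matchesPin(pin, { cmd: 'delete', type: 'user' });
// userMatch and adminMatch are true, otherCmd is false
```

A single handler registered for the wildcard pattern could then branch on msg.type to call the appropriate action.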

Issues

At the moment we are using no-kafka, whose producer implementation has a bug and does not set the message timestamp. We work around the issue by adding the timestamp to the payload instead.
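
The workaround can be pictured as below. This is a sketch rather than the plugin's code: `withTimestamp` is a hypothetical helper, and storing the timestamp as epoch milliseconds is an assumption.

```javascript
// Hypothetical producer-side helper: stamp the payload itself.
// Assumption: the timestamp is stored as epoch milliseconds.
function withTimestamp(msg, now = Date.now()) {
  return Object.assign({}, msg, { timestamp: now });
}

const stamped = withTimestamp({ cmd: 'register', type: 'user' }, 1700000000000);

console.log(JSON.stringify(stamped));
// prints {"cmd":"register","type":"user","timestamp":1700000000000}
```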

Install

npm i pc-seneca-kafka-transport

License

ISC

Collaborators

  • davide.talesco