    pc-seneca-kafka-transport

    seneca-kafka-transport

    This plugin lets Seneca send messages to a Kafka topic and listen for messages from Kafka topics.

    Communication is asynchronous by design, so no response is ever expected.

    Under the hood it uses https://github.com/oleksiyk/kafka to communicate with Kafka.

    Sending messages

    Messages are sent as a JSON string to the Kafka topic specified in the client.

    Only the actual message is sent, with no Seneca metadata.
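
    As a rough illustration of what "no Seneca metadata" could mean, the sketch below drops every top-level property whose name ends in `$` (the Seneca convention for internal properties) and serializes the rest. The helper name toKafkaPayload is hypothetical and this is not the plugin's actual code.

```javascript
// Hypothetical helper, not the plugin's actual code: drop Seneca metadata
// (top-level properties whose names end in `$`) and serialize the rest as JSON.
function toKafkaPayload(msg) {
  const clean = {};
  for (const [key, value] of Object.entries(msg)) {
    if (!key.endsWith('$')) {
      clean[key] = value;
    }
  }
  return JSON.stringify(clean);
}

const payload = toKafkaPayload({
  cmd: 'register',
  type: 'user',
  name: 'alice',
  meta$: { id: 'abc/123' } // stand-in for Seneca-internal metadata
});
console.log(payload); // {"cmd":"register","type":"user","name":"alice"}
```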

    Receiving messages

    We define the Kafka topic we are interested in and a groupId, so Kafka can keep track of our service's offset for that topic.

    We also define a pin to indicate the message pattern we are interested in.

    This is because we do not expect a 1:1 mapping between Kafka topics and Seneca patterns.

    Every time a message is received, we always acknowledge receipt to Kafka first (an at-most-once pattern), which advances the offset for our groupId. Then, if the message content matches the pinned pattern, the plugin calls seneca.act with the message.
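
    The acknowledge-then-match flow described above can be sketched roughly as follows. The names parsePin, matchesPin and handleMessage are hypothetical, and the matcher is a simplified stand-in for Seneca's own pattern matching, so treat it as an illustration of the ordering rather than the plugin's implementation.

```javascript
// Parse a pin such as 'cmd:register,type:user' into { cmd: 'register', type: 'user' }.
function parsePin(pin) {
  const pattern = {};
  for (const pair of pin.split(',')) {
    const [key, value] = pair.split(':').map((s) => s.trim());
    pattern[key] = value;
  }
  return pattern;
}

// True when every key in the pin exists in the message with the same value.
// '*' matches any value, as long as the key is present (simplified wildcard).
function matchesPin(message, pin) {
  if (message === null || typeof message !== 'object') return false;
  const pattern = parsePin(pin);
  return Object.keys(pattern).every(
    (key) =>
      key in message &&
      (pattern[key] === '*' || String(message[key]) === pattern[key])
  );
}

// Acknowledge first, then act only if the message matches the pin.
// commitOffset and act are callbacks supplied by the caller.
function handleMessage(rawValue, pin, commitOffset, act) {
  commitOffset(); // the offset advances whether or not the pin matches
  const message = JSON.parse(rawValue);
  if (matchesPin(message, pin)) {
    act(message);
    return true;
  }
  return false; // acknowledged but not processed
}

const events = [];
const pin = 'cmd:register,type:user';
handleMessage('{"cmd":"register","type":"user"}', pin,
  () => events.push('commit'), () => events.push('act'));
handleMessage('{"cmd":"delete","type":"user"}', pin,
  () => events.push('commit'), () => events.push('act'));
console.log(events); // [ 'commit', 'act', 'commit' ]
```

    Note that the second message is committed but never acted on, which is why a non-matching message is effectively skipped for that groupId.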

    If maxTime

    Usage

    This is the most basic usage; it expects Kafka on localhost on its default port, 9092.

    Consumer

    require('seneca')()
      .use('pc-seneca-kafka-transport')
      .add('cmd:register,type:user', (msg, reply) => {
         // TRIGGER BUSINESS LOGIC FOR NEW USER REGISTRATION
         reply()
       })
      .listen({
        type: 'kafka',
        pin: 'cmd:register,type:user',
        kafkaTopic: 'user'
      });

    A consumer opens a consumer connection to the specified topic and consumes any message that matches the pin.

    Producer

    const client = require('seneca')()
      .use('pc-seneca-kafka-transport')
      .client({
        type: 'kafka',
        pin: 'cmd:register,type:user',
        kafkaTopic: 'user'
      });

    A producer sends any message that matches the pin to the configured Kafka topic.

    Global Kafka Options

    By default this plugin tries to connect to a Kafka broker on localhost:9092. It uses https://github.com/oleksiyk/kafka under the hood, so please refer to that project for details.

    To set global kafka options you can pass an options object to the .use declaration that contains the following properties:

    {
      global:{},
      producer:{},
      consumer:{}
    }

    global can hold any property that is common to both producer and consumer.

    require('seneca')()
      .use('pc-seneca-kafka-transport', {
        global:{
            connectionString: 'kafka-host1:9092,kafka-host2:9092'
        }
      });

    Transport Options

    A Kafka producer requires the topic to connect to and the pin pattern of the messages to send to this topic.

    You can pass any options supported by the no-kafka library.

    The producer object is merged with the Kafka global options provided in the .use call.

    const client = require('seneca')()
      .use('pc-seneca-kafka-transport')
      .client({
        type: 'kafka',
        pin: 'cmd:register,type:user',
        kafkaTopic: 'user',
        producer: {
          // any producer options supported by no-kafka
        }
      });
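
    To make the merge concrete, here is a minimal sketch of how global, plugin-level and transport-level options could be layered. The function name mergeKafkaOptions and the precedence order (later sources override earlier ones) are assumptions, not behaviour confirmed by the plugin; clientId is used only as an illustrative option value.

```javascript
// Hypothetical merge, assuming later sources win:
// global options < role options from .use() < role options from .client()/.listen().
function mergeKafkaOptions(pluginOptions, transportOptions, role) {
  return Object.assign(
    {},
    pluginOptions.global,
    pluginOptions[role],
    transportOptions[role]
  );
}

const pluginOptions = {
  global: { connectionString: 'kafka-host1:9092,kafka-host2:9092' },
  producer: {},
  consumer: {}
};

const transportOptions = {
  type: 'kafka',
  pin: 'cmd:register,type:user',
  kafkaTopic: 'user',
  producer: { clientId: 'user-service' } // illustrative value only
};

const producerOptions = mergeKafkaOptions(pluginOptions, transportOptions, 'producer');
console.log(producerOptions);
```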

    A Kafka consumer requires the topic to connect to and the pin pattern of the messages it is listening for.

    You can pass any options supported by the no-kafka library.

    The consumer object is merged with the Kafka global options provided in the .use call.

    If discardIfOlderThan is set, whenever the Kafka listener receives a message that has a timestamp property older than the configured value, the message is discarded.

    If discardIfOlderThan is not set, or the message does not have a timestamp property, all messages are processed normally.

    discardIfOlderThan is parsed with https://www.npmjs.com/package/timestring, so it supports any keywords supported by timestring.
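
    The discard rule can be sketched as below. This is an illustration only: toSeconds is a tiny stand-in for the timestring package that understands just seconds, minutes and hours, shouldDiscard is a hypothetical name, and the timestamp is assumed to be a value that `new Date()` accepts (for example epoch milliseconds).

```javascript
// Minimal stand-in for timestring: supports only 'N second(s)|minute(s)|hour(s)'.
function toSeconds(text) {
  const match = /^(\d+)\s*(second|minute|hour)s?$/.exec(text.trim());
  if (!match) {
    throw new Error(`Unsupported duration: ${text}`);
  }
  const unitSeconds = { second: 1, minute: 60, hour: 3600 };
  return Number(match[1]) * unitSeconds[match[2]];
}

// Returns true when the message should be dropped because it is too old.
// Messages without a timestamp, or with no limit configured, are never dropped.
function shouldDiscard(message, discardIfOlderThan, now = Date.now()) {
  if (!discardIfOlderThan || message.timestamp === undefined) {
    return false;
  }
  const ageMs = now - new Date(message.timestamp).getTime();
  return ageMs > toSeconds(discardIfOlderThan) * 1000;
}

const now = Date.now();
const sixMinutesAgo = now - 6 * 60 * 1000;
console.log(shouldDiscard({ timestamp: sixMinutesAgo }, '5 minutes', now)); // true
console.log(shouldDiscard({ cmd: 'register' }, '5 minutes', now));          // false
```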

    const server = require('seneca')()
      .use('pc-seneca-kafka-transport')
      .listen({
        type: 'kafka',
        pin: 'cmd:register,type:user',
        kafkaTopic: 'user',
        consumer: {
          groupId: 'userManager'
        },
        discardIfOlderThan: '5 minutes'
      });

    It's important to understand that every message received from the specified topic is always acknowledged to Kafka, and a corresponding seneca.act is triggered only if the message matches the specified pin.

    This means that you can't have multiple listen calls with different pins on the same topic, as they will interfere with each other and you can lose messages.

    Instead, define a wildcard pin that handles all messages on the topic and call the appropriate act to process each message.

    If a wildcard is not enough, you can pass an array of pins. In this case, use the pins property instead of pin.

    Another option, if you have multiple listen calls, is to make sure each one defines its own groupId.

    Please also note that if you intend to run multiple instances of the same consumer service for HA, you need to set the groupId to avoid consuming the same message multiple times.
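
    For instance, a listener covering two specific patterns on the same topic might use an options object like the one below, passed to .listen(). The second pin value is illustrative only.

```javascript
// Options object that would be passed to .listen(); note `pins` (array) replaces `pin`.
const listenOptions = {
  type: 'kafka',
  pins: ['cmd:register,type:user', 'cmd:delete,type:user'], // illustrative patterns
  kafkaTopic: 'user',
  consumer: {
    groupId: 'userManager'
  }
};

console.log(listenOptions.pins.length); // 2
```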

    const server = require('seneca')()
      .use('pc-seneca-kafka-transport')
      .listen({
        type: 'kafka',
        pin: 'cmd:register,type:*',
        kafkaTopic: 'user',
        consumer: {
          groupId: 'userManager'
        }
      });

    Issues

    At the moment we are using no-kafka, whose producer implementation has a bug and does not set the message timestamp. We work around the issue by adding the timestamp to the payload instead.
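
    A workaround of this kind could look roughly like the sketch below. The helper name withTimestamp is hypothetical, and storing the value as epoch milliseconds and overwriting any existing timestamp are assumptions rather than documented behaviour; the property name timestamp follows the discardIfOlderThan description.

```javascript
// Hypothetical helper: copy the message and stamp it with the send time.
// Assumption: epoch milliseconds, overwriting any existing `timestamp`.
function withTimestamp(msg, now = Date.now()) {
  return Object.assign({}, msg, { timestamp: now });
}

const original = { cmd: 'register', type: 'user' };
const stamped = withTimestamp(original);
console.log(JSON.stringify(stamped));
```

    The original object is left untouched, so the caller's message is not mutated by the transport.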

    Install

    npm i pc-seneca-kafka-transport

    Version

    1.1.12

    License

    ISC

    Unpacked Size

    12.1 kB

    Total Files

    3

    Collaborators

    • davide.talesco