Narcissistic Project Managers

    @servicebus/kafkabus

    2.0.5 • Public • Published

    @servicebus/kafkabus

    Build Status codecov

    Simple service bus for sending events between processes using kafka.

    @servicebus is a collection of components made for building distributed systems, and as such, some of the tools exist to help implement patterns such as CQRS (see more below).

    You can use servicebus send/listen for one to one messaging, or publish/subscribe for one to many messaging.

    Send / Listen

    Servicebus allows simple sending and recieving of messages in a 1:1 sender:listener configuration. The following two processes will send an event message called 'my.event' every second from process A to process B via Kafka.

    Calling done will commit the message offset, marking it as processed.

    Process A:

    var bus = require('servicebus').bus();
    bus.listen('my.event', function (event, message, done, fail) {
      console.log(event);
      done() 
    });

    Process B:

    var bus = require('servicebus').bus();
     
    setInterval(async function () {
      bus.send('my.event', { my: 'event' });
    }, 1000);

    Streaming

    Kafkabus uses Transactions by default, but they can be disabled in favor of sending messages as fast as possible. There is also a produceBatch function for publishing many messages at once.

    await bus.send('my.event', { my: 'event' }, { transaction: false });
    await bus.listen('my.event', { transaction: false }, function (event) {
      console.log(event)
    });

    Authentication

    TODO: Kafka auth config

    Publish / Subscribe

    Servicebus can also send messages from 1:N processes in a fan-out architecture. In this pattern, one sender publishes a message and any number of subscribers can receive. The pattern for usage looks very similar to send/listen, and under the hood, makes use of the same produce/consume with different options configured:

    Process A (can be run any number of times, all will receive the event):

    var bus = require('servicebus').bus();
    bus.subscribe('my.event', function (event, message, done, fail) {
      console.log(event);
      done()
    });

    Process B:

    var bus = require('servicebus').bus();
     
    setInterval(async function () {
      await bus.publish('my.event', { my: 'event' });
    }, 1000);

    Middleware

    Servicebus allows for middleware packages to enact behavior at the time a message is sent or received. They are very similar to connect middleware in their usage:

      bus.use(bus.package());
      bus.use(bus.correlate());

    Middleware may define one or two functions to modify incoming or outgoing messages:

      function logIncoming (queueName, message, options, next) {
        log('received ' + util.inspect(message));
        next(null, queueName, message, options);
      }
     
      function logOutgoing (queueName, message, options, next) {    
        log('sending ' + util.inspect(message));
        next(null, queueName, message, options);
      }
     
      return {
        handleIncoming: logIncoming,
        handleOutgoing: logOutgoing
      };

    handleIncoming pipelines behavior to be enacted on an incoming message. handleOutgoing pipelines behavior to be enacted on an outgoing message. To say that the behavior is pipelined is to say that each middleware is called in succession, allowing each to enact its behavior before the next. (in from protocol->servicebus->middleware 1->middleware 2->servicebus->user code)

    Official Middleware

    Correlate

    Correlate simply adds a .cid (Correlation Identity) property to any outgoing message that doesn't already have one. This is useful for following messages in logs across services.

    Correlate Middleware - View on GitHub

    Package

    Package repackages outgoing messages, encapsulating the original message as a .data property and adding additional properties for information like message type and datetime sent:

      // bus.publish('my:event', { my: 'event' });
      {
        my: 'event'
      };

    becomes

      {
        data: {
          my: 'event'
        }
        , datetime: 'Wed, 04 Sep 2013 19:31:11 GMT'
        , type: 'my:event'
      };

    Package Middleware - View on GitHub

    CQRS/ES and servicebus-register-handlers

    register-handlers is a package that allows you to register and entire folder of handlers at once. It also contains some syntactical sugar for use in CQRS systems, allowing you to easy create command handlers and event handlers by simply creating a folder of handlers, and providing register-handlers with that folder name.

    await registerHandlers({
      bus,
      path: path.resolve(process.cwd(), 'handlers')
    })

    If using ES Modules, set modules: true.

    await registerHandlers({
      bus,
      path: path.resolve(process.cwd(), 'handlers'),
      modules: true
    })

    Example Command Handler

    To use a command handler you must export a command string, and a function named listen.

    import log from 'llog'
    import { TodoList } from '../lib/models/TodoList'
    import { todoListRepository } from '../lib/repos/todoListRepository.mjs'
     
    export const command = 'list.item.add'
    export const ack = false // kafka has it's own acks - we don't need servicebus's
     
    log.info({ msg: `registering ${command}`, command })
     
    //
    // WARNING: You can not use an () => {} function here, because the context
    // that contains the bus will not be bound properly!
    //
    export const listen = async function ({ type, data, datetime }, done, fail) {
      try {
        const { bus } = this
        const { todoListId, item } = data
        const { todo, complete } = item
     
        if (!todoListId) throw new Error(`${command} - todoListId must be defined!`)
     
        // JSON logging
        // Great for filtering in Kibana
        log.info({ msg: `executing listen handler for ${command}`, command, todo, complete, type, datetime })
     
        // do something, if it's a model, probably with a repository pattern
        //  * I'm using sourced, and sourced-repo-mongo in this example
     
        let todoList
     
        try {
          todoList = await todoListRepository.getAsync(todoListId)
        } catch (err) {
          log.error('Error calling todoListRepository.getAsync')
          log.error(err)
          throw new Error({ name: 'ERROR_GET_ASYNC' })
        }
     
        if (!todoList) {
          todoList = new TodoList()
     
          todoList.initialize({
            id: todoListId
          })
        }
     
        todoList.addItem(item)
        todoList.on('item.added', () => {
          bus.publish('list.item.added', item)
          log.info({ msg: 'list.item.added', item })
          done()
        })
     
        await todoListRepository.commitAsync(todoList)
     
      } catch (err) {
        log.error(err)
        fail(`Command Handler Failed for ${command} - ${err}`)
      }
    }
     
    // meanwhile... in another service...
    //
    // bus.send('list.item.add', {
    //   item: {
    //     todo: "Make this",
    //     complete: false
    //   }
    // })
    //
    //

    Example Event Handler

    To use an event handler you must export an event string, and a function named subscribe.

    import log from 'llog'
     
    export const event = 'list.item.added'
    export const ack = false
     
    log.info({ msg: `registering ${event}`, event })
     
    export const subscribe = function ({ type, datetime, data }, done) {
      const { item } = data
      const { todo, complete } = item
     
      log.info({ msg: `executing listen handler for ${event}`, event, todo, complete, type, datetime })
      // do something
      done()
    }
     
    // meanwhile... in another service...
    //
    // bus.publish('list.item.added', item)
    //

    Keywords

    none

    Install

    npm i @servicebus/kafkabus

    DownloadsWeekly Downloads

    27

    Version

    2.0.5

    License

    none

    Unpacked Size

    67.6 kB

    Total Files

    40

    Last publish

    Collaborators

    • mateodelnorte
    • patrickleet