kinesiserapy

0.0.4 • Public • Published

Kinesiserapy: a AWS Kinesis event stream interface

This module provides an AWS Kinesis event stream consumer and emitter implementations.

Configuration

AWS credentials and a stream name must be provided to connect to Kinesis. The credentials are passed in an object that must have the following keys

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_REGION

The stream name is simply a string.

Emit an event

var kinesiserapy = require('kinesiserapy');

var kinesisAuth = {
    AWS_ACCESS_KEY_ID: "the key id",
    AWS_SECRET_ACCESS_KEY: "the access key",
    AWS_REGION: "the region"
};
var stream = "my-stream";
var emitter = new kinesiserapy.KEmitter(stream,kinesisAuth);

emitter.emit(
    {
        "key": "value"
    },
    function(err) {
        if (err) {
            console.error(err);
        } else {
            console.log('event emitted');
        }
    });

Consuming from a stream

The easiest way to consumer from a stream is to list the shards and spawn a new process to consume from each shard.

var cluster = require('cluster');
var kinesiserapy = require('kinesiserapy');

var kinesisAuth = {
    AWS_ACCESS_KEY_ID: "the key id",
    AWS_SECRET_ACCESS_KEY: "the access key",
    AWS_REGION: "the region"
};
var stream = "my-stream";

if (cluster.isMaster) {
	var kinfo = new kinesiserapy.KInfo(stream, kinesisAuth);
	kinfo.listShards(function(err, shardIds) {
		if (err) {
			console.error(err);
		} else {
			shardIds.forEach(function (shardId, index, shardIds) {
				cluster.fork({stream: stream, shardId: shardId});
			});
		}
	});
} else if (cluster.isWorker) {
	var kConsumer = new kinesiserapy.KConsumer(
		process.env.stream,
		process.env.shardId,
		lambda,
		function(err) {
			console.error(err)
		},
		kinesisAuth);
	kConsumer.consume();
}

function lambda(data) {
    console.log(data);
}

API

kinesiserapy exposes three objects:

  • KInfo to obtain information about the stream
    • listShards(callback) where callback is a fn(err, shardIds). The callback function is called with err null and shardIds being an array of string. If an error occurred, err is not null.
  • KConsumer to consume from a shard
    • consume() starts an asynchronous consumer that loops
  • KEmitter to emit an event
    • emit(obj, cb) emits the object obj and calls the callback cb after the event is emitted.
      The callback has the signature cb(err, data).
      • err is null unless an error occurred.
      • data contains two keys:
        • SequenceNumber: the sequence number of the emitted event
        • ShardId: the shard on which the event was emitted

Readme

Keywords

Package Sidebar

Install

npm i kinesiserapy

Weekly Downloads

1

Version

0.0.4

License

LGPL-3.0+

Last publish

Collaborators

  • pmoermans
  • mondraymond