redis-consumer
TypeScript icon, indicating that this package has built-in type declarations

1.1.3 • Public • Published

redis-consumer

Simple node package for easy use of Redis Streams functionality. This package allows for creation of a Redis consumer and producer.

Installation

Make sure you have NodeJs installed, then:

npm install redis-consumer

Usage

Basic example

import { RedisClient } from 'redis-consumer';

(async () => {
  // Client name must be unique per client
  const client = new RedisClient({
    groupName: 'mygroup',
    clientName: 'myclient1',
  });

  client.on('error', err => console.log('Redis Client Error', err));

  await client.connect();

  const consumer = client.createConsumer();

  // Redis stream to listen to and processable function
  const stream = {
    name: 'mystream',
    executable: (data, stream) => console.log('Redis message for stream ' + stream, data),
  };

  // Listen for new messages and process them according the
  // defined executable function
  consumer.listen(stream);
})();

When creating the Redis client, make sure to define a group and client name. Note, the client name must be unique in order for Redis to distinguish each individual client within the consumer group.

Class RedisClient

Constructor : new RedisClient(options)

The RedisClient is an extension of the original client from the node-redis package. All constructor options within the node-redis package are available to this class as well.

Example

// Connect client to Redis server with TLS enabled
const client = new RedisClient({
  socket: {
    port: 6380,
    host: 'localhost',
    tls: false,
  },
  password: 'mysupersecurepassword',
  groupName: 'mygroup',
  clientName: 'client1',
});

RedisClientOptions

Parameters Description Required
groupName Name of the consumer group Yes
clientName Name of the client, must be unique per client Yes

Other options can be found in the official node-redis github repository over here.

Methods

For all available methods, please look in the official node-redis repository over here.

createConsumer(options)

createProducer()

  • Returns a RedisProducer

streamExists(key)

  • key key name of the stream
  • Returns a boolean

groupExists(key)

  • key name of the stream
  • Returns a boolean

createGroup(key)

  • key name of the stream
  • Returns a string

Class RedisConsumer

Constructor : client.createConsumer(options)

The RedisConsumer is able to listen for incomming message in a stream. You can define an object or an array of objects in which you can define the name of the stream to listen for and which function should be executed for processing of the message. The consumer has a build-in retry mechanism which triggers an event retry-failed if all retries were unsuccessfull.

When a message is successfully processed (also in retry state), the consumer will send an acknowledgement signal to the Redis server. When the acknowlegdement is performed, the message will be removed from the pending list for that consumer group.

When the consumer starts, it will process all remaining pending messages at first before listening for new incomming messsage. However, you can overrule this behaviour by defining your own starting id.

Example

  const consumer = client.createConsumer({
    COUNT: 3,
    retries: 1,
    retryTime: ['5s'],
  });

  const streams = [
    {
      name: 'mystream',
      id: '>',
      executable: (data) => console.log('Only listen to new messages', data.message)
    },
    {
      name: 'myssecondstresm',
      executable: (data, stream) => console.log('Message for stream ' + stream, data.message)
    }
  ];

  consumer.client.on('retry-failed', (err, data) => {
    console.error('Failed processing message in stream ' + data.stream + '. Amount of retries: ' + data.retries, data.message);
  });

  consumer.client.on('process-error', (err, data) => {
    console.error('An unexpected error occured for stream ' + data.stream, err.message);
  });

  consumer.listen(streams);
});

RedisConsumerOptions

Parameters Description Required Default
COUNT Number of elements to read No 1
BLOCK Time in miliseconds to block while reading stream No 0
retries Amount of retries for processing messages No 3
retryTime Time interval between each retry No ['15s', '1m', '15m']

More information about the BLOCK and COUNT parameters can be found at the official docs of Redis.

The retryTime is an array of time strings. Seconds, minutes and hours are supported ('s', 'm', 'h'). When there are less items in the retryTime array than the amount of retries, the last time string item is used.

If you want to disable the retry mechanism, select a value of 0 for retries.

Methods

listen(streams)

addAckMessage(stream, id)

  • stream key name of the stream
  • id id of the message

Adds the message to the acknowlegdement list.

StreamToListen Object

{
  name: 'mystream',                                        // Keyname of the Redis stream
  executable: (message, stream) => console.log(message),   // Message processing function to be executed
  id: '>'                                                  // Optional, start listining from the message id. Defaults to '0-0'
}

Class RedisProducer

Constructor : client.createProducer()

The RedisProducer is used to add new messages to the Redis stream.

Example

const message = {
  firstName: 'John',
  lastName: 'Doe',
};

const producer = client.createProducer();
producer.add('mystream', message);

Methods

add(stream, message)

  • stream key name of the stream
  • message object/message to add to the stream

Events

Event name Description
process-error Event is triggered on the RedisConsumer when an error occurs during execution of the streams processing function. First argument is the error object, second argument is an object containing the stream, message and retries.
retry Event is triggered on the RedisConsumer just before a retry is attempted. A data object with properties stream, message, retries and timestamp is forwarded to the event.
retry-failed Event is triggered on the RedisConsumer when retry of the message has failed/ended. The first argument that is forwarded to the event is the error. The second arguments is a data object with properties stream, message, retries and timestamps.

Typescript

This package has full Typescript support. See the example below on how to define a processing function with typed message data.

import { RedisClient, StreamsToListen, StreamMessageReply } from 'redis-consumer';

const client = new RedisClient({
  groupName: 'mygroup',
  clientName: 'myclient1',
});
await client.connect();

const streams: StreamsToListen = [
  {
    name: 'mystream',
    executable: processing,
  },
];

const consumer = client.createConsumer();
consumer.listen(streams);

// Define interface of your message data
interface MyMessage {
  firstName: string;
  lastName: string;
}

function processing(data: StreamMessageReply<MyMessage>) {
  const message = data.message;
  const fullName = message.firstName + ' ' + message.lastName; // Full typing of message

  console.log('Hello, my name is ' + fullName);
}

Package Sidebar

Install

npm i redis-consumer

Weekly Downloads

0

Version

1.1.3

License

ISC

Unpacked Size

27.3 kB

Total Files

14

Last publish

Collaborators

  • tomasko