Modest Queue
Description
Simple FIFO Redis queue that moves messages to a DLQ (dead letter queue) after the configured number of retries. It builds upon the advice outlined here and currently supports:
- Configurable retry count
- Configurable visibility timeout
- Automatic retries before moving to the included DLQ
- Ability to replay messages that have landed in the DLQ
- Delayed messages
- Priority messages
- Configurable message retry strategies
- TypeScript types!
Requirements
This library requires the underlying Redis instance to be at least version 5, because it uses zpopmin for simplicity. Without it, semaphores or locking would have been needed, which would have increased the complexity of the code.
Benchmark
I'm not sure I'd call this a particularly robust benchmark, but on a 15" 2018 MacBook Pro with the following jest code:
describe('Benchmark', () => {
  const sut = generateSut()
  const testMessage = faker.random.words()
  beforeEach(async () => {
    await sut.initialize()
  })
  afterEach(async () => {
    await sut.destroyQueue()
    await sut.dispose()
  })
  it('Publishes and consumes 1000 messages', async () => {
    const startTime = Date.now()
    await Promise.all(
      new Array(1000).fill(undefined).map(_ => sut.publish(testMessage))
    )
    const publishFinishedTime = Date.now()
    const queueStatsPostPublish = await sut.queueStats()
    expect(queueStatsPostPublish).toEqual({
      queue: 1000,
      dlq: 0,
      inflight: 0,
      delayed: 0
    })
    await Promise.all(
      new Array(1000).fill(undefined).map(async _ => {
        const message = await sut.pollForMessage()
        await sut.messageSucceeded(message!)
      })
    )
    const pollingFinishedTime = Date.now()
    const queueStatsPostPolling = await sut.queueStats()
    expect(queueStatsPostPolling).toEqual({
      queue: 0,
      dlq: 0,
      inflight: 0,
      delayed: 0
    })
    console.log(
      `it took ${
        (publishFinishedTime - startTime) / 1000
      } seconds to publish 1000 messages`
    )
    console.log(
      `it took ${
        (pollingFinishedTime - publishFinishedTime) / 1000
      } seconds to poll 1000 messages`
    )
  })
})
We get:
it took 0.145 seconds to publish 1000 messages
it took 0.228 seconds to poll 1000 messages
Setup
This library uses class-transformer, and as such the following line must be placed at the root of the application:
import 'reflect-metadata'
Example
Simply create a queue like so:
import 'reflect-metadata'
import { ModestQueue } from './modest-queue'
/**
 * Create a queue without passing in an ioredis connection, allowing the library
 * to handle the initialization and disposing of the underlying ioredis connection
 */
const myQueue = new ModestQueue({
  queueName: 'my-queue',
  connectionString: 'redis://127.0.0.1:6379'
})
await myQueue.initialize()
// Place a value on the queue - this will likely be an object that you JSON.stringify(),
// but for more complex objects I suggest you make use of class-transformer
const mySimpleQueueMessage = 'This is a test'
await myQueue.publish(mySimpleQueueMessage)
// ...in a worker somewhere, pop this message off to consume it
const simpleMessage = await myQueue.pollForMessage()
// Deserialize using JSON.parse(), or deserialize() if you are using class-transformer, and work on it
// If it was successful and you don't want to retry it simply...
await myQueue.messageSucceeded(simpleMessage)
// If it failed and it needs retrying...
await myQueue.messageFailed(simpleMessage)
// Delayed messages are created like so - this will wait 1 second before the message can be polled
await myQueue.publish(mySimpleQueueMessage, { delay: 1000 })
// Priority messages are created like so. 0 is very low priority, 100 is very high priority.
// You can combine delay with priority...I cannot fathom a use case for that, but it is respected.
await myQueue.publish(mySimpleQueueMessage, { priority: 75 })
// Dispose of the queue - this doesn't destroy it in redis, it simply closes the redis connection and stops polling for inflight messages that might need requeueing
await myQueue.dispose()
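The poll, succeed or fail flow above is usually wrapped in a small worker helper. The following is only a sketch: PollableQueue is a hypothetical interface that mirrors the three method names used above, and it assumes pollForMessage resolves to undefined when nothing is available. Check the library's own types before relying on either assumption.

```typescript
// Hypothetical minimal shape of the queue methods used in the example above.
// The message type is left generic because it is not spelled out here.
interface PollableQueue<M> {
  pollForMessage(): Promise<M | undefined>
  messageSucceeded(message: M): Promise<unknown>
  messageFailed(message: M): Promise<unknown>
}

// Polls once, runs the handler and reports the outcome back to the queue.
// Resolves to false when there was nothing to process.
async function processNext<M>(
  queue: PollableQueue<M>,
  handler: (message: M) => Promise<void>
): Promise<boolean> {
  const message = await queue.pollForMessage()
  if (message === undefined) {
    return false
  }
  let succeeded = true
  try {
    await handler(message)
  } catch (_error) {
    succeeded = false
  }
  if (succeeded) {
    await queue.messageSucceeded(message)
  } else {
    // Report the failure right away instead of waiting for the visibility timeout
    await queue.messageFailed(message)
  }
  return true
}
```

Calling processNext(myQueue, handler) in a loop, with a short pause whenever it resolves to false, gives a basic worker.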
If you are using multiple queues and want to reuse an existing ioredis connection you may, by doing the following:
import 'reflect-metadata'
import Redis from 'ioredis'
import { Connection, ModestQueue } from './modest-queue'
/**
 * Create a queue by passing in an ioredis connection. It is up to you to dispose of this connection.
 * It is still possible to access this connection via myQueue['connection']
 * and so you could call quit on that:
 * await myQueue['connection'].quit()
 */
const connection = new Redis('redis://127.0.0.1:6379')
const myQueue = new ModestQueue({
  queueName: 'my-queue',
  connection
})
/**
 * This call does significantly less when you supply your own connection!
 * However, it will start processing messages that are inflight that might require requeueing!
 */
await myQueue.initialize()
// ...
/**
 * Dispose of the queue - this doesn't destroy it in redis, it simply stops polling
 * for inflight messages that might need requeueing. It is up to you to close the supplied redis connection!
 */
await myQueue.dispose()
Replaying messages from the Dead Letter Queue
If some messages have landed in the DLQ and you would like to try them again, simply make the following call:
const amountReplayed = await myQueue.replayDLQ()
The above will attempt to replay 10 messages from the DLQ by default - this can be overridden by supplying a different integer.
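If you want to replay everything in the DLQ rather than a single batch, one option is to call replayDLQ repeatedly. This is a sketch under assumptions: ReplayableQueue is a hypothetical interface, and it assumes replayDLQ takes the limit as its only argument and resolves to the number of messages actually replayed (as the amountReplayed name suggests). The maxBatches guard is my own addition, so that messages which fail again quickly cannot keep the loop running forever.

```typescript
// Hypothetical minimal shape: an optional limit in, the replayed count out.
interface ReplayableQueue {
  replayDLQ(limit?: number): Promise<number>
}

// Replays the DLQ in batches until a batch comes back short or
// maxBatches is reached. Resolves to the total number replayed.
async function replayEntireDLQ(
  queue: ReplayableQueue,
  batchSize: number = 10,
  maxBatches: number = 100
): Promise<number> {
  let total = 0
  for (let batch = 0; batch < maxBatches; batch++) {
    const replayed = await queue.replayDLQ(batchSize)
    total += replayed
    // A short batch means the DLQ held fewer messages than we asked for
    if (replayed < batchSize) {
      break
    }
  }
  return total
}
```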
Getting the queue statistics
You can find out how many messages are currently being processed by workers (inflight), how many are queued, how many are delayed and how many are in the dead letter queue by simply calling:
const queueStats = await myQueue.queueStats()
// { queue: 1, dlq: 1, inflight: 1, delayed: 1 }
Setting up retry strategies
This allows each retry to be delayed by a custom amount. Simply use the retryStrategy option when setting up the queue to pass in the custom logic you desire. A very basic version might look like this:
/**
 * A very simple retry strategy that increases the delay by a further second each retry
 */
export class SimpleRetryStrategy implements RetryStrategy {
  calculateRetryDelay (currentAttempt: number): Milliseconds {
    return currentAttempt * 1000
  }
}
And to use it:
import 'reflect-metadata'
import Redis from 'ioredis'
import { Connection, ModestQueue } from './modest-queue'
const connection = new Redis('redis://127.0.0.1:6379')
const myQueue = new ModestQueue({
  queueName: 'my-queue',
  connection,
  retryStrategy: new SimpleRetryStrategy()
})
If no retry strategy is specified, the default retry strategy is used.
The default retry strategy exponentially increases the delay between retries from 5ms to 2.5 hrs for the first 10 attempts (if the default retry count is used). Each retry delay includes a jitter of up to 10% to avoid deadlock-related errors from continually blocking.
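To make the shape of such a strategy concrete, here is a rough sketch of exponential backoff with jitter. It is not the library's actual default implementation: the growth factor of 5, the assumption that attempts are numbered from 1, and the local RetryStrategy and Milliseconds definitions are all stand-ins chosen so that the delays run from about 5ms up to a 2.5 hour cap by the tenth attempt.

```typescript
type Milliseconds = number

// Local stand-in for the RetryStrategy interface used earlier
interface RetryStrategy {
  calculateRetryDelay(currentAttempt: number): Milliseconds
}

const BASE_DELAY_MS = 5 // assumed delay before the first retry
const GROWTH_FACTOR = 5 // assumed multiplier per attempt
const MAX_DELAY_MS = 2.5 * 60 * 60 * 1000 // 2.5 hours
const JITTER_FRACTION = 0.1 // up to 10% either way

class ExponentialBackoffStrategy implements RetryStrategy {
  private readonly random: () => number

  // `random` is injectable so the jitter can be made deterministic in tests
  constructor(random: () => number = Math.random) {
    this.random = random
  }

  calculateRetryDelay(currentAttempt: number): Milliseconds {
    // Assumes attempts are numbered from 1
    const exponent = Math.max(0, currentAttempt - 1)
    const base = Math.min(
      BASE_DELAY_MS * Math.pow(GROWTH_FACTOR, exponent),
      MAX_DELAY_MS
    )
    // random() in [0, 1) maps to an offset in [-10%, +10%) of the base delay
    const jitter = base * JITTER_FRACTION * (this.random() * 2 - 1)
    return Math.round(Math.min(base + jitter, MAX_DELAY_MS))
  }
}
```

The jitter spreads out retries from workers that failed at the same moment, so they are less likely to collide again on the next attempt.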
This approach was heavily influenced by this fantastic library: https://bus.node-ts.com/guide/retry-strategies
A bus adapter that uses modest-queue is available here: https://github.com/node-ts/bus-redis
Destroying/migrating away from modest-queue
If you decide this library is not right for you, you can clean up the Redis keys by calling:
await myQueue.destroyQueue()
Note: this comes with no guarantees, and I would suggest you review the code before running this. Consider yourself warned!
Why do I need to call messageFailed or messageSucceeded?
If you don't tell the queue that the message succeeded, it will wait for 30 seconds (unless you specified a different visibilityTimeout) before placing the same message back on the queue to be retried. If it failed, you can let the queue know and it will queue the message to be retried. The reason we have a visibilityTimeout is to add resilience: if a message is polled off the queue with pollForMessage and the worker handling it crashes, we don't want that job lost!
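The idea can be modelled in a few lines. The sketch below is a purely in-memory illustration and not how the library is implemented (the real state lives in Redis). The class and method names are hypothetical, the clock is passed in explicitly so the behaviour is easy to follow, and retry counting and the DLQ are left out.

```typescript
// In-memory model only; names are hypothetical and not part of the library.
class VisibilityTimeoutModel {
  private readonly queue: string[] = []
  private inflight: { message: string; deadline: number }[] = []
  private readonly visibilityTimeoutMs: number

  constructor(visibilityTimeoutMs: number = 30000) {
    this.visibilityTimeoutMs = visibilityTimeoutMs
  }

  publish(message: string): void {
    this.queue.push(message)
  }

  // Hands the oldest message to a worker and starts its visibility clock
  poll(now: number): string | undefined {
    const message = this.queue.shift()
    if (message !== undefined) {
      this.inflight.push({ message, deadline: now + this.visibilityTimeoutMs })
    }
    return message
  }

  // The worker reported success, so the message is forgotten
  succeeded(message: string): void {
    const index = this.inflight.findIndex(entry => entry.message === message)
    if (index !== -1) {
      this.inflight.splice(index, 1)
    }
  }

  // Puts every inflight message whose deadline has passed back on the queue
  // and returns how many were requeued
  requeueExpired(now: number): number {
    const expired = this.inflight.filter(entry => entry.deadline <= now)
    this.inflight = this.inflight.filter(entry => entry.deadline > now)
    expired.forEach(entry => this.queue.push(entry.message))
    return expired.length
  }
}
```

A worker that crashes never calls succeeded, so its message stays inflight until the deadline passes and a later requeueExpired call makes it available to another worker.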
Tests
The tests can be run from the command line by first creating a Redis instance:
docker-compose up -d
followed by
yarn test:watch modest-queue.integration