Conveyor MQ
A fast, robust and extensible distributed task/job queue for Node.js, powered by Redis.
Introduction
Conveyor MQ is a general purpose, distributed task/job queue for Node.js, powered by Redis.
Conveyor MQ implements a reliable queue which provides strong guarantees around the delivery of tasks, even in the event of network or server failures. It offers at-least-once and exactly-once task delivery through the use of error and stall task retries. Conveyor MQ is implemented using a highly efficient, polling-free design making use of the brpoplpush command from Redis.
```js
const { createManager, createWorker } = require('conveyor-mq');

const queueName = 'my-queue';
const redisConfig = { host: '127.0.0.1', port: 6379 };

// Create a manager and enqueue a task:
const manager = createManager({ queue: queueName, redisConfig });
manager.enqueueTask({ task: { data: { x: 1, y: 2 } } });

// Create a worker to process tasks from the queue:
const worker = createWorker({
  queue: queueName,
  redisConfig,
  handler: ({ task }) => {
    return task.data.x + task.data.y;
  },
});
```
Features
- Task management
- Retry tasks on error or stall with customizable retry strategies
- Tasks which expire
- Task execution timeouts
- Delayed/Scheduled tasks
- Task progress
- Events
- Task, Queue and Worker events
- Concurrent worker processing
- Fast & efficient, polling-free design
- Highly extensible design with plugins
- Task rate limits
- Async/await/Promise APIs
- Robust
- Atomic operations with Redis transactions
- At-least-once task delivery
- High test code coverage
- High performance
- Minimised network overhead using Redis pipelining and multi commands
- Uses Redis Lua scripting for improved performance and atomicity
Table of Contents
- Introduction
- Features
- Table of Contents
- Quick Start Guide
- Overview
- API Reference
- Examples
- Roadmap
- Contributing
- License
Installation
npm:

```sh
npm install --save conveyor-mq
```

yarn:

```sh
yarn add conveyor-mq
```

You will also need Redis >= 3.2.
Quick Start Guide
```js
const {
  createManager,
  createWorker,
  createListener,
  createOrchestrator,
} = require('conveyor-mq');

const redisConfig = { host: '127.0.0.1', port: 6379 };
const queue = 'myQueue';

// Create a manager which is used to add tasks to the queue, and query various properties of a queue:
const manager = createManager({ queue, redisConfig });

// Add a task to the queue by calling manager.enqueueTask:
const task = { data: { x: 1, y: 2 } };
manager.enqueueTask({ task });

// Schedule a task to be added to the queue later by calling manager.scheduleTask:
const scheduledTask = { data: { x: 1, y: 2 }, enqueueAfter: '2020-05-03' };
manager.scheduleTask({ task: scheduledTask });

// Create a listener and subscribe to the task_complete event:
const listener = createListener({ queue, redisConfig });
listener.on('task_complete', ({ event }) => console.log('Task complete:', event.task.id));

// Create a worker which will process tasks on the queue:
const worker = createWorker({
  queue,
  redisConfig,
  handler: ({ task }) => task.data.x + task.data.y,
});

// Create an orchestrator to monitor the queue for stalled tasks, and enqueue scheduled tasks:
const orchestrator = createOrchestrator({ queue, redisConfig });
```
Overview
Tasks
The most basic implementation of a task is an object with a data key:

```js
const myTask = { data: { x: 1, y: 2 } };
```
Task life cycle
A task will move through various statuses throughout its life cycle within the queue:
- scheduled: The task has been scheduled to be enqueued at a later time (delayed/scheduled task).
- queued: The task has been enqueued on the queue and is pending processing by a worker.
- processing: The task has been picked up by a worker and is being processed.
- success: The task has been successfully processed by a worker.
- failed: The task has been unsuccessfully processed by a worker and has exhausted all error & stall retries.
Task status flow diagram:
```
                                    -> success
                                   /
scheduled -> queued -> processing
    ^          ^                   \
    |--- or ---|--------< Stall & error retries
                                    \
                                     -> failed
```
Note: success and failed statuses both represent the final outcome of a task, after all stall/error retrying has been attempted and exhausted.
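As a rough illustration (this transition map is a sketch, not part of the Conveyor MQ API), the life cycle above can be expressed as:

```javascript
// Illustrative sketch of the task status transitions described above.
// Not part of the Conveyor MQ API.
const TRANSITIONS = {
  scheduled: ['queued'],
  queued: ['processing'],
  processing: ['success', 'failed', 'queued'], // back to 'queued' on a stall/error retry
  success: [],
  failed: [],
};

const canTransition = (from, to) => (TRANSITIONS[from] || []).includes(to);

console.log(canTransition('queued', 'processing')); // true
console.log(canTransition('scheduled', 'processing')); // false
```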
Manager
A manager is responsible for enqueuing tasks, as well as querying various properties of a queue.
Create a manager by calling createManager and passing a queue and redisConfig parameter.
Add a task to the queue by calling manager.enqueueTask with an object { task: { data: { x: 1, y: 2 } } }.
For more information, see createManager and Enqueuing tasks.
```js
const { createManager } = require('conveyor-mq');

// Create a manager instance:
const manager = createManager({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
});

// Add a task to the queue:
await manager.enqueueTask({ task: { data: { x: 1, y: 2 } } });

// Get a task:
const task = await manager.getTaskById({ taskId: 'my-task-id' });
/*
  task = {
    ...
    status: 'queued',
    data: { x: 1, y: 2 },
    ...
  }
*/
```
Enqueuing tasks
Tasks are added to a queue (enqueued) by using a manager's enqueueTask function.
```js
const { createManager } = require('conveyor-mq');

const myTask = {
  // A custom task id. If omitted, an id will be auto generated by manager.enqueueTask:
  id: 'my-custom-id',
  // Custom task data for processing:
  data: { x: 1, y: 2 },
  // The maximum number of times a task can be retried due to an error:
  errorRetryLimit: 3,
  // The maximum number of times a task can be retried after having stalled:
  stallRetryLimit: 3,
  // The maximum number of times a task can be retried in total (error + stall):
  retryLimit: 5,
  // The maximum time a task is allowed to execute for, after which it will fail with a timeout error:
  executionTimeout: 5000,
  // Custom retry strategy:
  retryBackoff: { strategy: 'linear', factor: 10 },
  // Schedules the task to only be enqueued after this time:
  enqueueAfter: '2020-05-01',
  // Time after which the task will expire and fail if only picked up by a worker after this time:
  expiresAt: '2020-05-06',
  // Time since the last acknowledgement after which the task is considered stalled and re-enqueued by an orchestrator:
  stallTimeout: 5000,
  // Frequency at which the task is acknowledged by a worker while being processed:
  taskAcknowledgementInterval: 1000,
};

const manager = createManager({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
});

const enqueuedTask = await manager.enqueueTask({ task: myTask });
```
Task retries
Conveyor MQ implements a number of different task retry mechanisms which can be controlled by various task properties.
errorRetryLimit controls the maximum number of times a task is allowed to be retried after encountering an error whilst being processed.

```js
// Create a task which can be retried on error a maximum of 2 times:
const task = { data: { x: 1, y: 2 }, errorRetryLimit: 2 };
```
errorRetries is the number of times a task has been retried because of an error.

```js
// See how many times a task has been retried due to an error:
const task = await manager.getTaskById({ taskId: 'my-task-id' });
/*
  task = {
    ...
    id: 'my-task-id',
    errorRetries: 2,
    ...
  }
*/
```
stallRetryLimit controls the maximum number of times a task is allowed to be retried after becoming stalled whilst being processed.

```js
// Create a task which can be retried on stall a maximum of 2 times:
const task = { data: { x: 1, y: 2 }, stallRetryLimit: 2 };
```
stallRetries is the number of times a task has been retried after having stalled:

```js
// See how many times a task has been retried due to a stall:
const task = await manager.getTaskById({ taskId: 'my-task-id' });
/*
  task = {
    ...
    id: 'my-task-id',
    stallRetries: 2,
    ...
  }
*/
```
retryLimit controls the maximum number of times a task is allowed to be retried after either stalling or erroring whilst being processed.

```js
// Create a task which can be retried on stall or error a maximum of 2 times:
const task = { data: { x: 1, y: 2 }, retryLimit: 2 };
```
retries is the number of times a task has been retried in total (error + stall retries).

```js
// See how many times a task has been retried in total:
const task = await manager.getTaskById({ taskId: 'my-task-id' });
/*
  task = {
    ...
    id: 'my-task-id',
    retries: 2,
    ...
  }
*/
```
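To illustrate how the counters and limits above interact, here is a sketch of the retry decision. The field names match the task properties described above, but the function itself is illustrative only, not the library's internal code:

```javascript
// Illustrative sketch of the retry decision: a failed task may be retried
// only while both its per-kind limit (errorRetryLimit / stallRetryLimit)
// and its overall retryLimit remain unexhausted.
const canRetry = (task, kind) => {
  const errorRetries = task.errorRetries || 0;
  const stallRetries = task.stallRetries || 0;
  const retries = errorRetries + stallRetries;

  // Overall limit across error + stall retries:
  if (task.retryLimit !== undefined && retries >= task.retryLimit) return false;

  // Per-kind limit:
  if (kind === 'error') {
    return task.errorRetryLimit === undefined || errorRetries < task.errorRetryLimit;
  }
  return task.stallRetryLimit === undefined || stallRetries < task.stallRetryLimit;
};

console.log(canRetry({ errorRetries: 1, errorRetryLimit: 2 }, 'error')); // true
console.log(canRetry({ errorRetries: 2, errorRetryLimit: 2 }, 'error')); // false
// Blocked by the overall retryLimit even though errorRetryLimit has headroom:
console.log(canRetry({ errorRetries: 2, stallRetries: 1, retryLimit: 3, errorRetryLimit: 5 }, 'error')); // false
```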
Worker
A worker is responsible for taking enqueued tasks off of the queue and processing them.
Create a worker by calling createWorker with a queue, redisConfig and handler parameter.
The handler parameter should be a function which receives a task and is responsible for processing the task. The handler should return a promise which should resolve if the task was successful, or reject if it failed.
For more information, see createWorker and Processing tasks
```js
const { createWorker } = require('conveyor-mq');

// Create a worker which will start monitoring the queue for tasks and process them:
const worker = createWorker({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
  handler: ({ task }) => {
    return task.data.x + task.data.y;
  },
});
```
Processing tasks
Tasks are processed on the queue by workers, which can be created using createWorker. Once created, a worker will begin monitoring a queue for tasks to process using an efficient, non-polling implementation making use of the brpoplpush Redis command.
A worker can be paused and resumed by calling worker.pause and worker.start respectively.

```js
const { createWorker } = require('conveyor-mq');

const worker = createWorker({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
  handler: ({ task }) => 'some-result',
});

// Pause the worker:
await worker.pause();

// Resume the worker:
await worker.start();
```
Orchestrator
An orchestrator is responsible for various queue maintenance operations including re-enqueueing stalled tasks, and enqueueing delayed/scheduled tasks.
Create an orchestrator by calling createOrchestrator with a queue and redisConfig parameter. The orchestrator will then begin monitoring the queue for stalled tasks and re-enqueueing them if needed, as well as enqueueing scheduled tasks.
For more information, see createOrchestrator and Stalling tasks
```js
const { createOrchestrator } = require('conveyor-mq');

// Create an orchestrator:
const orchestrator = createOrchestrator({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
});
```
Stalled tasks
As part of the at-least-once task delivery strategy, Conveyor MQ implements stalled or stuck task checking and retrying.
While a worker is busy processing a task, it will periodically acknowledge that it is still currently processing the task. The interval at which a processing task is acknowledged by a worker is controlled by Task.taskAcknowledgementInterval and otherwise falls back to Worker.defaultTaskAcknowledgementInterval.
A task is considered stalled if while it is being processed by a worker, the worker fails to acknowledge that it is currently working on the task. This situation occurs mainly when either a worker goes offline or crashes unexpectedly whilst processing a task, or if the Node event loop on the worker becomes blocked while processing a task.
The time since a task was last acknowledged after which it is considered stalled is controlled by Task.stallInterval and otherwise falls back to Worker.defaultStallInterval.
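The stall check described above can be sketched as follows. The field names here are assumptions for the sake of illustration, not the library's internals:

```javascript
// Illustrative sketch of the stall check: a task is considered stalled once
// more than `stallTimeout` ms have passed since the worker's last
// acknowledgement. (Field names are illustrative.)
const isStalled = ({ lastAcknowledgedAt, stallTimeout }, now = Date.now()) =>
  now - lastAcknowledgedAt > stallTimeout;

const task = { lastAcknowledgedAt: 10000, stallTimeout: 5000 };
console.log(isStalled(task, 14000)); // false: acknowledged 4s ago
console.log(isStalled(task, 16000)); // true: acknowledged 6s ago
```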
Note: An orchestrator is required to be running on the queue to monitor and re-enqueue any stalled tasks. It is recommended to run only a single orchestrator per queue to minimize Redis overhead; however, multiple orchestrators can be run simultaneously.
Scheduled tasks
Tasks can be scheduled to be added to the queue at some future point in time. To schedule a task, include an enqueueAfter property on a task and call manager.scheduleTask:
```js
const scheduledTask = { data: { x: 1, y: 2 }, enqueueAfter: '2020-05-15' };

const { task: enqueuedTask } = await manager.scheduleTask({ task: scheduledTask });
/*
  enqueuedTask = {
    ...
    data: { x: 1, y: 2 },
    status: 'scheduled',
    enqueueAfter: '2020-05-15',
    ...
  }
*/
```
Note: An orchestrator is required to be running on the queue to monitor and enqueue any scheduled tasks. It is recommended to run only a single orchestrator per queue to minimize Redis overhead; however, multiple orchestrators can be run simultaneously.
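The check an orchestrator performs on scheduled tasks can be illustrated with a small sketch (illustrative only, not the library's code):

```javascript
// Illustrative sketch: a scheduled task becomes due for enqueueing once its
// enqueueAfter time has passed.
const isDue = (task, now = new Date()) => new Date(task.enqueueAfter) <= now;

const scheduledTask = { data: { x: 1, y: 2 }, enqueueAfter: '2020-05-15' };
console.log(isDue(scheduledTask, new Date('2020-05-14'))); // false: not yet due
console.log(isDue(scheduledTask, new Date('2020-05-16'))); // true: due for enqueueing
```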
Listener
A listener is responsible for listening and subscribing to events. Use listener.on to subscribe to various task, queue and worker related events.
For more information, see createListener and Events.
```js
const { createListener } = require('conveyor-mq');

// Create a listener:
const listener = createListener({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
});

// Listen for the 'task_complete' event:
listener.on('task_complete', ({ event }) => console.log('Task complete:', event.task.id));
```
Plugins
Conveyor MQ is highly extensible through its plugin & hooks architecture. The createManager, createWorker and createOrchestrator functions have an optional hooks parameter through which various hook functions can be passed to hook into the various queue lifecycle actions.
Plugins can be created by implementing hook functions, and then calling registerPlugins to register plugins.
Create a plugin
A plugin is a simple object with keys corresponding to hook names, and values of functions.
```js
const { registerPlugins } = require('conveyor-mq');

// Create a simple plugin.
// (The hook names below are illustrative; see the Plugins example for the
// full list of supported hooks.)
const myPlugin = {
  onBeforeEnqueueTask: ({ task }) => console.log('Enqueuing task', task.id),
  onAfterEnqueueTask: ({ task }) => console.log('Enqueued task', task.id),
  onBeforeTaskProcessing: ({ taskId }) => console.log('Processing task', taskId),
  onAfterTaskProcessing: ({ task }) => console.log('Processed task', task.id),
  onTaskSuccess: ({ task }) => console.log('Task succeeded', task.id),
  onTaskError: ({ task }) => console.log('Task errored', task.id),
  onTaskFailed: ({ task }) => console.log('Task failed', task.id),
};

// Register the plugin and unpack new createManager and createWorker functions.
const { createManager, createWorker } = registerPlugins(myPlugin);

const queue = 'my-queue';
const redisConfig = { host: 'localhost', port: 6379 };

// Create a manager which is registered with myPlugin.
const manager = createManager({ queue, redisConfig });

// Create a worker which is registered with myPlugin.
const worker = createWorker({
  queue,
  redisConfig,
  handler: ({ task }) => 'some-result',
});
```
See the Plugins example for more information.
Sharing Redis connections
Redis connections can be shared between a manager, worker and orchestrator as an optimization to reduce the total number of Redis connections used. This is particularly useful to do when your Redis server is hosted and priced based on the number of active connections, such as on Heroku or Compose.
The functions createManager, createWorker and createOrchestrator each take an optional redisClient parameter through which a shared Redis client can be passed. The shared Redis client must first be configured with the custom Lua scripts by calling loadLuaScripts({ client }).
See the Shared redis client example for more details.
Debugging
Conveyor MQ makes use of the debug package for debug logging.
Enable Conveyor MQ debug logging by setting the DEBUG environment variable to conveyor-mq:* and then executing your project/app in the same shell session:

```sh
export DEBUG=conveyor-mq:*
node ./my-app.js
```
API Reference
Manager
- createManager
- manager.enqueueTask
- manager.enqueueTasks
- manager.scheduleTask
- manager.scheduleTasks
- manager.onTaskComplete
- manager.getTaskById
- manager.getTasksById
- manager.getTaskCounts
- manager.getWorkers
- manager.removeTaskById
- manager.pauseQueue
- manager.resumeQueue
- manager.setQueueRateLimit
- manager.getQueueRateLimit
- manager.destroyQueue
- manager.quit
Worker
createManager
Creates a manager instance which is responsible for adding tasks to the queue, as well as querying various properties of the queue. Returns a promise which resolves with a manager instance.
```js
const { createManager } = require('conveyor-mq');

const manager = createManager({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
});
```
manager.enqueueTask
Enqueues a task on the queue.
Returns a promise which resolves with a TaskResponse.

```js
const task = {
  // A custom task id. If omitted, an id (uuid string) will be auto generated by manager.enqueueTask:
  id: 'my-custom-id',
  // Custom task data:
  data: { x: 1, y: 2 },
  // The maximum number of times a task can be retried due to an error:
  errorRetryLimit: 3,
  // The maximum number of times a task can be retried after having stalled:
  stallRetryLimit: 3,
  // The maximum number of times a task can be retried in total (error + stall):
  retryLimit: 5,
  // The maximum time a task is allowed to execute for, after which it will fail with a timeout error:
  executionTimeout: 5000,
  // Custom retry strategy:
  retryBackoff: { strategy: 'linear', factor: 10 },
  // Time after which the task will expire and fail if only picked up by a worker after this time:
  expiresAt: '2020-05-06',
  // Time since the last acknowledgement after which the task is considered stalled and re-enqueued by an orchestrator:
  stallTimeout: 5000,
  // Frequency at which the task is acknowledged by a worker while being processed:
  taskAcknowledgementInterval: 1000,
};

const {
  task: enqueuedTask, // The enqueued task is returned.
  onTaskComplete, // A function which returns a promise that resolves once the task is complete.
} = await manager.enqueueTask({ task });
```
manager.enqueueTasks
Enqueues multiple tasks in a single transaction.
Returns a promise which resolves with a list of TaskResponses.

```js
const task1 = { data: { x: 1 } };
const task2 = { data: { y: 2 } };

const [{ task: enqueuedTask1 }, { task: enqueuedTask2 }] = await manager.enqueueTasks({
  tasks: [task1, task2],
});
```
manager.scheduleTask
Schedules a task to be enqueued at a later time.
Returns a promise which resolves with a TaskResponse.

```js
const myScheduledTask = { data: { x: 1, y: 2 }, enqueueAfter: '2020-05-30' };

const {
  task, // Scheduled task.
  onTaskComplete, // A function which returns a promise that resolves on task complete.
} = await manager.scheduleTask({ task: myScheduledTask });
```
manager.scheduleTasks
Schedules multiple tasks to be enqueued at a later time.
Returns a promise which resolves with a list of TaskResponses.

```js
const myScheduledTask = { data: { x: 1, y: 2 }, enqueueAfter: '2020-05-30' };

const [{ task, onTaskComplete }] = await manager.scheduleTasks({
  tasks: [myScheduledTask],
});
```
manager.onTaskComplete
A function which takes a taskId and returns a promise that resolves with the task once the task has completed.

```js
const { task } = await manager.enqueueTask({ task: { data: { x: 1, y: 2 } } });
const completedTask = await manager.onTaskComplete({ taskId: task.id });
console.log(completedTask.result);
```
manager.getTaskById
Gets a task from the queue. Returns a promise that resolves with the task from the queue.
```js
const task = await manager.getTaskById({ taskId: 'my-task-id' });
```
manager.getTasksById
Gets multiple tasks from the queue in a transaction. Returns a promise that resolves with a list of tasks.

```js
const tasks = await manager.getTasksById({ taskIds: ['task-id-1', 'task-id-2'] });
```
manager.getTaskCounts
Gets the count of tasks per status. Returns a promise that resolves with the counts of tasks per status.
```js
const {
  scheduledCount,
  queuedCount,
  processingCount,
  successCount,
  failedCount,
} = await manager.getTaskCounts();
```
manager.getWorkers
Gets the workers connected to the queue. Returns a promise that resolves with a list of workers.
```js
const workers = await manager.getWorkers();
```
manager.removeTaskById
Removes a given task from the queue by id. Returns a promise.
```js
await manager.removeTaskById({ taskId: 'my-task-id' });
```
manager.pauseQueue
Pauses a queue.
```js
await manager.pauseQueue();
```
manager.resumeQueue
Resumes a queue.
```js
await manager.resumeQueue();
```
manager.setQueueRateLimit
Sets the rate limit of a queue (for example, 100 tasks every 60 seconds):

```js
await manager.setQueueRateLimit({ points: 100, duration: 60 });
```
manager.getQueueRateLimit
Gets the rate limit of a queue.
```js
const rateLimit = await manager.getQueueRateLimit();
// rateLimit = { points: 100, duration: 60 }
```
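The { points, duration } semantics (at most `points` tasks processed per `duration` seconds) can be illustrated with a simple sliding-window sketch. This is illustrative only, not the library's implementation:

```javascript
// Illustrative sliding-window sketch of the { points, duration } rate limit.
const createRateLimiter = ({ points, duration }) => {
  let timestamps = [];
  return (nowMs) => {
    const windowStart = nowMs - duration * 1000;
    // Keep only the executions still inside the window:
    timestamps = timestamps.filter((t) => t > windowStart);
    if (timestamps.length >= points) return false; // over the limit
    timestamps.push(nowMs);
    return true; // task may be processed
  };
};

const tryConsume = createRateLimiter({ points: 2, duration: 60 });
console.log(tryConsume(0)); // true
console.log(tryConsume(1000)); // true
console.log(tryConsume(2000)); // false: 2 tasks already ran in the last 60s
console.log(tryConsume(61500)); // true: the first tasks have left the window
```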
manager.destroyQueue
Destroys all queue data and data structures. Returns a promise.
```js
await manager.destroyQueue();
```
manager.quit
Quits a manager, disconnecting all redis connections and listeners. Returns a promise.
```js
await manager.quit();
```
Worker
createWorker
A worker is responsible for taking enqueued tasks off of the queue and processing them. Create a worker by calling createWorker with at least a queue, redisConfig and handler parameter.
The handler parameter should be a function which receives a task and is responsible for processing the task. The handler should return a promise which should resolve if the task was successful, or reject if it failed.

```js
const { createWorker } = require('conveyor-mq');

// Create a worker which will start monitoring the queue for tasks and process them:
const worker = createWorker({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
  handler: ({ task }) => {
    return task.data.x + task.data.y;
  },
});
```
All worker params:

```js
const { createWorker } = require('conveyor-mq');

const worker = createWorker({
  queue: 'my-queue',
  redisConfig: { host: '127.0.0.1', port: 6379 },
  handler: ({ task }) => {
    return 'some-result';
  },
  // Other worker parameters (for example defaultTaskAcknowledgementInterval
  // and defaultStallInterval, described under "Stalled tasks") are optional.
});
```
Examples
Simple example
Express example
Scheduled task example
Child/sub tasks example
Task types example
Shared redis client example
Plugins example
Roadmap
- Improve documentation
- Task priorities
- Recurring tasks
- Task rate limiting
- Performance optimisations
- Child process workers
- Web UI
Contributing
See CONTRIBUTING.md
License
See LICENSE