TLDR: Ever wanted to run asynchronous functions without blocking the request (main thread)? This library allows you to run async functions in the background using Redis-backed queues with a lot of built-in features.
A TypeScript library for managing asynchronous operations using Redis-backed queues powered by BullMQ. This library provides a robust and type-safe way to handle background jobs with automatic retries, logging, and error handling.
- 🚀 Fully type-safe function registry
- 📝 Configurable logging support for Winston loggers
- 🔄 Redis-backed durable queue
- ⚡ Powered by BullMQ
- 🛠️ Customizable worker and job options
- 🔍 Built-in error handling and job monitoring
npm install async-operation-queue
or
bun install async-operation-queue
To use the async-operation-queue library, you first define the async functions you want to execute in a registry.
First, create a function registry that contains the functions you want to execute asynchronously. Each function should return a Promise.
async function asyncOperation1(arg1: string, arg2: number) {
  // Placeholder work; replace with your own long-running logic.
  const result = `${arg1}:${arg2}`;
  return result;
}
const functionRegistry = {
asyncOperation1,
};
Next, create an operation queue instance with the function registry and a Redis client.
import { AsyncOperationQueue } from "async-operation-queue";
import { Redis } from "ioredis";
const redisClient = new Redis("redis://localhost:6379", { maxRetriesPerRequest: null });
const operationQueue = new AsyncOperationQueue(functionRegistry, redisClient);
Finally, enqueue a job with the function name and arguments.
operationQueue.push("asyncOperation1", ["arg1", 2]);
That's it! The job will be executed asynchronously in the background.
The push method is fully type-safe: the compiler reports an error if the function name or arguments are incorrect.
You can shut down the queue when you're done, for example when the server shuts down gracefully.
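To illustrate how this kind of compile-time checking can work, here is a minimal in-memory sketch. It is not the library's implementation: TypedDispatcher, greet, pendingCount, and drain are hypothetical names used only for this example, and no Redis or BullMQ is involved. The idea is that the function name is constrained to the registry's keys and the argument tuple is derived from that function's parameter types.

```typescript
// Hypothetical sketch, not the library's source code.
type AnyAsyncFn = (...args: any[]) => Promise<unknown>;
type Registry = Record<string, AnyAsyncFn>;

class TypedDispatcher<R extends Registry> {
  private readonly registry: R;
  private readonly pending: Array<() => Promise<unknown>> = [];

  constructor(registry: R) {
    this.registry = registry;
  }

  // K must be a key of the registry; args must match that function's parameters.
  push<K extends keyof R>(name: K, args: Parameters<R[K]>): void {
    const fn: AnyAsyncFn = this.registry[name];
    this.pending.push(() => fn(...args));
  }

  pendingCount(): number {
    return this.pending.length;
  }

  // Runs everything queued so far (a stand-in for a real background worker).
  drain(): Promise<unknown[]> {
    return Promise.all(this.pending.splice(0).map((run) => run()));
  }
}

async function greet(name: string, times: number): Promise<string> {
  return Array(times).fill(`hi ${name}`).join(", ");
}

const dispatcher = new TypedDispatcher({ greet });

dispatcher.push("greet", ["Ada", 2]); // OK
// dispatcher.push("greet", [2, "Ada"]); // compile-time error: wrong argument types
// dispatcher.push("missing", []);       // compile-time error: unknown function name
```

Uncommenting either of the last two lines should make the TypeScript compiler reject the file, which is the same kind of feedback the push method is described as giving.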
operationQueue.shutdown();
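One way to wire this into a graceful shutdown is sketched below. It assumes only that the queue exposes a shutdown method as shown above; whether shutdown returns a Promise is not stated here, so the sketch awaits it, which is harmless either way. Shutdownable, makeShutdownHandler, and fakeQueue are hypothetical names for illustration.

```typescript
// Hypothetical minimal shape: only the shutdown() method is assumed.
interface Shutdownable {
  shutdown(): unknown;
}

// Returns a handler that calls shutdown() at most once, even if
// several termination signals arrive.
function makeShutdownHandler(queue: Shutdownable): () => Promise<void> {
  let started = false;
  return async () => {
    if (started) return; // ignore repeated signals
    started = true;
    await queue.shutdown();
  };
}

// Stand-in queue so the sketch runs without Redis.
let shutdownCalls = 0;
const fakeQueue: Shutdownable = {
  shutdown: () => {
    shutdownCalls += 1;
  },
};

const onSignal = makeShutdownHandler(fakeQueue);

// In a real Node.js server you would register the handler instead:
//   process.once("SIGTERM", onSignal);
//   process.once("SIGINT", onSignal);
void onSignal();
void onSignal(); // second call is a no-op
```

In an application, pass the real operationQueue instead of fakeQueue.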
You can configure the operation queue with additional options.
import { createLogger, format, transports } from "winston";
import type { JobsOptions, WorkerOptions } from "async-operation-queue";
const workerOptions: WorkerOptions = { concurrency: 5, lockDuration: 30000 };
const jobOptions: JobsOptions = { attempts: 3, backoff: { type: "exponential", delay: 1000 } };
const winstonLogger = createLogger({
level: "info",
format: format.combine(format.timestamp(), format.json()),
transports: [new transports.Console()],
});
const operationQueue = new AsyncOperationQueue(functionRegistry, redisClient, {
workerOptions,
jobOptions,
logger: winstonLogger,
queueName: "my-queue",
jobName: "my-job",
isSilent: false,
});
For workerOptions and jobOptions, you can refer to the BullMQ documentation for more information.
- Worker options: https://api.docs.bullmq.io/interfaces/v4.WorkerOptions.html. I've chosen to omit the connection option from this type because the library manages the connection internally.
- Job options: https://api.docs.bullmq.io/interfaces/v5.BaseJobOptions.html
- Queue options: https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html
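To get a feel for what the job options shown earlier (attempts: 3 with exponential backoff and delay: 1000) mean in practice, the sketch below computes the waits between retries. It assumes BullMQ's built-in exponential strategy waits delay * 2^(attemptsMade - 1) milliseconds after each failed attempt; check the BullMQ documentation for the version you use. exponentialRetryDelays is a hypothetical helper, not part of either library.

```typescript
// Assumption: the built-in "exponential" backoff waits
// delayMs * 2^(attemptsMade - 1) after the attemptsMade-th failure.
function exponentialRetryDelays(attempts: number, delayMs: number): number[] {
  const delays: number[] = [];
  // A job with `attempts` total tries is retried at most attempts - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(delayMs * Math.pow(2, attemptsMade - 1)));
  }
  return delays;
}

const retryDelays = exponentialRetryDelays(3, 1000); // [1000, 2000]
```

Under that assumption, a job that keeps failing is retried after roughly 1 second and then 2 seconds before it is marked as failed.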