NestJS queue management with BullMQ and Redis. Simple and type-safe.
npm install @furkanogutcu/nest-queue
or
yarn add @furkanogutcu/nest-queue
- Easy integration with NestJS applications
- Support for Redis connection through
@furkanogutcu/nest-redis
- BullMQ-based queue management with a simplified API
- Global or modular registration options
- Configurable queue options with BullMQ compatibility
- Synchronous and asynchronous module configuration
- Type-safe interface with TypeScript
- Worker decorators for easy queue processing
- Bull Board UI integration for queue monitoring
import { Module } from '@nestjs/common';
import { QueueModule } from '@furkanogutcu/nest-queue';
import { AppController } from './app.controller';
import { AppService } from './app.service';
@Module({
imports: [
QueueModule.register({
redis: {
url: 'redis://localhost:6379',
},
config: {
// Set default options for all queues
defaultQueueOptions: {
prefix: 'myapp', // Set Redis key prefix for all queues
defaultJobOptions: {
attempts: 3,
removeOnComplete: 100,
removeOnFail: 200,
},
},
},
queues: [
{ name: 'emails' }, // Will use default options
{
name: 'notifications',
options: {
// Queue-specific options override defaults
prefix: 'notification-service', // Override the default prefix
defaultJobOptions: {
attempts: 5, // Override the default attempts
backoff: {
type: 'exponential',
delay: 1000,
},
},
},
},
],
isGlobal: true, // optional
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
By default, BullMQ creates Redis keys with the format bull:QUEUE_NAME
. You can customize this prefix in three ways:
Set a default prefix for all queues in the module:
QueueModule.register({
redis: {
url: 'redis://localhost:6379',
},
config: {
defaultQueueOptions: {
prefix: 'myapp', // All queues will use this prefix by default
},
},
queues: [
{ name: 'emails' }, // Will create Redis keys like "myapp:emails"
{ name: 'notifications' }, // Will create Redis keys like "myapp:notifications"
],
});
Set a prefix for specific queues:
QueueModule.register({
redis: {
url: 'redis://localhost:6379',
},
queues: [
{
name: 'emails',
options: {
prefix: 'mail-service', // This will create Redis keys like "mail-service:emails"
},
},
{
name: 'notifications',
options: {
prefix: 'notification-service', // This will create Redis keys like "notification-service:notifications"
defaultJobOptions: {
attempts: 3,
},
},
},
],
});
Set a prefix when creating a queue with the service:
const customQueue = this.queueService.create('metrics', {
prefix: 'analytics', // This will create Redis keys like "analytics:metrics"
});
You can define default options that will be applied to all queues in your application through the config.defaultQueueOptions
property. These defaults will be used unless overridden at the queue level.
This is useful when you want to apply consistent settings across your queues, such as Redis key prefix, retry attempts, job removal policies, or rate limiting.
QueueModule.register({
redis: {
url: 'redis://localhost:6379',
},
config: {
// Default options for ALL queues
defaultQueueOptions: {
prefix: 'myapp', // Default Redis key prefix for all queues
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'fixed',
delay: 5000,
},
removeOnComplete: 100,
removeOnFail: 200,
},
limiter: {
max: 50,
duration: 10000,
},
},
},
queues: [
{
name: 'low-priority',
// This queue will use all the default options
},
{
name: 'high-priority',
options: {
// Override specific defaults while keeping others
defaultJobOptions: {
attempts: 5,
priority: 2,
// backoff, removeOnComplete and removeOnFail from defaults are preserved
},
// limiter and prefix from defaults are preserved
},
},
{
name: 'custom-queue',
options: {
// Completely custom options that ignore all defaults
prefix: 'custom',
defaultJobOptions: {
attempts: 1,
removeOnComplete: false,
},
limiter: null, // Disable rate limiting for this queue
},
},
],
});
The defaultQueueOptions
accepts all standard BullMQ queue options including:
-
prefix
: Default Redis key prefix for all queues (replaces the old redisKeyPrefix option) -
defaultJobOptions
: Default settings for jobs added to the queues -
limiter
: Default rate limiting settings - Any other valid BullMQ QueueOptions
The QueueService
provides methods to interact with your queues. It allows you to add jobs, create new queues, and get existing ones.
import { Injectable } from '@nestjs/common';
import { QueueService } from '@furkanogutcu/nest-queue';
@Injectable()
export class AppService {
constructor(private readonly queueService: QueueService) {}
async addJob() {
// Get an existing queue
const emailQueue = this.queueService.get('emails');
// Add a job to the queue
await emailQueue.add('send-welcome-email', {
userId: 1,
email: 'user@example.com',
});
}
async createCustomQueue() {
// Create a new queue (or get existing one with the same name)
const customQueue = this.queueService.create('custom-queue', {
defaultJobOptions: {
attempts: 5,
removeOnComplete: true,
removeOnFail: false,
},
});
return customQueue;
}
async getAllQueues() {
// Get all registered queues
const queues = this.queueService.getAll();
return queues;
}
}
Workers are responsible for processing jobs from queues. This library provides decorators to easily create and register workers.
🔄 Dependency Injection Support: Workers are fully integrated with NestJS dependency injection. When you register a worker as a provider, it will be instantiated by NestJS with all its dependencies properly injected. The library uses a self-registration pattern where workers automatically register themselves in the internal registry during module initialization, ensuring single instance usage and proper DI support.
import { Worker, BaseWorker, Job } from '@furkanogutcu/nest-queue';
import { Injectable } from '@nestjs/common';
@Worker('emails', {
concurrency: 5, // Optional: Process 5 jobs at a time
limiter: {
// Optional: Rate limiting
max: 100, // Maximum number of jobs to process
duration: 1000, // Time window in milliseconds
},
})
@Injectable()
export class EmailWorker extends BaseWorker {
constructor(
private readonly emailService: EmailService,
private readonly userService: UserService,
) {
super();
}
async process(job: Job): Promise<any> {
// Process the job with injected dependencies
const { email, userId } = job.data;
// Use injected services
const user = await this.userService.findById(userId);
await this.emailService.sendWelcomeEmail(user.email, user.name);
console.log(`Email sent to ${email} for user ${userId}`);
// Return result if needed
return { status: 'sent', timestamp: new Date() };
}
}
import { Module } from '@nestjs/common';
import { QueueModule } from '@furkanogutcu/nest-queue';
import { EmailWorker } from './email.worker';
@Module({
imports: [
QueueModule.register({
redis: {
url: 'redis://localhost:6379',
},
queues: [{ name: 'emails' }],
}),
],
providers: [EmailWorker], // Register the worker
})
export class AppModule {}
import { Injectable } from '@nestjs/common';
import { QueueService } from '@furkanogutcu/nest-queue';
@Injectable()
export class EmailService {
constructor(private readonly queueService: QueueService) {}
async sendWelcomeEmail(userId: number, email: string): Promise<void> {
const emailQueue = this.queueService.get('emails');
await emailQueue.add(
'welcome-email',
{
userId,
email,
template: 'welcome',
},
{
// Job-specific options (override queue defaults)
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
// Add job to delayed queue (execute after 5 seconds)
delay: 5000,
},
);
}
}
For scenarios where you need to load configuration asynchronously:
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { QueueModule } from '@furkanogutcu/nest-queue';
@Module({
imports: [
ConfigModule.forRoot(),
QueueModule.registerAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
redis: {
url: configService.get('REDIS_URL'),
},
config: {
defaultQueueOptions: {
prefix: configService.get('REDIS_KEY_PREFIX'),
defaultJobOptions: {
attempts: configService.get('QUEUE_DEFAULT_ATTEMPTS', 3),
removeOnComplete: configService.get('QUEUE_REMOVE_ON_COMPLETE', 100),
removeOnFail: configService.get('QUEUE_REMOVE_ON_FAIL', 200),
},
},
},
queues: [
{ name: 'emails' }, // Will use default options
{
name: 'notifications',
options: {
// Queue-specific options override defaults
defaultJobOptions: {
attempts: configService.get('NOTIFICATIONS_QUEUE_ATTEMPTS', 5),
},
},
},
],
isGlobal: true,
}),
}),
],
})
export class AppModule {}
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { RedisModule, RedisService } from '@furkanogutcu/nest-redis';
import { QueueModule } from '@furkanogutcu/nest-queue';
@Module({
imports: [
ConfigModule.forRoot(),
RedisModule.registerAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
url: configService.get('REDIS_URL'),
}),
isGlobal: true,
}),
QueueModule.registerAsync({
inject: [RedisService],
useFactory: (redisService: RedisService) => ({
redis: redisService, // Pass the existing RedisService instance
queues: [{ name: 'emails' }, { name: 'notifications' }],
}),
isGlobal: true,
}),
],
})
export class AppModule {}
You can enable the Bull Board UI to monitor and manage your queues:
import { Module } from '@nestjs/common';
import { QueueModule } from '@furkanogutcu/nest-queue';
@Module({
imports: [
QueueModule.register({
redis: {
url: 'redis://localhost:6379',
},
queues: [{ name: 'emails' }, { name: 'notifications' }],
// Configure Bull Board UI
bullBoard: {
route: 'admin/queues', // Will be accessible at /admin/queues
},
}),
],
})
export class AppModule {}
Then in your main.ts file, set up the Bull Board UI:
import { NestFactory } from '@nestjs/core';
import { NestExpressApplication } from '@nestjs/platform-express';
import { QueueService } from '@furkanogutcu/nest-queue';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create<NestExpressApplication>(AppModule);
// Get the QueueService to set up Bull Board
const queueService = app.get(QueueService);
queueService.bullBoard.setup(app);
await app.listen(3000);
}
bootstrap();
You can add authentication to the Bull Board UI to restrict access:
QueueModule.register({
redis: {
url: 'redis://localhost:6379',
},
queues: [{ name: 'emails' }, { name: 'notifications' }],
bullBoard: {
route: 'admin/queues',
// Add authentication
auth: {
username: 'admin',
password: 'securepassword',
},
},
});
When the auth
option is provided, the Bull Board UI will require Basic HTTP Authentication with the specified username and password.
- Node.js 18+
- npm or yarn
Clone the project
git clone https://github.com/furkanogutcu/nest-queue.git
Go to the project directory
cd nest-queue
Install dependencies
npm install
Start the server
npm run start:dev
This project is licensed under the MIT License.