@flatfile/queue
This library provides a full-service queue abstraction layer for all systems in Flatfile. Its queue-agnostic design lets us change the underlying queue engine without touching job code, and lets us test job logic without running a real queue.
Roadmap
- [ ] Add support for a Redis Queue
- [ ] Add callback methods to the worker for job failure, timeout, etc.
- [ ] Verify behavior of SIGINT / SIGTERM on graceful shutdown
- [ ] Ensure consistent queue depth alerting
Drivers
SyncDriver
This will execute jobs immediately. This is very helpful for testing job logic, as it does not require testing the internal operations of a specific queue engine.
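To make the idea concrete, here is a minimal, self-contained sketch of why a synchronous driver simplifies testing. The names `SketchWorker`, `SketchDriver`, `InlineDriver`, and `RecordingWorker` are hypothetical and exist only for this illustration; they are not the `@flatfile/queue` API, whose actual interfaces may differ.

```typescript
// Hypothetical types for illustration only; not the @flatfile/queue API.
interface SketchWorker<T> {
  workerName: string
  execute(payload: T, attempt: number): Promise<void> | void
}

interface SketchDriver {
  addJob<T>(worker: SketchWorker<T>, payload: T): Promise<void>
}

// Runs each job inline, so a test can assert on side effects
// as soon as addJob resolves. No queue engine is involved.
class InlineDriver implements SketchDriver {
  async addJob<T>(worker: SketchWorker<T>, payload: T): Promise<void> {
    await worker.execute(payload, 1) // first (and only) attempt
  }
}

// A worker whose only side effect is recording what it received.
class RecordingWorker implements SketchWorker<{ myProp: string }> {
  workerName = 'recording-worker'
  seen: string[] = []

  execute({ myProp }: { myProp: string }, attempt: number): void {
    this.seen.push(`${myProp}#${attempt}`)
  }
}

async function runSyncDemo(): Promise<string[]> {
  const driver: SketchDriver = new InlineDriver()
  const worker = new RecordingWorker()
  await driver.addJob(worker, { myProp: 'hello' })
  return worker.seen
}
```

Because the job has already run by the time `addJob` resolves, a test only needs to check the worker's side effects and never has to poll a queue.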
GraphileDriver({ databaseUrl })
This provides a powerful Postgres-backed queue implementation with millisecond latency.
BullMQDriver
(coming soon)
A Redis-based BullMQ implementation is planned.
Writing a new Worker
import { Worker } from '@flatfile/queue'
import { Injectable } from '@nestjs/common'

@Injectable() // -> optional; only needed when used with Nest
export class MyWorker extends Worker<{ myProp: string }> {
  /**
   * Provide a custom worker name; otherwise the class name is used
   * @default MyWorker
   */
  workerName = 'custom-name'

  /**
   * Determine the execution priority of this job; lower numbers are higher priority
   * @default 50
   */
  priority = 1

  /**
   * Delay the execution of this job by a number of ms
   * @default 0
   */
  delay = 0

  /**
   * Set the number of milliseconds after enqueue at which this job expires.
   * If the job is unintentionally delayed beyond this point, it is discarded
   * @default 0
   */
  expiration = 0

  /**
   * Set the maximum number of attempts for this job
   * @default 6
   */
  maxAttempts = 3

  execute({ myProp }, attempt) {
    // write logic here
  }
}
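How the scheduling properties above (`priority`, `delay`, `expiration`) might interact can be sketched with a few pure functions. This is an illustration under stated assumptions, not the library's scheduler: the `QueuedJob` shape and the function names are hypothetical, and treating `expiration = 0` as "never expires" is an assumption based on `0` being the default.

```typescript
// Hypothetical job record for illustration; not a @flatfile/queue type.
interface QueuedJob {
  name: string
  priority: number   // lower numbers run first
  enqueuedAt: number // epoch milliseconds
  delay: number      // ms to wait after enqueue before the job may run
  expiration: number // ms after enqueue; 0 is ASSUMED to mean "never expires"
}

// A job is discarded once it has waited longer than its expiration window.
function isExpired(job: QueuedJob, now: number): boolean {
  return job.expiration > 0 && now > job.enqueuedAt + job.expiration
}

// A job becomes runnable once its delay has elapsed.
function isReady(job: QueuedJob, now: number): boolean {
  return now >= job.enqueuedAt + job.delay
}

// Pick the runnable, unexpired job with the lowest priority number.
function nextJob(jobs: QueuedJob[], now: number): QueuedJob | undefined {
  return jobs
    .filter((job) => isReady(job, now) && !isExpired(job, now))
    .sort((a, b) => a.priority - b.priority)[0]
}
```

In this sketch a high-priority job that is still delayed, or one that has already expired, never blocks a lower-priority job that is ready to run.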
Triggering a Job in Nest
When you're ready to send a new job, call addJob(worker, params, overrides). If you're using SyncDriver,
the job executes immediately; otherwise it executes as soon as the queue reaches it.
@Injectable()
class MyService {
  constructor(
    private queueProvider: QueueProvider,
    private myWorker: MyWorker
  ) {}

  async myMethod() {
    // enqueue something (await because it may run synchronously)
    await this.queueProvider.queue.addJob(
      this.myWorker,
      { myProp: 'some-value' }, // placeholder payload matching Worker<{ myProp: string }>
      { delay: 500 } // per-job override of the worker's default delay
    )
  }
}
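The third argument to `addJob` overrides the worker's own settings for a single job. One plausible way to resolve the final options is a layered merge, sketched below. The `JobOptions` type and `resolveJobOptions` function are hypothetical names for this illustration, and the merge order (defaults, then worker, then overrides) is an assumption rather than documented behavior; the default values come from the `@default` tags in the worker example.

```typescript
// Hypothetical option shape mirroring the worker properties shown earlier.
interface JobOptions {
  priority: number
  delay: number
  expiration: number
  maxAttempts: number
}

// Defaults taken from the @default tags in the worker example.
const DEFAULT_OPTIONS: JobOptions = {
  priority: 50,
  delay: 0,
  expiration: 0,
  maxAttempts: 6,
}

// ASSUMED precedence: library defaults < worker properties < per-job overrides.
function resolveJobOptions(
  workerOptions: Partial<JobOptions>,
  overrides: Partial<JobOptions> = {}
): JobOptions {
  return { ...DEFAULT_OPTIONS, ...workerOptions, ...overrides }
}

// MyWorker sets priority = 1 and maxAttempts = 3; this job adds a 500 ms delay.
const resolvedOptions = resolveJobOptions(
  { priority: 1, maxAttempts: 3 },
  { delay: 500 }
)
```

With this ordering, a call site can tune one job without editing the worker class, and any property left out of the overrides falls back to the worker's value.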
Implementing a Nest Worker Job
import { NestFactory } from '@nestjs/core'
import { QueueProvider } from '@flatfile/queue'
import { AppModule } from './App.module'

async function bootstrap() {
  const app = await NestFactory.createApplicationContext(AppModule)
  const { queue, workers } = app.get(QueueProvider)
  await queue.runWorkers(...workers)
}

bootstrap()
ts-node ./worker.ts
Implementing a Filtered Nest Worker
async function bootstrap() {
  const app = await NestFactory.createApplicationContext(AppModule)
  const { queue, workers } = app.get(QueueProvider)

  // `workers` automatically includes every Worker exported from any module;
  // use a filter function to run specific workers on a given node
  await queue.runWorkers(
    ...workers.filter((w) => process.argv.includes(w.name))
  )
}

bootstrap()
ts-node ./worker.ts hard-job other-job
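The filter can also be pulled into a small helper so it is testable on its own. The sketch below is an illustration, not part of `@flatfile/queue`: `NamedWorker` and `selectWorkers` are hypothetical names. It differs from the inline filter above in two deliberate ways, both design choices of this sketch: it only looks at arguments after the script path, and it runs every worker when no names are passed.

```typescript
// Hypothetical minimal shape; the real worker objects carry more than a name.
interface NamedWorker {
  name: string
}

function selectWorkers<W extends NamedWorker>(workers: W[], argv: string[]): W[] {
  // argv[0] is the runtime and argv[1] the script path,
  // so requested worker names start at index 2.
  const requested = new Set(argv.slice(2))

  // Design choice of this sketch: no names means "run everything".
  if (requested.size === 0) return workers

  return workers.filter((worker) => requested.has(worker.name))
}

// Usage inside bootstrap() would look like:
//   await queue.runWorkers(...selectWorkers(workers, process.argv))
```

Skipping the first two entries of `argv` avoids an accidental match when a worker name happens to equal part of the runtime or script path.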