@flatfile/queue
TypeScript icon, indicating that this package has built-in type declarations

1.1.0 • Public • Published

@flatfile/queue

This library provides a full-service queue abstraction layer for all systems in Flatfile. It provides a queue agnostic approach that allows us flexibility on the queue engine and effective testing.

Roadmap

  • [ ] Add support for a Redis Queue
  • [ ] Add callback methods to the worker for job failure, timeout, etc.
  • [ ] Verify behavior of SIGINT / SIGTERM on graceful shutdown
  • [ ] Ensure consistent queue depth alerting

Drivers

SyncDriver

This will execute jobs immediately. This is very helpful for testing job logic, as it does not require testing the internal operations of a specific queue engine.

GraphileDriver({ databaseUrl })

This provides a powerful Postgres backed queue implementation with ms latency.

BullMQDriver (coming soon)

Add support for a redis-based BullMQ implementation

Writing a new Worker

import { Worker } from '@flatfile/queue'
import { Injectable } from '@nestjs/common'

@Injectable() // -> optional if using in Nest
export class MyWorker extends Worker<{ myProp: string }> {
  /**
   * Provide a custom worker name, otherwise will use class name
   * @default MyWorker
   */
  workerName = 'custom-name'

  /**
   * Determine the execution priority of this job, lowe numbers are higher priority
   * @default 50
   */
  priority = 1

  /**
   * Delay the execution of this job by a number of ms
   * @default 0
   */
  delay = 0

  /**
   * Set a number of milliseconds from enqueue where this job should be expired
   * If the job is delayed unintentionally beyond this point it will be discarded
   * @default 0
   */
  expiration = 0

  /**
   * Provide a set of retry options for this job
   * @default 6
   */
  maxAttempts = 3

  execute({ myProp }, attempt) {
    // write logic here
  }
}

Triggering a Job in Nest

When you're ready to send a new job, you just need to call addJob(worker, params, overrides) and the magic will happen. If you're using SyncProvider it will execute immediately, otherwise it'll execute as soon as the queue reaches that job.

@Injectable()
class MyService {
  constructor(
    private queueProvider: QueueProvider,
    private myWorker: MyWorker
  ) {}

  async myMethod() {
    // enqueue something (await because maybe it'll happen synchronously)
    await this.queueProvider.queue.addJob(
      this.myWorker,
      { myProp },
      { delay: 500 }
    )
  }
}

Implementing a Nest Worker Job

import { NestFactory } from '@nestjs/core'
import { QueueProvider } from '@flatfile/queue'
import { AppModule } from './App.module'

async function bootstrap() {
  const app = await NestFactory.createApplicationContext(AppModule)
  const { queue, workers } = app.get(QueueProvider)

  await queue.runWorkers(...workers)
}
bootstrap()
ts-node ./worker.ts

Implementing a Filtered Nest Worker

async function bootstrap() {
  const app = await NestFactory.createApplicationContext(AppModule)
  const { queue, workers } = app.get(QueueProvider)

  // provider.workers magically detects any Workers exported from any module
  // use a filter function to run specific workers on a give node
  await queue.runWorkers(
    ...workers.filter((w) => process.argv.includes(w.name))
  )
}
bootstrap()
ts-node ./worker.ts hard-job other-job

Readme

Keywords

none

Package Sidebar

Install

npm i @flatfile/queue

Weekly Downloads

0

Version

1.1.0

License

ISC

Unpacked Size

1.97 MB

Total Files

29

Last publish

Collaborators

  • sambarrowclough
  • carlbrugger
  • hansjhoffman
  • haleymt
  • mmccooyyy
  • ahollenbeck
  • maerf0x0
  • rjhyde
  • mpoythress
  • flatderek
  • ashleygmulligan
  • alnoor
  • flatfilecolin
  • bigcountrycrane
  • flatfileinfra
  • bangarang
  • madmandrit
  • roberto-alcantara-ffile
  • mairechew
  • jmmander
  • srmotter
  • driscollrp
  • sarocu
  • dboskovic
  • brentkulwicki
  • nate.ferrero
  • jaredwalters