Nerds Pledge Magnanimously

    TypeScript icon, indicating that this package has built-in type declarations

    1.6.0 • Public • Published


    Gitter NPM version Build Status Test Coverage Dependency Status Follow @FabrixApp on Twitter

    Task Handler for Fabrix

    • RabbitMQ


    $ npm install @fabrix/spool-tasks --save

    Configure Task Settings

    // config/tasks.ts
    export const tasks = {
       * Define worker profiles. Each worker of a given type listens for the
       * "tasks" defined in its profile below. The task names represent a Task
       * defined in Note that 'memoryBound' and 'cpuBound' are
       * arbitrary names.
      profiles: {
        memoryBound: [ 'hiMemoryTask1' ],
        cpuBound: [ 'VideoEncoder', 'hiCpuTask2' ]
       * Set RabbitMQ connection info
      connection: {
        exchange: process.env.TASKER_EXCHANGE, // optional, defaults to `tasker-work-x`
        workQueueName: process.env.TASKER_WORK_QUEUE, // optional, defaults to `tasker-work-q`
        interruptQueueName: process.env.TASKER_INTERRUPT_QUEUE, // optional, defaults to `tasker-interrupt-q`
         * The RabbitMQ connection information.
         * See:
         host: process.env.TASKER_RMQ_HOST,
         user: process.env.TASKER_RMQ_USER,
         pass: process.env.TASKER_RMQ_PASS,
         port: process.env.TASKER_RMQ_PORT,
         vhost: process.env.TASKER_RMQ_VHOST
         * Connection information could also be passed via uri
         uri: process.env.TASKER_RMQ_URI
          * Additional, optional connection options (default values shown)
          heartbeat: 30,
          timeout:, // this is the connection timeout (in milliseconds, per connection attempt), and there is no default
          failAFter: 60, // limits how long rabbot will attempt to connect (in seconds, across all connection attempts). Defaults to 60
          retryLimit: 3, // limits number of consecutive failed attempts
       * Set worker to subscribe to tasks in the matching profile (tasks.profiles).
       * If process.env.WORKER does not match a profile, the application will not subscribe to any tasks
      profile: process.env.WORKER

    Configure worker Environment

    // config/env/worker.ts
    import { TasksSpool } from '@fabrix/spool-tasks'
    export const worker = {
      main: {
         * Only load the spools needed by the workers
        spools: [

    If the worker profiles each require more granular environment configurations, create worker-cpuBound, worker-memoryBound, etc. environments.

    Include tasks in the app object

    Create a directory api/tasks. Any task definitions will be created as classes in this directory. Create api/tasks/index.ts to export all of the tasks. Include this directory in api/index.ts. Here is an example:

    // api/index.ts
    import * as tasks from './tasks'
    export { tasks }


    Define tasks in api.tasks. Tasks are run by a worker processes.

    // api/tasks/VideoEncoder.ts
    import { Task } from '@fabrix/spool-tasks'
    export class VideoEncoder extends Task {
       * "message" is the message from RabbitMQ, and contains all the information
       * the worker needs to do its job. By default, sets this.message and
       * @param message.body.videoFormat
       * @param message.body.videoBuffer
      constructor (app, message) {
        super(app, message)
       * Do work here. When the work is finished (the Promise is resolved), send
       * "ack" to the worker queue. You must override this method.
       * @return Promise
      run () {
        return doWork(this.message)
       * This is a listener which is invoked when the worker is interrupted (specifically,
       * an interrupt is a particular type of message that instructs this worker to
       * stop).
      interrupt () {
        this.log.warn('only encoded', this.currentIndex, 'out of', this.totalItems, 'frames')
       * Perform any necessary cleanup, close connections, etc. This method will be
       * invoked regardless of whether the worker completed successfully or not.
       * @return Promise
      finalize () {
        return doCleanup(this.message)

    To start a task, publish a message via the app.tasker interface:

    const taskId = app.tasker.publish('VideoEncoder', { vidoeUrl: 'http://...' }

    To interrupt a task in progress, use the taskId that is returned from app.tasker.publish(..):

    app.tasker.cancel('VideoEncoder', taskId)


    An example Procfile may look like:

    web: npm start
    memoryBound: NODE_ENV=worker WORKER=memoryBound npm start
    cpuBound: NODE_ENV=worker WORKER=cpuBound npm start


    npm i @fabrix/spool-tasks


    DownloadsWeekly Downloads






    Unpacked Size

    34.6 kB

    Total Files


    Last publish


    • scottbwyatt