agenda_queue_worker
3.1.5 • Public • Published

Agenda queue worker (for nodejs)

A small wrapper around the agenda queue service that allows shared connections and jobs written as ES6 classes.

Agenda is a Node.js queue library that persists to MongoDB; see https://github.com/rschmukler/agenda for details.

This service adds the following:

  • ES6 classes as jobs
  • Shared connection pooling (for high frequency jobs)
  • Clustering for workers running across multiple processes
  • TypeScript examples using async/await and promises
  • Attachment of loggers

The source code and examples are in TypeScript, but the package works equally well from plain JavaScript.

npm install --save agenda_queue_worker

See the example directory for details of how to use it.

Usage

Each job type is its own ES6 class, implementing the QueueJob interface. These jobs are then registered to the worker.

The class needs one method, run(payload: any): Promise<void> {...}. The returned promise signals the outcome: it resolves if the job ran successfully and rejects (for example, when run throws) if it failed.
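To illustrate the promise contract, here is a minimal sketch of how a caller could turn the promise returned by run into a success or failure outcome. SketchJob, SucceedingJob, FailingJob and runOnce are hypothetical names used only for this illustration; they are not part of the package, and the worker's real handling may differ.

```typescript
// Hypothetical stand-in for the package's QueueJob interface.
interface SketchJob {
    run(payload: any): Promise<void>
}

class SucceedingJob implements SketchJob {
    async run(_payload: any): Promise<void> {
        // Returning normally resolves the promise: the job succeeded.
    }
}

class FailingJob implements SketchJob {
    async run(payload: any): Promise<void> {
        // Throwing inside an async method rejects the promise: the job failed.
        throw new Error("could not process " + JSON.stringify(payload))
    }
}

// Hypothetical runner (not the package's own code): awaits the job and
// reports whether its promise resolved or rejected.
async function runOnce(job: SketchJob, payload: any): Promise<string> {
    try {
        await job.run(payload)
        return "succeeded"
    } catch (err) {
        const reason = err instanceof Error ? err.message : String(err)
        return "failed: " + reason
    }
}

runOnce(new SucceedingJob(), {}).then(outcome => console.log(outcome))
runOnce(new FailingJob(), { id: 1 }).then(outcome => console.log(outcome))
```

Because run is declared async, a thrown error and an explicitly rejected promise look the same to the caller.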

Example Job class:

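//Imports of QueueJob, JobLogger and ExampleConnection are omitted here; see the example directory
export class ExampleJob implements QueueJob {
    //The worker passes these three arguments to every job it constructs
    constructor(private systemConfig: any, private connections: Map<string, any>, private logger: JobLogger) {
    }

    async run(data: any): Promise<void> {
        this.logger.info("I'm starting with " + JSON.stringify(data))
        this.logger.debug("My system config is " + JSON.stringify(this.systemConfig))

        //Retrieve the shared connection by the name it was registered under
        const dbClient = (<ExampleConnection> this.connections.get("ExampleConnection")).client
        const dataFromDb = await dbClient.getData()

        //Simulate an occasional failure: throwing rejects the promise returned by run
        const random = Math.round(Math.random() * 10)
        if (random === 5) {
            throw new Error("Arrgggg, something went wrong...")
        }

        //Simulate one second of work, then resolve to signal success
        return new Promise<void>((resolve) => {
            setTimeout(() => {
                this.logger.info("Got data from db: " + JSON.stringify(dataFromDb))
                resolve()
            }, 1000)
        })
    }
}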

Each job is provided with 3 parameters in the constructor.

systemConfig

This is the system config that is injected into the app at startup. It is designed to be read-only and is a way of passing global settings to all your jobs (e.g. temp directories, AWS keys).
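One way to keep such a config read-only in your own code is to combine TypeScript's Readonly type with Object.freeze. This is a sketch of that general technique, not something the package is known to do itself; SketchSystemConfig and frozenConfig are hypothetical names.

```typescript
// Hypothetical config shape, mirroring the tempDirectory setting used in the example script.
interface SketchSystemConfig {
    tempDirectory: string
}

// Readonly<T> blocks writes at compile time; Object.freeze blocks them at runtime.
const frozenConfig: Readonly<SketchSystemConfig> = Object.freeze({
    tempDirectory: "/var/tmp/queue_temp"
})

// frozenConfig.tempDirectory = "/elsewhere"   // rejected by the compiler

try {
    // Bypassing the type check still cannot change a frozen object:
    // the write throws in strict mode and is silently ignored otherwise.
    (frozenConfig as SketchSystemConfig).tempDirectory = "/elsewhere"
} catch {
    // Strict mode: a TypeError was thrown and the value is unchanged.
}

console.log(frozenConfig.tempDirectory)
```

Note that Object.freeze is shallow, so nested objects inside the config would need freezing separately.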

connections

The service is designed to work with primed connection pools if need be, saving each job the need to set up and tear down database connections. These are initialised on startup and reused by any job.

logger

This is the job logger, which passes each log message on to all attached loggers. It has three levels: debug for low-level debug info, info for basic job info, and error for logging errors.
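The fan-out behaviour described above can be pictured with a small sketch. SketchLogger, MemoryLogger and FanOutLogger are hypothetical names invented for this illustration; the package's own JobLogger and AgendaLogger types may be shaped differently.

```typescript
// Hypothetical shape of an attached logger with the three levels described above.
interface SketchLogger {
    debug(message: string): void
    info(message: string): void
    error(message: string): void
}

// Collects messages in memory so the fan-out is easy to inspect.
class MemoryLogger implements SketchLogger {
    readonly lines: string[] = []
    debug(message: string): void { this.lines.push("debug: " + message) }
    info(message: string): void { this.lines.push("info: " + message) }
    error(message: string): void { this.lines.push("error: " + message) }
}

// Forwards every message, at the same level, to all attached loggers.
class FanOutLogger implements SketchLogger {
    constructor(private readonly targets: SketchLogger[]) {}
    debug(message: string): void { this.targets.forEach(t => t.debug(message)) }
    info(message: string): void { this.targets.forEach(t => t.info(message)) }
    error(message: string): void { this.targets.forEach(t => t.error(message)) }
}

const first = new MemoryLogger()
const second = new MemoryLogger()
const jobLogger = new FanOutLogger([first, second])

jobLogger.info("job started")
jobLogger.error("job failed")
```

After these two calls, both MemoryLogger instances hold the same two lines, which is the property a job relies on when it logs once and expects every attached logger to receive the message.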

Shared connections / pooling

The service can work with primed connection pools, which are provided to each job. This facilitates high-frequency jobs by avoiding the need to set up and tear down a database connection each time the job runs. An example of such a connection can be seen in the example folder.
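The idea can be sketched as follows: the connection is opened once at startup, stored in a map under its name, and each job run only looks it up. CountingConnection and runSketchJob are hypothetical names for this illustration and do not reflect the package's internals; only the "ExampleConnection" name and the connection string are taken from the examples in this README.

```typescript
// Hypothetical connection type that counts how many times it has been opened.
class CountingConnection {
    static opened = 0
    readonly client: { getData(): string }

    constructor(config: { connectionString: string }) {
        CountingConnection.opened += 1
        this.client = { getData: () => "data from " + config.connectionString }
    }
}

// Built once, at startup, and shared by every job run.
const sharedConnections = new Map<string, CountingConnection>()
sharedConnections.set(
    "ExampleConnection",
    new CountingConnection({ connectionString: "localhost:1234" })
)

// A job run only looks the connection up; it never opens or closes it.
function runSketchJob(connections: Map<string, CountingConnection>): string {
    const connection = connections.get("ExampleConnection")
    if (!connection) {
        throw new Error("ExampleConnection is not registered")
    }
    return connection.client.getData()
}

// Three runs reuse the single connection opened above.
const results = [
    runSketchJob(sharedConnections),
    runSketchJob(sharedConnections),
    runSketchJob(sharedConnections)
]
console.log(results.length, CountingConnection.opened)
```

The cost of opening the connection is paid once, regardless of how often the job is scheduled.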

Example worker start script

//Creates one worker thread per CPU
import * as os from "os"
import {SystemConfig} from "./SystemConfig";
import {ExampleJob} from "./ExampleJob";
import {WorkerConfig} from "../src/WorkerConfig";
import {QueueWorker} from "../src/QueueWorker";
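import {ExampleConnectionConfig, ExampleConnection} from "./ExampleConnection";
//StandardOutLogger is used below; this import path is an assumption, so adjust it to wherever the logger lives in your checkout
import {StandardOutLogger} from "../src/StandardOutLogger";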

//The worker config is all that is needed to set up a worker. There is no need to prime the Mongo instance; everything is set up at startup
let workerConfig: WorkerConfig = {
    mongo: {
        connectionString: "mongodb://127.0.0.1:27017/agenda-test",
        collectionName: "agenda-jobs"
    },
    numberOfWorkerThreads: os.cpus().length 
}

//This can be any config that you wish to provide to jobs, that you'd prefer not to hard code.  Passed in the constructor of every job.
let systemConfig: SystemConfig = {
    tempDirectory: "/var/tmp/queue_temp"
}

//This is the main queue worker class
let qw = new QueueWorker(workerConfig, systemConfig)

//This is an example of a connection config that you'd like to keep connected through various jobs
let exampleConnectionConfig: ExampleConnectionConfig = {
    connectionString: "localhost:1234"
}

//This method registers the connection, passing the name, type and config. This connection can be retrieved by name in any job
qw.registerConnectionDefinition({
    name: "ExampleConnection",
    type: ExampleConnection,
    config: exampleConnectionConfig
})

//This method registers a logger, passing the type and config. You can attach as many loggers as you like, and create your own logger types that follow the `AgendaLogger` interface. It is generally a good idea to attach at least one.
qw.registerLoggerDefinition({
    type: StandardOutLogger,
    config: null
})

//This is how any job type is registered.  An optional schedule can be specified.
qw.registerJobType({
    name: "ExampleJob",
    type: ExampleJob,
    timeout: 10 * 1000,
    schedule: {
        every: "3 seconds",
        jobData: {
            "this": "that"
        }
    }
})

//Starts the job service
qw.start()

Version

3.1.5

License

ISC

Collaborators

  • paul.grimshaw