moleculer-psql-queue

0.2.0 • Public • Published

Moleculer logo

Job queue mixin for graphile-worker.

Install

$ npm install moleculer-psql-queue

Configuration

  1. Start your PostgreSQL db.

  2. Create an empty db psql -U postgres -c 'CREATE DATABASE task_queue'.

    Replace task_queue with your db name

  3. Use graphile-worker CLI to init the schema for the jobs npx graphile-worker -c "postgres://postgres:postgres@localhost:5444/task_queue" --schema-only.

    Set your connection URL (more info: check PSQL docs) and replace task_queue with db name that you've defined in step 2)

Usage

Create queue worker service

const PsqlQueueService = require("moleculer-psql-queue");

broker.createService({
    name: "task-worker",

    mixins: [
        PsqlQueueService(
            "postgres://postgres:postgres@localhost:5444/task_queue",
            // Default opts
            {
                // Name of the property in service schema.
                schemaProperty: "queues",
                // Name of the method in Service to create jobs
                createJobMethodName: "createJob",
                // Name of the property in Service to produce jobs
                producerPropertyName: "$producer",
                // Name of the property in Service to consume jobs
                consumerPropertyName: "$consumer",
                // Name of the internal queue that's used to store the job handlers
                internalQueueName: "$queue",
                // Name of the property in Service settings to register job event handlers
                jobEventHandlersSettingsProperty: "jobEventHandlers",

                // Optional producer configs: More info: https://github.com/graphile/worker#workerutilsoptions
                producerOpts: {},
                // Optional worker configs. More info: https://github.com/graphile/worker#runneroptions
                queueOpts: {
                    concurrency: 5,
                    // Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
                    noHandleSignals: false,
                    pollInterval: 1000,
                },
            }
        ),
    ],

    queues: {
        /**
         * @param {Object} payload Message payload
         * @param {import('graphile-worker').JobHelpers} helpers graphile-worker
         * More info about helpers: https://github.com/graphile/worker#creating-task-executors
         */
        "sample.task"(payload, helpers) {
            // if (Math.random() > 0.5) {
            this.logger.info('New "simple.task" received!', payload);
            return;
            // } else {
            //	throw new Error('Random "sample.task" error');
            // }
        },

        "another.task": {
            /**
             * @param {Object} payload Message payload
             * @param {import('graphile-worker').JobHelpers} helpers Postgres helpers
             * More info about helpers: https://github.com/graphile/worker#creating-task-executors
             */
            process(payload, helpers) {
                this.logger.info('New "another.task" job received!', payload);
            },
        },
    },
});

Customize worker logger

const PsqlQueueService = require("moleculer-psql-queue");

broker.createService({
    name: "task-worker",

    mixins: [
        PsqlQueueService(
            "postgres://postgres:postgres@localhost:5444/task_queue"
        ),
    ],

    methods: {
        /**
         * Replaces Default logger with custom one.
         * By default uses Moleculer logger instance
         * More info: https://github.com/graphile/worker#logger
         */
        initLogger() {
            /**
             * @param {String} level Log level
             * @param {String} message Message to log
             * @param {Object} meta  Additional metadata
             */
            return (level, message, meta) => {
                this.loggerQueue[level](message);
            };
        },
    },

    // Add Workers here
    queues: {},
});

Listen to queue events

const PsqlQueueService = require("moleculer-psql-queue");

broker.createService({
    name: "task-worker",

    mixins: [
        PsqlQueueService(
            "postgres://postgres:postgres@localhost:5444/task_queue"
        ),
    ],

    settings: {
        /**
         * @type {Record<String, Function>}
         * For a complete list of events see: https://github.com/graphile/worker#workerevents
         */
        jobEventHandlers: {
            /**
             * @param {{
             *  worker: import('graphile-worker').Worker,
             *  job: import('graphile-worker').Job
             * }}
             * @this {import('moleculer').Service}
             */
            "job:success": function ({ worker, job }) {
                this.logger.info(
                    `Worker ${worker.workerId} completed job ${job.id}`
                );
            },
        },
    },

    // Add Workers here
    queues: {},
});

Create Task

const PsqlQueueService = require("moleculer-psql-queue");

broker.createService({
    name: "pub",

    mixins: [
        PsqlQueueService(
            "postgres://postgres:postgres@localhost:5444/task_queue"
        ),
    ],

    /**
     * Service started lifecycle event handler
     * @this {import('moleculer').Service}
     */
    async started() {
        try {
            /**
             * @param {String} name Task name
             * @param {Object} payload Payload to pass to the task
             * @param {import('graphile-worker').TaskSpec?} opts
             */
            await this.createJob("sample.task", {
                id: 1,
                name: "simple.task",
            });
        } catch (error) {
            this.logger.error('Error creating "sample.task" job', error);
        }
    },
});

Advanced Usage

The graphile-worker lib provides some advanced features like administration functions. These functions can be used to manage the queue and can be accessed via the this.$producer property of the service.

const PsqlQueueService = require("moleculer-psql-queue");

broker.createService({
    name: "pub",

    mixins: [
        PsqlQueueService(
            "postgres://postgres:postgres@localhost:5444/task_queue"
        ),
    ],

    /**
     * Service started lifecycle event handler
     * @this {import('moleculer').Service}
     */
    async started() {
        // Add the job via raw graphile-worker client
        // For more info check the docs: https://github.com/graphile/worker#administration-functions
        this.$producer.addJob("sample.task", {
            id: 1,
            name: "simple.task",
        });
    },
});

Test

$ npm test

In development with watching

$ npm run ci

License

The project is available under the MIT license.

Contact

Copyright (c) 2016-2022 MoleculerJS

@moleculerjs @MoleculerJS

Readme

Keywords

Package Sidebar

Install

npm i moleculer-psql-queue

Weekly Downloads

0

Version

0.2.0

License

MIT

Unpacked Size

113 kB

Total Files

24

Last publish

Collaborators

  • icebob