assemble-worker
TypeScript icon, indicating that this package has built-in type declarations

1.0.0 • Public • Published

assemble-worker

Note

In order for integration tests to pass, pg-amqp-bridge-node must be running.

Introduction

This is a Node.js/PostgreSQL/RabbitMQ worker heavily influence by @benjie's graphile/worker. The goal is to have a similar API to graphile/workerwith a higher throughput enabled by the introduction of RabbitMQ.

Why Graphile Worker?

I like Graphile Worker because I can queue jobs via SQL triggers, which means:

  • I get to interact with external APIs asynchronously when using something like Postgraphile
  • My job queuing runs within PostgreSQL transactions, which means if a queue a job in a transaction that is rolled back, the transaction is not run

However, Graphile Worker runs at about 700 jobs/sec, which is not enough for some use cases. So, just like Graphile Worker, we want to:

  • Store job state in PostgreSQL
  • Have PostgreSQL manage failure behavior, with exponential backoff retry
  • Queue jobs via a PostgreSQL function
  • Define jobs as asynchronous NodeJS functions
  • Have it all work seamlessly™

But we want higher throughput!

Why RabbitMQ?

The bottleneck of the Graphile Worker is the SELECT FOR UPDATE .. SKIP LOCKEDfunctionality, which is a native PostgreSQL feature to ensure that two simultaneous queries selecting from the same table will not select each other's rows. This is necessary to prevent two concurrently running workers, both fetching jobs, from selecting the same job, and running one job twice.

Although Graphile Worker uses LISTEN/NOTIFY to notify workers about new jobs for low job queue -> job start latency, PostgreSQL NOTIFY sends a message to all clients listening on a channel – which makes it unsuitable as a method for delivering jobs to be run. Instead, it’s only suitable for notifying workers that they should query for new jobs.

Fortunately, RabbitMQ uses something called Round Robin dispatch, which ensures that each message is only sent to one consumer at a time – the message may be redelivered to other consumers if that consumer fails to acknowledge it, but it will never be delivered to two consumers at the same time.

By using SubZero’s high throughput pg-amqp-bridge, we’re able to send messages to RabbitMQ from Postgres at a higher rate, and scale our NodeJS workers around them to achieve a much higher throughput!

This library can achieve around 1400 jobs/sec with a concurrency of 1, and around 1800 jobs/sec with a greater concurrency on a single machine. I will update with more benchmarks as I have more data with more workers!

Usage

yarn install assemble-worker
import { run } from 'assemble-worker';

async function main() {
  const worker = await run({
    databaseConnectionString: 'postgres://localhost:5432/my_db',
    // optionally include a pgPool parameter to re-use connections - will
    // override databaseConnectionString if present

    amqpConnectionString: 'amqp://localhost',

    // When jobs aren't queued to be immediately run, PostgreSQL must
    // be 'poked' in order to check if there are jobs that should be run
    // and send them to RabbitMQ
    pokeInterval: 10000, // the default

    taskList: {
      'simple-task': {
        // Concurrency is the number of consumers to spawn on this node instance
        // for this task
        // There is currently no way to limit the rate of task execution
        // across workers – that would require implementing token buckets
        // or maybe using Redis
        concurrency: 1,

        // If you function throws an error, it will be retried
        // Any return value is ignored
        task: async function(payload) {
          // do some work
        }
      }
    }
  });

  // In additon to adding jobs via SQL, you can do
  await worker.addJob({
    queueName: 'simple-task',
    payload: { someValue: 4 }
  });

  // Now your working is running and can be stopped via:
  await worker.stop();
}

Via SQL, queue a job by running:

SELECT assemble_worker.add_job('simple-task', '{"someValue": 4}'::json);

Or, if inside of a PostgreSQL function:

PERFORM assemble_worker.add_job('simple-task', '{"someValue": 4}'::json);

Installation

Assemble Worker will run and manage its own migrations on the database given by databaseConnectionString , and manage its own queue assertions in RabbitMQ for queue’s with keys in taskList.

Follow the instructions on pg-amqp-bridge’s Github for deploying it – I’ll post a sample Kubernetes configuration here when I get a chance.

Releases

Each release gets its own commit on master that includes the version bump and changelog updates. The version bump, changelog updates, commit, and tag are generated by standard-version:

yarn release

Other helpful options are:

# Preview the changes
yarn release --dry-run

# Specify the version manually
yarn release --release-as 1.5.0
# or the semver version type to bump
yarn release --release-as minor

# Specify an alpha release
yarn release --prerelease
# or the pre-release type
yarn release --prerelease alpha

Package Sidebar

Install

npm i assemble-worker

Weekly Downloads

1

Version

1.0.0

License

MIT

Unpacked Size

135 kB

Total Files

64

Last publish

Collaborators

  • ikehz
  • derrick-pr
  • quantumaashish
  • ben-pr-p
  • bchrobot
  • mgoldfield
  • priyachatwani
  • henryk1229