Net Possibility Multiplier

    TypeScript icon, indicating that this package has built-in type declarations

    1.0.0 • Public • Published



    In order for integration tests to pass, pg-amqp-bridge-node must be running.


    This is a Node.js/PostgreSQL/RabbitMQ worker heavily influence by @benjie's graphile/worker. The goal is to have a similar API to graphile/workerwith a higher throughput enabled by the introduction of RabbitMQ.

    Why Graphile Worker?

    I like Graphile Worker because I can queue jobs via SQL triggers, which means:

    • I get to interact with external APIs asynchronously when using something like Postgraphile
    • My job queuing runs within PostgreSQL transactions, which means if a queue a job in a transaction that is rolled back, the transaction is not run

    However, Graphile Worker runs at about 700 jobs/sec, which is not enough for some use cases. So, just like Graphile Worker, we want to:

    • Store job state in PostgreSQL
    • Have PostgreSQL manage failure behavior, with exponential backoff retry
    • Queue jobs via a PostgreSQL function
    • Define jobs as asynchronous NodeJS functions
    • Have it all work seamlessly™

    But we want higher throughput!

    Why RabbitMQ?

    The bottleneck of the Graphile Worker is the SELECT FOR UPDATE .. SKIP LOCKEDfunctionality, which is a native PostgreSQL feature to ensure that two simultaneous queries selecting from the same table will not select each other's rows. This is necessary to prevent two concurrently running workers, both fetching jobs, from selecting the same job, and running one job twice.

    Although Graphile Worker uses LISTEN/NOTIFY to notify workers about new jobs for low job queue -> job start latency, PostgreSQL NOTIFY sends a message to all clients listening on a channel – which makes it unsuitable as a method for delivering jobs to be run. Instead, it’s only suitable for notifying workers that they should query for new jobs.

    Fortunately, RabbitMQ uses something called Round Robin dispatch, which ensures that each message is only sent to one consumer at a time – the message may be redelivered to other consumers if that consumer fails to acknowledge it, but it will never be delivered to two consumers at the same time.

    By using SubZero’s high throughput pg-amqp-bridge, we’re able to send messages to RabbitMQ from Postgres at a higher rate, and scale our NodeJS workers around them to achieve a much higher throughput!

    This library can achieve around 1400 jobs/sec with a concurrency of 1, and around 1800 jobs/sec with a greater concurrency on a single machine. I will update with more benchmarks as I have more data with more workers!


    yarn install assemble-worker
    import { run } from 'assemble-worker';
    async function main() {
      const worker = await run({
        databaseConnectionString: 'postgres://localhost:5432/my_db',
        // optionally include a pgPool parameter to re-use connections - will
        // override databaseConnectionString if present
        amqpConnectionString: 'amqp://localhost',
        // When jobs aren't queued to be immediately run, PostgreSQL must
        // be 'poked' in order to check if there are jobs that should be run
        // and send them to RabbitMQ
        pokeInterval: 10000, // the default
        taskList: {
          'simple-task': {
            // Concurrency is the number of consumers to spawn on this node instance
            // for this task
            // There is currently no way to limit the rate of task execution
            // across workers – that would require implementing token buckets
            // or maybe using Redis
            concurrency: 1,
            // If you function throws an error, it will be retried
            // Any return value is ignored
            task: async function(payload) {
              // do some work
      // In additon to adding jobs via SQL, you can do
      await worker.addJob({
        queueName: 'simple-task',
        payload: { someValue: 4 }
      // Now your working is running and can be stopped via:
      await worker.stop();

    Via SQL, queue a job by running:

    SELECT assemble_worker.add_job('simple-task', '{"someValue": 4}'::json);

    Or, if inside of a PostgreSQL function:

    PERFORM assemble_worker.add_job('simple-task', '{"someValue": 4}'::json);


    Assemble Worker will run and manage its own migrations on the database given by databaseConnectionString , and manage its own queue assertions in RabbitMQ for queue’s with keys in taskList.

    Follow the instructions on pg-amqp-bridge’s Github for deploying it – I’ll post a sample Kubernetes configuration here when I get a chance.


    Each release gets its own commit on master that includes the version bump and changelog updates. The version bump, changelog updates, commit, and tag are generated by standard-version:

    yarn release

    Other helpful options are:

    # Preview the changes
    yarn release --dry-run
    # Specify the version manually
    yarn release --release-as 1.5.0
    # or the semver version type to bump
    yarn release --release-as minor
    # Specify an alpha release
    yarn release --prerelease
    # or the pre-release type
    yarn release --prerelease alpha


    npm i assemble-worker

    DownloadsWeekly Downloads






    Unpacked Size

    135 kB

    Total Files


    Last publish


    • derrick-pr
    • quantumaashish
    • ben-pr-p
    • bchrobot
    • mgoldfield
    • priyachatwani