A promise-based AMQP API built on node-amqp

AMQP as Promised


A high-level promise-based API built on node-amqp, extended with functions for AMQP-based RPC.

npm install amqp-as-promised

conf = require './myconf.json' # see example conf below
amqpc = (require 'amqp-as-promised') conf.amqp

As of version 0.1.0, the following config parameters are accepted, although we also try to keep backwards compatibility with the older format.

  • connection: connection settings accepted by node-amqp. At minimum, specify either
    • host
    • vhost
    • login
    • password
    or
    • url
  • local: if true, no AMQP connection is made. Default: false
  • rpc
    • timeout: timeout in ms for RPC calls. Default: 1000ms
  • logLevel: sets the log level. Defaults to INFO. Possible levels are DEBUG, INFO, WARN, ERROR

{
    "connection": {
        "host": "",
        "vhost": "test",
        "login": "test",
        "password": "supersecret"
    },
    "logLevel": "warn",
    "local": false,
    "rpc": {
        "timeout": 2000
    }
}

Or with url:

{
    "connection": {
        "url": "amqp://myuser:supersecret@"
    },
    "logLevel": "warn"
}

Examples

amqpc.exchange('myexchange').then (ex) ->
    msg = {}
    msg.domain = domain
    ex.publish('', msg).then ->
        console.log 'published message!'

This is shorthand for binding and subscribing.

amqpc.bind 'myexchange', 'myqueue', 'mytopic.#', (msg, headers, del) ->
    console.log 'received message', msg

To bind an anonymous queue.

amqpc.bind 'myexchange', '', 'mytopic.#', (msg, headers, del) ->
    console.log 'received message', msg

Or even shorter

amqpc.bind 'myexchange', 'mytopic.#', (msg, headers, del) ->
    console.log 'received message', msg

To bind the queue to the exchange without subscribing to it, skip the last parameter (the subscription callback). This is essentially the same as queue.bind myexchange, 'mytopic', except the exchange and queue are specified by their names:

amqpc.bind 'myexchange', 'myqueue', 'mytopic.#'

To create an anonymous queue.

amqpc.queue().then (q) -> console.log 'my queue', q

To send a message to a service that honors the replyTo/correlationId contract:

amqpc.rpc('myexchange', 'routing.key', msg, [headers], [options]).then (response) ->
    console.log 'received message', response

  • headers is an optional parameter holding any custom headers to be passed on to the RPC service.
  • options supports the following settings
    • timeout - the timeout in ms for this call

Note! In earlier versions the response was an array that included the response headers. As of version 0.1.0, this is no longer the case.

To set up a message consumer that automatically honors the replyTo/correlationId contract:

amqpc.serve 'myexchange', 'mytopic.#', (msg, headers, del) ->
    return { result: 'ok' }

The value returned from the handler will be sent back on the queue specified by the replyTo header, with the correlationId set.

If an exception is thrown by the handler, it will be propagated back to the client as an object with an error property containing the error message.
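Because a handler failure arrives as an ordinary reply, a caller may want to turn it back into a rejected promise. The following is a minimal sketch, assuming the reply reaches the caller as a plain object with an error property as described above. unwrapReply is a hypothetical helper and not part of amqp-as-promised.

```coffeescript
# Hypothetical helper (not part of amqp-as-promised): throws if the reply
# carries an `error` property, otherwise passes the reply through unchanged.
unwrapReply = (response) ->
    if response? and typeof response is 'object' and response.error?
        throw new Error String(response.error)
    response

# Usage sketch, assuming `amqpc` is a connected client:
#   amqpc.rpc('myexchange', 'routing.key', msg).then(unwrapReply).then (result) ->
#       console.log 'result', result
```

Throwing inside a then callback rejects the chained promise, so failures can be handled in a single rejection handler further down the chain.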

To limit the RPC handler to 5 concurrent calls, pass an options object with {ack: true, prefetchCount: 5}.

Note that message acking is handled by the RPC backend wrapper.

amqpc.serve 'myexchange', 'mytopic.#', {ack: true, prefetchCount: 5}, (msg, headers, del) ->
    return { result: 'ok' }

To shut down gracefully on SIGINT or SIGTERM:

graceful = (opts) ->
    console.log 'Shutting down'
    amqpc.shutdown().then ->
        process.exit 0
process.on 'SIGINT', graceful
process.on 'SIGTERM', graceful


amqpc.exchange returns a promise for an exchange. If opts is omitted, then passive:true is assumed.

amqpc.queue returns a promise for a queue. If qname is omitted, "" is used. If opts is omitted, then exclusive:true is assumed if the name is empty, or passive:true if not.

Thus, amqpc.queue() will create a new exclusive, anonymous queue that is automatically deleted on disconnect, while amqpc.queue('my-queue') will try to passively declare the existing queue my-queue.
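The defaulting rule can be written out as a small pure function. This is a sketch of the documented behaviour only, not the library's own code. queueDefaults is a hypothetical name, and durable is used purely as an example of an explicitly passed option.

```coffeescript
# Hypothetical helper mirroring the documented defaults; not the library's code.
queueDefaults = (qname = '', opts) ->
    # Fall back to a default only when no opts were passed at all.
    opts ?= if qname is '' then {exclusive: true} else {passive: true}
    {qname, opts}

console.log queueDefaults()                             # anonymous, exclusive
console.log queueDefaults 'my-queue'                    # passive declaration
console.log queueDefaults 'my-queue', {durable: true}   # explicit opts are kept
```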

See queue.* below.

amqpc.bind is shorthand for:

  1. If exchange is a string, then look up the existing exchange with that name.
  2. If queue is a string, then look up the existing queue with that name.
  3. Bind queue to exchange/topic.
  4. Subscribe callback to queue (optional).
  • exchange - an exchange object or a string with the name of an exchange
  • queue - a queue object or a string with the name of a queue
  • topic - a string with the topic name.
  • callback - a function that takes the arguments (msg, headers, deliveryinfo).
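Written out by hand, the four steps might look roughly like the sketch below. It assumes amqpc.exchange and amqpc.queue return promises, and that the queue object offers bind(exchange, topic) and subscribe(callback) as described in this document. Whether those two return promises is an assumption, so the sketch wraps the bind call in Promise.resolve. bindLonghand is a hypothetical name.

```coffeescript
# Long-hand sketch of `amqpc.bind exchange, queue, topic, callback`.
# Not the library's implementation; `bindLonghand` is a hypothetical name.
bindLonghand = (amqpc, exchange, queue, topic, callback) ->
    # Steps 1 and 2: look up by name when given a string.
    exP = if typeof exchange is 'string' then amqpc.exchange(exchange) else exchange
    qP = if typeof queue is 'string' then amqpc.queue(queue) else queue
    Promise.all([exP, qP]).then ([ex, q]) ->
        # Step 3: bind the queue to the exchange/topic.
        Promise.resolve(q.bind ex, topic).then ->
            # Step 4: subscribe only when a callback was given.
            if callback? then q.subscribe callback else q
```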

amqpc.shutdown() unbinds all queues and unsubscribes all callbacks, then gracefully shuts down the socket connection.

Read-only property that tells whether conf.local was true.

exchange.publish() publishes a message and returns a promise.

queue.bind binds the queue to the given exchange (object or string). If the queue was already bound, it is unbound first.

Unbinds the queue (if currently bound).

queue.subscribe subscribes the callback to this queue, unsubscribing any previous callback. If opts is omitted, it defaults to ack: false, prefetchCount: 1.

Unsubscribes current callback (if any).

For use with queue.subscribe({ack:true}, callback). The reject argument rejects the previous message, which is requeued if requeue is true.

Read-only property with the queue name.

Performs an AMQP-based remote procedure call and returns a promise for the return value:

  1. Creates an exclusive, anonymous return queue if one doesn't already exist.
  2. Publishes an RPC-style message on the given exchange, with the specified routingkey, headers and options. The replyTo and correlationId headers are set automatically.
  3. Waits for a reply on the return queue, and resolves the promise with the contents of the reply. If no reply is received before the timeout, the promise is instead rejected.
  • exchange - the name of an exchange, or an exchange object
  • routingkey
  • headers - AMQP headers to be sent with the message. See exchange.publish().
  • options - valid options are:
    • timeout - timeout in milliseconds. If none is specified, the default value specified when creating the client is used.
    • compress - set to true to use payload compression
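The bookkeeping behind steps 2 and 3 can be pictured as a map from correlationId to a pending promise. The following is a self-contained sketch of that idea, not the library's actual implementation. PendingCalls, register and onReply are hypothetical names.

```coffeescript
crypto = require 'crypto'

# Hypothetical bookkeeping for in-flight RPC calls (illustration only).
class PendingCalls
    constructor: ->
        @pending = new Map()

    # Registers a call and returns its correlationId plus a promise that is
    # resolved by a matching reply or rejected when the timeout expires.
    register: (timeoutMs) ->
        correlationId = crypto.randomBytes(8).toString 'hex'
        promise = new Promise (resolve, reject) =>
            timer = setTimeout =>
                @pending.delete correlationId
                reject new Error "no reply within #{timeoutMs}ms"
            , timeoutMs
            @pending.set correlationId, {resolve, timer}
        {correlationId, promise}

    # To be called for each message arriving on the return queue.
    # Returns false when the correlationId is unknown or already timed out.
    onReply: (correlationId, body) ->
        entry = @pending.get correlationId
        return false unless entry?
        clearTimeout entry.timer
        @pending.delete correlationId
        entry.resolve body
        true
```

In this sketch a reply that arrives after the timeout is simply dropped, because its entry has already been removed from the map.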

Since 0.4.0

The RPC mechanism supports transparent gzip compression of payloads that are JSON objects or Buffers. When activated, both request and response are compressed. To activate it, the RPC client must ask for compression by setting the compress option.


amqpc.rpc('myexchange', 'routing.key', msg, [headers], {compress: true}).then (response) ->
    console.log 'received message', response
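To give a feel for what gzip compression of a JSON payload involves, here is a standalone round trip using Node's zlib module. It is an illustration only: the library's actual wire format is not described in this document, and compressPayload and decompressPayload are hypothetical names.

```coffeescript
zlib = require 'zlib'

# Illustration only; not the library's wire format.
compressPayload = (obj) ->
    zlib.gzipSync Buffer.from JSON.stringify obj

decompressPayload = (buf) ->
    JSON.parse zlib.gunzipSync(buf).toString 'utf8'

msg = {domain: 'example', items: [1, 2, 3]}
packed = compressPayload msg
console.log 'compressed bytes:', packed.length
console.log 'round trip:', decompressPayload packed
```

Very small payloads can grow when gzipped because of the fixed header overhead, which may be one reason to keep compression opt-in per call.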

Since 0.4.0

The RPC mechanism supports Q-style progress notifications, which can be used to send partial responses.


Server side

amqpc.serve 'myexchange', 'routing.key', (msg, headers, del, progress) ->
    # ... do some stuff
    progress "it's almost done!!!"
    # ... do more stuff
    return "here's the result"

Client side

amqpc.rpc('myexchange', 'routing.key', msg).progress (partial) ->
    console.log 'the server tries to tell me', partial
.then (response) ->
    console.log 'received message', response