fluent-amqp

0.4.1 • Public • Published

fluent-amqp

Fluent syntax for amqp (Rabbit MQ) with (highland) streaming messages and automatic reconnect.

Install

npm install --save fluent-amqp

Use

The code examples here show how to implement the tutorials on https://www.rabbitmq.com using fluent-amqp

1 "Hello World"

https://www.rabbitmq.com/tutorials/tutorial-five-javascript.html

const amqp = require('fluent-amqp')
const q = 'hello'
const message = 'Hello World!'

// send
amqp('amqp://localhost')
  .queue(q, {durable: false})
  .publish(message)
  .then(() => console.log(` [x] Sent '${message}'`))

// recieve
amqp('amqp://localhost')
  .queue(q, {durable: false})
  .subscribe()
  .each(msg => console.log(` [x] Received '${msg.string()}'`))

...with JSON

const amqp = require('fluent-amqp')
const q = 'hello'
const message = {some: 'payload'}

// send
amqp('amqp://localhost')
  .queue(q, {durable: false})
  .publish(message)
  .then(() => console.log(` [x] Sent '${message}'`))

// recieve
amqp('amqp://localhost')
  .queue(q, {durable: false})
  .subscribe()
  .each(msg => console.log(' [x] Received', msg.json()))

2 Work queues

https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html

const amqp = require('fluent-amqp')
const q = 'task_queue'
const message = 'Hello World!'

// send
amqp('amqp://localhost')
  .queue(q, {durable: true})
  .publish(message, {persistent: true})
  .then(() => console.log(` [x] Sent '${message}'`))

// worker
amqp('amqp://localhost')
  .queue(q, {durable: true})
  .subscribe({prefetch: 1, noAck: false})
  .each(msg => {
    console.log(` [x] Received '${msg.string()}'`)
    msg.ack()
  })

3 Publish/Subscribe

https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

const amqp = require('fluent-amqp')
const ex = 'logs'
const exchangeType = 'fanout'
const message = 'Hello World!'

// publish
amqp('amqp://localhost')
  .exchange(ex, exchangeType, {durable: false})
  .publish(message)
  .then(() => console.log(` [x] Sent '${message}'`))

// subscribe
amqp('amqp://localhost')
  .exchange(ex, exchangeType, {durable: false})
  .queue(q, {durable: true})
  .subscribe()
  .each(msg => {
    console.log(` [x] Received '${msg.string()}'`)
    msg.ack()
  })

4 Routing

https://www.rabbitmq.com/tutorials/tutorial-four-javascript.html

const amqp = require('fluent-amqp')
const ex = 'direct_logs'
const exchangeType = 'direct'
const message = 'Hello World!'
const severity = 'info'

// send
amqp('amqp://localhost')
  .exchange(ex, exchangeType, {durable: false})
  .publish(message, severity)
  .then(() => console.log(` [x] Sent '${message}'`))

// recieve
amqp('amqp://localhost')
  .exchange(ex, exchangeType, {durable: false})
  .queue()
  .subscribe([severity])
  .each(msg => {
    console.log(` [x] Received [${msg.fields.routingKey}] '${msg.string()}'`)
    msg.ack()
  })

5 Topics

https://www.rabbitmq.com/tutorials/tutorial-five-javascript.html

const amqp = require('fluent-amqp')
const ex = 'topic_logs'
const exchangeType = 'topic'
const message = 'Hello World!'
const topic = 'anonymous.info'

// send
amqp('amqp://localhost')
  .exchange(ex, exchangeType, {durable: false})
  .publish(message, topic)
  .then(() => console.log(` [x] Sent '${message}'`))

// recieve
amqp('amqp://localhost')
  .exchange(ex, exchangeType, {durable: false})
  .queue()
  .subscribe(['anonymous.*'])
  .each(msg => {
    console.log(` [x] Received [${msg.fields.routingKey}] '${msg.string()}'`)
    msg.ack()
  })

6 RPC

https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html

Coming later...

Connecting to a cluster

If you are using a cluster and want the client to iterate between the different servers as soon as one fails or becomes unresponsive, just add them as an array.

amqp(['amqp://host1', 'amqp://host2', 'amqp://host3'])

Package Sidebar

Install

npm i fluent-amqp

Weekly Downloads

3

Version

0.4.1

License

MIT

Last publish

Collaborators

  • johanobrink