RabRPC
Yet another opinionated RPC library based on RabbitMQ (through rabbot)
Features
- Promise-based interface (thanks rabbot)
- Convention over configuration in exchange, queue, routingKeys naming
Implemented producer/consumer patterns:
- Request / Response
- Publish / Subscribe
- Send / Receive
Installation
    npm install --save rabrpc
    # or
    yarn add rabrpc
Examples
Initialization
Important!
rpc.configure should be called after binding handlers via rpc.respond in the consumer microservice, and must be called before requesting data with rpc.request in the provider microservice.
rpc.configure(config, [transformConfig = true])
- config - rabrpc or rabbot config object
- transformConfig - must be false if config is a rabbot configuration; defaults to true
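The ordering rule above can be sketched as two small start-up functions. This is only an illustration: `rpc` is passed in as an argument and is assumed to be the rabrpc singleton, and the message type `v1.foo-service-name.double` and both function names are hypothetical.

```javascript
// Sketch only: `rpc` is assumed to be the rabrpc singleton (require('rabrpc')).
// 'v1.foo-service-name.double' is a made-up message type for illustration.

// Responder side: bind handlers first, then configure.
function startResponder (rpc, config) {
  rpc.respond('v1.foo-service-name.double', payload => payload * 2)
  return rpc.configure(config)
}

// Requester side: configure first, then request.
function startRequester (rpc, config) {
  return rpc.configure(config)
    .then(() => rpc.request('v1.foo-service-name.double', 21))
}
```

Passing `rpc` in keeps the ordering visible and lets the functions be exercised with a stub instead of a live broker.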
rabbot json configuration
If you need more flexibility, you can pass a valid rabbot configuration into rpc.configure.
The only requirement is to name exchanges, queues and bindings according to the convention.
    const config = {
      connection: {
        user: 'guest',
        pass: 'guest',
        server: '127.0.0.1',
        // server: "127.0.0.1, 194.66.82.11",
        // server: ["127.0.0.1", "194.66.82.11"],
        port: 5672,
        timeout: 2000,
        vhost: '%2fmyhost'
      },
      exchanges: [
        { name: 'config-ex.1', type: 'fanout', publishTimeout: 1000 },
        { name: 'config-ex.2', type: 'topic', alternate: 'alternate-ex.2', persistent: true },
        { name: 'dead-letter-ex.2', type: 'fanout' }
      ],
      queues: [
        { name: 'config-q.1', limit: 100, queueLimit: 1000 },
        { name: 'config-q.2', subscribe: true, deadLetter: 'dead-letter-ex.2' }
      ],
      bindings: [
        { exchange: 'config-ex.1', target: 'config-q.1', keys: ['bob', 'fred'] },
        { exchange: 'config-ex.2', target: 'config-q.2', keys: 'test1' }
      ]
    }

    rpc.configure(config, false) // transform config = false
Request / Response
Responder initialization
    const rpc = require('rabrpc') // singleton

    const config = {
      // uri
      connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
      // or object passed to rabbot, see https://github.com/arobson/rabbot#configuration-via-json
      // connection: {user: 'guest', pass: 'guest', server: 'localhost', ...}

      // respond configuration in consumer microservice
      // this config will create exchange(s), queue(s) and binding(s)
      // request configuration in provider microservice only creates exchange(s)
      res: {
        // string or object or array of strings or objects
        serviceName: 'foo-service-name',
        // rabbot queue options, see https://github.com/arobson/rabbot#addqueue-queuename-options-connectionname-
        // subscribe: true is default
        messageTtl: 30000,
        limit: 10
        // ...etc
      }
    }

    // somewhere in your microservice initialization cycle
    rpc.configure(config) // returns promise
Requester initialization
    const rpc = require('rabrpc') // singleton

    const config = {
      connection: '<URI string>', // see above

      // requesting resource configuration
      // this config will create only exchange(s)
      // respond configuration in consumer microservice will create queue(s) and binding(s)
      // req: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
      req: 'foo-service-name'
    }

    // somewhere in your microservice initialization cycle
    rpc.configure(config) // returns promise
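The `req` key (like `res`, `pub`, `sub`, `send` and `recv`) accepts a string, an object, or an array of either. One way to picture how those four shapes relate is a small normalizer. `normalizeServices` is a hypothetical helper written for this illustration; it is not part of rabrpc and does not claim to mirror its internals.

```javascript
// Hypothetical helper: turns any of the accepted shapes into an array of
// option objects that each carry a serviceName.
function normalizeServices (value) {
  const list = Array.isArray(value) ? value : [value]
  return list.map(item => {
    if (typeof item === 'string') return { serviceName: item }
    if (item && typeof item.serviceName === 'string') return Object.assign({}, item)
    throw new TypeError('Expected a service name string or an object with serviceName')
  })
}

console.log(normalizeServices('foo-service-name'))
console.log(normalizeServices(['foo-service-name', { serviceName: 'bar-service-name', limit: 10 }]))
```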
Convention
Parameter | Value | Example |
---|---|---|
exchange | req-res.serviceName | req-res.foo-service-name |
queue | req-res.serviceName | req-res.foo-service-name |
routingKey | serviceName | foo-service-name |
messageType | version.serviceName.action | v1.foo-service-name.someAction |
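When writing a raw rabbot configuration by hand, it can help to derive the names from the service name instead of typing them. The helpers below simply restate the table as code; `reqResNames` and `buildMessageType` are hypothetical names, not rabrpc functions.

```javascript
// Hypothetical helpers that restate the Request / Response naming table.
function reqResNames (serviceName) {
  return {
    exchange: `req-res.${serviceName}`,
    queue: `req-res.${serviceName}`,
    routingKey: serviceName
  }
}

function buildMessageType (version, serviceName, action) {
  return [version, serviceName, action].join('.')
}

console.log(reqResNames('foo-service-name'))
console.log(buildMessageType('v1', 'foo-service-name', 'someAction'))
```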
Response
rpc.respond(messageType, handler, [raw = false])
- messageType - full path for the service action, e.g. 'v1.images.resize' or 'v1.users.role.findAll', where the second part (images, users) is the serviceName specified in config (rabbot uses messageType as the type of the message)
- handler - function which takes payload or message, responseActions and messageType
  - payload or message - message if raw is true, otherwise message.body
  - responseActions - object with 3 functions: success, fail, error
  - messageType - type of the rabbot message (useful when listening for types which contain * or #)
- raw - if true, the first argument for handler will be the rabbot message instead of message.body (the default)
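Since the second segment of a messageType is the service name, a handler that listens on a wildcard type may want to split the concrete type it receives. The following is a minimal sketch under that reading of the convention; `parseMessageType` is a hypothetical helper and not part of rabrpc.

```javascript
// Hypothetical helper: splits version.serviceName.action into its parts.
// Everything after the second dot is treated as the action, so nested
// actions such as 'role.findAll' stay intact.
function parseMessageType (messageType) {
  const parts = messageType.split('.')
  if (parts.length < 3) {
    throw new Error(`Expected version.serviceName.action, got "${messageType}"`)
  }
  const [version, serviceName, ...action] = parts
  return { version, serviceName, action: action.join('.') }
}

console.log(parseMessageType('v1.users.role.findAll'))
```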
Example
    const rpc = require('rabrpc')

    // before initialization
    rpc.respond(messageType, handler)

    // handler can also just return a promise, a `.then`able or a value, and the result will be replied with success status
    // an exception or rejected promise will cause an error reply (be sure to throw an `Error` with a message)
    rpc.respond(messageType, handler)

    // in your service initialization cycle
    rpc.configure(config)
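To make the handler styles concrete, here is a sketch with three hypothetical handlers. It assumes that `success` and `fail` each take the value to reply with, which the text above does not spell out, and that `rpc` is the rabrpc singleton passed in by the caller. The message types are made up for the example.

```javascript
// Sketch only: `rpc` is assumed to be the rabrpc singleton.
function registerHandlers (rpc) {
  // Style 1: reply explicitly through responseActions.
  // Assumption: success/fail accept the value to send back.
  rpc.respond('v1.foo-service-name.double', (payload, actions) => {
    if (typeof payload !== 'number') return actions.fail('payload must be a number')
    return actions.success(payload * 2)
  })

  // Style 2: return a promise (or a plain value); it is replied with success status.
  rpc.respond('v1.foo-service-name.greet', payload => Promise.resolve('hello ' + payload))

  // Style 3: throw an Error with a message to reply with error status.
  rpc.respond('v1.foo-service-name.strict', payload => {
    if (payload == null) throw new Error('payload is required')
    return payload
  })
}
```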
Request
rpc.request(messageType, payload, [options], [raw = false])
- messageType - see the rpc.respond messageType argument
- payload - payload data, which will be passed into the respond handler (see Supported payload)
- options - rabbot request options (will be merged with defaults: {replyTimeout: 10000})
- raw - resolve with the rabbot reply message instead of message.body

Returns a Promise, which resolves with body (or message if raw is true).
Example
    const rpc = require('rabrpc')

    // request allowed only after initialization
    rpc.request(messageType, payload)
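A request can be wrapped so that a failed or timed-out call becomes a value instead of a rejected promise. This sketch assumes `rpc` is the already configured rabrpc singleton; `requestDouble` and the message type are hypothetical, and `replyTimeout` is the only option used because it is the one named in the defaults above.

```javascript
// Sketch only: `rpc` is assumed to be the configured rabrpc singleton.
function requestDouble (rpc, value) {
  return rpc.request('v1.foo-service-name.double', value, { replyTimeout: 5000 })
    .then(body => ({ ok: true, body }))
    .catch(err => ({ ok: false, reason: err.message }))
}
```

Callers can then branch on `ok` without a separate `catch`.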
Publish / Subscribe
Subscriber initialization
    const rpc = require('rabrpc') // singleton

    const config = {
      connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
      sub: {
        // string or object or array of strings or objects
        serviceName: 'foo-service-name',
        limit: 10
        // ...etc
      }
    }

    // somewhere in your microservice initialization cycle
    rpc.configure(config) // returns promise
Publisher initialization
    const rpc = require('rabrpc') // singleton

    const config = {
      connection: '<URI string>', // see above
      // pub: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
      pub: 'foo-service-name'
    }

    // somewhere in your microservice initialization cycle
    rpc.configure(config) // returns promise
Convention
Parameter | Value | Example |
---|---|---|
exchange | pub-sub.serviceName | pub-sub.foo-service-name |
queue | pub-sub.serviceName.uuid4 | pub-sub.foo-service-name.110ec58a-a0f2-4ac4-8393-c866d813b8d1 |
routingKey | serviceName | foo-service-name |
messageType | version.serviceName.action | v1.foo-service-name.someAction |
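The queue name differs from the other patterns in that it carries a UUID suffix, so every subscriber gets its own queue and therefore its own copy of each message. A sketch of that naming, using Node's built-in `crypto.randomUUID` (available in recent Node versions) as a stand-in for however the library generates the id; `pubSubNames` is a hypothetical helper.

```javascript
const { randomUUID } = require('crypto')

// Hypothetical helper that restates the Publish / Subscribe naming table.
// Each call produces a distinct queue name unless a uuid is supplied.
function pubSubNames (serviceName, uuid = randomUUID()) {
  return {
    exchange: `pub-sub.${serviceName}`,
    queue: `pub-sub.${serviceName}.${uuid}`,
    routingKey: serviceName
  }
}

console.log(pubSubNames('foo-service-name'))
```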
Subscribe
rpc.subscribe(messageType, handler, [raw = false])
- messageType - full path for the service action, e.g. 'v1.images.archive' or 'v1.statistics.synchronize', where the second part (images, statistics) is the serviceName specified in config (rabbot uses messageType as the type of the message)
- handler - function which takes payload or message, actions and messageType
  - payload or message - message if raw is true, otherwise message.body
  - messageType - type of the rabbot message (useful when listening for types which contain * or #)
- raw - if true, the first argument for handler will be the rabbot message instead of message.body (the default)
Example
    const rpc = require('rabrpc')

    // before initialization
    rpc.subscribe(messageType, handler) // always auto ack

    // in your service initialization cycle
    rpc.configure(config)
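The third handler argument becomes useful when one handler listens on a wildcard type. The sketch below subscribes to a hypothetical `v1.statistics.*` type and uses the concrete messageType to tell events apart. `rpc` is assumed to be the rabrpc singleton, and `log` is an injectable logger added for the example.

```javascript
// Sketch only: `rpc` is assumed to be the rabrpc singleton.
function subscribeToStatistics (rpc, log = console.log) {
  rpc.subscribe('v1.statistics.*', (payload, actions, messageType) => {
    // Drop the version and serviceName segments to get the action name.
    const action = messageType.split('.').slice(2).join('.')
    log(`statistics event "${action}"`, payload)
  })
}
```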
Publish
rpc.publish(messageType, payload, [options])
- messageType - see the rpc.respond messageType argument
- payload - payload data, which will be passed into the subscribe handler (see Supported payload)
- options - rabbot publish options (will be merged with defaults: {replyTimeout: 10000})

Returns a rabbot publish Promise (see Rabbot Publish).
Example
    const rpc = require('rabrpc')

    // publish allowed only after initialization
    rpc.publish(messageType, payload)
Send / Receive
Receiver initialization
    const rpc = require('rabrpc') // singleton

    const config = {
      connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
      recv: {
        // string or object or array of strings or objects
        serviceName: 'foo-service-name',
        messageTtl: 30000,
        limit: 10
        // ...etc
      }
    }

    // somewhere in your microservice initialization cycle
    rpc.configure(config) // returns promise
Sender initialization
    const rpc = require('rabrpc') // singleton

    const config = {
      connection: '<URI string>', // see above
      // send: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
      send: 'foo-service-name'
    }

    // somewhere in your microservice initialization cycle
    rpc.configure(config) // returns promise
Convention
Parameter | Value | Example |
---|---|---|
exchange | send-recv.serviceName | send-recv.foo-service-name |
queue | send-recv.serviceName | send-recv.foo-service-name |
routingKey | serviceName | foo-service-name |
messageType | version.serviceName.action | v1.foo-service-name.someAction |
Receive
rpc.receive(messageType, handler, [raw = false])
- messageType - full path for the service action, e.g. 'v1.images.archive' or 'v1.statistics.synchronize', where the second part (images, statistics) is the serviceName specified in config (rabbot uses messageType as the type of the message)
- handler - function which takes payload or message, actions and messageType
  - payload or message - message if raw is true, otherwise message.body
  - actions - object with 3 functions: ack, nack, reject (see Rabbot Message API)
  - messageType - type of the rabbot message (useful when listening for types which contain * or #)
- raw - if true, the first argument for handler will be the rabbot message instead of message.body (the default)
Example
    const rpc = require('rabrpc')

    // before initialization
    rpc.receive(messageType, handler)

    // handler can also just return a promise, a `.then`able or a value, and the message will be ack'ed on promise resolution
    // an exception or rejected promise will cause the message to be nack'ed
    rpc.receive(messageType, handler) // auto ack

    // in your service initialization cycle
    rpc.configure(config)
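When the outcome decides what should happen to the message, the handler can call the actions explicitly instead of relying on auto ack. The sketch below assumes `rpc` is the rabrpc singleton and that `ack`, `nack` and `reject` take no arguments, as in the rabbot message API. `archive` is a hypothetical function that returns a promise, and the payload shape is made up for the example.

```javascript
// Sketch only: `rpc` is assumed to be the rabrpc singleton and
// `archive` a caller-supplied function returning a promise.
function registerArchiveReceiver (rpc, archive) {
  rpc.receive('v1.images.archive', (payload, actions) => {
    if (!payload || typeof payload.id !== 'string') {
      // Malformed message: a retry would fail the same way, so reject it.
      return actions.reject()
    }
    return archive(payload.id)
      .then(() => actions.ack())   // work done, remove the message
      .catch(() => actions.nack()) // work failed, hand the message back
  })
}
```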
Send
rpc.send(messageType, payload, [options])
- messageType - see the rpc.respond messageType argument
- payload - payload data, which will be passed into the receive handler (see Supported payload)
- options - rabbot publish options (will be merged with defaults: {replyTimeout: 10000})

Returns a rabbot publish Promise (see Rabbot Publish).
Example
    const rpc = require('rabrpc')

    // send allowed only after initialization
    rpc.send(messageType, payload)
Graceful shutdown
rpc.stopSubscription()
Removes all handlers and unsubscribes from queues.
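A common place to call this is a signal handler, so the service stops consuming before the process exits. The sketch below is one possible wiring, not a prescribed one: `installShutdownHook` is a hypothetical helper, `rpc` is assumed to be the rabrpc singleton, and since the text does not say whether `stopSubscription` returns a promise, the code tolerates both cases.

```javascript
// Sketch only: `rpc` is assumed to be the rabrpc singleton.
// `proc` defaults to the Node.js process and is injectable for testing.
function installShutdownHook (rpc, proc = process) {
  let stopping = false
  const shutdown = () => {
    if (stopping) return // ignore repeated signals
    stopping = true
    // Wrapping in a promise handles a sync return value, a returned
    // promise, and a synchronous throw in the same way.
    new Promise(resolve => resolve(rpc.stopSubscription()))
      .then(() => proc.exit(0))
      .catch(() => proc.exit(1))
  }
  proc.once('SIGTERM', shutdown)
  proc.once('SIGINT', shutdown)
  return shutdown
}
```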
Supported payload
- string
- number
- null
- JSON serializable Object
- Buffer
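A caller may want to check a value against this list before sending it. The function below is a rough sketch of such a check; `isSupportedPayload` is a hypothetical helper, not part of rabrpc, and it treats "JSON serializable" as "`JSON.stringify` does not throw", which is an assumption.

```javascript
// Hypothetical pre-flight check that mirrors the list above.
function isSupportedPayload (value) {
  if (value === null) return true
  if (typeof value === 'string' || typeof value === 'number') return true
  if (Buffer.isBuffer(value)) return true
  if (typeof value === 'object') {
    try {
      JSON.stringify(value) // throws on circular references
      return true
    } catch (err) {
      return false
    }
  }
  return false // undefined, functions, symbols, etc.
}
```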