nsq.js
JavaScript NSQ client WIP.
Features
- actually written in js :p
- easier debugging via debug() instrumentation
- native json message support
- does not arbitrarily apply backoff on requeues
- disabling of auto-RDY support for manual control (high throughput etc)
- reconnection to dead nsqd nodes
- graceful close support
Installation
$ npm install nsq.js
About
Debugging
The DEBUG environment variable can be used to enable traces within the module, for example all nsq debug() calls except fo the framer:
$ DEBUG=nsq*,-nsq:framer node test
nsq:reader connect nsqd 0.0.0.0:4150 events/ingestion [5] +0ms
nsq:connection connect: 0.0.0.0:4150 V2 +0ms
nsq:connection command: IDENTIFY null +2ms
nsq:connection command: SUB ["events","ingestion"] +1ms
nsq:connection command: RDY [5] +0ms
nsq:connection connect: undefined:4150 V2 +0ms
nsq:connection command: IDENTIFY null +1ms
nsq:connection command: PUB ["events"] +0ms
nsq:reconnect reset backoff +0ms
nsq:reconnect reset backoff +1ms
nsq:connection response OK +3ms
nsq:connection response OK +0ms
nsq:connection response OK +0ms
Requeue backoff
The NSQD documentation recommends applying backoff when requeueing implying that the consumer is faulty, IMO this is a weird default, and the opposite of what we need so it's not applied in this client.
Example
var nsq = ; // subscribe var reader = nsq; reader; reader; reader; // publish var writer = nsq; writer;
API
nsq.reader(options)
Create a reader:
id
connection identifier (seeclient_id
in the spec)topic
topic namechannel
channel namensqd
array of nsqd addressesnsqlookupd
array of nsqlookupd addressesmaxAttempts
max attempts before discarding [Infinity]maxConnectionAttempts
max reconnection attempts [Infinity]maxInFlight
max messages distributed across connections [10]msgTimeout
session-specific msg timeoutpollInterval
nsqlookupd poll interval[10000]ready
whenfalse
auto-RDY maintenance will be disabledtrace
trace function
Events:
message
(msg) incoming messagediscard
(msg) discarded messageerror response
(err) response from nsqerror
(err)
reader#close([fn])
Close the reader's connection(s) and fire the optional [fn] when completed.
nsq.writer([options|address])
Create a writer. By default a connection attempt to 0.0.0.0:4150 will be made unless one of the following options are provided:
port
numberhost
namensqd
array of nsqd addressesnsqlookupd
array of nsqlookupd addresses
Events:
error response
(err) response from nsqerror
(err)
writer#publish(topic, message, [fn])
Publish the given message
to topic
where message
may be a string, buffer, or object. An array of messages
may be passed, in which case a MPUT is performed.
writer#close([fn])
Close the writer's connection(s) and fire the optional [fn] when completed.
Message
A single message.
Message#finish()
Mark message as complete.
Message#requeue([delay])
Re-queue the message immediately, or with the
given delay
in milliseconds, or a string such
as "5s", "10m" etc.
Message#touch()
Reset the message's timeout, increasing the length of time before NSQD considers it timed out.
Message#json()
Return parsed JSON object.
Tracing
The following jstrace probes are available:
connection:ready
ready count sentconnection:message
message receivedmessage:finish
finished a messagemessage:requeue
requeued a messagemessage:touch
touched a message
Running tests
nsqd --lookupd-tcp-address=0.0.0.0:4160 --broadcast-address=localhost &
nsqadmin --lookupd-http-address=0.0.0.0:4161 &
nsqlookupd --broadcast-address=localhost &
make test
License
MIT