itemsense-queue-connector

2.0.1 • Public • Published

itemsense-queue-connector

Creates and manages connections to ItemSense item, threshold, and health queues. This module can be used to create connectors in your app's main process, or it can be run as a child process.

Targets ES6+.

NOTE: v2.0.0 should not be used - the itemQueue<event> references were mistakenly changed.


Table of Contents


Install

npm i itemsense-queue-connector

Test

From your project directory...

cd ./node_modules/itemsense-queue-connector

Set the environmental variables HOSTNAME, USERNAME, PASSWORD, and (optional) PORT and QUEUE. Then run the npm test.

Linux/MacOS Example

HOST=192.168.1.64 USERNAME=username PASSWORD=password npm test

The test will run for 5 minutes before shutting itself down. The options being used for each of the connectors are slightly different, and will be output to the console for you to see (or you can look at the test.js file).


Usage

Create a Connector in the Same Process

// require module
const iqc = require('itemsense-queue-connector');
 
// create connector
const connector = iqc.createConnector();
 
// create options
let options = iqc.createOptions({
  hostname: '127.0.0.1',
  username: 'username',
  password: 'password'
});
 
// start the connector with the defined options
connector.start(options);
 
// listen for messages
connector.on(iqc.event.itemQueueMessage, message => {
  console.log(`received item queue message: ${message}`);
});
connector.on(iqc.event.healthQueueMessage, message => {
  console.log(`received health queue message: ${message}`);
});
connector.on(iqc.event.thresholdQueueMessage, message => {
  console.log(`received threshold queue message: ${message}`);
});
connector.on(iqc.event.itemQueueConnected, queue => {
  console.log(`connected to item queue [ ${queue} ]`);
});
connector.on(iqc.event.thresholdQueueConnected, queue => {
  console.log(`connected to threshold queue [ ${queue} ]`);
});
connector.on(iqc.event.healthQueueConnected, queue => {
  console.log(`connected to health queue [ ${queue} ]`);
});
connector.on('error', message => {
  console.log(`received error: ${message}`);
});
// you can use the event names directly - see Events section
connector.on('info', message => {
  console.log(`received info: ${message}`);
});
 
// ...
 
// shutdown the connector when finished, or when you need to change options
connector.shutdown();

Create a Connector as a Child Process

const fork = require('child_process').fork;
 
// require module
const iqc = require('itemsense-queue-connector');
 
// fork the itemsense-queue-connector module
const connector = fork('./node_modules/itemsense-queue-connector');
 
// create options
let options = iqc.createOptions({
  hostname: '127.0.0.1',
  username: 'username',
  password: 'password'
});
 
// start the connector with the defined options
connector.send({ command: 'start', options });
 
// listen for queue messages
connector.on('message', message => {
  switch (message.event) {
    case iqc.event.itemQueueMessage:
      console.log(`received item queue message: ${message.data}`);
      break;
    case iqc.event.thresholdQueueMessage:
      console.log(`received threshold queue message: ${message.data}`)
      break;
    case iqc.event.healthQueueMessage:
      console.log(`received health queue message: ${message.data}`);
      break;
    case iqc.event.itemQueueConnected:
      console.log(`connected to item queue [ ${message.data} ]`);
      break;
    case iqc.event.thresholdQueueConnected:
      console.log(`connected to threshold queue [ ${message.data} ]`);
      break;
    case iqc.event.healthQueueConnected:
      console.log(`connected to health queue [ ${message.data} ]`);
      break;
    case 'error':
      console.log(`received error: ${message.data}`);
      break;
    // you can use the event names directly - see Events section
    case: 'info':
      console.log(`received info: ${message.data}`);
      break;
  }
});
 
// ...
 
// shutdown the connector when finished, or when you need to change options
connector.send({ command: 'shutdown' });

Module Exports

Name Type Args Description
event Object n/a key:value pair of event names that a connector will emit
createOptions Function options: Object Creates and returns options that can be used to start a connector
createConnector Function None Creates and returns a new connector instance
const iqc = require('itemsense-queue-connector');
const connector = iqc.createConnector();
 
let options = iqc.createOptions({ hostname: '192.168.1.64' }); // creates and merges options
 
connector.start(options);

Connector Methods

Name Description
start Starts the connector with the given options
shutdown Shutdown the connector
const iqc = require('itemsense-queue-connector');
let options = iqc.createOptions(); // will create default options
 
const connector = iqc.createConnector();
 
connector.start(options);
// ...
connector.shutdown();

IMPORTANT: start should only be called when the connector has not been started, OR it has been shutdown.


Child Process Commands

Commands are used to control a connector that's running as a child process.

Command Description
start Start the connector to start with the provided options
shutdown Shutdown the connector
connector.send({ command: 'start', options: options });
// ...
connector.send({ command: 'shutdown' });

Options

Option Type Default Description
id String ItemSenseConnector the id of the connector instance - useful when multiple connectors exist
hostname String 127.0.0.1 the IP address or hostname of the ItemSense server
port Number 80 the port the ItemSense API is available on - used to configure queues
username String username of a user with a role of DataReader or Admin
password String the password for the username
connectionRetryInterval Number 5000 the time, in milliseconds, between connection attempts if a network error occurs
connectionHeartbeatInterval Number 30000 the time, in milliseconds, that the AMQP connection will be checked
itemQueueName String a queue name to connect to, if it does not exist on the server, a new queue will be created
itemQueueFilter Object {} used to configure a new queue for items
thresholdQueueName String a queue name to connect to, if it does not exist on the server, a new queue will be created
ignoreAbsent Boolean false if true, messages where toZone === 'ABSENT' will not be sent to listeners
maxObservationTimeDelta Number 0 the maximum delta, in milliseconds, that an observationTime can be from the current time


connectionRetryInterval

Min: 1000 ms
Max: None
Default: 30000 ms

If a connector's AMQP connection fails or is interrupted, the connector will try to reconnect to the server at this interval.


connectionHeartbeatInterval

Min: 1000 ms
Max: None
Default: 30000 ms

This is the rate that the AMQP connection is checked for connectivity. When two consecutive heartbeat checks fail, the connection is considered to be lost, at which point an ampqConnectionError event is sent to all listeners, and the connector will try to reconnect to the server.

IMPORTANT: If the connection between your app and the ItemSense server is unreliable, the connectionHeartbeatInterval may need to be higher than the default 30000 ms. This will allow more time for the AMQP connection to recover, and your connector will be able to receive any queue messages that were pushed onto the queue while the network connection was temporarily unavailable. Also, the connectionHeartbeatInterval should be, at most, 1/3 of the average time it takes for your ItemSense server to restart - see the ItemSense Queue Issue section for more details.


itemQueueName

Depending on how the connector is being used, when starting the connector, it may be beneficial to provide an existing queue name. Any messages that are waiting to be consumed in the queue will be consumed by the connector and events will be sent to listeners unless the observationTime delta exceeds maxObservationTimeDelta.


itemQueueFilter

The itemQueueFilter option is sent to the ItemSense server when creating a new queue. The filter can contain any of the following properties.

IMPORTANT: If the connector options you're using have itemQueueFilter as well as a itemQueueName defined, and the queue exists, then itemQueueFilter will not be used until the connector has to create a new queue.

Excerpt from the Impinj - ItemSense API Documentation

Property Type Description
fromFacility String The name of a facility to monitor for tag exit events
toFacility String The name of a facility to monitor for tag entry events
fromZone String The name of the zone to monitor for tag exit events
toZone String The name of the zone to monitor for tag entry events
epc String A hexadecimal string representing an EPC prefix of an item. Only the items with EPCs that start with this prefix will be returned
jobId String The ID of the job to monitor for tag events
distance Number The minimum distance a tag must move before a queue event (or message) is created
zoneTransitionsOnly Boolean Flag to only create queue events for tags that have transitioned between zones. Default value is true.

ignoreAbsent

The ignoreAbsent option can be useful when your ItemSense jobs have tag expiration on, but you're not interested in the ABSENT messages that get generated when a tag expires. For example, if you have a reader set up to see when a tag moves into a zone, but it is possible that the tag will not be read by any other reader before it passes by that same reader again, the tag would be ignored the next time it passes by unless tag expiration is on. Tag expiration can create bursts of heavy queue traffic so ignoreAbsent is more beneficial when the connector is run as a child process. This is because, if the connector is running in the same process as your app, all of the queue messages will be pushed into the Node event loop and have to be handled by the connector in your app (even just to ignore them), BUT if a child process gets the ABSENT message, it will not pass it onto your app's process for handling.


maxObservationTimeDelta

The maxObservationTimeDelta option, when set to a value greater than 0, is used to 'silence' queue messages that have an observationTime delta that is older than this value. This can be helpful in some cases because ItemSense will continue to push messages onto the queue for up to an hour while no clients are connected to the queue. This means there could possibly be thousands of outdated messages that will flood your listeners. If your listeners are running in another process, the flood of incoming messages may not be a problem, otherwise it could slow your entire application down (depends on what your listeners are doing with the messages).


Events

Event Msg Data Type Description
info String Helpful information that may want to be logged or stored elsewhere
itemQueueMessage JSON A parsed item queue message from ItemSense
itemQueueConnected String The name of the queue the connector just connected to
healthQueueMessage JSON A parsed health queue message from ItemSense
healthQueueConnected String The name of the queue the connector just connected to
amqpConnectionClosed String AMQP connection closed
error Error Error object

Same Process Event Handling

When a connector is in the same process, the event message will be the type defined in the Events section.

connector.on('itemQueueMessage', message => {
  typeof message; // object
  console.log(message);
  /*
 
  {"epc":"2017011308040A01102001F2","tagId":"","jobId":"a7a2a244-1444-4900-86e0-a5d47a91a849","fromZone":"ABSENT","fromFloor":null,"toZone":"ZONE_NAME","toFloor":null,"fromFacility":null,"toFacility":"FACILITY","fromX":null,"fromY":null,"toX":null,"toY":null,"observationTime":"2018-09-11T12:58:00.077Z"}
  
  */
});

Child Process Event Handling

Message Structure

{
  "event": "some-event",
  "data": "some-event-data"
}

When a connector sends an event from a child process, the message will be an Object that has an event property and a data property. The message.event is a String and the message.data will be the type defined in the Events section, depending on the event.

connector.on('message', message => {
  typeof message; // object
  switch (message.event) {
    case 'itemQueueMessage':
      typeof message.data; // object
      console.log(message.data);
    /*
 
    {"epc":"2017011308040A01102001F2","tagId":"","jobId":"a7a2a244-1444-4900-86e0-a5d47a91a849","fromZone":"ABSENT","fromFloor":null,"toZone":"ZONE_NAME","toFloor":null,"fromFacility":null,"toFacility":"FACILITY","fromX":null,"fromY":null,"toX":null,"toY":null,"observationTime":"2018-09-11T12:58:00.077Z"}
    
    */
  }
});

ItemSense Queue Issue

Up to the latest ItemSense release at this time there is an issue where the queues persist when the ItemSense server reboots, but if you connect to a queue that was created prior to the reboot you will not receive any messages.

In order for this module to provide a reliable solution for connecting to ItemSense queues, the connectors will create a new queue anytime an AMQP error occurs. Unfortunately, this means that the if there is a network interruption between the connector and the ItemSense server, but ItemSense is still generating queue messages, you will not reconnect to the same queue. This means queue messages could be missed when there are network interruptions. Since there is no way for a connector to tell when it connects to a 'dead' queue, it's safer to connect to a new queue.

If you have other means of determining whether your ItemSense server was rebooted or if the network connection was just interrupted, then you may want to listen for the amqpConnectionError event, shutdown your connector manually, and then start the connector with the same options that include the existing queue name. Otherwise just let the connector recover itself.

To Do

  • All caught up.

Package Sidebar

Install

npm i itemsense-queue-connector

Weekly Downloads

1

Version

2.0.1

License

MIT

Unpacked Size

49.5 kB

Total Files

7

Last publish

Collaborators

  • rledford