Queue
This library acts as a wrapper around different queue implementations that we might end up using.
Currently implemented backends: SQS
API
The public API that each queue exposes is defined as in Queue. An implementation for a new backend can be created using a new class that extends the public API. The public API is:
-
Queue.create( QueueType : class, ...args )
args is passed to the constructor of theQueueType
. constructor( queueName : string, options : object )
- Available options are:
itemPoolSize : int
getName() : string
-
getItemPool() : ItemPool
( see item-pool.js ) -
receive( count : int ) : Promise
Attempt to receive at mostcount
items -
release( item : object, handled : boolean ) : Promise
Release the item, if nothandled
the item will not be deleted from the queue.handled
defaults tofalse
. -
touch( item : object, options : object ) : Promise
touch / ping a message to keep it in use. -
send( body : object ) : Promise
submit a new queue item connect() : Promise
-
start() : Promise
Start watching the queue for new items -
stop() : void
Stop watching the queue for new items, a final batch might still arrive after callingstop()
-
lock( item : object, options: object ) : Promise
Prevents a message from re-entering the queue. -
unlock( item : object ) : Promise
Release an earlier acquired lock. -
on( event : string, callback : function ) : void
Attach an eventhandler to the queue. -
message
event is triggered for each queue item that arrives. -
error
event is triggered for errors in the_eventLoop
or_lock
.
The public API will then call into the implementation specific methods through an internal API that each implementation should implement. The required private methods are:
-
_fetch( itemsToFetch : int ) : Promise
RequestitemsToFetch
items from the queue. Do not perform any mutations on the raw object before resolving them. -
_transform( item : object ) : object
This method will receive the items retrieved using_fetch
one by one, you can return altered objects from this method to change the queue items. -
_delete( item : object ) : Promise
Remove theitem
from the queue / mark as finished. This method should always receive the instance from the_transform
step, such that you could add hidden fields to identify the item. -
_touch( item : object, options: object ) : Promise
Touch the message to keep it from becoming visible again. -
_send( item : object ) : Promise
Additem
to the queue. -
_connect() : Promise
Start to connect with the backend. -
_lock() : Promise
Prevents a message from re-entering the queue. Default implementation usesqueue.touch
. -
_unlock() : Promise
Releases the lock and allows the item to re-enter the queue.
Libraries
-
BatchOperation Utility to batch
batchSize
items unlesstimeout
expires. The SQS implementation uses this to batchdelete
andsend
operations. - ItemPool Currently only a counter which ensure no more than the poolsize amount of items are in flight.
- sleep Returns a promise that resolves after a timeout.
Runtime configuration options
Configuration can be done through environment variables, options are:
-
BATCH_SIZE
defaults to10
-
QUEUE_POOL_SIZE
defaults to20
-
SQS_AWS_REGION
defaults toAWS_REGION
environment variable. -
SQS_FETCH_WAIT
defaults to20
seconds