Noteworthy Placebo Mongers

    @ambassify/queue

    1.1.4 • Public • Published

    Queue

    CircleCI

    This library acts as a wrapper around different queue implementations that we might end up using.

    Currently implemented backends: SQS

    API

    The public API that each queue exposes is defined as in Queue. An implementation for a new backend can be created using a new class that extends the public API. The public API is:

    • Queue.create( QueueType : class, ...args ) args is passed to the constructor of the QueueType.
    • constructor( queueName : string, options : object )
    • Available options are: itemPoolSize : int
    • getName() : string
    • getItemPool() : ItemPool ( see item-pool.js )
    • receive( count : int ) : Promise Attempt to receive at most count items
    • release( item : object, handled : boolean ) : Promise Release the item, if not handled the item will not be deleted from the queue. handled defaults to false.
    • touch( item : object, options : object ) : Promise touch / ping a message to keep it in use.
    • send( body : object ) : Promise submit a new queue item
    • connect() : Promise
    • start() : Promise Start watching the queue for new items
    • stop() : void Stop watching the queue for new items, a final batch might still arrive after calling stop()
    • lock( item : object, options: object ) : Promise Prevents a message from re-entering the queue.
    • unlock( item : object ) : Promise Release an earlier acquired lock.
    • on( event : string, callback : function ) : void Attach an eventhandler to the queue.
    • message event is triggered for each queue item that arrives.
    • error event is triggered for errors in the _eventLoop or _lock.

    The public API will then call into the implementation specific methods through an internal API that each implementation should implement. The required private methods are:

    • _fetch( itemsToFetch : int ) : Promise Request itemsToFetch items from the queue. Do not perform any mutations on the raw object before resolving them.
    • _transform( item : object ) : object This method will receive the items retrieved using _fetch one by one, you can return altered objects from this method to change the queue items.
    • _delete( item : object ) : Promise Remove the item from the queue / mark as finished. This method should always receive the instance from the _transform step, such that you could add hidden fields to identify the item.
    • _touch( item : object, options: object ) : Promise Touch the message to keep it from becoming visible again.
    • _send( item : object ) : Promise Add item to the queue.
    • _connect() : Promise Start to connect with the backend.
    • _lock() : Promise Prevents a message from re-entering the queue. Default implementation uses queue.touch.
    • _unlock() : Promise Releases the lock and allows the item to re-enter the queue.

    Libraries

    • BatchOperation Utility to batch batchSize items unless timeout expires. The SQS implementation uses this to batch delete and send operations.
    • ItemPool Currently only a counter which ensure no more than the poolsize amount of items are in flight.
    • sleep Returns a promise that resolves after a timeout.

    Runtime configuration options

    Configuration can be done through environment variables, options are:

    • BATCH_SIZE defaults to 10
    • QUEUE_POOL_SIZE defaults to 20
    • SQS_AWS_REGION defaults to AWS_REGION environment variable.
    • SQS_FETCH_WAIT defaults to 20 seconds

    Keywords

    none

    Install

    npm i @ambassify/queue

    DownloadsWeekly Downloads

    15

    Version

    1.1.4

    License

    MIT

    Unpacked Size

    42.9 kB

    Total Files

    23

    Last publish

    Collaborators

    • ambassify
    • jorgenevens
    • sitebase