qottle

    1.0.5 • Public • Published

    qottle

    A queue that supports

    • promises/ async await
    • concurrency control
    • rate limiting and delay
    • priorities

    These examples all use promise syntax, but obviously you can substitute async/await if you prefer.

    More details on qottle can be found at https://ramblings.mcpher.com/gassnippets2/qottle/

    Installation

    yarn add qottle
    

    Usage

    const Qottle = require('qottle');
    
    

    You can add actions that need to be performed, and the queue concurrency will allow a number of them to be run concurrently. For example, single threaded queue might be initialized like this

    const q = new Qottle({
      concurrent: 1
    })
    

    And add items to it like this

    queue.add (action, options)
    

    For example - with concurrent set to 1, these items will be run 1 after another

    q.add (()=> doSomething())
    q.add (()=> doSomethingElse())
    

    Each entry is resolved as a promise, whether or not the original action was a promise -

    q.add (()=> console.log('Im running')).then(()=> console.log('ive said im starting'))
    q.add (()=> doAnAsyncThing()).then(({result}) => console.log('the result was', result))
    q.add (()=> console.log('its all over'))
    

    Or perhaps

    Promise.all([
      q.add (()=> console.log('Im running')).then(()=> console.log('ive said im starting'))
      q.add (()=> doAnAsyncThing())
      q.add (()=> console.log('its all over'))
    ]).then([,pack]=> {
      console.log('the result was', pack.result)
    })
    

    Skipping duplicates

    If you are running something like Pub/sub you often get requests to do something you already know you have to do, are not ready to ack them, but you don't want to add them to the queue. By providing a key for each entry, usually a digest of some kind of parameters you can selectively add things to the queue only if you dont already know about them.

    const q = new Qottle({
      skipDuplcates: true
    })
    

    then add entries to the queue with a key

    q.add (() => onlyDoOnce ({
      key: someId
    }))
    

    qottle will skip any add requests with duplicate keys. Normally a duplicate key only applies to items that are either active or in the queue - not completed items. You can set the option 'sticky' to mean you want qottle to keep a record of all keys it has processed in this instance.

    Rate limiting

    qottle can help avoid rate limit problems with APIs by applying various rate limit breaking avoidance techniques, such as limiting the number of calls over a given period. See the options section for how this works

    Options

    Most options can be applied when the queue is initialized, then individually overriden when an entry is added to the queue.

    option default purpose
    concurrent Infinity How many items can be run at once
    skipDuplicates false Enables duplicate skipping where items with the same key are not added to the queue more than once
    sticky false whether to keep a record for skipping duplcates of finished items as well as active or queued items
    immediate true whether to start the queue whenever something is in it, or to wait for it to be explicity started
    priority 100 the order to do things in. Lower values happen before higher values. Where priorities are the same, the order of insertion applies
    log false whether to log console info on starting and finishing items
    rateLimited false whether rate limiting management is required
    rateLimitPeriod 60000 how long to measure rate limiting over
    rateLimitDelay 0 how long to wait between starting each concurrent item
    rateLimitMax 1 how many items to allow to be outstading at once - this is an additional constraint to the concurrent value
    rateLimitMinWait 100 if a delay is required, qottle will calculate how much time is left in the rateLimitPeriod and wait that long before attempting to run. This is the minimum period to wait before trying again. Can be useful where the rate limited API time is slighty out of sync with your client
    catchError false normally a run error will be returned to the add function for you to catch. If catchError is set to true, then qottle will catch the error and pass it to the .then() of add(). See later for examples
    errorOnDuplicate false When adding to a queue,iIf skipDuplicates is enabled and a dup is detected the entry will resolve, with entry.skipped set to true. If you'd rather it treated a duplicate as an error set errorOnDuplicate to trues
    name qottle Can be useful if you have multiple queues and logging enabled - as the log includes the queue name

    Events

    In addition to the promise resolutions, events can also be triggered. For example

      q.on("finish", ({entry}) => {
        console.log(`${entry.key} is finished and ran for ${entry.runTime}ms`)
      });
    
    eventName triggered on
    empty queue is empty
    error there's been an error for an item
    finish an item has finished
    skip an item has been skipped as it had a duplicate key
    start an item has started
    startqueue the queue has started
    stopqueue the queue has been stopped - stopped queues still accept additions
    ratewait an entry is waiting for an opportunity to run but cant as it would violate a rate limit rule
    add an entry is added

    Event payload

    The payloads returned for each event can vary on the type but they are some or all of the following properties

    property content
    entry details of the execution and options for an item
    error details of an error
    waitTime how long a ratelimit constraint will wait before trying again
    result the final result returned from the action

    Entry object

    The entry object contains all the options applied plus various other info. It is passed as an argument to every action in the queue - for example

    q.add (({entry}) => {
      console.log('im executing something for entry', entry.id)
    })
    

    It also arrives as an argument to most events

    q.on('start', ({entry}) => {
      console.log('entry just started', entry.id)
    })
    

    and as part of the completed result of a queue item

    q.add (({entry}) => {
      console.log('im executing something for entry', entry.id)
    }).then(({entry, result})=> {
      console.log('entry', entry.id,'gave me this result', result)
    })
    

    or

    q.on('finished', ({entry, result}) => {
      console.log('entry', entry.id,'gave me this result', result)
    })
    

    Most methods and events return an Entry object that looks like this.

    property content
    ...options all the options mentioned earlier
    status 'finish', 'error', 'queued', 'active' , 'skipped'
    queuedAt timestamp when first added
    startedAt timestamp when started to run
    finishedAt timestamp when finished run
    elapsed ms from time queued to time finished
    runTime ms it spent actually running
    id a unique id
    waitTime total ms it spent waiting to run because of a ratelimit constraint
    waitStartedAt if forced to wait because of a rate limit constraint this is when it started waiting
    waitFinishedAt if forced to wait because of a rate limit constraint this is when it finished waiting and started running
    waitUntil if entry is in process of waiting, this is when it will try again
    attempts how many times it tried to start
    action the function it ran
    skipped whether the entry was skipped. Skipped items resolve successfully, but don't run and have this property set to true
    error the error if one was thrown. Most useful with catchError: true
    context you can pass this as an argument when adding to the queue and pick it up later- ignored by qottle

    methods

    The are no 'property gets'. All are methods. Where the return value is 'self', you can chain methods.

    property content returns
    add (action : function , options : object) add to the queue { result: any, error: Error, entry: Entry}
    stopQueue() stop the queue running anything else self
    startQueue() start the queue - items can be added to the queue whether it's started or not self
    isStarted() check if the queue is started boolean
    clear() clear any unstarted queued items self
    clearSticky() clear all items from the sticky history self
    clearRateLimitHistory() clear rate limit history to avoid any outstanding constraint self
    remove(entry: Entry) remove an entry from the queued items - pass the entry object over entry
    clearListeners(eventName: string} clear all the listeners for a given event name. 'all' as the eventName will clear all listeners self
    on(eventName: string, listener: function) add a listener to be executed when a given eventName triggers Listener
    off(listener: Listener) pass over the Listener returned from .on to remove a listener Listener

    Rate limiting

    A key capability of this queue is to deal with rate limiting. A queue can be set up to throttle calls - often to deal with APIS with rate limits. This is over and above the constraint of 'concurrent' which manages how many queue items can be executed at the same time.

    Let's say you have an API that allows 10 calls per minute, and you don't mind if the they all run simultaneously.

    const q = new qottle({
      rateLimitPeriod: 10 * 60 * 100,
      rateLimitMax: 10
    })
    

    Then you can simply add requests to the queue and the queue will submit them according to that rule.

    Promise.all ([
      q.add (()=>getSome())
      q.add (()=>getSomeMore())
    ]).then (results=> {
      console.log('all the results', results)
    })
    

    Another API constraint might be the time between individual calls limited to some value like 20 seconds, and only 1 request being processed at a time.

    const q = new qottle({
      rateLimitPeriod: 10 * 60 * 100,
      rateLimitMax: 10,
      concurrent: 1,
      rateLimitDelay: 20 * 1000
    })
    

    Then you can simply add requests to the queue and the queue will submit them according to that rule.

    Promise.all ([
      q.add (()=>getSome())
      q.add (()=>getSomeMore())
    ]).then (results=> {
      console.log('all the results', results)
    })
    

    Error handling

    By default you'll deal with errors like this (all queue items are converted to async)

    q.add (()=>something())
    .then (({result, entry})=>{ ... the result ... })
    .catch(({error, entry})=> { ... the error ...})
    

    However, you can ask qottle to catch thrown errors.

    const q = new qottle({
      catchError: true
    })
    

    Then any errors will be resolved (rather than rejected)

    q.add (()=>something())
    .then (({result, entry, error})=>{ ... the result ... or check for error })
    

    Error event

    Irrespective of the catchError options, q.on('error', ...) will always fire on an error, and q.on('finish',...) will only trigger on a successful finish.

    q.on('error', ({entry,error})=> {
      ... will always trigger on an error
    })
    
    q.on('finish', ({entry,result})=> {
      ... will not trigger on an error
    })
    

    Examples

    See the test.js for many examples

    Recipes

    Here's a couple of more complicated but useful examples

    Polling

    You can use qottle to manage endless, or constrained polling. In this scenario, we want to poll an aynch API a maximum of 100 times, but no more than 5 times every 10 seconds, and only 1 call at a time.

    set up the queue

      const q = new Qottle({
        concurrent: 1,
        rateLimited: true,
        rateLimitPeriod: 10 * 1000,
        rateLimitMax: 5
      });
    

    the number of iterations (or Infinite for ever)

      const ITERATIONS = 100
    

    This is where you'd make the async api call - For simulation as here, qottle has a handy timer you can use for timeouts as promises which just waits for a while then returns how long it waited. You could handle the results, or errors here, or use the qottle finish and error events.

      const action = () => q.timer(Math.floor(Math.random() * 2000));
    

    handle the results of each poll - you could do this on item resolution of q.add, or by using the finish event - as here, where polling results are just being added to an array

      const results = [];
      q.on("finish", ({ entry, result }) => {
        results.push({
          entry,
          result,
        });
      });
    
    

    create a recursive function for adding stuff to the queue - this one should work for most situations. It will finally resolve when the number of iterations are reached.

      const adder = ({ entry, result } = { entry: { key: 0 } }) =>
        q
          .add(() => action(), { key: entry.key + 1 })
          .then(({ entry, result }) =>
            entry.key < ITERATIONS
              ? adder({ entry, result })
              : Promise.resolve({ entry, result })
          );
    

    kick it off - for testing, at the end, I'm checking that the final result and number of items processed is as expected

      return adder().then(({ entry, result }) => {
        t.is(results.length, entry.key);
        t.is(results.length, ITERATIONS);
      });
    
    

    Alternative and simpler approach

    That polling example, had a recursive approach, where a promise resolution caused a new queue addition. Personaly I prefer this approach, but is a litle difficult to get your head around. Another (simpler) method could be to use the finish event.

      const adder = ({ entry, result } = { entry: { key: 0 } }) =>
        q.add(() => action(), { key: entry.key + 1 })
    
    

    And use the finish event to call it again

      q.on('finish', (({entry, result})=> {
        // optionally check here for whether it should finish
        adder ({entry, result})
      })
    
    

    and just start it like this

      adder()
    

    Handling duplicates from pubsub

    Pubsub is a great way to orchestrate your services, but often you'll get duplicates. Say you get a message to process something - you won't want to ack that message (and therefore prevent it sending reminders) until you've successfully processed it. On the other hand you don't want to run it again if you have it queued or if you've already run it. Of course this wont work if you have multiple instances of your service, but let's stick to the simple case for now.

    If you provide a key (perhaps derived from a hash of the parameters to your service) when you add it to Qottle, and enable skipDuplicates, qottle will not add to the queue but resolve (or reject if you have errorOnDuplicates set) addition requests if the same key is already queued or active. If you have the sticky option enabled, it will also check all finished items for duplicates too.

    Here's an simulation, using 2 queues - one playing the pub role, and another the sub role.

    Initialize a queue to simulate sending messages from a pub service. Don't start it right away, as we want to first populate it and get the sub queue ready to go.

      const pub = new Qottle({
        immediate: false,
        concurrent: 8,
        name: 'pub'
      });
    

    Populate it with a bunch of messages to be sent at random times, and randomly provoking some duplicate keys amongst them.

      const ps = Promise.all(
        Array.from(new Array(20)).map((f, i, a) =>
          pub.add(
            () => {
              return pub.timer(Math.floor(1000 * Math.random()));
            },
            // cause some duplicates to happen
            { key: Math.floor(a.length * Math.random()) }
          )
        )
      );
    

    Now create a subscription queue and start it. We'll use sticky to skip anything we've ever seen before.

      const q = new Qottle({
        skipDuplicates: true,
        sticky: true,
        name: 'sub'
      });
    

    In this sim, the subscriber will just wait a random amount of time- this is where you'd handle the service request in your live subscription

      const dealWithSub = ({ entry }) => q.timer(Math.floor(2000 * Math.random()))
    

    The finish request on the simulated pub queue would be analagous to the message.on event when using a real pubsub implementation. It'll trigger when each of the queued items is published, and add a task to the subscription queue, which will then check for duplicates and execute.

      pub.on("finish", ({ entry }) => { 
        q.add(dealWithSub, {
          key: entry.key,
        }).then(({ entry, result, error }) => { 
          if (entry.skipped) {
            // .. the entry was not processed as it was a duplicate
          } else { 
            // .. the entry was processed and the result passed here
          }
          return result
        }).catch (({entry, error})=> {
          // handle the error
        })
      })
    

    Finally we can start the pub queue - this will provoke entries in the sub queue

      pub.startQueue()
    

    A real life example would be very simply structured something like this, and could of course contain all the usual ratelimiting etc as required.

      const q = new Qottle({
        skipDuplicates: true,
        sticky: true,
        name: 'sub'
      });
    
      message.on (msg=>){
        const decodedMessage = somehow(msg)
        q.add(()=>doTheThing(decodedMessage), {key: decodedMessage.hash})
        .then (({entry})=> entry.skipped  ? msg.ack() : null)
        .catch((error)=> {
          msg.nack())
        })
      })
    

    Install

    npm i qottle

    DownloadsWeekly Downloads

    0

    Version

    1.0.5

    License

    MIT

    Unpacked Size

    57.4 kB

    Total Files

    6

    Last publish

    Collaborators

    • brucemcpherson