qottle

1.0.5 • Public • Published

A queue that supports

  • promises and async/await
  • concurrency control
  • rate limiting and delay
  • priorities

These examples all use promise syntax, but you can substitute async/await if you prefer.

More details on qottle can be found at https://ramblings.mcpher.com/gassnippets2/qottle/

Installation

yarn add qottle

Usage

const Qottle = require('qottle');

You add actions that need to be performed, and the queue's concurrent option controls how many of them can run at the same time. For example, a single-threaded queue might be initialized like this

const q = new Qottle({
  concurrent: 1
})

And add items to it like this

q.add (action, options)

For example - with concurrent set to 1, these items will be run 1 after another

q.add (()=> doSomething())
q.add (()=> doSomethingElse())
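
To make the idea of concurrency control concrete, here is a minimal sketch of a queue that never runs more than a given number of actions at once. It is only an illustration of the technique, not qottle's implementation: MiniQueue, its fields and its next method are hypothetical names invented for this example.

```javascript
// Hypothetical illustration only - this is not how qottle is implemented
class MiniQueue {
  constructor({ concurrent = Infinity } = {}) {
    this.concurrent = concurrent;
    this.active = 0;
    this.waiting = [];
  }

  // returns a promise for the action's result, whether or not the action is async
  add(action) {
    return new Promise((resolve, reject) => {
      this.waiting.push({ action, resolve, reject });
      this.next();
    });
  }

  // start waiting actions while there is spare capacity
  next() {
    while (this.active < this.concurrent && this.waiting.length) {
      const { action, resolve, reject } = this.waiting.shift();
      this.active++;
      Promise.resolve()
        .then(action)
        .then(resolve, reject)
        .finally(() => {
          this.active--;
          this.next();
        });
    }
  }
}

// with concurrent set to 1, the second (async) action finishes before the third starts
const order = [];
const mini = new MiniQueue({ concurrent: 1 });
const done = Promise.all([
  mini.add(() => order.push('first')),
  mini.add(() => Promise.resolve().then(() => order.push('second'))),
  mini.add(() => order.push('third')),
]);
```

Raising concurrent in this sketch lets that many actions be in flight together, which is the same trade-off the qottle option controls.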

Each entry is resolved as a promise, whether or not the original action was a promise -

q.add (()=> console.log('Im running')).then(()=> console.log('ive said im starting'))
q.add (()=> doAnAsyncThing()).then(({result}) => console.log('the result was', result))
q.add (()=> console.log('its all over'))

Or perhaps

Promise.all([
  q.add (()=> console.log('Im running')).then(()=> console.log('ive said im starting')),
  q.add (()=> doAnAsyncThing()),
  q.add (()=> console.log('its all over'))
]).then(([, pack])=> {
  console.log('the result was', pack.result)
})

Skipping duplicates

If you are running something like Pub/Sub, you often receive requests to do something you already know you have to do. You are not ready to ack them yet, but you don't want to add them to the queue again either. By providing a key for each entry (usually a digest of its parameters), you can add things to the queue only if you don't already know about them.

const q = new Qottle({
  skipDuplicates: true
})

Then add entries to the queue with a key, passed in the options argument

q.add (() => onlyDoOnce (), {
  key: someId
})

qottle will skip any add requests with duplicate keys. Normally a duplicate key only applies to items that are either active or in the queue, not to completed items. You can set the option 'sticky' to make qottle keep a record of all the keys it has processed in this instance, so completed items count as duplicates too.
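
The difference between the default behavior and sticky can be sketched with a small key tracker. This is an assumption about the bookkeeping involved, written only to illustrate the rule above; KeyTracker, tryAdd and finish are hypothetical names and not part of qottle.

```javascript
// Hypothetical sketch of duplicate-key bookkeeping - not qottle's code
class KeyTracker {
  constructor({ sticky = false } = {}) {
    this.sticky = sticky;
    this.pending = new Set();  // keys that are queued or active
    this.finished = new Set(); // keys of completed items, only used when sticky
  }

  // true if an item with this key should be skipped
  isDuplicate(key) {
    return this.pending.has(key) || (this.sticky && this.finished.has(key));
  }

  // returns false (and records nothing) if the key is a duplicate
  tryAdd(key) {
    if (this.isDuplicate(key)) return false;
    this.pending.add(key);
    return true;
  }

  // called when the item with this key completes
  finish(key) {
    this.pending.delete(key);
    if (this.sticky) this.finished.add(key);
  }
}

const plainTracker = new KeyTracker();
const firstAdd = plainTracker.tryAdd('a');     // accepted
const whileQueued = plainTracker.tryAdd('a');  // skipped, 'a' is still pending
plainTracker.finish('a');
const afterFinish = plainTracker.tryAdd('a');  // accepted again, finished keys are forgotten

const stickyTracker = new KeyTracker({ sticky: true });
stickyTracker.tryAdd('a');
stickyTracker.finish('a');
const stickyAfterFinish = stickyTracker.tryAdd('a'); // skipped, sticky remembers 'a'
```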

Rate limiting

qottle can help avoid rate limit problems with APIs, for example by limiting the number of calls made over a given period. See the options section for how this works.

Options

Most options can be applied when the queue is initialized, then individually overridden when an entry is added to the queue.

option | default | purpose
concurrent | Infinity | How many items can be run at once
skipDuplicates | false | Enables duplicate skipping, where items with the same key are not added to the queue more than once
sticky | false | Whether to keep a record of finished items, as well as active or queued items, for skipping duplicates
immediate | true | Whether to start the queue whenever something is in it, or to wait for it to be explicitly started
priority | 100 | The order to do things in. Lower values happen before higher values. Where priorities are the same, the order of insertion applies
log | false | Whether to log console info on starting and finishing items
rateLimited | false | Whether rate limiting management is required
rateLimitPeriod | 60000 | How long (in ms) to measure rate limiting over
rateLimitDelay | 0 | How long (in ms) to wait between starting each concurrent item
rateLimitMax | 1 | How many items to allow to be outstanding at once. This is an additional constraint to the concurrent value
rateLimitMinWait | 100 | If a delay is required, qottle will calculate how much time is left in the rateLimitPeriod and wait that long before attempting to run. This is the minimum period (in ms) to wait before trying again. It can be useful where the rate limited API time is slightly out of sync with your client
catchError | false | Normally a run error is returned to the add function for you to catch. If catchError is set to true, qottle will catch the error and pass it to the .then() of add(). See later for examples
errorOnDuplicate | false | When adding to a queue, if skipDuplicates is enabled and a duplicate is detected, the entry will resolve with entry.skipped set to true. If you'd rather a duplicate was treated as an error, set errorOnDuplicate to true
name | qottle | Can be useful if you have multiple queues and logging enabled, as the log includes the queue name
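
The priority rule in the table (lower values first, ties broken by insertion order) can be illustrated with a plain sort. This is a sketch of the ordering only; byPriority and the item names are made up for the example and say nothing about how qottle stores its queue.

```javascript
// Hypothetical helper: order items by priority, then by insertion order
const byPriority = (items) =>
  items
    // apply the documented default priority of 100 and remember insertion order
    .map((item, index) => ({ priority: 100, ...item, index }))
    .sort((a, b) => a.priority - b.priority || a.index - b.index)
    .map(({ name }) => name);

const runOrder = byPriority([
  { name: 'default-1' },
  { name: 'urgent', priority: 1 },
  { name: 'default-2' },
  { name: 'later', priority: 200 },
]);
// runOrder is ['urgent', 'default-1', 'default-2', 'later']
```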

Events

In addition to the promise resolutions, events can also be triggered. For example

  q.on("finish", ({entry}) => {
    console.log(`${entry.key} is finished and ran for ${entry.runTime}ms`)
  });

eventName | triggered on
empty | queue is empty
error | there's been an error for an item
finish | an item has finished
skip | an item has been skipped as it had a duplicate key
start | an item has started
startqueue | the queue has started
stopqueue | the queue has been stopped - stopped queues still accept additions
ratewait | an entry is waiting for an opportunity to run but can't, as it would violate a rate limit rule
add | an entry is added

Event payload

The payload for each event varies with the event type, but it contains some or all of the following properties

property | content
entry | details of the execution and options for an item
error | details of an error
waitTime | how long a ratelimit constraint will wait before trying again
result | the final result returned from the action

Entry object

The entry object contains all the options applied plus various other info. It is passed as an argument to every action in the queue - for example

q.add (({entry}) => {
  console.log('im executing something for entry', entry.id)
})

It also arrives as an argument to most events

q.on('start', ({entry}) => {
  console.log('entry just started', entry.id)
})

and as part of the completed result of a queue item

q.add (({entry}) => {
  console.log('im executing something for entry', entry.id)
}).then(({entry, result})=> {
  console.log('entry', entry.id,'gave me this result', result)
})

or

q.on('finish', ({entry, result}) => {
  console.log('entry', entry.id,'gave me this result', result)
})

Most methods and events return an Entry object that looks like this.

property | content
...options | all the options mentioned earlier
status | 'finish', 'error', 'queued', 'active', 'skipped'
queuedAt | timestamp when first added
startedAt | timestamp when started to run
finishedAt | timestamp when finished run
elapsed | ms from time queued to time finished
runTime | ms it spent actually running
id | a unique id
waitTime | total ms it spent waiting to run because of a ratelimit constraint
waitStartedAt | if forced to wait because of a rate limit constraint, this is when it started waiting
waitFinishedAt | if forced to wait because of a rate limit constraint, this is when it finished waiting and started running
waitUntil | if the entry is in the process of waiting, this is when it will try again
attempts | how many times it tried to start
action | the function it ran
skipped | whether the entry was skipped. Skipped items resolve successfully, but don't run and have this property set to true
error | the error if one was thrown. Most useful with catchError: true
context | you can pass this as an argument when adding to the queue and pick it up later - ignored by qottle
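
As a rough guide to how the timing properties relate to each other, here is a small worked example. The timestamps are made up, and the formulas are an assumption read off the descriptions in the table (elapsed runs from queued to finished, runTime from started to finished) rather than taken from qottle's source.

```javascript
// Hypothetical entry with made-up timestamps (ms)
const sampleEntry = { queuedAt: 1000, startedAt: 1400, finishedAt: 1900 };

// ms from time queued to time finished
const elapsed = sampleEntry.finishedAt - sampleEntry.queuedAt;   // 900

// ms it spent actually running (assumed to be finish minus start)
const runTime = sampleEntry.finishedAt - sampleEntry.startedAt;  // 500

// the remainder is time spent queued or waiting before it started
const beforeStart = elapsed - runTime;                           // 400
```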

Methods

There are no property getters; everything is a method. Where the return value is 'self', you can chain methods.

method | purpose | returns
add(action: function, options: object) | add to the queue | promise of { result: any, error: Error, entry: Entry }
stopQueue() | stop the queue running anything else | self
startQueue() | start the queue - items can be added to the queue whether it's started or not | self
isStarted() | check if the queue is started | boolean
clear() | clear any unstarted queued items | self
clearSticky() | clear all items from the sticky history | self
clearRateLimitHistory() | clear rate limit history to avoid any outstanding constraint | self
remove(entry: Entry) | remove an entry from the queued items - pass the entry object over | entry
clearListeners(eventName: string) | clear all the listeners for a given event name. 'all' as the eventName will clear all listeners | self
on(eventName: string, listener: function) | add a listener to be executed when a given eventName triggers | Listener
off(listener: Listener) | pass over the Listener returned from .on to remove a listener | Listener

Rate limiting

A key capability of this queue is to deal with rate limiting. A queue can be set up to throttle calls, often to deal with APIs that have rate limits. This is over and above the constraint of 'concurrent', which manages how many queue items can be executed at the same time.

Let's say you have an API that allows 10 calls per minute, and you don't mind if they all run simultaneously.

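const q = new Qottle({
  // rateLimited defaults to false, so switch rate limit management on
  rateLimited: true,
  // one minute, in ms
  rateLimitPeriod: 60 * 1000,
  rateLimitMax: 10
})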

Then you can simply add requests to the queue and the queue will submit them according to that rule.

Promise.all ([
  q.add (()=>getSome()),
  q.add (()=>getSomeMore())
]).then (results=> {
  console.log('all the results', results)
})
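
To show the kind of calculation a "no more than N calls per period" rule implies, here is a sketch of a sliding-window check. It is one plausible way to work out a wait time from the rateLimitPeriod, rateLimitMax and rateLimitMinWait options, and is not claimed to be qottle's algorithm; waitTimeFor and its history argument are hypothetical.

```javascript
// Hypothetical sliding-window check - an illustration, not qottle's algorithm
// history is a list of timestamps (ms) at which earlier items started
const waitTimeFor = ({ history, now, rateLimitPeriod, rateLimitMax, rateLimitMinWait = 100 }) => {
  // only starts inside the current period count against the limit
  const recent = history.filter((startedAt) => now - startedAt < rateLimitPeriod);
  if (recent.length < rateLimitMax) return 0;

  // find the start that has to drop out of the period before another item may run
  const sorted = recent.slice().sort((a, b) => a - b);
  const blocker = sorted[sorted.length - rateLimitMax];
  const timeLeft = blocker + rateLimitPeriod - now;

  // never wait less than the minimum
  return Math.max(timeLeft, rateLimitMinWait);
};

// ten starts, one second apart, against a limit of 10 per minute
const tenStarts = Array.from({ length: 10 }, (_, i) => i * 1000);
const limits = { rateLimitPeriod: 60 * 1000, rateLimitMax: 10 };

const underLimit = waitTimeFor({ history: tenStarts.slice(0, 3), now: 30000, ...limits }); // 0
const atLimit = waitTimeFor({ history: tenStarts, now: 30000, ...limits });                // 30000
const nearlyFree = waitTimeFor({ history: tenStarts, now: 59950, ...limits });             // 100
```

In the last case only 50ms of the period remains, so the minimum wait of 100ms applies instead.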

Another API constraint might be a minimum time between individual calls, say 20 seconds, with only 1 request being processed at a time.

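const q = new Qottle({
  // rateLimited defaults to false, so switch rate limit management on
  rateLimited: true,
  // one minute, in ms
  rateLimitPeriod: 60 * 1000,
  rateLimitMax: 10,
  concurrent: 1,
  rateLimitDelay: 20 * 1000
})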

As before, you simply add requests to the queue and the queue will submit them according to that rule.

Promise.all ([
  q.add (()=>getSome()),
  q.add (()=>getSomeMore())
]).then (results=> {
  console.log('all the results', results)
})

Error handling

By default you'll deal with errors like this (all queue items are converted to async)

q.add (()=>something())
.then (({result, entry})=>{ /* ... the result ... */ })
.catch(({error, entry})=> { /* ... the error ... */ })

However, you can ask qottle to catch thrown errors.

const q = new Qottle({
  catchError: true
})

Then any errors will be resolved (rather than rejected)

q.add (()=>something())
.then (({result, entry, error})=>{ /* ... the result ... or check for error */ })
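
The two styles can be compared side by side with a small stand-in. The settle function below is hypothetical and only mimics the documented behavior (reject with the error by default, resolve with it when catchError is true); it is not qottle's code and it leaves out the entry.

```javascript
// Hypothetical wrapper showing the two error styles - not qottle's implementation
const settle = (action, { catchError = false } = {}) =>
  Promise.resolve()
    .then(action)
    .then(
      (result) => ({ result }),
      // default: reject with the error; catchError: resolve with it instead
      (error) => (catchError ? { error } : Promise.reject({ error }))
    );

const boom = () => {
  throw new Error('boom');
};

const outcomes = Promise.all([
  // default style: the error arrives in .catch
  settle(boom).catch(({ error }) => `rejected: ${error.message}`),
  // catchError style: the error arrives in .then
  settle(boom, { catchError: true }).then(({ error }) => `resolved: ${error.message}`),
]);
```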

Error event

Irrespective of the catchError options, q.on('error', ...) will always fire on an error, and q.on('finish',...) will only trigger on a successful finish.

q.on('error', ({entry,error})=> {
  // ... will always trigger on an error
})
q.on('finish', ({entry,result})=> {
  // ... will not trigger on an error
})

Examples

See the test.js for many examples

Recipes

Here are a couple of more complicated but useful examples

Polling

You can use qottle to manage endless or constrained polling. In this scenario, we want to poll an async API a maximum of 100 times, but no more than 5 times every 10 seconds, and only 1 call at a time.

Set up the queue

  const q = new Qottle({
    concurrent: 1,
    rateLimited: true,
    rateLimitPeriod: 10 * 1000,
    rateLimitMax: 5
  });

The number of iterations (or Infinity to poll forever)

  const ITERATIONS = 100

This is where you'd make the async API call. For a simulation, as here, qottle has a handy timer that wraps a timeout in a promise: it waits for a while, then returns how long it waited. You could handle the results or errors here, or use the qottle finish and error events.

  const action = () => q.timer(Math.floor(Math.random() * 2000));

Handle the results of each poll. You could do this on resolution of each q.add, or by using the finish event as here, where polling results are just being added to an array

  const results = [];
  q.on("finish", ({ entry, result }) => {
    results.push({
      entry,
      result,
    });
  });

Create a recursive function for adding items to the queue. This one should work for most situations. It finally resolves when the number of iterations is reached.

  const adder = ({ entry, result } = { entry: { key: 0 } }) =>
    q
      .add(() => action(), { key: entry.key + 1 })
      .then(({ entry, result }) =>
        entry.key < ITERATIONS
          ? adder({ entry, result })
          : Promise.resolve({ entry, result })
      );

Kick it off. For testing, at the end I'm checking that the final key and the number of items processed are as expected (t is the assertion object supplied by the test framework)

  return adder().then(({ entry, result }) => {
    t.is(results.length, entry.key);
    t.is(results.length, ITERATIONS);
  });
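
Since async/await can be substituted for promise syntax, the same polling loop can also be sketched without recursion. This version assumes only what is described above: that q.add(action, options) resolves to { entry, result } and that the entry carries the key it was added with. The poll function itself is a hypothetical helper, not part of qottle.

```javascript
// A sketch of the polling loop with async/await.
// q is any queue whose add(action, options) resolves to { entry, result }
const poll = async (q, action, iterations) => {
  let last;
  for (let key = 1; key <= iterations; key++) {
    // each addition waits for the previous poll to resolve before being queued
    last = await q.add(() => action(), { key });
  }
  // resolves with the final { entry, result }, like the recursive adder
  return last;
};
```

With the queue and action defined earlier, this would be started with poll(q, action, ITERATIONS).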

Alternative and simpler approach

That polling example took a recursive approach, where each promise resolution caused a new queue addition. Personally I prefer this approach, but it is a little difficult to get your head around. Another (simpler) method could be to use the finish event.

  const adder = ({ entry, result } = { entry: { key: 0 } }) =>
    q.add(() => action(), { key: entry.key + 1 })

And use the finish event to call it again

  q.on('finish', ({entry, result})=> {
    // optionally check here for whether it should finish
    adder ({entry, result})
  })

and just start it like this

  adder()

Handling duplicates from pubsub

Pubsub is a great way to orchestrate your services, but often you'll get duplicates. Say you get a message to process something: you won't want to ack that message (and therefore prevent it sending reminders) until you've successfully processed it. On the other hand, you don't want to run it again if you have it queued or if you've already run it. Of course this won't work if you have multiple instances of your service, but let's stick to the simple case for now.

If you provide a key (perhaps derived from a hash of the parameters to your service) when you add an item to Qottle, and enable skipDuplicates, qottle will not add it to the queue if the same key is already queued or active. Instead it resolves the addition request (or rejects it if you have errorOnDuplicate set). If you have the sticky option enabled, it will also check all finished items for duplicates.

Here's a simulation, using 2 queues - one playing the pub role, and another the sub role.

Initialize a queue to simulate sending messages from a pub service. Don't start it right away, as we want to first populate it and get the sub queue ready to go.

  const pub = new Qottle({
    immediate: false,
    concurrent: 8,
    name: 'pub'
  });

Populate it with a bunch of messages to be sent at random times, and randomly provoking some duplicate keys amongst them.

  const ps = Promise.all(
    Array.from(new Array(20)).map((f, i, a) =>
      pub.add(
        () => {
          return pub.timer(Math.floor(1000 * Math.random()));
        },
        // cause some duplicates to happen
        { key: Math.floor(a.length * Math.random()) }
      )
    )
  );

Now create a subscription queue and start it. We'll use sticky to skip anything we've ever seen before.

  const q = new Qottle({
    skipDuplicates: true,
    sticky: true,
    name: 'sub'
  });

In this simulation, the subscriber will just wait a random amount of time. This is where you'd handle the service request in your live subscription

  const dealWithSub = ({ entry }) => q.timer(Math.floor(2000 * Math.random()))

The finish event on the simulated pub queue is analogous to the message.on event when using a real pubsub implementation. It triggers when each of the queued items is published, and adds a task to the subscription queue, which will then check for duplicates and execute.

  pub.on("finish", ({ entry }) => { 
    q.add(dealWithSub, {
      key: entry.key,
    }).then(({ entry, result, error }) => { 
      if (entry.skipped) {
        // .. the entry was not processed as it was a duplicate
      } else { 
        // .. the entry was processed and the result passed here
      }
      return result
    }).catch (({entry, error})=> {
      // handle the error
    })
  })

Finally we can start the pub queue - this will provoke entries in the sub queue

  pub.startQueue()

A real life example would be structured very simply, something like this, and could of course include all the usual rate limiting etc. as required.

  const q = new Qottle({
    skipDuplicates: true,
    sticky: true,
    name: 'sub'
  });

  message.on ((msg) => {
    const decodedMessage = somehow(msg)
    q.add(()=>doTheThing(decodedMessage), {key: decodedMessage.hash})
    .then (({entry})=> entry.skipped  ? msg.ack() : null)
    .catch((error)=> {
      msg.nack()
    })
  })

Package details

Install: npm i qottle
Version: 1.0.5
License: MIT
Collaborators: brucemcpherson